AutobahnPython

Flow Control with the Producer/Consumer Pattern

This tutorial is a continuation of the Using the Frame-based and Streaming APIs tutorial and assumes you have read it.

This tutorial demonstrates how to write WebSocket clients and servers that don't overwhelm the receiving side by sending data too quickly.

More importantly, it demonstrates how to do this without inventing any kind of application-level flow control, by using a concept built into Twisted: producers and consumers.

You can read more about Twisted producers and consumers in the Twisted documentation: Producers and Consumers: Efficient High-Volume Streaming

You can find all the code for this tutorial in the source code repository.

Prerequisites

For this tutorial you will need

  • AutobahnPython installed

A guide on how to set up AutobahnPython can be found in the Get Started section.


Introduction

You may have noticed that in all three variants shown in the Using the Frame-based and Streaming APIs tutorial, the generation of new data sent by the client was coupled to the reception of a digest computed by the server. This effectively established a kind of application-level flow control.

If we had sent data as fast as we could generate it, the server would not have been able to keep up, and more and more data would have been buffered on the client. Since our client/server pair was intended to run forever, this ever-increasing memory consumption on the client would be unacceptable.
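To make the growth concrete, here is a back-of-the-envelope sketch, assuming constant rates and an unbounded client-side buffer. The function name backlog_bytes and the rates in the example (10 MB/s produced, 8 MB/s consumed) are hypothetical, not measurements:

```python
def backlog_bytes(produce_rate, consume_rate, seconds):
    """Bytes left waiting in the sender's buffer after `seconds`.

    Assumes constant rates in bytes/second and an unbounded buffer
    (a hypothetical model, not taken from the tutorial code).
    """
    # Whatever is produced but not consumed accumulates linearly.
    return max(0, (produce_rate - consume_rate) * seconds)


# Hypothetical rates: client generates 10 MB/s, server consumes 8 MB/s.
print(backlog_bytes(10000000, 8000000, 3600))  # 7200000000 bytes after one hour
```

Any positive rate difference, however small, makes the backlog grow without bound for a client that runs forever.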

This application-level flow control works, but it is not only ad hoc and inconvenient; it also has another undesirable effect. The client only starts to generate and send new data when it receives a digest message. However, by that point the server has already run out of work and sits idle.

You can see the effect by looking at the CPU consumption of client and server: both will be uneven (bursty).
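A toy timing model shows why lock-step sending leaves the server idle and why overlapping generation with processing helps. It rests on assumptions of our own: a fixed per-batch generation time on the client, a fixed processing time on the server, a digest round trip, and transmission time ignored. The function names are hypothetical:

```python
def lockstep_server_busy_fraction(gen_time, proc_time, rtt):
    """Fraction of time the server is busy when the client waits for each digest.

    Toy model: the client generates a batch (gen_time), the server processes
    it (proc_time), and the digest round trip adds rtt; only then does the
    client start generating the next batch.
    """
    return proc_time / (gen_time + proc_time + rtt)


def pipelined_server_busy_fraction(gen_time, proc_time):
    """Same fraction when generation overlaps with processing.

    Once the pipeline is full, the slower of the two stages sets the pace.
    """
    return proc_time / max(gen_time, proc_time)


print(lockstep_server_busy_fraction(1.0, 1.0, 0.0))  # 0.5
print(pipelined_server_busy_fraction(1.0, 1.0))      # 1.0
```

With equal generation and processing times and a negligible round trip, the lock-step server is busy only half of the time, which is consistent with the bursty CPU profile; any network latency lowers that fraction further.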

We will now write a fourth variant of the client from the streaming tutorial that does a better job of balancing the load. No changes to the streaming server are necessary.

Streaming Producer Client

The streaming producer client is a variant of the streaming client. It no longer relies on receiving digests to trigger sending new data; instead, it relies on Twisted to tell it when to start and stop producing:


from ranstring import randomByteString  # helper module from this tutorial's repository
from zope.interface import implementer
from twisted.internet import reactor, interfaces
from autobahn.websocket import (WebSocketProtocol,
                                WebSocketClientFactory,
                                WebSocketClientProtocol,
                                connectWS)

# 2^63 - 1: the maximum frame payload length allowed by the WebSocket protocol
FRAME_SIZE = 0x7FFFFFFFFFFFFFFF


@implementer(interfaces.IPushProducer)
class RandomByteStreamProducer(object):
    """
    A Twisted push producer that generates a stream of random octets and
    sends it out as frame data of a single WebSocket message.
    """

    def __init__(self, proto):
        self.proto = proto
        self.started = False
        self.paused = False

    def pauseProducing(self):
        # Called by Twisted when the transport's send buffer is full.
        self.paused = True

    def resumeProducing(self):
        # Called by Twisted when the transport can accept more data
        # (and once by us in onOpen to get things started).
        self.paused = False

        if not self.started:
            self.proto.beginMessage(opcode=WebSocketProtocol.MESSAGE_TYPE_BINARY)
            self.proto.beginMessageFrame(FRAME_SIZE)
            self.started = True

        # Keep writing until Twisted calls pauseProducing().
        while not self.paused:
            data = randomByteString(1024)
            # A result <= 0 means the current frame is complete: start a new one.
            if self.proto.sendMessageFrameData(data) <= 0:
                self.proto.beginMessageFrame(FRAME_SIZE)
                print("new frame started!")

    def stopProducing(self):
        # The connection is going away: stop writing.
        self.paused = True


class StreamingProducerHashClientProtocol(WebSocketClientProtocol):
    """
    Streaming WebSocket client that generates a stream of random octets and
    sends it to the streaming WebSocket server, which computes a running
    SHA-256 and sends a digest back to us every BATCH_SIZE octets. This
    example uses a Twisted producer to produce the byte stream as fast as
    the receiver can consume it, but not faster. Therefore, we don't need
    the application-level flow control used in the other examples.
    """

    def onOpen(self):
        self.count = 0
        producer = RandomByteStreamProducer(self)
        # streaming=True registers a push producer: Twisted will call
        # pauseProducing()/resumeProducing() as the send buffer fills and drains.
        self.registerProducer(producer, True)
        producer.resumeProducing()

    def onMessage(self, message, binary):
        print("Digest for batch %d computed by server: %s" % (self.count, message))
        self.count += 1


if __name__ == '__main__':
    factory = WebSocketClientFactory("ws://localhost:9000")
    factory.protocol = StreamingProducerHashClientProtocol
    connectWS(factory)
    reactor.run()

The client produces and sends new data in the while not self.paused loop of resumeProducing, for as long as self.paused is False. This flag therefore controls producing, and it is set and cleared in pauseProducing and resumeProducing, respectively.

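Twisted calls pauseProducing when the receiving side (the server) is still busy and cannot accept more data, and resumeProducing when it is able to consume more data again.

Twisted detects this through TCP backpressure: when the server reads slowly, the client's socket stops accepting writes and data accumulates in the transport's outgoing buffer. Once that buffer grows past a limit, Twisted pauses the registered producer; once the buffer has drained, it resumes the producer. By implementing these hooks, an application gets flow control without any extra protocol messages.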
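The pause/resume handshake can be exercised without a network. The sketch below uses a simplified, hypothetical stand-in for a Twisted transport (BoundedTransport and ChunkProducer are illustrative names, not Twisted classes): it pauses the registered producer once its outgoing buffer exceeds a size limit and resumes it after the buffer has drained, roughly the way Twisted's transports behave. The producer uses the same while not self.paused loop as the client above, with os.urandom standing in for randomByteString:

```python
import os


class BoundedTransport(object):
    """Hypothetical, simplified stand-in for a Twisted TCP transport.

    write() appends to an outgoing buffer and pauses the registered producer
    once the buffer exceeds buffer_size; drain() simulates the network
    sending data and resumes the producer when the buffer is empty again.
    """

    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.buffered = 0        # bytes waiting to be sent
        self.max_buffered = 0    # high-water mark observed so far
        self.delivered = 0       # bytes "sent" over the network
        self.producer = None
        self.producer_paused = False

    def registerProducer(self, producer, streaming):
        # `streaming` is ignored in this toy model: we only model push producers.
        self.producer = producer

    def write(self, data):
        self.buffered += len(data)
        self.max_buffered = max(self.max_buffered, self.buffered)
        if self.buffered > self.buffer_size and not self.producer_paused:
            self.producer_paused = True
            self.producer.pauseProducing()   # backpressure: stop the producer

    def drain(self, nbytes):
        sent = min(nbytes, self.buffered)
        self.buffered -= sent
        self.delivered += sent
        if self.buffered == 0 and self.producer_paused:
            self.producer_paused = False
            self.producer.resumeProducing()  # room again: restart the producer


class ChunkProducer(object):
    """Push producer with the same pause/resume loop as the tutorial client."""

    def __init__(self, transport, chunk_size=1024):
        self.transport = transport
        self.chunk_size = chunk_size
        self.paused = False

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.paused = False
        # Write until the transport calls pauseProducing() from inside write().
        while not self.paused:
            self.transport.write(os.urandom(self.chunk_size))

    def stopProducing(self):
        self.paused = True


transport = BoundedTransport(buffer_size=64 * 1024)
producer = ChunkProducer(transport)
transport.registerProducer(producer, True)
producer.resumeProducing()          # fills the buffer, then gets paused
for _ in range(10):
    transport.drain(16 * 1024)      # the "network" sends 16 KiB per tick
print(transport.max_buffered, transport.delivered)  # 66560 133120
```

However long the simulation runs, the buffer never grows beyond the limit plus one chunk; that bound is what keeps the sender's memory use flat.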

When you run this client against the streaming server, you will notice that the CPU load now stays even (non-bursty) the whole time. The client produces and sends data only as fast as the server is able to consume and process it and send back digests.

Perspective

We have used the Producer-Consumer pattern here together with streaming data processing. The pattern can of course also be used with frame- or message-based processing.
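For message-based processing, the producer can send one complete message per loop iteration instead of streaming frame data. A minimal sketch, assuming a hypothetical send_message(payload) callable; in an Autobahn client it would wrap the protocol's message-sending method, whose exact signature depends on the AutobahnPython version. The class name and the max_messages parameter are illustrative:

```python
import os


class RandomMessageProducer(object):
    """Push producer that emits complete binary messages instead of frame data.

    send_message is a hypothetical callable taking one payload; if sending
    triggers backpressure, pauseProducing() ends the loop below.
    """

    def __init__(self, send_message, message_size=1024, max_messages=None):
        self.send_message = send_message
        self.message_size = message_size
        self.max_messages = max_messages  # None means produce forever
        self.sent = 0
        self.paused = False
        self.stopped = False

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.paused = False
        while not self.paused and not self.stopped:
            if self.max_messages is not None and self.sent >= self.max_messages:
                break
            # One whole message per iteration.
            self.send_message(os.urandom(self.message_size))
            self.sent += 1

    def stopProducing(self):
        self.stopped = True


messages = []
producer = RandomMessageProducer(messages.append, message_size=32, max_messages=5)
producer.resumeProducing()
print(len(messages))  # 5
```

The flow-control logic is unchanged from the streaming client: only the unit of work inside the loop differs.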


Tavendo, WAMP and "Autobahn WebSocket" are trademarks of Tavendo GmbH. All other trademarks are those of their respective entities.

Copyright © 2011-2013, Tavendo GmbH. Content licensed under Creative Commons CC-BY-SA and code licensed under Apache 2.0.