This tutorial is a continuation of the Using the Frame-based and Streaming APIs tutorial. It assumes you have read that tutorial.
The tutorial demonstrates how to write WebSocket clients/servers that don't overwhelm the receiving side when sending data too quickly.
More importantly, it demonstrates how to do this without inventing or introducing any kind of application-level flow-control, by using a concept built into Twisted: producers and consumers.
You can read up about Twisted producers and consumers here: Producers and Consumers: Efficient High-Volume Streaming
You can find all the code for this tutorial in the source code repository.
For this tutorial you will need
A guide on how to set up AutobahnPython can be found in the getting started guide.
You may have noticed that in all three variants shown in the Using the Frame-based and Streaming APIs tutorial the generation of new data sent by the client was coupled to the reception of a digest computed by the server. This effectively established a kind of application-level flow control.
If we had sent data just as fast as we could generate it, then the server would not have been able to keep up, and more and more data would have been buffered on the client. Since our client/server pair was intended to run forever, ever-increasing memory consumption on the client would be unacceptable.
This application-level flow control works, but is not only ad-hoc and inconvenient, it also shows another undesirable effect. The client will only start to generate and send new data when it receives a digest message. However, at that point, the server has already run out of work.
You can see the effect by looking at CPU consumption for client and server. Both will be uneven (bursty), since each side alternates between working and waiting for the other.
We will now write a fourth variant, based on the streaming client from the streaming tutorial, which does a better job of balancing the load. No changes to the streaming server are necessary.
The streaming producer client is a variant of the streaming client. It no longer relies on receiving digests to trigger the sending of new data. Instead, it relies on Twisted telling it when to pause and resume producing:
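A back-of-the-envelope model makes the unevenness concrete. This is a simplified sketch under assumed numbers, not a measurement: gen_time and hash_time are hypothetical per-batch costs, and network latency is ignored. In lock-step mode each side waits for the other, so one cycle takes the sum of both costs. When the work overlaps, a cycle takes only as long as the slower side.

```python
def utilization(gen_time, hash_time, lock_step):
    """Return (client_busy, server_busy) fractions for one steady-state cycle.

    gen_time  -- assumed time the client needs to generate and send one batch
    hash_time -- assumed time the server needs to hash one batch
    lock_step -- True: the client waits for the digest before generating again
                 False: both sides work concurrently, paced by the slower one
    """
    if lock_step:
        cycle = gen_time + hash_time
    else:
        cycle = max(gen_time, hash_time)
    return gen_time / cycle, hash_time / cycle
```

With equal costs on both sides, for example utilization(2.0, 2.0, True), each side is busy only half of the time in this model, while the overlapped case keeps both sides fully busy.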
from ranstring import randomByteString
from zope.interface import implements
from twisted.internet import reactor, interfaces
from autobahn.websocket import WebSocketProtocol, \
                               WebSocketClientFactory, \
                               WebSocketClientProtocol, \
                               connectWS

# 2^63 - 1: the maximum frame payload length allowed by the WebSocket protocol
FRAME_SIZE = 0x7FFFFFFFFFFFFFFF


class RandomByteStreamProducer:
    """
    A Twisted Push Producer generating a stream of random octets sending out data
    in a WebSockets message frame.
    """
    implements(interfaces.IPushProducer)

    def __init__(self, proto):
        self.proto = proto
        self.started = False
        self.paused = False

    def pauseProducing(self):
        # Called by Twisted when the send buffer is full.
        self.paused = True

    def resumeProducing(self):
        # Called by Twisted when the send buffer has drained.
        self.paused = False

        if not self.started:
            # Begin the (single, endless) binary message on first use.
            self.proto.beginMessage(opcode = WebSocketProtocol.MESSAGE_TYPE_BINARY)
            self.proto.beginMessageFrame(FRAME_SIZE)
            self.started = True

        # Keep sending until Twisted asks us to pause.
        while not self.paused:
            data = randomByteString(1024)
            if self.proto.sendMessageFrameData(data) <= 0:
                # The current frame is full: start the next one.
                self.proto.beginMessageFrame(FRAME_SIZE)
                print "new frame started!"

    def stopProducing(self):
        pass


class StreamingProducerHashClientProtocol(WebSocketClientProtocol):
    """
    Streaming WebSockets client that generates stream of random octets
    sent to streaming WebSockets server, which computes a running SHA-256,
    which it will send every BATCH_SIZE octets back to us. This example
    uses a Twisted producer to produce the byte stream as fast as the
    receiver can consume, but not faster. Therefore, we don't need the
    application-level flow control as with the other examples.
    """

    def onOpen(self):
        self.count = 0
        producer = RandomByteStreamProducer(self)
        # True registers the producer as a streaming (push) producer.
        self.registerProducer(producer, True)
        producer.resumeProducing()

    def onMessage(self, message, binary):
        print "Digest for batch %d computed by server: %s" % (self.count, message)
        self.count += 1


if __name__ == '__main__':
    factory = WebSocketClientFactory("ws://localhost:9000")
    factory.protocol = StreamingProducerHashClientProtocol
    connectWS(factory)
    reactor.run()
The client produces and sends new data in a loop (the while loop in resumeProducing) as long as self.paused is False. This flag therefore controls producing, and it is set and cleared by pauseProducing and resumeProducing respectively.
The methods pauseProducing and resumeProducing are called by Twisted depending on whether the receiving side (the server) is still busy consuming data or is able to consume more.
Twisted will detect that based on TCP backpressure, and signal this to the application by calling the appropriate hooks. By overriding the hooks, an application can then implement flow-control.
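The pause/resume handshake can be illustrated without any networking. The following is a minimal sketch, not Twisted's actual implementation: ToyTransport, HIGH_WATER_MARK and drain() are hypothetical stand-ins for a transport whose send buffer fills up when the peer reads slowly. It only mirrors the idea that the transport calls pauseProducing when its buffer is full and resumeProducing once the buffer has drained.

```python
class ToyTransport:
    """Hypothetical stand-in for a transport with a bounded send buffer."""

    HIGH_WATER_MARK = 4096  # assumed buffer limit in bytes, for illustration only

    def __init__(self):
        self.buffered = 0      # bytes written but not yet consumed by the peer
        self.producer = None

    def registerProducer(self, producer):
        self.producer = producer

    def write(self, data):
        self.buffered += len(data)
        # Buffer full: ask the producer to stop until the peer catches up.
        if self.buffered >= self.HIGH_WATER_MARK:
            self.producer.pauseProducing()

    def drain(self):
        """Simulate the peer consuming everything that was buffered."""
        self.buffered = 0
        self.producer.resumeProducing()


class ChunkProducer:
    """Push producer that writes 1024-byte chunks until it is paused."""

    def __init__(self, transport, limit):
        self.transport = transport
        self.paused = False
        self.remaining = limit  # total chunks to send, so the demo terminates
        self.bursts = []        # chunks written per resumeProducing call

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.paused = False
        sent = 0
        while not self.paused and self.remaining > 0:
            self.transport.write(b"\x00" * 1024)
            self.remaining -= 1
            sent += 1
        self.bursts.append(sent)

    def stopProducing(self):
        pass


def run_demo(total_chunks=10):
    transport = ToyTransport()
    producer = ChunkProducer(transport, total_chunks)
    transport.registerProducer(producer)
    producer.resumeProducing()      # initial kick, as in onOpen
    while producer.remaining > 0:
        transport.drain()           # peer catches up, producer resumes
    return producer.bursts
```

Calling run_demo(10) returns the number of chunks written in each burst. The producer never writes more than the toy buffer can hold before it is paused, which is the property that keeps memory consumption bounded on the sending side.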
When you run this client against the streaming server, you will notice that CPU load now remains even (non-bursty) all the time. The client will produce and send data exactly as fast as the server is able to consume, process and send back data.
We have used the Producer-Consumer pattern here together with streaming data processing. The pattern can of course also be used with frame- or message-based processing.
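As a rough sketch of the message-based case, the producer below sends one complete message per loop iteration instead of streaming frame data. StubProtocol is a hypothetical stand-in so that the example runs without a network. The sketch assumes a sendMessage(payload, binary) method on the protocol, which should be checked against the AutobahnPython version in use.

```python
import os


class RandomMessageProducer:
    """Push producer that sends whole binary messages while not paused."""

    def __init__(self, proto, message_size=1024):
        self.proto = proto
        self.message_size = message_size
        self.paused = False

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.paused = False
        while not self.paused:
            # Second positional argument marks the message as binary (assumed API).
            self.proto.sendMessage(os.urandom(self.message_size), True)

    def stopProducing(self):
        pass


class StubProtocol:
    """Hypothetical protocol stand-in that pauses its producer after N messages."""

    def __init__(self, pause_after):
        self.pause_after = pause_after
        self.sent = []          # (payload length, binary flag) per message
        self.producer = None

    def sendMessage(self, payload, binary):
        self.sent.append((len(payload), binary))
        # Pretend the send buffer is full after pause_after messages.
        if len(self.sent) >= self.pause_after:
            self.producer.pauseProducing()


def demo():
    proto = StubProtocol(pause_after=3)
    proto.producer = RandomMessageProducer(proto, message_size=16)
    proto.producer.resumeProducing()
    return proto.sent
```

In a real client the stub would be replaced by the WebSocket protocol instance, the producer would be registered with registerProducer(producer, True) as in the streaming example, and Twisted rather than the stub would decide when to call pauseProducing.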
Tavendo, WAMP and "Autobahn WebSocket" are trademarks of Tavendo GmbH. All other trademarks are those of their respective entities.
Copyright © 2011-2013, Tavendo GmbH. Content licensed under Creative Commons CC-BY-SA and code licensed under Apache 2.0.