This tutorial shows three variants of the same functionality: a WebSocket client producing random bytes, and a WebSocket server computing a SHA-256 over those bytes and sending back the computed digests.
We show how to do message-based, frame-based and streaming processing.
The examples demonstrate the use of the advanced API of AutobahnPython for frame-based and streaming processing, and how it differs from the basic API for message-based processing.
You can find all the code for this tutorial in the source code repository.
For this tutorial you will need Python with Twisted and AutobahnPython installed, since all the examples import from both.
A guide on how to set up AutobahnPython can be found in the getting started guide.
A WebSocket message consists of a potentially unlimited number of fragments ("message frames"), each of which can have a payload of between 0 and 2^63 - 1 octets.
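The frame size limit follows from how the WebSocket protocol (RFC 6455) encodes the payload length in the frame header. The following is a minimal Python 3 sketch of that encoding only, not AutobahnPython code; the function name `encode_payload_length` and the constant `MAX_FRAME_PAYLOAD` are made up for illustration, and the first header byte (FIN flag, opcode) and the mask bit are left out.

```python
import struct

# The 64-bit length field must have its most significant bit set to 0,
# so the largest representable payload is 2^63 - 1 octets.
MAX_FRAME_PAYLOAD = 2**63 - 1


def encode_payload_length(length):
    """Return the length portion of a frame header (mask bit omitted)."""
    if not 0 <= length <= MAX_FRAME_PAYLOAD:
        raise ValueError("payload length out of range")
    if length <= 125:
        # Short payloads: the length fits into the 7-bit field itself.
        return struct.pack("!B", length)
    if length <= 0xFFFF:
        # Marker 126: a 16-bit unsigned length follows.
        return struct.pack("!BH", 126, length)
    # Marker 127: a 64-bit unsigned length follows.
    return struct.pack("!BQ", 127, length)
```

A 1 MiB frame, as used by the clients in this tutorial, therefore needs the 64-bit form: `len(encode_payload_length(2**20))` is 9.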
AutobahnPython's basic API is message-based. This API is convenient to use, and when you produce or consume messages of limited size, it should be all you need.
However, due to the nature of a message-based API, the underlying implementation has to buffer all data received for a message frame, buffer all frames received for the message, and only when the message finally ends, flatten all buffered data and fire the onMessage method.
Consequently, when you need to process messages consisting of a large number of message fragments, or you need to process messages that contain message fragments of large size, this buffering will result in excessive memory consumption.
In these cases, you might want to process message fragments on a per-frame basis, or you may even want to process incoming data as it arrives (streaming). This is where the advanced API of AutobahnPython comes into play.
The advanced API provides you with all the necessary methods and callbacks to program WebSocket clients and servers using frame-based processing or even fully streaming processing - both sending and receiving.
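The difference between buffering and incremental processing can be seen without any networking. The following Python 3 sketch uses only the standard library; `digest_buffered` and `digest_incremental` are hypothetical helper names, and the list of chunks stands in for data arriving over the wire. Both functions compute the same SHA-256, but only the first one has to hold the whole message in memory.

```python
import hashlib
import os


def digest_buffered(chunks):
    """Message-style: collect every chunk, flatten, then hash once."""
    buffered = []
    for chunk in chunks:
        buffered.append(chunk)  # memory use grows with the message size
    return hashlib.sha256(b"".join(buffered)).hexdigest()


def digest_incremental(chunks):
    """Frame/stream-style: hash each chunk as it arrives and keep nothing."""
    sha256 = hashlib.sha256()
    for chunk in chunks:
        sha256.update(chunk)  # memory use stays constant
    return sha256.hexdigest()


# Sixteen chunks of 1 KiB stand in for received payload data.
chunks = [os.urandom(1024) for _ in range(16)]
same = digest_buffered(chunks) == digest_incremental(chunks)
```

This property of the hash function is what allows the frame-based and streaming servers in this tutorial to work without assembling complete messages.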
For this tutorial, we implement the functionality of a client sending random bytes to a server, and the server computing a SHA-256 over the received bytes and sending back digests. This functionality is implemented in three different variants: message-based, frame-based and streaming processing.
To create random byte strings of varying length, we use a little helper function:
import random, struct

def randomByteString(len):
    """
    Generate a string of random bytes.
    """
    return ''.join([struct.pack("!Q",
                                random.getrandbits(64)) for x in
                    xrange(0, len / 8 + int(len % 8 > 0))])[:len]
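The helper above is written for Python 2 (`xrange`, integer `/`, byte strings joined with `''`). As a rough sketch of the same idea on Python 3, assuming nothing beyond the standard library, the helper could look like this; the name `random_byte_string` is made up here and is not part of the tutorial code.

```python
import random
import struct


def random_byte_string(length):
    """Generate `length` random bytes, eight at a time, then trim."""
    # Number of 64-bit words needed to cover `length` octets (rounded up).
    words = (length + 7) // 8
    packed = b"".join(
        struct.pack("!Q", random.getrandbits(64)) for _ in range(words)
    )
    return packed[:length]
```

As in the original, the random data comes from the `random` module and is meant as test payload only, not as cryptographic key material.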
The message-based client is a straightforward application of the basic API:
from ranstring import randomByteString
from twisted.internet import reactor
from autobahn.websocket import WebSocketClientFactory, \
                               WebSocketClientProtocol, \
                               connectWS

MESSAGE_SIZE = 1 * 2**20

class MessageBasedHashClientProtocol(WebSocketClientProtocol):
    """
    Message-based WebSockets client that generates stream of random octets
    sent to WebSockets server as a sequence of messages. The server will
    respond to us with the SHA-256 computed over each message. When
    we receive response, we repeat by sending a new message.
    """

    def sendOneMessage(self):
        data = randomByteString(MESSAGE_SIZE)
        self.sendMessage(data, binary = True)

    def onOpen(self):
        self.count = 0
        self.sendOneMessage()

    def onMessage(self, message, binary):
        print "Digest for message %d computed by server: %s" \
              % (self.count, message)
        self.count += 1
        self.sendOneMessage()

if __name__ == '__main__':
    factory = WebSocketClientFactory("ws://localhost:9000")
    factory.protocol = MessageBasedHashClientProtocol
    connectWS(factory)
    reactor.run()
We send out a message of MESSAGE_SIZE random bytes with sendMessage (in sendOneMessage) as soon as the WebSocket opening handshake with the server has completed and onOpen is called.
When onMessage delivers a message with a computed digest, we just print the digest and send another message.
The message-based server is even simpler:
import hashlib
from twisted.internet import reactor
from autobahn.websocket import WebSocketServerFactory, \
                               WebSocketServerProtocol, \
                               listenWS

class MessageBasedHashServerProtocol(WebSocketServerProtocol):
    """
    Message-based WebSockets server that computes a SHA-256 for every
    message it receives and sends back the computed digest.
    """

    def onMessage(self, message, binary):
        sha256 = hashlib.sha256()
        sha256.update(message)
        digest = sha256.hexdigest()
        self.sendMessage(digest)
        print "Sent digest for message: %s" % digest

if __name__ == '__main__':
    factory = WebSocketServerFactory("ws://localhost:9000")
    factory.protocol = MessageBasedHashServerProtocol
    listenWS(factory)
    reactor.run()
When the server receives a message from the client in onMessage, it computes the digest over the complete message payload and sends the digest back as a hex string.
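The reply can go out as an ordinary text message because `hexdigest()` returns printable hexadecimal characters rather than raw bytes. A small Python 3 sketch with the standard library only shows the relationship between the two forms; `digest_pair` is a hypothetical helper name.

```python
import hashlib


def digest_pair(payload):
    """Return the SHA-256 of `payload` as (raw bytes, hex string)."""
    sha256 = hashlib.sha256()
    sha256.update(payload)
    return sha256.digest(), sha256.hexdigest()


raw, text = digest_pair(b"abc")
# raw is 32 octets of binary data; text is the same value written as
# 64 hexadecimal characters, which is safe to send as a text message.
```

For the input `b"abc"`, `text` is the well-known test vector `ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad`.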
The frame-based client uses methods and callbacks from the advanced API for frame-based processing:
from ranstring import randomByteString
from twisted.internet import reactor
from autobahn.websocket import WebSocketProtocol, \
                               WebSocketClientFactory, \
                               WebSocketClientProtocol, \
                               connectWS

FRAME_SIZE = 1 * 2**20

class FrameBasedHashClientProtocol(WebSocketClientProtocol):
    """
    Frame-based WebSockets client that generates stream of random octets
    sent to WebSockets server as a sequence of frames all in one message.
    The server will respond to us with the SHA-256 computed over frames.
    When we receive response, we repeat by sending a new frame.
    """

    def sendOneFrame(self):
        data = randomByteString(FRAME_SIZE)
        self.sendMessageFrame(data)

    def onOpen(self):
        self.count = 0
        self.beginMessage(opcode = WebSocketProtocol.MESSAGE_TYPE_BINARY)
        self.sendOneFrame()

    def onMessage(self, message, binary):
        print "Digest for frame %d computed by server: %s" \
              % (self.count, message)
        self.count += 1
        self.sendOneFrame()

if __name__ == '__main__':
    factory = WebSocketClientFactory("ws://localhost:9000")
    factory.protocol = FrameBasedHashClientProtocol
    connectWS(factory)
    reactor.run()
The frame-based client explicitly starts a new message with beginMessage within onOpen, which is called after the WebSocket handshake has been completed.
It then sends each batch of FRAME_SIZE random bytes as a new frame within this single message via sendMessageFrame.
When we receive a digest from the server in onMessage, we send another frame within the same single message that was already begun.
Since there is no stop condition, we never end the message (which could be done using endMessage), but send an infinite sequence of frames (message fragments) within that single message started.
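If a stop condition were wanted, the call order would be beginMessage, any number of sendMessageFrame calls, then endMessage. The sketch below illustrates only that ordering in Python 3. `RecordingProtocol` is a hypothetical stand-in that records calls instead of talking to a socket, its method signatures are simplified (the real beginMessage takes an opcode, for example), and `send_frames` sends all frames at once rather than waiting for a digest between frames as the tutorial client does.

```python
class RecordingProtocol:
    """Hypothetical stand-in for a protocol instance; records calls only."""

    def __init__(self):
        self.calls = []

    def beginMessage(self):
        self.calls.append("beginMessage")

    def sendMessageFrame(self, payload):
        self.calls.append("sendMessageFrame(%d)" % len(payload))

    def endMessage(self):
        self.calls.append("endMessage")


def send_frames(protocol, frames):
    """Send every frame inside one message, then end that message."""
    protocol.beginMessage()
    for frame in frames:
        protocol.sendMessageFrame(frame)
    protocol.endMessage()  # the stop condition: no more frames follow


proto = RecordingProtocol()
send_frames(proto, [b"a" * 4, b"b" * 2])
```

After the call, `proto.calls` holds the four calls in the order they were made, ending with `endMessage`.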
The frame-based server no longer overrides onMessage, but instead overrides the frame-based hooks onMessageBegin, onMessageFrame and onMessageEnd:
import hashlib
from twisted.internet import reactor
from autobahn.websocket import WebSocketServerFactory, \
                               WebSocketServerProtocol, \
                               listenWS

class FrameBasedHashServerProtocol(WebSocketServerProtocol):
    """
    Frame-based WebSockets server that computes a running SHA-256 for message
    data received. It will respond after every frame received with the digest
    computed up to that point. It can receive messages of unlimited number
    of frames. Digest is reset upon new message.
    """

    def onMessageBegin(self, opcode):
        WebSocketServerProtocol.onMessageBegin(self, opcode)
        self.sha256 = hashlib.sha256()

    def onMessageFrame(self, payload, reserved):
        data = ''.join(payload)
        self.sha256.update(data)
        digest = self.sha256.hexdigest()
        self.sendMessage(digest)
        print "Sent digest for frame: %s" % digest

    def onMessageEnd(self):
        pass

if __name__ == '__main__':
    factory = WebSocketServerFactory("ws://localhost:9000")
    factory.protocol = FrameBasedHashServerProtocol
    listenWS(factory)
    reactor.run()
Note that we must override onMessageEnd, even though we do nothing in it, since otherwise the default implementation of the basic API would still be active and would try to assemble the frames into a message and fire onMessage, which we do not want.
With the message-based client/server, we sent an infinite sequence of messages and got back one digest per message. With the frame-based client/server, we started one message, sent an infinite sequence of frames, and got back one digest per message frame.
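Note that the frame-based server's digests are running digests: the reply after frame n covers frames 1 to n, because the hash object is only reset when a new message begins. A standard-library Python 3 sketch of that behaviour, with `running_digests` as a hypothetical helper name, might look like this.

```python
import hashlib


def running_digests(frames):
    """Return one hex digest per frame, each covering all frames so far."""
    sha256 = hashlib.sha256()  # created once, as in onMessageBegin
    digests = []
    for frame in frames:
        sha256.update(frame)
        # hexdigest() does not finalize the object; updating can continue.
        digests.append(sha256.hexdigest())
    return digests


per_frame = running_digests([b"a", b"bc"])
```

Here `per_frame[0]` is the SHA-256 of `b"a"`, while `per_frame[1]` is the SHA-256 of `b"abc"`, not of `b"bc"` alone.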
With a streaming client, we don't rely on message/frame structuring, but will start a single message, then a single frame, and then send data within that frame. We want to get back a digest for every BATCH_SIZE bytes.
from ranstring import randomByteString
from zope.interface import implements
from twisted.internet import reactor, interfaces
from autobahn.websocket import WebSocketProtocol, \
                               WebSocketClientFactory, \
                               WebSocketClientProtocol, \
                               connectWS

BATCH_SIZE = 1 * 2**20

class StreamingHashClientProtocol(WebSocketClientProtocol):
    """
    Streaming WebSockets client that generates stream of random octets
    sent to WebSockets server as a sequence of batches in one frame, in
    one message. The server computes a running SHA-256, which it will send
    every BATCH_SIZE octets back to us. When we receive a response, we
    repeat by sending another batch of data.
    """

    def sendOneBatch(self):
        data = randomByteString(BATCH_SIZE)
        # Note, that this could complete the frame, when the frame length is
        # reached. Since the frame length here is 2^63 - 1, we don't bother,
        # since it'll take _very_ long to reach that.
        self.sendMessageFrameData(data)

    def onOpen(self):
        self.count = 0
        self.beginMessage(opcode = WebSocketProtocol.MESSAGE_TYPE_BINARY)
        # 2^63 - 1 - This is the maximum imposed by the WS protocol
        self.beginMessageFrame(0x7FFFFFFFFFFFFFFF)
        self.sendOneBatch()

    def onMessage(self, message, binary):
        print "Digest for batch %d computed by server: %s" \
              % (self.count, message)
        self.count += 1
        self.sendOneBatch()

if __name__ == '__main__':
    factory = WebSocketClientFactory("ws://localhost:9000")
    factory.protocol = StreamingHashClientProtocol
    connectWS(factory)
    reactor.run()
In onOpen, the client again begins a new message with beginMessage. However, now we also start a new frame immediately with beginMessageFrame. We specify the maximum frame length allowed by the WebSocket protocol, which is 2^63 - 1 (0x7FFFFFFFFFFFFFFF). This is practically infinite: even at one gigabyte per second, filling such a frame would take roughly 290 years.
Random bytes are sent as the payload of that single frame via sendMessageFrameData.
Note that again we never end the message (which could be done using endMessage), and that there is no endMessageFrame method, since the size of the message frame was already specified when the frame was begun. The frame is finished as soon as all data has been written via sendMessageFrameData.
The server now overrides even more methods, since it wants to process frame payload data as it arrives:
import hashlib
from twisted.internet import reactor
from autobahn.websocket import WebSocketServerFactory, \
                               WebSocketServerProtocol, \
                               listenWS

from streaming_client import BATCH_SIZE

class StreamingHashServerProtocol(WebSocketServerProtocol):
    """
    Streaming WebSockets server that computes a running SHA-256 for data
    received. It will respond every BATCH_SIZE bytes with the digest
    up to that point. It can receive messages of unlimited number of frames
    and frames of unlimited length (actually, up to 2^63 - 1, which is the
    WebSockets protocol imposed limit on frame size). Digest is reset upon
    new message.
    """

    def onMessageBegin(self, opcode):
        WebSocketServerProtocol.onMessageBegin(self, opcode)
        self.sha256 = hashlib.sha256()
        self.count = 0
        self.received = 0
        self.next = BATCH_SIZE

    def onMessageFrameBegin(self, length, reserved):
        WebSocketServerProtocol.onMessageFrameBegin(self, length, reserved)

    def onMessageFrameData(self, data):
        length = len(data)
        self.received += length

        ## when the data received exceeds the next BATCH_SIZE ..
        if self.received >= self.next:

            ## update digest up to batch size
            rest = length - (self.received - self.next)
            self.sha256.update(data[:rest])

            ## send digest
            digest = self.sha256.hexdigest()
            self.sendMessage(digest)
            print "Sent digest for batch %d : %s" % (self.count, digest)

            ## advance to next batch
            self.next += BATCH_SIZE
            self.count += 1

            ## .. and update the digest for the rest
            self.sha256.update(data[rest:])
        else:
            ## otherwise we just update the digest for received data
            self.sha256.update(data)

    def onMessageFrameEnd(self):
        pass

    def onMessageEnd(self):
        pass

if __name__ == '__main__':
    factory = WebSocketServerFactory("ws://localhost:9000")
    factory.protocol = StreamingHashServerProtocol
    listenWS(factory)
    reactor.run()
Since the server now also no longer relies on message or frame boundaries, it needs to know the BATCH_SIZE it is expected to compute digests for, which it imports from the streaming_client module.
All the processing now happens in onMessageFrameData, which it overrides. This hook is called by AutobahnPython when new payload data for a started frame arrives.
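The batch bookkeeping in onMessageFrameData can be tried out without a network connection. The following Python 3 sketch pulls the same idea into a small standalone class. `BatchDigester` and its `feed` method are hypothetical names, not part of AutobahnPython. Unlike the server above, which emits at most one digest per call, this variant also handles a chunk that crosses several batch boundaries.

```python
import hashlib


class BatchDigester:
    """Running SHA-256 that reports a digest every `batch_size` octets."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.sha256 = hashlib.sha256()
        self.pending = 0  # octets hashed since the last reported digest

    def feed(self, data):
        """Hash `data`; return digests for each batch boundary crossed."""
        digests = []
        view = memoryview(data)  # slice without copying
        while len(view) > 0:
            # Never hash past the next batch boundary in one step.
            take = min(len(view), self.batch_size - self.pending)
            self.sha256.update(view[:take])
            self.pending += take
            view = view[take:]
            if self.pending == self.batch_size:
                digests.append(self.sha256.hexdigest())
                self.pending = 0
        return digests


data = bytes(range(256)) * 4  # 1024 octets of sample payload

# Feed the data in awkward 37-octet chunks, as a network might deliver it.
chunked = BatchDigester(100)
digests_chunked = []
for i in range(0, len(data), 37):
    digests_chunked += chunked.feed(data[i:i + 37])

# Feed the same data in one piece.
digests_whole = BatchDigester(100).feed(data)
```

Both ways of feeding the data produce the same ten digests, which shows that the result depends only on the byte stream and the batch size, not on how the data happens to be split on arrival.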
All three variants leave room for improvement with respect to flow control. The "Flow control with the Producer-Consumer Pattern" tutorial shows how to manage this.
Tavendo, WAMP and "Autobahn WebSocket" are trademarks of Tavendo GmbH. All other trademarks are those of their respective entities.
Copyright © 2011-2013, Tavendo GmbH. Content licensed under Creative Commons CC-BY-SA and code licensed under Apache 2.0.