AutobahnPython

Using the Frame-based and Streaming APIs

This tutorial shows three variants of the same functionality: a WebSocket client producing random bytes, and a WebSocket server computing a SHA-256 over those bytes and sending back the computed digests.

We show how to do

  • message-based processing,
  • frame-based processing and
  • streaming processing

The examples demonstrate the use of the advanced API of AutobahnPython for frame-based/streaming processing and how it differs from the basic API for message-based processing.

You can find all the code for this tutorial in the source code repository.

Prerequisites

For this tutorial you will need

  • AutobahnPython installed

A guide on how to set up AutobahnPython can be found in Get Started.

Introduction

A WebSocket message consists of a potentially unlimited number of fragments ("message frames"), each of which can carry a payload of between 0 and 2^63 - 1 octets.
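To illustrate where the frame size limit comes from, here is a minimal sketch of how a payload length is laid out in the WebSocket frame header according to RFC 6455: lengths up to 125 fit into a 7-bit field, larger ones use a 16-bit or 64-bit extended field, and the most significant bit of the 64-bit field must be 0. The helper name encode_payload_length is made up for this illustration and is not part of AutobahnPython; the mask bit that shares the first octet is left out.

```python
import struct

# the most significant bit of the 64-bit length field must be 0
MAX_FRAME_PAYLOAD = 2**63 - 1


def encode_payload_length(length):
    """
    Encode a frame payload length as it appears in the WebSocket frame
    header (hypothetical helper, mask bit not included).
    """
    if not 0 <= length <= MAX_FRAME_PAYLOAD:
        raise ValueError("payload length out of range")
    if length <= 125:
        # short payloads: the length fits into the 7-bit field itself
        return struct.pack("!B", length)
    if length <= 0xFFFF:
        # marker 126, followed by a 16-bit unsigned length
        return struct.pack("!BH", 126, length)
    # marker 127, followed by a 64-bit unsigned length
    return struct.pack("!BQ", 127, length)
```

A frame with the largest possible payload therefore still needs only 9 octets to announce its length.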

AutobahnPython's basic API is message-based. It is convenient to use, and when you produce or consume messages of limited size, it should be all you need.

However, due to the nature of a message-based API, the underlying implementation has to buffer all data received for a message frame and all frames received for the message. Only when the message finally ends does it flatten all buffered data and fire the onMessage method.

Consequently, when you need to process messages consisting of a large number of message fragments, or you need to process messages that contain message fragments of large size, this buffering will result in excessive memory consumption.

In these cases, you might want to process message fragments on a per frame basis, or you may even want to process incoming data as it arrives (streaming). This is where the advanced API of AutobahnPython comes into play.
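The difference between the two approaches can be sketched without any networking, using only hashlib. The fragment contents below are made-up stand-ins for message frames. The buffered variant holds the whole message in memory before hashing, while the incremental variant keeps only the hash state.

```python
import hashlib

fragments = [b"first ", b"second ", b"third"]  # stand-ins for message frames

# Message-based style: buffer every fragment, flatten, then hash once.
buffered = b"".join(fragments)
digest_buffered = hashlib.sha256(buffered).hexdigest()

# Frame-based/streaming style: feed each fragment to the hash as it
# arrives and keep nothing but the hash state.
running = hashlib.sha256()
for fragment in fragments:
    running.update(fragment)
digest_incremental = running.hexdigest()
```

Both variants yield the same digest; only the amount of data held in memory differs.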

The advanced API provides you with all the necessary methods and callbacks to program WebSocket clients and servers using frame-based processing or even fully streaming processing - both sending and receiving.

For this tutorial, we implement the functionality of a client sending random bytes to a server, and the server computing a SHA-256 over the received bytes and sending back digests. This functionality is implemented in three different variants, for message-base, frame-based and streaming processing.

To create random byte strings of varying length, we use a little helper function:

Random Byte String Generator

import random, struct

def randomByteString(length):
   """
   Generate a string of random bytes.
   """
   # one 64-bit word yields 8 bytes, so the word count is rounded up
   return ''.join([struct.pack("!Q",
                   random.getrandbits(64)) for x in
                      xrange(0, length / 8 + int(length % 8 > 0))])[:length]
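
The helper above targets Python 2 (xrange, integer division with /). For readers on Python 3, a roughly equivalent sketch might look as follows. The name random_byte_string is ours and does not appear in the tutorial's source code.

```python
import random
import struct


def random_byte_string(length):
    """
    Generate `length` random bytes, 8 bytes (one 64-bit word) at a time.
    """
    # number of 64-bit words needed, rounded up
    words = (length + 7) // 8
    data = b"".join(struct.pack("!Q", random.getrandbits(64))
                    for _ in range(words))
    # cut off the surplus bytes of the last word
    return data[:length]
```

Where the word-wise construction is not needed, os.urandom(length) is a shorter way to get random bytes.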
         

Message-based Processing

The message-based client is a straight-forward application of the basic API:

Message-based Client

from ranstring import randomByteString
from twisted.internet import reactor
from autobahn.websocket import WebSocketClientFactory, \
                               WebSocketClientProtocol, \
                               connectWS

MESSAGE_SIZE = 1 * 2**20


class MessageBasedHashClientProtocol(WebSocketClientProtocol):
   """
   Message-based WebSockets client that generates stream of random octets
   sent to WebSockets server as a sequence of messages. The server will
   respond to us with the SHA-256 computed over each message. When
   we receive response, we repeat by sending a new message.
   """

   def sendOneMessage(self):
      data = randomByteString(MESSAGE_SIZE)
      self.sendMessage(data, binary = True)

   def onOpen(self):
      self.count = 0
      self.sendOneMessage()

   def onMessage(self, message, binary):
      print "Digest for message %d computed by server: %s" \
            % (self.count, message)
      self.count += 1
      self.sendOneMessage()


if __name__ == '__main__':

   factory = WebSocketClientFactory("ws://localhost:9000")
   factory.protocol = MessageBasedHashClientProtocol
   connectWS(factory)
   reactor.run()
         

We send out a message of MESSAGE_SIZE random bytes with sendMessage (line 20) as soon as the WebSocket opening handshake with the server has been completed in onOpen (line 22).

When we receive a message with a computed digest (line 26), we just print the digest and send another message (line 30).

The message-based server is even simpler:

Message-based Server

import hashlib
from twisted.internet import reactor
from autobahn.websocket import WebSocketServerFactory, \
                               WebSocketServerProtocol, \
                               listenWS


class MessageBasedHashServerProtocol(WebSocketServerProtocol):
   """
   Message-based WebSockets server that computes a SHA-256 for every
   message it receives and sends back the computed digest.
   """

   def onMessage(self, message, binary):
      sha256 = hashlib.sha256()
      sha256.update(message)
      digest = sha256.hexdigest()
      self.sendMessage(digest)
      print "Sent digest for message: %s" % digest


if __name__ == '__main__':
   factory = WebSocketServerFactory("ws://localhost:9000")
   factory.protocol = MessageBasedHashServerProtocol
   listenWS(factory)
   reactor.run()
         

When the server receives a message from the client (line 14), it computes the digest over the complete message payload (lines 15-17) and sends the digest back as a hex string (line 18).


Frame-based Processing

The frame-based client uses methods and callbacks from the advanced API for frame-based processing:

Frame-based Client

from ranstring import randomByteString
from twisted.internet import reactor
from autobahn.websocket import WebSocketProtocol, \
                               WebSocketClientFactory, \
                               WebSocketClientProtocol, \
                               connectWS

FRAME_SIZE = 1 * 2**20


class FrameBasedHashClientProtocol(WebSocketClientProtocol):
   """
   Frame-based WebSockets client that generates stream of random octets
   sent to WebSockets server as a sequence of frames all in one message.
   The server will respond to us with the SHA-256 computed over frames.
   When we receive response, we repeat by sending a new frame.
   """

   def sendOneFrame(self):
      data = randomByteString(FRAME_SIZE)
      self.sendMessageFrame(data)

   def onOpen(self):
      self.count = 0
      self.beginMessage(opcode = WebSocketProtocol.MESSAGE_TYPE_BINARY)
      self.sendOneFrame()

   def onMessage(self, message, binary):
      print "Digest for frame %d computed by server: %s" \
            % (self.count, message)
      self.count += 1
      self.sendOneFrame()


if __name__ == '__main__':

   factory = WebSocketClientFactory("ws://localhost:9000")
   factory.protocol = FrameBasedHashClientProtocol
   connectWS(factory)
   reactor.run()
         

The frame-based client explicitly starts a new message with beginMessage (line 25) within onOpen, which is called after the WebSocket handshake has been completed.

It then sends each batch of FRAME_SIZE random bytes as a new frame within this single message via sendMessageFrame (line 21).

When we receive a digest from the server (line 28), we send another frame (line 32) within the same single message that was already begun.

Since there is no stop condition, we never end the message (which could be done using endMessage), but send an infinite sequence of frames (message fragments) within that single message started.

The frame-based server no longer overrides onMessage, but instead the frame-based hooks onMessageBegin, onMessageFrame and onMessageEnd:

Frame-based Server

import hashlib
from twisted.internet import reactor
from autobahn.websocket import WebSocketServerFactory, \
                               WebSocketServerProtocol, \
                               listenWS


class FrameBasedHashServerProtocol(WebSocketServerProtocol):
   """
   Frame-based WebSockets server that computes a running SHA-256 for message
   data received. It will respond after every frame received with the digest
   computed up to that point. It can receive messages of unlimited number
   of frames. Digest is reset upon new message.
   """

   def onMessageBegin(self, opcode):
      WebSocketServerProtocol.onMessageBegin(self, opcode)
      self.sha256 = hashlib.sha256()

   def onMessageFrame(self, payload, reserved):
      data = ''.join(payload)
      self.sha256.update(data)
      digest = self.sha256.hexdigest()
      self.sendMessage(digest)
      print "Sent digest for frame: %s" % digest

   def onMessageEnd(self):
      pass


if __name__ == '__main__':
   factory = WebSocketServerFactory("ws://localhost:9000")
   factory.protocol = FrameBasedHashServerProtocol
   listenWS(factory)
   reactor.run()
         

Note that we must override onMessageEnd even though we do nothing in it, since otherwise the default implementation of the basic API would still be active and would try to assemble the frames into a message and fire onMessage, which we don't want.
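The digest the frame-based server sends after each frame is a running digest over all frames of the message received so far. This works because hexdigest() reports the digest of the data fed in so far and leaves the hash object usable for further update() calls. A minimal offline sketch, with made-up frame payloads:

```python
import hashlib

frames = [b"frame-1", b"frame-2", b"frame-3"]  # made-up payloads

sha256 = hashlib.sha256()
running_digests = []
for payload in frames:
    sha256.update(payload)
    # digest of everything fed in so far; the hash object stays usable
    running_digests.append(sha256.hexdigest())

# each running digest equals a one-shot digest over the frames so far
expected = [hashlib.sha256(b"".join(frames[:i + 1])).hexdigest()
            for i in range(len(frames))]
```

Here running_digests and expected hold the same three digests, one per frame.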


Streaming Processing

With the message-based client/server, we sent an infinite sequence of messages and got back one digest per message. With the frame-based client/server, we started one message, sent an infinite sequence of frames, and got back one digest per message frame.

With a streaming client, we don't rely on message/frame structuring, but will start a single message, then a single frame, and then send data within that frame. We want to get back a digest for every BATCH_SIZE bytes.

Streaming Client

from ranstring import randomByteString
from zope.interface import implements
from twisted.internet import reactor, interfaces
from autobahn.websocket import WebSocketProtocol, \
                               WebSocketClientFactory, \
                               WebSocketClientProtocol, \
                               connectWS

BATCH_SIZE = 1 * 2**20


class StreamingHashClientProtocol(WebSocketClientProtocol):
   """
   Streaming WebSockets client that generates stream of random octets
   sent to WebSockets server as a sequence of batches in one frame, in
   one message. The server computes a running SHA-256, which it will send
   every BATCH_SIZE octets back to us. When we receive a response, we
   repeat by sending another batch of data.
   """

   def sendOneBatch(self):
      data = randomByteString(BATCH_SIZE)

      # Note that this could complete the frame, when the frame length is
      # reached. Since the frame length here is 2^63 - 1, we don't bother,
      # since it'll take _very_ long to reach that.
      self.sendMessageFrameData(data)

   def onOpen(self):
      self.count = 0
      self.beginMessage(opcode = WebSocketProtocol.MESSAGE_TYPE_BINARY)
      # 2^63 - 1: this is the maximum imposed by the WS protocol
      self.beginMessageFrame(0x7FFFFFFFFFFFFFFF)
      self.sendOneBatch()

   def onMessage(self, message, binary):
      print "Digest for batch %d computed by server: %s" \
            % (self.count, message)
      self.count += 1
      self.sendOneBatch()


if __name__ == '__main__':

   factory = WebSocketClientFactory("ws://localhost:9000")
   factory.protocol = StreamingHashClientProtocol
   connectWS(factory)
   reactor.run()
         

In onOpen, the client again begins a new message (line 31). However, it now also starts a new frame immediately (line 33), specifying the maximum frame length allowed by the WebSocket protocol, which is 2^63 - 1 octets. This is practically infinite: no setup in the foreseeable future will be able to exhaust it.
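To put the frame length into perspective, here is a small back-of-the-envelope calculation. The sustained throughput of 10 Gbit/s is an assumption chosen purely for illustration.

```python
MAX_FRAME_LENGTH = 0x7FFFFFFFFFFFFFFF   # 2^63 - 1 octets
BATCH_SIZE = 1 * 2**20                  # as in the streaming client

# Assumed sustained throughput: 10 Gbit/s, i.e. 1.25 * 10^9 octets/s
ASSUMED_OCTETS_PER_SECOND = 10 * 10**9 // 8

# how many complete batches fit into a single frame of maximum length
batches_per_frame = MAX_FRAME_LENGTH // BATCH_SIZE

# how long it would take to fill that frame at the assumed throughput
seconds = MAX_FRAME_LENGTH / float(ASSUMED_OCTETS_PER_SECOND)
years = seconds / (365 * 24 * 3600)

print("batches per frame: %d" % batches_per_frame)
print("years to fill one frame: %.0f" % years)
```

Under this assumption, filling a single frame would take well over 200 years.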

Random bytes are sent as the payload of that single frame (line 27).

Note that again we never end the message (which could be done using endMessage), and that there is no endMessageFrame method, since the size of the message frame was already specified when the frame was begun. The frame is finished as soon as all data has been written via sendMessageFrameData.

The server now overrides even more methods, since it wants to process frame payload data as it arrives:

Streaming Server

import hashlib
from twisted.internet import reactor
from autobahn.websocket import WebSocketServerFactory, \
                               WebSocketServerProtocol, \
                               listenWS

from streaming_client import BATCH_SIZE


class StreamingHashServerProtocol(WebSocketServerProtocol):
   """
   Streaming WebSockets server that computes a running SHA-256 for data
   received. It will respond every BATCH_SIZE bytes with the digest
   up to that point. It can receive messages of unlimited number of frames
   and frames of unlimited length (actually, up to 2^63 - 1, which is the
   WebSockets protocol imposed limit on frame size). Digest is reset upon
   new message.
   """

   def onMessageBegin(self, opcode):
      WebSocketServerProtocol.onMessageBegin(self, opcode)
      self.sha256 = hashlib.sha256()
      self.count = 0
      self.received = 0
      self.next = BATCH_SIZE

   def onMessageFrameBegin(self, length, reserved):
      WebSocketServerProtocol.onMessageFrameBegin(self, length, reserved)

   def onMessageFrameData(self, data):
      length = len(data)
      self.received += length

      ## when the data received reaches or exceeds the next batch boundary ..
      if self.received >= self.next:

         ## update digest up to batch size
         rest = length - (self.received - self.next)
         self.sha256.update(data[:rest])

         ## send digest
         digest = self.sha256.hexdigest()
         self.sendMessage(digest)
         print "Sent digest for batch %d : %s" % (self.count, digest)

         ## advance to next batch
         self.next += BATCH_SIZE
         self.count += 1

         ## .. and update the digest for the rest
         self.sha256.update(data[rest:])
      else:
         ## otherwise we just update the digest for received data
         self.sha256.update(data)

   def onMessageFrameEnd(self):
      pass

   def onMessageEnd(self):
      pass


if __name__ == '__main__':
   factory = WebSocketServerFactory("ws://localhost:9000")
   factory.protocol = StreamingHashServerProtocol
   listenWS(factory)
   reactor.run()
         

Since the server now no longer relies on message or frame boundaries either, it needs to know the BATCH_SIZE (line 7) for which it is expected to compute digests.

All the processing now happens in onMessageFrameData, which it overrides. This hook is called by AutobahnPython when new payload data for a started frame arrives.
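The bookkeeping in onMessageFrameData can be tried out in isolation. The sketch below is a hypothetical helper, not part of AutobahnPython. It mirrors the received/next counters of the server, but loops so that a single chunk crossing several batch boundaries yields one digest per boundary. With the tutorial's client, which waits for each digest before sending more data, that case should not arise.

```python
import hashlib


class BatchHasher(object):
    """
    Running SHA-256 that reports a digest at every `batch_size` boundary
    (hypothetical helper mirroring the bookkeeping of the server above).
    """

    def __init__(self, batch_size):
        if batch_size <= 0:
            raise ValueError("batch_size must be positive")
        self.sha256 = hashlib.sha256()
        self.batch_size = batch_size
        self.received = 0
        self.next = batch_size

    def feed(self, data):
        """Consume a chunk; return the digests of all boundaries it crossed."""
        digests = []
        while self.received + len(data) >= self.next:
            # octets still missing to complete the current batch
            rest = self.next - self.received
            self.sha256.update(data[:rest])
            self.received += rest
            digests.append(self.sha256.hexdigest())
            self.next += self.batch_size
            data = data[rest:]
        # whatever is left belongs to the next, still incomplete batch
        self.sha256.update(data)
        self.received += len(data)
        return digests
```

However the incoming data is chunked, feed reports the same sequence of digests, each computed over all octets up to the respective batch boundary.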


Outlook

Flow-control aspects of all three variants can be improved. The tutorial "Flow control with the Producer-Consumer Pattern" shows how to manage these.


Tavendo, WAMP and "Autobahn WebSocket" are trademarks of Tavendo GmbH. All other trademarks are those of their respective entities.

Copyright © 2011-2013, Tavendo GmbH. Content licensed under Creative Commons CC-BY-SA and code licensed under Apache 2.0.