Browser Based Push Notifications with Mongrel2 and EventSource

One of the interesting things about Mongrel2 is its ability to send output to multiple clients with a single handler message. This has a lot of potential for push applications and while I was investigating Mongrel2 a new version of iOS came out that included changes to Safari. While looking at the list of Safari changes in iOS 4.2 I noticed something called EventSource and went to investigate what it was.

As it turns out EventSource is a newer way of doing browser push currently supported by Chrome, Opera and Safari (mobile Safari as well). There is a good HTML5Rocks post on Server-Sent Events that goes into more detail on the differences of using it over something like WebSockets. One of the differences is that EventSource specially addresses mobile device use with the ability to do a "Connectionless push" through a proxy so the end device can sleep but still receive push notifications.

Before reading on check out my example Mongrel2 ruby handler post if you haven't already. The following examples will be based on the code from that post. I'm also going to use Modernizr to detect support for EventSource so check out my post on using Modernizr to detect browser support as well.

The first thing to do is configure Mongrel2. The following configuration will tell Mongrel2 to serve static content from a directory named "html" and connect the /esupdates path to a ruby handler we will write later (also note that it listens on more than one host, make sure you put your IP in place of the example IP):

root_dir = Dir(base='html/', index_file='index.html', default_ctype='text/plain')

esupdates = Handler(send_spec='tcp://127.0.0.1:9999', send_ident='54c6755b-9628-40a4-9a2d-cc82a816345e',
                    recv_spec='tcp://127.0.0.1:9998', recv_ident='')
 
routes = {
    '/esupdates': esupdates,
    '/': root_dir
}
 
main = Server(
    uuid="2f62bd5-9e59-49cd-993c-3b6013c28f05",
    access_log="/logs/access.log",
    error_log="/logs/error.log",
    chroot="./",
    pid_file="/run/mongrel2.pid",
    default_host="localhost",
    name="main",
    port=6767,
    hosts=[ Host(name="192.168.1.1", routes=routes)
            Host(name="localhost", routes=routes) ]
)
 
settings = {"zeromq.threads": 1}
 
servers = [main]

The next step is to create the page that will set up the EventSource connection and display any push messages. In this example I'm using a modified version of Modernizr to detect EventSource support but you could do the test by hand or assume the user has support. The current version of Modernizr doesn't support EventSource yet but you can grab my fork of Modernizr where I have added it.

<!doctype html>
<html style="no-js">
  <head>
    <title>EventSource Test</title>

    <style type="text/css" media="screen">
      div.esno, div.esyes { display: none }
      .no-eventsource div.esno { display: block }
      .eventsource div.esyes { display: block }
    </style>

    <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.4.4/jquery.min.js" type="text/javascript"></script>
    <script src="modernizr-min.js" type="text/javascript"></script>
    <script type="text/javascript">
      function startUpdates()
      {
        var source = new EventSource('/esupdates');
        source.onmessage = function (event) 
        {
          $("#events").innerHTML += event.data + '<br/>';
        };

        source.onerror = function (event) 
        {
          $("#events").innerHTML += 'error<br/>';
        };

        source.onopen = function (event) 
        {
           $("#events").innerHTML += 'open<br/>';
        };
      }

      $(document).ready(function()
      { 
        if (Modernizr.eventsource)
        {
          setTimeout(startUpdates, 10); 
        } 
      });
    </script>
  </head>
  <body>

    <div class="esno">
      Your browser does not support EventSource.
    </div>

    <div class="esyes">
      Messages:<br/>
      <div id="events"></div>
    </div>

  </body>
</html>

The above creates the EventSource connection to the /esupdates URL and when any event comes in it appends that event to the div with the id of "events". This and the Modernizr javascript need to go in the "html" directory.

And last comes the handler code. I have modified the ruby handler example from my previous post by adding a new ZMQ queue called "eventsource_queue" as well as a beacon thread that pushes a message to the "eventsource_queue" every 5 seconds. Messages coming in on the "eventsource_queue" are sent to every connected client:

require 'zmq'
require 'json'
 
handler_thread = Thread.new do
  handler_ctx = ZMQ::Context.new(1)
 
  receive_queue = handler_ctx.socket(ZMQ::PULL)
  receive_queue.connect("tcp://127.0.0.1:9999")
 
  response_publisher = handler_ctx.socket(ZMQ::PUB)
  response_publisher.connect("tcp://127.0.0.1:9998")
  response_publisher.setsockopt(ZMQ::IDENTITY, "82209006-86FF-4982-B5EA-D1E29E55D481")
 
  stop_queue = handler_ctx.socket(ZMQ::PULL)
  stop_queue.connect("ipc://shutdown_queue")
 
  eventsource_queue = handler_ctx.socket(ZMQ::PULL)
  eventsource_queue.connect("ipc://eventsource_queue")

  connections = []
 
  stopped = false
  until stopped do
    selected_queue = ZMQ.select([receive_queue, stop_queue, eventsource_queue])
    if selected_queue[0][0] == stop_queue # Anything on the stop_queue ends processing
      stop_queue.close
      receive_queue.close
      eventsource_queue.close
      response_publisher.close
      handler_ctx.close
      stopped = true
    elsif selected_queue[0][0] == eventsource_queue # A message that goes out to all clients
      next if connections.size == 0 

      # The following must be the send_ident value used in the Handler config file entry
      sender_uuid = '54c6755b-9628-40a4-9a2d-cc82a816345e' 

      es_message = eventsource_queue.recv(0)

      # Note: This can only handle 128 connections without modification
      client_str = connections.join(' ')

      response_value = "#{sender_uuid} #{client_str.length}:#{client_str}, #{es_message}"
      response_publisher.send(response_value, 0)
    else
      # Request comes in as "UUID ID PATH SIZE:HEADERS,SIZE:BODY,"
      sender_uuid, client_id, request_path, request_message = receive_queue.recv(0).split(' ', 4)
      len, rest = request_message.split(':', 2)
      headers = JSON.parse(rest[0...len.to_i])
      len, rest = rest[(len.to_i+1)..-1].split(':', 2)
      body = rest[0...len.to_i]

      if headers['METHOD'] == 'JSON' and JSON.parse(body)['type'] == 'disconnect'
        connections.delete(client_id) # A client has disconnected, remove them from the list
        next
      end
 
      # Respond with the opening EventSource information
      connections << client_id

      headers = {}
      headers['Content-Type'] = 'text/event-stream'
      headers['Expires'] = 'Fri, 01 Jan 1990 00:00:00 GMT'
      headers['Cache-Control'] = 'no-cache, no-store, max-age=0, must-revalidate'
      headers['Pragma'] = 'no-cache'
      headers_s = headers.map{|k, v| "%s: %s" % [k,v]}.join("\r\n")

      content_body = ": time stream\nretry: 5000\n\n";

      response_value = "#{sender_uuid} #{client_id.to_s.length}:#{client_id}, HTTP/1.1 200 OK\r\n#{headers_s}\r\n\r\n#{content_body}"
      response_publisher.send(response_value, 0)
    end
  end
end

# This thread sends out a message every 5 seconds to all connected clients
beacon_thread = Thread.new do
  beacon_ctx = ZMQ::Context.new(1)
  eventsource_queue = beacon_ctx.socket(ZMQ::PUSH)
  eventsource_queue.bind("ipc://eventsource_queue")

  index = 1
  loop do
    eventsource_queue.send("id: #{index}\r\ndata: Timestamp #{Time.now}\r\n\r\n")
    sleep(5)
    index = index + 1
  end
end
 
ctx = ZMQ::Context.new(1)
stop_push_queue = ctx.socket(ZMQ::PUSH)
trap('INT') do # Send a message to shutdown on SIGINT
  stop_push_queue.bind("ipc://shutdown_queue")
  stop_push_queue.send("shutdown")
end

handler_thread.join

beacon_thread.kill
beacon_thread.join
 
stop_push_queue.close

There is a limitation in the above of 128 client connections at one time. This is something that could be fixed by splitting the set of connections into subsets of 128 each and sending each subset a message. I left that out to try to reduce the complexity of this example.

Although this example shows a single event going out to every client there is no need to limit it to just that. You could easily do the same thing for subsets of the full set of clients.

As a next step it might be interesting to see how well this setup could handle the C10k problem or even the C500k problem.

Leave a Reply

Your email address will not be published. Required fields are marked *