A Different Take on Network Programming

In recent months, I’ve spent some time programming with Node.  Node is great because it bundles zlib, OpenSSL, and a fast JavaScript runtime, and supports an asynchronous eventing model.  It’s fairly lightweight, compact, and efficient.  Node can handle thousands of concurrent connections on basic hardware.  The community around Node is very large, and growing.  What more could you want in life?

The one thing I don’t like about Node is the programming model, primarily in the HTTP stack.  The basic model is to use callbacks, and chains of callbacks, to achieve most things.  You pay for the efficiency with constraints on how you structure your code.

Recently, I’ve been playing with different programming models.  Rather than direct callbacks, I’m using queues of commands.  Queuing systems are nothing new.  The basic premise is this: instead of one function calling the next directly, it places an ‘alert’ or ‘event’ or some bit of information into a queue, indicating that the next action needs to occur.  Some other bit of code watches the queue for new items to show up, and once one does, it takes the appropriate action.
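
To make the premise concrete, a queue in this style only needs Enqueue(), Dequeue(), and the convention that Dequeue() returns nil when the queue is empty.  Here’s a minimal sketch; the Collections.Queue module used later is the real implementation, so treat this as illustration only:

-- A minimal FIFO queue (illustrative sketch, not Collections.Queue)
local Queue = {}
Queue.__index = Queue

function Queue.new()
  return setmetatable({first = 1, last = 0, items = {}}, Queue)
end

function Queue:Enqueue(value)
  self.last = self.last + 1
  self.items[self.last] = value
end

function Queue:Dequeue()
  -- by convention, an empty queue yields nil
  if self.first > self.last then return nil end
  local value = self.items[self.first]
  self.items[self.first] = nil
  self.first = self.first + 1
  return value
end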

Queuing systems are great because they allow for greater flexibility, and make it easier to decouple a tightly coupled system.  One area where I use this is in the core loop where my service accepts new connections.  But first, a little setup.  In my system, I assume there is a system-wide scheduler already available.  In the Lua context, this deals with cooperative coroutines.  The same can be achieved in other environments, as long as you have the ability to start code running, and allow other bits of code to run relatively concurrently.
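
If the idea of such a scheduler is unfamiliar, here is a rough sketch of what a cooperative dispatcher might look like.  This is an illustrative stand-in, not the actual SimpleDispatcher module used below:

local unpack = unpack or table.unpack  -- Lua 5.1/5.2 compatibility

-- A minimal round-robin dispatcher over coroutines (sketch only)
local SimpleDispatcher = {}
SimpleDispatcher.__index = SimpleDispatcher
setmetatable(SimpleDispatcher, {
  __call = function() return setmetatable({tasks = {}}, SimpleDispatcher) end
})

function SimpleDispatcher:Spawn(func, ...)
  local args = {...}
  table.insert(self.tasks, coroutine.create(function() func(unpack(args)) end))
end

function SimpleDispatcher:Start()
  while #self.tasks > 0 do
    -- iterate backwards so finished tasks can be removed in place
    for i = #self.tasks, 1, -1 do
      local co = self.tasks[i]
      local ok, err = coroutine.resume(co)
      if not ok then print("task error: ", err) end
      if coroutine.status(co) == "dead" then
        table.remove(self.tasks, i)
      end
    end
  end
end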


-- Setup the runtime
-- This must happen before anything else
local SimpleDispatcher = require("SimpleDispatcher");
Runtime = {
  Scheduler = SimpleDispatcher();
}


local http_service = require "http_service"

local port = tonumber(arg[1]) or 8080

Runtime.Scheduler:Spawn(http_service, {port = port, backlog = 10});
Runtime.Scheduler:Start();

So, this is just setting up the main routine, which is an http server. Nothing too special from the queuing perspective as yet. One of the reasons I wanted to explore this line of reasoning in the first place is to reduce the amount of code required to talk to something connected to an IP network. Node is great, and it runs almost everywhere. But, at runtime, it can take up about 16 MB of memory at a minimum, and the on-disk footprint is in the megabytes as well. In the “internet of things”, this is kind of hefty.

And so, here’s the http_service portion:

local Collections = require("Collections")

local NetStream = require "NetStream"
local HttpRequest = require "HttpRequest"
local HttpResponse = require "HTTPResponse"
local URL = require("url");
local CoSocketIo = require ("CoSocketIo")
local SocketUtils = require ("SocketUtils")
local StaticService = require("StaticService");
local StopWatch = require("StopWatch")


local Scheduler = Runtime.Scheduler;


local contentTemplate = [[
<html>
  <head>
    <title>This is the title</title>
  </head>
  <body>
    Hello, World!
  </body>
</html>
]]

local ServiceHandler = function(config)
  local PreamblePending = Collections.Queue.new();

  local HandleSingleRequest = function(stream, pendingqueue)
    -- try parsing the header
    -- parse the request to see what we've got
    local request, err  = HttpRequest.Parse(stream);

    if not request then
      -- dump the stream
      return 
    end

    local urlparts = URL.parse(request.Resource)
    if urlparts.path == "/echo/" then
      local response = HttpResponse.Open(stream)
      response:writeHead("204")
      response:writeEnd();
      return pendingqueue:Enqueue(stream)
    elseif urlparts.path == "/status/" then
      local response = HttpResponse.Open(stream)
      response:writeHead("200")
      response:writeEnd(contentTemplate);
      return pendingqueue:Enqueue(stream)
    end

    local filename = '.'..urlparts.path;	
    -- Send the file
    local response = HttpResponse.Open(request.DataStream);

    StaticService.SendFile(filename, response)
	
    -- recycle the stream in case a new request comes 
    -- in on it.
    pendingqueue:Enqueue(stream)
  end

  local HandlePendingRequests = function(pendingqueue)
    while true do
      local netstream = pendingqueue:Dequeue();
      
      if netstream then
        if  netstream:IsConnected() then
          Scheduler:Spawn(HandleSingleRequest, netstream, pendingqueue)
        else
          print("netstream disconnected")
        end
      end

      coroutine.yield();
    end
  end

  local HandleNewConnection = function(config, pendingqueue)
    config = config or {port=8080, backlog = 10}

    local port = config.port or 8080
    local backlog = config.backlog or 10

    local Acceptor, err = SocketUtils.CreateTcpServerSocket({port = port, backlog = backlog, nonblocking=true, nodelay=true});

    if not Acceptor then
      print("Exiting Acceptor: ", err)
      return nil, err
    end

		
    while true do
      local accepted, err = Acceptor:Accept();

      if accepted then	
        -- create a stream to wrap the raw socket
        local res, err = accepted:SetNonBlocking(true);
        res, err = accepted:SetNoDelay(true);
        res, err = accepted:SetKeepAlive(true, 10*1000, 500);
        local netstream = NetStream.new(accepted, CoSocketIo)

        pendingqueue:Enqueue(netstream);
      end

      if err and err ~= WSAEWOULDBLOCK then
        print("EXIT MAIN LOOP: ", err)
        break
      end

      coroutine.yield()
    end
  end

  Scheduler:Spawn(HandlePendingRequests, PreamblePending);
  Scheduler:Spawn(HandleNewConnection, config, PreamblePending);
end

return ServiceHandler

Starting from the bottom, you’ll notice that ‘ServiceHandler’ is the return value. This matches up with the way the service is started.

Runtime.Scheduler:Spawn(http_service, {port = port, backlog = 10});
Runtime.Scheduler:Start();

That is, spawn a fiber which will run this ‘main’ routine, then start the scheduler running. Now anything running within this code knows that it is running within a coroutine context.

At the top of the ServiceHandler routine, we find this:

 local PreamblePending = Collections.Queue.new();

When you’re dealing with the HTTP protocol, the first thing you have to do is read the preamble of a request. The preamble begins with the request line, which might be something like:

GET / HTTP/1.1

This is then followed by any number of headers, finishing with a blank line.
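
For example, a complete preamble might look like this, with the blank line marking its end:

GET /index.html HTTP/1.1
Host: example.com
Accept: text/html
Connection: keep-alive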

The PreamblePending queue is the place where streams are placed when we believe their preamble is ready to be read. This occurs in two situations. The first is once a new socket is accepted by the server. The second is after we’ve dealt with one request, and we want to recycle the stream onto this queue in case another request comes in on the same socket (HTTP keep-alive).

The first case is dealt with by the HandleNewConnection() routine. This routine is basically a continuous while loop polling the Accept() method of a socket which is listening on the specified port. The Accept() routine returns a valid socket whenever there is a new connection; otherwise it returns nil. This is the most CPU-intensive way to do things. The more efficient mechanism would be to perform this as an asynchronous operation, whereby the process would effectively sleep until there was a new connection, at much lower CPU utilization. But I’ll get to that eventually. Right now my focus is on the fundamental decoupling.

Once the new connection is accepted, it is wrapped up in a NetStream object, and placed onto the queue. This is all this routine has to deal with. It does not make any callbacks, or do anything else fancy. It just goes on waiting for the next connection.

      if accepted then	
        -- create a stream to wrap the raw socket
        local res, err = accepted:SetNonBlocking(true);
        res, err = accepted:SetNoDelay(true);
        res, err = accepted:SetKeepAlive(true, 10*1000, 500);
        local netstream = NetStream.new(accepted, CoSocketIo)

        pendingqueue:Enqueue(netstream);
      end

So far so good. I like this model because each little bit is fairly simple. I can easily understand the concept of accepting a new connection and throwing that connection onto a queue to be dealt with. It’s the same as a basic factory assembly line. If I were making cars, and I’m on the part of the assembly line that places the motor, I’m not overly concerned with the part of the assembly line that puts seats in place. I just do my motor placement, and move the car along to the next station. The same applies here. Once I accept the connection, I do a little bit to condition the connection (wrapping in a stream), then I move it along to the next station by placing it into the queue.

The other routine which is running continuously is “HandlePendingRequests()”. This routine’s sole purpose in life is to pull the next stream off the queue and handle the request. If there’s nothing on the queue, Dequeue() returns nil, and in this infinite loop that simply results in coroutine.yield() being called. This is how the two routines cooperate. In Lua, coroutines are NOT preemptive threads, but rather cooperative. If a coroutine never calls ‘yield()’, it will hog all the CPU.
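
If Lua coroutines are new to you, this toy example shows the cooperative hand-off in isolation:

-- Two coroutines taking turns; each yield() hands control
-- back to whoever called resume().
local a = coroutine.create(function()
  for i = 1, 3 do print("A", i); coroutine.yield() end
end)
local b = coroutine.create(function()
  for i = 1, 3 do print("B", i); coroutine.yield() end
end)

while coroutine.status(a) ~= "dead" or coroutine.status(b) ~= "dead" do
  if coroutine.status(a) ~= "dead" then coroutine.resume(a) end
  if coroutine.status(b) ~= "dead" then coroutine.resume(b) end
end
-- prints A 1, B 1, A 2, B 2, A 3, B 3 in strict alternation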

So what does this routine do once it pulls a netstream off the queue?

      if netstream then
        if  netstream:IsConnected() then
          Scheduler:Spawn(HandleSingleRequest, netstream, pendingqueue)
        else
          print("netstream disconnected")
        end
      end

First it checks to see if the stream is still connected. If it is, then it will spawn a new fiber in the scheduler. This new fiber will run the “HandleSingleRequest” routine. Notice the two parameters to that routine are the stream itself, and the pendingqueue. Rather than have the “HandleSingleRequest()” routine call a callback once it is done with the request, it will simply place the stream back into the pendingqueue, where it will ultimately be dealt with by the HandlePendingRequests() routine.

This little bit of indirection might seem pointless. Why couldn’t the HandleNewConnection() routine have spawned HandleSingleRequest() directly? It could have, and I could have collapsed the responsibility of checking the pending queue into that routine as well. But, with this separation, I gain flexibility in how requests can be processed, without overly complicating the architecture, and without making the code more fragile. I can easily swap out the HandlePendingRequests() routine for something more exotic, and HandleNewConnection() will never notice; a sketch of one such swap follows.
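
For instance, here is a hypothetical drop-in replacement that caps how many requests get spawned per scheduler pass. The name HandleThrottledRequests and the maxPerPass parameter are made up for this sketch; it relies only on the same queue contract, and would live alongside HandleSingleRequest inside ServiceHandler:

  local HandleThrottledRequests = function(pendingqueue, maxPerPass)
    maxPerPass = maxPerPass or 4

    while true do
      -- spawn at most maxPerPass requests before yielding
      for i = 1, maxPerPass do
        local netstream = pendingqueue:Dequeue()
        if not netstream then break end

        if netstream:IsConnected() then
          Scheduler:Spawn(HandleSingleRequest, netstream, pendingqueue)
        else
          print("netstream disconnected")
        end
      end

      coroutine.yield()
    end
  end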

Finally, within the HandleSingleRequest() routine, the request is dealt with. The preamble is parsed, the requested resource is returned, and that’s the end of the routine.

This is the basic skeleton of the system. The usage of queues allows for easy decoupling of concerns. Unlike in Node, you aren’t forced to utilize a (request, response) callback chain. You could easily implement that style if you wanted to by composing your parts that way. But, fundamentally, the system is based on queues at the lowest level. By explicitly exposing the scheduling and the queuing system, your system can be composed however you want, to meet your specific needs. Some polling, some notifications. This gets back to the difference between IEnumerable and IObservable: pulling values on demand versus having values pushed at you.

This is great, and I’m enjoying the freedom. You could in fact do this same thing in Node, but it would be breaking with the general programming patterns that are emerging in that environment. If you want a much smaller footprint, and a more decoupled programming style, this simple queuing system might be the ticket.

Next time I’ll look at being more efficient with the lowest level socket IO so that I don’t waste so much CPU time in polling.
