Name That Framework – Echo Service in two lines of code

… and here they are:

SocketServer = require("SocketServer");
SocketServer(9090):run(function(s, b, l)  s:send(b, l); end);

Back in the day there was this gameshow called “Name That Tune”, where contestants would be told a clue about a song, then they would bid on the fewest number of notes it would take for them to name the tune. Once the bids were fixed, the orchestra would play the number of notes, and the contestant would have to correctly guess the name of the tune.

So, above are two lines of code which implement a highly scalable “echo” service. Can you name the framework?

It’s TINN of course!

Here’s a more reasonable rendition of the same:

local SocketServer = require("SocketServer");

local function OnData(socket, buff, bufflen)
  socket:send(buff, bufflen);
end;

local server = SocketServer(9090);
server:run(OnData)

Simply put, a SocketServer is a generic service that listens on the port you specify. Whenever it receives data on that port, it calls the supplied ‘OnData’ function. Each call to ‘OnData’ may arrive with a different socket and different data. You could build a fairly rudimentary HTTP server on top of this if you like. What’s most important to me is that you don’t have to write any of the underlying low-level networking code. Nothing about accept, IO Completion Ports, etc. Just: call me when some data comes in on this port.
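
To make the callback contract concrete, here is a minimal sketch that exercises an OnData-style handler without any networking. MockSocket is a hypothetical stand-in invented for this illustration (it is not part of TINN), and it takes a plain Lua string where the real server hands over an FFI byte buffer.

```lua
-- MockSocket is a hypothetical stand-in for a connected socket.
-- It only records what the handler sends, so the handler can be tried offline.
local MockSocket = {}
MockSocket.__index = MockSocket

function MockSocket.new()
  return setmetatable({ sent = {} }, MockSocket)
end

-- Same shape as the send(buff, bufflen) call used by the echo handler,
-- but 'buff' is a Lua string here rather than an FFI buffer (assumption).
function MockSocket:send(buff, bufflen)
  table.insert(self.sent, string.sub(buff, 1, bufflen))
  return bufflen
end

-- The echo handler: send back exactly the bytes that arrived.
local function OnData(socket, buff, bufflen)
  socket:send(buff, bufflen)
end

-- Simulate the server delivering one chunk of data to the handler.
local sock = MockSocket.new()
local incoming = "hello, echo"
OnData(sock, incoming, #incoming)

print(sock.sent[1])
```

Because the handler only depends on the socket having a send method, the same function could be handed to server:run() unchanged.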

The SocketServer code itself looks like this:

local ffi = require("ffi");
local IOProcessor = require("IOProcessor");
local IOCPSocket = require("IOCPSocket");

IOProcessor:setMessageQuanta(nil);

SocketServer = {}
setmetatable(SocketServer, {
  __call = function(self, ...)
    return self:create(...);
  end,
});

SocketServer_mt = {
  __index = SocketServer;
}

SocketServer.init = function(self, socket, datafunc)
--print("SocketServer.init: ", socket, datafunc)
  local obj = {
    ServerSocket = socket;
    OnData = datafunc;
  };

  setmetatable(obj, SocketServer_mt);

  return obj;
end

SocketServer.create = function(self, port, datafunc)
  port = port or 9090;

  local socket, err = IOProcessor:createServerSocket({port = port, backlog = 15});

  if not socket then
    print("Server Socket not created!!")
    return nil, err
  end

  return self:init(socket, datafunc);
end

-- The primary application loop
SocketServer.loop = function(self)
  local bufflen = 1500;
  local buff = ffi.new("uint8_t[?]", bufflen);

  while true do
    local sock, err = self.ServerSocket:accept();

    if sock then
      local socket = IOCPSocket:init(sock, IOProcessor);
      local bytesread, err = socket:receive(buff, bufflen);

      if not bytesread then
        print("RECEIVE ERROR: ", err);
      elseif self.OnData ~= nil then
        self.OnData(socket, buff, bytesread);
      else
        socket:closeDown();
        socket = nil
      end
    else
      print("Accept ERROR: ", err);
    end

    collectgarbage();
  end
end

SocketServer.run = function(self, datafunc)
  if datafunc then
    self.OnData = datafunc;
  end

  IOProcessor:spawn(self.loop, self);
  IOProcessor:run();
end

return SocketServer;
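
The SocketServer(9090) call syntax works because of the __call metamethod set on the SocketServer table in the listing above. Here is a minimal, self-contained sketch of that same pattern. Greeter is a hypothetical class made up for this illustration; only the metatable wiring mirrors the SocketServer code.

```lua
-- Greeter is a hypothetical class used only to show the pattern.
local Greeter = {}
local Greeter_mt = { __index = Greeter }

-- Calling the table itself, Greeter("x"), forwards to Greeter:create("x").
setmetatable(Greeter, {
  __call = function(self, ...)
    return self:create(...)
  end,
})

function Greeter.create(self, name)
  local obj = { name = name or "world" }
  -- Instances look up their methods in Greeter through __index.
  return setmetatable(obj, Greeter_mt)
end

function Greeter.greet(self)
  return "hello, " .. self.name
end

local g = Greeter("TINN")
local message = g:greet()
print(message)
```

The class table and the instance metatable are kept separate, so instances get the methods but are not themselves callable.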

This basic server loop is good for a lot of tiny tasks where you just need to put a listener on the front of something. No massive scale-out, no multi-threading, just good straightforward stuff. But it’s already plumbed to go big too.

Here’s a slight modification:

SocketServer.handleAccepted = function(self, sock)
  local handleNewSocket = function()
    local bufflen = 1500;
    local buff = ffi.new("uint8_t[?]", bufflen);
    
    local socket = IOCPSocket:init(sock, IOProcessor);

    if self.OnAccepted then
      -- intentionally empty: no OnAccepted handling is shown in this version
    else
      local bytesread, err = socket:receive(buff, bufflen);
  
      if not bytesread then
        print("RECEIVE ERROR: ", err);
      elseif self.OnData ~= nil then
        self.OnData(socket, buff, bytesread);
      else
        socket:closeDown();
        socket = nil
      end
    end
  end

  return IOProcessor:spawn(handleNewSocket);
end

-- The primary application loop
SocketServer.loop = function(self)

  while true do
    local sock, err = self.ServerSocket:accept();

    if sock then
      self:handleAccepted(sock);
    else
      print("Accept ERROR: ", err);
    end

    collectgarbage();
  end
end

In the main loop, instead of doing the processing directly, call the ‘self:handleAccepted()’ function. That function in turn will spawn an internal function to actually handle the request. Everything else remains the same.

If you do it this way, then ‘OnData’ will run cooperatively with other accepts that might be going on. This also highlights, in an invisible way, that the ‘accept()’ call is itself cooperative. Since IO completion ports are being used in the background, the accept call is actually async. As soon as it issues the accept, that coroutine waits in place until another connection comes in. Meanwhile, the last socket that was being handled gets a time slice to do what it wants.
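
The interleaving described here can be sketched with plain Lua coroutines. This is not the IOProcessor scheduler: spawn, run, handler and acceptLoop below are hypothetical names, and each coroutine.yield() stands in for the point where a real accept or receive would suspend until its IO completes.

```lua
local ready = {}   -- coroutines waiting for their next time slice
local log = {}     -- records the order in which things happen

local function spawn(fn)
  table.insert(ready, coroutine.create(fn))
end

-- Round-robin: resume each coroutine in turn until all have finished.
local function run()
  while #ready > 0 do
    local co = table.remove(ready, 1)
    local ok, err = coroutine.resume(co)
    if not ok then error(err) end
    if coroutine.status(co) ~= "dead" then
      table.insert(ready, co)
    end
  end
end

-- Stand-in for a per-connection handler.
local function handler(id)
  return function()
    table.insert(log, "conn " .. id .. " receive")
    coroutine.yield()   -- a real receive would wait here for data
    table.insert(log, "conn " .. id .. " echo")
  end
end

-- Stand-in for the accept loop: hand off each connection, then wait.
local function acceptLoop()
  for id = 1, 3 do
    table.insert(log, "accept " .. id)
    spawn(handler(id))
    coroutine.yield()   -- a real accept would wait here for a client
  end
end

spawn(acceptLoop)
run()
print(table.concat(log, "\n"))
```

In the resulting log, the second accept is issued before the first connection has finished echoing, which is the cooperative behavior the paragraph above describes.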

And thus, you get massive scale (thousands of potential connections) from using this fairly simple code.

Well, those are the basics. Now that I have plumbed TINN from the ground up to utilize the IO Completion Ports, I can start to build upon that. There are a couple of nice benefits to marrying IOCP and Lua coroutines. I’ll be exploring this some more, but it’s basically a match made in heaven.


The Lazy Programmer Multitasks – Using IOCP to get work done

I’ve written about event-driven stuff, and brushed up against IO Completion Ports in the past, but I’ve never really delivered the goods on these things.  Well, I wanted to up the scalability and reduce the CPU utilization of my standard eventing model, so I finally bit the bullet and started down the path of using IO Completion Ports.

First of all, IO Completion Ports are the mechanism Windows uses to facilitate respectably fast asynchronous IO.  They are typically associated with file reads/writes as well as sockets.  But what are they really?

At the end of the day, the IO Completion Port is not much more than a kernel managed queue.  In fact, I’ll just call it that for simplicity, because I approached using them from a direction that really has nothing at all to do with IO, and the whole “Completion Port” part of the name, while somewhat indicative of one pattern of usage, really does just get in the way of understanding a fairly simple and powerful mechanism in Windows.

So, let’s begin at the beginning.  I want to do the following:

I have some thread that is generating “work”.  The items of work are placed on a queue to be processed by some thread.  I have several threads which spend their lives simply pulling work items out of a queue, performing some action related to the work, and doing that over and over again.

Seems simple enough.  There are a few pieces to solving this puzzle:

  • threads – I need a really easy way to initiate a thread.  I’d like the body of the thread to be nothing more than script.  I don’t want to know too much about the OS, and I don’t want to have to deal with locking primitives.  I just want to write some code that does its job, and doesn’t really know much about the outside world.
  • queues – The work gets placed into a queue.  The queue must at least deal with single writer, multiple reader, or something, but really, I don’t want to have to worry about that at all.  From the writer’s perspective, I just want to do: queue:enqueue(workItem).  From the reader’s perspective, I just want: workItem = queue:dequeue();
  • memory management – I need to be able to share pieces of data across multiple threads.  I will need to have a way to indicate ownership of the data, so that it can be safely cleaned up from any thread that has access to the data.
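
To pin down the queue interface wished for in the list above, here is a minimal sketch. WorkQueue is a hypothetical, single-threaded stand-in: it does no locking and never blocks, which is exactly the part a kernel-managed queue would take care of.

```lua
-- WorkQueue is a hypothetical FIFO used only to illustrate the
-- enqueue/dequeue interface; it is not safe to share across threads.
local WorkQueue = {}
WorkQueue.__index = WorkQueue

function WorkQueue.new()
  return setmetatable({ first = 1, last = 0, items = {} }, WorkQueue)
end

-- Writer side: append a work item to the tail.
function WorkQueue:enqueue(workItem)
  self.last = self.last + 1
  self.items[self.last] = workItem
end

-- Reader side: return the oldest item, or nil when the queue is empty.
function WorkQueue:dequeue()
  if self.first > self.last then
    return nil
  end
  local workItem = self.items[self.first]
  self.items[self.first] = nil
  self.first = self.first + 1
  return workItem
end

local queue = WorkQueue.new()
for i = 1, 3 do
  queue:enqueue({ message = i })
end

-- A worker's life: pull items until there is nothing left.
local processed = {}
while true do
  local workItem = queue:dequeue()
  if not workItem then break end
  table.insert(processed, workItem.message)
end

print(table.concat(processed, ","))
```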

If I have these basic pieces in place, I’m all set.  I’ll start with the worker thread and what it will be doing. First up is the definition of the data that describes the work to be done.

The code for this example can be found here: https://github.com/Wiladams/TINN/tree/master/tests

local ffi = require("ffi");
require("WTypes");

ffi.cdef[[
typedef struct {
    HWND hwnd;
    UINT message;
    WPARAM wParam;
    LPARAM lParam;
    DWORD time;
    POINT pt;
} AMSG, *PAMSG;
]]

return {
  AMSG = ffi.typeof("AMSG");

  print = function(msg)
    msg = ffi.cast("PAMSG", ffi.cast("void *",msg));

    print(string.format("== AMSG: %d", msg.message));
  end,
}

Easy enough. This is a simple data structure that contains some fields that might be useful for certain kinds of applications. The data structure is very application specific, and can be whatever is required for your application. This one is just really easy, and familiar to a Windows UI programmer.

Now that worker thread:

local workerTmpl = [==[
local ffi = require("ffi");
local core_synch = require("core_synch_l1_2_0");
local Heap = require("Heap");
local IOCompletionPort = require("IOCompletionPort");
local TINNThread = require("TINNThread");
local MSG = require("AMSG");
local random = math.random;

local QueueHandle = ffi.cast("HANDLE", ffi.cast("intptr_t", %s));
local HeapHandle = ffi.cast("HANDLE", ffi.cast("intptr_t", %s));

local iocp, err = IOCompletionPort:init(QueueHandle);
local memman, err = Heap(HeapHandle);

while true do
  local key, bytes, overlap = iocp:dequeue();

  if not key then
    break;
  end

  MSG.print(key);
  memman:free(ffi.cast("void *",key));
end
]==];

The while loop is the working end of this bit of code. Basically, it will just spin forever, trying to pull some work out of the iocp queue. The code will ‘block’ until it actually receives something from the queue, or an error. Once it gets the data from the queue, it will simply call the ‘print’ function, and that’s the end of that piece of work.

This system assumes that the receiver of the data will be responsible for freeing this bit of memory that it’s using, so memman:free() does that task.

Where does that memman object come from? Well, as can be seen in the code, it is an instance of the Heap object, constructed using a handle that is passed in as a string value, and turned into a “HANDLE”. The same is true of the IOCompletionPort.

This is a neat little trick. Basically, the parent thread constructs these objects that will be shared with the sibling threads, and passes a pointer to them. The objects know how to construct themselves from nothing more than this pointer value, and away you go!

So, how about the main thread then?

-- test_workerthreads.lua

local ffi = require("ffi");
local core_synch = require("core_synch_l1_2_0");
local IOCompletionPort = require("IOCompletionPort");
local TINNThread = require("TINNThread");
local AMSG = require("AMSG");
local Heap = require("Heap");

local main = function()
  local memman = Heap:create();
  local threads = {};

  local iocp, err = IOCompletionPort();

  if not iocp then
    return false, err;
  end

  -- launch worker threads
  local iocphandle = TINNThread:CreatePointerString(iocp:getNativeHandle());
  local heaphandle = TINNThread:CreatePointerString(memman:getNativeHandle());
  local codechunk = string.format(workerTmpl, iocphandle, heaphandle);

  for i=1,4 do
    local worker, err = TINNThread({CodeChunk = codechunk});
    table.insert(threads, worker);
  end

  -- continuously put 'work' items into queue
  local numWorkItems = 1000;
  local counter = 0;
  local sleepInterval = 50;
  local workSize = ffi.sizeof("AMSG");

  while true do
    for i = 1,numWorkItems do
      local newWork = memman:alloc(workSize);
      ffi.cast("AMSG *",newWork).message = (counter*numWorkItems)+i;
      iocp:enqueue(newWork, workSize);
    end

    collectgarbage();
    counter = counter + 1;

    -- The worker threads should continue to get work done
    -- while we sleep for a little bit
    core_synch.Sleep(sleepInterval);
  end
end

main();

The most interesting part here might be the creation of the codechunk. A simple string.format() is used to replace the two %s placeholders in the template with the string values for the IO completion port handle and the heap handle, in that order.
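
Here is a minimal sketch of that template trick in isolation. The template body and the handle values are made up for this illustration, and the chunk is simply compiled and run in the same Lua state, whereas TINNThread would run it on a separate thread.

```lua
-- A cut-down template: the two %s placeholders receive values as text.
local workerTmpl = [==[
local queueHandle = tonumber("%s")
local heapHandle = tonumber("%s")
return queueHandle, heapHandle
]==]

-- Made-up handle values, passed in as strings.
local codechunk = string.format(workerTmpl, "4660", "22136")

-- loadstring exists in Lua 5.1 and LuaJIT; newer Lua versions use load.
local compile = loadstring or load
local chunk = assert(compile(codechunk))

-- Running the chunk recovers the numeric values on the "other side".
local queueHandle, heapHandle = chunk()
print(queueHandle, heapHandle)
```

Since the values travel as text inside the source code, this only suits things that survive a round trip through a string, such as a numeric handle or pointer value.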

Once the codechunk is created, a thread is created that uses the codechunk as its body. This thread will start automatically, so nothing needs to be done beyond simply constructing it. In this case, 4 threads are created. This matches the number of virtual cores I have on my machine, so there’s one thread per core. This is a good number, as any more would be wasteful in this particular case.

Then there’s simply a loop that constantly creates work items and places them on the queue. Notice how the work items are constructed from the heap, and then never freed. Remember, back in the worker thread, the workItem is cleaned up using memman:free(). You must do something like this because although you could use the ffi.new() to construct the bits of memory, you can’t guarantee the lifetime of such a thing across the call to enqueue. Lua might try to free up the allocated memory before the worker actually has time to do anything with it. Doing allocation using the Heap gives you full control of the lifetime of that bit of memory.
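
The ownership rule (producer allocates, consumer frees) can be sketched without any real memory management. TrackingHeap below is a hypothetical stand-in for the Heap object: it hands out numeric ids instead of pointers and counts the blocks that are still live, so the handoff can be checked.

```lua
-- TrackingHeap is a hypothetical stand-in for a real heap.
local TrackingHeap = {}
TrackingHeap.__index = TrackingHeap

function TrackingHeap.new()
  return setmetatable({ blocks = {}, nextId = 0, live = 0 }, TrackingHeap)
end

-- Store a value and return an id that plays the role of a pointer.
function TrackingHeap:alloc(value)
  self.nextId = self.nextId + 1
  self.blocks[self.nextId] = value
  self.live = self.live + 1
  return self.nextId
end

function TrackingHeap:free(id)
  assert(self.blocks[id] ~= nil, "double free or unknown block")
  self.blocks[id] = nil
  self.live = self.live - 1
end

local memman = TrackingHeap.new()
local queue = {}

-- Producer: allocate and enqueue, but never free.
for i = 1, 5 do
  table.insert(queue, memman:alloc({ message = i }))
end
local liveAfterProduce = memman.live

-- Consumer: use each item, then free it, since it now owns the block.
local seen = {}
for _, id in ipairs(queue) do
  table.insert(seen, memman.blocks[id].message)
  memman:free(id)
end

print(liveAfterProduce, memman.live)
```

Nothing in the producer depends on garbage collection timing, which is the point of allocating from a heap you control rather than from the collector.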

Well, there you have it. Basically a very simple work sharing multi-threaded program in LuaJIT with a minimal amount of fuss. Of course, this works out fairly well because Windows already has the IO Completion Port (queue), so it’s mainly just gluing things together correctly.

Of course, you don’t have to do it this way. True to Windows, there are a thousand different ways you could spin up threads and deal with work sharing. One easy alternative is so-called thread pooling. I must say though, using IOCP is a much easier and less intensive mechanism. The challenge with thread pooling is that when a piece of work becomes ready, all the threads might try to jump on the queue to get at it. This can cause a lot of context switching in the kernel, which is a performance killer. With an IO Completion Port, only one of the threads is awakened, and that’s that. Not a lot of thrashing.

This is now halfway to using IO Completion Ports for a low-CPU-utilization network server. The other half is to add in Windows Sockets and an event scheduler. Well, I already have a handy event scheduler, and the Socket class is functioning quite well.

On the way to doing this, I discovered the joys of Registered IO in Windows 8 (RIO), which is even faster for networking, but I’ll save that for another iteration.

In the meanwhile, if you want to do the lazy programmer’s multi-threaded programming, using IO Completion Ports is the way to go. This Lua code makes it so brain-dead simple, I feel like using threads just for the heck of it now.