The Lazy Programmer Multitasks – Using IOCP to get work done

I’ve written about event driven stuff, and brushed up against IO Completion Ports in the past, but I’ve never really delivered the goods on them.  Well, I wanted to improve the scalability and reduce the CPU utilization of my standard eventing model, so I finally bit the bullet and started down the path of using IO Completion Ports.

First of all, IO Completion Ports are a mechanism that Windows uses to facilitate respectably fast asynchronous IO.  They are typically associated with file reads and writes, as well as sockets.  But what are they really?

At the end of the day, the IO Completion Port is not much more than a kernel managed queue.  In fact, I’ll just call it that for simplicity, because I approached using them from a direction that really has nothing at all to do with IO, and the whole “Completion Port” part of the name, while somewhat indicative of one pattern of usage, really does just get in the way of understanding a fairly simple and powerful mechanism in Windows.
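
If that sounds abstract, the essential behavior is easy to model.  Here is a rough analogue in Python rather than the Lua used below, purely for illustration: a plain thread-safe queue stands in for the completion port, with put() playing the role of PostQueuedCompletionStatus and get() playing the role of GetQueuedCompletionStatus.

```python
import queue
import threading

# A thread-safe blocking queue standing in for the completion port.
port = queue.Queue()
results = []

def worker():
    while True:
        item = port.get()       # blocks until something is enqueued
        if item is None:        # sentinel value means "shut down"
            break
        results.append(item * 2)

t = threading.Thread(target=worker)
t.start()

for i in range(3):
    port.put(i)                 # "complete" three pieces of work
port.put(None)                  # tell the worker to exit
t.join()

print(results)                  # [0, 2, 4]
```

The important part is the blocking dequeue: a thread waiting on the queue is parked by the system and consumes no CPU until something arrives.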

So, let’s begin at the beginning.  I want to do the following:

I have some thread that is generating “work”.  The items of work are placed on a queue to be processed by some thread.  I have several threads which spend their lives simply pulling work items out of a queue, performing some action related to the work, and doing that over and over again.

Seems simple enough.  There are a few pieces to solving this puzzle:

  • threads – I need a really easy way to initiate a thread.  I’d like the body of the thread to be nothing more than script.  I don’t want to know too much about the OS, and I don’t want to have to deal with locking primitives.  I just want to write some code that does its job, and doesn’t really know much about the outside world.
  • queues – The work gets placed into a queue.  The queue must at least handle a single writer with multiple readers, but really, I don’t want to have to worry about that at all.  From the writer’s perspective, I just want to do: queue:enqueue(workItem).  From the reader’s perspective, I just want: workItem = queue:dequeue();
  • memory management – I need to be able to share pieces of data across multiple threads.  I will need to have a way to indicate ownership of the data, so that it can be safely cleaned up from any thread that has access to the data.

If I have these basic pieces in place, I’m all set.  I’ll start with the worker thread and what it will be doing.  First comes the definition of the data that describes the work to be done.

The code for this example can be found here: https://github.com/Wiladams/TINN/tree/master/tests

-- AMSG.lua
-- The definition of the data that describes a piece of work.

local ffi = require("ffi");
require("WTypes");

ffi.cdef[[
typedef struct {
    HWND hwnd;
    UINT message;
    WPARAM wParam;
    LPARAM lParam;
    DWORD time;
    POINT pt;
} AMSG, *PAMSG;
]]

return {
  AMSG = ffi.typeof("AMSG");

  print = function(msg)
    -- The message arrives as a raw value; turn it back into a typed pointer.
    msg = ffi.cast("PAMSG", ffi.cast("void *",msg));

    print(string.format("== AMSG: %d", msg.message));
  end,
}

Easy enough. This is a simple data structure that contains some fields that might be useful for certain kinds of applications. The data structure is very application specific, and can be whatever is required for your application. This one is just really easy, and familiar to a Windows UI programmer.

Now that worker thread:

local workerTmpl = [==[
local ffi = require("ffi");
local core_synch = require("core_synch_l1_2_0");
local Heap = require("Heap");
local IOCompletionPort = require("IOCompletionPort");
local TINNThread = require("TINNThread");
local MSG = require("AMSG");

-- The two %s placeholders are filled in by the parent thread with
-- the string form of the queue handle and the heap handle.
local QueueHandle = ffi.cast("HANDLE", ffi.cast("intptr_t", %s));
local HeapHandle = ffi.cast("HANDLE", ffi.cast("intptr_t", %s));

-- Reconstitute the shared objects from nothing but their raw handles.
local iocp, err = IOCompletionPort:init(QueueHandle);
local memman, err = Heap(HeapHandle);

while true do
  -- Block until a piece of work arrives on the queue.
  local key, bytes, overlap = iocp:dequeue();

  if not key then
    break;
  end

  MSG.print(key);

  -- This thread now owns the work item, so it frees it.
  memman:free(ffi.cast("void *",key));
end
]==];

The while loop is the working end of this bit of code.  Basically, it will just spin forever, trying to pull some work out of the iocp queue.  The call will ‘block’ until it actually receives something from the queue, or an error occurs.  Once it gets the data from the queue, it will simply call the ‘print’ function, and that’s the end of that piece of work.

This system assumes that the receiver of the data is responsible for freeing the bit of memory it receives, so memman:free() does that task.

Where does that memman object come from? Well, as can be seen in the code, it is an instance of the Heap object, constructed using a handle that is passed in as a string value, and turned into a “HANDLE”. The same is true of the IOCompletionPort.

This is a neat little trick. Basically, the parent thread constructs these objects that will be shared with the sibling threads, and passes a pointer to them. The objects know how to construct themselves from nothing more than this pointer value, and away you go!
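
For the curious, the trick itself isn’t Lua specific.  Here is the same idea sketched with Python’s ctypes (the structure is just a cut-down stand-in for the AMSG above): a pointer is flattened to a decimal string, which can travel as plain text inside a generated code chunk, and is then rebuilt into a typed pointer on the other side.

```python
import ctypes

# A cut-down stand-in for the AMSG structure from the Lua example.
class AMSG(ctypes.Structure):
    _fields_ = [("message", ctypes.c_uint32)]

msg = AMSG(message=42)

# "CreatePointerString": serialize the address as a plain string,
# which can be spliced into a generated code chunk as text.
ptr_string = str(ctypes.addressof(msg))

# On the other side, rebuild a typed pointer from nothing but the string.
p = ctypes.cast(int(ptr_string), ctypes.POINTER(AMSG))
print(p.contents.message)   # 42
```

The only requirement is that the original object stays alive for as long as anyone holds the reconstructed pointer, which is exactly the ownership question the Heap addresses below.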

So, how about the main thread then?

-- test_workerthreads.lua

local ffi = require("ffi");
local core_synch = require("core_synch_l1_2_0");
local IOCompletionPort = require("IOCompletionPort");
local TINNThread = require("TINNThread");
local AMSG = require("AMSG");
local Heap = require("Heap");

-- workerTmpl is the worker template string from the previous listing.

local main = function()
  local memman = Heap:create();
  local threads = {};

  local iocp, err = IOCompletionPort();

  if not iocp then
    return false, err;
  end

  -- Launch worker threads, baking the queue and heap handles
  -- into each thread's code chunk as string values.
  local iocphandle = TINNThread:CreatePointerString(iocp:getNativeHandle());
  local heaphandle = TINNThread:CreatePointerString(memman:getNativeHandle());
  local codechunk = string.format(workerTmpl, iocphandle, heaphandle);

  for i=1,4 do
    local worker, err = TINNThread({CodeChunk = codechunk});
    table.insert(threads, worker);
  end

  -- Continuously put 'work' items into the queue.
  local numWorkItems = 1000;
  local counter = 0;
  local sleepInterval = 50;
  local workSize = ffi.sizeof("AMSG");

  while true do
    for i = 1,numWorkItems do
      -- Allocate each work item from the shared heap; whichever
      -- worker dequeues it is responsible for freeing it.
      local newWork = memman:alloc(workSize);
      ffi.cast("AMSG *",newWork).message = (counter*numWorkItems)+i;
      iocp:enqueue(newWork, workSize);
    end

    collectgarbage();
    counter = counter + 1;

    -- The worker threads should continue to get work done
    -- while we sleep for a little bit
    core_synch.Sleep(sleepInterval);
  end
end

main();

The most interesting part here might be the creation of the codechunk.  A simple string.format() is used to replace the %s parameters in the template with the string values for the IO completion port handle and the heap handle.
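
The generate-and-run approach can be sketched in miniature; here it is in Python, with string templating plus exec standing in for string.format and the Lua chunk loader (the function name and the baked-in value are made up for the illustration):

```python
# Worker body as a source-code template; %d is filled in by the parent,
# just as the Lua example splices handle values into workerTmpl.
worker_tmpl = """
def work(x):
    return x + %d
"""

code_chunk = worker_tmpl % 100    # bake a parameter into the source text
namespace = {}
exec(code_chunk, namespace)       # compile and run the generated chunk

print(namespace["work"](1))       # 101
```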

Once the codechunk is created, a thread is created that uses the codechunk as its body. This thread will start automatically, so nothing needs to be done beyond simply constructing it. In this case, 4 threads are created. This matches the number of virtual cores I have on my machine, so there’s one thread per core. This is a good number as any more would be wasteful in this particular case.

Then there’s simply a loop that constantly creates work items and places them on the queue.  Notice how the work items are constructed from the heap, and then never freed.  Remember, back in the worker thread, the workItem is cleaned up using memman:free().  You must do something like this because although you could use ffi.new() to construct the bits of memory, you can’t guarantee the lifetime of such a thing across the call to enqueue.  Lua’s garbage collector might reclaim the allocated memory before the worker actually has time to do anything with it.  Allocating from the Heap gives you full control over the lifetime of that bit of memory.
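
The point generalizes beyond LuaJIT: memory that crosses a thread boundary needs one explicit owner.  Here is a rough analogue using the C runtime’s malloc and free through Python’s ctypes — the producer allocates, passes nothing but an address through the queue, and the consumer frees:

```python
import ctypes
import ctypes.util
import queue
import threading

libc = ctypes.CDLL(ctypes.util.find_library("c"))
libc.malloc.restype = ctypes.c_void_p
libc.malloc.argtypes = [ctypes.c_size_t]
libc.free.argtypes = [ctypes.c_void_p]

work_queue = queue.Queue()
seen = []

def consumer():
    while True:
        addr = work_queue.get()
        if addr is None:            # sentinel: shut down
            break
        p = ctypes.cast(addr, ctypes.POINTER(ctypes.c_uint32))
        seen.append(p[0])
        libc.free(addr)             # the receiver owns, and frees, the memory

t = threading.Thread(target=consumer)
t.start()

for i in range(3):
    addr = libc.malloc(ctypes.sizeof(ctypes.c_uint32))   # producer allocates
    ctypes.cast(addr, ctypes.POINTER(ctypes.c_uint32))[0] = i
    work_queue.put(addr)            # ownership travels with the message

work_queue.put(None)
t.join()
print(seen)   # [0, 1, 2]
```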

Well, there you have it. Basically a very simple work sharing multi-threaded program in LuaJIT with a minimal amount of fuss. Of course, this works out fairly well because Windows already has the IO Completion Port (queue), so it’s mainly just gluing things together correctly.

Of course, you don’t have to do it this way; true to Windows, there are a thousand different ways you could spin up threads and deal with work sharing.  One easy alternative is so-called thread pooling.  I must say though, using IOCP is a much easier and less intensive mechanism.  The challenge with thread pooling is that when a piece of work becomes ready, all the threads might try to jump on the queue to get at it.  This can cause a lot of context switching in the kernel, which is a performance killer.  With an IO Completion Port, only one of the threads will be awakened, and that’s that.  Not a lot of thrashing.

This is now halfway to using IO Completion Ports for a low CPU utilization network server.  The other half is to add in Windows sockets and an event scheduler.  Well, I already have a handy event scheduler, and the Socket class is functioning quite well.

On the way to doing this, I discovered the joys of Registered IO in Windows 8 (RIO), which is even faster for networking, but I’ll save that for another iteration.

In the meanwhile, if you want to do the lazy programmer’s multi-threaded programming, using IO Completion Ports is the way to go.  This Lua code makes it so brain dead simple, I feel like using threads just for the heck of it now.
