Parallel Conversations

I have been tinkering with words of late. I’ve added ‘waitFor’, ‘when’ and ‘whenever’ to the TINN lexicon, and my programs are becoming easier to write, better organized, and easier to describe.
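
To give a flavor of the predicate words, here’s roughly how they read in use (a sketch; the exact signatures in TINN may differ):

local counter = 0

-- when: run the function once, when the predicate first holds
when(function() return counter > 10 end, function()
  print("counter passed 10")
end)

-- whenever: run the function each time the predicate holds
whenever(function() return counter % 2 == 0 end, function()
  print("counter is even")
end)

-- waitFor: suspend the current task until the predicate returns true
waitFor(function() return counter > 100 end)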

Recently I added another set of words: ‘waitSignal’, ‘signalOne’, and ‘signalAll’. If you were using another system, you might call these ‘events’, but really they’re just signaling.

Here’s how I might use them:

local Application = require("Application")(true)

local function waiter(num)
  num = num or 0

  local function closure()
    print(string.format("WAITED: %d",num))
  end

  return closure;
end

local function main()

  for i=1,4 do
    onSignal(waiter(i), "waiting")
  end

  -- signal only one of them
  signalOne("waiting")

  -- sleep a bit giving other tasks
  -- a chance to run
  sleep(500)

  -- signal the rest of them
  signalAll("waiting")
  sleep(2000)

  print("SLEEP AFTER signalAll")
end

run(main)

First, it spawns 4 tasks, all of which will be waiting on the ‘waiting’ signal. With ‘signalOne’, only 1 of the 4 waiting tasks will be awakened and continue execution. With ‘signalAll’, the rest of the waiting tasks are all scheduled to run, and thus continue from where they left off.

The core primitive for signaling is the ‘waitSignal()’ function. This will essentially suspend the currently running task until the specified signal has been given. The ‘onSignal()’ function is a convenience that spawns a separate task to do the waiting and execute the specified function at the appropriate time.
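
In fact, if ‘onSignal()’ didn’t exist, you could sketch it yourself from ‘spawn’ and ‘waitSignal()’ (a rough equivalent, not necessarily the actual implementation):

local function onSignal(func, signalName)
  local function waitAndRun()
    waitSignal(signalName);  -- suspend this task until the signal fires
    func();                  -- then run the handler
  end

  return spawn(waitAndRun);
end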

If you were using another language, these might be ‘onEvent()’ and ‘emit()’. Of course it’s really easy to code it up that way; just create an alias for whatever name you like. You could even do:

local function on(eventName, func)
  return onSignal(func, eventName)
end
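
And pair it with an ‘emit’ that is nothing more than a rename (here aliased to ‘signalAll’; you could just as easily pick ‘signalOne’):

local function emit(eventName)
  return signalAll(eventName)
end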

So, these are some more words that are in the parallel programming arsenal. In totality, the set is:

Time related

  • sleep
  • delay
  • periodic

Predicates

  • waitFor
  • when
  • whenever

Signaling

  • onSignal
  • signalOne
  • signalAll
  • waitSignal

Task and scheduler

  • run
  • stop
  • spawn
  • yield

With these words in hand, doing my multi-task programming is becoming easier by the moment. I don’t worry about the lower level multi-tasking primitives such as mutexes, locks, and the like. I’m not really that concerned with timers, thread pools, or any of the other machinery that makes this all happen. I just write my simple code.

One thing has struck me though. This is all great for a single threaded environment. Basically cooperative multi-tasking. In this age of super fast CPUs, it turns out that going single threaded is just fine. In many cases it’s actually better than going preemptive multi-threaded because you don’t necessarily incur as much OS thread level context switching.

I’d like to go one step further though. A long time ago we didn’t have L1/L2 caches on the CPUs, then they came along, got better, and now are fairly large. As a programmer, my concern for them is fairly minimal. I don’t do much to ensure that the caches are used properly. I just write my code, and either the compiler, or the CPU itself deals with all the mechanics. Same goes with prefetching instructions and the like. I don’t write my code to make that job any easier, it just happens automagically.

Here I am contemplating the same thing about threads. As my CPU has increasingly more cores, I am wondering if CPU cycles should be viewed the same way we viewed levels of memory in the past. Is there a C1/C2 level of core execution? That is, most things should stay on the same thread, because switching to a different core is costly; something very low level should decide when that switching occurs.

My application/runtime can know a lot about what I’m going to do, because the thing is all laid out in script. After some analysis, perhaps I can decide “this thing is going to sleep for more than 500 ms, therefore I’m going to put it over here on this other thread, which is actually sleeping.” In that way, I can reduce my resource usage because I won’t be polling the timer every time through the scheduling loop; I can be smarter.

If I had such smartness, then when I have 16 – 100 cores, I’ll be able to easily take advantage of those multiple cores without having to truly understand and navigate that madness on my own. Let the machine figure it out, it’s much better at that than I am.

In the meanwhile, I’ve got these multi-tasking primitives which are making my life a lot easier when it comes to programming fairly complex systems.


Computicles – A tale of two schedulers

One of the drivers for the creation of computicles is to maximize the efficiency of the running system while minimizing the complexity for the programmer.  Herein lies the rub.  Modern computers are multi-core/multi-proc, and Lua is largely a single core sort of system.  Lua has its own notion of “light weight threads”, which are essentially cooperative processing threads.  The native OS (Windows or Linux) has a notion of “threads” which are much more heavy weight.  While the Lua threads can easily number in the thousands and more, they are not actually running in parallel; they are just rapidly context switching between each other at the whim of the programmer.  The OS threads, on the other hand, are in fact running in parallel, on multiple cores if they exist.  But, as soon as you have more threads than cores, the threads are shifting rapidly between each other, just like in the Lua case, but preemptively instead of cooperatively.

What do I want?  I want the best of both worlds.  But, before starting down the path of leveraging the multiple cores, I want to start with the programming paradigm.

I want to write essentially serial code.  My brain is not good at dealing with things like mutexes, semaphores, barriers, or any other kind of sharing mechanism that has been invented over the past 40 years.  I know how to write straight sequential code.  I can deal with saying “spawn” to get something running in parallel, but that’s about it.

So, in steps computicles.

I’ve gone on about the subject a few times now, but I’ve finally created the unified scheduler that I require.  It looks like this:


-- comp_msgpump.lua
local ffi = require("ffi");
require("IOProcessor");

-- default to 15 millisecond timeout
gIdleTimeout = gIdleTimeout or 15

local idlecount = 0;

while true do
  if IOProcessor then
    IOProcessor:step();
  end

  local msg, err = SELFICLE:getMessage(gIdleTimeout);

  if not msg then
    if err == WAIT_TIMEOUT then
      --print("about to idle")
      idlecount = idlecount + 1;
      if OnIdle then
        OnIdle(idlecount);
      end
    end
  else
    local msgFullyHandled = false;
    msg = ffi.cast("ComputicleMsg *", msg);

    if OnMessage then
      msgFullyHandled = OnMessage(msg);
    end

    if not msgFullyHandled then
      local Message = msg.Message;
      --print("Message: ", Message, msg.Param1, msg.Param2);
		
      if Message == Computicle.Messages.QUIT then
        if OnExit then
          OnExit();
        end
        break;
      end

      if Message == Computicle.Messages.CODE then
        local len = msg.Param2;
        local codePtr = ffi.cast("const char *", msg.Param1);
		
        if codePtr ~= nil and len > 0 then
          local code = ffi.string(codePtr, len);

          SELFICLE:freeData(ffi.cast("void *",codePtr));

          local func = loadstring(code);
          func();
        end
      end
      SELFICLE:freeMessage(msg);
    end
  end
end

This is pretty much the same event driven loop that has existed previously. Its main function is to get messages off its message queue and deal with them. This is how you communicate with a computicle. Under normal circumstances, a Computicle can simply implement OnMessage(), if it wants to respond only when it receives a message. This is a perfectly event driven way to exist. Or it can implement OnIdle() if it wants to respond to the fact that nothing else is occurring in the system. This is a great combination, and will cover many useful cases. But what about waiting for some IO to complete?
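
For example, a computicle that wants to play along only has to define the handlers. This hypothetical body counts idle ticks and swallows its own messages; judging from the pump above, a handler that returns true takes over freeing the message, since the pump only frees messages it dispatched itself:

-- a hypothetical computicle body, loaded like any other code chunk
OnMessage = function(msg)
  print("received message: ", msg.Message);

  -- a fully handled message is ours to free
  SELFICLE:freeMessage(msg);

  return true;  -- tells the pump to skip its default QUIT/CODE dispatch
end

OnIdle = function(counter)
  if counter % 100 == 0 then
    print("idled ", counter, " times");
  end
end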

Well, at the top of this event loop there is the IOProcessor:step() call. And what is an IOProcessor?

The IOProcessor is a scheduler for cooperative Lua threads. The IOProcessor assumes the user’s code is utilizing co-routines, and will deal with putting them on a ‘sleeping’ list whenever they perform a task, such as socket IO which does not complete immediately. It’s a classic, and before the Computicles existed, this was the primary scheduler.

It’s a bit thick with code, but here it is:

local ffi = require("ffi");

local Collections = require "Collections"
local IOCPSocket = require("IOCPSocket");
local SimpleFiber = require("SimpleFiber");
local IOCompletionPort = require("IOCompletionPort");
local SocketOps = require("SocketOps");


IOProcessor = {
  fibers = Collections.Queue.new();
  coroutines = {};
  EventFibers = {};
  FibersAwaitingEvent = {};

  IOEventQueue = IOCompletionPort:create();
  MessageQuanta = 15;		-- 15 milliseconds
};


--[[
	Socket Management
--]]

IOProcessor.createClientSocket = function(self, hostname, port)
  return IOCPSocket:createClient(hostname, port, self)
end

IOProcessor.createServerSocket = function(self, params)
  return IOCPSocket:createServerSocket(params, self)
end

IOProcessor.observeSocketIO = function(self, socket)
  return self.IOEventQueue:addIoHandle(socket:getNativeHandle(), socket.SafeHandle);
end

--[[
	Fiber Handling
--]]

IOProcessor.scheduleFiber = function(self, afiber, ...)
  if not afiber then
    return nil
  end
  self.coroutines[afiber.routine] = afiber;
  self.fibers:Enqueue(afiber);	

  return afiber;
end

IOProcessor.spawn = function(self, aroutine, ...)
  return self:scheduleFiber(SimpleFiber(aroutine, ...));
end

IOProcessor.removeFiber = function(self, fiber)
  self.coroutines[fiber.routine] = nil;
end

IOProcessor.inMainFiber = function(self)
  return coroutine.running() == nil; 
end

IOProcessor.yield = function(self)
  coroutine.yield();
end

IOProcessor.yieldForIo = function(self, sock, iotype)
  -- associate a fiber with a socket
  print("yieldForIo, CurrentFiber: ", self.CurrentFiber);
	
  self.EventFibers[sock:getNativeSocket()] = self.CurrentFiber;

  -- Keep a list of fibers that are awaiting io
  if self.CurrentFiber ~= nil then
    self.FibersAwaitingEvent[self.CurrentFiber] = true;

    -- Whether we were successful or not in adding the socket
    -- to the pool, perform a yield() so the world can move on.
    self:yield();
  end
end


IOProcessor.processIOEvent = function(self, key, numbytes, overlapped)
    local ovl = ffi.cast("SocketOverlapped *", overlapped);
    local sock = ovl.sock;
    ovl.bytestransferred = numbytes;
    if sock == INVALID_SOCKET then
      return false, "invalid socket"
    end

    --print("IOProcessor.processIOEvent(): ", sock, ovl.operation);

    local fiber = self.EventFibers[sock];
    if fiber then
      self:scheduleFiber(fiber);
      self.EventFibers[sock] = nil;
      self.FibersAwaitingEvent[fiber] = nil;
    else
      print("EventScheduler_t.ProcessEventQueue(), No Fiber waiting to process.")
      -- remove the socket from the watch list
    end
end

IOProcessor.stepIOEvents = function(self)
    -- Check to see if there are any IO Events to deal with
    local key, numbytes, overlapped = self.IOEventQueue:dequeue(self.MessageQuanta);

    if key then
      self:processIOEvent(key, numbytes, overlapped);
    else
      -- typically timeout
      --print("Event Pool ERROR: ", numbytes);
    end
end

IOProcessor.stepFibers = function(self)
  -- Now check the regular fibers
  local fiber = self.fibers:Dequeue()

  -- Take care of spawning a fiber first
  if fiber then
    if fiber.status ~= "dead" then
      self.CurrentFiber = fiber;
      local result, values = fiber:Resume();
      if not result then
        print("RESUME RESULT: ", result, values)
      end
      self.CurrentFiber = nil;

      if fiber.status ~= "dead" and not self.FibersAwaitingEvent[fiber] then
        self:scheduleFiber(fiber)
      else
        --print("FIBER FINISHED")
        -- remove coroutine from dictionary
        self:removeFiber(fiber)
      end
    else
      self:removeFiber(fiber)
    end
  end
end

IOProcessor.step = function(self)
  self:stepFibers();
  self:stepIOEvents();
end

return IOProcessor

There are a couple of ways to approach this. From the perspective of the other event loop, the “step()” method here is executed once around the loop. The ‘step()’ method in turn checks on the fibers, and then on the ioevents. “stepFibers” checks the list of fibers that are ready to run, and runs one of them for a bit until it either yields, and is thus placed back on the queue of fibers ready to be run, or it finishes. This is the part where a normal cooperative processing system could be brought to its knees, and a preemptive multi-tasking system would just keep going. The ‘stepIOEvents()’ function checks on the IOCompletionPort that is being used by sockets to indicate whether anything interesting has occurred. If there has been any activity, the cooperative thread associated with the activity is scheduled to execute a bit of code. It does not execute immediately, but it is now on the list to be executed next time around.

The stepIOEvents() function is at the heart of any system, such as node.js, which gets high performance with IO processing, while maintaining a low CPU load. Most of the time you’re just waiting, doing nothing, and once the system indicates there is action on the socket, you can spring into action. Thus, you do not spend any time looping over sockets polling to see if there’s any activity, you’re just notified when there is.

The rest of the code is largely helpers, like creating a socket that is wired correctly and whatnot.

So, at the end of it, what does this system do?

Well, assuming I want to write a DaytimeClient, which talks to a service, gets the date and time, prints it out, and so on, I would write this:

local ffi = require "ffi"
require("IOProcessor");

local daytimeport = 13

GetDateAndTime = function(hostname, port)
    hostname = hostname or "localhost";
    port = port or daytimeport;

    local socket, err = IOProcessor:createClientSocket(hostname, port);

    if not socket then
        print("Socket Creation Failed: ", err);
        return nil, err;
    end

    local bufflen = 256;
    local buff = ffi.new("char [?]", bufflen);

    local n, err = socket:receive(buff, bufflen)
 
    if not n then
        return false, err;
    end

    if n > 0 then
        return ffi.string(buff, n);
    end
end

Well, that looks like normal sequential code to me. And yes, it is. Nothing unusual. But, when running in the context of a computicle, like the following, it gets more interesting.

local Computicle = require("Computicle");

local codeTemplate = [[
require("DaytimeClient");
local dtc, err = GetDateAndTime("localhost");
print(dtc);
]]

local comp1 = Computicle:create(codeTemplate);
local comp2 = Computicle:create(codeTemplate);

comp1:waitForFinish();
comp2:waitForFinish();

This will take the otherwise sequential code of the DateTimeClient, and execute it in two parallel preemptive Operating System level threads, and wait for their completion. The magic is all hidden behind the schedulers and event loops. I never have to know about how that all happens, but I can appreciate the benefits.

Marrying the concepts of event driven, multi-process, cooperative user space threads, preemptive multi-tasking, and the like can be a daunting task. But, with a little bit of script, some chewing gum, a whistle and a prayer, it all comes together in a seamless whole which is quite easy to use, and fairly simple to understand. I think I have achieved my objectives for ease of use, maximizing efficiency, and reducing head explosions.


The Lazy Programmer Multitasks – Using IOCP to get work done

I’ve written about event driven stuff, and brushed up against IO Completion Ports in the past, but I’ve never really delivered the goods on these things.  Well, I was wanting to up the scalability, and reduce the CPU utilization of my standard eventing model, so I finally bit the bullet and started down the path of using IO Completion Ports.

First of all, IO Completion Ports is this mechanism that Windows uses to facilitate respectably fast asynchronous IO.  They are typically associated with file read/writes as well as sockets.  But what are they really?

At the end of the day, the IO Completion Port is not much more than a kernel managed queue.  In fact, I’ll just call it that for simplicity, because I approached using them from a direction that really has nothing at all to do with IO, and the whole “Completion Port” part of the name, while somewhat indicative of one pattern of usage, really does just get in the way of understanding a fairly simple and powerful mechanism in Windows.
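
For the curious, the entire “queue” API underneath is just three Win32 calls, which TINN wraps up in its IOCompletionPort class (declarations only, shown here for reference; you never call these directly):

local ffi = require("ffi");

ffi.cdef[[
typedef void *        HANDLE;
typedef int           BOOL;
typedef unsigned long DWORD;
typedef uintptr_t     ULONG_PTR;
typedef struct _OVERLAPPED OVERLAPPED, *LPOVERLAPPED;

// create a completion port (or associate a handle with one)
HANDLE CreateIoCompletionPort(HANDLE FileHandle, HANDLE ExistingCompletionPort,
    ULONG_PTR CompletionKey, DWORD NumberOfConcurrentThreads);

// enqueue: post an arbitrary item onto the queue
BOOL PostQueuedCompletionStatus(HANDLE CompletionPort, DWORD dwNumberOfBytesTransferred,
    ULONG_PTR dwCompletionKey, LPOVERLAPPED lpOverlapped);

// dequeue: block (up to a timeout) waiting for an item
BOOL GetQueuedCompletionStatus(HANDLE CompletionPort, DWORD *lpNumberOfBytesTransferred,
    ULONG_PTR *lpCompletionKey, LPOVERLAPPED *lpOverlapped, DWORD dwMilliseconds);
]]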

So, let’s begin at the beginning.  I want to do the following:

I have some thread that is generating “work”.  The items of work are placed on a queue to be processed by some thread.  I have several threads which spend their lives simply pulling work items out of a queue, performing some action related to the work, and doing that over and over again.

Seems simple enough.  There are a few pieces to solving this puzzle:

  • threads – I need a really easy way to initiate a thread.  I’d like the body of the thread to be nothing more than script.  I don’t want to know too much about the OS, I don’t want to have to deal with locking primitives.  I just want to write some code that does its job, and doesn’t really know much about the outside world.
  • queues – The work gets placed into a queue.  The queue must at least deal with single writer, multiple reader, or something, but really, I don’t want to have to worry about that at all.  From the writer’s perspective, I just want to do: queue:enqueue(workItem).  From the reader’s perspective, I just want: workItem = queue:dequeue();
  • memory management – I need to be able to share pieces of data across multiple threads.  I will need to have a way to indicate ownership of the data, so that it can be safely cleaned up from any thread that has access to the data.

If I have these basic pieces in place, I’m all set. First of all, I’ll start with the worker thread and what it will be doing, beginning with the definition of the data that describes the work to be done.

The code for this example can be found here: https://github.com/Wiladams/TINN/tree/master/tests

local ffi = require("ffi");
require("WTypes");

ffi.cdef[[
typedef struct {
    HWND hwnd;
    UINT message;
    WPARAM wParam;
    LPARAM lParam;
    DWORD time;
    POINT pt;
} AMSG, *PAMSG;
]]

return {
  AMSG = ffi.typeof("AMSG");

  print = function(msg)
    msg = ffi.cast("PAMSG", ffi.cast("void *",msg));

    print(string.format("== AMSG: %d", msg.message));
  end,
}

Easy enough. This is a simple data structure that contains some fields that might be useful for certain kinds of applications. The data structure is very application specific, and can be whatever is required for your application. This one is just really easy, and familiar to a Windows UI programmer.

Now that worker thread:

local workerTmpl = [==[
local ffi = require("ffi");
local core_synch = require("core_synch_l1_2_0");
local Heap = require("Heap");
local IOCompletionPort = require("IOCompletionPort");
local TINNThread = require("TINNThread");
local MSG = require("AMSG");
local random = math.random;

local QueueHandle = ffi.cast("HANDLE", ffi.cast("intptr_t", %s));
local HeapHandle = ffi.cast("HANDLE", ffi.cast("intptr_t", %s));

local iocp, err = IOCompletionPort:init(QueueHandle);
local memman, err = Heap(HeapHandle);

while true do
  local key, bytes, overlap = iocp:dequeue();

  if not key then
    break;
  end

  MSG.print(key);
  memman:free(ffi.cast("void *",key));
end
]==];

The while loop is the working end of this bit of code. Basically, it will just spin forever, trying to pull some work out of the iocp queue. The code will ‘block’ until it actually receives something from the queue, or an error. Once it gets the data from the queue, it will simply call the ‘print’ function, and that’s the end of that piece of work.

This system assumes that the receiver of the data will be responsible for freeing this bit of memory that it’s using, so memman:free() does that task.

Where does that memman object come from? Well, as can be seen in the code, it is an instance of the Heap object, constructed using a handle that is passed in as a string value, and turned into a “HANDLE”. The same is true of the IOCompletionPort.

This is a neat little trick. Basically, the parent thread constructs these objects that will be shared with the sibling threads, and passes a pointer to them. The objects know how to construct themselves from nothing more than this pointer value, and away you go!

So, how about the main thread then?

-- test_workerthreads.lua

local ffi = require("ffi");
local core_synch = require("core_synch_l1_2_0");
local IOCompletionPort = require("IOCompletionPort");
local TINNThread = require("TINNThread");
local AMSG = require("AMSG");
local Heap = require("Heap");

local main = function()
  local memman = Heap:create();
  local threads = {};

  local iocp, err = IOCompletionPort();

  if not iocp then
    return false, err;
  end

  -- launch worker threads
  local iocphandle = TINNThread:CreatePointerString(iocp:getNativeHandle());
  local heaphandle = TINNThread:CreatePointerString(memman:getNativeHandle());
  local codechunk = string.format(workerTmpl, iocphandle, heaphandle);

  for i=1,4 do
    local worker, err = TINNThread({CodeChunk = codechunk});
    table.insert(threads, worker);
  end

  -- continuously put 'work' items into queue
  local numWorkItems = 1000;
  local counter = 0;
  local sleepInterval = 50;
  local workSize = ffi.sizeof("AMSG");

  while true do
    for i = 1,numWorkItems do
      local newWork = memman:alloc(workSize);
      ffi.cast("AMSG *",newWork).message = (counter*numWorkItems)+i;
      iocp:enqueue(newWork, workSize);
    end

    collectgarbage();
    counter = counter + 1;

    -- The worker threads should continue to get work done
    -- while we sleep for a little bit
    core_synch.Sleep(sleepInterval);
  end
end

main();

The most interesting part here might be the creation of the codechunk. A simple string.format() is used to replace the string parameters in the template with the string values for the heap handle, and the io completion port handle.

Once the codechunk is created, a thread is created that uses the codechunk as its body. This thread will start automatically, so nothing needs to be done beyond simply constructing it. In this case, 4 threads are created. This matches the number of virtual cores I have on my machine, so there’s one thread per core. This is a good number as any more would be wasteful in this particular case.

Then there’s simply a loop that constantly creates work items and places them on the queue. Notice how the work items are constructed from the heap, and then never freed here. Remember, back in the worker thread, the workItem is cleaned up using memman:free(). You must do something like this because although you could use ffi.new() to construct the bits of memory, you can’t guarantee the lifetime of such a thing across the call to enqueue. Lua might try to free up the allocated memory before the worker actually has time to do anything with it. Doing the allocation using the Heap gives you full control over the lifetime of that bit of memory.
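
To make the hazard concrete, here’s what not to do (a sketch, using the iocp and AMSG definitions from the listing above):

-- BROKEN: the Lua GC owns this allocation
local work = ffi.new("AMSG");
work.message = 42;
iocp:enqueue(work, ffi.sizeof("AMSG"));

work = nil;          -- no Lua reference remains...
collectgarbage();    -- ...so the memory can be reclaimed right here,
                     -- while a worker thread still holds the raw pointer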

Well, there you have it. Basically a very simple work sharing multi-threaded program in LuaJIT with a minimal amount of fuss. Of course, this works out fairly well because Windows already has the IO Completion Port (queue), so it’s mainly just gluing things together correctly.

Of course, you don’t have to do it this way; true to Windows, there are a thousand different ways you could spin up threads and deal with work sharing. One easy alternative is so-called thread pooling. I must say though, using IOCP is a much easier and less intensive mechanism. The challenge with thread pooling is that when a piece of work becomes ready, all the threads might try to jump on the queue to get at it. This can cause a lot of context switching in the kernel, which is a performance killer. With an IO Completion Port, only one of the threads will be awakened, and that’s that. Not a lot of thrashing.

This is now halfway to using IO Completion Ports for a low CPU utilization network server. The other half is to add in the Windows sockets and event scheduler. Well, I already have a handy event scheduler, and the Socket class is functioning quite well.

On the way to doing this, I discovered the joys of Registered IO in Windows 8 (RIO), which is even faster for networking, but I’ll save that for another iteration.

In the meanwhile, if you want to do the lazy programmer’s multi-threaded programming, using IO Completion Ports is the way to go. This Lua code makes it so brain dead simple, I feel like using threads just for the heck of it now.