Computicles – A tale of two schedulers

One of the drivers for the creation of computicles is to maximize the efficiency of the running system while minimizing the complexity for the programmer.  Herein lies the rub.  Modern computers are multi-core/multi-proc, and Lua is largely a single core sort of system.  Lua has its own notion of “light weight threads” (coroutines), which are essentially cooperative processing threads.  The native OS (Windows or Linux) has a notion of “threads” which are much more heavy weight.  While the Lua threads can easily number in the thousands and more, they are not actually running in parallel; they are just rapidly context switching between each other at the whim of the programmer.  The OS threads, on the other hand, are in fact running in parallel, on multiple cores if they exist.  But, as soon as you have more threads than you have cores, the threads are shifting rapidly between each other, just like in the Lua case, except the switching is ‘preemptive’ instead of cooperative.
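To make the “cooperative” part concrete, here is a minimal sketch using nothing but standard Lua coroutines. The names (makeTask, ready, log) are made up for illustration and are not part of the computicle code. Each task runs until it chooses to yield, and a simple round-robin loop resumes whichever task is next in line.

```lua
-- Two cooperative tasks taking turns. Nothing here runs in parallel;
-- a task only gives up control when it calls coroutine.yield().
local log = {}

local function makeTask(name, steps)
  return coroutine.create(function()
    for i = 1, steps do
      log[#log + 1] = name .. i
      coroutine.yield()          -- voluntarily hand control back
    end
  end)
end

local ready = { makeTask("a", 2), makeTask("b", 3) }

-- Round-robin: take the first ready task, run it until it yields or
-- finishes, and put it at the back of the line if it is still alive.
while #ready > 0 do
  local task = table.remove(ready, 1)
  local ok, err = coroutine.resume(task)
  if not ok then error(err) end
  if coroutine.status(task) ~= "dead" then
    ready[#ready + 1] = task
  end
end

print(table.concat(log, " "))    -- a1 b1 a2 b2 b3
```

If one of those tasks never yielded, the other would never get a turn. That is exactly the weakness a preemptive OS scheduler does not have.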

What I want?  I want to get the best of both worlds.  But, before starting down the path of leveraging the multiple cores, I want to start with the programming paradigm.

I want to write essentially serial code.  My brain is not good at dealing with things like mutexes, semaphores, barriers, or any other kind of sharing mechanisms that have been invented over the past 40 years.  I know how to write straight sequential code.  I can deal with saying “spawn” to get something running in parallel, but that’s about it.

So, in steps computicles.

I’ve gone on about the subject a few times now, but I’ve finally created the unified scheduler that I require.  It looks like this:

 

-- comp_msgpump.lua
local ffi = require("ffi");
require("IOProcessor");

-- default to 15 millisecond timeout
gIdleTimeout = gIdleTimeout or 15

local idlecount = 0;

while true do
  if IOProcessor then
    IOProcessor:step();
  end

  local msg, err = SELFICLE:getMessage(gIdleTimeout);

  if not msg then
    if err == WAIT_TIMEOUT then
      --print("about to idle")
      idlecount = idlecount + 1;
      if OnIdle then
        OnIdle(idlecount);
      end
    end
  else
    local msgFullyHandled = false;
    msg = ffi.cast("ComputicleMsg *", msg);

    if OnMessage then
      msgFullyHandled = OnMessage(msg);
    end

    if not msgFullyHandled then
      -- msg was already cast to "ComputicleMsg *" above
      local Message = msg.Message;
      --print("Message: ", Message, msg.Param1, msg.Param2);
		
      if Message == Computicle.Messages.QUIT then
        if OnExit then
          OnExit();
        end
        break;
      end

      if Message == Computicle.Messages.CODE then
        local len = msg.Param2;
        local codePtr = ffi.cast("const char *", msg.Param1);
		
        if codePtr ~= nil and len > 0 then
          local code = ffi.string(codePtr, len);

          SELFICLE:freeData(ffi.cast("void *",codePtr));

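          -- loadstring returns nil plus an error message if the code does not compile
          local func, loaderr = loadstring(code);
          if func then
            func();
          else
            print("CODE failed to compile: ", loaderr);
          end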
        end
      end
      SELFICLE:freeMessage(msg);
    end
  end
end

 
This is pretty much the same event driven loop that has existed previously. Its main function is to get messages off its message queue, and deal with them. This is how you communicate with a computicle. Under normal circumstances, a Computicle can simply implement OnMessage() if it wants to respond only when it receives a message. This is a perfectly event driven way to exist. Or it can implement OnIdle() if it wants to respond to the fact that nothing else is occurring in the system. This is a great combination, and will cover many useful cases. But what about waiting for some IO to complete?
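Here is a rough, self-contained sketch of how that dispatch plays out. It is a simulation, not the real pump: the inbox table stands in for the computicle’s message queue (an assumption for illustration, where false means “the wait timed out”), and the two handlers are plain local functions rather than the globals the real loop looks for.

```lua
-- Simulated message queue: strings are messages, false is a timeout.
local inbox = { "ping", false, "pong", false, false }
local seen = {}

-- Handlers a computicle body might define.
local function OnMessage(msg)
  seen[#seen + 1] = "msg:" .. msg
  return true                    -- true means "fully handled"
end

local function OnIdle(count)
  seen[#seen + 1] = "idle:" .. count
end

-- The dispatch rule from the pump: a message goes to OnMessage,
-- a timeout bumps the idle counter and goes to OnIdle.
local idlecount = 0
for _, msg in ipairs(inbox) do
  if msg then
    OnMessage(msg)
  else
    idlecount = idlecount + 1
    OnIdle(idlecount)
  end
end

print(table.concat(seen, ", "))
-- msg:ping, idle:1, msg:pong, idle:2, idle:3
```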

Well, at the top of this event loop there is the IOProcessor:step() call. And what is an IOProcessor?

The IOProcessor is a scheduler for cooperative Lua threads. The IOProcessor assumes the user’s code is utilizing co-routines, and will deal with putting them on a ‘sleeping’ list whenever they perform a task, such as socket IO which does not complete immediately. It’s a classic, and before the Computicles existed, this was the primary scheduler.

It’s a bit thick with code, but here it is:

local ffi = require("ffi");

local Collections = require "Collections"
local IOCPSocket = require("IOCPSocket");
local SimpleFiber = require("SimpleFiber");
local IOCompletionPort = require("IOCompletionPort");
local SocketOps = require("SocketOps");


IOProcessor = {
  fibers = Collections.Queue.new();
  coroutines = {};
  EventFibers = {};
  FibersAwaitingEvent = {};

  IOEventQueue = IOCompletionPort:create();
  MessageQuanta = 15;		-- 15 milliseconds
};


--[[
	Socket Management
--]]

IOProcessor.createClientSocket = function(self, hostname, port)
  return IOCPSocket:createClient(hostname, port, self)
end

IOProcessor.createServerSocket = function(self, params)
  return IOCPSocket:createServerSocket(params, self)
end

IOProcessor.observeSocketIO = function(self, socket)
  return self.IOEventQueue:addIoHandle(socket:getNativeHandle(), socket.SafeHandle);
end

--[[
	Fiber Handling
--]]

IOProcessor.scheduleFiber = function(self, afiber, ...)
  if not afiber then
    return nil
  end
  self.coroutines[afiber.routine] = afiber;
  self.fibers:Enqueue(afiber);	

  return afiber;
end

IOProcessor.spawn = function(self, aroutine, ...)
  return self:scheduleFiber(SimpleFiber(aroutine, ...));
end

IOProcessor.removeFiber = function(self, fiber)
  self.coroutines[fiber.routine] = nil;
end

IOProcessor.inMainFiber = function(self)
  return coroutine.running() == nil; 
end

IOProcessor.yield = function(self)
  coroutine.yield();
end

IOProcessor.yieldForIo = function(self, sock, iotype)
  -- associate a fiber with a socket
  print("yieldForIo, CurrentFiber: ", self.CurrentFiber);
	
  self.EventFibers[sock:getNativeSocket()] = self.CurrentFiber;

  -- Keep a list of fibers that are awaiting io
  if self.CurrentFiber ~= nil then
    self.FibersAwaitingEvent[self.CurrentFiber] = true;

    -- Whether we were successful or not in adding the socket
    -- to the pool, perform a yield() so the world can move on.
    self:yield();
  end
end


IOProcessor.processIOEvent = function(self, key, numbytes, overlapped)
    local ovl = ffi.cast("SocketOverlapped *", overlapped);
    local sock = ovl.sock;
    ovl.bytestransferred = numbytes;
    if sock == INVALID_SOCKET then
      return false, "invalid socket"
    end

    --print("IOProcessor.processIOEvent(): ", sock, ovl.operation);

    local fiber = self.EventFibers[sock];
    if fiber then
      self:scheduleFiber(fiber);
      self.EventFibers[sock] = nil;
      self.FibersAwaitingEvent[fiber] = nil;
    else
      print("EventScheduler_t.ProcessEventQueue(), No Fiber waiting to process.")
      -- remove the socket from the watch list
    end
end

IOProcessor.stepIOEvents = function(self)
    -- Check to see if there are any IO Events to deal with
    local key, numbytes, overlapped = self.IOEventQueue:dequeue(self.MessageQuanta);

    if key then
      self:processIOEvent(key, numbytes, overlapped);
    else
      -- typically timeout
      --print("Event Pool ERROR: ", numbytes);
    end
end

IOProcessor.stepFibers = function(self)
  -- Now check the regular fibers
  local fiber = self.fibers:Dequeue()

  -- Take care of spawning a fiber first
  if fiber then
    if fiber.status ~= "dead" then
      self.CurrentFiber = fiber;
      local result, values = fiber:Resume();
      if not result then
        print("RESUME RESULT: ", result, values)
      end
      self.CurrentFiber = nil;

      if fiber.status ~= "dead" and not self.FibersAwaitingEvent[fiber] then
        self:scheduleFiber(fiber)
      else
        --print("FIBER FINISHED")
        -- remove coroutine from dictionary
        self:removeFiber(fiber)
      end
    else
      self:removeFiber(fiber)
    end
  end
end

IOProcessor.step = function(self)
  self:stepFibers();
  self:stepIOEvents();
end

return IOProcessor

There are a couple of ways to approach this. From the perspective of the other event loop, the ‘step()’ method here is executed once each time around the loop. The ‘step()’ method in turn checks on the fibers, and then on the IO events. ‘stepFibers()’ takes one fiber from the list of fibers that are ready to run, and runs it for a bit until it either yields, and is thus placed back on the queue of fibers ready to be run, or finishes. This is the part where a normal cooperative processing system could be brought to its knees by a fiber that never yields, while a preemptive multi-tasking system would just keep going. The ‘stepIOEvents()’ function checks on the IOCompletionPort that is being used by sockets to indicate whether anything interesting has occurred. If there has been any activity, the cooperative thread associated with the activity is scheduled to execute a bit of code. It does not execute immediately, but it is now on the list to be executed next time around.
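The park-and-wake idea can be boiled down to a few lines. What follows is a stripped-down sketch and not the IOProcessor itself: the Scheduler table, its signal() method, and the "sock1" key are all hypothetical, and signal() stands in for a completion event arriving from the IOCompletionPort.

```lua
local Scheduler = { ready = {}, awaiting = {}, current = nil }

function Scheduler:spawn(fn)
  self.ready[#self.ready + 1] = coroutine.create(fn)
end

-- Called from inside a fiber: park it until `key` is signalled.
function Scheduler:yieldForIo(key)
  self.awaiting[key] = self.current
  coroutine.yield("io")
end

-- Stand-in for an IO completion event: move the parked fiber, if any,
-- back onto the ready list. It runs on a later step, not right away.
function Scheduler:signal(key)
  local fiber = self.awaiting[key]
  if fiber then
    self.awaiting[key] = nil
    self.ready[#self.ready + 1] = fiber
  end
end

-- Run one ready fiber until it yields or finishes.
function Scheduler:stepFibers()
  local fiber = table.remove(self.ready, 1)
  if not fiber then return false end
  self.current = fiber
  local ok, why = coroutine.resume(fiber)
  self.current = nil
  if not ok then error(why) end
  -- Requeue only fibers that yielded for a reason other than IO.
  if coroutine.status(fiber) ~= "dead" and why ~= "io" then
    self.ready[#self.ready + 1] = fiber
  end
  return true
end

local trace = {}
Scheduler:spawn(function()
  trace[#trace + 1] = "reader waits"
  Scheduler:yieldForIo("sock1")
  trace[#trace + 1] = "reader resumes"
end)
Scheduler:spawn(function()
  trace[#trace + 1] = "worker runs"
end)

Scheduler:stepFibers()               -- reader parks itself
Scheduler:stepFibers()               -- worker runs to completion
assert(not Scheduler:stepFibers())   -- nothing is ready to run
Scheduler:signal("sock1")            -- "IO completed"
Scheduler:stepFibers()               -- reader picks up where it left off

print(table.concat(trace, ", "))
-- reader waits, worker runs, reader resumes
```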

The stepIOEvents() function is at the heart of any system, such as node.js, which gets high performance with IO processing, while maintaining a low CPU load. Most of the time you’re just waiting, doing nothing, and once the system indicates there is action on the socket, you can spring into action. Thus, you do not spend any time looping over sockets polling to see if there’s any activity, you’re just notified when there is.

The rest of the code is largely helpers, like creating a socket that is wired correctly and whatnot.

So, at the end of it, what does this system do?

Well, assuming I want to write a DaytimeClient, which talks to a service, gets the date and time, prints it out, etc, I would write this:

local ffi = require "ffi"
require("IOProcessor");

local daytimeport = 13

GetDateAndTime = function(hostname, port)
    hostname = hostname or "localhost";
    port = port or daytimeport;

    local socket, err = IOProcessor:createClientSocket(hostname, port);

    if not socket then
        print("Socket Creation Failed: ", err);
        return nil, err;
    end

    local bufflen = 256;
    local buff = ffi.new("char [?]", bufflen);

    local n, err = socket:receive(buff, bufflen)
 
    if not n then
        return false, err;
    end

    if n > 0 then
        return ffi.string(buff, n);
    end
end

Well, that looks like normal sequential code to me. And yes, it is. Nothing unusual. But, when running in the context of a computicle, like the following, it gets more interesting.

local Computicle = require("Computicle");

local codeTemplate = [[
require("DaytimeClient");
local dtc, err = GetDateAndTime("localhost");
print(dtc);
]]

local comp1 = Computicle:create(codeTemplate);
local comp2 = Computicle:create(codeTemplate);

comp1:waitForFinish();
comp2:waitForFinish();

This will take the otherwise sequential code of the DaytimeClient, and execute it in two parallel preemptive Operating System level threads, and wait for their completion. The magic is all hidden behind the schedulers and event loops. I never have to know about how that all happens, but I can appreciate the benefits.

Marrying the concepts of event driven, multi-process, cooperative user space threads, preemptive multi-tasking, and the like can be a daunting task. But, with a little bit of script, some chewing gum, a whistle and a prayer, it all comes together in a seamless whole which is quite easy to use, and fairly simple to understand. I think I have achieved my objectives for ease of use, maximizing efficiency, and reducing head explosions.


Computicles – Inter-computicle communication

Alrighty then, so a computicle is a vessel that holds a bit of computation power. You can communicate with it, and it can communicate with others.

Most computicles do not stand as islands unto themselves, so easily communicating with them becomes very important.

Here is some code that I want to be able to run:

local Computicle = require("Computicle");
local comp = Computicle:load("comp_receivecode");

-- start by saying hello
comp:exec([[print("hello, injector")]]);

-- queue up a quit message
comp:quit();

-- wait for it all to actually go through
comp:waitForFinish();

So, what’s going on here? The first line is a standard “require” to pull in the computicle module.

Next, I create a single instance of a Computicle, running the Lua code that can be found in the file “comp_receivecode.lua”. I’ll come back to that bit of code later. Suffice to say it’s running a simple computicle that does stuff, like execute bits of code that I hand to it.

Further on, I use the Computicle I just created, and call the “exec()” function. I’m passing a string along as the only parameter. What will happen is the receiving Computicle will take that string, and execute the script from within its own context. That’s actually a pretty nifty trick I think. Just imagine, outside the world of scripting, you can create a thread in one line of code, and then inject a bit of code for that thread to execute. Hmmm, the possibilities are intriguing methinks.

The tail end of this code just posts a quit, and then finally waits for everything to finish up. Just note that the ‘quit()’ function is not the same thing as “TerminateThread()”, or “ExitThread()”. Nope, all it does is post a specific kind of message to the receiving Computicle’s queue. What the thread does with that QUIT message is up to the individual Computicle.

Let’s have a look at the code for this computicle:

local ffi = require("ffi");

-- This is a basic message pump
-- 
while true do
  local msg = SELFICLE:getMessage();
  msg = ffi.cast("ComputicleMsg *", msg);
  local Message = msg.Message;

  if OnMessage then
    OnMessage(msg);
  else
    if Message == Computicle.Messages.QUIT then
      break;
    end

    if Message == Computicle.Messages.CODE then
      local len = msg.Param2;
      local codePtr = ffi.cast("const char *", msg.Param1);
		
      if codePtr ~= nil and len > 0 then
        local code = ffi.string(codePtr, len);

        SELFICLE:freeData(ffi.cast("void *",codePtr));

        local f = loadstring(code);
        f();
      end
    end
  end

  SELFICLE:freeMessage(msg);
end

It’s not too many lines. This little Computicle takes care of a few scenarios.

First of all, if there so happens to be an ‘OnMessage’ function defined, it will receive the message, and the main loop will do no further processing of it.

If there is no ‘OnMessage’ function, then the message pump will handle a couple of cases. In case a ‘QUIT’ message is received, the loop will break and the thread/Computicle will simply exit.

When the message == ‘CODE’ things get really interesting. The ‘Param1’ of the message contains a pointer to the actual bit of code that is intended to be executed. The ‘Param2’ contains the length of the specified code.

Through a couple of type casts, and an ffi.string() call, the code is turned into something that can be used with ‘loadstring()’, which is a standard Lua function. It will parse the string, and then when ‘f()’ is called, that string will actually be executed (within the context of the Computicle). And that’s that!
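One thing worth knowing about ‘loadstring()’: if the string does not compile, it returns nil plus an error message instead of a function, so calling the result blindly will blow up. Here is a small sketch of a more defensive version. The runCode helper is a made-up name, and the first line is only there so the example also runs on Lua 5.2 and later, where ‘loadstring’ was folded into ‘load’.

```lua
local loadstring = loadstring or load   -- Lua 5.2+ only has load

-- Compile and run a chunk of code, reporting failure instead of throwing.
local function runCode(code)
  local func, compileErr = loadstring(code)
  if not func then
    return false, "compile error: " .. tostring(compileErr)
  end
  -- pcall catches runtime errors raised while the chunk executes
  return pcall(func)
end

local ok1, result = runCode("return 1 + 2")
local ok2, err2 = runCode("this is not lua")
local ok3, err3 = runCode("error('boom')")

print(ok1, result)   -- true    3
print(ok2, err2)     -- false, followed by the compile error text
print(ok3, err3)     -- false, followed by the runtime error text
```

Whether a computicle should swallow a bad chunk and keep running, or die loudly, is a design choice. The pump above takes the simple route and just calls the function.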

At the end, ‘SELFICLE:freeMessage()’ is called to free up the memory that was allocated for the outer message. Notice that ‘SELFICLE:freeData()’ was used to clean up the string value that was within the message itself. I have intimate knowledge of how this message was constructed, so I know this is the correct behavior. In general, if you’re going to pass data to a computicle, and you intend the Computicle to clean it up, you should use the computicle instance’s “allocData()” function.

OK. So, that explains how I could possibly inject some code into a Computicle for execution. That’s pretty nifty, but it looks a bit clunky. Can I do better?

I would like to be able to do the following.

comp.lowValue = 100;
comp.highValue = 200;

In this case, it looks like I’m setting a value on the computicle instance, but in which thread context? Well, what will actually happen is this will get executed within the computicle instance context, and be available to any code that is within the computicle.

We already know that the ‘exec()’ function will execute a bit of code within the context of the running computicle, so the following should now be possible:

comp:exec([[print(" Low: ", lowValue)]]);
comp:exec([[print("High: ", highValue)]])

Basically, just print those values from the context of the computicle. If they were in fact set, then this should print them out. If they were not in fact set, then it should print ‘nil’ for each of them. On my machine, I get the correct values, so that’s an indication that they were in fact set correctly.

How is this bit of magic achieved?

The key is the Lua ‘__newindex’ metamethod. Wha? Basically, if you have a table, and you try to set a key that does not yet exist in it, like I did with ‘lowValue’ and ‘highValue’, the ‘__newindex()’ function will be called on your table if you’ve got its metatable set up right. Here’s the associated code of the Computicle that does exactly this.

__newindex = function(self, key, value)
  local setvalue = string.format("%s = %s", key, self:datumToString(value, key));
  return self:exec(setvalue);
end

That’s pretty straightforward. Just create a string that represents setting whatever value you’re trying to set, and then call ‘exec()’, which is already known to execute within the context of the thread. So, in the case where I have written “comp.lowValue = 100”, this will turn into the string “lowValue = 100”, and that string will be executed, setting a global variable ‘lowValue’ to 100 inside the computicle.
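To see the mechanics in isolation, here is a minimal sketch that runs in a single Lua state. The Remote table and the sent list are hypothetical stand-ins: instead of posting the string to another thread, exec() just records it, and tostring() takes the place of datumToString().

```lua
local sent = {}                  -- what would have gone to the other thread

local Remote = {}
Remote.__index = Remote          -- so instances can find exec()

function Remote.exec(self, code)
  sent[#sent + 1] = code         -- the real thing would post a CODE message
end

-- Fires on every assignment to a key the proxy table does not hold.
Remote.__newindex = function(self, key, value)
  return self:exec(string.format("%s = %s", key, tostring(value)))
end

local comp = setmetatable({}, Remote)
comp.lowValue = 100
comp.highValue = 200

print(sent[1])                   -- lowValue = 100
print(sent[2])                   -- highValue = 200
print(rawget(comp, "lowValue"))  -- nil
```

Because the handler never stores the value in the proxy table itself, every assignment takes the same path, and the local side never holds a copy that could go stale.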

And what is this ‘datumToString()’ function? Ah yes, this is the little bit that takes various values and returns their string equivalent, ready to be injected into a running Computicle.

Computicle.datumToString = function(self, data, name)
  local dtype = type(data);
  local datastr = tostring(nil);

  if type(data) == "cdata" then
      -- If it is a cdata type that easily converts to 
      -- a number, then convert to a number and assign to string
    if tonumber(data) then
      datastr = tostring(tonumber(data));
    else
      -- if not easily converted to number, then just assign the pointer
      datastr = string.format("TINNThread:StringToPointer(%s);", 
        TINNThread:PointerToString(data));
    end
  elseif dtype == "table" then
    if getmetatable(data) == Computicle_mt then
      -- package up a computicle
    else
      -- get a json string representation of the table
      datastr = string.format("[[ %s ]]",JSON.encode(data, {indent=true}));
    end
  elseif dtype == "string" then
    datastr = string.format("[[%s]]", data);
  else 
    datastr = tostring(data);
  end

  return datastr;
end

The task is actually fairly straight forward. Given a Lua based value, turn it into a string that can be executed in another Lua state. There are of course methods in Lua which will do this, and tons of marshalling frameworks as well. But, this is a quick and dirty version that does exactly what I need.

Of particular note is the handling of cdata and table types. For cdata, some of the values, such as ‘int64_t’, I want to just convert to a number. Tables are the most interesting. This particular technique will really only work for fairly simple tables, that do not make references to other tables and the like. Basically, turn the table into a JSON string, and send that across to be rehydrated as a table.
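One caveat with the quick and dirty approach: wrapping a string in [[ ]] falls over as soon as the string itself contains ‘]]’. A possible alternative, which is not what the code above does, is the standard “%q” format specifier. It produces a quoted literal that the Lua parser can read back. A small sketch, with a made-up test string:

```lua
local loadstring = loadstring or load   -- Lua 5.2+ only has load

local tricky = "closes ]] early"

-- Long-bracket wrapping: the embedded ]] ends the literal too soon,
-- so the generated code no longer compiles.
local naive = loadstring("return [[" .. tricky .. "]]")
print(naive)                            -- nil

-- %q escapes whatever needs escaping, so the literal survives the trip.
local literal = string.format("%q", tricky)
local roundTripped = loadstring("return " .. literal)()
print(roundTripped == tricky)           -- true
```

The same concern applies to the JSON text produced for tables, since it is wrapped in long brackets as well.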

Here’s some code that actually makes use of this.

comp.contacts = {
  {first = "William", last = "Adams", phone = "(111) 555-1212"},
  {first = "Bill", last = "Gates", phone = "(111) 123-4567"},
}

comp:exec([=[
print("== CONTACTS ==")

-- turn contacts back into a Lua table
local JSON = require("dkjson");

local contable = JSON.decode(contacts);


for _, person in ipairs(contable) do
	--print(person);
	print("== PERSON ==");
	for k,v in pairs(person) do
		print(k,v);
	end
end
]=]);

Notice that ‘comp.contacts = …’ just assigns the created table directly. This is fine, as there are no other references to the table on ‘this’ side of the computicle, so it will be safely garbage collected after some time.

The rest of the code is using the ‘exec()’, so it is executing in the context of the computicle. It basically gets the value of the ‘contacts’ variable, and turns it back into a Lua table value, and does some regular processing on it (print out all the values).

And that’s about it. From the humble beginnings of being able to inject a bit of code to run in the context of an already running thread, to exchanging tables between two different threads with ease, Computicles make pretty short work of such a task. It all stems from the same three principles of Computicles: listen, compute, communicate. And again, not a single mutex, lock, or other visible form of synchronization. The IOCompletionPort is the single communications mechanism, and all the magic of serialized multi-threaded communication hides behind that.

Of course, ‘code injection’ is a dirty word around the computing world, so there must be a way to secure such transfers? Yah, sure, why not. I’ve been bumping around the identity/security/authorization/authentication space recently, so surely something must be applicable here…