Computicles – A tale of two schedulers

One of the drivers for the creation of computicles is to maximize the efficiency of the running system while minimizing the complexity for the programmer.  Herein lies the rub.  Modern computers are multi-core/multi-proc, and Lua is largely a single core sort of system.  Lua has its own notion of “light weight threads” (coroutines), which are essentially cooperative processing threads.  The native OS (Windows or Linux) has a notion of “threads” which are much more heavyweight.  While the Lua threads can easily number in the thousands and more, they are not actually running in parallel; they are just rapidly context switching between each other at the whim of the programmer.  The OS threads, on the other hand, are in fact running in parallel, on multiple cores if they exist.  But as soon as you have more threads than you have cores, the threads are shifting rapidly between each other, just like in the Lua case, except that the switching is ‘preemptive’ instead of cooperative.
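To make "cooperative" concrete, here is a minimal sketch in plain Lua, independent of the computicle code. The names worker, threads and log are made up for the illustration. Each coroutine runs only until it calls coroutine.yield(), and a trivial round-robin loop resumes them in turn.

```lua
-- Two cooperative "threads" sharing one OS thread. Each runs only until it
-- calls coroutine.yield(); nothing interrupts it before then.
local log = {}

local function worker(name, count)
  return coroutine.create(function()
    for i = 1, count do
      log[#log + 1] = name .. i
      coroutine.yield()        -- voluntarily hand control back
    end
  end)
end

local threads = { worker("a", 3), worker("b", 3) }

-- A trivial round-robin driver: resume each live coroutine in turn
-- until every one of them has finished.
local alive = #threads
while alive > 0 do
  alive = 0
  for _, co in ipairs(threads) do
    if coroutine.status(co) ~= "dead" then
      coroutine.resume(co)
      if coroutine.status(co) ~= "dead" then
        alive = alive + 1
      end
    end
  end
end

print(table.concat(log, " "))  -- a1 b1 a2 b2 a3 b3
```

The interleaving is entirely decided by where the code yields. An OS thread, by contrast, can be switched out at any instruction.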

What do I want?  I want to get the best of both worlds.  But, before starting down the path of leveraging the multiple cores, I want to start with the programming paradigm.

I want to write essentially serial code.  My brain is not good at dealing with things like mutexes, semaphores, barriers, or any of the other sharing mechanisms that have been invented over the past 40 years.  I know how to write straight sequential code.  I can deal with saying “spawn” to get something running in parallel, but that’s about it.

So, in steps computicles.

I’ve gone on about the subject a few times now, but I’ve finally created the unified scheduler that I require.  It looks like this:

 

-- comp_msgpump.lua
local ffi = require("ffi");
require("IOProcessor");

-- default to 15 millisecond timeout
gIdleTimeout = gIdleTimeout or 15

local idlecount = 0;

while true do
  if IOProcessor then
    IOProcessor:step();
  end

  local msg, err = SELFICLE:getMessage(gIdleTimeout);

  if not msg then
    if err == WAIT_TIMEOUT then
      --print("about to idle")
      idlecount = idlecount + 1;
      if OnIdle then
        OnIdle(idlecount);
      end
    end
  else
    local msgFullyHandled = false;
    msg = ffi.cast("ComputicleMsg *", msg);

    if OnMessage then
      msgFullyHandled = OnMessage(msg);
    end

    if not msgFullyHandled then
      -- msg was already cast to "ComputicleMsg *" above
      local Message = msg.Message;
      --print("Message: ", Message, msg.Param1, msg.Param2);
		
      if Message == Computicle.Messages.QUIT then
        if OnExit then
          OnExit();
        end
        break;
      end

      if Message == Computicle.Messages.CODE then
        local len = msg.Param2;
        local codePtr = ffi.cast("const char *", msg.Param1);
		
        if codePtr ~= nil and len > 0 then
          local code = ffi.string(codePtr, len);

          SELFICLE:freeData(ffi.cast("void *",codePtr));

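          -- loadstring returns nil plus an error message if the code
          -- does not compile, so check before calling the result
          local func, loadErr = loadstring(code);
          if func then
            func();
          else
            print("CODE message failed to compile: ", loadErr);
          end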
        end
      end
      SELFICLE:freeMessage(msg);
    end
  end
end

 
This is pretty much the same event driven loop that has existed previously. Its main function is to get messages off its message queue, and deal with them. This is how you communicate with a computicle. Under normal circumstances, a Computicle can simply implement OnMessage() if it only wants to respond when it receives a message. This is a perfectly event driven way to exist. Or it can implement OnIdle() if it wants to respond to the fact that nothing else is occurring in the system. This is a great combination, and will cover many useful cases. But what about waiting for some IO to complete?
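As a rough illustration of the two handlers, here is a self-contained sketch. Everything in it is a stand-in: the inbox table replaces the computicle's real message queue, messages are plain tables rather than ComputicleMsg structs, the TIMEOUT sentinel plays the role of a getMessage() call that timed out, and the handlers are locals here where the pump above looks for globals.

```lua
-- Hypothetical stand-ins for the real queue and message type.
local TIMEOUT = {}   -- sentinel: "nothing arrived within the timeout"
local QUIT = "QUIT"

local inbox = {
  { Message = "GREET", Param1 = "hello" },
  TIMEOUT,
  { Message = "GREET", Param1 = "again" },
  { Message = QUIT },
}

local seen, idles = {}, 0

-- Event driven handler: called once per message.
-- Returning true tells the pump the message was fully handled.
local function OnMessage(msg)
  if msg.Message == "GREET" then
    seen[#seen + 1] = msg.Param1
    return true
  end
  return false
end

-- Idle handler: called whenever a wait for messages timed out.
local function OnIdle(count)
  idles = count
end

-- Simplified pump with the same shape as the loop above.
local idlecount = 0
for _, msg in ipairs(inbox) do
  if msg == TIMEOUT then
    idlecount = idlecount + 1
    OnIdle(idlecount)
  elseif not OnMessage(msg) and msg.Message == QUIT then
    break
  end
end

print(#seen, idles)
```

A computicle that only cares about messages would define just OnMessage(); one that wants to do background work between messages would define OnIdle() as well.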

Well, at the top of this event loop there is the IOProcessor:step() call. And what is an IOProcessor?

The IOProcessor is a scheduler for cooperative Lua threads. The IOProcessor assumes the user’s code is utilizing coroutines, and will deal with putting them on a ‘sleeping’ list whenever they perform a task, such as socket IO, that does not complete immediately. It’s a classic, and before the Computicles existed, this was the primary scheduler.
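Before the real thing, the core idea can be sketched in a few dozen lines of plain Lua. This is a toy under stated assumptions, not the IOProcessor itself: spawn, waitFor, signal and step are illustrative names, and a call to signal() stands in for an IO completion arriving from the operating system.

```lua
-- Minimal cooperative scheduler: a ready queue plus a table of fibers that
-- are parked until a named event arrives. All names here are illustrative.
local ready = {}          -- fibers that can run now (FIFO)
local waiting = {}        -- event name -> parked fiber
local trace = {}

local function spawn(fn)
  ready[#ready + 1] = coroutine.create(fn)
end

-- Called from inside a fiber: park until `event` is signaled.
local function waitFor(event)
  waiting[event] = coroutine.running()
  coroutine.yield("parked")
end

-- Called when the outside world reports that `event` happened.
local function signal(event)
  local co = waiting[event]
  if co then
    waiting[event] = nil
    ready[#ready + 1] = co   -- runnable again, but not run immediately
  end
end

-- Run one ready fiber until it yields or finishes.
local function step()
  local co = table.remove(ready, 1)
  if not co then return false end
  local ok, state = coroutine.resume(co)
  if ok and coroutine.status(co) ~= "dead" and state ~= "parked" then
    ready[#ready + 1] = co   -- plain yield: back on the ready queue
  end
  return true
end

spawn(function()
  trace[#trace + 1] = "reader: waiting"
  waitFor("data")
  trace[#trace + 1] = "reader: got data"
end)

spawn(function()
  trace[#trace + 1] = "worker: tick"
  coroutine.yield()
  trace[#trace + 1] = "worker: tock"
end)

while step() do end       -- reader is parked, worker runs to completion
signal("data")            -- simulated IO completion
while step() do end       -- reader resumes and finishes

print(table.concat(trace, "\n"))
```

The reader never blocks the worker while it waits. The real IOProcessor follows the same pattern, with sockets and an IO completion port in place of the event names.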

It’s a bit thick with code, but here it is:

local ffi = require("ffi");

local Collections = require "Collections"
local IOCPSocket = require("IOCPSocket");
local SimpleFiber = require("SimpleFiber");
local IOCompletionPort = require("IOCompletionPort");
local SocketOps = require("SocketOps");


IOProcessor = {
  fibers = Collections.Queue.new();
  coroutines = {};
  EventFibers = {};
  FibersAwaitingEvent = {};

  IOEventQueue = IOCompletionPort:create();
  MessageQuanta = 15;		-- 15 milliseconds
};


--[[
	Socket Management
--]]

IOProcessor.createClientSocket = function(self, hostname, port)
  return IOCPSocket:createClient(hostname, port, self)
end

IOProcessor.createServerSocket = function(self, params)
  return IOCPSocket:createServerSocket(params, self)
end

IOProcessor.observeSocketIO = function(self, socket)
  return self.IOEventQueue:addIoHandle(socket:getNativeHandle(), socket.SafeHandle);
end

--[[
	Fiber Handling
--]]

IOProcessor.scheduleFiber = function(self, afiber, ...)
  if not afiber then
    return nil
  end
  self.coroutines[afiber.routine] = afiber;
  self.fibers:Enqueue(afiber);	

  return afiber;
end

IOProcessor.spawn = function(self, aroutine, ...)
  return self:scheduleFiber(SimpleFiber(aroutine, ...));
end

IOProcessor.removeFiber = function(self, fiber)
  self.coroutines[fiber.routine] = nil;
end

IOProcessor.inMainFiber = function(self)
  return coroutine.running() == nil; 
end

IOProcessor.yield = function(self)
  coroutine.yield();
end

IOProcessor.yieldForIo = function(self, sock, iotype)
  -- associate a fiber with a socket
  --print("yieldForIo, CurrentFiber: ", self.CurrentFiber);
	
  self.EventFibers[sock:getNativeSocket()] = self.CurrentFiber;

  -- Keep a list of fibers that are awaiting io
  if self.CurrentFiber ~= nil then
    self.FibersAwaitingEvent[self.CurrentFiber] = true;

    -- Whether we were successful or not in adding the socket
    -- to the pool, perform a yield() so the world can move on.
    self:yield();
  end
end


IOProcessor.processIOEvent = function(self, key, numbytes, overlapped)
  local ovl = ffi.cast("SocketOverlapped *", overlapped);
  local sock = ovl.sock;
  ovl.bytestransferred = numbytes;
  if sock == INVALID_SOCKET then
    return false, "invalid socket"
  end

  --print("IOProcessor.processIOEvent(): ", sock, ovl.operation);

  -- wake the fiber that was parked on this socket, if there is one
  local fiber = self.EventFibers[sock];
  if fiber then
    self:scheduleFiber(fiber);
    self.EventFibers[sock] = nil;
    self.FibersAwaitingEvent[fiber] = nil;
  else
    print("IOProcessor.processIOEvent(), No Fiber waiting to process.")
    -- remove the socket from the watch list
  end
end

IOProcessor.stepIOEvents = function(self)
  -- Check to see if there are any IO Events to deal with
  local key, numbytes, overlapped = self.IOEventQueue:dequeue(self.MessageQuanta);

  if key then
    self:processIOEvent(key, numbytes, overlapped);
  else
    -- typically timeout
    --print("Event Pool ERROR: ", numbytes);
  end
end

IOProcessor.stepFibers = function(self)
  -- Now check the regular fibers
  local fiber = self.fibers:Dequeue()

  -- Take care of spawning a fiber first
  if fiber then
    if fiber.status ~= "dead" then
      self.CurrentFiber = fiber;
      local result, values = fiber:Resume();
      if not result then
        print("RESUME RESULT: ", result, values)
      end
      self.CurrentFiber = nil;

      if fiber.status ~= "dead" and not self.FibersAwaitingEvent[fiber] then
        self:scheduleFiber(fiber)
      else
        --print("FIBER FINISHED")
        -- remove coroutine from dictionary
        self:removeFiber(fiber)
      end
    else
      self:removeFiber(fiber)
    end
  end
end

IOProcessor.step = function(self)
  self:stepFibers();
  self:stepIOEvents();
end

return IOProcessor

There are a couple of ways to look at this. From the perspective of the other event loop, the ‘step()’ method here is executed once each time around the loop. The ‘step()’ method in turn checks on the fibers, and then on the IO events. ‘stepFibers()’ takes the next fiber from the queue of fibers that are ready to run, and runs it for a bit until it either yields, and is thus placed back on the queue of fibers ready to be run (unless it is waiting on IO), or finishes. This is the part where a normal cooperative processing system could be brought to its knees, because a fiber that never yields keeps every other fiber from running, whereas a preemptive multi-tasking system would just keep going. The ‘stepIOEvents()’ function checks on the IOCompletionPort that is being used by sockets to indicate whether anything interesting has occurred. If there has been any activity, the cooperative thread associated with the activity is scheduled to execute a bit of code. It does not execute immediately, but it is now on the list to be executed next time around.
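The "brought to its knees" point is easy to demonstrate. The sketch below is plain Lua with made-up names (heavy, light, run) and does not use the IOProcessor. It runs the same long computation twice: once without yielding, and once yielding between chunks.

```lua
local order = {}

-- A long computation split into chunks. With polite == true it yields
-- between chunks; otherwise it runs to completion in a single turn.
local function heavy(chunks, polite)
  return coroutine.create(function()
    local sum = 0
    for _ = 1, chunks do
      for i = 1, 1000 do sum = sum + i end
      order[#order + 1] = "heavy"
      if polite then coroutine.yield() end
    end
    return sum
  end)
end

-- A short task that only needs a couple of quick turns.
local function light(turns)
  return coroutine.create(function()
    for _ = 1, turns do
      order[#order + 1] = "light"
      coroutine.yield()
    end
  end)
end

-- Round-robin over the fibers until all are finished, and report
-- the order in which they got to do work.
local function run(queue)
  order = {}
  while #queue > 0 do
    local co = table.remove(queue, 1)
    coroutine.resume(co)
    if coroutine.status(co) ~= "dead" then
      queue[#queue + 1] = co
    end
  end
  return table.concat(order, " ")
end

local greedy = run({ heavy(3, false), light(2) })
local fair   = run({ heavy(3, true),  light(2) })

print(greedy)  -- heavy heavy heavy light light
print(fair)    -- heavy light heavy light heavy
```

In the greedy run the light fiber gets nothing until the heavy one is completely done. With cooperative scheduling, fairness is the programmer's job.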

The stepIOEvents() function is at the heart of any system, such as node.js, which gets high performance with IO processing, while maintaining a low CPU load. Most of the time you’re just waiting, doing nothing, and once the system indicates there is action on the socket, you can spring into action. Thus, you do not spend any time looping over sockets polling to see if there’s any activity, you’re just notified when there is.

The rest of the code is largely helpers, like creating a socket that is wired correctly and whatnot.

So, at the end of it, what does this system do?

Well, assuming I want to write a DaytimeClient, which talks to a daytime service, gets the date and time, prints it out, etc, I would write this:

local ffi = require "ffi"
require("IOProcessor");

local daytimeport = 13

GetDateAndTime = function(hostname, port)
    hostname = hostname or "localhost";
    port = port or daytimeport;

    local socket, err = IOProcessor:createClientSocket(hostname, port);

    if not socket then
        print("Socket Creation Failed: ", err);
        return nil, err;
    end

    local bufflen = 256;
    local buff = ffi.new("char [?]", bufflen);

    local n, rerr = socket:receive(buff, bufflen)

    if not n then
        return nil, rerr;
    end

    if n > 0 then
        return ffi.string(buff, n);
    end

    -- the peer closed the connection without sending anything
    return nil, "no data received";
end

Well, that looks like normal sequential code to me. And yes, it is. Nothing unusual. But, when running in the context of a computicle, like the following, it gets more interesting.

local Computicle = require("Computicle");

local codeTemplate = [[
require("DaytimeClient");
local dtc, err = GetDateAndTime("localhost");
print(dtc);
]]

local comp1 = Computicle:create(codeTemplate);
local comp2 = Computicle:create(codeTemplate);

comp1:waitForFinish();
comp2:waitForFinish();

This will take the otherwise sequential code of the DaytimeClient, execute it in two parallel, preemptive, operating system level threads, and wait for their completion. The magic is all hidden behind the schedulers and event loops. I never have to know about how that all happens, but I can appreciate the benefits.

Marrying the concepts of event driven, multi-process, cooperative user space threads, preemptive multi-tasking, and the like can be a daunting task. But, with a little bit of script, some chewing gum, a whistle and a prayer, it all comes together in a seamless whole which is quite easy to use, and fairly simple to understand. I think I have achieved my objectives for ease of use, maximizing efficiency, and reducing head explosions.
