Have Computicles Arrived?

So, I’ve written quite a lot about computicles over the past few years.  In most of those articles, I’m talking about the software implementation of tiny units of computation.  The idea for computicles stems from a conversation I had with my daughter circa 2007 in which I was laying out a grand vision of the world where units of computation would be really small, hand-sized, and able to connect and do useful things fairly easily.  That was my envisioning of ubiquitous computing.  And so, yesterday, I received the latest creation from Hardkernel, the Odroid HC1 (HC – Home Cloud).


Hardkernel is a funny company.  I’ve been buying at least one of everything they’ve made for the past 5 years or so.  They essentially make single board computers in the “credit card” form factor.  What you see in the picture is the HC1, with an attached 120GB SSD.  The SSD is a standard 2.5″ drive, which gives you a sense of the size of the thing.  The black part is aluminum, and it’s the heat sink for the unit.

The computer bit of it is essentially a reworked Odroid XU4, which all by itself is quite a strong computer in this category.  Think Raspberry Pi, but 4 or 5 times stronger.  The HC1 has a single SATA connector to slot in whatever storage you choose.  No extra ribbon cables or anything like that.  The XU4 itself can run variants of Linux, as well as Android.  The uSD card sticking out the right side provides the OS.  In this particular case I’m using OMV (Open Media Vault), because I wanted to try the unit out as a NAS in my home office.

One of the nice things about the HC1 is that it’s stackable.  So, I can see piling up 3 or 4 of these to run my local server needs.  Of course, when you compare them to the giant beefy 64-bit super server that I’m currently typing at, these toy droids give it very little competition in the way of absolute compute power.  Hardkernel even did an analysis of bitcoin mining and determined it would take years to earn back the investment.  But computing, and computicles, aren’t about absolute concentrated power.  Rather, they are about distributed processing.

Right now I have a Synology, probably the equivalent of today’s DS1517.  That thing has RAID up the wazoo, redundant power, multiple NICs, and it’s a reliable workhorse that just won’t quit, so far.  The base price starts at $799 before you actually start adding storage media.  The HC1 starts at $49.  Of course there’s no real comparison in terms of reliability, redundancy, and the like.  Or is there?

Each HC1 can hold a single disk.  You can throw in whatever size and variety you like.  This first one has a Samsung SSD that’s a couple of years old, at 120GB.  These days you can get 250GB for $90.  You can go up to 4TB with an SSD, but that’s more like a $1600 proposition.  So, I’d be sticking with the low end.  That makes a reasonable storage computicle roughly $150.

You could of course skip the SSD or HDD and just stick in a largish USB thumb drive, or a 128GB uSD card for $65, but that interface isn’t going to be nearly as fast as the SATA III interface the HC1 is sporting.  So, great for light use, but for relatively constant streaming and downloading, the SSD, and even HDD, solutions will be more robust.

So, what’s the use case?  Well, besides the pure geekery of the thing, I’m trying to imagine more appliance-like usage.  I’m imagining what it looks like to have several of these placed throughout the house.  Maybe one is configured as a YouTube downloader, and that’s all it does all the time, shunting to the larger Synology every once in a while.

How about video streaming?  Right now that’s served up by the Synology running a Plex server, which is great for most clients, but sometimes, it’s just plain slow, particularly when it comes to converting video clips from ancient cameras and cell phones.  Having one HC1 dedicated to the task of converting clips to various formats that are known to be used in our house would be good.  Also, maybe serving up the video itself?  The OMV comes with a minidlna server, which works well with almost all the viewing kit we have.  But, do we really have any hiccups when it comes to video streaming from the Synology?  Not enough to worry about, but still.

Maybe it’s about multiple redundant distributed RAID.  With 5 – 10 of these spread throughout the house, each one could fail in time, be replaced, and no one would be the wiser.  I could load each with a couple of terabytes, configure some form of pleasant redundancy across them, and be very happy.  But, then there’s the cloud.  I actually do feel somewhat reassured having the ability to backup somewhere else.  As the recent flooding in Texas shows, as well as wildfires, no matter how much redundancy you have locally, it’s local.

Then there’s compute.  Like I said, a single beefy x64 machine with a decent GPU is going to smoke any single one of these, and likewise a small cluster of them.  But, that doesn’t mean they’re not useful.  Odroid boards are ARM based, which makes them inherently low power consumption compared to their Intel counterparts.  If I have some relatively light loads that are trivially parallelizable, then having a cluster of a few of these might make some sense.  Again with the ubiquitous computing: if I want to have the Star Trek style “computer, where’s my son”, or “turn on the lights in the garage”, without having to send my voice to the cloud constantly, then performing operations such as speech recognition on a little cluster might be interesting.

The long and short of it is that having a compute/storage module in the $150 range makes for some interesting thinking.  It’s surely not the only storage option in this range, but the combination of darned good hardware, tons of software support, low price and easy assembly, gives me pause to consider the possibilities.  Perhaps the hardware has finally caught up to the software, and I can start realizing computicles in physical as well as soft form.


schedlua – async io

And so, finally we come to the point. Thus far, we’ve looked at the simple scheduler, the core signaling, and the predicate and alarm functions built atop that. That’s all great stuff for fairly straightforward apps that need some amount of concurrency. The best part though is when you can do concurrent networking stuff.

Here’s the setup; I want to issue 20 concurrent GET requests to 20 different sites, and get results back. I want the program to halt once all the tasks have been completed.

--test_linux_net.lua
package.path = package.path..";../?.lua"

local ffi = require("ffi")

local Kernel = require("kernel"){exportglobal = true}
local predicate = require("predicate")(Kernel, true)
local AsyncSocket = require("AsyncSocket")

local sites = require("sites");

-- list of tasks
local taskList = {}


local function httpRequest(s, sitename)
	local request = string.format("GET / HTTP/1.1\r\nUser-Agent: schedlua (linux-gnu)\r\nAccept: */*\r\nHost: %s\r\nConnection: close\r\n\r\n", sitename);
	return s:write(request, #request);
end

local function httpResponse(s)
	local BUFSIZ = 512;
	local buffer = ffi.new("char[?]", BUFSIZ+1);
	local bytesRead = 0
	local err = nil;
	local cumulative = 0

	repeat
		bytesRead, err = s:read(buffer, BUFSIZ);

		if bytesRead then
			cumulative = cumulative + bytesRead;
		else
			print("read, error: ", err)
			break;
		end
	until bytesRead < 1

	return cumulative;
end


local function siteGET(sitename)
	print("siteGET, BEGIN: ", sitename);

	local s = AsyncSocket();

	local success, err = s:connect(sitename, 80);  

	if success then
		httpRequest(s, sitename);
		httpResponse(s);
	else
		print("connect, error: ", err, sitename);
	end

	s:close();

	print("siteGET, FINISHED: ", sitename)
end


local function allProbesFinished()
	for idx, t in ipairs(taskList) do
		if t:getStatus() ~= "dead" then
			return false;
		end
	end

	return true;
end

local function main()
	for count=1,20 do
		table.insert(taskList, Kernel:spawn(siteGET, sites[math.random(#sites)]))
		Kernel:yield();
	end

	when(allProbesFinished, halt);
end

run(main)

Step by step. The httpRequest() function takes a socket and issues the most bare minimal HTTP GET request, assuming the socket is already connected to the site.

Similarly, the httpResponse() function gets a response back from the server, and reads as much as it can until the socket is closed (because the Connection: close header was sent).

That’s about the most basic of HTTP request/response pairs you can have, ignoring doing any parsing of the returned data.

Alright, so let’s wrap those two up into a function called siteGET(). siteGET(sitename) takes the name of a site, creates a socket, connects it to the site, and then issues the httpRequest() followed by the httpResponse(). Very simple. What I like about this is that the httpRequest(); httpResponse() sequence executes serially as far as I’m concerned. I don’t have to worry about the httpResponse() being issued before the request completes. Furthermore, if I didn’t use a spawn(), I could simply execute the code directly and be none the wiser.

I want to execute these siteGET()s concurrently though, so within main(), I start up 20 of these tasks, and let them go. Then comes the waiting part:

local function allProbesFinished()
	for idx, t in ipairs(taskList) do
		if t:getStatus() ~= "dead" then
			return false;
		end
	end

	return true;
end

	when(allProbesFinished, halt);

Going back to our knowledge of predicates, we know that the ‘when’ function takes a predicate (function that returns true/false), and will execute the second function when the predicate returns true.

OK, so we just need to come up with a predicate which tells us that all the tasks have completed. Easy enough, as a list of the tasks is generated when they are spawned. So, we just go through that list and see if any of them are still running. If there is a single one still running, the predicate returns false, and ‘halt()’ will not be called. As soon as the last task finishes, the predicate returns true, and the halt() function is called.

Of course, most things in schedlua are convenient compositions of deeper things (with signals being at the core).

Instead of using the ‘when’ function, you could write the code more directly like this:

	while true do
		if allProbesFinished() then
			halt();
			break;
		end
		yield();
	end

That doesn’t quite look as nice as just using the when() function, I think. Also, you’re sitting in the main() function, which is no big deal as there’s nothing else trying to execute after this, but it just doesn’t seem as clean. Furthermore, the ‘when’ function might have some magic in its implementation, such as a better understanding of the state of tasks, or special knowledge of the scheduler, or who knows what. Either way essentially implements a barrier, and the technique can be used anywhere you want to perform an action after some set of tasks has completed. The allProbesFinished() function can be generalized to wait on any list of tasks; maybe call it “waitForTasks()” or some such thing, as sketched below.
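
For completeness, here is what that generalization might look like. This is a hypothetical sketch, not part of schedlua proper; it assumes tasks expose getStatus() as above, and that when() is in scope:

local function waitForTasks(tasks, onComplete)
	-- build a predicate over the specific list of tasks
	local function allDead()
		for _, t in ipairs(tasks) do
			if t:getStatus() ~= "dead" then
				return false;
			end
		end

		return true;
	end

	when(allDead, onComplete);
end

-- usage: waitForTasks(taskList, halt)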

At any rate, that completes the primitives that are baked into the core schedlua package. Everything from signals, to predicates, alarms, and finally async io. Of course this is Linux, so async io works with any file descriptor, not just network sockets, so file management or device communications in general can be thrown into the mix.

Now that the basics work, it’s a matter of cleaning up, writing more test cases, fixing bugs, reorganizing, and optimizing resource usage a bit. In general though, the constructs are there, and it’s ready to be used for real applications.


schedlua – predicates and alarms

A few years back, during the creation of Language Integrated Query (LINQ), I had this idea: if we could add database semantics to the language, what would adding async semantics look like? These days we’ve gone the route of async/await and various other constructs, but still, I always just wanted primitives such as “when”, “whenever”, and “waitUntil”. The predicates in schedlua are just that:

  • signalOnPredicate(predicate, signalName)
  • waitForPredicate(predicate)
  • when(predicate, func)
  • whenever(predicate, func)

Of course, this is all based at the very core on the signaling mechanism in the kernel of schedlua, but these primitives are not in the kernel proper.  They don’t need to be, which is nice because it means you can easily add such functions without having to alter the core.

What do they look like in practice?  Well, first of all, a ‘predicate’ is nothing more than a fancy name for a function that returns a bool value.  It will either return ‘true’ or ‘false’.  Based on this, various things can occur.  For example, ‘signalOnPredicate’ will emit the signal specified by signalName once the predicate returns ‘true’.  Similarly, for ‘waitForPredicate’, the currently running task will be put into a suspended state until such time as the predicate returns ‘true’.  ‘when’ and ‘whenever’ are similar, but they spawn new tasks, rather than suspending the existing task.  And here’s some code:

--test_scheduler.lua
package.path = package.path..";../?.lua"

local Functor = require("functor")
local Kernel = require("kernel"){exportglobal = true}
local Predicate = require("predicate")(Kernel, true)

local idx = 0;
local maxidx = 100;

local function numbers(ending)
	local function closure()
		idx = idx + 1;
		if idx > ending then
			return nil;
		end
		return idx;
	end
	
	return closure;
end



local function counter(name, nCount)
	for num in numbers(nCount) do
		print(num)
		local eventName = name..tostring(num);
		--print(eventName)
		signalOne(eventName);

		yield();
	end

	signalAll(name..'-finished')
end


local function predCount(num)
	waitForPredicate(function() return idx > num end)
	print(string.format("PASSED: %d!!", num))
end



local function every5()
	while idx <= maxidx do
		waitForPredicate(function() return (idx % 5) == 0 end)
		print("!! matched 5 !!")
		yield();
	end
end

local function test_whenever()
	local t1 = whenever(
		function() 
			if idx > maxidx then return nil end;
			return (idx % 2) == 0 end,
		function() print("== EVERY 2 ==") end)
end

local function main()
	local t1 = spawn(counter, "counter", maxidx)

	local t6 = spawn(test_whenever);

	local t2 = spawn(predCount, 12)
	local t3 = spawn(every5)
	local t4 = spawn(predCount, 50)
	local t5 = when(
		function() return idx == 75 end, 
		function() print("WHEN IDX == 75!!") end)
	local t7 = when(function() return idx >= maxidx end,
		function() halt(); end);


end

run(main)



It’s a test case, so it’s doing some contrived things.  Basically, there is one task running a counter that throws a signal for every new number (up to maxidx).  Then you can see the various tests which use predicates.

local function every5()
	while idx <= maxidx do
		waitForPredicate(function() return (idx % 5) == 0 end)
		print("!! matched 5 !!")
		yield();
	end
end

Here, we’re just running a continuous loop which will print its message every time the predicate is true. It seems kind of wasteful, doesn’t it? Under normal circumstances, this would be a very hot spin loop, but when you call ‘waitForPredicate’, your task will actually be thrown into a ‘suspended’ state, which means if there are other tasks to execute, they’ll go ahead, and you’ll get back in the queue to be tested later. So, it’s really: “test this predicate; if it’s not true, then throw the task to the back of the ready list, and test it again later. If it’s true, then continue on with whatever is next in this task”. The ‘yield()’ here is redundant.

In this particular case, we’ve essentially created a ‘whenever’ construct. This construct happens enough that it’s worth creating a convenience function for it.

local function test_whenever()
	local t1 = whenever(
		function() 
			if idx > maxidx then return nil end;
			return (idx % 2) == 0 end,
		function() print("== EVERY 2 ==") end)
end

In this particular case, we let the ‘whenever’ construct do the work for us. Every other count, we’ll print our message. Of course, I’m using in-place functions (lambda expressions?) in these test cases. They don’t have to be that way; you can set up the functions however you want.
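
Under the covers, ‘whenever’ amounts to the every5() pattern packaged up into a spawned task. Here is a rough sketch of the idea; it is illustrative only, not the actual schedlua implementation, and it ignores the return-nil-to-stop wrinkle used above:

-- my_whenever: spawn a task that repeatedly waits on the predicate,
-- then runs the function; assumes spawn/waitForPredicate/yield are global
local function my_whenever(pred, func)
	return spawn(function()
		while true do
			waitForPredicate(pred);
			func();
			yield();
		end
	end)
end

The real implementation also has to notice when the predicate signals termination, which is why the test’s predicate can return nil.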

t7 is interesting because it says, ‘when the count reaches maxidx, halt the program’, which will in fact break us out of the main event loop and stop the program. Very convenient. This construct is useful because there may be myriad reasons why you’d want to stop the program. You can simply set up a predicate to do that. It could be a ‘when’, or perhaps you’d rather it be based on a signal; in that case, use a signalOnPredicate/waitForSignal sort of thing, as sketched below. It’s composable, so use whatever set of constructs makes the most sense. It’s kind of a sane form of exception handling.
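
For instance, a signal-flavored version of that shutdown might look like the following sketch, assuming the signal primitives are exported globally as in the other examples:

-- one task waits on a named signal and halts the program
spawn(function()
	waitForSignal("counter-maxed");
	halt();
end)

-- a predicate emits that signal once the count tops out
signalOnPredicate(function() return idx >= maxidx end, "counter-maxed");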

So there you have it, yet another construct tackled. Predicates are a simple construct that kind of extends the if/then flow control into the async realm. ‘when’ is almost a direct replacement for ‘if’ in this context. The waitForPredicate is kind of a new construct, I think. It’s like an if/spinlock, except you’re not spinning; you’re suspended with periodic checks on the predicate. And then of course ‘signalOnPredicate’ is like a hail Mary pass. You don’t know who/what is going to respond, but you’re going to send up the signal. That’s like programming with interrupts, except, unless the scheduler allows for high priority interrupts/signals, these will be handled in the normal flow of cooperative processing.

Predicates are great, they’re a slightly different construct than I’ve typically used in my every day programming. They make some tasks a lot easier, and they make thinking about async programming more manageable.

And then there’s time…

Time is an easier construct to think about, because it’s well represented in most language frameworks already. Here are the time primitives:

  • waitUntilTime
  • sleep
  • delay
  • periodic

‘waitUntilTime’ is the linchpin in this case. It will suspend the current task until the specified time. Time, in this case, is a relative thing. The alarm module keeps its own clock, so everything is expressed relative to that clock.

sleep(seconds) will simply suspend the current task for the specified number of seconds. You can specify fractions of seconds, and the clock has nanosecond precision, but we’re not using a realtime scheduler, so you’ll see some amount of delay. Of course you could simply swap in a new scheduler and deal with any realtime requirements you might have.
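
As a quick sketch of what that looks like in practice (whether sleep() hangs off the alarm module like this, rather than being exported globally, is an assumption on my part; the setup mirrors the delay example below):

local Kernel = require("kernel"){exportglobal = true};
local Alarm = require("alarm")(Kernel)

local function ticker()
	for i = 1, 3 do
		print("tick: ", i);
		Alarm:sleep(0.5);	-- fractions of a second are allowed
	end
	halt();
end

run(ticker)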

delay will spawn a task which will execute the specified function after the specified amount of time has passed. This is kind of like a ‘when’ predicate, with a specialization for time. You could in fact reconstruct this primitive using the when predicate, but the alarm, knowing about time as it does, will do it more efficiently.

local Kernel = require("kernel"){exportglobal = true};
local Alarm = require("alarm")(Kernel)
local Clock = require("clock")

local c1 = Clock();

local function twoSeconds()
	print("TWO SECONDS: ", c1:secondsElapsed());
	Kernel:halt();
end

local function test_alarm_delay()
	print("delay(twoSeconds, 2000");
	Alarm:delay(twoSeconds, 2000);
end

run(test_alarm_delay)

periodic is similar, in that it will execute a function, but whereas ‘delay’ is a one-shot event, ‘periodic’ will repeat. In this way it is like the ‘whenever’ construct.
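
A sketch of periodic in the style of the delay example, assuming it shares delay’s (func, milliseconds) signature:

local Kernel = require("kernel"){exportglobal = true};
local Alarm = require("alarm")(Kernel)
local Clock = require("clock")

local c1 = Clock();

local function heartbeat()
	print("BEAT: ", c1:secondsElapsed());
end

local function test_alarm_periodic()
	Alarm:periodic(heartbeat, 1000);	-- fire once a second
	Alarm:delay(halt, 5500);	-- stop the whole thing after ~5 beats
end

run(test_alarm_periodic)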

And there you have it. Between the predicates and the alarms, you have some new basic constructs for doing async programming. They are supported by the signaling construct that’s already a part of the kernel. They are simple add-ons, which means you can easily create your own constructs and replace these, or create completely new constructs which are similar. They can leverage the signaling mechanism, or maybe they want to do something else altogether.

So far, the constructs have been of the if/then variety, only in async fashion. I think there’s another set of constructs, which have to do with barriers and completions of tasks. That will clean up the other part of async, which is the ‘await’. We’ll see. In the meanwhile, next time, async io, which is pretty exciting.


TINN Version 0.7 Available

Although TINN is under constant development, there’s nothing like declaring a new “release”. It’s been 3 months since the 0.6 release. So, now there is a 0.7 release. You can read about the TINN v0.7 Release and install it if you would like.

There were 84 commits since the previous release, so I can’t even remember all the changes that were involved. The major addition from my most recent work has to do with the new scheduler as described on this blog. That’s the extensible, plug-in driven scheduler. Pretty nifty for my work at least.

There are quite a few additions, such as a revamped stream object, an IO completion port backed file interface, a logfile thing, and quite a few more interfaces from the OS.

Other items I have been working on include support for various COM interfaces such as DXGI, DXVA, MMDevice and some others. They are all in the “experimental” folder if you look at the enlistment. They are not quite ready for prime time, so they’re not actually in the v0.7 release.

What can you do with TINN? In short, you can create all sorts of Windows based applications. Everything from scalable web services to interactive multi-tasking UI (including OpenGL based).

TINN is a command line tool (tinn.exe). As such, the easiest thing to do is bring up a command line shell and run various scripts through tinn.

c:\> tinn.exe hello.lua
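
Here, hello.lua is any ordinary Lua script; a hypothetical one-liner is enough to try the loop:

-- hello.lua
print("Hello, TINN!");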

The TINN repository contains numerous test cases that utilize the various modules of TINN.

That’s it for now.  Next release will be about cleanup and stabilization primarily.


Computicles – A tale of two schedulers

One of the drivers for the creation of computicles is to maximize the efficiency of the running system while minimizing the complexity for the programmer.  Herein lies the rub.  Modern computers are multi-core/multi-proc, and Lua is largely a single core sort of system.  Lua has its own notion of “light weight threads”, which are essentially cooperative processing threads.  The native OS (Windows or Linux) has a notion of “threads” which are much more heavy weight.  While the Lua threads can number easily in the thousands and more, they are not actually running in parallel; they are just rapidly context switching between each other at the whim of the programmer.  The OS threads, on the other hand, are in fact running in parallel, on multiple cores if they exist.  But, as soon as you have more threads than you have cores, the threads are shifting rapidly between each other, just like in the Lua case, but it’s ‘preemptive’ instead of cooperative.

What do I want?  I want the best of both worlds.  But, before starting down the path of leveraging the multiple cores, I want to start with the programming paradigm.

I want to write essentially serial code.  My brain is not good at dealing with things like mutexes, semaphores, barriers, or any other kind of sharing mechanism that has been invented over the past 40 years.  I know how to write straight sequential code.  I can deal with saying “spawn” to get something running in parallel, but that’s about it.

So, in steps computicles.

I’ve gone on about the subject a few times now, but I’ve finally created the unified scheduler that I require.  It looks like this:


-- comp_msgpump.lua
local ffi = require("ffi");
require("IOProcessor");

-- default to 15 millisecond timeout
gIdleTimeout = gIdleTimeout or 15

local idlecount = 0;

while true do
  if IOProcessor then
    IOProcessor:step();
  end

  local msg, err = SELFICLE:getMessage(gIdleTimeout);

  if not msg then
    if err == WAIT_TIMEOUT then
      --print("about to idle")
      idlecount = idlecount + 1;
      if OnIdle then
        OnIdle(idlecount);
      end
    end
  else
    local msgFullyHandled = false;
    msg = ffi.cast("ComputicleMsg *", msg);

    if OnMessage then
      msgFullyHandled = OnMessage(msg);
    end

    if not msgFullyHandled then
      local Message = msg.Message;
      --print("Message: ", Message, msg.Param1, msg.Param2);
		
      if Message == Computicle.Messages.QUIT then
        if OnExit then
          OnExit();
        end
        break;
      end

      if Message == Computicle.Messages.CODE then
        local len = msg.Param2;
        local codePtr = ffi.cast("const char *", msg.Param1);
		
        if codePtr ~= nil and len > 0 then
          local code = ffi.string(codePtr, len);

          SELFICLE:freeData(ffi.cast("void *",codePtr));

          local func = loadstring(code);
          func();
        end
      end
      SELFICLE:freeMessage(msg);
    end
  end
end

This is pretty much the same event driven loop that has existed previously. Its main function is to get messages off its message queue and deal with them. This is how you communicate with a computicle. Under normal circumstances, a Computicle can simply implement OnMessage(), if it wants to respond only when it receives a message. This is a perfectly event driven way to exist. Or it can implement OnIdle() if it wants to respond to the fact that nothing else is occurring in the system. This is a great combination, and will cover many useful cases.
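
As a sketch, the body of a trivial computicle that leans on this pump could be as small as the following (hypothetical code; the pump only calls OnMessage()/OnIdle() if they exist, and returning false from OnMessage() lets the default QUIT/CODE handling and message cleanup run):

-- comp_echo.lua (hypothetical)
local ffi = require("ffi");

OnMessage = function(msg)
  msg = ffi.cast("ComputicleMsg *", msg);
  print("echo: ", msg.Message);
  return false;	-- not fully handled; let the pump free it and honor QUIT
end

OnIdle = function(counter)
  if counter > 100 then
    SELFICLE:quit();	-- nothing happening; shut this computicle down
  end
end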

But what about waiting for some IO to complete? Well, at the top of this event loop there is the IOProcessor:step() call. And what is an IOProcessor?

The IOProcessor is a scheduler for cooperative Lua threads. The IOProcessor assumes the user’s code is utilizing co-routines, and will deal with putting them on a ‘sleeping’ list whenever they perform a task, such as socket IO which does not complete immediately. It’s a classic, and before the Computicles existed, this was the primary scheduler.

It’s a bit thick with code, but here it is:

local ffi = require("ffi");

local Collections = require "Collections"
local IOCPSocket = require("IOCPSocket");
local SimpleFiber = require("SimpleFiber");
local IOCompletionPort = require("IOCompletionPort");
local SocketOps = require("SocketOps");


IOProcessor = {
  fibers = Collections.Queue.new();
  coroutines = {};
  EventFibers = {};
  FibersAwaitingEvent = {};

  IOEventQueue = IOCompletionPort:create();
  MessageQuanta = 15;		-- 15 milliseconds
};


--[[
	Socket Management
--]]

IOProcessor.createClientSocket = function(self, hostname, port)
  return IOCPSocket:createClient(hostname, port, self)
end

IOProcessor.createServerSocket = function(self, params)
  return IOCPSocket:createServerSocket(params, self)
end

IOProcessor.observeSocketIO = function(self, socket)
  return self.IOEventQueue:addIoHandle(socket:getNativeHandle(), socket.SafeHandle);
end

--[[
	Fiber Handling
--]]

IOProcessor.scheduleFiber = function(self, afiber, ...)
  if not afiber then
    return nil
  end
  self.coroutines[afiber.routine] = afiber;
  self.fibers:Enqueue(afiber);	

  return afiber;
end

IOProcessor.spawn = function(self, aroutine, ...)
  return self:scheduleFiber(SimpleFiber(aroutine, ...));
end

IOProcessor.removeFiber = function(self, fiber)
  self.coroutines[fiber.routine] = nil;
end

IOProcessor.inMainFiber = function(self)
  return coroutine.running() == nil; 
end

IOProcessor.yield = function(self)
  coroutine.yield();
end

IOProcessor.yieldForIo = function(self, sock, iotype)
  -- associate a fiber with a socket
  print("yieldForIo, CurrentFiber: ", self.CurrentFiber);
	
  self.EventFibers[sock:getNativeSocket()] = self.CurrentFiber;

  -- Keep a list of fibers that are awaiting io
  if self.CurrentFiber ~= nil then
    self.FibersAwaitingEvent[self.CurrentFiber] = true;

    -- Whether we were successful or not in adding the socket
    -- to the pool, perform a yield() so the world can move on.
    self:yield();
  end
end


IOProcessor.processIOEvent = function(self, key, numbytes, overlapped)
    local ovl = ffi.cast("SocketOverlapped *", overlapped);
    local sock = ovl.sock;
    ovl.bytestransferred = numbytes;
    if sock == INVALID_SOCKET then
		return false, "invalid socket"
    end

    --print("IOProcessor.processIOEvent(): ", sock, ovl.operation);

    local fiber = self.EventFibers[sock];
    if fiber then
      self:scheduleFiber(fiber);
      self.EventFibers[sock] = nil;
      self.FibersAwaitingEvent[fiber] = nil;
    else
      print("EventScheduler_t.ProcessEventQueue(), No Fiber waiting to process.")
      -- remove the socket from the watch list
    end
end

IOProcessor.stepIOEvents = function(self)
    -- Check to see if there are any IO Events to deal with
    local key, numbytes, overlapped = self.IOEventQueue:dequeue(self.MessageQuanta);

    if key then
      self:processIOEvent(key, numbytes, overlapped);
    else
      -- typically timeout
      --print("Event Pool ERROR: ", numbytes);
    end
end

IOProcessor.stepFibers = function(self)
  -- Now check the regular fibers
  local fiber = self.fibers:Dequeue()

  -- Take care of spawning a fiber first
  if fiber then
    if fiber.status ~= "dead" then
      self.CurrentFiber = fiber;
      local result, values = fiber:Resume();
      if not result then
        print("RESUME RESULT: ", result, values)
      end
      self.CurrentFiber = nil;

      if fiber.status ~= "dead" and not self.FibersAwaitingEvent[fiber] then
        self:scheduleFiber(fiber)
      else
        --print("FIBER FINISHED")
        -- remove coroutine from dictionary
        self:removeFiber(fiber)
      end
    else
      self:removeFiber(fiber)
    end
  end
end

IOProcessor.step = function(self)
  self:stepFibers();
  self:stepIOEvents();
end

return IOProcessor

There are a couple of ways to approach this. From the perspective of the other event loop, the “step()” method here is executed once around the loop. The ‘step()’ method in turn checks on the fibers, and then on the IO events. “stepFibers” checks the list of fibers that are ready to run, and runs one of them for a bit until it yields and is thus placed back on the queue of fibers ready to be run, or it finishes. This is the part where a normal cooperative processing system could be brought to its knees, and a preemptive multi-tasking system would just keep going. The ‘stepIOEvents()’ function checks on the IOCompletionPort that is being used by sockets to indicate whether anything interesting has occurred. If there has been any activity, the cooperative thread associated with the activity is scheduled to execute a bit of code. It does not execute immediately, but it is now on the list to be executed next time around.

The stepIOEvents() function is at the heart of any system, such as node.js, which gets high performance with IO processing, while maintaining a low CPU load. Most of the time you’re just waiting, doing nothing, and once the system indicates there is action on the socket, you can spring into action. Thus, you do not spend any time looping over sockets polling to see if there’s any activity, you’re just notified when there is.

The rest of the code is largely helpers, like creating a socket that is wired correctly and whatnot.

So, at the end of it, what does this system do?

Well, assuming I want to write a DateTimeClient, which talks to a service, gets the date and time, prints it out, etc, I would write this:

local ffi = require "ffi"
require("IOProcessor");

local daytimeport = 13

GetDateAndTime = function(hostname, port)
    hostname = hostname or "localhost";
    port = port or daytimeport;

    local socket, err = IOProcessor:createClientSocket(hostname, port);

    if not socket then
        print("Socket Creation Failed: ", err);
        return nil, err;
    end

    local bufflen = 256;
    local buff = ffi.new("char [?]", bufflen);

    local n, err = socket:receive(buff, bufflen)
 
    if not n then
        return false, err;
    end

    if n > 0 then
        return ffi.string(buff, n);
    end
end

Well, that looks like normal sequential code to me. And yes, it is. Nothing unusual. But, when running in the context of a computicle, like the following, it gets more interesting.

local Computicle = require("Computicle");

local codeTemplate = [[
require("DaytimeClient");
local dtc, err = GetDateAndTime("localhost");
print(dtc);
]]

local comp1 = Computicle:create(codeTemplate);
local comp2 = Computicle:create(codeTemplate);

comp1:waitForFinish();
comp2:waitForFinish();

This will take the otherwise sequential code of the DaytimeClient, execute it in two parallel, preemptive, Operating System level threads, and wait for their completion. The magic is all hidden behind the schedulers and event loops. I never have to know how that all happens, but I can appreciate the benefits.

Marrying the concepts of event driven, multi-process, cooperative user space threads, preemptive multi-tasking, and the like can be a daunting task. But, with a little bit of script, some chewing gum, a whistle and a prayer, it all comes together in a seamless whole which is quite easy to use, and fairly simple to understand. I think I have achieved my objectives for ease of use, maximizing efficiency, and reducing head explosions.


Computicles – Simplifying Chaining

Using the “exec()” method, I can chain computicles together, but the code is kind of raw. There is one addition I left out of the equation last time around, and that is the hydrate/rehydrate of a computicle variable itself. Now it looks like this:

elseif dtype == "table" then
  if getmetatable(data) == Computicle_mt then
    -- package up a computicle
    datastr = string.format("Computicle:init(TINNThread:StringToPointer(%s),TINNThread:StringToPointer(%s));", 
      TINNThread:PointerToString(data.Heap:getNativeHandle()), 
      TINNThread:PointerToString(data.IOCP:getNativeHandle()));
  elseif getmetatable(data) == getmetatable(self.IOCP) then
    -- The data is an iocompletion port, so handle it specially
    datastr = string.format("IOCompletionPort:init(TINNThread:StringToPointer(%s))",
      TINNThread:PointerToString(data:getNativeHandle()));
  else
    -- get a json string representation of the table
    datastr = string.format("[[ %s ]]",JSON.encode(data, {indent=true}));
  end

This is just showing the types that are descended from the ‘table’ type. This includes IOCompletionPort, and ‘Computicle’. Anything other than these two will be serialized as fairly straight forward key/value pair tables. With this in place, I can now do the following.

-- Setup the splitter that will dispatch to the leaf nodes
local splitter = Computicle:load("comp_splittercode");

-- Setup the source, which will be originating messages
local source = Computicle:load("comp_sourcecode");
source.sink = splitter;

The code for the comp_sourcecode computicle looks like this:


RunOnce = function(sink)
  print("comp_sourcecode, RunOnce...")
  for i = 1, 10 do
    sink:postMessage(i);
  end
end

OnIdle = function(counter)
  print("comp_sourcecode, OnIdle(): ", counter)
  if sink ~= nil then 
    RunOnce(sink);

    sink = nil;
    exit();
  end
end

This computicle implements the OnIdle() function, as it will use that to determine when it can send messages off to the sink. If the sink hasn’t been set yet, then it will do nothing. If it has been set, then it will execute the ‘RunOnce()’ function, sending the sink a bunch of messages.

After performing the RunOnce() task, the computicle will simply exit(). The exit() function is the same as calling SELFICLE:quit(), which in turn just does SELFICLE:postMessage(Computicle.Messages.QUIT).

And that’s the end of that computicle. The splitter has become much simpler as well.

local ffi = require("ffi");


OnMessage = function(msg)
	msg = ffi.cast("ComputicleMsg *", msg);
	local Message = msg.Message;
--print("comp_splittercode, OnMessage(): ", Message);

	if sink1 then
		sink1:postMessage(Message);
	end	

	if sink2 then
		sink2:postMessage(Message);
	end
end

Here, the splitter assumes there are two sinks. If either of them exists, it will receive the message that was passed into the splitter. If they don’t exist, then the message will be dropped. Of course, other behaviors, such as holding on to the messages, could be implemented as well, as sketched below.
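
For example, a splitter variant that holds on to messages until a sink is attached, rather than dropping them, might look like this sketch (same OnMessage() convention as above; sink1 is assumed to be assigned from the outside, as before):

local ffi = require("ffi");

local pending = {};

OnMessage = function(msg)
	msg = ffi.cast("ComputicleMsg *", msg);
	table.insert(pending, msg.Message);

	-- once a sink exists, flush everything we have been holding
	if sink1 then
		for _, m in ipairs(pending) do
			sink1:postMessage(m);
		end
		pending = {};
	end
end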

In this case, the computicle is not exited internally. This is because the splitter itself does not know when it is finished, all it knows is that when it receives a message, it is supposed to pass that message on to its two sinks.

The sink code is equally simple.

local ffi = require("ffi");

OnMessage = function(msg)
  msg = ffi.cast("ComputicleMsg *", msg);
  local Message = msg.Message;

  print(msg.Message*10);
end

Here again, just implement the ‘OnMessage()’ function, and the comp_msgpump code will call it automatically. And again, the sink does not know when it is done, so it does not explicitly exit.

Pulling it all together, the exiting logic becomes more clear:

local Computicle = require("Computicle");

-- Setup the leaf nodes to receive messages
local sink1 = Computicle:load("comp_sinkcode");
local sink2 = Computicle:load("comp_sinkcode");


-- Setup the splitter that will dispatch to the leaf nodes
local splitter = Computicle:load("comp_splittercode");

splitter.sink1 = sink1;
splitter.sink2 = sink2;


-- Setup the source, which will be originating messages
local source = Computicle:load("comp_sourcecode");
source.sink = splitter;


-- Close everything down from the outside
print("FINISH source: ", source:waitForFinish());

-- tell the splitter to quit
splitter:quit();
print("FINISH splitter:", splitter:waitForFinish());

-- the two sinks will receive the quit message from the splitter
-- so, just wait for them to quit.
print("Finish Sink 1: ", sink1:waitForFinish());
print("Finish Sink 2: ", sink2:waitForFinish());

There is an order in which computicles are created, and assigned, which stitches them together. The computicles could actually be constructed in a “suspended state”, but that would not help matters too much.

At the end, the quit() sequence can clearly be seen. First, wait for the source to finish. Then tell the splitter to quit(). This is not an interrupt, so whatever was in its queue previously will flow through before the QUIT is processed. Then finally, wait for the sinks; they receive the QUIT message from the splitter, so there is no need to tell them to quit explicitly.

All messages flow, and all processing finishes cleanly. So, this is one way to perform the exit mechanism from the outside, as well as from the inside.

Adding this bit of ease in programming did not change the fundamentals of the computicle. It still has a single communication mechanism (the queue). It still performs computation, and it still communicates with the outside world. The new code saves the error prone process of type casting and hydrating objects that the system already knows about. This in turn makes the usage pattern that much easier to deal with.

The comp_msgpump was added in a fairly straight forward way. When a computicle is constructed, there is a bit of code (a Prolog) that is placed before the user’s code, and a bit of code (Epilog) placed after it. Those two bits of code look like this:

Computicle = {
	Prolog = [[
TINNThread = require("TINNThread");
Computicle = require("Computicle");
IOCompletionPort = require("IOCompletionPort");
Heap = require("Heap");

exit = function()
    SELFICLE:quit();
end
]];

Epilog = [[
require("comp_msgpump");
]];
}

Computicle.createThreadChunk = function(self, codechunk, params, codeparams)
	local res = {};

	-- What we want to load before any other code is executed
	-- This is typically some 'require' statements.
	table.insert(res, Computicle.Prolog);


	-- Package up the parameters that may want to be passed in
	local paramname = "_params";

	table.insert(res, string.format("%s = {};", paramname));
	table.insert(res, self:packParams(params, paramname));
	table.insert(res, self:packParams(codeparams, paramname));

	-- Create the Computicle instance that is running within 
	-- the Computicle
	table.insert(res, 
		string.format("SELFICLE = Computicle:init(%s.HeapHandle, %s.IOCPHandle);", paramname, paramname));


	-- Stuff in the user's code
	table.insert(res, codechunk);

	
	-- What we want to execute after the user's code is loaded
	-- By default, this will be a message pump
	table.insert(res, Computicle.Epilog);

	return table.concat(res, '\n');
end

If an application needs a different prolog or epilog, it’s fairly easy to change them either by changing the file that is being read in, or by changing the string itself: Computicle.Prolog = [[print("Hello, World")]]

And so it goes. I’ve never had it so easy creating multi-threaded bits of computation and stitching those bits together.


Computicles – Unsafe, but fun, code injection

Computicles are capable of receiving bits of code to execute. That is in fact one way in which they communicate with each other. From one context I can simply do: comp.someValue = 100, and that will set “someValue = 100” in another context. Well, great. How about sending something across that’s more substantial than a simple variable setting?

Well, with the Computicle.exec() method, which the previous example is based on, you can really send across any bit of code to be executed. It works, and you can even send across the body of a function like this:

comp:exec[[
function hello()
  print("Hello, World!");
end
]]

That works fine, and you have full access to all the stuff that’s running in that context. It’s a bit clunky though, because the entirety of your piece of code is wrapped up in a string value. This is very similar to executing SQL code on some server. The bulk of your code, if not in “stored procedures”, ends up in these opaque strings. Hard to catch errors, hard to debug, etc. How about the following:

comp.hello = function()
  print("Hello, World!");
end

What’s that you say? Basically, assigning a function as a variable to the computicle. Yes, this can work. There is a slight modification to turn a function into a string, and it looks like this:

Computicle.datumToString = function(self, data, name)
	local dtype = type(data);
	local datastr = tostring(nil);

--print("DATUM TYPE: ", name, dtype);

	if type(data) == "cdata" then
		-- If it is a cdata type that easily converts to 
		-- a number, then convert to a number and assign to string
		if tonumber(data) then
			datastr = tostring(tonumber(data));
		else
			-- if not easily converted to number, then just assign the pointer
			datastr = string.format("TINNThread:StringToPointer(%s);", 
				TINNThread:PointerToString(data));
		end
	elseif dtype == "table" then
		if getmetatable(data) == Computicle_mt then
			-- package up a computicle
		else
			-- get a json string representation of the table
			datastr = string.format("[[ %s ]]",JSON.encode(data, {indent=true}));

			--print("=== JSON ===");
			--print(datastr)
		end
	elseif dtype == "function" then
		datastr = "loadstring([["..string.dump(data).."]])";
	elseif dtype == "string" then
		datastr = string.format("[[%s]]", data);
	else 
		datastr = tostring(data);
	end

	return datastr;
end

Notice the part that starts: ‘elseif dtype == “function” then’.

Basically, string.dump() turns the function into bytecode, which can later be rehydrated using loadstring(). It is placed in ‘loadstring([[…]])’, because this is the only way to preserve the actual 8-bit values. If you use string.format(“%s”), it will chop out the non-ascii stuff.

Then, this code can be executed in the remote context just the same as any other little bit of code. There are some restrictions on what kinds of functions you can do this with though. No ‘upvalues’, meaning everything must be contained within the function itself: no calling out to global variables and the like.
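
To make the restriction concrete, a quick sketch (comp being a Computicle, as in the earlier examples; the names are hypothetical):

-- fine: self-contained, only uses its own parameter and locals
comp.greet = function(name)
	local msg = "Hello, "..name;
	print(msg);
end

-- not fine: 'prefix' is an upvalue captured from the enclosing scope,
-- and it won't survive the string.dump()/loadstring() round trip
local prefix = "Hello, ";
comp.badGreet = function(name)
	print(prefix..name);
end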

Of course, that just gets the code over to the other side in a convenient way. What about actually executing the code?

comp:exec([[hello()]]);

Same as ever, just make the function call using “exec()”. Of course, this little bit could be wrapped up in some other convenient override, like using the ‘:’ notation, but there’s some more work to be done before making that a reality.

What other functions can be dealt with? Well, the loop running within the computicle looks like the following now:

-- comp_msgpump.lua

local ffi = require("ffi");

-- This is a basic message pump
-- 

-- default to 15 millisecond timeout
gIdleTimeout = gIdleTimeout or 15


local idlecount = 0;

while true do
  local msg, err = SELFICLE:getMessage(gIdleTimeout);
  -- false, WAIT_TIMEOUT == timed out
  --print("MSG: ", msg, err);

  if not msg then
    if err == WAIT_TIMEOUT then
      --print("about to idle")
      idlecount = idlecount + 1;
      if OnIdle then
        OnIdle(idlecount);
      end
    end
  else
  	local msgFullyHandled = false;

    if OnMessage then
      msgFullyHandled = OnMessage(msg);
    end

    if not msgFullyHandled then
      msg = ffi.cast("ComputicleMsg *", msg);
      local Message = msg.Message;
      --print("Message: ", Message, msg.Param1, msg.Param2);
		
      if Message == Computicle.Messages.QUIT then
        break;
      end

      if Message == Computicle.Messages.CODE then
        local len = msg.Param2;
        local codePtr = ffi.cast("const char *", msg.Param1);
		
        if codePtr ~= nil and len > 0 then
          local code = ffi.string(codePtr, len);

          SELFICLE:freeData(ffi.cast("void *",codePtr));

          local func = loadstring(code);
          func();
        end
      end
      SELFICLE:freeMessage(msg);
    end
  end
end

This new message pump has a couple of interesting new features. The first feature has to do with getMessage(). It will time out after some number of milliseconds have passed without a message coming in. If there is an ‘OnIdle()’ function defined, it will be called, passing that function the current count. If that function does not exist, then nothing will happen.

The second change has to do with message processing. If there is an “OnMessage()” function defined, it will be called, and the message will be handed to it for processing. If that function does not exist, then the pump will perform some default actions, such as loading and executing code, and handling a “QUIT”.

So, how about that OnIdle? That’s a function that takes a parameter. Can I inject that? Sure can:

comp.OnIdle = function(count)
  print("IDLE", count)
end

Just like the hello() case, except this one takes a parameter. This does not violate the “no upvalues” rule, so it’s just fine.

In this case, I don’t have to actually execute the code, because the loop within the computicle will pick up on it and execute it. And if you want to remove this idling function, simply do: ‘comp.OnIdle = nil’

And then what?

Well, that’s pretty cool. Now I can easily set simple variables, and I can set table values, and I can even set functions to be called. I have a generic message pumping loop, which has embedded “OnIdle”, and “OnMessage” functions.

The last little bit of work to be done here is to deal with Asynchronous IO in a relatively seamless way using continuations, just like I had in the pre-computicle world. That’s basically a marriage of the core of the EventScheduler and the main event loop here.

Throw some authentication on top of it all and suddenly you have a system that is very cool, secure, and useful.