Multitasking single threaded UI – Gaming meets networking

There are two worlds that I want to collide. There is the active UI gaming world, then there’s the more passive networking server world. The difference between these two worlds is the kind of control loop and scheduler that they both typically use.

The scheduler for the networking server is roughly:

while true do
  waitFor(networkingEvent)
  
  doNetworkingStuff()
end

This is great, and works quite well to limit the amount of resources required to run the server, because most of the time it’s sitting around idle. This allows you to serve up web pages with a much smaller machine than if you were running flat out with, say, a ‘polling’ API.
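
For contrast, a ‘polling’ version of the same loop never blocks, so it burns CPU even when there is nothing to do (haveNetworkingEvent() here is a made-up stand-in for any non-blocking check):

while true do
  if haveNetworkingEvent() then   -- returns immediately, true or false
    doNetworkingStuff()
  end
end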

The game loop is a bit different. On Windows it looks something like this:

while true do
  ffi.fill(msg, ffi.sizeof("MSG"))
  local peeked = User32.PeekMessageA(msg, nil, 0, 0, User32.PM_REMOVE);
			
  if peeked ~= 0 then
    -- do regular Windows message processing
    local res = User32.TranslateMessage(msg)		
    User32.DispatchMessageA(msg)
  end

  doOtherStuffLikeRunningGamePhysics();
end

So, when I want to run a networking game, where I take some input from the internet, and incorporate that into the gameplay, I’ve got a basic problem. Who’s on top? Which loop is going to be THE loop?

The answer lies in the fact that the TINN scheduler is a basic infinite loop, and you can modify what occurs within that loop. Last time around, I showed how the waitFor() function can be used to insert a ‘predicate’ into the scheduler’s primary loop. So, perhaps I can recast the gaming loop as a predicate and insert it into the scheduler?

I have a ‘GameWindow’ class that takes care of creating a window, showing it on the screen, and dealing with drawing. This window has a run() function which contains the typical gaming infinite loop. I have modified this code to recast things in such a way that I can use waitFor() as the primary looping mechanism. The modifications make it look like this:

local appFinish = function(win)

  win.IsRunning = true
  local msg = ffi.new("MSG")

  local closure = function()
    ffi.fill(msg, ffi.sizeof("MSG"))
    local peeked = User32.PeekMessageA(msg, nil, 0, 0, User32.PM_REMOVE);
			
    if peeked ~= 0 then
      local res = User32.TranslateMessage(msg)
      User32.DispatchMessageA(msg)

      if msg.message == User32.WM_QUIT then
        return win:OnQuit()
      end
    end

    if not win.IsRunning then
      return true;
    end
  end

  return closure;
end

local runWindow = function(self)
	
  self:show()
  self:update()

  -- Start the FrameTimer
  local period = 1000/self.FrameRate;
  self.FrameTimer = Timer({Delay = period, Period = period, OnTime = self:handleFrameTick()})

  -- wait here until the application window is closed
  waitFor(appFinish(self))

  -- cancel the frame timer
  self.FrameTimer:cancel();
end

GameWindow.run = function(self)
  -- spawn the fiber that will wait
  -- for messages to finish
  Task:spawn(runWindow, self);

  -- set quanta to 0 so we don't waste time
  -- in i/o processing if there's nothing there
  Task:setMessageQuanta(0);
	
  Task:start()
end

Starting from the run() function, only three things need to occur. First, spawn ‘runWindow’ in its own fiber. I want this routine to run in a fiber so that it is cooperative with other parts of the system that might be running.

Second, call ‘setMessageQuanta(0)’. This is a critical piece to get a ‘gaming loop’. This quanta is the amount of time the IO processing part of the scheduler will spend waiting for an IO event to occur. This time will be spent every time through the primary scheduler’s loop. If the value is set to 0, then effectively we have a nice runaway infinite loop for the scheduler. IO events will still be processed, but we won’t spend any time waiting for them to occur.

This has the effect of providing maximum CPU timeslice to the various other waiting fibers. If this value is anything other than 0, say ‘5’ for example, then the inner loop of the scheduler will slow to a crawl, providing no better than about 60 iterations of the loop per second (presumably because the wait gets rounded up to the Windows timer granularity of roughly 15 ms). Not enough time slice for a game. Setting it to 0 allows more like 3000 iterations of the loop per second, which gives more time to other fibers.
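
To make the quanta’s role concrete, here is a rough sketch of the stepIOEvents() phase of the scheduler’s loop. This is illustrative only, not the actual TINN source; the dequeue() call and the FibersAwaitingIO list are hypothetical names:

IOProcessor.stepIOEvents = function(self)
  -- MessageQuanta is how many milliseconds to block waiting for an
  -- IO completion event; 0 means poll and return immediately
  local key, bytes, overlapped = self.IOCompletionPort:dequeue(self.MessageQuanta);

  if key then
    -- wake whichever fiber was waiting on this particular operation
    self:scheduleFiber(self.FibersAwaitingIO[key]);
  end
end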

Third, call Task:start() to set the scheduler’s loop running. That’s the trick of this integration right there: just set the messageQuanta to 0, and away you go. To finish this out, take a look at the runWindow() function. Here just a couple of things are set in place. First, a timer is created. This timer will ultimately end up calling a ‘tick()’ function that the user can specify.

The other thing of note is the use of waitFor(appFinish(self)). This fiber will block here until the “appFinish()” predicate returns true.

So, finally, the appFinish() predicate, what does that do?

Well, I’ll be darned if it isn’t the essence of the typical game window loop, at least the Windows message handling part of it. Remember that a predicate that is injected into the scheduler using “waitFor()” is executed every time through the scheduler’s loop, so the scheduler’s loop is essentially the same as the outer loop of a typical game.

With all this in place, you can finally do the following:


local GameWindow = require "GameWindow"
local StopWatch = require "StopWatch"

local sw = StopWatch();

-- The routine that gets called for any
-- mouse activity messages
function mouseinteraction(msg, wparam, lparam)
	print(string.format("Mouse: 0x%x", msg))
end

function keyboardinteraction(msg, wparam, lparam)
	print(string.format("Keyboard: 0x%x", msg))
end


function randomColor()
	local r = math.random(0,255)
	local g = math.random(0,255)
	local b = math.random(0,255)
	local color = RGB(r,g,b)

	return color
end

function randomline(win)
	local x1 = math.random() * win.Width
	local y1 = 40 + (math.random() * (win.Height - 40))
	local x2 = math.random() * win.Width
	local y2 = 40 + (math.random() * (win.Height - 40))

	local color = randomColor()
	local ctxt = win.GDIContext;

	ctxt:SetDCPenColor(color)

	ctxt:MoveTo(x1, y1)
	ctxt:LineTo(x2, y2)
end

function randomrect(win)
	local width = math.random(2,40)
	local height = math.random(2,40)
	local x = math.random(0,win.Width-1-width)
	local y = math.random(0, win.Height-1-height)
	local right = x + width
	local bottom = y + height
	local brushColor = randomColor()

	local ctxt = win.GDIContext;

	ctxt:SetDCBrushColor(brushColor)
	--ctxt:RoundRect(x, y, right, bottom, 0, 0)
	ctxt:Rectangle(x, y, right, bottom)
end


function ontick(win, tickCount)

	for i=1,30 do
		randomrect(win)
		randomline(win)
	end

	local stats = string.format("Seconds: %f  Frame: %d  FPS: %f", sw:Seconds(), tickCount, tickCount/sw:Seconds())
	win.GDIContext:Text(stats)
end



local appwin = GameWindow({
		Title = "Game Window",
		KeyboardInteractor = keyboardinteraction,
		MouseInteractor = mouseinteraction,
		FrameRate = 24,
		OnTickDelegate = ontick,
		Extent = {1024,768},
		})


appwin:run()

This will put a window on the screen, and draw some lines and rectangles at a frame rate of roughly 24 frames per second.

And thus, the two worlds have been melded together. I’m not doing any networking in this particular case, but adding it is no different than doing it for any other networking application. The same scheduler is still at play, and everything else will work as expected.

The cool thing about this is the integration of these two worlds does not require the introduction of multiple threads of execution. Everything is still within the same single threaded context that TINN normally runs with. If real threads are required, they can easily be added and communicated with using the computicles that are within TINN.


Alertable Predicates? What’s that?

TINN does async I/O without much fuss.  It even allows you to suspend a ‘fiber’ by putting it to sleep.  You can easily execute a function on a timer, etc.  These are all good tools to have in the parallel programming toolbox.  There is one tool that has been missing for me though.  The fact that I can run things in parallel means that I need a mechanism to synchronize when some of those tasks complete.  I don’t want to introduce callbacks, because that leads down a whole different path.

I want to execute code that looks something like this:

local finalGoal = function()
  waitFor(goal1)
  print("Goal 1 Attained.")
  waitFor(goal2)
  print("Goal 2 Attained..")
  waitFor(goal3)
  print("Goal 3 Attained...")
  print("All Goals Attained!!")

  -- Once all the goals have been met
  -- stop the scheduler.
  stop();
end

The key here is the ‘waitFor()’ function call. You call this function with a ‘predicate’. A predicate is nothing more than a function that returns true or false. If it returns ‘false’ then the condition of the predicate has not been met, and the fiber will not continue. Once the predicate returns ‘true’, the fiber will continue from the point directly after it called ‘waitFor()’. So, to see this sample in its entirety:

-- test_waitFor.lua

local IOProcessor = require("IOProcessor")
local Timer = require("Timer")

local count = 0;


local goal1 = function()
  return count >= 5;
end

local goal2 = function()
  return count >= 10;
end

local goal3 = function()
  return count >= 15;
end

local finalGoal = function()
  waitFor(goal1)
  print("Goal 1 Attained.")
  waitFor(goal2)
  print("Goal 2 Attained..")
  waitFor(goal3)
  print("Goal 3 Attained...")
  print("All Goals Attained!!")

  -- Once all the goals have been met
  -- stop the scheduler.
  stop();
end

-- setup a timer to increment the count
-- every 500 milliseconds
local incrTimer = Timer({Period = 500; OnTime = function(timer) count = count+1 end})

spawn(finalGoal)	

run()

This is a contrived case, but it’s illustrative. Down at the bottom, there is a timer being set up with an anonymous function. The timer is set to go off every 500 milliseconds. Once it does, it will execute the associated code, and the count variable will be incremented by 1. That’s all the timer needs to be concerned with.

There are three predicate functions set up: goal1, goal2, and goal3. Each one of these predicates checks the value of the count variable, and returns true or false depending on the criteria of the goal.

There is a separate function, ‘finalGoal’. This one is spawned separately, and it will wait for each of the goals to be met in turn. The order in which it waits on them does not really matter in this case. Once all of the goals are met, the scheduler will be stopped and the program will end.

Those predicates can be anything really. They can make calls out to the internet, they can count, they can check the value of any variable, they can check any state of the running process. Whatever you want can be a predicate. Perhaps you want to stop accepting new connections on a web server once a memory limit has been reached. Whatever.
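
For instance, that memory limit idea needs nothing more than this (collectgarbage(“count”) is standard Lua, and reports the Lua heap size in kilobytes):

-- trips once the Lua heap grows past roughly 100 MB
local memoryLimitReached = function()
  return collectgarbage("count") > 100 * 1024;
end

-- in some fiber:
--   waitFor(memoryLimitReached)
--   ... stop accepting new connections ...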

Having the finalGoal() as a separate fiber is useful because the whole fiber will essentially ‘block’ until the goals are all met. This allows you to have other parallel fibers running at the same time, and they will go along their merry way while finalGoal() is waiting for its conditions to be met.

The implementation of this predicate yielding is pretty straightforward. Within the scheduler, there are only two new functions:

IOProcessor.yieldUntilPredicate = function(self, predicate)
	if self.CurrentFiber ~= nil then
		self.CurrentFiber.Predicate = predicate;
		table.insert(self.FibersAwaitingPredicate, self.CurrentFiber)

		return self:yield();
	end

	return false;
end

IOProcessor.stepPredicates = function(self)
	local nPredicates = #self.FibersAwaitingPredicate

	-- walk the list backwards so that table.remove()
	-- doesn't shift entries we have yet to examine
	for i=nPredicates,1,-1 do
		local fiber = self.FibersAwaitingPredicate[i];
		if fiber.Predicate() then
			fiber.Predicate = nil;
			self:scheduleFiber(fiber);

			table.remove(self.FibersAwaitingPredicate, i)
		end
	end
end

Then, waitFor() is just a shorthand for yieldUntilPredicate():

waitFor = function(predicate)
  return IOProcessor:yieldUntilPredicate(predicate)
end

The predicates are checked every time through the scheduler loop:

IOProcessor.step = function(self)
	self:stepTimeEvents();
	self:stepFibers();
	self:stepIOEvents();
	self:stepPredicates();
.
.
.

This is essentially the same as installing a very high priority fiber because all code for all predicates will run every time through the scheduler loop. The predicates do not need to be ‘scheduled’ like other tasks. A predicate should probably be a fairly short bit of code that runs quickly, and is not very resource intensive. They should be used sparingly.

Quite a few years ago, I was involved with the creation of LINQ in the .NET Framework. At that time, I had this nagging notion that if we could only add a ‘when predicate do’ construct to the CLR, that would be a fairly useful thing. I think using predicates for control flow is especially useful for parallel programming. I also think this construct is easier to use and implement than various other forms of barriers and whatnot.

So, there you have it. TINN supports async IO, timers, yielding on time, and yielding on predicates. Perhaps a predicate can be used to bridge the gap between the normal scheduler, and the event loop that is presented by the typical keyboard and mouse loop.



My Head In The Cloud – putting my code where my keyboard is

I have written a lot about the greatness of LuaJIT, coding for the internet, async programming, and the wonders of Windows. Now, I have finally reached a point where it’s time to put the code to the test.

I am running a service in Azure: http://nanotechstyles.cloudapp.net

This site is totally fueled by the work I have done with TINN. It is a static web page server with a couple of twists.

First of all, you can access the site through a pretty name:
http://www.nanotechstyles.com

If you just hit the site directly, you will get the static front page which has nothing more than an “about” link on it.

If you want to load up a threed model viewing thing, hit this:
http://www.nanotechstyles.com/threed.html

If you want to see what your browser is actually sending to the server, then hit this:
http://www.nanotechstyles.com/echo

I find the echo thing to be interesting, and I try hitting the site using different browsers to see what they produce.  This kind of feedback makes it relatively easy to do rapid turnarounds on the webpage content, challenging my assumptions and filling in the blanks.

The code for this web server is not very complex.  It’s the same standard ‘main’ that I’ve used in the past:

local resourceMap = require("ResourceMap");
local ResourceMapper = require("ResourceMapper");
local HttpServer = require("HttpServer")

local port = arg[1] or 8080

local Mapper = ResourceMapper(resourceMap);

local obj = {}

local OnRequest = function(param, request, response)
	local handler, err = Mapper:getHandler(request)


	-- recycle the socket, unless the handler explicitly says
	-- it will do it, by returning 'true'
	if handler then
		if not handler(request, response) then
			param.Server:HandleRequestFinished(request);
		end
	else
		print("NO HANDLER: ", request.Url.path);
		-- send back content not found
		response:writeHead(404);
		response:writeEnd();

		-- recycle the request in case the socket
		-- is still open
		param.Server:HandleRequestFinished(request);
	end

end

obj.Server = HttpServer(port, OnRequest, obj);
obj.Server:run()

In this case, I’m dealing with the OnRequest() directly, rather than using the WebApp object.  I’m doing this because I want to do some more interactions at this level that the standard WebApp may not support.

Of course the ‘handlers’ are where all the fun is. I guess it makes sense to host the content of the site up on the site for all to see and poke fun at.

My little experiment here is to give my code real world exposure, with the intention of hardening it, and gaining practical experience on what a typical web server is likely to see out in the wild.

So, if you read this blog, go hit those links. Soon enough, perhaps I will be able to serve up my own blog using my own software. That’s got a certain circular reference to it.


Pinging Peers – Creating Network Meshes

I have the need to create a mesh of network nodes on occasion. The configuration of the mesh is typically a set of VMs running on some cloud service or another. The code running on these VMs may perform some basic task, probably similar across all the VMs. They do need to communicate information with each other, and to do that, they need to maintain a list of their peers and the latencies between them, and the like.

So, I started modeling this thing after the Border Gateway Protocol (BGP), which is one of the ancient low-lying protocols of the internet. I model the protocol by first creating an object, BGPPeer. The intention of this object is to act as both the host and client of the fundamental mesh communications. It looks like this:

-- BGProtocol.lua
--[[
	Implementation of Border Gateway Protocol (BGP)
--]]

local ffi = require("ffi");
local Timer = require("Timer")
local IOCPSocket = require("IOCPSocket");
local SocketUtils = require("SocketUtils")
local Network = require("Network")

--[[
	BGP Relies on the presence of a fully connected graph
	The Protocol object is initialized by reading a list of nodes
	and creating the right channels between each of the nodes.
--]]

--[[
print("ComputerNameNetBIOS: ", Network.getHostName(ffi.C.ComputerNameNetBIOS))
print("ComputerNameDnsHostname: ", Network.getHostName(ffi.C.ComputerNameDnsHostname))
print("ComputerNameDnsDomain: ", Network.getHostName(ffi.C.ComputerNameDnsDomain))
print("ComputerNameDnsFullyQualified: ", Network.getHostName(ffi.C.ComputerNameDnsFullyQualified))
print("ComputerNamePhysicalNetBIOS: ", Network.getHostName(ffi.C.ComputerNamePhysicalNetBIOS))
print("ComputerNamePhysicalDnsHostname: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsHostname))
print("ComputerNamePhysicalDnsDomain: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsDomain))
print("ComputerNamePhysicalDnsFullyQualified: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsFullyQualified))
--]]

local selfhost = Network.getHostName(ffi.C.ComputerNameDnsHostname):lower()
--print("Self Host: ",selfhost)

local BGPPeer = {
  HeartbeatInterval = 50 * 1000;	-- send out a heartbeat on this interval
  UdpPort = 1313;					-- port used to communicate UDP
  EliminateSelf = true;			-- don't consider self to be a peer
}
setmetatable(BGPPeer, {
  __call = function(self, ...)
    return self:create(...)
  end,
})

local BGPPeer_mt = {
  __index = BGPPeer,
}

BGPPeer.init = function(self, config)
  local err
  local obj = {
    Config = config,
    Peers = {},
  }

  -- if there is config information, then use
  -- it to setup sockets to the peer
  if config then
    -- create the client socket
    obj.Name = config.name;
    obj.UdpSender, err = IOCPSocket:create(AF_INET, SOCK_DGRAM, 0);
    obj.UdpPeerAddress, err = SocketUtils.CreateSocketAddress(config.host, config.udpport or BGPPeer.UdpPort)


    obj.UdpPeerAddressLen = ffi.sizeof(obj.UdpPeerAddress);
  end

  setmetatable(obj, BGPPeer_mt)

  return obj;
end

BGPPeer.create = function(self, configfile)
  -- load the configuration file
  local fs, err = io.open(configfile, "rb")
	
  if not fs then return nil, "could not load file" end

  local configdata = fs:read("*all")
  fs:close();
	
  local func, err = loadstring(configdata)
  local config = func()

  -- create self, so peers can be added
  local obj = self:init()

  -- add the udp socket
  -- Setup the server socket
  obj.UdpListener, err = IOCPSocket:create(AF_INET, SOCK_DGRAM, 0);
  if not obj.UdpListener then
    return nil, err;
  end

  local success, err = obj.UdpListener:bindToPort(BGPPeer.UdpPort);

  if not success then
    return nil, err;
  end

  -- for each of the peers within the config 
  -- create a new peer, and add it to the 
  -- list of peers for the first object
  for _,peerconfig in ipairs(config) do
		--print("PEER: ", peerconfig.name, peerconfig.host)
    if peerconfig.name:lower() ~= selfhost then
      obj:createPeer(peerconfig)
    end
  end

  -- Need to setup a listener for the UDP port

  -- add the heartbeat timer
  obj.HeartbeatTimer = Timer({
		Delay =  BGPPeer.HeartbeatInterval;
		Period = BGPPeer.HeartbeatInterval;
		OnTime = obj:onHeartbeat();})

  -- start a fiber which will run the UDP listener loop
  spawn(obj:handleUdpListen())

  return obj;
end

BGPPeer.onHeartbeat = function(self)
  local message = "HELLO FROM: "..Network:getHostName();

  local closure = function(timer)
    for peername, peer in pairs(self.Peers) do
      local bytessent, err = peer.UdpSender:sendTo(peer.UdpPeerAddress, peer.UdpPeerAddressLen,
        message, #message);
    end
  end

  return closure
end

BGPPeer.addPeer = function(self, peer)
  self.Peers[peer.Name] = peer
end

BGPPeer.createPeer = function(self, peerconfig)
  self:addPeer(BGPPeer:init(peerconfig));
end

local count = 0;
BGPPeer.onReceiveHeartBeat = function(self, buff, bytesread)
  count = count + 1
  print("onReceiveHeartBeat: ", count, ffi.string(buff, bytesread));
end


BGPPeer.handleUdpListen = function(self)
  local bufflen = 1500;
  local buff = ffi.new("uint8_t[1500]");
  local from = sockaddr_in();
  local fromLen = ffi.sizeof(from);


  local closure = function()

    while true do
      local bytesread, err = self.UdpListener:receiveFrom(from, fromLen, buff, bufflen);

      if not bytesread then
        print("receiveFrom ERROR: ", err)
        return false, err;
      end

      self:onReceiveHeartBeat(buff, bytesread)
    end
  end

  return closure
end

return BGPPeer
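
For reference, since create() just executes the configuration file as a Lua chunk and iterates the result, the config is nothing more than a list of peer entries. Something along these lines (the field names are taken from the code above; the host names are made up):

return {
  {name = "node1", host = "node1.cloudapp.net"},
  {name = "node2", host = "node2.cloudapp.net"},
  {name = "node3", host = "node3.cloudapp.net", udpport = 1313},
}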

There are a couple of things of note here. One of the basic problems I need to solve is the fact that this is not a basic client/server app, but rather a peer-to-peer setup. As such, each peer both listens on a port and sends data.

A typical network ‘server’ app may block on listening, and not perform any actions until it receives a request from a ‘client’. In my case, I don’t want to block, but I do want to initiate a ‘blocking’ loop in parallel with some non-blocking stuff.

In this BGPPeer, there is the handleUdpListen() function. This function is the heart of the Udp listening loop, or the “server” side of the peer communication. This function is run in parallel as it is invoked in the constructor of the object using this:

  spawn(obj:handleUdpListen())

In this particular case, the handleUdpListen() function contains a closure (another function) which is what is actually returned, and spawned in its own fiber. The function is pretty straightforward.

  local closure = function()

    while true do
      local bytesread, err = self.UdpListener:receiveFrom(from, fromLen, buff, bufflen);

      if not bytesread then
        print("receiveFrom ERROR: ", err)
        return false, err;
      end

      self:onReceiveHeartBeat(buff, bytesread)
    end
  end

Basically, just run a loop forever, waiting to receive some UDP packet. Since in TINN all IO calls are cooperative, while this receiveFrom is ‘blocked’, the current coroutine is actually yielding, so that other work can continue. That’s great; that’s exactly what I want. So, now the listening side is set up.
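
To be clear about what ‘cooperative’ means here, receiveFrom() is conceptually doing something like the following. This is a sketch only; beginReceiveFrom() and yieldForIo() are hypothetical names, not the actual TINN internals:

IOCPSocket.receiveFrom = function(self, from, fromlen, buff, bufflen)
  -- issue the overlapped (async) receive
  local op, err = self:beginReceiveFrom(from, fromlen, buff, bufflen);
  if not op then return nil, err end

  -- park this fiber; the scheduler resumes it when the IO
  -- completion port reports that this operation has finished
  return IOProcessor:yieldForIo(op);
end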

How about the sending side? I want to send out a formatted packet on a regular interval. Since TINN has a Timer object, this is a pretty straightforward matter. Within the constructor of the BGPPeer, we find the following:

  -- add the heartbeat timer
  obj.HeartbeatTimer = Timer({
		Delay =  BGPPeer.HeartbeatInterval;
		Period = BGPPeer.HeartbeatInterval;
		OnTime = obj:onHeartbeat();})

With this setup, the timer will execute the ‘onHeartbeat()’ function every “Period”. This is some number of milliseconds. The default is 50 seconds, but it can be configured to any value.
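
Since HeartbeatInterval is just a field on the BGPPeer table, reconfiguring it is a one-liner, done before any peers are created:

-- use a 5 second heartbeat instead of the 50 second default
BGPPeer.HeartbeatInterval = 5 * 1000;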

As we saw from the last article on Timers, this timer object will start automatically, and run cooperatively with anything else running in the process. That’s great. So, onHeartbeat just needs to send out information to all its peers whenever it is called.

BGPPeer.onHeartbeat = function(self)
  local message = "HELLO FROM: "..Network:getHostName();

  local closure = function(timer)
    for peername, peer in pairs(self.Peers) do
      local bytessent, err = peer.UdpSender:sendTo(peer.UdpPeerAddress, peer.UdpPeerAddressLen,
        message, #message);
    end
  end

  return closure
end

This becomes a simple matter of traversing the list of peers, and using the Udp socket that was setup with each one of them to send out the packet we want to send out.

What’s that altogether then? We saw the spawn that sent the Udp listener off in a cooperative manner. And just now I showed the timer going in parallel. The two combined give you a model where you can both listen and send ‘at the same time’.

It looks so easy, and that’s the power of TINN, the scheduler, and Lua’s coroutines. It’s fairly easy to write such code without having to worry overly much about old school things like mutexes, thread context switching, and the like. Just write your code however you want, sprinkle in a few ‘spawn()’ functions here and there, and call it a day. This is how easy it is to write network programs that can walk and chew gum at the same time.

Next time around, I want to write the simple handler that can deal with Win32 message looping and peer networking at the same time. Same general mechanisms, just a cooperative User32 message pump to incorporate.


How About that Web Server Again?

Has it really been more than a month?

Well, you see, I bought a house, packed, moved, took daughter to college, and…

The last time I wrote about making a simple WebServer, I left the job half finished. I showed that through layering, from a basic socket server, you could then build a simple http handler, which had not much more than an ‘OnRequest’ function.

That works great, and gives you the right level of access if you’re going to do things with http in general that are beyond simple static file serving. But what if your interactions are more complex than you care to deal with using simple ‘if-then-else’ logic?

This time around, I’m going to use a new and improved form of the WebApp which first of all deals with the new and improved IOCP based sockets, and leverages the HttpServer object of yore.

Here is the basic web server:

-- main.lua
local WebApp = require("WebApp")
local resourceMap = require("ResourceMap");

local port = arg[1] or 8080

local app = WebApp(resourceMap, port);
app:run();

And you simply invoke it with:

tinn main.lua 8080

Well, that seems easy enough. If you wanted to get real crazy, and win an obfuscated code competition, you could do it in one line:

-- main.lua
require("WebApp")(require("ResourceMap"), arg[1] or 8080):run()

Or, if you’re a real Lua head, you could do it all on the command line, without even using the ‘main.lua’ file:

tinn -e "require('WebApp')(require('ResourceMap'), arg[1] or 8080):run()"

That’s all fun and games, and it’s cool to see that you can implement a web server in a single command line. But what’s in that magical ResourceMap file?

local URL = require("url");
local StaticService = require("StaticService");

local function replacedotdot(s)
  return string.gsub(s, "%.%.", '%.')
end

local HandleFileGET = function(request, response)
  local absolutePath = replacedotdot(URL.unescape(request.Url.path));
  local filename = './wwwroot'..absolutePath;

  StaticService.SendFile(filename, response)
  return false;
end

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
  };
}

return ResourceMap;

The idea here is that we want to route any ‘GET’ methods to the HandleFileGET() function. There is a ResourceMapper object within TINN that utilizes a structure such as the one in ResourceMap. The general layout is: {[pattern] = {name=””, METHOD = function [, METHOD = function]},}

Using this simple mechanism, you can easily route the handling of a particular resource request to a particular function.

This table can have entries that are much more interesting. For example, if you want to handle the retrieval of ‘favicon.ico’ differently than other files, you can just add a specific mapping for that.

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}

You could have multiple methods per resource as well:

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
    POST = HandleFilePOST,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}

In general, the longest prefix match algorithm is applied to whatever is supplied within the resource field of the request. If you want to deal with all methods of a particular resource, without having to specify them explicitly, then you can use the magic method ‘_DEFAULT’.

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
    POST = HandleFilePOST,
    _DEFAULT = HandleFileDEFAULT,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}

In this way, if there is a request for a resource, and a method that we don’t know about at the time of creating the map, the HandleFileDEFAULT() function will be called to deal with it. That might be handy in the case where you’d like to handle these unknown method requests in a generic way, or you might want to change it over time without having to change your mapping.
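
A minimal catch-all might look like this sketch, politely refusing methods it doesn’t understand (the 405 response is just one reasonable choice):

local HandleFileDEFAULT = function(request, response)
  response:writeHead(405)    -- Method Not Allowed
  response:writeEnd();

  return false;              -- let the server recycle the socket
end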

Another case is when the resource isn’t actually a resource. Take for example a ‘CONNECT’ request:

CONNECT localhost:9000

In this case, the ‘resource’ does not start with a ‘/’, so the longest prefix match won’t land it on anything in our map. I need to deal with these with a different pattern. Well, the pattern part of the map is nothing more than a standard pattern in the Lua sense of the word, so a ‘.’ will match any character. The following map will do the trick.

local ResourceMap = {
  ["."] = {name=".",
    CONNECT = HandleCONNECT,
  };

  ["/"] = {name="/",
    GET = HandleFileGET,
    POST = HandleFilePOST,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}

In this way, we’ll deal with the CONNECT method if it shows up.

This is an affirmative list. If there is a match to one of the patterns, then the mapped function is executed. If no pattern is found, either because the resource itself does not match, or because the resource does not have a function to handle the specified method, then a general 404 error is returned.

That’s about all there is to it. Create a mapping between URLs and functions, and you’re all done. Of course there’s the function itself:

local URL = require("url");
local StaticService = require("StaticService");

local function replacedotdot(s)
  return string.gsub(s, "%.%.", '%.')
end

local HandleFileGET = function(request, response)
  local absolutePath = replacedotdot(URL.unescape(request.Url.path));
  local filename = './wwwroot'..absolutePath;
	
  StaticService.SendFile(filename, response)

  return false;
end

Not a lot of magic in this particular one. First of all, there’s that simplistic ‘replacedotdot()’ function. That’s just a casual attempt to restrict the file access to the directory of our choosing. In HandleFileGET, the first thing to do is urldecode the path specified, then feed that to the replacedotdot() function. Then, take whatever comes out of that, and prepend it with ‘./wwwroot’, and finally feed that to the StaticService.SendFile() function, which is a standard part of TINN.

The return value of this function has meaning. If you return false, or ‘nil’, then the socket representing this particular request will be recycled, assuming there is potentially another request coming on the same socket.

If you instead return ‘true’, then the system assumes you are handling any subsequent recycling, or closing, and it will not take any further action with the underlying socket.

This is a nice feature in that it allows you to construct much more interesting interactions than the standard request/response, without much fuss. For example, you could easily open up websockets, upgrade connections in general, or do whatever you want.
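
A sketch of what that might look like: the handler hangs on to the request’s stream (request.DataStream is the netstream seen in the HttpServer code), spawns its own fiber to drive it, and returns true so the server leaves the socket alone. HandleUpgradeGET and the protocol you speak on the stream are entirely up to you:

local HandleUpgradeGET = function(request, response)
  local stream = request.DataStream;

  spawn(function()
    -- speak whatever protocol you like on 'stream' from here on
  end)

  return true;   -- we own the socket now; no recycling, no closing
end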

At any rate, with TINN, you can create a simple web server in a single command line. I find that to be a fairly useful thing.


Hurry Up and Wait – TINN Timing

Moving right along. First I needed to do basic networking. Starting at the lowest level of socket interaction, advancing up the stack through specific TCP and HTTP uses. Then back down to UDP.

With basic async networking covered, the next thing that comes up is timing. The general socket IO is covered. You can basically build an entire service using nothing more than asynchronous socket calls. But, most servers are more interesting than that. There are situations where you’ll want to cancel out an async operation if it’s taking too long, or you might want to perform some operation over time, repeatedly. So, clearly the TINN system needs some concept of time management.

Here’s the kind of code I would like to write in order to do something every 500 milliseconds:

require ("IOProcessor");

local test_wait = function(interval)
  while true do
    wait(interval);
    print(string.format("interval: %d", IOProcessor.Clock:Milliseconds()));
  end
end

run(test_wait)

Basically, there is a new ‘wait()’ function. You give it a number of milliseconds, and it will suspend the coroutine you’re currently in for the given amount of time. This capability comes courtesy of some changes to the base scheduler. The changes are the following:

wait = function(millis)
  local nextTime = IOProcessor.Clock:Milliseconds() + millis;
  return IOProcessor:yieldUntilTime(nextTime);
end

IOProcessor.yieldUntilTime = function(self, atime)
  if self.CurrentFiber ~= nil then
    self.CurrentFiber.DueTime = atime;
    tabutils.binsert(self.FibersAwaitingTime, self.CurrentFiber, compareTaskDueTime);

    return self:yield();
  end
  return false;
end

The yieldUntilTime() function will take the currently running fiber (coroutine) and put it into the list of FibersAwaitingTime. This is simply a table which is maintained in sorted order, from lowest to highest due time. Once a fiber is placed on this list, it is no longer in the list of currently active fibers. It will sit on this list until its DueTime has passed.
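
The compareTaskDueTime function handed to binsert() is presumably nothing more than a comparison on the DueTime field, something like:

local compareTaskDueTime = function(task1, task2)
  -- order fibers by when they are due to wake up, earliest first
  return task1.DueTime < task2.DueTime;
end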

The main scheduling loop will step through the fibers that are sitting in the AwaitingTime list using the following:

IOProcessor.stepTimeEvents = function(self)
  local currentTime = self.Clock:Milliseconds();

  -- traverse through the fibers that are waiting
  -- on time
  local nAwaiting = #self.FibersAwaitingTime;
  for i=1,nAwaiting do
    local fiber = self.FibersAwaitingTime[1];
    if fiber.DueTime <= currentTime then
      -- put it back into circulation
      fiber.DueTime = 0;
      self:scheduleFiber(fiber);

      -- Remove the fiber from the list of fibers that are
      -- waiting on time
      table.remove(self.FibersAwaitingTime, 1);
    else
      -- the list is sorted by DueTime, so nothing
      -- further along is due either
      break;
    end
  end
end

Basically, step through the list of fibers that are waiting for their wait time to expire. For all those that qualify, put them back into the list of active fibers by calling the ‘scheduleFiber()’ function.

This begins to get very interesting I think. Of course once you create a timer, or even async i/o, you probably also want the ability to cancel such operations. But, that’s another matter.

Doing this little exercise, the scheduler begins to take on more of the attributes of what you might find in the core OS. The big difference is that everything is done in the Lua space, and does not rely on OS primitives, so it is actually somewhat portable to other architectures. It’s a nice idea to have such a scheduler available across multiple LuaJIT environments.

While going through this exercise, I also started looking at other features of schedulers, thinking about priorities, and other factors that might go into a really good scheduler. So, at least one thing becomes apparent to me. Having this scheduler readily available and transformable in the Lua space makes it relatively easy to try out different scheduling techniques that will match the particular situation at hand. There is even the possibility of changing the scheduling algorithm dynamically based on attributes of the running system.

Exciting times.


How About that Web Server

Although echo servers are the “Hello, World” of network programs, http servers are much more useful.

local HttpServer = require("HttpServer");
local StaticService = require("StaticService");

-- a simple ping template
local pingtemplate = [[
<html>
  <head>
    <title>HTTP Server</title>
  </head>
  <body>ping</body>
</html>
]]

-- Called every time there is a new http request
local OnRequest = function(param, request, response)
  if request.Url.path == "/ping" then
    response:writeHead("200")
    response:writeEnd(pingtemplate);
  else
    local filename = './wwwroot'..request.Url.path;
    StaticService.SendFile(filename, response);
  end
end

local server = HttpServer(8080, OnRequest);
server:run();

This looks a little bit like the echo service which was based on the raw socket server. Instead of “OnAccept”, this one implements “OnRequest”. The ‘request’ is an instance of a ‘WebRequest’ object, which contains all the stuff you’d expect in a WebRequest (resource, headers…). The routine is also handed a ‘WebResponse’ object. This is a simple convenience because it just wraps the netstream that is associated with the request object.

The HttpServer code itself looks like this:

local SocketServer = require("SocketServer")

local IOCPSocket = require("IOCPSocket")
local IOCPNetStream = require("IOCPNetStream");
local WebRequest = require("WebRequest");
local WebResponse = require("WebResponse");
local URL = require("url");

HttpServer = {}
setmetatable(HttpServer, {
  __call = function(self, ...)
    return self:create(...);
  end,
});

HttpServer_mt = {
  __index = HttpServer;
}

HttpServer.init = function(self, port, onRequest, onRequestParam)
  local obj = {
    OnRequest = onRequest;
    OnRequestParam = onRequestParam;
  };
  setmetatable(obj, HttpServer_mt);
	
  obj.SocketServer = SocketServer(port, HttpServer.OnAccept, obj);

  return obj;
end

HttpServer.create = function(self, port, onRequest, onRequestParam)
  return self:init(port, onRequest, onRequestParam);
end


HttpServer.OnAccept = function(self, sock)
  local socket = IOCPSocket:init(sock, IOProcessor);
  local stream, err = IOCPNetStream:init(socket);

  if self.OnRequest then
    local request, err  = WebRequest:Parse(stream);

    if request then
      request.Url = URL.parse(request.Resource);
      local response = WebResponse:OpenResponse(request.DataStream)
      self.OnRequest(self.OnRequestParam, request, response);
    else
      print("HandleSingleRequest, Dump stream: ", err)
    end
  else
    -- do nothing and let the socket close
  end
end

HttpServer.run = function(self)
  return self.SocketServer:run();
end

return HttpServer;

The ‘OnAccept()’ function this time around takes the unadorned socket, wraps it into a nicer socket object (so the IO completion stuff can happen), and then uses the WebRequest object to parse what’s on the stream. If the request is found to be intact, a response object is created and the two are handed off to the ‘OnRequest’ function if it exists.

This construct allows you to compose a web server to meet your needs. You can spawn wherever you want, to run whichever part you want to run in parallel. At the top end, the consumer of this object won’t know the difference, and can thus just handle the individual requests.

So, what’s so good about all this?
Well, first of all the TINN runtime, all up, is about 3Mb.
What you get for that is access to pretty much all the interesting stuff that Windows APIs have to offer. Whether it be network, OS, graphics, crypto, or multi-thread related, it’s all available right there in the little package.

This is good when you want to start creating simple REST based web services for this and that. For example, if you want to expose a webcam feed from your PC, or your PC acts as a hub for various wireless “internet of things” devices around your home, or whatever, you just write some Lua code, without worrying about interop libraries, compiling, or anything else. Just a little script and away you go.


Name That Framework – Echo Service in two lines of code

… and here they are:

SocketServer = require("SocketServer");
SocketServer(9090):run(function(s, b, l)  s:send(b, l); end);

Back in the day there was this gameshow called “Name That Tune”, where contestants would be told a clue about a song, then they would bid on the fewest number of notes it would take for them to name the tune. Once the bids were fixed, the orchestra would play the number of notes, and the contestant would have to correctly guess the name of the tune.

So, above are two lines of code which implement a highly scalable “echo” service. Can you name the framework?

It’s TINN of course!

Here’s a more reasonable rendition of the same:

local SocketServer = require("SocketServer");

local function OnData(socket, buff, bufflen)
  socket:send(buff, bufflen);
end;

local server = SocketServer(9090);
server:run(OnData)

Simply put, a SocketServer is a generic service that will listen on a particular port that you specify. Whenever it receives any data on the port, it will call the supplied ‘OnData’ function. Each time ‘OnData’ is called, it could be with a different socket and data. You could build a fairly rudimentary http server on top of this if you like. What’s most important to me is the fact that you don’t have to write any of the underlying low level networking code. Nothing about accept, IO completion ports, etc. Just: call me when some data comes in on this specified port.

The SocketServer code itself looks like this:

local ffi = require("ffi");
local IOProcessor = require("IOProcessor");
local IOCPSocket = require("IOCPSocket");

IOProcessor:setMessageQuanta(nil);

SocketServer = {}
setmetatable(SocketServer, {
  __call = function(self, ...)
    return self:create(...);
  end,
});

SocketServer_mt = {
  __index = SocketServer;
}

SocketServer.init = function(self, socket, datafunc)
--print("SocketServer.init: ", socket, datafunc)
  local obj = {
    ServerSocket = socket;
    OnData = datafunc;
  };

  setmetatable(obj, SocketServer_mt);

  return obj;
end

SocketServer.create = function(self, port, datafunc)
  port = port or 9090;

  local socket, err = IOProcessor:createServerSocket({port = port, backlog = 15});

  if not socket then
    print("Server Socket not created!!")
    return nil, err
  end

  return self:init(socket, datafunc);
end

-- The primary application loop
SocketServer.loop = function(self)
  local bufflen = 1500;
  local buff = ffi.new("uint8_t[?]", bufflen);

  while true do
    local sock, err = self.ServerSocket:accept();

    if sock then
      local socket = IOCPSocket:init(sock, IOProcessor);
      local bytesread, err = socket:receive(buff, bufflen);

      if not bytesread then
        print("RECEIVE ERROR: ", err);
      elseif self.OnData ~= nil then
        self.OnData(socket, buff, bytesread);
      else
        socket:closeDown();
        socket = nil
      end
    else
       print("Accept ERROR: ", err);
    end

    collectgarbage();
  end
end

SocketServer.run = function(self, datafunc)
  if datafunc then
    self.OnData = datafunc;
  end

  IOProcessor:spawn(self.loop, self);
  IOProcessor:run();
end

return SocketServer;

This basic server loop is good for a lot of little tiny tasks where you just need to put a listener on the front of something. No massive scaleout, no multi-threading, just good straightforward stuff. But, it’s already plumbed to go big too.

Here’s a slight modification:

SocketServer.handleAccepted = function(self, sock)
  local handleNewSocket = function()
    local bufflen = 1500;
    local buff = ffi.new("uint8_t[?]", bufflen);
    
    local socket = IOCPSocket:init(sock, IOProcessor);

    -- if an OnAccepted handler were supplied, it would take over here;
    -- otherwise fall through to the default receive/OnData handling
    if self.OnAccepted then
    else
      local bytesread, err = socket:receive(buff, bufflen);
  
      if not bytesread then
        print("RECEIVE ERROR: ", err);
      elseif self.OnData ~= nil then
        self.OnData(socket, buff, bytesread);
      else
        socket:closeDown();
        socket = nil
      end
    end
  end

  return IOProcessor:spawn(handleNewSocket);
end

-- The primary application loop
SocketServer.loop = function(self)

  while true do
    local sock, err = self.ServerSocket:accept();

    if sock then
      self:handleAccepted(sock);
    else
       print("Accept ERROR: ", err);
    end

    collectgarbage();
  end
end

In the main loop, instead of doing the processing directly, call the ‘self:handleAccepted()’ function. That function in turn will spawn an internal function to actually handle the request. Everything else remains the same.

If you do it this way, then the ‘OnData’ will run cooperatively with other accepts that might be going on. Also, this highlights, in an invisible way, that the ‘accept()’ call is actually cooperative. Meaning, since IO completion ports are being used in the background, the accept call is actually async. As soon as it issues the accept, that coroutine will wait in place until another socket comes in. Meanwhile, the last socket that was being handled will get some time slice to do what it wants.

And thus, you get massive scale (thousands of potential connections) from using this fairly simple code.

Well, those are the basics. Now that I have plumbed TINN from the ground up to utilize the IO Completion Ports, I can start to build upon that. There are a couple of nice benefits to marrying IOCP and Lua coroutines. I’ll be exploring this some more, but it’s basically a match made in heaven.


Function Forwarding – The Lazy Programmer Doesn’t Like Writing Glue Code

Back in the day, when NeXT computers roamed the earth, there was this language feature in Objective-C that I came to know and love. Objective-C has this feature called forwarding, or delegating, or whatever. The basic idea is, you call a function on an object, and if it knows the function (late bound goodness), it will execute the function. If it doesn’t, it will forward the function call to the object’s ‘forward’ function, handing it all the parameters, and giving it a chance to do something before throwing an error if nothing is done.

Well, roll forward 20 years (yah, it was that long ago), and here we are with the likes of Python, JavaScript, Ruby, and Lua inheriting the dynamic language mantle passed along from Simula and Smalltalk. So, recently, I was pining for that good ol’ feature of Objective-C, so I decided to implement it and see what it takes today.

Why? The scenario is simple. There are many instances where I want to write a ‘proxy’ of some sort or another. In one such scenario, I want to take the name of the function being called, along with the parameters for the function, wrap them up into a string, send them somewhere, and have them dealt with at that new place. At the other end, I can deal with actually applying the function and parameters.

So, beginning with this:


local baseclass = {
  func1 = function(foo, bar, bas)
    print(foo, bar, bas);
  end,

  func2 = function(baz, booz, bazzle)
    print("Function 2: ", baz, booz, bazzle);
  end
}

I want to be able to do the following:

local msgr = Messenger("machine.domain.com/baseclass");

msgr:func1("this", "little", "Piggy");
msgr:func2("Piggy", 500, false);

Tada! Done…

OK. So, let’s begin at the beginning. First of all, I want some sort of base Object/Class thing that will help make things easier. I’ll use the one from the ogle project, because it seems to be pretty decent and minimal.


-- Object.lua

local Object = {}
Object.__index = Object
setmetatable(Object, Object)

-- Variables
Object.__metamethods =  {
    "__add", "__sub", "__mul", "__div", 
    "__mod", "__pow", "__unm","__len", 
    "__lt", "__le", "__concat", "__tostring"
}

-- Metamethods
function Object:__call(...)
    return self:new(...)
end

function Object:__newindex(k, v)
    if k == "init" or getfenv(2)[k] == self then
        rawset(self, "__init", v)
    else
        rawset(self, k, v)
    end
end

-- Constructor
function Object:__init()
end

-- Private/Static methods
function Object:__metamethod(event)
    
    return function(...)
        if self:parent() == self then
            error(event .. " is unimplemented on object.", 2)
        end
        
        local func = rawget(self, event) or rawget(self:parent(), event)
        if type(func) == "function" then return func(...) else return func end
    end
end

-- Methods
function Object:parent()
    return getmetatable(self)
end


function Object:new(...)
    local o = {}
    setmetatable(o, self);


    self.__index = function(self, key)
        -- See if the value is one from our metatable
        -- if it is, then return that.
        local mt = getmetatable(self); 
        local value = rawget(mt,key);
        if value then
            return value;
        end

        -- If the thing is not found, then return the 'forward' method
        local delegate = rawget(self, "__delegate")
        if delegate ~= nil then
            return delegate(self, key);
        end
    end


    self.__call = Object.__call
    self.__newindex = Object.__newindex
    
    for k,v in pairs(Object.__metamethods) do
        o[v] = self:__metamethod(v)
    end
    
    o:__init(...)
    return o
end

return Object;

Just a little bit of Object/behavior mixin mumbo jumbo. I’ve made a couple of mods from the original, the most interesting being the implementation of the ‘__index’ function. The ‘__index’ function is called whenever you try to access something that doesn’t immediately exist in the table. In the example code above, when I call ‘msgr:func1()’, the runtime first does: something = msgr.func1. If it finds something, and that something is a function, it evaluates the function, passing the specified parameters. If it doesn’t find anything, then it will throw an error saying ‘something is nil’, and thus it cannot be called.

Alright, so if you look at that ‘__index’ function, the last case says: if there is a function called “__delegate” defined, then return whatever it produces for the requested key.

OK. So, at this point, if you create an instance of an object, and try to call a function that does not exist, it will fail because there is no ‘__delegate’ function defined. In steps the ‘subclass’.


Messenger = Object();

function Messenger:__init(target)
  self.Target = target;
  
  self.__delegate =  function(self, name, ...)
    return function(self, ...)
      local jsonstr = JSON.encode({...});
      print(name, jsonstr);
    end
  end

  return self;
end

What’s going on here? Basically, the “Messenger” is a subclass of the “Object”. The ‘__init()’ function is called automatically as part of the object/type creation. Within the ‘__init()’, the ‘__delegate()’ function is setup. Isn’t that a weird function to be called? If you go back and look at the ‘__index()’ function, you’ll notice that it returns the result of the ‘__delegate’ method right away. In this case, the ‘Messenger.__delegate()’ function will return a ‘closure’, which is a fancy word for a function enclosed in a function. This happens with iterators as well, but that’s another story. The closure, in this case, is the true workhorse of the operation. In this particular case, all it does is turn the parameters into a json string and print it out, but you can probably imagine doing other things here, like sending the parameters, along with the function name, across a network interface to a receiver, who will actually execute the code in question.
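
For example, swapping the print() for a transport turns the Messenger into a real remote proxy. A sketch, where connectTo() and Channel:send() are hypothetical stand-ins for whatever transport you actually have:

function Messenger:__init(target)
  self.Target = target;
  self.Channel = connectTo(target);   -- hypothetical transport setup

  self.__delegate = function(self, name, ...)
    return function(self, ...)
      -- package the function name and arguments, and ship them off
      local payload = JSON.encode({func = name, params = {...}});
      self.Channel:send(payload, #payload);
    end
  end

  return self;
end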

And that’s that.

Another example:


local mssngr = Messenger();
mssngr:foo("Hello", 5);

mssngr:bar("what", 'would', 1, "I", {23,14,12}, "do?");

> foo	["Hello",5]
> bar	["what","would",1,"I",[23,14,12],"do?"]

This is a great outcome as far as I’m concerned. It means that I can easily use this Messenger class as a proxy for whatever purposes I have in mind. I can create different kinds of proxies for different situations, simply by replacing the guts of the ‘__delegate()’ function.

Besides being motivated by nostalgia for Objective-C paradigms of the past, I actually have a fairly practical and immediate application. I’ve noticed that Computicles are an almost perfect implementation of the Actor pattern, and the only piece missing was a separable “Messenger” actor. Now I have one, and a mechanism to communicate locally with other computicles within the same process, as well as with computicles across networks, without changing any core computicle code. This makes it easier to pass data, call functions, and the like.

At any rate, discovering a fairly straightforward implementation of this pattern has been great, and I’m sure I’ll get tremendous usage out of it. I wonder if it can be applied to making calls to C libraries without first having to specify their interfaces…


Computicles – Simplifying Chaining

Using the “exec()” method, I can chain computicles together, but the code is kind of raw. There is one addition I left out of the equation last time around, and that is the hydrate/rehydrate of a computicle variable itself. Now it looks like this:

elseif dtype == "table" then
  if getmetatable(data) == Computicle_mt then
    -- package up a computicle
    datastr = string.format("Computicle:init(TINNThread:StringToPointer(%s),TINNThread:StringToPointer(%s));", 
      TINNThread:PointerToString(data.Heap:getNativeHandle()), 
      TINNThread:PointerToString(data.IOCP:getNativeHandle()));
  elseif getmetatable(data) == getmetatable(self.IOCP) then
    -- The data is an iocompletion port, so handle it specially
    datastr = string.format("IOCompletionPort:init(TINNThread:StringToPointer(%s))",
      TINNThread:PointerToString(data:getNativeHandle()));
  else
    -- get a json string representation of the table
    datastr = string.format("[[ %s ]]",JSON.encode(data, {indent=true}));
  end

This is just showing the types that are descended from the ‘table’ type. This includes IOCompletionPort and Computicle. Anything other than these two will be serialized as fairly straightforward key/value pair tables. With this in place, I can now do the following.

-- Setup the splitter that will dispatch to the leaf nodes
local splitter = Computicle:load("comp_splittercode");

-- Setup the source, which will be originating messages
local source = Computicle:load("comp_sourcecode");
source.sink = splitter;

The code for the comp_sourcecode computicle looks like this:


RunOnce = function(sink)
  print("comp_sourcecode, RunOnce...")
  for i = 1, 10 do
    sink:postMessage(i);
  end
end

OnIdle = function(counter)
  print("comp_sourcecode, OnIdle(): ", counter)
  if sink ~= nil then 
    RunOnce(sink);

    sink = nil;
    exit();
  end
end

This computicle implements the OnIdle() function, as it will use that to determine when it can send messages off to the sink. If the sink hasn’t been set yet, then it will do nothing. If it has been set, then it will execute the ‘RunOnce()’ function, sending the sink a bunch of messages.

After performing the RunOnce() task, the computicle will simply exit(), which is the same as calling SELFICLE:quit(), which in turn is just SELFICLE:postMessage(Computicle.Messages.QUIT).

And that’s the end of that computicle. The splitter has become much simpler as well.

local ffi = require("ffi");


OnMessage = function(msg)
	msg = ffi.cast("ComputicleMsg *", msg);
	local Message = msg.Message;
--print("comp_splittercode, OnMessage(): ", Message);

	if sink1 then
		sink1:postMessage(Message);
	end	

	if sink2 then
		sink2:postMessage(Message);
	end
end

Here, the splitter assumes there are two sinks. If either of them exists, they will receive the message that was passed into the splitter. If they don’t exist, then the message will be dropped. Of course, other behaviors, such as holding on the messages, could be implemented as well.

In this case, the computicle is not exited internally. This is because the splitter itself does not know when it is finished, all it knows is that when it receives a message, it is supposed to pass that message on to its two sinks.

The sink code is equally simple.

local ffi = require("ffi");

OnMessage = function(msg)
  msg = ffi.cast("ComputicleMsg *", msg);
  local Message = msg.Message;

  print(msg.Message*10);
end

Here again, just implement the ‘OnMessage()’ function, and the comp_msgpump code will call it automatically. And again, the sink does not know when it is done, so it does not explicitly exit.

Pulling it all together, the exiting logic becomes more clear:

local Computicle = require("Computicle");

-- Setup the leaf nodes to receive messages
local sink1 = Computicle:load("comp_sinkcode");
local sink2 = Computicle:load("comp_sinkcode");


-- Setup the splitter that will dispatch to the leaf nodes
local splitter = Computicle:load("comp_splittercode");

splitter.sink1 = sink1;
splitter.sink2 = sink2;


-- Setup the source, which will be originating messages
local source = Computicle:load("comp_sourcecode");
source.sink = splitter;


-- Close everything down from the outside
print("FINISH source: ", source:waitForFinish());

-- tell the splitter to quit
splitter:quit();
print("FINISH splitter:", splitter:waitForFinish());

-- the two sinks will receive the quit message from the splitter
-- so, just wait for them to quit.
print("Finish Sink 1: ", sink1:waitForFinish());
print("Finish Sink 2: ", sink2:waitForFinish());

There is an order in which computicles are created, and assigned, which stitches them together. The computicles could actually be constructed in a “suspended state”, but that would not help matters too much.

At the end, the quit() sequence can clearly be seen. First, wait for the source to be finished. Then tell the splitter to quit(). This is not an interrupt, so whatever was in its queue previously will flow through before the QUIT is processed. Then finally, do the same thing on the sinks, after the splitter has finished.

All messages flow, and all processing finishes cleanly. So, this is one way to perform the exit mechanism from the outside, as well as from the inside.

Adding this bit of ease in programming did not change the fundamentals of the computicle. It still has a single communication mechanism (the queue), it still performs computation, and it still communicates with the outside world. The new code saves the error prone process of type casting and hydrating objects that the system already knows about. This in turn makes the usage pattern that much easier to deal with.

The comp_msgpump was added in a fairly straight forward way. When a computicle is constructed, there is a bit of code (a Prolog) that is placed before the user’s code, and a bit of code (Epilog) placed after it. Those two bits of code look like this:

Computicle = {
	Prolog = [[
TINNThread = require("TINNThread");
Computicle = require("Computicle");
IOCompletionPort = require("IOCompletionPort");
Heap = require("Heap");

exit = function()
    SELFICLE:quit();
end
]];

Epilog = [[
require("comp_msgpump");
]];

Computicle.createThreadChunk = function(self, codechunk, params, codeparams)
	local res = {};

	-- What we want to load before any other code is executed
	-- This is typically some 'require' statements.
	table.insert(res, Computicle.Prolog);


	-- Package up the parameters that may want to be passed in
	local paramname = "_params";

	table.insert(res, string.format("%s = {};", paramname));
	table.insert(res, self:packParams(params, paramname));
	table.insert(res, self:packParams(codeparams, paramname));

	-- Create the Computicle instance that is running within 
	-- the Computicle
	table.insert(res, 
		string.format("SELFICLE = Computicle:init(%s.HeapHandle, %s.IOCPHandle);", paramname, paramname));


	-- Stuff in the user's code
	table.insert(res, codechunk);

	
	-- What we want to execute after the user's code is loaded
	-- By default, this will be a message pump
	table.insert(res, Computicle.Epilog);

	return table.concat(res, '\n');
end

If an application needs a different prolog or epilog, it’s fairly easy to change them either by changing the file that is being read in, or by changing the string itself: Computicle.Prolog = [[print("Hello, World")]]

And so it goes. I’ve never had it so easy creating multi-threaded bits of computation and stitching those bits together.

