What The Functor? – Making them useful

Last time, I introduced the Functor in a rather convoluted way. I said that particular implementation may or may not be as performant as a straight up closure. Well, as it turns out, that’s going to be true most of the time. Why?

You see, when you call a table as if it were a function, the runtime has to look up the table’s metatable and check whether the ‘__call’ metamethod is implemented. If it’s not implemented, an error is thrown; if it is, your operation is performed. That means an extra lookup, plus an extra function call into the ‘__call’ handler, for every single call. When this is on a hot path, it could be quite a bummer in terms of performance.
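To get a feel for that cost, here is a rough sketch that times a callable table against a plain function doing the same work. The names ‘callable’, ‘plain’ and ‘timeCalls’ are made up for illustration, and the numbers will vary with the Lua implementation; under a JIT compiler the gap may well shrink.

```lua
-- A table made callable through the __call metamethod.
local callable = setmetatable({}, {
  __call = function(self, a, b)
    return a + b
  end,
})

-- The same operation as an ordinary function.
local plain = function(a, b)
  return a + b
end

-- Call f 'count' times; return elapsed CPU seconds and the accumulated sum.
local function timeCalls(f, count)
  local started = os.clock()
  local sum = 0
  for i = 1, count do
    sum = sum + f(i, 1)
  end
  return os.clock() - started, sum
end

local tableTime, tableSum = timeCalls(callable, 1000000)
local plainTime, plainSum = timeCalls(plain, 1000000)

print(string.format("callable table: %.3f s", tableTime))
print(string.format("plain function: %.3f s", plainTime))
```

Both variants compute the same sum; only the dispatch path differs.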

So, why bother with this particular convoluted construct for functor implementation? The beauty of the table technique is that your functor is actually a table that merely gets the syntactic sugar of being called like a function, so it has all the attributes of a table. For example, let’s imagine you want to associate some bit of information with your function, such as when it was created, or version information:

local f1 = Functor(somefunction)
f1.Creation = os.date()
f1.Version = "1.2"

That might be useful in a case where you want to refresh your functors if they’re getting to be too old, or you want to compare versions of functions to ensure they are what you expect. Very handy, yet very costly.
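As a rough illustration of the refresh idea, the sketch below stores a creation time on a table-based functor and checks its age. ‘makeFunctor’, ‘Created’ and ‘isStale’ are hypothetical names, not part of TINN, and os.time() is used instead of os.date() so that ages can be compared arithmetically.

```lua
local functor_mt = {
  __call = function(self, ...)
    return self.Func(...)
  end,
}

-- Wrap a function in a callable table (a stand-in for the Functor class).
local function makeFunctor(func)
  return setmetatable({ Func = func }, functor_mt)
end

-- True when the functor was created more than maxAge seconds before 'now'.
-- 'now' defaults to the current time.
local function isStale(f, maxAge, now)
  return (now or os.time()) - f.Created > maxAge
end

local double = makeFunctor(function(x) return x * 2 end)
double.Created = os.time()
double.Version = "1.2"

print(double(21))
print(isStale(double, 60))                        -- checked against the current time
print(isStale(double, 60, double.Created + 120))  -- pretend two minutes have passed
```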

Although this is useful, it does impose this overhead, so is there another way to implement the functor without incurring so much overhead?

Well, of course! If you’re willing to lose the table capabilities of your functor, but you want to retain the fact that you can associate some amount of state with your functor, then a closure is the way to go:

local Functor = function(func, target)
  if not target then return func end

  return function(...)
    return func(target,...)
  end
end

Yah, that’s right. After I bad mouthed closures when I first introduced functors, here I am praising their value. And it’s true. Given the constraint of ditching associating information with the functor, beyond the passed in state, this closure implementation is a lot faster when it comes time to actually execute the code.

Not only is this code faster, but it’s a lot easier to understand, easier to maintain, etc.
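Here is a small usage sketch of the closure version. The ‘counter’ object is made up and stands in for something like the scheduler.

```lua
local Functor = function(func, target)
  if not target then return func end

  return function(...)
    return func(target, ...)
  end
end

-- A hypothetical object with a method that needs 'self'.
local counter = { count = 0 }
function counter.increment(self, amount)
  self.count = self.count + (amount or 1)
  return self.count
end

-- Bind the method to its instance; callers no longer pass 'self'.
local bump = Functor(counter.increment, counter)
bump()
bump(5)
print(counter.count)

-- Without a target the original function comes back untouched,
-- so there is no extra call layer at all.
local same = Functor(print)
print(same == print)
```

The second case is why the closure version costs nothing for plain functions: nothing is wrapped.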

And how would I use this in real life? Well, at the core of TINN is the scheduler, as represented by the IOProcessor class. The scheduler is at the heart of what makes TINN tick, and thus it must be the most compact, performant bit of code that it can possibly be. At the same time, I want it to be flexible, because ultimately I need the ability to change the very core scheduling algorithms and other core features, without having to totally reengineer the codebase to make changes.

So, I’ve been tinkering.

I’ve recently rewritten parts of the scheduler to utilize the functors, and iterators. Here is the main loop as it stands today:

IOProcessor.start = function(self)
  self:addQuantaStep(Functor(self.stepIOEvents,self));
  self:addQuantaStep(Functor(self.stepTimeEvents,self));
  self:addQuantaStep(Functor(self.stepFibers,self));
  self:addQuantaStep(Functor(self.stepPredicates,self));


  self.ContinueRunning = true;

  when(Functor(self.noMoreTasks,self), Functor(self.stop, self))

  for astep in self:quantumSteps() do
    astep()
  end
end

IOProcessor.run = function(self, func, ...)
  if func ~= nil then
    self:spawn(func, ...);
  end

  self:start();
end

If you were writing a typical TINN based program where you wanted multi-tasking, you would be doing this:

local Task = require("IOProcessor")

local function main()
  -- after 5 seconds, stop the system
  delay(function(tick) Task:stop() end, 5 * 1000)

  -- print something every half second
  periodic(function(tick) print("Hello Again: ",tick) end, 500)
end

Task:run(main)

The IOProcessor.start() routine is littered with Functors! What are they all doing? OK, first with the ‘addQuantaStep’ calls. The quanta steps are the functions that are executed each time through the scheduler’s loop. You can see this in the iterator at the bottom:

  for astep in self:quantumSteps() do
    astep()
  end

The usual way you’d see this code written is as a while loop, or some other looping construct. In this case, I have used an iterator as the fundamental loop construct. That seems a bit novel, or at least slightly off the beaten path. Why bother with such a construct? Well, first of all, since it’s an iterator, the loop in ‘start()’ doesn’t actually know anything about the code that is being executed. It doesn’t know how long a step will take, and it doesn’t assume any side effects. This gives great flexibility, because changing the behavior of the main loop only requires changing the behavior of the iterator that is being used. You could easily modify the scheduler to utilize your own custom iterator if you like. The only assumption is that the items coming out of the iterator are functions that can be called, and that’s it.
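For instance, a replacement iterator could run the steps for a fixed number of rounds and then end, which is handy when testing. This is only a sketch: ‘boundedSteps’ is an invented name and the plain ‘steps’ table stands in for the scheduler’s list.

```lua
-- Return an iterator that walks the list of steps 'rounds' times, then ends.
local function boundedSteps(steps, rounds)
  local index = 0
  local total = #steps * rounds

  return function()
    if index >= total then
      return nil          -- a nil result ends the generic 'for' loop
    end
    index = index + 1
    return steps[((index - 1) % #steps) + 1]
  end
end

local trace = {}
local steps = {
  function() table.insert(trace, "io") end,
  function() table.insert(trace, "timers") end,
  function() table.insert(trace, "fibers") end,
}

-- The loop body has the same shape as the one in start();
-- only the iterator has changed.
for astep in boundedSteps(steps, 2) do
  astep()
end

print(table.concat(trace, " "))
```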

The ‘addQuantaStep()’ function calls are simply a way to prepopulate the list of functions with a known set of things I want to do each time through the scheduler’s loop. I made this a function call because that way any bit of code could do the same thing before running the scheduler. Here the Functor is being used to ensure we capture the state that goes along with the function.

The last place the Functor is being used here is in the ‘when()’ call:

  when(Functor(self.noMoreTasks,self), Functor(self.stop, self))

“when there are no more tasks, stop”

The functors are used because the functions in question live within the IOProcessor table, and are called with state information, so it needs to be associated with them. As an aside, why not just make these functions flat? Why are they even associated with a table anyway? Well, because, even though at the moment you can run only a single scheduler in your process, there’s no reason in the world why this MUST be the case. With another couple of changes, you’ll actually be able to run multiple schedulers at the same time within a single process. Yep, that might be a tad bit useful. Kind of like running multiple processes at the OS level.

At any rate, just for kicks, here is what the iterator of those functors looks like:

IOProcessor.addQuantaStep = function(self, astep)
  table.insert(self.QuantaSteps,astep)
end

IOProcessor.quantumSteps = function(self)
  local index = 0;
  local listSize = #self.QuantaSteps;

  local closure = function()
    if not self.ContinueRunning then
      return nil;
    end

    index = index + 1;
		
    local astep = self.QuantaSteps[index]

    if (index % listSize) == 0 then
      index = 0
    end

    return astep
  end

  return closure	
end

Basically, just keep going through the list of steps over and over again until something sets ‘ContinueRunning’ to false.
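To watch the round robin in action outside of TINN, a mock object is enough. The sketch below is not the real IOProcessor: ‘sched’ and ‘record’ are stand-ins, and unlike the code above this variant re-reads the list size on every call, so steps added while running would be picked up.

```lua
-- A mock with only the fields the iterator touches.
local sched = { QuantaSteps = {}, ContinueRunning = true }

function sched:addQuantaStep(astep)
  table.insert(self.QuantaSteps, astep)
end

-- Cycle through the list until something clears ContinueRunning.
function sched:quantumSteps()
  local index = 0
  return function()
    if not self.ContinueRunning or #self.QuantaSteps == 0 then
      return nil
    end
    index = (index % #self.QuantaSteps) + 1
    return self.QuantaSteps[index]
  end
end

local calls = {}

-- Build a step that logs its name and stops the loop after seven calls.
local function record(name)
  return function()
    table.insert(calls, name)
    if #calls >= 7 then
      sched.ContinueRunning = false
    end
  end
end

sched:addQuantaStep(record("A"))
sched:addQuantaStep(record("B"))
sched:addQuantaStep(record("C"))

for astep in sched:quantumSteps() do
  astep()
end

print(table.concat(calls))
```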

Having these steps represented as an iterator satisfies the needs that I have for running a fairly dynamically configurable main loop. While I was doing this, I was also considering what implications this might have for doing other things with the iterator. For example, is there something I can do with functional programming? The recent Lua Fun library is all about iterators, and I’m wondering if I can do something with that.

Functor and iterator in hand, I am going to look to streamline the scheduler even further. I am imagining the core of the scheduler is not more than 100 lines of code. The quantum steps, which is where all the real action is, could live separately, and be easily pluggable modules. That’s where this is headed, because I don’t know how to deal with all that complexity otherwise.

So, there you have it. Functors can be useful in practice, and not just a software engineering curiosity.


What the Functor! Again?

No, I’m not talking about George Clinton (probably related to Bill), but rather the function that can wrap functions.

What’s the point of this? In this case, I’ll start with the Functor class itself.

local functor = {}
setmetatable(functor, {
  __call = function(self, ...)
    return self:create(...);
  end,
})
local functor_mt = {
  __index = functor,
  __call = function(self, ...)
    if self.Target then
      return self.Func(self.Target, ...)
    end

    return self.Func(...)
  end,
}

functor.init = function(self, func, target)
  local obj = {
    Func = func;
    Target = target;
  }
  setmetatable(obj, functor_mt)

  return obj;
end

functor.create = function(self, func, target)
  return self:init(func, target)
end

Before explaining how this little thing works, here’s how it is used, assuming the table above has been loaded under the name ‘Functor’.

local function printSomething(words)
  print(words)
end

local f1 = Functor(printSomething)

f1("words to say")

OK. So, what’s the big deal? This is the most basic case, and there is no big deal. Basically the Functor wraps the function you pass to it when you construct it: f1 = Functor(printSomething)

Then, later, when you use the function: f1("words to say"), it’s as if you were calling the function directly. Why on earth would you ever want to do this?

Let’s imagine that you have a list of functions that you want to call from within the body of some other function.

listOfFuncs = {
  Functor(func1),
  Functor(func2), 
  Functor(func3)
}

local function every(funclist)
  for _, fun in ipairs(funclist) do
    fun()
  end
end

every(listOfFuncs)

For regular functions, there’s no real benefit here to using a functor, and there’s all this added overhead.

Now here’s another case. Instead of the functions just lying around without any associated state, they are methods of ‘objects’. The trick is to get the object instance associated with the function pointer, while maintaining the same relatively easy syntax.

local methods = {}
local methods_mt = {
	__index = methods,
}

methods.init = function(self)
	local obj = {
		name = "foo",
		name2 = "bar",
	}
	setmetatable(obj, methods_mt)
	
	return obj;
end

methods.method1 = function(self)
	print(self.name)
end

methods.method2 = function(self)
	print(self.name2)
end

local m1 = methods:init();

local f2 = Functor(methods.method1, m1)
local f3 = Functor(m1.method2, m1)

f2();
f3();

In this case, I need to associate the function pointer with an instance of the methods ‘object’, so when the Functor is constructed, the second parameter, which is the instance of the object, is also passed along.

Now, back to the Functor code, when it comes time to execute the function, it does it one way or another depending on whether it has a target or not:

  __call = function(self, ...)
    if self.Target then
      return self.Func(self.Target, ...)
    end

    return self.Func(...)
  end,

If it has a target, it will pass that as the first parameter when calling the function. If there is no specific target, then it will just call the function, passing along the supplied parameters.
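The two paths can be seen side by side in a cut-down sketch. This is a simplified stand-in for the class above, with a one-line constructor, and the ‘account’ object is invented for the example.

```lua
local functor_mt = {
  __call = function(self, ...)
    if self.Target then
      return self.Func(self.Target, ...)
    end
    return self.Func(...)
  end,
}

-- Simplified constructor; the real class goes through create() and init().
local function Functor(func, target)
  return setmetatable({ Func = func, Target = target }, functor_mt)
end

-- A hypothetical object whose method expects 'self' first.
local account = { balance = 100 }
function account.deposit(self, amount)
  self.balance = self.balance + amount
  return self.balance
end

local deposit = Functor(account.deposit, account)   -- has a target
local add = Functor(function(a, b) return a + b end) -- no target

print(deposit(25))  -- called as account.deposit(account, 25)
print(add(2, 3))    -- called as the wrapped function with (2, 3)
```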

So, this solves a very basic problem. Without the Functor construct, I would have to use a ‘closure’ for each function. A closure is essentially the same thing, except the language itself takes care of keeping information associated with a function: in Lua, the local variables a function refers to are captured as upvalues and travel with it. That state is tucked away inside the function, though, where the rest of your code can’t easily get at it. The Functor construct allows you to do essentially what you would be doing with a closure, but by using an object, you can more tightly control exactly what’s going on, what’s in the structure, lifetime, and all that.

To use a closure, or to use a functor? Closures are an easy and natural part of the Lua language. Functors are a construct that may or may not be efficient in comparison. The Functor does solve a particular problem when it comes to sticking a function in a list with its attendant state.


Parallel Computing is Child’s Play

Recently, my wife and I brought a new son into the world!  And that got me to thinking.

Some years back, I did some self study on linguistics and language development.  At the time, I was just doing some thinking on how best to communicate, how to write programs that are composable, just like spoken sentences.  One of things I learned from one of the books had to do with how babies learn language.  As one theory goes, they develop a core internal language first.  These are their early bits of associations and linkages.  Everything they learn after that is expressed using the core primitives they developed early on.

Armed with this knowledge, I went away and taught software engineering for a few years.  One of my core tenets was: software programming is nothing more than a specific conversation.  As software engineers, we are in the business of translating intentions from one domain to the next.  We talk to customers, who speak in their own language, and we have to eventually get that translated down into C, or assembly, or whatever.  The more closely the customer’s language matches the machine’s, the easier the translation, and less margin for error.  But, people don’t speak in machine code.

So what then?  Well, then comes the task of the various frameworks and language tools that we use as software engineers to translate from the high level human language to the very narrow and specific machine language.  At the lowest level, we might use C, or JavaScript, or C#, or Lua.  Then on top of that there are inevitably various frameworks which help us express ideas in a more meaningful way, without having to work purely with the lower level primitives.  And so on and so forth.

What I’ve been exploring recently is how best to do parallel programming.  At the very core, my machine has a multi-core processor.  There is an OS, and I have a bunch of C based APIs to get various things done.  In the Lua language I have coroutines, which help in making tasks ‘parallel’, or at least cooperative.

Atop this core, I have built a scheduler in the form of the TINN environment, and thus far I can achieve quite a lot of apparent parallelism with the core primitives such as timer, waitFor, sleep, and the like.  Now I want to add some more higher level primitives.

There are two cases I want to cover.  The first is ‘when’.  That is, when some condition is met, I want to perform some action.  This is a natural part of English at least.  I can easily imagine having a conversation with someone where they’re trying to tell me what they need some machine to do, let’s say scheduling some event to occur:

“When ‘the first person comes into the factory’, ‘turn on all the machines’”

The first part of this sentence establishes the condition.  The second part specifies the action that is to occur once the condition is met.

Easy enough.  If I were to write it in code, it might look like:

waitFor(firstPersonArrives);
turnOnAllMachines();

That’s great, and you could already do this in TINN. But, the language doesn’t quite match the words that came out of my mouth. What I really want to write is the following:

when(firstPersonArrives, turnOnAllMachines)

Or even better, with some syntactic sugar

when firstPersonArrives do
  turnOnAllMachines()
end

I need a primitive ‘when’ which will take a predicate as its first argument, and an action as its second. Perhaps something like this:

local Task = require("IOProcessor")

local when = function(pred, func)
  local watchit = function()
    Task:yieldUntilPredicate(pred)
    func()
  end

  Task:spawn(watchit)
end

That will do the trick, and give me the function I need. What’s happening? Well, the when() function takes the predicate and the function, and spawns a task with a ‘waitFor()’ in it already. It’s the combination of the spawn() and waitFor() which essentially creates a parallel task, without the programmer having to deal with the very specifics of parallelism. All the programmer has to focus on is the cause and effect. Code that up, and let it go.
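To try the idea without TINN, here is a toy cooperative scheduler built on Lua coroutines. It is only a sketch under stated assumptions: this ‘Task’ table is a stand-in written for the example, not TINN’s IOProcessor, and it knows nothing about IO or timers.

```lua
-- Toy cooperative scheduler; a stand-in, not TINN's IOProcessor.
local Task = { ready = {}, waiting = {} }

function Task:spawn(func)
  table.insert(self.ready, { co = coroutine.create(func) })
end

-- Suspend the current fiber until pred() returns true.
function Task:yieldUntilPredicate(pred)
  coroutine.yield(pred)
end

-- Give other fibers a turn.
function Task:yield()
  coroutine.yield()
end

function Task:run()
  while #self.ready > 0 do
    -- Run every ready fiber until it yields or finishes.
    local batch = self.ready
    self.ready = {}
    for _, fiber in ipairs(batch) do
      local ok, pred = coroutine.resume(fiber.co)
      if not ok then error(pred) end
      if coroutine.status(fiber.co) ~= "dead" then
        if pred then
          fiber.pred = pred
          table.insert(self.waiting, fiber)
        else
          table.insert(self.ready, fiber)
        end
      end
    end

    -- Move fibers whose predicate is now satisfied back to the ready list.
    local i = 1
    while i <= #self.waiting do
      local fiber = self.waiting[i]
      if fiber.pred() then
        fiber.pred = nil
        table.remove(self.waiting, i)
        table.insert(self.ready, fiber)
      else
        i = i + 1
      end
    end
  end
end

-- The same shape as the 'when' primitive above.
local when = function(pred, func)
  local watchit = function()
    Task:yieldUntilPredicate(pred)
    func()
  end

  Task:spawn(watchit)
end

local counter = 0
local fired = false

when(function() return counter >= 3 end,
     function() fired = true; print("counter reached 3") end)

-- A second fiber that does the counting and yields between increments.
Task:spawn(function()
  for i = 1, 5 do
    counter = counter + 1
    Task:yield()
  end
end)

Task:run()
```

The counting fiber never mentions the watcher, and the watcher never polls in its own code; the scheduler does the stitching.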

Having the ‘when’ in my vocabulary as a programmer is great. It enables a much easier conversation when I’m talking with someone. Whereas ‘when’ is great for covering a single event, the word “whenever” is great at covering multiple occurrences. For example, a natural conversation might go like this:

Water the grass every Tuesday

In code, it might be:

whenever(itIsTuesday, waterTheGrass)

Well, this is just a slight modification of the ‘when’ primitive. Basically, after the trigger event, spawn another task to wait on the exact same thing again.

local whenever = function(pred, func)
  local watchit = nil;
  watchit = function()
    Task:yieldUntilPredicate(pred)
    func()
    Task:spawn(watchit)
  end

  Task:spawn(watchit)
end

I think this is pretty nifty myself. Here is one contrived example of using these primitives. The English explanation would be:

Every 5 clock ticks, say “Halleluja!”
After the end of 25 clock ticks, stop.

That’s pretty easy to understand. How hard is it to code?

-- test_when.lua

local Task = require("IOProcessor")
local Timer = require("Timer")
local parallel = require("parallel")()


local counter = 0;

local function incrementCounter()
	counter = counter + 1;

	print("Counter: ", counter)
end

local function timeRunsOut()
	if counter >= 25 then
		return true;
	end

	return false;
end

local function stopWorld()
  print("Goodbye World!")
  Task:stop();
end


local lasttrigger = 0;
local function counterHits5()

  if counter % 5 == 0 then
    if counter ~= lasttrigger then
      lasttrigger = counter
      return true;
    end
  end

  return false;
end

local function sayHalleluja()
  print("Halleluja!")
end


local function main()
  -- Create a timer with the task of incrementing
  -- the counter every 500 milliseconds
  local t1 = Timer({Period=500, OnTime=incrementCounter})
	
  -- start the task to sing whenever we hit 5 ticks
  whenever(counterHits5, sayHalleluja);

  -- start the task to stop after so many ticks
  when(timeRunsOut, stopWorld);
end

run(main)
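The counterHits5 predicate above remembers the last value it fired on, so that whenever() does not fire over and over while the counter sits on a multiple of 5. That pattern can be factored out. The helper below is a sketch with an invented name, ‘oncePerValue’, and it starts with no remembered value, which is why the test function excludes zero explicitly.

```lua
-- Wrap a value source so the returned predicate is true only the first
-- time it sees each value that passes 'test'.
local function oncePerValue(getValue, test)
  local last = nil
  return function()
    local value = getValue()
    if test(value) and value ~= last then
      last = value
      return true
    end
    return false
  end
end

local counter = 0
local hits5 = oncePerValue(
  function() return counter end,
  function(n) return n > 0 and n % 5 == 0 end)

local fired = {}
for tick = 1, 12 do
  counter = tick
  -- Poll several times per tick, as a scheduler loop would.
  for poll = 1, 3 do
    if hits5() then
      table.insert(fired, counter)
    end
  end
end

print(table.concat(fired, ","))
```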

I find the ‘when’ and ‘whenever’ constructs to be fairly useful when trying to create code that I intend to run in ‘parallel’. By ‘parallel’ here, I mean they are fairly self contained units which may or may not actually run on multiple cores, but at the very least, I can think of them conceptually as independent actions.

Thus far in my programming career, I’ve made fairly good usage of the ‘for’, ‘do-while’, ‘while’, and various other control structures. Now that I’m doing more parallel and async programming, I find the addition of the ‘when’ and ‘whenever’ primitives to be a fairly useful construct for describing how things should work in a parallel universe. I can think of my atomic parallel nuggets, and then just code those nuggets serially. Then, I can use the when, whenever, and spawn constructs to stitch together an entire system.

So, there you have it. As children build up their language skills based on an internal dialog, so too must my programming language skills be built. The ‘when’, ‘whenever’ and ‘spawn’ are the core primitives of my parallel programming inner dialog.


Multitasking single threaded UI – Gaming meets networking

There are two worlds that I want to collide. There is the active UI gaming world, then there’s the more passive networking server world. The difference between these two worlds is the kind of control loop and scheduler that they both typically use.

The scheduler for the networking server is roughly:

while true do
  waitFor(networkingEvent)
  
  doNetworkingStuff()
end

This is great, and works quite well to limit the amount of resources required to run the server because most of the time it’s sitting around idle. This allows you to serve up web pages with a much smaller machine than if you were running flat out, with say a “polling” API.

The game loop is a bit different. On Windows it looks something like this:

while true do
  ffi.fill(msg, ffi.sizeof("MSG"))
  local peeked = User32.PeekMessageA(msg, nil, 0, 0, User32.PM_REMOVE);
			
  if peeked ~= 0 then
    -- do regular Windows message processing
    local res = User32.TranslateMessage(msg)		
    User32.DispatchMessageA(msg)
  end

  doOtherStuffLikeRunningGamePhysics();
end

So, when I want to run a networking game, where I take some input from the internet, and incorporate that into the gameplay, I’ve got a basic problem. Who’s on top? Which loop is going to be THE loop?

The answer lies in the fact that the TINN scheduler is a basic infinite loop, and you can modify what occurs within that loop. Last time around, I showed how the waitFor() function can be used to insert a ‘predicate’ into the scheduler’s primary loop. So, perhaps I can recast the gaming loop as a predicate and insert it into the scheduler?

I have a ‘GameWindow’ class that takes care of creating a window, showing it on the screen and dealing with drawing. This window has a run() function which has the typical gaming infinite loop. I have modified this code to recast things in such a way that I can use the waitFor() as the primary looping mechanism. The modifications make it look like this:

local appFinish = function(win)

  win.IsRunning = true
  local msg = ffi.new("MSG")

  local closure = function()
    ffi.fill(msg, ffi.sizeof("MSG"))
    local peeked = User32.PeekMessageA(msg, nil, 0, 0, User32.PM_REMOVE);
			
    if peeked ~= 0 then
      local res = User32.TranslateMessage(msg)
      User32.DispatchMessageA(msg)

      if msg.message == User32.WM_QUIT then
        return win:OnQuit()
      end
    end

    if not win.IsRunning then
      return true;
    end
  end

  return closure;
end

local runWindow = function(self)
	
  self:show()
  self:update()

  -- Start the FrameTimer
  local period = 1000/self.FrameRate;
  self.FrameTimer = Timer({Delay=period, Period=period, OnTime =self:handleFrameTick()})

  -- wait here until the application window is closed
  waitFor(appFinish(self))

  -- cancel the frame timer
  self.FrameTimer:cancel();
end

GameWindow.run = function(self)
  -- spawn the fiber that will wait
  -- for messages to finish
  Task:spawn(runWindow, self);

  -- set quanta to 0 so we don't waste time
  -- in i/o processing if there's nothing there
  Task:setMessageQuanta(0);
	
  Task:start()
end

Starting from the run() function, only three things need to occur. First, spawn ‘runWindow’ in its own fiber. I want this routine to run in a fiber so that it is cooperative with other parts of the system that might be running.

Second, call ‘setMessageQuanta(0)’. This is a critical piece to get a ‘gaming loop’. This quanta is the amount of time the IO processing part of the scheduler will spend waiting for an IO event to occur. This time will be spent every time through the primary scheduler’s loop. If the value is set to 0, then effectively we have a nice runaway infinite loop for the scheduler. IO events will still be processed, but we won’t spend any time waiting for them to occur.

This has the effect of providing maximum CPU timeslice to various other waiting fibers. If this value is anything other than 0, let’s say ‘5’ for example, then the inner loop of the scheduler will slow to a crawl, providing no better than 60 iterations of the loop per second. Not enough time slice for a game. Setting it to 0 allows more like 3000 iterations of the loop per second, which gives more time to other fibers.

That’s the trick of this integration right there. Just set the messageQuanta to 0, and away you go. To finish this out, take a look at the runWindow() function. Here just a couple of things are set in place. First, a timer is created. This timer will ultimately end up calling a ‘tick()’ function that the user can specify.

The other thing of note is the use of waitFor(appFinish(self)). This fiber will block here until the “appFinish()” predicate returns true.

So, finally, the appFinish() predicate, what does that do?

Well, I’ll be darned if it isn’t the essence of the typical game window loop, at least the Windows message handling part of it. Remember that a predicate that is injected to the scheduler using “waitFor()” is executed every time through the scheduler’s loop, so the scheduler’s loop is essentially the same as the outer loop of a typical game.
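Stripped of the Windows specifics, the recasting looks something like the sketch below. A plain Lua table stands in for the message queue, ‘makePump’ and the message names are invented, and a bare repeat loop plays the role of the scheduler calling the predicate once per pass.

```lua
-- Build a predicate that handles at most one queued message per call
-- and reports true once a "quit" message has been seen.
local function makePump(queue, onMessage)
  local finished = false

  return function()
    local msg = table.remove(queue, 1)   -- nil when the queue is empty
    if msg == "quit" then
      finished = true
    elseif msg ~= nil then
      onMessage(msg)
    end
    return finished
  end
end

local handled = {}
local queue = { "mousemove", "keydown", "quit", "ignored" }
local pump = makePump(queue, function(msg) table.insert(handled, msg) end)

-- Stand-in for the scheduler loop: other steps would run between polls.
local passes = 0
repeat
  passes = passes + 1
until pump()

print(passes, table.concat(handled, " "))
```

The loop body that used to own the program has become a function the scheduler can call whenever it likes.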

With all this in place, you can finally do the following:


local GameWindow = require "GameWindow"
local StopWatch = require "StopWatch"

local sw = StopWatch();

-- The routine that gets called for any
-- mouse activity messages
function mouseinteraction(msg, wparam, lparam)
	print(string.format("Mouse: 0x%x", msg))
end

function keyboardinteraction(msg, wparam, lparam)
	print(string.format("Keyboard: 0x%x", msg))
end


function randomColor()
	local r = math.random(0,255)
	local g = math.random(0,255)
	local b = math.random(0,255)
	local color = RGB(r,g,b)

	return color
end

function randomline(win)
	local x1 = math.random() * win.Width
	local y1 = 40 + (math.random() * (win.Height - 40))
	local x2 = math.random() * win.Width
	local y2 = 40 + (math.random() * (win.Height - 40))

	local color = randomColor()
	local ctxt = win.GDIContext;

	ctxt:SetDCPenColor(color)

	ctxt:MoveTo(x1, y1)
	ctxt:LineTo(x2, y2)
end

function randomrect(win)
	local width = math.random(2,40)
	local height = math.random(2,40)
	local x = math.random(0,win.Width-1-width)
	local y = math.random(0, win.Height-1-height)
	local right = x + width
	local bottom = y + height
	local brushColor = randomColor()

	local ctxt = win.GDIContext;

	ctxt:SetDCBrushColor(brushColor)
	--ctxt:RoundRect(x, y, right, bottom, 0, 0)
	ctxt:Rectangle(x, y, right, bottom)
end


function ontick(win, tickCount)

	for i=1,30 do
		randomrect(win)
		randomline(win)
	end

	local stats = string.format("Seconds: %f  Frame: %d  FPS: %f", sw:Seconds(), tickCount, tickCount/sw:Seconds())
	win.GDIContext:Text(stats)
end



local appwin = GameWindow({
		Title = "Game Window",
		KeyboardInteractor = keyboardinteraction,
		MouseInteractor = mouseinteraction,
		FrameRate = 24,
		OnTickDelegate = ontick,
		Extent = {1024,768},
		})


appwin:run()

This will put a window on the screen, and draw some lines and rectangles at a frame rate of roughly 24 frames per second.

And thus, the two worlds have been melded together. I’m not doing any networking in this particular case, but adding it is no different than doing it for any other networking application. The same scheduler is still at play, and everything else will work as expected.

The cool thing about this is the integration of these two worlds does not require the introduction of multiple threads of execution. Everything is still within the same single threaded context that TINN normally runs with. If real threads are required, they can easily be added and communicated with using the computicles that are within TINN.


Alertable Predicates? What’s that?

TINN does async on I/O without much fuss.  It even allows you to suspend a ‘fiber’ by putting it to sleep.  You can easily execute a function on a timer, etc.  These are all good tools to have in the parallel programming toolbox.  There is one tool that has been missing for me though.  The fact that I can run things in parallel means that I need a mechanism to synchronize when some of those tasks complete.  I don’t want to introduce callbacks, because that leads down a whole different path.

I want to execute code that looks something like this:

local finalGoal = function()
  waitFor(goal1)
  print("Goal 1 Attained.")
  waitFor(goal2)
  print("Goal 2 Attained..")
  waitFor(goal3)
  print("Goal 3 Attained...")
  print("All Goals Attained!!")

  -- Once all the goals have been met
  -- stop the scheduler.
  stop();
end

The key here is the ‘waitFor()’ function call. You call this function with a ‘predicate’. A predicate is nothing more than a function that returns true or false. If it returns ‘false’ then the condition of the predicate has not been met, and the fiber will not continue. Once the predicate returns ‘true’, the fiber will continue from the point directly after it called ‘waitFor()’. So, to see this sample in its entirety:

-- test_waitFor.lua

local IOProcessor = require("IOProcessor")
local Timer = require("Timer")

local count = 0;


local goal1 = function()
  return count >= 5;
end

local goal2 = function()
  return count >= 10;
end

local goal3 = function()
  return count >= 15;
end

local finalGoal = function()
  waitFor(goal1)
  print("Goal 1 Attained.")
  waitFor(goal2)
  print("Goal 2 Attained..")
  waitFor(goal3)
  print("Goal 3 Attained...")
  print("All Goals Attained!!")

  -- Once all the goals have been met
  -- stop the scheduler.
  stop();
end

-- setup a timer to increment the count
-- every 500 milliseconds
local incrTimer = Timer({Period = 500; OnTime = function(timer) count = count+1 end})

spawn(finalGoal)	

run()

This is a contrived case, but it’s illustrative. Down at the bottom, there is a timer being set up with an anonymous function. The timer is set to go off every 500 milliseconds. Once it does, it will execute the associated code, and the count variable will be incremented by 1. That’s all the timer needs to be concerned with.

There are three predicate functions set up: goal1, goal2, and goal3. Each one of these predicates checks the value of the count variable, and returns true or false depending on the criteria of the goal.

There is a separate function, ‘finalGoal’. This one is spawned separately, and it will wait for each of the goals to be met in turn. Which order it does these does not really matter in this case. Once all of the goals are met, the scheduler will be stopped and the program will end.

Those predicates can be anything really. They can make calls out to the internet, they can count, they can check the value of any variable, they can check any state of the running process. Whatever you want can be a predicate. Perhaps you want to stop accepting new connections on a web server once a memory limit has been reached. Whatever.

Having the finalGoal() as a separate fiber is useful because the whole fiber will essentially ‘block’ until the goals are all met. This allows you to have other parallel fibers running at the same time, and they will go along their merry way while finalGoal() is waiting for its conditions to be met.

The implementation of this predicate yielding is pretty straightforward. Within the scheduler, there are only two new functions:

IOProcessor.yieldUntilPredicate = function(self, predicate)
	if self.CurrentFiber~= nil then
		self.CurrentFiber.Predicate = predicate;
		table.insert(self.FibersAwaitingPredicate, self.CurrentFiber)

		return self:yield();
	end

	return false;
end

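IOProcessor.stepPredicates = function(self)
	-- Visit every waiting fiber once. Only advance the index when
	-- nothing was removed, so that a fiber whose predicate is still
	-- false does not keep the fibers behind it from being checked.
	local i = 1
	while i <= #self.FibersAwaitingPredicate do
		local fiber = self.FibersAwaitingPredicate[i];
		if fiber.Predicate() then
			fiber.Predicate = nil;
			table.remove(self.FibersAwaitingPredicate, i)

			self:scheduleFiber(fiber);
		else
			i = i + 1
		end
	end
end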

Then, waitFor() is just a shorthand for yieldUntilPredicate():

waitFor = function(predicate)
  return IOProcessor:yieldUntilPredicate(predicate)
end

The predicates are checked every time through the scheduler loop:

IOProcessor.step = function(self)
	self:stepTimeEvents();
	self:stepFibers();
	self:stepIOEvents();
	self:stepPredicates();
.
.
.

This is essentially the same as installing a very high priority fiber because all code for all predicates will run every time through the scheduler loop. The predicates do not need to be ‘scheduled’ like other tasks. A predicate should probably be a fairly short bit of code that runs quickly, and is not very resource intensive. They should be used sparingly.
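The cost described here is easy to see in a mock: every waiting predicate is evaluated on every pass, whether or not it is anywhere near being satisfied. The sketch below is not TINN’s code; ‘sched’, ‘checks’ and ‘waitUntilCount’ are stand-ins, and scheduleFiber just records the fiber’s name.

```lua
local sched = { FibersAwaitingPredicate = {}, Scheduled = {} }

-- Stand-in for the real scheduling call: just remember who was released.
function sched:scheduleFiber(fiber)
  table.insert(self.Scheduled, fiber.Name)
end

-- Check every waiting fiber once; release those whose predicate holds.
function sched:stepPredicates()
  local i = 1
  while i <= #self.FibersAwaitingPredicate do
    local fiber = self.FibersAwaitingPredicate[i]
    if fiber.Predicate() then
      fiber.Predicate = nil
      table.remove(self.FibersAwaitingPredicate, i)
      self:scheduleFiber(fiber)
    else
      i = i + 1
    end
  end
end

local count = 0
local checks = 0   -- how many predicate evaluations have happened

-- Register a fake fiber that waits for 'count' to reach 'limit'.
local function waitUntilCount(name, limit)
  table.insert(sched.FibersAwaitingPredicate, {
    Name = name,
    Predicate = function()
      checks = checks + 1
      return count >= limit
    end,
  })
end

waitUntilCount("slow", 3)
waitUntilCount("fast", 1)

-- Three passes of the scheduler loop, with the count rising each time.
for pass = 1, 3 do
  count = count + 1
  sched:stepPredicates()
end

print(table.concat(sched.Scheduled, ","), checks)
```

Two fibers and three passes already cost four predicate calls, which is why predicates should stay short and few.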

Quite a few years ago, I was involved with the creation of LINQ in the .NET Framework. At that time, I had this nagging notion that if we could only add a ‘when predicate do’ construct to the CLR, that would be a fairly useful thing. I think using predicates for control flow is especially useful for parallel programming. I also think this construct is easier to use and implement than various other forms of barriers and whatnot.

So, there you have it. TINN supports async IO, timers, yielding on time, and yielding on predicates. Perhaps a predicate can be used to bridge the gap between the normal scheduler, and the event loop that is presented by the typical keyboard and mouse loop.

 


My Head In The Cloud – putting my code where my keyboard is

I have written a lot about the greatness of LuaJIT, coding for the internet, async programming, and the wonders of Windows. Now, I have finally reached a point where it’s time to put the code to the test.

I am running a service in Azure: http://nanotechstyles.cloudapp.net

This site is totally fueled by the work I have done with TINN. It is a static web page server with a couple of twists.

First of all, you can access the site through a pretty name:
http://www.nanotechstyles.com

If you just hit the site directly, you will get the static front page which has nothing more than an “about” link on it.

If you want to load up a threed model viewing thing, hit this:
http://www.nanotechstyles.com/threed.html

If you want to see what your browser is actually sending to the server, then hit this:
http://www.nanotechstyles.com/echo

I find the echo thing to be interesting, and I try hitting the site using different browsers to see what they produce.  This kind of feedback makes it relatively easy to do rapid turnarounds on the webpage content, challenging my assumptions and filling in the blanks.

The code for this web server is not very complex.  It’s the same standard ‘main’ that I’ve used in the past:

local resourceMap = require("ResourceMap");
local ResourceMapper = require("ResourceMapper");
local HttpServer = require("HttpServer")

local port = tonumber(arg[1]) or 8080

local Mapper = ResourceMapper(resourceMap);

local obj = {}

local OnRequest = function(param, request, response)
	local handler, err = Mapper:getHandler(request)


	-- recycle the socket, unless the handler explicitly says
	-- it will do it, by returning 'true'
	if handler then
		if not handler(request, response) then
			param.Server:HandleRequestFinished(request);
		end
	else
		print("NO HANDLER: ", request.Url.path);
		-- send back content not found
		response:writeHead(404);
		response:writeEnd();

		-- recycle the request in case the socket
		-- is still open
		param.Server:HandleRequestFinished(request);
	end

end

obj.Server = HttpServer(port, OnRequest, obj);
obj.Server:run()

In this case, I’m dealing with the OnRequest() directly, rather than using the WebApp object.  I’m doing this because I want to do some more interactions at this level that the standard WebApp may not support.

Of course the ‘handlers’ are where all the fun is. I guess it makes sense to host the content of the site up on the site for all to see and poke fun at.
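
I haven't shown the ResourceMap here, so take the following as a rough sketch of the general idea rather than the real thing. It assumes a map is just a list of URL patterns paired with handler functions. The only thing borrowed from the code above is request.Url.path; request.Headers, response.body and the handler names are made up for illustration.

```lua
-- Hypothetical handlers; they return false to say "I did not recycle the socket".
local function echoHandler(request, response)
  local lines = {}
  for name, value in pairs(request.Headers) do   -- Headers is an assumed field
    table.insert(lines, name .. ": " .. value)
  end
  table.sort(lines)
  response.body = table.concat(lines, "\n")      -- body is an assumed field
  return false
end

local function staticHandler(request, response)
  response.body = "contents of " .. request.Url.path
  return false
end

-- Hypothetical resource map: first matching pattern wins.
local resourceMap = {
  { pattern = "^/echo$",  handler = echoHandler },
  { pattern = "%.html$",  handler = staticHandler },
}

-- Returns the handler for a request, or nil plus a message when nothing matches.
local function getHandler(map, request)
  for _, entry in ipairs(map) do
    if request.Url.path:match(entry.pattern) then
      return entry.handler
    end
  end
  return nil, "no handler for " .. request.Url.path
end

-- Usage with a fake request and response
local request = { Url = { path = "/echo" }, Headers = { Host = "example", Accept = "*/*" } }
local response = {}
local handler = getHandler(resourceMap, request)
handler(request, response)
print(response.body)
```

A plain list keeps the matching order explicit, which matters once a catch-all pattern shows up at the end.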

My little experiment here is to give my code real world exposure, with the intention of hardening it, and gaining practical experience on what a typical web server is likely to see out in the wild.

So, if you read this blog, go hit those links. Soon enough, perhaps I will be able to serve up my own blog using my own software. That’s got a certain circular reference to it.


Pinging Peers – Creating Network Meshes

I have the need to create a mesh of network nodes on occasion. The configuration of the mesh is typically a set of VMs running on some cloud service or another. The code running on these VMs may perform some basic task, probably similar across all the VMs. They do need to communicate information with each other, and to do that, they need to maintain a list of their peers and the latencies between them, and the like.

So, I started modeling this thing called the Border Gateway Protocol (BGP), which is one of the ancient low-lying protocols of the internet. I model the protocol by first creating an object BGPPeer. The intention of this object is to act as both the host and client of the fundamental mesh communications. It looks like this:

-- BGProtocol.lua
--[[
	Implementation of Border Gateway Protocol (BGP)
--]]

local ffi = require("ffi");
local Timer = require("Timer")
local IOCPSocket = require("IOCPSocket");
local SocketUtils = require("SocketUtils")
local Network = require("Network")

--[[
	BGP Relies on the presence of a fully connected graph
	The Protocol object is initialized by reading a list of nodes
	and creating the right channels between each of the nodes.
--]]

--[[
print("ComputerNameNetBIOS: ", Network.getHostName(ffi.C.ComputerNameNetBIOS))
print("ComputerNameDnsHostname: ", Network.getHostName(ffi.C.ComputerNameDnsHostname))
print("ComputerNameDnsDomain: ", Network.getHostName(ffi.C.ComputerNameDnsDomain))
print("ComputerNameDnsFullyQualified: ", Network.getHostName(ffi.C.ComputerNameDnsFullyQualified))
print("ComputerNamePhysicalNetBIOS: ", Network.getHostName(ffi.C.ComputerNamePhysicalNetBIOS))
print("ComputerNamePhysicalDnsHostname: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsHostname))
print("ComputerNamePhysicalDnsDomain: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsDomain))
print("ComputerNamePhysicalDnsFullyQualified: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsFullyQualified))
--]]

local selfhost = Network.getHostName(ffi.C.ComputerNameDnsHostname):lower()
--print("Self Host: ",selfhost)

local BGPPeer = {
  HeartbeatInterval = 50 * 1000;	-- send out a heartbeat on this interval (milliseconds)
  UdpPort = 1313;					-- port used to communicate UDP
  EliminateSelf = true;			-- don't consider self to be a peer
}
setmetatable(BGPPeer, {
  __call = function(self, ...)
    return self:create(...)
  end,
})

local BGPPeer_mt = {
  __index = BGPPeer,
}

BGPPeer.init = function(self, config)
  local err
  local obj = {
    Config = config,
    Peers = {},
  }

  -- if there is config information, then use
  -- it to setup sockets to the peer
  if config then
    -- create the client socket
    obj.Name = config.name;
    obj.UdpSender, err = IOCPSocket:create(AF_INET, SOCK_DGRAM, 0);
    obj.UdpPeerAddress, err = SocketUtils.CreateSocketAddress(config.host, config.udpport or BGPPeer.UdpPort)


    obj.UdpPeerAddressLen = ffi.sizeof(obj.UdpPeerAddress);
  end

  setmetatable(obj, BGPPeer_mt)

  return obj;
end

BGPPeer.create = function(self, configfile)
  -- load the configuration file
  local fs, err = io.open(configfile, "rb")
	
  if not fs then return nil, "could not load file" end

  local configdata = fs:read("*all")
  fs:close();
	
  local func, err = loadstring(configdata)
  if not func then return nil, err end

  local config = func()

  -- create self, so peers can be added
  local obj = self:init()

  -- add the udp socket
  -- Setup the server socket
  obj.UdpListener, err = IOCPSocket:create(AF_INET, SOCK_DGRAM, 0);
  if not obj.UdpListener then
    return nil, err;
  end

  local success, err = obj.UdpListener:bindToPort(BGPPeer.UdpPort);

  if not success then
    return nil, err;
  end

  -- for each of the peers within the config 
  -- create a new peer, and add it to the 
  -- list of peers for the first object
  for _,peerconfig in ipairs(config) do
		--print("PEER: ", peerconfig.name, peerconfig.host)
    if peerconfig.name:lower() ~= selfhost then
      obj:createPeer(peerconfig)
    end
  end

  -- Need to setup a listener for the UDP port

  -- add the heartbeat timer
  obj.HeartbeatTimer = Timer({
		Delay =  BGPPeer.HeartbeatInterval;
		Period = BGPPeer.HeartbeatInterval;
		OnTime = obj:onHeartbeat();})

  -- start a fiber which will run the UDP listener loop
  spawn(obj:handleUdpListen())

  return obj;
end

BGPPeer.onHeartbeat = function(self)
  local message = "HELLO FROM: "..Network:getHostName();
  local closure = function(timer)

    for peername, peer in pairs(self.Peers) do
      local bytessent, err = peer.UdpSender:sendTo(peer.UdpPeerAddress, peer.UdpPeerAddressLen, 
        message, #message);

    end
  end

  return closure
end

BGPPeer.addPeer = function(self, peer)
  self.Peers[peer.Name] = peer
end

BGPPeer.createPeer = function(self, peerconfig)
  self:addPeer(BGPPeer:init(peerconfig));
end

local count = 0;
BGPPeer.onReceiveHeartBeat = function(self, buff, bytesread)
  count = count + 1
  print("onReceiveHeartBeat: ", count, ffi.string(buff, bytesread));
end


BGPPeer.handleUdpListen = function(self)
  local bufflen = 1500;
  local buff = ffi.new("uint8_t[1500]");
  local from = sockaddr_in();
  local fromLen = ffi.sizeof(from);


  local closure = function()

    while true do
      local bytesread, err = self.UdpListener:receiveFrom(from, fromLen, buff, bufflen);

      if not bytesread then
        print("receiveFrom ERROR: ", err)
        return false, err;
      end

      self:onReceiveHeartBeat(buff, bytesread)
    end
  end

  return closure
end

return BGPPeer
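
The configuration file that create() loads isn't shown in this listing. From the way it is used (run as a Lua chunk, then walked with ipairs, reading name, host and an optional udpport from each entry), it presumably looks something like the sketch below. The node names and addresses are invented, and loadPeerConfig is a helper I made up for this example; it is not part of BGProtocol.lua.

```lua
-- Hypothetical config file contents: a Lua chunk that returns a list of peers.
local configText = [[
return {
  { name = "node-a", host = "10.0.0.4" },
  { name = "node-b", host = "10.0.0.5", udpport = 1414 },
}
]]

-- loadstring exists in Lua 5.1 and LuaJIT; newer Lua versions use load instead.
local loadchunk = loadstring or load

-- Compile and run the chunk, reporting problems instead of throwing.
local function loadPeerConfig(text)
  local func, err = loadchunk(text)
  if not func then return nil, err end          -- syntax error in the file

  local ok, config = pcall(func)
  if not ok then return nil, config end         -- runtime error in the file

  if type(config) ~= "table" then
    return nil, "config must return a table of peers"
  end

  return config
end

local config = assert(loadPeerConfig(configText))
for _, peerconfig in ipairs(config) do
  -- 1313 mirrors the BGPPeer.UdpPort default from the listing
  print(peerconfig.name, peerconfig.host, peerconfig.udpport or 1313)
end
```

Keep in mind that a config file loaded this way is executable code, so it should only ever come from a source you trust.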

There are a couple things of note here. One of the basic problems I need to solve is the fact that this is not a basic client/server app, but rather a peer to peer setup. As such, each peer both listens on a port, and sends data.

A typical network ‘server’ app may block on listening, and not perform any actions until it receives a request from a ‘client’. In my case, I don’t want to block, but I do want to initiate a ‘blocking’ loop in parallel with some non-blocking stuff.

In this BGPPeer, there is the handleUdpListen() function. This function is the heart of the Udp listening loop, or the “server” side of the peer communication. This function is run in parallel as it is invoked in the constructor of the object using this:

  spawn(obj:handleUdpListen())

In this particular case, the handleUdpListen() function contains a closure (another function), and that closure is what is actually returned and spawned in its own fiber. The function is pretty straightforward.

  local closure = function()

    while true do
      local bytesread, err = self.UdpListener:receiveFrom(from, fromLen, buff, bufflen);

      if not bytesread then
        print("receiveFrom ERROR: ", err)
        return false, err;
      end

      self:onReceiveHeartBeat(buff, bytesread)
    end
  end

Basically, just run a loop forever, waiting to receiveFrom some Udp packet. Since in TINN all IO calls are cooperative, while this receiveFrom is ‘blocked’, the current coroutine is actually yielding, so that other work can continue. That’s great, that’s exactly what I want. So, now the listening side is set up.
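
If the ‘blocked but actually yielding’ part seems like magic, here is a stripped-down sketch of the idea with plain coroutines. It is a simplification under loud assumptions: the inbox table stands in for the network, and this receiveFrom is a made-up function that polls and yields, whereas the real one sits on top of Windows IO completion.

```lua
local inbox = {}   -- stand-in for packets arriving from the network

-- Looks like a blocking call to the caller, but yields while there is no data.
local function receiveFrom()
  while #inbox == 0 do
    coroutine.yield()            -- nothing yet: let other work run
  end
  return table.remove(inbox, 1)
end

local received = {}

local listener = coroutine.create(function()
  while true do
    local packet = receiveFrom()
    if packet == "STOP" then return end   -- sentinel so the sketch can finish
    table.insert(received, packet)
  end
end)

-- Play the part of the scheduler by hand.
coroutine.resume(listener)                -- runs until it 'blocks' in receiveFrom
table.insert(inbox, "HELLO FROM: node-a")
coroutine.resume(listener)                -- picks up the packet, then 'blocks' again
table.insert(inbox, "HELLO FROM: node-b")
table.insert(inbox, "STOP")
coroutine.resume(listener)                -- drains the inbox and exits
```

The listener loop reads as straight-line blocking code, yet control returns to the caller every time there is nothing to read.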

How about the sending side? I want to send out a formatted packet on a regular interval. Since TINN has a Timer object, this is a pretty straightforward matter. Within the constructor of the BGPPeer, we find the following:

  -- add the heartbeat timer
  obj.HeartbeatTimer = Timer({
		Delay =  BGPPeer.HeartbeatInterval;
		Period = BGPPeer.HeartbeatInterval;
		OnTime = obj:onHeartbeat();})

With this setup, the timer will execute the function returned by ‘onHeartbeat()’ every “Period”. The period is given in milliseconds; the default of 50 * 1000 works out to 50 seconds, but it can be configured to any value.

As we saw from the last article on Timers, this timer object will start automatically, and run cooperatively with anything else running in the process. That’s great. So, onHeartbeat just needs to send out information to all its peers whenever it is called.

BGPPeer.onHeartbeat = function(self)
  local message = "HELLO FROM: "..Network:getHostName();
  local closure = function(timer)

    for peername, peer in pairs(self.Peers) do
      local bytessent, err = peer.UdpSender:sendTo(peer.UdpPeerAddress, peer.UdpPeerAddressLen, 
        message, #message);

    end
  end

  return closure
end

This becomes a simple matter of traversing the list of peers, and using the Udp socket that was set up with each one of them to send out the packet we want to send out.
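
One detail that is easy to miss: onHeartbeat() is called once, when the timer is created, and it is the function it returns that gets called on every tick. Here is a minimal sketch of that pattern with no sockets or timers involved. The Peer table, newPeer and the Sent list are invented for this example.

```lua
local Peer = {}
Peer.__index = Peer

local function newPeer(name)
  return setmetatable({ Name = name, Sent = {} }, Peer)
end

-- Called once at setup time. Returns the function that runs on each tick.
function Peer:onHeartbeat()
  -- built once, then captured by the closure below along with self
  local message = "HELLO FROM: " .. self.Name

  return function()
    -- stand-in for sending the message to every peer
    table.insert(self.Sent, message)
  end
end

local peer = newPeer("node-a")

-- Note the call: what we keep is the returned closure, not the method itself.
local tick = peer:onHeartbeat()

-- Stand-in for the timer firing twice.
tick()
tick()

print(#peer.Sent, peer.Sent[1])
```

Because the closure already carries self and the message, whoever fires the ticks needs to know nothing about peers at all.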

What’s that altogether then? We saw the spawn that sent the Udp listener off in a cooperative manner. And just now I showed the timer going in parallel. The two combined give you a model where you can both listen and send ‘at the same time’.

It looks so easy, and that’s the power of TINN, the scheduler, and Lua’s coroutines. It’s fairly easy to write such code without having to worry overly much about old school things like mutexes, thread context switching, and the like. Just write your code however you want, sprinkle in a few ‘spawn()’ functions here and there, and call it a day. This is how easy it is to write network programs that can walk and chew gum at the same time.

Next time around, I want to write the simple handler that can deal with Win32 message looping and peer networking at the same time. Same general mechanisms, just a cooperative User32 message pump to incorporate.

