Parallel Conversations

I have been tinkering with words of late. I’ve added ‘waitFor’, ‘when’ and ‘whenever’ to the TINN lexicon, and my programming is becoming easier, better organized, and simpler to describe.

Recently I added another set of words: ‘waitSignal’, ‘signalOne’, and ‘signalAll’. If you were using another system, you might call these ‘events’, but really they’re just signaling.

Here’s how I might use them:

local Application = require("Application")(true)

local function waiter(num)
  num = num or 0

  local function closure()
    print(string.format("WAITED: %d",num))
  end

  return closure;
end

local function main()

  for i=1,4 do
    onSignal(waiter(i), "waiting")
  end

  -- signal only one of them
  signalOne("waiting")

  -- sleep a bit giving other tasks
  -- a chance to run
  sleep(500)

  -- signal the rest of them
  signalAll("waiting")
  sleep(2000)

  print("SLEEP AFTER signalAll")
end

run(main)

First, it spawns 4 tasks, which will all be waiting on the ‘waiting’ signal. With ‘signalOne’, only 1 of the 4 waiting tasks will be awakened and continue execution. With ‘signalAll’, the rest of the waiting tasks are all scheduled to run, and thus continue from where they left off.

The core primitive for signaling is the ‘waitSignal()’ function. This will essentially suspend the currently running task until the specified signal has been given. The ‘onSignal()’ function is a convenience which will spawn a separate task which will do the waiting and execute the specified function at the appropriate time.
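
Given those two pieces, ‘onSignal()’ is easy to imagine. Here is a minimal sketch of how it might be composed from ‘spawn()’ and ‘waitSignal()’; the actual TINN implementation may differ:

local function onSignal(func, signalName)
  -- spawn a task whose only job is to wait for
  -- the signal, then execute the supplied function
  local function watcher()
    waitSignal(signalName)
    func()
  end

  return spawn(watcher)
end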

If you were using another language, these might be ‘onEvent()’ and ’emit()’. Of course it’s really easy to code up that way, just create a name alias to whatever you like. You could even do:

local function on(eventName, func)
  return onSignal(func, eventName)
end
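
An ‘emit()’ to pair with it is just as short (assuming you want emit to wake all waiting tasks, like ‘signalAll’):

local function emit(eventName)
  return signalAll(eventName)
end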

So, these are some more words that are in the parallel programming arsenal. In totality, the set is:

Time related

  • sleep
  • delay
  • periodic

Predicates

  • waitFor
  • when
  • whenever

Signaling

  • onSignal
  • signalOne
  • signalAll
  • waitSignal

Task and scheduler

  • run
  • stop
  • spawn
  • yield

With these words in hand, doing my multi-task programming is becoming easier by the moment. I don’t worry about the lower level multi-tasking primitives such as mutexes, locks, and the like. I’m not really that concerned with timers, thread pools, or any of the other machinery that makes this all happen. I just write my simple code.

One thing has struck me though. This is all great for a single threaded environment. Basically collaborative multi-tasking. In this age of super fast CPUs, it turns out that going single threaded is just fine. In many cases it’s actually better than going preemptive multi-threaded because you don’t necessarily incur as much OS thread level context switching.

I’d like to go one step further though. A long time ago we didn’t have L1/L2 caches on the CPUs, then they came along, got better, and now are fairly large. As a programmer, my concern for them is fairly minimal. I don’t do much to ensure that the caches are used properly. I just write my code, and either the compiler, or the CPU itself deals with all the mechanics. Same goes with prefetching instructions and the like. I don’t write my code to make that job any easier, it just happens automagically.

Here I am contemplating the same thing about threads. As my CPU gains increasingly more cores, I am wondering if CPU cycles should be viewed the same way we viewed levels of memory in the past. Is there a C1/C2 level of core execution? That is, most things should stay on the same thread, because switching to a different core is costly. Something very low level should decide when that switching occurs.

My application/runtime can know a lot about what I’m going to do, because the whole thing is laid out in script. After some analysis, perhaps I can decide: “this thing is going to sleep for more than 500 ms, therefore I’m going to put it over here on this other thread, which is actually sleeping.” In that way, I can reduce my resource usage, because I won’t be polling the timer every time through the scheduling loop; I can be smarter.

If I had such smartness, then when I have 16 – 100 cores, I’ll be able to easily take advantage of those multiple cores without having to truly understand and navigate that madness on my own. Let the machine figure it out, it’s much better at that than I am.

In the meanwhile, I’ve got these multi-tasking primitives which are making my life a lot easier when it comes to programming fairly complex systems.


Lua Coroutine Roundup

My WordPress dashboard says this is my 250th post. I figure what better way to mark the occasion than to do a little bit of a roundup on one of my favorite topics.

One of the most awesome features of the Lua language is its built-in coroutine mechanism. I’ve talked about coroutines quite a bit, doing a little series on the basics of coroutines, all the way up through a scheduler for multi-tasking.

Coroutines give the programmer the illusion of creating a parallel processing environment. Basically, you write up your code, “spawn” different coroutines, and then just forget about them… Well, almost. The one thing the Lua language does not give you is a way to manage those coroutines. Coroutines are a mechanism by which ‘threads’ can do cooperative multi-tasking. This is different from the OS level preemptive multitasking that you get with ‘threads’ in most OS libraries. So, I can create coroutines, resume, and yield. It’s the ‘yield’ that gives coroutines their multi-tasking flavor. One coroutine yields, and another coroutine must be told to ‘resume’. Something has to do the telling, and keep track of who’s yielded and who needs to be resumed. In steps the scheduler.
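
For anyone who hasn’t played with them, these are the raw primitives the language gives you; everything below builds on exactly these three calls:

local co = coroutine.create(function(a)
  print("got:", a)

  -- suspend here, handing "first" back to whoever resumed us
  local b = coroutine.yield("first")

  print("resumed with:", b)
end)

print(coroutine.resume(co, 1))   -- prints "got: 1", then: true  first
print(coroutine.resume(co, 2))   -- prints "resumed with: 2", then: true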

Summaries from my own archives
Lua Coroutines – Getting Started

Multitask UI Like it’s 1995

Hurry Up and Wait – TINN Timing

Computicles – A tale of two schedulers

Parallel Computing is Child’s Play

Multitasking single threaded UI – Gaming meets networking

Alertable Predicates? What’s that?

Pinging Peers – Creating Network Meshes

From the Interwebs
GitHub Gist: Deco / coroutine_scheduler.lua

Beginner’s Guide to Coroutines – Roblox Wiki

LOOP: Thread Scheduler

LuaAV

And of course the search engines will show you thousands more:

lua coroutine scheduler

I think it would be really great if the Lua language itself had some form of rudimentary scheduler, but scheduling is such a domain specific thing, and the rudiments are so basic, that it’s possibly not worth providing such support. The Go language supports a basic scheduler, and has coroutine support, to great effect I think.

The simple scheduler is simple: just do a round robin execution of one task after the next, as each task yields. I wrote a simple dispatcher which started with exactly this. Then interesting things start to happen. You’d like to have your tasks go into a wait state if they’re doing an IO operation, and not be scheduled again until some IO has occurred. Or, you want to put a thread to sleep for some amount of time. Or, you want a thread to wait on a condition of one sort or another, or signals, or something else. Pretty soon, your basic scheduler is hundreds of lines of code, and things start to get rather complicated.

After going round and round in this problem space, I finally came up with a solution that I think has some legs. Rather than a complex scheduler, I promote a simple scheduler, and add functions to it through normal thread support. So, here is my final ‘scheduler’ for 2013:


local ffi = require("ffi");

local Collections = require("Collections");
local StopWatch = require("StopWatch");


--[[
	The Scheduler supports a collaborative processing
	environment.  As such, it manages multiple tasks which
	are represented by Lua coroutines.
--]]
local Scheduler = {}
setmetatable(Scheduler, {
	__call = function(self, ...)
		return self:create(...)
	end,
})
local Scheduler_mt = {
	__index = Scheduler,
}

function Scheduler.init(self, scheduler)
	local obj = {
		Clock = StopWatch();

		TasksReadyToRun = Collections.Queue();
	}
	setmetatable(obj, Scheduler_mt)
	
	return obj;
end

function Scheduler.create(self, ...)
	return self:init(...)
end

--[[
		Instance Methods
--]]
function Scheduler.tasksArePending(self)
	return self.TasksReadyToRun:Len() > 0
end

function Scheduler.tasksPending(self)
	return self.TasksReadyToRun:Len();
end


function Scheduler.getClock(self)
	return self.Clock;
end


--[[
	Task Handling
--]]

function Scheduler.scheduleTask(self, afiber, ...)
	if not afiber then
		return false, "no fiber specified"
	end

	afiber:setParams(...);
	self.TasksReadyToRun:Enqueue(afiber);	
	afiber.state = "readytorun"

	return afiber;
end

function Scheduler.removeFiber(self, fiber)
	--print("DROPPING DEAD FIBER: ", fiber);
	return true;
end

function Scheduler.inMainFiber(self)
	return coroutine.running() == nil; 
end

function Scheduler.getCurrentFiber(self)
	return self.CurrentFiber;
end

function Scheduler.step(self)
	-- Now check the regular fibers
	local task = self.TasksReadyToRun:Dequeue()

	-- If no fiber in ready queue, then just return
	if task == nil then
		return true
	end

	if task:getStatus() == "dead" then
		self:removeFiber(task)

		return true;
	end

	-- If the task we pulled off the active list is 
	-- not dead, then perhaps it is suspended.  If that's true
	-- then it needs to drop out of the active list.
	-- We assume that some other part of the system is responsible for
	-- keeping track of the task, and rescheduling it when appropriate.
	if task.state == "suspended" then
		return true;
	end

	-- If we have gotten this far, then the task truly is ready to 
	-- run, and it should be set as the currentFiber, and its coroutine
	-- is resumed.
	self.CurrentFiber = task;
	local results = {task:resume()};

	-- no task is currently executing
	self.CurrentFiber = nil;

	-- once we get results back from the resume, one
	-- of two things could have happened.
	-- 1) The routine exited normally
	-- 2) The routine yielded
	--
	-- In both cases, we parse out the results of the resume 
	-- into a success indicator and the rest of the values returned 
	-- from the routine
	local success = results[1];
	table.remove(results,1);


	--print("SUCCESS: ", success);
	if not success then
		print("RESUME ERROR")
		print(unpack(results));
	end

	-- Again, check to see if the task is dead after
	-- the most recent resume.  If it's dead, then don't
	-- bother putting it back into the readytorun queue
	-- just remove the task from the list of tasks
	if task:getStatus() == "dead" then
		--print("Scheduler, DEAD coroutine, removing")
		self:removeFiber(task)

		return true;
	end

	-- The only way the task will get back onto the readylist
	-- is if it's state is 'readytorun', otherwise, it will
	-- stay out of the readytorun list.
	if task.state == "readytorun" then
		-- hand back the values the task yielded, as the
		-- parameters for its next resume
		self:scheduleTask(task, unpack(results));
	end
end


--[[
	Primary Interfaces
--]]

function Scheduler.suspend(self, aTask)
	if not aTask then
		self.CurrentFiber.state = "suspended"
		return self:yield()
	end

	aTask.state = "suspended";

	return true
end

function Scheduler.yield(self, ...)
	return coroutine.yield(...);
end


--[[
	Running the scheduler itself
--]]
function Scheduler.start(self)
	if self.ContinueRunning then
		return false, "scheduler is already running"
	end
	
	self.ContinueRunning = true;

	while self.ContinueRunning do
		self:step();
	end
	--print("FINISHED STEP ITERATION")
end

function Scheduler.stop(self)
	self.ContinueRunning = false;
end

return Scheduler

At 195 LOC, this is back down to the basic size of the original simple dispatcher that I started with. There are only a few functions to this interface:

  • scheduleTask()
  • step()
  • suspend()
  • yield()
  • start()

These are the basics to do some scheduling of tasks. There is an assumption that there is an object that can be scheduled, such that ‘scheduleTask’ can attach some metadata, and the like. Other than that, there’s not much here but the basic round robin scheduler.
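
To make that assumption concrete, here is a minimal sketch of a schedulable task object satisfying what the scheduler calls above: setParams(), getStatus(), resume(), and a ‘state’ field. The names are inferred from the calls in the scheduler; the real task object may carry more than this:

local Task = {}
local Task_mt = {
	__index = Task,
}

function Task.create(self, func)
	local obj = {
		routine = coroutine.create(func),
		state = "readytorun",
	}
	setmetatable(obj, Task_mt)

	return obj
end

-- remember the values to hand to the coroutine on the next resume
function Task.setParams(self, ...)
	self.params = {...}
	return self
end

function Task.getStatus(self)
	return coroutine.status(self.routine)
end

function Task.resume(self)
	if self.params then
		return coroutine.resume(self.routine, unpack(self.params))
	end

	return coroutine.resume(self.routine)
end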

So, how do you make a more complex scheduler, say one that deals with time? Well, first of all, I’ve also added an Application object, which actually contains a scheduler, and supports various other things related to an application, including add-ons to the scheduler. Here’s an excerpt of the Application object construction.

function Application.init(self, ...)
	local sched = Scheduler();

	waitForIO.MessageQuanta = 0;

	local obj = {
		Clock = StopWatch();
		Scheduler = sched;
		TaskID = 0;
		wfc = waitForCondition(sched);
		wft = waitForTime(sched);
		wfio = waitForIO(sched);
	}
	setmetatable(obj, Application_mt)

	-- Create a task for each add-on
	obj:spawn(obj.wfc.start, obj.wfc)
	obj:spawn(obj.wft.start, obj.wft)
	obj:spawn(obj.wfio.start, obj.wfio)

	return obj;
end

In the init(), the Application object creates instances for the add-ons to the scheduler. These add-ons (waitForCondition, waitForTime, waitForIO) don’t have to adhere to much of an interface, but you do need some function which is callable, so that it can be spawned. The Application construction ends with the spawning of the three add-ons as normal threads in the scheduler. Yep, that’s right, the add-ons are just like any other thread, running in the scheduler. You might think: but these things need to run with a higher priority than regular threads! And yes, in some cases they do need that. But hey, that’s an exercise for the scheduler. If the core scheduler gains a ‘priority’ mechanism, then these threads can be run with a higher priority, and everything is good.

And what do these add-ons look like? I’ve gone over these before, but here’s an example of the timing one:

local Functor = require("Functor")
local tabutils = require("tabutils");


local waitForTime = {}
setmetatable(waitForTime, {
	__call = function(self, ...)
		return self:create(...)
	end,
})

local waitForTime_mt = {
	__index = waitForTime;
}

function waitForTime.init(self, scheduler)
	local obj = {
		Scheduler = scheduler;
		TasksWaitingForTime = {};
	}
	setmetatable(obj, waitForTime_mt)

	--scheduler:addQuantaStep(Functor(obj.step,obj));
	--scheduler:spawn(obj.step, obj)

	return obj;
end

function waitForTime.create(self, scheduler)
	if not scheduler then
		return nil, "no scheduler specified"
	end

	return self:init(scheduler)
end

function waitForTime.tasksArePending(self)
	return #self.TasksWaitingForTime > 0
end

function waitForTime.tasksPending(self)
	return #self.TasksWaitingForTime;
end

local function compareTaskDueTime(task1, task2)
	if task1.DueTime < task2.DueTime then
		return true
	end
	
	return false;
end

function waitForTime.yieldUntilTime(self, atime)
	--print("waitForTime.yieldUntilTime: ", atime, self.Scheduler.Clock:Milliseconds())
	--print("Current Fiber: ", self.CurrentFiber)
	local currentFiber = self.Scheduler:getCurrentFiber();
	if currentFiber == nil then
		return false, "not currently in a running task"
	end

	currentFiber.DueTime = atime;
	tabutils.binsert(self.TasksWaitingForTime, currentFiber, compareTaskDueTime);

	return self.Scheduler:suspend()
end

function waitForTime.yield(self, millis)
	--print('waitForTime.yield, CLOCK: ', self.Scheduler.Clock)

	local nextTime = self.Scheduler.Clock:Milliseconds() + millis;

	return self:yieldUntilTime(nextTime);
end

function waitForTime.step(self)
	--print("waitForTime.step, CLOCK: ", self.Scheduler.Clock);

	local currentTime = self.Scheduler.Clock:Milliseconds();

	-- traverse through the fibers that are waiting
	-- on time
	local nAwaiting = #self.TasksWaitingForTime;
--print("Timer Events Waiting: ", nAwaiting)
	for i=1,nAwaiting do

		local fiber = self.TasksWaitingForTime[1];
		if fiber.DueTime <= currentTime then
			--print("ACTIVATE: ", fiber.DueTime, currentTime);
			-- put it back into circulation
			-- preferably at the front of the list
			fiber.DueTime = 0;
			--print("waitForTime.step: ", self)
			self.Scheduler:scheduleTask(fiber);

			-- Remove the fiber from the list of fibers that are
			-- waiting on time
			table.remove(self.TasksWaitingForTime, 1);
		end
	end
end

function waitForTime.start(self)
	while true do
		self:step();
		self.Scheduler:yield();
	end
end

return waitForTime

The ‘start()’ function is what was scheduled with the scheduler, so that’s the thread that’s actually running. As you can see, it’s just a normal cooperative task.

The real action comes from the ‘yield()’ and ‘step()’ functions. The purpose of ‘yield()’ is to suspend the current task (take it out of the active running list of the scheduler), and set up the appropriate state such that it can be activated later. The purpose of the ‘step()’ function is to go through the list of tasks which are currently suspended, and determine which ones need to be scheduled again, because their time signal has been met.

And that’s how you add items such as “sleep()” to the system. You don’t change the core scheduler, but rather, you add a small module which knows how to deal with timed events, and that’s that.
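
For instance, a global ‘sleep()’ could be nothing more than a thin wrapper over the add-on’s yield(). This is a sketch, assuming ‘app’ is the running Application instance with the ‘wft’ field from the excerpt above:

local function sleep(millis)
  -- delegate to the waitForTime add-on held by the Application
  return app.wft:yield(millis)
end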

The application object comes along and provides some convenience methods that can easily be turned into globals, so that you can easily type things like:

delay(function() print("Hello, World!") end, 5000)  -- print "Hello, World!" after 5 seconds
periodic(function() print("Hi There!") end, 1000)   -- print "Hi There!" every second
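
Those two conveniences might plausibly be little more than spawn() plus the timed yield; this is a sketch, not necessarily how the Application object actually implements them:

local function delay(func, millis)
  return spawn(function()
    sleep(millis)
    func()
  end)
end

local function periodic(func, millis)
  return spawn(function()
    while true do
      sleep(millis)
      func()
    end
  end)
end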

And there you have it. The core scheduler is a very simple thing. Just a ready list, and some algorithm that determines which one of the tasks in the list gets to run next. This can be as complex as necessary, or stay very simple.

Using an add on mechanism, the overall system can take on more robust scenarios, without increasing the complexity of the individual parts.

This simplified design satisfies all the scenario needs I currently have from high throughput web server to multi-tasking UI with network interaction. I will improve the scheduling algorithm, and probably make that a pluggable function for ease.

Lua coroutines are an awesome feature to have in a scripting language. With a little bit of work, a handy scheduler makes multi-task programming a mindless exercise, and that’s a good thing.

Happy New Year!


What The Functor? – Making them useful

Last time, I introduced the Functor in a rather convoluted way. I said that particular implementation may or may not be as performant as a straight up closure. Well, as it turns out, the closure is going to win most of the time. Why?

You see, when you use a table as a functor, you are forcing a metatable lookup to see if the ‘__call’ metamethod is implemented or not. If it’s not implemented, an error is thrown; if it is implemented, then your operation is performed. That means an extra lookup and a dereference into our table for every single function call. When this is on a hot path, it could be quite a bummer in terms of performance.

So, why bother with this particular convoluted construct for functor implementation? The beauty of the table technique is that your functor is actually a table: it gets the syntactic sugar of being called like a function, while retaining all the attributes of a table. For example, let’s imagine you want to associate some bit of information with your function, such as when it was created, or version information:

local f1 = Functor(somefunction)
f1.Creation = os.date()
f1.Version = "1.2"

That might be useful in a case where you want to refresh your functors if they’re getting to be too old, or you want to compare versions of functions to ensure they are what you expect. Very handy, yet very costly.

Although this is useful, it does impose this overhead, so is there another way to implement the functor without incurring so much overhead?

Why, of course! If you’re willing to lose the table capabilities of your functor, but you want to retain the fact that you can associate some amount of state with your functor, then a closure is the way to go:

local Functor = function(func, target)
  if not target then return func end

  return function(...)
    return func(target,...)
  end
end

Yah, that’s right. After I bad mouthed closures when I first introduced functors, here I am praising their value. And it’s true. Given the constraint of giving up the ability to attach extra information to the functor, beyond the passed in state, this closure implementation is a lot faster when it comes time to actually execute the code.

Not only is this code faster, but it’s a lot easier to understand, easier to maintain, etc.
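
A quick, informal way to see the difference, using nothing but os.clock(). The tableFunctor here is a tiny stand-in for the __call-based implementation from last time, not the real thing:

local function add1(x) return x + 1 end

-- stand-in for the table-based functor: dispatch goes through __call
local tableFunctor = setmetatable({Func = add1}, {
  __call = function(self, ...) return self.Func(...) end,
})

-- the closure Functor above returns the function as-is when there is no target
local closureFunctor = add1

local function bench(label, f)
  local start = os.clock()
  for i = 1, 1000000 do f(i) end
  print(label, os.clock() - start)
end

bench("table functor:  ", tableFunctor)
bench("closure functor:", closureFunctor)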

And how would I use this in real life? Well, at the core of TINN is the scheduler, as represented by the IOProcessor class. The scheduler is at the heart of what makes TINN tick, and thus it must be the most compact, performant bit of code that it can possibly be. At the same time, I want it to be flexible, because ultimately I need the ability to change the very core scheduling algorithms and other core features, without having to totally reengineer the codebase to make changes.

So, I’ve been tinkering.

I’ve recently rewritten parts of the scheduler to utilize the functors and iterators. Here is the main loop as it stands today:

IOProcessor.start = function(self)
  self:addQuantaStep(Functor(self.stepIOEvents,self));
  self:addQuantaStep(Functor(self.stepTimeEvents,self));
  self:addQuantaStep(Functor(self.stepFibers,self));
  self:addQuantaStep(Functor(self.stepPredicates,self));


  self.ContinueRunning = true;

  when(Functor(self.noMoreTasks,self), Functor(self.stop, self))

  for astep in self:quantumSteps() do
    astep()
  end
end

IOProcessor.run = function(self, func, ...)
  if func ~= nil then
    self:spawn(func, ...);
  end

  self:start();
end

If you were writing a typical TINN based program where you wanted multi-tasking, you would be doing this:

local Task = require("IOProcessor")

local function main()
  -- after 5 seconds, stop the system
  delay(function(tick) Task:stop() end, 5 * 1000)

  -- print something every half second
  periodic(function(tick) print("Hello Again: ",tick) end, 500)
end

Task:run(main)

The IOProcessor.start() routine is littered with Functors! What are they all doing? OK, first with the ‘addQuantaStep’ calls. The quanta steps are the functions that are executed each time through the scheduler’s loop. You can see this in the iterator at the bottom:

  for astep in self:quantumSteps() do
    astep()
  end

The usual way you’d see this code written is as a while loop, or some other looping construct. In this case, I have used an iterator as the fundamental loop construct. That seems a bit novel, or at least slightly off the beaten path. Why bother with such a construct? Well, first of all, since it’s an iterator, the ‘step()’ function doesn’t actually know anything about the code that is being executed. It doesn’t know how long, and it doesn’t assume any side effects. This gives great flexibility because in order to change the behavior of the main loop only requires changing the behavior of the iterator that is being used. You could easily modify the scheduler to utilize your own custom iterator if you like. The only assumption is that the items coming out of the iterator are functions that can be called, and that’s it.
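
To make that concrete, here is a purely illustrative custom iterator that wraps an existing iterator of callables and injects a diagnostic step every n items; every name in it is made up for the example:

local function withHeartbeat(steps, n)
  local count = 0

  return function()
    count = count + 1

    if count % n == 0 then
      -- every n'th item, hand the loop a diagnostic function instead
      return function() print("iterations so far: ", count) end
    end

    -- otherwise defer to the wrapped iterator
    return steps()
  end
end

-- usage:  for astep in withHeartbeat(self:quantumSteps(), 1000) do astep() end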

The ‘addQuantaStep()’ function calls are simply a way to prepopulate the list of functions with a known set of things I want to do each time through the scheduler’s loop. I made this a function call because that way any bit of code could do the same thing before running the scheduler. Here the Functor is being used to ensure we capture the state that goes along with the function.

The last place the Functor is being used here is in the ‘when()’ call:

  when(Functor(self.noMoreTasks,self), Functor(self.stop, self))

“when there are no more tasks, stop”

The functors are used because the functions in question live within the IOProcessor table, and are called with state information, so it needs to be associated with them. As an aside, why not just make these functions flat? Why are they even associated with a table anyway? Well, because, even though at the moment you can run only a single scheduler in your process, there’s no reason in the world why this MUST be the case. With another couple of changes, you’ll actually be able to run multiple schedulers at the same time within a single process. Yep, that might be a tad bit useful. Kind of like running multiple processes at the OS level.

At any rate, just for kicks, here is what the iterator of those functors looks like:

IOProcessor.addQuantaStep = function(self, astep)
  table.insert(self.QuantaSteps,astep)
end

IOProcessor.quantumSteps = function(self)
  local index = 0;
  local listSize = #self.QuantaSteps;

  local closure = function()
    if not self.ContinueRunning then
      return nil;
    end

    index = index + 1;
		
    local astep = self.QuantaSteps[index]

    if (index % listSize) == 0 then
      index = 0
    end

    return astep
  end

  return closure	
end

Basically, just keep going through the list of steps over and over again until something sets ‘ContinueRunning’ to false.

Having these steps represented as an iterator satisfies the needs that I have for running a fairly dynamically configurable main loop. While I was doing this, I was also considering what implications this might have for doing other things with the iterator. For example, is there something I can do with functional programming? The recent Lua Fun library is all about iterators, and I’m wondering if I can do something with that.

Functor and iterator in hand, I am going to look to streamline the scheduler even further. I am imagining the core of the scheduler is not more than 100 lines of code. The quantum steps, which is where all the real action is, could live separately, and be easily pluggable modules. That’s where this is headed, because I don’t know how to deal with all that complexity otherwise.

So, there you have it. Functors can be useful in practice, and not just a software engineering curiosity.


What the Functor! Again?

No, I’m not talking about George Clinton (probably related to Bill), but rather the function that can wrap functions.

What’s the point of this? In this case, I’ll start with the Functor class itself.

local functor = {}
setmetatable(functor, {
  __call = function(self, ...)
    return self:create(...);
  end,
})
local functor_mt = {
  __index = functor,
  __call = function(self, ...)
    if self.Target then
      return self.Func(self.Target, ...)
    end

    return self.Func(...)
  end,
}

functor.init = function(self, func, target)
  local obj = {
    Func = func;
    Target = target;
  }
  setmetatable(obj, functor_mt)

  return obj;
end

functor.create = function(self, func, target)
  return self:init(func, target)
end

-- hand back the module table; the examples below assume
-- it was require()'d under the name 'Functor'
return functor

Before explaining how this little thing works, here’s how it is used.

local function printSomething(words)
  print(words)
end

local f1 = Functor(printSomething)

f1("words to say")

OK. So, what’s the big deal? This is the most basic case, and there is no big deal. Basically the Functor wraps the function you pass to it when you construct it: f1 = Functor(printSomething)

Then, later, when you use the function: f1(“words to say”), it’s as if you were calling the function directly. Why on earth would you ever want to do this?

Let’s imagine that you have a list of functions that you want to call from within the body of some other function.

listOfFuncs = {
  Functor(func1),
  Functor(func2), 
  Functor(func3)
}

local function every(funclist)
  for _, fun in ipairs(funclist) do
    fun()
  end
end

every(listOfFuncs)

For regular functions, there’s no real benefit here to using a functor, and there’s all this added overhead.

Now here’s another case. Instead of the functions just lying around without any associated state, they are methods of ‘objects’. The trick is to get the object instance associated with the function pointer, while maintaining the same relatively easy syntax.

local methods = {}
local methods_mt = {
	__index = methods,
}

methods.init = function(self)
	local obj = {
		name = "foo",
		name2 = "bar",
	}
	setmetatable(obj, methods_mt)
	
	return obj;
end

methods.method1 = function(self)
	print(self.name)
end

methods.method2 = function(self)
	print(self.name2)
end

local m1 = methods:init();

local f2 = Functor(methods.method1, m1)
local f3 = Functor(m1.method2, m1)

f2();
f3();

In this case, I need to associate the function pointer with an instance of the methods ‘object’, so when the Functor is constructed, the second parameter, which is the instance of the object, is also passed along.

Now, back to the Functor code, when it comes time to execute the function, it does it one way or another depending on whether it has a target or not:

  __call = function(self, ...)
    if self.Target then
      return self.Func(self.Target, ...)
    end

    return self.Func(...)
  end,

If it has a target, it will pass that as the first parameter when calling the function. If there is no specific target, then it will just call the function, passing along the supplied parameters.

So, this solves a very basic problem. Without the Functor construct, I would have to use a ‘closure’ for each function. A closure is essentially the same thing, but the language itself supports the concept of keeping information associated with a function; in Lua, the captured state lives in the closure’s upvalues, which is not quite as explicit as this Functor. The Functor construct allows you to do essentially what you would be doing with a closure, but by using an object, you can more tightly control exactly what’s going on: what’s in the structure, lifetime, and all that.

To use a closure, or to use a functor? Closures are an easy and natural part of the Lua language. Functors are a construct that may or may not be efficient in comparison. The Functor does solve a particular problem when it comes to sticking a function in a list with its attendant state.
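
For comparison, the closure spelling of the method example above is a one-liner:

-- equivalent to Functor(m1.method2, m1), written as a plain closure
local f3c = function(...) return m1:method2(...) end

f3c();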


Parallel Computing is Child’s Play

Recently, my wife and I brought a new son into the world!  And that got me to thinking.

Some years back, I did some self study on linguistics and language development.  At the time, I was just doing some thinking on how best to communicate, how to write programs that are composable, just like spoken sentences.  One of the things I learned from one of the books had to do with how babies learn language.  As one theory goes, they develop a core internal language first.  These are their early bits of associations and linkages.  Everything they learn after that is expressed using the core primitives they developed early on.

Armed with this knowledge, I went away and taught software engineering for a few years.  One of my core tenets was: software programming is nothing more than a specific conversation.  As software engineers, we are in the business of translating intentions from one domain to the next.  We talk to customers, who speak in their own language, and we have to eventually get that translated down into C, or assembly, or whatever.  The more closely the customer’s language matches the machine’s, the easier the translation, and the smaller the margin for error.  But, people don’t speak in machine code.

So what then?  Well, then comes the task of the various frameworks and language tools that we use as software engineers to translate from the high level human language to the very narrow and specific machine language.  At the lowest level, we might use C, or Javascript, or C#, or Lua.  Then on top of that there are inevitably various frameworks which help us express ideas in a more meaningful way, without having to work purely with the lower level primitives.  And so on and so forth.

What I’ve been exploring recently is how best to do parallel programming.  At the very core, my machine has a multi-core processor.  There is an OS, and I have a bunch of C based APIs to get various things done.  In the Lua language I have co-routines, which help in making tasks ‘parallel’, or at least cooperative.

Atop this core, I have built a scheduler in the form of the TINN environment, and thus far I can achieve quite a lot of apparent parallelism with the core primitives such as timer, waitFor, sleep, and the like.  Now I want to add some more higher level primitives.

There are two cases I want to cover.  The first is ‘when’.  That is, when some condition is met, I want to perform some action.  This is a natural part of English at least.  I can easily imagine having a conversation with someone where they’re trying to tell me what they need some machine to do, let’s say scheduling some event to occur:

“When ‘the first person comes into the factory’, ‘turn on all the machines’”

The first part of this sentence establishes the condition.  The second part specifies the action that is to occur once the condition is met.

Easy enough.  If I were to write it in code, it might look like:

waitFor(firstPersonArrives);
turnOnAllMachines();

That’s great, and you could already do this in TINN. But, the language doesn’t quite match the words that came out of my mouth. What I really want to write is the following:

when(firstPersonArrives, turnOnAllMachines)

Or even better, with some syntactic sugar

when firstPersonArrives do
  turnOnAllMachines()
end

I need a primitive ‘when’ which will take a predicate as its first argument, and an action as its second. Perhaps something like this:

local Task = require("IOProcessor")

local when = function(pred, func)
  local watchit = function()
    Task:yieldUntilPredicate(pred)
    func()
  end

  Task:spawn(watchit)
end

That will do the trick, and give me the function I need. What’s happening? Well, the when() function takes the predicate and the function, and spawns a task with a ‘waitFor()’ in it already. It’s the combination of the spawn() and waitFor() which essentially creates a parallel task, without the programmer having to deal with the very specifics of parallelism. All the programmer has to focus on is the cause and effect. Code that up, and let it go.

Having the ‘when’ in my vocabulary as a programmer is great. It enables a much easier conversation when I’m talking with someone. Whereas ‘when’ is great for covering a single event, the word “whenever” is great at covering multiple occurrences. For example, a natural conversation might go like this:

Water the grass every Tuesday

In code, it might be:

whenever(itIsTuesday, waterTheGrass)

Well, this is just a slight modification of the ‘when’ primitive. Basically, after the trigger event, spawn another task to wait on the exact same thing again.

local whenever = function(pred, func)
  local watchit = nil;
  watchit = function()
    Task:yieldUntilPredicate(pred)
    func()
    Task:spawn(watchit)
  end

  Task:spawn(watchit)
end

I think this is pretty nifty myself. Here is one contrived example of using these primitives. The English explanation would be:

Every 5 clock ticks, say “Halleluja!”
After the end of 25 clock ticks, stop.

That’s pretty easy to understand. How hard is it to code?

-- test_when.lua

local Task = require("IOProcessor")
local Timer = require("Timer")
local parallel = require("parallel")()


local counter = 0;

local function incrementCounter()
	counter = counter + 1;

	print("Counter: ", counter)
end

local function timeRunsOut()
	if counter >= 25 then
		return true;
	end

	return false;
end

local function stopWorld()
  print("Goodbye World!")
  Task:stop();
end


local lasttrigger = 0;
local function counterHits5()

  if counter % 5 == 0 then
    if counter ~= lasttrigger then
      lasttrigger = counter
      return true;
    end
  end

  return false;
end

local function sayHalleluja()
  print("Halleluja!")
end


local function main()
  -- Create a timer with the task of incrementing
  -- the counter every 500 milliseconds
  local t1 = Timer({Period=500, OnTime=incrementCounter})
	
  -- start the task to sing whenever we hit 5 ticks
  whenever(counterHits5, sayHalleluja);

  -- start the task to stop after so many ticks
  when(timeRunsOut, stopWorld);
end

run(main)

I find the ‘when’ and ‘whenever’ constructs to be fairly useful when trying to create code that I intend to run in ‘parallel’. By ‘parallel’ here, I mean they are fairly self contained units which may or may not actually run on multiple cores, but at the very least, I can think of them conceptually as independent actions.

Thus far in my programming career, I’ve made fairly good usage of the ‘for’, ‘do-while’, ‘while’, and various other control structures. Now that I’m doing more parallel and async programming, I find the addition of the ‘when’ and ‘whenever’ primitives to be a fairly useful construct for describing how things should work in a parallel universe. I can think of my atomic parallel nuggets, and then just code those nuggets serially. Then, I can use the when, whenever, and spawn constructs to stitch together an entire system.

So, there you have it. As children build up their language skills based on an internal dialog, so too must my programming language skills be built. The ‘when’, ‘whenever’ and ‘spawn’ are the core primitives of my parallel programming inner dialog.


Multitasking single threaded UI – Gaming meets networking

There are two worlds that I want to collide. There is the active UI gaming world, then there’s the more passive networking server world. The difference between these two worlds is the kind of control loop and scheduler that they both typically use.

The scheduler for the networking server is roughly:

while true do
  waitFor(networkingEvent)
  
  doNetworkingStuff()
end

This is great, and works quite well to limit the amount of resources required to run the server because most of the time it’s sitting around idle. This allows you to serve up web pages with a much smaller machine than if you were running flat out, with say a “polling” api.

The game loop is a bit different. On Windows it looks something like this:

while true do
  ffi.fill(msg, ffi.sizeof("MSG"))
  local peeked = User32.PeekMessageA(msg, nil, 0, 0, User32.PM_REMOVE);
			
  if peeked ~= 0 then
    -- do regular Windows message processing
    local res = User32.TranslateMessage(msg)		
    User32.DispatchMessageA(msg)
  end

  doOtherStuffLikeRunningGamePhysics();
end

So, when I want to run a networking game, where I take some input from the internet, and incorporate that into the gameplay, I’ve got a basic problem. Who’s on top? Which loop is going to be THE loop?

The answer lies in the fact that the TINN scheduler is a basic infinite loop, and you can modify what occurs within that loop. Last time around, I showed how the waitFor() function can be used to insert a ‘predicate’ into the scheduler’s primary loop. So, perhaps I can recast the gaming loop as a predicate and insert it into the scheduler?

I have a ‘GameWindow’ class that takes care of creating a window, showing it on the screen, and dealing with drawing. This window has a run() function which has the typical gaming infinite loop. I have modified this code to recast things in such a way that I can use the waitFor() as the primary looping mechanism. The modifications make it look like this:

local appFinish = function(win)

  win.IsRunning = true
  local msg = ffi.new("MSG")

  local closure = function()
    ffi.fill(msg, ffi.sizeof("MSG"))
    local peeked = User32.PeekMessageA(msg, nil, 0, 0, User32.PM_REMOVE);
			
    if peeked ~= 0 then
      local res = User32.TranslateMessage(msg)
      User32.DispatchMessageA(msg)

      if msg.message == User32.WM_QUIT then
        return win:OnQuit()
      end
    end

    if not win.IsRunning then
      return true;
    end
  end

  return closure;
end

local runWindow = function(self)
	
  self:show()
  self:update()

  -- Start the FrameTimer
  local period = 1000/self.FrameRate;
  self.FrameTimer = Timer({Delay=period, Period=period, OnTime =self:handleFrameTick()})

  -- wait here until the application window is closed
  waitFor(appFinish(self))

  -- cancel the frame timer
  self.FrameTimer:cancel();
end

GameWindow.run = function(self)
  -- spawn the fiber that will wait
  -- for messages to finish
  Task:spawn(runWindow, self);

  -- set quanta to 0 so we don't waste time
  -- in i/o processing if there's nothing there
  Task:setMessageQuanta(0);
	
  Task:start()
end

Starting from the run() function, only three things need to occur. First, spawn ‘runWindow’ in its own fiber. I want this routine to run in a fiber so that it is cooperative with other parts of the system that might be running.

Second, call ‘setMessageQuanta(0)’. This is a critical piece to get a ‘gaming loop’. This quanta is the amount of time the IO processing part of the scheduler will spend waiting for an IO event to occur. This time will be spent every time through the primary scheduler’s loop. If the value is set to 0, then effectively we have a nice runaway infinite loop for the scheduler. IO events will still be processed, but we won’t spend any time waiting for them to occur.

This has the effect of providing maximum CPU timeslice to various other waiting fibers. If this value is anything other than 0, let’s say ‘5’ for example, then the inner loop of the scheduler will slow to a crawl, providing no better than 60 iterations of the loop per second. Not enough time slice for a game. Setting it to 0 allows more like 3000 iterations of the loop per second, which gives more time to other fibers.

That’s the trick of this integration right there. Just set the messageQuanta to 0, and away you go. To finish this out, take a look at the runWindow() function. Here just a couple of things are set in place. First, a timer is created. This timer will ultimately end up calling a ‘tick()’ function that the user can specify.

The other thing of note is the use of waitFor(appFinish(self)). This fiber will block here until the ‘appFinish()’ predicate returns true.

So, finally, the appFinish() predicate, what does that do?

Well, I’ll be darned if it isn’t the essence of the typical game window loop, at least the Windows message handling part of it. Remember that a predicate that is injected to the scheduler using “waitFor()” is executed every time through the scheduler’s loop, so the scheduler’s loop is essentially the same as the outer loop of a typical game.

With all this in place, you can finally do the following:


local GameWindow = require "GameWindow"
local StopWatch = require "StopWatch"

local sw = StopWatch();

-- The routine that gets called for any
-- mouse activity messages
function mouseinteraction(msg, wparam, lparam)
	print(string.format("Mouse: 0x%x", msg))
end

function keyboardinteraction(msg, wparam, lparam)
	print(string.format("Keyboard: 0x%x", msg))
end


function randomColor()
		local r = math.random(0,255)
		local g = math.random(0,255)
		local b = math.random(0,255)
		local color = RGB(r,g,b)

	return color
end

function randomline(win)
	local x1 = math.random() * win.Width
	local y1 = 40 + (math.random() * (win.Height - 40))
	local x2 = math.random() * win.Width
	local y2 = 40 + (math.random() * (win.Height - 40))

	local color = randomColor()
	local ctxt = win.GDIContext;

	ctxt:SetDCPenColor(color)

	ctxt:MoveTo(x1, y1)
	ctxt:LineTo(x2, y2)
end

function randomrect(win)
	local width = math.random(2,40)
	local height = math.random(2,40)
	local x = math.random(0,win.Width-1-width)
	local y = math.random(0, win.Height-1-height)
	local right = x + width
	local bottom = y + height
	local brushColor = randomColor()

	local ctxt = win.GDIContext;

	ctxt:SetDCBrushColor(brushColor)
	--ctxt:RoundRect(x, y, right, bottom, 0, 0)
	ctxt:Rectangle(x, y, right, bottom)
end


function ontick(win, tickCount)

	for i=1,30 do
		randomrect(win)
		randomline(win)
	end

	local stats = string.format("Seconds: %f  Frame: %d  FPS: %f", sw:Seconds(), tickCount, tickCount/sw:Seconds())
	win.GDIContext:Text(stats)
end



local appwin = GameWindow({
		Title = "Game Window",
		KeyboardInteractor = keyboardinteraction,
		MouseInteractor = mouseinteraction,
		FrameRate = 24,
		OnTickDelegate = ontick,
		Extent = {1024,768},
		})


appwin:run()

This will put a window on the screen, and draw some lines and rectangles at a frame rate of roughly 24 frames per second.

And thus, the two worlds have been melded together. I’m not doing any networking in this particular case, but adding it is no different than doing it for any other networking application. The same scheduler is still at play, and everything else will work as expected.

The cool thing about this is the integration of these two worlds does not require the introduction of multiple threads of execution. Everything is still within the same single threaded context that TINN normally runs with. If real threads are required, they can easily be added and communicated with using the computicles that are within TINN.


Alertable Predicates? What’s that?

TINN does async on I/O without much fuss.  It even allows you to suspend a ‘fiber’ by putting it to sleep.  You can easily execute a function on a timer, etc.  These are all good tools to have in the parallel programming toolbox.  There is one tool that has been missing for me though.  The fact that I can run things in parallel means that I need a mechanism to synchronize when some of those tasks complete.  I don’t want to introduce callbacks, because that leads down a whole different path.

I want to execute code that looks something like this:

local finalGoal = function()
  waitFor(goal1)
  print("Goal 1 Attained.")
  waitFor(goal2)
  print("Goal 2 Attained..")
  waitFor(goal3)
  print("Goal 3 Attained...")
  print("All Goals Attained!!")

  -- Once all the goals have been met
  -- stop the scheduler.
  stop();
end

The key here is the ‘waitFor()’ function call. You call this function with a ‘predicate’. A predicate is nothing more than a function that returns true or false. If it returns ‘false’ then the condition of the predicate has not been met, and the fiber will not continue. Once the predicate returns ‘true’, the fiber will continue from the point directly after it called ‘waitFor()’. So, to see this sample in its entirety:

-- test_waitFor.lua

local IOProcessor = require("IOProcessor")
local Timer = require("Timer")

local count = 0;


local goal1 = function()
  return count >= 5;
end

local goal2 = function()
  return count >= 10;
end

local goal3 = function()
  return count >= 15;
end

local finalGoal = function()
  waitFor(goal1)
  print("Goal 1 Attained.")
  waitFor(goal2)
  print("Goal 2 Attained..")
  waitFor(goal3)
  print("Goal 3 Attained...")
  print("All Goals Attained!!")

  -- Once all the goals have been met
  -- stop the scheduler.
  stop();
end

-- setup a timer to increment the count
-- every 500 milliseconds
local incrTimer = Timer({Period = 500; OnTime = function(timer) count = count+1 end})

spawn(finalGoal)	

run()

This is a contrived case, but it’s illustrative. Down at the bottom, there is a timer being set up with an anonymous function. The timer is set to go off every 500 milliseconds. Once it does, it will execute the associated code, and the count variable will be incremented by 1. That’s all the timer needs to be concerned with.

There are three predicate functions set up: goal1, goal2, and goal3. Each one of these predicates checks the value of the count variable, and returns true or false depending on the criteria of the goal.

There is a separate function, ‘finalGoal’. This one is spawned separately, and it will wait for each of the goals to be met in turn. Which order it does these does not really matter in this case. Once all of the goals are met, the scheduler will be stopped and the program will end.

Those predicates can be anything really. They can make calls out to the internet, they can count, they can check the value of any variable, they can check any state of the running process. Whatever you want can be a predicate. Perhaps you want to stop accepting new connections on a web server once a memory limit has been reached. Whatever.
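
As one more illustration of how open ended this is, here is a sketch of that web server idea. ‘stopAcceptingConnections’ is a hypothetical action, and the memory check uses collectgarbage("count"), which reports the Lua heap size in kilobytes:

local function memoryLimitReached()
  return collectgarbage("count") > 512 * 1024   -- over 512 MB of Lua heap
end

local function guardMemory()
  waitFor(memoryLimitReached)
  stopAcceptingConnections()   -- hypothetical, for illustration
end

spawn(guardMemory)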

Having the finalGoal() as a separate fiber is useful because the whole fiber will essentially ‘block’ until the goals are all met. This allows you to have other parallel fibers running at the same time, and they will go along their merry way while finalGoal() is waiting for its conditions to be met.

The implementation of this predicate yielding is pretty straight forward. Within the scheduler, there are only two new functions:

IOProcessor.yieldUntilPredicate = function(self, predicate)
	if self.CurrentFiber~= nil then
		self.CurrentFiber.Predicate = predicate;
		table.insert(self.FibersAwaitingPredicate, self.CurrentFiber)

		return self:yield();
	end

	return false;
end

IOProcessor.stepPredicates = function(self)
	local nPredicates = #self.FibersAwaitingPredicate

	-- walk the list backwards, so that removing an entry
	-- does not disturb the indices of entries yet to be checked
	for i=nPredicates,1,-1 do
		local fiber = self.FibersAwaitingPredicate[i];
		if fiber.Predicate() then
			fiber.Predicate = nil;
			self:scheduleFiber(fiber);

			table.remove(self.FibersAwaitingPredicate, i)
		end
	end
end

Then, waitFor() is just a shorthand for yieldUntilPredicate():

waitFor = function(predicate)
  return IOProcessor:yieldUntilPredicate(predicate)
end

The predicates are checked every time through the scheduler loop:

IOProcessor.step = function(self)
	self:stepTimeEvents();
	self:stepFibers();
	self:stepIOEvents();
	self:stepPredicates();
.
.
.

This is essentially the same as installing a very high priority fiber because all code for all predicates will run every time through the scheduler loop. The predicates do not need to be ‘scheduled’ like other tasks. A predicate should probably be a fairly short bit of code that runs quickly, and is not very resource intensive. They should be used sparingly.

Quite a few years ago, I was involved with the creation of LINQ in the .NET Framework. At that time, I had this nagging notion that if we could only add a ‘when predicate do’ construct to the CLR, that would be a fairly useful thing. I think using predicates for control flow is especially useful for parallel programming. I also think this construct is easier to use and implement than various other forms of barriers and whatnot.

So, there you have it. TINN supports async IO, timers, yielding on time, and yielding on predicates. Perhaps a predicate can be used to bridge the gap between the normal scheduler, and the event loop that is presented by the typical keyboard and mouse loop.



My Head In The Cloud – putting my code where my keyboard is

I have written a lot about the greatness of LuaJIT, coding for the internet, async programming, and the wonders of Windows. Now, I have finally reached a point where it’s time to put the code to the test.

I am running a service in Azure: http://nanotechstyles.cloudapp.net

This site is totally fueled by the work I have done with TINN. It is a static web page server with a couple of twists.

First of all, you can access the site through a pretty name:
http://www.nanotechstyles.com

If you just hit the site directly, you will get the static front page which has nothing more than an “about” link on it.

If you want to load up a threed model viewing thing, hit this:
http://www.nanotechstyles.com/threed.html

If you want to see what your browser is actually sending to the server, then hit this:
http://www.nanotechstyles.com/echo

I find the echo thing to be interesting, and I try hitting the site using different browsers to see what they produce.  This kind of feedback makes it relatively easy to do rapid turnarounds on the webpage content, challenging my assumptions and filling in the blanks.

The code for this web server is not very complex.  It’s the same standard ‘main’ that I’ve used in the past:

local resourceMap = require("ResourceMap");
local ResourceMapper = require("ResourceMapper");
local HttpServer = require("HttpServer")

local port = arg[1] or 8080

local Mapper = ResourceMapper(resourceMap);

local obj = {}

local OnRequest = function(param, request, response)
	local handler, err = Mapper:getHandler(request)


	-- recycle the socket, unless the handler explicitly says
	-- it will do it, by returning 'true'
	if handler then
		if not handler(request, response) then
			param.Server:HandleRequestFinished(request);
		end
	else
		print("NO HANDLER: ", request.Url.path);
		-- send back content not found
		response:writeHead(404);
		response:writeEnd();

		-- recycle the request in case the socket
		-- is still open
		param.Server:HandleRequestFinished(request);
	end

end

obj.Server = HttpServer(port, OnRequest, obj);
obj.Server:run()

In this case, I’m dealing with the OnRequest() directly, rather than using the WebApp object.  I’m doing this because I want to do some more interactions at this level that the standard WebApp may not support.

Of course the ‘handlers’ are where all the fun is. I guess it makes sense to host the content of the site up on the site for all to see and poke fun at.
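
I haven’t shown the resourceMap here, but conceptually a handler is just a function receiving the request and response objects that the OnRequest() above dispatches to. The map shape below is an assumption for illustration; only writeHead()/writeEnd() and request.Url.path are taken from the code above, and the real ResourceMap/ResourceMapper contract may differ:

-- hypothetical entry, for illustration only
local resourceMap = {
	["/echo"] = function(request, response)
		response:writeHead(200);
		response:writeEnd("you asked for: "..request.Url.path);

		-- returning nothing (nil) lets OnRequest() recycle the socket
	end,
}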

My little experiment here is to give my code real world exposure, with the intention of hardening it, and gaining practical experience on what a typical web server is likely to see out in the wild.

So, if you read this blog, go hit those links. Soon enough, perhaps I will be able to serve up my own blog using my own software. That’s got a certain circular reference to it.


Pinging Peers – Creating Network Meshes

I have the need to create a mesh of network nodes on occasion. The configuration of the mesh is typically a set of VMs running on some cloud service or another. The code running on these VMs may perform some basic task, probably similar across all the VMs. They do need to communicate information with each other, and to do that, they need to maintain a list of their peers and the latencies between them, and the like.

So, I started modeling this thing on the Border Gateway Protocol (BGP), which is one of the ancient low lying protocols of the internet. I model the protocol by first creating an object, BGPPeer. The intention of this object is to act as both the host and client of the fundamental mesh communications. It looks like this:

-- BGProtocol.lua
--[[
	Implementation of the Border Gateway Protocol (BGP)
--]]

local ffi = require("ffi");
local Timer = require("Timer")
local IOCPSocket = require("IOCPSocket");
local SocketUtils = require("SocketUtils")
local Network = require("Network")

--[[
	BGP Relies on the presence of a fully connected graph
	The Protocol object is initialized by reading a list of nodes
	and creating the right channels between each of the nodes.
--]]

--[[
print("ComputerNameNetBIOS: ", Network.getHostName(ffi.C.ComputerNameNetBIOS))
print("ComputerNameDnsHostname: ", Network.getHostName(ffi.C.ComputerNameDnsHostname))
print("ComputerNameDnsDomain: ", Network.getHostName(ffi.C.ComputerNameDnsDomain))
print("ComputerNameDnsFullyQualified: ", Network.getHostName(ffi.C.ComputerNameDnsFullyQualified))
print("ComputerNamePhysicalNetBIOS: ", Network.getHostName(ffi.C.ComputerNamePhysicalNetBIOS))
print("ComputerNamePhysicalDnsHostname: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsHostname))
print("ComputerNamePhysicalDnsDomain: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsDomain))
print("ComputerNamePhysicalDnsFullyQualified: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsFullyQualified))
--]]

local selfhost = Network.getHostName(ffi.C.ComputerNameDnsHostname):lower()
--print("Self Host: ",selfhost)

local BGPPeer = {
  HeartbeatInterval = 50 * 1000;	-- send out a heartbeat on this interval
  UdpPort = 1313;					-- port used to communicate UDP
  EliminateSelf = true;			-- don't consider self to be a peer
}
setmetatable(BGPPeer, {
  __call = function(self, ...)
    return self:create(...)
  end,
})

local BGPPeer_mt = {
  __index = BGPPeer,
}

BGPPeer.init = function(self, config)
  local err
  local obj = {
    Config = config,
    Peers = {},
  }

  -- if there is config information, then use
  -- it to setup sockets to the peer
  if config then
    -- create the client socket
    obj.Name = config.name;
    obj.UdpSender, err = IOCPSocket:create(AF_INET, SOCK_DGRAM, 0);
    obj.UdpPeerAddress, err = SocketUtils.CreateSocketAddress(config.host, config.udpport or BGPPeer.UdpPort)


    obj.UdpPeerAddressLen = ffi.sizeof(obj.UdpPeerAddress);
  end

  setmetatable(obj, BGPPeer_mt)

  return obj;
end

BGPPeer.create = function(self, configfile)
  -- load the configuration file
  local fs, err = io.open(configfile, "rb")
	
  if not fs then return nil, "could not load file" end

  local configdata = fs:read("*all")
  fs:close();
	
  local func, err = loadstring(configdata)
  local config = func()

  -- create self, so peers can be added
  local obj = self:init()

  -- add the udp socket
  -- Setup the server socket
  obj.UdpListener, err = IOCPSocket:create(AF_INET, SOCK_DGRAM, 0);
  if not obj.UdpListener then
    return nil, err;
  end

  local success, err = obj.UdpListener:bindToPort(BGPPeer.UdpPort);

  if not success then
    return nil, err;
  end

  -- for each of the peers within the config 
  -- create a new peer, and add it to the 
  -- list of peers for the first object
  for _,peerconfig in ipairs(config) do
    --print("PEER: ", peerconfig.name, peerconfig.host)
    if peerconfig.name:lower() ~= selfhost then
      obj:createPeer(peerconfig)
    end
  end

  -- Need to setup a listener for the UDP port

  -- add the heartbeat timer
  obj.HeartbeatTimer = Timer({
    Delay = BGPPeer.HeartbeatInterval;
    Period = BGPPeer.HeartbeatInterval;
    OnTime = obj:onHeartbeat();
  })

  -- start a fiber which will run the UDP listener loop
  spawn(obj:handleUdpListen())

  return obj;
end

BGPPeer.onHeartbeat = function(self)
  local message = "HELLO FROM: "..Network.getHostName(ffi.C.ComputerNameDnsHostname);

  -- return a closure; the Timer will invoke this every period
  local closure = function(timer)
    for peername, peer in pairs(self.Peers) do
      local bytessent, err = peer.UdpSender:sendTo(peer.UdpPeerAddress, peer.UdpPeerAddressLen,
        message, #message);
    end
  end

  return closure
end

BGPPeer.addPeer = function(self, peer)
  self.Peers[peer.Name] = peer
end

BGPPeer.createPeer = function(self, peerconfig)
  self:addPeer(BGPPeer:init(peerconfig));
end

local count = 0;
BGPPeer.onReceiveHeartBeat = function(self, buff, bytesread)
  count = count + 1
  print("onReceiveHeartBeat: ", count, ffi.string(buff, bytesread));
end


BGPPeer.handleUdpListen = function(self)
  local bufflen = 1500;
  local buff = ffi.new("uint8_t[1500]");
  local from = sockaddr_in();
  local fromLen = ffi.sizeof(from);

  local closure = function()

    while true do
      local bytesread, err = self.UdpListener:receiveFrom(from, fromLen, buff, bufflen);

      if not bytesread then
        print("receiveFrom ERROR: ", err)
        return false, err;
      end

      self:onReceiveHeartBeat(buff, bytesread)
    end
  end

  return closure
end

return BGPPeer

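Before walking through the interesting bits, it is worth seeing what the configuration file that create() loads actually looks like. It is nothing more than a Lua chunk which returns a table of peer entries. Here is a hypothetical sample (the field names match what init() reads; the host names are made up):

-- peers.cfg (hypothetical sample)
-- each entry describes one node in the mesh
return {
  {name = "node1", host = "node1.cloudapp.net"},
  {name = "node2", host = "node2.cloudapp.net", udpport = 1313},
  {name = "node3", host = "node3.cloudapp.net"},
}

Since the file is executed with loadstring(), it could contain any Lua you like, but a plain table keeps it easy to read.
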
There are a couple things of note here. One of the problems I need to solve is that this is not a basic client/server app, but rather a peer-to-peer setup. As such, each peer both listens on a port and sends data.

A typical network ‘server’ app may block on listening, and not perform any actions until it receives a request from a ‘client’. In my case, I don’t want to block, but I do want to initiate a ‘blocking’ loop in parallel with some non-blocking stuff.

In this BGPPeer, there is the handleUdpListen() function. It is the heart of the UDP listening loop, the "server" side of the peer communication. It runs in parallel, because it is invoked in the constructor of the object like this:

  spawn(obj:handleUdpListen())

In this particular case, the handleUdpListen() function builds a closure (another function), which is what is actually returned and spawned in its own fiber. The closure itself is pretty straightforward.

  local closure = function()

    while true do
      local bytesread, err = self.UdpListener:receiveFrom(from, fromLen, buff, bufflen);

      if not bytesread then
        print("receiveFrom ERROR: ", err)
        return false, err;
      end

      self:onReceiveHeartBeat(buff, bytesread)
    end
  end

Basically, just run a loop forever, waiting to receive a UDP packet with receiveFrom(). Since in TINN all IO calls are cooperative, while this receiveFrom() is 'blocked', the current coroutine is actually yielding so that other work can continue. That's great, that's exactly what I want. So, now the listening side is set up.

How about the sending side? I want to send out a formatted packet on a regular interval. Since TINN has a Timer object, this is a pretty straightforward matter. Within the constructor of the BGPPeer, we find the following:

  -- add the heartbeat timer
  obj.HeartbeatTimer = Timer({
    Delay = BGPPeer.HeartbeatInterval;
    Period = BGPPeer.HeartbeatInterval;
    OnTime = obj:onHeartbeat();
  })

With this setup, the timer will execute the closure returned by onHeartbeat() every 'Period', which is some number of milliseconds. The default here is 50 seconds, but it can be configured to any value.
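Since HeartbeatInterval is just a field on the BGPPeer table, changing the default is a one-liner, done before constructing the peer:

-- a sketch: heartbeat every 10 seconds instead of the 50 second default
BGPPeer.HeartbeatInterval = 10 * 1000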

As we saw from the last article on Timers, this timer object will start automatically, and run cooperatively with anything else running in the process. That’s great. So, onHeartbeat just needs to send out information to all its peers whenever it is called.

BGPPeer.onHeartbeat = function(self)
  local message = "HELLO FROM: "..Network.getHostName(ffi.C.ComputerNameDnsHostname);

  -- return a closure; the Timer will invoke this every period
  local closure = function(timer)
    for peername, peer in pairs(self.Peers) do
      local bytessent, err = peer.UdpSender:sendTo(peer.UdpPeerAddress, peer.UdpPeerAddressLen,
        message, #message);
    end
  end

  return closure
end

This becomes a simple matter of traversing the list of peers, and using the UDP socket that was set up with each of them to send out the packet.

What’s that altogether then? We saw the spawn that sent the UDP listener off in a cooperative manner, and just now I showed the timer going in parallel. The two combined give you a model where you can both listen and send ‘at the same time’.

It looks so easy, and that’s the power of TINN, the scheduler, and Lua’s coroutines. It’s fairly easy to write such code without having to worry overly much about old school things like mutexes, thread context switching, and the like. Just write your code however you want, sprinkle in a few ‘spawn()’ functions here and there, and call it a day. This is how easy it is to write network programs that can walk and chew gum at the same time.
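To make that concrete, here is a sketch of a main() that drives the whole thing (‘peers.cfg’ is a hypothetical config file name):

-- main.lua (sketch)
local BGPPeer = require("BGProtocol")

local function main()
  local peer, err = BGPPeer("peers.cfg")

  if not peer then
    print("could not create peer: ", err)
    return stop()
  end

  -- at this point the UDP listener fiber and the heartbeat
  -- timer are already running; there is nothing more to do here
end

run(main)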

Next time around, I want to write the simple handler that can deal with Win32 message looping and peer networking at the same time. Same general mechanisms, just a cooperative User32 message pump to incorporate.


How About that Web Server Again?

Has it really been more than a month?

Well, you see, I bought a house, packed, moved, took daughter to college, and…

The last time I wrote about making a simple WebServer, I left the job half finished. I showed that, through layering, you could build up from a basic socket server to a simple http handler, which had not much more than an ‘OnRequest’ function.

That works great, and gives you the right level of access if you’re going to do things with http in general that are beyond simple static file serving. But what if your interactions are more complex than you care to deal with using simple ‘if-then-else’ logic?

This time around, I’m going to use a new and improved form of the WebApp, which deals with the new IOCP-based sockets and leverages the HttpServer object of yore.

Here is the basic web server:

-- main.lua
local WebApp = require("WebApp")
local resourceMap = require("ResourceMap");

local port = arg[1] or 8080

local app = WebApp(resourceMap, port);
app:run();

And you simply invoke it with:

tinn main.lua 8080

Well, that seems easy enough. If you wanted to get real crazy, and win an obfuscated code competition, you could do it in one line:

-- main.lua
require("WebApp")(require("ResourceMap"), arg[1] or 8080):run()

Or, if you’re a real Lua head, you could do it all on the command line, without even using the ‘main.lua’ file:

tinn -e "require('WebApp')(require('ResourceMap'), arg[1] or 8080):run()"

That’s all fun and games, and it’s cool to see that you can implement a web server in a single command line. But what’s in that magical ResourceMap file?

local URL = require("url");
local StaticService = require("StaticService");

local function replacedotdot(s)
  return string.gsub(s, "%.%.", '%.')
end

local HandleFileGET = function(request, response)
  local absolutePath = replacedotdot(URL.unescape(request.Url.path));
  local filename = './wwwroot'..absolutePath;

  StaticService.SendFile(filename, response)
  return false;
end

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
  };
}

return ResourceMap;

The idea here is that we want to route any ‘GET’ methods to the HandleFileGET() function. There is a ResourceMapper object within TINN that utilizes a structure such as the one in ResourceMap. The general layout is: {[pattern] = {name="", METHOD = function [, METHOD = function]},}

Using this simple mechanism, you can easily route the handling of a particular resource request to a particular function.

This table can have entries that are much more interesting. For example, if you want to handle the retrieval of ‘favicon.ico’ differently than other files, you can just add a specific mapping for that.

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}
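What might HandleFaviconGET look like? A minimal sketch, reusing the same StaticService.SendFile() call seen earlier, but with a fixed path (the icon location here is made up):

local HandleFaviconGET = function(request, response)
  -- always serve the same icon file, regardless of the request details
  StaticService.SendFile('./wwwroot/favicon.ico', response)
  return false;
end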

You could have multiple methods per resource as well:

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
    POST = HandleFilePOST,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}

In general, the longest prefix match algorithm is applied to whatever is supplied within the resource field of the request. If you want to deal with all methods of a particular resource, without having to specify them explicitly, then you can use the magic method ‘_DEFAULT’.

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
    POST = HandleFilePOST,
    _DEFAULT = HandleFileDEFAULT,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}

In this way, if there is a request for a resource, and a method that we don’t know about at the time of creating the map, the HandleFileDEFAULT() function will be called to deal with it. That might be handy in the case where you’d like to handle these unknown method requests in a generic way, or you might want to change it over time without having to change your mapping.
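A hypothetical HandleFileDEFAULT might simply refuse any method it doesn’t recognize. Assuming the response object supports the writeHead()/writeEnd() style (an assumption in this sketch; your handler could equally do something smarter):

local HandleFileDEFAULT = function(request, response)
  -- reject any method we did not explicitly map
  response:writeHead(405)  -- Method Not Allowed (assumed response API)
  response:writeEnd()
  return false;
end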

Another case is when the resource isn’t actually a resource. Take for example a ‘CONNECT’ request:

CONNECT localhost:9000

In this case, the ‘resource’ does not start with a ‘/’, so the longest prefix match won’t land it on anything in our map. I need to deal with these with a different pattern. Well, the pattern part of the map is nothing more than a standard pattern in the Lua sense of the word, so a ‘.’ will match any character. The following map will do the trick.

local ResourceMap = {
  ["."] = {name=".",
    CONNECT = HandleCONNECT,
  };

  ["/"] = {name="/",
    GET = HandleFileGET,
    POST = HandleFilePOST,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}

In this way, we’ll deal with the CONNECT method if it shows up.

This is an affirmative list. If there is a match to one of the patterns, then the mapped function is executed. If no pattern is found, either because the resource itself does not match, or because the resource does not have a function to handle the specified method, then a general 404 error is returned.

That’s about all there is to it. Create a mapping between URLs and functions, and you’re all done. Of course there’s the function itself:

local URL = require("url");
local StaticService = require("StaticService");

local function replacedotdot(s)
  return string.gsub(s, "%.%.", '%.')
end

local HandleFileGET = function(request, response)
  local absolutePath = replacedotdot(URL.unescape(request.Url.path));
  local filename = './wwwroot'..absolutePath;
	
  StaticService.SendFile(filename, response)

  return false;
end

Not a lot of magic in this particular one. First of all, there’s that simplistic ‘replacedotdot()’ function. That’s just a casual attempt to restrict the file access to the directory of our choosing. In HandleFileGET, the first thing to do is urldecode the path specified, then feed that to the replacedotdot() function. Then, take whatever comes out of that, and prepend it with ‘./wwwroot’, and finally feed that to the StaticService.SendFile() function, which is a standard part of TINN.

The return value of this function has meaning. If you return false, or ‘nil’, then the socket representing this particular request will be recycled, assuming there is potentially another request coming on the same socket.

If you instead return ‘true’, then the system assumes you are handling any subsequent recycling, or closing, and it will not take any further action with the underlying socket.

This is a nice feature in that it allows you to construct much more interesting interactions than the standard request/response, without much fuss. For example, you could easily open up websockets, upgrade connections in general, or do whatever you want.
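As a sketch of that last point, a handler that wants to keep the connection under its own control just spawns its own fiber and returns true (what goes on inside the fiber is up to you, so it is left as a comment here):

local HandleCONNECT = function(request, response)
  -- take ownership of the connection; returning true tells the
  -- framework not to recycle or close the underlying socket
  spawn(function()
    -- tunneling or protocol upgrade logic would go here
  end)

  return true;
end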

At any rate, with TINN, you can create a simple web server in a single command line. I find that to be a fairly useful thing.