schedlua – predicates and alarms

A few years back, during the creation of Language Integrated Query (LINQ), I had this idea: if we could add database semantics to the language, what would adding async semantics look like? These days we’ve gone the route of async/await and various other constructs, but still, I always just wanted primitives such as “when”, “whenever”, and “waitUntil”. The predicates in schedlua are just that:

  • signalOnPredicate(predicate, signalName)
  • waitForPredicate(predicate)
  • when(predicate, func)
  • whenever(predicate, func)

Of course, this is all based, at the very core, on the signaling mechanism that’s in the kernel of schedlua, but these primitives are not in the kernel proper.  They don’t need to be, which is nice because it means you can easily add such functions without having to alter the core.

What do they look like in practice?  Well, first of all, a ‘predicate’ is nothing more than a fancy name for a function that returns a boolean value: it will return either ‘true’ or ‘false’.  Based on this, various things can occur.  For example, ‘signalOnPredicate’ will, when the predicate returns ‘true’, emit the signal specified by signalName.  Similarly, with ‘waitForPredicate’, the currently running task is put into a suspended state until such time as the predicate returns ‘true’.  ‘when’ and ‘whenever’ are similar, but they spawn new tasks rather than suspending the existing task.  And here’s some code:


--test_scheduler.lua
package.path = package.path..";../?.lua"

local Functor = require("functor")
local Kernel = require("kernel"){exportglobal = true}
local Predicate = require("predicate")(Kernel, true)

local idx = 0;
local maxidx = 100;

local function numbers(ending)
	local function closure()
		idx = idx + 1;
		if idx > ending then
			return nil;
		end
		return idx;
	end
	
	return closure;
end



local function counter(name, nCount)
	for num in numbers(nCount) do
		print(num)
		local eventName = name..tostring(num);
		--print(eventName)
		signalOne(eventName);

		yield();
	end

	signalAll(name..'-finished')
end


local function predCount(num)
	waitForPredicate(function() return idx > num end)
	print(string.format("PASSED: %d!!", num))
end



local function every5()
	while idx <= maxidx do
		waitForPredicate(function() return (idx % 5) == 0 end)
		print("!! matched 5 !!")
		yield();
	end
end

local function test_whenever()
	local t1 = whenever(
		function() 
			if idx > maxidx then return nil end; 
			return (idx % 2) == 0 end,
		function() print("== EVERY 2 ==") end)
end

local function main()
	local t1 = spawn(counter, "counter", maxidx)

	local t6 = spawn(test_whenever);

	local t2 = spawn(predCount, 12)
	local t3 = spawn(every5)
	local t4 = spawn(predCount, 50)
	local t5 = when(
		function() return idx == 75 end, 
		function() print("WHEN IDX == 75!!") end)
	local t7 = when(function() return idx >= maxidx end,
		function() halt(); end);


end

run(main)



It’s a test case, so it’s doing some contrived things.  Basically, there is one task running a counter, which fires a signal for every new number (up to maxidx).  Then you can see the various tests which use predicates.

local function every5()
	while idx <= maxidx do
		waitForPredicate(function() return (idx % 5) == 0 end)
		print("!! matched 5 !!")
		yield();
	end
end

Here, we’re just running a continuous loop which will print its message every time the predicate is true. It seems kind of wasteful, doesn’t it? Under normal circumstances, this would be a very hot spin loop, but when you call ‘waitForPredicate’, your task will actually be thrown into a ‘suspended’ state, which means if there are other tasks to execute, they’ll go ahead, and you’ll get back in the queue to be tested later. So, it’s really: “test this predicate; if it’s not true, then throw the task at the back of the ready list, and test it again later. If it’s true, then continue on with whatever is next in this task”. The ‘yield()’ here is redundant.

In this particular case, we’ve essentially created a ‘whenever’ construct. This pattern comes up often enough that it’s worth creating a convenience function for it.

local function test_whenever()
	local t1 = whenever(
		function() 
			if idx > maxidx then return nil end; 
			return (idx % 2) == 0 end,
		function() print("== EVERY 2 ==") end)
end

In this particular case, we let the ‘whenever’ construct do the work for us. Every other count, we’ll print our message. Of course, I’m using in-place functions (lambda expressions, if you like) in these test cases. They don’t have to be that way; you can define the functions however you want.
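For instance, the same test, with the predicate and the action pulled out as named functions, is exactly equivalent:

local function isEven()
	if idx > maxidx then return nil end
	return (idx % 2) == 0
end

local function printEvery2()
	print("== EVERY 2 ==")
end

local t1 = whenever(isEven, printEvery2)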

t7 is interesting because it says, ‘when the count reaches maxidx, halt the program’, which will in fact break us out of the main event loop and stop the program. Very convenient. This construct is useful because there may be myriad reasons why you’d want to stop the program. You can simply set up a predicate to do that. It could be a ‘when’, or perhaps you’d rather it be based on a signal; in that case use a signalOnPredicate/waitForSignal sort of thing. It’s composable, so use whatever set of constructs makes the most sense. It’s kind of a sane form of exception handling.

So there you have it, yet another construct tackled. Predicates are a simple construct that kind of extend the if/then flow control into the async realm. ‘when’ is almost a direct replacement for ‘if’ in this context. The waitForPredicate is kind of a new construct, I think. It’s like an if/spinlock, except you’re not spinning; you’re suspended, with periodic checks on the predicate. And then of course the ‘signalOnPredicate’ is like a hail Mary pass. You don’t know who or what is going to respond, but you’re going to send up the signal. That’s like programming with interrupts, except that, unless the scheduler allows for high priority interrupts/signals, these will be handled in the normal flow of cooperative processing.
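None of these primitives require kernel changes. As a flavor of the layering, here’s a minimal sketch of how a ‘waitForPredicate’ could be built purely from spawn and the signal calls; the names and approach here are mine, and the actual predicate module may do it differently:

local function waitForPredicate(pred)
	-- a signal name unique to this predicate instance
	local signalName = "predicate-"..tostring(pred);

	-- watcher task: test the predicate once per scheduler pass,
	-- and raise the signal when it finally turns true
	spawn(function()
		while not pred() do
			yield();
		end
		signalAll(signalName);
	end)

	-- the calling task suspends here until the watcher signals
	return waitForSignal(signalName);
end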

Predicates are great; they’re a slightly different construct than I’ve typically used in my everyday programming. They make some tasks a lot easier, and they make thinking about async programming more manageable.

And then there’s time…

Time is an easier construct to think about, because it’s already well represented in most language frameworks. Here are the time primitives:

  • waitUntilTime
  • sleep
  • delay
  • periodic

‘waitUntilTime’ is the linchpin in this case. It will suspend the current task until the specified time. Time, in this case, is a relative thing. The alarm module keeps its own clock, so everything is expressed relative to that clock.

sleep(seconds) will simply suspend the current task for the specified number of seconds. You can specify fractions of seconds, and the clock has nanosecond precision, but we’re not using a realtime scheduler, so you’ll get some amount of delay. Of course, you could simply swap in a new scheduler and deal with any realtime requirements you might have.
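In fact, ‘sleep’ can be expressed directly in terms of ‘waitUntilTime’. A minimal sketch, assuming the alarm module keeps its clock in a ‘Clock’ field and that ‘waitUntilTime’ speaks in seconds (both assumptions on my part; the real module may differ):

-- sleep, layered on waitUntilTime
function Alarm.sleep(self, seconds)
	-- 'now' relative to the alarm module's own clock
	local when = self.Clock:secondsElapsed() + seconds;
	return self:waitUntilTime(when);
end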

delay will spawn a task which will execute the specified function after the specified amount of time has passed. This is kind of like a ‘when’ predicate, with a specialization for time. You could in fact reconstruct this primitive using the when predicate, but the alarm, knowing about time as it does, will do it more efficiently.

local Kernel = require("kernel"){exportglobal = true};
local Alarm = require("alarm")(Kernel)
local Clock = require("clock")

local c1 = Clock();

local function twoSeconds()
	print("TWO SECONDS: ", c1:secondsElapsed());
	Kernel:halt();
end

local function test_alarm_delay()
	print("delay(twoSeconds, 2000");
	Alarm:delay(twoSeconds, 2000);
end

run(test_alarm_delay)

periodic is similar, in that it will execute a function, but whereas ‘delay’ is a one-shot event, ‘periodic’ will repeat. In this way it is like the ‘whenever’ construct.
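Usage would look much like delay. Assuming ‘periodic’ mirrors delay’s (func, millis) signature (an assumption on my part; check the alarm module), something like:

-- print a heartbeat every second; unlike delay, this repeats
local function heartbeat()
	print("tick: ", c1:secondsElapsed());
end

Alarm:periodic(heartbeat, 1000);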

And there you have it. Between the predicates and the alarms, you have some new basic constructs for doing async programming. They are supported by the signaling construct that’s already a part of the kernel. They are simple add-ons, which means you can easily create your own constructs and replace these, or create completely new constructs which are similar. They can leverage the signaling mechanism, or maybe they want to do something else altogether.

So far, the constructs have been of the if/then variety, only in async fashion. I think there’s another set of constructs which have to do with barriers and completions of tasks. That will clean up the other part of async, which is the ‘await’. We’ll see. In the meantime, next time: async io, which is pretty exciting.


schedlua – the kernel

Previously, I talked about the scheduler within the new schedlua.  A scheduler is a fairly simple thing; it just decides which of the many ready tasks will run next.  The default scheduler follows a fairly simple FIFO strategy, so there are no priorities, favorites, or the like.  Of course this wouldn’t be any fun if you were stuck with just one scheduler, so naturally enough this is an easily pluggable part of the system.  But what part of the system does the plugging?

In steps the Kernel.  In general, the schedlua project is about creating a set of tools by which highly performant services can be constructed.  schedlua largely supports the concept that a single processor can be highly leveraged if programmed correctly.  It does not try to gain performance through the usage of multiple threads, but rather it just takes on the task of suspending various tasks which are blocked on IO or otherwise idle, and letting tasks which are ready to run do their thing.  The concurrency model is cooperative, not preemptive, so if any one task misbehaves, the process can become stuck.

So, let’s take a look at this code:

--kernel.lua
-- kernel is a singleton, so return
-- single instance if we've already been
-- through this code
print("== KERNEL INCLUDED ==")

local Scheduler = require("scheduler")
local Task = require("task")
local Queue = require("queue")
local Functor = require("functor")

local Kernel = {
	ContinueRunning = true;
	TaskID = 0;
	Scheduler = Scheduler();
	TasksSuspendedForSignal = {};
}

setmetatable(Kernel, {
    __call = function(self, params)
    	params = params or {}
    	params.Scheduler = params.Scheduler or self.Scheduler
    	
    	if params.exportglobal then
    		self:globalize();
    	end

    	self.Scheduler = params.Scheduler;

    	return self;
    end,
})

function Kernel.getNewTaskID(self)
	self.TaskID = self.TaskID + 1;
	return self.TaskID;
end

function Kernel.getCurrentTaskID(self)
	return self:getCurrentTask().TaskID;
end

function Kernel.getCurrentTask(self)
	return self.Scheduler:getCurrentTask();
end

function Kernel.spawn(self, func, ...)
	local task = Task(func, ...)
	task.TaskID = self:getNewTaskID();
	self.Scheduler:scheduleTask(task, {...});
	
	return task;
end

function Kernel.suspend(self, ...)
	self.Scheduler:suspendCurrentFiber();
	return self:yield(...)
end

function Kernel.yield(self, ...)
	return self.Scheduler:yield();
end


function Kernel.signalOne(self, eventName, ...)
	if not self.TasksSuspendedForSignal[eventName] then
		return false, "event not registered", eventName
	end

	local nTasks = #self.TasksSuspendedForSignal[eventName]
	if nTasks < 1 then
		return false, "no tasks waiting for event"
	end

	local suspended = self.TasksSuspendedForSignal[eventName][1];

	self.Scheduler:scheduleTask(suspended,{...});
	table.remove(self.TasksSuspendedForSignal[eventName], 1);

	return true;
end

function Kernel.signalAll(self, eventName, ...)
	if not self.TasksSuspendedForSignal[eventName] then
		return false, "event not registered"
	end

	local nTasks = #self.TasksSuspendedForSignal[eventName]
	if nTasks < 1 then
		return false, "no tasks waiting for event"
	end

	for i=1,nTasks do
		self.Scheduler:scheduleTask(self.TasksSuspendedForSignal[eventName][1],{...});
		table.remove(self.TasksSuspendedForSignal[eventName], 1);
	end

	return true;
end

function Kernel.waitForSignal(self, eventName)
	local currentFiber = self.Scheduler:getCurrentTask();

	if currentFiber == nil then
		return false, "not currently in a running task"
	end

	if not self.TasksSuspendedForSignal[eventName] then
		self.TasksSuspendedForSignal[eventName] = {}
	end

	table.insert(self.TasksSuspendedForSignal[eventName], currentFiber);

	return self:suspend()
end

function Kernel.onSignal(self, func, eventName)
	local function closure()
		self:waitForSignal(eventName)
		func();
	end

	return self:spawn(closure)
end



function Kernel.run(self, func, ...)

	if func ~= nil then
		self:spawn(func, ...)
	end

	while (self.ContinueRunning) do
		self.Scheduler:step();		
	end
end

function Kernel.halt(self)
	self.ContinueRunning = false;
end

function Kernel.globalize()
	halt = Functor(Kernel.halt, Kernel);
    onSignal = Functor(Kernel.onSignal, Kernel);

    run = Functor(Kernel.run, Kernel);

    signalAll = Functor(Kernel.signalAll, Kernel);
    signalOne = Functor(Kernel.signalOne, Kernel);

    spawn = Functor(Kernel.spawn, Kernel);
    suspend = Functor(Kernel.suspend, Kernel);

    waitForSignal = Functor(Kernel.waitForSignal, Kernel);

    yield = Functor(Kernel.yield, Kernel);
end

return Kernel;


From the top, you can see the Kernel requires the scheduler, task and functor. The scheduler has already been explained. The Kernel serves a couple of purposes. First of all, it manages the scheduler. The ‘run’ function at the bottom is the ‘loop’ of the application. It will run until ‘halt’ is called. Each time through the loop it’s telling the scheduler to take a step.

Also at the bottom, you can see usage of the Functor. A functor is just a simple convenience wrapper that helps you call a function on a table at a later point. Those functors are used to make the keywords global.
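If you haven’t seen one before, a functor can be as small as this sketch (the actual functor module may carry a bit more):

-- functor.lua (sketch): bind a function and a target together, so the
-- pair can be called later like a plain function
local function Functor(func, target)
	return function(...)
		if target then
			return func(target, ...)
		end
		return func(...)
	end
end

return Functor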

There are two primary things the kernel does. One is to spawn new tasks, the other is to provide a central point for signal handling.

First, let’s look at the ‘spawn’.

function Kernel.spawn(self, func, ...)
	local task = Task(func, ...)
	task.TaskID = self:getNewTaskID();
	self.Scheduler:scheduleTask(task, {...});
	
	return task;
end

This is actually the coolest part of the system in terms of how the programming model is expressed. Here’s an example of it in use.

local Kernel = require("kernel"){exportglobal = true}


local function numbers(ending)
	local idx = 0;
	local function closure()
		idx = idx + 1;
		if idx > ending then
			return nil;
		end
		return idx;
	end
	
	return closure;
end

local function task1()
	print("first task, first line")
	yield();
	print("first task, second line")
end

local function task2()
	print("second task, only line")
end

local function counter(name, nCount)
	for num in numbers(nCount) do
		print(name, num);
		yield();
	end
	halt();
end

local function main()
	local t0 = spawn(counter, "counter1", 5)
	local t1 = spawn(task1)
	local t2 = spawn(task2)
	local t3 = spawn(counter, "counter2", 7)
end

run(main)

Basically, any time you want something to happen concurrently, you just say ‘spawn(func, params)’ and that’s that.

What happens is a Task object is created which holds onto the function object as well as the initial set of parameters. This task is then sent to the scheduler to be run. From then on out you can forget about it. Of course, you’re handed the task when you say ‘spawn’, so you do have a chance of suspending and killing it off in the future if you like. Similarly, you can wait for a task to complete as well.
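The Task object itself isn’t shown here, but there isn’t much to it. Here’s a sketch of just the pieces the scheduler relies upon (the real task.lua may differ in detail):

-- task.lua (sketch): wrap a function in a coroutine, and remember
-- the parameters the scheduler hands back on each reschedule
local Task = {}
local Task_mt = {__index = Task}

setmetatable(Task, {
	__call = function(self, func, ...)
		local obj = {
			routine = coroutine.create(func);
			params = {...};
			state = "created";
		}
		return setmetatable(obj, Task_mt)
	end,
})

function Task.setParams(self, params)
	self.params = params;
	return self;
end

function Task.resume(self)
	return coroutine.resume(self.routine, unpack(self.params));
end

function Task.getStatus(self)
	return coroutine.status(self.routine);
end

return Task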

So, that’s spawning.

The other major feature in the kernel is signal handling.

function Kernel.waitForSignal(self, eventName)
	local currentFiber = self.Scheduler:getCurrentTask();

	if currentFiber == nil then
		return false, "not currently in a running task"
	end

	if not self.TasksSuspendedForSignal[eventName] then
		self.TasksSuspendedForSignal[eventName] = {}
	end

	table.insert(self.TasksSuspendedForSignal[eventName], currentFiber);

	return self:suspend()
end

This is probably THE most important routine in the whole system. Basically, take the current task and put it onto the suspension list, keyed by the signal name (a signal is just a string value). Later on, when you want the task to resume, you signal it using either signalOne() or signalAll().

A little bit of code demonstrating this:

local Kernel = require("kernel"){exportglobal = true}
local Functor = require("functor")

local function numbers(ending)
	local idx = 0;
	local function closure()
		idx = idx + 1;
		if idx > ending then
			return nil;
		end
		return idx;
	end
	
	return closure;
end

local function waitingOnCount(name, ending)
	local eventName = name..tostring(ending)
	waitForSignal(eventName)

	print("REACHED COUNT: ", ending)
end

local function onCountFinished(name)
	print("Counter Finished: ", name)
end

local function counter(name, nCount)
	for num in numbers(nCount) do
		print(num)
		local eventName = name..tostring(num);
		--print(eventName)
		signalOne(eventName);

		yield();
	end

	signalAll(name..'-finished')
end

local function main()
	local t1 = spawn(counter, "counter", 50)
	local t2 = spawn(waitingOnCount, "counter", 20)
	local t3 = spawn(function() print("LAMBDA"); waitForSignal("counter15") print("reached 15!!") end)
	
	-- test signalAll().  All three of these should trigger when
	-- counter finishes
	local t13 = onSignal(Functor(onCountFinished, "counter-1"), "counter-finished")
	local t14 = onSignal(Functor(onCountFinished, "counter-2"), "counter-finished")
	local t15 = onSignal(Functor(onCountFinished, "counter-3"), "counter-finished")
end

run(main)

Here, there is a counter(), which is just counting, and firing off a signal for each number. The various waitingOnCount() and LAMBDA routines are going to respond to the appropriate signals.

Finally, the t13, t14, and t15 tasks are waiting for the “counter-finished” signal, and they will all fire off and print their little message.

Of course, at this point you could have something that would call ‘halt()’ so you don’t have to press Ctrl-C to stop the process, but you get the idea.
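For example, one more line in main() would do it, reusing the same signal the counter already raises:

-- stop the event loop once the counter signals that it's finished
local t16 = onSignal(halt, "counter-finished")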

And that’s pretty much it for the kernel. Absent from here are the async io, predicates, alarms and the like. They are available in schedlua, but they’re not a part of the kernel. Instead of being part of the kernel proper, these are essentially modules. They utilize the signal and spawn features built into the kernel, and they’re free to do their own thing.

I’ll get into the details of alarms and predicates next time around to demonstrate the concept of easy add-on modules.


Schedulers revisited

I ran quite a long series on the buildup of the TINN scheduler, including this gem on the primitives included in the application model: Parallel Conversations

The scheduler developed in that whole series served me very well, taking on timing, predicates, signals, and async IO.  It was all based on Windows primitives, and although fairly modular, was an evolved codebase, which had some lumps and inefficiencies.  So now, two years on, I decided to reimagine that scheduler.  Primary reasons?  I had some time on my hands, and I wanted a scheduler that works equally well on Linux as on Windows.

And so, the schedlua project was born.

What were the design criteria this time around?

  • Reduce Size and complexity
  • Increase composability
  • Do not force features where they’re not desired

In the previous incarnation, I went back and forth between having an “Application” object, and the “Scheduler”.  Things were primarily driven by the Application, and it had a central role in the instance of any running app.  I still favor having this central role, but I changed the meaning and composition of that central player this time around.  First though, there is the “Scheduler”.


local ffi = require("ffi");

local Queue = require("queue")
local Task = require("task");


--[[
	The Scheduler supports a collaborative processing
	environment.  As such, it manages multiple tasks which
	are represented by Lua coroutines.

	The scheduler works by being the arbitrator of which routine is running
	in the application.  When work is to be done, it is encapsulated in 
	the form of a task object.  The scheduler relies upon that task object
	to call a form of 'yield' at some point, at which time it will receive
	the main thread of execution, and will pick the next task out of the ready
	list to run.
--]]
local Scheduler = {}
setmetatable(Scheduler, {
	__call = function(self, ...)
		return self:create(...)
	end,
})
local Scheduler_mt = {
	__index = Scheduler,
}

function Scheduler.init(self, ...)
	local obj = {
		TasksReadyToRun = Queue();
	}
	setmetatable(obj, Scheduler_mt)
	
	return obj;
end

function Scheduler.create(self, ...)
	return self:init(...)
end


function Scheduler.tasksPending(self)
	return self.TasksReadyToRun:length();
end


-- put a task on the ready list
-- the 'task' should be something that can be executed,
-- whether it's a function, functor, or something that has a '__call'
-- metamethod implemented.
-- The 'params' is a table of parameters which will be passed to the function
-- when it's ready to run.
function Scheduler.scheduleTask(self, task, params)
	--print("Scheduler.scheduleTask: ", task, params)
	params = params or {}
	
	if not task then
		return false, "no task specified"
	end

	task:setParams(params);
	self.TasksReadyToRun:enqueue(task);	
	task.state = "readytorun"

	return task;
end

function Scheduler.removeFiber(self, fiber)
	--print("REMOVING DEAD FIBER: ", fiber);
	return true;
end

function Scheduler.inMainFiber(self)
	return coroutine.running() == nil; 
end

function Scheduler.getCurrentFiber(self)
	return self.CurrentFiber;
end

function Scheduler.suspendCurrentFiber(self, ...)
	self.CurrentFiber.state = "suspended"
end

function Scheduler.step(self)
	-- Now check the regular fibers
	local task = self.TasksReadyToRun:dequeue()

	-- If no fiber in ready queue, then just return
	if task == nil then
		print("Scheduler.step: NO TASK")
		return true
	end

	if task:getStatus() == "dead" then
		self:removeFiber(task)

		return true;
	end

	-- If the task we pulled off the active list is 
	-- not dead, then perhaps it is suspended.  If that's true
	-- then it needs to drop out of the active list.
	-- We assume that some other part of the system is responsible for
	-- keeping track of the task, and rescheduling it when appropriate.
	if task.state == "suspended" then
		--print("suspended task wants to run")
		return true;
	end

	-- If we have gotten this far, then the task truly is ready to 
	-- run, and it should be set as the currentFiber, and its coroutine
	-- is resumed.
	self.CurrentFiber = task;
	local results = {task:resume()};

	-- once we get results back from the resume, one
	-- of two things could have happened.
	-- 1) The routine exited normally
	-- 2) The routine yielded
	--
	-- In both cases, we parse out the results of the resume 
	-- into a success indicator and the rest of the values returned 
	-- from the routine
	--local pcallsuccess = results[1];
	--table.remove(results,1);

	local success = results[1];
	table.remove(results,1);

--print("PCALL, RESUME: ", pcallsuccess, success)

	-- no task is currently executing
	self.CurrentFiber = nil;


	if not success then
		print("RESUME ERROR")
		print(unpack(results));
	end

	-- Again, check to see if the task is dead after
	-- the most recent resume.  If it's dead, then don't
	-- bother putting it back into the readytorun queue
	-- just remove the task from the list of tasks
	if task:getStatus() == "dead" then
		self:removeFiber(task)

		return true;
	end

	-- The only way the task will get back onto the readylist
	-- is if it's state is 'readytorun', otherwise, it will
	-- stay out of the readytorun list.
	if task.state == "readytorun" then
		self:scheduleTask(task, results);
	end
end

function Scheduler.yield(self, ...)
	return coroutine.yield(...);
end


return Scheduler

I think the code is pretty straightforward to follow, except for that ‘step()’ function.  So, what is a scheduler anyway?  Well, we want to have some level of ‘concurrency’ in our applications.  That is, I would love to be able to have the appearance of running several tasks at once on my machine, without much work.  In modern day computing, this is fairly well covered by the OS at the level of applications.  We can run several apps at the same time on our computers, and the kernel takes care of the details of how that gets done.  Modern kernels simply split time between tasks, giving some amount of time to each task in turn, so they feel like they are running in parallel, or concurrently.  It’s a nice illusion because the machines are fast enough, and there are enough times when a task will simply be waiting around anyway, that the amount of concurrency achieved can be very high.

So, what if you want some of that kernel goodness within the context of your app directly?  You want to handle mouse and keyboard, while doing some drawing and networking at the same time.  This is typically where ‘threads’ step in, and down the rabbit hole we go, getting involved in threading models, concurrency primitives and the like.  Well, in Lua, there’s a concurrency primitive built in, in the form of coroutines.  Coroutines are a fairly old style of concurrency whereby the application gets control of the CPU, and only gives it up willingly by calling a ‘yield()’ function at some point.  OK, great.  So, you start a routine, and at some point you call yield, and some other routine gets a chance to run.  But, which routine?  Aha, so this is where the ‘scheduler’ comes in.

The scheduler might be considered the ‘home’ routine.  That is, once a routine gives up a slice of time by calling ‘yield’, the scheduler is the place it will return to.  At that point, the scheduler can decide which routine is to be run next.  It can do this because it has a list of ready-to-run tasks (those placed on the list using ‘scheduleTask’).  In this scheduler, it simply takes the next one off the list and tells it to resume.  That is the essence of the ‘step()’ function.

Great!  And what does it look like in action?

--test_scheduler.lua
package.path = package.path..";../?.lua"

Scheduler = require("scheduler")()
Task = require("task")
local taskID = 0;

local function getNewTaskID()
	taskID = taskID + 1;
	return taskID;
end

local function spawn(scheduler, func, ...)
	local task = Task(func, ...)
	task.TaskID = getNewTaskID();
	scheduler:scheduleTask(task, {...});
	
	return task;
end


local function task1()
	print("first task, first line")
	Scheduler:yield();
	print("first task, second line")
end

local function task2()
	print("second task, only line")
end

local function main()
	local t1 = spawn(Scheduler, task1)
	local t2 = spawn(Scheduler, task2)

	while (true) do
		--print("STATUS: ", t1:getStatus(), t2:getStatus())
		if t1:getStatus() == "dead" and t2:getStatus() == "dead" then
			break;
		end
		Scheduler:step()
	end
end

main()

That’s the scheduler in isolation.  In this test case, you can see that task creation and the ‘spawn’ primitive aren’t part of the scheduler proper.  They don’t need to be.  The scheduler should only be concerned with deciding which task is going to be next.  In this way, you can imagine creating all sorts of different kinds of schedulers.  You could introduce the concept of priorities, or aging, or realtime, or whatever.  The same general idea would apply; the scheduler just needs to decide which task is going to run next.  Also of note here: the ‘main()’ function operates as the “event loop”, if you will.  This is the bit of code that drives the scheduler between steps.  Having this separate from the scheduler proper will become advantageous down the road when we want to compose the scheduler with a larger application framework.

So, this is the first step in revitalizing the concurrency core I built into the TINN project.  This time around, more composable, more feature rich, more easily cross platform, more bang for the buck.  Next time around, I’ll examine the “kernel”.


Lua Coroutine Roundup

My WordPress dashboard says this is my 250th post. I figure what better way to mark the occasion than to do a little bit of a roundup on one of my favorite topics.

One of the most awesome features of the Lua language is a built in co-routine mechanism. I’ve talked about co-routines quite a bit, doing a little series on the basics of coroutines, all the way up through a scheduler for multi-tasking.

Coroutines give the programmer the illusion of creating a parallel processing environment. Basically, write up your code, “spawn” different coroutines, and then just forget about them… Well, almost. The one thing the Lua language does not give you is a way to manage those coroutines. Coroutines are a mechanism by which ‘threads’ can do cooperative multi-tasking. This is different from the OS-level preemptive multitasking that you get with ‘threads’ in most OS libraries. So, I can create coroutines, resume, and yield. It’s the ‘yield’ that gives coroutines their multi-tasking flavor. One coroutine yields, and another coroutine must be told to ‘resume’. Something has to do the telling, and keep track of who’s yielded and who needs to be resumed. In steps the scheduler.
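In raw form, the mechanics are just these three calls:

local co = coroutine.create(function(a)
	print("got:", a)
	local b = coroutine.yield("first")   -- give up control, hand "first" back
	print("got:", b)
	return "done"
end)

print(coroutine.resume(co, 1))  --> got: 1, then: true  first
print(coroutine.resume(co, 2))  --> got: 2, then: true  done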

Summaries from my own archives
Lua Coroutines – Getting Started

Multitask UI Like it’s 1995

Hurry Up and Wait – TINN Timing

Computicles – A tale of two schedulers

Parallel Computing is Child’s Play

Multitasking single threaded UI – Gaming meets networking

Alertable Predicates? What’s that?

Pinging Peers – Creating Network Meshes

From the Interwebs
GitHub Gist: Deco / coroutine_scheduler.lua

Beginner’s Guide to Coroutines – Roblox Wiki

LOOP: Thread Scheduler

LuaAV

And of course the search engines will show you thousands more:

lua coroutine scheduler

I think it would be really great if the Lua language itself had some form of rudimentary scheduler, but scheduling is such a domain-specific thing, and the rudiments are so basic, that it’s possibly not worth providing such support. The Go language builds in coroutine support along with a basic scheduler for its goroutines, to great effect I think.

The simple scheduler is simple: just do a round robin execution of one task after the next, as each task yields. I wrote a simple dispatcher which started with exactly this. Then interesting things start to happen. You’d like to have your tasks go into a wait state if they’re doing an IO operation, and not be scheduled again until some IO has occurred. Or, you want to put a thread to sleep for some amount of time. Or, you want a thread to wait on a condition of one sort or another, or signals, or something else. Pretty soon, your basic scheduler is hundreds of lines of code, and things start to get rather complicated.

After going round and round in this problem space, I finally came up with a solution that I think has some legs. Rather than a complex scheduler, I promote a simple scheduler, and add functions to it through normal thread support. So, here is my final ‘scheduler’ for 2013:


local ffi = require("ffi");

local Collections = require("Collections");
local StopWatch = require("StopWatch");


--[[
	The Scheduler supports a collaborative processing
	environment.  As such, it manages multiple tasks which
	are represented by Lua coroutines.
--]]
local Scheduler = {}
setmetatable(Scheduler, {
	__call = function(self, ...)
		return self:create(...)
	end,
})
local Scheduler_mt = {
	__index = Scheduler,
}

function Scheduler.init(self, scheduler)
	local obj = {
		Clock = StopWatch();

		TasksReadyToRun = Collections.Queue();
	}
	setmetatable(obj, Scheduler_mt)
	
	return obj;
end

function Scheduler.create(self, ...)
	return self:init(...)
end

--[[
		Instance Methods
--]]
function Scheduler.tasksArePending(self)
	return self.TasksReadyToRun:Len() > 0
end

function Scheduler.tasksPending(self)
	return self.TasksReadyToRun:Len();
end


function Scheduler.getClock(self)
	return self.Clock;
end


--[[
	Task Handling
--]]

function Scheduler.scheduleTask(self, afiber, ...)
	if not afiber then
		return false, "no fiber specified"
	end

	afiber:setParams(...);
	self.TasksReadyToRun:Enqueue(afiber);	
	afiber.state = "readytorun"

	return afiber;
end

function Scheduler.removeFiber(self, fiber)
	--print("DROPPING DEAD FIBER: ", fiber);
	return true;
end

function Scheduler.inMainFiber(self)
	return coroutine.running() == nil; 
end

function Scheduler.getCurrentFiber(self)
	return self.CurrentFiber;
end

function Scheduler.step(self)
	-- Now check the regular fibers
	local task = self.TasksReadyToRun:Dequeue()

	-- If no fiber in ready queue, then just return
	if task == nil then
		return true
	end

	if task:getStatus() == "dead" then
		self:removeFiber(task)

		return true;
	end

	-- If the task we pulled off the active list is 
	-- not dead, then perhaps it is suspended.  If that's true
	-- then it needs to drop out of the active list.
	-- We assume that some other part of the system is responsible for
	-- keeping track of the task, and rescheduling it when appropriate.
	if task.state == "suspended" then
		return true;
	end

	-- If we have gotten this far, then the task truly is ready to 
	-- run, and it should be set as the currentFiber, and its coroutine
	-- is resumed.
	self.CurrentFiber = task;
	local results = {task:resume()};

	-- no task is currently executing
	self.CurrentFiber = nil;

	-- once we get results back from the resume, one
	-- of two things could have happened.
	-- 1) The routine exited normally
	-- 2) The routine yielded
	--
	-- In both cases, we parse out the results of the resume 
	-- into a success indicator and the rest of the values returned 
	-- from the routine
	local success = results[1];
	table.remove(results,1);


	--print("SUCCESS: ", success);
	if not success then
		print("RESUME ERROR")
		print(unpack(results));
	end

	-- Again, check to see if the task is dead after
	-- the most recent resume.  If it's dead, then don't
	-- bother putting it back into the readytorun queue
	-- just remove the task from the list of tasks
	if task:getStatus() == "dead" then
		--print("Scheduler, DEAD coroutine, removing")
		self:removeFiber(task)

		return true;
	end

	-- The only way the task will get back onto the readylist
	-- is if it's state is 'readytorun', otherwise, it will
	-- stay out of the readytorun list.
	if task.state == "readytorun" then
		self:scheduleTask(task, results);
	end
end


--[[
	Primary Interfaces
--]]

function Scheduler.suspend(self, aTask)
	if not aTask then
		self.CurrentFiber.state = "suspended"
		return self:yield()
	end

	aTask.state = "suspended";

	return true
end

function Scheduler.yield(self, ...)
	return coroutine.yield(...);
end


--[[
	Running the scheduler itself
--]]
function Scheduler.start(self)
	if self.ContinueRunning then
		return false, "scheduler is already running"
	end
	
	self.ContinueRunning = true;

	while self.ContinueRunning do
		self:step();
	end
	--print("FINISHED STEP ITERATION")
end

function Scheduler.stop(self)
	self.ContinueRunning = false;
end

return Scheduler

At 195 LOC, this is back down to the basic size of the original simple dispatcher that I started with. There are only a few functions to this interface:

  • scheduleTask()
  • step()
  • suspend()
  • yield()
  • start()

These are the basics to do some scheduling of tasks. There is an assumption that there is an object that can be scheduled, such that ‘scheduleTask’ can attach some metadata, and the like. Other than that, there’s not much here but the basic round robin scheduler.

So, how do you make a more complex scheduler, say one that deals with time? Well, first of all, I’ve also added an Application object, which actually contains a scheduler, and supports various other things related to an application, including add-ons to the scheduler. Here’s an excerpt of the Application object construction.

function Application.init(self, ...)
	local sched = Scheduler();

	waitForIO.MessageQuanta = 0;

	local obj = {
		Clock = StopWatch();
		Scheduler = sched;
		TaskID = 0;
		wfc = waitForCondition(sched);
		wft = waitForTime(sched);
		wfio = waitForIO(sched);
	}
	setmetatable(obj, Application_mt)

	-- Create a task for each add-on
	obj:spawn(obj.wfc.start, obj.wfc)
	obj:spawn(obj.wft.start, obj.wft)
	obj:spawn(obj.wfio.start, obj.wfio)

	return obj;
end

In the init(), the Application object creates instances of the add-ons to the scheduler. These add-ons (waitForCondition, waitForTime, waitForIO) don’t have to adhere to much of an interface, but you do need some function which is callable so that it can be spawned. The Application construction ends with the spawning of the three add-ons as normal threads in the scheduler. Yep, that’s right: the add-ons are just like any other thread, running in the scheduler. You might think: but these things need to run with a higher priority than regular threads! And yes, in some cases they do need that. But hey, that’s an exercise for the scheduler. If the core scheduler gains a ‘priority’ mechanism, then these threads can be run with a higher priority, and everything is good.

And what do these add-ons look like? I’ve gone over these before, but here’s an example of the timing one:

local Functor = require("Functor")
local tabutils = require("tabutils");


local waitForTime = {}
setmetatable(waitForTime, {
	__call = function(self, ...)
		return self:create(...)
	end,
})

local waitForTime_mt = {
	__index = waitForTime;
}

function waitForTime.init(self, scheduler)
	local obj = {
		Scheduler = scheduler;
		TasksWaitingForTime = {};
	}
	setmetatable(obj, waitForTime_mt)

	--scheduler:addQuantaStep(Functor(obj.step,obj));
	--scheduler:spawn(obj.step, obj)

	return obj;
end

function waitForTime.create(self, scheduler)
	if not scheduler then
		return nil, "no scheduler specified"
	end

	return self:init(scheduler)
end

function waitForTime.tasksArePending(self)
	return #self.TasksWaitingForTime > 0
end

function waitForTime.tasksPending(self)
	return #self.TasksWaitingForTime;
end

local function compareTaskDueTime(task1, task2)
	if task1.DueTime < task2.DueTime then
		return true
	end
	
	return false;
end

function waitForTime.yieldUntilTime(self, atime)
	--print("waitForTime.yieldUntilTime: ", atime, self.Scheduler.Clock:Milliseconds())
	--print("Current Fiber: ", self.CurrentFiber)
	local currentFiber = self.Scheduler:getCurrentFiber();
	if currentFiber == nil then
		return false, "not currently in a running task"
	end

	currentFiber.DueTime = atime;
	tabutils.binsert(self.TasksWaitingForTime, currentFiber, compareTaskDueTime);

	return self.Scheduler:suspend()
end

function waitForTime.yield(self, millis)
	--print('waitForTime.yield, CLOCK: ', self.Scheduler.Clock)

	local nextTime = self.Scheduler.Clock:Milliseconds() + millis;

	return self:yieldUntilTime(nextTime);
end

function waitForTime.step(self)
	--print("waitForTime.step, CLOCK: ", self.Scheduler.Clock);

	local currentTime = self.Scheduler.Clock:Milliseconds();

	-- traverse through the fibers that are waiting
	-- on time
	local nAwaiting = #self.TasksWaitingForTime;
--print("Timer Events Waiting: ", nAwaiting)
	for i=1,nAwaiting do

		local fiber = self.TasksWaitingForTime[1];
		if fiber.DueTime <= currentTime then
			--print("ACTIVATE: ", fiber.DueTime, currentTime);
			-- put it back into circulation
			-- preferably at the front of the list
			fiber.DueTime = 0;
			--print("waitForTime.step: ", self)
			self.Scheduler:scheduleTask(fiber);

			-- Remove the fiber from the list of fibers that are
			-- waiting on time
			table.remove(self.TasksWaitingForTime, 1);
		end
	end
end

function waitForTime.start(self)
	while true do
		self:step();
		self.Scheduler:yield();
	end
end

return waitForTime

The ‘start()’ function is what was scheduled with the scheduler, so that’s the thread that’s actually running. As you can see, it’s just a normal cooperative task.

The real action comes from the ‘yield()’ and ‘step()’ functions. The purpose of ‘yield()’ is to suspend the current task (take it out of the active running list of the scheduler), and to set up the appropriate state so that it can be activated later. The purpose of ‘step()’ is to go through the list of tasks which are currently suspended, and determine which ones need to be scheduled again, because their time condition has been met.

And that’s how you add items such as “sleep()” to the system. You don’t change the core scheduler, but rather, you add a small module which knows how to deal with timed events, and that’s that.

The application object comes along and provides some convenience methods that can easily be turned into globals, so that you can easily type things like:

delay(function() print("Hello, World!") end, 5000)  -- print "Hello, World!" after 5 seconds
periodic(function() print("Hi There!") end, 1000)  -- print "Hi There!" every second
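Under the covers, wiring an add-on method up as one of those globals is a one-liner with a functor. A sketch, assuming the Application exposes its waitForTime instance as ‘wft’, as in the construction excerpt above:

-- inside the Application setup, after the add-ons are constructed
sleep = Functor(obj.wft.yield, obj.wft)   -- sleep(millis)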

And there you have it. The core scheduler is a very simple thing. Just a ready list, and some algorithm that determines which one of the tasks in the list gets to run next. This can be as complex as necessary, or stay very simple.

Using an add on mechanism, the overall system can take on more robust scenarios, without increasing the complexity of the individual parts.

This simplified design satisfies all the scenario needs I currently have from high throughput web server to multi-tasking UI with network interaction. I will improve the scheduling algorithm, and probably make that a pluggable function for ease.

Lua coroutines are an awesome feature to have in a scripting language. With a little bit of work, a handy scheduler makes multi-task programming a mindless exercise, and that’s a good thing.

Happy New Year!


Lua Coroutines – Getting Started

Sometimes I run across a concept that’s really exciting, and I want to learn/know/master it, but my brain just throws up block after block, preventing me from truly learning it. Lua’s coroutines are kind of like that for me. I mean, think of the prospect of using ‘light weight threads’ in your programs. Lua’s coroutines aren’t ‘threads’ at all though, at least not in the usual ‘preemptive multitasking’ sense that one might think.

Lua coroutines are like programming in Windows 1.0. Basically, cooperative multi-tasking. If any one ‘thread’ doesn’t cooperate, the whole system comes to a standstill until that one task decides to take a break. So, some amount of work is required to do things correctly.

How to get started though?

Well, first of all, I start with a simple function:

local routines = {}

routines.doOnce = function(phrase)
  print(phrase)
end

routines.doOnce("Hello, World")
>Hello, World

A simple routine, that prints whatever I tell it to print. Fantastic, woot! Raise the roof up!!

Why did I go through the trouble of creating a table, and having a function, just to print? All will become clear in a bit.

Now, how about another routine? Perhaps one that does something in a loop:

routines.doLoop = function(iterations)
  for i=1,iterations do
    print("Iteration: ", i)
  end
end

routines.doLoop(3)
>Iteration: 1
>Iteration: 2
>Iteration: 3

Fantastic! Now I have one routine that prints a single thing, and exits, and another that prints a few numbers and exits.

Now just one last one:

routines.doTextChar = function(phrase)
  local nchars = #phrase

  for i=1,nchars do
    print(phrase:sub(i,i))
  end
end
routines.doTextChar("brown")
>b
>r
>o
>w
>n

Now, what I really want to do is to be able to run all three of these routines “at the same time”, without a lot of fuss. That is, I want to allow the doOnce() routine to run once, then I want the doTextChar() and doLoop() routines to interleave their work, first one going, then the next.

This is where lua’s coroutines come into the picture.

First of all, I’ll take the doOnce as an example. In order to run it as a coroutine, there isn’t anything that needs to change about the routine itself. But, you do have to start using some coroutine machinery. Instead of calling the routine directly as usual, you call it using coroutine.resume(), and before that, you have to create something to resume, what Lua knows as a ‘thread’, using coroutine.create(). Like this:

local doOnceT = coroutine.create(routines.doOnce)
coroutine.resume(doOnceT, "Hello, World");

This will have the same effect as just running the routine directly, but will run it as a coroutine. Not particularly useful in this case, but it sets us up for some further success.

The looping routines are a little bit more interesting. You could do the following:

local doLoopT = coroutine.create(routines.doLoop)
coroutine.resume(doLoopT, 3)

That will have the same output as before. But, it won’t be very cooperative. The way in which a routine signals that it’s willing to give up a little slice of its CPU time is by calling the ‘coroutine.yield()’ function. So, to alter the original routine slightly:

routines.doLoop = function(iterations)
  for i=1,iterations do
    print("Iteration: ", i)
    coroutine.yield();
  end
end

Now, running will do exactly the same thing, but we’re now set up to cooperate with multiple tasks.

I will alter the ‘doTextChar()’ routine in a similar way:

routines.doTextChar = function(phrase)
  local nchars = #phrase

  for i=1,nchars do
    print(phrase:sub(i,i))
    coroutine.yield();
  end
end

OK, so now I have three routines, which are ready for doing some coroutine cooperation. But how to do that? The essential ingredient that is missing is a scheduler, or rather, something to take care of the mundane task of resuming each routine until it is done.

So, in comes the SimpleDispatcher:

local Collections = require "Collections"

local SimpleDispatcher_t = {}
local SimpleDispatcher_mt = {
  __index = SimpleDispatcher_t
}

local SimpleDispatcher = function()
  local obj = {
    tasklist = Collections.Queue.new();
  }
  setmetatable(obj, SimpleDispatcher_mt)

  return obj
end


SimpleDispatcher_t.AddTask = function(self, atask)
  self.tasklist:Enqueue(atask)	
end

SimpleDispatcher_t.AddRoutine = function(self, aroutine, ...)
  local routine = coroutine.create(aroutine)
  local task = {routine = routine, params = {...}}
  self:AddTask(task)

  return task
end

SimpleDispatcher_t.Run =  function(self)
  while self.tasklist:Len() > 0 do
    local task = self.tasklist:Dequeue()
    if not task then 
      break
    end

    if coroutine.status(task.routine) ~= "dead" then
      local status, values = coroutine.resume(task.routine, unpack(task.params));

      if coroutine.status(task.routine) ~= "dead" then
        self:AddTask(task)
      else
        print("TASK FINISHED")
      end
    else
      print("DROPPING TASK")
    end
  end
end

return SimpleDispatcher

Before explaining much, here is how I would use it:

local runner = SimpleDispatcher();

runner:AddRoutine(routines.doOnce, "Hello, World");
runner:AddRoutine(routines.doLoop, 3)
runner:AddRoutine(routines.doTextChar, "The brown")

runner:Run();

Basically, create an instance of this simple dispatcher.
Then, add some routines to it for execution. AddRoutine() does not actually start the routines going; it just queues them up to be run using coroutine.resume().
Lastly, call ‘runner:Run()’, which starts a loop going, resulting in each of the tasks being called one after the other. This will result in the following output:

>Hello, World
>Iteration: 1
>T
>Iteration: 2
>h
>Iteration: 3
>e
>
>b
>r
>o
>w
>n

Basically, each time a routine calls ‘coroutine.yield()’, it’s giving up for a bit, and allowing the dispatcher to call the next routine. When it comes back around to calling the routine that gave up a slice of CPU time, it will resume from wherever the ‘yield()’ was called. And that’s the real magic of coroutines!

I find the easiest way to think about coroutines is to simply code something, as if it were going to be an independent ‘thread’. Then I throw in a few choice yield() calls here and there to ensure the thread cooperates with other threads that might be running.

There are quite exotic things you can do with coroutines, and all sorts of frameworks to make working with them better/easier, and perhaps more mysterious.

For my simplistic pea brain though, this is a good place to start. Just taking simple routines, and handing them to the dispatcher.

That dispatcher is quite a nice little piece of work. If you think about it, it’s the equivalent of the scheduler within any OS. Assuming your Lua code is running in a single-threaded process, being able to do your own scheduling is quite a powerful thing. You can add all sorts of exotics like aging and thread priority. You can add timers, I/O events, or what have you. The beauty is, the scheduler can become as complex as you care to make it, and your coroutine code does not have to do anything special to adapt to it, just like with thread code in the OS proper.

So, there you have it. Getting started with Lua coroutines.

Now, if only I could combine socket pooling and this Dispatcher in some meaningful way. I wonder…