schedlua – the kernel

Previously, I talked about the scheduler within the new schedlua. A scheduler is a fairly simple thing: it just decides which of the many ready tasks will run next. The default scheduler follows a simple FIFO strategy, so there are no priorities, favorites, or the like. Of course this wouldn't be any fun if you were stuck with just one scheduler, so naturally this is an easily pluggable part of the system. But what does the plugging?

In steps the Kernel. In general, the schedlua project is about creating a set of tools with which highly performant services can be constructed. schedlua is built on the idea that a single processor can be highly leveraged if it is programmed correctly. It does not try to gain performance through multiple threads; rather, it suspends tasks that are blocked on IO or otherwise idle, and lets tasks that are ready to run do their thing. The concurrency model is cooperative, not preemptive, so if any one task misbehaves, the whole process can become stuck.
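To make that concrete, here is a hypothetical pair of tasks (doSomeWork() is just a stand-in, and spawn()/yield() come from the kernel shown below). A task that yields periodically lets its peers get a turn; a task that loops without ever yielding starves everything else in the process.

local function wellBehaved()
	while true do
		doSomeWork()
		yield()      -- hand control back to the scheduler
	end
end

local function misbehaved()
	while true do
		doSomeWork() -- never yields, so no other task will ever run again
	end
end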

So, let’s take a look at this code:

--kernel.lua
-- kernel is a singleton, so return
-- single instance if we've already been
-- through this code
print("== KERNEL INCLUDED ==")

local Scheduler = require("scheduler")
local Task = require("task")
local Queue = require("queue")
local Functor = require("functor")

local Kernel = {
	ContinueRunning = true;
	TaskID = 0;
	Scheduler = Scheduler();
	TasksSuspendedForSignal = {};
}

setmetatable(Kernel, {
    __call = function(self, params)
    	params = params or {}
    	params.Scheduler = params.Scheduler or self.Scheduler
    	
    	if params.exportglobal then
    		self:globalize();
    	end

    	self.Scheduler = params.Scheduler;

    	return self;
    end,
})

function Kernel.getNewTaskID(self)
	self.TaskID = self.TaskID + 1;
	return self.TaskID;
end

function Kernel.getCurrentTaskID(self)
	return self:getCurrentTask().TaskID;
end

function Kernel.getCurrentTask(self)
	return self.Scheduler:getCurrentTask();
end

function Kernel.spawn(self, func, ...)
	local task = Task(func, ...)
	task.TaskID = self:getNewTaskID();
	self.Scheduler:scheduleTask(task, {...});
	
	return task;
end

function Kernel.suspend(self, ...)
	self.Scheduler:suspendCurrentFiber();
	return self:yield(...)
end

function Kernel.yield(self, ...)
	return self.Scheduler:yield(...);
end


function Kernel.signalOne(self, eventName, ...)
	if not self.TasksSuspendedForSignal[eventName] then
		return false, "event not registered", eventName
	end

	local nTasks = #self.TasksSuspendedForSignal[eventName]
	if nTasks < 1 then
		return false, "no tasks waiting for event"
	end

	local suspended = self.TasksSuspendedForSignal[eventName][1];

	self.Scheduler:scheduleTask(suspended,{...});
	table.remove(self.TasksSuspendedForSignal[eventName], 1);

	return true;
end

function Kernel.signalAll(self, eventName, ...)
	if not self.TasksSuspendedForSignal[eventName] then
		return false, "event not registered"
	end

	local nTasks = #self.TasksSuspendedForSignal[eventName]
	if nTasks < 1 then
		return false, "no tasks waiting for event"
	end

	for i=1,nTasks do
		self.Scheduler:scheduleTask(self.TasksSuspendedForSignal[eventName][1],{...});
		table.remove(self.TasksSuspendedForSignal[eventName], 1);
	end

	return true;
end

function Kernel.waitForSignal(self, eventName)
	local currentFiber = self.Scheduler:getCurrentTask();

	if currentFiber == nil then
		return false, "not currently in a running task"
	end

	if not self.TasksSuspendedForSignal[eventName] then
		self.TasksSuspendedForSignal[eventName] = {}
	end

	table.insert(self.TasksSuspendedForSignal[eventName], currentFiber);

	return self:suspend()
end

function Kernel.onSignal(self, func, eventName)
	local function closure()
		self:waitForSignal(eventName)
		func();
	end

	return self:spawn(closure)
end



function Kernel.run(self, func, ...)

	if func ~= nil then
		self:spawn(func, ...)
	end

	while (self.ContinueRunning) do
		self.Scheduler:step();		
	end
end

function Kernel.halt(self)
	self.ContinueRunning = false;
end

function Kernel.globalize()
	halt = Functor(Kernel.halt, Kernel);
	onSignal = Functor(Kernel.onSignal, Kernel);

	run = Functor(Kernel.run, Kernel);

	signalAll = Functor(Kernel.signalAll, Kernel);
	signalOne = Functor(Kernel.signalOne, Kernel);

	spawn = Functor(Kernel.spawn, Kernel);
	suspend = Functor(Kernel.suspend, Kernel);

	waitForSignal = Functor(Kernel.waitForSignal, Kernel);

	yield = Functor(Kernel.yield, Kernel);
end

return Kernel;

 

From the top, you can see the Kernel requires the scheduler, task and functor. The scheduler has already been explained. The Kernel serves a couple of purposes. First of all, it manages the scheduler. The ‘run’ function at the bottom is the ‘loop’ of the application. It will run until ‘halt’ is called. Each time through the loop it’s telling the scheduler to take a step.
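Coming back to the question of plugging in a different scheduler: because the Kernel table is callable (the __call metamethod near the top), you can hand it an alternate scheduler at require time. A minimal sketch, assuming a hypothetical 'myscheduler' module that implements the same interface the Kernel relies on (scheduleTask, step, yield, suspendCurrentFiber, getCurrentTask):

local MyScheduler = require("myscheduler")   -- hypothetical module

local Kernel = require("kernel"){
	Scheduler = MyScheduler();   -- plug in a custom scheduling policy
	exportglobal = true;         -- also make spawn/yield/run/etc. global
}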

Also at the bottom, you can see usage of the Functor. A functor is just a simple convenience wrapper that helps you call a function on a table at a later point. Those functors are used to make the keywords global.
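The Functor module isn't shown here, but all the kernel needs from it is a way to bind a function to its target table so the pair can be invoked later as a plain call. A minimal sketch of that idea (not necessarily the exact schedlua implementation):

--functor.lua (sketch)
-- bind 'func' and 'target' so that calling the result later
-- behaves like func(target, ...)
local function Functor(func, target)
	return function(...)
		if target then
			return func(target, ...)
		end
		return func(...)
	end
end

return Functor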

There are two primary things the kernel does. One is to spawn new tasks, the other is to provide a central point for signal handling.

First, let’s look at the ‘spawn’.

function Kernel.spawn(self, func, ...)
	local task = Task(func, ...)
	task.TaskID = self:getNewTaskID();
	self.Scheduler:scheduleTask(task, {...});
	
	return task;
end

This is actually the coolest part of the system in terms of how the programming model is expressed. Here’s an example of it in use.

local Kernel = require("kernel"){exportglobal = true}


local function numbers(ending)
	local idx = 0;
	local function closure()
		idx = idx + 1;
		if idx > ending then
			return nil;
		end
		return idx;
	end
	
	return closure;
end

local function task1()
	print("first task, first line")
	yield();
	print("first task, second line")
end

local function task2()
	print("second task, only line")
end

local function counter(name, nCount)
	for num in numbers(nCount) do
		print(name, num);
		yield();
	end
	halt();
end

local function main()
	local t0 = spawn(counter, "counter1", 5)
	local t1 = spawn(task1)
	local t2 = spawn(task2)
	local t3 = spawn(counter, "counter2", 7)
end

run(main)

Basically, any time you want something to happen concurrently, you just say ‘spawn(func, params)’ and that’s that.

What happens is a Task object is created which holds onto the function object as well as the initial set of parameters. This task is then sent to the scheduler to be run. From then on out you can forget about it. Of course, you’re handed the task when you say ‘spawn’, so you do have a chance of suspending and killing it off in the future if you like. Similarly, you can wait for a task to complete as well.
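The Task module itself isn't reproduced here, but the essential idea is that a task wraps the supplied function in a coroutine, together with the parameters it should start with, so the scheduler can resume and suspend it at will. Roughly, as a sketch rather than the exact schedlua source:

--task.lua (sketch)
local Task_mt = {}
Task_mt.__index = Task_mt

function Task_mt:resume(...)
	return coroutine.resume(self.routine, ...)
end

-- a task pairs a coroutine with its startup parameters
local function Task(func, ...)
	return setmetatable({
		routine = coroutine.create(func);
		params = {...};
		state = "suspended";
	}, Task_mt)
end

return Task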

So, that’s spawning.

The other major feature in the kernel is signal handling.

function Kernel.waitForSignal(self, eventName)
	local currentFiber = self.Scheduler:getCurrentTask();

	if currentFiber == nil then
		return false, "not currently in a running task"
	end

	if not self.TasksSuspendedForSignal[eventName] then
		self.TasksSuspendedForSignal[eventName] = {}
	end

	table.insert(self.TasksSuspendedForSignal[eventName], currentFiber);

	return self:suspend()
end

This is probably THE most important routine in the whole system. Basically, it takes the current task and puts it onto the suspension list, keyed by the signal name (signals are just string values). Later on, when you want the task to resume, you signal it using either signalOne() or signalAll().

A little bit of code demonstrating this:

local Kernel = require("kernel"){exportglobal = true}
local Functor = require("functor")

local function numbers(ending)
	local idx = 0;
	local function closure()
		idx = idx + 1;
		if idx > ending then
			return nil;
		end
		return idx;
	end
	
	return closure;
end

local function waitingOnCount(name, ending)
	local eventName = name..tostring(ending)
	waitForSignal(eventName)

	print("REACHED COUNT: ", ending)
end

local function onCountFinished(name)
	print("Counter Finished: ", name)
end

local function counter(name, nCount)
	for num in numbers(nCount) do
		print(num)
		local eventName = name..tostring(num);
		--print(eventName)
		signalOne(eventName);

		yield();
	end

	signalAll(name..'-finished')
end

local function main()
	local t1 = spawn(counter, "counter", 50)
	local t2 = spawn(waitingOnCount, "counter", 20)
	local t3 = spawn(function() print("LAMBDA"); waitForSignal("counter15") print("reached 15!!") end)
	
	-- test signalAll().  All three of these should trigger when
	-- counter finishes
	local t13 = onSignal(Functor(onCountFinished, "counter-1"), "counter-finished")
	local t14 = onSignal(Functor(onCountFinished, "counter-2"), "counter-finished")
	local t15 = onSignal(Functor(onCountFinished, "counter-3"), "counter-finished")
end

run(main)

Here, there is a counter(), which is just counting and firing off a signal for each number. The various waitingOnCount() and LAMBDA routines will respond to the appropriate signals.

Finally, the t13, t14, and t15 tasks are waiting for the “counter-finished” signal, and they will all fire off and print their little message.

Of course, at this point you could add something that calls ‘halt()’ so you don’t have to press Ctrl-C to stop the process, but you get the idea.
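For instance, one more task spawned inside main() that waits on the “counter-finished” signal would shut things down cleanly once the counter is done:

	-- stop the kernel once the counter announces it has finished
	spawn(function()
		waitForSignal("counter-finished")
		halt()
	end)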

And that’s pretty much it for the kernel. Absent from here are async IO, predicates, alarms and the like. They are available in schedlua, but they’re not part of the kernel proper; they are essentially modules. They utilize the signal and spawn features built into the kernel, and beyond that they’re free to do their own thing.
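To give a flavor of what such a module can look like, here is a hypothetical sketch (not one of the actual schedlua modules) of a tiny ‘waitUntil’ helper built purely on the kernel’s public surface. It spawns a watcher task that polls a predicate, yielding between checks, and signals the waiting task once the predicate becomes true:

--waituntil.lua (hypothetical sketch)
local Kernel = require("kernel")

local function waitUntil(pred)
	local eventName = "pred-"..tostring(pred)    -- unique-ish signal name

	-- watcher task: polls the predicate, yielding between checks
	Kernel:spawn(function()
		while not pred() do
			Kernel:yield()
		end
		Kernel:signalAll(eventName)
	end)

	-- the caller suspends here until the watcher fires the signal
	return Kernel:waitForSignal(eventName)
end

return waitUntil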

I’ll get into the details of alarms and predicates next time around to demonstrate the concept of easy add-on modules.
