Schedulers revisited

I ran quite a long series on the buildup of the TINN scheduler, including this gem on the primitives included in the application model: Parallel Conversations

The scheduler developed in that series served me very well, taking on timing, predicates, signals, and async IO.  It was all based on Windows primitives, and although fairly modular, it was an evolved codebase with some lumps and inefficiencies.  So now, two years on, I decided to reimagine that scheduler.  Primary reasons?  I had some time on my hands, and I wanted a scheduler that works equally well on Linux and Windows.

And so, the schedlua project was born.

What were the design criteria this time around?

  • Reduce size and complexity
  • Increase composability
  • Do not force features where they’re not desired

In the previous incarnation, I went back and forth between having an “Application” object, and the “Scheduler”.  Things were primarily driven by the Application, and it had a central role in the instance of any running app.  I still favor having this central role, but I changed the meaning and composition of that central player this time around.  First though, there is the “Scheduler”.


local ffi = require("ffi");

local Queue = require("queue")
local Task = require("task");


--[[
	The Scheduler supports a collaborative processing
	environment.  As such, it manages multiple tasks which
	are represented by Lua coroutines.

	The scheduler works by being the arbitrator of which routine is running
	in the application.  When work is to be done, it is encapsulated in 
	the form of a task object.  The scheduler relies upon that task object
	to call a form of 'yield' at some point, at which time it will receive
	the main thread of execution, and will pick the next task out of the ready
	list to run.
--]]
local Scheduler = {}
setmetatable(Scheduler, {
	__call = function(self, ...)
		return self:create(...)
	end,
})
local Scheduler_mt = {
	__index = Scheduler,
}

function Scheduler.init(self, ...)
	local obj = {
		TasksReadyToRun = Queue();
	}
	setmetatable(obj, Scheduler_mt)
	
	return obj;
end

function Scheduler.create(self, ...)
	return self:init(...)
end


function Scheduler.tasksPending(self)
	return self.TasksReadyToRun:length();
end


-- put a task on the ready list
-- the 'task' is expected to be a Task object, which wraps a coroutine
-- and carries a 'state' field, along with setParams(), resume(), and
-- getStatus() methods (the 'task' module is required above).
-- The 'params' is a table of parameters which will be passed to the
-- task's routine when it's ready to run.
function Scheduler.scheduleTask(self, task, params)
	--print("Scheduler.scheduleTask: ", task, params)
	params = params or {}
	
	if not task then
		return false, "no task specified"
	end

	task:setParams(params);
	self.TasksReadyToRun:enqueue(task);	
	task.state = "readytorun"

	return task;
end

function Scheduler.removeFiber(self, fiber)
	--print("REMOVING DEAD FIBER: ", fiber);
	return true;
end

function Scheduler.inMainFiber(self)
	return coroutine.running() == nil; 
end

function Scheduler.getCurrentFiber(self)
	return self.CurrentFiber;
end

function Scheduler.suspendCurrentFiber(self, ...)
	-- only meaningful when called from within a running fiber
	if self.CurrentFiber then
		self.CurrentFiber.state = "suspended"
	end
end

function Scheduler.step(self)
	-- Now check the regular fibers
	local task = self.TasksReadyToRun:dequeue()

	-- If no fiber in ready queue, then just return
	if task == nil then
		print("Scheduler.step: NO TASK")
		return true
	end

	if task:getStatus() == "dead" then
		self:removeFiber(task)

		return true;
	end

	-- If the task we pulled off the active list is 
	-- not dead, then perhaps it is suspended.  If that's true
	-- then it needs to drop out of the active list.
	-- We assume that some other part of the system is responsible for
	-- keeping track of the task, and rescheduling it when appropriate.
	if task.state == "suspended" then
		--print("suspended task wants to run")
		return true;
	end

	-- If we have gotten this far, then the task truly is ready to 
	-- run, and it should be set as the currentFiber, and its coroutine
	-- is resumed.
	self.CurrentFiber = task;
	local results = {task:resume()};

	-- once we get results back from the resume, one
	-- of two things could have happened.
	-- 1) The routine exited normally
	-- 2) The routine yielded
	--
	-- In both cases, we parse out the results of the resume 
	-- into a success indicator and the rest of the values returned 
	-- from the routine
	local success = results[1];
	table.remove(results,1);

	-- no task is currently executing
	self.CurrentFiber = nil;


	if not success then
		print("RESUME ERROR")
		print(unpack(results));
	end

	-- Again, check to see if the task is dead after
	-- the most recent resume.  If it's dead, then don't
	-- bother putting it back into the readytorun queue
	-- just remove the task from the list of tasks
	if task:getStatus() == "dead" then
		self:removeFiber(task)

		return true;
	end

	-- The only way the task will get back onto the readylist
	-- is if it's state is 'readytorun', otherwise, it will
	-- stay out of the readytorun list.
	if task.state == "readytorun" then
		self:scheduleTask(task, results);
	end
end

function Scheduler.yield(self, ...)
	return coroutine.yield(...);
end


return Scheduler

I think the code is pretty straightforward to follow, except for that ‘step()’ function.  So, what is a scheduler anyway?  Well, we want to have some level of ‘concurrency’ in our applications.  That is, I would love to have the appearance of running several tasks at once on my machine, without much work.  In modern day computing, this is fairly well covered by the OS at the level of applications.  We can run several apps at the same time on our computers, and the kernel takes care of the details of how that gets done.  Modern kernels simply split time between tasks, giving some amount of time to each task in turn, so they feel like they are running in parallel, or concurrently.  It’s a nice illusion, because the machines are fast enough, and there are enough times when a task will simply be waiting around anyway, that the amount of concurrency achieved can be very high.

So, what if you want some of that kernel goodness within the context of your app directly?  You want to handle mouse and keyboard, while doing some drawing and networking at the same time.  This is typically where ‘threads’ step in, and down the rabbit hole we go, getting involved in threading models, concurrency primitives and the like.  Well, in Lua, there’s a concurrency primitive built in, in the form of coroutines.  Coroutines are a fairly old style of concurrency whereby the application gets control of the CPU, and only gives it up willingly by calling a ‘yield()’ function at some point.  OK, great.  So, you start a routine, and at some point you call yield, and some other routine gets a chance to run.  But, which routine?  Aha, so this is where the ‘scheduler’ comes in.

The scheduler might be considered the ‘home’ routine.  That is, once a routine gives up a slice of time by calling ‘yield’, the scheduler is the place it will return to.  At that point, the scheduler can decide which routine is to run next.  It can do this because it has a list of ready to run tasks (those placed on the list using ‘scheduleTask’).  In this scheduler, it simply takes the next one off the list, and tells it to resume.  That is the essence of the ‘step()’ function.
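
One thing the listing leans on is the Task object (the ‘task’ module is required at the top, but not shown here).  As a rough sketch of the interface step() assumes, namely a wrapped coroutine carrying setParams(), resume(), getStatus(), and a mutable ‘state’ field, something along these lines would do.  This is my reconstruction for illustration, not the actual task module:

--[[
	A rough sketch of the Task interface that the scheduler assumes.
	This is a reconstruction for illustration; the real 'task' module
	may differ.
--]]
local Task = {}
setmetatable(Task, {
	__call = function(self, ...)
		return self:create(...)
	end,
})
local Task_mt = {
	__index = Task,
}

function Task.init(self, aroutine, ...)
	local obj = {
		routine = coroutine.create(aroutine),
		params = {...},
		state = "created",
	}
	setmetatable(obj, Task_mt)

	return obj
end

function Task.create(self, ...)
	return self:init(...)
end

function Task.setParams(self, params)
	self.params = params or {}
	return self
end

function Task.getStatus(self)
	return coroutine.status(self.routine)
end

-- resume the underlying coroutine, passing along the current params
function Task.resume(self)
	return coroutine.resume(self.routine, unpack(self.params))
end

return Task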

Great!  And what does it look like in action?

--test_scheduler.lua
package.path = package.path..";../?.lua"

Scheduler = require("scheduler")()
Task = require("task")
local taskID = 0;

local function getNewTaskID()
	taskID = taskID + 1;
	return taskID;
end

local function spawn(scheduler, func, ...)
	local task = Task(func, ...)
	task.TaskID = getNewTaskID();
	scheduler:scheduleTask(task, {...});
	
	return task;
end


local function task1()
	print("first task, first line")
	Scheduler:yield();
	print("first task, second line")
end

local function task2()
	print("second task, only line")
end

local function main()
	local t1 = spawn(Scheduler, task1)
	local t2 = spawn(Scheduler, task2)

	while (true) do
		--print("STATUS: ", t1:getStatus(), t2:getStatus())
		if t1:getStatus() == "dead" and t2:getStatus() == "dead" then
			break;
		end
		Scheduler:step()
	end
end

main()

That’s the scheduler in isolation.  In this test case, you can see that task creation, and the ‘spawn’ primitive, aren’t part of the scheduler proper.  They don’t need to be.  The scheduler should only be concerned with deciding which task is going to run next.  In this way, you can imagine creating all sorts of different kinds of schedulers.  You could introduce the concept of priorities, or aging, or realtime, or whatever; the same general idea would apply, the scheduler just needs to decide which task runs next, as the sketch below illustrates.  Also of note here, the ‘main()’ function operates as the “event loop”, if you will.  This is the bit of code that drives the scheduler between steps.  Having this separate from the scheduler proper will become advantageous down the road, when we want to compose the scheduler with a larger application framework.
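
For instance, a priority-flavored variant might replace the plain FIFO queue with an ordered insert.  This is purely an illustrative sketch; the ‘priority’ field is invented here, and is not part of schedlua:

-- Illustrative sketch: a priority-ordered ready list instead of a FIFO.
-- 'priority' is an invented field; lower numbers run first.
local PriorityScheduler = {}
PriorityScheduler.__index = PriorityScheduler

function PriorityScheduler.new()
	return setmetatable({TasksReadyToRun = {}}, PriorityScheduler)
end

function PriorityScheduler:scheduleTask(task, params)
	task:setParams(params or {})
	task.priority = task.priority or 100

	-- ordered insert; fine for modest ready-list sizes
	local list = self.TasksReadyToRun
	local pos = #list + 1
	for i, other in ipairs(list) do
		if task.priority < other.priority then
			pos = i
			break
		end
	end
	table.insert(list, pos, task)
	task.state = "readytorun"

	return task
end

function PriorityScheduler:step()
	-- same shape as Scheduler.step(), but takes the highest
	-- priority task instead of the oldest one
	local task = table.remove(self.TasksReadyToRun, 1)
	if not task then return true end
	-- ... resume/requeue logic as in Scheduler.step()
end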

So, this is the first step in revitalizing the concurrency core I built into the TINN project.  This time around, more composable, more feature rich, more easily cross platform, more bang for the buck.  Next time around, I’ll examine the “kernel”.


Lua Coroutines – Getting Started

Sometimes I run across a concept that’s really exciting, and I want to learn/know/master it, but my brain just throws up block after block, preventing me from truly learning it. Lua’s coroutines are kind of like that for me. I mean, think of the prospect of using ‘light weight threads’ in your programs. Lua’s coroutines aren’t ‘threads’ at all though, at least not in the usual ‘preemptive multitasking’ sense that one might think.

Lua coroutines are like programming in Windows 1.0. Basically, cooperative multi-tasking. If any one ‘thread’ doesn’t cooperate, the whole system comes to a standstill until that one task decides to take a break. So, some amount of work is required to do things correctly.
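
A contrived sketch of that failure mode: one routine that never yields will hold the CPU forever once it’s resumed, and nothing else ever runs:

local greedy = coroutine.create(function()
  while true do
    -- busy work, but no coroutine.yield(), so control never returns
  end
end)

coroutine.resume(greedy)  -- never comes back; everything else starves
print("unreachable")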

How to get started though?

Well, first of all, I start with a simple function:

local routines = {}

routines.doOnce = function(phrase)
  print(phrase)
end

routines.doOnce("Hello, World")
>Hello, World

A simple routine, that prints whatever I tell it to print. Fantastic, woot! Raise the roof up!!

Why did I go through the trouble of creating a table, and having a function, just to print? All will become clear in a bit.

Now, how about another routine? Perhaps one that does something in a loop:

routines.doLoop = function(iterations)
  for i=1,iterations do
    print("Iteration: ", i)
  end
end

routines.doLoop(3)
>Iteration: 1
>Iteration: 2
>Iteration: 3

Fantastic! Now I have one routine that prints a single thing, and exits, and another that prints a few numbers and exits.

Now just one last one:

routines.doTextChar = function(phrase)
  local nchars = #phrase

  for i=1,nchars do
    print(phrase:sub(i,i))
  end
end

routines.doTextChar("brown")
>b
>r
>o
>w
>n

Now, what I really want to do is to be able to run all three of these routines “at the same time”, without a lot of fuss. That is, I want to allow the doOnce() routine to run once, then I want the doTextChar(), and doLoop() routines to interleave their work, first one going, then the next.

This is where lua’s coroutines come into the picture.

First of all, I’ll take the doOnce as an example. In order to run it as a coroutine, there isn’t anything that needs to change about the routine. But, you do have to start using some coroutine machinery. Instead of calling the routine directly as usual, you call it using coroutine.resume(), and before that, you have to create something to resume, what Lua knows as a ‘thread’, using coroutine.create(). Like this:

local doOnceT = coroutine.create(routines.doOnce)
coroutine.resume(doOnceT, "Hello, World");

This will have the same effect as just running the routine directly, but will run it as a coroutine. Not particularly useful in this case, but it sets us up for some further success.
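
One detail worth knowing: coroutine.create() doesn’t actually run any of the function body; the new thread starts out ‘suspended’, and nothing happens until the first resume(). And once the routine runs to completion, the thread is ‘dead’, and cannot be resumed again:

local t = coroutine.create(routines.doOnce)
print(coroutine.status(t))        --> suspended (nothing printed yet)
coroutine.resume(t, "Hello, World")
print(coroutine.status(t))        --> dead
print(coroutine.resume(t))        --> false   cannot resume dead coroutine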

The looping routines are a little bit more interesting. You could do the following:

local doLoopT = coroutine.create(routines.doLoop)
coroutine.resume(doLoopT, 3)

That will have the same output as before. But, it won’t be very cooperative. The way in which a routine signals that it’s willing to give up a little slice of its CPU time is by calling the ‘coroutine.yield()’ function. So, to alter the original routine slightly:

routines.doLoop = function(iterations)
  for i=1,iterations do
    print("Iteration: ", i)
    coroutine.yield();
  end
end

Now, running will do exactly the same thing, but we’re now setup to cooperate with multiple tasks.
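
To see what that yield() actually buys us, drive the coroutine by hand. Each resume() now runs exactly one iteration and then stops at the yield():

local doLoopT = coroutine.create(routines.doLoop)
coroutine.resume(doLoopT, 3)  -- prints "Iteration:  1", then yields
coroutine.resume(doLoopT)     -- picks up after the yield: "Iteration:  2"
coroutine.resume(doLoopT)     -- "Iteration:  3"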

I will alter the ‘doTextChar()’ routine in a similar way:

routines.doTextChar = function(phrase)
  local nchars = #phrase

  for i=1,nchars do
    print(phrase:sub(i,i))
    coroutine.yield();
  end
end

OK, so now I have three routines, which are ready for doing some coroutine cooperation. But how to do that? The essential ingredient that is missing is a scheduler, or rather, something to take care of the mundane task of resuming each routine until it is done.

So, in comes the SimpleDispatcher:

local Collections = require "Collections"

local SimpleDispatcher_t = {}
local SimpleDispatcher_mt = {
  __index = SimpleDispatcher_t
}

local SimpleDispatcher = function()
  local obj = {
    tasklist = Collections.Queue.new();
  }
  setmetatable(obj, SimpleDispatcher_mt)

  return obj
end


SimpleDispatcher_t.AddTask = function(self, atask)
  self.tasklist:Enqueue(atask)	
end

SimpleDispatcher_t.AddRoutine = function(self, aroutine, ...)
  local routine = coroutine.create(aroutine)
  local task = {routine = routine, params = {...}}
  self:AddTask(task)

  return task
end

SimpleDispatcher_t.Run =  function(self)
  while self.tasklist:Len() > 0 do
    local task = self.tasklist:Dequeue()
    if not task then 
      break
    end

    if coroutine.status(task.routine) ~= "dead" then
      local status, err = coroutine.resume(task.routine, unpack(task.params));
      if not status then
        -- surface errors instead of silently swallowing them
        print("RESUME ERROR: ", err)
      end

      if coroutine.status(task.routine) ~= "dead" then
        self:AddTask(task)
      else
        print("TASK FINISHED")
      end
    else
      print("DROPPING TASK")
    end
  end
end

return SimpleDispatcher

Before explaining much, here is how I would use it:

local runner = SimpleDispatcher();

runner:AddRoutine(routines.doOnce, "Hello, World");
runner:AddRoutine(routines.doLoop, 3)
runner:AddRoutine(routines.doTextChar, "The brown")

runner:Run();

Basically, create an instance of this simple dispatcher.
Then, add some routines to it for execution. AddRoutine() does not actually start the routines going, it just queues them up to be run using coroutine.resume().
Lastly, call ‘runner:Run’, which will start a loop going, resulting in each of the tasks being called one after the other. This will result in the following output (the dispatcher’s own ‘TASK FINISHED’ diagnostics elided):

>Hello, World
>Iteration: 1
>T
>Iteration: 2
>h
>Iteration: 3
>e
>
>b
>r
>o
>w
>n

Basically, each time a routine calls ‘coroutine.yield()’, it’s giving up the CPU for a bit, and allowing the dispatcher to call the next routine. When it comes back around to calling the routine that gave up its slice of CPU time, that routine resumes from wherever the ‘yield()’ was called. And that’s the really neat part of coroutines!
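
One more trick worth knowing, though the dispatcher above doesn’t use it: yield() and resume() can pass values back and forth. Arguments to resume() become the return values of yield() inside the routine, and arguments to yield() come back as the extra return values of resume():

local echo = coroutine.create(function(first)
  local got = first
  while true do
    got = coroutine.yield("echo: " .. got)
  end
end)

print(select(2, coroutine.resume(echo, "one")))  --> echo: one
print(select(2, coroutine.resume(echo, "two")))  --> echo: two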

I find the easiest way to think about coroutines is to simply code something, as if it were going to be an independent ‘thread’. Then I throw in a few choice yield() calls here and there to ensure the thread cooperates with other threads that might be running.

There are some quite exotic things you can do with coroutines, and all sorts of frameworks to make working with them better/easier, and perhaps more mysterious.

For my simplistic pea brain though, this is a good place to start. Just taking simple routines, and handing them to the dispatcher.

That dispatcher is quite a nice little piece of work. If you think about it, it’s the equivalent of the scheduler within any OS. Assuming your Lua code is running in a single threaded process, being able to do your own scheduling is quite a powerful thing. You can add all sorts of exotics like aging and thread priority. You can add timers, and I/O events, or what have you. The beauty is, the scheduler can become as complex as you care to make it, and your coroutine code does not have to do anything special to adapt to it, just like with thread code in the OS proper.
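
For instance, here is a sketch of how timers might be bolted onto the dispatcher above. The ‘wakeTime’ field and these method names are invented for illustration, and os.clock() stands in for a real monotonic clock:

-- Sketch only: park a routine until a deadline passes.
SimpleDispatcher_t.AddTimedRoutine = function(self, aroutine, delaySeconds, ...)
  local task = self:AddRoutine(aroutine, ...)
  task.wakeTime = os.clock() + delaySeconds
  return task
end

SimpleDispatcher_t.RunTimed = function(self)
  while self.tasklist:Len() > 0 do
    local task = self.tasklist:Dequeue()

    if task.wakeTime and os.clock() < task.wakeTime then
      -- not due yet; requeue it and try the next task
      -- (note: this will spin if only timed tasks remain)
      self:AddTask(task)
    elseif coroutine.status(task.routine) ~= "dead" then
      coroutine.resume(task.routine, unpack(task.params))
      if coroutine.status(task.routine) ~= "dead" then
        self:AddTask(task)
      end
    end
  end
end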

So, there you have it. Getting started with Lua coroutines.

Now, if only I could combine socket pooling and this Dispatcher in some meaningful way. I wonder…