All the pretty little asyncs…

I have gone on about various forms of async for quite some time now. So could there possibly be more? Well, yes of course!

Here’s the scenario I want to enable. I want to keep track of my file system activity, sending the various operations to a distant storage facility. I want to do this while a UI displays what’s going on, and I want to be able to configure things while it’s happening, such as which events I really care to shadow.

I don’t want to use multiple OS level threads if I can at all avoid them, as they will complicate my programming tremendously. So, what to do?

Well, first I’ll start with the file tracking business. I have talked about change journals in the past. This is a basic mechanism that Windows has to track changes to the file system. Every single open, close, delete, write, etc, has an entry in this journal. If you’re writing a backup program, you’ll be using change journals.

The essence of the change journal functionality is usage of the DeviceIoControl() function. Most of us are very familiar with the likes of CreateFile(), ReadFile(), WriteFile(), and CloseHandle() when it comes to dealing with files. But, for everything else, there is this DeviceIoControl() function.

What is a device? Well, you’d be surprised to learn that most things in the Windows OS are represented by ‘devices’ just like they are in UNIX systems. For example, ‘C:’ is a device. But “DISPLAY1” is also a device, as are “LCD” and “PhysicalDisk0”. When it comes to controlling devices, the Win32 level API calls will ultimately make DeviceIoControl() calls with various parameters. That’s great to know as it allows you to create whatever API you want, as long as you know the nuances of the device driver you’re trying to control.

But, I digress. The key point here is that I can open up a device, and I can make a DeviceIoControl() call, and true to form, I can use OVERLAPPED structures, and IO Completion Ports. That makes these calls “async”, or with TINN, cooperative.
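To make the naming concrete: the Device class that follows turns a device name into a Win32 path by prefixing it with `\\.\`. Here is a minimal sketch of just that step in plain Lua. The helper name devicePath is made up for illustration and is not part of TINN.

```lua
-- Hypothetical helper: build the Win32 device namespace path for a device name.
-- This mirrors the string.format call used in Device.open.
local function devicePath(devicename)
  return string.format("\\\\.\\%s", devicename)
end

print(devicePath("c:"))             --> \\.\c:
print(devicePath("PhysicalDisk0"))  --> \\.\PhysicalDisk0
```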

To wrap it up in a tidy little bow, here is a Device class which does the grunt work for me:

--[[
References

http://msdn.microsoft.com/en-us/magazine/cc163415.aspx

--]]
local ffi = require("ffi")
local bit = require("bit")
local bor = bit.bor;

local core_file = require("core_file_l1_2_0");
local core_io = require("core_io_l1_1_1");
local Application = require("Application")
local IOOps = require("IOOps")
local FsHandles = require("FsHandles");
local errorhandling = require("core_errorhandling_l1_1_1");
local WinBase = require("WinBase");


local Device = {}
setmetatable(Device, {
	__call = function(self, ...)
		return self:open(...)
	end,
})
local Device_mt = {
	__index = Device,
}

function Device.init(self, rawhandle)
	local obj = {
		Handle = FsHandles.FsHandle(rawhandle)
	}
	setmetatable(obj, Device_mt)
	
	Application:watchForIO(rawhandle, rawhandle)

	return obj;
end


function Device.open(self, devicename, dwDesiredAccess, dwShareMode)
	local lpFileName = string.format("\\\\.\\%s", devicename);
	dwDesiredAccess = dwDesiredAccess or bor(ffi.C.GENERIC_READ, ffi.C.GENERIC_WRITE);
	-- honor a caller-supplied share mode, defaulting to shared read/write
	dwShareMode = dwShareMode or bor(FILE_SHARE_READ, FILE_SHARE_WRITE);
	local lpSecurityAttributes = nil;
	local dwCreationDisposition = OPEN_EXISTING;
	local dwFlagsAndAttributes = FILE_FLAG_OVERLAPPED;
	local hTemplateFile = nil;

	local handle = core_file.CreateFileA(
        lpFileName,
        dwDesiredAccess,
        dwShareMode,
     	lpSecurityAttributes,
        dwCreationDisposition,
        dwFlagsAndAttributes,
     	hTemplateFile);


	if handle == INVALID_HANDLE_VALUE then
		return nil, errorhandling.GetLastError();
	end

	return self:init(handle)
end

function Device.getNativeHandle(self)
	return self.Handle.Handle;
end

function Device.createOverlapped(self, buff, bufflen)
	local obj = ffi.new("FileOverlapped");
	
	obj.file = self:getNativeHandle();
	obj.OVL.Buffer = buff;
	obj.OVL.BufferLength = bufflen;

	return obj;
end

function Device.control(self, dwIoControlCode, lpInBuffer, nInBufferSize, lpOutBuffer, nOutBufferSize)
	local lpBytesReturned = nil;
	local lpOverlapped = self:createOverlapped(ffi.cast("void *", lpInBuffer), nInBufferSize);


	local status = core_io.DeviceIoControl(self:getNativeHandle(), 
          dwIoControlCode, 
          ffi.cast("void *", lpInBuffer),
          nInBufferSize,
          lpOutBuffer,
          nOutBufferSize,
          lpBytesReturned,
          ffi.cast("OVERLAPPED *",lpOverlapped));

	local err = errorhandling.GetLastError();

	-- Error conditions
	-- status == 1, err == WAIT_TIMEOUT (258)
	-- status == 0, err == ERROR_IO_PENDING (997)
	-- status == 0, err == something else

	if status == 0 then
		if err ~= ERROR_IO_PENDING then
			return false, err
		end
	end

    local key, bytes, ovl = Application:waitForIO(self, lpOverlapped);

    return bytes;
end

return Device

I’ve shown this kind of construct before with the NativeFile object. That object contains Read, and Write functions as well, but lacks the control() function. Of course the two could be combined for maximum benefit.

How to use this thing?

dev = Device("c:")
dev:control(...)

OK, that’s out of the way. Now, what about this change journal thing? Very simple now that the device is handled.
A change journal can look like this:

-- USNJournal.lua
-- References
-- http://msdn.microsoft.com/en-us/library/windows/desktop/aa364563(v=vs.85).aspx
-- http://www.microsoft.com/msj/0999/journal/journal.aspx
-- http://www.microsoft.com/msj/1099/journal2/journal2.aspx
-- 

local ffi = require("ffi");
local bit = require("bit");
local bor = bit.bor;
local band = bit.band;

local core_io = require("core_io_l1_1_1");
local core_file = require("core_file_l1_2_0");
local WinIoCtl = require("WinIoCtl");
local WinBase = require("WinBase");
local errorhandling = require("core_errorhandling_l1_1_1");
local FsHandles = require("FsHandles");
local Device = require("Device")

--[[
	ChangeJournal

	An abstraction for NTFS Change journal management
--]]
local ChangeJournal = {}
setmetatable(ChangeJournal, {
	__call = function(self, ...)
		return self:open(...);
	end,
});

local ChangeJournal_mt = {
	__index = ChangeJournal;
}

ChangeJournal.init = function(self, device)
	local obj = {
		Device = device;
	}
	setmetatable(obj, ChangeJournal_mt);

	local jinfo, err = obj:getJournalInfo();

	print("ChangeJournal.init, jinfo: ", jinfo, err)

	if jinfo then
		obj.JournalID = jinfo.UsnJournalID;
		obj.LowestUsn = jinfo.LowestValidUsn;
		obj.FirstUsn = jinfo.FirstUsn;
		obj.MaxSize = jinfo.MaximumSize;
		obj.MaxUsn = jinfo.MaxUsn;
		obj.AllocationSize = jinfo.AllocationDelta;
	end

	return obj;
end


ChangeJournal.open = function(self, driveLetter)
	local device, err = Device(driveLetter)

	if not device then
		print("ChangeJournal.open, ERROR: ", err)
		return nil, err
	end

	return self:init(device);
end


ChangeJournal.getNextUsn = function(self)
	local jinfo, err = self:getJournalInfo();

	if not jinfo then
		return false, err;
	end

	return jinfo.NextUsn;
end



ChangeJournal.getJournalInfo = function(self)
	local dwIoControlCode = FSCTL_QUERY_USN_JOURNAL;
	local lpInBuffer = nil;
	local nInBufferSize = 0;
	local lpOutBuffer = ffi.new("USN_JOURNAL_DATA");
	local nOutBufferSize = ffi.sizeof(lpOutBuffer);

	local success, err = self.Device:control(dwIoControlCode, 
          lpInBuffer,
          nInBufferSize,
          lpOutBuffer,
          nOutBufferSize);

	if not success then
		-- report the error that Device:control() already captured
		return false, err;
	end

	return lpOutBuffer;
end

function ChangeJournal.waitForNextEntry(self, usn, ReasonMask) 
 	usn = usn or self:getNextUsn();
 	local ReasonMask = ReasonMask or 0xFFFFFFFF;
 	local ReturnOnlyOnClose = false;
 	local Timeout = 0;
 	local BytesToWaitFor = 1;

    local ReadData = ffi.new("READ_USN_JOURNAL_DATA", {usn, ReasonMask, ReturnOnlyOnClose, Timeout, BytesToWaitFor, self.JournalID});

    local pusn = ffi.new("USN");
    
    -- This function does not return until the USN
    -- record exists
	local BUF_LEN = ffi.C.USN_PAGE_SIZE;
	local Buffer = ffi.new("uint8_t[?]", BUF_LEN);
    local dwBytes = ffi.new("DWORD[1]");

	local success, err = self.Device:control(FSCTL_READ_USN_JOURNAL, 
        ReadData,
        ffi.sizeof(ReadData),
        Buffer,
        BUF_LEN);

	if not success then 
		return false, err
	end

	local UsnRecord = ffi.cast("PUSN_RECORD", ffi.cast("PUCHAR",Buffer) + ffi.sizeof("USN")); 

    return UsnRecord;
end

return ChangeJournal;

This very much looks like the change journal I created a few months back. The primary difference is the device control stuff is abstracted out into the Device object, so it does not need to be repeated here.
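The ReasonMask parameter of waitForNextEntry() is where the “which events I really care to shadow” part comes in. It defaults to 0xFFFFFFFF (everything). Here is a small sketch of how a narrower mask might be put together and tested. The flag values are the USN_REASON_* constants as I know them from winioctl.h, so check them against your SDK headers. The helpers hasFlag and makeMask are hypothetical, and I use plain arithmetic instead of the bit library so the sketch runs in any Lua.

```lua
-- A few USN_REASON_* flag values (assumed to match winioctl.h).
local USN_REASON = {
  DATA_OVERWRITE = 0x00000001,
  DATA_EXTEND    = 0x00000002,
  FILE_CREATE    = 0x00000100,
  FILE_DELETE    = 0x00000200,
  CLOSE          = 0x80000000,
}

-- Hypothetical helper: true if the single-bit 'flag' is set in 'mask'.
local function hasFlag(mask, flag)
  return math.floor(mask / flag) % 2 == 1
end

-- Hypothetical helper: combine single-bit flags into one mask,
-- skipping any flag that is already present.
local function makeMask(...)
  local mask = 0
  for _, flag in ipairs({...}) do
    if not hasFlag(mask, flag) then
      mask = mask + flag
    end
  end
  return mask
end

local mask = makeMask(USN_REASON.FILE_CREATE, USN_REASON.FILE_DELETE)
print(string.format("0x%08X", mask))          --> 0x00000300
print(hasFlag(mask, USN_REASON.FILE_DELETE))  --> true
print(hasFlag(mask, USN_REASON.DATA_EXTEND))  --> false
```

With something like this in hand, a call such as journal:waitForNextEntry(nil, mask) would only wait on creates and deletes.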

When we want to track the changes to the device, we make repeated calls to ‘waitForNextEntry’.

local function test_waitForNextEntry(journal)
    local entry = journal:waitForNextEntry();

    while entry do
        printJournalEntry(entry);
        entry = journal:waitForNextEntry();
    end
end

This is your typical serially written code. There’s nothing that looks special about it, no hint of async processing. Behind the covers though, way back in the Device:control() function, the actual sending of a command to the device happens using an IO Completion Port, so if you’re running with TINN, this particular task will ‘waitForIO’, and other tasks can continue.
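If that sounds like magic, here is a toy sketch in plain Lua of the same idea. It is not TINN code: fakeControl is a made-up stand-in for Device:control(), and the little loop at the bottom plays the role of the scheduler plus the completion port. The task reads as straight-line code, yet every “IO” call hands control back to the driver.

```lua
-- Stand-in for Device:control(): yields until the "IO" completes,
-- then returns whatever result the driver resumes it with.
local function fakeControl(request)
  local result = coroutine.yield(request)
  return result
end

-- Reads like ordinary blocking code.
local log = {}
local function journalTask()
  for i = 1, 3 do
    local entry = fakeControl("read " .. i)
    log[#log + 1] = entry
  end
end

-- A toy driver playing the role of scheduler and completion port.
local co = coroutine.create(journalTask)
local ok, request = coroutine.resume(co)
while coroutine.status(co) ~= "dead" do
  -- other tasks could run here while the request is "pending"
  ok, request = coroutine.resume(co, "done: " .. request)
end

print(table.concat(log, ", "))  --> done: read 1, done: read 2, done: read 3
```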

So, using it in context looks like this:

local function main()
    local journal, err = ChangeJournal("c:")

    spawn(test_waitForNextEntry, journal);
    periodic(function() print("tick") end, 1000)
end

run(main)

In this case, I spawn the journal waiting/printing thing as its own task. Then I set up a periodic timer to simply print “tick” every second, to show there is some activity.

Since the journaling is cooperative (mostly waiting for io control to complete), the periodic timer, or UI processing, or what have you, is free to run, without any hindrance.

Combine this with the already cooperative UI stuff, and you can imagine how easy it could be to construct the scenario I set out to construct. Since all networking and file system operations in TINN are automatically async, it would be easy to log these values, or send them across a network to be stored for later analysis or what have you.

And there you have it. Async everywhere makes for some very nice scenarios. Being able to do async on any device, whether with standard read/write operations, or io control, makes for very flexible programming.

Next time around, I’m going to show how to do async DNS queries for fun and profit.


TINN Version 0.7 Available

Although TINN is under constant development, there’s nothing like declaring a new “release”. It’s been 3 months since the 0.6 release. So, now there is a 0.7 release. You can read about the TINN v0.7 Release and install it if you would like.

There were 84 commits since the previous release, so I can’t even remember all the changes that were involved. The major addition from my most recent work has to do with the new scheduler as described on this blog. That’s the extensible, plug-in driven scheduler. Pretty nifty for my work at least.

There are quite a few additions, such as a revamped stream object, io completion port supported file interface, a logfile thing, and quite a few more interfaces from the OS.

Other items I have been working on include support for various COM interfaces such as DXGI, DXVA, MMDevice and some others. These are all in the “experimental” folder if you look at the enlistment. They are not quite ready for prime time, so they’re not actually in the v0.7 release.

What can you do with TINN? In short, you can create all sorts of Windows based applications. Everything from scalable web services to interactive multi-tasking UI (including OpenGL based).

TINN is a command line tool (tinn.exe). As such, the easiest thing to do is bring up a command line shell and run various scripts through tinn.

c:\> tinn.exe hello.lua

The TINN repository contains numerous test cases that utilize the various modules of TINN.

That’s it for now.  Next release will be about cleanup and stabilization primarily.


Parallel Conversations

I have been tinkering with words of late. I’ve added ‘waitFor’, ‘when’ and ‘whenever’ to the TINN lexicon, and my programming is becoming easier, more organized, and easier to describe.

Recently I added another set of words: ‘waitSignal’, ‘signalOne’, and ‘signalAll’. If you were using another system, you might call these ‘events’, but really they’re just signaling.

Here’s how I might use them:

local Application = require("Application")(true)

local function waiter(num)
  num = num or 0

  local function closure()
    print(string.format("WAITED: %d",num))
  end

  return closure;
end

local function main()

  for i=1,4 do
    onSignal(waiter(i), "waiting")
  end

  -- signal only one of them
  signalOne("waiting")

  -- sleep a bit giving other tasks
  -- a chance to run
  sleep(500)

  -- signal the rest of them	
  signalAll("waiting")
  sleep(2000)
	
  print("SLEEP AFTER signalAll")
end

run(main)

First, it spawns 4 tasks which will all be waiting on the ‘waiting’ signal. With ‘signalOne’, only 1 of the 4 waiting tasks will be awakened and continue execution. With ‘signalAll’, the rest of the waiting tasks are all scheduled to run, and thus continue from where they left off.

The core primitive for signaling is the ‘waitSignal()’ function. This will essentially suspend the currently running task until the specified signal has been given. The ‘onSignal()’ function is a convenience which will spawn a separate task which will do the waiting and execute the specified function at the appropriate time.
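To show how little machinery this needs, here is a self-contained sketch of these four words on top of plain Lua coroutines. It is my own simplification for illustration, not the TINN implementation: spawn and runReady here are bare-bones stand-ins for the real scheduler.

```lua
local waiting = {}   -- signal name -> list of suspended coroutines
local ready = {}     -- coroutines ready to be resumed

local function spawn(func)
  ready[#ready + 1] = coroutine.create(func)
end

-- Suspend the calling task until 'name' is signaled.
local function waitSignal(name)
  waiting[name] = waiting[name] or {}
  table.insert(waiting[name], (coroutine.running()))
  return coroutine.yield()
end

-- Convenience: spawn a task that waits, then calls func.
local function onSignal(func, name)
  spawn(function()
    waitSignal(name)
    func()
  end)
end

-- Move the longest-waiting task (if any) to the ready list.
local function signalOne(name)
  local list = waiting[name]
  if list and #list > 0 then
    ready[#ready + 1] = table.remove(list, 1)
    return true
  end
  return false
end

local function signalAll(name)
  while signalOne(name) do end
end

-- Run ready tasks until none are left.
local function runReady()
  while #ready > 0 do
    local co = table.remove(ready, 1)
    assert(coroutine.resume(co))
  end
end

local woke = {}
for i = 1, 4 do
  onSignal(function() woke[#woke + 1] = i end, "waiting")
end
runReady()            -- let all four tasks park on the signal
signalOne("waiting")
runReady()
print(#woke)          --> 1
signalAll("waiting")
runReady()
print(#woke)          --> 4
```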

If you were using another language, these might be ‘onEvent()’ and ‘emit()’. Of course it’s really easy to code up that way, just create a name alias to whatever you like. You could even do:

local function on(eventName, func)
  return onSignal(func, eventName)
end

So, these are some more words that are in the parallel programming arsenal. In totality, the set is:

Time related

  • sleep
  • delay
  • periodic

Predicates

  • waitFor
  • when
  • whenever

Signaling

  • onSignal
  • signalOne
  • signalAll
  • waitSignal

Task and scheduler

  • run
  • stop
  • spawn
  • yield

With these words in hand, doing my multi-task programming is becoming easier by the moment. I don’t worry about the lower level multi-tasking primitives such as mutexes, locks, and the like. I’m not really that concerned with timers, thread pools, or any of the other machinery that makes this all happen. I just write my simple code.

One thing has struck me though. This is all great for a single threaded environment. Basically collaborative multi-tasking. In this age of super fast CPUs, it turns out that going single threaded is just fine. In many cases it’s actually better than going preemptive multi-threaded because you don’t necessarily incur as much OS thread level context switching.

I’d like to go one step further though. A long time ago we didn’t have L1/L2 caches on the CPUs, then they came along, got better, and now are fairly large. As a programmer, my concern for them is fairly minimal. I don’t do much to ensure that the caches are used properly. I just write my code, and either the compiler, or the CPU itself deals with all the mechanics. Same goes with prefetching instructions and the like. I don’t write my code to make that job any easier, it just happens automagically.

Here I am contemplating the same thing about threads. As my CPU has increasingly more cores, I am wondering if the CPU cycles should be viewed the same as we viewed levels of memory in the past. Is there a C1/C2 level of core execution? That is, most things should stay on the same thread because switching to a different core is costly. Something very low level should decide on when that switching occurs.

My application/runtime can know a lot about what I’m going to do because the thing is all laid out in script. After some analysis, perhaps I can decide “this thing is going to sleep for more than 500 ms, therefore I’m going to actually put it over here on this other thread which is actually sleeping.” In that way, I can reduce my resource usage because I won’t be polling the timer every time through the scheduling loop. I can be smarter.

If I had such smartness, then when I have 16 – 100 cores, I’ll be able to easily take advantage of those multiple cores without having to truly understand and navigate that madness on my own. Let the machine figure it out, it’s much better at that than I am.

In the meanwhile, I’ve got these multi-tasking primitives which are making my life a lot easier when it comes to programming fairly complex systems.


Lua Coroutine Roundup

My WordPress dashboard says this is my 250th post. I figure what better way to mark the occasion than to do a little bit of a roundup on one of my favorite topics.

One of the most awesome features of the Lua language is a built in co-routine mechanism. I’ve talked about co-routines quite a bit, doing a little series on the basics of coroutines, all the way up through a scheduler for multi-tasking.

Coroutines give the programmer the illusion of creating a parallel processing environment. Basically, write up your code, “spawn” different coroutines, and then just forget about them… Well, almost. The one thing the Lua language does not give you is a way to manage those coroutines. Co-routines are a mechanism by which ‘threads’ can do cooperative multi-tasking. This is different from the OS level preemptive multitasking that you get with ‘threads’ on most OS libraries. So, I can create coroutines, resume, and yield. It’s the ‘yield’ that gives coroutines the multi-tasking flavor. One co-routine yields, and another co-routine must be told to ‘resume’. Something has to do the telling, and keep track of who’s yielded, and who needs to be resumed. In steps the scheduler.
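For anyone who has not played with them, here is the whole create/resume/yield dance in a few lines, using only the standard coroutine library. Values can flow in both directions: yield hands a value out, and the next resume hands one back in.

```lua
local co = coroutine.create(function(a)
  print("started with", a)
  local b = coroutine.yield(a + 1)   -- hand a value out, wait to be resumed
  print("resumed with", b)
  return a + b
end)

-- The first resume starts the function and runs it up to the yield.
local ok1, v1 = coroutine.resume(co, 1)
print(ok1, v1, coroutine.status(co))   -- true, 2, suspended

-- The second resume continues after the yield and runs to the end.
local ok2, v2 = coroutine.resume(co, 10)
print(ok2, v2, coroutine.status(co))   -- true, 11, dead
```

Notice that nothing here decides who runs next. I did that by hand with the two resume calls, which is exactly the job a scheduler takes over.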

Summaries from my own archives
Lua Coroutines – Getting Started

Multitask UI Like it’s 1995

Hurry Up and Wait – TINN Timing

Computicles – A tale of two schedulers

Parallel Computing is Child’s Play

Multitasking single threaded UI – Gaming meets networking

Alertable Predicates? What’s that?

Pinging Peers – Creating Network Meshes

From the Interwebs
GitHub Gist: Deco / coroutine_scheduler.lua

Beginner’s Guide to Coroutines – Roblox Wiki

LOOP: Thread Scheduler

LuaAV

And of course the search engines will show you thousands more:

lua coroutine scheduler

I think it would be really great if the Lua language itself had some form of rudimentary scheduler, but scheduling is such a domain specific thing, and the rudiments are so basic, that it’s possibly not worth providing such support. The Go language supports a basic scheduler, and has coroutine support, to great effect I think.

The simple scheduler is simple: just do a round-robin execution of one task after the next as each task yields. I wrote a simple dispatcher which started with exactly this. Then interesting things start to happen. You’d like to have your tasks go into a wait state if they’re doing an IO operation, and not be scheduled again until some IO has occurred. Or, you want to put a thread to sleep for some amount of time. Or, you want a thread to wait on a condition of one sort or another, or signals, or something else. Pretty soon, your basic scheduler is hundreds of lines of code, and things start to get rather complicated.
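For reference, the round-robin starting point can be sketched in about twenty lines of plain Lua. This is an illustration written for this post, not the original dispatcher: spawn, run, and worker are names I picked for the sketch.

```lua
local tasks = {}  -- FIFO queue of coroutines

local function spawn(func)
  tasks[#tasks + 1] = coroutine.create(func)
end

-- Resume each task in turn; a task that yields goes to the back of the line.
local function run()
  while #tasks > 0 do
    local co = table.remove(tasks, 1)
    local ok, err = coroutine.resume(co)
    if not ok then
      print("task error:", err)
    elseif coroutine.status(co) ~= "dead" then
      tasks[#tasks + 1] = co
    end
  end
end

local trace = {}
local function worker(name, count)
  return function()
    for i = 1, count do
      trace[#trace + 1] = name .. i
      coroutine.yield()
    end
  end
end

spawn(worker("a", 2))
spawn(worker("b", 3))
run()
print(table.concat(trace, " "))  --> a1 b1 a2 b2 b3
```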

After going round and round in this problem space, I finally came up with a solution that I think has some legs. Rather than a complex scheduler, I promote a simple scheduler, and add functions to it through normal thread support. So, here is my final ‘scheduler’ for 2013:


local ffi = require("ffi");

local Collections = require("Collections");
local StopWatch = require("StopWatch");


--[[
	The Scheduler supports a collaborative processing
	environment.  As such, it manages multiple tasks which
	are represented by Lua coroutines.
--]]
local Scheduler = {}
setmetatable(Scheduler, {
	__call = function(self, ...)
		return self:create(...)
	end,
})
local Scheduler_mt = {
	__index = Scheduler,
}

function Scheduler.init(self, scheduler)
	local obj = {
		Clock = StopWatch();

		TasksReadyToRun = Collections.Queue();
	}
	setmetatable(obj, Scheduler_mt)
	
	return obj;
end

function Scheduler.create(self, ...)
	return self:init(...)
end

--[[
		Instance Methods
--]]
function Scheduler.tasksArePending(self)
	return self.TasksReadyToRun:Len() > 0
end

function Scheduler.tasksPending(self)
	return self.TasksReadyToRun:Len();
end


function Scheduler.getClock(self)
	return self.Clock;
end


--[[
	Task Handling
--]]

function Scheduler.scheduleTask(self, afiber, ...)
	if not afiber then
		return false, "no fiber specified"
	end

	afiber:setParams(...);
	self.TasksReadyToRun:Enqueue(afiber);	
	afiber.state = "readytorun"

	return afiber;
end

function Scheduler.removeFiber(self, fiber)
	--print("DROPPING DEAD FIBER: ", fiber);
	return true;
end

function Scheduler.inMainFiber(self)
	return coroutine.running() == nil; 
end

function Scheduler.getCurrentFiber(self)
	return self.CurrentFiber;
end

function Scheduler.step(self)
	-- Now check the regular fibers
	local task = self.TasksReadyToRun:Dequeue()

	-- If no fiber in ready queue, then just return
	if task == nil then
		return true
	end

	if task:getStatus() == "dead" then
		self:removeFiber(task)

		return true;
	end

	-- If the task we pulled off the active list is 
	-- not dead, then perhaps it is suspended.  If that's true
	-- then it needs to drop out of the active list.
	-- We assume that some other part of the system is responsible for
	-- keeping track of the task, and rescheduling it when appropriate.
	if task.state == "suspended" then
		return true;
	end

	-- If we have gotten this far, then the task truly is ready to 
	-- run, and it should be set as the currentFiber, and its coroutine
	-- is resumed.
	self.CurrentFiber = task;
	local results = {task:resume()};

	-- no task is currently executing
	self.CurrentFiber = nil;

	-- once we get results back from the resume, one
	-- of two things could have happened.
	-- 1) The routine exited normally
	-- 2) The routine yielded
	--
	-- In both cases, we parse out the results of the resume 
	-- into a success indicator and the rest of the values returned 
	-- from the routine
	local success = results[1];
	table.remove(results,1);


	--print("SUCCESS: ", success);
	if not success then
		print("RESUME ERROR")
		print(unpack(results));
	end

	-- Again, check to see if the task is dead after
	-- the most recent resume.  If it's dead, then don't
	-- bother putting it back into the readytorun queue
	-- just remove the task from the list of tasks
	if task:getStatus() == "dead" then
		--print("Scheduler, DEAD coroutine, removing")
		self:removeFiber(task)

		return true;
	end

	-- The only way the task will get back onto the readylist
	-- is if its state is 'readytorun', otherwise, it will
	-- stay out of the readytorun list.
	if task.state == "readytorun" then
		self:scheduleTask(task, results);
	end
end


--[[
	Primary Interfaces
--]]

function Scheduler.suspend(self, aTask)
	if not aTask then
		self.CurrentFiber.state = "suspended"
		return self:yield()
	end

	aTask.state = "suspended";

	return true
end

function Scheduler.yield(self, ...)
	return coroutine.yield(...);
end


--[[
	Running the scheduler itself
--]]
function Scheduler.start(self)
	if self.ContinueRunning then
		return false, "scheduler is already running"
	end
	
	self.ContinueRunning = true;

	while self.ContinueRunning do
		self:step();
	end
	--print("FINISHED STEP ITERATION")
end

function Scheduler.stop(self)
	self.ContinueRunning = false;
end

return Scheduler

At 195 LOC, this is back down to the basic size of the original simple dispatcher that I started with. There are only a few functions to this interface:

  • scheduleTask()
  • step()
  • suspend()
  • yield()
  • start()

These are the basics to do some scheduling of tasks. There is an assumption that there is an object that can be scheduled, such that ‘scheduleTask’ can attach some metadata, and the like. Other than that, there’s not much here but the basic round robin scheduler.
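What does such a schedulable object need? Going by the calls the scheduler makes, just setParams(), getStatus(), resume(), and a ‘state’ field. Here is a hypothetical minimal task that satisfies that contract. The real TINN task object carries more than this, so treat the names Task and newTask as assumptions for the sketch.

```lua
local unpack = table.unpack or unpack  -- works on Lua 5.1/LuaJIT and 5.2+

local Task = {}
Task.__index = Task

-- Hypothetical constructor: wrap a function in a coroutine.
local function newTask(func)
  return setmetatable({
    routine = coroutine.create(func),
    params = {},
    state = "readytorun",
  }, Task)
end

-- Remember the values to pass in on the next resume.
function Task:setParams(...)
  self.params = {...}
end

function Task:getStatus()
  return coroutine.status(self.routine)
end

function Task:resume()
  return coroutine.resume(self.routine, unpack(self.params))
end

local t = newTask(function(greeting)
  local nextWord = coroutine.yield(greeting .. "!")
  return nextWord .. "?"
end)

t:setParams("hello")
local ok1, out1 = t:resume()
print(ok1, out1, t:getStatus())   -- true, hello!, suspended

t:setParams("again")
local ok2, out2 = t:resume()
print(ok2, out2, t:getStatus())   -- true, again?, dead
```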

So, how do you make a more complex scheduler, say one that deals with time? Well, first of all, I’ve also added an Application object, which actually contains a scheduler, and supports various other things related to an application, including add ons to the scheduler. Here’s an excerpt of the Application object construction.

function Application.init(self, ...)
	local sched = Scheduler();

	waitForIO.MessageQuanta = 0;

	local obj = {
		Clock = Stopwatch();
		Scheduler = sched;
		TaskID = 0;
		wfc = waitForCondition(sched);
		wft = waitForTime(sched);
		wfio = waitForIO(sched);
	}
	setmetatable(obj, Application_mt)

	-- Create a task for each add-on
	obj:spawn(obj.wfc.start, obj.wfc)
	obj:spawn(obj.wft.start, obj.wft)
	obj:spawn(obj.wfio.start, obj.wfio)

	return obj;
end

In the init(), the Application object creates instances for the add-ons to the scheduler. These add-ons (waitForCondition, waitForTime, waitForIO) don’t have to adhere to much of an interface, but you do need some function which is callable so that it can be spawned. The Application construction ends with the spawning of the three add-ons as normal threads in the scheduler. Yep, that’s right, the add-ons are just like any other thread, running in the scheduler. You might think: but these things need to run with a higher priority than regular threads! And yes, in some cases they do need that. But hey, that’s an exercise for the scheduler. If the core scheduler gains a ‘priority’ mechanism, then these threads can be run with a higher priority, and everything is good.

And what do these add-ons look like? I’ve gone over them before, but here’s an example of the timing one:

local Functor = require("Functor")
local tabutils = require("tabutils");


local waitForTime = {}
setmetatable(waitForTime, {
	__call = function(self, ...)
		return self:create(...)
	end,
})

local waitForTime_mt = {
	__index = waitForTime;
}

function waitForTime.init(self, scheduler)
	local obj = {
		Scheduler = scheduler;
		TasksWaitingForTime = {};
	}
	setmetatable(obj, waitForTime_mt)

	--scheduler:addQuantaStep(Functor(obj.step,obj));
	--scheduler:spawn(obj.step, obj)

	return obj;
end

function waitForTime.create(self, scheduler)
	if not scheduler then
		return nil, "no scheduler specified"
	end

	return self:init(scheduler)
end

function waitForTime.tasksArePending(self)
	return #self.TasksWaitingForTime > 0
end

function waitForTime.tasksPending(self)
	return #self.TasksWaitingForTime;
end

local function compareTaskDueTime(task1, task2)
	if task1.DueTime < task2.DueTime then
		return true
	end
	
	return false;
end

function waitForTime.yieldUntilTime(self, atime)
	--print("waitForTime.yieldUntilTime: ", atime, self.Scheduler.Clock:Milliseconds())
	--print("Current Fiber: ", self.CurrentFiber)
	local currentFiber = self.Scheduler:getCurrentFiber();
	if currentFiber == nil then
		return false, "not currently in a running task"
	end

	currentFiber.DueTime = atime;
	tabutils.binsert(self.TasksWaitingForTime, currentFiber, compareTaskDueTime);

	return self.Scheduler:suspend()
end

function waitForTime.yield(self, millis)
	--print('waitForTime.yield, CLOCK: ', self.Scheduler.Clock)

	local nextTime = self.Scheduler.Clock:Milliseconds() + millis;

	return self:yieldUntilTime(nextTime);
end

function waitForTime.step(self)
	--print("waitForTime.step, CLOCK: ", self.Scheduler.Clock);

	local currentTime = self.Scheduler.Clock:Milliseconds();

	-- traverse through the fibers that are waiting
	-- on time
	local nAwaiting = #self.TasksWaitingForTime;
--print("Timer Events Waiting: ", nAwaiting)
	for i=1,nAwaiting do

		local fiber = self.TasksWaitingForTime[1];
		if fiber.DueTime <= currentTime then
			--print("ACTIVATE: ", fiber.DueTime, currentTime);
			-- put it back into circulation
			-- preferably at the front of the list
			fiber.DueTime = 0;
			--print("waitForTime.step: ", self)
			self.Scheduler:scheduleTask(fiber);

			-- Remove the fiber from the list of fibers that are
			-- waiting on time
			table.remove(self.TasksWaitingForTime, 1);
		end
	end
end

function waitForTime.start(self)
	while true do
		self:step();
		self.Scheduler:yield();
	end
end

return waitForTime

The ‘start()’ function is what was scheduled with the scheduler, so that’s the thread that’s actually running. As you can see, it’s just a normal cooperative task.

The real action comes from the ‘yield()’ and ‘step()’ functions. The ‘yield()’s purpose in life is to suspend the current task (take it out of the active running list of the scheduler), and setup the appropriate stuff such that it can be activated later. The purpose of the “step()” function is to go through the list of tasks which are currently suspended, and determine which ones need to be scheduled again, because their time signal has been met.

And that’s how you add items such as “sleep()” to the system. You don’t change the core scheduler, but rather, you add a small module which knows how to deal with timed events, and that’s that.
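One detail worth a closer look is how the waiting list stays sorted by due time, since that is what lets step() only ever look at the front of the list. I have not shown tabutils.binsert here, so the following is a stand-in written for illustration, assuming it does a binary-search insertion with a ‘less than’ comparator.

```lua
-- Hypothetical stand-in for tabutils.binsert: insert 'value' into the sorted
-- array 'list' using binary search; 'lessThan(a, b)' defines the order.
-- Equal items are placed after existing ones. Returns the insert position.
local function binsert(list, value, lessThan)
  local lo, hi = 1, #list
  while lo <= hi do
    local mid = math.floor((lo + hi) / 2)
    if lessThan(value, list[mid]) then
      hi = mid - 1
    else
      lo = mid + 1
    end
  end
  table.insert(list, lo, value)
  return lo
end

local function compareTaskDueTime(task1, task2)
  return task1.DueTime < task2.DueTime
end

local waitingTasks = {}
for _, due in ipairs({500, 100, 300, 100}) do
  binsert(waitingTasks, {DueTime = due}, compareTaskDueTime)
end

local order = {}
for i, task in ipairs(waitingTasks) do
  order[i] = task.DueTime
end
print(table.concat(order, " "))  --> 100 100 300 500
```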

The application object comes along and provides some convenience methods that can easily be turned into globals, so that you can easily type things like:

delay(function() print("Hello, World!") end, 5000)  -- print "Hello, World!" after 5 seconds
periodic(function() print("Hi There!") end, 1000)  -- print "Hi There!" every second
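
To show that these two words are nothing more than ‘spawn’ plus ‘sleep’, here is a self-contained sketch. It is not the TINN code: it uses a made-up virtual clock (the variable ‘now’) instead of a real StopWatch so it finishes instantly, and its spawn, sleep, and run are simplified stand-ins.

```lua
local now = 0          -- virtual clock, in milliseconds
local sleepers = {}    -- list of {co = coroutine, due = wake-up time}

local function spawn(func)
  sleepers[#sleepers + 1] = {co = coroutine.create(func), due = now}
end

-- Yield the number of milliseconds this task wants to stay asleep.
local function sleep(millis)
  coroutine.yield(millis)
end

local function delay(func, millis)
  spawn(function() sleep(millis); func() end)
end

local function periodic(func, millis)
  spawn(function()
    while true do sleep(millis); func() end
  end)
end

-- Run tasks in due-time order until the virtual clock would pass 'limit'.
local function run(limit)
  while #sleepers > 0 do
    local best = 1
    for i = 2, #sleepers do
      if sleepers[i].due < sleepers[best].due then best = i end
    end
    local task = table.remove(sleepers, best)
    if task.due > limit then return end
    now = task.due
    local ok, millis = coroutine.resume(task.co)
    assert(ok, millis)
    if coroutine.status(task.co) ~= "dead" then
      task.due = now + millis
      sleepers[#sleepers + 1] = task
    end
  end
end

local events = {}
delay(function() events[#events + 1] = "hello@" .. now end, 2500)
periodic(function() events[#events + 1] = "tick@" .. now end, 1000)
run(3000)
print(table.concat(events, " "))  --> tick@1000 tick@2000 hello@2500 tick@3000
```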

And there you have it. The core scheduler is a very simple thing. Just a ready list, and some algorithm that determines which one of the tasks in the list gets to run next. This can be as complex as necessary, or stay very simple.

Using an add on mechanism, the overall system can take on more robust scenarios, without increasing the complexity of the individual parts.

This simplified design satisfies all the scenario needs I currently have from high throughput web server to multi-tasking UI with network interaction. I will improve the scheduling algorithm, and probably make that a pluggable function for ease.

Lua coroutines are an awesome feature to have in a scripting language. With a little bit of work, a handy scheduler makes multi-task programming a mindless exercise, and that’s a good thing.

Happy New Year!


Interface Obscura – DXGI, DXVA, MMDevice…

If you have programmed with Windows over the years, you have been subjected to a plethora of languages, paradigms, frameworks, APIs, tectonic shifts, and the like. Why is this so? Well, because Windows is fairly old, and stuff that worked 30 years ago still works today, mostly. Hardware has evolved, gained and changed capabilities, and along with these changes, so too have the frameworks that support it.

One interesting trend that has been occurring with Windows over the past few OS releases, is that things are becoming more modular, and more properly layered. For example, the API sets that now exist in the core of Windows properly layer and segregate APIs such that you don’t have to pull in the entire world just to do a conversion from an Ansi string to a multi-byte string.

There are some parts of the system that offer up more obscure interfaces that most programmers won’t experience directly. Recently I’ve been wanting to get at the core of audio and visual stuff. In Windows, you could be using the Media Foundation framework, or you would be using the older ‘Wave’ media framework. Both of these at their core are supported by the MMDevice library. That’s an obscure library which gives you things like the names of your audio endpoints, and various attributes about them. Ultimately you can open up an audio stream on a device and get bytes out of it.

Another one, a little less obscure, is the DirectX Graphics Infrastructure (DXGI). This one gives you access to low level information about the graphics adapters and capabilities of your machine. How is it that the control panel knows about your different monitors, for example? It ultimately calls down to DXGI, I would bet.

Yet another one is DXVA. This one, closely related to DXGI, gives you access to graphics ‘surfaces’. This is how you can actually do high speed rendering directly to the screen for example. You can also make a high speed screen capture program, without the use of mirror drivers, because you can get a handle on the actual graphics surface that’s being used to render the screen. This is how the application and desktop thumbnails are able to be rendered.

In most cases, these interfaces are steeped in the Component Object Model (COM). COM is a fairly mature part of the system which was created to ease the task of joining disparate programming languages and system components using a common runtime infrastructure. It is the dynamic type system of Windows if you will. COM served a world where there was no universal type system, and you had to be able to marshal data across disparate libraries and application domains. If I were to do it from scratch today, I’d probably use something like LuaJIT’s ffi and just write the interfaces in standard ‘C’ headers, but this came from a different time.

Recently I wanted to get a handle on the audio endpoints of my machine, and just print out the various attributes of each endpoint. Here’s the code:

local MMDevice = require("MMDevice")

for device in MMDevice:AudioEndpoints() do
  print("Device ID: ", device:getID())

  for property in device:properties() do
    print("  Property: ", property.vt, tostring(property))
  end
end

Audio endpoints are things like ‘microphone’ and ‘speaker’. They either render sound (speaker), or capture sound (microphone). The MMDevice:AudioEndpoints() function is a typical Lua iterator on the IMMDevice interfaces that represent the various endpoints in the system. Since this is an iterator, it can be used with the Lua Fun functional programming library to do all sorts of filtering, reductions, and the like. But, in this particular case, I just want to print out the various properties associated with the endpoint.
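Here is a minimal sketch of what that kind of filtering could look like without pulling in a library. The MMDevice binding is not available here, so plain tables stand in for the endpoints; the 'flow' field, the sample IDs, and the 'filter' helper are all assumptions made up for illustration, not part of MMDevice or Lua Fun.

```lua
-- Stand-in data: the real MMDevice:AudioEndpoints() walks IMMDevice
-- interfaces; here plain tables play that role (hypothetical fields).
local endpoints = {
  { id = "mic-0",     flow = "capture" },
  { id = "speaker-0", flow = "render"  },
  { id = "mic-1",     flow = "capture" },
}

-- A typical Lua iterator: each call hands back the next device, then nil.
local function AudioEndpoints()
  local index = 0
  return function()
    index = index + 1
    return endpoints[index]
  end
end

-- Wrap any single-value iterator so only items passing 'pred' come out.
local function filter(pred, iter)
  return function()
    for item in iter do
      if pred(item) then
        return item
      end
    end
    return nil
  end
end

local captureIds = {}
local isCapture = function(d) return d.flow == "capture" end

for device in filter(isCapture, AudioEndpoints()) do
  captureIds[#captureIds + 1] = device.id
  print("Capture endpoint: ", device.id)
end
```

Because the filter only ever calls the iterator it was given, the same wrapper would work on the real endpoint iterator, as long as that iterator yields one value per call.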

Each device has an iterator that will feed out the properties. That is the ‘properties()’ function. Again, it is a standard Lua iterator which will just spew out the properties one by one. In this case, the properties are represented by PROPVARIANT, a very COM-like VARIANT structure. When you look it up, the VARIANT is nothing more than a giant union of all the base types that the COM system understands. In a modern language such as Lua, I can actually return the base type instead of the boxed up union, but for now it’s good enough.

Here, the PROPVARIANT object has a simple __tostring() implementation to turn some of the more common types into their string equivalents. It looks something like this:

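local ffi = require("ffi")

-- Assumes the PROPVARIANT struct and the VT_* enum values were declared
-- earlier with ffi.cdef, and that core_string is already in scope.
local PROPVARIANT = ffi.typeof("PROPVARIANT")
local PROPVARIANT_mt = {
    -- __tostring should always hand back a string, so every branch converts
    __tostring = function(self)
        if self.vt == ffi.C.VT_BLOB then
            return tostring(self.blob)
        elseif self.vt == ffi.C.VT_BOOL then
            return tostring(self.boolVal ~= 0)
        elseif self.vt == ffi.C.VT_CLSID then
            return tostring(ffi.cast("CLSID *", self.puuid))
        elseif self.vt == ffi.C.VT_LPWSTR then
            return core_string.toAnsi(self.pwszVal)
        elseif self.vt == ffi.C.VT_UI4 then
            return tostring(tonumber(self.uintVal))
        end

        -- anything else: fall back to the raw pointer representation
        return tostring(self.pcVal)
    end,
}
ffi.metatype(PROPVARIANT, PROPVARIANT_mt)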
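The idea of returning the base type instead of the boxed up union could be sketched like this. It is only an illustration: plain Lua tables stand in for the PROPVARIANT cdata, a Lua string stands in for the wide string, and the 'unbox' helper and its dispatch table are hypothetical names, not something in the MMDevice binding. The tag numbers are the ones I believe the Windows headers use, but treat them as assumptions.

```lua
-- Tag values as I understand them from the Windows headers (assumed here)
local VT_BOOL, VT_UI4, VT_LPWSTR = 11, 19, 31

-- One small converter per tag, each returning a native Lua value
local unboxers = {
  [VT_BOOL]   = function(v) return v.boolVal ~= 0 end,
  [VT_UI4]    = function(v) return v.uintVal end,
  [VT_LPWSTR] = function(v) return v.pwszVal end,
}

-- Return the native value when the tag is known, else the boxed value
local function unbox(variant)
  local convert = unboxers[variant.vt]
  if convert then
    return convert(variant)
  end
  return variant
end

-- Mock variants; VARIANT_TRUE is -1 in COM
local flag  = unbox({ vt = VT_BOOL,   boolVal = -1 })
local count = unbox({ vt = VT_UI4,    uintVal = 48000 })
local name  = unbox({ vt = VT_LPWSTR, pwszVal = "Speakers" })

print(flag, count, name)
```

A table of converters keeps the dispatch in one place, so supporting another type means adding one entry rather than another elseif branch.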

There are various levels of “COM Support” within the APIs of the Windows system. Some are fairly basic, providing nothing more than a fairly thin veneer over an otherwise standard C interface. The Kinect interface is like this, as is the DXGI interface. There may not be any usage of things like VARIANT, and there is not much ‘inheritance’ of interfaces other than from ‘IUnknown’. Then, there are other interfaces that pull you further down into the COM rabbit hole. MMDevice is one such interface. You need VARIANT, and IDispatch (at least in a cursory way), IStorage, IPropertyStorage, and quite a few GUIDs along the way. Everything is done with GUIDs, and you have to worry about using “Release()” on your interfaces, CoTaskMemFree() and a whole bunch of other stuff which, if not done properly, will lead to leaks, bugs, and worse.

And this is why the various frameworks exist. This knowledge is too obscure and esoteric for most developers to have to concern themselves with it. Better someone who really knows and understands the intricacies to deal with it, and provide a higher level abstraction that smooths over some of the rough spots.

Why not just make the lower level stuff more approachable in the first place? Well, yah, it’s that legacy thing isn’t it? In some cases you can make a complete break and just code something up from scratch. The Kinect API does this. I can get frames out of my Kinect without having to worry about any multimedia frameworks, and without pulling in half the COM world.

It took me a good couple of days of pulling on the COM thread to bring in just enough support to make these interfaces work. Now that the basics of COM are in the system though, it’s much easier to pull in more interfaces. Mostly it’s a tedious job of looking up all the interfaces, putting them into an ffi header file, coding up the stubs to make the vtable calls, making sure the __gc method is implemented to call “Release()”, etc. At the end of it, you gain access to what is likely a fairly useful interface without all the fuss of dealing with the low level COM stuff.

I implemented the MMDevice interface, not just to gain access to the audio endpoints, but really as a practice run for implementing the DXVA interfaces. In Windows 8, the low level graphics interfaces include the desktop duplication API (part of DXGI 1.2), which replaces the old mirror driver API for screen copying. Using this low level API, I will redo the plumbing on my screen capture application, and dramatically improve the performance of browser based screen sharing. The basic screen capture portion should remain fairly small, at a couple hundred lines of code, but the capabilities will be dramatically improved, while reducing the bandwidth requirements, which will mean even faster frame rates. It’s all goodness.

And so it goes. Windows has a rich history of APIs, and with that history comes a lot of baggage that most developers probably don’t want to deal with. There are new, more modern APIs introduced all the time. LuaJIT forms a pretty easy bridge to some of the less trod parts of the system, and gives rapid scriptable access to the new parts. Once that bridge has been formed, you can do some pretty nice things within the Lua environment on Windows.


What The Functor? – Making them useful

Last time, I introduced the Functor in a rather convoluted way. I said that particular implementation may or may not be as performant as a straight up closure. Well, as it turns out, that’s going to be true most of the time. Why?

You see, when you use a table as a functor, every call forces the runtime to look up the table’s metatable and check whether the ‘__call’ metamethod is implemented. If it’s not implemented, an error is thrown; if it is, your operation is performed. That means an extra lookup and an extra function call for every single call through the functor. When this is on a hot path, it could be quite a bummer in terms of performance.

So, why bother with this particular convoluted construct for functor implementation? The beauty of the table technique is that since your functor is actually a table, which gives you the syntactic sugar of being called like a function, it has all the attributes of a table. For example, let’s imagine you want to associate some bit of information with your function, such as when it was created, or version information:

local f1 = Functor(somefunction)
f1.Creation = os.date()
f1.Version = "1.2"

That might be useful in a case where you want to refresh your functors if they’re getting to be too old, or you want to compare versions of functions to ensure they are what you expect. Very handy, yet very costly.
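As a rough sketch of the “refresh if too old” idea, assuming a minimal table-based functor standing in for the real one: the 'refreshIfStale' helper and the 'Created' field are hypothetical names, and I use os.time() rather than os.date() so the timestamp is a number that can be compared directly.

```lua
-- Minimal table-based functor, standing in for the one from last time
local functor_mt = {
  __call = function(self, ...)
    return self.Func(...)
  end,
}

local function Functor(func)
  return setmetatable({ Func = func }, functor_mt)
end

local function greet(name)
  return "hello " .. name
end

local f1 = Functor(greet)
f1.Created = os.time()   -- numeric timestamp, easy to compare
f1.Version = "1.2"

-- Hypothetical policy: rebuild the functor once it is older than maxAge
-- seconds. 'now' is passed in so the check is easy to exercise.
local function refreshIfStale(f, maxAge, now)
  if now - f.Created <= maxAge then
    return f
  end

  local fresh = Functor(f.Func)
  fresh.Created = now
  fresh.Version = f.Version
  return fresh
end

local stillGood = refreshIfStale(f1, 60, f1.Created + 10)
local rebuilt   = refreshIfStale(f1, 60, f1.Created + 120)

print(stillGood == f1, rebuilt == f1, rebuilt("again"))
```

None of this is possible with a bare function value, since there is nowhere to hang the extra fields.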

Although this is useful, it does impose this overhead, so is there another way to implement the functor without incurring so much overhead?

Well, of course! If you’re willing to lose the table capabilities of your functor, but you want to retain the fact that you can associate some amount of state with your functor, then a closure is the way to go:

local Functor = function(func, target)
  if not target then return func end

  return function(...)
    return func(target,...)
  end
end

Yah, that’s right. After I bad mouthed closures when I first introduced functors, here I am praising their value. And it’s true. Given the constraint of ditching associating information with the functor, beyond the passed in state, this closure implementation is a lot faster when it comes time to actually execute the code.

Not only is this code faster, but it’s a lot easier to understand, easier to maintain, etc.
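If you want to check the difference on your own machine, a crude timing harness along these lines would do. This is only a sketch: 'TableFunctor', 'ClosureFunctor' and 'timeIt' are names made up for the comparison, os.clock() is a coarse timer, and the numbers will vary with the interpreter (a JIT may well shrink or erase the gap).

```lua
local function add(self, n)
  self.total = self.total + n
  return self.total
end

-- Table-based functor: every call goes through the __call metamethod
local functor_mt = {
  __call = function(self, ...)
    return self.Func(self.Target, ...)
  end,
}

local function TableFunctor(func, target)
  return setmetatable({ Func = func, Target = target }, functor_mt)
end

-- Closure-based functor: the target is captured as an upvalue
local function ClosureFunctor(func, target)
  return function(...)
    return func(target, ...)
  end
end

-- Call 'callable' repeatedly; return elapsed CPU time and the last result
local function timeIt(callable, iterations)
  local start = os.clock()
  local result
  for i = 1, iterations do
    result = callable(1)
  end
  return os.clock() - start, result
end

local iterations = 1000000
local tableTime, tableResult =
  timeIt(TableFunctor(add, { total = 0 }), iterations)
local closureTime, closureResult =
  timeIt(ClosureFunctor(add, { total = 0 }), iterations)

print(string.format("table functor:   %.3f s", tableTime))
print(string.format("closure functor: %.3f s", closureTime))
```

Both variants compute the same running total, so the only thing being compared is the cost of the call path.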

And how would I use this in real life? Well, at the core of TINN is the scheduler, as represented by the IOProcessor class. The scheduler is at the heart of what makes TINN tick, and thus it must be the most compact, performant bit of code that it can possibly be. At the same time, I want it to be flexible, because ultimately I need the ability to change the very core scheduling algorithms and other core features, without having to totally reengineer the codebase to make changes.

So, I’ve been tinkering.

I’ve recently rewritten parts of the scheduler to utilize the functors and iterators. Here is the main loop as it stands today:

IOProcessor.start = function(self)
  self:addQuantaStep(Functor(self.stepIOEvents,self));
  self:addQuantaStep(Functor(self.stepTimeEvents,self));
  self:addQuantaStep(Functor(self.stepFibers,self));
  self:addQuantaStep(Functor(self.stepPredicates,self));


  self.ContinueRunning = true;

  when(Functor(self.noMoreTasks,self), Functor(self.stop, self))

  for astep in self:quantumSteps() do
    astep()
  end
end

IOProcessor.run = function(self, func, ...)
  if func ~= nil then
    self:spawn(func, ...);
  end

  self:start();
end

If you were writing a typical TINN based program where you wanted multi-tasking, you would be doing this:

local Task = require("IOProcessor")

local function main()
  -- after 5 seconds, stop the system
  delay(function(tick) Task:stop() end, 5 * 1000)

  -- print something every half second
  periodic(function(tick) print("Hello Again: ",tick) end, 500)
end

Task:run(main)

The IOProcessor.start() routine is littered with Functors! What are they all doing? OK, first with the ‘addQuantaStep’ calls. The quanta steps are the functions that are executed each time through the scheduler’s loop. You can see this in the iterator at the bottom:

  for astep in self:quantumSteps() do
    astep()
  end

The usual way you’d see this code written is as a while loop, or some other looping construct. In this case, I have used an iterator as the fundamental loop construct. That seems a bit novel, or at least slightly off the beaten path. Why bother with such a construct? Well, first of all, since it’s an iterator, the ‘start()’ function doesn’t actually know anything about the code that is being executed. It doesn’t know how long each step takes, and it doesn’t assume any side effects. This gives great flexibility, because changing the behavior of the main loop only requires changing the behavior of the iterator that is being used. You could easily modify the scheduler to utilize your own custom iterator if you like. The only assumption is that the items coming out of the iterator are functions that can be called, and that’s it.
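To make the “swap the iterator” point concrete, here is a small sketch with a different iteration policy: make a fixed number of passes over the steps, then finish. The 'Scheduler' table is a bare stand-in for IOProcessor, and 'boundedSteps' is a hypothetical name, not something that exists in TINN.

```lua
-- Hypothetical stand-in for the scheduler: just the pieces needed here
local Scheduler = { QuantaSteps = {}, ContinueRunning = true }

function Scheduler:addQuantaStep(astep)
  table.insert(self.QuantaSteps, astep)
end

-- Alternative policy: hand out every step 'passes' times, then return nil
function Scheduler:boundedSteps(passes)
  local index, pass = 0, 1

  return function()
    if not self.ContinueRunning or pass > passes then
      return nil
    end

    index = index + 1
    local astep = self.QuantaSteps[index]

    -- end of the list reached: wrap around and count the pass
    if index == #self.QuantaSteps then
      index, pass = 0, pass + 1
    end

    return astep
  end
end

local log = {}
Scheduler:addQuantaStep(function() log[#log + 1] = "io" end)
Scheduler:addQuantaStep(function() log[#log + 1] = "timers" end)

-- The loop body is the same as in start(); only the iterator differs
for astep in Scheduler:boundedSteps(2) do
  astep()
end

print(table.concat(log, ","))
```

The loop itself never changes. All of the policy (how many passes, in what order, when to stop) lives inside the iterator.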

The ‘addQuantaStep()’ function calls are simply a way to prepopulate the list of functions with a known set of things I want to do each time through the scheduler’s loop. I made this a function call because that way any bit of code could do the same thing before running the scheduler. Here the Functor is being used to ensure we capture the state that goes along with the function.

The last place the Functor is being used here is in the ‘when()’ call:

  when(Functor(self.noMoreTasks,self), Functor(self.stop, self))

“when there are no more tasks, stop”

The functors are used because the functions in question live within the IOProcessor table, and are called with state information, so it needs to be associated with them. As an aside, why not just make these functions flat? Why are they even associated with a table anyway? Well, because, even though at the moment you can run only a single scheduler in your process, there’s no reason in the world why this MUST be the case. With another couple of changes, you’ll actually be able to run multiple schedulers at the same time within a single process. Yep, that might be a tad bit useful. Kind of like running multiple processes at the OS level.
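For a feel of how a ‘when()’ call like that could hang together, here is a toy version. To be clear, this is not TINN’s implementation: the 'Scheduler' table, the 'pending' counter, and this 'when' are all hypothetical, and a plain while loop stands in for the quanta step iterator.

```lua
local Scheduler = { Predicates = {}, ContinueRunning = true, pending = 3 }

-- Same closure-based Functor as shown earlier
local function Functor(func, target)
  if not target then return func end

  return function(...)
    return func(target, ...)
  end
end

-- Hypothetical 'when': remember a predicate and what to do once it is true
local function when(pred, action)
  table.insert(Scheduler.Predicates, { pred = pred, action = action })
end

function Scheduler:noMoreTasks()
  return self.pending == 0
end

function Scheduler:stop()
  self.ContinueRunning = false
end

-- One pass over the predicates: fire and drop any that evaluate to true
function Scheduler:stepPredicates()
  for i = #self.Predicates, 1, -1 do
    local entry = self.Predicates[i]
    if entry.pred() then
      table.remove(self.Predicates, i)
      entry.action()
    end
  end
end

when(Functor(Scheduler.noMoreTasks, Scheduler),
     Functor(Scheduler.stop, Scheduler))

local passes = 0
while Scheduler.ContinueRunning do
  passes = passes + 1
  Scheduler.pending = Scheduler.pending - 1   -- pretend one task finishes
  Scheduler:stepPredicates()
end

print("stopped after passes:", passes)
```

Note that when() receives two plain callables and knows nothing about the scheduler instance behind them. That is exactly what the Functor buys: the instance travels along with the function.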

At any rate, just for kicks, here is what the iterator of those functors looks like:

IOProcessor.addQuantaStep = function(self, astep)
  table.insert(self.QuantaSteps,astep)
end

IOProcessor.quantumSteps = function(self)
  local index = 0;
  local listSize = #self.QuantaSteps;

  local closure = function()
    if not self.ContinueRunning then
      return nil;
    end

    index = index + 1;

    local astep = self.QuantaSteps[index]

    if (index % listSize) == 0 then
      index = 0
    end

    return astep
  end

  return closure
end

Basically, just keep going through the list of steps over and over again until something sets ‘ContinueRunning’ to false.

Having these steps represented as an iterator satisfies the needs that I have for running a fairly dynamically configurable main loop. While I was doing this, I was also considering what implications this might have for doing other things with the iterator. For example, is there something I can do with functional programming? The recent Lua Fun library is all about iterators, and I’m wondering if I can do something with that.
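One small thing that falls out of having an iterator of functions is that the steps can be decorated on the way out, in a map-like fashion. The sketch below does not use Lua Fun; 'mapSteps', 'onceThrough' and 'counted' are helper names invented for this illustration.

```lua
-- Wrap an iterator of functions so every function coming out is decorated
local function mapSteps(decorate, iter)
  return function()
    local astep = iter()
    if astep == nil then
      return nil
    end
    return decorate(astep)
  end
end

-- Hypothetical source of steps: hands out each function exactly once
local function onceThrough(steps)
  local index = 0
  return function()
    index = index + 1
    return steps[index]
  end
end

-- Decorator: count how many times each underlying step gets called
local calls = {}
local function counted(astep)
  return function(...)
    calls[astep] = (calls[astep] or 0) + 1
    return astep(...)
  end
end

local output = {}
local stepA = function() output[#output + 1] = "A" end
local stepB = function() output[#output + 1] = "B" end

for astep in mapSteps(counted, onceThrough({ stepA, stepB })) do
  astep()
end

print(table.concat(output), calls[stepA], calls[stepB])
```

The same wrapper could carry timing or logging instead of counting, without the main loop or the steps themselves being aware of it.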

Functor and iterator in hand, I am going to look to streamline the scheduler even further. I am imagining the core of the scheduler is not more than 100 lines of code. The quantum steps, which is where all the real action is, could live separately, and be easily pluggable modules. That’s where this is headed, because I don’t know how to deal with all that complexity otherwise.

So, there you have it. Functors can be useful in practice, and not just a software engineering curiosity.


What the Functor! Again?

No, I’m not talking about George Clinton (probably related to Bill), but rather the function that can wrap functions.

What’s the point of this? In this case, I’ll start with the Functor class itself.

local functor = {}
setmetatable(functor, {
  __call = function(self, ...)
    return self:create(...);
  end,
})
local functor_mt = {
  __index = functor,
  __call = function(self, ...)
    if self.Target then
      return self.Func(self.Target, ...)
    end

    return self.Func(...)
  end,
}

functor.init = function(self, func, target)
  local obj = {
    Func = func;
    Target = target;
  }
  setmetatable(obj, functor_mt)

  return obj;
end

functor.create = function(self, func, target)
  return self:init(func, target)
end

Before explaining how this little thing works, here’s how it is used. In these examples, the ‘functor’ table above is bound to the name Functor.

local function printSomething(words)
  print(words)
end

local f1 = Functor(printSomething)

f1("words to say")

OK. So, what’s the big deal? This is the most basic case, and there is no big deal. Basically the Functor wraps the function you pass to it when you construct it: f1 = Functor(printSomething)

Then, later, when you use the function: f1("words to say"), it’s as if you were calling the function directly. Why on earth would you ever want to do this?

Let’s imagine that you have a list of functions that you want to call from within the body of some other function.

local listOfFuncs = {
  Functor(func1),
  Functor(func2),
  Functor(func3)
}

local function every(funclist)
  for _, fun in ipairs(funclist) do
    fun()
  end
end

every(listOfFuncs)

For regular functions, there’s no real benefit here to using a functor, and there’s all this added overhead.

Now here’s another case. Instead of the functions just lying around without any associated state, they are methods of ‘objects’. The trick is to get the object instance associated with the function pointer, while maintaining the same relatively easy syntax.

local methods = {}
local methods_mt = {
	__index = methods,
}

methods.init = function(self)
	local obj = {
		name = "foo",
		name2 = "bar",
	}
	setmetatable(obj, methods_mt)
	
	return obj;
end

methods.method1 = function(self)
	print(self.name)
end

methods.method2 = function(self)
	print(self.name2)
end

local m1 = methods:init();

local f2 = Functor(methods.method1, m1)
local f3 = Functor(m1.method2, m1)

f2();
f3();

In this case, I need to associate the function pointer with an instance of the methods ‘object’, so when the Functor is constructed, the second parameter, which is the instance of the object, is also passed along.

Now, back to the Functor code, when it comes time to execute the function, it does it one way or another depending on whether it has a target or not:

  __call = function(self, ...)
    if self.Target then
      return self.Func(self.Target, ...)
    end

    return self.Func(...)
  end,

If it has a target, it will pass that as the first parameter when calling the function. If there is no specific target, then it will just call the function, passing along the supplied parameters.

So, this solves a very basic problem. Without the Functor construct, I would have to use a ‘closure’ for each function. A closure is essentially the same thing, but the language itself supports the concept of keeping information associated with a function. In Lua, it does this by capturing the enclosing local variables (upvalues), which is convenient but leaves the captured state implicit rather than spelled out in one place. The Functor construct allows you to do essentially what you would be doing with a closure, but by using an object, you can more tightly control exactly what’s going on, what’s in the structure, lifetime, and all that.
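For comparison, the closure version of the f2 and f3 example might look like the sketch below. The methods here return their value instead of printing it, purely so the result is easy to check; 'c2' and 'c3' are names made up for this illustration.

```lua
local m1 = { name = "foo", name2 = "bar" }

local function method1(self) return self.name end
local function method2(self) return self.name2 end

-- Each closure captures m1 as an upvalue; no wrapper object is involved
local c2 = function(...) return method1(m1, ...) end
local c3 = function(...) return method2(m1, ...) end

print(c2(), c3())
```

One closure has to be written (or generated) per method and instance pair, which is the bookkeeping the Functor wraps up behind a single constructor call.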

To use a closure, or to use a functor? Closures are an easy and natural part of the Lua language. Functors are a construct that may or may not be efficient in comparison. The Functor does solve a particular problem when it comes to sticking a function in a list with its attendant state.

