schedlua – async io

And so, finally we come to the point. Thus far, we've looked at the simple scheduler, the core signaling, and the predicate and alarm functions built atop them. That's all great stuff for fairly straightforward apps that need some amount of concurrency. The best part, though, is when you can do concurrent networking.

Here’s the setup: I want to issue 20 concurrent GET requests to sites picked at random from a list, and get the results back. I want the program to halt once all the tasks have completed.

--test_linux_net.lua
package.path = package.path..";../?.lua"

local ffi = require("ffi")

local Kernel = require("kernel"){exportglobal = true}
local predicate = require("predicate")(Kernel, true)
local AsyncSocket = require("AsyncSocket")

local sites = require("sites");

-- list of tasks
local taskList = {}


local function httpRequest(s, sitename)
	local request = string.format("GET / HTTP/1.1\r\nUser-Agent: schedlua (linux-gnu)\r\nAccept: */*\r\nHost: %s\r\nConnection: close\r\n\r\n", sitename);
	return s:write(request, #request);
end

local function httpResponse(s)
	local BUFSIZ = 512;
	local buffer = ffi.new("char[512+1]");
	local bytesRead = 0
	local err = nil;
	local cumulative = 0

	repeat
		bytesRead, err = s:read(buffer, BUFSIZ);

		if bytesRead then
			cumulative = cumulative + bytesRead;
		else
			print("read, error: ", err)
			break;
		end
	until bytesRead < 1

	return cumulative;
end


local function siteGET(sitename)
	print("siteGET, BEGIN: ", sitename);

	local s = AsyncSocket();

	local success, err = s:connect(sitename, 80);  

	if success then
		httpRequest(s, sitename);
		httpResponse(s);
	else
		print("connect, error: ", err, sitename);
	end

	s:close();

	print("siteGET, FINISHED: ", sitename)
end


local function allProbesFinished()
	for idx, t in ipairs(taskList) do
		if t:getStatus() ~= "dead" then
			return false;
		end
	end

	return true;
end

local function main()
	for count=1,20 do
		table.insert(taskList, Kernel:spawn(siteGET, sites[math.random(#sites)]))
		Kernel:yield();
	end

	when(allProbesFinished, halt);
end

run(main)

Step by step. The httpRequest() function takes a socket and issues the most bare-bones HTTP GET request possible, assuming the socket is already connected to the site.

Similarly, the httpResponse() function gets a response back from the server, and reads as much as it can until the socket is closed (because the Connection: close header was sent).

That’s about the most basic of HTTP request/response pairs you can have, ignoring doing any parsing of the returned data.

Alright, so let’s wrap those two up into a function called siteGET(). siteGET(sitename) takes the name of a site, creates a socket, connects it to the site, and then issues the httpRequest() followed by the httpResponse(). Very simple. What I like about this is that, as far as I’m concerned, the httpRequest(); httpResponse() sequence executes serially. I don’t have to worry about httpResponse() being issued before the request completes. Furthermore, if I didn’t use spawn(), I could simply call the function directly and be none the wiser.
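Here’s a minimal sketch of why that works, using plain Lua coroutines rather than schedlua itself. Everything in it is a hypothetical stand-in: the little FIFO scheduler, FakeSocket (which yields on every call, the way an async socket gives up the processor while it waits for the descriptor to become ready), and the a.example/b.example names. Each siteGET() body reads top to bottom like blocking code, while the log shows the two tasks taking turns.

```lua
local unpack = table.unpack or unpack   -- Lua 5.1/LuaJIT and 5.2+ compatible
local ready, log = {}, {}               -- FIFO run queue; record of what happened

local function spawn(func, ...)
  local task = { co = coroutine.create(func), args = { n = select("#", ...), ... } }
  ready[#ready + 1] = task
  return task
end

-- Resume tasks round-robin until none are left.
local function run()
  while #ready > 0 do
    local task = table.remove(ready, 1)
    local ok, err = coroutine.resume(task.co, unpack(task.args, 1, task.args.n))
    assert(ok, err)
    task.args = { n = 0 }                -- initial args go only to the first resume
    if coroutine.status(task.co) ~= "dead" then ready[#ready + 1] = task end
  end
end

-- Hypothetical socket: each call first yields (standing in for "wait until the
-- descriptor is ready"), then read() returns `chunks` 512-byte reads and then 0 (EOF).
local function FakeSocket(site, chunks)
  local sock = {}
  function sock:write(data)
    coroutine.yield()
    log[#log + 1] = site .. " write"
    return #data
  end
  function sock:read()
    coroutine.yield()
    if chunks == 0 then return 0 end
    chunks = chunks - 1
    log[#log + 1] = site .. " read"
    return 512
  end
  return sock
end

-- Reads top to bottom like blocking code; the interleaving happens in the yields.
local function siteGET(site)
  local s = FakeSocket(site, 2)
  s:write("GET / HTTP/1.1\r\nHost: " .. site .. "\r\nConnection: close\r\n\r\n")
  local total, n = 0, 0
  repeat
    n = s:read()
    total = total + n
  until n < 1
  log[#log + 1] = site .. " done " .. total
end

spawn(siteGET, "a.example")
spawn(siteGET, "b.example")
run()
print(table.concat(log, "\n"))
```

Each task finishes its write before its first read, yet the reads of the two tasks alternate in the log.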

I want to execute these siteGET()s concurrently though, so within main(), I start up 20 of these tasks, and let them go. Then comes the waiting part:

local function allProbesFinished()
	for idx, t in ipairs(taskList) do
		if t:getStatus() ~= "dead" then
			return false;
		end
	end

	return true;
end

	when(allProbesFinished, halt);

Going back to our knowledge of predicates, we know that the ‘when’ function takes a predicate (function that returns true/false), and will execute the second function when the predicate returns true.

OK, so we just need to come up with a predicate which tells us that all the tasks have completed. That’s easy enough, since a list of the tasks is built as they are spawned. We just go through that list and see if any of them are still running. If even a single one is still running, the predicate returns false, and ‘halt()’ is not called. As soon as the last task finishes, the predicate returns true, and halt() is called.
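The mechanics of that barrier can be sketched with plain Lua coroutines. This is an illustration under assumptions, not schedlua’s code: when() here is a simple polling task that re-tests its predicate once per pass through the ready queue, probe() is a made-up stand-in for siteGET(), and a task counts as finished when its coroutine is “dead”. schedlua’s real when() may well be built on signals instead.

```lua
local unpack = table.unpack or unpack
local ready, running = {}, true          -- FIFO run queue; cleared by halt()

local function spawn(func, ...)
  local task = { co = coroutine.create(func), args = { n = select("#", ...), ... } }
  ready[#ready + 1] = task
  return task
end

local function yield() coroutine.yield() end
local function halt() running = false end

local function run(main)
  spawn(main)
  while running and #ready > 0 do
    local task = table.remove(ready, 1)
    local ok, err = coroutine.resume(task.co, unpack(task.args, 1, task.args.n))
    assert(ok, err)
    task.args = { n = 0 }
    if coroutine.status(task.co) ~= "dead" then ready[#ready + 1] = task end
  end
end

-- when(): a helper task re-tests the predicate once per pass through the
-- ready queue and calls func the first time it returns true.
local function when(predicate, func)
  return spawn(function()
    while not predicate() do yield() end
    func()
  end)
end

local taskList, finished = {}, 0

local function probe(steps)          -- hypothetical stand-in for siteGET()
  for _ = 1, steps do yield() end    -- pretend to wait on IO a few times
  finished = finished + 1
end

local function allProbesFinished()
  for _, t in ipairs(taskList) do
    if coroutine.status(t.co) ~= "dead" then return false end
  end
  return true
end

run(function()
  for i = 1, 5 do taskList[#taskList + 1] = spawn(probe, i) end
  when(allProbesFinished, halt)
end)
print("probes finished:", finished, "halted:", not running)
```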

Of course, most things in schedlua are convenient compositions of deeper things (with signals being at the core).

Instead of using the ‘when’ function, you could write the code more directly like this:

	while true do
		if allProbesFinished() then
			halt();
			break;
		end
		yield();
	end

That doesn’t quite look as nice as just using the when() function I think. Also, you’re sitting in the main() function, which is no big deal as there’s nothing else trying to execute after this, but it just doesn’t seem as clean. Furthermore, the ‘when’ function might have some magic in its implementation, such as a better understanding of the state of tasks, or special knowledge of the scheduler, or who knows what. At any rate, either way essentially implements a barrier, and the technique can be used anywhere you want to perform an action after some set of tasks has completed. The allProbesFinished() function can be generalized to wait on any list of tasks, maybe call it “waitForTasks()” or some such thing.
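Here’s roughly what that generalization might look like, again sketched with bare Lua coroutines rather than schedlua. waitForTasks() is the hypothetical name from above; it returns a predicate, so it can be handed to when() or used in the explicit while/yield loop. Here it serves as a barrier in the middle of a task, which continues only after both (made-up) workers finish.

```lua
local unpack = table.unpack or unpack
local ready = {}                         -- FIFO run queue

local function spawn(func, ...)
  local task = { co = coroutine.create(func), args = { n = select("#", ...), ... } }
  ready[#ready + 1] = task
  return task
end

local function yield() coroutine.yield() end

local function run(main)
  spawn(main)
  while #ready > 0 do
    local task = table.remove(ready, 1)
    local ok, err = coroutine.resume(task.co, unpack(task.args, 1, task.args.n))
    assert(ok, err)
    task.args = { n = 0 }
    if coroutine.status(task.co) ~= "dead" then ready[#ready + 1] = task end
  end
end

-- Hypothetical helper: a predicate that is true once every task is finished.
local function waitForTasks(tasks)
  return function()
    for _, t in ipairs(tasks) do
      if coroutine.status(t.co) ~= "dead" then return false end
    end
    return true
  end
end

local order = {}

local function worker(name, steps)
  for _ = 1, steps do yield() end        -- stand-in for real work or IO
  order[#order + 1] = name
end

run(function()
  local batch = { spawn(worker, "w1", 3), spawn(worker, "w2", 1) }
  local batchDone = waitForTasks(batch)
  while not batchDone() do yield() end   -- the barrier
  order[#order + 1] = "after barrier"
end)
print(table.concat(order, ", "))
```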

At any rate, that completes the primitives that are baked into the core schedlua package: signals, predicates, alarms, and finally async io. And since this is Linux, async io isn’t limited to network sockets; it applies to other file descriptors the kernel can poll for readiness (pipes, terminals, many devices), so device communications in general can be thrown into the mix.

Now that the basics work, it’s a matter of cleaning up, writing more test cases, fixing bugs, reorganizing, and optimizing resource usage a bit. In general though, the constructs are there, and it’s ready to be used for real applications.


schedlua – predicates and alarms

A few years back, during the creation of Language Integrated Query (LINQ), I had this idea: if we could add database semantics to the language, what would adding async semantics look like? These days we’ve gone the route of async/await and various other constructs, but still, I always wanted simple primitives such as “when”, “whenever”, and “waitUntil”. The predicates in schedlua are just that:

  • signalOnPredicate(predicate, signalName)
  • waitForPredicate(predicate)
  • when(predicate, func)
  • whenever(predicate, func)

Of course, this is all based at the very core on the signaling mechanism in the kernel of schedlua, but these primitives are not in the kernel proper. They don’t need to be, which is nice because it means you can easily add such functions without having to alter the core.

What do they look like in practice? Well, first of all, a ‘predicate’ is nothing more than a fancy name for a function that returns a boolean value: either ‘true’ or ‘false’. Based on this, various things can occur. For example, with ‘signalOnPredicate’, when the predicate returns ‘true’, the signal specified by signalName is emitted. Similarly, with ‘waitForPredicate’, the currently running task is put into a suspended state until the predicate returns ‘true’. ‘when’ and ‘whenever’ are similar, but they spawn new tasks rather than suspending the existing task. And here’s some code:

 

--test_scheduler.lua
package.path = package.path..";../?.lua"

local Functor = require("functor")
local Kernel = require("kernel"){exportglobal = true}
local Predicate = require("predicate")(Kernel, true)

local idx = 0;
local maxidx = 100;

local function numbers(ending)
	local function closure()
		idx = idx + 1;
		if idx > ending then
			return nil;
		end
		return idx;
	end
	
	return closure;
end



local function counter(name, nCount)
	for num in numbers(nCount) do
		print(num)
		local eventName = name..tostring(num);
		--print(eventName)
		signalOne(eventName);

		yield();
	end

	signalAll(name..'-finished')
end


local function predCount(num)
	waitForPredicate(function() return idx > num end)
	print(string.format("PASSED: %d!!", num))
end



local function every5()
	while idx <= maxidx do
		waitForPredicate(function() return (idx % 5) == 0 end)
		print("!! matched 5 !!")
		yield();
	end
end

local function test_whenever()
	local t1 = whenever(
		function() 
			if idx >maxidx then return nil end; 
			return (idx % 2) == 0 end,
		function() print("== EVERY 2 ==") end)
end

local function main()
	local t1 = spawn(counter, "counter", maxidx)

	local t0 = spawn(test_whenever);

	local t2 = spawn(predCount, 12)
	local t3 = spawn(every5)
	local t4 = spawn(predCount, 50)
	local t5 = when(
		function() return idx == 75 end, 
		function() print("WHEN IDX == 75!!") end)
	local t6 = when(function() return idx >= maxidx end,
		function() halt(); end);


end

run(main)



It’s a test case, so it’s doing some contrived things. Basically, there is one task running a counter that throws a signal for every new number (up to maxidx). Then you can see the various tests which use predicates.

local function every5()
	while idx <= maxidx do
		waitForPredicate(function() return (idx % 5) == 0 end)
		print("!! matched 5 !!")
		yield();
	end
end

Here, we’re just running a continuous loop which will print its message every time the predicate is true. It seems kind of wasteful, doesn’t it? Under normal circumstances this would be a very hot spin loop, but when you call ‘waitForPredicate’, your task is actually put into a ‘suspended’ state, which means that if there are other tasks to execute, they go ahead, and your task gets back in the queue to be tested later. So it’s really: “test this predicate; if it’s not true, put the task at the back of the ready list and test it again later. If it is true, continue with whatever is next in this task.” The ‘yield()’ here is redundant.

In this particular case, we’ve essentially created a ‘whenever’ construct. This construct happens enough that it’s worth creating a convenience function for it.

local function test_whenever()
	local t1 = whenever(
		function() 
			if idx >maxidx then return nil end; 
			return (idx % 2) == 0 end,
		function() print("== EVERY 2 ==") end)
end

In this particular case, we let the ‘whenever’ construct do the work for us. Every other count, we’ll print our message. Of course, I’m using inline anonymous functions (lambdas) in these test cases. They don’t have to be that way; you can pass in whatever functions you like.
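To make the suspend-and-retest behavior concrete, here’s a self-contained sketch of waitForPredicate, when, and whenever layered on nothing but yield(), using plain Lua coroutines. It’s an approximation, not schedlua’s predicate module. In particular, whenever() stopping once its predicate returns nil is an assumption inferred from the test_whenever() example above.

```lua
local unpack = table.unpack or unpack
local ready = {}                         -- FIFO run queue

local function spawn(func, ...)
  local task = { co = coroutine.create(func), args = { n = select("#", ...), ... } }
  ready[#ready + 1] = task
  return task
end

local function yield() coroutine.yield() end

local function run(main)
  spawn(main)
  while #ready > 0 do
    local task = table.remove(ready, 1)
    local ok, err = coroutine.resume(task.co, unpack(task.args, 1, task.args.n))
    assert(ok, err)
    task.args = { n = 0 }
    if coroutine.status(task.co) ~= "dead" then ready[#ready + 1] = task end
  end
end

-- Test the predicate; if it is false, go to the back of the ready queue and
-- test again on the next pass.
local function waitForPredicate(predicate)
  while not predicate() do yield() end
end

-- One-shot: a new task waits for the predicate, then calls func once.
local function when(predicate, func)
  return spawn(function() waitForPredicate(predicate); func() end)
end

-- Repeating: call func on every pass where the predicate is true; stop when
-- the predicate returns nil (assumed convention, see test_whenever()).
local function whenever(predicate, func)
  return spawn(function()
    while true do
      local result = predicate()
      if result == nil then return end
      if result then func() end
      yield()
    end
  end)
end

local idx, evens, fired = 0, {}, false

run(function()
  spawn(function() for i = 1, 10 do idx = i; yield() end end)   -- the counter
  whenever(function()
             if idx >= 10 then return nil end
             return idx % 2 == 0
           end,
           function() evens[#evens + 1] = idx end)
  when(function() return idx == 7 end, function() fired = true end)
end)
print(table.concat(evens, ","), fired)
```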

t6 is interesting because it says, ‘when the count reaches maxidx, halt the program’, which will in fact break us out of the main event loop and stop the program. Very convenient. This construct is useful because there may be myriad reasons why you’d want to stop the program, and you can simply set up a predicate for each. It could be a ‘when’, or perhaps you’d rather it be based on a signal, in which case use a signalOnPredicate/waitForSignal sort of thing. It’s composable, so use whatever set of constructs makes the most sense. It’s kind of a sane form of exception handling.
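Here’s a sketch of that kind of composition with plain Lua coroutines (the signal table, the suspended flag, and the runaway counter task are all illustrative assumptions, not schedlua internals). signalOnPredicate() polls like when(), but instead of calling a function it wakes every task parked in waitForSignal() under that name; one of those tasks calls halt(), which stops a counter that would otherwise loop forever.

```lua
local unpack = table.unpack or unpack
local ready, waiting = {}, {}            -- run queue; signal name -> parked tasks
local running, current = true, nil

local function spawn(func, ...)
  local task = { co = coroutine.create(func), args = { n = select("#", ...), ... } }
  ready[#ready + 1] = task
  return task
end

local function yield() coroutine.yield() end
local function halt() running = false end

-- Park the current task under `name`; run() will not requeue it.
local function waitForSignal(name)
  waiting[name] = waiting[name] or {}
  table.insert(waiting[name], current)
  current.suspended = true
  coroutine.yield()
end

-- Move every task parked under `name` back onto the ready queue.
local function signalAll(name)
  local list = waiting[name]
  if not list or #list == 0 then return false end
  waiting[name] = nil
  for _, task in ipairs(list) do
    task.suspended = false
    ready[#ready + 1] = task
  end
  return true
end

local function signalOnPredicate(predicate, name)
  return spawn(function()
    while not predicate() do yield() end
    signalAll(name)
  end)
end

local function run(main)
  spawn(main)
  while running and #ready > 0 do
    current = table.remove(ready, 1)
    local ok, err = coroutine.resume(current.co, unpack(current.args, 1, current.args.n))
    assert(ok, err)
    current.args = { n = 0 }
    if coroutine.status(current.co) ~= "dead" and not current.suspended then
      ready[#ready + 1] = current
    end
  end
end

local count = 0
run(function()
  spawn(function() while true do count = count + 1; yield() end end)  -- never stops by itself
  signalOnPredicate(function() return count >= 5 end, "stop")
  spawn(function() waitForSignal("stop"); halt() end)
end)
print("halted after", count, "counts")
```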

So there you have it, yet another construct tackled. Predicates are a simple construct that kind of extends if/then flow control into the async realm. ‘when’ is almost a direct replacement for ‘if’ in this context. ‘waitForPredicate’ is kind of a new construct, I think. It’s like an if/spinlock, except you’re not spinning; you’re suspended, with periodic checks on the predicate. And then of course ‘signalOnPredicate’ is like a Hail Mary pass. You don’t know who or what is going to respond, but you’re going to send up the signal. That’s like programming with interrupts, except that, unless the scheduler allows for high-priority interrupts/signals, these will be handled in the normal flow of cooperative processing.

Predicates are great. They’re a slightly different construct than I’ve typically used in my everyday programming. They make some tasks a lot easier, and they make thinking about async programming more manageable.

And then there’s time…

Time is a much easier construct to think about, because it’s already well represented in most language frameworks. Here are the time primitives:
 

  • waitUntilTime
  • sleep
  • delay
  • periodic

 
‘waitUntilTime’ is the linchpin in this case. It will suspend the current task until the specified time. Time, in this case, is relative: the alarm module keeps its own clock, so everything is expressed relative to that clock.

sleep(seconds) will simply suspend the current task for the specified number of seconds. You can specify fractions of a second, and the clock has nanosecond precision, but we’re not using a realtime scheduler, so you’ll get some amount of delay beyond what you asked for. Of course, you could simply swap in a new scheduler to deal with any realtime requirements you might have.

delay will spawn a task which then executes the specified function after the specified amount of time has passed. This is kind of like a ‘when’ predicate, specialized for time. You could in fact reconstruct this primitive using the ‘when’ predicate, but the alarm, knowing about time as it does, will do it more efficiently.

local Kernel = require("kernel"){exportglobal = true};
local Alarm = require("alarm")(Kernel)
local Clock = require("clock")

local c1 = Clock();

local function twoSeconds()
	print("TWO SECONDS: ", c1:secondsElapsed());
	Kernel:halt();
end

local function test_alarm_delay()
	print("delay(twoSeconds, 2000)");
	Alarm:delay(twoSeconds, 2000);
end

run(test_alarm_delay)

periodic is similar in that it will execute a function, but whereas ‘delay’ is a one-shot event, ‘periodic’ repeats. In this way it is like the ‘whenever’ construct.
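Here’s a sketch of how the four time primitives can stack on top of waitUntilTime(), in plain Lua coroutines rather than schedlua’s alarm module. To keep it deterministic and instant, the clock is a virtual one counted in milliseconds (an assumption; a real alarm module would read a monotonic system clock): when nothing is runnable, the loop jumps the clock to the earliest pending alarm and wakes the tasks waiting for it. Unlike a polling when(), a sleeping task costs nothing until its time comes.

```lua
local unpack = table.unpack or unpack
local ready, alarms = {}, {}                 -- run queue; pending { wake = ms, task = t }
local now, running, current = 0, true, nil   -- virtual clock in milliseconds (assumption)

local function spawn(func, ...)
  local task = { co = coroutine.create(func), args = { n = select("#", ...), ... } }
  ready[#ready + 1] = task
  return task
end

local function halt() running = false end

-- The linchpin: suspend the current task until the clock reaches `t`.
local function waitUntilTime(t)
  alarms[#alarms + 1] = { wake = t, task = current }
  current.suspended = true
  coroutine.yield()
end

local function sleep(ms) waitUntilTime(now + ms) end

local function delay(func, ms)               -- one-shot
  return spawn(function() sleep(ms); func() end)
end

local function periodic(func, ms)            -- repeating
  return spawn(function()
    while true do sleep(ms); func() end
  end)
end

-- Nothing runnable: advance the clock to the earliest alarm and wake its tasks.
local function fireAlarms()
  local earliest = alarms[1].wake
  for _, a in ipairs(alarms) do earliest = math.min(earliest, a.wake) end
  now = earliest
  local pending = {}
  for _, a in ipairs(alarms) do
    if a.wake <= now then
      a.task.suspended = false
      ready[#ready + 1] = a.task
    else
      pending[#pending + 1] = a
    end
  end
  alarms = pending
end

local function run(main)
  spawn(main)
  while running do
    if #ready == 0 then
      if #alarms == 0 then break end
      fireAlarms()
    end
    current = table.remove(ready, 1)
    local ok, err = coroutine.resume(current.co, unpack(current.args, 1, current.args.n))
    assert(ok, err)
    current.args = { n = 0 }
    if coroutine.status(current.co) ~= "dead" and not current.suspended then
      ready[#ready + 1] = current
    end
  end
end

local events = {}
run(function()
  periodic(function() events[#events + 1] = "tick@" .. now end, 500)
  delay(function() events[#events + 1] = "delay@" .. now; halt() end, 1600)
end)
print(table.concat(events, " "))
```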

And there you have it. Between the predicates and the alarms, you have some new basic constructs for doing async programming. They are supported by the signaling construct that’s already a part of the kernel. They are simple add-ons, which means you can easily create your own constructs and replace these, or create completely new constructs which are similar. They can leverage the signaling mechanism, or maybe they want to do something else altogether.

So far, the constructs have been of the if/then variety, only in async fashion. I think there’s another set of constructs, having to do with barriers and task completion. That will clean up the other part of async, which is the ‘await’. We’ll see. Next time, though: async io, which is pretty exciting.


schedlua – the kernel

Previously, I talked about the scheduler within the new schedlua. A scheduler is a fairly simple thing; it just decides which of the many ready tasks will run next. The default scheduler follows a simple FIFO strategy, so there are no priorities, favorites, or the like. Of course, this wouldn’t be any fun if you were stuck with just one scheduler, so naturally the scheduler is an easily pluggable part of the system. But what does the plugging?

In steps the Kernel. In general, the schedlua project is about creating a set of tools with which highly performant services can be constructed. schedlua largely supports the idea that a single processor can be highly leveraged if programmed correctly. It does not try to gain performance by using multiple threads; rather, it suspends tasks which are blocked on IO or otherwise idle, and lets tasks which are ready to run do their thing. The concurrency model is cooperative, not preemptive, so if any one task misbehaves (for example, by looping without ever yielding), the whole process can become stuck.

So, let’s take a look at this code:

--kernel.lua
-- kernel is a singleton, so return
-- single instance if we've already been
-- through this code
print("== KERNEL INCLUDED ==")

local Scheduler = require("scheduler")
local Task = require("task")
local Queue = require("queue")
local Functor = require("functor")

local Kernel = {
	ContinueRunning = true;
	TaskID = 0;
	Scheduler = Scheduler();
	TasksSuspendedForSignal = {};
}

setmetatable(Kernel, {
    __call = function(self, params)
    	params = params or {}
    	params.Scheduler = params.Scheduler or self.Scheduler
    	
    	if params.exportglobal then
    		self:globalize();
    	end

    	self.Scheduler = params.Scheduler;

    	return self;
    end,
})

function Kernel.getNewTaskID(self)
	self.TaskID = self.TaskID + 1;
	return self.TaskID;
end

function Kernel.getCurrentTaskID(self)
	return self:getCurrentTask().TaskID;
end

function Kernel.getCurrentTask(self)
	return self.Scheduler:getCurrentTask();
end

function Kernel.spawn(self, func, ...)
	local task = Task(func, ...)
	task.TaskID = self:getNewTaskID();
	self.Scheduler:scheduleTask(task, {...});
	
	return task;
end

function Kernel.suspend(self, ...)
	self.Scheduler:suspendCurrentFiber();
	return self:yield(...)
end

function Kernel.yield(self, ...)
	return self.Scheduler:yield();
end


function Kernel.signalOne(self, eventName, ...)
	if not self.TasksSuspendedForSignal[eventName] then
		return false, "event not registered", eventName
	end

	local nTasks = #self.TasksSuspendedForSignal[eventName]
	if nTasks < 1 then
		return false, "no tasks waiting for event"
	end

	local suspended = self.TasksSuspendedForSignal[eventName][1];

	self.Scheduler:scheduleTask(suspended,{...});
	table.remove(self.TasksSuspendedForSignal[eventName], 1);

	return true;
end

function Kernel.signalAll(self, eventName, ...)
	if not self.TasksSuspendedForSignal[eventName] then
		return false, "event not registered"
	end

	local nTasks = #self.TasksSuspendedForSignal[eventName]
	if nTasks < 1 then
		return false, "no tasks waiting for event"
	end

	for i=1,nTasks do
		self.Scheduler:scheduleTask(self.TasksSuspendedForSignal[eventName][1],{...});
		table.remove(self.TasksSuspendedForSignal[eventName], 1);
	end

	return true;
end

function Kernel.waitForSignal(self, eventName)
	local currentFiber = self.Scheduler:getCurrentTask();

	if currentFiber == nil then
		return false, "not currently in a running task"
	end

	if not self.TasksSuspendedForSignal[eventName] then
		self.TasksSuspendedForSignal[eventName] = {}
	end

	table.insert(self.TasksSuspendedForSignal[eventName], currentFiber);

	return self:suspend()
end

function Kernel.onSignal(self, func, eventName)
	local function closure()
		self:waitForSignal(eventName)
		func();
	end

	return self:spawn(closure)
end



function Kernel.run(self, func, ...)

	if func ~= nil then
		self:spawn(func, ...)
	end

	while (self.ContinueRunning) do
		self.Scheduler:step();		
	end
end

function Kernel.halt(self)
	self.ContinueRunning = false;
end

function Kernel.globalize()
	halt = Functor(Kernel.halt, Kernel);
	onSignal = Functor(Kernel.onSignal, Kernel);

	run = Functor(Kernel.run, Kernel);

	signalAll = Functor(Kernel.signalAll, Kernel);
	signalOne = Functor(Kernel.signalOne, Kernel);

	spawn = Functor(Kernel.spawn, Kernel);
	suspend = Functor(Kernel.suspend, Kernel);

	waitForSignal = Functor(Kernel.waitForSignal, Kernel);

	yield = Functor(Kernel.yield, Kernel);
end

return Kernel;

 

From the top, you can see the Kernel requires the scheduler, task, queue, and functor modules. The scheduler has already been explained. The Kernel serves a couple of purposes. First of all, it manages the scheduler. The ‘run’ function at the bottom is the ‘loop’ of the application. It will run until ‘halt’ is called, and each time through the loop it tells the scheduler to take a step.

Also at the bottom, you can see usage of the Functor. A functor is just a simple convenience wrapper that helps you call a function on a table at a later point. Those functors are used to make the keywords global.
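A functor like that can be very small. Here’s a guess at the idea in plain Lua, not the actual functor.lua: Functor(func, target) returns a closure that calls func with target prepended to its arguments, which is enough for globalize() to turn Kernel.halt into a global halt(). The onCountFinished() here returns its message instead of printing it, just so the result can be checked.

```lua
-- Hypothetical Functor: bind `target` as the first argument of `func`.
local function Functor(func, target)
  return function(...)
    return func(target, ...)
  end
end

-- A stand-in Kernel with one method, written the way kernel.lua writes them.
local Kernel = { ContinueRunning = true }
function Kernel.halt(self)
  self.ContinueRunning = false
end

-- What globalize() does for each keyword: store a bound function in a global.
halt = Functor(Kernel.halt, Kernel)
halt()                                 -- same effect as Kernel:halt()

-- The target need not be a table; the signal example binds a name string.
local function onCountFinished(name) return "Counter Finished: " .. name end
local report = Functor(onCountFinished, "counter-1")
print(Kernel.ContinueRunning, report())
```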

There are two primary things the kernel does. One is to spawn new tasks, the other is to provide a central point for signal handling.

First, let’s look at the ‘spawn’.

function Kernel.spawn(self, func, ...)
	local task = Task(func, ...)
	task.TaskID = self:getNewTaskID();
	self.Scheduler:scheduleTask(task, {...});
	
	return task;
end

This is actually the coolest part of the system in terms of how the programming model is expressed. Here’s an example of it in use.

local Kernel = require("kernel"){exportglobal = true}


local function numbers(ending)
	local idx = 0;
	local function closure()
		idx = idx + 1;
		if idx > ending then
			return nil;
		end
		return idx;
	end
	
	return closure;
end

local function task1()
	print("first task, first line")
	yield();
	print("first task, second line")
end

local function task2()
	print("second task, only line")
end

local function counter(name, nCount)
	for num in numbers(nCount) do
		print(name, num);
		yield();
	end
	halt();
end

local function main()
	local t0 = spawn(counter, "counter1", 5)
	local t1 = spawn(task1)
	local t2 = spawn(task2)
	local t3 = spawn(counter, "counter2", 7)
end

run(main)

Basically, any time you want something to happen concurrently, you just say ‘spawn(func, params)’ and that’s that.

What happens is that a Task object is created, which holds onto the function object as well as the initial set of parameters. This task is then handed to the scheduler to be run. From then on, you can forget about it. Of course, you’re handed the task when you call ‘spawn’, so you do have a chance to suspend or kill it off in the future if you like. Similarly, you can wait for a task to complete as well.
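For a feel of what spawn() does under the covers, here’s a stripped-down sketch with plain Lua coroutines: a hypothetical Task type holding the coroutine, an ID, and the initial parameters, which are handed over only on the first resume. It mirrors the task1/task2/counter example above, but appends lines to a table instead of printing, and the run loop simply stops when the queue empties rather than waiting for halt().

```lua
local unpack = table.unpack or unpack

-- Hypothetical Task: a coroutine plus its ID and initial parameters.
local Task = {}
Task.__index = Task

function Task.new(id, func, ...)
  return setmetatable({ TaskID = id, co = coroutine.create(func),
                        params = { n = select("#", ...), ... } }, Task)
end

-- Resume once; report whether the task still has work left.
function Task:resume()
  local ok, err = coroutine.resume(self.co, unpack(self.params, 1, self.params.n))
  assert(ok, err)
  self.params = { n = 0 }              -- initial parameters go only to the first resume
  return coroutine.status(self.co) ~= "dead"
end

local ready, nextID, output = {}, 0, {}

local function spawn(func, ...)
  nextID = nextID + 1
  local task = Task.new(nextID, func, ...)
  ready[#ready + 1] = task             -- hand it to the (FIFO) scheduler
  return task
end

local function run()
  while #ready > 0 do
    local task = table.remove(ready, 1)
    if task:resume() then ready[#ready + 1] = task end
  end
end

local function task1()
  output[#output + 1] = "first task, first line"
  coroutine.yield()
  output[#output + 1] = "first task, second line"
end

local function task2() output[#output + 1] = "second task, only line" end

local function counter(name, nCount)
  for num = 1, nCount do
    output[#output + 1] = name .. " " .. num
    coroutine.yield()
  end
end

local t1 = spawn(task1)
local t2 = spawn(task2)
local t3 = spawn(counter, "counter1", 2)
run()
print(table.concat(output, "\n"))
```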

So, that’s spawning.

The other major feature in the kernel is signal handling.

function Kernel.waitForSignal(self, eventName)
	local currentFiber = self.Scheduler:getCurrentTask();

	if currentFiber == nil then
		return false, "not currently in a running task"
	end

	if not self.TasksSuspendedForSignal[eventName] then
		self.TasksSuspendedForSignal[eventName] = {}
	end

	table.insert(self.TasksSuspendedForSignal[eventName], currentFiber);

	return self:suspend()
end

This is probably THE most important routine in the whole system. Basically, take the current task, and put it onto the suspension list, connected with the signal name (signals are just a string value). Later on, when you want the task to resume, you would signal it using either signalOne(), or signalAll().

A little bit of code demonstrating this:

local Kernel = require("kernel"){exportglobal = true}
local Functor = require("functor")

local function numbers(ending)
	local idx = 0;
	local function closure()
		idx = idx + 1;
		if idx > ending then
			return nil;
		end
		return idx;
	end
	
	return closure;
end

local function waitingOnCount(name, ending)
	local eventName = name..tostring(ending)
	waitForSignal(eventName)

	print("REACHED COUNT: ", ending)
end

local function onCountFinished(name)
	print("Counter Finished: ", name)
end

local function counter(name, nCount)
	for num in numbers(nCount) do
		print(num)
		local eventName = name..tostring(num);
		--print(eventName)
		signalOne(eventName);

		yield();
	end

	signalAll(name..'-finished')
end

local function main()
	local t1 = spawn(counter, "counter", 50)
	local t2 = spawn(waitingOnCount, "counter", 20)
	local t3 = spawn(function() print("LAMBDA"); waitForSignal("counter15") print("reached 15!!") end)
	
	-- test signalAll().  All three of these should trigger when
	-- counter finishes
	local t13 = onSignal(Functor(onCountFinished, "counter-1"), "counter-finished")
	local t14 = onSignal(Functor(onCountFinished, "counter-2"), "counter-finished")
	local t15 = onSignal(Functor(onCountFinished, "counter-3"), "counter-finished")
end

run(main)

Here, there is a counter(), which just counts, firing off a signal for each number. The waitingOnCount() task and the LAMBDA task respond to the signals they are waiting on (“counter20” and “counter15”).

Finally, the t13, t14, and t15 tasks are waiting for the “counter-finished” signal, and they will all fire off and print their little message.

Of course, at this point you could have something call ‘halt()’ so you don’t have to press Ctrl-C to stop the process, but you get the idea.
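The suspension-list idea behind waitForSignal(), signalOne(), and signalAll() can be sketched in plain Lua coroutines like this (hypothetical code, not kernel.lua). The detail that matters is the suspended flag: the run loop does not requeue a task that parked itself on a signal, so it costs nothing until someone signals it. Since this run loop exits once the ready queue is empty, no halt() or Ctrl-C is needed.

```lua
local unpack = table.unpack or unpack
local ready, suspendedFor, log = {}, {}, {}   -- run queue; signal name -> parked tasks
local current                                  -- the task being resumed right now

local function spawn(func, ...)
  local task = { co = coroutine.create(func), args = { n = select("#", ...), ... } }
  ready[#ready + 1] = task
  return task
end

local function yield() coroutine.yield() end

-- Put the current task on the suspension list for eventName and give up the CPU.
local function waitForSignal(eventName)
  local list = suspendedFor[eventName] or {}
  suspendedFor[eventName] = list
  list[#list + 1] = current
  current.suspended = true
  coroutine.yield()
end

-- Wake the longest-waiting task for eventName, if any.
local function signalOne(eventName)
  local list = suspendedFor[eventName]
  if not list or #list == 0 then return false end
  local task = table.remove(list, 1)
  task.suspended = false
  ready[#ready + 1] = task
  return true
end

-- Wake every task waiting for eventName.
local function signalAll(eventName)
  local woke = false
  while signalOne(eventName) do woke = true end
  return woke
end

local function onSignal(func, eventName)
  return spawn(function() waitForSignal(eventName); func() end)
end

local function run(main)
  spawn(main)
  while #ready > 0 do
    current = table.remove(ready, 1)
    local ok, err = coroutine.resume(current.co, unpack(current.args, 1, current.args.n))
    assert(ok, err)
    current.args = { n = 0 }
    -- a task parked on a signal stays off the ready queue until signaled
    if coroutine.status(current.co) ~= "dead" and not current.suspended then
      ready[#ready + 1] = current
    end
  end
end

run(function()
  spawn(function()                              -- the counter
    for num = 1, 5 do signalOne("counter" .. num); yield() end
    signalAll("counter-finished")
  end)
  spawn(function() waitForSignal("counter3"); log[#log + 1] = "REACHED 3" end)
  for i = 1, 3 do
    onSignal(function() log[#log + 1] = "finished " .. i end, "counter-finished")
  end
end)
print(table.concat(log, "\n"))
```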

And that’s pretty much it for the kernel. Absent from here are the async io, predicates, alarms and the like. They are available in schedlua, but they’re not a part of the kernel. Instead of being part of the kernel proper, these are essentially modules. They utilize the signal and spawn features built into the kernel, and they’re free to do their own thing.

I’ll get into the details of alarms and predicates next time around to demonstrate the concept of easy add-on modules.


Asynchronous DNS lookups on Windows

I began this particular journey because I wanted to do DNS lookups asynchronously on Windows. There is of course a function for that:

DnsQueryEx

The problem I ran into is that, unlike the various other Windows functions I’ve used asynchronously, this one does not use an IO Completion Port. Instead it uses a mechanism called APC (Asynchronous Procedure Call). With this little bit of magic, you pass in a pointer to a function, which will be called in your thread’s context at some later point, in between whatever else the thread is doing. Given the runtime environment I’m using, I don’t think this quite works out: I’d have a function being called at an arbitrary point, leaving the VM in an unknown state.

So, I got to digging. I figured, how hard can it be to make calls to a DNS server directly? After all, it is nothing more than a network based service with a well known protocol. Once I could make a straight networking call, then I could go back to leveraging IO Completion Ports just like I do for all other IO.

You can view the DNS system as nothing more than a database to which you pose queries. You express your queries using some nice well defined protocol, which is ancient in origin, and fairly effective considering how frequently DNS queries are issued. Although I could man up and write the queries from scratch, Windows helps me here by providing functions that will format the query into a buffer for me.

But before I get into that, what do the queries look like? What am I looking up? Well, a Domain Name Server serves up translations of names to other names and numbers. For example, say I need to find the IP address of www.bing.com. I can look for CNAME records (aliases) or ‘A’ records (which map directly to an IPv4 address). This gets esoteric and confusing, so a little code can help:

-- Prepare the DNS request
local dwBuffSize = ffi.new("DWORD[1]", 2048);
local buff = ffi.new("uint8_t[2048]")

local wID = clock:GetCurrentTicks() % 65536;
        
local res = windns_ffi.DnsWriteQuestionToBuffer_UTF8( 
  ffi.cast("DNS_MESSAGE_BUFFER*",buff), 
  dwBuffSize, 
  ffi.cast("char *",strToQuery), 
  wType, 
  wID, 
  true )

DnsWriteQuestionToBuffer_UTF8 is the Windows function that writes a DNS query into a buffer, which will then be sent to the actual DNS server.

wType represents the type of record you want returned. The values might be something like:

wType = ffi.C.DNS_TYPE_A      -- address records
wType = ffi.C.DNS_TYPE_MX     -- mail records
wType = ffi.C.DNS_TYPE_CNAME  -- aliases

There are about a hundred different record types you can query for. The vast majority of the time, though, you’re looking for either ‘A’ or ‘CNAME’ records.

The wID is just a unique identifier for the particular query so that if you’re issuing several on the same channel, you can check the response to ensure they match up.
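For reference, here’s roughly what ends up in that buffer, per the DNS wire format in RFC 1035: a 12-byte header carrying the wID, a question name encoded as length-prefixed labels, then the query type and class. This is a pure-Lua sketch of the format, not a description of DnsWriteQuestionToBuffer_UTF8’s internals; setting the recursion-desired flag is an assumption, matching what I take the trailing true passed above to request.

```lua
-- Big-endian (network order) 16-bit value as two bytes.
local function u16(n)
  return string.char(math.floor(n / 256) % 256, n % 256)
end

-- Encode a standard query with one question (RFC 1035, section 4.1).
local function encodeQuestion(name, qtype, id)
  local parts = {
    u16(id),                    -- ID: echoed back so replies can be matched up
    u16(0x0100),                -- flags: standard query, recursion desired (assumption)
    u16(1),                     -- QDCOUNT: one question
    u16(0), u16(0), u16(0),     -- ANCOUNT, NSCOUNT, ARCOUNT
  }
  -- QNAME: each dot-separated label prefixed by its length
  for label in name:gmatch("[^.]+") do
    assert(#label <= 63, "DNS labels are limited to 63 bytes")
    parts[#parts + 1] = string.char(#label) .. label
  end
  parts[#parts + 1] = string.char(0)   -- zero-length root label ends QNAME
  parts[#parts + 1] = u16(qtype)       -- QTYPE
  parts[#parts + 1] = u16(1)           -- QCLASS: IN (Internet)
  return table.concat(parts)
end

-- Record type codes from RFC 1035
local DNS_TYPE_A, DNS_TYPE_CNAME, DNS_TYPE_MX = 1, 5, 15

local wID = 0x1234                     -- any 16-bit value; the article derives it from a clock
local msg = encodeQuestion("www.bing.com", DNS_TYPE_A, wID)
print(#msg)
```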

OK. Now I have a DNS query stuffed into a buffer, how do I make the query and get the results?

-- Send the request.
local IPPORT_DNS = 53;
local remoteAddr = sockaddr_in(IPPORT_DNS, AF_INET);
remoteAddr.sin_addr.S_addr = ws2_32.inet_addr( "209.244.0.3");

-- create the UDP socket
local socket, err = NativeSocket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );

-- send the query to the server address built above
local iRes, err = socket:sendTo(
  remoteAddr, ffi.sizeof(remoteAddr), 
  buff, dwBuffSize[0]);

This little bit of code shows the socket creation and the actual ‘sendTo’ call. Of note, “209.244.0.3” is the IP address of a well-known public DNS server, in this case hosted by Level 3, an internet services provider. There are of course calls you can make to figure out which DNS server your machine is configured to use, but hardcoding a public server means the query doesn’t depend on local configuration, whichever network you are on.

Notice the socket is a UDP socket.

At this point, we’re already running cooperatively, because within TINN all IO is done cooperatively, without the programmer needing to do anything special.

Now to receive the query response back:

-- Try to receive the results
local RecvFromAddr = sockaddr_in();
local RecvFromAddrSize = ffi.sizeof(RecvFromAddr);
local cbReceived, err = socket:receiveFrom(RecvFromAddr, RecvFromAddrSize, buff, 2048);

Basically just wait for the server to send back a response. Of course, like the sendTo, the receiveFrom works cooperatively, so that if the developer issues several ‘spawn’ commands, each query could be running in its own task, working cooperatively.

Once you have the response, you can parse out the results. The results come back as a set of records, and there are of course functions which will help you parse these records out. The key here is that each record’s type is indicated, and it’s up to the developer to pull out the relevant information.
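Every reply starts with the same 12-byte header in network byte order (which is why the class below calls DNS_BYTE_FLIP_HEADER_COUNTS). Here’s a pure-Lua sketch of reading that header per RFC 1035 and doing the same transaction-ID check the class does. The hand-built reply bytes are made up for illustration, and parsing the answer records themselves is left to DnsExtractRecordsFromMessage_W.

```lua
-- Read a big-endian (network order) 16-bit value at 1-based offset i.
local function readU16(s, i)
  local hi, lo = s:byte(i, i + 1)
  return hi * 256 + lo
end

-- Parse the fixed 12-byte DNS header (RFC 1035, section 4.1.1).
local function parseHeader(msg, expectedID)
  if #msg < 12 then return nil, "short message" end
  local h = {
    id      = readU16(msg, 1),
    flags   = readU16(msg, 3),
    qdcount = readU16(msg, 5),
    ancount = readU16(msg, 7),
    nscount = readU16(msg, 9),
    arcount = readU16(msg, 11),
  }
  if h.id ~= expectedID then return nil, "wrong transaction ID" end
  if math.floor(h.flags / 32768) ~= 1 then return nil, "not a response" end  -- QR bit
  h.rcode = h.flags % 16               -- 0 = no error, 3 = name does not exist
  return h
end

-- Made-up reply header: ID 0x1234, flags 0x8180 (response, recursion desired
-- and available, no error), 1 question, 2 answers.
local reply = string.char(0x12, 0x34, 0x81, 0x80, 0, 1, 0, 2, 0, 0, 0, 0)
local h = assert(parseHeader(reply, 0x1234))
print("answers:", h.ancount, "rcode:", h.rcode)
```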

The complete DNSNameServer class is here:

local ffi = require("ffi")

local Application = require("Application")
local windns_ffi = require("windns_ffi")
local NativeSocket = require("NativeSocket")
local ws2_32 = require("ws2_32")
local Stopwatch = require("StopWatch")

local clock = Stopwatch();

-- DNS UDP port
local IPPORT_DNS = 53;

local DNSNameServer = {}
setmetatable(DNSNameServer, {
    __call = function(self, ...)
        return self:create(...)
    end,
})
local DNSNameServer_mt = {
    __index = DNSNameServer,
}

function DNSNameServer.init(self, serveraddress)
    local socket, err = NativeSocket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );

    if not socket then
        return nil, err
    end

    local obj = {
        Socket = socket,
        ServerAddress = serveraddress,
    }
    setmetatable(obj, DNSNameServer_mt)

    return obj;
end

function DNSNameServer.create(self, servername)
    local remoteAddr = sockaddr_in(IPPORT_DNS, AF_INET);
    remoteAddr.sin_addr.S_addr = ws2_32.inet_addr( servername );

    return self:init(remoteAddr)
end

-- Construct a DNS request of type wType (DNS_TYPE_A by default), send it to the specified DNS server, and wait for the reply.
function DNSNameServer.Query(self, strToQuery, wType, msTimeout)
    wType = wType or ffi.C.DNS_TYPE_A
    msTimeout = msTimeout or 60 * 1000  -- 1 minute


    -- Prepare the DNS request
    local dwBuffSize = ffi.new("DWORD[1]", 2048);
    local buff = ffi.new("uint8_t[2048]")

    local wID = clock:GetCurrentTicks() % 65536;
        
    local res = windns_ffi.DnsWriteQuestionToBuffer_UTF8( ffi.cast("DNS_MESSAGE_BUFFER*",buff), dwBuffSize, ffi.cast("char *",strToQuery), wType, wID, true )

    if res == 0 then
        return false, "DnsWriteQuestionToBuffer_UTF8 failed."
    end

    -- Send the request.
    local iRes, err = self.Socket:sendTo(self.ServerAddress, ffi.sizeof(self.ServerAddress), buff, dwBuffSize[0]);
    

    if (not iRes) then
        print("Error sending data: ", err)
        return false, err
    end

    -- Try to receive the results
    local RecvFromAddr = sockaddr_in();
    local RecvFromAddrSize = ffi.sizeof(RecvFromAddr);
    local cbReceived, err = self.Socket:receiveFrom(RecvFromAddr, RecvFromAddrSize, buff, 2048);

    if not cbReceived then
        print("Error Receiving Data: ", err)
        return false, err;
    end

    if( 0 == cbReceived ) then
        return false, "Nothing received"
    end

    -- Parse the DNS response received with DNS API
    local pDnsResponseBuff = ffi.cast("DNS_MESSAGE_BUFFER*", buff);
    windns_ffi.DNS_BYTE_FLIP_HEADER_COUNTS ( pDnsResponseBuff.MessageHead );

    if pDnsResponseBuff.MessageHead.Xid ~= wID then        
        return false, "wrong transaction ID"
    end

    local pRecord = ffi.new("DNS_RECORD *[1]",nil);

    iRes = windns_ffi.DnsExtractRecordsFromMessage_W( pDnsResponseBuff, cbReceived, pRecord );
    
    pRecord = pRecord[0];
    local pRecordA = ffi.cast("DNS_RECORD *", pRecord);
    
    -- Iterator: returns each record of type wType in turn, then frees
    -- the whole record list exactly once when the end is reached
    local function closure()
        -- Skip records that are not of the requested type
        while pRecordA ~= nil and pRecordA.wType ~= wType do
            pRecordA = pRecordA.pNext;
        end

        if pRecordA ~= nil then
            local retVal = pRecordA
            pRecordA = pRecordA.pNext

            return retVal;
        end

        -- Free the resources (also covers the case where the last
        -- record in the list was a match, which previously leaked)
        if pRecord ~= nil then
            windns_ffi.DnsRecordListFree( pRecord, ffi.C.DnsFreeRecordList );
            pRecord = nil;
        end

        return nil
    end

    return closure
end

function DNSNameServer.A(self, domainToQuery) return self:Query(domainToQuery, ffi.C.DNS_TYPE_A) end
function DNSNameServer.MX(self, domainToQuery) return self:Query(domainToQuery, ffi.C.DNS_TYPE_MX) end
function DNSNameServer.CNAME(self, domainToQuery) return self:Query(domainToQuery, ffi.C.DNS_TYPE_CNAME) end
function DNSNameServer.SRV(self, domainToQuery) return self:Query(domainToQuery, ffi.C.DNS_TYPE_SRV) end

return DNSNameServer

Notice at the end there are some convenience functions for a few of the well known DNS record types. The ‘Query()’ function is generic, and will return records of any type. These convenience functions just make it easier.
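The closure returned by Query() is a stateful iterator over a linked list of records: skip records of the wrong type, hand back the matching ones, and free the list once it runs out. The sketch below shows that pattern with plain Lua tables standing in for DNS_RECORD cdata, so it runs without Windows. makeRecordIterator and the freeList callback are hypothetical names; the type codes 1 and 5 are the values of DNS_TYPE_A and DNS_TYPE_CNAME.

```lua
-- Plain-table stand-in for a DNS_RECORD list: each node has wType and pNext.
-- makeRecordIterator and freeList are hypothetical names for this sketch.
local function makeRecordIterator(head, wType, freeList)
  local current = head

  return function()
    -- skip records that are not of the requested type
    while current ~= nil and current.wType ~= wType do
      current = current.pNext
    end

    if current ~= nil then
      local record = current
      current = current.pNext
      return record
    end

    -- end of the list: release it exactly once
    if head ~= nil then
      freeList(head)
      head = nil
    end
    return nil
  end
end

local TYPE_A, TYPE_CNAME = 1, 5   -- same values as DNS_TYPE_A / DNS_TYPE_CNAME

local list = { wType = TYPE_A, name = "first",
  pNext = { wType = TYPE_CNAME, name = "alias",
    pNext = { wType = TYPE_A, name = "second" } } }

local freed = 0
local names = {}
for record in makeRecordIterator(list, TYPE_A, function() freed = freed + 1 end) do
  names[#names + 1] = record.name
end

print(table.concat(names, ", "), "freed " .. freed .. " time(s)")
```

One consequence of this design: the list is only freed when the iterator runs to the end, so a loop that exits early with `break` leaves it allocated.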

And how to use it?

local ffi = require("ffi")
local DNSNameServer = require("DNSNameServer")
local core_string = require("core_string_l1_1_0")


--local serveraddress = "10.211.55.1"		-- xfinity
local serveraddress = "209.244.0.3" -- level 3

local domains = {
	"www.nanotechstyles.com",
	"www.adafruit.com",
	"adafruit.com",
	"adamation.com",
	"www.adamation.com",
	"microsoft.com",
	"google.com",
	"ibm.com",
	"oracle.com",
	"sparkfun.com",
	"apple.com",
	"netflix.com",
	"www.netflix.com",
	"www.us-west-2.netflix.com",
	"www.us-west-2.prodaa.netflix.com",
	"news.com",
	"hardkernel.org",
	"amazon.com",
	"walmart.com",
	"target.com",
	"godaddy.com",
	"luajit.org",
}



local function queryA()
	local function queryDomain(name)
		local dns = DNSNameServer(serveraddress)	-- a new UDP socket per query
		print("==== DNS A ====> ", name)
		for record in dns:A(name) do
			local a = IN_ADDR();
			a.S_addr = record.Data.A.IpAddress

			print(string.format("name: %s\tIP: %s, TTL %d", name, a, record.dwTtl));
		end
	end

	for _, name in ipairs(domains) do 
		spawn(queryDomain, name)
		--queryDomain(name)
	end
end

local function queryCNAME()
	local dns = DNSNameServer(serveraddress)	-- one socket, reused serially for every query
	local function queryDomain(name)
		print("==== DNS CNAME ====> ", name)
		for record in dns:CNAME(name) do
			print(core_string.toAnsi(record.pName), core_string.toAnsi(record.Data.CNAME.pNameHost))
		end
	end

	for _, name in ipairs(domains) do 
		queryDomain(name)
	end
end

local function queryMX()
	local function queryDomain(name)
		local dns = DNSNameServer(serveraddress)	-- a new UDP socket per query
		print("==== DNS MX ====> ", name)
		for record in dns:MX(name) do
			print(core_string.toAnsi(record.pName), core_string.toAnsi(record.Data["MX"].pNameExchange))
		end
	end

	for _, name in ipairs(domains) do 
		spawn(queryDomain, name)
	end
end

local function querySRV()
	local dns = DNSNameServer(serveraddress)	-- one socket, reused serially for every query
	for _, name in ipairs(domains) do 
		print("==== DNS SRV ====> ", name)
		for record in dns:SRV(name) do
			print(core_string.toAnsi(record.pName), core_string.toAnsi(record.Data.SRV.pNameTarget))
		end
	end
end

local function main()
  queryA();
  --queryCNAME();
  --queryMX();
  --querySRV();
end

run(main)

The function queryA() will query for the ‘A’ records, and print them out. Notice that it has knowledge of the giant union structure that contains the results, and it pulls out the specific information for ‘A’ records. It will create a new instance of the DNSNameServer for each query. That’s not as bad as it might seem. All it amounts to is creating a new UDP socket for each query. Since each query is spawned into its own task, they are all free to run and complete independently, which was the goal of this little exercise.

In the case of the CNAME query, there is only a single socket, and it is used repeatedly, serially, for each query.

The difference between the two styles is noticeable. For the serial case, the queries might ‘take a while’, because you have to wait for each result to come back before issuing the next query, so the total time is roughly the sum of all the individual query times. In the cooperative case, you issue several queries concurrently, so the total time will be roughly as long as the longest single query.
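To make the timing argument concrete, here is a small simulation: coroutines stand in for tasks, each simulated query yields its latency, and a scheduler advances a virtual clock instead of sleeping. The latencies are made-up numbers, and runTasks and query are hypothetical helpers, not schedlua or TINN functions. The serial total comes out as the sum of the latencies, the cooperative total as the largest one.

```lua
-- Sketch: serial vs. cooperative queries on a simulated clock.
-- runTasks and query are hypothetical helpers; latencies are made up.

-- Run coroutine tasks; each yields how long (ms) it wants to wait.
-- Returns the virtual time at which the last task finished.
local function runTasks(tasks)
  local clock = 0
  local pending = {}

  for _, fn in ipairs(tasks) do
    pending[#pending + 1] = { co = coroutine.create(fn), wakeAt = 0 }
  end

  while #pending > 0 do
    -- resume whichever task is due to wake up first
    table.sort(pending, function(a, b) return a.wakeAt < b.wakeAt end)
    local entry = table.remove(pending, 1)
    clock = math.max(clock, entry.wakeAt)

    local ok, delay = coroutine.resume(entry.co)
    assert(ok, delay)
    if coroutine.status(entry.co) ~= "dead" then
      entry.wakeAt = clock + delay
      pending[#pending + 1] = entry
    end
  end

  return clock
end

-- A simulated query that takes `ms` milliseconds to come back
local function query(ms)
  coroutine.yield(ms)
end

local latencies = { 120, 300, 80, 200 }

-- Serial: a single task issues the queries one after another
local serialTime = runTasks({ function()
  for _, ms in ipairs(latencies) do query(ms) end
end })

-- Cooperative: one task per query, all in flight together
local tasks = {}
for _, ms in ipairs(latencies) do
  tasks[#tasks + 1] = function() query(ms) end
end
local concurrentTime = runTasks(tasks)

print(serialTime, concurrentTime)   -- prints 700 and 300
```

Real networks add per-request overhead, so treat the cooperative number as the idealized lower bound.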

That’s a good outcome.

I like this style of programming. You go as low as you can to root out where the system might otherwise block, and you make that part cooperative. That way everything else above it is automatically cooperative. I also like the fact that it feels like I’m getting some parallelism, but I’m not using any of the typical primitives of parallelism, including actual threads, mutexes, and the like.

Well, that’s a hefty bit of code, and it serves the purpose I set out, so I’m a happy camper. Now, if I could just turn those unions into tables automatically…


All the pretty little asyncs…

I have gone on about various forms of async for quite some time now. So could there possibly be more? Well, yes of course!

Here’s the scenario I want to enable. I want to keep track of my file system activity, sending the various operations to a distant storage facility. I want to do this while a UI displays what’s going on, and I want to be able to configure things while it’s happening, like which events I really care to shadow, and the like.

I don’t want to use multiple OS level threads if I can at all avoid them, as they would complicate my programming tremendously. So, what to do?

Well, first I’ll start with the file tracking business. I have talked about change journals in the past. This is a basic mechanism that Windows has to track changes to the file system. Every single open, close, delete, write, etc, has an entry in this journal. If you’re writing a backup program, you’ll be using change journals.

The essence of the change journal functionality is usage of the DeviceIoControl() function. Most of us are very familiar with the likes of CreateFile(), ReadFile(), WriteFile(), CloseHandle(), when it comes to dealing with files. But, for everything else, there is this DeviceIoControl() function.

What is a device? Well, you’d be surprised to learn that most things in the Windows OS are represented by ‘devices’ just like they are in UNIX systems. For example, ‘C:’ is a device. But “DISPLAY1” is also a device, as are “LCD” and “PhysicalDisk0”. When it comes to controlling devices, the Win32 level API calls will ultimately make DeviceIoControl() calls with various parameters. That’s great to know as it allows you to create whatever API you want, as long as you know the nuances of the device driver you’re trying to control.

But, I digress. The key point here is that I can open up a device, and I can make a DeviceIoControl() call, and true to form, I can use OVERLAPPED structures, and IO Completion Ports. That makes these calls “async”, or with TINN, cooperative.

To wrap it up in a tidy little bow, here is a Device class which does the grunt work for me:

--[[
References
http://msdn.microsoft.com/en-us/magazine/cc163415.aspx
--]]
local ffi = require("ffi")
local bit = require("bit")
local bor = bit.bor;

local core_file = require("core_file_l1_2_0");
local core_io = require("core_io_l1_1_1");
local Application = require("Application")
local IOOps = require("IOOps")
local FsHandles = require("FsHandles");
local errorhandling = require("core_errorhandling_l1_1_1");
local WinBase = require("WinBase");


local Device = {}
setmetatable(Device, {
	__call = function(self, ...)
		return self:open(...)
	end,
})
local Device_mt = {
	__index = Device,
}

function Device.init(self, rawhandle)
	local obj = {
		Handle = FsHandles.FsHandle(rawhandle)
	}
	setmetatable(obj, Device_mt)
	
	Application:watchForIO(rawhandle, rawhandle)

	return obj;
end


function Device.open(self, devicename, dwDesiredAccess, dwShareMode)
	local lpFileName = string.format("\\\\.\\%s", devicename);
	dwDesiredAccess = dwDesiredAccess or bor(ffi.C.GENERIC_READ, ffi.C.GENERIC_WRITE);
	-- honor a caller-supplied share mode; default to read/write sharing
	dwShareMode = dwShareMode or bor(FILE_SHARE_READ, FILE_SHARE_WRITE);
	local lpSecurityAttributes = nil;
	local dwCreationDisposition = OPEN_EXISTING;
	local dwFlagsAndAttributes = FILE_FLAG_OVERLAPPED;
	local hTemplateFile = nil;

	local handle = core_file.CreateFileA(
        lpFileName,
        dwDesiredAccess,
        dwShareMode,
     	lpSecurityAttributes,
        dwCreationDisposition,
        dwFlagsAndAttributes,
     	hTemplateFile);


	if handle == INVALID_HANDLE_VALUE then
		return nil, errorhandling.GetLastError();
	end

	return self:init(handle)
end

function Device.getNativeHandle(self)
	return self.Handle.Handle;
end

function Device.createOverlapped(self, buff, bufflen)
	local obj = ffi.new("FileOverlapped");
	
	obj.file = self:getNativeHandle();
	obj.OVL.Buffer = buff;
	obj.OVL.BufferLength = bufflen;

	return obj;
end

function Device.control(self, dwIoControlCode, lpInBuffer, nInBufferSize, lpOutBuffer, nOutBufferSize)
	local lpBytesReturned = nil;
	local lpOverlapped = self:createOverlapped(ffi.cast("void *", lpInBuffer), nInBufferSize);


	local status = core_io.DeviceIoControl(self:getNativeHandle(), 
          dwIoControlCode, 
          ffi.cast("void *", lpInBuffer),
          nInBufferSize,
          lpOutBuffer,
          nOutBufferSize,
          lpBytesReturned,
          ffi.cast("OVERLAPPED *",lpOverlapped));

	local err = errorhandling.GetLastError();

	-- Error conditions
	-- status == 1, err == WAIT_TIMEOUT (258)
	-- status == 0, err == ERROR_IO_PENDING (997)
	-- status == 0, err == something else

	if status == 0 then
		if err ~= ERROR_IO_PENDING then
			return false, err
		end
	end

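	-- Whether the call completed immediately or is still pending, the
	-- result is delivered through the IO completion port
	local key, bytes, ovl = Application:waitForIO(self, lpOverlapped);

	return bytes;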
end

return Device

I’ve shown this kind of construct before with the NativeFile object. That object has read and write functions (readBytes() and writeBytes()), but lacks the control() function. Of course the two could be combined for maximum benefit.

How to use this thing?

dev = Device("c:")
dev:control(...)

OK, that’s out of the way. Now, what about this change journal thing? Very simple now that the device is handled.
A change journal can look like this:

-- USNJournal.lua
-- References
-- http://msdn.microsoft.com/en-us/library/windows/desktop/aa364563(v=vs.85).aspx
-- http://www.microsoft.com/msj/0999/journal/journal.aspx
-- http://www.microsoft.com/msj/1099/journal2/journal2.aspx
-- 

local ffi = require("ffi");
local bit = require("bit");
local bor = bit.bor;
local band = bit.band;

local core_io = require("core_io_l1_1_1");
local core_file = require("core_file_l1_2_0");
local WinIoCtl = require("WinIoCtl");
local WinBase = require("WinBase");
local errorhandling = require("core_errorhandling_l1_1_1");
local FsHandles = require("FsHandles");
local Device = require("Device")

--[[
	ChangeJournal

	An abstraction for NTFS Change journal management
--]]
local ChangeJournal = {}
setmetatable(ChangeJournal, {
	__call = function(self, ...)
		return self:open(...);
	end,
});

local ChangeJournal_mt = {
	__index = ChangeJournal;
}

ChangeJournal.init = function(self, device)
	local obj = {
		Device = device;
	}
	setmetatable(obj, ChangeJournal_mt);

	local jinfo, err = obj:getJournalInfo();

	print("ChangeJournal.init, jinfo: ", jinfo, err)

	if jinfo then
		obj.JournalID = jinfo.UsnJournalID;
		obj.LowestUsn = jinfo.LowestValidUsn;
		obj.FirstUsn = jinfo.FirstUsn;
		obj.MaxSize = jinfo.MaximumSize;
		obj.MaxUsn = jinfo.MaxUsn;
		obj.AllocationSize = jinfo.AllocationDelta;
	end

	return obj;
end


ChangeJournal.open = function(self, driveLetter)
	local device, err = Device(driveLetter)

	if not device then
		print("ChangeJournal.open, ERROR: ", err)
		return nil, err
	end

	return self:init(device);
end


ChangeJournal.getNextUsn = function(self)
	local jinfo, err = self:getJournalInfo();

	if not jinfo then
		return false, err;
	end

	return jinfo.NextUsn;
end



ChangeJournal.getJournalInfo = function(self)
	local dwIoControlCode = FSCTL_QUERY_USN_JOURNAL;
	local lpInBuffer = nil;
	local nInBufferSize = 0;
	local lpOutBuffer = ffi.new("USN_JOURNAL_DATA");
	local nOutBufferSize = ffi.sizeof(lpOutBuffer);

	local success, err = self.Device:control(dwIoControlCode, 
          lpInBuffer,
          nInBufferSize,
          lpOutBuffer,
          nOutBufferSize);

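	if not success then
		-- control() already captured the error code; calling GetLastError()
		-- again after this task has yielded could report another task's error
		return false, err;
	end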

	return lpOutBuffer;
end

function ChangeJournal.waitForNextEntry(self, usn, ReasonMask)
	usn = usn or self:getNextUsn();
	if not usn then
		return false, "could not determine the next USN"
	end

	ReasonMask = ReasonMask or 0xFFFFFFFF;
	local ReturnOnlyOnClose = false;
	local Timeout = 0;
	local BytesToWaitFor = 1;

	local ReadData = ffi.new("READ_USN_JOURNAL_DATA", {usn, ReasonMask, ReturnOnlyOnClose, Timeout, BytesToWaitFor, self.JournalID});

	-- With BytesToWaitFor = 1, the request does not complete until
	-- at least one new USN record exists
	local BUF_LEN = ffi.C.USN_PAGE_SIZE;
	local Buffer = ffi.new("uint8_t[?]", BUF_LEN);

	local success, err = self.Device:control(FSCTL_READ_USN_JOURNAL, 
        ReadData,
        ffi.sizeof(ReadData),
        Buffer,
        BUF_LEN);

	if not success then 
		return false, err
	end

	local UsnRecord = ffi.cast("PUSN_RECORD", ffi.cast("PUCHAR",Buffer) + ffi.sizeof("USN")); 

    return UsnRecord;
end

return ChangeJournal;

This very much looks like the change journal I created a few months back. The primary difference is the device control stuff is abstracted out into the Device object, so it does not need to be repeated here.
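Note that waitForNextEntry() hands back only the first record in the buffer. Per the FSCTL_READ_USN_JOURNAL documentation, the output buffer holds an 8-byte USN (the value to use as the start of the next read) followed by zero or more records, each beginning with a 4-byte RecordLength. The sketch below walks such a buffer. It works on a Lua string built by hand rather than on an ffi buffer, and readLE, walkRecords and le are hypothetical helpers.

```lua
-- Sketch: walk every record in a FSCTL_READ_USN_JOURNAL-style output buffer.
-- Assumed layout: an 8-byte little-endian USN (start point for the next
-- read) followed by records that each begin with a 4-byte RecordLength.
-- readLE, walkRecords and le are hypothetical helpers; the buffer is a Lua
-- string so the example runs without ffi or Windows.

-- Read an unsigned little-endian integer of `nbytes` bytes starting at `pos`
local function readLE(buf, pos, nbytes)
  local value = 0
  for i = nbytes, 1, -1 do
    value = value * 256 + buf:byte(pos + i - 1)
  end
  return value
end

-- Call onRecord for each record; return the leading "next USN"
local function walkRecords(buf, onRecord)
  local nextUsn = readLE(buf, 1, 8)      -- exact for values below 2^53
  local pos = 9                          -- records start right after the USN
  while pos + 3 <= #buf do
    local recordLength = readLE(buf, pos, 4)
    if recordLength == 0 then break end  -- guard against a malformed buffer
    onRecord(buf:sub(pos, pos + recordLength - 1))
    pos = pos + recordLength
  end
  return nextUsn
end

-- Encode `value` as `nbytes` little-endian bytes (to build test input)
local function le(value, nbytes)
  local bytes = {}
  for i = 1, nbytes do
    bytes[i] = string.char(value % 256)
    value = math.floor(value / 256)
  end
  return table.concat(bytes)
end

-- Fake buffer: next USN = 4096, then records of 16 and 24 bytes
local fake = le(4096, 8)
  .. le(16, 4) .. string.rep("\0", 12)
  .. le(24, 4) .. string.rep("\0", 20)

local lengths = {}
local nextUsn = walkRecords(fake, function(record)
  lengths[#lengths + 1] = #record
end)
print(nextUsn, lengths[1], lengths[2])
```

If waitForNextEntry() also returned that leading USN, a caller could pass it back as the `usn` argument of the next call instead of re-querying NextUsn, so records that arrive together in one read are not skipped.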

When we want to track the changes to the device, we make repeated calls to ‘waitForNextEntry’.

local function test_waitForNextEntry(journal)
    local entry = journal:waitForNextEntry();

    while entry do
        printJournalEntry(entry);
        entry = journal:waitForNextEntry();
    end
end

This is your typical serially written code. There’s nothing that looks special about it, no hint of async processing. Behind the scenes though, way back in the Device:control() function, the actual command is sent to the device using an IO Completion Port, so if you’re running with TINN, this particular task will ‘waitForIO’, and other tasks can continue.
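The mechanism behind that is easy to model: a task records which operation it is waiting on and yields; when the completion for that operation arrives, the dispatcher resumes exactly that task. The sketch below uses bare coroutines. waitForIO and completeIO are hypothetical names that only imitate the roles of Application:waitForIO() and the completion-port loop, and the operation id 42 and the byte count are made up.

```lua
-- Sketch: "wait for I/O" with coroutines. A task registers the operation id
-- it is waiting on and yields; the dispatcher later resumes exactly that task.
-- waitForIO and completeIO are hypothetical names, not TINN's API.
local waiting = {}   -- operation id -> suspended coroutine

local function waitForIO(opId)
  waiting[opId] = coroutine.running()
  return coroutine.yield()          -- resumed with the completion result
end

local function completeIO(opId, bytes)
  local co = waiting[opId]
  waiting[opId] = nil
  if co then
    local ok, err = coroutine.resume(co, bytes)
    assert(ok, err)
  end
end

local log = {}

-- Like the journal task: issue a request, then sit in waitForIO
local reader = coroutine.create(function()
  local bytes = waitForIO(42)
  log[#log + 1] = "io done: " .. bytes
end)

-- Like the periodic "tick": unrelated work that keeps running
local ticker = coroutine.create(function()
  for i = 1, 2 do
    log[#log + 1] = "tick " .. i
    coroutine.yield()
  end
end)

coroutine.resume(reader)   -- starts the request and suspends
coroutine.resume(ticker)   -- "tick 1"
coroutine.resume(ticker)   -- "tick 2"
completeIO(42, 512)        -- the completion for operation 42 arrives

print(table.concat(log, "; "))   -- tick 1; tick 2; io done: 512
```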

So, using it in context looks like this:

local function main()
    local journal, err = ChangeJournal("c:")
    if not journal then
        print("could not open change journal: ", err)
        return
    end

    spawn(test_waitForNextEntry, journal);
    periodic(function() print("tick") end, 1000)
end

run(main)

In this case, I spawn the journal waiting/printing loop as its own task. Then I set up a periodic timer to simply print “tick” every second, to show there is some activity.

Since the journaling is cooperative (mostly waiting for io control to complete), the periodic timer, or UI processing, or what have you, is free to run, without any hindrance.

Combine this with the already cooperative UI stuff, and you can imagine how easy it could be to construct the scenario I set out to construct. Since all networking and file system operations in TINN are automatically async, it would be easy to log these values, or send them across a network to be stored for later analysis or what have you.

And there you have it. Async everywhere makes for some very nice scenarios. Being able to do async on any device, whether with standard read/write operations, or io control, makes for very flexible programming.

Next time around, I’m going to show how to do async DNS queries for fun and profit.


ReadFile – The Good, the bad, and the async

If you use various frameworks on any platform, you’re probably an arm’s length away from the nasty little quirks of the underlying operating system.  If you are the creator of such frameworks, the nasty quirks are what you live with on a daily basis.

In TINN, I want to be async from soup to nuts.  All tcp/udp, socket stuff is already that way.  Recently I’ve been adding async support for “file handles”, and let me tell you, you have to be very careful around these things.

In the core Windows APIs, in order to read from a file, you do two things.  You first open a file using the CreateFile() function.  This may be a bit confusing, because why would you use “create” to ‘open’ an existing file?  Well, you have to think of it like a kernel developer might.  From that perspective, what you’re doing is ‘create a file handle’.  While you’re doing this, you can tell the function whether to actually create the file if it doesn’t exist already, open only if it exists, open read-only, etc.

The basic function signature for CreateFile() looks like this:

HANDLE WINAPI CreateFile(
  _In_      LPCTSTR lpFileName,
  _In_      DWORD dwDesiredAccess,
  _In_      DWORD dwShareMode,
  _In_opt_  LPSECURITY_ATTRIBUTES lpSecurityAttributes,
  _In_      DWORD dwCreationDisposition,
  _In_      DWORD dwFlagsAndAttributes,
  _In_opt_  HANDLE hTemplateFile
);

Well, that’s a mouthful, just to get a file handle. But hey, it’s not much more than you’d do in Linux, except it has some extra flags and attributes that you might want to take care of. Here’s where the history of Windows gets in the way. There is a much simpler function, “OpenFile()”, which on the surface might do what you want, but beware: it’s a lot less capable, a leftover from the MS-DOS days. The documentation is pretty clear about this point (“don’t use this, use CreateFile instead…”), but still, you’d have to wade through some documentation to reach this conclusion.

Then, the ReadFile() function has this signature:

BOOL WINAPI ReadFile(
  _In_         HANDLE hFile,
  _Out_        LPVOID lpBuffer,
  _In_         DWORD nNumberOfBytesToRead,
  _Out_opt_    LPDWORD lpNumberOfBytesRead,
  _Inout_opt_  LPOVERLAPPED lpOverlapped
);

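Don’t be confused by another function, ReadFileEx(). That one sounds even more modern, and it does read asynchronously, but it reports completion by queuing a completion routine (an APC) that only runs when the thread enters an alertable wait, rather than through an IO completion port. That doesn’t fit the kind of async file reading that I want.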

Seems simple enough. Take the handle you got from CreateFile(), pass it to this function along with a buffer, and you’re done? Well, yeah, except this is where things get really interesting.

Windows supports two forms of IO processing: async and synchronous. The synchronous case is easy. You just make your call, and your thread will be blocked until the IO “completes”. That is certainly easy to understand, and if you’re a user of the standard C library, or most other frameworks, this is exactly the behavior you can expect. Lua, by default, using the standard io library, will do exactly this.

The other case is when you want to do async io. That is, you want to initiate the ReadFile() and get an immediate return, and handle the processing of the result later, perhaps with an alert on an io completion port.

Here’s the nasty bit. This same function can be used in both cases, but has very different behavior. It’s a subtle thing. If you’re doing synchronous IO, the kernel will track the file position, and automatically update it for you. So, you can do consecutive ReadFile() calls, and read the file contents from beginning to end.

But… When you do things async, the kernel will not track your file pointer. Instead, you must do this on your own! When you do async, you pass in an instance of an OVERLAPPED structure, which the kernel uses to track that particular operation. This structure contains the offset within the file to read from. In a zero-initialized structure the offset is ‘0’, which will have you reading from the beginning of the file every single time. Note that OVERLAPPED itself does not hold the buffer pointer or its size; those are passed to ReadFile() directly, and TINN’s IOOverlapped wrapper keeps its own copies alongside the OVERLAPPED.

typedef struct _OVERLAPPED {
    ULONG_PTR Internal;
    ULONG_PTR InternalHigh;
    union {
        struct {
            DWORD Offset;
            DWORD OffsetHigh;
        };

        PVOID Pointer;
    };

    HANDLE hEvent;
} OVERLAPPED, *LPOVERLAPPED;
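To see why the offset matters, here is a sketch in plain Lua where readAt() plays the role of an overlapped ReadFile(): it has no current position and reads from whatever offset it is given, just like the Offset field of a zero-initialized OVERLAPPED. readAt and reader are hypothetical names, and a string stands in for the file.

```lua
-- Sketch of why async reads need caller-managed offsets.
-- readAt() stands in for an overlapped ReadFile(): no file pointer,
-- it reads `count` bytes at whatever offset it is handed (hypothetical).
local contents = "line one\nline two\nline three\n"

local function readAt(offset, count)
  return contents:sub(offset + 1, offset + count)
end

-- Forgetting the offset: every read starts at 0, like a zeroed OVERLAPPED
local a = readAt(0, 9)
local b = readAt(0, 9)

-- Tracking it ourselves, advancing by what was actually read
local reader = { offset = 0 }
function reader:read(count)
  local chunk = readAt(self.offset, count)
  self.offset = self.offset + #chunk
  return chunk
end

local c = reader:read(9)
local d = reader:read(9)
print(a == b, c, d)
```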

You have to be very careful and diligent with the OVERLAPPED structure, and the proper calling sequences. In addition, if you’re going to do async, you need to call CreateFile() with the FILE_FLAG_OVERLAPPED flag. In TINN, I have created the NativeFile object, which deals with all this subtlety. It presents a basic block device interface to the user, so the interface to files stays clean and simple.

-- NativeFile.lua

local ffi = require("ffi")
local bit = require("bit")
local bor = bit.bor;

local core_file = require("core_file_l1_2_0");
local errorhandling = require("core_errorhandling_l1_1_1");
local FsHandles = require("FsHandles")
local WinBase = require("WinBase")
local IOOps = require("IOOps")

ffi.cdef[[
typedef struct {
  IOOverlapped OVL;

  // Our specifics
  HANDLE file;
} FileOverlapped;
]]

-- A Win32 file interface
-- put the standard async stream interface onto a file
local NativeFile={}
setmetatable(NativeFile, {
  __call = function(self, ...)
    return self:create(...);
  end,
})

local NativeFile_mt = {
  __index = NativeFile;
}

NativeFile.init = function(self, rawHandle)
	local obj = {
		Handle = FsHandles.FsHandle(rawHandle);
		Offset = 0;
	}
	setmetatable(obj, NativeFile_mt)

	if IOProcessor then
		IOProcessor:observeIOEvent(obj:getNativeHandle(), obj:getNativeHandle());
	end

	return obj;
end

NativeFile.create = function(self, lpFileName, dwDesiredAccess, dwCreationDisposition, dwShareMode)
	if not lpFileName then
		return nil;
	end
	dwDesiredAccess = dwDesiredAccess or bor(ffi.C.GENERIC_READ, ffi.C.GENERIC_WRITE)
	dwCreationDisposition = dwCreationDisposition or OPEN_ALWAYS;
	dwShareMode = dwShareMode or bor(FILE_SHARE_READ, FILE_SHARE_WRITE);
	local lpSecurityAttributes = nil;
	local dwFlagsAndAttributes = bor(ffi.C.FILE_ATTRIBUTE_NORMAL, FILE_FLAG_OVERLAPPED);
	local hTemplateFile = nil;

	local rawHandle = core_file.CreateFileA(
        lpFileName,
        dwDesiredAccess,
        dwShareMode,
    	lpSecurityAttributes,
        dwCreationDisposition,
        dwFlagsAndAttributes,
    	hTemplateFile);

	if rawHandle == INVALID_HANDLE_VALUE then
		return nil, errorhandling.GetLastError();
	end

	return self:init(rawHandle)
end

NativeFile.getNativeHandle = function(self)
  return self.Handle.Handle
end

-- Cancel current IO operation
NativeFile.cancel = function(self)
  local res = core_file.CancelIo(self:getNativeHandle());
end

-- Close the file handle
NativeFile.close = function(self)
  self.Handle:free();
  self.Handle = nil;
end

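NativeFile.createOverlapped = function(self, buff, bufflen, operation, deviceoffset)
	if not IOProcessor then
		return nil
	end

	-- Async IO does not use the kernel's file pointer, so the caller's
	-- device offset has to go into the OVERLAPPED structure
	deviceoffset = deviceoffset or 0;

	local obj = ffi.new("FileOverlapped");

	obj.file = self:getNativeHandle();
	obj.OVL.operation = operation;
	obj.OVL.opcounter = IOProcessor:getNextOperationId();
	obj.OVL.Buffer = buff;
	obj.OVL.BufferLength = bufflen;
	-- split the 64-bit offset into its low and high DWORDs
	-- (assumes deviceoffset is a Lua number)
	obj.OVL.OVL.Offset = deviceoffset % 0x100000000;
	obj.OVL.OVL.OffsetHigh = math.floor(deviceoffset / 0x100000000);

	return obj, obj.OVL.opcounter;
end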

-- Write bytes to the file
NativeFile.writeBytes = function(self, buff, nNumberOfBytesToWrite, offset, deviceoffset)
	offset = offset or 0

	if not self.Handle then
		return nil;
	end

	local lpBuffer = ffi.cast("const char *",buff) + offset
	local lpNumberOfBytesWritten = nil;
	local lpOverlapped = self:createOverlapped(ffi.cast("uint8_t *",buff)+offset,
		nNumberOfBytesToWrite,
		IOOps.WRITE,
		deviceoffset);

	if lpOverlapped == nil then
		lpNumberOfBytesWritten = ffi.new("DWORD[1]")
	end

	local res = core_file.WriteFile(self:getNativeHandle(), lpBuffer, nNumberOfBytesToWrite,
		lpNumberOfBytesWritten,
		ffi.cast("OVERLAPPED *",lpOverlapped));

	if res == 0 then
		local err = errorhandling.GetLastError();
		if err ~= ERROR_IO_PENDING then
			return false, err
		end
	elseif lpOverlapped == nil then
		-- synchronous write: the byte count is already available
		return lpNumberOfBytesWritten[0];
	end

	-- Overlapped write, pending or completed immediately: either way
	-- the completion is delivered through the IO completion port
	if IOProcessor then
		local key, bytes, ovl = IOProcessor:yieldForIo(self, IOOps.WRITE, lpOverlapped.OVL.opcounter);
		return bytes
	end
end

NativeFile.readBytes = function(self, buff, nNumberOfBytesToRead, offset, deviceoffset)
	offset = offset or 0
	local lpBuffer = ffi.cast("char *",buff) + offset
	local lpNumberOfBytesRead = nil
	local lpOverlapped = self:createOverlapped(ffi.cast("uint8_t *",buff)+offset,
		nNumberOfBytesToRead,
		IOOps.READ,
		deviceoffset);

	if lpOverlapped == nil then
		lpNumberOfBytesRead = ffi.new("DWORD[1]")
	end

	local res = core_file.ReadFile(self:getNativeHandle(), lpBuffer, nNumberOfBytesToRead,
		lpNumberOfBytesRead,
		ffi.cast("OVERLAPPED *",lpOverlapped));

	if res == 0 then
		local err = errorhandling.GetLastError();
		if err ~= ERROR_IO_PENDING then
			return false, err
		end
	elseif lpOverlapped == nil then
		-- synchronous read: the byte count is already available
		return lpNumberOfBytesRead[0];
	end

	-- Overlapped read, pending or completed immediately: either way
	-- the completion is delivered through the IO completion port
	if IOProcessor then
		local key, bytes, ovl = IOProcessor:yieldForIo(self, IOOps.READ, lpOverlapped.OVL.opcounter);
		return bytes
	end
end

return NativeFile;

This is enough of a start. If you want to simply open a file:

local NativeFile = require("NativeFile")
local fd = NativeFile("sample.txt");

From there you can use readBytes(), and writeBytes(). If you want to do streaming, you can feed this into the new and improved Stream class like this:

local NativeFile = require("NativeFile") 
local Stream = require("stream") 
local IOProcessor = require("IOProcessor")

local function main()

  local filedev, err = NativeFile("./sample.txt", nil, OPEN_EXISTING, FILE_SHARE_READ)
  if not filedev then
    print("could not open sample.txt: ", err)
    return
  end

  -- wrap the file block device with a stream
  local filestrm = Stream(filedev)

  local line1, err1 = filestrm:readLine();
  local line2, err2 = filestrm:readLine();
  local line3, err3 = filestrm:readLine();

  print("line1: ", line1, err1)
  print("line2: ", line2, err2)
  print("line3: ", line3, err3)
end

run(main)

The Stream class looks for readBytes() and writeBytes(), and can provide the higher level readLine(), writeLine(), read/writeString(), and a few others. This is great because it can be fed by anything that purports to be a block device, which could be anything from an async file, to a chunk of memory.
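Here is a sketch of that layering, assuming only that the block device offers a readBytes(count, offset) method. The line reader keeps the file offset itself, which is exactly the bookkeeping async reads require, and it would work unchanged whether readBytes() blocks, yields to the scheduler, or just slices a string. MemoryDevice and LineStream are hypothetical names, not TINN’s Stream API, and this readBytes() returns a string rather than filling a caller-supplied buffer.

```lua
-- Sketch: a line reader layered on anything with a readBytes() method.
-- MemoryDevice and LineStream are hypothetical stand-ins, not TINN's Stream;
-- readBytes here returns a string (or nil at the end) instead of filling
-- a caller-supplied buffer.
local MemoryDevice = {}
MemoryDevice.__index = MemoryDevice

function MemoryDevice.new(data)
  return setmetatable({ data = data }, MemoryDevice)
end

-- Return up to `count` bytes starting at `deviceoffset`, or nil at the end
function MemoryDevice:readBytes(count, deviceoffset)
  if deviceoffset >= #self.data then return nil end
  return self.data:sub(deviceoffset + 1, deviceoffset + count)
end

local LineStream = {}
LineStream.__index = LineStream

function LineStream.new(device)
  return setmetatable({ device = device, offset = 0, pending = "" }, LineStream)
end

-- Return the next line without its "\n", or nil once the device is exhausted
function LineStream:readLine()
  while true do
    local nl = self.pending:find("\n", 1, true)
    if nl then
      local line = self.pending:sub(1, nl - 1)
      self.pending = self.pending:sub(nl + 1)
      return line
    end

    -- small chunks on purpose, so lines span several reads
    local chunk = self.device:readBytes(4, self.offset)
    if not chunk then
      if self.pending == "" then return nil end
      local line = self.pending            -- final line without a "\n"
      self.pending = ""
      return line
    end

    self.offset = self.offset + #chunk     -- the stream, not the device, tracks position
    self.pending = self.pending .. chunk
  end
end

local strm = LineStream.new(MemoryDevice.new("alpha\nbeta\ngamma"))
local line1 = strm:readLine()
local line2 = strm:readLine()
local line3 = strm:readLine()
local line4 = strm:readLine()
print(line1, line2, line3, line4)   -- alpha  beta  gamma  nil
```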

And that’s about it for now. There are subtleties when dealing with async file access in windows. Having a nice abstraction on top of it gives you all the benefits of async without all the headaches.

 


Its About Time – TINN Timers

The last time I wrote about time (Hurry Up and Wait – TINN Timing), the focus was on introducing the wait() function and explaining how it fits into the TINN scheduler in general.

Having a wait() function is a great low level primitive.  It allows you to pause execution for an amount of time, in a synchronous manner.  That will work well when you’re doing something serially and need to take a break at specified intervals.  What if your usage pattern is more complex though?  More likely than not, if you’re writing a web server of some sort, you’ll be doing this sort of sequence:

executeAsyncCodeThatTakesTime()

if codeNotDoneAfter(sometime) then
    cancel code
end

do other stuff while waiting

Basically, I need to do something like send off a web request.  If the request has not been satisfied within a specified amount of time, I want to cancel the operation.  I need to be able to do this asynchronously because I may have many requests in flight.  So, what to do?  I obviously need a Timer of some sort that will deal with this for me.

local Task = require("IOProcessor")

local Timer = {}
setmetatable(Timer, {
  __call = function(self, ...)
    return self:create(...)
  end,
});

local Timer_mt = {
  __index = Timer;
}

Timer.init = function(self,params)
  local obj = {
    Delay = params.Delay;
    Period = params.Period;
    Running = false;
    TimerFunc = params.TimerFunc;
  }
  setmetatable(obj, Timer_mt);

  -- start right away if the caller passed Running = true
  if params.Running then
    obj:start()
  end

  return obj;
end

Timer.create = function(self, ...)
  return self:init(...);
end

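Timer.start = function(self)
  self.Running = true;

  local function closure()
    if self.Delay then
      wait(self.Delay);
      -- the timer may have been stopped during the initial delay
      if not self.Running then
        return
      end
      self.TimerFunc(self);
    end

    if not self.Period then
      self.Running = false;
      return
    end

    while self.Running do
      wait(self.Period)
      -- don't fire one extra time if stop() was called while waiting
      if not self.Running then
        break
      end
      self.TimerFunc(self);
    end
  end

  spawn(closure);
end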

Timer.stop = function(self)
  self.Running = false;
end

return Timer

To dissect: this is basically an object that wraps the wait() function for convenience. You specify an initial delay and/or a period, and call the start() function. start() spawns the task that does the actual waiting and calls the specified function.
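To see the delay/period behavior without waiting in real time, here is a sketch of the same logic on a tiny virtual-clock scheduler. spawn, wait and run below are hypothetical stand-ins for TINN’s functions, and startTimer is a simplified version of Timer.start() that re-checks Running after every wait, so a timer stopped mid-wait does not fire one extra time.

```lua
-- Sketch: the Timer's delay/period logic on a tiny virtual-clock scheduler.
-- spawn, wait and run are hypothetical stand-ins for TINN's functions;
-- time is simulated, so the example finishes instantly.
local now = 0
local queue = {}   -- { co = coroutine, wakeAt = virtual ms }

local function spawn(fn)
  queue[#queue + 1] = { co = coroutine.create(fn), wakeAt = now }
end

local function wait(ms)
  coroutine.yield(ms)
end

local function run(main)
  spawn(main)
  while #queue > 0 do
    table.sort(queue, function(a, b) return a.wakeAt < b.wakeAt end)
    local entry = table.remove(queue, 1)
    now = entry.wakeAt
    local ok, ms = coroutine.resume(entry.co)
    assert(ok, ms)
    if coroutine.status(entry.co) ~= "dead" then
      entry.wakeAt = now + ms
      queue[#queue + 1] = entry
    end
  end
end

-- Simplified Timer.start(): re-check Running after every wait
local function startTimer(timer)
  timer.Running = true
  spawn(function()
    if timer.Delay then
      wait(timer.Delay)
      if not timer.Running then return end
      timer.TimerFunc(timer)
    end
    while timer.Running and timer.Period do
      wait(timer.Period)
      if not timer.Running then break end   -- stopped while waiting
      timer.TimerFunc(timer)
    end
  end)
end

local fired = {}

run(function()
  local timer = {
    Delay = 1000, Period = 300,
    TimerFunc = function() fired[#fired + 1] = now end,
  }
  startTimer(timer)
  wait(2000)             -- let the timer run for two virtual seconds
  timer.Running = false  -- the equivalent of timer:stop()
end)

print(table.concat(fired, " "))   -- 1000 1300 1600 1900
```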

Here is a simple use case:

-- test_Timer.lua

local Timer = require("Timer")

local function printThis(timer)
  print("this")
end

local function main()
  local timer = Timer {Delay = 1*1000, Period = 300, TimerFunc = printThis}

  timer:start();

  -- wait a few seconds, then stop the timer
  print("wait 4 seconds...")
  wait(4*1000)
  print("stop timer")
  timer:stop();

  -- Wait a few more seconds then exit
  print("wait 2 seconds...")
  wait(1000*2);
end

run(main)

In this case, I create a timer that has an initial 1 second delay, and a period of 300 milliseconds. So, after the initial delay, the printThis() function will be called. Then every 300 milliseconds after that, it will be called again.

In the sample, the timer is started, which causes it to run independently of the main task. Within the main task, wait 4 seconds, then call the stop() function on the timer. Wait two more seconds, and finally exit altogether. This shows that a timer can run independently. The function that is called can be anything. If you want the function to carry extra parameters, make it a closure (a function within a function) that captures them. Additionally, since the function is passed the timer as its only parameter, it can stop the timer itself. Here’s another example where a timer is running, and will stop after a certain count has been reached.

local count = 0;
local function counter(timer)
  count = count + 1;
  if count >= 5 then
    timer:stop();
  end
  print("counter: ",count)
end

local testCounter = function()
  local timer = Timer {Period = 200, TimerFunc = counter, Running=true}
end

run(testCounter)

The counter function is simple: increment a counter, and once it reaches 5, stop the timer. Starting the counter in the first place is also fairly easy. Just set a period (no initial delay this time), tell it which function is to be executed every period, and set Running=true so it starts automatically (without requiring an explicit ‘start()’).
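Because the counter above keeps its state in a file-level `count` variable, two timers using it would share one count. A closure factory, as mentioned earlier, gives each timer function its own state and lets you pass parameters such as the limit. In this sketch makeCounter and fakeTimer are hypothetical; fakeTimer just calls the function in a loop until stop() is called, without any real waiting.

```lua
-- A hypothetical factory: each call builds a TimerFunc with its own
-- limit and count, instead of sharing a global counter.
local function makeCounter(limit, log)
  local count = 0
  return function(timer)
    count = count + 1
    log[#log + 1] = count
    if count >= limit then
      timer:stop()
    end
  end
end

-- Minimal stand-in timer: calls the function until stop() is called
-- (no real waiting; this only exercises the stop logic)
local function fakeTimer(func)
  local timer = { Running = true }
  function timer:stop() self.Running = false end
  while timer.Running do
    func(timer)
  end
  return timer
end

local log = {}
fakeTimer(makeCounter(5, log))
print(table.concat(log, " "))   -- 1 2 3 4 5
```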

This calls the function once every 200 milliseconds until the count reaches 5, at which point the timer stops itself. Nice to have.

With this little component in hand, you can probably imagine how I/O might be accomplished with timeouts. Basically, any I/O operation would look like:

local function cancel(ioperator)
  local function closure(timer)
    timer:stop();
    ioperator:cancel();
  end
  return closure;
end

local op = startioop(someparams)
local timer = Timer{Delay=1000, TimerFunc=cancel(op), Running=true}

Here, a timer is created with a delay of one second. After one second, the timer fires and the closure returned by cancel() is called. The operation is cancelled, if it’s not already done, and that’s the end of things. This assumes that there is no harm in canceling an already finished transaction.
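The property that makes this safe is that cancel() on a finished operation does nothing, so whichever event happens first decides the outcome. The sketch below replays a completion event and a timeout event in time order. makeOperation and runWithTimeout are hypothetical, and the durations are arbitrary milliseconds.

```lua
-- Sketch: a timeout implemented as a competing timed event.
-- makeOperation and runWithTimeout are hypothetical; times are in ms.
local function makeOperation()
  local op = { state = "pending" }
  -- completing or cancelling only has an effect while still pending,
  -- so cancelling a finished operation is harmless
  function op:complete()
    if self.state == "pending" then self.state = "done" end
  end
  function op:cancel()
    if self.state == "pending" then self.state = "cancelled" end
  end
  return op
end

-- Replay the completion and the timeout in time order
local function runWithTimeout(duration, timeout)
  local op = makeOperation()
  local events = {
    { at = duration, fire = function() op:complete() end },
    { at = timeout,  fire = function() op:cancel() end },
  }
  table.sort(events, function(a, b) return a.at < b.at end)
  for _, event in ipairs(events) do
    event.fire()
  end
  return op.state
end

print(runWithTimeout(500, 1000))    -- done
print(runWithTimeout(1500, 1000))   -- cancelled
```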

Well, that’s just great. I/O with timeouts fully implemented in user space, asynchronous, and all that. This is a great thing for timers, io, and TINN code in general.

The only thing missing is the ability to do async on more than just sockets. So, next up is async ‘file’ I/O in general, of which sockets are a specialized subset.