All the pretty little asyncs…

I have gone on about various forms of async for quite some time now. So could there possibly be more? Well, yes of course!

Here’s the scenario I want to enable. I want to keep track of my file system activity, sending the various operations to a distant storage facility. I want to do this while a UI displays what’s going on, and I want to be able to configure things while it’s happening, like which events I really care to shadow.

I don’t want to use multiple OS level threads if I can at all avoid them, as they will complicate my programming tremendously. So, what to do?

Well, first I’ll start with the file tracking business. I have talked about change journals in the past. This is a basic mechanism that Windows has to track changes to the file system. Every single open, close, delete, write, etc, has an entry in this journal. If you’re writing a backup program, you’ll be using change journals.

The essence of the change journal functionality is usage of the DeviceIoControl() function. Most of us are very familiar with the likes of CreateFile(), ReadFile(), WriteFile(), and CloseHandle() when it comes to dealing with files. But, for everything else, there is this DeviceIoControl() function.

What is a device? Well, you might be surprised to learn that most things in the Windows OS are represented by ‘devices’, just as they are in UNIX systems. For example, ‘C:’ is a device. But “DISPLAY1” is also a device, as are “LCD” and “PhysicalDisk0”. When it comes to controlling devices, the Win32 level API calls will ultimately make DeviceIoControl() calls with various parameters. That’s great to know, as it allows you to create whatever API you want, as long as you know the nuances of the device driver you’re trying to control.
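To make the naming concrete, here is a minimal sketch of how a device name turns into the path that gets handed to CreateFile(). The `\\.\` prefix is the standard Win32 device namespace; `devicePath` is a hypothetical helper name used only for illustration, and the sketch is plain Lua, so it runs without Windows or TINN.

```lua
-- Hypothetical helper: build the Win32 device path for a device name.
local function devicePath(devicename)
    -- "\\\\.\\" in Lua source is the four characters \\.\
    return string.format("\\\\.\\%s", devicename)
end

-- Print the path for a couple of the device names mentioned above.
local names = { "C:", "PhysicalDisk0" }
for _, name in ipairs(names) do
    print(name, devicePath(name))
end
```

The Device class further down builds its lpFileName with the same format string.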

But, I digress. The key point here is that I can open up a device, and I can make a DeviceIoControl() call, and true to form, I can use OVERLAPPED structures, and IO Completion Ports. That makes these calls “async”, or with TINN, cooperative.

To wrap it up in a tidy little bow, here is a Device class which does the grunt work for me:

local ffi = require("ffi")
local bit = require("bit")
local bor = bit.bor;

local core_file = require("core_file_l1_2_0");
local core_io = require("core_io_l1_1_1");
local Application = require("Application")
local IOOps = require("IOOps")
local FsHandles = require("FsHandles");
local errorhandling = require("core_errorhandling_l1_1_1");
local WinBase = require("WinBase");

local Device = {}
setmetatable(Device, {
	__call = function(self, ...)
		return self:open(...)
	end,
})
local Device_mt = {
	__index = Device,
}

function Device.init(self, rawhandle)
	local obj = {
		Handle = FsHandles.FsHandle(rawhandle)
	}
	setmetatable(obj, Device_mt)

	-- Let the scheduler watch this handle for IO completions
	Application:watchForIO(rawhandle, rawhandle)

	return obj;
end

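function Device.open(self, devicename, dwDesiredAccess, dwShareMode)
	local lpFileName = string.format("\\\\.\\%s", devicename);
	dwDesiredAccess = dwDesiredAccess or bor(ffi.C.GENERIC_READ, ffi.C.GENERIC_WRITE);
	-- Assumed default: share for read and write, with the FILE_SHARE_*
	-- constants available as globals in the same way as OPEN_EXISTING.
	dwShareMode = dwShareMode or bor(FILE_SHARE_READ, FILE_SHARE_WRITE);
	local lpSecurityAttributes = nil;
	local dwCreationDisposition = OPEN_EXISTING;
	local dwFlagsAndAttributes = FILE_FLAG_OVERLAPPED;
	local hTemplateFile = nil;

	local handle = core_file.CreateFileA(
		lpFileName,
		dwDesiredAccess,
		dwShareMode,
		lpSecurityAttributes,
		dwCreationDisposition,
		dwFlagsAndAttributes,
		hTemplateFile);

	if handle == INVALID_HANDLE_VALUE then
		return nil, errorhandling.GetLastError();
	end

	return self:init(handle)
end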

function Device.getNativeHandle(self)
	return self.Handle.Handle;
end

function Device.createOverlapped(self, buff, bufflen)
	local obj = ffi.new("FileOverlapped");
	obj.file = self:getNativeHandle();
	obj.OVL.Buffer = buff;
	obj.OVL.BufferLength = bufflen;

	return obj;
end

function Device.control(self, dwIoControlCode, lpInBuffer, nInBufferSize, lpOutBuffer, nOutBufferSize)
	local lpBytesReturned = nil;
	local lpOverlapped = self:createOverlapped(ffi.cast("void *", lpInBuffer), nInBufferSize);

	local status = core_io.DeviceIoControl(self:getNativeHandle(),
		dwIoControlCode,
		ffi.cast("void *", lpInBuffer),
		nInBufferSize,
		lpOutBuffer,
		nOutBufferSize,
		lpBytesReturned,
		ffi.cast("OVERLAPPED *", lpOverlapped));

	local err = errorhandling.GetLastError();

	-- Error conditions
	-- status == 1, err == WAIT_TIMEOUT (258)
	-- status == 0, err == ERROR_IO_PENDING (997)
	-- status == 0, err == something else

	if status == 0 then
		if err ~= ERROR_IO_PENDING then
			return false, err
		end
	end

	-- Park this task until the completion for lpOverlapped arrives
	local key, bytes, ovl = Application:waitForIO(self, lpOverlapped);

	return bytes;
end

return Device

I’ve shown this kind of construct before with the NativeFile object. That object contains Read, and Write functions as well, but lacks the control() function. Of course the two could be combined for maximum benefit.

How to use this thing?

dev = Device("c:")

OK, that’s out of the way. Now, what about this change journal thing? Very simple now that the device is handled.
A change journal can look like this:

-- USNJournal.lua
-- References

local ffi = require("ffi");
local bit = require("bit");
local bor = bit.bor;
local band = bit.band;

local core_io = require("core_io_l1_1_1");
local core_file = require("core_file_l1_2_0");
local WinIoCtl = require("WinIoCtl");
local WinBase = require("WinBase");
local errorhandling = require("core_errorhandling_l1_1_1");
local FsHandles = require("FsHandles");
local Device = require("Device")


-- An abstraction for NTFS Change journal management
local ChangeJournal = {}
setmetatable(ChangeJournal, {
	__call = function(self, ...)
		return self:open(...);
	end,
})

local ChangeJournal_mt = {
	__index = ChangeJournal;
}

ChangeJournal.init = function(self, device)
	local obj = {
		Device = device;
	}
	setmetatable(obj, ChangeJournal_mt);

	local jinfo, err = obj:getJournalInfo();

	print("ChangeJournal.init, jinfo: ", jinfo, err)

	if jinfo then
		obj.JournalID = jinfo.UsnJournalID;
		obj.LowestUsn = jinfo.LowestValidUsn;
		obj.FirstUsn = jinfo.FirstUsn;
		obj.MaxSize = jinfo.MaximumSize;
		obj.MaxUsn = jinfo.MaxUsn;
		obj.AllocationSize = jinfo.AllocationDelta;
	end

	return obj;
end

ChangeJournal.open = function(self, driveLetter)
	local device, err = Device(driveLetter)

	if not device then
		print("ChangeJournal.open, ERROR: ", err)
		return nil, err
	end

	return self:init(device);
end

ChangeJournal.getNextUsn = function(self)
	local jinfo, err = self:getJournalInfo();

	if not jinfo then
		return false, err;
	end

	return jinfo.NextUsn;
end

ChangeJournal.getJournalInfo = function(self)
	local dwIoControlCode = FSCTL_QUERY_USN_JOURNAL;
	local lpInBuffer = nil;
	local nInBufferSize = 0;
	local lpOutBuffer = ffi.new("USN_JOURNAL_DATA");
	local nOutBufferSize = ffi.sizeof(lpOutBuffer);

	local success, err = self.Device:control(dwIoControlCode,
		lpInBuffer,
		nInBufferSize,
		lpOutBuffer,
		nOutBufferSize);

	if not success then
		-- Device:control() already captured the error code
		return false, err;
	end

	return lpOutBuffer;
end

function ChangeJournal.waitForNextEntry(self, usn, ReasonMask)
	usn = usn or self:getNextUsn();
	ReasonMask = ReasonMask or 0xFFFFFFFF;
	local ReturnOnlyOnClose = false;
	local Timeout = 0;
	local BytesToWaitFor = 1;

	local ReadData = ffi.new("READ_USN_JOURNAL_DATA", {usn, ReasonMask, ReturnOnlyOnClose, Timeout, BytesToWaitFor, self.JournalID});

	local pusn = ffi.new("USN");
	-- This function does not return until the USN
	-- record exists
	local BUF_LEN = ffi.C.USN_PAGE_SIZE;
	local Buffer = ffi.new("uint8_t[?]", BUF_LEN);
	local dwBytes = ffi.new("DWORD[1]");

	local success, err = self.Device:control(FSCTL_READ_USN_JOURNAL,
		ReadData,
		ffi.sizeof(ReadData),
		Buffer,
		BUF_LEN);

	if not success then
		return false, err
	end

	-- The output buffer starts with a USN, followed by the first record
	local UsnRecord = ffi.cast("PUSN_RECORD", ffi.cast("PUCHAR", Buffer) + ffi.sizeof("USN"));

	return UsnRecord;
end

return ChangeJournal;

This very much looks like the change journal I created a few months back. The primary difference is the device control stuff is abstracted out into the Device object, so it does not need to be repeated here.
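Since waitForNextEntry() takes a ReasonMask, this is where "which events I really care to shadow" gets decided. Here is a minimal sketch of building such a mask. The USN_REASON_* values are the ones from the Windows SDK; `makeReasonMask` and `hasReason` are hypothetical helpers, not part of TINN. I use plain arithmetic so the sketch runs on any Lua; under LuaJIT you would more likely reach for bit.bor and bit.band.

```lua
-- A few USN_REASON_* values from the Windows SDK (winioctl.h).
local USN_REASON = {
    DATA_OVERWRITE  = 0x00000001,
    DATA_EXTEND     = 0x00000002,
    FILE_CREATE     = 0x00000100,
    FILE_DELETE     = 0x00000200,
    RENAME_NEW_NAME = 0x00002000,
    CLOSE           = 0x80000000,
}

-- Hypothetical helper: combine named reasons into one mask.
-- Every flag is a distinct single bit, so adding each flag once
-- gives the same result as a bitwise OR.
local function makeReasonMask(names)
    local mask = 0
    local seen = {}
    for _, name in ipairs(names) do
        local flag = assert(USN_REASON[name], "unknown reason: " .. tostring(name))
        if not seen[name] then
            seen[name] = true
            mask = mask + flag
        end
    end
    return mask
end

-- Hypothetical helper: test a single-bit flag without a bit library.
local function hasReason(mask, flag)
    return math.floor(mask / flag) % 2 == 1
end

local mask = makeReasonMask({ "FILE_CREATE", "FILE_DELETE" })
print(string.format("0x%08X", mask))
```

A mask built this way could be passed as the second argument, as in journal:waitForNextEntry(nil, mask), to wait only for creates and deletes.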

When we want to track the changes to the device, we make repeated calls to ‘waitForNextEntry’.

local function test_waitForNextEntry(journal)
    local entry = journal:waitForNextEntry();

    while entry do
        -- print or forward the entry here, then wait for the next one
        entry = journal:waitForNextEntry();
    end
end

This is your typical serially written code. There’s nothing that looks special about it, no hint of async processing. Behind the covers though, way back in the Device:control() function, the actual sending of a command to the device happens using an IO Completion Port, so if you’re running with TINN, this particular task will ‘waitForIO’, and other tasks can continue.
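To show the idea without any Windows machinery, here is a toy cooperative scheduler in plain Lua. It is a sketch of the general technique, not TINN’s implementation: the local `spawn`, `waitForIO`, `complete` and `step` functions are stand-ins I made up for this example, and `complete` plays the role of the completion port reporting that an operation finished.

```lua
local ready = {}    -- tasks that can run now, in FIFO order
local waiting = {}  -- tasks parked on a pending operation, keyed by operation id
local log = {}      -- what ran, in order

local function spawn(fn)
    ready[#ready + 1] = { co = coroutine.create(fn) }
end

-- Called by a task: park until operation `id` completes, then return its result.
local function waitForIO(id)
    return coroutine.yield(id)
end

-- Called by the "device" side: operation `id` finished with `result`.
local function complete(id, result)
    local task = waiting[id]
    if task then
        waiting[id] = nil
        task.result = result
        ready[#ready + 1] = task
    end
end

-- Run the next ready task until it yields or finishes.
local function step()
    local task = table.remove(ready, 1)
    if not task then return false end
    local ok, id = coroutine.resume(task.co, task.result)
    assert(ok, id)
    if coroutine.status(task.co) ~= "dead" then
        if id ~= nil then
            waiting[id] = task           -- parked on a pending operation
        else
            task.result = nil
            ready[#ready + 1] = task     -- plain yield, run again later
        end
    end
    return true
end

-- Serial-looking reader, in the style of test_waitForNextEntry.
spawn(function()
    for i = 1, 2 do
        local entry = waitForIO("journal")
        log[#log + 1] = "entry " .. entry
    end
end)

-- A ticker that gives up the CPU after each tick.
spawn(function()
    for i = 1, 3 do
        log[#log + 1] = "tick"
        coroutine.yield()
    end
end)

step()                     -- reader starts and parks on "journal"
step()                     -- ticker runs while the reader waits
complete("journal", "A")   -- the pending operation finishes
while step() do end        -- run until nothing is ready
complete("journal", "B")
while step() do end

print(table.concat(log, ", "))
```

The reader never polls and never blocks the ticker; it simply is not on the ready list until its operation completes.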

So, using it in context looks like this:

local function main()
    local journal, err = ChangeJournal("c:")

    if not journal then
        print("ChangeJournal, ERROR: ", err)
        return
    end

    spawn(test_waitForNextEntry, journal);
    periodic(function() print("tick") end, 1000)
end


In this case, I spawn the journal waiting/printing thing as its own task. Then I set up a periodic timer to simply print “tick” every second, to show there is some activity.

Since the journaling is cooperative (mostly waiting for io control to complete), the periodic timer, or UI processing, or what have you, is free to run, without any hindrance.

Combine this with the already cooperative UI stuff, and you can imagine how easy it could be to construct the scenario I set out to construct. Since all networking and file system operations in TINN are automatically async, it would be easy to log these values, or send them across a network to be stored for later analysis or what have you.

And there you have it. Async everywhere makes for some very nice scenarios. Being able to do async on any device, whether with standard read/write operations, or io control, makes for very flexible programming.

Next time around, I’m going to show how to do async DNS queries for fun and profit.