Fast Apps, Microsoft Style

Pheeeuuww!!

That’s what I exclaimed at least a couple of times this morning as I sat at a table in a makeshift “team room” in building 43 at Microsoft’s Redmond campus. What was the exclamation for? Well, over the past 3 months, I’ve been working on a quick strike project with a new team, and today we finally announced our “Public Preview”. Or, if you want to get right to the product: Cloud App Discovery.

I’m not a PM or marketing type, so it’s best to go and read the announcement for yourself if you want to get the official spiel on the project.  Here, I want to write a bit about the experience of coming up with a project, in short order, in the new Microsoft.

It all started back in January for me.  I was just coming off another project, and casting about for the next hardest ‘mission impossible’ to jump on.  I had a brief conversation with a dev manager who posed the question: “Is it possible to reestablish the ‘perimeter’ for IT guys in this world of cloud computing?”  An intriguing question.  The basic problem was, if you go to a lot of IT guys, they can barely tell you how many of the people within their corporation are using SalesForce.com, let alone DropBox from a cafe in Singapore.  Forget the notion of even trying to control such access.  The corporate ‘firewall’ is almost nothing more than a quartz space heater at this point, preventing very little, and knowing about even less.

So, with that question in mind, we laid out 3 phases of development.  Actually, they were already laid out before I joined the party (by a couple of weeks), so I just heard the pitch.  It was simple: the first phase of development was to see if we could capture network traffic, using various means, and project it up to the cloud, where we could use some machine learning to give an admin a view of what’s going on.

Conveniently sidestepping any objections actual employees might have with this notion, I got to thinking on how it could be done.

For my part, we wanted to have something sitting on the client machine (a Windows machine that the user is using), which would inspect all network traffic coming and going, and generate some reports to be sent up to the cloud.  Keep in mind, this is all consented activity: the employee gets to opt in to being monitored in this way.  All in the open and up front.

At the lowest level, my first inclination was to use a raw socket to create a packet sniffer, but Windows has a much better solution these days, built for exactly this purpose.  The Windows Filtering Platform allows you to create a ‘filter’ which you can configure to call out to a function whenever there is traffic.  My close teammate implemented that piece, and suddenly we had a handle on network packets.

We fairly quickly decided on an interface between that low level packet sniffing, and the higher level processor.  It’s as easy as this:

 

int WriteBytes(char *buff, int bufflen);
int ReadBytes(char *buff, int bufflen, int &bytesRead);

I’m paraphrasing a bit, but it really is that simple. What’s it do? Well, the fairly raw network packets are sent into ‘WriteBytes’, some processing is done, and a ‘report’ becomes available through ‘ReadBytes’. The reports are a JSON formatted string which then gets turned into the appropriate thing to be sent up to the cloud.
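To make the shape of that interface concrete, here is a toy sketch in Lua (the language used for the rest of the code on this blog). It is not the product code. The `newProcessor` name, the byte-counting "processing", and the report fields are all made up for illustration; only the push-bytes-in, pull-report-out shape comes from the description above.

```lua
-- Toy stand-in for the WriteBytes/ReadBytes pair (hypothetical names throughout).
local Processor = {}
Processor.__index = Processor

local function newProcessor()
    return setmetatable({ pending = {}, total = 0 }, Processor)
end

-- Accept a chunk of raw bytes and queue a small JSON report about it.
function Processor:writeBytes(buff)
    self.total = self.total + #buff
    local report = string.format('{"bytes":%d,"total":%d}', #buff, self.total)
    table.insert(self.pending, report)
    return #buff
end

-- Return the oldest pending report, or nil when none is ready.
function Processor:readBytes()
    return table.remove(self.pending, 1)
end
```

The useful property of this shape is that the producer of packets and the consumer of reports never have to know about each other; they only share the processor.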

The time it took from hearing about the basic product idea, to a prototype of this thing was about 3 weeks.

What do I do once I get network packets? Well, the network packets represent a multiplexed stream of packets, as if I were a NIC. All incoming, outgoing, all TCP ports. Once I receive some bytes, I have to turn it back into individual streams, then start doing some ‘parsing’. Right now we handle http and TLS. For http, I do full http parsing, separating out headers, reading bodies, and the like. I did that by leveraging the http parsing work I had done for TINN already. I used C++ in this case, but it’s all relatively the same.
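The "turn it back into individual streams" step can be sketched roughly as follows. This is a simplification under loud assumptions: the packet fields (`src`, `sport`, `dst`, `dport`, `payload`) are hypothetical, packets are assumed to arrive in order, and sequence numbers, retransmissions and connection teardown are ignored entirely.

```lua
-- Group segments by connection so each TCP stream can be parsed on its own.
-- The packet table layout here is hypothetical, not the real capture format.
local function flowKey(pkt)
    return string.format("%s:%d>%s:%d", pkt.src, pkt.sport, pkt.dst, pkt.dport)
end

local function demux(packets)
    local flows = {}
    for _, pkt in ipairs(packets) do
        local key = flowKey(pkt)
        local parts = flows[key]
        if not parts then
            parts = {}
            flows[key] = parts
        end
        parts[#parts + 1] = pkt.payload
    end

    -- Join the pieces of each flow into one contiguous byte string.
    local streams = {}
    for key, parts in pairs(flows) do
        streams[key] = table.concat(parts)
    end
    return streams
end
```

Each resulting string can then be handed to whichever parser fits (http or TLS in this case).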

TLS is a different story. At this ‘discovery’ phase, it was more about simple parsing. So, reading the record layer, decoding client_hello and server_hello, certificate, and the like. This gave me a chance to implement TLS processing using C++ instead of Lua. One of the core components that I leveraged was the byte order aware streams that I had developed for TINN. That really is the crux of most network protocol handling. If you can make heads or tails of what the various RFCs are saying, it usually comes down to doing some simple serialization, but getting the byte ordering right is the hardest part. 24-bit big endian integers?
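As a small illustration of the byte-ordering point, here is a sketch in plain Lua that reads big-endian integers of any width and uses them to decode the 5-byte TLS record header followed by the 4-byte handshake header (which is where the 24-bit length lives). The function names are made up for this example, it is not the TINN stream code, and it assumes all nine bytes are already in the string.

```lua
-- Read an unsigned big-endian integer of `nbytes` bytes starting at `pos`
-- (1-based) in the string `s`. Returns the value and the next position.
local function readUIntBE(s, pos, nbytes)
    local value = 0
    for i = pos, pos + nbytes - 1 do
        value = value * 256 + s:byte(i)
    end
    return value, pos + nbytes
end

-- Decode the TLS record header and the handshake header that follows it.
local function parseHandshakeHeader(s)
    local rec, pos = {}, 1
    rec.contentType, pos = readUIntBE(s, pos, 1)   -- 22 means handshake
    rec.version, pos     = readUIntBE(s, pos, 2)
    rec.length, pos      = readUIntBE(s, pos, 2)
    rec.msgType, pos     = readUIntBE(s, pos, 1)   -- 1 means client_hello
    rec.msgLength, pos   = readUIntBE(s, pos, 3)   -- the 24-bit integer
    return rec
end
```

Arithmetic is used instead of bit operations so the same code runs on stock Lua and on LuaJIT.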

At any rate, http parsing, fairly quick. TLS client_hello, fast enough, although properly handling the extensions took a bit of time. At this point, we’d be a couple months in, and our first partners get to start kicking the tires.

For such a project, it’s very critical that real world customers are involved really early, almost sitting in our design meetings. They course corrected us, and told us what was truly important and annoying about what we were doing, right from day one.

From the feedback, it becomes clear that getting more information, like the amount of traffic flowing through the pipes, is as interesting as the meta information, so getting full support for flows becomes a higher priority. For the regular http traffic, no problem. The TLS becomes a bit more interesting. In order to deal with that correctly, it becomes necessary to suck in more of the TLS implementation. Read the server_hello, and the certificate information. Well, if you’re going to read in the cert, you might as well get the subject common name out so you can use that bit of meta information. Now comes ASN.1 (DER) parsing, and X.509 parsing. That code took about 2 weeks, working “nights and weekends” while the other stuff was going on. It took a good couple of weeks, not to integrate it, but to write enough test cases, with real live data, to ensure that it was actually working correctly.
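For a flavor of what the DER side involves, here is a minimal sketch of reading a single tag-length header, which is the building block everything else in a certificate parser sits on. It is an illustration only, not the code from the project: it handles single-byte tags and definite lengths, does no bounds checking, and `readDerHeader` is a name invented here.

```lua
-- Read one DER tag-length header at position `pos` (1-based) of string `der`.
-- Returns the tag byte, the content length, and the position of the content.
local function readDerHeader(der, pos)
    local tag = der:byte(pos)
    local first = der:byte(pos + 1)
    pos = pos + 2

    if first < 128 then
        return tag, first, pos        -- short form: the byte is the length
    end

    local count = first - 128         -- long form: number of length bytes
    local length = 0
    for i = pos, pos + count - 1 do
        length = length * 256 + der:byte(i)   -- big-endian, again
    end
    return tag, length, pos + count
end
```

A certificate starts with a SEQUENCE (tag 0x30), so walking one amounts to calling this repeatedly and descending into the content of each constructed element until the subject name is reached.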

The last month was largely a lot of testing, making sure corner cases were dealt with and the like. As the client code is actually deployed to a bunch of machines, it really needed to be rock solid, no memory leaks, no excessive resource utilization, no CPU spiking, just unobtrusive, quietly getting the job done.

So, that’s what it does.

Now, I’ve shipped at Microsoft for numerous years. The fastest cycles I’ve usually dealt with are on the order of 3 months. That’s usually for a product that’s fairly mature, has plenty of engineering system support, and a well laid out roadmap. Really you’re just turning the crank on an already laid out plan.

This AppDiscovery project has been a bit different. It did not start out with a plan that had a 6 month planning cycle in front of it. It was a hunch that we could deliver customer value by implementing something that was challenging enough, but achievable, in a short amount of time.

So, how is this different than Microsoft of yore? Well, yes, we’ve always been ‘customer focused’, but this is to the extreme. I’ve never had customers this involved in what I was doing this early in the development cycle. I mean literally, before the first prototypical bits are even dry, the PM team is pounding on the door asking “when can I give it to the customers?”. That’s a great feeling actually.

The second thing is how much process we allowed ourselves to use. Recognizing that it’s a first run, and recognizing that customers might actually say “mehh, not interested”, it doesn’t make sense to spin up the classic development cycle which is meant to maintain a product for 10-14 years. A much more streamlined lifecycle, which favors delivering quality code and getting customer feedback, is what we employed. If it turns out that customers really like the product, then there’s room to move to a cycle that is more appropriate for longer term support.

The last thing that’s special is the amount of Open Source leverage we are allowing ourselves these days. Microsoft has gone full tilt on Open Source support. I didn’t personally end up using much myself, but we are free to use it elsewhere (with some legal guidelines). This is encouraging, because for crypto, I’m looking forward to using things like SipHash, and ChaCha20, which don’t come natively with the Microsoft platform.

Overall, as Microsoft continues to evolve and deliver ‘customer centric’ stuff, I’m pretty excited and encouraged that we’ll be able to use this same model time and again to great effect. Microsoft has a lot of smart engineers. Combined with some new directives about meeting customer expectations at the market, we will surely be cranking out some more interesting stuff.

I’ve implemented some interesting stuff while working on this project, some of it I’ll share here.


Device Iteration with Functional Programming

One of the great pleasures I have in life is learning something new. There’s nothing greater than those ‘light bulb goes on’ moments as you realize something and gain a much deeper understanding than you had before.

Well, a little while ago, there was an announcement of this thing called Lua Fun.  Lua Fun is a large set of functions which make functional programming in Lua really easy.  It has the usual suspects such as map, reduce, filter, each, etc.  If you read the documentation, you get a really good understanding of how iterators work in Lua, and more importantly, how LuaJIT is able to fold and manipulate things in hot loops such that the produced code is much tighter than anything I could possibly write in C, or any other language I so happen to use.

So, now I’m a fan of Lua Fun, and I would encourage anyone who’s both into Lua, and functional programming to take a look.

How to use it?  I’ve been enumerating various types of things in the Windows system of late.  Using the MMDevice subsystem, I was able to get an enumeration of the audio devices (that took a lot of COM work).  What about displays, and disk drives, and USB devices, and…  Yes, each one of those things has an attendant API which will facilitate monitoring said category.  But, is there one to rule them all?  Well yes, as it turns out, in most cases what these various APIs are doing is getting information out of the System Registry, and just presenting it reasonably.

There is a way to enumerate all the devices in the system.  You know, like when you bring up the Device Manager in Windows, and you see a tree of devices, and their various details.  The stage is set, how do you do that?  I created a simple object that does the grunt work of enumerating the devices in the system.

The DeviceRecordSet is essentially a query. Creating an instance of the object just gives you a handle onto making query requests. Here is the code:

local ffi = require("ffi")
local bit = require("bit")
local bor = bit.bor;
local band = bit.band;

local errorhandling = require("core_errorhandling_l1_1_1");
local SetupApi = require("SetupApi")
local WinNT = require("WinNT")


local DeviceRecordSet = {}
setmetatable(DeviceRecordSet, {
	__call = function(self, ...)
		return self:create(...)
	end,
})

local DeviceRecordSet_mt = {
	__index = DeviceRecordSet,
}


function DeviceRecordSet.init(self, rawhandle)
	print("init: ", rawhandle)

	local obj = {
		Handle = rawhandle,
	}
	setmetatable(obj, DeviceRecordSet_mt)

	return obj;
end

function DeviceRecordSet.create(self, Flags)
	Flags = Flags or bor(ffi.C.DIGCF_PRESENT, ffi.C.DIGCF_ALLCLASSES)

	local rawhandle = SetupApi.SetupDiGetClassDevs(
		nil, 
        nil, 
        nil, 
        Flags);

	if rawhandle == nil then
		return nil, errorhandling.GetLastError();
	end

	return self:init(rawhandle)
end

function DeviceRecordSet.getNativeHandle(self)
	return self.Handle;
end

function DeviceRecordSet.getRegistryValue(self, key, idx)
	idx = idx or 0;

	local did = ffi.new("SP_DEVINFO_DATA")
	did.cbSize = ffi.sizeof("SP_DEVINFO_DATA");

--print("HANDLE: ", self.Handle)
	local res = SetupApi.SetupDiEnumDeviceInfo(self.Handle,idx,did)

	if res == 0 then
		local err = errorhandling.GetLastError()
		--print("after SetupDiEnumDeviceInfo, ERROR: ", err)
		return nil, err;
	end

	local regDataType = ffi.new("DWORD[1]")
	local pbuffersize = ffi.new("DWORD[1]",260);
	local buffer = ffi.new("char[260]")

	local res = SetupApi.SetupDiGetDeviceRegistryProperty(
            self:getNativeHandle(),
            did,
			key,
			regDataType,
            buffer,
            pbuffersize[0],
            pbuffersize);

	if res == 0 then
		local err = errorhandling.GetLastError();
		--print("after GetDeviceRegistryProperty, ERROR: ", err)
		return nil, err;
	end

	--print("TYPE: ", regDataType[0])
	-- 1 is REG_SZ and 7 is REG_MULTI_SZ: both are handed back as strings
	if (regDataType[0] == 1) or (regDataType[0] == 7) then
		return ffi.string(buffer, pbuffersize[0]-1)
	elseif regDataType[0] == ffi.C.REG_DWORD_LITTLE_ENDIAN then
		return ffi.cast("DWORD *", buffer)[0]
	end

	return nil;
end


function DeviceRecordSet.devices(self, fields)
	fields = fields or {
		{ffi.C.SPDRP_DEVICEDESC, "description"},
		{ffi.C.SPDRP_MFG, "manufacturer"},
		{ffi.C.SPDRP_DEVTYPE, "devicetype"},
		{ffi.C.SPDRP_CLASS, "class"},
		{ffi.C.SPDRP_ENUMERATOR_NAME, "enumerator"},
		{ffi.C.SPDRP_FRIENDLYNAME, "friendlyname"},
		{ffi.C.SPDRP_LOCATION_INFORMATION , "locationinfo"},
		{ffi.C.SPDRP_LOCATION_PATHS, "locationpaths"},
		{ffi.C.SPDRP_PHYSICAL_DEVICE_OBJECT_NAME, "objectname"},
		{ffi.C.SPDRP_SERVICE, "service"},
	}

	local function closure(fields, idx)
		local res = {}

		local count = 0;
		for _it, field in ipairs(fields) do
			local value, err = self:getRegistryValue(field[1], idx)
			if value then
				count = count + 1;
				res[field[2]] = value;
			end
		end

		if count == 0 then
			return nil;
		end
				
		return idx+1, res;
	end

	return closure, fields, 0
end

return DeviceRecordSet

The ‘getRegistryValue()’ function is the real workhorse of this object. That’s what gets your values out of the system registry. The other function of importance is ‘devices()’. This is an iterator.

There are a couple of things of note about this iterator. First of all, it does not keep its iteration state in ‘up values’. Everything that changes from one step to the next is carried in the values returned from the function. The ‘state’, if you will, is handed in fresh every time ‘closure()’ is called (the only thing the closure captures is ‘self’, which never changes). This is the key to creating an iterator that will work well with Lua Fun.
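Stripped of the Windows calls, the pattern looks like this. It is a generic sketch with made-up names (`step`, `squares`), not part of DeviceRecordSet: the generator receives the invariant parameter and the current state as arguments, and returns the next state along with the value.

```lua
-- A stateless generator: everything it needs arrives as arguments.
-- `limit` is the invariant parameter, `i` is the control variable.
local function step(limit, i)
    i = i + 1
    if i > limit then
        return nil              -- returning nil ends the generic 'for'
    end
    return i, i * i             -- next state, then the value
end

-- The iterator is just the triplet (generator, parameter, initial state).
local function squares(limit)
    return step, limit, 0
end

for i, sq in squares(3) do
    print(i, sq)                -- prints 1 1, then 2 4, then 3 9
end
```

Because no position is hidden inside the function, the same triplet can be restarted, copied, or wrapped by another iterator, which is what libraries in the Lua Fun style rely on.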

By default, this iterator will return quite a few (but not all) fields related to each object, and it will return all the objects. This is ok, because there are typically fewer than 150 objects in any given system.

Now, I want to do various queries against this set without much fuss. This is where Lua Fun, and functional programming in general, really shines.

First, a little setup:

--test_enumdevices.lua
local ffi = require("ffi")
local DeviceRecordSet = require("DeviceRecordSet")
local serpent = require("serpent")
local Functor = require("Functor")

local fun = require("fun")()
local drs = DeviceRecordSet();

local function printIt(record)
	print("==========")
	each(print, record)
	print("----------")
end

This creates an instance of the DeviceRecordSet object, which will be used in the queries. Already the printIt() function is utilizing Lua Fun. The ‘each()’ function will take whatever it’s handed, and perform the function specified. In this case, the ‘record’ will be a table. So, each will iterate over the table entries and print each one of them out. This is the equivalent of doing:

for k,v in pairs(record) do
	print(k, v)
end

I think that simply typing ‘each’ is a lot simpler and pretty easy to understand.

How about a query then?

-- show everything for every device
each(printIt, drs:devices())

In this case, the ‘each’ is applied to the results of the ‘devices()’ iterator. For each record coming from the devices iterator, the printIt function will be called, which will in turn print out all the values in the record. That’s pretty nice.

What if I don’t want to see all the fields in the record, I just want to see the objectname, and description fields. Well, this is a ‘map’ operation, or a projection in database parlance, so:

-- do a projection on the fields
local function projection(x)
  return {objectname = x.objectname, description = x.description}
end
each(printIt, map(projection, drs:devices()))

Working from the inside out, for each record coming from the devices() iterator, call the ‘projection’ function. The return value from the projection function becomes the new record for this iteration. For each of those records, call the printIt function.

Using ‘map’ is great as you can reshape data in any way you like without much fuss.

Lastly, I want to see only the records that are related to “STORAGE”, so…

-- show only certain records
local function enumeratorFilter(x)
	return x.enumerator == "STORAGE"
end

each(printIt, filter(enumeratorFilter, drs:devices()))

Here, the ‘filter’ iterator is used. So, again, for each of the records coming from the ‘devices()’ enumerator, call the ‘enumeratorFilter’ function. If this function returns ‘true’ for the record, then it is passed along as the next record for the ‘each’. If ‘false’, then it is skipped, and the next record is tried.

This is pretty powerful, and yet simple stuff. The fact that iterators create new iterators, in tight loops, makes for some very dense and efficient code. If you’re interested in why this is so special in LuaJIT, and not many other languages, read up on the Lua Fun documentation.
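To see how iterators can wrap iterators without any library at all, here is a hand-rolled sketch of `each`, `map` and `filter` built on the generator/parameter/state triplet. To be clear, this is not how Lua Fun is implemented, and the `records` helper and sample data are invented for the example; it only shows the idea of one triplet feeding the next.

```lua
-- Stateless generator over an array of records.
local function records_gen(list, idx)
    idx = idx + 1
    local rec = list[idx]
    if rec == nil then return nil end
    return idx, rec
end

local function records(list)
    return records_gen, list, 0
end

-- Wrap a triplet so that only values passing `pred` come through.
local function filter(pred, gen, param, state)
    local function filter_gen(p, s)
        local value
        repeat
            s, value = gen(p, s)
        until s == nil or pred(value)
        if s == nil then return nil end
        return s, value
    end
    return filter_gen, param, state
end

-- Wrap a triplet so that every value is transformed by `fn`.
local function map(fn, gen, param, state)
    local function map_gen(p, s)
        local value
        s, value = gen(p, s)
        if s == nil then return nil end
        return s, fn(value)
    end
    return map_gen, param, state
end

-- Drive a triplet to completion, calling `fn` on each value.
local function each(fn, gen, param, state)
    for _, value in gen, param, state do
        fn(value)
    end
end

-- Usage with made-up device records:
local sample = {
    { enumerator = "STORAGE", description = "Volume" },
    { enumerator = "USB",     description = "Hub" },
}
each(print,
    map(function(r) return r.description end,
        filter(function(r) return r.enumerator == "STORAGE" end,
            records(sample))))
```

Each wrapper returns a new triplet, so the calls nest in exactly the same way as the queries above.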

I’ve killed two birds with one stone. I have finally gotten to the root of all device iterators. I have also learned how to best write iterators that can be used in a functional programming way. Judicious usage of this mechanism will surely make a lot of my code more compact and readable, as well as highly performant.

 


Asynchronous DNS lookups on Windows

I began this particular journey because I wanted to do DNS lookups asynchronously on Windows. There is of course a function for that:

DnsQueryEx

The problem I ran into is that unlike the various other Windows functions I’ve done with async, this one does not use an IO Completion Port. Instead it uses a mechanism called APC (Asynchronous Procedure Call). With this little bit of magic, you pass in a pointer to a function which will be called in your thread context, kind of in between when other things are happening. Well, given the runtime environment I’m using, I don’t think this quite works out. Basically, I’d have a function being called leaving the VM in an unknown state.

So, I got to digging. I figured, how hard can it be to make calls to a DNS server directly? After all, it is nothing more than a network based service with a well known protocol. Once I could make a straight networking call, then I could go back to leveraging IO Completion Ports just like I do for all other IO.

You can view the DNS system as nothing more than a database to which you pose queries. You express your queries using some nice well defined protocol, which is ancient in origin, and fairly effective considering how frequently DNS queries are issued. Although I could man up and write the queries from scratch, Windows helps me here by providing functions that will format the query into a buffer for me.

But, before I get into that, what do the queries look like? What am I looking up? Well, a Domain Name Server serves up translations of names to other names and numbers. For example, I need to find the IP address of http://www.bing.com. I can look for CNAMES (an alias), or ‘A’ records (direct to an IP address). This gets esoteric and confusing, so a little code can help:

-- Prepare the DNS request
local dwBuffSize = ffi.new("DWORD[1]", 2048);
local buff = ffi.new("uint8_t[2048]")

local wID = clock:GetCurrentTicks() % 65536;
        
local res = windns_ffi.DnsWriteQuestionToBuffer_UTF8( 
  ffi.cast("DNS_MESSAGE_BUFFER*",buff), 
  dwBuffSize, 
  ffi.cast("char *",strToQuery), 
  wType, 
  wID, 
  true )

DnsWriteQuestionToBuffer_UTF8 is the Windows function which helps me to write a DNS query into a buffer, which will then be sent to the actual DNS server.

wType, represents the type of record you want to be returned. The values might be something like:

wType = ffi.C.DNS_TYPE_A
wType = ffi.C.DNS_TYPE_MX  -- mail records
wType = ffi.C.DNS_TYPE_CNAME

There are about a hundred different types that you can query for. The vast majority of the time though, you are either looking for ‘A’, or ‘CNAME’ records.

The wID is just a unique identifier for the particular query so that if you’re issuing several on the same channel, you can check the response to ensure they match up.
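For the curious, this is roughly what ends up in that buffer. The sketch below builds a query by hand in plain Lua following the message layout in RFC 1035; it is not what the Windows function does internally, and `u16` and `buildQuery` are names invented for the example. It assumes a plain ASCII name and always sets the recursion-desired flag.

```lua
-- Encode a 16-bit value as two bytes, most significant first.
local function u16(n)
    return string.char(math.floor(n / 256) % 256, n % 256)
end

-- Build a standard DNS query message for `name` with record type `qtype`.
local function buildQuery(id, name, qtype)
    local parts = {
        u16(id),                 -- transaction ID, echoed back in the response
        u16(256),                -- flags: 0x0100, recursion desired
        u16(1),                  -- QDCOUNT: one question
        u16(0), u16(0), u16(0),  -- ANCOUNT, NSCOUNT, ARCOUNT
    }

    -- QNAME: each label is prefixed by its length; a zero byte ends the name.
    for label in name:gmatch("[^.]+") do
        parts[#parts + 1] = string.char(#label) .. label
    end
    parts[#parts + 1] = string.char(0)

    parts[#parts + 1] = u16(qtype)   -- QTYPE: 1 = A, 5 = CNAME, 15 = MX
    parts[#parts + 1] = u16(1)       -- QCLASS: 1 = IN (Internet)
    return table.concat(parts)
end
```

The first two bytes of the response carry the same ID back, which is what makes the transaction-ID check possible.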

OK. Now I have a DNS query stuffed into a buffer, how do I make the query and get the results?

-- Send the request.
local IPPORT_DNS = 53;
local remoteAddr = sockaddr_in(IPPORT_DNS, AF_INET);
remoteAddr.sin_addr.S_addr = ws2_32.inet_addr( "209.244.0.3");

-- create the UDP socket
local socket, err = NativeSocket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );

-- send the query
local iRes, err = socket:sendTo(
  remoteAddr, ffi.sizeof(remoteAddr), 
  buff, dwBuffSize[0]);

This little bit of code shows the socket creation, and the actual ‘sendTo’ call. Of note, the “209.244.0.3” represents the IP address of a well known public DNS server. In this case it is hosted by Level 3, which is an internet services provider. There are of course calls you can make to figure out which DNS server your machine is typically configured to use, but this way the query will always work, no matter which network you are on.

Notice the socket is a UDP socket.

At this point, we’re already running cooperatively due to the fact that within TINN, all IO is done cooperatively, without the programmer needing to do much special.

Now to receive the query response back:

   -- Try to receive the results
    local RecvFromAddr = sockaddr_in();
    local RecvFromAddrSize = ffi.sizeof(RecvFromAddr);
    local cbReceived, err = self.Socket:receiveFrom(RecvFromAddr, RecvFromAddrSize, buff, 2048);

Basically just wait for the server to send back a response. Of course, like the sendTo, the receiveFrom works cooperatively, so that if the developer issues several ‘spawn’ commands, each query could be running in its own task, working cooperatively.

Once you have the response, you can parse out the results. The results come back as a set of records. There are of course functions which will help you to parse these records out. The key here is that the record type is indicated, and it’s up to the developer to pull out the relevant information.

The complete DNSNameServer class is here:

local ffi = require("ffi")

local Application = require("Application")
local windns_ffi = require("windns_ffi")
local NativeSocket = require("NativeSocket")
local ws2_32 = require("ws2_32")
local Stopwatch = require("StopWatch")

local clock = Stopwatch();

-- DNS UDP port
local IPPORT_DNS = 53;

local DNSNameServer = {}
setmetatable(DNSNameServer, {
    __call = function(self, ...)
        return self:create(...)
    end,
})
local DNSNameServer_mt = {
    __index = DNSNameServer,
}

function DNSNameServer.init(self, serveraddress)
    local socket, err = NativeSocket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );

    if not socket then
        return nil, err
    end

    local obj = {
        Socket = socket,
        ServerAddress = serveraddress,
    }
    setmetatable(obj, DNSNameServer_mt)

    return obj;
end

function DNSNameServer.create(self, servername)
    local remoteAddr = sockaddr_in(IPPORT_DNS, AF_INET);
    remoteAddr.sin_addr.S_addr = ws2_32.inet_addr( servername );

    return self:init(remoteAddr)
end

-- Construct a request of the given type (DNS_TYPE_A by default), send it to the specified DNS server, wait for the reply.
function DNSNameServer.Query(self, strToQuery, wType, msTimeout)
    wType = wType or ffi.C.DNS_TYPE_A
    msTimeout = msTimeout or 60 * 1000  -- 1 minute


    -- Prepare the DNS request
    local dwBuffSize = ffi.new("DWORD[1]", 2048);
    local buff = ffi.new("uint8_t[2048]")

    local wID = clock:GetCurrentTicks() % 65536;
        
    local res = windns_ffi.DnsWriteQuestionToBuffer_UTF8( ffi.cast("DNS_MESSAGE_BUFFER*",buff), dwBuffSize, ffi.cast("char *",strToQuery), wType, wID, true )

    if res == 0 then
        return false, "DnsWriteQuestionToBuffer_UTF8 failed."
    end

    -- Send the request.
    local iRes, err = self.Socket:sendTo(self.ServerAddress, ffi.sizeof(self.ServerAddress), buff, dwBuffSize[0]);
    

    if (not iRes) then
        print("Error sending data: ", err)
        return false, err
    end

    -- Try to receive the results
    local RecvFromAddr = sockaddr_in();
    local RecvFromAddrSize = ffi.sizeof(RecvFromAddr);
    local cbReceived, err = self.Socket:receiveFrom(RecvFromAddr, RecvFromAddrSize, buff, 2048);

    if not cbReceived then
        print("Error Receiving Data: ", err)
        return false, err;
    end

    if( 0 == cbReceived ) then
        return false, "Nothing received"
    end

    -- Parse the DNS response received with DNS API
    local pDnsResponseBuff = ffi.cast("DNS_MESSAGE_BUFFER*", buff);
    windns_ffi.DNS_BYTE_FLIP_HEADER_COUNTS ( pDnsResponseBuff.MessageHead );

    if pDnsResponseBuff.MessageHead.Xid ~= wID then        
        return false, "wrong transaction ID"
    end

    local pRecord = ffi.new("DNS_RECORD *[1]",nil);

    iRes = windns_ffi.DnsExtractRecordsFromMessage_W( pDnsResponseBuff, cbReceived, pRecord );
    
    pRecord = pRecord[0];
    local pRecordA = ffi.cast("DNS_RECORD *", pRecord);
    
    local function closure()
        -- Skip over any records that are not of the requested type
        while pRecordA ~= nil and pRecordA.wType ~= wType do
            pRecordA = pRecordA.pNext;
        end

        if pRecordA ~= nil then
            local retVal = pRecordA
            pRecordA = pRecordA.pNext

            return retVal;
        end

        -- The list is exhausted: free the resources, exactly once
        if pRecord ~= nil then
            windns_ffi.DnsRecordListFree( pRecord, ffi.C.DnsFreeRecordList );
            pRecord = nil;
        end 

        return nil
    end

    return closure
end

function DNSNameServer.A(self, domainToQuery) return self:Query(domainToQuery, ffi.C.DNS_TYPE_A) end
function DNSNameServer.MX(self, domainToQuery) return self:Query(domainToQuery, ffi.C.DNS_TYPE_MX) end
function DNSNameServer.CNAME(self, domainToQuery) return self:Query(domainToQuery, ffi.C.DNS_TYPE_CNAME) end
function DNSNameServer.SRV(self, domainToQuery) return self:Query(domainToQuery, ffi.C.DNS_TYPE_SRV) end

return DNSNameServer

Notice at the end there are some convenience functions for a few of the well known DNS record types. The ‘Query()’ function is generic, and will return records of any type. These convenience functions just make it easier.

And how to use it?

local ffi = require("ffi")
local DNSNameServer = require("DNSNameServer")
local core_string = require("core_string_l1_1_0")


--local serveraddress = "10.211.55.1"		-- xfinity
local serveraddress = "209.244.0.3" -- level 3

local domains = {
	"www.nanotechstyles.com",
	"www.adafruit.com",
	"adafruit.com",
	"adamation.com",
	"www.adamation.com",
	"microsoft.com",
	"google.com",
	"ibm.com",
	"oracle.com",
	"sparkfun.com",
	"apple.com",
	"netflix.com",
	"www.netflix.com",
	"www.us-west-2.netflix.com",
	"www.us-west-2.prodaa.netflix.com",
	"news.com",
	"hardkernel.org",
	"amazon.com",
	"walmart.com",
	"target.com",
	"godaddy.com",
	"luajit.org",
}



local function queryA()
	local function queryDomain(name)
		local dns = DNSNameServer(serveraddress) -- ms corporate
		print("==== DNS A ====> ", name)
		for record in dns:A(name) do
			local a = IN_ADDR();
    		a.S_addr = record.Data.A.IpAddress

    		print(string.format("name: %s\tIP: %s, TTL %d", name, a, record.dwTtl));
		end
	end

	for _, name in ipairs(domains) do 
		spawn(queryDomain, name)
		--queryDomain(name)
	end
end

local function queryCNAME()
	local dns = DNSNameServer(serveraddress) -- ms corporate
	local function queryDomain(name)
		print("==== DNS CNAME ====> ", name)
		for record in dns:CNAME(name) do
			print(core_string.toAnsi(record.pName), core_string.toAnsi(record.Data.CNAME.pNameHost))
		end
	end

	for _, name in ipairs(domains) do 
		queryDomain(name)
	end
end

local function queryMX()
	local function queryDomain(name)
		local dns = DNSNameServer(serveraddress) -- ms corporate
		print("==== DNS MX ====> ", name)
		for record in dns:MX(name) do
			print(core_string.toAnsi(record.pName), core_string.toAnsi(record.Data["MX"].pNameExchange))
		end
	end

	for _, name in ipairs(domains) do 
		spawn(queryDomain, name)
	end
end

local function querySRV()
	local dns = DNSNameServer(serveraddress) -- ms corporate
	for _, name in ipairs(domains) do 
		print("==== DNS SRV ====> ", name)
		for record in dns:SRV(name) do
			print(core_string.toAnsi(record.pName), core_string.toAnsi(record.Data.SRV.pNameTarget))
		end
	end
end

local function main()
  queryA();
  --queryCNAME();
  --queryMX();
  --querySRV();
end

run(main)

The function queryA() will query for the ‘A’ records, and print them out. Notice that it has knowledge of the giant union structure that contains the results, and it pulls out the specific information for ‘A’ records. It will create a new instance of the DNSNameServer for each query. That’s not as bad as it might seem. All it amounts to is creating a new UDP socket for each query. Since each query is spawned into its own task, they are all free to run and complete independently, which was the goal of this little exercise.

In the case of the CNAME query, there is only a single socket, and it is used repeatedly, serially, for each query.

The difference between the two styles is noticeable. For the serial case, the queries might ‘take a while’, because you have to wait for each result to come back before issuing the next query. In the cooperative case, you issue several queries in parallel, so the total time will only be as long as the longest query.
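The arithmetic behind that claim can be checked with a toy model built on plain Lua coroutines. This is not TINN's scheduler: `makeQuery` and `runAll` are invented for the sketch, and "waiting on the network" is simulated by yielding a fixed number of times.

```lua
-- Each fake query needs `latency` scheduler ticks before its answer arrives.
local function makeQuery(name, latency, log)
    return coroutine.create(function()
        for _ = 1, latency do
            coroutine.yield()        -- stand-in for waiting on the socket
        end
        log[#log + 1] = name         -- the "response" has arrived
    end)
end

-- Round-robin: resume every unfinished task once per tick.
-- Returns how many ticks it took for all tasks to finish.
local function runAll(tasks)
    local ticks = 0
    local pending = #tasks
    while pending > 0 do
        ticks = ticks + 1
        for _, co in ipairs(tasks) do
            if coroutine.status(co) == "suspended" then
                assert(coroutine.resume(co))
                if coroutine.status(co) == "dead" then
                    pending = pending - 1
                end
            end
        end
    end
    return ticks
end
```

Running three fake queries together costs as many ticks as the slowest one needs, while running them one after another costs the sum; answers also arrive in order of latency rather than in the order the queries were issued.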

That’s a good outcome.

I like this style of programming. You go as low as you can to root out where the system might otherwise block, and you make that part cooperative. That way everything else above it is automatically cooperative. I also like the fact that it feels like I’m getting some parallelism, but I’m not using any of the typical primitives of parallelism, including actual threads, mutexes, and the like.

Well, that’s a hefty bit of code, and it serves the purpose I set out, so I’m a happy camper. Now, if I could just turn those unions into tables automatically…


All the pretty little asyncs…

I have gone on about various forms of async for quite some time now. So could there possibly be more? Well, yes of course!

Here’s the scenario I want to enable. I want to keep track of my file system activity, sending the various operations to a distant storage facility. I want to do this while a UI displays what’s going on, and I want to be able to configure things while it’s happening, like which events I really care to shadow, and the like.

I don’t want to use multiple OS level threads if I can at all avoid them as they will complicate my programming tremendously. So, what to do.

Well, first I’ll start with the file tracking business. I have talked about change journals in the past. This is a basic mechanism that Windows has to track changes to the file system. Every single open, close, delete, write, etc, has an entry in this journal. If you’re writing a backup program, you’ll be using change journals.

The essence of the change journal functionality is usage of the DeviceIoControl() function. Most of us are very familiar with the likes of CreateFile(), ReadFile(), WriteFile(), CloseHandle(), when it comes to dealing with files. But, for everything else, there is this DeviceIoControl() function.

What is a device? Well, you’d be surprised to learn that most things in the Windows OS are represented by ‘devices’, just like they are in UNIX systems. For example, ‘C:’ is a device. But “DISPLAY1” is also a device, as are “LCD” and “PhysicalDisk0”. When it comes to controlling devices, the Win32 level API calls will ultimately make DeviceIoControl() calls with various parameters. That’s great to know, as it allows you to create whatever API you want, as long as you know the nuances of the device driver you’re trying to control.
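Devices like these are opened through the Win32 device namespace, which means prefixing the device name with \\.\ before handing it to CreateFile(). The Device class further down does exactly this with string.format. Here is that one step on its own; devicePath is a hypothetical helper name used only for this illustration.

```lua
-- Build the Win32 device namespace path for a device name.
-- devicePath is a made-up helper name, not part of TINN.
local function devicePath(devicename)
  -- In a Lua string literal each backslash is escaped,
  -- so "\\\\.\\" is the four characters \\.\
  return string.format("\\\\.\\%s", devicename)
end

print(devicePath("c:"))             --> \\.\c:
print(devicePath("PhysicalDisk0"))  --> \\.\PhysicalDisk0
```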

But, I digress. The key point here is that I can open up a device, and I can make a DeviceIoControl() call, and true to form, I can use OVERLAPPED structures, and IO Completion Ports. That makes these calls “async”, or with TINN, cooperative.

To wrap it up in a tidy little bow, here is a Device class which does the grunt work for me:

--[[
References

http://msdn.microsoft.com/en-us/magazine/cc163415.aspx

--]]
local ffi = require("ffi")
local bit = require("bit")
local bor = bit.bor;

local core_file = require("core_file_l1_2_0");
local core_io = require("core_io_l1_1_1");
local Application = require("Application")
local IOOps = require("IOOps")
local FsHandles = require("FsHandles");
local errorhandling = require("core_errorhandling_l1_1_1");
local WinBase = require("WinBase");


local Device = {}
setmetatable(Device, {
	__call = function(self, ...)
		return self:open(...)
	end,
})
local Device_mt = {
	__index = Device,
}

function Device.init(self, rawhandle)
	local obj = {
		Handle = FsHandles.FsHandle(rawhandle)
	}
	setmetatable(obj, Device_mt)
	
	Application:watchForIO(rawhandle, rawhandle)

	return obj;
end


function Device.open(self, devicename, dwDesiredAccess, dwShareMode)
	local lpFileName = string.format("\\\\.\\%s", devicename);
	dwDesiredAccess = dwDesiredAccess or bor(ffi.C.GENERIC_READ, ffi.C.GENERIC_WRITE);
	dwShareMode = dwShareMode or bor(FILE_SHARE_READ, FILE_SHARE_WRITE);
	local lpSecurityAttributes = nil;
	local dwCreationDisposition = OPEN_EXISTING;
	local dwFlagsAndAttributes = FILE_FLAG_OVERLAPPED;
	local hTemplateFile = nil;

	local handle = core_file.CreateFileA(
        lpFileName,
        dwDesiredAccess,
        dwShareMode,
     	lpSecurityAttributes,
        dwCreationDisposition,
        dwFlagsAndAttributes,
     	hTemplateFile);


	if handle == INVALID_HANDLE_VALUE then
		return nil, errorhandling.GetLastError();
	end

	return self:init(handle)
end

function Device.getNativeHandle(self)
	return self.Handle.Handle;
end

function Device.createOverlapped(self, buff, bufflen)
	local obj = ffi.new("FileOverlapped");
	
	obj.file = self:getNativeHandle();
	obj.OVL.Buffer = buff;
	obj.OVL.BufferLength = bufflen;

	return obj;
end

function Device.control(self, dwIoControlCode, lpInBuffer, nInBufferSize, lpOutBuffer, nOutBufferSize)
	local lpBytesReturned = nil;
	local lpOverlapped = self:createOverlapped(ffi.cast("void *", lpInBuffer), nInBufferSize);


	local status = core_io.DeviceIoControl(self:getNativeHandle(), 
          dwIoControlCode, 
          ffi.cast("void *", lpInBuffer),
          nInBufferSize,
          lpOutBuffer,
          nOutBufferSize,
          lpBytesReturned,
          ffi.cast("OVERLAPPED *",lpOverlapped));

	local err = errorhandling.GetLastError();

	-- Error conditions
	-- status == 1, err == WAIT_TIMEOUT (258)
	-- status == 0, err == ERROR_IO_PENDING (997)
	-- status == 0, err == something else

	if status == 0 then
		if err ~= ERROR_IO_PENDING then
			return false, err
		end
	end

    local key, bytes, ovl = Application:waitForIO(self, lpOverlapped);

    return bytes;
end

return Device

I’ve shown this kind of construct before with the NativeFile object. That object contains Read, and Write functions as well, but lacks the control() function. Of course the two could be combined for maximum benefit.

How to use this thing?

dev = Device("c:")
dev:control(...)

OK, that’s out of the way. Now, what about this change journal thing? Very simple now that the device is handled.
A change journal can look like this:

-- USNJournal.lua
-- References
-- http://msdn.microsoft.com/en-us/library/windows/desktop/aa364563(v=vs.85).aspx
-- http://www.microsoft.com/msj/0999/journal/journal.aspx
-- http://www.microsoft.com/msj/1099/journal2/journal2.aspx
-- 

local ffi = require("ffi");
local bit = require("bit");
local bor = bit.bor;
local band = bit.band;

local core_io = require("core_io_l1_1_1");
local core_file = require("core_file_l1_2_0");
local WinIoCtl = require("WinIoCtl");
local WinBase = require("WinBase");
local errorhandling = require("core_errorhandling_l1_1_1");
local FsHandles = require("FsHandles");
local Device = require("Device")

--[[
	ChangeJournal

	An abstraction for NTFS Change journal management
--]]
local ChangeJournal = {}
setmetatable(ChangeJournal, {
	__call = function(self, ...)
		return self:open(...);
	end,
});

local ChangeJournal_mt = {
	__index = ChangeJournal;
}

ChangeJournal.init = function(self, device)
	local obj = {
		Device = device;
	}
	setmetatable(obj, ChangeJournal_mt);

	local jinfo, err = obj:getJournalInfo();

	print("ChangeJournal.init, jinfo: ", jinfo, err)

	if jinfo then
		obj.JournalID = jinfo.UsnJournalID;
		obj.LowestUsn = jinfo.LowestValidUsn;
		obj.FirstUsn = jinfo.FirstUsn;
		obj.MaxSize = jinfo.MaximumSize;
		obj.MaxUsn = jinfo.MaxUsn;
		obj.AllocationSize = jinfo.AllocationDelta;
	end

	return obj;
end


ChangeJournal.open = function(self, driveLetter)
	local device, err = Device(driveLetter)

	if not device then
		print("ChangeJournal.open, ERROR: ", err)
		return nil, err
	end

	return self:init(device);
end


ChangeJournal.getNextUsn = function(self)
	local jinfo, err = self:getJournalInfo();

	if not jinfo then
		return false, err;
	end

	return jinfo.NextUsn;
end



ChangeJournal.getJournalInfo = function(self)
	local dwIoControlCode = FSCTL_QUERY_USN_JOURNAL;
	local lpInBuffer = nil;
	local nInBufferSize = 0;
	local lpOutBuffer = ffi.new("USN_JOURNAL_DATA");
	local nOutBufferSize = ffi.sizeof(lpOutBuffer);

	local success, err = self.Device:control(dwIoControlCode, 
          lpInBuffer,
          nInBufferSize,
          lpOutBuffer,
          nOutBufferSize);

	if not success then
		return false, err;
	end

	return lpOutBuffer;
end

function ChangeJournal.waitForNextEntry(self, usn, ReasonMask) 
 	usn = usn or self:getNextUsn();
 	local ReasonMask = ReasonMask or 0xFFFFFFFF;
 	local ReturnOnlyOnClose = false;
 	local Timeout = 0;
 	local BytesToWaitFor = 1;

    local ReadData = ffi.new("READ_USN_JOURNAL_DATA", {usn, ReasonMask, ReturnOnlyOnClose, Timeout, BytesToWaitFor, self.JournalID});

    local pusn = ffi.new("USN");
    
    -- This function does not return until the USN
    -- record exists
	local BUF_LEN = ffi.C.USN_PAGE_SIZE;
	local Buffer = ffi.new("uint8_t[?]", BUF_LEN);
    local dwBytes = ffi.new("DWORD[1]");

	local success, err = self.Device:control(FSCTL_READ_USN_JOURNAL, 
        ReadData,
        ffi.sizeof(ReadData),
        Buffer,
        BUF_LEN);

	if not success then 
		return false, err
	end

	local UsnRecord = ffi.cast("PUSN_RECORD", ffi.cast("PUCHAR",Buffer) + ffi.sizeof("USN")); 

    return UsnRecord;
end

return ChangeJournal;

This very much looks like the change journal I created a few months back. The primary difference is the device control stuff is abstracted out into the Device object, so it does not need to be repeated here.

When we want to track the changes to the device, we make repeated calls to ‘waitForNextEntry’.

local function test_waitForNextEntry(journal)
    local entry = journal:waitForNextEntry();

    while entry do
        printJournalEntry(entry);
        entry = journal:waitForNextEntry();
    end
end

This is your typical serially written code. There’s nothing that looks special about it, no hint of async processing. Behind the covers though, way back in the Device:control() function, the actual sending of a command to the device happens using an IO Completion Port, so if you’re running with TINN, this particular task will ‘waitForIO’, and other tasks can continue.
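If you want to see the shape of that trick without Windows or TINN, here is a pure Lua simulation. Everything in it is an assumption made for the sketch: waitForIO and completeIO are stand-ins written for this example (there is no real IO Completion Port involved), and the journal entries are just strings.

```lua
-- Pure-Lua simulation of "serial looking code that parks on IO".
local waiting = {}   -- key -> coroutine parked on that key
local log = {}

-- Stand-in for the blocking point inside Device:control()
local function waitForIO(key)
  waiting[key] = coroutine.running()
  return coroutine.yield()         -- resumes with the "completed" data
end

-- Stand-in for the completion port delivering a result
local function completeIO(key, data)
  local co = waiting[key]
  waiting[key] = nil
  assert(coroutine.resume(co, data))
end

-- Reads like serial code, in the style of test_waitForNextEntry
local reader = coroutine.create(function()
  local entry = waitForIO("journal")
  while entry do
    log[#log + 1] = "entry: " .. entry
    entry = waitForIO("journal")
  end
end)

assert(coroutine.resume(reader))   -- runs until the first waitForIO
log[#log + 1] = "tick"             -- other work proceeds while it is parked
completeIO("journal", "file created")
log[#log + 1] = "tick"
completeIO("journal", "file deleted")
completeIO("journal", nil)         -- nil ends the reader loop

print(table.concat(log, "\n"))
```

The reader loop never mentions yielding. All of the cooperation is hidden inside the one function that would otherwise block.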

So, using it in context looks like this:

local function main()
    local journal, err = ChangeJournal("c:")

    spawn(test_waitForNextEntry, journal);
    periodic(function() print("tick") end, 1000)
end

run(main)

In this case, I spawn the journal waiting/printing thing as its own task. Then I set up a periodic timer to simply print “tick” every second, to show there is some activity.

Since the journaling is cooperative (mostly waiting for io control to complete), the periodic timer, or UI processing, or what have you, is free to run, without any hindrance.

Combine this with the already cooperative UI stuff, and you can imagine how easy it could be to construct the scenario I set out to construct. Since all networking and file system operations in TINN are automatically async, it would be easy to log these values, or send them across a network to be stored for later analysis or what have you.

And there you have it. Async everywhere makes for some very nice scenarios. Being able to do async on any device, whether with standard read/write operations, or io control, makes for very flexible programming.

Next time around, I’m going to show how to do async DNS queries for fun and profit.


TINN Version 0.7 Available

Although TINN is under constant development, there’s nothing like declaring a new “release”. It’s been 3 months since the 0.6 release. So, now there is a 0.7 release. You can read about the TINN v0.7 Release and install it if you would like.

There were 84 commits since the previous release, so I can’t even remember all the changes that were involved. The major addition from my most recent work has to do with the new scheduler as described on this blog. That’s the extensible, plug-in driven scheduler. Pretty nifty for my work at least.

There are quite a few additions, such as a revamped stream object, io completion port supported file interface, a logfile thing, and quite a few more interfaces from the OS.

Other items I have been working on include support for various COM interfaces such as DXGI, DXVA, MMDevice and some others. They are all in the “experimental” folder if you look at the enlistment. They are not quite ready for prime time, so they’re not actually in the v0.7 release.

What can you do with TINN? In short, you can create all sorts of Windows based applications. Everything from scalable web services to interactive multi-tasking UI (including OpenGL based).

TINN is a command line tool (tinn.exe). As such, the easiest thing to do is bring up a command line shell and run various scripts through tinn.

c:\> tinn.exe hello.lua

The TINN repository contains numerous test cases that utilize the various modules of TINN.

That’s it for now.  Next release will be about cleanup and stabilization primarily.


Parallel Conversations

I have been tinkering with words of late. I’ve added ‘waitFor’, ‘when’ and ‘whenever’ to the TINN lexicon, and my programming is becoming easier, more organized, and easier to describe.

Recently I added another set of words: ‘waitSignal’, ‘signalOne’, and ‘signalAll’. If you were using another system, you might call these ‘events’, but really they’re just signaling.

Here’s how I might use them:

local Application = require("Application")(true)

local function waiter(num)
  num = num or 0

  local function closure()
    print(string.format("WAITED: %d",num))
  end

  return closure;
end

local function main()

  for i=1,4 do
    onSignal(waiter(i), "waiting")
  end

  -- signal only one of them
  signalOne("waiting")

  -- sleep a bit giving other tasks
  -- a chance to run
  sleep(500)

  -- signal the rest of them	
  signalAll("waiting")
  sleep(2000)
	
  print("SLEEP AFTER signalAll")
end

run(main)

First, it spawns 4 tasks which will all be waiting on the ‘waiting’ signal. With ‘signalOne’, only 1 of the 4 waiting tasks will be awakened and continue execution. With the ‘signalAll’, the rest of the waiting tasks are all scheduled to run, and thus continue from where they left off.

The core primitive for signaling is the ‘waitSignal()’ function. This will essentially suspend the currently running task until the specified signal has been given. The ‘onSignal()’ function is a convenience which will spawn a separate task which will do the waiting and execute the specified function at the appropriate time.
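To make the relationship between these words concrete, here is a minimal sketch of how they could be built on plain coroutines. This is not TINN’s implementation. In particular, this version resumes waiters directly instead of handing them to a scheduler, and the waiters table is an assumption of the sketch.

```lua
-- Minimal signaling sketch on plain coroutines (not TINN's code).
local waiters = {}   -- signal name -> list of parked coroutines
local log = {}

-- Suspend the current coroutine until the named signal is given
local function waitSignal(name)
  local list = waiters[name] or {}
  waiters[name] = list
  list[#list + 1] = coroutine.running()
  return coroutine.yield()
end

-- Convenience: create a task that waits, then calls func
local function onSignal(func, name)
  local co = coroutine.create(function()
    waitSignal(name)
    func()
  end)
  assert(coroutine.resume(co))   -- run up to the waitSignal
  return co
end

-- Wake only the longest waiting task
local function signalOne(name)
  local list = waiters[name]
  if not list or #list == 0 then return false end
  local co = table.remove(list, 1)
  assert(coroutine.resume(co))
  return true
end

-- Wake everything currently waiting; returns how many were woken
local function signalAll(name)
  local list = waiters[name] or {}
  waiters[name] = {}             -- detach first, so re-waits are kept
  for _, co in ipairs(list) do
    assert(coroutine.resume(co))
  end
  return #list
end

for i = 1, 4 do
  onSignal(function() log[#log + 1] = "WAITED: " .. i end, "waiting")
end

signalOne("waiting")
local afterOne = #log            -- one waiter has run at this point
local woken = signalAll("waiting")
print(afterOne, woken, table.concat(log, ", "))
```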

If you were using another language, these might be ‘onEvent()’ and ‘emit()’. Of course it’s really easy to code up that way, just create a name alias to whatever you like. You could even do:

local function on(eventName, func)
  return onSignal(func, eventName)
end

So, these are some more words that are in the parallel programming arsenal. In totality, the set is:

Time related

  • sleep
  • delay
  • periodic

Predicates

  • waitFor
  • when
  • whenever

Signaling

  • onSignal
  • signalOne
  • signalAll
  • waitSignal

Task and scheduler

  • run
  • stop
  • spawn
  • yield

With these words in hand, doing my multi-task programming is becoming easier by the moment. I don’t worry about the lower level multi-tasking primitives such as mutexes, locks, and the like. I’m not really that concerned with timers, thread pools, or any of the other machinery that makes this all happen. I just write my simple code.

One thing has struck me though. This is all great for a single threaded environment. Basically collaborative multi-tasking. In this age of super fast CPUs, it turns out that going single threaded is just fine. In many cases it’s actually better than going preemptive multi-threaded because you don’t necessarily incur as much OS thread level context switching.

I’d like to go one step further though. A long time ago we didn’t have L1/L2 caches on the CPUs, then they came along, got better, and now are fairly large. As a programmer, my concern for them is fairly minimal. I don’t do much to ensure that the caches are used properly. I just write my code, and either the compiler, or the CPU itself deals with all the mechanics. Same goes with prefetching instructions and the like. I don’t write my code to make that job any easier, it just happens automagically.

Here I am contemplating the same thing about threads. As my CPU has increasingly more cores, I am wondering if the CPU cycles should be viewed the same as we viewed levels of memory in the past. Is there a C1/C2 level of core execution? That is, most things should stay on the same thread because switching to a different core is costly. Something very low level should decide on when that switching occurs.

My application/runtime can know a lot about what I’m going to do because the thing is all laid out in script. After some analysis, perhaps I can decide “this thing is going to sleep for more than 500 ms, therefore I’m going to actually put it over here on this other thread which is actually sleeping.” In that way, I can reduce my resource usage because I won’t be polling the timer every time through the scheduling loop, I can be smarter.

If I had such smartness, then when I have 16 – 100 cores, I’ll be able to easily take advantage of those multiple cores without having to truly understand and navigate that madness on my own. Let the machine figure it out, it’s much better at that than I am.

In the meanwhile, I’ve got these multi-tasking primitives which are making my life a lot easier when it comes to programming fairly complex systems.


Lua Coroutine Roundup

My WordPress dashboard says this is my 250th post. I figure what better way to mark the occasion than to do a little bit of a roundup on one of my favorite topics.

One of the most awesome features of the Lua language is a built in co-routine mechanism. I’ve talked about co-routines quite a bit, doing a little series on the basics of coroutines, all the way up through a scheduler for multi-tasking.

Coroutines give the programmer the illusion of creating a parallel processing environment. Basically, write up your code, “spawn” different coroutines, and then just forget about them… Well, almost. The one thing the Lua language does not give you is a way to manage those coroutines. Co-routines are a mechanism by which ‘threads’ can do cooperative multi-tasking. This is different from the OS level preemptive multitasking that you get with ‘threads’ on most OS libraries. So, I can create coroutines, resume, and yield. It’s the ‘yield’ that gives coroutines the multi-tasking flavor. One co-routine yields, and another co-routine must be told to ‘resume’. Something has to do the telling, and keep track of who’s yielded, and who needs to be resumed. In steps the scheduler.
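For anyone who hasn’t played with them, here is about the smallest example of create, resume, and yield I can think of, using only the standard coroutine library. Values flow both ways: resume passes values in, and yield hands values back out.

```lua
-- The three basic moves: create, resume, yield.
local co = coroutine.create(function(a)
  local b = coroutine.yield(a + 1)   -- hand a value out, wait to be resumed
  local c = coroutine.yield(b * 2)
  return a + b + c
end)

local ok1, v1 = coroutine.resume(co, 1)    -- starts the body with a = 1
local ok2, v2 = coroutine.resume(co, 10)   -- first yield returns 10 into b
local ok3, v3 = coroutine.resume(co, 100)  -- second yield returns 100 into c

print(v1, v2, v3, coroutine.status(co))    --> 2  20  111  dead
```

Notice that nothing here decides who runs next. The caller has to resume by hand every time, which is exactly the gap a scheduler fills.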

Summaries from my own archives
Lua Coroutines – Getting Started

Multitask UI Like it’s 1995

Hurry Up and Wait – TINN Timing

Computicles – A tale of two schedulers

Parallel Computing is Child’s Play

Multitasking single threaded UI – Gaming meets networking

Alertable Predicates? What’s that?

Pinging Peers – Creating Network Meshes

From the Interwebs
GitHub Gist: Deco / coroutine_scheduler.lua

Beginner’s Guide to Coroutines – Roblox Wiki

LOOP: Thread Scheduler

LuaAV

And of course the search engines will show you thousands more:

lua coroutine scheduler

I think it would be really great if the Lua language itself had some form of rudimentary scheduler, but scheduling is such a domain specific thing, and the rudiments are so basic, that it’s possibly not worth providing such support. The Go language builds a scheduler into its runtime for its goroutines, to great effect I think.

The simple scheduler is simple, just do a round robin execution of one task after the next as each task yields. I wrote a simple dispatcher which started with exactly this. Then interesting things start to happen. You’d like to have your tasks go into a wait state if they’re doing an IO operation, and not be scheduled again until some IO has occurred. Or, you want to put a thread to sleep for some amount of time. Or, you want a thread to wait on a condition of one sort or another, or signals, or something else. Pretty soon, your basic scheduler is hundreds of lines of code, and things start to get rather complicated.
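Just to show how little the simple case needs, here is a bare round robin dispatcher in plain Lua. It is a sketch written for this post rather than my original dispatcher, and the names spawn, run, and counter are local to the example.

```lua
-- A bare round robin dispatcher: run each task until it yields,
-- then put it at the back of the line.
local unpack = table.unpack or unpack   -- Lua 5.1/LuaJIT and 5.2+

local ready = {}   -- FIFO of coroutines that are ready to run
local trace = {}

local function spawn(func, ...)
  local args = { ... }
  ready[#ready + 1] = coroutine.create(function()
    return func(unpack(args))
  end)
end

local function run()
  while #ready > 0 do
    local co = table.remove(ready, 1)
    local ok, err = coroutine.resume(co)
    if not ok then
      print("RESUME ERROR", err)
    elseif coroutine.status(co) ~= "dead" then
      ready[#ready + 1] = co      -- it yielded: back of the line
    end
  end
end

local function counter(name, n)
  for i = 1, n do
    trace[#trace + 1] = name .. i
    coroutine.yield()
  end
end

spawn(counter, "a", 2)
spawn(counter, "b", 3)
run()

print(table.concat(trace, " "))   --> a1 b1 a2 b2 b3
```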

After going round and round in this problem space, I finally came up with a solution that I think has some legs. Rather than a complex scheduler, I promote a simple scheduler, and add functions to it through normal thread support. So, here is my final ‘scheduler’ for 2013:


local ffi = require("ffi");

local Collections = require("Collections");
local StopWatch = require("StopWatch");


--[[
	The Scheduler supports a collaborative processing
	environment.  As such, it manages multiple tasks which
	are represented by Lua coroutines.
--]]
local Scheduler = {}
setmetatable(Scheduler, {
	__call = function(self, ...)
		return self:create(...)
	end,
})
local Scheduler_mt = {
	__index = Scheduler,
}

function Scheduler.init(self, scheduler)
	local obj = {
		Clock = StopWatch();

		TasksReadyToRun = Collections.Queue();
	}
	setmetatable(obj, Scheduler_mt)
	
	return obj;
end

function Scheduler.create(self, ...)
	return self:init(...)
end

--[[
		Instance Methods
--]]
function Scheduler.tasksArePending(self)
	return self.TasksReadyToRun:Len() > 0
end

function Scheduler.tasksPending(self)
	return self.TasksReadyToRun:Len();
end


function Scheduler.getClock(self)
	return self.Clock;
end


--[[
	Task Handling
--]]

function Scheduler.scheduleTask(self, afiber, ...)
	if not afiber then
		return false, "no fiber specified"
	end

	afiber:setParams(...);
	self.TasksReadyToRun:Enqueue(afiber);	
	afiber.state = "readytorun"

	return afiber;
end

function Scheduler.removeFiber(self, fiber)
	--print("DROPPING DEAD FIBER: ", fiber);
	return true;
end

function Scheduler.inMainFiber(self)
	return coroutine.running() == nil; 
end

function Scheduler.getCurrentFiber(self)
	return self.CurrentFiber;
end

function Scheduler.step(self)
	-- Now check the regular fibers
	local task = self.TasksReadyToRun:Dequeue()

	-- If no fiber in ready queue, then just return
	if task == nil then
		return true
	end

	if task:getStatus() == "dead" then
		self:removeFiber(task)

		return true;
	end

	-- If the task we pulled off the active list is 
	-- not dead, then perhaps it is suspended.  If that's true
	-- then it needs to drop out of the active list.
	-- We assume that some other part of the system is responsible for
	-- keeping track of the task, and rescheduling it when appropriate.
	if task.state == "suspended" then
		return true;
	end

	-- If we have gotten this far, then the task truly is ready to 
	-- run, and it should be set as the currentFiber, and its coroutine
	-- is resumed.
	self.CurrentFiber = task;
	local results = {task:resume()};

	-- no task is currently executing
	self.CurrentFiber = nil;

	-- once we get results back from the resume, one
	-- of two things could have happened.
	-- 1) The routine exited normally
	-- 2) The routine yielded
	--
	-- In both cases, we parse out the results of the resume 
	-- into a success indicator and the rest of the values returned 
	-- from the routine
	local success = results[1];
	table.remove(results,1);


	--print("SUCCESS: ", success);
	if not success then
		print("RESUME ERROR")
		print(unpack(results));
	end

	-- Again, check to see if the task is dead after
	-- the most recent resume.  If it's dead, then don't
	-- bother putting it back into the readytorun queue
	-- just remove the task from the list of tasks
	if task:getStatus() == "dead" then
		--print("Scheduler, DEAD coroutine, removing")
		self:removeFiber(task)

		return true;
	end

	-- The only way the task will get back onto the readylist
	-- is if its state is 'readytorun', otherwise, it will
	-- stay out of the readytorun list.
	if task.state == "readytorun" then
		self:scheduleTask(task, results);
	end
end


--[[
	Primary Interfaces
--]]

function Scheduler.suspend(self, aTask)
	if not aTask then
		self.CurrentFiber.state = "suspended"
		return self:yield()
	end

	aTask.state = "suspended";

	return true
end

function Scheduler.yield(self, ...)
	return coroutine.yield(...);
end


--[[
	Running the scheduler itself
--]]
function Scheduler.start(self)
	if self.ContinueRunning then
		return false, "scheduler is already running"
	end
	
	self.ContinueRunning = true;

	while self.ContinueRunning do
		self:step();
	end
	--print("FINISHED STEP ITERATION")
end

function Scheduler.stop(self)
	self.ContinueRunning = false;
end

return Scheduler

At 195 LOC, this is back down to the basic size of the original simple dispatcher that I started with. There are only a few functions to this interface:

  • scheduleTask()
  • step()
  • suspend()
  • yield()
  • start()

These are the basics to do some scheduling of tasks. There is an assumption that there is an object that can be scheduled, such that ‘scheduleTask’ can attach some metadata, and the like. Other than that, there’s not much here but the basic round robin scheduler.
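The scheduler above touches a task in only four ways: setParams(), getStatus(), resume(), and the ‘state’ field. Here is a minimal sketch of an object that would satisfy that shape. It is an assumption about what such an object could look like, not the actual TINN task class, and newTask is a name made up for the example.

```lua
-- A minimal task object with the shape the Scheduler expects.
local unpack = table.unpack or unpack   -- Lua 5.1/LuaJIT and 5.2+

local Task = {}
local Task_mt = { __index = Task }

local function newTask(func)
  return setmetatable({
    routine = coroutine.create(func),
    params = {},
    state = "readytorun",
  }, Task_mt)
end

-- Remember the values to hand to the coroutine on its next resume
function Task.setParams(self, ...)
  self.params = { ... }
end

-- "suspended", "running", or "dead", straight from the coroutine
function Task.getStatus(self)
  return coroutine.status(self.routine)
end

function Task.resume(self)
  return coroutine.resume(self.routine, unpack(self.params))
end

-- Exercise it by hand, the way Scheduler.step() would
local t = newTask(function(greeting)
  local again = coroutine.yield(greeting .. ", world")
  return again .. ", again"
end)

t:setParams("hello")
local ok1, out1 = t:resume()
t:setParams("hello")
local ok2, out2 = t:resume()

print(out1, out2, t:getStatus())
```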

So, how do you make a more complex scheduler, say one that deals with time? Well, first of all, I’ve also added an Application object, which actually contains a scheduler, and supports various other things related to an application, including add ons to the scheduler. Here’s an excerpt of the Application object construction.

function Application.init(self, ...)
	local sched = Scheduler();

	waitForIO.MessageQuanta = 0;

	local obj = {
		Clock = Stopwatch();
		Scheduler = sched;
		TaskID = 0;
		wfc = waitForCondition(sched);
		wft = waitForTime(sched);
		wfio = waitForIO(sched);
	}
	setmetatable(obj, Application_mt)

	-- Create a task for each add-on
	obj:spawn(obj.wfc.start, obj.wfc)
	obj:spawn(obj.wft.start, obj.wft)
	obj:spawn(obj.wfio.start, obj.wfio)

	return obj;
end

In the init(), the Application object creates instances for the add ons to the scheduler. These add-ons (waitForCondition, waitForTime, waitForIO) don’t have to adhere to much of an interface, but you do need some function which is callable so that it can be spawned. The Application construction ends with the spawning of the three add-ons as normal threads in the scheduler. Yep, that’s right, the add ons are just like any other thread, running in the scheduler. You might think: but these things need to run with a higher priority than regular threads! And yes, in some cases they do need that. But hey, that’s an exercise for the scheduler. If the core scheduler gains a ‘priority’ mechanism, then these threads can be run with a higher priority, and everything is good.

And what do these add ons look like? I’ve gone over them before, but here’s an example of the timing one:

local Functor = require("Functor")
local tabutils = require("tabutils");


local waitForTime = {}
setmetatable(waitForTime, {
	__call = function(self, ...)
		return self:create(...)
	end,
})

local waitForTime_mt = {
	__index = waitForTime;
}

function waitForTime.init(self, scheduler)
	local obj = {
		Scheduler = scheduler;
		TasksWaitingForTime = {};
	}
	setmetatable(obj, waitForTime_mt)

	--scheduler:addQuantaStep(Functor(obj.step,obj));
	--scheduler:spawn(obj.step, obj)

	return obj;
end

function waitForTime.create(self, scheduler)
	if not scheduler then
		return nil, "no scheduler specified"
	end

	return self:init(scheduler)
end

function waitForTime.tasksArePending(self)
	return #self.TasksWaitingForTime > 0
end

function waitForTime.tasksPending(self)
	return #self.TasksWaitingForTime;
end

local function compareTaskDueTime(task1, task2)
	if task1.DueTime < task2.DueTime then
		return true
	end
	
	return false;
end

function waitForTime.yieldUntilTime(self, atime)
	--print("waitForTime.yieldUntilTime: ", atime, self.Scheduler.Clock:Milliseconds())
	--print("Current Fiber: ", self.CurrentFiber)
	local currentFiber = self.Scheduler:getCurrentFiber();
	if currentFiber == nil then
		return false, "not currently in a running task"
	end

	currentFiber.DueTime = atime;
	tabutils.binsert(self.TasksWaitingForTime, currentFiber, compareTaskDueTime);

	return self.Scheduler:suspend()
end

function waitForTime.yield(self, millis)
	--print('waitForTime.yield, CLOCK: ', self.Scheduler.Clock)

	local nextTime = self.Scheduler.Clock:Milliseconds() + millis;

	return self:yieldUntilTime(nextTime);
end

function waitForTime.step(self)
	--print("waitForTime.step, CLOCK: ", self.Scheduler.Clock);

	local currentTime = self.Scheduler.Clock:Milliseconds();

	-- traverse through the fibers that are waiting
	-- on time
	local nAwaiting = #self.TasksWaitingForTime;
--print("Timer Events Waiting: ", nAwaiting)
	for i=1,nAwaiting do

		local fiber = self.TasksWaitingForTime[1];
		if fiber.DueTime <= currentTime then
			--print("ACTIVATE: ", fiber.DueTime, currentTime);
			-- put it back into circulation
			-- preferably at the front of the list
			fiber.DueTime = 0;
			--print("waitForTime.step: ", self)
			self.Scheduler:scheduleTask(fiber);

			-- Remove the fiber from the list of fibers that are
			-- waiting on time
			table.remove(self.TasksWaitingForTime, 1);
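		else
			-- the list is kept sorted by DueTime (via binsert), so once
			-- the head is not yet due, nothing after it is due either
			break;
		end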
	end
end

function waitForTime.start(self)
	while true do
		self:step();
		self.Scheduler:yield();
	end
end

return waitForTime

The ‘start()’ function is what was scheduled with the scheduler, so that’s the thread that’s actually running. As you can see, it’s just a normal cooperative task.

The real action comes from the ‘yield()’ and ‘step()’ functions. The purpose of ‘yield()’ is to suspend the current task (take it out of the active running list of the scheduler), and set up the appropriate stuff such that it can be activated later. The purpose of the ‘step()’ function is to go through the list of tasks which are currently suspended, and determine which ones need to be scheduled again, because their time signal has been met.

And that’s how you add items such as “sleep()” to the system. You don’t change the core scheduler, but rather, you add a small module which knows how to deal with timed events, and that’s that.
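The one piece not shown above is tabutils.binsert, which keeps the waiting list ordered by due time. Here is a sketch of what such a sorted insert could look like, along with the ‘release whatever is due’ idea from step(). Both functions are assumptions written for illustration. The real tabutils code may differ, and releaseDue is a made-up name.

```lua
-- Insert value into a sorted array, keeping it sorted.
-- lessThan(a, b) returns true when a must come before b.
local function binsert(list, value, lessThan)
  local lo, hi = 1, #list
  while lo <= hi do
    local mid = math.floor((lo + hi) / 2)
    if lessThan(value, list[mid]) then
      hi = mid - 1
    else
      lo = mid + 1
    end
  end
  table.insert(list, lo, value)
  return lo
end

local function byDueTime(a, b)
  return a.DueTime < b.DueTime
end

local waiting = {}
binsert(waiting, { name = "c", DueTime = 300 }, byDueTime)
binsert(waiting, { name = "a", DueTime = 100 }, byDueTime)
binsert(waiting, { name = "b", DueTime = 200 }, byDueTime)

-- Release everything that is due at the given time. Because the list
-- is sorted, only the head ever needs to be inspected.
local function releaseDue(list, now)
  local released = {}
  while list[1] and list[1].DueTime <= now do
    released[#released + 1] = table.remove(list, 1).name
  end
  return released
end

local due = releaseDue(waiting, 250)
print(table.concat(due, " "), #waiting)   --> a b   1
```

Keeping the list sorted on insert is what lets the timing check stay cheap on every pass through the scheduling loop.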

The application object comes along and provides some convenience methods that can easily be turned into globals, so that you can easily type things like:

delay(function() print("Hello, World!") end, 5000)  -- print "Hello, World!" after 5 seconds
periodic(function() print("Hi There!") end, 1000)  -- print "Hi There!" every second

And there you have it. The core scheduler is a very simple thing. Just a ready list, and some algorithm that determines which one of the tasks in the list gets to run next. This can be as complex as necessary, or stay very simple.

Using an add on mechanism, the overall system can take on more robust scenarios, without increasing the complexity of the individual parts.

This simplified design satisfies all the scenario needs I currently have from high throughput web server to multi-tasking UI with network interaction. I will improve the scheduling algorithm, and probably make that a pluggable function for ease.

Lua coroutines are an awesome feature to have in a scripting language. With a little bit of work, a handy scheduler makes multi-task programming a mindless exercise, and that’s a good thing.

Happy New Year!


Parallel Computing is Child’s Play

Recently, my wife and I brought a new son into the world!  And that got me to thinking.

Some years back, I did some self study on linguistics and language development.  At the time, I was just doing some thinking on how best to communicate, how to write programs that are composable, just like spoken sentences.  One of things I learned from one of the books had to do with how babies learn language.  As one theory goes, they develop a core internal language first.  These are their early bits of associations and linkages.  Everything they learn after that is expressed using the core primitives they developed early on.

Armed with this knowledge, I went away and taught software engineering for a few years.  One of my core tenets was; software programming is nothing more than a specific conversation.  As software engineers, we are in the business of translating intentions from one domain to the next.  We talk to customers, who speak in their own language, and we have to eventually get that translated down into C, or assembly, or whatever.  The more closely the customer’s language matches the machine’s, the easier the translation, and less margin for error.  But, people don’t speak in machine code.

So what then?  Well, then comes the task of the various frameworks and language tools that we use as software engineers to translate from the high level human language to the very narrow and specific machine language.  At the lowest level, we might use C, or Javascript, or C#, or Lua.  Then on top of that there are inevitably various frameworks which help us express ideas in a more meaningful way, without having to work purely with the lower level primitives.  And so on and so forth.

What I’ve been exploring recently is how best to do parallel programming.  At the very core, my machine has a multi-core processor.  There is an OS, and I have a bunch of C based APIs to get various things done.  In the Lua language I have co-routines, which help in making tasks ‘parallel’, or at least cooperative.

Atop this core, I have built a scheduler in the form of the TINN environment, and thus far I can achieve quite a lot of apparent parallelism with the core primitives such as timer, waitFor, sleep, and the like.  Now I want to add some more higher level primitives.

There are two cases I want to cover.  The first is ‘when’.  That is, when some condition is met, I want to perform some action.  This is a natural part of English at least.  I can easily imagine having a conversation with someone where they’re trying to tell me what they need some machine to do, let’s say scheduling some event to occur:

“When ‘the first person comes into the factory’ ‘turn on all the machines’”

The first part of this sentence establishes the condition.  The second part specifies the action that is to occur once the condition is met.

Easy enough.  If I were to write it in code, it might look like:

waitFor(firstPersonArrives);
turnOnAllMachines();

That’s great, and you could already do this in TINN. But, the language doesn’t quite match the words that came out of my mouth. What I really want to write is the following:

when(firstPersonArrives, turnOnAllMachines)

Or even better, with some syntactic sugar (not valid Lua, just wishful thinking):

when firstPersonArrives do
  turnOnAllMachines()
end

I need a primitive ‘when’ which will take a predicate as its first argument, and an action as its second. Perhaps something like this:

local Task = require("IOProcessor")

local when = function(pred, func)
  local watchit = function()
    Task:yieldUntilPredicate(pred)
    func()
  end

  Task:spawn(watchit)
end

That will do the trick, and give me the function I need. What’s happening? Well, the when() function takes the predicate and the function, and spawns a task that starts by yielding until the predicate holds, which is exactly what ‘waitFor()’ does. It’s the combination of the spawn() and waitFor() which essentially creates a parallel task, without the programmer having to deal with the very specifics of parallelism. All the programmer has to focus on is the cause and effect. Code that up, and let it go.
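To see the moving parts without TINN, here is a minimal sketch in stock Lua. The tiny scheduler (spawn, waitFor, run) is my own stand-in built on plain coroutines, not TINN's IOProcessor, and it busy-polls its predicates, so take it as an illustration of the idea rather than how TINN does it.

```lua
-- Minimal stand-in for a scheduler, using only stock Lua coroutines.
-- All names here (spawn, waitFor, when, run) are local to this sketch.
local ready = {}      -- fibers that can run on the next pass
local waiting = {}    -- entries of the form {fiber = co, pred = predicate}

local function spawn(func)
  table.insert(ready, coroutine.create(func))
end

-- Suspend the calling fiber until pred() returns true.
local function waitFor(pred)
  coroutine.yield(pred)
end

-- 'when' is just a spawned fiber that waits, then acts.
local function when(pred, func)
  spawn(function()
    waitFor(pred)
    func()
  end)
end

local function run()
  while #ready > 0 or #waiting > 0 do
    -- run every ready fiber until it yields a predicate or finishes
    local batch = ready
    ready = {}
    for _, co in ipairs(batch) do
      local ok, pred = coroutine.resume(co)
      if not ok then error(pred) end
      if coroutine.status(co) ~= "dead" then
        table.insert(waiting, {fiber = co, pred = pred})
      end
    end
    -- move fibers whose predicate now holds back to the ready list
    for i = #waiting, 1, -1 do
      if waiting[i].pred() then
        table.insert(ready, table.remove(waiting, i).fiber)
      end
    end
  end
end

-- Usage: a worker counts to 3, and a 'when' reacts once it gets there.
local count = 0
local log = {}

spawn(function()
  for _ = 1, 3 do
    count = count + 1
    waitFor(function() return true end)   -- give other fibers a turn
  end
end)

when(function() return count >= 3 end,
     function() table.insert(log, "machines on at count " .. count) end)

run()
print(log[1])   --> machines on at count 3
```

The code that calls when() never mentions coroutines at all. It states a condition and an action, and the scheduler does the rest.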

Having the ‘when’ in my vocabulary as a programmer is great. It enables a much easier conversation when I’m talking with someone. Whereas ‘when’ is great for covering a single event, the word “whenever” is great at covering multiple occurrences. For example, a natural conversation might go like this:

Water the grass every Tuesday

In code, it might be:

whenever(itIsTuesday, waterTheGrass)

Well, this is just a slight modification of the ‘when’ primitive. Basically, after the trigger event, spawn another task to wait on the exact same thing again.

local whenever = function(pred, func)
  local watchit = nil;
  watchit = function()
    Task:yieldUntilPredicate(pred)
    func()
    Task:spawn(watchit)
  end

  Task:spawn(watchit)
end

I think this is pretty nifty myself. Here is one contrived example of using these primitives. The English explanation would be:

Every 5 clock ticks, say “Halleluja!”
After the end of 25 clock ticks, stop.

That’s pretty easy to understand. How hard is it to code?

-- test_when.lua

local Task = require("IOProcessor")
local Timer = require("Timer")
local parallel = require("parallel")()


local counter = 0;

local function incrementCounter()
  counter = counter + 1;

  print("Counter: ", counter)
end

local function timeRunsOut()
  return counter >= 25;
end

local function stopWorld()
  print("Goodbye World!")
  Task:stop();
end


local lasttrigger = 0;
local function counterHits5()

  if counter % 5 == 0 then
    if counter ~= lasttrigger then
      lasttrigger = counter
      return true;
    end
  end

  return false;
end

local function sayHalleluja()
  print("Halleluja!")
end


local function main()
  -- Create a timer with the task of incrementing
  -- the counter every 500 milliseconds
  local t1 = Timer({Period=500, OnTime=incrementCounter})
	
  -- start the task to sing whenever we hit 5 ticks
  whenever(counterHits5, sayHalleluja);

  -- start the task to stop after so many ticks
  when(timeRunsOut, stopWorld);
end

run(main)
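
One detail worth calling out in the listing above: counterHits5 remembers lasttrigger so that ‘whenever’ does not fire over and over while the counter sits on a multiple of 5. That bookkeeping can be pulled out into a wrapper. The following is only a sketch, and risingEdge is a name I made up for it, not something TINN provides.

```lua
-- Hypothetical helper: wrap a "level" predicate so it reports true only
-- on the call where it flips from false to true (a rising edge).
local function risingEdge(pred)
  local previous = false
  return function()
    local current = pred() and true or false
    local fired = current and not previous
    previous = current
    return fired
  end
end

-- Usage with a plain counter standing in for the timer ticks.
local counter = 0
local atMultipleOf5 = risingEdge(function()
  return counter > 0 and counter % 5 == 0
end)

local hits = {}
for tick = 1, 12 do
  counter = tick
  -- poll twice per tick, as a scheduler might between timer events
  if atMultipleOf5() then table.insert(hits, counter) end
  if atMultipleOf5() then table.insert(hits, counter) end
end

print(table.concat(hits, ", "))   --> 5, 10
```

With a wrapper like this, the predicate handed to ‘whenever’ can stay a plain statement of the condition, and the “only once per occurrence” logic lives in one place.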

I find the ‘when’ and ‘whenever’ constructs to be fairly useful when trying to create code that I intend to run in ‘parallel’. By ‘parallel’ here, I mean they are fairly self contained units which may or may not actually run on multiple cores, but at the very least, I can think of them conceptually as independent actions.

Thus far in my programming career, I’ve made fairly good use of the ‘for’, ‘do-while’, ‘while’, and various other control structures. Now that I’m doing more parallel and async programming, I find the addition of the ‘when’ and ‘whenever’ primitives to be a fairly useful construct for describing how things should work in a parallel universe. I can think of my atomic parallel nuggets, and then just code those nuggets serially. Then, I can use the when, whenever, and spawn constructs to stitch together an entire system.

So, there you have it. As children build up their language skills based on an internal dialog, so too must my programming language skills be built. The ‘when’, ‘whenever’ and ‘spawn’ are the core primitives of my parallel programming inner dialog.


Alertable Predicates? What’s that?

TINN does async on I/O without much fuss.  It even allows you to suspend a ‘fiber’ by putting it to sleep.  You can easily execute a function on a timer, etc.  These are all good tools to have in the parallel programming toolbox.  There is one tool that has been missing for me though.  The fact that I can run things in parallel means that I need a mechanism to synchronize when some of those tasks complete.  I don’t want to introduce callbacks, because that leads down a whole different path.

I want to execute code that looks something like this:

local finalGoal = function()
  waitFor(goal1)
  print("Goal 1 Attained.")
  waitFor(goal2)
  print("Goal 2 Attained..")
  waitFor(goal3)
  print("Goal 3 Attained...")
  print("All Goals Attained!!")

  -- Once all the goals have been met
  -- stop the scheduler.
  stop();
end

The key here is the ‘waitFor()’ function call. You call this function with a ‘predicate’. A predicate is nothing more than a function that returns true or false. If it returns ‘false’ then the condition of the predicate has not been met, and the fiber will not continue. Once the predicate returns ‘true’, the fiber will continue from the point directly after it called ‘waitFor()’. So, to see this sample in its entirety:

-- test_waitFor.lua

local IOProcessor = require("IOProcessor")
local Timer = require("Timer")

local count = 0;


local goal1 = function()
  return count >= 5;
end

local goal2 = function()
  return count >= 10;
end

local goal3 = function()
  return count >= 15;
end

local finalGoal = function()
  waitFor(goal1)
  print("Goal 1 Attained.")
  waitFor(goal2)
  print("Goal 2 Attained..")
  waitFor(goal3)
  print("Goal 3 Attained...")
  print("All Goals Attained!!")

  -- Once all the goals have been met
  -- stop the scheduler.
  stop();
end

-- setup a timer to increment the count
-- every 500 milliseconds
local incrTimer = Timer({Period = 500; OnTime = function(timer) count = count+1 end})

spawn(finalGoal)	

run()

This is a contrived case, but it’s illustrative. Down at the bottom, there is a timer being set up with an anonymous function. The timer is set to go off every 500 milliseconds. Once it does, it will execute the associated code, and the count variable will be incremented by 1. That’s all the timer needs to be concerned with.

There are three predicate functions set up: goal1, goal2, and goal3. Each one of these predicates checks the value of the count variable, and returns true or false depending on the criteria of the goal.

There is a separate function, ‘finalGoal’. This one is spawned separately, and it will wait for each of the goals to be met in turn. Because count only ever goes up, the order in which it waits on them does not really matter in this case. Once all of the goals are met, the scheduler will be stopped and the program will end.

Those predicates can be anything really. They can make calls out to the internet, they can count, they can check the value of any variable, they can check any state of the running process. Whatever you want can be a predicate. Perhaps you want to stop accepting new connections on a web server once a memory limit has been reached. Whatever.
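
As a rough sketch of the memory limit idea, the predicate below uses the standard collectgarbage("count") call, which reports the size of the Lua heap in kilobytes. The memoryAbove name and the 512 KB margin are made up for the illustration, and this only sees memory managed by Lua, not anything allocated outside of it.

```lua
-- Hypothetical predicate factory: the returned predicate is true once
-- the Lua heap has grown beyond limitKB kilobytes.
local function memoryAbove(limitKB)
  return function()
    return collectgarbage("count") > limitKB
  end
end

-- Set the limit 512 KB above wherever the heap is right now.
local limitReached = memoryAbove(collectgarbage("count") + 512)
local before = limitReached()

-- Allocate a 4 MB string to push the heap past the limit.
local hog = string.rep("x", 4 * 1024 * 1024)
local after = limitReached()

print(before, after, #hog)
```

On a fresh interpreter I would expect this to report false before the allocation and true after it. In TINN, a predicate like limitReached could then be handed straight to waitFor(), inside a fiber whose job is to stop accepting connections.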

Having the finalGoal() as a separate fiber is useful because the whole fiber will essentially ‘block’ until the goals are all met. This allows you to have other parallel fibers running at the same time, and they will go along their merry way while finalGoal() is waiting for its conditions to be met.

The implementation of this predicate yielding is pretty straightforward. Within the scheduler, there are only two new functions:

IOProcessor.yieldUntilPredicate = function(self, predicate)
	if self.CurrentFiber~= nil then
		self.CurrentFiber.Predicate = predicate;
		table.insert(self.FibersAwaitingPredicate, self.CurrentFiber)

		return self:yield();
	end

	return false;
end

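IOProcessor.stepPredicates = function(self)
	-- Check every waiting fiber once per pass.  The index only
	-- advances when nothing was removed, so a predicate that is
	-- still false does not keep the fibers behind it from being checked.
	local i = 1
	while i <= #self.FibersAwaitingPredicate do
		local fiber = self.FibersAwaitingPredicate[i];
		if fiber.Predicate() then
			fiber.Predicate = nil;
			self:scheduleFiber(fiber);

			table.remove(self.FibersAwaitingPredicate, i)
		else
			i = i + 1
		end
	end
end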

Then, waitFor() is just a shorthand for yieldUntilPredicate():

waitFor = function(predicate)
  return IOProcessor:yieldUntilPredicate(predicate)
end

The predicates are checked every time through the scheduler loop:

IOProcessor.step = function(self)
	self:stepTimeEvents();
	self:stepFibers();
	self:stepIOEvents();
	self:stepPredicates();
.
.
.

This is essentially the same as installing a very high priority fiber because all code for all predicates will run every time through the scheduler loop. The predicates do not need to be ‘scheduled’ like other tasks. A predicate should probably be a fairly short bit of code that runs quickly, and is not very resource intensive. They should be used sparingly.
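
If a predicate cannot be made cheap, one option is to evaluate it less often. The wrapper below is only a sketch of that idea, and everyNth is a name I made up for it. It consults the real predicate on every n-th call and reports false the rest of the time, trading some latency for less work per pass through the scheduler loop.

```lua
-- Hypothetical wrapper: consult the expensive predicate only on every
-- n-th call; on the other calls report false immediately.
local function everyNth(n, pred)
  local calls = 0
  return function()
    calls = calls + 1
    if calls % n ~= 0 then
      return false
    end
    return pred()
  end
end

-- Stand-in for an expensive check; it turns true on its third evaluation.
local evaluations = 0
local function expensiveCheck()
  evaluations = evaluations + 1
  return evaluations >= 3
end

-- Simulate scheduler passes until the wrapped predicate fires.
local cheap = everyNth(10, expensiveCheck)
local passes = 0
repeat
  passes = passes + 1
until cheap()

print(passes, evaluations)   --> 30	3
```

The cost is that the waiting fiber may resume up to n passes later than it otherwise would, so this only makes sense where that delay is acceptable.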

Quite a few years ago, I was involved with the creation of LINQ in the .NET Framework. At that time, I had this nagging notion that if we could only add a ‘when predicate do’ construct to the CLR, that would be a fairly useful thing. I think using predicates for control flow is especially useful for parallel programming. I also think this construct is easier to use and implement than various other forms of barriers and whatnot.

So, there you have it. TINN supports async IO, timers, yielding on time, and yielding on predicates. Perhaps a predicate can be used to bridge the gap between the normal scheduler, and the event loop that is presented by the typical keyboard and mouse loop.

 


Pinging Peers – Creating Network Meshes

I have the need to create a mesh of network nodes on occasion. The configuration of the mesh is typically a set of VMs running on some cloud service or another. The code running on these VMs may perform some basic task, probably similar across all the VMs. They do need to communicate information with each other, and to do that, they need to maintain a list of their peers and the latencies between them, and the like.

So, I started modeling this thing called the Border Gateway Protocol (BGP), which is one of the ancient low lying protocols of the internet. I model the protocol by first creating an object BGPPeer. The intention of this object is to act as both the host and client of the fundamental mesh communications. It looks like this:

-- BGProtocol.lua
--[[
	Implementation of Border Gateway Protocol (BGP)
--]]

local ffi = require("ffi");
local Timer = require("Timer")
local IOCPSocket = require("IOCPSocket");
local SocketUtils = require("SocketUtils")
local Network = require("Network")

--[[
	BGP Relies on the presence of a fully connected graph
	The Protocol object is initialized by reading a list of nodes
	and creating the right channels between each of the nodes.
--]]

--[[
print("ComputerNameNetBIOS: ", Network.getHostName(ffi.C.ComputerNameNetBIOS))
print("ComputerNameDnsHostname: ", Network.getHostName(ffi.C.ComputerNameDnsHostname))
print("ComputerNameDnsDomain: ", Network.getHostName(ffi.C.ComputerNameDnsDomain))
print("ComputerNameDnsFullyQualified: ", Network.getHostName(ffi.C.ComputerNameDnsFullyQualified))
print("ComputerNamePhysicalNetBIOS: ", Network.getHostName(ffi.C.ComputerNamePhysicalNetBIOS))
print("ComputerNamePhysicalDnsHostname: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsHostname))
print("ComputerNamePhysicalDnsDomain: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsDomain))
print("ComputerNamePhysicalDnsFullyQualified: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsFullyQualified))
--]]

local selfhost = Network.getHostName(ffi.C.ComputerNameDnsHostname):lower()
--print("Self Host: ",selfhost)

local BGPPeer = {
  HeartbeatInterval = 50 * 1000;	-- send out a heartbeat on this interval
  UdpPort = 1313;					-- port used to communicate UDP
  EliminateSelf = true;			-- don't consider self to be a peer
}
setmetatable(BGPPeer, {
  __call = function(self, ...)
    return self:create(...)
  end,
})

local BGPPeer_mt = {
  __index = BGPPeer,
}

BGPPeer.init = function(self, config)
  local err
  local obj = {
    Config = config,
    Peers = {},
  }

  -- if there is config information, then use
  -- it to setup sockets to the peer
  if config then
    -- create the client socket
    obj.Name = config.name;
    obj.UdpSender, err = IOCPSocket:create(AF_INET, SOCK_DGRAM, 0);
    obj.UdpPeerAddress, err = SocketUtils.CreateSocketAddress(config.host, config.udpport or BGPPeer.UdpPort)


    obj.UdpPeerAddressLen = ffi.sizeof(obj.UdpPeerAddress);
  end

  setmetatable(obj, BGPPeer_mt)

  return obj;
end

BGPPeer.create = function(self, configfile)
  -- load the configuration file
  local fs, err = io.open(configfile, "rb")
	
  if not fs then return nil, "could not load file" end

  local configdata = fs:read("*all")
  fs:close();
	
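  -- loadstring returns nil plus an error message if the config does not compile
  local func, err = loadstring(configdata)
  if not func then return nil, err end

  local config = func()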

  -- create self, so peers can be added
  local obj = self:init()

  -- add the udp socket
  -- Setup the server socket
  obj.UdpListener, err = IOCPSocket:create(AF_INET, SOCK_DGRAM, 0);
  if not obj.UdpListener then
    return nil, err;
  end

  local success, err = obj.UdpListener:bindToPort(BGPPeer.UdpPort);

  if not success then
    return nil, err;
  end

  -- for each of the peers within the config 
  -- create a new peer, and add it to the 
  -- list of peers for the first object
  for _,peerconfig in ipairs(config) do
		--print("PEER: ", peerconfig.name, peerconfig.host)
    if peerconfig.name:lower() ~= selfhost then
      obj:createPeer(peerconfig)
    end
  end

  -- Need to setup a listener for the UDP port

  -- add the heartbeat timer
  obj.HeartbeatTimer = Timer({
		Delay =  BGPPeer.HeartbeatInterval;
		Period = BGPPeer.HeartbeatInterval;
		OnTime = obj:onHeartbeat();})

  -- start a fiber which will run the UDP listener loop
  spawn(obj:handleUdpListen())

  return obj;
end

BGPPeer.onHeartbeat = function(self)
  local message = "HELLO FROM: "..Network:getHostName();
  local closure = function(timer)
    for peername, peer in pairs(self.Peers) do
      local bytessent, err = peer.UdpSender:sendTo(peer.UdpPeerAddress, peer.UdpPeerAddressLen,
        message, #message);
    end
  end

  return closure
end

BGPPeer.addPeer = function(self, peer)
  self.Peers[peer.Name] = peer
end

BGPPeer.createPeer = function(self, peerconfig)
  self:addPeer(BGPPeer:init(peerconfig));
end

local count = 0;
BGPPeer.onReceiveHeartBeat = function(self, buff, bytesread)
  count = count + 1
  print("onReceiveHeartBeat: ", count, ffi.string(buff, bytesread));
end


BGPPeer.handleUdpListen = function(self)
  local bufflen = 1500;
  local buff = ffi.new("uint8_t[1500]");
  local from = sockaddr_in();
  local fromLen = ffi.sizeof(from);


  local closure = function()

    while true do
      local bytesread, err = self.UdpListener:receiveFrom(from, fromLen, buff, bufflen);

      if not bytesread then
        print("receiveFrom ERROR: ", err)
        return false, err;
      end

      self:onReceiveHeartBeat(buff, bytesread)
    end
  end

  return closure
end

return BGPPeer

There are a couple things of note here. One of the basic problems I need to solve is the fact that this is not a basic client/server app, but rather a peer to peer setup. As such, each peer both listens on a port, and sends data.

A typical network ‘server’ app may block on listening, and not perform any actions until it receives a request from a ‘client’. In my case, I don’t want to block, but I do want to initiate a ‘blocking’ loop in parallel with some non-blocking stuff.

In this BGPPeer, there is the handleUdpListen() function. This function is the heart of the Udp listening loop, or the “server” side of the peer communication. This function is run in parallel as it is invoked in the constructor of the object using this:

  spawn(obj:handleUdpListen())

In this particular case, the handleUdpListen() function contains a closure (another function) which is the function that is actually returned, and spawned in its own fiber. The function is pretty straightforward.

  local closure = function()

    while true do
      local bytesread, err = self.UdpListener:receiveFrom(from, fromLen, buff, bufflen);

      if not bytesread then
        print("receiveFrom ERROR: ", err)
        return false, err;
      end

      self:onReceiveHeartBeat(buff, bytesread)
    end
  end

Basically, just run a loop forever, waiting to receiveFrom some Udp packet. Since in TINN all IO calls are cooperative, while this receiveFrom is ‘blocked’, the current coroutine is actually yielding, so that other work can continue. That’s great, that’s exactly what I want. So, now the listening side is set up.

How about the sending side? I want to send out a formatted packet on a regular interval. Since TINN has a Timer object, this is a pretty straightforward matter. Within the constructor of the BGPPeer, we find the following:

  -- add the heartbeat timer
  obj.HeartbeatTimer = Timer({
		Delay =  BGPPeer.HeartbeatInterval;
		Period = BGPPeer.HeartbeatInterval;
		OnTime = obj:onHeartbeat();})

With this setup, the timer will execute the closure returned by ‘onHeartbeat()’ every “Period”, which is expressed in milliseconds. The default is 50 seconds (50 * 1000), but it can be configured to any value.

As we saw from the last article on Timers, this timer object will start automatically, and run cooperatively with anything else running in the process. That’s great. So, onHeartbeat just needs to send out information to all its peers whenever it is called.

BGPPeer.onHeartbeat = function(self)
  local message = "HELLO FROM: "..Network:getHostName();
  local closure = function(timer)
    for peername, peer in pairs(self.Peers) do
      local bytessent, err = peer.UdpSender:sendTo(peer.UdpPeerAddress, peer.UdpPeerAddressLen,
        message, #message);
    end
  end

  return closure
end

This becomes a simple matter of traversing the list of peers, and using the Udp socket that was setup with each one of them to send out the packet we want to send out.
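
The list of peers being traversed comes from the config file that create() loads. Going by how create() and init() read it, that file is a chunk of Lua that returns an array of tables with name, host, and an optional udpport. Here is a sketch of what such a file might contain and how it gets loaded. The node names and hosts are made up.

```lua
-- Contents a peers config file might hold (names and hosts are made up).
local configdata = [[
return {
  {name = "node1", host = "node1.example.net"},
  {name = "node2", host = "node2.example.net", udpport = 1314},
}
]]

-- loadstring exists in Lua 5.1 and LuaJIT; newer Lua versions fold it into load
local loader = loadstring or load
local func, err = loader(configdata)
assert(func, err)
local config = func()

-- Walk the peers the same way create() does, falling back to the
-- default port of 1313 when a peer does not name one.
for _, peerconfig in ipairs(config) do
  print(peerconfig.name, peerconfig.host, peerconfig.udpport or 1313)
end
```

Since the config is executable Lua, it should only ever be loaded from a source you trust.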

What’s that altogether then? We saw the spawn that sent the Udp listener off in a cooperative manner. And just now I showed the timer going in parallel. The two combined give you a model where you can both listen and send ‘at the same time’.
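
Stripped of sockets and timers, the shape of that model can be sketched with two plain Lua coroutines taking turns. This is only an illustration: a table stands in for the UDP socket, and a hand-written round-robin loop stands in for the TINN scheduler.

```lua
local inbox = {}      -- stands in for the UDP socket
local received = {}

-- "listener": drain the inbox, then yield instead of blocking
local listener = coroutine.create(function()
  while true do
    while #inbox > 0 do
      table.insert(received, table.remove(inbox, 1))
    end
    coroutine.yield()
  end
end)

-- "heartbeat": post one message per turn, three times
local sender = coroutine.create(function()
  for i = 1, 3 do
    table.insert(inbox, "HELLO " .. i)
    coroutine.yield()
  end
end)

-- round-robin until the sender has finished
while coroutine.status(sender) ~= "dead" do
  assert(coroutine.resume(sender))
  assert(coroutine.resume(listener))
end

print(table.concat(received, "; "))   --> HELLO 1; HELLO 2; HELLO 3
```

Neither function knows about the other. Each one reads as a simple serial loop, and the yields are what let them interleave.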

It looks so easy, and that’s the power of TINN, the scheduler, and Lua’s coroutines. It’s fairly easy to write such code without having to worry overly much about old school things like mutexes, thread context switching, and the like. Just write your code however you want, sprinkle in a few ‘spawn()’ functions here and there, and call it a day. This is how easy it is to write network programs that can walk and chew gum at the same time.

Next time around, I want to write the simple handler that can deal with Win32 message looping and peer networking at the same time. Same general mechanisms, just a cooperative User32 message pump to incorporate.

