Pinging Peers – Creating Network Meshes

I occasionally need to create a mesh of network nodes. The mesh typically consists of a set of VMs running on some cloud service or another. The code running on these VMs may perform some basic task, probably similar across all the VMs. They do need to communicate with each other, and to do that they need to maintain a list of their peers, the latencies between them, and the like.

So, I started modeling this thing called the Border Gateway Protocol (BGP), which is one of the ancient low-level protocols of the internet. I model the protocol by first creating an object, BGPPeer. The intention of this object is to act as both the host and the client of the fundamental mesh communications. It looks like this:

-- BGProtocol.lua
--[[
	Implementation of Border Gateway Protocol (BGP)
--]]

local ffi = require("ffi");
local Timer = require("Timer")
local IOCPSocket = require("IOCPSocket");
local SocketUtils = require("SocketUtils")
local Network = require("Network")

--[[
	BGP Relies on the presence of a fully connected graph
	The Protocol object is initialized by reading a list of nodes
	and creating the right channels between each of the nodes.
--]]

--[[
print("ComputerNameNetBIOS: ", Network.getHostName(ffi.C.ComputerNameNetBIOS))
print("ComputerNameDnsHostname: ", Network.getHostName(ffi.C.ComputerNameDnsHostname))
print("ComputerNameDnsDomain: ", Network.getHostName(ffi.C.ComputerNameDnsDomain))
print("ComputerNameDnsFullyQualified: ", Network.getHostName(ffi.C.ComputerNameDnsFullyQualified))
print("ComputerNamePhysicalNetBIOS: ", Network.getHostName(ffi.C.ComputerNamePhysicalNetBIOS))
print("ComputerNamePhysicalDnsHostname: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsHostname))
print("ComputerNamePhysicalDnsDomain: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsDomain))
print("ComputerNamePhysicalDnsFullyQualified: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsFullyQualified))
--]]

local selfhost = Network.getHostName(ffi.C.ComputerNameDnsHostname):lower()
--print("Self Host: ",selfhost)

local BGPPeer = {
  HeartbeatInterval = 50 * 1000;	-- send out a heartbeat on this interval
  UdpPort = 1313;					-- port used to communicate UDP
  EliminateSelf = true;			-- don't consider self to be a peer
}
setmetatable(BGPPeer, {
  __call = function(self, ...)
    return self:create(...)
  end,
})

local BGPPeer_mt = {
  __index = BGPPeer,
}

BGPPeer.init = function(self, config)
  local err
  local obj = {
    Config = config,
    Peers = {},
  }

  -- if there is config information, then use
  -- it to setup sockets to the peer
  if config then
    -- create the client socket
    obj.Name = config.name;
    obj.UdpSender, err = IOCPSocket:create(AF_INET, SOCK_DGRAM, 0);
    obj.UdpPeerAddress, err = SocketUtils.CreateSocketAddress(config.host, config.udpport or BGPPeer.UdpPort)


    obj.UdpPeerAddressLen = ffi.sizeof(obj.UdpPeerAddress);
  end

  setmetatable(obj, BGPPeer_mt)

  return obj;
end

BGPPeer.create = function(self, configfile)
  -- load the configuration file
  local fs, err = io.open(configfile, "rb")
	
  if not fs then return nil, "could not load file" end

  local configdata = fs:read("*all")
  fs:close();
	
  local func, err = loadstring(configdata)
  if not func then return nil, err end

  local config = func()

  -- create self, so peers can be added
  local obj = self:init()

  -- add the udp socket
  -- Setup the server socket
  obj.UdpListener, err = IOCPSocket:create(AF_INET, SOCK_DGRAM, 0);
  if not obj.UdpListener then
    return nil, err;
  end

  local success, err = obj.UdpListener:bindToPort(BGPPeer.UdpPort);

  if not success then
    return nil, err;
  end

  -- for each of the peers within the config 
  -- create a new peer, and add it to the 
  -- list of peers for the first object
  for _,peerconfig in ipairs(config) do
		--print("PEER: ", peerconfig.name, peerconfig.host)
    if peerconfig.name:lower() ~= selfhost then
      obj:createPeer(peerconfig)
    end
  end

  -- Need to setup a listener for the UDP port

  -- add the heartbeat timer
  obj.HeartbeatTimer = Timer({
    Delay  = BGPPeer.HeartbeatInterval;
    Period = BGPPeer.HeartbeatInterval;
    TimerFunc = obj:onHeartbeat();})

  -- start a fiber which will run the UDP listener loop
  spawn(obj:handleUdpListen())

  return obj;
end

BGPPeer.onHeartbeat = function(self)
  local message = "HELLO FROM: "..Network.getHostName(ffi.C.ComputerNameDnsHostname);
  local closure = function(timer)
    for peername, peer in pairs(self.Peers) do
      local bytessent, err = peer.UdpSender:sendTo(peer.UdpPeerAddress, peer.UdpPeerAddressLen,
        message, #message);
    end
  end

  return closure
end

BGPPeer.addPeer = function(self, peer)
  self.Peers[peer.Name] = peer
end

BGPPeer.createPeer = function(self, peerconfig)
  self:addPeer(BGPPeer:init(peerconfig));
end

local count = 0;
BGPPeer.onReceiveHeartBeat = function(self, buff, bytesread)
  count = count + 1
  print("onReceiveHeartBeat: ", count, ffi.string(buff, bytesread));
end


BGPPeer.handleUdpListen = function(self)
  local bufflen = 1500;
  local buff = ffi.new("uint8_t[1500]");
  local from = sockaddr_in();
  local fromLen = ffi.sizeof(from);


  local closure = function()

    while true do
      local bytesread, err = self.UdpListener:receiveFrom(from, fromLen, buff, bufflen);

      if not bytesread then
        print("receiveFrom ERROR: ", err)
        return false, err;
      end

      self:onReceiveHeartBeat(buff, bytesread)
    end
  end

  return closure
end

return BGPPeer

There are a couple things of note here. One of the basic problems I need to solve is that this is not a basic client/server app, but rather a peer-to-peer setup. As such, each peer both listens on a port and sends data.

A typical network ‘server’ app may block on listening, and not perform any actions until it receives a request from a ‘client’. In my case, I don’t want to block, but I do want to initiate a ‘blocking’ loop in parallel with some non-blocking stuff.

In this BGPPeer, there is the handleUdpListen() function. This function is the heart of the Udp listening loop, or the “server” side of the peer communication. This function is run in parallel as it is invoked in the constructor of the object using this:

  spawn(obj:handleUdpListen())

In this particular case, the handleUdpListen() function contains a closure (another function), which is what is actually returned and spawned in its own fiber. The function is pretty straightforward.

  local closure = function()

    while true do
      local bytesread, err = self.UdpListener:receiveFrom(from, fromLen, buff, bufflen);

      if not bytesread then
        print("receiveFrom ERROR: ", err)
        return false, err;
      end

      self:onReceiveHeartBeat(buff, bytesread)
    end
  end

Basically, just run a loop forever, waiting to receiveFrom some Udp packet. Since all IO calls in TINN are cooperative, while this receiveFrom is ‘blocked’, the current coroutine is actually yielding so that other work can continue. That’s great; that’s exactly what I want. So now the listening side is set up.
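The yield-while-blocked behavior described above can be sketched with nothing but stock Lua coroutines. In this toy model, spawn() and run() are stand-ins for TINN’s scheduler (not its actual implementation), and each coroutine.yield() marks the spot where a call like receiveFrom would park the fiber:

```lua
-- Toy round-robin scheduler: each spawned task runs until it yields,
-- then the next ready task gets a turn.
local ready = {}
local function spawn(fn) ready[#ready + 1] = coroutine.create(fn) end

local function run()
  while #ready > 0 do
    local co = table.remove(ready, 1)
    local ok = coroutine.resume(co)
    -- re-queue the task unless it errored or finished
    if ok and coroutine.status(co) ~= "dead" then
      ready[#ready + 1] = co
    end
  end
end

local log = {}

-- "listener": would block in receiveFrom; here it just yields
spawn(function()
  for i = 1, 3 do
    log[#log + 1] = "listen " .. i
    coroutine.yield()
  end
end)

-- "sender": runs interleaved with the listener
spawn(function()
  for i = 1, 3 do
    log[#log + 1] = "send " .. i
    coroutine.yield()
  end
end)

run()
print(table.concat(log, ", "))  --> listen 1, send 1, listen 2, send 2, listen 3, send 3
```

The two tasks make progress in lockstep without threads or locks, which is the effect spawn() gives the UDP listener here.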

How about the sending side? I want to send out a formatted packet on a regular interval. Since TINN has a Timer object, this is a pretty straightforward matter. Within the constructor of the BGPPeer, we find the following:

  -- add the heartbeat timer
  obj.HeartbeatTimer = Timer({
    Delay  = BGPPeer.HeartbeatInterval;
    Period = BGPPeer.HeartbeatInterval;
    TimerFunc = obj:onHeartbeat();})

With this setup, the timer will execute the closure returned by onHeartbeat() every “Period”, which is some number of milliseconds. The default is 50 seconds, but it can be configured to any value.

As we saw from the last article on Timers, this timer object will start automatically, and run cooperatively with anything else running in the process. That’s great. So, onHeartbeat just needs to send out information to all its peers whenever it is called.

BGPPeer.onHeartbeat = function(self)
  local message = "HELLO FROM: "..Network.getHostName(ffi.C.ComputerNameDnsHostname);
  local closure = function(timer)
    for peername, peer in pairs(self.Peers) do
      local bytessent, err = peer.UdpSender:sendTo(peer.UdpPeerAddress, peer.UdpPeerAddressLen,
        message, #message);
    end
  end

  return closure
end

This becomes a simple matter of traversing the list of peers, and using the Udp socket that was setup with each one of them to send out the packet we want to send out.

What does that give us altogether? We saw the spawn that sent the Udp listener off in a cooperative manner, and just now I showed the timer going in parallel. The two combined give you a model where you can both listen and send ‘at the same time’.

It looks so easy, and that’s the power of TINN, the scheduler, and Lua’s coroutines. It’s fairly easy to write such code without having to worry overly much about old school things like mutexes, thread context switching, and the like. Just write your code however you want, sprinkle in a few ‘spawn()’ functions here and there, and call it a day. This is how easy it is to write network programs that can walk and chew gum at the same time.

Next time around, I want to write the simple handler that can deal with Win32 message looping and peer networking at the same time. Same general mechanisms, just a cooperative User32 message pump to incorporate.


ReadFile – The Good, the bad, and the async

If you use various frameworks on any platform, you’re probably an arm’s length away from the nasty little quirks of the underlying operating system.  If you are the creator of such frameworks, the nasty quirks are what you live with on a daily basis.

In TINN, I want to be async from soup to nuts.  All the tcp/udp socket stuff is already that way.  Recently I’ve been adding async support for “file handles”, and let me tell you, you have to be very careful around these things.

In the core Windows APIs, in order to read from a file, you do two things.  You first open a file using the CreateFile() function.  This may be a bit confusing, because why would you use “create” to ‘open’ an existing file?  Well, you have to think of it like a kernel developer might.  From that perspective, what you’re doing is creating a file handle.  While you’re doing this, you can tell the function whether to actually create the file if it doesn’t exist already, open it only if it exists, open read-only, etc.

The basic function signature for CreateFile() looks like this:

HANDLE WINAPI CreateFile(
  _In_      LPCTSTR lpFileName,
  _In_      DWORD dwDesiredAccess,
  _In_      DWORD dwShareMode,
  _In_opt_  LPSECURITY_ATTRIBUTES lpSecurityAttributes,
  _In_      DWORD dwCreationDisposition,
  _In_      DWORD dwFlagsAndAttributes,
  _In_opt_  HANDLE hTemplateFile
);

Well, that’s a mouthful, just to get a file handle. But hey, it’s not much more than you’d do on Linux, except there are some extra flags and attributes that you might want to take care of. Here’s where the history of Windows gets in the way. There is a much simpler function, “OpenFile()”, which on the surface might do what you want, but beware: it’s a lot less capable, a leftover from the MS-DOS days. The documentation is pretty clear on this point (“don’t use this, use CreateFile instead…”), but still, you’d have to wade through some of it to reach that conclusion.

Then, the ReadFile() function has this signature:

BOOL WINAPI ReadFile(
  _In_         HANDLE hFile,
  _Out_        LPVOID lpBuffer,
  _In_         DWORD nNumberOfBytesToRead,
  _Out_opt_    LPDWORD lpNumberOfBytesRead,
  _Inout_opt_  LPOVERLAPPED lpOverlapped
);

Don’t be confused by another function, ReadFileEx(). That one sounds even more modern, but in fact, it does not support the async file reading that I want.

Seems simple enough. Take the handle you got from CreateFile(), pass it to this function along with a buffer, and you’re done? Well yes, this is where things get really interesting.

Windows supports two forms of IO processing: asynchronous and synchronous. The synchronous case is easy. You just make your call, and your thread will be blocked until the IO “completes”. That is certainly easy to understand, and if you’re a user of the standard C library, or most other frameworks, this is exactly the behavior you can expect. Lua, by default, using the standard io library, will do exactly this.

The other case is when you want to do async io. That is, you want to initiate the ReadFile() and get an immediate return, and handle the processing of the result later, perhaps with an alert on an io completion port.

Here’s the nasty bit. This same function can be used in both cases, but has very different behavior. It’s a subtle thing. If you’re doing synchronous IO, then the kernel will track the file position and automatically update it for you. So you can make consecutive ReadFile() calls and read the file contents from beginning to end.

But… when you do things async, the kernel will not track your file pointer. Instead, you must do this on your own! When you do async, you pass in an instance of an OVERLAPPED structure, which contains, among other things, the offset within the file to read from. By default, the offset is ‘0’, which will have you reading from the beginning of the file every single time.
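The bookkeeping this forces on the caller can be shown without any Win32 at all. In this sketch, FakeDevice is a hypothetical stand-in for an async file: every read pulls from exactly the offset it is handed and never advances a position on its own, so the caller must track the offset itself:

```lua
-- FakeDevice: stands in for a file opened for async IO.
local FakeDevice = {}
FakeDevice.__index = FakeDevice

function FakeDevice.open(contents)
  return setmetatable({data = contents}, FakeDevice)
end

-- Like async ReadFile: reads from exactly the offset supplied,
-- with no kernel-maintained file position.
function FakeDevice:readBytes(count, deviceoffset)
  local chunk = self.data:sub(deviceoffset + 1, deviceoffset + count)
  if #chunk == 0 then return nil, "eof" end
  return chunk
end

local dev = FakeDevice.open("hello world")
local offset = 0            -- the caller-maintained file position
local pieces = {}

while true do
  local chunk, err = dev:readBytes(4, offset)
  if not chunk then break end
  offset = offset + #chunk  -- forget this line and every read returns the start of the file
  pieces[#pieces + 1] = chunk
end

print(table.concat(pieces)) --> hello world
```

Drop the line that advances `offset` and the loop re-reads the first four bytes forever, which is exactly the “always reading from the beginning” trap described above.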

typedef struct _OVERLAPPED {
    ULONG_PTR Internal;
    ULONG_PTR InternalHigh;
    union {
        struct {
            DWORD Offset;
            DWORD OffsetHigh;
        };

        PVOID Pointer;
    };

    HANDLE hEvent;
} OVERLAPPED, *LPOVERLAPPED;

You have to be very careful and diligent with using this structure, and the proper calling sequences. In addition, if you’re going to do async, you need to call CreateFile() with the appropriate OVERLAPPED flag. In TINN, I have created the NativeFile object, which pretty much deals with all this subtlety. The NativeFile object presents a basic block device interface to the user, and wraps up all that subtlety such that the interface to files is clean and simple.

-- NativeFile.lua

local ffi = require("ffi")
local bit = require("bit")
local bor = bit.bor;

local core_file = require("core_file_l1_2_0");
local errorhandling = require("core_errorhandling_l1_1_1");
local FsHandles = require("FsHandles")
local WinBase = require("WinBase")
local IOOps = require("IOOps")

ffi.cdef[[
typedef struct {
  IOOverlapped OVL;

  // Our specifics
  HANDLE file;
} FileOverlapped;
]]

-- A Win32 file interface
-- puts the standard async stream interface onto a file
local NativeFile={}
setmetatable(NativeFile, {
  __call = function(self, ...)
    return self:create(...);
  end,
})

local NativeFile_mt = {
  __index = NativeFile;
}

NativeFile.init = function(self, rawHandle)
	local obj = {
		Handle = FsHandles.FsHandle(rawHandle);
		Offset = 0;
	}
	setmetatable(obj, NativeFile_mt)

	if IOProcessor then
		IOProcessor:observeIOEvent(obj:getNativeHandle(), obj:getNativeHandle());
	end

	return obj;
end

NativeFile.create = function(self, lpFileName, dwDesiredAccess, dwCreationDisposition, dwShareMode)
	if not lpFileName then
		return nil;
	end
	dwDesiredAccess = dwDesiredAccess or bor(ffi.C.GENERIC_READ, ffi.C.GENERIC_WRITE)
	dwCreationDisposition = dwCreationDisposition or OPEN_ALWAYS;
	dwShareMode = dwShareMode or bor(FILE_SHARE_READ, FILE_SHARE_WRITE);
	local lpSecurityAttributes = nil;
	local dwFlagsAndAttributes = bor(ffi.C.FILE_ATTRIBUTE_NORMAL, FILE_FLAG_OVERLAPPED);
	local hTemplateFile = nil;

	local rawHandle = core_file.CreateFileA(
        lpFileName,
        dwDesiredAccess,
        dwShareMode,
    	lpSecurityAttributes,
        dwCreationDisposition,
        dwFlagsAndAttributes,
    	hTemplateFile);

	if rawHandle == INVALID_HANDLE_VALUE then
		return nil, errorhandling.GetLastError();
	end

	return self:init(rawHandle)
end

NativeFile.getNativeHandle = function(self)
  return self.Handle.Handle
end

-- Cancel current IO operation
NativeFile.cancel = function(self)
  local res = core_file.CancelIo(self:getNativeHandle());
end

-- Close the file handle
NativeFile.close = function(self)
  self.Handle:free();
  self.Handle = nil;
end

NativeFile.createOverlapped = function(self, buff, bufflen, operation, deviceoffset)
	if not IOProcessor then
		return nil
	end

	deviceoffset = deviceoffset or 0;

	local obj = ffi.new("FileOverlapped");

	obj.file = self:getNativeHandle();
	obj.OVL.operation = operation;
	obj.OVL.opcounter = IOProcessor:getNextOperationId();
	obj.OVL.Buffer = buff;
	obj.OVL.BufferLength = bufflen;
	obj.OVL.OVL.Offset = deviceoffset;

	return obj, obj.OVL.opcounter;
end

-- Write bytes to the file
NativeFile.writeBytes = function(self, buff, nNumberOfBytesToWrite, offset, deviceoffset)
	offset = offset or 0
	deviceoffset = deviceoffset or 0

	if not self.Handle then
		return nil;
	end

	local lpBuffer = ffi.cast("const char *",buff) + offset
	local lpNumberOfBytesWritten = nil;
	local lpOverlapped = self:createOverlapped(ffi.cast("uint8_t *",buff)+offset,
		nNumberOfBytesToWrite,
		IOOps.WRITE,
		deviceoffset);

	if lpOverlapped == nil then
		lpNumberOfBytesWritten = ffi.new("DWORD[1]")
	end

	local res = core_file.WriteFile(self:getNativeHandle(), lpBuffer, nNumberOfBytesToWrite,
		lpNumberOfBytesWritten,
  		ffi.cast("OVERLAPPED *",lpOverlapped));

	if res == 0 then
		local err = errorhandling.GetLastError();
		if err ~= ERROR_IO_PENDING then
			return false, err
		end
	else
		return lpNumberOfBytesWritten[0];
	end

	if IOProcessor then
    	local key, bytes, ovl = IOProcessor:yieldForIo(self, IOOps.WRITE, lpOverlapped.OVL.opcounter);
--print("key, bytes, ovl: ", key, bytes, ovl)
	    return bytes
	end
end

NativeFile.readBytes = function(self, buff, nNumberOfBytesToRead, offset, deviceoffset)
	offset = offset or 0
	local lpBuffer = ffi.cast("char *",buff) + offset
	local lpNumberOfBytesRead = nil
	local lpOverlapped = self:createOverlapped(ffi.cast("uint8_t *",buff)+offset,
		nNumberOfBytesToRead,
		IOOps.READ,
		deviceoffset);

	if lpOverlapped == nil then
		lpNumberOfBytesRead = ffi.new("DWORD[1]")
	end

	local res = core_file.ReadFile(self:getNativeHandle(), lpBuffer, nNumberOfBytesToRead,
		lpNumberOfBytesRead,
		ffi.cast("OVERLAPPED *",lpOverlapped));

	if res == 0 then
		local err = errorhandling.GetLastError();

--print("NativeFile, readBytes: ", res, err)

		if err ~= ERROR_IO_PENDING then
			return false, err
		end
	else
		return lpNumberOfBytesRead[0];
	end

	if IOProcessor then
    	local key, bytes, ovl = IOProcessor:yieldForIo(self, IOOps.READ, lpOverlapped.OVL.opcounter);

    	local ovlp = ffi.cast("OVERLAPPED *", ovl)
    	print("overlap offset: ", ovlp.Offset)

--print("key, bytes, ovl: ", key, bytes, ovl)
	    return bytes
	end

end

return NativeFile;

This is enough of a start. If you want to simply open a file:

local NativeFile = require("NativeFile")
local fd = NativeFile("sample.txt");

From there you can use readBytes(), and writeBytes(). If you want to do streaming, you can feed this into the new and improved Stream class like this:

local NativeFile = require("NativeFile") 
local Stream = require("stream") 
local IOProcessor = require("IOProcessor")

local function main()

  local filedev, err = NativeFile("./sample.txt", nil, OPEN_EXISTING, FILE_SHARE_READ)

  -- wrap the file block device with a stream
  local filestrm = Stream(filedev)

  local line1, err = filestrm:readLine();  
  local line2, err = filestrm:readLine();  
  local line3, err = filestrm:readLine()

  print("line1: ", line1, err)  
  print("line2: ", line2, err)  
  print("line3: ", line3, err) 
end

run(main)

The Stream class looks for readBytes() and writeBytes(), and can provide the higher-level readLine(), writeLine(), readString()/writeString(), and a few others. This is great because it can be fed by anything that purports to be a block device, which could be anything from an async file to a chunk of memory.
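Since the Stream only duck-types against readBytes()/writeBytes(), anything implementing those can sit underneath it. As a sketch of that idea, here is a hypothetical in-memory block device plus a minimal line reader built only on readBytes(); MemoryBlock and this readLine are illustrative names, not the actual TINN Stream API:

```lua
-- MemoryBlock: a hypothetical block device backed by a plain string.
local MemoryBlock = {}
MemoryBlock.__index = MemoryBlock

function MemoryBlock.new(contents)
  return setmetatable({data = contents, pos = 0}, MemoryBlock)
end

-- The one method a reading stream needs: return up to n bytes, or nil on eof
function MemoryBlock:readBytes(n)
  local chunk = self.data:sub(self.pos + 1, self.pos + n)
  if #chunk == 0 then return nil, "eof" end
  self.pos = self.pos + #chunk
  return chunk
end

-- A tiny readLine() built only on readBytes(), one byte at a time
local function readLine(dev)
  local chars = {}
  while true do
    local c, err = dev:readBytes(1)
    if not c then
      if #chars == 0 then return nil, err end
      break
    end
    if c == "\n" then break end
    if c ~= "\r" then chars[#chars + 1] = c end  -- swallow CR, keep the rest
  end
  return table.concat(chars)
end

local dev = MemoryBlock.new("line one\r\nline two\n")
print(readLine(dev))  --> line one
print(readLine(dev))  --> line two
```

Swap MemoryBlock for an async file exposing the same readBytes() and the line reader doesn’t change, which is the point of the block-device abstraction.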

And that’s about it for now. There are subtleties when dealing with async file access in windows. Having a nice abstraction on top of it gives you all the benefits of async without all the headaches.

 


Its About Time – TINN Timers

The last time I wrote about time, in Hurry Up and Wait – TINN Timing, the focus was on introducing the wait() function and explaining how it fits into the TINN scheduler in general.

Having a wait() function is a great low-level primitive.  It allows you to pause execution for an amount of time, in a synchronous manner.  That works well when you’re doing something serially and need to take a break at specified intervals.  What if your usage pattern is more complex, though?  More likely than not, if you’re writing a web server of some sort, you’ll be doing this sort of sequence:
executeasynccodethattakestime()

if codeNotDoneAfter(sometime) then
  cancel the code
end

-- meanwhile, do other stuff while waiting

Basically, I need to do something like send off a web request.  If the request has not been satisfied within a specified amount of time, I want to cancel the operation.  I need to be able to do this asynchronously because I may have many requests in flight.  So, what to do?  I obviously need a Timer of some sort that will deal with this for me.

local Task = require("IOProcessor")

local Timer = {}
setmetatable(Timer, {
  __call = function(self, ...)
    return self:create(...)
  end,
});

local Timer_mt = {
  __index = Timer;
}

Timer.init = function(self,params)
  local obj = {
    Delay = params.Delay;
    Period = params.Period;
    Running = false;
    TimerFunc = params.TimerFunc;
  }
  setmetatable(obj, Timer_mt);

  if params.Running then
    obj:start()
  end

  return obj;
end

Timer.create = function(self, ...)
  return self:init(...);
end

Timer.start = function(self)
  self.Running = true;

  local function closure()
    if self.Delay then
      wait(self.Delay);
      self.TimerFunc(self);
    end

    if not self.Period then
      return
    end

    while self.Running do
      wait(self.Period)
      self.TimerFunc(self);
    end
  end

  spawn(closure);
end

Timer.stop = function(self)
  self.Running = false;
end

return Timer

To dissect: basically, an object that provides a convenient wrapper around the wait() function. You specify an initial delay and a period, and call the start() function. Start will spawn the actual function that does the waiting and the executing of the specified function.

Here is a simple use case:

-- test_Timer.lua

local Timer = require("Timer")

local function printThis(timer)
  print("this")
end

local function main()
  local timer = Timer {Delay = 1*1000, Period = 300, TimerFunc = printThis}

  timer:start();

  -- wait a few seconds, then stop the timer
  print("wait 4 seconds...")
  wait(4*1000)
  print("stop timer")
  timer:stop();

  -- Wait a few more seconds then exit
  print("wait 2 seconds...")
  wait(1000*2);
end

run(main)

In this case, I create a timer that has an initial 1 second delay, and a period of 300 milliseconds. So, after the initial delay, the printThis() function will be called. Then every 300 milliseconds after that, it will be called again.

In the sample, the timer is started, which causes it to run independently of the main task. Within the main task, we wait 4 seconds, then call the stop() function on the timer. Wait two more seconds, and finally exit altogether. This shows that a timer can run independently of the main task. The function that is called can be anything. If you want the function to have some parameters, it is probably itself a closure (a function within a function). Additionally, since the timer is passed to the function as its only parameter, the function can cause the timer to stop. Here’s another example where a timer is running, and will stop after a certain count has been reached.

local count = 0;
local function counter(timer)
  count = count + 1;
  if count >= 5 then
    timer:stop();
  end
  print("counter: ",count)
end

local testCounter = function()
  local timer = Timer {Period = 200, TimerFunc = counter, Running=true}
end

run(testCounter)

The counter function is simple: increment a counter, and once it reaches 5, stop the timer. Starting the counter in the first place is also fairly easy. Just set a period (forgoing the initial delay), tell it which function is to be executed every period, and set it to run automatically (without requiring an explicit ‘start()’).

This will call the function once every 200 milliseconds until the counter stops it. Nice to have.

With this little component in hand, you can probably imagine how I/O might be accomplished with timeouts. Basically, any I/O operation would look like:

function cancel(ioperator)
  local function closure(timer)
    timer:stop();
    ioperator:cancel();
  end
  return closure;
end

op = startioop(someparams)
timer=Timer{Delay=1000, TimerFunc=cancel(op), Running=true}

Here, a timer is created with a delay of one second. After one second, the timer fires and the closure returned by cancel() is called. The operation is cancelled, if it’s not already done, and that’s the end of things. This assumes there is no harm in canceling an already finished operation.

Well, that’s just great. I/O with timeouts fully implemented in user space, asynchronous, and all that. This is a great thing for timers, io, and TINN code in general.

The only thing missing is the ability to do async on more than just sockets. So, next up is async ‘file’ I/O in general, of which sockets are a specialized subset.


The Rise of Tiny Things

When I first started playing around with LuaJIT, one of the earliest thoughts I had was “I think this runtime specifies a new base machine”. I truly believe that we’re now at a point on the CPU performance curve where the core “CPU” can be totally done in software. Cases in point:

Tessel – A new tiny device in development which is meant to run node.js code on something the size of a microcontroller.  The basic specs:

  • 180mhz ARM Cortex-M3 LPC1830
  • 32mb SDRAM
  • 32mb Flash
  • TI CC3000 WiFi radio

Why is this interesting?  Because at its very core, they translate the JavaScript to Lua for faster execution on the tiny little machine.  In this way, they don’t really care that they’re running on a Cortex-M3; they are really running on a Lua VM.

Then there’s the interesting bit about Lua becoming available in the NetBSD kernel [pdf].  That’s not quite the same thing as the Tessel, but again, this tiny little runtime is like putting a small machine in place.  Now various device drivers and other stuff might actually be implemented using Lua, in kernel space no less!

But it’s not all about Lua.  JavaScript, another relatively tiny thing, is starting to emulate larger things.  An x86 emulator written in JavaScript!  This parrots the previous effort of the same sort, jslinux.

I mean, come on, seriously!  Yes, things have reached such levels of ridiculousness that a lowly scripting language can be used to emulate an entire CPU!  How cool is that?  And, yet again, representative of the rise of tiny things.

When we get to the point where you can front-end an FPGA with Lua or JavaScript, then we’ll reach truly ridiculous levels of ease and proliferation.

I think this is cool because, for one, it raises the level of performance for small things.  By ‘performance’, I mean the number of people who have access to fairly low-level hardware is increasing, because the tools are very simple.  The average JavaScript developer being able to play with microcontroller/processor based systems?  Simply amazing.

I am glad to see this happening.  If one guy can create an x86 emulator, then he can design his own more specific CPU emulator.  The next guy can write the FPGA interface, and suddenly the stranglehold on processor evolution is broken!!

Or at the very least, the world of programming is just becoming curiouser and curiouser.

 


Optimize At the Right Time and Level

It takes a long time to do the simplest things correctly. You want to receive some bytes on a socket?
int recv(SOCKET s, char *buf, int len, int flags);

No problem, right? But how about that socket? How did it get set up? Is it right for low latency, big buffers, small buffers, what? And what about that buffer? And lord knows which are the best flags to use. The first question, I guess, is: ‘for what purpose?’

For the longest time, I have known that my IP sockets were not the most performant. Through rounds of getting low-level networking, all the way up through http handling, working correctly, I deliberately made some things extremely brain-dead simple.

One of those things was my readLine() function. This is the workhorse of http processing. Previously, I would literally read a single character at a time, looking for those line terminators, which meant making that recv() call a bazillion times during normal http processing.

Well, it has worked great; it reduced the surface area to help flesh out various bugs, and allowed me to finally implement the IO Completion Port based networking stack that I now have. It works well enough. But it’s time to optimize. I am about to add various types of streams, such as compressed, encrypted, and the like. For that, I need to buffer up chunks at a time, not read single characters at a time.

So, I finally optimized this code.

function IOCPNetStream:refillReadingBuffer()
	print("IOCPNetStream:RefillReadingBuffer(): ",self.ReadingBuffer:BytesReadyToBeRead());

	-- if the buffer already has data in it
	-- then just return the number of bytes that
	-- are currently ready to be read.
	if self.ReadingBuffer:BytesReadyToBeRead() > 0 then
		return self.ReadingBuffer:BytesReadyToBeRead();
	end

	-- If there are currently no bytes ready, then we need
	-- to refill the buffer from the source
	local err
	local bytesread

	bytesread, err = self.Socket:receive(self.ReadingBuffer.Buffer, self.ReadingBuffer.Length)

	print("-- LOADED BYTES: ", bytesread, err);

	if not bytesread then
		return false, err;
	end

	if bytesread == 0 then
		return false, "eof"
	end

	self.ReadingBuffer:Reset()
	self.ReadingBuffer.BytesWritten = bytesread

	return bytesread
end

Basically, whenever some bytes are needed within the stream, the refillReadingBuffer() function is called. If there are still bytes sitting in the buffer, then those bytes are used. If it’s empty, then new bytes are read in. The buffer can be whatever size you want, so if your app is better off reading 1500 bytes at a time, then make it so. If you find performance is best at 4096 bytes per read, then set it up that way.

Now that pre-fetching is occurring, all operations related to reading bytes from the source MUST go through this buffer, or bytes will be missed. So the readLine() function looks like this now:

function IOCPNetStream:readByte()
	-- First see if we can get a byte out of the
	-- Reading buffer
	local abyte,err = self.ReadingBuffer:ReadByte()

	if abyte then
		return abyte
	end

	-- If we did not get a byte out of the reading buffer
	-- try refilling the buffer, and try again
	local bytesread, err = self:refillReadingBuffer()

	if bytesread then
		abyte, err = self.ReadingBuffer:ReadByte()
		return abyte, err
	end

	-- If there was an error
	-- then return that error immediately
	print("-- IOCPNetStream:readByte, ERROR: ", err)
	return false, err
end

function IOCPNetStream:readOneLine(buffer, size)
	local nchars = 0;
	local ptr = ffi.cast("uint8_t *", buffer);
	local byteread
	local err

	while nchars < size do
		byteread, err = self:readByte();

		if not byteread then
			-- err is either "eof" or some other socket error
			break
		else
			if byteread == 0 then
				break
			elseif byteread == LF then
				--io.write("LF]\n")
				break
			elseif byteread == CR then
				-- swallow it and continue on
				--io.write("[CR")
			else
				-- Just a regular character, so save it
				ptr[nchars] = byteread
				nchars = nchars+1
			end
		end
	end

	if err and err ~= "eof" then
		return nil, err
	end

	return nchars
end

The readOneLine() is still reading a byte at a time, but now, instead of going to the underlying socket directly, it calls the stream’s readByte(), which in turn reads a single byte from the pre-fetched buffer. That’s a lot faster than making a recv() system call for every byte, and when the buffer runs out of bytes, it will be automatically refilled, until there is a socket error.

Well, that’s just great. The performance boost is one thing, but there is another benefit. Now that I am reading in controllable chunks at a time, I could up the size to, say, 16K for the first chunk read of an http request. That would likely pull all the header data into the buffer up front. From there I could use simple pointer offset/size objects, instead of creating actual strings for every line (which creates copies). That would be a great optimization.
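TINN doesn’t have these offset/size objects yet; here’s a minimal sketch of what one could look like, using a plain Lua string as the backing buffer (the Span name and its fields are hypothetical):

```lua
-- Hypothetical 'span' object: a view into a backing buffer,
-- recording offset and length instead of copying bytes.
local Span = {}
Span.__index = Span

function Span.new(data, offset, len)
  return setmetatable({data = data, offset = offset, len = len}, Span)
end

-- Materialize the span as a string only when actually needed
function Span:tostring()
  return string.sub(self.data, self.offset, self.offset + self.len - 1)
end

-- Example: slice pieces out of a request buffer without copying up front
local buffer = "GET /index.html HTTP/1.1\r\nHost: example.com\r\n"
local method = Span.new(buffer, 1, 3)
print(method:tostring())  -- GET
```

With ffi in play, the same idea works over a uint8_t pointer and length, deferring any string creation until a header value is actually consumed.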

I can wait for further optimizations. Getting the pre-fetch working, and continuing to work with IOCP is a more important optimization at this time.

And so it goes. Don’t optimize until you really understand the performance characteristics of your system. Then do the minimal amount needed to achieve your specific goal before moving on.


Exposing Yourself to the InterWebs – The Service Machine

Last time around, I eagerly put myself out onto the internet without a care in the world, using nothing more than my standard home desktop machine and the features of my home router. I was probed almost immediately, but it worked.

Next step, I want a machine dedicated to being accessible from the internet. I purchased one of those “barebones” PC systems from Shuttle, and configured it like this:

SZ77R5
32GB RAM
Intel Core i7-3770K processor
ASUS DVD-ROM
Seagate 2TB drive
Crucial 256GB mSATA SSD
Samsung 256GB SSD

I did not purchase an additional video card because although the specs are fairly beefy, this is not meant to be a gaming rig.

The first time I built a PC with my daughter, about 10 years ago I think, it was a pair of Shuttle PCs. They’re fairly straightforward to build; you just have to purchase the right components.

This build was fairly straightforward, except for one little piece. When first bringing up the system, I connected my standard ViewSonic monitor, which has VGA/DVI/HDMI connectors on it. At first I tried the HDMI connector, because that typically works. Well, it was a real head scratcher, as I never saw anything on the monitor, even though it sounded like the machine was posting to the BIOS. Scratch scratch, go to bed. Next morning, I thought I’d swap the cable over to the DVI connector instead, and voila! That worked.

That’s an interesting bit of news right there. I was having a similar problem with input selection on my Odroid XU, trying to start up Ubuntu. Same solution: use a monitor that has dedicated HDMI, or use the DVI connector if you can. Once the OS is installed, it will likely figure it all out correctly, but until then, no such luck.

My intention is to install Windows Server 2012. That’s so I can run Hyper-V and have some options as to OS installs and the like. More than likely, I’ll run a server OS as one of the VMs, and a client OS just for kicks. I’d like to have a Windows 7 client available, as well as Windows 8. Of course, this is breaking with my intention to have specialized machines, but given the amount of storage available, I figure I can have this little bit of fun.

I haven’t installed 2012 as yet, so I thought I’d try out one of those Linux Distros on a stick. I downloaded an instance of Tiny Core Linux, and booted the thing up. No problems. Then I created a USB keychain of the same, and again, no problem. It strikes me that this is a fairly easy way to test whether a machine is fundamentally working or not. I also like the fact that you can just pick up all your goods and move from machine to machine. Reminds me of the Raspberry Pi, and the Odroid, where the whole OS and your storage can be on an SD card, making it easy to move around.

In the coming week, I’ll setup the Windows Server OS images, and start putting content on the machine. Along with a web server, I want to host my various 3D models, and even a blog thing if I can.

It would also be nice to try out putting the machine in ‘the dmz’ and see how much of a gateway I can really make out of the machine. But, that’s a longer term thing.

For now, it’s just fun to have a good solid modern machine that is much quieter than my previous desktop machine, takes up less space, uses less power, and has a lot more capabilities.


How About that Web Server Again?

Has it really been more than a month?

Well, you see, I bought a house, packed, moved, took daughter to college, and…

The last time I wrote about making a simple WebServer, I left the job half finished. I showed that through layering, from a basic socket server, you could then build a simple http handler, which had not much more than an ‘OnRequest’ function.

That works great, and gives you the right level of access if you’re going to do things with http in general that are beyond simple static file serving. But what if your interactions are more complex than you care to deal with using simple ‘if-then-else’ logic?

This time around, I’m going to use a new and improved form of the WebApp, which first of all deals with the new and improved IOCP-based sockets, and leverages the HttpServer object of yore.

Here is the basic web server:

-- main.lua
local WebApp = require("WebApp")
local resourceMap = require("ResourceMap");

local port = arg[1] or 8080

local app = WebApp(resourceMap, port);
app:run();

And you simply invoke it with:

tinn main.lua 8080

Well, that seems easy enough. If you wanted to get real crazy, and win an obfuscated code competition, you could do it in one line:

-- main.lua
require("WebApp")(require("ResourceMap"), arg[1] or 8080):run()

Or, if you’re a real Lua head, you could do it all on the command line, without even using the ‘main.lua’ file:

tinn -e "require('WebApp')(require('ResourceMap'), arg[1] or 8080):run()"

That’s all fun and games, and it’s cool to see that you can implement a web server in a single command line. But what’s in that magical ResourceMap file?

local URL = require("url");
local StaticService = require("StaticService");

local function replacedotdot(s)
  return string.gsub(s, "%.%.", '.')
end

local HandleFileGET = function(request, response)
  local absolutePath = replacedotdot(URL.unescape(request.Url.path));
  local filename = './wwwroot'..absolutePath;

  StaticService.SendFile(filename, response)
  return false;
end

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
  };
}

return ResourceMap;

The idea here is that we want to route any ‘GET’ methods to the HandleFileGET() function. There is a ResourceMapper object within tinn that utilizes a structure such as the one in ResourceMap. The general layout is: {[pattern] = {name="", METHOD = function [, METHOD = function]},}

Using this simple mechanism, you can easily route the handling of a particular resource request to a particular function.

This table can have entries that are much more interesting. For example, if you want to handle the retrieval of ‘favicon.ico’ differently than other files, you can just add a specific mapping for that.

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}

You could have multiple methods per resource as well:

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
    POST = HandleFilePOST,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}

In general, the longest prefix match algorithm is applied to whatever is supplied within the resource field of the request. If you want to deal with all methods of a particular resource, without having to specify them explicitly, then you can use the magic method ‘_DEFAULT’.

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
    POST = HandleFilePOST,
    _DEFAULT = HandleFileDEFAULT,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}

In this way, if there is a request for a resource, and a method that we don’t know about at the time of creating the map, the HandleFileDEFAULT() function will be called to deal with it. That might be handy in the case where you’d like to handle these unknown method requests in a generic way, or you might want to change it over time without having to change your mapping.
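TINN’s actual ResourceMapper isn’t shown here, but the longest-prefix-plus-method lookup it performs can be sketched in a few lines. This sketch matches literal prefixes rather than full Lua patterns, and findHandler is a hypothetical name:

```lua
-- Sketch of longest-prefix resource matching over a map shaped like
-- {[prefix] = {METHOD = fn, _DEFAULT = fn}}.
local function findHandler(map, resource, method)
  local best, bestlen = nil, -1
  for prefix, entry in pairs(map) do
    -- candidate if the resource starts with this prefix (literal match)
    if resource:find(prefix, 1, true) == 1 and #prefix > bestlen then
      best, bestlen = entry, #prefix
    end
  end
  if not best then return nil end
  -- fall back to the catch-all method if the specific one is absent
  return best[method] or best._DEFAULT
end

local map = {
  ["/"] = {GET = function() return "file" end},
  ["/favicon.ico"] = {GET = function() return "favicon" end},
}

print(findHandler(map, "/favicon.ico", "GET")())  -- favicon
print(findHandler(map, "/index.html", "GET")())   -- file
```

Both entries match "/favicon.ico", but the longer prefix wins, which is exactly why the favicon mapping overrides the general file handler.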

Another case is when the resource isn’t actually a resource. Take for example a ‘CONNECT’ request:

CONNECT localhost:9000

In this case, the ‘resource’ does not start with a ‘/’, so the longest prefix match won’t land it on anything in our map. I need to deal with these with a different pattern. Well, the pattern part of the map is nothing more than a standard Lua pattern, so a ‘.’ will match any character. The following map will do the trick.

local ResourceMap = {
  ["."] = {name=".",
    CONNECT = HandleCONNECT,
  };

  ["/"] = {name="/",
    GET = HandleFileGET,
    POST = HandleFilePOST,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}

In this way, we’ll deal with the CONNECT method if it shows up.

This is an affirmative list. If there is a match to one of the patterns, then the mapped function is executed. If no pattern is found, either because the resource itself does not match, or because the resource does not have a function to handle the specified method, then a general 404 error is returned.

That’s about all there is to it. Create a mapping between URLs and functions, and you’re all done. Of course there’s the function itself:

local URL = require("url");
local StaticService = require("StaticService");

local function replacedotdot(s)
  return string.gsub(s, "%.%.", '.')
end

local HandleFileGET = function(request, response)
  local absolutePath = replacedotdot(URL.unescape(request.Url.path));
  local filename = './wwwroot'..absolutePath;
	
  StaticService.SendFile(filename, response)

  return false;
end

Not a lot of magic in this particular one. First of all, there’s that simplistic ‘replacedotdot()’ function. That’s just a casual attempt to restrict file access to the directory of our choosing. In HandleFileGET, the first thing to do is urldecode the specified path, then feed that to the replacedotdot() function. Then take whatever comes out of that, prepend it with ‘./wwwroot’, and finally feed it to the StaticService.SendFile() function, which is a standard part of TINN.
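As noted, replacedotdot() is only a casual guard. A slightly stronger (still illustrative) check walks the path segments and rejects anything that would climb above the serving root; isSafePath is a hypothetical helper, not part of TINN:

```lua
-- Reject any path whose '..' segments would escape the root directory.
local function isSafePath(path)
  local depth = 0
  for segment in path:gmatch("[^/]+") do
    if segment == ".." then
      depth = depth - 1
      -- dipping below the root means the path tries to escape
      if depth < 0 then return false end
    elseif segment ~= "." then
      depth = depth + 1
    end
  end
  return true
end

print(isSafePath("/images/../logo.png"))  -- true
print(isSafePath("/../etc/passwd"))       -- false
```

A handler could call this before prepending ‘./wwwroot’, returning a 403 or 404 when the check fails.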

The return value of this function has meaning. If you return false, or ‘nil’, then the socket representing this particular request will be recycled, assuming there is potentially another request coming on the same socket.

If you instead return ‘true’, then the system assumes you are handling any subsequent recycling, or closing, and it will not take any further action with the underlying socket.

This is a nice feature in that it allows you to construct much more interesting interactions than the standard request/response, without much fuss. For example, you could easily open up websockets, upgrade connections in general, or do whatever you want.

At any rate, with TINN, you can create a simple web server in a single command line. I find that to be a fairly useful thing.


Hurry Up and Wait – TINN Timing

Moving right along. First I needed to do basic networking. Starting at the lowest level of socket interaction, advancing up the stack through specific TCP and HTTP uses. Then back down to UDP.

With basic async networking covered, the next thing that comes up is timing. The general socket IO is covered. You can basically build an entire service using nothing more than asynchronous socket calls. But, most servers are more interesting than that. There are situations where you’ll want to cancel out an async operation if it’s taking too long, or you might want to perform some operation over time, repeatedly. So, clearly the TINN system needs some concept of time management.

Here’s the kind of code I would like to write in order to do something every 500 milliseconds:

require ("IOProcessor");

local test_wait = function(interval)
  while true do
    wait(interval);
    print(string.format("interval: %d", IOProcessor.Clock:Milliseconds()));
  end
end

run(test_wait)

Basically, there is a new ‘wait()’ function. You give it a number of milliseconds, and it will suspend the coroutine you’re currently in for the given amount of time. This capability comes courtesy of some changes to the base scheduler. The changes are the following:

wait = function(millis)
  local nextTime = IOProcessor.Clock:Milliseconds() + millis;
  return IOProcessor:yieldUntilTime(nextTime);
end

IOProcessor.yieldUntilTime = function(self, atime)
  if self.CurrentFiber ~= nil then
    self.CurrentFiber.DueTime = atime;
    tabutils.binsert(self.FibersAwaitingTime, self.CurrentFiber, compareTaskDueTime);

    return self:yield();
  end
  return false;
end

The yieldUntilTime() function will take the currently running fiber (coroutine) and put it into the list of FibersAwaitingTime. This is simply a table which is maintained in sorted order, from lowest to highest due time. Once a fiber is placed on this list, it is no longer in the list of currently active fibers. It will sit on this list until its DueTime has passed.
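The tabutils.binsert() function isn’t shown here; a minimal sketch of a binary sorted insert, assuming the comparator returns true when its first argument sorts earlier:

```lua
-- Insert 'item' into the sorted list at the position chosen by
-- binary search, keeping the list ordered.
local function binsert(list, item, cmp)
  local lo, hi = 1, #list
  while lo <= hi do
    local mid = math.floor((lo + hi) / 2)
    if cmp(item, list[mid]) then hi = mid - 1 else lo = mid + 1 end
  end
  table.insert(list, lo, item)
  return lo
end

-- comparator matching the DueTime ordering used by the scheduler
local compareTaskDueTime = function(a, b) return a.DueTime < b.DueTime end

local waiting = {}
binsert(waiting, {DueTime = 300}, compareTaskDueTime)
binsert(waiting, {DueTime = 100}, compareTaskDueTime)
binsert(waiting, {DueTime = 200}, compareTaskDueTime)
print(waiting[1].DueTime, waiting[2].DueTime, waiting[3].DueTime)  -- 100 200 300
```

Keeping the list sorted on insert is what lets the scheduler examine only the head of FibersAwaitingTime on each step.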

The main scheduling loop will step through the fibers that are sitting in the AwaitingTime list using the following:

IOProcessor.stepTimeEvents = function(self)
  local currentTime = self.Clock:Milliseconds();

  -- traverse through the fibers that are waiting
  -- on time
  local nAwaiting = #self.FibersAwaitingTime;
  for i=1,nAwaiting do
    local fiber = self.FibersAwaitingTime[1];
    if fiber.DueTime <= currentTime then
      -- put it back into circulation
      fiber.DueTime = 0;
      self:scheduleFiber(fiber);

      -- Remove the fiber from the list of fibers that are
      -- waiting on time
      table.remove(self.FibersAwaitingTime, 1);
    else
      -- the list is sorted, so nothing further on is due yet
      break;
    end
  end
end

Basically, step through the list of fibers that are waiting for their wait time to expire. For all those that qualify, put them back into the list of active fibers by calling the ‘scheduleFiber()’ function.

This begins to get very interesting I think. Of course once you create a timer, or even async i/o, you probably also want the ability to cancel such operations. But, that’s another matter.

Doing this little exercise, the scheduler begins to take on more of the attributes of what you might find in a core OS. The big difference is that everything is done in the Lua space, and does not rely on OS primitives, so it is actually somewhat portable to other architectures. It’s nice to have such a scheduler available across multiple LuaJIT environments.

While going through this exercise, I also started looking at other features of schedulers, thinking about priorities, and other factors that might go into a really good scheduler. So, at least one thing becomes apparent to me. Having this scheduler readily available and transformable in the Lua space makes it relatively easy to try out different scheduling techniques that will match the particular situation at hand. There is even the possibility of changing the scheduling algorithm dynamically based on attributes of the running system.

Exciting times.


How About that Web Server

Although echo servers are the “Hello, World” of network programs, http servers are much more useful. Here is a simple one:

local HttpServer = require("HttpServer");
local StaticService = require("StaticService");

-- a simple ping template
local pingtemplate = [[
<html>
  <head>
    <title>HTTP Server</title>
  </head>
  <body>ping</body>
</html>
]]

-- Called every time there is a new http request
local OnRequest = function(param, request, response)
  if request.Url.path == "/ping" then
    response:writeHead("200")
    response:writeEnd(pingtemplate);
  else
    local filename = './wwwroot'..request.Url.path;
    StaticService.SendFile(filename, response);
  end
end

local server = HttpServer(8080, OnRequest);
server:run();

This looks a little bit like the echo service which was based on the raw socket server. Instead of “OnAccept”, this one implements “OnRequest”. The ‘request’ is an instance of a ‘WebRequest’ object, which contains all the stuff you’d expect in a web request (resource, headers…). The routine is also handed a ‘WebResponse’ object, which is a simple convenience that wraps the netstream associated with the request object.

The HttpServer code itself looks like this:

local SocketServer = require("SocketServer")

local IOCPSocket = require("IOCPSocket")
local IOCPNetStream = require("IOCPNetStream");
local WebRequest = require("WebRequest");
local WebResponse = require("WebResponse");
local URL = require("url");

HttpServer = {}
setmetatable(HttpServer, {
  __call = function(self, ...)
    return self:create(...);
  end,
});

HttpServer_mt = {
  __index = HttpServer;
}

HttpServer.init = function(self, port, onRequest, onRequestParam)
  local obj = {
    OnRequest = onRequest;
    OnRequestParam = onRequestParam;
  };
  setmetatable(obj, HttpServer_mt);
	
  obj.SocketServer = SocketServer(port, HttpServer.OnAccept, obj);

  return obj;
end

HttpServer.create = function(self, port, onRequest, onRequestParam)
  return self:init(port, onRequest, onRequestParam);
end


HttpServer.OnAccept = function(self, sock)
  local socket = IOCPSocket:init(sock, IOProcessor);
  local stream, err = IOCPNetStream:init(socket);

  if self.OnRequest then
    local request, err  = WebRequest:Parse(stream);

    if request then
      request.Url = URL.parse(request.Resource);
      local response = WebResponse:OpenResponse(request.DataStream)
      self.OnRequest(self.OnRequestParam, request, response);
    else
      print("HandleSingleRequest, Dump stream: ", err)
    end
  else
    -- do nothing and let the socket close
  end
end

HttpServer.run = function(self)
  return self.SocketServer:run();
end

return HttpServer;

The ‘OnAccept()’ function this time around takes the unadorned socket, wraps it into a nicer socket object (so the IO completion stuff can happen), and then uses the WebRequest object to parse what’s on the stream. If the request is found to be intact, a response object is created and the two are handed off to the ‘OnRequest’ function, if it exists.

This construct allows you to compose a webserver to meet your needs. You can spawn wherever you want, running whichever parts you want in parallel. At the top end, the consumer of this object won’t know the difference, and can thus just handle the individual requests.

So, what’s so good about all this?
Well, first of all the TINN runtime, all up, is about 3Mb.
What you get for that is access to pretty much all the interesting stuff that Windows APIs have to offer. Whether it be network, OS, graphics, crypto, or multi-thread related, it’s all available right there in the little package.

This is good when you want to start creating simple REST based web services for this and that. For example, if you want to expose a webcam feed from your PC, or your PC acts as a hub for various wireless “internet of things” devices around your home, or whatever, you just write some lua code, without worrying about interop libraries, compiling, or anything else more interesting. Just a little script and away you go.


Name That Framework – Echo Service in two lines of code

… and here they are:

SocketServer = require("SocketServer");
SocketServer(9090):run(function(s, b, l)  s:send(b, l); end);

Back in the day there was this gameshow called “Name That Tune”, where contestants would be told a clue about a song, then they would bid on the fewest number of notes it would take for them to name the tune. Once the bids were fixed, the orchestra would play the number of notes, and the contestant would have to correctly guess the name of the tune.

So, above are two lines of code which implement a highly scalable “echo” service. Can you name the framework?

It’s TINN of course!

Here’s a more reasonable rendition of the same:

local SocketServer = require("SocketServer");

local function OnData(socket, buff, bufflen)
  socket:send(buff, bufflen);
end;

local server = SocketServer(9090);
server:run(OnData)

Simply put, a SocketServer is a generic service that will listen on a particular port that you specify. Whenever it receives any data on the port, it will call the supplied ‘OnData’ function. Each time ‘OnData’ is called, it could be with a different socket and data. You could build a fairly rudimentary http server on top of this if you like. What’s most important to me is the fact that you don’t have to write any of the underlying low level networking code. Nothing about accept, IO completion ports, etc. Just: call me when some data comes in on the specified port.

The SocketServer code itself looks like this:

local ffi = require("ffi");
local IOProcessor = require("IOProcessor");
local IOCPSocket = require("IOCPSocket");

IOProcessor:setMessageQuanta(nil);

SocketServer = {}
setmetatable(SocketServer, {
  __call = function(self, ...)
    return self:create(...);
  end,
});

SocketServer_mt = {
  __index = SocketServer;
}

SocketServer.init = function(self, socket, datafunc)
--print("SocketServer.init: ", socket, datafunc)
  local obj = {
    ServerSocket = socket;
    OnData = datafunc;
  };

  setmetatable(obj, SocketServer_mt);

  return obj;
end

SocketServer.create = function(self, port, datafunc)
  port = port or 9090;

  local socket, err = IOProcessor:createServerSocket({port = port, backlog = 15});

  if not socket then
    print("Server Socket not created!!")
    return nil, err
  end

  return self:init(socket, datafunc);
end

-- The primary application loop
SocketServer.loop = function(self)
  local bufflen = 1500;
  local buff = ffi.new("uint8_t[?]", bufflen);

  while true do
    local sock, err = self.ServerSocket:accept();

    if sock then
      local socket = IOCPSocket:init(sock, IOProcessor);
      local bytesread, err = socket:receive(buff, bufflen);

      if not bytesread then
        print("RECEIVE ERROR: ", err);
      elseif self.OnData ~= nil then
        self.OnData(socket, buff, bytesread);
      else
        socket:closeDown();
        socket = nil
      end
    else
       print("Accept ERROR: ", err);
    end

    collectgarbage();
  end
end

SocketServer.run = function(self, datafunc)
  if datafunc then
    self.OnData = datafunc;
  end

  IOProcessor:spawn(self.loop, self);
  IOProcessor:run();
end

return SocketServer;

This basic server loop is good for a lot of little tasks where you just need to put a listener on the front of something. No massive scaleout, no multi-threading, just good straightforward stuff. But, it’s already plumbed to go big too.

Here’s a slight modification:

SocketServer.handleAccepted = function(self, sock)
  local handleNewSocket = function()
    local bufflen = 1500;
    local buff = ffi.new("uint8_t[?]", bufflen);
    
    local socket = IOCPSocket:init(sock, IOProcessor);

    if not self.OnAccepted then
      local bytesread, err = socket:receive(buff, bufflen);

      if not bytesread then
        print("RECEIVE ERROR: ", err);
      elseif self.OnData ~= nil then
        self.OnData(socket, buff, bytesread);
      else
        socket:closeDown();
        socket = nil
      end
    end
  end

  return IOProcessor:spawn(handleNewSocket);
end

-- The primary application loop
SocketServer.loop = function(self)

  while true do
    local sock, err = self.ServerSocket:accept();

    if sock then
      self:handleAccepted(sock);
    else
       print("Accept ERROR: ", err);
    end

    collectgarbage();
  end
end

In the main loop, instead of doing the processing directly, call the ‘self:handleAccepted()’ function. That function in turn will spawn an internal function to actually handle the request. Everything else remains the same.

If you do it this way, then the ‘OnData’ will run cooperatively with other accepts that might be going on. This also highlights, in an invisible way, that the ‘accept()’ call is itself cooperative. Since IO completion ports are being used in the background, the accept call is actually async. As soon as it issues the accept, that coroutine will wait in place until another socket comes in. Meanwhile, the last socket that was being handled will get some time slice to do what it wants.

And thus, you get massive scale (thousands of potential connections) from using this fairly simple code.

Well, those are the basics. Now that I have plumbed TINN from the ground up to utilize the IO Completion Ports, I can start to build upon that. There are a couple of nice benefits to marrying IOCP and Lua coroutines. I’ll be exploring this some more, but it’s basically a match made in heaven.