Optimize At the Right Time and Level

It takes a long time to do the simplest things correctly. You want to receive some bytes on a socket?
int recv(SOCKET s,char *buf,int len,int flags);

No problem, right? But how about that socket? How did it get set up? Is it right for low latency, big buffers, small buffers? What? And what about that buffer? And lord knows which flags are best to use. I guess the first question is, ‘for what purpose?’

For the longest time, I have known that my IP sockets were not the most performant. Through rounds of getting everything working correctly, from low-level networking all the way up through http handling, I deliberately kept some things extremely brain-dead simple.

One of those things was my readline() function, the workhorse of http processing. Previously, I would literally read a single character at a time, looking for the line terminators, which meant making that recv() call a bazillion times during normal http processing.

Well, it has worked great. It reduced the surface area while I flushed out various bugs, and it allowed me to finally implement the IO Completion Port based networking stack that I now have. It works well enough. But it’s time to optimize. I am about to add various types of streams, such as compressed and encrypted ones, and for those I need to buffer up chunks at a time rather than read single characters.

So, I finally optimized this code.

function IOCPNetStream:refillReadingBuffer()
	-- if the buffer already has data in it
	-- then just return the number of bytes that
	-- are currently ready to be read.
	local bytesready = self.ReadingBuffer:BytesReadyToBeRead()
	if bytesready > 0 then
		return bytesready
	end

	-- If there are currently no bytes ready, then we need
	-- to refill the buffer from the source
	local bytesread, err = self.Socket:receive(self.ReadingBuffer.Buffer, self.ReadingBuffer.Length)

	if not bytesread then
		-- socket error
		return false, err
	end

	if bytesread == 0 then
		-- the peer closed the connection
		return false, "eof"
	end

	-- Start reading from the beginning of the freshly filled buffer
	self.ReadingBuffer:Reset()
	self.ReadingBuffer.BytesWritten = bytesread

	return bytesread
end

Basically, whenever some bytes are needed within the stream, the refillReadingBuffer() function is called. If there are still bytes sitting in the buffer, then those bytes are used. If it’s empty, then new bytes are read in. The buffer can be whatever size you want, so if your app is better off reading 1500 bytes at a time, then make it so. If you find performance is best at 4096 bytes per read, then set it up that way.
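To make the refill rule concrete outside of TINN, here is a minimal pure-Lua sketch. It is not the IOCPNetStream code: the ChunkBuffer name is hypothetical, and a plain function that returns the next chunk (or nil at end of stream) stands in for the socket. The point it shows is that the source is only touched when the buffer is completely drained.

```lua
-- Hypothetical sketch: "source" stands in for the socket. It returns the
-- next chunk (a string of at most maxlen bytes) or nil at end of stream.
local ChunkBuffer = {}
ChunkBuffer.__index = ChunkBuffer

function ChunkBuffer.new(source, size)
  return setmetatable({source = source, size = size, data = "", pos = 1}, ChunkBuffer)
end

-- Number of bytes still waiting to be consumed.
function ChunkBuffer:bytesReady()
  return #self.data - self.pos + 1
end

-- Only go back to the source when the buffer is empty.
function ChunkBuffer:refill()
  local ready = self:bytesReady()
  if ready > 0 then
    return ready
  end
  local chunk = self.source(self.size)
  if not chunk or #chunk == 0 then
    return false, "eof"
  end
  self.data, self.pos = chunk, 1
  return #chunk
end

function ChunkBuffer:readByte()
  local ok, err = self:refill()
  if not ok then
    return false, err
  end
  local b = string.byte(self.data, self.pos)
  self.pos = self.pos + 1
  return b
end

-- A fake source that serves a string in chunks of up to n bytes.
local function stringSource(s)
  local offset = 1
  return function(n)
    if offset > #s then return nil end
    local chunk = s:sub(offset, offset + n - 1)
    offset = offset + n
    return chunk
  end
end
```

With a 4-byte buffer, reading "hello" byte by byte costs three calls to the source ("hell", "o", then end of stream) instead of six.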

Now that pre-fetching is occurring, all operations that read bytes from the source MUST go through this buffer, or bytes will be missed. So, readByte() and readOneLine() now look like this:

function IOCPNetStream:readByte()
	-- First see if we can get a byte out of the
	-- Reading buffer
	local abyte,err = self.ReadingBuffer:ReadByte()

	if abyte then
		return abyte
	end

	-- If we did not get a byte out of the reading buffer
	-- try refilling the buffer, and try again
	local bytesread, err = self:refillReadingBuffer()

	if bytesread then
		abyte, err = self.ReadingBuffer:ReadByte()
		return abyte, err
	end

	-- If there was an error
	-- then return that error immediately
	print("-- IOCPNetStream:readByte, ERROR: ", err)
	return false, err
end

function IOCPNetStream:readOneLine(buffer, size)
	-- CR and LF are assumed to be module-level constants (13 and 10)
	local nchars = 0;
	-- write through a byte pointer into the caller's buffer
	local ptr = ffi.cast("uint8_t *", buffer);
	local byteread
	local err

	while nchars < size do
		byteread, err = self:readByte();

		if not byteread then
			-- err is either "eof" or some other socket error
			break
		elseif byteread == 0 or byteread == LF then
			-- a NUL or LF terminates the line
			break
		elseif byteread ~= CR then
			-- Just a regular character, so save it
			-- (a CR is simply swallowed)
			ptr[nchars] = byteread
			nchars = nchars+1
		end
	end

	if err and err ~= "eof" then
		return nil, err
	end

	return nchars
end

readOneLine() still reads a byte at a time, but instead of going to the underlying socket directly, it calls the stream’s own readByte(), which reads a single byte from the pre-fetched buffer. That’s a lot faster than making a recv() system call for every byte, and when the buffer runs out of bytes it is automatically refilled, until there is a socket error or the end of the stream.
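Here is a small pure-Lua sketch of the same line-reading logic, so the saving can be counted. The newStream and recv names are hypothetical, an in-memory payload replaces the socket, and unlike readOneLine() it returns a Lua string (and nil, "eof" at end of stream) rather than filling a caller's byte array.

```lua
-- Hypothetical sketch: readOneLine() pulls bytes from a prefetch buffer and
-- only calls the simulated recv() when that buffer is empty.
local CR, LF = 13, 10

local function newStream(payload, chunkSize)
  local stream = {recvCalls = 0, data = "", pos = 1, offset = 1}

  -- Stands in for recv(): hands back up to chunkSize bytes per call.
  local function recv()
    stream.recvCalls = stream.recvCalls + 1
    if stream.offset > #payload then return nil end
    local chunk = payload:sub(stream.offset, stream.offset + chunkSize - 1)
    stream.offset = stream.offset + chunkSize
    return chunk
  end

  function stream:readByte()
    if self.pos > #self.data then
      local chunk = recv()
      if not chunk then return false, "eof" end
      self.data, self.pos = chunk, 1
    end
    local b = self.data:byte(self.pos)
    self.pos = self.pos + 1
    return b
  end

  -- Same shape as readOneLine(): drop CR, stop at LF, keep everything else.
  function stream:readOneLine()
    local chars = {}
    while true do
      local b, err = self:readByte()
      if not b then
        if #chars == 0 then return nil, err end
        break
      end
      if b == LF then break end
      if b ~= CR then chars[#chars + 1] = string.char(b) end
    end
    return table.concat(chars)
  end

  return stream
end
```

Reading the 27-byte request "GET / HTTP/1.1\r\nHost: x\r\n\r\n" with a 16-byte buffer takes three recv() calls in total, where the byte-at-a-time version would have needed one per byte.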

Well, that’s just great. The performance boost is one thing, but there is another benefit. Now that I am reading controllable chunks at a time, I could raise the size to, say, 16K for the first read of an http request. That would likely pull all of the request headers into the buffer up front. From there I could use simple pointer offset/size objects instead of creating actual strings (copies) for every line. That would be a great optimization.
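The offset/size idea can be sketched in plain Lua. This is only an illustration under assumptions: indexLines and lineText are hypothetical names, and a Lua string stands in for what would be a uint8_t buffer in LuaJIT. Each header line is recorded as a start/stop pair, and a copy is made only when someone actually asks for a line’s text.

```lua
-- Hypothetical sketch: index header lines as (start, stop) offsets into one
-- buffer instead of copying each line into its own string.
local function indexLines(buf)
  local lines = {}
  local start = 1
  while true do
    local crlf = string.find(buf, "\r\n", start, true)  -- plain find, no pattern
    if not crlf then break end
    if crlf == start then break end  -- empty line marks the end of the headers
    lines[#lines + 1] = {start = start, stop = crlf - 1}
    start = crlf + 2
  end
  return lines
end

-- A copy is made only when a caller actually asks for the text.
local function lineText(buf, line)
  return string.sub(buf, line.start, line.stop)
end
```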

I can wait on further optimizations. Getting the pre-fetch working, and continuing to work with IOCP, matters more at this time.

And so it goes. Don’t optimize until you really understand the performance characteristics of your system. Then, only belatedly, do the minimal amount needed to achieve the specific goal you are after before moving on.


Exposing Yourself on the Interwebs – Baby Steps

I have begun a little experiment. Over the past year, I have written quite a bit of code related to networking. I have prototyped a lot of different things, and actually used some of them in a production environment. I have written an http parser, a websocket implementation, an xml parser, and myriad wrappers for standard libraries.

So, now the experiment. I want to expose a few web services, running from my home, running on nothing but code that I have written (except for core OS code). How hard could it be?

Yesterday, I packaged up a bit of TINN and put it on my desktop machine to run a very simple http static content server. It’s a Windows box, and of course I could simply run IIS, but that’s a bit of a cheat. So, I started the service:

tinn main.lua

And that’s that. According to my intentions, the only content that should be served up is stuff that’s sitting within the ‘./wwwroot’ directory relative to where I started the service running. This is essentially the server that I outlined previously.

I am an average internet consumer when it comes to home network setup. I have an ASUS router that’s pretty fast and decent, with its share of security holes and strengths. At home I am sitting behind its “firewall” protection. But I do want to expose myself, so what do I do?

Well, I must change the configuration on the router. First of all, I need a DNS entry that points a particular name at my router. Luckily, the ASUS router has a dynamic DNS service built right in. So, I choose a name (I’ll show that later), click a button, and then “Apply”. OK. Now my router is reachable at a well-known name: chosenname.asuscomm.com. I confirm this by typing that into my web browser, and sure enough, I can connect to my router over the internet. I am prompted for the admin password, and I’m in!

So, the first scary thought is, I hope I chose a password that is relatively strong. I hope I didn’t use the default ‘password’, like so many people do.

Alright. Now I know my router, and thus my network in general, can be reached through a well-known public URL. The next thing I need to do is set a static IP address for my web server machine. This isn’t strictly necessary, but since I’m about to enable port forwarding, it’s simply easier to use a static IP within my home network. I set it up as 192.168.1.4. The HP printer is 1, the Synology box is 2, and everything else gets random numbers.

Next is port forwarding. The web server machine is listening on port 8080, and I want any traffic arriving at the well-known URL on port 8080 to be forwarded to it. I want the following URL to land on this machine and be handled by the web server code that’s running:

http://chosenname.asuscomm.com:8080/index.htm

So, I set that configuration in the router, and press ‘Apply’…

Back to my browser, type in that URL and voila! It works!

Now I pause at this point and ask myself a few questions. First of all, am I really confident enough in my programming skills to expose myself to the wide-open internet like this? Second, could my brother or my mother have worked their way through a similar exercise?

Having gotten this far, I’m feeling fairly confident, so I let it run overnight to see what happens. Mind you, I’m not accessing it myself at night, but I wanted to see what would happen just having my router and server hanging out there on the internet.

I came back in the morning and checked the console output to see what had happened, if anything. What I saw was this:

NO RESPONSE BODY: ./wwwroot/HNAP1

Hah! It happened twice, then never again. That HNAP1 request probes a particular vulnerability in home routers that are configured by default to do automatic configuration. D-Link routers, in particular, are vulnerable to an attack whereby they can be compromised through a well-scripted SOAP exchange that starts at this /HNAP1 endpoint.

I’ve turned off that particular feature of my router, so, I think I luckily dodged that particular bullet.

The funny thing, though, is that I didn’t advertise my URL, and I didn’t tell anyone that there would be an http server hanging out on port 8080. This happened within 8 hours of my service going live. It tells you what a teeming pool of hackery the internet truly is.

The other thing I have learned thus far is that I need a nice logging module. I just so happen to be printing out the URL of each request that comes in, but I would like to have the IP address of the requester, and the other interesting information you typically find in web logs. So, I’ll have to add that module.
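As a starting point for such a module, here is a hypothetical sketch that formats one line in the NCSA Common Log Format (host, ident, authuser, [date], "request line", status, bytes). The commonLogLine name and the fields on the entry table are assumptions for illustration, not part of TINN.

```lua
-- Hypothetical sketch of one web log line in the NCSA Common Log Format:
--   host ident authuser [date] "request line" status bytes
-- The field names on "entry" are assumptions, not TINN's request object.
local function commonLogLine(entry)
  -- "!" asks os.date for UTC, which is why the offset is a fixed +0000
  local stamp = os.date("!%d/%b/%Y:%H:%M:%S +0000", entry.time)
  return string.format('%s - - [%s] "%s %s %s" %d %s',
    entry.remoteAddr, stamp, entry.method, entry.resource, entry.version,
    entry.status, entry.bytes and tostring(entry.bytes) or "-")
end

local line = commonLogLine{
  remoteAddr = "203.0.113.7", time = 0, method = "GET",
  resource = "/index.htm", version = "HTTP/1.1", status = 200, bytes = 1234,
}
print(line)
```

The two dashes are the ident and authuser fields, which a simple static server has no values for.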

Having started down this path, I have another desire as well. My desktop machine is way too loud, and consumes too much power to be an always-on web server. So, I’ve ordered the parts to build a nice Shuttle PC which will serve this purpose. It’s a decent enough machine: 256GB SSD, i7, onboard video. I don’t need it to be a gaming rig, nor an HTPC, nor serve any other purpose. It just needs to run whatever web services I come up with, and it must run Windows. This goes back to the purpose-built argument I made about the Surface 2: a machine dedicated to a specific job, without concern for any other purpose it might have. You could argue that I should just purchase a router that has a built-in web server, or just use the Synology box, which will do this just fine. But my criteria are that I want to write code, tinker about, and it must run Windows.

And so it begins. I’ve got the basic server up and running, and I’m already popular enough to be attacked. Now I am confident enough to add some features and content over time to make it more interesting.


How About that Web Server Again?

Has it really been more than a month?

Well, you see, I bought a house, packed, moved, took daughter to college, and…

The last time I wrote about making a simple WebServer, I left the job half finished. I showed that through layering, from a basic socket server, you could then build a simple http handler, which had not much more than an ‘OnRequest’ function.

That works great, and gives you the right level of access if you’re going to do things with http in general that go beyond simple static file serving. But what if your interactions are more complex than you care to handle with simple ‘if-then-else’ logic?

This time around, I’m going to use a new and improved form of the WebApp, which deals with the new and improved IOCP-based sockets and leverages the HttpServer object of yore.

Here is the basic web server:

-- main.lua
local WebApp = require("WebApp")
local resourceMap = require("ResourceMap");

local port = arg[1] or 8080

local app = WebApp(resourceMap, port);
app:run();

And you simply invoke it with:

tinn main.lua 8080

Well, that seems easy enough. If you wanted to get real crazy, and win an obfuscated code competition, you could do it in one line:

-- main.lua
require("WebApp")(require("ResourceMap"), arg[1] or 8080):run()

Or, if you’re a real Lua head, you could do it all on the command line, without even using the ‘main.lua’ file:

tinn -e "require('WebApp')(require('ResourceMap'), arg[1] or 8080):run()"

That’s all fun and games, and it’s cool to see that you can implement a web server in a single command line. But what’s in that magical ResourceMap file?

local URL = require("url");
local StaticService = require("StaticService");

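local function replacedotdot(s)
  -- collapse every run of two or more dots into a single dot, so that
  -- neither ".." nor "...." survives to climb out of ./wwwroot
  return (string.gsub(s, "%.%.+", "."))
end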

local HandleFileGET = function(request, response)
  local absolutePath = replacedotdot(URL.unescape(request.Url.path));
  local filename = './wwwroot'..absolutePath;

  StaticService.SendFile(filename, response)
  return false;
end

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
  };
}

return ResourceMap;

The idea here is that we want to route any ‘GET’ method to the HandleFileGET() function. There is a ResourceMapper object within tinn that uses a structure such as the one in ResourceMap. The general layout is {[pattern] = {name="", METHOD = function [,METHOD = function]},}

Using this simple mechanism, you can easily route the handling of a particular resource request to a particular function.

This table can have entries that are much more interesting. For example, if you want to handle the retrieval of ‘favicon.ico’ differently than other files, you can just add a specific mapping for that.

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}

You could have multiple methods per resource as well:

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
    POST = HandleFilePOST,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}

In general, the longest prefix match algorithm is applied to whatever is supplied within the resource field of the request. If you want to deal with all methods of a particular resource, without having to specify them explicitly, then you can use the magic method ‘_DEFAULT’.

local ResourceMap = {
  ["/"] = {name="/",
    GET = HandleFileGET,
    POST = HandleFilePOST,
    _DEFAULT = HandleFileDEFAULT,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}

In this way, if there is a request for a resource, and a method that we don’t know about at the time of creating the map, the HandleFileDEFAULT() function will be called to deal with it. That might be handy in the case where you’d like to handle these unknown method requests in a generic way, or you might want to change it over time without having to change your mapping.
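To make the dispatch rules concrete, here is a hypothetical sketch of how a longest-prefix mapper with a ‘_DEFAULT’ fallback might behave. It treats keys as plain prefixes (not Lua patterns) and returns 404 when nothing matches; TINN’s actual ResourceMapper may differ in its details, and findEntry and dispatch are illustrative names only.

```lua
-- Hypothetical sketch of resource-map dispatch with plain longest-prefix
-- matching; not TINN's ResourceMapper.
local function findEntry(map, resource)
  local best, bestLen = nil, 0
  for prefix, entry in pairs(map) do
    if #prefix > bestLen and resource:sub(1, #prefix) == prefix then
      best, bestLen = entry, #prefix
    end
  end
  return best
end

local function dispatch(map, method, resource)
  local entry = findEntry(map, resource)
  -- exact method first, then the catch-all _DEFAULT handler
  local handler = entry and (entry[method] or entry._DEFAULT)
  if not handler then
    return 404
  end
  return handler(method, resource)
end

-- Example map: the handlers just report which one ran.
local map = {
  ["/"] = {name = "/",
    GET = function() return "file GET" end,
    _DEFAULT = function(method) return "file " .. method end,
  },
  ["/favicon.ico"] = {name = "/favicon.ico",
    GET = function() return "favicon GET" end,
  },
}
```

Note that a POST to /favicon.ico lands on the longer "/favicon.ico" entry, which has no POST or _DEFAULT handler, so it gets a 404 rather than falling back to "/".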

Another case is when the resource isn’t actually a resource. Take for example a ‘CONNECT’ request:

CONNECT localhost:9000

In this case, the ‘resource’ does not start with a ‘/’, so the longest prefix match won’t land it on anything in our map. I need to deal with these using a different pattern. Well, the pattern part of the map is nothing more than a standard pattern in the Lua sense of the word, so a ‘.’ will match any character. The following map will do the trick.

local ResourceMap = {
  ["."] = {name=".",
    CONNECT = HandleCONNECT,
  };

  ["/"] = {name="/",
    GET = HandleFileGET,
    POST = HandleFilePOST,
  };

  ["/favicon.ico"] = {name="/favicon.ico",
    GET = HandleFaviconGET,
  };
}

In this way, we’ll deal with the CONNECT method if it shows up.
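A quick way to see why ‘.’ catches the CONNECT resource is to anchor each key at the start of the resource and measure how much it matches. The matchLength helper below is hypothetical. Two things fall out of it: ‘.’ also matches any resource that starts with ‘/’, so how the mapper breaks such ties is a detail not covered here, and the ‘.’ inside "/favicon.ico" is itself a wildcard (write "%." for a literal dot).

```lua
-- Hypothetical helper: treat a map key as a Lua pattern anchored at the
-- start of the resource and report how many characters it matched.
local function matchLength(pattern, resource)
  local s, e = string.find(resource, "^" .. pattern)
  if not s then
    return nil
  end
  return e - s + 1
end
```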

This is an affirmative list. If there is a match to one of the patterns, then the mapped function is executed. If no pattern is found, either because the resource itself does not match, or because the resource does not have a function to handle the specified method, then a general 404 error is returned.

That’s about all there is to it. Create a mapping between URLs and functions, and you’re all done. Of course there’s the function itself:

local URL = require("url");
local StaticService = require("StaticService");

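local function replacedotdot(s)
  -- collapse every run of two or more dots into a single dot, so that
  -- neither ".." nor "...." survives to climb out of ./wwwroot
  return (string.gsub(s, "%.%.+", "."))
end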

local HandleFileGET = function(request, response)
  local absolutePath = replacedotdot(URL.unescape(request.Url.path));
  local filename = './wwwroot'..absolutePath;
	
  StaticService.SendFile(filename, response)

  return false;
end

Not a lot of magic in this particular one. First of all, there’s that simplistic ‘replacedotdot()’ function. That’s just a casual attempt to restrict the file access to the directory of our choosing. In HandleFileGET, the first thing to do is urldecode the path specified, then feed that to the replacedotdot() function. Then, take whatever comes out of that, and prepend it with ‘./wwwroot’, and finally feed that to the StaticService.SendFile() function, which is a standard part of TINN.
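To see why a single pass that swaps ".." for "." is only a casual defense, here is a small check comparing it with a version that collapses whole runs of dots. Both function names are hypothetical, and a plain "." replacement is used because recent Lua versions reject an escaped ‘%.’ in a gsub replacement string.

```lua
-- One pass of ".." -> "." can be defeated: "...." becomes "..".
local function naiveDotDot(s)
  return (string.gsub(s, "%.%.", "."))
end

-- Collapsing every run of two or more dots leaves no ".." behind.
local function collapseDots(s)
  return (string.gsub(s, "%.%.+", "."))
end
```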

The return value of this function has meaning. If you return false, or ‘nil’, then the socket representing this particular request will be recycled, assuming there is potentially another request coming on the same socket.

If you instead return ‘true’, then the system assumes you are handling any subsequent recycling, or closing, and it will not take any further action with the underlying socket.

This is a nice feature in that it allows you to construct much more interesting interactions than the standard request/response, without much fuss. For example, you could easily open up websockets, upgrade connections in general, or do whatever you want.
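The contract can be illustrated with a hypothetical keep-alive loop. The conn table with its pending list and closed flag is a stand-in for a real socket, not TINN’s API. A falsy return means "loop around for the next request on this socket", while true means the handler now owns the connection and the loop stops touching it.

```lua
-- Hypothetical sketch of honoring a handler's return value.
local function serveConnection(conn, handler)
  while true do
    local request = table.remove(conn.pending, 1)
    if not request then
      conn.closed = true  -- no more requests: close it ourselves
      return
    end
    if handler(request, conn) then
      return              -- true: the handler owns the socket now
    end
    -- false/nil: recycle, i.e. go around and read the next request
  end
end
```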

At any rate, with TINN, you can create a simple web server in a single command line. I find that to be a fairly useful thing.


Hurry Up and Wait – TINN Timing

Moving right along. First I needed to do basic networking, starting at the lowest level of socket interaction and advancing up the stack through specific TCP and HTTP uses, then back down to UDP.

With basic async networking covered, the next thing that comes up is timing. The general socket IO is covered. You can basically build an entire service using nothing more than asynchronous socket calls. But, most servers are more interesting than that. There are situations where you’ll want to cancel out an async operation if it’s taking too long, or you might want to perform some operation over time, repeatedly. So, clearly the TINN system needs some concept of time management.

Here’s the kind of code I would like to write in order to do something every 500 milliseconds:

require ("IOProcessor");

local test_wait = function(interval)
  while true do
    wait(interval);
    print(string.format("interval: %d", IOProcessor.Clock:Milliseconds()));
  end
end

run(test_wait)

Basically, there is a new ‘wait()’ function. You give it a number of milliseconds, and it will suspend the coroutine you’re currently in for the given amount of time. This capability comes courtesy of some changes to the base scheduler. The changes are the following:

wait = function(millis)
  local nextTime = IOProcessor.Clock:Milliseconds() + millis;
  return IOProcessor:yieldUntilTime(nextTime);
end

IOProcessor.yieldUntilTime = function(self, atime)
  if self.CurrentFiber ~= nil then
    self.CurrentFiber.DueTime = atime;
    tabutils.binsert(self.FibersAwaitingTime, self.CurrentFiber, compareTaskDueTime);

    return self:yield();
  end
  return false;
end

The yieldUntilTime() function takes the currently running fiber (coroutine) and puts it into the FibersAwaitingTime list. This is simply a table maintained in sorted order, from lowest DueTime to highest. Once a fiber is placed on this list, it is no longer among the currently active fibers. It will sit on this list until its DueTime has passed.
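Keeping that list sorted is a binary insertion. The sketch below is a hypothetical stand-in for tabutils.binsert with compareTaskDueTime, whose exact behavior is not shown in this post. Here ties go after existing entries with the same DueTime, so fibers that are due at the same moment wake in the order they went to sleep.

```lua
-- Hypothetical stand-in for tabutils.binsert: binary-search insertion into
-- a list kept sorted by DueTime (ties go after existing equal entries).
local function insertByDueTime(list, fiber)
  local lo, hi = 1, #list
  while lo <= hi do
    local mid = math.floor((lo + hi) / 2)
    if list[mid].DueTime <= fiber.DueTime then
      lo = mid + 1
    else
      hi = mid - 1
    end
  end
  table.insert(list, lo, fiber)
end
```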

The main scheduling loop will step through the fibers that are sitting in the AwaitingTime list using the following:

IOProcessor.stepTimeEvents = function(self)
  local currentTime = self.Clock:Milliseconds();
  local awaiting = self.FibersAwaitingTime;

  -- The list is sorted by DueTime, so only the front of it
  -- can be due; stop at the first fiber that is not.
  while #awaiting > 0 do
    local fiber = awaiting[1];
    if fiber.DueTime > currentTime then
      break
    end

    -- Remove the fiber from the list of fibers that are
    -- waiting on time, and put it back into circulation
    table.remove(awaiting, 1);
    fiber.DueTime = 0;
    self:scheduleFiber(fiber);
  end
end

Basically, step through the list of fibers that are waiting for their wait time to expire. For all those that qualify, put them back into the list of active fibers by calling the ‘scheduleFiber()’ function.
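Putting the pieces together, here is a self-contained sketch of wait() on plain Lua coroutines. It is not TINN’s IOProcessor: the Scheduler table and its fields are hypothetical, and a virtual clock replaces IOProcessor.Clock so the example runs instantly. When nothing is runnable, the clock simply jumps to the next due time.

```lua
-- Hypothetical sketch of a time-aware coroutine scheduler with a virtual clock.
local Scheduler = {now = 0, ready = {}, sleeping = {}}

function Scheduler.spawn(fn)
  table.insert(Scheduler.ready, coroutine.create(fn))
end

-- Suspend the calling coroutine for millis of (virtual) time.
function Scheduler.wait(millis)
  local co = coroutine.running()
  local due = Scheduler.now + millis
  -- keep the sleeping list sorted by due time (linear insert for brevity)
  local i = #Scheduler.sleeping + 1
  while i > 1 and Scheduler.sleeping[i - 1].due > due do
    i = i - 1
  end
  table.insert(Scheduler.sleeping, i, {co = co, due = due})
  coroutine.yield()
end

-- Move every sleeper whose due time has passed back onto the ready list.
function Scheduler.stepTimeEvents()
  local sleeping = Scheduler.sleeping
  while sleeping[1] and sleeping[1].due <= Scheduler.now do
    table.insert(Scheduler.ready, table.remove(sleeping, 1).co)
  end
end

function Scheduler.run()
  while #Scheduler.ready > 0 or #Scheduler.sleeping > 0 do
    Scheduler.stepTimeEvents()
    local ready = Scheduler.ready
    Scheduler.ready = {}
    for _, co in ipairs(ready) do
      assert(coroutine.resume(co))
    end
    if #Scheduler.ready == 0 and Scheduler.sleeping[1] then
      -- nothing runnable: jump the virtual clock to the next due time
      Scheduler.now = Scheduler.sleeping[1].due
    end
  end
end

-- Usage: two periodic tasks, in the spirit of test_wait().
local log = {}
local function ticker(name, interval, count)
  return function()
    for _ = 1, count do
      Scheduler.wait(interval)
      log[#log + 1] = string.format("%s@%d", name, Scheduler.now)
    end
  end
end

Scheduler.spawn(ticker("fast", 200, 3))
Scheduler.spawn(ticker("slow", 500, 2))
Scheduler.run()
print(table.concat(log, " "))
```

The two tickers interleave by due time: fast at 200 and 400, slow at 500, fast at 600, slow at 1000.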

This begins to get very interesting I think. Of course once you create a timer, or even async i/o, you probably also want the ability to cancel such operations. But, that’s another matter.

Through this little exercise, the scheduler begins to take on more of the attributes of what you might find in the core OS. The big difference is that everything is done in the Lua space, and does not rely on OS primitives, so it is actually somewhat portable to other architectures. It’s nice to have such a scheduler available across multiple LuaJIT environments.

While going through this exercise, I also started looking at other features of schedulers, thinking about priorities, and other factors that might go into a really good scheduler. So, at least one thing becomes apparent to me. Having this scheduler readily available and transformable in the Lua space makes it relatively easy to try out different scheduling techniques that will match the particular situation at hand. There is even the possibility of changing the scheduling algorithm dynamically based on attributes of the running system.

Exciting times.


Creating Udp Echo Service in TINN

These days, networking applications utilize more than a single protocol at a time. My current server, which is a software router of sorts, needs to support TCP/IP as well as Udp channels at the same time. On top of the TCP is HTTP, but that’s already been covered.

Here I present the support for the Udp protocol. Udp differs from TCP in a few ways, the key one being the lack of a “connection”. Every single packet is individually addressed and sent to the intended recipient. Of course you can cache the DNS lookup, so that the delivery of the packets themselves is blazing fast. There’s no redundancy, no ack, no error recovery.

When TCP/IP/UDP were first created, the error rate was probably much higher than it is today. These days, depending on the network, Udp might be a perfectly reasonable choice. The trick, from a TINN perspective, is to make programming with either protocol look relatively the same. For the most part, this just means using the same mechanism for async calls.

Here’s what the server code looks like, minus the error recovery logic:

local ffi = require("ffi");

local IOProcessor = require("IOProcessor");
local IOCPSocket = require("IOCPSocket");

-- Setup the server socket
local socket, err = IOCPSocket:create(AF_INET, SOCK_DGRAM, 0);
local success, err = socket:bindToPort(9090);

-- Setup buffers to be used to receive data
local bufflen = 1500;
local buff = ffi.new("uint8_t[?]", bufflen);
local from = sockaddr_in();
local fromLen = ffi.sizeof(from);


-- The primary application loop
local loop = function()

  while true do
    local bytesread, err = socket:receiveFrom(from, fromLen, buff, bufflen);

    if not bytesread then
      return false, err;
    end

    -- echo back to sender; only send the bytes that actually arrived
    local bytessent, err = socket:sendTo(from, fromLen, buff, bytesread);
  end
end

run(loop);

And that’s about all there is to it. In this particular case, a single packet is received, and that packet is immediately sent back to whoever sent it. The receiveFrom() and sendTo() calls do in fact use IO Completion Ports, and for a more complex server that actually does work, you might formulate this differently, using multiple receive buffers and spawning a task for each packet. But this is the most basic form of Udp handling with TINN.
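The "task per packet" variant can be sketched with plain coroutines. Everything here is hypothetical: an in-memory fakeSocket with simplified receiveFrom/sendTo methods stands in for IOCPSocket, the whole inbox is drained before the tasks run, and a yield stands in for work that would wait on I/O. The reply carries only the bytes that were received.

```lua
-- Hypothetical sketch: one coroutine per received packet, echoing back
-- exactly what arrived. fakeSocket is an in-memory stand-in, not IOCPSocket.
local fakeSocket = {
  inbox = {
    {from = "192.0.2.5:4000", data = "ping"},
    {from = "192.0.2.6:4001", data = "hello"},
  },
  sent = {},
}

function fakeSocket:receiveFrom()
  local packet = table.remove(self.inbox, 1)
  if not packet then return nil, "no more packets" end
  return packet.data, packet.from
end

function fakeSocket:sendTo(to, data)
  self.sent[#self.sent + 1] = to .. " <- " .. data
  return #data
end

local queue = {}
local function spawn(fn) queue[#queue + 1] = coroutine.create(fn) end

-- Receive loop: hand each packet to its own task.
while true do
  local data, from = fakeSocket:receiveFrom()
  if not data then break end
  spawn(function()
    coroutine.yield()              -- stand-in for work that waits on I/O
    fakeSocket:sendTo(from, data)  -- echo back to the sender
  end)
end

-- Round-robin the tasks until all of them have finished.
while #queue > 0 do
  local co = table.remove(queue, 1)
  assert(coroutine.resume(co))
  if coroutine.status(co) ~= "dead" then
    queue[#queue + 1] = co
  end
end
```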

The socket:receiveFrom() implementation is pretty much the same as that for socket:receive(), except for the addition of the address information so you can see who sent the message, and so you can subsequently return the packet to the source.

This code is not particularly hard, and if you were programming in ‘C’, it would look pretty much the same. The key benefit though comes from the automatic semi-concurrency which is possible, without really changing the code that much. This is what makes it easier to integrate and handle.


How About that Web Server

Although echo servers are the “Hello, World” of network programs, http servers are much more useful.

local HttpServer = require("HttpServer");
local StaticService = require("StaticService");

-- a simple ping template
local pingtemplate = [[
<html>
  <head>
    <title>HTTP Server</title>
  </head>
  <body>ping</body>
</html>
]]

-- Called every time there is a new http request
local OnRequest = function(param, request, response)
  if request.Url.path == "/ping" then
    response:writeHead("200")
    response:writeEnd(pingtemplate);
  else
    local filename = './wwwroot'..request.Url.path;
    StaticService.SendFile(filename, response);
  end
end

local server = HttpServer(8080, OnRequest);
server:run();

This looks a little bit like the echo service which was based on the raw socket server. Instead of “OnAccept”, this one implements “OnRequest”. The ‘request’ is an instance of a ‘WebRequest’ object, which contains all the stuff you’d expect in a WebRequest (resource, headers…). The routine is also handed a ‘WebResponse’ object. This is simply a convenience, since it just wraps the netstream that is associated with the request object.

The HttpServer code itself looks like this:

local SocketServer = require("SocketServer")

local IOProcessor = require("IOProcessor")
local IOCPSocket = require("IOCPSocket")
local IOCPNetStream = require("IOCPNetStream");
local WebRequest = require("WebRequest");
local WebResponse = require("WebResponse");
local URL = require("url");

HttpServer = {}
setmetatable(HttpServer, {
  __call = function(self, ...)
    return self:create(...);
  end,
});

HttpServer_mt = {
  __index = HttpServer;
}

HttpServer.init = function(self, port, onRequest, onRequestParam)
  local obj = {
    OnRequest = onRequest;
    OnRequestParam = onRequestParam;
  };
  setmetatable(obj, HttpServer_mt);
	
  obj.SocketServer = SocketServer(port, HttpServer.OnAccept, obj);

  return obj;
end

HttpServer.create = function(self, port, onRequest, onRequestParam)
  return self:init(port, onRequest, onRequestParam);
end


HttpServer.OnAccept = function(self, sock)
  local socket = IOCPSocket:init(sock, IOProcessor);
  local stream, err = IOCPNetStream:init(socket);

  if self.OnRequest then
    local request, err  = WebRequest:Parse(stream);

    if request then
      request.Url = URL.parse(request.Resource);
      local response = WebResponse:OpenResponse(request.DataStream)
      self.OnRequest(self.OnRequestParam, request, response);
    else
      print("HandleSingleRequest, Dump stream: ", err)
    end
  else
    -- do nothing and let the socket close
  end
end

HttpServer.run = function(self)
  return self.SocketServer:run();
end

return HttpServer;

The ‘OnAccept()’ function this time around takes the unadorned socket, wraps it in a nicer socket object (so the IO completion machinery can work), and then uses the WebRequest object to parse what’s on the stream. If the request is found to be intact, a response object is created, and the two are handed off to the ‘OnRequest’ function if it exists.

This construct allows you to compose a web server to meet your needs. You can spawn wherever you want, to run whichever parts you want in parallel. At the top end, the consumer of this object won’t know the difference, and can thus just handle the individual requests.

So, what’s so good about all this?
Well, first of all, the TINN runtime, all up, is about 3MB.
What you get for that is access to pretty much all the interesting stuff that Windows APIs have to offer. Whether it be network, OS, graphics, crypto, or multi-thread related, it’s all available right there in the little package.

This is good when you want to start creating simple REST based web services for this and that. For example, if you want to expose a webcam feed from your PC, or your PC acts as a hub for various wireless “internet of things” devices around your home, or whatever, you just write some lua code, without worrying about interop libraries, compiling, or anything else more interesting. Just a little script and away you go.


Name That Framework – Echo Service in two lines of code

… and here they are:

SocketServer = require("SocketServer");
SocketServer(9090):run(function(s, b, l)  s:send(b, l); end);

Back in the day there was a game show called “Name That Tune”, where contestants would be given a clue about a song, then bid on the fewest notes they would need to name the tune. Once the bids were fixed, the orchestra would play that many notes, and the contestant would have to correctly guess the name of the tune.

So, above are two lines of code which implement a highly scalable “echo” service. Can you name the framework?

It’s TINN of course!

Here’s a more reasonable rendition of the same:

local SocketServer = require("SocketServer");

local function OnData(socket, buff, bufflen)
  socket:send(buff, bufflen);
end;

local server = SocketServer(9090);
server:run(OnData)

Simply put, a SocketServer is a generic service that listens on a port that you specify. Whenever it receives data on that port, it calls the supplied ‘OnData’ function. Each time ‘OnData’ is called, it could be with a different socket and data. You could build a fairly rudimentary http server on top of this if you like. What’s most important to me is that you don’t have to write any of the underlying low-level networking code. Nothing about accept, IO Completion Ports, etc. Just: call me when some data comes in on this port.

The SocketServer code itself looks like this:

local ffi = require("ffi");
local IOProcessor = require("IOProcessor");
local IOCPSocket = require("IOCPSocket");

IOProcessor:setMessageQuanta(nil);

SocketServer = {}
setmetatable(SocketServer, {
  __call = function(self, ...)
    return self:create(...);
  end,
});

SocketServer_mt = {
  __index = SocketServer;
}

SocketServer.init = function(self, socket, datafunc)
--print("SocketServer.init: ", socket, datafunc)
  local obj = {
    ServerSocket = socket;
    OnData = datafunc;
  };

  setmetatable(obj, SocketServer_mt);

  return obj;
end

SocketServer.create = function(self, port, datafunc)
  port = port or 9090;

  local socket, err = IOProcessor:createServerSocket({port = port, backlog = 15});

  if not socket then
    print("Server Socket not created!!")
    return nil, err
  end

  return self:init(socket, datafunc);
end

-- The primary application loop
SocketServer.loop = function(self)
  local bufflen = 1500;
  local buff = ffi.new("uint8_t[?]", bufflen);

  while true do
    local sock, err = self.ServerSocket:accept();

    if sock then
      local socket = IOCPSocket:init(sock, IOProcessor);
      local bytesread, err = socket:receive(buff, bufflen);

      if not bytesread then
        print("RECEIVE ERROR: ", err);
      elseif self.OnData ~= nil then
        self.OnData(socket, buff, bytesread);
      else
        socket:closeDown();
        socket = nil
      end
    else
       print("Accept ERROR: ", err);
    end

    collectgarbage();
  end
end

SocketServer.run = function(self, datafunc)
  if datafunc then
    self.OnData = datafunc;
  end

  IOProcessor:spawn(self.loop, self);
  IOProcessor:run();
end

return SocketServer;

This basic server loop is good for a lot of little tasks where you just need to put a listener in front of something. No massive scale-out, no multi-threading, just good, straightforward stuff. But it’s already plumbed to go big too.

Here’s a slight modification:

SocketServer.handleAccepted = function(self, sock)
  local handleNewSocket = function()
    local bufflen = 1500;
    local buff = ffi.new("uint8_t[?]", bufflen);
    
    local socket = IOCPSocket:init(sock, IOProcessor);

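    if self.OnAccepted then
      -- assumed intent: hand the raw accepted socket to the application
      self.OnAccepted(socket);
    else
      local bytesread, err = socket:receive(buff, bufflen);

      if not bytesread then
        print("RECEIVE ERROR: ", err);
        socket:closeDown();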
      elseif self.OnData ~= nil then
        self.OnData(socket, buff, bytesread);
      else
        socket:closeDown();
        socket = nil
      end
    end
  end

  return IOProcessor:spawn(handleNewSocket);
end

-- The primary application loop
SocketServer.loop = function(self)

  while true do
    local sock, err = self.ServerSocket:accept();

    if sock then
      self:handleAccepted(sock);
    else
       print("Accept ERROR: ", err);
    end

    collectgarbage();
  end
end

In the main loop, instead of doing the processing directly, call the ‘self:handleAccepted()’ function. That function in turn will spawn an internal function to actually handle the request. Everything else remains the same.

If you do it this way, then ‘OnData’ will run cooperatively with any other accepts that might be going on. This also highlights something that is otherwise invisible: the ‘accept()’ call is itself cooperative. Since IO Completion Ports are being used in the background, accept is actually asynchronous. As soon as the accept is issued, that coroutine waits in place until another connection comes in. Meanwhile, the sockets that were previously accepted get time slices to do their work.

And thus, you get massive scale (thousands of potential connections) from using this fairly simple code.
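To make that interleaving concrete without sockets or completion ports, here is a minimal sketch in plain Lua. A tiny round-robin scheduler stands in for IOProcessor:spawn() and IOProcessor:run(), and fake_accept() stands in for the IOCP-backed accept() by yielding before handing back a canned connection name; spawn, run, fake_accept and handle are all hypothetical names. With real IOCP, a coroutine would be resumed when its completion arrives rather than on a fixed rotation.

```lua
-- A minimal round-robin scheduler: a hypothetical stand-in for
-- IOProcessor:spawn()/run(), not TINN's actual implementation.
local ready = {}   -- coroutines waiting for a time slice
local log = {}     -- order in which the work actually happened

local function spawn(fn, arg)
  local co = coroutine.create(function() return fn(arg) end)
  table.insert(ready, co)
  return co
end

local function run()
  while #ready > 0 do
    local co = table.remove(ready, 1)
    local ok, err = coroutine.resume(co)
    if not ok then error(err) end
    if coroutine.status(co) ~= "dead" then
      table.insert(ready, co)  -- not finished: back of the line
    end
  end
end

-- Hypothetical async accept(): yields as if waiting on a completion
-- port, then returns the next pending connection (nil when none are left).
local pending = { "conn-1", "conn-2", "conn-3" }
local function fake_accept()
  coroutine.yield()
  return table.remove(pending, 1)
end

-- One coroutine per accepted connection, as in handleAccepted().
local function handle(conn)
  table.insert(log, "start " .. conn)
  coroutine.yield()            -- e.g. waiting on socket:receive()
  table.insert(log, "done " .. conn)
end

-- The accept loop, as in SocketServer.loop().
local function loop()
  while true do
    local conn = fake_accept()
    if not conn then break end   -- demo only: stop when nothing is pending
    table.insert(log, "accept " .. conn)
    spawn(handle, conn)
  end
end

spawn(loop)
run()
```

In the resulting log, ‘accept conn-2’ lands before ‘done conn-1’: the accept loop kept running while the first connection was still in the middle of its work.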

Well, those are the basics. Now that I have plumbed TINN from the ground up to utilize the IO Completion Ports, I can start to build upon that. There are a couple of nice benefits to marrying IOCP and Lua coroutines. I’ll be exploring this some more, but it’s basically a match made in heaven.


Computicles – Simplifying Chaining

Using the “exec()” method, I can chain computicles together, but the code is kind of raw. There is one addition I left out of the equation last time around, and that is the dehydrating/rehydrating of a computicle variable itself. Now it looks like this:

elseif dtype == "table" then
  if getmetatable(data) == Computicle_mt then
    -- package up a computicle
    datastr = string.format("Computicle:init(TINNThread:StringToPointer(%s),TINNThread:StringToPointer(%s));", 
      TINNThread:PointerToString(data.Heap:getNativeHandle()), 
      TINNThread:PointerToString(data.IOCP:getNativeHandle()));
  elseif getmetatable(data) == getmetatable(self.IOCP) then
    -- The data is an iocompletion port, so handle it specially
    datastr = string.format("IOCompletionPort:init(TINNThread:StringToPointer(%s))",
      TINNThread:PointerToString(data:getNativeHandle()));
  else
    -- get a json string representation of the table
    datastr = string.format("[[ %s ]]",JSON.encode(data, {indent=true}));
  end

This excerpt shows only the cases that fall under the ‘table’ type. An IOCompletionPort and a ‘Computicle’ are each packaged up as code that reconstructs them from their native handles on the other side. Any other table is serialized as a fairly straightforward JSON key/value table. With this in place, I can now do the following.

-- Setup the splitter that will dispatch to the leaf nodes
local splitter = Computicle:load("comp_splittercode");

-- Setup the source, which will be originating messages
local source = Computicle:load("comp_sourcecode");
source.sink = splitter;

The code for the comp_sourcecode computicle looks like this:


RunOnce = function(sink)
  print("comp_sourcecode, RunOnce...")
  for i = 1, 10 do
    sink:postMessage(i);
  end
end

OnIdle = function(counter)
  print("comp_sourcecode, OnIdle(): ", counter)
  if sink ~= nil then 
    RunOnce(sink);

    sink = nil;
    exit();
  end
end

This computicle implements the OnIdle() function, which it uses to decide when it can send messages off to the sink. If the sink hasn’t been set yet, it does nothing. Once it has been set, it executes the ‘RunOnce()’ function, sending the sink ten messages.

After performing the RunOnce() task, the computicle simply calls exit(), which is equivalent to SELFICLE:quit(), which in turn is SELFICLE:postMessage(Computicle.Messages.QUIT).

And that’s the end of that computicle. The splitter has become much simpler as well.

local ffi = require("ffi");


OnMessage = function(msg)
	msg = ffi.cast("ComputicleMsg *", msg);
	local Message = msg.Message;
--print("comp_splittercode, OnMessage(): ", Message);

	if sink1 then
		sink1:postMessage(Message);
	end	

	if sink2 then
		sink2:postMessage(Message);
	end
end

Here, the splitter assumes there are two sinks. Whichever of them exists will receive the message that was passed into the splitter. If neither exists, the message is dropped. Of course, other behaviors, such as holding on to the messages, could be implemented as well.

In this case, the computicle does not exit on its own. The splitter does not know when it is finished; all it knows is that whenever it receives a message, it is supposed to pass that message on to its two sinks.
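Holding on to messages, mentioned above as an alternative to dropping them, can be sketched in a few lines. This is a single-threaded illustration in plain Lua with each sink reduced to a queue table; make_sink, make_splitter, setSink and the per-sink held lists are hypothetical names, not part of the Computicle API. Messages meant for a sink that has not been assigned yet are parked, then flushed in order when the sink arrives.

```lua
-- Hypothetical sink: a plain queue standing in for a computicle.
local function make_sink()
  local sink = { queue = {} }
  function sink:postMessage(msg) table.insert(self.queue, msg) end
  return sink
end

-- A splitter variant that holds messages for a sink that is not set yet,
-- instead of dropping them (hypothetical, not the Computicle code).
local function make_splitter()
  local splitter = { sinks = {}, held = { {}, {} } }

  function splitter:deliver(i, msg)
    local sink = self.sinks[i]
    if sink then
      sink:postMessage(msg)
    else
      table.insert(self.held[i], msg)  -- keep it until the sink shows up
    end
  end

  function splitter:OnMessage(msg)
    self:deliver(1, msg)
    self:deliver(2, msg)
  end

  -- Attaching a sink flushes whatever was held for it, in order.
  function splitter:setSink(i, sink)
    self.sinks[i] = sink
    for _, msg in ipairs(self.held[i]) do sink:postMessage(msg) end
    self.held[i] = {}
  end

  return splitter
end

local splitter = make_splitter()
local sink1, sink2 = make_sink(), make_sink()
splitter:setSink(1, sink1)
splitter:OnMessage("a")    -- sink1 gets it now, sink2's copy is held
splitter:OnMessage("b")
splitter:setSink(2, sink2) -- sink2 receives "a" and "b" on attach
splitter:OnMessage("c")    -- both sinks get it directly
```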

The sink code is equally simple.

local ffi = require("ffi");

OnMessage = function(msg)
  msg = ffi.cast("ComputicleMsg *", msg);
  local Message = msg.Message;

  print(Message * 10);
end

Here again, just implement the ‘OnMessage()’ function, and the comp_msgpump code will call it automatically. And again, the sink does not know when it is done, so it does not explicitly exit.

Pulling it all together, the exiting logic becomes more clear:

local Computicle = require("Computicle");

-- Setup the leaf nodes to receive messages
local sink1 = Computicle:load("comp_sinkcode");
local sink2 = Computicle:load("comp_sinkcode");


-- Setup the splitter that will dispatch to the leaf nodes
local splitter = Computicle:load("comp_splittercode");

splitter.sink1 = sink1;
splitter.sink2 = sink2;


-- Setup the source, which will be originating messages
local source = Computicle:load("comp_sourcecode");
source.sink = splitter;


-- Close everything down from the outside
print("FINISH source: ", source:waitForFinish());

-- tell the splitter to quit
splitter:quit();
print("FINISH splitter:", splitter:waitForFinish());

-- the two sinks will receive the quit message from the splitter
-- so, just wait for them to quit.
print("Finish Sink 1: ", sink1:waitForFinish());
print("Finish Sink 2: ", sink2:waitForFinish());

The order in which computicles are created and assigned to one another is what stitches them together: the sinks first, then the splitter that feeds them, then the source that feeds the splitter. The computicles could also be constructed in a “suspended state”, but that would not help matters much.

At the end, the quit() sequence can clearly be seen. First, wait for the source to finish. Then tell the splitter to quit(). This is not an interrupt, so whatever was already in its queue flows through before the QUIT is processed. Finally, wait for the sinks: since the splitter forwards the QUIT message to them like any other message, they drain their queues and exit on their own once the splitter has finished.

All messages flow, and all processing finishes cleanly. So, this is one way to perform the exit mechanism from the outside, as well as from the inside.
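The reason this shutdown is clean is plain FIFO ordering: QUIT is queued behind everything already posted, and the splitter forwards it like any other message. This single-threaded sketch models that with Lua tables as queues, draining each node in turn where the real computicles would be running their message pumps concurrently. QUIT, make_node and drain are hypothetical names.

```lua
local QUIT = "QUIT"  -- hypothetical stand-in for Computicle.Messages.QUIT

-- A node: an inbox, a record of what it processed, and an optional
-- forwarding rule (the splitter forwards every message, QUIT included).
local function make_node(forward)
  local node = { inbox = {}, seen = {}, forward = forward }
  function node:postMessage(msg)
    table.insert(self.inbox, msg)
  end
  return node
end

-- Drain the inbox in FIFO order until QUIT, like the message pump.
-- Returns true if QUIT was reached.
local function drain(node)
  while #node.inbox > 0 do
    local msg = table.remove(node.inbox, 1)
    if node.forward then node.forward(msg) end
    if msg == QUIT then return true end
    table.insert(node.seen, msg)
  end
  return false
end

local sink1, sink2 = make_node(), make_node()
local splitter = make_node(function(msg)
  sink1:postMessage(msg)
  sink2:postMessage(msg)
end)

-- The source's ten messages are already queued when splitter:quit() runs,
-- so QUIT lands behind them.
for i = 1, 10 do splitter:postMessage(i) end
splitter:postMessage(QUIT)

local splitterDone = drain(splitter)
local sink1Done = drain(sink1)
local sink2Done = drain(sink2)
```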

Adding this bit of ease in programming did not change the fundamentals of the computicle. It still has a single communication mechanism (the queue). It still performs computation, and it still communicates with the outside world. The new code eliminates the error-prone process of type casting and rehydrating objects that the system already knows about, which in turn makes the usage pattern that much easier to deal with.

The comp_msgpump was added in a fairly straightforward way. When a computicle is constructed, a bit of code (the Prolog) is placed before the user’s code, and another bit of code (the Epilog) is placed after it. Those two bits of code, along with the function that stitches everything together, look like this:

Computicle = {
	Prolog = [[
TINNThread = require("TINNThread");
Computicle = require("Computicle");
IOCompletionPort = require("IOCompletionPort");
Heap = require("Heap");

exit = function()
    SELFICLE:quit();
end
]];

Epilog = [[
require("comp_msgpump");
]];
}

Computicle.createThreadChunk = function(self, codechunk, params, codeparams)
	local res = {};

	-- What we want to load before any other code is executed
	-- This is typically some 'require' statements.
	table.insert(res, Computicle.Prolog);


	-- Package up the parameters that may want to be passed in
	local paramname = "_params";

	table.insert(res, string.format("%s = {};", paramname));
	table.insert(res, self:packParams(params, paramname));
	table.insert(res, self:packParams(codeparams, paramname));

	-- Create the Computicle instance that is running within 
	-- the Computicle
	table.insert(res, 
		string.format("SELFICLE = Computicle:init(%s.HeapHandle, %s.IOCPHandle);", paramname, paramname));


	-- Stuff in the user's code
	table.insert(res, codechunk);

	
	-- What we want to execute after the user's code is loaded
	-- By default, this will be a message pump
	table.insert(res, Computicle.Epilog);

	return table.concat(res, '\n');
end

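If an application needs a different prolog or epilog, it’s fairly easy to change them, either by changing the file that is being read in, or by changing the string itself: Computicle.Prolog = [[print("Hello, World")]]. Keep in mind that the default Prolog is also what loads the required modules and defines exit(), so a replacement will usually want to keep those lines.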
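The same assembly can be tried out in plain Lua by concatenating the pieces and loading the result. In this sketch the Prolog and Epilog are made-up stand-ins (the real Epilog is require("comp_msgpump")), only string parameters are packed, and the chunk runs inline via loadstring(), or load() on Lua 5.2 and later, instead of on a new thread. The createThreadChunk here is a simplified, hypothetical version of the function above.

```lua
local load_chunk = loadstring or load  -- Lua 5.1/LuaJIT vs 5.2+

-- Made-up stand-ins for Computicle.Prolog and Computicle.Epilog.
local Prolog = [[
trace = {}
exit = function() table.insert(trace, "exit") end
]]
local Epilog = [[
table.insert(trace, "epilog: the message pump would start here")
]]

-- Simplified createThreadChunk(): prolog, params, user code, epilog.
-- Only string-valued params are handled in this sketch.
local function createThreadChunk(codechunk, params)
  local res = {}
  table.insert(res, Prolog)
  table.insert(res, "_params = {};")
  for k, v in pairs(params) do
    table.insert(res, string.format("_params[%q] = %q;", k, v))
  end
  table.insert(res, codechunk)
  table.insert(res, Epilog)
  return table.concat(res, "\n")
end

local chunk = createThreadChunk(
  [[table.insert(trace, "user code sees " .. _params.name); exit()]],
  { name = "comp1" })

-- Run inline; a computicle would hand this string to a new thread instead.
assert(load_chunk(chunk))()
```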

And so it goes. I’ve never had it so easy creating multi-threaded bits of computation and stitching those bits together.


Computicles – Unsafe, but fun, code injection

Computicles are capable of receiving bits of code to execute. That is in fact one way in which they communicate with each other. From one context I can simply do: comp.someValue = 100, and that will set “someValue = 100” in another context. Well, great. How about sending something across that’s more substantial than a simple variable setting?

Well, with the Computicle.exec() method, which the previous example is based on, you can really send across any bit of code to be executed. It works, and you can even send across the body of a function like this:

comp:exec[[
function hello()
  print("Hello, World!");
end
]]

That works fine, and you have full access to everything that’s running in that context. It’s a bit clunky, though, because the entirety of your piece of code is wrapped up in a string value. This is very similar to executing SQL code on some server: the bulk of your code, if not in “stored procedures”, ends up in these opaque strings. Hard to catch errors, hard to debug, etc. How about the following:

comp.hello = function()
  print("Hello, World!");
end

What’s that, you say? Basically, it assigns a function as a variable on the computicle. Yes, this can work. A slight modification to datumToString() turns a function into a string, and it looks like this:

Computicle.datumToString = function(self, data, name)
	local dtype = type(data);
	local datastr = tostring(nil);

--print("DATUM TYPE: ", name, dtype);

	if type(data) == "cdata" then
		-- If it is a cdata type that easily converts to 
		-- a number, then convert to a number and assign to string
		if tonumber(data) then
			datastr = tostring(tonumber(data));
		else
			-- if not easily converted to number, then just assign the pointer
			datastr = string.format("TINNThread:StringToPointer(%s);", 
				TINNThread:PointerToString(data));
		end
	elseif dtype == "table" then
		if getmetatable(data) == Computicle_mt then
			-- package up a computicle
		else
			-- get a json string representation of the table
			datastr = string.format("[[ %s ]]",JSON.encode(data, {indent=true}));

			--print("=== JSON ===");
			--print(datastr)
		end
	elseif dtype == "function" then
		datastr = "loadstring([["..string.dump(data).."]])";
	elseif dtype == "string" then
		datastr = string.format("[[%s]]", data);
	else 
		datastr = tostring(data);
	end

	return datastr;
end

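Notice the part that starts: ‘elseif dtype == "function" then’.

Basically, string.dump() turns a function into bytecode, which can later be rehydrated using loadstring(). It is placed in ‘loadstring([[…]])’ so that the raw 8-bit values are carried across inside a string literal; splicing the bytes in with a bare string.format("%s") would leave them unquoted and unusable as code. One caveat: inside long brackets, Lua converts every carriage return or line feed sequence into a plain newline and ends the string at the first ‘]]’, so bytecode that happens to contain those bytes will arrive corrupted. string.format("%q", ...) escapes them and is the more robust choice.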

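Then this code can be executed in the remote context just the same as any other little bit of code. There are some restrictions on what kinds of functions you can do this with, though. No ‘upvalues’: string.dump() only captures the function’s own code, not the values of local variables it references from an enclosing scope, so those come back as nil on the other side. References to globals are fine, but they are resolved against the receiving computicle’s globals, not the sender’s.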
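Here is a standalone sketch of that round trip, runnable on Lua 5.1, LuaJIT or later. It follows the same idea as the function branch of datumToString(), but quotes the bytecode with string.format("%q"), which escapes quotes, backslashes, line breaks and zero bytes, instead of wrapping it in long brackets; that substitution is made for the sketch and is not what the Computicle code does. functionToCode and the remote_* names are hypothetical, and the “remote” side is just this same Lua state.

```lua
local load_chunk = loadstring or load  -- Lua 5.1/LuaJIT vs 5.2+

-- Like the "function" branch of datumToString(), but the bytecode is
-- quoted with %q (a swapped-in choice) so every byte survives the trip.
local function functionToCode(name, fn)
  return string.format("%s = (loadstring or load)(%q)", name, string.dump(fn))
end

-- Fine to send: no upvalues; string.upper is a global looked up on arrival.
local function shout(s)
  return string.upper(s) .. "!"
end

-- Not fine: 'factor' is an upvalue, and string.dump() does not carry its value.
local factor = 10
local function scale(x)
  return x * factor
end

-- "Send" both by executing the generated code; here the receiving
-- context is just this same Lua state's globals.
assert(load_chunk(functionToCode("remote_shout", shout)))()
assert(load_chunk(functionToCode("remote_scale", scale)))()

local shouted = remote_shout("hello")  -- behaves like the original
local ok = pcall(remote_scale, 2)      -- fails: 'factor' did not come along
```

shout() comes back intact because its only outside reference, string.upper, is a global looked up on arrival. scale() loads fine but fails when called, because the value of its upvalue ‘factor’ stayed behind.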

Of course, that just gets the code over to the other side in a convenient way. What about actually executing the code?

comp:exec([[hello()]]);

Same as ever, just make the function call using “exec()”. Of course, this little bit could be wrapped up in some other convenient override, like using the ‘:’ notation, but there’s some more work to be done before making that a reality.

What other functions can be dealt with? Well, the loop running within the computicle looks like the following now:

-- comp_msgpump.lua

local ffi = require("ffi");

-- This is a basic message pump
-- 

-- default to 15 millisecond timeout
gIdleTimeout = gIdleTimeout or 15


local idlecount = 0;

while true do
  local msg, err = SELFICLE:getMessage(gIdleTimeout);
  -- false, WAIT_TIMEOUT == timed out
  --print("MSG: ", msg, err);

  if not msg then
    if err == WAIT_TIMEOUT then
      --print("about to idle")
      idlecount = idlecount + 1;
      if OnIdle then
        OnIdle(idlecount);
      end
    end
  else
    local msgFullyHandled = false;

    if OnMessage then
      msgFullyHandled = OnMessage(msg);
    end

    if not msgFullyHandled then
      msg = ffi.cast("ComputicleMsg *", msg);
      local Message = msg.Message;
      --print("Message: ", Message, msg.Param1, msg.Param2);

      if Message == Computicle.Messages.QUIT then
        SELFICLE:freeMessage(msg);
        break;
      end

      if Message == Computicle.Messages.CODE then
        local len = msg.Param2;
        local codePtr = ffi.cast("const char *", msg.Param1);

        if codePtr ~= nil and len > 0 then
          local code = ffi.string(codePtr, len);

          SELFICLE:freeData(ffi.cast("void *",codePtr));

          -- report code that fails to compile instead of calling nil
          local func, loaderr = loadstring(code);
          if func then
            func();
          else
            print("CODE ERROR: ", loaderr);
          end
        end
      end
      SELFICLE:freeMessage(msg);
    end
  end
end

This new message pump has a couple of interesting new features. The first has to do with getMessage(). It times out if no message has come in within gIdleTimeout milliseconds (15 by default). When that happens and an ‘OnIdle()’ function is defined, it is called with the current idle count. If that function does not exist, nothing happens.

The second change has to do with message processing. If an “OnMessage()” function is defined, it is called and handed the message. If it returns true, the message is considered fully handled and the pump does nothing more with it (freeing it becomes the handler’s job). If it returns anything else, or if no “OnMessage()” exists, the pump performs its default actions: executing “CODE” messages and breaking out of the loop on “QUIT”.
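The pump’s dispatch rules can be exercised without threads or a completion port. In this sketch, a Lua list stands in for the queue and a false entry stands in for a getMessage() timeout; pump(), the message tables and the QUIT/CODE/DATA values are hypothetical stand-ins for SELFICLE and Computicle.Messages. It walks through the three paths: OnIdle() on a timeout, OnMessage() claiming a message by returning true, and the default CODE/QUIT handling for everything else.

```lua
local load_chunk = loadstring or load  -- Lua 5.1/LuaJIT vs 5.2+

-- Hypothetical stand-ins for Computicle.Messages.*
local QUIT, CODE, DATA = 1, 2, 3

-- queue: a list of messages; the value false simulates a getMessage() timeout.
local function pump(queue, handlers)
  local idlecount = 0
  for _, msg in ipairs(queue) do
    if not msg then
      idlecount = idlecount + 1
      if handlers.OnIdle then handlers.OnIdle(idlecount) end
    else
      local fullyHandled = false
      if handlers.OnMessage then
        fullyHandled = handlers.OnMessage(msg)
      end
      if not fullyHandled then
        if msg.Message == QUIT then break end
        if msg.Message == CODE then
          local func, err = load_chunk(msg.code)
          if func then func() else print("CODE ERROR: ", err) end
        end
      end
    end
  end
  return idlecount
end

local events = {}
local handlers = {
  OnIdle = function(count)
    table.insert(events, "idle " .. count)
  end,
  OnMessage = function(msg)
    if msg.Message == DATA then
      table.insert(events, "data " .. msg.value)
      return true   -- fully handled: the pump skips its defaults
    end
    return false    -- not ours: fall through to QUIT/CODE handling
  end,
}

local idles = pump({
  false,
  { Message = DATA, value = 42 },
  { Message = CODE, code = "pumped = (pumped or 0) + 1" },
  false,
  { Message = QUIT },
  { Message = DATA, value = 99 },  -- never reached: QUIT ends the loop
}, handlers)
```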

So, how about that OnIdle? That’s a function that takes a parameter. Can I inject that? Sure can:

comp.OnIdle = function(count)
  print("IDLE", count)
end

Just like the hello() case, except this one takes a parameter. It does not violate the “no upvalues” rule, since print() is a global that is looked up in the computicle’s own context, so it’s just fine.

In this case, I don’t have to explicitly execute the code, because the loop within the computicle will pick up on it and call it whenever it goes idle. And if you want to remove this idling function, simply do: ‘comp.OnIdle = nil’

And then what?

Well, that’s pretty cool. Now I can easily set simple variables, and I can set table values, and I can even set functions to be called. I have a generic message pumping loop, which has embedded “OnIdle”, and “OnMessage” functions.

The last little bit of work to be done here is to deal with asynchronous IO in a relatively seamless way using continuations, just like I had in the pre-computicle world. That’s basically a marriage of the core of the EventScheduler and the main event loop here.

Throw some authentication on top of it all and suddenly you have a system that is very cool, secure, and useful.


Computicles – Inter-computicle communication

Alrighty then, so a computicle is a vessel that holds a bit of computation power. You can communicate with it, and it can communicate with others.

Most computicles do not stand as islands unto themselves, so easily communicating with them becomes very important.

Here is some code that I want to be able to run:

local Computicle = require("Computicle");
local comp = Computicle:load("comp_receivecode");

-- start by saying hello
comp:exec([[print("hello, injector")]]);

-- queue up a quit message
comp:quit();

-- wait for it all to actually go through
comp:waitForFinish();

So, what’s going on here? The first line is a standard “require” to pull in the computicle module.

Next, I create a single instance of a Computicle, running the Lua code that can be found in the file “comp_receivecode.lua”. I’ll come back to that bit of code later. Suffice to say it’s running a simple computicle that does stuff, like execute bits of code that I hand to it.

Further on, I use the Computicle I just created, and call the “exec()” function. I’m passing a string along as the only parameter. What will happen is the receiving Computicle will take that string, and execute the script from within its own context. That’s actually a pretty nifty trick I think. Just imagine, outside the world of scripting, you can create a thread in one line of code, and then inject a bit of code for that thread to execute. Hmmm, the possibilities are intriguing methinks.

The tail end of this code just posts a quit, and then finally waits for everything to finish up. Just note that the ‘quit()’ function is not the same thing as “TerminateThread()” or “ExitThread()”. Nope, all it does is post a specific kind of message to the receiving Computicle’s queue. What the thread does with that QUIT message is up to the individual Computicle.

Let’s have a look at the code for this computicle:

local ffi = require("ffi");

-- This is a basic message pump
-- 
while true do
  local msg = SELFICLE:getMessage();
  msg = ffi.cast("ComputicleMsg *", msg);
  local Message = msg.Message;

  if OnMessage then
    OnMessage(msg);
  else
    if Message == Computicle.Messages.QUIT then
      SELFICLE:freeMessage(msg);
      break;
    end

    if Message == Computicle.Messages.CODE then
      local len = msg.Param2;
      local codePtr = ffi.cast("const char *", msg.Param1);

      if codePtr ~= nil and len > 0 then
        local code = ffi.string(codePtr, len);

        SELFICLE:freeData(ffi.cast("void *",codePtr));

        local f, loaderr = loadstring(code);
        if f then
          f();
        else
          print("CODE ERROR: ", loaderr);
        end
      end
    end
  end

  SELFICLE:freeMessage(msg);
end

It’s not too many lines. This little Computicle takes care of a few scenarios.

First of all, if there happens to be an ‘OnMessage’ function defined, it will receive the message, and the main loop will do no further processing of it.

If there is no ‘OnMessage’ function, then the message pump handles a couple of cases. If a ‘QUIT’ message is received, the loop breaks and the thread/Computicle simply exits.

When the message == ‘CODE’ things get really interesting. The ‘Param1’ of the message contains a pointer to the actual bit of code that is intended to be executed. The ‘Param2’ contains the length of the specified code.

Through a couple of type casts, and an ffi.string() call, the code is turned into something that can be used with ‘loadstring()’, which is a standard Lua function. It will parse the string, and then when ‘f()’ is called, that string will actually be executed (within the context of the Computicle). And that’s that!

At the end, the ‘SELFICLE:freeMessage()’ is called to free up the memory used to allocate the outer message. Notice that ‘SELFICLE:freeData()’ was used to clean up the string value that was within the message itself. I have intimate knowledge of how this message was constructed, so I know this is the correct behavior. In general, if you’re going to pass data to a computicle, and you intend the Computicle to clean it up, you should use the computicle instance “allocData()” function.
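The important property of exec() is that the code runs against the receiving side’s globals, not the caller’s. A computicle gets that by living in a separate Lua state on its own thread. Within a single state, a rough approximation is to give each context its own environment table and compile code strings into it, using setfenv() on Lua 5.1/LuaJIT and the env argument of load() on 5.2 and later. new_context, exec and load_in are hypothetical names; this illustrates the scoping only, not threads or real isolation.

```lua
-- Compile a code string so that its globals live in 'env'.
local function load_in(code, env)
  if setfenv and loadstring then          -- Lua 5.1 / LuaJIT
    local f, err = loadstring(code)
    if not f then return nil, err end
    return setfenv(f, env)
  end
  return load(code, "=exec", "t", env)    -- Lua 5.2 and later
end

-- Hypothetical context: private globals that fall back to _G for built-ins
-- such as print, plus an exec() that runs code strings inside them.
local function new_context()
  local ctx = { env = setmetatable({}, { __index = _G }) }
  function ctx:exec(code)
    local f, err = load_in(code, self.env)
    if not f then return nil, err end
    return f()
  end
  return ctx
end

local comp = new_context()
comp:exec("counter = 41")
comp:exec("counter = counter + 1")
local bad, why = comp:exec("this is not Lua")  -- compile error is returned, not raised
```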

OK. So, that explains how I could possibly inject some code into a Computicle for execution. That’s pretty nifty, but it looks a bit clunky. Can I do better?

I would like to be able to do the following.

comp.lowValue = 100;
comp.highValue = 200;

In this case, it looks like I’m setting a value on the computicle instance, but in which thread context? Well, what will actually happen is this will get executed within the computicle instance context, and be available to any code that is within the computicle.

We already know that the ‘exec()’ function will execute a bit of code within the context of the running computicle, so the following should now be possible:

comp:exec([[print(" Low: ", lowValue)]]);
comp:exec([[print("High: ", highValue)]])

Basically, just print those values from the context of the computicle. If they were in fact set, this should print them out. If they were not set, it should print ‘nil’ for each of them. On my machine, I get the correct values, so that’s an indication that they were in fact set correctly.

How is this bit of magic achieved?

The key is the Lua ‘__newindex’ metamethod. Wha? Basically, if you have a table, and you try to set a value that does not exist, like I did with ‘lowValue’ and ‘highValue’, the ‘__newindex()’ function will be called on your table if you’ve got it setup right. Here’s the associated code of the Computicle that does exactly this.

__newindex = function(self, key, value)
  local setvalue = string.format("%s = %s", key, self:datumToString(value, key));
  return self:exec(setvalue);
end

That’s pretty straightforward. Just create a string that represents setting whatever value you’re trying to set, and then call ‘exec()’, which is already known to execute within the context of the thread. So, in the case where I have written “comp.lowValue = 100”, this will turn into the string “lowValue = 100”, and executing that string sets the global variable ‘lowValue’ to 100 inside the computicle.
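The __newindex trick itself fits in a few lines of plain Lua. In this sketch a proxy table forwards each new-key assignment as a “key = value” statement to an exec() that runs it against a separate environment table, which stands in for the computicle’s globals. make_proxy, load_in and valueToCode are hypothetical; valueToCode only handles numbers, booleans, nil and strings (quoted with %q), and keys are assumed to be valid Lua identifiers.

```lua
-- Hypothetical cut-down serializer: numbers, booleans, nil and strings.
-- Strings go through %q, so quotes, "]]" and line breaks survive.
local function valueToCode(value)
  if type(value) == "string" then
    return string.format("%q", value)
  end
  return tostring(value)
end

-- Compile 'code' with 'env' as its global table (Lua 5.1/LuaJIT or 5.2+).
local function load_in(code, env)
  if setfenv and loadstring then
    local f, err = loadstring(code)
    if not f then return nil, err end
    return setfenv(f, env)
  end
  return load(code, "=exec", "t", env)
end

-- A proxy whose new-key assignments become code run in 'env'.
local function make_proxy(env)
  local proxy = {}
  function proxy:exec(code)
    return assert(load_in(code, env))()
  end
  return setmetatable(proxy, {
    -- __newindex fires for keys the proxy does not hold; since it never
    -- rawsets, every later assignment to the same key fires it again.
    __newindex = function(self, key, value)
      local setvalue = string.format("%s = %s", key, valueToCode(value))
      return self:exec(setvalue)
    end,
  })
end

local remote = {}                 -- stands in for the computicle's globals
local comp = make_proxy(remote)
comp.lowValue = 100
comp.highValue = 200
comp.greeting = "hi ]] there"
```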

And what is this ‘datumToString()’ function? Ah yes, this is the little bit that takes various values and returns their string equivalent, ready to be injected into a running Computicle.

Computicle.datumToString = function(self, data, name)
  local dtype = type(data);
  local datastr = tostring(nil);

  if type(data) == "cdata" then
    -- If it is a cdata type that easily converts to 
    -- a number, then convert to a number and assign to string
    if tonumber(data) then
      datastr = tostring(tonumber(data));
    else
      -- if not easily converted to number, then just assign the pointer
      datastr = string.format("TINNThread:StringToPointer(%s);", 
        TINNThread:PointerToString(data));
    end
  elseif dtype == "table" then
    if getmetatable(data) == Computicle_mt then
      -- package up a computicle
    else
      -- get a json string representation of the table
      datastr = string.format("[[ %s ]]",JSON.encode(data, {indent=true}));
    end
  elseif dtype == "string" then
    datastr = string.format("[[%s]]", data);
  else 
    datastr = tostring(data);
  end

  return datastr;
end

The task is actually fairly straightforward. Given a Lua based value, turn it into a string that can be executed in another Lua state. There are of course existing Lua serialization libraries, and tons of marshalling frameworks as well. But this is a quick and dirty version that does exactly what I need.

Of particular note is the handling of cdata and table types. For cdata, some of the values, such as ‘int64_t’, I just want to convert to a number (tonumber() turns them into a double, so values beyond 2^53 lose precision). Tables are the most interesting. This particular technique really only works for plain data tables: strings, numbers, booleans and nested tables like the contacts list below, with no cycles, functions or cdata inside. Basically, turn the table into a JSON string, send that across, and decode it back into a table on the other side.

Here’s some code that actually makes use of this.

comp.contacts = {
  {first = "William", last = "Adams", phone = "(111) 555-1212"},
  {first = "Bill", last = "Gates", phone = "(111) 123-4567"},
}

comp:exec([=[
print("== CONTACTS ==")

-- turn contacts back into a Lua table
local JSON = require("dkjson");

local contable = JSON.decode(contacts);


for _, person in ipairs(contable) do
	--print(person);
	print("== PERSON ==");
	for k,v in pairs(person) do
		print(k,v);
	end
end
]=]);

Notice that ‘comp.contacts = …’ just assigns the created table directly. This is fine, as there are no other references to the table on ‘this’ side of the computicle, so it will be safely garbage collected after some time.

The rest of the code is using the ‘exec()’, so it is executing in the context of the computicle. It basically gets the value of the ‘contacts’ variable, and turns it back into a Lua table value, and does some regular processing on it (print out all the values).
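JSON works, but the receiving side has to decode the string before it can use it. One alternative, swapped in here and not what the Computicle code does, is to serialize the table as a Lua table constructor, so the assignment itself recreates a real table on arrival. serialize() is a hypothetical helper that handles strings, numbers, booleans and nested tables without cycles; the generated code is run in the same Lua state here rather than in a computicle.

```lua
local load_chunk = loadstring or load  -- Lua 5.1/LuaJIT vs 5.2+

-- Turn plain data into Lua source that rebuilds it (hypothetical helper).
-- Handles strings, numbers, booleans and nested tables without cycles;
-- numbers go through tostring(), so NaN/inf and exotic floats are out of scope.
local function serialize(value, seen)
  local t = type(value)
  if t == "string" then
    return string.format("%q", value)
  elseif t == "number" or t == "boolean" then
    return tostring(value)
  elseif t == "table" then
    seen = seen or {}
    assert(not seen[value], "cycles are not supported")
    seen[value] = true
    local parts = {}
    for k, v in pairs(value) do
      table.insert(parts, "[" .. serialize(k, seen) .. "] = " .. serialize(v, seen))
    end
    seen[value] = nil
    return "{" .. table.concat(parts, ", ") .. "}"
  end
  error("cannot serialize a " .. t .. " value")
end

local outgoing = {
  { first = "William", last = "Adams", phone = "(111) 555-1212" },
  { first = "Bill", last = "Gates", phone = "(111) 123-4567" },
}

-- What the receiving side would execute; here it runs in this same state.
local code = "contacts = " .. serialize(outgoing)
assert(load_chunk(code))()
```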

And that’s about it. From the humble beginnings of being able to inject a bit of code to run in the context of an already running thread, to exchanging tables between two different threads with ease, Computicles make pretty short work of such a task. It all stems from the same three principles of Computicles: listen, compute, communicate. And again, not a single mutex, lock, or other visible form of synchronization. The IOCompletionPort is the single communications mechanism, and all the magic of serialized multi-threaded communication hides behind that.

Of course, ‘code injection’ is a dirty word around the computing world, so there must be a way to secure such transfers? Yah, sure, why not. I’ve been bumping around the identity/security/authorization/authentication space recently, so surely something must be applicable here…