Jobs at Microsoft – Working on iOS and Android

Catchy title, isn’t it?  Microsoft, where I am employed, is actually doing a fair bit of iOS and Android work.  In days of yore, “cross platform” used to mean “works on multiple forms of Windows”.  These days, it actually means things like iOS, Android, Linux, and multiple forms of Windows.

I am currently working in the Windows Azure Group.  More specifically, I am working in the area of identity, which covers all sorts of things from Active Directory to single sign-on for Office 365.  My own project, the Application Gateway, has been quite an experience in programming with node.js, Android OS, iOS, embedded devices, large world-scale servers, and all manner of legal wranglings to ship Open Source for our product.

Recently, my colleague Rich Randall came by and said “I want to create a group of excellence centered around iOS and Android development, can you help me?”.  Of course I said “sure, why not”, so here is this post.

Rich is working on making it easier for devices (not Windows-specific) to participate in our “identity ecosystem”.  What does that mean?  Well, the job descriptions are here:

iOS Developer – Develop apps and bits of code to make it relatively easy to leverage the identity infrastructure presented by Microsoft.

Android Developer – Develop apps and bits of code to make it relatively easy to leverage the identity infrastructure presented by Microsoft.

I’m being unfair; these job descriptions were well crafted and more precisely convey the actual needs.  But what’s more interesting to me is to give a shout out to Rich, and some support for his recruiting efforts.

As Microsoft is “in transition”, it’s worth pointing out that although we may be considered old and stodgy by today’s internet standards, we are still a hotbed of creativity, and actually a great place to work.  Rich is not alone in putting together teams of programmers who have non-traditional Microsoft skillsets.  Like I said, there are plenty who now understand that as a “services and devices” company, we can’t just blindly push the party line and platform components.  We have to meet the market where it is, and that is in the mobile space, with these two other operating systems.

So, if you’re interested in leveraging your iOS and Android skills, delivering code that is open source, doing full stack development, and working with a great set of people, please feel free to check out those job listings, or send mail to Rich Randall directly.  I’d check out the listings first, then send to Rich.

Yes, this has been a shameless jobs plug.  I do work for the company, and am very interested in getting more interesting people in the door to work with.

Name That Framework – Echo Service in two lines of code

… and here they are:

SocketServer = require("SocketServer");
SocketServer(9090):run(function(s, b, l)  s:send(b, l); end);

Back in the day there was this gameshow called “Name That Tune”, where contestants would be told a clue about a song, then they would bid on the fewest number of notes it would take for them to name the tune. Once the bids were fixed, the orchestra would play the number of notes, and the contestant would have to correctly guess the name of the tune.

So, above are two lines of code which implement a highly scalable “echo” service. Can you name the framework?

It’s TINN of course!

Here’s a more reasonable rendition of the same:

local SocketServer = require("SocketServer");

local function OnData(socket, buff, bufflen)
  socket:send(buff, bufflen);
end;

local server = SocketServer(9090);
server:run(OnData)

Simply put, a SocketServer is a generic service that listens on a particular port that you specify. Whenever it receives any data on the port, it calls the supplied ‘OnData’ function. Each time ‘OnData’ is called, it could be with a different socket and data. You could build a fairly rudimentary HTTP server on top of this if you like. What’s most important to me is the fact that you don’t have to write any of the underlying low-level networking code. Nothing about accept, IO Completion Ports, etc. Just: call me when some data comes in on this specified port.
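To make the “rudimentary HTTP server” idea concrete, here is a hedged sketch. SocketServer and socket:send() are the TINN APIs shown in this post; buildResponse() is a hypothetical helper of my own, not part of TINN.

```lua
-- A minimal sketch of an HTTP-ish responder on top of SocketServer.
-- SocketServer and socket:send() are assumed from the TINN code in
-- this post; buildResponse() is a made-up helper for illustration.

-- Build a bare-bones HTTP/1.1 response for a given body string.
local function buildResponse(body)
  return table.concat({
    "HTTP/1.1 200 OK",
    "Content-Type: text/plain",
    "Content-Length: " .. #body,
    "",
    body,
  }, "\r\n");
end

-- The OnData handler: ignore the request bytes, always answer hello.
local function OnData(socket, buff, bufflen)
  local response = buildResponse("Hello from TINN\n");
  socket:send(response, #response);
end

-- Wiring it up would look just like the echo service:
--   local server = SocketServer(8080);
--   server:run(OnData);
```

The point is that the handler only deals in strings and buffers; all of the accept/receive plumbing stays inside SocketServer.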

The SocketServer code itself looks like this:

local ffi = require("ffi");
local IOProcessor = require("IOProcessor");
local IOCPSocket = require("IOCPSocket");

IOProcessor:setMessageQuanta(nil);

SocketServer = {}
setmetatable(SocketServer, {
  __call = function(self, ...)
    return self:create(...);
  end,
});

SocketServer_mt = {
  __index = SocketServer;
}

SocketServer.init = function(self, socket, datafunc)
--print("SocketServer.init: ", socket, datafunc)
  local obj = {
    ServerSocket = socket;
    OnData = datafunc;
  };

  setmetatable(obj, SocketServer_mt);

  return obj;
end

SocketServer.create = function(self, port, datafunc)
  port = port or 9090;

  local socket, err = IOProcessor:createServerSocket({port = port, backlog = 15});

  if not socket then
    print("Server Socket not created!!")
    return nil, err
  end

  return self:init(socket, datafunc);
end

-- The primary application loop
SocketServer.loop = function(self)
  local bufflen = 1500;
  local buff = ffi.new("uint8_t[?]", bufflen);

  while true do
    local sock, err = self.ServerSocket:accept();

    if sock then
      local socket = IOCPSocket:init(sock, IOProcessor);
      local bytesread, err = socket:receive(buff, bufflen);

      if not bytesread then
        print("RECEIVE ERROR: ", err);
      elseif self.OnData ~= nil then
        self.OnData(socket, buff, bytesread);
      else
        socket:closeDown();
        socket = nil
      end
    else
       print("Accept ERROR: ", err);
    end

    collectgarbage();
  end
end

SocketServer.run = function(self, datafunc)
  if datafunc then
    self.OnData = datafunc;
  end

  IOProcessor:spawn(self.loop, self);
  IOProcessor:run();
end

return SocketServer;

This basic server loop is good for a lot of little tiny tasks where you just need to put a listener on the front of something. No massive scale-out, no multi-threading, just good straightforward stuff. But it’s already plumbed to go big too.

Here’s a slight modification:

SocketServer.handleAccepted = function(self, sock)
  local handleNewSocket = function()
    local bufflen = 1500;
    local buff = ffi.new("uint8_t[?]", bufflen);
    
    local socket = IOCPSocket:init(sock, IOProcessor);

    if self.OnAccepted then
      -- an OnAccepted() handler, if one were defined, would take over here
    else
      local bytesread, err = socket:receive(buff, bufflen);
  
      if not bytesread then
        print("RECEIVE ERROR: ", err);
      elseif self.OnData ~= nil then
        self.OnData(socket, buff, bytesread);
      else
        socket:closeDown();
        socket = nil
      end
    end
  end

  return IOProcessor:spawn(handleNewSocket);
end

-- The primary application loop
SocketServer.loop = function(self)

  while true do
    local sock, err = self.ServerSocket:accept();

    if sock then
      self:handleAccepted(sock);
    else
       print("Accept ERROR: ", err);
    end

    collectgarbage();
  end
end

In the main loop, instead of doing the processing directly, call the ‘self:handleAccepted()’ function. That function in turn will spawn an internal function to actually handle the request. Everything else remains the same.

If you do it this way, then ‘OnData’ will run cooperatively with other accepts that might be going on. This also highlights, in an almost invisible way, that the ‘accept()’ call is itself cooperative. Since IO Completion Ports are being used in the background, the accept call is actually async. As soon as it issues the accept, that coroutine will wait in place until another socket comes in. Meanwhile, the last socket that was being handled will get some time slice to do what it wants.
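The “wait in place until the IO completes” mechanism can be seen in miniature with plain Lua. This is a toy, not the TINN scheduler: the names (ready, waiting, yieldForIo, complete) are my own, and the “completion” is faked rather than coming from an IO Completion Port.

```lua
-- A pure-Lua toy illustrating "yield in place until IO completes".
-- A fiber that "waits" on IO is parked; other fibers keep running.

local ready = {};      -- fibers ready to run
local waiting = {};    -- fibers parked on a fake IO event, keyed by name
local log = {};

local function spawn(fn)
  table.insert(ready, coroutine.create(fn));
end

-- A fiber calls this instead of blocking; it parks itself on the event.
local function yieldForIo(event)
  waiting[event] = coroutine.running();
  coroutine.yield();
end

-- The fake "completion port" fires an event; the fiber becomes ready.
local function complete(event)
  local fiber = waiting[event];
  waiting[event] = nil;
  if fiber then table.insert(ready, fiber) end
end

spawn(function()
  table.insert(log, "accept issued");
  yieldForIo("accept");              -- parked here, blocking nobody
  table.insert(log, "accept completed");
end)

spawn(function()
  table.insert(log, "other fiber ran");  -- gets a slice while accept waits
  complete("accept");                     -- pretend the IO event arrived
end)

-- Round-robin: run ready fibers until none are left.
while #ready > 0 do
  local fiber = table.remove(ready, 1);
  coroutine.resume(fiber);
end

print(table.concat(log, " | "));
-- accept issued | other fiber ran | accept completed
```

The second fiber runs to completion in between the two halves of the first fiber, which is exactly the interleaving the IOCP-backed accept() gives you for free.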

And thus, you get massive scale (thousands of potential connections) from using this fairly simple code.

Well, those are the basics. Now that I have plumbed TINN from the ground up to utilize the IO Completion Ports, I can start to build upon that. There are a couple of nice benefits to marrying IOCP and Lua coroutines. I’ll be exploring this some more, but it’s basically a match made in heaven.


Computicles – A tale of two schedulers

One of the drivers for the creation of computicles is to maximize the efficiency of the running system while minimizing the complexity for the programmer.  Herein lies the rub.  Modern computers are multi-core/multi-proc, and Lua is largely a single-core sort of system.  Lua has its own notion of “light weight threads”, which are essentially cooperative processing threads.  The native OS (Windows or Linux) has a notion of “threads” which are much more heavyweight.  While the Lua threads can easily number in the thousands and more, they are not actually running in parallel; they are just rapidly context switching between each other at the whim of the programmer.  The OS threads, on the other hand, are in fact running in parallel, on multiple cores if they exist.  But, as soon as you have more threads than you have cores, the threads are shifting rapidly between each other, just like in the Lua case, but it’s ‘preemptive’ instead of cooperative.
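The “at the whim of the programmer” part is literal. Here is a tiny, self-contained illustration of cooperative switching with stock Lua coroutines; nothing runs in parallel, and control only moves when a coroutine explicitly yields.

```lua
-- Cooperative "threads" in plain Lua: a coroutine only gives up the
-- CPU when it yields, and the caller decides who runs next.

local order = {};

local a = coroutine.create(function()
  table.insert(order, "a1"); coroutine.yield();
  table.insert(order, "a2");
end)

local b = coroutine.create(function()
  table.insert(order, "b1"); coroutine.yield();
  table.insert(order, "b2");
end)

-- The "scheduler" is just us, resuming in whatever order we choose.
coroutine.resume(a);
coroutine.resume(b);
coroutine.resume(a);
coroutine.resume(b);

print(table.concat(order, ","));  -- a1,b1,a2,b2
```

An OS thread scheduler would interleave a and b wherever it pleased; here the interleaving is fully determined by the resume calls, which is what makes thousands of these threads cheap and predictable.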

What I want?  I want to get the best of both worlds.  But, before starting down the path of leveraging the multiple cores, I want to start with the programming paradigm.

I want to write essentially serial code.  My brain is not good at dealing with things like mutexes, semaphores, barriers, or any other kind of sharing mechanisms that have been invented over the past 40 years.  I know how to write straight sequential code.  I can deal with saying “spawn” to get something running in parallel, but that’s about it.

So, in steps computicles.

I’ve gone on about the subject a few times now, but I’ve finally created the unified scheduler that I require.  It looks like this:


-- comp_msgpump.lua
local ffi = require("ffi");
require("IOProcessor");

-- default to 15 millisecond timeout
gIdleTimeout = gIdleTimeout or 15

local idlecount = 0;

while true do
  if IOProcessor then
    IOProcessor:step();
  end

  local msg, err = SELFICLE:getMessage(gIdleTimeout);

  if not msg then
    if err == WAIT_TIMEOUT then
      --print("about to idle")
      idlecount = idlecount + 1;
      if OnIdle then
        OnIdle(idlecount);
      end
    end
  else
    local msgFullyHandled = false;
    msg = ffi.cast("ComputicleMsg *", msg);

    if OnMessage then
      msgFullyHandled = OnMessage(msg);
    end

    if not msgFullyHandled then
      msg = ffi.cast("ComputicleMsg *", msg);
      local Message = msg.Message;
      --print("Message: ", Message, msg.Param1, msg.Param2);
		
      if Message == Computicle.Messages.QUIT then
        if OnExit then
          OnExit();
        end
        break;
      end

      if Message == Computicle.Messages.CODE then
        local len = msg.Param2;
        local codePtr = ffi.cast("const char *", msg.Param1);
		
        if codePtr ~= nil and len > 0 then
          local code = ffi.string(codePtr, len);

          SELFICLE:freeData(ffi.cast("void *",codePtr));

          local func = loadstring(code);
          func();
        end
      end
      SELFICLE:freeMessage(msg);
    end
  end
end

This is pretty much the same event driven loop that has existed previously. Its main function is to get messages off its message queue and deal with them. This is how you communicate with a computicle. Under normal circumstances, a Computicle can simply implement OnMessage(), if it wants to respond only when it receives a message. This is a perfectly event driven way to exist. Or it can implement OnIdle(), if it wants to respond to the fact that nothing else is occurring in the system. This is a great combination, and will cover many useful cases. But what about waiting for some IO to complete?

Well, at the top of this event loop there is the IOProcessor:step() call. And what is an IOProcessor?

The IOProcessor is a scheduler for cooperative Lua threads. The IOProcessor assumes the user’s code is utilizing co-routines, and will deal with putting them on a ‘sleeping’ list whenever they perform a task, such as socket IO which does not complete immediately. It’s a classic, and before the Computicles existed, this was the primary scheduler.

It’s a bit thick with code, but here it is:

local ffi = require("ffi");

local Collections = require "Collections"
local IOCPSocket = require("IOCPSocket");
local SimpleFiber = require("SimpleFiber");
local IOCompletionPort = require("IOCompletionPort");
local SocketOps = require("SocketOps");


IOProcessor = {
  fibers = Collections.Queue.new();
  coroutines = {};
  EventFibers = {};
  FibersAwaitingEvent = {};

  IOEventQueue = IOCompletionPort:create();
  MessageQuanta = 15;		-- 15 milliseconds
};


--[[
	Socket Management
--]]

IOProcessor.createClientSocket = function(self, hostname, port)
  return IOCPSocket:createClient(hostname, port, self)
end

IOProcessor.createServerSocket = function(self, params)
  return IOCPSocket:createServerSocket(params, self)
end

IOProcessor.observeSocketIO = function(self, socket)
  return self.IOEventQueue:addIoHandle(socket:getNativeHandle(), socket.SafeHandle);
end

--[[
	Fiber Handling
--]]

IOProcessor.scheduleFiber = function(self, afiber, ...)
  if not afiber then
    return nil
  end
  self.coroutines[afiber.routine] = afiber;
  self.fibers:Enqueue(afiber);	

  return afiber;
end

IOProcessor.spawn = function(self, aroutine, ...)
  return self:scheduleFiber(SimpleFiber(aroutine, ...));
end

IOProcessor.removeFiber = function(self, fiber)
  self.coroutines[fiber.routine] = nil;
end

IOProcessor.inMainFiber = function(self)
  return coroutine.running() == nil; 
end

IOProcessor.yield = function(self)
  coroutine.yield();
end

IOProcessor.yieldForIo = function(self, sock, iotype)
  -- associate a fiber with a socket
  print("yieldForIo, CurrentFiber: ", self.CurrentFiber);
	
  self.EventFibers[sock:getNativeSocket()] = self.CurrentFiber;

  -- Keep a list of fibers that are awaiting io
  if self.CurrentFiber ~= nil then
    self.FibersAwaitingEvent[self.CurrentFiber] = true;

    -- Whether we were successful or not in adding the socket
    -- to the pool, perform a yield() so the world can move on.
    self:yield();
  end
end


IOProcessor.processIOEvent = function(self, key, numbytes, overlapped)
    local ovl = ffi.cast("SocketOverlapped *", overlapped);
    local sock = ovl.sock;
    ovl.bytestransferred = numbytes;
    if sock == INVALID_SOCKET then
		return false, "invalid socket"
    end

    --print("IOProcessor.processIOEvent(): ", sock, ovl.operation);

    local fiber = self.EventFibers[sock];
    if fiber then
      self:scheduleFiber(fiber);
      self.EventFibers[sock] = nil;
      self.FibersAwaitingEvent[fiber] = nil;
    else
      print("EventScheduler_t.ProcessEventQueue(), No Fiber waiting to process.")
      -- remove the socket from the watch list
    end
end

IOProcessor.stepIOEvents = function(self)
    -- Check to see if there are any IO Events to deal with
    local key, numbytes, overlapped = self.IOEventQueue:dequeue(self.MessageQuanta);

    if key then
      self:processIOEvent(key, numbytes, overlapped);
    else
      -- typically timeout
      --print("Event Pool ERROR: ", numbytes);
    end
end

IOProcessor.stepFibers = function(self)
  -- Now check the regular fibers
  local fiber = self.fibers:Dequeue()

  -- Take care of spawning a fiber first
  if fiber then
    if fiber.status ~= "dead" then
      self.CurrentFiber = fiber;
      local result, values = fiber:Resume();
      if not result then
        print("RESUME RESULT: ", result, values)
      end
      self.CurrentFiber = nil;

      if fiber.status ~= "dead" and not self.FibersAwaitingEvent[fiber] then
        self:scheduleFiber(fiber)
      else
        --print("FIBER FINISHED")
        -- remove coroutine from dictionary
        self:removeFiber(fiber)
      end
    else
      self:removeFiber(fiber)
    end
  end
end

IOProcessor.step = function(self)
  self:stepFibers();
  self:stepIOEvents();
end

return IOProcessor

There are a couple of ways to approach this. From the perspective of the other event loop, the “step()” method here is executed once around the loop. The ‘step()’ method in turn checks on the fibers, and then on the ioevents. “stepFibers” checks the list of fibers that are ready to run, and runs one of them for a bit until it yields, and is thus placed back on the queue of fibers ready to be run, or it finishes. This is the part where a normal cooperative processing system could be brought to its knees, and a preemptive multi-tasking system would just keep going. The ‘stepIOEvents()’ function checks on the IOCompletionPort that is being used by sockets to indicate whether anything interesting has occurred. If there has been any activity, the cooperative thread associated with the activity is scheduled to execute a bit of code. It does not execute immediately, but it is now on the list to be executed next time around.

The stepIOEvents() function is at the heart of any system, such as node.js, which gets high performance with IO processing, while maintaining a low CPU load. Most of the time you’re just waiting, doing nothing, and once the system indicates there is action on the socket, you can spring into action. Thus, you do not spend any time looping over sockets polling to see if there’s any activity, you’re just notified when there is.

The rest of the code is largely helpers, like creating a socket that is wired correctly and whatnot.

So, at the end of it, what does this system do?

Well, assuming I want to write a DateTimeClient, which talks to a service, gets the date and time, prints it out, etc, I would write this:

local ffi = require "ffi"
require("IOProcessor");

local daytimeport = 13

GetDateAndTime = function(hostname, port)
    hostname = hostname or "localhost";
    port = port or daytimeport;

    local socket, err = IOProcessor:createClientSocket(hostname, port);

    if not socket then
        print("Socket Creation Failed: ", err);
        return nil, err;
    end

    local bufflen = 256;
    local buff = ffi.new("char [?]", bufflen);

    local n, err = socket:receive(buff, bufflen)
 
    if not n then
        return false, err;
    end

    if n > 0 then
        return ffi.string(buff, n);
    end
end

Well, that looks like normal sequential code to me. And yes, it is. Nothing unusual. But, when running in the context of a computicle, like the following, it gets more interesting.

local Computicle = require("Computicle");

local codeTemplate = [[
require("DaytimeClient");
local dtc, err = GetDateAndTime("localhost");
print(dtc);
]]

local comp1 = Computicle:create(codeTemplate);
local comp2 = Computicle:create(codeTemplate);

comp1:waitForFinish();
comp2:waitForFinish();

This will take the otherwise sequential code of the DateTimeClient, and execute it in two parallel preemptive Operating System level threads, and wait for their completion. The magic is all hidden behind the schedulers and event loops. I never have to know about how that all happens, but I can appreciate the benefits.

Marrying the concepts of event driven, multi-process, cooperative user-space threads, preemptive multi-tasking, and the like can be a daunting task. But, with a little bit of script, some chewing gum, a whistle and a prayer, it all comes together in a seamless whole which is quite easy to use, and fairly simple to understand. I think I have achieved my objectives for ease of use, maximizing efficiency, and reducing head explosions.


Computicles – Simplifying Chaining

Using the “exec()” method, I can chain computicles together, but the code is kind of raw. There is one addition I left out of the equation last time around, and that is the hydrate/rehydrate of a computicle variable itself. Now it looks like this:

elseif dtype == "table" then
  if getmetatable(data) == Computicle_mt then
    -- package up a computicle
    datastr = string.format("Computicle:init(TINNThread:StringToPointer(%s),TINNThread:StringToPointer(%s));", 
      TINNThread:PointerToString(data.Heap:getNativeHandle()), 
      TINNThread:PointerToString(data.IOCP:getNativeHandle()));
  elseif getmetatable(data) == getmetatable(self.IOCP) then
    -- The data is an iocompletion port, so handle it specially
    datastr = string.format("IOCompletionPort:init(TINNThread:StringToPointer(%s))",
      TINNThread:PointerToString(data:getNativeHandle()));
  else
    -- get a json string representation of the table
    datastr = string.format("[[ %s ]]",JSON.encode(data, {indent=true}));
  end

This is just showing the types that are descended from the ‘table’ type. This includes IOCompletionPort and ‘Computicle’. Anything other than these two will be serialized as fairly straightforward key/value pair tables. With this in place, I can now do the following.

-- Setup the splitter that will dispatch to the leaf nodes
local splitter = Computicle:load("comp_splittercode");

-- Setup the source, which will be originating messages
local source = Computicle:load("comp_sourcecode");
source.sink = splitter;

The code for the comp_sourcecode computicle looks like this:


RunOnce = function(sink)
  print("comp_sourcecode, RunOnce...")
  for i = 1, 10 do
    sink:postMessage(i);
  end
end

OnIdle = function(counter)
  print("comp_sourcecode, OnIdle(): ", counter)
  if sink ~= nil then 
    RunOnce(sink);

    sink = nil;
    exit();
  end
end

This computicle implements the OnIdle() function, as it will use that to determine when I can send messages off to the sink. If the sink hasn’t been set yet, then it will do nothing. If it has been set, then it will execute the ‘RunOnce()’ function, sending the sink a bunch of messages.

After performing the RunOnce() task, the computicle will simply exit(), which is the same as SELFICLE:quit(), which in turn just does SELFICLE:postMessage(Computicle.Messages.QUIT).

And that’s the end of that computicle. The splitter has become much simpler as well.

local ffi = require("ffi");


OnMessage = function(msg)
	msg = ffi.cast("ComputicleMsg *", msg);
	local Message = msg.Message;
--print("comp_splittercode, OnMessage(): ", Message);

	if sink1 then
		sink1:postMessage(Message);
	end	

	if sink2 then
		sink2:postMessage(Message);
	end
end

Here, the splitter assumes there are two sinks. If either of them exists, they will receive the message that was passed into the splitter. If they don’t exist, then the message will be dropped. Of course, other behaviors, such as holding on to the messages, could be implemented as well.

In this case, the computicle is not exited internally. This is because the splitter itself does not know when it is finished, all it knows is that when it receives a message, it is supposed to pass that message on to its two sinks.

The sink code is equally simple.

local ffi = require("ffi");

OnMessage = function(msg)
  msg = ffi.cast("ComputicleMsg *", msg);
  local Message = msg.Message;

  print(msg.Message*10);
end

Here again, just implement the ‘OnMessage()’ function, and the comp_msgpump code will call it automatically. And again, the sink does not know when it is done, so it does not explicitly exit.

Pulling it all together, the exiting logic becomes more clear:

local Computicle = require("Computicle");

-- Setup the leaf nodes to receive messages
local sink1 = Computicle:load("comp_sinkcode");
local sink2 = Computicle:load("comp_sinkcode");


-- Setup the splitter that will dispatch to the leaf nodes
local splitter = Computicle:load("comp_splittercode");

splitter.sink1 = sink1;
splitter.sink2 = sink2;


-- Setup the source, which will be originating messages
local source = Computicle:load("comp_sourcecode");
source.sink = splitter;


-- Close everything down from the outside
print("FINISH source: ", source:waitForFinish());

-- tell the splitter to quit
splitter:quit();
print("FINISH splitter:", splitter:waitForFinish());

-- the two sinks will receive the quit message from the splitter
-- so, just wait for them to quit.
print("Finish Sink 1: ", sink1:waitForFinish());
print("Finish Sink 2: ", sink2:waitForFinish());

There is an order in which computicles are created, and assigned, which stitches them together. The computicles could actually be constructed in a “suspended state”, but that would not help matters too much.

At the end, the quit() sequence can clearly be seen. First, wait for the source to be finished. Then tell the splitter to quit(). This is not an interrupt, so whatever was in its queue previously will flow through before the QUIT is processed. Then finally, do the same thing on the sinks, after the splitter has finished.

All messages flow, and all processing finishes cleanly. So, this is one way to perform the exit mechanism from the outside, as well as from the inside.

Adding this bit of ease in programming did not change the fundamentals of the computicle. It still has a single communication mechanism (the queue). It still performs computation, it still communicates with the outside world. The new code saves the error-prone process of type casting and hydrating objects that the system already knows about. This in turn makes the usage pattern that much easier to deal with.

The comp_msgpump was added in a fairly straightforward way. When a computicle is constructed, there is a bit of code (a Prolog) that is placed before the user’s code, and a bit of code (an Epilog) placed after it. Those two bits of code look like this:

Computicle = {
	Prolog = [[
TINNThread = require("TINNThread");
Computicle = require("Computicle");
IOCompletionPort = require("IOCompletionPort");
Heap = require("Heap");

exit = function()
    SELFICLE:quit();
end
]];

Epilog = [[
require("comp_msgpump");
]];
}

Computicle.createThreadChunk = function(self, codechunk, params, codeparams)
	local res = {};

	-- What we want to load before any other code is executed
	-- This is typically some 'require' statements.
	table.insert(res, Computicle.Prolog);


	-- Package up the parameters that may want to be passed in
	local paramname = "_params";

	table.insert(res, string.format("%s = {};", paramname));
	table.insert(res, self:packParams(params, paramname));
	table.insert(res, self:packParams(codeparams, paramname));

	-- Create the Computicle instance that is running within 
	-- the Computicle
	table.insert(res, 
		string.format("SELFICLE = Computicle:init(%s.HeapHandle, %s.IOCPHandle);", paramname, paramname));


	-- Stuff in the user's code
	table.insert(res, codechunk);

	
	-- What we want to execute after the user's code is loaded
	-- By default, this will be a message pump
	table.insert(res, Computicle.Epilog);

	return table.concat(res, '\n');
end

If an application needs a different prolog or epilog, it’s fairly easy to change them either by changing the file that is being read in, or by changing the string itself: Computicle.Prolog = [[print(“Hello, World”)]]
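The prolog/user-code/epilog sandwich is just string assembly followed by a load. Here is a stripped-down, pure-Lua sketch of the same pattern; the TINN-specific prolog contents are replaced with toy stand-ins, so only the shape of createThreadChunk is being illustrated.

```lua
-- A toy version of createThreadChunk: concatenate a prolog, the
-- user's code, and an epilog into one chunk, then load and run it.
-- loadstring is the Lua 5.1 / LuaJIT name; Lua 5.2+ calls it load.
local loadstring = loadstring or load;

local Prolog = [[
greeting = "Hello";          -- stand-in for the require() statements
]];

local Epilog = [[
result = greeting .. ", " .. name;   -- stand-in for the message pump
]];

local function createChunk(usercode)
  local res = {};
  table.insert(res, Prolog);
  table.insert(res, usercode);   -- the user's code goes in the middle
  table.insert(res, Epilog);
  return table.concat(res, "\n");
end

local chunk = createChunk([[ name = "Computicle" ]]);
local func = loadstring(chunk);
func();

print(result);  -- Hello, Computicle
```

In TINN the real prolog also packs the heap and IOCP handles into the chunk, but the mechanism is the same: three strings joined together and handed to the new thread as a single script.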

And so it goes. I’ve never had it so easy creating multi-threaded bits of computation and stitching those bits together.


Computicles – Unsafe, but fun, code injection

Computicles are capable of receiving bits of code to execute. That is in fact one way in which they communicate with each other. From one context I can simply do: comp.someValue = 100, and that will set “someValue = 100” in another context. Well, great. How about sending something across that’s more substantial than a simple variable setting?

Well, with the Computicle.exec() method, which the previous example is based on, you can really send across any bit of code to be executed. It works, and you can even send across the body of a function like this:

comp:exec[[
function hello()
  print("Hello, World!");
end
]]

That works fine, and you have full access to all the stuff that’s running in that context. It’s a bit clunky though, because the entirety of your piece of code is wrapped up in a string value. This is very similar to executing SQL code on some server. The bulk of your code, if not in “stored procedures”, ends up in these opaque strings. Hard to catch errors, hard to debug, etc. How about the following:

comp.hello = function()
  print("Hello, World!");
end

What’s that, you say? Basically, assigning a function as a variable to the computicle. Yes, this can work. There is a slight modification to turn a function into a string, and it looks like this:

Computicle.datumToString = function(self, data, name)
	local dtype = type(data);
	local datastr = tostring(nil);

--print("DATUM TYPE: ", name, dtype);

	if type(data) == "cdata" then
		-- If it is a cdata type that easily converts to 
		-- a number, then convert to a number and assign to string
		if tonumber(data) then
			datastr = tostring(tonumber(data));
		else
			-- if not easily converted to number, then just assign the pointer
			datastr = string.format("TINNThread:StringToPointer(%s);", 
				TINNThread:PointerToString(data));
		end
	elseif dtype == "table" then
		if getmetatable(data) == Computicle_mt then
			-- package up a computicle
		else
			-- get a json string representation of the table
			datastr = string.format("[[ %s ]]",JSON.encode(data, {indent=true}));

			--print("=== JSON ===");
			--print(datastr)
		end
	elseif dtype == "function" then
		datastr = "loadstring([["..string.dump(data).."]])";
	elseif dtype == "string" then
		datastr = string.format("[[%s]]", data);
	else 
		datastr = tostring(data);
	end

	return datastr;
end

Notice the part that starts: ‘elseif dtype == “function” then’.

Basically, string.dump is a standard function that turns a function into bytecode, which can later be hydrated using loadstring. It is placed in ‘loadstring([[…]])’ because this is the only way to preserve the actual 8-bit values. If you used string.format(“%s”), it would chop out the non-ASCII stuff.

Then, this code can be executed in the remote context just the same as any other little bit of code. There are some restrictions to what kinds of functions you can do this with though. No ‘upvalues’, meaning, everything must be contained within the function itself, no calling out to global variables and the like.

Of course, that just gets the code over to the other side in a convenient way. What about actually executing the code?

comp:exec([[hello()]]);

Same as ever, just make the function call using “exec()”. Of course, this little bit could be wrapped up in some other convenient override, like using the ‘:’ notation, but there’s some more work to be done before making that a reality.

What other functions can be dealt with? Well, the loop running within the computicle looks like the following now:

-- comp_msgpump.lua

local ffi = require("ffi");

-- This is a basic message pump
-- 

-- default to 15 millisecond timeout
gIdleTimeout = gIdleTimeout or 15


local idlecount = 0;

while true do
  local msg, err = SELFICLE:getMessage(gIdleTimeout);
  -- false, WAIT_TIMEOUT == timed out
  --print("MSG: ", msg, err);

  if not msg then
    if err == WAIT_TIMEOUT then
      --print("about to idle")
      idlecount = idlecount + 1;
      if OnIdle then
        OnIdle(idlecount);
      end
    end
  else
  	local msgFullyHandled = false;

    if OnMessage then
      msgFullyHandled = OnMessage(msg);
    end

    if not msgFullyHandled then
      msg = ffi.cast("ComputicleMsg *", msg);
      local Message = msg.Message;
      --print("Message: ", Message, msg.Param1, msg.Param2);
		
      if Message == Computicle.Messages.QUIT then
        break;
      end

      if Message == Computicle.Messages.CODE then
        local len = msg.Param2;
        local codePtr = ffi.cast("const char *", msg.Param1);
		
        if codePtr ~= nil and len > 0 then
          local code = ffi.string(codePtr, len);

          SELFICLE:freeData(ffi.cast("void *",codePtr));

          local func = loadstring(code);
          func();
        end
      end
      SELFICLE:freeMessage(msg);
    end
  end
end

This new message pump has a couple of interesting new features. The first feature has to do with the getMessage(). It will timeout after some number of milliseconds have passed and a message hasn’t come in. If there is an ‘OnIdle()’ function defined, it will be called, passing that function the current count. If that function does not exist, then nothing will happen.

The second change has to do with message processing. If an “OnMessage()” function is defined, it will be called, and the message will be handed to it for processing. If that function does not exist, or does not fully handle the message, the pump falls back to some default actions, such as loading and executing code, and handling a “QUIT”.
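To see that dispatch priority in isolation, here is a small, self-contained sketch; the QUIT/CODE constants and the ‘pump’ function are stand-ins I made up, and where the real loop does its default processing, this one just records which messages fell through:

```lua
-- Stand-ins for Computicle.Messages values; made up for illustration.
local QUIT, CODE = 1, 2
local fellThrough = {}

-- A user-supplied handler: claim CODE messages, and let QUIT pass
-- through by returning false (i.e. "not fully handled").
local OnMessage = function(msg)
  return msg == CODE
end

-- The same priority logic as the pump: OnMessage gets first crack,
-- and only unhandled messages reach the default processing.
local function pump(msg)
  local msgFullyHandled = false
  if OnMessage then
    msgFullyHandled = OnMessage(msg)
  end
  if not msgFullyHandled then
    table.insert(fellThrough, msg)
  end
end

pump(CODE)
pump(QUIT)
print(#fellThrough)   --> 1 (only QUIT reached the default handling)
```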

So, how about that OnIdle? That’s a function that takes a parameter. Can I inject that? Sure can:

comp.OnIdle = function(count)
  print("IDLE", count)
end

Just like the hello() case, except this one takes a parameter. That does not violate the “no upvalues” rule, so it’s just fine.

In this case, I don’t have to actually execute the code, because the loop within the computicle will pick up on it and execute it. And if you want to remove this idling function, simply do: ‘comp.OnIdle = nil’.

And then what?

Well, that’s pretty cool. Now I can easily set simple variables, and I can set table values, and I can even set functions to be called. I have a generic message pumping loop, which has embedded “OnIdle”, and “OnMessage” functions.

The last little bit of work to be done here is to deal with asynchronous IO in a relatively seamless way using continuations, just like I had in the pre-computicle world. That’s basically a marriage of the core of the EventScheduler and the main event loop here.

Throw some authentication on top of it all and suddenly you have a system that is very cool, secure, and useful.


Computicles – Inter-computicle communication

Alrighty then, so a computicle is a vessel that holds a bit of computation power. You can communicate with it, and it can communicate with others.

Most computicles do not stand as islands unto themselves, so easily communicating with them becomes very important.

Here is some code that I want to be able to run:

local Computicle = require("Computicle");
local comp = Computicle:load("comp_receivecode");

-- start by saying hello
comp:exec([[print("hello, injector")]]);

-- queue up a quit message
comp:quit();

-- wait for it all to actually go through
comp:waitForFinish();

So, what’s going on here? The first line is a standard “require” to pull in the computicle module.

Next, I create a single instance of a Computicle, running the Lua code that can be found in the file “comp_receivecode.lua”. I’ll come back to that bit of code later. Suffice to say it’s running a simple computicle that does stuff, like execute bits of code that I hand to it.

Further on, I use the Computicle I just created, and call the “exec()” function. I’m passing a string along as the only parameter. What will happen is the receiving Computicle will take that string, and execute the script from within its own context. That’s actually a pretty nifty trick I think. Just imagine, outside the world of scripting, you can create a thread in one line of code, and then inject a bit of code for that thread to execute. Hmmm, the possibilities are intriguing methinks.

The tail end of this code just posts a quit, and then finally waits for everything to finish up. Just note that the ‘quit()’ function is not the same thing as “TerminateThread()”, or “ExitThread()”. Nope, all it does is post a specific kind of message to the receiving Computicle’s queue. What the thread does with that QUIT message is up to the individual Computicle.

Let’s have a look at the code for this computicle:

local ffi = require("ffi");

-- This is a basic message pump
-- 
while true do
  local msg = SELFICLE:getMessage();
  msg = ffi.cast("ComputicleMsg *", msg);
  local Message = msg.Message;

  if OnMessage then
    OnMessage(msg);
  else
    if Message == Computicle.Messages.QUIT then
      break;
    end

    if Message == Computicle.Messages.CODE then
      local len = msg.Param2;
      local codePtr = ffi.cast("const char *", msg.Param1);
		
      if codePtr ~= nil and len > 0 then
        local code = ffi.string(codePtr, len);

        SELFICLE:freeData(ffi.cast("void *",codePtr));

        local f = loadstring(code);
        f();
      end
    end
  end

  SELFICLE:freeMessage(msg);
end

It’s not too many lines. This little Computicle takes care of a few scenarios.

First of all, if there happens to be an ‘OnMessage’ function defined, it will receive the message, and the main loop will do no further processing of it.

If there is no ‘OnMessage’ function, then the message pump will handle a couple of cases. If a ‘QUIT’ message is received, the loop will break and the thread/Computicle will simply exit.

When the message is ‘CODE’, things get really interesting. The ‘Param1’ of the message contains a pointer to the actual bit of code that is intended to be executed. The ‘Param2’ contains the length of the specified code.

Through a couple of type casts, and an ffi.string() call, the code is turned into something that can be used with ‘loadstring()’, which is a standard Lua function. It will parse the string, and then when ‘f()’ is called, that string will actually be executed (within the context of the Computicle). And that’s that!
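The loadstring() mechanism on its own is easy to demonstrate; this is a minimal sketch, with ‘load_fn’ papering over the Lua 5.1 versus 5.2+ naming difference:

```lua
-- loadstring() compiles a string into a callable chunk; calling the
-- result executes it in the current state, just as the CODE handler does.
local load_fn = loadstring or load   -- 'loadstring' in Lua 5.1/LuaJIT, 'load' in 5.2+
local f = load_fn([[return 2 + 3]])
print(f())   --> 5
```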

At the end, the ‘SELFICLE:freeMessage()’ is called to free up the memory used to allocate the outer message. Notice that ‘SELFICLE:freeData()’ was used to clean up the string value that was within the message itself. I have intimate knowledge of how this message was constructed, so I know this is the correct behavior. In general, if you’re going to pass data to a computicle, and you intend the Computicle to clean it up, you should use the computicle instance “allocData()” function.

OK. So, that explains how I could possibly inject some code into a Computicle for execution. That’s pretty nifty, but it looks a bit clunky. Can I do better?

I would like to be able to do the following.

comp.lowValue = 100;
comp.highValue = 200;

In this case, it looks like I’m setting a value on the computicle instance, but in which thread context? Well, what will actually happen is this will get executed within the computicle instance context, and be available to any code that is within the computicle.

We already know that the ‘exec()’ function will execute a bit of code within the context of the running computicle, so the following should now be possible:

comp:exec([[print(" Low: ", lowValue)]]);
comp:exec([[print("High: ", highValue)]])

Basically, just print those values from the context of the computicle. If they were in fact set, this should print them out; if they were not, it should print ‘nil’ for each of them. On my machine, I get the correct values, so that’s an indication that they were in fact set correctly.

How is this bit of magic achieved?

The key is the Lua ‘__newindex’ metamethod. Wha? Basically, if you have a table, and you try to set a value that does not exist, like I did with ‘lowValue’ and ‘highValue’, the ‘__newindex()’ function will be called on your table if you’ve got it set up right. Here’s the associated code of the Computicle that does exactly this.

__newindex = function(self, key, value)
  local setvalue = string.format("%s = %s", key, self:datumToString(value, key));
  return self:exec(setvalue);
end

That’s pretty straightforward. Just create a string that represents setting whatever value you’re trying to set, and then call ‘exec()’, which is already known to execute within the context of the thread. So, in the case where I have written “comp.lowValue = 100”, this will turn into the string “lowValue = 100”, and that string will be executed, setting the global variable ‘lowValue’ to 100.
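The same trick can be seen in a standalone sketch. The ‘RemoteProxy’ name and its ‘sent’ capture table are made up here; where the real Computicle calls exec(), this version just records the string it would have sent:

```lua
local RemoteProxy = {}

RemoteProxy.new = function()
  local obj = {sent = {}}   -- captures what exec() would receive
  return setmetatable({}, {
    -- fires on every assignment to a key the proxy table lacks;
    -- since we never rawset anything, that means every assignment
    __newindex = function(self, key, value)
      table.insert(obj.sent, string.format("%s = %s", key, tostring(value)))
    end,
    __index = obj,
  })
end

local comp = RemoteProxy.new()
comp.lowValue = 100
comp.highValue = 200
print(comp.sent[1])   --> lowValue = 100
print(comp.sent[2])   --> highValue = 200
```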

And what is this ‘datumToString()’ function? Ah yes, this is the little bit that takes various values and returns their string equivalent, ready to be injected into a running Computicle.

Computicle.datumToString = function(self, data, name)
  local dtype = type(data);
  local datastr = tostring(nil);

  if type(data) == "cdata" then
      -- If it is a cdata type that easily converts to 
      -- a number, then convert to a number and assign to string
    if tonumber(data) then
      datastr = tostring(tonumber(data));
    else
      -- if not easily converted to number, then just assign the pointer
      datastr = string.format("TINNThread:StringToPointer(%s);", 
        TINNThread:PointerToString(data));
    end
  elseif dtype == "table" then
    if getmetatable(data) == Computicle_mt then
      -- package up a computicle
    else
      -- get a json string representation of the table
      datastr = string.format("[[ %s ]]",JSON.encode(data, {indent=true}));
    end
  elseif dtype == "string" then
    datastr = string.format("[[%s]]", data);
  else 
    datastr = tostring(data);
  end

  return datastr;
end

The task is actually fairly straightforward. Given a Lua based value, turn it into a string that can be executed in another Lua state. There are of course methods in Lua which will do this, and tons of marshalling frameworks as well. But, this is a quick and dirty version that does exactly what I need.

Of particular note are the handling of cdata and table types. For cdata, some of the values, such as ‘int64_t’, I want to just convert to a number. Tables are the most interesting. This particular technique will really only work for fairly simple tables, that do not make references to other tables and the like. Basically, turn the table into a JSON string, and send that across to be rehydrated as a table.
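A trimmed-down, pure-Lua sketch of the same idea for the simple types (the real datumToString also handles cdata and JSON-encodes tables), including the round trip through loadstring():

```lua
-- Render a simple Lua value as source text that reconstructs it.
local function datumToString(data)
  if type(data) == "string" then
    return string.format("[[%s]]", data)   -- long-bracket the string
  end
  return tostring(data)   -- numbers, booleans, nil round-trip as-is
end

-- Simulate injection: build the assignment string, then execute it
-- right here with loadstring(), as exec() would in the remote state.
local load_fn = loadstring or load   -- Lua 5.1/LuaJIT vs 5.2+
load_fn(string.format("%s = %s", "lowValue", datumToString(100)))()
load_fn(string.format("%s = %s", "greeting", datumToString("hello")))()
print(lowValue, greeting)   --> 100  hello
```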

Here’s some code that actually makes use of this.

comp.contacts = {
  {first = "William", last = "Adams", phone = "(111) 555-1212"},
  {first = "Bill", last = "Gates", phone = "(111) 123-4567"},
}

comp:exec([=[
print("== CONTACTS ==")

-- turn contacts back into a Lua table
local JSON = require("dkjson");

local contable = JSON.decode(contacts);


for _, person in ipairs(contable) do
	--print(person);
	print("== PERSON ==");
	for k,v in pairs(person) do
		print(k,v);
	end
end
]=]);

Notice that ‘comp.contacts = …’ just assigns the created table directly. This is fine, as there are no other references to the table on ‘this’ side of the computicle, so it will be safely garbage collected after some time.

The rest of the code is using the ‘exec()’, so it is executing in the context of the computicle. It basically gets the value of the ‘contacts’ variable, and turns it back into a Lua table value, and does some regular processing on it (print out all the values).

And that’s about it. From the humble beginnings of being able to inject a bit of code to run in the context of an already running thread, to exchanging tables between two different threads with ease, Computicles make pretty short work of such a task. It all stems from the same three principles of Computicles: listen, compute, communicate. And again, not a single mutex, lock, or other visible form of synchronization. The IOCompletionPort is the single communications mechanism, and all the magic of serialized multi-threaded communication hide behind that.

Of course, ‘code injection’ is a dirty word around the computing world, so there must be a way to secure such transfers? Yah, sure, why not. I’ve been bumping around the identity/security/authorization/authentication space recently, so surely something must be applicable here…


The Birth of Computicles

I have written about the “computicle” concept a couple of times in the past:
Super Computing Simulation Sure is Simple
The Existential Program

The essence of a computicle boils down to the following three attributes.

  • Ability to receive input
  • Ability to perform some operations
  • Ability to communicate with other computicles

I had written about memory sharing and ownership, and what mechanisms might be used to exchange data and whatnot. Now that I’ve done the IO Completion Port thing, I can finally introduce a ‘computicle’.

The idea is fairly straight forward. I would like to be able to write a piece of code that is fairly self sufficient/standalone, and have it run in its own environment without much fuss. In the context of Lua, that means, I want each computicle to have its own LuaState, and to run on its own OS thread. I want to be able to communicate with the thing, using a queue, and allow it to communicate with other computicles using the same mechanism. Of course I want everything seamlessly integrated so the programmer isn’t required to know anything about locking mechanisms, or even the fact that they are in a separate thread, or anything like that. Without further ado, here’s an example:

local Computicle = require("Computicle");

print(Computicle:compute([[print("Hello World!")]]):waitForFinish());

Tada! This little code snippet does all those things I listed above. I’ll present it in another form so it is a little more obvious what’s going on.

local Computicle = require("Computicle");

local comp = Computicle:create([[print("Hello World!")]]);

local status, err = comp:waitForFinish();

print("Finish: ", status, err);

Here, an instance of a computicle is created, and the code chunk that it is to execute is fed to it through the constructor. Of course, that is Lua code, so it could be anything. Another more interesting form might be the following:

local comp = Computicle:create([[require("helloworld.lua")]]);

What’s happening behind the scenes here is that the Computicle is creating a TINNThread to run that bit of code, which in turn is creating a separate LuaState for the same. At the same time, an IOCompletion port is also being created so that communication can occur. The ‘waitForFinish’ is a simple mechanism by which any computicle can be waited upon. More exotic synchronization mechanisms can be brought into play where necessary, but this works just fine.

The code for the computicle is a test case to be found here.

Here’s an example of how you can spin up multiple computicles at the same time.

local Computicle = require("Computicle");

local comp1 = Computicle:compute([[print("Hello World!");]]);
local comp2 = Computicle:compute([[
for i=1,10 do
print("Counter: ", i);
end
]]);

print("Finish: ", comp1:waitForFinish());
print("Finish: ", comp2:waitForFinish());

 

If you run this, you see some interleaving of the two threads running at the same time. You wait for completion of both computicles before finally finishing.

How about inter-computicle communications? Well, when you construct a computicle, you can actually give it two bits of information. The first is an absolute must, and that’s the code to be executed. The second is optional, and is the set of parameters that you want to pass to the computicle. I have previously written about the fact that communicating bits of data between threads requires a well thought out strategy with respect to memory ownership, and I showed how the IO Completion Port can be used as a queue to achieve this. Well, computicles just use that mechanism. So, you can do the following:

local Computicle = require("Computicle");

local comp2 = Computicle:create([[
local ffi = require("ffi");

while true do
  local msg = SELFICLE:getMessage();
  msg = ffi.cast("ComputicleMsg *", msg);

  print(msg.Message*10);
  SELFICLE.Heap:free(msg);
end
]]);

local sinkstone = comp2:getStoned();

local comp1 = Computicle:create([[
local ffi = require("ffi");

local stone = _params.sink;
stone = ffi.cast("Computicle_t *", stone);

local sinkComp = Computicle:init(stone.HeapHandle, stone.IOCPHandle, stone.ThreadHandle);

for i = 1, 10 do
  sinkComp:postMessage(i);
end
]], {sink = sinkstone});

print("Finish 1: ", comp1:waitForFinish());
print("Finish 2: ", comp2:waitForFinish());

What’s going on here? Well, the computicle ‘comp2’ is acting as a sink for information. Meaning, it spends its whole time just pulling work data out of its inbuilt queue. But, there’s a bit of trickery here, so a line by line explanation is in order.

  local msg = SELFICLE:getMessage();

What’s a ‘SELFICLE’? That’s the Computicle context within which the code is running. It is a global variable that exists within each and every computicle. This is the way the code within a computicle can get at various functions and variables for the environment. One of the computicle methods is ‘getMessage()’. This is of course how you can get messages out of your Computicle message queue. If you’ve done any Windows programming before, it’s very similar to doing “GetMessage()”, when you’re fiddling about with User32. I suspect the very lowest level mechanisms are probably similar.

  msg = ffi.cast("ComputicleMsg *", msg);

What you get out of ‘getMessage()’ is a pointer (as expected). In order to make any sense of it, you need to cast it to the “ComputicleMsg” type, which looks like this:

typedef struct {
	int32_t		Message;
	UINT_PTR	wParam;
	LONG_PTR	lParam;
} ComputicleMsg;

Again, looks fairly familiar. Basically, any data structure would do, as long as your computicles agree on what it is. This data structure works because it combines a simple “message”, with a couple of pointers, so you can pass pointers to other more elaborate structures.

  print(msg.Message*10);

Once we have the message cast to our message type, we can do some work. In this case, just get the ‘Message’ field, multiply it, and print it out. This is the working end of the computicle. You can put any code in here for the “work”. I can call out to other bits of code, fire off network requests, launch more computicles, whatever. It’s just Lua code running in a thread with its own LuaState, so the sky’s the limit!

  SELFICLE.Heap:free(msg);

And finally, you have to manage the bit of memory that you were handed. These computicles are simple, they are sharing a common heap across all of them, so I know the bit of memory was allocated from one heap, so I can just free it.

So, that’s the consuming side. What about the sending side?

local sinkstone = comp2:getStoned();

I need to tell the second computicle about the first one. There are only two communications mechanisms available. The first is to pass whatever I want as a list of parameters when I construct the computicle. The second is to use the computicle’s queue. In this particular case, I’ll use the former method and pass the computicle as a startup parameter. In order to do that though, I can only communicate cdata. I cannot pass any LuaState based bits of information, because it becomes ambiguous as to who owns the chunk of memory used, and what the lifetime of that chunk is.

The ‘getStoned()’ function takes a computicle and creates a heap based representation of it which is appropriate for sending to another computicle. The data structure that is created looks like this:

typedef struct {
	HANDLE HeapHandle;
	HANDLE IOCPHandle;
	HANDLE ThreadHandle;
} Computicle_t;
local comp1 = Computicle:create([[
local ffi = require("ffi");

The source Computicle is created. Keep in mind that a Computicle is an almost, but not quite, empty container waiting for you to fill with code. There are a couple of things, like the SELFICLE, that are already available, but things like other modules must be pulled in just like any other code you might write. The only modules already in place, besides standard libraries, are the TINNThread, and Computicle.

local stone = _params.sink;
stone = ffi.cast("Computicle_t *", stone);

local sinkComp = Computicle:init(stone.HeapHandle, stone.IOCPHandle, stone.ThreadHandle);

The other global variable besides ‘SELFICLE’ is the ‘_params’ table. This table contains a list of parameters that you might have passed into the constructor. Some manipulation has occurred to these items though. They do not contain type information, they are just raw ‘void *’, so you have to turn them back into whatever they were supposed to be by doing the ‘ffi.cast’. Once you do that, you can use them. In this particular case, we passed the stoned state of the sink Computicle as the ‘sink’ parameter when the source computicle was constructed, so the code can just access that parameter and be on its way. It’s easiest to think of the ‘_params’ table as being the same as the ‘arg’ table that Lua has in general. I didn’t reuse the ‘arg’ concept though for a couple of reasons. First of all, ‘arg’ is a simple array, not a dictionary. So, you access things using index numbers. I wanted to support named values because that’s more useful in this particular usage. Second, since I’m not using it as a simple array, I didn’t want to confuse matters by naming it the same thing as ‘arg’, so thus, ‘_params’.

In the above code, a computicle is being initialized using the value passed in from the sink stone. This is a common pattern. When it is desirable to construct a Computicle from scratch, the ‘Computicle:create()’ function is used. In this case, I don’t want to create a new computicle, but rather an alias to an existing computicle. This is why I need to know its state, so that I can create this alias, and ultimately communicate with it.

Lastly, after everything is setup:

for i = 1, 10 do
	sinkComp:postMessage(i);
end

What’s going on here? Well, here’s what the postMessage() function looks like:

Computicle.postMessage = function(self, msg, wParam, lParam)
  -- Create a message object to send to the thread
  local msgSize = ffi.sizeof("ComputicleMsg");
  local newWork = self.Heap:alloc(msgSize);
  newWork = ffi.cast("ComputicleMsg *", newWork);
  newWork.Message = msg;
  newWork.wParam = wParam or 0;
  newWork.lParam = lParam or 0;

  -- post it to the thread's queue
  self.IOCP:enqueue(newWork);

  return true;
end

Remember, the only way to communicate with a computicle is to send it a bit of shared memory by placing it into its queue. So, postMessage() just takes the few parameters that you pass it, and packages them up into a chunk of memory which is ready to be sent across via that queue. In this case, we have the alias to the Computicle we want to talk to, so the message does in fact land in the proper queue.

]], {sink = sinkstone});

This final part is just how we pass the ‘sink’ stone as a parameter to the constructor.

So, there you have it. In full detail. I can construct Computicles. I can write bits of script code that run completely independently. I can communicate between Computicles in a relatively easy manner.

As a lazy programmer, I’m loving this. It allows me to conceptualize my code at a different level. I don’t have to be so much concerned with the mechanics of making low level system calls, which is typically an error prone process for me. That’s all been wrapped up and taken care of. One of the hardest aspects of multi-threaded programming (locking), is completely eliminated because of using the IO Completion Port as a simple thread-safe queue. Simple synchronization is achieved either through queued messages, or through waiting on a particular Computicle to finish its job.

This construct reminds me of DirectShow and filter graphs. Very similar in concept, but Computicles generalize the concept, and make it brain dead simple to pull off with script rather than ‘C’ code.

I’m also loving this because now I can go more easily from ‘boxes and arrows’ to an actual working system. The ‘boxes’ are instances of computicles, and the arrows are the ‘postMessage/getMessage’ calls between them. This is a fairly understated, but extremely powerful mechanism.

Next time around, I’ll go through the actual Computicle code itself to shed light on the mystery therein.


The Lazy Programmer Multitasks – Using IOCP to get work done

I’ve written about event driven stuff, and brushed up IO Completion Ports in the past, but I’ve never really delivered the goods on these things.  Well, I was wanting to up the scalability, and reduce the CPU utilization of my standard eventing model, so I finally bit the bullet and started down the path of using IO Completion ports.

First of all, IO Completion Ports is this mechanism that Windows uses to facilitate respectably fast asynchronous IO.  They are typically associated with file read/writes as well as sockets.  But what are they really?

At the end of the day, the IO Completion Port is not much more than a kernel managed queue.  In fact, I’ll just call it that for simplicity, because I approached using them from a direction that really has nothing at all to do with IO, and the whole “Completion Port” part of the name, while somewhat indicative of one pattern of usage, really does just get in the way of understanding a fairly simple and powerful mechanism in Windows.
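To make the “it’s just a queue” point concrete, here is a plain single-threaded Lua stand-in (the ‘Queue’ name and methods are mine); the kernel’s version adds the blocking dequeue and coordinated wakeup across threads:

```lua
local Queue = {}
Queue.__index = Queue

function Queue.new()
  return setmetatable({first = 1, last = 0, items = {}}, Queue)
end

-- enqueue(): what the writer side sees
function Queue:enqueue(item)
  self.last = self.last + 1
  self.items[self.last] = item
end

-- dequeue(): what the reader side sees; the real IOCP blocks here
-- instead of returning nil when the queue is empty
function Queue:dequeue()
  if self.first > self.last then return nil end
  local item = self.items[self.first]
  self.items[self.first] = nil
  self.first = self.first + 1
  return item
end

local q = Queue.new()
q:enqueue("workItem1")
q:enqueue("workItem2")
print(q:dequeue())   --> workItem1
print(q:dequeue())   --> workItem2
```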

So, let’s begin at the beginning.  I want to do the following:

I have some thread that is generating “work”.  The items of work are placed on a queue to be processed by some thread.  I have several threads which spend their lives simply pulling work items out of a queue, performing some action related to the work, and doing that over and over again.

Seems simple enough.  There are few pieces to solving this puzzle:

  • threads – I need a really easy way to initiate a thread.  I’d like the body of the thread to be nothing more than script.  I don’t want to know too much about the OS, I don’t want to have to deal with locking primitives.  I just want to write some code that does its job, and doesn’t really know much about the outside world.
  • queues – The work gets placed into a queue.  The queue must at least deal with single writer, multiple reader, or something, but really, I don’t want to have to worry about that at all.  From the writer’s perspective, I just want to do: queue:enqueue(workItem).  From the reader’s perspective, I just want: workItem = queue:dequeue();
  • memory management – I need to be able to share pieces of data across multiple threads.  I will need to have a way to indicate ownership of the data, so that it can be safely cleaned up from any thread that has access to the data.

If I have these basic pieces in place, I’m all set.  First of all, I’ll start with the worker thread and what it will be doing. First is the definition of the data that will be used to define the work that is to be done.

The code for this example can be found here: https://github.com/Wiladams/TINN/tree/master/tests

local ffi = require("ffi");
require("WTypes");

ffi.cdef[[
typedef struct {
    HWND hwnd;
    UINT message;
    WPARAM wParam;
    LPARAM lParam;
    DWORD time;
    POINT pt;
} AMSG, *PAMSG;
]]

return {
  AMSG = ffi.typeof("AMSG");

  print = function(msg)
    msg = ffi.cast("PAMSG", ffi.cast("void *",msg));

    print(string.format("== AMSG: %d", msg.message));
  end,
}

Easy enough. This is a simple data structure that contains some fields that might be useful for certain kinds of applications. The data structure is very application specific, and can be whatever is required for your application. This one is just really easy, and familiar to a Windows UI programmer.

Now that worker thread:

local workerTmpl = [==[
local ffi = require("ffi");
local core_synch = require("core_synch_l1_2_0");
local Heap = require("Heap");
local IOCompletionPort = require("IOCompletionPort");
local TINNThread = require("TINNThread");
local MSG = require("AMSG");
local random = math.random;

local QueueHandle = ffi.cast("HANDLE", ffi.cast("intptr_t", %s));
local HeapHandle = ffi.cast("HANDLE", ffi.cast("intptr_t", %s));

local iocp, err = IOCompletionPort:init(QueueHandle);
local memman, err = Heap(HeapHandle);

while true do
  local key, bytes, overlap = iocp:dequeue();

  if not key then
    break;
  end

  MSG.print(key);
  memman:free(ffi.cast("void *",key));
end
]==];

The while loop is the working end of this bit of code. Basically, it will just spin forever, trying to pull some work out of the iocp queue. The code will ‘block’ until it actually receives something from the queue, or an error. Once it gets the data from the queue, it will simply call the ‘print’ function, and that’s the end of that piece of work.

This system assumes that the receiver of the data will be responsible for freeing this bit of memory that it’s using, so memman:free() does that task.

Where does that memman object come from? Well, as can be seen in the code, it is an instance of the Heap object, constructed using a handle that is passed in as a string value, and turned into a “HANDLE”. The same is true of the IOCompletionPort.

This is a neat little trick. Basically, the parent thread constructs these objects that will be shared with the sibling threads, and passes a pointer to them. The objects know how to construct themselves from nothing more than this pointer value, and away you go!
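The pointer-as-string round trip can be sketched with the LuaJIT ffi. The helper names here are mine, standing in for TINNThread’s CreatePointerString/StringToPointer pair, and everything happens within one state just to show the mechanics:

```lua
local ffi = require("ffi")   -- LuaJIT only

-- Render a pointer's address as a hex string literal
-- (fine for addresses below 2^53, which covers practice)...
local function pointerToString(ptr)
  return string.format("0x%x", tonumber(ffi.cast("intptr_t", ptr)))
end

-- ...and reconstitute a typed pointer from that string on the far side.
local function stringToPointer(ctype, str)
  return ffi.cast(ctype, ffi.cast("intptr_t", tonumber(str)))
end

local buf = ffi.new("int[1]", 42)        -- stands in for a shared handle
local str = pointerToString(buf)         -- travels as plain text
local back = stringToPointer("int *", str)
print(back[0])   --> 42
```

Note that this only stays safe because ‘buf’ is kept alive on the sending side; that is exactly the memory-ownership question the Heap-based allocation answers.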

So, how about the main thread then?

-- test_workerthreads.lua

local ffi = require("ffi");
local core_synch = require("core_synch_l1_2_0");
local IOCompletionPort = require("IOCompletionPort");
local TINNThread = require("TINNThread");
local AMSG = require("AMSG");
local Heap = require("Heap");

local main = function()
  local memman = Heap:create();
  local threads = {};

  local iocp, err = IOCompletionPort();

  if not iocp then
    return false, err;
  end

  -- launch worker threads
  local iocphandle = TINNThread:CreatePointerString(iocp:getNativeHandle());
  local heaphandle = TINNThread:CreatePointerString(memman:getNativeHandle());
  local codechunk = string.format(workerTmpl, iocphandle, heaphandle);

  for i=1,4 do
    local worker, err = TINNThread({CodeChunk = codechunk});
    table.insert(threads, worker);
  end

  -- continuously put 'work' items into queue
  local numWorkItems = 1000;
  local counter = 0;
  local sleepInterval = 50;
  local workSize = ffi.sizeof("AMSG");

  while true do
    for i = 1,numWorkItems do
      local newWork = memman:alloc(workSize);
      ffi.cast("AMSG *",newWork).message = (counter*numWorkItems)+i;
      iocp:enqueue(newWork, workSize);
    end

    collectgarbage();
    counter = counter + 1;

    -- The worker threads should continue to get work done
    -- while we sleep for a little bit
    core_synch.Sleep(sleepInterval);
  end
end

main();

The most interesting part here might be the creation of the codechunk. A simple string.format() is used to replace the string parameters in the template with the string values for the heap handle, and the io completion port handle.
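That substitution step can be seen on its own; the handle values here are made-up numbers, where the real code passes the pointer strings:

```lua
-- The worker template, with %s slots where the handle strings land.
local workerTmpl = [==[
local QueueHandle = %s
local HeapHandle  = %s
return QueueHandle, HeapHandle
]==]

-- string.format() splices the values in, yielding plain source text...
local codechunk = string.format(workerTmpl, "1001", "2002")

-- ...which any Lua state (here, this one) can compile and run.
local load_fn = loadstring or load   -- Lua 5.1/LuaJIT vs 5.2+
local q, h = load_fn(codechunk)()
print(q, h)   --> 1001  2002
```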

Once the codechunk is created, a thread is created that uses the codechunk as its body. This thread will start automatically, so nothing needs to be done beyond simply constructing it. In this case, 4 threads are created. This matches the number of virtual cores I have on my machine, so there’s one thread per core. This is a good number as any more would be wasteful in this particular case.

Then there’s simply a loop that constantly creates work items and places them on the queue. Notice how the work items are constructed from the heap, and then never freed. Remember, back in the worker thread, the workItem is cleaned up using memman:free(). You must do something like this because although you could use the ffi.new() to construct the bits of memory, you can’t guarantee the lifetime of such a thing across the call to enqueue. Lua might try to free up the allocated memory before the worker actually has time to do anything with it. Doing allocation using the Heap gives you full control of the lifetime of that bit of memory.

Well, there you have it. Basically a very simple work sharing multi-threaded program in LuaJIT with a minimal amount of fuss. Of course, this works out fairly well because Windows already has the IO Completion Port (queue), so it’s mainly just gluing things together correctly.

Of course, you don’t have to do it this way, true to Windows, there’s a thousand different ways you could spin up threads and deal with work sharing. One easy alternative is so called Thread Pooling. I must say though, using IOCP is a much easier and less intensive mechanism. The challenge with thread pooling is that when a piece of work becomes ready, all the threads might try to jump on the queue to get at it. This might cause a lot of context switching in the kernel, which is a performance killer. With IO Completion Port, only one of the threads will be awakened, and that’s that. Not a lot of thrashing.

This is now half way to using IO Completion Port for low CPU utilization network server. The other half is to add in the Windows Sockets, and event scheduler. Well, I already have a handy event scheduler, and the Socket class is functioning quite well.

On the way to doing this, I discovered the joys of Registered I/O (RIO) in Windows 8, which is even faster for networking, but I’ll save that for another iteration.

In the meanwhile, if you want to do the lazy programmer’s multi-threaded programming, using IO Completion Ports is the way to go. This Lua code makes it so brain-dead simple, I feel like using threads just for the heck of it now.


More File System Shenanigans

It’s really rather funny to have tools available that make otherwise challenging programming tasks really easy. I make tools to understand an area and to make it easier to prototype a solution. Just the other day, I was thinking, how can I detect when a virus is being hidden in a filestream attached to an NTFS file?

In order to understand what this attack might look like, you have to understand a bit about the NTFS file system. Basically, there is this concept of ‘streams’, an attachment mechanism within NTFS. It stems from the earlier days when NTFS supported serving files to the Macintosh, including their resource “forks”. What it amounts to is that you can attach anything you want to any file, whether it’s an image, an executable, or what have you. You can use the same “CreateFile” API to get a handle on the attached stream, as long as you know its name. These stream attachments don’t normally show up in File Explorer, and a plain “dir” command won’t show them either. If you use “dir /R”, you will get a list of the files along with their “alternate data streams”, which is what these attachments are called.
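You can see alternate data streams in action with nothing more than standard Lua file I/O, since the C runtime’s fopen passes the `file:stream` path syntax straight through to CreateFile. This only works on an NTFS volume, and the path and stream name here are made up for illustration:

```lua
-- Attach an alternate data stream to an ordinary file, then read it back.
-- Windows/NTFS only; 'secret' is an arbitrary stream name.
local f = assert(io.open("C:\\Temp\\hello.txt", "w"));
f:write("the visible contents");
f:close();

-- Same file, colon, stream name -- this is the attached stream
local s = assert(io.open("C:\\Temp\\hello.txt:secret", "w"));
s:write("hidden in plain sight");
s:close();

-- A plain 'dir' won't show it, but we can open it by name
local r = assert(io.open("C:\\Temp\\hello.txt:secret", "r"));
print(r:read("*a"));
r:close();
```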

Here’s a task I wanted to perform. I want to get a list of all the streams that are attached to all of the files in my entire file system. So, first, I will attach a simple iterator to the FileSystemItem object that I’ve used previously:

ffi.cdef[[
typedef enum _STREAM_INFO_LEVELS {
    FindStreamInfoStandard,
    FindStreamInfoMaxInfoLevel
} STREAM_INFO_LEVELS;

typedef struct _WIN32_FIND_STREAM_DATA {
    LARGE_INTEGER StreamSize;
    WCHAR cStreamName[ MAX_PATH + 36 ];
} WIN32_FIND_STREAM_DATA, *PWIN32_FIND_STREAM_DATA;

HANDLE FindFirstStreamW(
    LPCWSTR lpFileName,
    STREAM_INFO_LEVELS InfoLevel,
    LPVOID lpFindStreamData,
    DWORD dwFlags);

BOOL FindNextStreamW(
    HANDLE hFindStream,
    LPVOID lpFindStreamData);
]]

local k32Lib = ffi.load("Kernel32");


FileSystemItem.streams = function(self)
  local lpFileName = core_string.toUnicode(self:getFullPath());
  local InfoLevel = ffi.C.FindStreamInfoStandard;
  local lpFindStreamData = ffi.new("WIN32_FIND_STREAM_DATA");
  local dwFlags = 0;

  local rawHandle = k32Lib.FindFirstStreamW(lpFileName,
    InfoLevel,
    lpFindStreamData,
    dwFlags);
  
  local firstone = true;
  local fsHandle = FsFindFileHandle(rawHandle);

  local closure = function()
    if not fsHandle:isValid() then return nil; end

    if firstone then
      firstone = false;
      return core_string.toAnsi(lpFindStreamData.cStreamName);
    end

    local status = k32Lib.FindNextStreamW(fsHandle.Handle, lpFindStreamData);
    if status == 0 then
      local err = errorhandling.GetLastError();
      return nil;
    end
    
    return core_string.toAnsi(lpFindStreamData.cStreamName);
  end

  return closure;
end

With this bit of code, I can do something like:

fsItem = FileSystemItem({Name="c:\\Temp\\filename.txt"});
for streamName in fsItem:streams() do
  print(streamName);
end

That will get me the names of any streams attached to one particular file system item, whether it be a directory or a file.

If I want to get the names of all the streams attached to all of the files in my entire file system, I would do the following:

local getFsStreams = function(fsItem)
  local res = {}

  for item in fsItem:itemsRecursive() do
    local entry = {Path=item:getFullPath()}
    local streams = {};
    for stream in item:streams() do
      table.insert(streams, {Name = stream});
    end
    if #streams > 0 then
      entry.Streams = streams;
    end

    table.insert(res, entry);
  end
  return res;
end

There aren’t actually that many unique names used as alternate streams, but if I wanted to get a list of them, I would do this:

local getUniqueStreamNames = function(fsItem)
  local items = getFsStreams(fsItem);

  local names = {}
  for _,item in ipairs(items) do  
    if item.Streams then
      for _,entry in ipairs(item.Streams) do
        if not names[entry.Name] then
          names[entry.Name] = 1;
        else
          names[entry.Name] = names[entry.Name] + 1;
        end
      end
    end
  end

  return names;
end

local test_findUniqueStreams = function(fsItem)
  local uniqueNames = getUniqueStreamNames(fsItem);

  local jsonstr = JSON.encode(uniqueNames, {indent=true});

  print(jsonstr);
end

This will return something like:

{
  ":Zone.Identifier:$DATA":3657,
  ":CA_INOCULATEIT:$DATA":1,
  ":OECustomProperty:$DATA":2,
  "::$DATA":302687,
  ":favicon:$DATA":2,
  ":encryptable:$DATA":2
}

That’s kind of handy and informative. I can now look at my file system and see what kinds of alternate data streams are being used on files. Having this in hand, if I want to get a list of files that have a particular alternate stream attached to them, I can do this:

local test_findFilesWithStream = function(fsItem, streamType)
  local items = getFsStreams(fsItem);

  local res = {};
  for _, item in ipairs(items) do
    if item.Streams then
      for _,entry in ipairs(item.Streams) do
        if entry.Name == streamType then
          table.insert(res, item);
        end
      end
    end
  end

  local jsonstr = JSON.encode(res, {indent=true});
  print(jsonstr);
end

local rootName = arg[1] or "c:";
local streamType = arg[2] or ":Zone.Identifier:$DATA";
local fsItem = FileSystemItem({Name=rootName});
test_findFilesWithStream(fsItem, streamType);

That will basically list all files on my ‘c:’ drive which have an attached stream named “:Zone.Identifier:$DATA”. Of course, it’s instructive to Bing the names of the alternate data streams and see what they’re about. This is also a handy way of figuring out where viruses might be hiding, attached to your files relatively unseen, ready to pounce.


Spelunking Windows – Tokens for fun and profit

I want to shutdown/restart my machine programmatically. There’s an API for that:

-- advapi32.dll
BOOL
InitiateSystemShutdownExW(
    LPWSTR lpMachineName,
    LPWSTR lpMessage,
    DWORD dwTimeout,
    BOOL bForceAppsClosed,
    BOOL bRebootAfterShutdown,
    DWORD dwReason);

Wow, it’s that easy?!!

OK. So, I need the name of the machine, some message to display in a dialog box, a timeout, whether to force apps closed, whether to reboot, and some reason why the shutdown is occurring. That sounds easy enough. So, I’ll just give it a call…

local status = core_shutdown.InitiateSystemShutdownExW(
  nil,    -- nil, so local machine
  nil,    -- no special message
  10,     -- wait 10 seconds
  false,  -- don't force apps to close
  true,   -- reboot after shutdown
  ffi.C.SHTDN_REASON_MAJOR_APPLICATION);

And what do I get for my troubles?
> error: (5) ERROR_ACCESS_DENIED

Darn, now I’m going to have to read the documentation.

In the Remarks of the documentation, it plainly states:

To shut down the local computer, the calling thread must have the SE_SHUTDOWN_NAME privilege.

Yah, ok, right then. What’s a privilege? And thus Alice went down the rabbit hole…

As it turns out, there are quite a few concepts in Windows that are related to identity, security, authorization, and the like. As soon as you log into your machine, even if done programmatically, you get this thing called a ‘Token’ attached to your process. The easiest way to think of the token is as your electronic proxy and passport. Just like your passport, this token contains some basic identity information about who you are (name, identifying marks…). Some things in the system, such as being able to access a file, can be handled simply by knowing your name. These are simple access rights. But, other things in the system require a ‘visa’, meaning, not only does the operation have to know who you are, but it also needs to know you have the proper permission for the operation you’re about to perform. It’s just like getting a visa stamped into your passport. If I want to travel to India, my passport alone is not enough. I need to get a visa as well. The same is true of this token thing. It’s not enough that I simply have an identity, I must also have a “privilege” in order to perform certain operations.

In addition to having a privilege, I must actually ‘activate’ it. So, yes, the system may have granted me the privilege, but it’s like super powers, you don’t want them to always be active. It’s like when you’re walking down the street in that foreign country you’re visiting. You don’t walk down the street flashing your fancy passport, showing everyone the neat visas you have stamped in there. If you do, you’ll likely get a crowd following you, trying to relieve you of said passport. So, you generally keep it to yourself, and only flash it when the need arises. So too with token privileges. Yes, you might have the ability to reboot the machine, but you don’t always want that privilege enabled, in case some nefarious software happens to come along to exploit that fact.

Alright, that’s enough analogizing. How about some code? Well, it can be daunting to get your head around the various APIs associated with tokens. To begin with, there is a token associated with the process you’re currently running in, and there is a token associated with every thread you may launch from within that process as well. Generally, you want the process token if you’re single threaded. That’s one API call:

BOOL
OpenProcessToken (
    HANDLE ProcessHandle,
    DWORD DesiredAccess,
    PHANDLE TokenHandle
    );

This is one of those standard API calls where you pass in a couple of parameters (ProcessHandle, DesiredAccess), and a ‘handle’ is returned (TokenHandle). You then use the ‘handle’ to make subsequent calls to the various API functions. This is ripe for wrapping up in some nice data structure to deal with it.

I’ve created the ‘Token’ object, as the convenience point. One of the functions in there is this one:

getProcessToken = function(self, DesiredAccess)
  DesiredAccess = DesiredAccess or ffi.C.TOKEN_QUERY;
	
  local ProcessHandle = core_process.GetCurrentProcess();
  local pTokenHandle = ffi.new("HANDLE [1]")
  local status  = core_process.OpenProcessToken (ProcessHandle, DesiredAccess, pTokenHandle);

  if status == 0 then
    return false, errorhandling.GetLastError();
  end

  return Token(pTokenHandle[0]);
end

One of the important things to take note of when you open a token is the DesiredAccess. What you can do with the token afterward is determined by the access rights you request when you open it. Here are the various options available:

static const int TOKEN_ASSIGN_PRIMARY    =(0x0001);
static const int TOKEN_DUPLICATE         =(0x0002);
static const int TOKEN_IMPERSONATE       =(0x0004);
static const int TOKEN_QUERY             =(0x0008);
static const int TOKEN_QUERY_SOURCE      =(0x0010);
static const int TOKEN_ADJUST_PRIVILEGES =(0x0020);
static const int TOKEN_ADJUST_GROUPS     =(0x0040);
static const int TOKEN_ADJUST_DEFAULT    =(0x0080);
static const int TOKEN_ADJUST_SESSIONID  =(0x0100);

For the case where we want to turn on a privilege that’s attached to the token, we will want to make sure the ‘TOKEN_ADJUST_PRIVILEGES’ access right is included. It also doesn’t hurt to add ‘TOKEN_QUERY’ as well. It’s probably best to use the least of these rights necessary to get the job done.
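Since these access rights are bit flags, you combine them with LuaJIT’s built-in bit library. For example, requesting both query and adjust-privileges access:

```lua
local bit = require("bit");

-- Same values as the constants above
local TOKEN_QUERY             = 0x0008;
local TOKEN_ADJUST_PRIVILEGES = 0x0020;

-- OR the flags together to request both rights on the token
local desiredAccess = bit.bor(TOKEN_QUERY, TOKEN_ADJUST_PRIVILEGES);
print(string.format("0x%04X", desiredAccess));  -- 0x0028
```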

Setting a privilege on a token is another bit of work. It’s not hard, but it’s just one of those things where you have to read the docs, and look at a few samples on the internet in order to get it right. Assuming your token has the TOKEN_ADJUST_PRIVILEGES access right on it, you can do the following:

Token.enablePrivilege = function(self, privilege)
  local lpLuid, err = self:getLocalPrivilege(privilege);
  if not lpLuid then
    return false, err;
  end

  local tkp = ffi.new("TOKEN_PRIVILEGES");
  tkp.PrivilegeCount = 1;
  tkp.Privileges[0].Luid = lpLuid;
  tkp.Privileges[0].Attributes = ffi.C.SE_PRIVILEGE_ENABLED;

  local status = security_base.AdjustTokenPrivileges(self.Handle.Handle, false, tkp, 0, nil, nil);

  if status == 0 then
    return false, errorhandling.GetLastError();
  end

  return true;
end

Well, that gets into some data structures, and introduces this thing called a LUID, and that AdjustTokenPrivileges function, and… I get tired just thinking about it. Luckily, once you have this function, it’s a fairly easy task to turn a privilege on and off.
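Once enablePrivilege exists, a matching disablePrivilege is essentially the same call with the Attributes field cleared. This mirror image is a sketch; it assumes the same Token internals shown above:

```
-- Sketch: the mirror image of enablePrivilege.  Setting Attributes
-- to 0 (rather than SE_PRIVILEGE_ENABLED) turns the privilege back
-- off while leaving it granted to the token.
Token.disablePrivilege = function(self, privilege)
  local lpLuid, err = self:getLocalPrivilege(privilege);
  if not lpLuid then
    return false, err;
  end

  local tkp = ffi.new("TOKEN_PRIVILEGES");
  tkp.PrivilegeCount = 1;
  tkp.Privileges[0].Luid = lpLuid;
  tkp.Privileges[0].Attributes = 0;   -- not SE_PRIVILEGE_ENABLED

  local status = security_base.AdjustTokenPrivileges(self.Handle.Handle,
    false, tkp, 0, nil, nil);

  if status == 0 then
    return false, errorhandling.GetLastError();
  end

  return true;
end
```

There is also an SE_PRIVILEGE_REMOVED attribute, which strips the privilege from the token entirely rather than merely disabling it, but that operation is irreversible for the lifetime of that token.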

OK. So, with this little bit of code in hand, I can now do the following:

local token = Token:getProcessToken(ffi.C.TOKEN_ADJUST_PRIVILEGES);
token:enablePrivilege(Token.Privileges.SE_SHUTDOWN_NAME);

This just gets a token that is associated with the current process and turns on the privilege that allows us to successfully call the shutdown function.

In totality:

-- test_shutdown.lua
local ffi = require("ffi");

local core_shutdown = require("core_shutdown_l1_1_0");
local errorhandling = require("core_errorhandling_l1_1_1");
local Token = require("Token");

local function test_Shutdown()
  local token = Token:getProcessToken(ffi.C.TOKEN_ADJUST_PRIVILEGES);
  token:enablePrivilege(Token.Privileges.SE_SHUTDOWN_NAME);
	
  local status = core_shutdown.InitiateSystemShutdownExW(nil, nil,
    10,false,true,ffi.C.SHTDN_REASON_MAJOR_APPLICATION);

  if status == 0 then
    return false, errorhandling.GetLastError();
  end

  return true;
end

print(test_Shutdown());

And finally we emerge back into the light! This will now actually work. It’s funny, when I got this to work correctly, I pointed out to my wife that my machine was rebooting without me touching it. She tried to muster a smile of support, but really, she wasn’t that impressed. But, knowing the amount of work that goes into such a simple task, I gave myself a pat on the back, and smiled inwardly at the greatness of my programming fu.

Tokens are a very powerful thing in Windows. Being able to master both the concepts, and the API calls themselves, gives you a lot of control over what happens with your machine.