Computicles – Inter-computicle communication
Posted: June 16, 2013 Filed under: Computicle, LuaJIT, Microsoft, System Programming, TINN | Tags: computicle, luajit, microsoft, multi-threaded
Alrighty then, so a computicle is a vessel that holds a bit of computation power. You can communicate with it, and it can communicate with others.
Most computicles do not stand as islands unto themselves, so easily communicating with them becomes very important.
Here is some code that I want to be able to run:
local Computicle = require("Computicle");
local comp = Computicle:load("comp_receivecode");
-- start by saying hello
comp:exec([[print("hello, injector")]]);
-- queue up a quit message
comp:quit();
-- wait for it all to actually go through
comp:waitForFinish();
So, what’s going on here? The first line is a standard “require” to pull in the computicle module.
Next, I create a single instance of a Computicle, running the Lua code that can be found in the file “comp_receivecode.lua”. I’ll come back to that bit of code later. Suffice to say it’s running a simple computicle that does stuff, like executing bits of code that I hand to it.
Further on, I use the Computicle I just created, and call the “exec()” function. I’m passing a string along as the only parameter. What will happen is the receiving Computicle will take that string, and execute the script from within its own context. That’s actually a pretty nifty trick I think. Just imagine, outside the world of scripting, you can create a thread in one line of code, and then inject a bit of code for that thread to execute. Hmmm, the possibilities are intriguing methinks.
The tail end of this code just posts a quit, and then finally waits for everything to finish up. Just note that the ‘quit()’ function is not the same thing as “TerminateThread()” or “ExitThread()”. Nope, all it does is post a specific kind of message to the receiving Computicle’s queue. What the thread does with that QUIT message is up to the individual Computicle.
Let’s have a look at the code for this computicle:
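local ffi = require("ffi");

-- This is a basic message pump
--
while true do
  local msg = SELFICLE:getMessage();
  msg = ffi.cast("ComputicleMsg *", msg);
  local Message = msg.Message;

  if OnMessage then
    -- a user-supplied handler takes over all processing
    OnMessage(msg);
  else
    if Message == Computicle.Messages.QUIT then
      -- free the QUIT message too, before leaving the loop
      SELFICLE:freeMessage(msg);
      break;
    end

    if Message == Computicle.Messages.CODE then
      local len = msg.Param2;
      local codePtr = ffi.cast("const char *", msg.Param1);
      if codePtr ~= nil and len > 0 then
        -- copy the code into a Lua string, then free the sender's buffer
        local code = ffi.string(codePtr, len);
        SELFICLE:freeData(ffi.cast("void *",codePtr));

        -- compile, and only run the chunk if it compiled
        local f, err = loadstring(code);
        if f then
          f();
        else
          print("CODE compile error: ", err);
        end
      end
    end
  end

  SELFICLE:freeMessage(msg);
end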
It’s not too many lines. This little Computicle takes care of a few scenarios.
First of all, if there happens to be an ‘OnMessage’ function defined, it will receive the message, and the main loop will do no further processing of it.
If there is no ‘OnMessage’ function, then the message pump will handle a couple of cases. If a ‘QUIT’ message is received, the loop will break and the thread/Computicle will simply exit.
When the message is ‘CODE’, things get really interesting. The ‘Param1’ field of the message contains a pointer to the actual bit of code that is intended to be executed. The ‘Param2’ field contains the length of that code.
Through a couple of type casts and an ffi.string() call, the code is turned into something that can be used with ‘loadstring()’, which is a standard Lua function. It compiles the string into a function, and then when ‘f()’ is called, that code is actually executed (within the context of the Computicle). And that’s that! (One caveat: if the code does not compile, loadstring() returns nil plus an error message, so it is worth checking the result before calling ‘f()’.)
At the end, ‘SELFICLE:freeMessage()’ is called to free up the memory allocated for the outer message. Notice that ‘SELFICLE:freeData()’ was used to clean up the string value that was within the message itself. I have intimate knowledge of how this message was constructed, so I know this is the correct behavior. In general, if you’re going to pass data to a computicle, and you intend the Computicle to clean it up, you should allocate that data with the computicle instance’s ‘allocData()’ function.
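To see the dispatch logic on its own, here is a minimal single-threaded sketch in plain Lua. It is not TINN code: a Lua table stands in for the IO Completion Port queue, messages are tables rather than ‘ComputicleMsg’ structs, and the names ‘QUIT’, ‘CODE’, ‘post’, and ‘pump’ (and their numeric values) are hypothetical. It follows the same three paths as the pump above: an optional handler that takes over, a QUIT that ends the loop, and a CODE message compiled with loadstring() (load() on Lua 5.2 and later) and then run.

```lua
-- Hypothetical message ids; the real values live in Computicle.Messages
local QUIT, CODE = 1, 2

-- loadstring exists in Lua 5.1/LuaJIT; Lua 5.2+ renamed it to load
local compile = loadstring or load

-- A plain Lua table standing in for the computicle's queue
local queue = {}

local function post(message, code)
  queue[#queue + 1] = { Message = message, Code = code }
end

-- Drain the queue the same way the message pump does
local function pump(onMessage)
  local handled = 0
  while #queue > 0 do
    local msg = table.remove(queue, 1)   -- FIFO: take the oldest message
    handled = handled + 1
    if onMessage then
      onMessage(msg)                     -- user handler replaces the defaults
    elseif msg.Message == QUIT then
      break                              -- leave the loop; the "thread" ends
    elseif msg.Message == CODE then
      local f, err = compile(msg.Code)
      if f then
        f()
      else
        print("compile error: " .. tostring(err))
      end
    end
  end
  return handled
end

post(CODE, [[injected = "hello, injector"]])
post(QUIT)
post(CODE, [[injected = "never runs"]])  -- queued after QUIT

local handled = pump()
print(injected, handled)
```

The chunk queued after QUIT never runs, which is the point made earlier: quit() is just another message, handled in order, not a way of killing the thread.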
OK. So, that explains how I could possibly inject some code into a Computicle for execution. That’s pretty nifty, but it looks a bit clunky. Can I do better?
I would like to be able to do the following.
comp.lowValue = 100; comp.highValue = 200;
In this case, it looks like I’m setting a value on the computicle instance, but in which thread context? Well, what will actually happen is this will get executed within the computicle instance context, and be available to any code that is within the computicle.
We already know that the ‘exec()’ function will execute a bit of code within the context of the running computicle, so the following should now be possible:
comp:exec([[print(" Low: ", lowValue)]]);
comp:exec([[print("High: ", highValue)]])
Basically, just print those values from the context of the computicle. If they were in fact set, then this should print them out. If they were not set, then it should print ‘nil’ for each of them. On my machine, I get the correct values, so that’s an indication that they were in fact set correctly.
How is this bit of magic achieved?
The key is the Lua ‘__newindex’ metamethod. Wha? Basically, if a table has a metatable that defines ‘__newindex’, and you try to set a field that does not already exist in the table, like I did with ‘lowValue’ and ‘highValue’, then the ‘__newindex()’ function is called instead of the value being stored in the table. Here’s the associated code of the Computicle that does exactly this.
__newindex = function(self, key, value)
local setvalue = string.format("%s = %s", key, self:datumToString(value, key));
return self:exec(setvalue);
end
That’s pretty straightforward. Just create a string that represents setting whatever value you’re trying to set, and then call ‘exec()’, which is already known to execute within the context of the thread. So, in the case where I have written “comp.lowValue = 100”, this will turn into the string “lowValue = 100”, and that string will be executed, setting the global variable ‘lowValue’ to 100.
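Here is the same trick reduced to a single Lua state, as a sketch under stated assumptions: a private table ‘env’ stands in for the computicle’s separate LuaState, ‘makeProxy’ and ‘runIn’ are hypothetical helpers, and ‘toLua’ is a much-simplified stand-in for ‘datumToString()’. Assigning a field on the proxy builds a ‘key = value’ string and runs it inside ‘env’, so the value ends up as a global there and never on the proxy itself.

```lua
-- Run a code string inside a given environment table (Lua 5.1 and 5.2+)
local function runIn(env, code)
  local f, err
  if setfenv then                       -- Lua 5.1 / LuaJIT
    f, err = loadstring(code)
    if f then setfenv(f, env) end
  else                                  -- Lua 5.2+: pass the env to load
    f, err = load(code, "=injected", "t", env)
  end
  if not f then error(err) end
  return f()
end

-- Much-simplified, hypothetical stand-in for datumToString()
local function toLua(value)
  if type(value) == "string" then
    return string.format("%q", value)
  end
  return tostring(value)                -- numbers, booleans, nil
end

-- Hypothetical proxy: field assignments become code run inside env
local function makeProxy(env)
  local proxy = {}
  function proxy:exec(code)
    return runIn(env, code)
  end
  return setmetatable(proxy, {
    __newindex = function(self, key, value)
      -- "comp.key = value" becomes the code string "key = value"
      return self:exec(string.format("%s = %s", key, toLua(value)))
    end,
  })
end

local env = {}                          -- the "other" Lua state's globals
local comp = makeProxy(env)
comp.lowValue = 100
comp.highValue = 200
comp.greeting = "hi there"

print(env.lowValue, env.highValue, comp.lowValue)  -- prints 100, 200 and nil
```

Because ‘__newindex’ only fires for keys that are absent from the table, the ‘exec’ method stored directly on the proxy is unaffected, and reading ‘comp.lowValue’ on this side still returns nil.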
And what is this ‘datumToString()’ function? Ah yes, this is the little bit that takes various values and returns their string equivalent, ready to be injected into a running Computicle.
Computicle.datumToString = function(self, data, name)
  local dtype = type(data);
  local datastr = tostring(nil);

  if dtype == "cdata" then
    -- If it is a cdata type that easily converts to
    -- a number, then convert to a number and assign to string
    if tonumber(data) then
      datastr = tostring(tonumber(data));
    else
      -- if not easily converted to number, then just assign the pointer
      datastr = string.format("TINNThread:StringToPointer(%s);",
        TINNThread:PointerToString(data));
    end
  elseif dtype == "table" then
    if getmetatable(data) == Computicle_mt then
      -- package up a computicle (not handled yet, so datastr stays "nil")
    else
      -- get a json string representation of the table
      -- (assumes a JSON module, such as dkjson, is in scope as JSON)
      -- %q yields an escaped Lua string literal, safe even if it contains "]]"
      datastr = string.format("%q", JSON.encode(data, {indent=true}));
    end
  elseif dtype == "string" then
    -- %q yields an escaped Lua string literal
    datastr = string.format("%q", data);
  else
    -- numbers, booleans and nil convert directly
    datastr = tostring(data);
  end

  return datastr;
end
The task is actually fairly straightforward. Given a Lua-based value, turn it into a string that can be executed in another Lua state. There are of course Lua libraries that will do this, and tons of marshalling frameworks as well. But this is a quick and dirty version that does exactly what I need.
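For comparison, here is a plain-Lua sketch of the same idea under a few assumptions: ‘serialize’ is a hypothetical name, cdata and computicles are not handled, and tables are written out as Lua table constructors rather than JSON (a swapped-in technique, so the receiver gets a table back without a JSON decode step). Strings go through string.format("%q"), which produces an escaped literal, so a string containing ‘]]’ survives, which a ‘[[...]]’ wrapper cannot guarantee.

```lua
local compile = loadstring or load       -- Lua 5.1/LuaJIT vs Lua 5.2+

-- Hypothetical serializer: turns a plain Lua value into Lua source text
local function serialize(value, seen)
  local t = type(value)
  if t == "string" then
    return string.format("%q", value)    -- escaped literal, safe for "]]"
  elseif t == "number" then
    return string.format("%.17g", value) -- 17 digits round-trip a double
  elseif t == "boolean" or t == "nil" then
    return tostring(value)
  elseif t == "table" then
    seen = seen or {}
    if seen[value] then error("cannot serialize a table cycle") end
    seen[value] = true
    local parts = {}
    for k, v in pairs(value) do
      parts[#parts + 1] = "[" .. serialize(k, seen) .. "]=" .. serialize(v, seen)
    end
    seen[value] = nil                    -- shared, non-cyclic subtables are fine
    return "{" .. table.concat(parts, ",") .. "}"
  end
  error("cannot serialize a " .. t)      -- functions, userdata, threads, cdata
end

local contacts = {
  {first = "William", last = "Adams", phone = "(111) 555-1212"},
  {first = "Bill", last = "Gates", phone = "(111) 123-4567"},
}

-- The receiving side just runs "return <literal>" to rebuild the table
local copy = assert(compile("return " .. serialize(contacts)))()
print(copy[2].first, copy[1].phone)
```

NaN and infinity are not handled, and a cyclic table raises an error rather than recursing forever.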
Of particular note is the handling of cdata and table types. For cdata, some of the values, such as ‘int64_t’, I want to just convert to a number. Tables are the most interesting. This particular technique will really only work for fairly simple tables: plain data (strings, numbers, booleans, and nested tables of the same) with no cycles, functions, or cdata. Basically, turn the table into a JSON string, and send that across to be rehydrated as a table. The receiving side gets the JSON string itself, so it has to decode it back into a table.
Here’s some code that actually makes use of this.
comp.contacts = {
{first = "William", last = "Adams", phone = "(111) 555-1212"},
{first = "Bill", last = "Gates", phone = "(111) 123-4567"},
}
comp:exec([=[
print("== CONTACTS ==")
-- turn contacts back into a Lua table
local JSON = require("dkjson");
local contable = JSON.decode(contacts);
for _, person in ipairs(contable) do
--print(person);
print("== PERSON ==");
for k,v in pairs(person) do
print(k,v);
end
end
]=]);
Notice that ‘comp.contacts = …’ just assigns the created table directly. This is fine, as there are no other references to the table on ‘this’ side of the computicle, so it will be safely garbage collected after some time.
The rest of the code is using the ‘exec()’, so it is executing in the context of the computicle. It basically gets the value of the ‘contacts’ variable, and turns it back into a Lua table value, and does some regular processing on it (print out all the values).
And that’s about it. From the humble beginnings of being able to inject a bit of code to run in the context of an already running thread, to exchanging tables between two different threads with ease, Computicles make pretty short work of such a task. It all stems from the same three principles of Computicles: listen, compute, communicate. And again, not a single mutex, lock, or other visible form of synchronization. The IO Completion Port is the single communications mechanism, and all the magic of serialized multi-threaded communication hides behind that.
Of course, ‘code injection’ is a dirty word around the computing world, so there must be a way to secure such transfers? Yah, sure, why not. I’ve been bumping around the identity/security/authorization/authentication space recently, so surely something must be applicable here…
The Birth of Computicles
Posted: June 12, 2013 Filed under: Computicle, Core Programming, Lua, LuaJIT, Microsoft, TINN
I have written about the “computicle” concept a couple of times in the past:
Super Computing Simulation Sure is Simple
The Existential Program
The essence of a computicle boils down to the following three attributes.
- Ability to receive input
- Ability to perform some operations
- Ability to communicate with other computicles
I had written about memory sharing and ownership, and what mechanisms might be used to exchange data and whatnot. Now that I’ve done the IO Completion Port thing, I can finally introduce a ‘computicle’.
The idea is fairly straightforward. I would like to be able to write a piece of code that is fairly self-sufficient/standalone, and have it run in its own environment without much fuss. In the context of Lua, that means I want each computicle to have its own LuaState, and to run on its own OS thread. I want to be able to communicate with the thing using a queue, and allow it to communicate with other computicles using the same mechanism. Of course, I want everything seamlessly integrated so the programmer isn’t required to know anything about locking mechanisms, or even the fact that they are in a separate thread, or anything like that. Without further ado, here’s an example:
local Computicle = require("Computicle");
print(Computicle:compute([[print("Hello World!")]]):waitForFinish());
Tada! This little code snippet does all those things I listed above. I’ll present it in another form so it is a little more obvious what’s going on.
local Computicle = require("Computicle");
local comp = Computicle:create([[print("Hello World!")]]);
local status, err = comp:waitForFinish();
print("Finish: ", status, err);
Here, an instance of a computicle is created, and the code chunk that it is to execute is fed to it through the constructor. Of course, that is Lua code, so it could be anything. Another more interesting form might be the following:
local comp = Computicle:create([[require("helloworld")]]);
What’s happening behind the scenes here is that the Computicle is creating a TINNThread to run that bit of code, which in turn creates a separate LuaState for it. At the same time, an IO Completion Port is also being created so that communication can occur. The ‘waitForFinish’ function is a simple mechanism by which any computicle can be waited upon. More exotic synchronization mechanisms can be brought into play where necessary, but this works just fine.
The code for the computicle is a test case to be found here.
Here’s an example of how you can spin up multiple computicles at the same time.
local Computicle = require("Computicle");
local comp1 = Computicle:compute([[print("Hello World!");]]);
local comp2 = Computicle:compute([[
for i=1,10 do
print("Counter: ", i);
end
]]);
print("Finish: ", comp1:waitForFinish());
print("Finish: ", comp2:waitForFinish());
If you run this, you may see the output of the two threads interleaved, since they run at the same time. The main thread waits for both computicles to complete before finally finishing.
How about inter-computicle communications? Well, when you construct a computicle, you can actually give it two bits of information. The first is an absolute must, and that’s the code to be executed. The second is optional, and is the set of parameters that you want to pass to the computicle. I have previously written about the fact that communicating bits of data between threads requires a well thought out strategy with respect to memory ownership, and I showed how the IO Completion Port can be used as a queue to achieve this. Well, computicles just use that mechanism. So, you can do the following:
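local Computicle = require("Computicle");

-- The sink: pulls messages out of its queue and prints them
local comp2 = Computicle:create([[
  local ffi = require("ffi");

  while true do
    local msg = SELFICLE:getMessage();
    msg = ffi.cast("ComputicleMsg *", msg);

    -- a Message of 0 is used here as a 'no more work' signal,
    -- so that comp2:waitForFinish() can actually return
    if msg.Message == 0 then
      SELFICLE.Heap:free(msg);
      break;
    end

    print(msg.Message*10);
    SELFICLE.Heap:free(msg);
  end
]]);

local sinkstone = comp2:getStoned();

-- The source: posts ten messages to the sink
local comp1 = Computicle:create([[
  local ffi = require("ffi");

  local stone = _params.sink;
  stone = ffi.cast("Computicle_t *", stone);
  local sinkComp = Computicle:init(stone.HeapHandle, stone.IOCPHandle, stone.ThreadHandle);

  for i = 1, 10 do
    sinkComp:postMessage(i);
  end

  -- tell the sink there is no more work
  sinkComp:postMessage(0);
]], {sink = sinkstone});

print("Finish 1: ", comp1:waitForFinish());
print("Finish 2: ", comp2:waitForFinish());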
What’s going on here? Well, the computicle ‘comp2’ is acting as a sink for information. Meaning, it spends its whole time just pulling work data out of its built-in queue. But there’s a bit of trickery here, so a line-by-line explanation is in order.
local msg = SELFICLE:getMessage();
What’s a ‘SELFICLE’? That’s the Computicle context within which the code is running. It is a global variable that exists within each and every computicle. This is the way the code within a computicle can get at various functions and variables for the environment. One of the computicle methods is ‘getMessage()’. This is of course how you can get messages out of your Computicle message queue. If you’ve done any Windows programming before, it’s very similar to doing “GetMessage()”, when you’re fiddling about with User32. I suspect the very lowest level mechanisms are probably similar.
msg = ffi.cast("ComputicleMsg *", msg);
What you get out of ‘getMessage()’ is a pointer (as expected). In order to make any sense of it, you need to cast it to the “ComputicleMsg” type, which looks like this:
typedef struct {
int32_t Message;
UINT_PTR wParam;
LONG_PTR lParam;
} ComputicleMsg;
Again, looks fairly familiar. Basically, any data structure would do, as long as your computicles agree on what it is. This data structure works because it combines a simple “message”, with a couple of pointers, so you can pass pointers to other more elaborate structures.
print(msg.Message*10);
Once we have the message cast to our message type, we can do some work. In this case, just get the ‘Message’ field, multiply it, and print it out. This is the working end of the computicle. You can put any code in here for the “work”. I can call out to other bits of code, fire off network requests, launch more computicles, whatever. It’s just Lua code running in a thread with its own LuaState, so the sky’s the limit!
SELFICLE.Heap:free(msg);
And finally, you have to manage the bit of memory that you were handed. These computicles are simple: they all share a common heap, so I know the memory was allocated from that heap, and I can just free it.
So, that’s the consuming side. What about the sending side?
local sinkstone = comp2:getStoned();
I need to tell the second computicle created (comp1, the source) about the first one (comp2, the sink). There are only two communication mechanisms available. The first is to pass whatever I want as a list of parameters when I construct the computicle. The second is to use the computicle’s queue. In this particular case, I’ll use the former method and pass the sink computicle as a startup parameter. In order to do that, though, I can only communicate cdata. I cannot pass any LuaState-based bits of information, because it becomes ambiguous who owns the chunk of memory used, and what the lifetime of that chunk is.
The ‘getStoned()’ function takes a computicle and creates a heap based representation of it which is appropriate for sending to another computicle. The data structure that is created looks like this:
typedef struct {
HANDLE HeapHandle;
HANDLE IOCPHandle;
HANDLE ThreadHandle;
} Computicle_t;
local comp1 = Computicle:create([[
local ffi = require("ffi");
The source Computicle is created. Keep in mind that a Computicle is an almost, but not quite, empty container waiting for you to fill with code. There are a couple of things, like the SELFICLE, that are already available, but things like other modules must be pulled in just like any other code you might write. The only modules already in place, besides the standard libraries, are TINNThread and Computicle.
local stone = _params.sink;
stone = ffi.cast("Computicle_t *", stone);
local sinkComp = Computicle:init(stone.HeapHandle, stone.IOCPHandle, stone.ThreadHandle);
The other global variable besides 'SELFICLE' is the '_params' table. This table contains the parameters that you passed into the constructor. Some manipulation has occurred to these items, though. They do not carry type information; they are just raw 'void *' values, so you have to turn them back into whatever they were supposed to be by doing an 'ffi.cast'. Once you do that, you can use them. In this particular case, we passed the stoned state of the sink Computicle as the 'sink' parameter when the source computicle was constructed, so the code can just access that parameter and be on its way. It's easiest to think of the '_params' table as being the same as the 'arg' table that Lua has in general. I didn't reuse the 'arg' concept, though, for a couple of reasons. First of all, 'arg' is a simple array, not a dictionary, so you access things using index numbers. I wanted to support named values because that's more useful in this particular usage. Second, since I'm not using it as a simple array, I didn't want to confuse matters by giving it the same name as 'arg'; thus, '_params'.
In the above code, a computicle alias is being initialized using the value passed in from the sink stone. This is a common pattern. When it is desirable to construct a Computicle from scratch, the 'Computicle:create()' function is used. In this case, I don't want to create a new computicle, but rather an alias to an existing computicle. This is why I need to know its state: so that I can create this alias, and ultimately communicate with it.
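The '_params' idea can be sketched in-process. This is an assumption-laden analogy, not how TINN builds a computicle: the real parameters cross into a separate LuaState as raw pointers that need an 'ffi.cast', while here 'runChunk' (a hypothetical helper) just gives a chunk its own environment table with a named '_params' global, falling back to the real globals for everything else.

```lua
-- Hypothetical helper: run a code chunk with a named _params table visible
-- as a global, the way a computicle body sees its constructor parameters
local function runChunk(code, params)
  -- the chunk's globals: its own _params plus read access to the real globals
  local env = setmetatable({ _params = params or {} }, { __index = _G })
  local f, err
  if setfenv then                        -- Lua 5.1 / LuaJIT
    f, err = loadstring(code)
    if f then setfenv(f, env) end
  else                                   -- Lua 5.2+
    f, err = load(code, "=chunk", "t", env)
  end
  if not f then error(err) end
  return f()
end

local result = runChunk([[
  -- named access, instead of positional arg[1], arg[2]
  return string.format("%s has %d items", _params.name, _params.count)
]], { name = "sink", count = 10 })

print(result)   -- sink has 10 items
```

The named lookup ('_params.name') is the advantage over 'arg', which would force the chunk to know that the name happens to be, say, 'arg[1]'.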
Lastly, after everything is set up:
for i = 1, 10 do sinkComp:postMessage(i); end
What's going on here? Well, here's what the postMessage() function looks like:
Computicle.postMessage = function(self, msg, wParam, lParam)
-- Create a message object to send to the thread
local msgSize = ffi.sizeof("ComputicleMsg");
local newWork = self.Heap:alloc(msgSize);
newWork = ffi.cast("ComputicleMsg *", newWork);
newWork.Message = msg;
newWork.wParam = wParam or 0;
newWork.lParam = lParam or 0;
-- post it to the thread's queue
self.IOCP:enqueue(newWork);
return true;
end
Remember, the only way to communicate with a running computicle is to send it a bit of shared memory by placing it into its queue. So, postMessage just takes the few parameters that you pass it, and packages them up into a chunk of heap memory which is ready to be sent across via that queue. In this case, we have the alias to the Computicle we want to talk to, so the message does in fact land in the proper queue.
]], {sink = sinkstone});
This final part is just how we pass the ‘sink’ stone as a parameter to the constructor.
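The create/stone/alias/post pattern can be mimicked in plain Lua to show why an alias lands messages in the right queue. Everything here is a hypothetical stand-in: integers play the role of Windows HANDLEs, a registry of Lua tables plays the IO Completion Port, and 'Comp.create', 'getStoned', 'Comp.init', 'postMessage', and 'getMessage' only imitate the shape of the TINN calls. The key point is that the stone carries nothing but the handle, and anything rebuilt from that handle addresses the same queue.

```lua
-- Stand-in for kernel objects: queues addressed only by a numeric handle
local queuesByHandle, nextHandle = {}, 1

local Comp = {}
Comp.__index = Comp

-- create: a brand new computicle with its own queue (hypothetical API)
function Comp.create()
  local handle = nextHandle
  nextHandle = nextHandle + 1
  queuesByHandle[handle] = {}
  return setmetatable({ IOCPHandle = handle }, Comp)
end

-- getStoned: plain data that could cross into another state
function Comp:getStoned()
  return { IOCPHandle = self.IOCPHandle }
end

-- init: an alias onto an existing computicle, rebuilt from its stone
function Comp.init(stone)
  return setmetatable({ IOCPHandle = stone.IOCPHandle }, Comp)
end

-- postMessage: package the fields, defaulting wParam/lParam to 0, and enqueue
function Comp:postMessage(msg, wParam, lParam)
  local q = queuesByHandle[self.IOCPHandle]
  q[#q + 1] = { Message = msg, wParam = wParam or 0, lParam = lParam or 0 }
  return true
end

-- getMessage: oldest message first, or nil when the queue is empty
function Comp:getMessage()
  return table.remove(queuesByHandle[self.IOCPHandle], 1)
end

local sink = Comp.create()
local alias = Comp.init(sink:getStoned())
for i = 1, 3 do alias:postMessage(i) end

local first = sink:getMessage()
print(first.Message, first.wParam, first.lParam)   -- prints 1, 0 and 0
```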
So, there you have it. In full detail. I can construct Computicles. I can write bits of script code that run completely independently. I can communicate between Computicles in a relatively easy manner.
As a lazy programmer, I’m loving this. It allows me to conceptualize my code at a different level. I don’t have to be so concerned with the mechanics of making low-level system calls, which is typically an error-prone process for me. That’s all been wrapped up and taken care of. One of the hardest aspects of multi-threaded programming, locking, disappears from my code entirely, because the IO Completion Port serves as a simple thread-safe queue. Simple synchronization is achieved either through queued messages, or by waiting on a particular Computicle to finish its job.
This construct reminds me of DirectShow and filter graphs. Very similar in concept, but Computicles generalize the concept, and make it brain dead simple to pull off with script rather than ‘C’ code.
I’m also loving this because now I can go more easily from ‘boxes and arrows’ to an actual working system. The ‘boxes’ are instances of computicles, and the arrows are the ‘postMessage/getMessage’ calls between them. This is a fairly understated, but extremely powerful mechanism.
Next time around, I’ll go through the actual Computicle code itself to shed light on the mystery therein.
Signs of our times
Posted: June 11, 2013 Filed under: Musings
Well, how can I not say something about PRISM?
Way back in the day, at the dawn of personal computers, there was always this dance between copy protecting software, and cracking copy protected software. Sometimes the people who provided the copy protection techniques also supplied the cracking techniques as well.
The Xbox One was just announced over the past month, and surprise surprise, there’s a huge ‘copy protection’ component: ‘Always On’. Of course, it’s reasonable to expect that if you spend millions and billions developing a platform and attendant ecosystem, you want to protect the same, to retain value as long as possible. So, there you go.
Then along comes the whole PRISM thing, whatever that is. As this story has unfolded over the past week, I keep getting flashes of yin/yang, or snakes eating their tails. PRISM seems like a perfect yang to the yin of WikiLeaks.
And then there are drones, webcams, spycams, insect cams, cell phone cams, street view cams, department store dressing room cams, and other cams. There are cameras everywhere, and they’re not all owned by “big brother”. AR Drones for about $300 give citizens the power to surveil their surroundings. Of course the military has greater capabilities, but there you have it.
We live in very interesting times, with plenty of issues to contemplate. I personally don’t believe the world will go the route of “big brother” as technology is so pervasive that we all have the ability to monitor each other. We’ll see how it all evolves.
The Lazy Programmer Multitasks – Using IOCP to get work done
Posted: June 10, 2013 Filed under: Lua, LuaJIT, Microsoft, System Programming | Tags: iocp, lua, luajit, multi tasking, thread, windows
I’ve written about event-driven stuff, and brushed up against IO Completion Ports in the past, but I’ve never really delivered the goods on these things. Well, I wanted to increase the scalability and reduce the CPU utilization of my standard eventing model, so I finally bit the bullet and started down the path of using IO Completion Ports.
First of all, IO Completion Ports are the mechanism Windows uses to facilitate respectably fast asynchronous IO. They are typically associated with file reads/writes as well as sockets. But what are they really?
At the end of the day, the IO Completion Port is not much more than a kernel managed queue. In fact, I’ll just call it that for simplicity, because I approached using them from a direction that really has nothing at all to do with IO, and the whole “Completion Port” part of the name, while somewhat indicative of one pattern of usage, really does just get in the way of understanding a fairly simple and powerful mechanism in Windows.
So, let’s begin at the beginning. I want to do the following:
I have some thread that is generating “work”. The items of work are placed on a queue to be processed by some thread. I have several threads which spend their lives simply pulling work items out of a queue, performing some action related to the work, and doing that over and over again.
Seems simple enough. There are a few pieces to solving this puzzle:
- threads – I need a really easy way to initiate a thread. I’d like the body of the thread to be nothing more than script. I don’t want to know too much about the OS, and I don’t want to have to deal with locking primitives. I just want to write some code that does its job, and doesn’t really know much about the outside world.
- queues – The work gets placed into a queue. The queue must at least deal with single writer, multiple reader, or something, but really, I don’t want to have to worry about that at all. From the writer’s perspective, I just want to do: queue:enqueue(workItem). From the reader’s perspective, I just want: workItem = queue:dequeue();
- memory management – I need to be able to share pieces of data across multiple threads. I will need to have a way to indicate ownership of the data, so that it can be safely cleaned up from any thread that has access to the data.
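For the queue piece, the interface I want is just enqueue/dequeue. Here is a single-threaded sketch of that interface in plain Lua ('Queue' is a hypothetical name), using two moving indices so both operations are O(1). It has no locking at all; making it safe for several threads is exactly the job the kernel queue takes over, and a real dequeue would block rather than return nil when empty.

```lua
-- A FIFO queue with O(1) enqueue and dequeue, using two moving indices
local Queue = {}
Queue.__index = Queue

function Queue.new()
  return setmetatable({ first = 1, last = 0, items = {} }, Queue)
end

function Queue:enqueue(item)
  self.last = self.last + 1
  self.items[self.last] = item
end

-- Returns nil when empty; a real kernel queue would block instead
function Queue:dequeue()
  if self.first > self.last then return nil end
  local item = self.items[self.first]
  self.items[self.first] = nil          -- drop the reference so it can be collected
  self.first = self.first + 1
  return item
end

function Queue:length()
  return self.last - self.first + 1
end

local queue = Queue.new()
queue:enqueue("work 1")
queue:enqueue("work 2")
print(queue:dequeue(), queue:length())  -- prints work 1 and 1
```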
If I have these basic pieces in place, I’m all set. First of all, I’ll start with the worker thread and what it will be doing. First is the definition of the data that will be used to define the work that is to be done.
The code for this example can be found here: https://github.com/Wiladams/TINN/tree/master/tests
local ffi = require("ffi");
require("WTypes");
ffi.cdef[[
typedef struct {
HWND hwnd;
UINT message;
WPARAM wParam;
LPARAM lParam;
DWORD time;
POINT pt;
} AMSG, *PAMSG;
]]
return {
AMSG = ffi.typeof("AMSG");
print = function(msg)
msg = ffi.cast("PAMSG", ffi.cast("void *",msg));
print(string.format("== AMSG: %d", msg.message));
end,
}
Easy enough. This is a simple data structure that contains some fields that might be useful for certain kinds of applications. The data structure is very application specific, and can be whatever is required for your application. This one is just really easy, and familiar to a Windows UI programmer.
Now for the worker thread:
local workerTmpl = [==[
local ffi = require("ffi");
local core_synch = require("core_synch_l1_2_0");
local Heap = require("Heap");
local IOCompletionPort = require("IOCompletionPort");
local TINNThread = require("TINNThread");
local MSG = require("AMSG");
local random = math.random;
local QueueHandle = ffi.cast("HANDLE", ffi.cast("intptr_t", %s));
local HeapHandle = ffi.cast("HANDLE", ffi.cast("intptr_t", %s));
local iocp, err = IOCompletionPort:init(QueueHandle);
local memman, err = Heap(HeapHandle);
while true do
local key, bytes, overlap = iocp:dequeue();
if not key then
break;
end
MSG.print(key);
memman:free(ffi.cast("void *",key));
end
]==];
The while loop is the working end of this bit of code. Basically, it will just spin forever, trying to pull some work out of the iocp queue. The code will ‘block’ until it actually receives something from the queue, or an error. Once it gets the data from the queue, it will simply call the ‘print’ function, and that’s the end of that piece of work.
This system assumes that the receiver of the data will be responsible for freeing this bit of memory that it’s using, so memman:free() does that task.
Where does that memman object come from? Well, as can be seen in the code, it is an instance of the Heap object, constructed using a handle that is passed in as a string value, and turned into a “HANDLE”. The same is true of the IOCompletionPort.
This is a neat little trick. Basically, the parent thread constructs these objects that will be shared with the sibling threads, and passes a pointer to them. The objects know how to construct themselves from nothing more than this pointer value, and away you go!
So, how about the main thread then?
-- test_workerthreads.lua
local ffi = require("ffi");
local core_synch = require("core_synch_l1_2_0");
local IOCompletionPort = require("IOCompletionPort");
local TINNThread = require("TINNThread");
local AMSG = require("AMSG");
local Heap = require("Heap");
local main = function()
local memman = Heap:create();
local threads = {};
local iocp, err = IOCompletionPort();
if not iocp then
return false, err;
end
-- launch worker threads
local iocphandle = TINNThread:CreatePointerString(iocp:getNativeHandle());
local heaphandle = TINNThread:CreatePointerString(memman:getNativeHandle());
local codechunk = string.format(workerTmpl, iocphandle, heaphandle);
for i=1,4 do
local worker, err = TINNThread({CodeChunk = codechunk});
table.insert(threads, worker);
end
-- continuously put 'work' items into queue
local numWorkItems = 1000;
local counter = 0;
local sleepInterval = 50;
local workSize = ffi.sizeof("AMSG");
while true do
for i = 1,numWorkItems do
local newWork = memman:alloc(workSize);
ffi.cast("AMSG *",newWork).message = (counter*numWorkItems)+i;
iocp:enqueue(newWork, workSize);
end
collectgarbage();
counter = counter + 1;
-- The worker threads should continue to get work done
-- while we sleep for a little bit
core_synch.Sleep(sleepInterval);
end
end
main();
The most interesting part here might be the creation of the codechunk. A simple string.format() is used to replace the two %s placeholders in the template with the string values for the io completion port handle and the heap handle, in that order.
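The template trick itself can be tried in plain Lua. In this sketch the handle strings are made-up hex literals (an assumption; the post doesn’t show what TINNThread:CreatePointerString() returns), and the generated chunk is compiled with loadstring() or load() in the current state rather than handed to a new thread. It shows that string.format() fills the %s placeholders in order, so the first argument must be the queue handle and the second the heap handle.

```lua
-- Worker template with %s placeholders, like workerTmpl in the post
local template = [==[
local QueueHandle = %s
local HeapHandle = %s
return QueueHandle, HeapHandle
]==]

-- Hypothetical stand-ins for the pointer strings, not real handles
local iocphandle, heaphandle = "0x1f4", "0x2a"

local codechunk = string.format(template, iocphandle, heaphandle)

-- Compile and run the generated chunk, as the new thread's LuaState would
local compile = loadstring or load
local q, h = assert(compile(codechunk))()
print(q, h)   -- prints 500 and 42
```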
Once the codechunk is created, a thread is created that uses the codechunk as its body. This thread will start automatically, so nothing needs to be done beyond simply constructing it. In this case, 4 threads are created. This matches the number of virtual cores I have on my machine, so there’s one thread per core. This is a good number as any more would be wasteful in this particular case.
Then there’s simply a loop that constantly creates work items and places them on the queue. Notice how the work items are allocated from the heap, and then never freed here. Remember, back in the worker thread, the work item is cleaned up using memman:free(). You must do something like this because although you could use ffi.new() to construct the bits of memory, you can’t guarantee the lifetime of such a thing across the call to enqueue. Once the main thread drops its reference, the garbage collector might free that memory before the worker actually has time to do anything with it. Doing allocation using the Heap gives you full control of the lifetime of that bit of memory.
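The ownership rule (the producer allocates, the consumer frees exactly once) can be checked with a small single-threaded sketch. 'TrackingHeap' is hypothetical: it hands out Lua tables instead of raw memory and simply counts live blocks, and the 24-byte size is arbitrary. After the consumer drains the queue, the live count should be back to zero, and freeing a block twice, or one the heap never handed out, trips an assertion.

```lua
-- Hypothetical heap that counts live allocations, standing in for Heap
local TrackingHeap = {}
TrackingHeap.__index = TrackingHeap

function TrackingHeap.new()
  return setmetatable({ live = 0, blocks = {} }, TrackingHeap)
end

function TrackingHeap:alloc(size)
  local block = { size = size }
  self.blocks[block] = true
  self.live = self.live + 1
  return block
end

function TrackingHeap:free(block)
  assert(self.blocks[block], "double free or foreign block")
  self.blocks[block] = nil
  self.live = self.live - 1
end

local heap = TrackingHeap.new()
local queue = {}

-- Producer: allocate from the heap, fill in, enqueue, and never free
for i = 1, 5 do
  local work = heap:alloc(24)
  work.message = i
  queue[#queue + 1] = work
end

-- Consumer: take ownership, use the item, then free it exactly once
local sum = 0
while #queue > 0 do
  local work = table.remove(queue, 1)
  sum = sum + work.message
  heap:free(work)
end

print(sum, heap.live)   -- prints 15 and 0
```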
Well, there you have it. Basically a very simple work sharing multi-threaded program in LuaJIT with a minimal amount of fuss. Of course, this works out fairly well because Windows already has the IO Completion Port (queue), so it’s mainly just gluing things together correctly.
Of course, you don’t have to do it this way; true to Windows, there are a thousand different ways you could spin up threads and deal with work sharing. One easy alternative is so-called thread pooling. I must say, though, using IOCP is a much easier and less intensive mechanism. The challenge with thread pooling is that when a piece of work becomes ready, all the threads might try to jump on the queue to get at it. This might cause a lot of context switching in the kernel, which is a performance killer. With an IO Completion Port, only one of the waiting threads will be awakened, and that’s that. Not a lot of thrashing.
This is now halfway to using IO Completion Ports for a low-CPU-utilization network server. The other half is to add in Windows Sockets and an event scheduler. Well, I already have a handy event scheduler, and the Socket class is functioning quite well.
Along the way, I discovered the joys of Registered I/O (RIO) in Windows 8, which is even faster for networking, but I’ll save that for another iteration.
In the meantime, if you want to do lazy-programmer multi-threaded programming, IO Completion Ports are the way to go. This Lua code makes it so brain-dead simple that I feel like using threads just for the heck of it now.
Spelunking Windows – NTFS Change Journals
Posted: June 6, 2013 Filed under: Lua, LuaJIT, System Programming, TINN
Since Windows 2000, the NTFS file system has been able to journal most file system changes: things like creating, deleting, renaming, and extending files, adding or removing attributes, and closing files. The capability is also known as the USN Journal, after the Update Sequence Number (USN) that identifies each record. I call it NTFS Change Journals just because that’s a good enough description. The code for this example can be found here: https://github.com/Wiladams/TINN/blob/master/tests/test_USNJournal.lua
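What can you do with this? Well, I want to get a list of all the changes that have occurred on my C: drive since the beginning of time (or at least as far back as the journal still goes; it has a maximum size, and the oldest records get purged)…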
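-- Print every journal entry from StartUsn onward.
-- Assumes `journal` is a ChangeJournal that has already been opened
-- (see ChangeJournal.open below) and that printJournalEntry exists.
local test_getJournalEntries = function(StartUsn, ReasonMask, printRoutine)
    StartUsn = StartUsn or 0;
    ReasonMask = ReasonMask or 0xFFFFFFFF;   -- all reasons
    printRoutine = printRoutine or printJournalEntry;
    -- print all the USNs, starting at StartUsn
    for entry in journal:entries(StartUsn, ReasonMask) do
        printRoutine(entry);
        collectgarbage();
    end
end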
Well, that’s what I’d like to do, anyway. First, I need a convenient wrapper for the Change Journal. The Change Journal API is exposed through DeviceIoControl() calls (FSCTL_READ_USN_JOURNAL and friends), which is more akin to writing against a low-level driver than the more typical FindFirstFile/FindNextFile style of API found throughout the Windows API. For that reason, it’s worth wrapping. Here’s an excerpt:
ChangeJournal.open = function(self, driveLetter)
    local handle, err = self:getVolumeHandle(driveLetter);
    if not handle then
        return false, err;
    end
    return self:init(handle);
end
ChangeJournal.entries = function(self, StartUsn, ReasonMask)
    StartUsn = StartUsn or 0;
    ReasonMask = ReasonMask or 0xFFFFFFFF;
    local ReturnOnlyOnClose = false;
    local Timeout = 0;
    local BytesToWaitFor = 0;
    local BUF_LEN = ffi.C.USN_PAGE_SIZE;
    local Buffer = ffi.new("uint8_t[?]", BUF_LEN);
    local ReadData = ffi.new("READ_USN_JOURNAL_DATA",
        {StartUsn, ReasonMask, ReturnOnlyOnClose, Timeout, BytesToWaitFor, self.JournalID});
    local dwBytes = ffi.new("DWORD[1]");
    local bytesReturned = 0;
    local dwRetBytes = 0;        -- record bytes left in the current buffer
    local UsnRecord = nil;
    local nextBuffUsn = StartUsn;
    local closure = function()
        if dwRetBytes == 0 then
            -- current buffer used up: read the next page of records
            ReadData.StartUsn = nextBuffUsn;
            local status = core_io.DeviceIoControl( self:getNativeHandle(),
                FSCTL_READ_USN_JOURNAL,
                ReadData,
                ffi.sizeof(ReadData),
                Buffer,
                BUF_LEN,
                dwBytes,
                nil);
            if status == 0 then
                local err = errorhandling.GetLastError();
                return nil;
            end
            bytesReturned = dwBytes[0];
            -- the buffer starts with the USN to use for the next read;
            -- remember it, then skip past it
            nextBuffUsn = ffi.cast("USN *", Buffer)[0];
            dwRetBytes = bytesReturned - ffi.sizeof("USN");
            if dwRetBytes == 0 then
                -- reached end of records
                return nil;
            end
            -- Find the first record
            UsnRecord = ffi.cast("PUSN_RECORD", ffi.cast("PUCHAR",Buffer) + ffi.sizeof("USN"));
            dwRetBytes = dwRetBytes - UsnRecord.RecordLength;
            return UsnRecord;
        end
        -- Return the next record (records are variable length)
        UsnRecord = ffi.cast("PUSN_RECORD",(ffi.cast("PCHAR",UsnRecord) + UsnRecord.RecordLength));
        dwRetBytes = dwRetBytes - UsnRecord.RecordLength;
        return UsnRecord;
    end
    return closure;
end
Great, with this minimal set of functions in hand, I can now iterate the journal using the first bit of code.
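The shape of entries() is a general one: a closure that reads a page of records with one DeviceIoControl call, hands them out one at a time, and only reads again (starting at the USN the previous page reported) once the page is used up. The sketch below models just that paging logic in plain Lua so it can run anywhere. `pagedIterator` and the `fakeFetch` page source are hypothetical stand-ins, not part of TINN; `fakeFetch` plays the role of the FSCTL_READ_USN_JOURNAL call.

```lua
-- Hypothetical model of the paging pattern in ChangeJournal.entries.
-- fetchPage(startUsn) stands in for DeviceIoControl(FSCTL_READ_USN_JOURNAL):
-- it returns the USN to start the next read at, plus an array of records.
-- An empty (or nil) array means the end of the journal.
local function pagedIterator(fetchPage, startUsn)
  local nextUsn = startUsn or 0
  local page, index = {}, 0
  local done = false

  return function()
    if done then return nil end
    -- Current page used up: read the next one
    if index >= #page then
      local records
      nextUsn, records = fetchPage(nextUsn)
      if records == nil or #records == 0 then
        done = true
        return nil
      end
      page, index = records, 0
    end
    index = index + 1
    return page[index]
  end
end

-- A fake journal of six records, served two per "page".
-- Here the "USN" is simply an index into the table.
local journal = { "create a.txt", "close a.txt", "rename a.txt",
                  "delete b.tmp", "create c.log", "close c.log" }

local function fakeFetch(startUsn)
  local records = {}
  for i = startUsn + 1, math.min(startUsn + 2, #journal) do
    records[#records + 1] = journal[i]
  end
  return startUsn + #records, records
end

for record in pagedIterator(fakeFetch, 0) do
  print(record)
end
```

The caller never sees the page boundaries, which is what lets the first bit of code use a plain generic for loop over the journal.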
Of course, what you actually do with this journal information is another story. You could use it for backup/restore, virus checking, file browsing, whatever. Another thing that emerges from playing around with this low-level stuff is that you can dive much more deeply into the file system, down to its very core, exploring the MFT (Master File Table) and the like.
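Whatever you use the entries for, you’ll usually want to know why each record was written, which a USN_RECORD reports as a bitmask in its Reason field. printJournalEntry isn’t shown in this post, so here is a minimal sketch of one piece of such a routine, using a hypothetical `reasonNames` helper. The flag values are the USN_REASON_* constants from winioctl.h; only a subset is listed.

```lua
-- Hypothetical helper: decode a USN_RECORD Reason bitmask into names.
-- Values are USN_REASON_* constants from winioctl.h (subset only).
local USN_REASONS = {
  { 0x00000001, "DATA_OVERWRITE" },
  { 0x00000002, "DATA_EXTEND" },
  { 0x00000004, "DATA_TRUNCATION" },
  { 0x00000100, "FILE_CREATE" },
  { 0x00000200, "FILE_DELETE" },
  { 0x00000800, "SECURITY_CHANGE" },
  { 0x00001000, "RENAME_OLD_NAME" },
  { 0x00002000, "RENAME_NEW_NAME" },
  { 0x00008000, "BASIC_INFO_CHANGE" },
  { 0x00200000, "STREAM_CHANGE" },
  { 0x80000000, "CLOSE" },
}

-- Returns the names of the listed flags that are set in `reason`.
-- Every flag is a power of two, so plain arithmetic tests the bit;
-- this avoids needing LuaJIT's bit library or Lua 5.3's & operator.
local function reasonNames(reason)
  local names = {}
  for _, flag in ipairs(USN_REASONS) do
    if math.floor(reason / flag[1]) % 2 == 1 then
      names[#names + 1] = flag[2]
    end
  end
  return names
end

print(table.concat(reasonNames(0x80000102), " | "))
```

The same constants can be added together to form a narrower ReasonMask for entries(); for example, 0x00000100 + 0x00000200 limits the results to records with FILE_CREATE or FILE_DELETE set.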
Another fun thing to do is watch the changes happening to the file system from this moment forward. That’s easy enough: simply wait for new entries to show up.
ChangeJournal.waitForNextEntry = function(self, usn, ReasonMask)
    usn = usn or self:getNextUsn();
    ReasonMask = ReasonMask or 0xFFFFFFFF;
    local ReturnOnlyOnClose = false;
    local Timeout = 0;
    local BytesToWaitFor = 1;   -- non-zero, so the read waits for new data
    local ReadData = ffi.new("READ_USN_JOURNAL_DATA", {usn, ReasonMask, ReturnOnlyOnClose, Timeout, BytesToWaitFor, self.JournalID});
    -- This function does not return until the USN
    -- record exists
    -- Keep the buffer on the object: the returned record points into it,
    -- and a plain local could be garbage collected as soon as this
    -- function returns. The record stays valid until the next call.
    local BUF_LEN = ffi.C.USN_PAGE_SIZE;
    self.WaitBuffer = self.WaitBuffer or ffi.new("uint8_t[?]", BUF_LEN);
    local Buffer = self.WaitBuffer;
    local dwBytes = ffi.new("DWORD[1]");
    local status = core_io.DeviceIoControl( self:getNativeHandle(),
        FSCTL_READ_USN_JOURNAL,
        ReadData,
        ffi.sizeof(ReadData),
        Buffer,
        BUF_LEN,
        dwBytes,
        nil);
    if status == 0 then
        return false, errorhandling.GetLastError();
    end
    -- skip the leading USN to reach the first record
    local UsnRecord = ffi.cast("PUSN_RECORD", ffi.cast("PUCHAR",Buffer) + ffi.sizeof("USN"));
    return UsnRecord;
end
local test_waitForNextEntry = function()
    local entry = journal:waitForNextEntry();
    while entry do
        printJournalEntry(entry);
        entry = journal:waitForNextEntry();
    end
end
I find it informative to run this little loop while I perform various activities on my machine. For example, when I launch a web browser, there’s a flurry of activity related to opening, creating, closing, and deleting various files. When I bring up a text editor, same thing. There’s the file I’m editing, which gets opened, then there’s invariably a temporary file that gets created, and possibly some add-on files and the like.
If you’re watching for viruses, it might be useful to spot files being touched that you don’t expect. With this tool in hand, doing that becomes a little bit easier.
At any rate, the Change Journal is a nice little bit of kit that has existed in the system for quite some time, and numerous tools have been built around it over the years. This little bit of code just makes it that much more approachable, at least for a common-man programmer such as myself.
More File System Shenanigans
Posted: June 3, 2013 Filed under: LuaJIT, Microsoft, System Programming, TINN
It’s really rather funny to have tools available that make otherwise challenging programming tasks really easy. I make tools to understand an area and to make it easier to prototype a solution. Just the other day, I was thinking: how can I detect when a virus is being hidden in a file stream attached to an NTFS file?
To understand what this attack might look like, you have to understand a bit about the NTFS file system. NTFS has the concept of ‘streams’, an attachment mechanism built into the file system. It stems from the earlier days when NTFS was able to hold files from the Macintosh, including their “forks”. What it amounts to is that you can attach anything you want to any file, whether it’s an image, an executable, or what have you. You can use the same “CreateFile” API to get a handle on an attached stream, as long as you know its name (for example, “filename.txt:streamname”). These stream attachments don’t normally show up in File Explorer, and a simple “dir” on the command line won’t show them either. If you use “dir /R”, you get a list of the files along with their “alternate data streams”, which is what these attached streams are called.
Here’s a task I wanted to perform. I want to get a list of all the streams that are attached to all of the files in my entire file system. So, first, I will attach a simple iterator to the FileSystemItem object that I’ve used previously:
ffi.cdef[[
typedef enum _STREAM_INFO_LEVELS {
    FindStreamInfoStandard,
    FindStreamInfoMaxInfoLevel
} STREAM_INFO_LEVELS;
typedef struct _WIN32_FIND_STREAM_DATA {
    LARGE_INTEGER StreamSize;
    WCHAR cStreamName[ MAX_PATH + 36 ];
} WIN32_FIND_STREAM_DATA, *PWIN32_FIND_STREAM_DATA;
HANDLE FindFirstStreamW(
    LPCWSTR lpFileName,
    STREAM_INFO_LEVELS InfoLevel,
    LPVOID lpFindStreamData,
    DWORD dwFlags);
BOOL FindNextStreamW(
    HANDLE hFindStream,
    LPVOID lpFindStreamData
);
]]
local k32Lib = ffi.load("Kernel32");
FileSystemItem.streams = function(self)
    local lpFileName = core_string.toUnicode(self:getFullPath());
    local InfoLevel = ffi.C.FindStreamInfoStandard;
    local lpFindStreamData = ffi.new("WIN32_FIND_STREAM_DATA");
    local dwFlags = 0;
    -- FindFirstStreamW both opens the search and fills in the first result
    local rawHandle = k32Lib.FindFirstStreamW(lpFileName,
        InfoLevel,
        lpFindStreamData,
        dwFlags);
    local firstone = true;
    local fsHandle = FsFindFileHandle(rawHandle);
    local closure = function()
        if not fsHandle:isValid() then return nil; end
        if firstone then
            -- hand back the result FindFirstStreamW already fetched
            firstone = false;
            return core_string.toAnsi(lpFindStreamData.cStreamName);
        end
        local status = k32Lib.FindNextStreamW(fsHandle.Handle, lpFindStreamData);
        if status == 0 then
            -- ERROR_HANDLE_EOF here simply means there are no more streams
            local err = errorhandling.GetLastError();
            return nil;
        end
        return core_string.toAnsi(lpFindStreamData.cStreamName);
    end
    return closure;
end
With this bit of code, I can do something like:
local fsItem = FileSystemItem({Name="c:\\Temp\\filename.txt"});
-- streams() returns an iterator function, so use it directly (not with ipairs)
for streamName in fsItem:streams() do
    print(streamName);
end
That gets me the names of the streams attached to one particular file system item, whether it’s a directory or a file.
If I want to get the names of all the streams attached to all of the files in my entire file system, I would do the following:
local getFsStreams = function(fsItem)
    local res = {}
    for item in fsItem:itemsRecursive() do
        local entry = {Path=item:getFullPath()}
        local streams = {};
        for stream in item:streams() do
            table.insert(streams, {Name = stream});
        end
        if #streams > 0 then
            entry.Streams = streams;
        end
        table.insert(res, entry);
    end
    return res;
end
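One thing to keep in mind: FindFirstStreamW/FindNextStreamW also report the default, unnamed data stream, as “::$DATA”, so every regular file yields at least one stream name and getFsStreams attaches a Streams table to almost every file. If you only care about alternate streams, a small filter helps. This is a sketch with hypothetical helper names (`parseStreamName`, `isAlternateStream`); it relies only on the “:name:type” form the stream names come back in.

```lua
-- Hypothetical helpers for stream names of the form ":name:type",
-- e.g. ":Zone.Identifier:$DATA". The default data stream has an
-- empty name, so it is reported as "::$DATA".
local function parseStreamName(raw)
  -- returns name, type (both nil if raw does not have that shape)
  local name, streamType = raw:match("^:([^:]*):(.+)$")
  return name, streamType
end

-- True for named (alternate) data streams, false for the default one.
local function isAlternateStream(raw)
  local name = parseStreamName(raw)
  return name ~= nil and name ~= ""
end

print(parseStreamName(":Zone.Identifier:$DATA"))
print(isAlternateStream("::$DATA"))
```

Inside getFsStreams, wrapping the table.insert in `if isAlternateStream(stream) then ... end` would keep only the named streams.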
There aren’t actually that many unique names used as alternate streams, but if I wanted to get a list of them, I would do this:
local getUniqueStreamNames = function(fsItem)
    local items = getFsStreams(fsItem);
    local names = {}
    for _,item in ipairs(items) do
        if item.Streams then
            for _,entry in ipairs(item.Streams) do
                -- count how many files carry each stream name
                names[entry.Name] = (names[entry.Name] or 0) + 1;
            end
        end
    end
    return names;
end
local test_findUniqueStreams = function(fsItem)
    local uniqueNames = getUniqueStreamNames(fsItem);
    local jsonstr = JSON.encode(uniqueNames, {indent=true});
    print(jsonstr);
end
When I ran it, this printed:
{
  ":Zone.Identifier:$DATA": 3657,
  ":CA_INOCULATEIT:$DATA": 1,
  ":OECustomProperty:$DATA": 2,
  "::$DATA": 302687,
  ":favicon:$DATA": 2,
  ":encryptable:$DATA": 2
}
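That’s kind of handy and informative. I can now look at my file system and see what kinds of alternate data streams are being used on files. (The “::$DATA” entry is the default, unnamed data stream that holds a file’s ordinary contents, which is why it dwarfs the others.) Having this in hand, if I want to get a list of files that have a particular alternate stream attached to them, I can do this: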
local test_findFilesWithStream = function(fsItem, streamType)
    local items = getFsStreams(fsItem);
    local res = {};
    for _, item in ipairs(items) do
        if item.Streams then
            for _,entry in ipairs(item.Streams) do
                if entry.Name == streamType then
                    table.insert(res, item);
                end
            end
        end
    end
    local jsonstr = JSON.encode(res, {indent=true});
    print(jsonstr);
end
local rootName = arg[1] or "c:";
local streamType = arg[2] or ":Zone.Identifier:$DATA";
local fsItem = FileSystemItem({Name=rootName});
test_findFilesWithStream(fsItem, streamType);
That lists all the files on my C: drive that have an attached stream named “:Zone.Identifier:$DATA”. Of course, it’s instructive to Bing the names of the alternate data streams and see what they’re about. (Zone.Identifier, for example, is what Windows attaches to files downloaded from the internet to record the security zone they came from.) This is also a handy way of figuring out where viruses might be hiding, attached to your files, relatively unseen and ready to pounce.
Faster Filesystem Finding
Posted: May 29, 2013 Filed under: LuaJIT, System Programming, TINN
While spelunking the file system, I created a simple iterator that would print out the items at each level while traversing the depths of the file system. I’ve since added a generalization of the technique that works out even better when you don’t just want to print, but want to gather the items and process them later.
The key to this new algorithm is that it doesn’t use recursion. I couldn’t work out how to both recurse and create an iterator, so I use a stack instead. This particular exercise has historical significance for me, as it was one of the interview questions I received when I first interviewed at Microsoft, oh so many years ago. At the time the question was, ‘show code for traversing the DOM depth-first, without using recursion’.
FileSystemItem.itemsRecursive = function(self)
    local stack = Collections.Stack();
    local itemIter = self:items();
    local closure = function()
        while true do
            local anItem = itemIter();
            if anItem then
                if (anItem.Name ~= ".") and (anItem.Name ~= "..") then
                    if anItem:isDirectory() then
                        -- save our place at this level, then descend
                        stack:push(itemIter);
                        itemIter = anItem:items();
                    end
                    return anItem;
                end
            else
                -- this level is exhausted: resume the parent level
                itemIter = stack:pop();
                if not itemIter then
                    return nil;
                end
            end
        end
    end
    return closure;
end
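Because itemsRecursive is tied to FileSystemItem and the Win32 find APIs, here is the same stack-of-iterators idea modeled on an in-memory tree, so it can be run and poked at anywhere. The node layout ({ Name = ..., children = {...} }, where a children table marks a directory) is an assumption for illustration, and a plain Lua table stands in for Collections.Stack. Since there are no ‘.’ or ‘..’ entries, that check is dropped.

```lua
-- Hypothetical in-memory model of itemsRecursive: a pre-order,
-- depth-first walk using a stack of suspended iterators, no recursion.
-- Nodes are { Name = "...", children = {...} }; children marks a directory.
local function childIterator(node)
  local i = 0
  return function()
    i = i + 1
    return node.children and node.children[i]
  end
end

local function itemsRecursive(root)
  local stack = {}
  local iter = childIterator(root)
  return function()
    while true do
      local item = iter()
      if item then
        if item.children then
          -- save our place at this level, then descend
          stack[#stack + 1] = iter
          iter = childIterator(item)
        end
        return item
      end
      -- this level is exhausted: resume the parent level, or finish
      iter = table.remove(stack)
      if not iter then return nil end
    end
  end
end

local tree = { Name = "c:", children = {
  { Name = "tools", children = { { Name = "lua.exe" }, { Name = "bin", children = {} } } },
  { Name = "readme.txt" },
}}

for item in itemsRecursive(tree) do
  print(item.Name)
end
```

As in itemsRecursive, a directory is returned before its contents, and the stack never holds more than one suspended iterator per directory level.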
With that little mechanism in place, printing all the files in my system becomes as easy as this:
local printFileItems = function(startat, filterfunc)
    for item in startat:itemsRecursive() do
        -- print everything when there's no filter, otherwise only matches
        if not filterfunc or filterfunc(item) then
            print(item:getFullPath());
        end
    end
end
printFileItems(FileSystemItem({Name=arg[1]}));
That’s all fine and good. Of course, these days ‘data’ isn’t useful unless it can be consumed as HTML or JSON, so here’s another view of the file system, using a simple HTML wrapper. This one lists a single directory level.
local printHtml = function(pattern, filterfunc)
    local fs = FileSystemItem({Name=pattern});
    io.write[[
<html>
<head>
<title>File Directory</title>
</head>
<body>
<ul>
]]
    for item in fs:items() do
        if not filterfunc or filterfunc(item) then
            local url = item:getFullPath();
            io.write([[<li><a href="]]..url..[[">]]..item.Name..[[</a></li>]]);
            io.write('\n');
        end
    end
    io.write[[
</ul>
</body>
</html>
]]
end
local nodotdot = function(item)
    return item.Name ~= "." and item.Name ~= "..";
end
printHtml("c:\\tools", nodotdot);
I’m dumping what’s in my c:\tools directory, ignoring the ‘.’ and ‘..’ entries. That’s a pretty funny mix of Lua and HTML. Of course, ‘io.write’ could just as easily be replaced with ‘response:write’, and you’d be generating dynamic content directly to the web.
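One caveat if this output does go to the web: file names are spliced straight into the markup. Windows file names can’t contain <, >, or ", but they can contain & and ', so escaping both the path and the name is cheap insurance. A minimal sketch, with hypothetical `escapeHtml` and `listItem` helpers that are not part of TINN:

```lua
-- Hypothetical helper: escape text before splicing it into HTML.
local HTML_ESCAPES = {
  ["&"] = "&amp;", ["<"] = "&lt;", [">"] = "&gt;",
  ['"'] = "&quot;", ["'"] = "&#39;",
}

local function escapeHtml(s)
  -- gsub returns (string, count); the parentheses keep only the string
  return (tostring(s):gsub("[&<>\"']", HTML_ESCAPES))
end

-- The list-item line from printHtml, with both pieces escaped
local function listItem(url, name)
  return '<li><a href="' .. escapeHtml(url) .. '">' .. escapeHtml(name) .. '</a></li>'
end

print(listItem("c:\\tools\\Tom & Jerry's.txt", "Tom & Jerry's.txt"))
```

In printHtml, the io.write of each list item would become `io.write(listItem(url, item.Name), '\n')`.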
At any rate, having the simple, iterative, non-recursive form of file exploration available makes file filtering a little more interesting, while maintaining a fairly low resource footprint.
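For comparison, recursion and an iterator can be combined in Lua using coroutines: a plain recursive walk yields each item, and coroutine.wrap turns it into a function that a generic for can consume. This sketch runs on a hypothetical in-memory tree ({ Name = ..., children = {...} }, where a children table marks a directory) rather than FileSystemItem, and `walk`/`itemsRecursiveCo` are illustrative names only.

```lua
-- Hypothetical alternative: a recursive walk exposed as an iterator
-- via a coroutine. Nodes are { Name = "...", children = {...} }.
local function walk(node)
  for _, child in ipairs(node.children or {}) do
    coroutine.yield(child)        -- hand this item to the for loop
    if child.children then
      walk(child)                 -- plain recursion into the "directory"
    end
  end
end

local function itemsRecursiveCo(root)
  -- each call resumes the walk; when walk() finishes the wrapped
  -- function returns nothing, which ends the generic for
  return coroutine.wrap(function() walk(root) end)
end

local tree = { Name = "c:", children = {
  { Name = "tools", children = { { Name = "lua.exe" }, { Name = "bin", children = {} } } },
  { Name = "readme.txt" },
}}

for item in itemsRecursiveCo(tree) do
  print(item.Name)
end
```

Both approaches keep one level of state per directory depth; the coroutine keeps it implicitly in its own call stack, while the explicit stack keeps it in a table you can inspect. Which reads better is largely a matter of taste.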