Taming the Leap Motion
Posted: March 18, 2013 Filed under: Leap Motion, Lua, LuaJIT, TINN
I have been playing with the Leap Motion input device of late. If you haven't seen one of these, it's worth taking a look at the video demonstrations. The Leap joins the likes of the Kinect in offering an alternative to the standard mouse and keyboard as a form of input. But that's about where the similarities end.
Whereas the Kinect is good at large body movements of multiple players at a distance, it’s not so great at fine hand tracking at a close distance. This is the specialty of the Leap Motion. You can point at your screen, move your hands around, tap your fingers and the like.
It's kind of like turning every monitor into a touch screen, except that it tracks a whole volume of space rather than just the flat surface of the screen.
The Leap Motion comes with an SDK for developers to write whatever it is they're going to write with this new input device. The core of the SDK is a library written in C++. That's great for languages such as Python, Java, and C#, which can use SWIG to turn the header file into bindings they can call. This approach might work for Lua as well, but C++ is just a pain to deal with from dynamic languages like these.
I want to play with the Leap Motion, but I don't want to do it through the supplied SDK, as C++ just doesn't work for me. Fortunately, the Leap Motion provides a different mechanism: a WebSocket server interface. So, you can 'talk' to it by simply opening a WebSocket and streaming the results.
Alrighty then, how to get started?
Well, first of all, the URL for the device is: ws://127.0.0.1:6437/
I have a hacked-together WebSocket object that's good enough to establish a connection to this service endpoint. So, I do that, and encapsulate the connection in a LeapInterface_t object. This interface serves out a constant stream of JSON-formatted data. To make things simple for Lua consumption, I put an iterator interface on this:
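LeapInterface_t.RawFrames = function(self)
  -- Each call to the closure reads one WebSocket frame. Returning nil
  -- ends the generic 'for' loop (for example, when the connection drops).
  local closure = function()
    local frame, err = self.SocketStream:ReadFrame();
    if not frame then
      return nil, err
    end
    return frame;
  end
  return closure;
end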
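RawFrames leans on Lua's generic `for` protocol: the loop keeps calling the returned closure and stops at the first `nil`. Here is a small, self-contained sketch of that behaviour that runs without a device attached. The `FakeSocketStream` type is hypothetical, standing in for the real WebSocket; its `ReadFrame` hands out frames from a list and then returns `nil, "closed"`.

```lua
-- Hypothetical stand-in for the WebSocket stream: hands out queued
-- frames one at a time, then reports the connection as closed.
local FakeSocketStream = {}
FakeSocketStream.__index = FakeSocketStream

function FakeSocketStream.new(frames)
  return setmetatable({ frames = frames, pos = 0 }, FakeSocketStream)
end

function FakeSocketStream:ReadFrame()
  self.pos = self.pos + 1
  local frame = self.frames[self.pos]
  if frame == nil then
    return nil, "closed"
  end
  return frame
end

-- Same shape as LeapInterface_t.RawFrames: return a closure that the
-- generic 'for' calls until it returns nil.
local function RawFrames(stream)
  return function()
    local frame, err = stream:ReadFrame()
    if not frame then
      return nil, err
    end
    return frame
  end
end

local received = {}
for frame in RawFrames(FakeSocketStream.new({ '{"id":1}', '{"id":2}' })) do
  received[#received + 1] = frame
end
```

One consequence worth knowing: when the closure returns `nil, err`, the `for` loop simply ends and the error string is discarded, so if the reason for a disconnect matters, it has to be recorded somewhere else (on `self`, for instance).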
With this alone, you could do something like:
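local ffi = require("ffi")
local leap = LeapInterface()
for rawframe in leap:RawFrames() do
  -- each frame carries a raw byte buffer; turn it into a Lua string to see the JSON
  print(ffi.string(rawframe.Data, rawframe.DataLength))
end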
That’s pretty much all you need to see the stream of frames coming out of the Leap Motion. But, that’s not particularly useful. There’s a ton of information you can get out of this device. You can get hand tracking, finger and ‘tool’ tracking, and even ‘gestures’. It’s almost too much information to make sense of. So, a little bit of organization is in order.
First, I create an object that will be the central focus of all things Leap Motion. This is the LeapScape object. I chose this name because I conceive of the area the Leap Motion can watch and report on as the LeapScape. There can be hands, pointables, gestures, and the like. I need something to control and coordinate all these things. It looks like this:
local ffi = require("ffi");
local Collections = require("Collections");
local LeapInterface = require("LeapInterface");
local JSON = require("dkjson");
local LeapScape_t = {}
local LeapScape_mt = {
__index = LeapScape_t,
}
local LeapScape = function()
local interface, err = LeapInterface({enableGestures=true});
if not interface then
return nil, err;
end
local obj = {
Interface = interface;
Hands = {};
Pointables = {};
Frames = {};
FrameQueue = Collections.Queue.new();
ContinueRunning = false;
};
setmetatable(obj, LeapScape_mt);
return obj;
end
LeapScape_t.Start = function(self)
self.ContinueRunning = true;
Runtime.Scheduler:Spawn(LeapScape_t.ProcessFrames, self)
Runtime.Scheduler:Spawn(LeapScape_t.ProcessRawFrames, self);
end
LeapScape_t.ProcessRawFrames = function(self)
for rawframe in self.Interface:RawFrames() do
local frame = JSON.decode(ffi.string(rawframe.Data, rawframe.DataLength));
self.FrameQueue:Enqueue(frame);
coroutine.yield();
end
end
LeapScape_t.ProcessFrames = function(self)
while self.ContinueRunning do
-- get a frame off the queue
local frame = self.FrameQueue:Dequeue()
if frame then
if frame.gestures then
for _, gesture in ipairs(frame.gestures) do
self:OnGesture(gesture);
end
end
end
coroutine.yield();
end
end
LeapScape_t.OnGesture = function(self, gesture)
if self.GestureHandler then
self.GestureHandler:OnGesture(gesture);
end
end
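Start spawns two fibers that share FrameQueue: ProcessRawFrames decodes frames and enqueues them, and ProcessFrames dequeues them and dispatches gestures, each yielding after every step. The sketch below imitates that arrangement with plain coroutines so it can run anywhere. It is a stand-in under stated assumptions, not TINN code: `runAll` is a hypothetical round-robin scheduler in place of `Runtime.Scheduler`, the queue is a minimal table-based FIFO rather than `Collections.Queue`, and the frames are already-decoded tables instead of JSON.

```lua
-- Minimal FIFO queue (stand-in for Collections.Queue).
local Queue = {}
Queue.__index = Queue
function Queue.new() return setmetatable({ first = 1, last = 0, items = {} }, Queue) end
function Queue:Enqueue(v) self.last = self.last + 1; self.items[self.last] = v end
function Queue:Dequeue()
  if self.first > self.last then return nil end
  local v = self.items[self.first]
  self.items[self.first] = nil
  self.first = self.first + 1
  return v
end

-- Hypothetical round-robin scheduler: resume each live coroutine in turn
-- until all of them have finished.
local function runAll(tasks)
  local live = #tasks
  while live > 0 do
    live = 0
    for _, co in ipairs(tasks) do
      if coroutine.status(co) ~= "dead" then
        assert(coroutine.resume(co))
        if coroutine.status(co) ~= "dead" then live = live + 1 end
      end
    end
  end
end

local queue = Queue.new()
local incoming = { { gestures = { { type = "swipe" } } }, {}, { gestures = { { type = "circle" } } } }
local dispatched = {}
local producerDone = false

-- Producer: plays the role of ProcessRawFrames.
local producer = coroutine.create(function()
  for _, frame in ipairs(incoming) do
    queue:Enqueue(frame)
    coroutine.yield()
  end
  producerDone = true
end)

-- Consumer: plays the role of ProcessFrames.
local consumer = coroutine.create(function()
  while true do
    local frame = queue:Dequeue()
    if frame then
      for _, g in ipairs(frame.gestures or {}) do
        dispatched[#dispatched + 1] = g.type
      end
    elseif producerDone then
      return
    end
    coroutine.yield()
  end
end)

runAll({ consumer, producer })
```

Like ProcessFrames, the consumer keeps yielding even when the queue is empty, so with nothing to do it still gets resumed on every pass of the scheduler. That is harmless in a demo, but it is a form of busy-waiting.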
Another piece, focused specifically on gesture handling, is the "GestureHandler" class. This is fairly straightforward as well. It takes a "gesture", which is just a decoded JSON object, and performs various actions based on what it reads there. One of the things it does is impose some order on the input. For example, it's entirely possible that while you're processing a 'swipe', you get some intervening 'screenTap' and 'keyTap' gestures, which may not be desirable. So, once a swipe (or circle) has begun, the GestureHandler only lets that gesture's 'update' and 'stop' events through until it is completed.
local GestureHandler_t = {}
local GestureHandler_mt = {
__index = GestureHandler_t;
}
local GestureHandler = function()
local obj = {
CurrentGesture = "none";
}
setmetatable(obj, GestureHandler_mt);
return obj;
end
GestureHandler_t.OnGesture = function(self, gesture)
--print("==== GESTURE ====")
--print("type: ", gesture.type, gesture.state);
if gesture.type == "screenTap" then
self:HandleScreenTap(gesture)
elseif gesture.type == "keyTap" then
self:HandleKeyTap(gesture)
elseif gesture.type == "swipe" then
self:HandleSwipe(gesture);
elseif gesture.type == "circle" then
self:HandleCircle(gesture);
end
end
GestureHandler_t.HandleScreenTap = function(self, gesture)
if self.CurrentGesture == "none" then
if self.OnScreenTap then
self.OnScreenTap(gesture);
end
end
end
GestureHandler_t.HandleKeyTap = function(self, gesture)
if self.CurrentGesture == "none" then
if self.OnKeyTap then
self.OnKeyTap(gesture);
end
end
end
GestureHandler_t.HandleCircle = function(self, gesture)
if not (self.OnCircleBegin or self.OnCircling or self.OnCircleEnd) then
return
end
if self.CurrentGesture == "circle" then
if gesture.state == "stop" then
if self.OnCircleEnd then
self.OnCircleEnd(gesture);
end
self.CurrentGesture = "none";
elseif gesture.state == "update" then
if self.OnCircling then
self.OnCircling(gesture)
end
end
elseif self.CurrentGesture == "none" then
self.CurrentGesture = "circle";
if self.OnCircleBegin then
self.OnCircleBegin(gesture)
end
end
end
GestureHandler_t.HandleSwipe = function(self, gesture)
if not (self.OnSwipeBegin or self.OnSwiping or self.OnSwipeEnd) then
return
end
if self.CurrentGesture == "swipe" then
if gesture.state == "stop" then
if self.OnSwipeEnd then
self.OnSwipeEnd(gesture);
end
self.CurrentGesture = "none";
elseif gesture.state == "update" then
if self.OnSwiping then
self.OnSwiping(gesture)
end
end
elseif self.CurrentGesture == "none" then
self.CurrentGesture = "swipe";
if self.OnSwipeBegin then
self.OnSwipeBegin(gesture)
end
end
end
return GestureHandler
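HandleSwipe and HandleCircle are the same little state machine with different callback names. A hedged, table-driven sketch of it (the `makeTracker` helper and the `events` log are hypothetical, for illustration only) makes the filtering rule explicit: once a continuous gesture has begun, only that gesture's 'update' and 'stop' events get through, and taps are ignored until the state returns to "none".

```lua
-- Continuous gestures and the callback names for begin/update/end,
-- mirroring GestureHandler's OnSwipeBegin/OnSwiping/OnSwipeEnd, etc.
local continuous = {
  swipe  = { "OnSwipeBegin", "OnSwiping", "OnSwipeEnd" },
  circle = { "OnCircleBegin", "OnCircling", "OnCircleEnd" },
}
local taps = { screenTap = "OnScreenTap", keyTap = "OnKeyTap" }

local function makeTracker(handlers)
  local current = "none"
  return function(gesture)
    local names = continuous[gesture.type]
    if names then
      -- Like HandleSwipe/HandleCircle: do nothing if no callbacks are set.
      if not (handlers[names[1]] or handlers[names[2]] or handlers[names[3]]) then
        return
      end
      local callback
      if current == "none" then
        current = gesture.type            -- first event of any state begins tracking
        callback = handlers[names[1]]
      elseif current == gesture.type then
        if gesture.state == "stop" then
          current = "none"
          callback = handlers[names[3]]
        elseif gesture.state == "update" then
          callback = handlers[names[2]]
        end
      end
      -- a different continuous gesture in progress: this event is dropped
      if callback then callback(gesture) end
    elseif taps[gesture.type] and current == "none" then
      local callback = handlers[taps[gesture.type]]
      if callback then callback(gesture) end
    end
  end
end

-- Usage: record which callbacks fire for a short, interleaved sequence.
local events = {}
local function log(name) return function() events[#events + 1] = name end end
local track = makeTracker({
  OnSwipeBegin = log("begin"), OnSwiping = log("swiping"),
  OnSwipeEnd = log("end"), OnScreenTap = log("tap"),
})
track({ type = "swipe", state = "start" })
track({ type = "screenTap", state = "stop" })  -- dropped: a swipe is in progress
track({ type = "swipe", state = "update" })
track({ type = "swipe", state = "stop" })
track({ type = "screenTap", state = "stop" })  -- delivered: state is back to "none"
```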
OK. So, there’s the lowest level WebSocket, the subsequent LeapInterface, the enclosing LeapScape, and finally the Gesture Handler. Pulling it all together in a little demo program, you get:
local Runtime = require("Runtime");
local LeapScape = require ("LeapScape");
local GestureHandler = require("GestureHandler");
local printDict = function(dict)
for k,v in pairs(dict) do
print(k,v)
end
end
local OnSwipeBegin = function(gesture)
local p = gesture.position;
print("============")
print("SWIPE BEGIN: ", p[1], p[2], p[3])
end
local OnSwipeEnd = function(gesture)
local p = gesture.position;
local d = gesture.direction;
print("SWIPE END: ", p[1], p[2], p[3]);
print("Direction: ", d[1], d[2], d[3]);
print("Speed: ", gesture.speed);
end
local OnCircleBegin = function(gesture)
local p = gesture.position;
print("============")
print("CIRCLE BEGIN: ")
end
local OnCircling = function(gesture)
local n = gesture.normal;
local direction = "ccw";
if n[1] <0 and n[3] < 0 then
direction = "cw"
end
print(string.format("CIRCLING: %f %s", gesture.progress, direction));
--printDict(gesture);
end
local OnCircleEnd = function(gesture)
local c = gesture.center;
print(string.format("CIRCLE END: %f [%f, %f, %f]", gesture.radius, c[1], c[2], c[3]));
end
local OnScreenTap = function(gesture)
local p = gesture.position;
print("SCREEN TAP: ", p[1], p[2], p[3]);
end
local OnKeyTap = function(gesture)
local p = gesture.position;
print("KEY TAP: ", p[1], p[2], p[3]);
end
local main = function()
local scape, err = LeapScape();
if not scape then
print("could not connect to the Leap Motion service: ", err)
return
end
local ghandler = GestureHandler();
-- Swipes
--ghandler.OnSwipeEnd = OnSwipeEnd;
--ghandler.OnSwipeBegin = OnSwipeBegin;
-- Circles
ghandler.OnCircleBegin = OnCircleBegin;
ghandler.OnCircling = OnCircling;
ghandler.OnCircleEnd = OnCircleEnd;
-- Taps
--ghandler.OnKeyTap = OnKeyTap;
--ghandler.OnScreenTap = OnScreenTap;
scape.GestureHandler = ghandler;
spawn(scape.Start, scape)
end
run(main);
In this particular case, I'm just tracking the 'circle' gesture. In a real application, you would probably tie this circle gesture to something meaningful, like turning a knob, or rotating something one way or the other. Notice that at the very end of 'main', the scape.Start routine is spawned. That means it runs as a fiber alongside any other spawned fibers in the application, including things like network communications and UI. The fibers are cooperatively scheduled, so this is concurrency rather than true parallelism: each one runs until it yields.
The Leap Motion is a nifty device. It presents quite a lot of information to the programmer. Having a fairly simple WebSocket-based interface available makes it relatively easy to program against. In the end I did not need a 'C' interface at all; I just did whatever I wanted with the raw information provided.
I'm sure there will be some very interesting applications created with this device. With TINN, this is a snap; the code here is available as a TINNSnip: leaper
TINN Is Not Node
Posted: March 15, 2013 Filed under: Lua, LuaJIT, TINN | Tags: lua, tinn
I had other meanings for this acronym, but I like this one.
What is it then? Well, I often post snippets of code to my blog, and otherwise want to show someone how to do something. TINN is an encapsulation of a minimal environment which can quickly run interesting snippets of code.
TINN is unapologetic in its Windows-only support. Although I have various Linux and Apple machines lying about the house, my primary development platform is Windows, and I want to get the most out of it without having to compromise for cross-platform compatibility.
So, what’s in the box? Well, tinn.exe contains a few things.
- LuaJIT compiler
- LPeg Module
- ZLib
- Some LAPHLibs snippets
- Http Stack
- Win32 Interop – GDI32, User32, Kernel32
What can you do with it?
The primary project is here: TINN
Some Snippets of Example code are here: TINNSnips
Since TINN is a Lua runtime, you can run any typical Lua script as normal:
-- hello.lua
print("Hello, World!");
You could, of course, also create a fairly rudimentary static web page server:
local WebApp = require("WebApp")
local HttpRequest = require "HttpRequest"
local HttpResponse = require "HttpResponse"
local URL = require("url");
local StaticService = require("StaticService");
local HandleSingleRequest = function(stream, pendingqueue)
local request, err = HttpRequest.Parse(stream);
if not request then
-- not a parsable http request; drop the stream
return
end
local urlparts = URL.parse(request.Resource)
local filename = './wwwroot'..urlparts.path;
local response = HttpResponse.Open(stream);
StaticService.SendFile(filename, response)
-- recycle the stream in case a new request comes
-- in on it.
return pendingqueue:Enqueue(stream)
end
--[[ Configure and start the service ]]
Runtime = WebApp({port = 8080, backlog=100})
Runtime:Run(HandleSingleRequest);
This is the full extent of the service. By using the WebApp construct, all the connection handling is done for you. Every time there is a new http request, your routine will be called. If you’re used to using node.js, you might find a few familiar things, like async sockets by default, but you’ll also find things to be a bit different, like the fact that HttpRequest and HttpResponse are totally separable from the streams they may be dealing with.
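One detail the snippet leaves to the reader: it concatenates the request path straight onto './wwwroot', so a request containing '..' segments could reach files outside that directory. Here is a hedged sketch of a stricter mapping. The `safeLocalPath` helper is hypothetical (it is not part of TINN's StaticService), it assumes the path has already been percent-decoded, and the default document name "index.html" is also an assumption.

```lua
-- Hypothetical helper: map a URL path onto a file under 'root',
-- refusing any path that tries to climb out with '..'.
local function safeLocalPath(root, urlpath)
  if type(urlpath) ~= "string" or urlpath:sub(1, 1) ~= "/" then
    return nil, "bad path"
  end
  local parts = {}
  -- split on both '/' and '\' since Windows accepts either separator
  for segment in urlpath:gmatch("[^/\\]+") do
    if segment == ".." then
      return nil, "path escapes root"
    elseif segment ~= "." then
      parts[#parts + 1] = segment
    end
  end
  if #parts == 0 then
    parts[1] = "index.html"   -- assumption: default document name
  end
  return root .. "/" .. table.concat(parts, "/")
end
```

In HandleSingleRequest, the filename would then come from `safeLocalPath('./wwwroot', urlparts.path)`, with a 404 sent back whenever it returns nil.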
Another strength for networking is that although the http protocol is strongly supported, there’s nothing that prevents you from just using lower level things like sockets directly, so you can support your own protocols. Having low level bit handling routines built in and easily exposed makes it relatively easy to deal with protocols, and various binary packings of data.
On the more Windows specific side, here is a typical “Game” window. Basically, a timer driven environment that also deals with mouse and keyboard events:
local GameWindow = require "GameWindow"
local GDI32 = require "GDI32"
local StopWatch = require "StopWatch"
function mouseinteraction(msg, wparam, lparam)
print(string.format("Mouse: 0x%x", msg))
end
function keyboardinteraction(msg, wparam, lparam)
print(string.format("Keyboard: 0x%x", msg))
end
function randomrect(win)
local width = math.random(2,40)
local height = math.random(2,40)
local x = math.random(0,win.Width-1-width)
local y = math.random(0, win.Height-1-height)
local right = x + width
local bottom = y + height
local brushColor = RGB(math.random(0,255),math.random(0,255),math.random(0,255))
win.GDIContext:SetDCBrushColor(brushColor)
win.GDIContext:RoundRect(x, y, right, bottom, 0, 0)
end
local sw = StopWatch.new();
function ontick(win, tickCount)
local black = RGB(0,0,0)
win.GDIContext:SetDCPenColor(black)
for i=1,win.FrameRate do
randomrect(win)
end
local stats = string.format("Seconds: %f Frame: %d FPS: %f",
sw:Seconds(), tickCount, tickCount/sw:Seconds())
win.GDIContext:Text(stats)
end
local appwin = GameWindow({
Title = "Game Window",
KeyboardInteractor = keyboardinteraction,
MouseInteractor = mouseinteraction,
FrameRate = 120,
OnTickDelegate = ontick,
Extent = {1024,768},
})
appwin:Run()
Of course, I can combine things like UI with networking, as in the screenshare snip.
As a multi-tool, how does this fit in with my typical dev toolchain? Well, I was recently trying to prototype a WebSocket implementation. Having the bit twiddling capabilities of TINN helped me to make quick work of that prototype. Same thing for being able to read/write .bmp files. Then I wanted to try out a multi-connected http server, that could scale to a few hundred or thousand concurrent connections. TINN could handle all of these tasks fairly easily.
The real beauty, though, comes from being rooted in LuaJIT. LuaJIT is THE best interop mechanism for native code libraries. I have seen nothing more efficient than LuaJIT's ability to take the declarations from a C header file, feed them to ffi.cdef, and give you scriptable access to the represented library. For Windows programming, this is a tremendous boon. Now, when I want to try out some new interface, such as Kinect, I can just wrap the header and start programming. With other solutions, I have to write a fair amount of C glue code before the interop will work, and depending on the complexity of the library, that can be a lot of error-prone code. Of course, LuaJIT also allows you to write your interop layer that way, if what you get through the ffi mechanism just doesn't cut it for you (interop to C++, for example).
So, TINN is a nice multi-tool. You can use it to prototype stuff, you can use it to put things into production directly if you like. Since the code is Lua, you could even convert it to JavaScript fairly easily if you want. At any rate, now that I have this multi-tool, and it’s readily available as a compact standalone executable, all my future examples will use TINN as the driver.
Screen Capture for Fun and Profit
Posted: March 12, 2013 Filed under: Lua, LuaJIT, Network Programming, System Programming | Tags: http server, lua, network, screen capture
In Screen Sharing from a Browser I wrote about how relatively easy it is to display a continuous snapshot of a remote screen, and even send mouse and keyboard events back to it. That was the essence of modern day browser based screen sharing. Everything else is about compression for bandwidth management.
In this article, I'll present the "server" side of the equation. Here, in its entirety, is the server side:
local ffi = require "ffi"
local WebApp = require("WebApp")
local HttpRequest = require "HttpRequest"
local HttpResponse = require "HttpResponse"
local URL = require("url")
local StaticService = require("StaticService")
local GDI32 = require ("GDI32")
local User32 = require ("User32")
local BinaryStream = require("core.BinaryStream")
local MemoryStream = require("core.MemoryStream")
local WebSocketStream = require("WebSocketStream")
local Network = require("Network")
local utils = require("utils")
local zlib = require ("zlib")
local UIOSimulator = require("UIOSimulator")
--[[
Application Variables
--]]
local ScreenWidth = User32.GetSystemMetrics(User32.FFI.CXSCREEN);
local ScreenHeight = User32.GetSystemMetrics(User32.FFI.CYSCREEN);
local captureWidth = ScreenWidth;
local captureHeight = ScreenHeight;
local ImageWidth = captureWidth;
local ImageHeight = captureHeight;
local ImageBitCount = 16;
local hbmScreen = GDIDIBSection(ImageWidth, ImageHeight, ImageBitCount);
local hdcScreen = GDI32.CreateDCForDefaultDisplay();
local net = Network();
--[[
Application Functions
--]]
function captureScreen(nWidthSrc, nHeightSrc, nXOriginSrc, nYOriginSrc)
nXOriginSrc = nXOriginSrc or 0;
nYOriginSrc = nYOriginSrc or 0;
-- Copy some of the screen into a
-- bitmap that is selected into a compatible DC.
local ROP = GDI32.FFI.SRCCOPY;
local nXOriginDest = 0;
local nYOriginDest = 0;
local nWidthDest = ImageWidth;
local nHeightDest = ImageHeight;
local nWidthSrc = nWidthSrc;
local nHeightSrc = nHeightSrc;
GDI32.Lib.StretchBlt(hbmScreen.hDC.Handle,
nXOriginDest,nYOriginDest,nWidthDest,nHeightDest,
hdcScreen.Handle,
nXOriginSrc,nYOriginSrc,nWidthSrc,nHeightSrc,
ROP);
hbmScreen.hDC:Flush();
end
-- Serve the screen up as a bitmap image (.bmp)
local getContentSize = function(width, height, bitcount, alignment)
alignment = alignment or 4
local rowsize = GDI32.GetAlignedByteCount(width, bitcount, alignment);
local pixelarraysize = rowsize * math.abs(height);
local filesize = 54+pixelarraysize;
local pixeloffset = 54;
return filesize;
end
local filesize = getContentSize(ImageWidth, ImageHeight, ImageBitCount);
local memstream = MemoryStream.new(filesize);
local zstream = MemoryStream.new(filesize);
local writeImage = function(dibsec, memstream)
--print("printImage")
local width = dibsec.Info.bmiHeader.biWidth;
local height = dibsec.Info.bmiHeader.biHeight;
local bitcount = dibsec.Info.bmiHeader.biBitCount;
local rowsize = GDI32.GetAlignedByteCount(width, bitcount, 4);
local pixelarraysize = rowsize * math.abs(height);
local filesize = 54+pixelarraysize;
local pixeloffset = 54;
-- the MemoryStream passed in was preallocated to fit the file size
memstream:Seek(0);
local bs = BinaryStream.new(memstream);
-- Write File Header
bs:WriteByte(string.byte('B'))
bs:WriteByte(string.byte('M'))
bs:WriteInt32(filesize);
bs:WriteInt16(0);
bs:WriteInt16(0);
bs:WriteInt32(pixeloffset);
-- Bitmap information header
bs:WriteInt32(40);
bs:WriteInt32(dibsec.Info.bmiHeader.biWidth);
bs:WriteInt32(dibsec.Info.bmiHeader.biHeight);
bs:WriteInt16(dibsec.Info.bmiHeader.biPlanes);
bs:WriteInt16(dibsec.Info.bmiHeader.biBitCount);
bs:WriteInt32(dibsec.Info.bmiHeader.biCompression);
bs:WriteInt32(dibsec.Info.bmiHeader.biSizeImage);
bs:WriteInt32(dibsec.Info.bmiHeader.biXPelsPerMeter);
bs:WriteInt32(dibsec.Info.bmiHeader.biYPelsPerMeter);
bs:WriteInt32(dibsec.Info.bmiHeader.biClrUsed);
bs:WriteInt32(dibsec.Info.bmiHeader.biClrImportant);
-- Write the actual pixel data
memstream:WriteBytes(dibsec.Pixels, pixelarraysize, 0);
end
local getSingleShot = function(response, compressed)
captureScreen(captureWidth, captureHeight);
writeImage(hbmScreen, memstream);
zstream:Seek(0);
local compressedLen = ffi.new("int[1]", zstream.Length);
local err = zlib.compress(zstream.Buffer, compressedLen, memstream.Buffer, memstream:GetPosition() );
zstream.BytesWritten = compressedLen[0];
local contentlength = zstream.BytesWritten;
local headers = {
["Content-Length"] = tostring(contentlength);
["Content-Type"] = "image/bmp";
["Content-Encoding"] = "deflate";
}
response:writeHead("200", headers);
response:WritePreamble();
return response.DataStream:WriteBytes(zstream.Buffer, zstream.BytesWritten);
end
local handleUIOCommand = function(command)
local values = utils.parseparams(command)
if values["action"] == "mousemove" then
UIOSimulator.MouseMove(tonumber(values["x"]), tonumber(values["y"]))
elseif values["action"] == "mousedown" then
UIOSimulator.MouseDown(tonumber(values["x"]), tonumber(values["y"]))
elseif values["action"] == "mouseup" then
UIOSimulator.MouseUp(tonumber(values["x"]), tonumber(values["y"]))
elseif values["action"] == "keydown" then
UIOSimulator.KeyDown(tonumber(values["which"]))
elseif values["action"] == "keyup" then
UIOSimulator.KeyUp(tonumber(values["which"]))
end
end
local startupContent = nil
local handleStartupRequest = function(request, response)
-- read the entire contents
if not startupContent then
-- load the file into memory
local fs, err = io.open("viewscreen2.htm")
if not fs then
response:writeHead("500")
response:writeEnd();
return true
end
local content = fs:read("*all")
fs:close();
-- perform the substitution of values
-- assume content looks like this:
-- <?hostip?>:<?serviceport?>
local subs = {
["frameinterval"] = 300,
["hostip"] = net:GetLocalAddress(),
["capturewidth"] = captureWidth,
["captureheight"] = captureHeight,
["imagewidth"] = ImageWidth,
["imageheight"] = ImageHeight,
["screenwidth"] = ScreenWidth,
["screenheight"] = ScreenHeight,
["serviceport"] = Runtime.config.port,
}
startupContent = string.gsub(content, "%<%?(%a+)%?%>", subs)
end
-- send the content back to the requester
response:writeHead("200",{["Content-Type"]="text/html"})
response:writeEnd(startupContent);
return true
end
--[[
Responding to remote user input
]]--
local handleUIOSocketData = function(ws)
while true do
local bytes, bytesread = ws:ReadFrame()
if not bytes then
-- assuming the usual nil, err convention: on failure the second value is the error
print("handleUIOSocketData() - END: ", bytesread);
break
end
local command = ffi.string(bytes, bytesread);
handleUIOCommand(command);
end
end
local handleUIOSocket = function(request, response)
local ws = WebSocketStream();
ws:RespondWithServerHandshake(request, response);
Runtime.Scheduler:Spawn(handleUIOSocketData, ws);
return false; -- the WebSocket fiber now owns the stream, so don't recycle it
end
--[[
Primary Service Response routine
]]--
local HandleSingleRequest = function(stream, pendingqueue)
local request, err = HttpRequest.Parse(stream);
if not request then
-- dump the stream
--print("HandleSingleRequest, Dump stream: ", err)
return
end
local urlparts = URL.parse(request.Resource)
local response = HttpResponse.Open(stream)
local success = nil;
if urlparts.path == "/uiosocket" then
success, err = handleUIOSocket(request, response)
elseif urlparts.path == "/screen.bmp" then
success, err = getSingleShot(response, true);
elseif urlparts.path == "/screen" then
success, err = handleStartupRequest(request, response)
elseif urlparts.path == "/favicon.ico" then
success, err = StaticService.SendFile("favicon.ico", response)
elseif urlparts.path == "/jquery.js" then
success, err = StaticService.SendFile("jquery.js", response)
else
response:writeHead("404");
success, err = response:writeEnd();
end
if success then
return pendingqueue:Enqueue(stream)
end
end
--[[
Start running the service
--]]
local serviceport = tonumber(arg[1]) or 8080
Runtime = WebApp({port = serviceport, backlog=100})
Runtime:Run(HandleSingleRequest);
As a 'server', this code is responsible for a couple of things. First, it needs to act as a basic HTTP server, serving up relatively static content to get things started. When the user goes to http://localhost:8080/screen (8080 is the default port), the server responds by sending back the browser code that I showed in the previous article. The function "handleStartupRequest()" performs this operation. The file 'viewscreen2.htm' is HTML, but it's a bit of a template as well. You can mark a piece to be replaced by enclosing its name in a tag such as: <?hostip?>. That tag is replaced by whatever value you choose. In this case, I'm doing replacements for the size of the image, the size of the screen, the frame interval, and the host IP and port. The last of these is the most important, because without it, you won't be able to set up the WebSocket.
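The substitution itself is a single string.gsub call with a table as the replacement: each captured name is looked up in the table, numbers are converted to strings, and a placeholder whose name is missing from the table is left untouched. A small standalone demonstration (the template text and the address are made-up values):

```lua
-- Placeholders look like <?name?>; the pattern captures the name (letters only).
local template = "ws://<?hostip?>:<?serviceport?>/uiosocket every <?frameinterval?>ms <?unknown?>"
local subs = {
  hostip = "192.168.1.10",   -- made-up example address
  serviceport = 8080,
  frameinterval = 300,
}
-- gsub looks each capture up in 'subs'; a nil lookup keeps the original match.
local page = string.gsub(template, "%<%?(%a+)%?%>", subs)
```

Because the pattern uses `%a+`, placeholder names must be purely alphabetic; a name such as `host_ip` or `port2` would never match.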
The other parts are fairly straightforward. Of particular note is the 'captureScreen()' function. In Windows, since the dawn of man, there has been GDI for graphics. Good ol' GDI still has the ability to capture the screen, a single window, or a portion of the screen, and this still works in Windows 8. So, capturing the screen is nothing more than drawing into a DIBSection with StretchBlt, and that's that. Just one line of code.
The magic happens after that. Rather than handing the raw image back to the client, I want to send it out as a compressed BMP image. I could choose PNG, or JPG, or any other format browsers are capable of handling, but BMP is the absolute easiest to deal with, even if it is the most bulky. I figure that since I’m using zlib to deflate it before sending it out, that will be somewhat helpful, and it turns out this works just fine.
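For reference, here is the arithmetic behind getContentSize and the header that writeImage emits, as a pure-Lua sketch that builds the 54-byte header into a string. It assumes an uncompressed BI_RGB image (which at 16 bits per pixel means 5-5-5 RGB) and uses a hypothetical `le` helper for little-endian packing. The row-size formula is the standard BMP rule that each pixel row is padded to a multiple of 4 bytes, which is presumably what GDI32.GetAlignedByteCount(width, bitcount, 4) computes.

```lua
-- Hypothetical helper: pack an integer into 'n' little-endian bytes
-- (negative values as two's complement), without any bit library.
local function le(value, n)
  if value < 0 then value = value + 2^(8 * n) end
  local bytes = {}
  for i = 1, n do
    bytes[i] = string.char(value % 256)
    value = math.floor(value / 256)
  end
  return table.concat(bytes)
end

-- Each pixel row is padded to a multiple of 4 bytes (a 32-bit boundary).
local function bmpRowSize(width, bitcount)
  return math.floor((bitcount * width + 31) / 32) * 4
end

-- 14-byte BITMAPFILEHEADER + 40-byte BITMAPINFOHEADER for an uncompressed
-- (BI_RGB) image. A negative height would mean top-down rows.
local function bmpHeader(width, height, bitcount)
  local imageSize = bmpRowSize(width, bitcount) * math.abs(height)
  local pixelOffset = 14 + 40
  local fileSize = pixelOffset + imageSize
  return table.concat({
    "BM", le(fileSize, 4), le(0, 2), le(0, 2), le(pixelOffset, 4),
    le(40, 4), le(width, 4), le(height, 4),
    le(1, 2),                -- biPlanes
    le(bitcount, 2),
    le(0, 4),                -- biCompression = BI_RGB
    le(imageSize, 4),
    le(0, 4), le(0, 4),      -- pixels per meter (unspecified)
    le(0, 4), le(0, 4),      -- colors used / important
  }), fileSize
end

local header, fileSize = bmpHeader(1920, 1080, 16)
```

For a 1920x1080 capture at 16 bits per pixel, each row is 3840 bytes and the whole file is 54 + 3840 * 1080 = 4,147,254 bytes before zlib gets to it.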
The rest of the machinery there is just to deal with being an http server. A lot is hidden behind the ‘WebApp’ and the ‘WebSocket’ classes. Those are good for another discussion.
So, all in, this is about 300 lines of code. Not too bad for a rudimentary screen sharing service. Of course, there's a supporting cast that runs into the thousands of lines of code, but I'm taking that as a given, since frameworks such as Node and various others exist.
I could explain each and every line of code here, but I think it's small enough and easy enough to read that it won't be necessary. I will point out that there's not much difference between sending single snapshots one at a time and having an open stream that presents the screen as H.264 or WebM. For that scenario, you just need a library that can capture snapshots of the screen and turn them into a properly encoded video stream. Since you already have the WebSocket, it could easily be put to use for that purpose, rather than just receiving the mouse and keyboard events.
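On the receiving side of that WebSocket, handleUIOCommand decodes a small key=value command and dispatches on the action with an if/elseif chain. A hedged alternative sketch uses a dispatch table instead. The command format ("action=mousemove&x=10&y=20") is an assumption inferred from the keys the handler reads, `parseParams` is a hypothetical stand-in for utils.parseparams, and rather than calling UIOSimulator directly, `decodeCommand` returns the method name and arguments so it can be tested on its own.

```lua
-- Hypothetical stand-in for utils.parseparams: split "k1=v1&k2=v2" into a table.
local function parseParams(command)
  local values = {}
  for key, value in command:gmatch("([^&=]+)=([^&]*)") do
    values[key] = value
  end
  return values
end

-- action -> function returning the UIOSimulator method name and its arguments.
local actions = {
  mousemove = function(v) return "MouseMove", tonumber(v.x), tonumber(v.y) end,
  mousedown = function(v) return "MouseDown", tonumber(v.x), tonumber(v.y) end,
  mouseup   = function(v) return "MouseUp", tonumber(v.x), tonumber(v.y) end,
  keydown   = function(v) return "KeyDown", tonumber(v.which) end,
  keyup     = function(v) return "KeyUp", tonumber(v.which) end,
}

-- Returns nil for unknown actions, which the if/elseif chain also ignores.
local function decodeCommand(command)
  local values = parseParams(command)
  local handler = actions[values.action]
  if handler == nil then
    return nil
  end
  return handler(values)
end
```

In the server, the result would be applied with something like `UIOSimulator[name](a, b)`; supporting a new event type then means one more table entry rather than another elseif branch.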
Food for thought.
Bit Twiddling Again? – How I finally came to my senses
Posted: March 9, 2013 Filed under: Lua, LuaJIT, Uncategorized
Right after I published my last little missive, I saw an announcement that VNC is available on the Chrome browser. Go figure…
It’s been almost a year since I wrote about stuff related to serialization: Serialization Series
Oh, what a difference a year makes! I was recently implementing code to support a WebSocket client and server, so I had reason to revisit this topic. For WebSocket, the protocol specifies things at the bit level, and in big-endian (network) byte order. This poses some challenges for the little-endian machines that I use. That's some extreme bit twiddling, and although I did revise my low-level BitBang code, that's not what I'm writing about today.
I have another bit of code that deals with bytes as the smallest element. This is the BinaryStream object. BinaryStream allows me to simply read numeric values out of a stream, and it takes care of the big/little-endian nature of things. BinaryStream wraps any other stream, so you can do things like this:
local mstream = MemoryStream.new(1024);
local bstream = BinaryStream.new(mstream, true);
bstream:WriteInt16(0x00ff);
bstream:WriteInt16(0xff00);
mstream:Seek(0);
print(bstream:ReadInt16())
print(bstream:ReadInt16())
This is quite handy for all the things related to packing/unpacking bytes of memory. Of course, there are plenty of libraries that do this sort of thing, but this is the one that I use.
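All the endianness setting really changes is the order of the bytes on the wire. A pure-Lua sketch of writing and reading Int16 values both ways makes that visible, including the fact that 0xff00 read back as a signed Int16 is -256. The function names are hypothetical, not BinaryStream's API, and I'm assuming the `true` passed to BinaryStream.new selects big-endian.

```lua
-- Append a 16-bit value to 'out' (a table of byte strings) in the
-- requested byte order. Negative values are stored as two's complement.
local function writeInt16(out, value, bigEndian)
  if value < 0 then value = value + 65536 end
  local hi, lo = math.floor(value / 256), value % 256
  if bigEndian then
    out[#out + 1] = string.char(hi, lo)
  else
    out[#out + 1] = string.char(lo, hi)
  end
end

-- Read a signed 16-bit value starting at 1-based position 'pos'.
local function readInt16(s, pos, bigEndian)
  local b1, b2 = s:byte(pos, pos + 1)
  local value = bigEndian and (b1 * 256 + b2) or (b2 * 256 + b1)
  if value >= 32768 then value = value - 65536 end
  return value
end

local big, little = {}, {}
writeInt16(big, 0x00ff, true);     writeInt16(big, 0xff00, true)
writeInt16(little, 0x00ff, false); writeInt16(little, 0xff00, false)
big, little = table.concat(big), table.concat(little)
```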
The revelation for me this time around had to do with the nature of my implementation. In my first incarnation of these routines, I was doing byte swapping manually, like this:
function BinaryStream:ReadInt16()
-- Read two bytes
-- return nil if two bytes not read
if (self.Stream:ReadBytes(types_buffer.bytes, 2, 0) <2)
then return nil
end
-- if we don't need to do any swapping, then
-- we can just return the Int16 right away
if not self.NeedSwap then
return types_buffer.Int16;
end
local tmp = types_buffer.bytes[0]
types_buffer.bytes[0] = types_buffer.bytes[1]
types_buffer.bytes[1] = tmp
return types_buffer.Int16;
end
Well, this works, but… It’s the kind of code I would teach to someone who was new to programming, not necessarily the best, but shows all the detail.
Given Lua’s nature, I could have done the byte swapping like this:
types_buffer.bytes[0], types_buffer.bytes[1] = types_buffer.bytes[1], types_buffer.bytes[0]
Yep, yes sir, that would work. But, it’s still a bit clunky.
I have recently also been implementing some TLS related stuff, and in TLS there are 24-bit (3 byte) integers. In order to read them, I really want a generic integer reader:
local lshift = bit.lshift -- LuaJIT's BitOp module
function BinaryStream:ReadIntN(n)
local value = 0;
if self.BigEndian then
for i=1,n do
value = lshift(value,8) + self:ReadByte()
end
else
for i=1,n do
value = value + lshift(self:ReadByte(),8*(i-1))
end
end
return value;
end
print(bstream:ReadIntN(3))
Well, this will work if there are 1, 2, 3, or 4 byte integers. It can't work beyond that, because the bit operations only work up to 32 bits (and since LuaJIT's bit operations return signed results, a 4-byte value with its top bit set comes back negative). But, OK, that makes things a lot easier, reduces the amount of code I have to write, and puts all the endian stuff in one place.
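Because LuaJIT's bit operations return signed 32-bit results, a ReadIntN built on lshift hands back a negative number for a 4-byte value whose top bit is set. A hedged alternative sketch avoids the bit library entirely by multiplying by 256, which keeps results unsigned and exact up to 6 bytes even where every number is a double. It reads from a Lua string rather than a stream, and `readUIntN` is a hypothetical name.

```lua
-- Read an unsigned n-byte integer from string 's' at 1-based position 'pos'.
-- Returns the value and the position just past it, or nil on a short read.
local function readUIntN(s, pos, n, bigEndian)
  if #s < pos + n - 1 then
    return nil, "short read"
  end
  local first, last, step = pos, pos + n - 1, 1
  if not bigEndian then
    first, last, step = last, first, -1   -- most significant byte comes last
  end
  local value = 0
  for i = first, last, step do
    value = value * 256 + s:byte(i)
  end
  return value, pos + n
end
```

TLS length fields, for instance, are 24-bit big-endian values, so a handshake message length would be read with `readUIntN(data, pos, 3, true)`.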
Then there’s 64-bit, float, and double.
In these cases, the easiest thing is to use a union structure:
ffi.cdef[[
typedef union {
int64_t Int64;
uint64_t UInt64;
float Single;
double Double;
uint8_t bytes[8];
} bstream_types_t;
]]
function BinaryStream:ReadBytesN(buff, n, reverse)
if reverse then
for i=n,1,-1 do
buff[i-1] = self:ReadByte()
end
else
for i=1,n do
buff[i-1] = self:ReadByte()
end
end
end
function BinaryStream:ReadInt64()
self:ReadBytesN(self.valunion.bytes, 8, self.NeedSwap)
return tonumber(self.valunion.Int64);
end
function BinaryStream:ReadSingle()
self:ReadBytesN(self.valunion.bytes, 4, self.NeedSwap)
return tonumber(self.valunion.Single);
end
function BinaryStream:ReadDouble()
self:ReadBytesN(self.valunion.bytes, 8, self.NeedSwap)
return tonumber(self.valunion.Double);
end
Of course, with LuaJIT, tonumber() turns the 64-bit int into a double, so only integers up to 2^53 (53 bits) come through exactly, but the technique works in general. For the Single and Double, you just need to get the bytes in the right order and everything is fine. Whether this is compatible with another application's ordering of the bytes depends on that application, but at least it is self-consistent.
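In LuaJIT every Lua number is an IEEE double with a 53-bit significand, so converting an int64 with tonumber() is only exact up to 2^53; beyond that, neighbouring integers collapse together. A two-line check of that arithmetic in plain Lua (the `^` operator always produces a floating-point result, so this behaves the same on Lua 5.3+, where native integers also exist):

```lua
-- Below 2^53 every integer is representable, so adding 1 is visible...
local exactBelow = (2^52 + 1) - 2^52   -- 1
-- ...but 2^53 + 1 has no double representation and rounds back to 2^53.
local lostAbove = (2^53 + 1) - 2^53    -- 0
```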
This incarnation uses functions a lot more than the last incarnation. This is the big revelation for me. In the past, I was thinking like a ‘C’ programmer, and essentially trying to do what I would do in assembly language. Well, I realize this is not necessarily the best way to go with LuaJIT. Also, I was trying to optimize by getting stuff into a buffer, and messing around with it from there, assuming getting stuff from the underlying stream is expensive. Well, that simply might not be a good assumption, so I relaxed it.
With this newer implementation, I was able to drop 200 lines of code out of 428. That's a pretty good savings. This in and of itself might be worthwhile, because a smaller, simpler implementation is easier to maintain.
So, every day, I see and hear things about either my own code or someone else's, and I try to apply what I've learned to my own cases. I'm happy to rewrite code when it results in smaller, tighter, more accurate code.
And there you have it.
Poor Man’s Event Driven IO
Posted: February 23, 2013 Filed under: Lua, LuaJIT, System Programming
A few months ago I set out to explore the possibilities of creating a networking stack that is as performant as node.js, but does not rely on a separate library such as libuv. Why do such a thing? One is simply for the educational benefit, and the other is because although I really like node.js, I believe there is room in the world for a moral equivalent that is implemented in Lua. Now of course there already are a couple of attempts at the same, and I've mentioned them before. In most of the cases that I've seen though, they rely heavily on a library written in C to do the real heavy lifting.
At the core of the event driven model of network programming is a scheduler which generally blocks/sleeps a ‘thread’ when it would be blocked waiting on some sort of IO to be available. This is the crux of what libuv provides to the user.
Here’s a simple example of the problem:
-- somewhere in my code:
local bytesread, err = readsocket(sock, buff, bufflen)
if err == "wouldblock" then
-- ideally yield and come back here
-- when input is available to be read
end
-- continue doing whatever I was doing
It seems so simple, you’d think you could just do:
coroutine.yield()
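and call it a day. But, you can't: when the coroutine is resumed, execution simply continues past the yield, and nothing has retried the read. Instead, you have to cook up a loop that repeatedly checks to see if you'd still be blocking, until you're not: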
while true do
local bytesread, err = readsocket(sock, buff, bufflen)
if bytesread then
break
end
if err ~= "wouldblock" then
return nil, err
end
coroutine.yield();
end
-- continue doing whatever I was doing
This will work, but it has the unfortunate side effect of spinning the CPU relentlessly for no good reason. This is essentially polling: trying the readsocket call, and letting that determine the state of things. It would be more efficient if the kernel, or some other mechanism, told me when the socket was actually ready to be read from without blocking; then I'd only do the read when I knew it would not block. No spinning, and the process could sleep the rest of the time.
So, how to do this notification thing? Well, there are many mechanisms, such as select(), poll(), epoll(), and WSAPoll(). I'm going to focus on the Windows-supplied WSAPoll(), because it is essentially the Windows version of poll(), and poll()/epoll() are what people typically use on Linux. On Windows, the absolute most efficient way to do this is to use IO completion ports, but as soon as you step into that, you're in a world that typically involves multiple threads, and that's suddenly much more complex to deal with. So, I'll sacrifice some absolute performance on Windows in favor of the simplest implementation possible.
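To see the cost, here's a minimal simulation of that retry loop. The fake readsocket, which reports "wouldblock" until its fifth call, is a hypothetical stand-in for a real non-blocking socket; the point is simply that the coroutine gets resumed over and over just to ask again.

```lua
-- Hypothetical fake socket: reports "wouldblock" until data "arrives"
-- on call number ready_after. Not a real socket API.
local function make_fake_socket(ready_after)
  local calls = 0
  return function()
    calls = calls + 1
    if calls < ready_after then
      return nil, "wouldblock"
    end
    return "payload"
  end
end

-- The retry loop from above, reading from the fake socket
local function reader(readsocket)
  while true do
    local data, err = readsocket()
    if data then
      return data
    end
    if err ~= "wouldblock" then
      return nil, err
    end
    coroutine.yield()   -- give up the CPU, then try again (polling)
  end
end

local sock = make_fake_socket(5)
local co = coroutine.create(reader)
local resumes, ok, result = 0
repeat
  resumes = resumes + 1
  ok, result = coroutine.resume(co, sock)
until coroutine.status(co) == "dead"

-- result is "payload", but it took 5 resumes to get there
print(ok, result, resumes)
```

Every one of those resumes is wasted work; with a readiness notification, the reader would be resumed exactly once.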
First of all, here’s the code in its entirety:
local ffi = require("ffi");
local bit = require("bit")
local band = bit.band
local bor = bit.bor
local WinSock = require("WinSock_Utils");
local SocketIoPool_t = {}
local SocketIoPool_mt = {
__index = SocketIoPool_t,
}
local SocketIoPool = function(capacity)
capacity = capacity or 1
if capacity < 1 then return nil end
local obj = {
fdarray = ffi.new("WSAPOLLFD[?]", capacity);
Capacity = capacity, -- How many slots are available
Handles = {},
Sockets = {},
}
setmetatable(obj, SocketIoPool_mt);
obj:ClearAllSlots();
return obj
end
SocketIoPool_t.ClearSlot = function(self, slotnumber)
if not slotnumber or slotnumber < 0 or slotnumber >= self.Capacity then
return false
end
self.fdarray[slotnumber].fd = -1;
self.fdarray[slotnumber].events = 0;
self.fdarray[slotnumber].revents = 0;
return true
end
SocketIoPool_t.ClearAllSlots = function(self)
for i=1,self.Capacity do
self:ClearSlot(i-1);
end
end
SocketIoPool_t.GetOpenSlot = function(self)
-- traverse each of the slots in the array
for i=0, self.Capacity-1 do
-- a negative fd (-1, as set by ClearSlot) marks an unused slot;
-- this assumes the fd field reads back as a signed value
if self.fdarray[i].fd < 0 then
return i
end
end
return nil, "no open slot available"
end
SocketIoPool_t.AddSocket = function(self, sock, events)
local slot, err = self:GetOpenSlot()
--print("AddSocket, slot: ", slot, err)
if not slot then
return nil, err
end
events = events or bor(POLLWRNORM, POLLIN);
self.fdarray[slot].fd = sock.Handle;
self.fdarray[slot].events = events;
self.fdarray[slot].revents = 0;
self.Handles[sock.Handle] = sock;
self.Sockets[sock.Handle] = slot;
return slot
end
SocketIoPool_t.RemoveSocket = function(self, sock)
-- remove the associated handle
self.Handles[sock.Handle] = nil;
-- remove the associated socket
self:ClearSlot(self.Sockets[sock.Handle]);
self.Sockets[sock.Handle] = nil;
end
SocketIoPool_t.Cycle = function(self, eventqueue, timeout)
timeout = timeout or 0
local success, err = WinSock.WSAPoll(self.fdarray, self.Capacity, timeout);
--print("Cycle: ", success, err, timeout);
if not success then
return nil, err
end
-- Go through each of the slots looking
-- for the sockets that are ready for activity
for i=0, self.Capacity-1 do
if self.fdarray[i].fd > 0 then
if self.fdarray[i].revents > 0 then
self.Handles[self.fdarray[i].fd].fdarray.revents = self.fdarray[i].revents;
eventqueue:Enqueue(self.Handles[self.fdarray[i].fd]);
self.fdarray[i].revents = 0;
end
end
end
return success
end
return SocketIoPool
The core of the whole mess relies on a single system function call, WSAPoll():
local success, err = WinSock.WSAPoll(self.fdarray, self.Capacity, timeout);
What this routine does is take an array of socket descriptors and tell you which ones are ready for the actions you've asked about. In most cases, you'll register to be notified when a socket is available for reading or writing. Errors and hangups are reported for free.
The first parameter to this call is an array of structures. This structure looks like this:
typedef struct pollfd {
SOCKET fd;
int16_t events;
int16_t revents;
} WSAPOLLFD;
That is, the socket descriptor and the events you want to ‘listen’ for. Upon return from the WSAPoll() call, each of these structures will contain the event information in the ‘revents’ field. You can set the ‘fd’ field to ‘-1’ if you're not using this particular slot.
So, there’s a little bit of a challenge here. You can’t just go to the system and say: watchsocket(fd, events). You have to use this array of structures. So, there’s a bit of management that goes on here. The SocketIoPool code will allocate an array of structures, with a fixed capacity. It will then manage this array using AddSocket(), and RemoveSocket(). If you call AddSocket(), and there isn’t an open slot, it will return an error, and that socket will not be watched.
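Here's a pure-Lua model of that fixed-capacity slot management, as a sketch only: a plain table stands in for the ffi WSAPOLLFD array, handles are just numbers, and the function names are hypothetical. As in SocketIoPool, an fd of -1 marks a free slot, and a full pool refuses new sockets.

```lua
-- Hypothetical model: a plain table stands in for the ffi WSAPOLLFD array.
-- Slots are 0-based like the ffi array; fd = -1 marks a free slot.
local function new_slots(capacity)
  local slots = {}
  for i = 0, capacity - 1 do
    slots[i] = { fd = -1, events = 0, revents = 0 }
  end
  return { slots = slots, capacity = capacity, byhandle = {} }
end

-- Claim the first free slot, or fail when the pool is full
local function add_socket(pool, handle, events)
  for i = 0, pool.capacity - 1 do
    if pool.slots[i].fd < 0 then
      pool.slots[i].fd = handle
      pool.slots[i].events = events
      pool.slots[i].revents = 0
      pool.byhandle[handle] = i
      return i
    end
  end
  return nil, "no open slot"
end

-- Free the slot a handle occupies so it can be reused
local function remove_socket(pool, handle)
  local i = pool.byhandle[handle]
  if i then
    pool.slots[i].fd, pool.slots[i].events, pool.slots[i].revents = -1, 0, 0
    pool.byhandle[handle] = nil
  end
end

local pool = new_slots(2)
print(add_socket(pool, 100, 1))  -- 0
print(add_socket(pool, 200, 1))  -- 1
print(add_socket(pool, 300, 1))  -- nil, "no open slot" (not watched)
remove_socket(pool, 100)
print(add_socket(pool, 300, 1))  -- 0 (the freed slot is reused)
```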
The Cycle() function is meant to be called from within a scheduler loop. Each time it is called, it takes each socket that is ready for io operations and places it on the event queue.
Ah, now things get interesting. This is where the break from callbacks starts. You could argue that the “Enqueue()” is in fact a callback, and you’d be right. But, given that queues are so useful, and can easily support other callback models, I figured it was good to start with them from the beginning. Of course, if you wanted to change this to a simple callback instead, you could easily do that.
The relevant part of the scheduler looks like this:
EventScheduler_t.YieldForIo = function(self, sock, iotype)
--print("EventScheduler_t.YieldForIo()");
--print("-- Current Fiber: ", self.CurrentFiber);
-- Try to add the socket to the event pool
local success, err = self.IoEventPool:AddSocket(sock, iotype)
if success then
-- associate a fiber with a socket
self.EventFibers[sock.Handle] = self.CurrentFiber;
-- Keep a list of fibers that are awaiting io
self.FibersAwaitingEvent[self.CurrentFiber] = true;
else
-- failed to add socket to event pool
end
-- Whether we were successful or not in adding the socket
-- to the pool, perform a yield() so the world can move on.
coroutine.yield();
end
EventScheduler_t.ProcessEventQueue = function(self, queue)
for i=1,queue:Len() do
local sock = queue:Dequeue();
if sock then
local fiber = self.EventFibers[sock.Handle];
if fiber then
self:ScheduleFiber(fiber);
self.EventFibers[sock.Handle] = nil;
self.FibersAwaitingEvent[fiber] = nil;
self.IoEventPool:RemoveSocket(sock);
else
print("EventScheduler_t.ProcessEventQueue(), No Fiber waiting to process.")
-- remove the socket from the watch list
end
else
print("EventScheduler_t.ProcessEventQueue(), No sock found in queue");
end
end
end
while self.ContinueRunning do
-- First check if there are any io events
local success, err = self.IoEventPool:Cycle(ioeventqueue, 500);
--print("Event Pool: ", success, err, ioeventqueue:Len())
if success then
if success > 0 then
-- schedule the io operations
self:ProcessEventQueue(ioeventqueue);
end
else
--print("Event Pool ERROR: ", err);
end
-- (remainder of the scheduler loop omitted)
end
With this bit of kit, instead of simply calling “coroutine.yield()” whenever you get a ‘wouldblock’ error, you call “YieldForIo()”. This takes the currently running ‘fiber’ and parks it in a separate table, apart from the regularly running coroutines, and only resumes the fiber once there has been io activity on the socket in question.
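The parking idea can be modeled without any sockets at all. In this hypothetical sketch, each fiber records itself in a waiting table keyed by handle and yields; an on_io_event() call (standing in for what Cycle() and ProcessEventQueue() do) moves only that fiber back onto the ready list. Fibers with no activity are never resumed, so nothing spins.

```lua
-- Hypothetical model: handles are plain numbers and io events are
-- injected by hand; there is no real socket or WSAPoll call here.
local ready, waiting, log = {}, {}, {}

local function yield_for_io(handle)
  waiting[handle] = coroutine.running()   -- park this fiber on the handle
  coroutine.yield()
end

local function reader(handle)
  log[#log + 1] = "wait " .. handle
  yield_for_io(handle)
  log[#log + 1] = "read " .. handle
end

-- Deliver an io event: move the parked fiber back onto the ready list
local function on_io_event(handle)
  local co = waiting[handle]
  if co then
    waiting[handle] = nil
    ready[#ready + 1] = co
  end
end

-- Resume everything that is ready right now
local function run_ready()
  local batch = ready
  ready = {}
  for _, co in ipairs(batch) do
    assert(coroutine.resume(co))
  end
end

for _, h in ipairs({ 1, 2 }) do
  ready[#ready + 1] = coroutine.create(function() reader(h) end)
end
run_ready()        -- both fibers run until they park themselves
on_io_event(2)     -- only handle 2 becomes readable
run_ready()        -- only the fiber waiting on handle 2 is resumed
print(table.concat(log, ", "))  -- wait 1, wait 2, read 2
```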
This frees up the rest of the scheduler to deal with other routines, and if there are no routines, it can simply sleep for a bit, which will make it relatively easy on the CPU.
Well, there you have it. Almost a complete system, with the core written entirely in Lua, without any external dependencies other than the core OS. This is a good thing because it basically says that libuv, or whatever other code you'd normally rely on to perform this task, can be left behind: you can do it all in pure Lua code, which makes my life simpler.
One of the benefits of doing this all in Lua is that for much smaller devices, particularly ones that don’t have a ton of memory, nor even a multi-thread environment, I can get the benefits of a highly performant networking stack, just like on the big machines. This is fairly important I think.
I’ve been playing with this model for a while, and have quite a few examples building up around it. I’ll have to put out the all encompassing guide that pulls it all together, but for now, this is pretty much the last piece of the puzzle.
A Different Take on Network Programming
Posted: February 18, 2013 Filed under: Lua, LuaJIT, System Programming | Tags: lua, luajit, network programming, queue
In recent months, I've spent some time programming with Node. Node is great because it contains zlib, OpenSSL, a fast JavaScript runtime, and supports an asynchronous eventing model. It's fairly lightweight, compact, and efficient. Node can handle thousands of concurrent connections on basic hardware. The community around Node is very large, and growing. What more could you want in life?
The one thing I don't like about Node is the programming model, primarily in the http stack. The basic model is to use callbacks, and chains of callbacks, to achieve most things. You pay for the efficiency with constraints on the programming model.
Recently, I’ve been playing with different models of programming. Rather than direct callbacks, I’m using queues of commands. Queuing systems are nothing new. The basic premise is, instead of one function calling the next function, it will place an ‘alert’ or ‘event’ or some bit of information into a queue indicating that the next action needs to occur. Some other bit of code is watching the queue waiting to see if new bits of information show up, and once it does, it takes appropriate action.
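Collections.Queue shows up in all of these examples, but its source isn't shown here. A minimal FIFO with the same Enqueue/Dequeue/Len method names might look like the following; this is a hypothetical stand-in, not the actual Collections module. Dequeue() returns nil on an empty queue, which is what the polling loops in these posts rely on.

```lua
-- Hypothetical stand-in for Collections.Queue: a FIFO with two indices,
-- so Enqueue and Dequeue are both O(1).
local Queue = {}
Queue.__index = Queue

function Queue.new()
  return setmetatable({ first = 1, last = 0, items = {} }, Queue)
end

function Queue:Enqueue(value)
  self.last = self.last + 1
  self.items[self.last] = value
end

function Queue:Dequeue()
  if self.first > self.last then
    return nil                       -- empty: callers treat nil as "nothing yet"
  end
  local value = self.items[self.first]
  self.items[self.first] = nil       -- drop the reference so it can be collected
  self.first = self.first + 1
  return value
end

function Queue:Len()
  return self.last - self.first + 1
end

-- A producer and a consumer only share the queue, not each other
local q = Queue.new()
q:Enqueue("conn-1")
q:Enqueue("conn-2")
local first = q:Dequeue()
print(first, q:Len())   -- conn-1   1
```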
Queuing systems are great because they allow for greater flexibility, and ease decoupling a tightly coupled system. One area where I use this is the core loop where my service accepts new connections. But first, a little setup. In my system, I assume there is a system-wide scheduler already available. In the Lua context, this deals with cooperative coroutines. The same can be achieved in other environments, as long as you have the ability to start code running, and allow other bits of code to run relatively concurrently.
-- Setup the runtime
-- This must happen before anything else
local SimpleDispatcher = require("SimpleDispatcher");
Runtime = {
Scheduler = SimpleDispatcher();
}
local http_service = require "http_service"
local port = tonumber(arg[1]) or 8080
Runtime.Scheduler:Spawn(http_service, {port=port, backlog =10});
Runtime.Scheduler:Start();
So, this is just setting up the main routine, which is an http server. Nothing too special from the queuing perspective as yet. One of the reasons I wanted to explore this line of reasoning in the first place is to reduce the amount of code required to talk to something connected to an IP network. Node is great, and it runs almost everywhere. But, at runtime, it can take up about 16MB minimum, and the on-disk footprint is in the megabytes as well. In the “internet of things”, this is kind of hefty.
And so, here's the http_service portion:
local Collections = require("Collections")
local NetStream = require "NetStream"
local HttpRequest = require "HttpRequest"
local HttpResponse = require "HTTPResponse"
local URL = require("url");
local CoSocketIo = require ("CoSocketIo")
local SocketUtils = require ("SocketUtils")
local StaticService = require("StaticService");
local StopWatch = require("StopWatch")
local Scheduler = Runtime.Scheduler;
local contentTemplate = [[
<html>
<head><title>This is the title</title></head>
<body>Hello, World!</body>
</html>
]]
local ServiceHandler = function(config)
local PreamblePending = Collections.Queue.new();
local HandleSingleRequest = function(stream, pendingqueue)
-- try parsing the header
-- parse the request to see what we've got
local request, err = HttpRequest.Parse(stream);
if not request then
-- dump the stream
return
end
local urlparts = URL.parse(request.Resource)
if urlparts.path == "/echo/" then
local response = HttpResponse.Open(stream)
response:writeHead("204")
response:writeEnd();
return pendingqueue:Enqueue(stream)
elseif urlparts.path == "/status/" then
local response = HttpResponse.Open(stream)
response:writeHead("200")
response:writeEnd(contentTemplate);
return pendingqueue:Enqueue(stream)
end
local filename = '.'..urlparts.path;
-- Send the file
local response = HttpResponse.Open(request.DataStream);
StaticService.SendFile(filename, response)
-- recycle the stream in case a new request comes
-- in on it.
pendingqueue:Enqueue(stream)
end
local HandlePendingRequests = function(pendingqueue)
while true do
local netstream = pendingqueue:Dequeue();
if netstream then
if netstream:IsConnected() then
Scheduler:Spawn(HandleSingleRequest, netstream, pendingqueue)
else
print("netstream disconnected")
end
end
coroutine.yield();
end
end
local HandleNewConnection = function(config, pendingqueue)
config = config or {port=8080, backlog = 10}
local port = config.port or 8080
local backlog = config.backlog or 10
local Acceptor, err = SocketUtils.CreateTcpServerSocket({port = port, backlog = backlog, nonblocking=true, nodelay=true});
if not Acceptor then
print("Exiting Acceptor: ", err)
return nil, err
end
while true do
local accepted, err = Acceptor:Accept();
if accepted then
-- create a stream to wrap the raw socket
local res, err = accepted:SetNonBlocking(true);
res, err = accepted:SetNoDelay(true);
res, err = accepted:SetKeepAlive(true, 10*1000, 500);
local netstream = NetStream.new(accepted, CoSocketIo)
pendingqueue:Enqueue(netstream);
end
if err and err ~= WSAEWOULDBLOCK then
print("EXIT MAIN LOOP: ", err)
break
end
coroutine.yield()
end
end
Scheduler:Spawn(HandlePendingRequests, PreamblePending);
Scheduler:Spawn(HandleNewConnection, config, PreamblePending);
end
return ServiceHandler
Starting from the bottom, you notice that ‘ServiceHandler’ is the return value. This matches up with the way in which the service is started.
Runtime.Scheduler:Spawn(http_service, {port=port, backlog =10});
Runtime.Scheduler:Start();
That is, basically spawn a fiber which will run this ‘main’ routine, and start the scheduler running. Great, now anything running within the code knows that it is running within a coroutine context.
At the top of the ServiceHandler routine, we find this:
local PreamblePending = Collections.Queue.new();
When you’re dealing with the http protocol, the first thing you have to do is read the preamble of a request. The preamble consists of the first line, which might be something like:
GET / HTTP/1.1
This is then followed by any number of headers, finishing with a blank line.
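Reading the preamble boils down to collecting lines until the first blank one. The sketch below does that over a string that has already been received, which is a simplifying assumption: the real HttpRequest.Parse() reads from a NetStream, and parse_preamble() and its result fields here are hypothetical. It also returns the position where the body (if any) begins.

```lua
-- Hypothetical sketch: parse an HTTP preamble (request line plus headers,
-- ending at the first blank line) from an already-received string.
local function parse_preamble(text)
  local lines, pos = {}, 1
  while true do
    local s, e = text:find("\r\n", pos, true)   -- plain search for CRLF
    if not s then return nil, "incomplete preamble" end
    local line = text:sub(pos, s - 1)
    pos = e + 1
    if line == "" then break end                -- blank line ends the preamble
    lines[#lines + 1] = line
  end

  -- Request line, e.g. "GET / HTTP/1.1"
  local method, resource, version =
    (lines[1] or ""):match("^(%S+)%s+(%S+)%s+(HTTP/%d%.%d)$")
  if not method then return nil, "bad request line" end

  -- Remaining lines are "Name: value" headers (names lowercased here)
  local headers = {}
  for i = 2, #lines do
    local name, value = lines[i]:match("^([^:]+):%s*(.*)$")
    if name then headers[name:lower()] = value end
  end
  return { Method = method, Resource = resource, Version = version,
           Headers = headers }, pos
end

local req = parse_preamble("GET /status/ HTTP/1.1\r\nHost: localhost\r\n\r\n")
print(req.Method, req.Resource, req.Headers.host)  -- GET  /status/  localhost
```

Returning nil for an incomplete preamble leaves room for the caller to read more data and try again.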
The PreamblePending queue is the place where streams are placed when we believe their preamble is ready to be read. This occurs in two situations. The first is once a new socket is accepted by the server. The second is after we’ve dealt with one request, and we want to recycle the stream onto this queue in case there is another request that comes in on the same socket.
The first case is dealt with by the HandleNewConnection() routine. This routine is basically a continuous while loop which polls the Accept() method of a socket listening on the specified port. The Accept() routine returns a valid socket whenever there is a new connection; otherwise it returns nil. This is the most CPU-intensive way to do things. The more efficient mechanism would be to perform this as an asynchronous operation, whereby the process would effectively sleep until there was a new connection, with much lower CPU utilization. But, I'll get to that eventually. Right now my focus is on the fundamental decoupling.
Once the new connection is accepted, it is wrapped up in a NetStream object, and placed onto the queue. That is all this routine has to deal with. It does not make any callbacks, or do anything else fancy. It just goes on waiting for the next connection.
if accepted then
-- create a stream to wrap the raw socket
local res, err = accepted:SetNonBlocking(true);
res, err = accepted:SetNoDelay(true);
res, err = accepted:SetKeepAlive(true, 10*1000, 500);
local netstream = NetStream.new(accepted, CoSocketIo)
pendingqueue:Enqueue(netstream);
end
So far so good. I like this model because each little bit is fairly simple. I can easily understand the concept of accepting a new connection and throwing that connection onto a queue to be dealt with. It’s the same as a basic factory assembly line. If I were making cars, and I’m on the part of the assembly line that places the motor, I’m not overly concerned with the part of the assembly line that puts seats in place. I just do my motor placement, and move the car along to the next station. The same applies here. Once I accept the connection, I do a little bit to condition the connection (wrapping in a stream), then I move it along to the next station by placing it into the queue.
The other routine which is running continuously is “HandlePendingRequests()”. This routine’s sole purpose in life is to pull the next stream off the queue and handle the request. When pulling from the queue, if there’s nothing on the queue, a nil will be returned. In this infinite loop, this will simply result in the coroutine.yield() being called. This is how the two routines cooperate. In Lua, coroutines are NOT preemptive threads, but rather cooperative. If a coroutine never calls ‘yield()’, then it will hog all the CPU.
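The cooperation between the two routines can be sketched with a pair of plain coroutines and a table used as the queue. This is a simplified, hypothetical model (fake "streams", no sockets, no Scheduler): the acceptor produces, the handler consumes whatever is queued, and both yield on every pass, so neither starves the other.

```lua
-- Hypothetical model of the acceptor/handler split; strings stand in
-- for NetStream objects and a plain table stands in for the queue.
local pending, handled = {}, {}

local acceptor = coroutine.create(function()
  for i = 1, 3 do
    pending[#pending + 1] = "stream-" .. i   -- "accept" a connection
    coroutine.yield()
  end
end)

local handler = coroutine.create(function()
  while true do
    local stream = table.remove(pending, 1)  -- nil when nothing is queued
    if stream then
      handled[#handled + 1] = stream
    end
    coroutine.yield()                        -- always give the acceptor a turn
  end
end)

-- Round-robin until the acceptor is done and the queue has drained
while coroutine.status(acceptor) ~= "dead" or #pending > 0 do
  if coroutine.status(acceptor) ~= "dead" then
    assert(coroutine.resume(acceptor))
  end
  assert(coroutine.resume(handler))
end
print(table.concat(handled, " "))  -- stream-1 stream-2 stream-3
```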
So what does the new routine do once it pulls a netstream off the queue?
if netstream then
if netstream:IsConnected() then
Scheduler:Spawn(HandleSingleRequest, netstream, pendingqueue)
else
print("netstream disconnected")
end
end
First it checks to see if the stream is still connected. If it is, then it spawns a new fiber in the scheduler. This new fiber runs the “HandleSingleRequest” routine. Notice the two parameters to that routine are the stream itself, and the pendingqueue. Rather than have the “HandleSingleRequest()” routine call a callback once it is done with the request, it simply places the stream back into the pendingqueue, where it will ultimately be dealt with by the HandlePendingRequests() routine.
This little bit of indirection might seem pointless. Why couldn't the HandleNewConnection() routine have spawned HandleSingleRequest() directly? It could have, and I could have collapsed the responsibility of checking the pending queue into that routine as well. But, with this separation, I gain flexibility in how requests can be processed, without overly complicating the architecture, and without making the code more fragile. I can easily swap out the HandlePendingRequests() routine for something more exotic, and HandleNewConnection() will never notice.
Finally, within the HandleSingleRequest() routine, the request is dealt with. The preamble is parsed, and the resource is returned, and that’s the end of the routine.
This is the basic skeleton of the system. The usage of queues allows for easy decoupling of concerns. Unlike in Node, you aren't forced to utilize a (request, response) callback chain. You could easily implement that style if you wanted to by composing your parts that way. But, fundamentally, the system is based on queues at the lowest level. By explicitly exposing the scheduling, and the queuing system, your system can be composed however you want, to meet your specific needs: some polling, some notifications. This gets back to the difference between IEnumerable and IObservable.
This is great, and I’m enjoying the freedom. You could in fact do this same thing in node, but it would be breaking with the general programming patterns that are emerging in that environment. If you want a much smaller footprint, and a more decoupled programming style, this simple queuing system might be the ticket.
Next time I’ll look at being more efficient with the lowest level socket IO so that I don’t waste so much CPU time in polling.
Lua Coroutines – Getting Started
Posted: January 30, 2013 Filed under: Lua, System Programming | Tags: cooperative, coroutine, dispatcher, lua, programming, scheduler
Sometimes I run across a concept that's really exciting, and I want to learn/know/master it, but my brain just throws up block after block, preventing me from truly learning it. Lua's coroutines are kind of like that for me. I mean, think of the prospect of using ‘light weight threads’ in your programs. Lua's coroutines aren't ‘threads’ at all though, at least not in the usual ‘preemptive multitasking’ sense that one might think.
Lua coroutines are like programming in Windows 1.0. Basically, cooperative multi-tasking. If any one ‘thread’ doesn’t cooperate, the whole system comes to a standstill until that one task decides to take a break. So, some amount of work is required to do things correctly.
How to get started though?
Well, first of all, I start with a simple function:
local routines = {}
routines.doOnce = function(phrase)
print(phrase)
end
routines.doOnce("Hello, World")
>Hello, World
A simple routine, that prints whatever I tell it to print. Fantastic, woot! Raise the roof up!!
Why did I go through the trouble of creating a table, and having a function, just to print? All will become clear in a bit.
Now, how about another routine? Perhaps one that does something in a loop:
routines.doLoop = function(iterations)
for i=1,iterations do
print("Iteration: ", i)
end
end
routines.doLoop(3)
>Iteration: 1
>Iteration: 2
>Iteration: 3
Fantastic! Now I have one routine that prints a single thing, and exits, and another that prints a few numbers and exits.
Now just one last one:
routines.doTextChar = function(phrase)
local nchars = #phrase
for i=1,nchars do
print(phrase:sub(i,i))
end
end
routines.doTextChar("brown")
>b
>r
>o
>w
>n
Now, what I really want to do is to be able to run all three of these routines “at the same time”, without a lot of fuss. That is, I want to allow the doOnce() routine to run once, then I want the doTextChar(), and doLoop() routines to interleave their work, first one going, then the next.
This is where lua’s coroutines come into the picture.
First of all, I'll take doOnce as an example. In order to run it as a coroutine, nothing needs to change about the routine itself. But, you do have to start using some coroutine machinery. Instead of calling the routine directly as usual, you call it using coroutine.resume(), and before that, you have to create something to resume, what Lua knows as a ‘thread’, using coroutine.create(). Like this:
local doOnceT = coroutine.create(routines.doOnce)
coroutine.resume(doOnceT, "Hello, World");
This will have the same effect as just running the routine directly, but will run it as a coroutine. Not particularly useful in this case, but it sets us up for some further success.
The looping routines are a little bit more interesting. You could do the following:
local doLoopT = coroutine.create(routines.doLoop)
coroutine.resume(doLoopT, 3)
That will have the same output as before. But, it won't be very cooperative. The way in which a routine signals that it's willing to give up a little slice of its CPU time is by calling the ‘coroutine.yield()’ function. So, to alter the original routine slightly:
routines.doLoop = function(iterations)
for i=1,iterations do
print("Iteration: ", i)
coroutine.yield();
end
end
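Now, if you resume it just once, only the first iteration prints before the routine yields; each subsequent coroutine.resume(doLoopT) picks up where it left off and prints the next one. We're now set up to cooperate with multiple tasks.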
I will alter the ‘doTextChar()’ routine in a similar way:
routines.doTextChar = function(phrase)
local nchars = #phrase
for i=1,nchars do
print(phrase:sub(i,i))
coroutine.yield();
end
end
OK, so now I have three routines, which are ready for doing some coroutine cooperation. But how to do that? The essential ingredient that is missing is a scheduler, or rather, something to take care of the mundane task of resuming each routine until it is done.
So, in comes the SimpleDispatcher:
local Collections = require "Collections"
local SimpleDispatcher_t = {}
local SimpleDispatcher_mt = {
__index = SimpleDispatcher_t
}
local SimpleDispatcher = function()
local obj = {
tasklist = Collections.Queue.new();
}
setmetatable(obj, SimpleDispatcher_mt)
return obj
end
SimpleDispatcher_t.AddTask = function(self, atask)
self.tasklist:Enqueue(atask)
end
SimpleDispatcher_t.AddRoutine = function(self, aroutine, ...)
local routine = coroutine.create(aroutine)
local task = {routine = routine, params = {...}}
self:AddTask(task)
return task
end
SimpleDispatcher_t.Run = function(self)
while self.tasklist:Len() > 0 do
local task = self.tasklist:Dequeue()
if not task then
break
end
if coroutine.status(task.routine) ~= "dead" then
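local status, values = coroutine.resume(task.routine, unpack(task.params));
if not status then
-- resume() returns false plus an error message when the routine raises an error
print("TASK ERROR: ", values)
elseif coroutine.status(task.routine) ~= "dead" then
self:AddTask(task)
else
print("TASK FINISHED")
end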
else
print("DROPPING TASK")
end
end
end
return SimpleDispatcher
Before explaining much, here is how I would use it:
local runner = SimpleDispatcher();
runner:AddRoutine(routines.doOnce, "Hello, World");
runner:AddRoutine(routines.doLoop, 3)
runner:AddRoutine(routines.doTextChar, "The brown")
runner:Run();
Basically, create an instance of this simple dispatcher.
Then, add some routines to it for execution. AddRoutine() does not actually start the routines going, it just queues them up to be run using coroutine.resume().
Lastly, call ‘runner:Run’, which will start a loop going, resulting in each of the tasks being called one after the other. This will result in the following output:
>Hello, World
>TASK FINISHED
>Iteration: 1
>T
>Iteration: 2
>h
>Iteration: 3
>e
>TASK FINISHED
> 
>b
>r
>o
>w
>n
>TASK FINISHED
Basically, each time a routine calls ‘coroutine.yield()’, it's giving up for a bit, and allowing the dispatcher to call the next routine. When the dispatcher comes back around to the routine that gave up a slice of CPU time, that routine resumes from wherever ‘yield()’ was called. And that's what makes coroutines so useful!
I find the easiest way to think about coroutines is to simply code something, as if it were going to be an independent ‘thread’. Then I throw in a few choice yield() calls here and there to ensure the thread cooperates with other threads that might be running.
There are quite exotic things you can do with coroutines, and all sorts of frameworks to make working with them better/easier, and perhaps more mysterious.
For my simplistic pea brain though, this is a good place to start. Just taking simple routines, and handing them to the dispatcher.
That dispatcher is quite a nice little piece of work. If you think about it, it's the equivalent of the scheduler within any OS. Assuming your Lua code is running in a single-threaded process, being able to do your own scheduling is quite a powerful thing. You can add all sorts of exotics like aging and thread priority. You can add timers, I/O events, or what have you. The beauty is, the scheduler can become as complex as you care to make it, and your coroutine code does not have to do anything special to adapt to it, just like with thread code in the OS proper.
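As an example of the kind of extension I mean, here's a hypothetical variant of the dispatcher loop with timers: a task yields the number of ticks it wants to sleep, and the dispatcher skips it until that tick comes around. The tick counter is a stand-in for a real clock, and run() and worker() are illustrative names, not part of SimpleDispatcher.

```lua
-- Hypothetical timer-aware dispatcher loop (not SimpleDispatcher itself).
-- Each task may yield a number of ticks to sleep; a bare yield sleeps 1 tick.
local function run(tasks)
  local tick, trace = 0, {}
  local wake = {}                 -- coroutine -> first tick it may run again
  local live = #tasks
  while live > 0 do
    tick = tick + 1
    for _, co in ipairs(tasks) do
      if coroutine.status(co) ~= "dead" and (wake[co] or 0) <= tick then
        local ok, ticks = coroutine.resume(co, tick, trace)
        assert(ok, ticks)         -- surface errors instead of dropping them
        if coroutine.status(co) == "dead" then
          live = live - 1
        else
          wake[co] = tick + (ticks or 1)
        end
      end
    end
  end
  return trace
end

-- A task that records when it runs, then sleeps for "period" ticks
local function worker(name, period, count)
  return coroutine.create(function(tick, trace)
    for _ = 1, count do
      trace[#trace + 1] = name .. "@" .. tick
      tick = coroutine.yield(period)   -- resume() passes the new tick back in
    end
  end)
end

local trace = run({ worker("fast", 1, 3), worker("slow", 2, 2) })
print(table.concat(trace, " "))  -- fast@1 slow@1 fast@2 fast@3 slow@3
```

The tasks themselves only decide how long to sleep; all of the bookkeeping stays in the dispatcher.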
So, there you have it. Getting started with Lua coroutines.
Now, if only I could combine socket pooling and this Dispatcher in some meaningful way. I wonder…
Managing Socket Pools
Posted: January 27, 2013 Filed under: Lua, System Programming | Tags: socket, tcp/ip
There are many times when I need to manage a set of network connections. One situation is when I'm the ‘client’ side of a connection. Ideally, I would have a pool of connections ready to go at a moment's notice, because TCP/IP startup times can take a while. Additionally, I'd like to reuse connections that have already been established, for the same reason. So, what to do.
I have this SocketPool construct, which manages a set of network streams (tcp/ip connections). The code looks like this:
local Collections = require "Collections"
local NetStream = require "NetStream"
local SocketPool_t = {}
local SocketPool_mt = {
__index = SocketPool_t;
}
local SocketPool = function(params)
params = params or {host="localhost", port=80, reserve=2, timeout=60*2}
local obj = {
Connections = Collections.Queue.new();
Hostname = params.host or "localhost";
Port = tonumber(params.port) or 80;
Reserve = params.reserve or 2;
Timeout = params.timeout or 60*2,
}
setmetatable(obj, SocketPool_mt);
obj:Cycle() -- Reserve initial set of connections
return obj
end
function SocketPool_t:CreateNewConnection()
local connection, err = NetStream.Open(self.Hostname, self.Port)
if not connection then
return nil, err
end
connection:SetIdleInterval(self.Timeout);
self:AddConnection(connection);
return connection
end
function SocketPool_t:CleanoutSockets()
local qLen = self.Connections:Len()
-- Check each connection to see if it's still connected
-- and if it has run past its idle time
while qLen > 0 do
local connection = self.Connections:Dequeue()
self:AddConnection(connection)
qLen = qLen - 1
end
end
function SocketPool_t:Cycle()
self:CleanoutSockets();
-- Fill back up to the reserve level
while self.Connections:Len() < self.Reserve do
local connection, err = self:CreateNewConnection()
if not connection then
return false, err
end
end
return true
end
function SocketPool_t:AddConnection(connection)
if not connection:IsConnected() or connection:IsIdle() then
return false
end
self.Connections:Enqueue(connection)
return true
end
function SocketPool_t:GetConnection()
self:Cycle();
return self.Connections:Dequeue()
end
return SocketPool
Well, that's a bit of a mouthful, but really it's fairly straightforward. The usage is like this:
local sockpool = SocketPool({host="www.google.com", port=80, reserve=2, timeout=120})
while running do
local conn = sockpool:GetConnection();
conn:Send("Hello, World");
sockpool:AddConnection(conn);
end
Basically, create an instance of a socket pool, connecting to ‘www.google.com:80’. Each time through the loop, I get a connection from the pool, do something with it, and return the connection back to the pool.
This is really convenient because the pool will take care of getting rid of the socket if it has already been closed, by either end, and creating new sockets when needed. In this particular case, I have setup the pool such that I will always have 2 sockets in reserve. That means that as long as I’m asking for one at a time, I’m always going to get a socket that’s already connected and ready to go.
I have put in a timeout of two minutes (120 seconds). That way, when I go to get a new socket, the pool will first get rid of those sockets that have been sitting around for too long. If it has fallen below the reserve mark (2), then it will refill with fresh new connections. The timeout is very useful to have because when you’re using tcp/ip based sockets out on the internet, there are numerous reasons why having a stale socket sitting around is not a good thing. So, dumping them out of your usage pool with some frequency ensures that what remains is fresh and will not suffer from various ailments such as being silently closed by some intervening router.
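The reserve-plus-timeout policy is easy to model on its own. In this sketch the connections are plain tables, time is passed in explicitly, and connect() is a hypothetical stand-in for NetStream.Open(); idle time is measured from when a connection was last returned to the pool, which is an assumption about what SetIdleInterval()/IsIdle() track.

```lua
-- Hypothetical model of the reserve + idle-timeout policy; none of these
-- names come from the original SocketPool code.
local function new_pool(reserve, timeout, connect)
  return { reserve = reserve, timeout = timeout, connect = connect, conns = {} }
end

local function is_usable(pool, conn, now)
  return conn.connected and (now - conn.last_used) < pool.timeout
end

local function cycle(pool, now)
  local fresh = {}
  for _, conn in ipairs(pool.conns) do       -- drop closed or stale sockets
    if is_usable(pool, conn, now) then fresh[#fresh + 1] = conn end
  end
  while #fresh < pool.reserve do             -- refill up to the reserve level
    fresh[#fresh + 1] = pool.connect(now)
  end
  pool.conns = fresh
end

local function get(pool, now)
  cycle(pool, now)
  return table.remove(pool.conns, 1)
end

local function put(pool, conn, now)
  conn.last_used = now
  if is_usable(pool, conn, now) then pool.conns[#pool.conns + 1] = conn end
end

-- Fake connect(): hands out numbered, already-connected tables
local opened = 0
local function connect(now)
  opened = opened + 1
  return { id = opened, connected = true, last_used = now }
end

local pool = new_pool(2, 120, connect)
local c = get(pool, 0)       -- opens #1 and #2, hands out #1
put(pool, c, 10)             -- #1 goes back, last used at t=10
c = get(pool, 200)           -- both pooled ones idled past 120s: #3, #4 opened
print(c.id, opened)          -- 3  4
```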
I’ve used a couple of things here, like NetStream and Queue, which aren’t standard fare, but you get the general idea.
There is one little item that I found challenging to do correctly, on Win32 at least. The function connection:IsConnected() is supposed to determine whether a tcp/ip socket is currently connected. On Windows, this is a surprisingly non-obvious thing to determine.
Here’s the socket code for it:
GetConnectionTime = function(self)
  -- SO_CONNECT_TIME reports how many seconds the socket has been connected
  local poptvalue = ffi.new('int[1]')
  local poptsize = ffi.new('int[1]', ffi.sizeof('int'))
  local success, err = WinSock.getsockopt(self.Handle, SOL_SOCKET, SO_CONNECT_TIME, poptvalue, poptsize)
  if not success then
    -- e.g. WSAENOTSOCK once the socket has been closed
    return nil, err
  end
  -- -1 means the socket was never connected
  return poptvalue[0]
end,
IsConnected = function(self)
  -- 'local' keeps the result from leaking into a global variable
  local seconds = self:GetConnectionTime()
  return seconds ~= nil and seconds >= 0
end,
The key is the GetConnectionTime() function. This call tells you how long, in seconds, a socket has been connected. There are two ways in which it can tell you that the socket is not connected. The first is that the socket was never actually connected to anything. In this case, getsockopt() succeeds, but the connection time comes back as ‘-1’. The second is that the socket was connected at some point but has since been disconnected. In this case, getsockopt() fails, and GetConnectionTime() returns nil, WSAENOTSOCK (or some other error).
The IsConnected() function takes advantage of these return values, assuming the only valid state is that the connection time is greater than or equal to 0. All other cases indicate the socket is not connected.
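To separate the decision from the FFI plumbing, here is a sketch of the logic IsConnected() relies on, written as pure Lua functions that can run without a socket. The names asSigned32 and isConnectedFrom are hypothetical. As I understand the Winsock documentation, SO_CONNECT_TIME reports 0xFFFFFFFF for a socket that has never connected; read through an ‘int[1]’ buffer, as GetConnectionTime() does, those bits come out as the -1 described above.

```lua
-- Reinterpret an unsigned 32-bit value as signed, which is what reading the
-- option value through an ffi 'int[1]' buffer amounts to: 0xFFFFFFFF -> -1.
local function asSigned32(u)
  if u >= 2^31 then
    return u - 2^32
  end
  return u
end

-- Same rule as IsConnected(): only a successful call that reports a
-- non-negative number of connected seconds counts as connected.
local function isConnectedFrom(seconds)
  if seconds == nil then
    return false          -- getsockopt failed, e.g. the socket was closed
  end
  return seconds >= 0     -- -1 means "never connected"
end

print(isConnectedFrom(asSigned32(0xFFFFFFFF)))  -- false
print(isConnectedFrom(42))                      -- true
print(isConnectedFrom(nil))                     -- false
```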
This is a fairly quick and easy test. A more robust approach might be to use I/O completion ports and catch the state transition there. But this is a nice quick-and-dirty mechanism that you can use at any time, without having to get your feet wet in the I/O completion world.
So, there you have it. Fairly easy management of client side tcp/ip connections. By using such a mechanism, my code is much more compact and clean, and I don’t have to worry about managing my socket connections.
Objects – Full of sound and fury
Posted: January 14, 2013 Filed under: Lua, System Programming
All my professional career, “object oriented programming” has been a mantra and guidepost, going all the way back to Pascal, through Smalltalk, Objective-C, C++, C#, Java, etc. Encapsulation, inheritance, polymorphism, etc. It’s a religion.
If I go back further, to Simula, or SNOBOL, or some such, I’m sure I can find the roots of the thing. Would I rip it out? I don’t know. One thing has become abundantly clear to me, though. In an attempt to “objectify” the world, my programming sometimes takes on characteristics, twists, and constraints that I did not really intend and that don’t actually help my cause at all.
So, like many things, I’ve begun to simplify. Case in point:
local Tadpole_t = {}
Tadpole_t.Speak = function(atad)
if atad and atad.name then
print("My name is:", atad.name)
else
print("No Name")
end
end
I have a function that prints out the “name” field of an object, if it exists. I just happened to put this function into a table called “Tadpole_t”, because I want to group some other functions in this table as well. With this construct, I can easily do this:
-- First way, no real 'objects'
local pole1 = {name="William"}
Tadpole_t.Speak(pole1);
> My name is: William
Alrighty, no big deal, just a function that prints something out. Of course, there’s no structure here. I didn’t have to tie the ‘Speak’ function to a structure, or to any other object hierarchy. If the field ‘name’ exists, it gets printed; otherwise you get “No Name”, and that’s that.
Taking another step, I want to create tables that have this field in them, and I want to ensure I do it the same way every time:
local Tadpole = function(name)
local obj = {
name = name,
}
return obj
end
This now allows me to do the following:
-- Second way, a constructed object
local pole2 = Tadpole("Albert")
Tadpole_t.Speak(pole2)
> My name is: Albert
This calls the “Speak” function the same way as in the first case; the only difference is that the little table containing the ‘name’ field was constructed within a function. If you were following object oriented paradigms, you might call this a ‘constructor’. But still, there’s not much ‘object oriented’ about this, other than some encapsulation. At this point, I realize that I have all the basics I need. Most of my ‘object oriented’ programming comes down to this simple construct: create something that has some well-known fields in it, and call some functions that expect those well-known fields. I don’t need any further language constructs to achieve my goals, so anything else the language offers on top is gravy. And it’s just that, gravy, nothing more. So, in the following, there are more lines of code, but they’re just the gravy that goes with the core object oriented meat and potatoes:
local Tadpole_t = {}
-- Same function as before; it still takes the table as its only parameter
Tadpole_t.Speak = function(atad)
  if atad and atad.name then
    print("My name is:", atad.name)
  else
    print("No Name")
  end
end
-- Field lookups that miss on an instance fall through to Tadpole_t
local Tadpole_mt = {
  __index = Tadpole_t
}
local Tadpole = function(name)
  local obj = {
    name = name,
  }
  setmetatable(obj, Tadpole_mt)
  return obj
end
-- Third way, constructed object, with associated function
local pole3 = Tadpole("Adams")
pole3:Speak()
> My name is: Adams
The setmetatable() call, with its __index field pointing at Tadpole_t, is what makes the syntax pole3:Speak() work. When Lua doesn’t find ‘Speak’ in pole3 itself, it looks in Tadpole_t, and the ‘:’ syntax passes pole3 along as the first argument, so pole3:Speak() amounts to Tadpole_t.Speak(pole3). That is, you take an instance of a table, associate some functions with it, and then use this ‘:’ syntax to make a ‘method’ call. But notice that the ‘Speak()’ function never changed its signature. It started out with a single parameter, and that single parameter remains in the end. We just added some syntactic sugar to make things look more object oriented.
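A quick way to see what setmetatable() and ‘:’ each contribute is to check the lookup directly. The sketch below restates the Tadpole example with one assumed change: Speak returns its text instead of printing it, so the results can be compared.

```lua
local Tadpole_t = {}

-- Variation on Speak: returns the text rather than printing it
function Tadpole_t.Speak(atad)
  if atad and atad.name then
    return "My name is: " .. tostring(atad.name)
  end
  return "No Name"
end

local Tadpole_mt = { __index = Tadpole_t }

local function Tadpole(name)
  return setmetatable({ name = name }, Tadpole_mt)
end

local pole3 = Tadpole("Adams")

-- The instance itself has no Speak field...
print(rawget(pole3, "Speak"))          -- nil
-- ...so the lookup falls through __index to Tadpole_t.
print(pole3.Speak == Tadpole_t.Speak)  -- true
-- ':' only supplies the implicit first argument; these three are the same call.
print(pole3:Speak())                   -- My name is: Adams
print(pole3.Speak(pole3))              -- My name is: Adams
print(Tadpole_t.Speak(pole3))          -- My name is: Adams
```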
There are various other ways to achieve this, and there are multiple frameworks to make Lua ‘object oriented’. I have found, though, that sticking with the simple basics is actually the best way to achieve what I’m after. I start programming like a typical ‘C’ programmer, not relying on too many fancy constructs, nailing down my basic fields, properties, and functions. Then I slowly add in object oriented conceptual helpers when and if needed. I don’t get too hung up on trying to model a perfect simulation of a real-world activity. I recognize that I’m just trying to call some functions that happen to want to see some fields with some given names and specific properties.
The object purist in me screams out “yeah, but a ‘name’ must be a string!!”. Well, does it really need to be? Will my world fall apart if it isn’t? Will some bugs creep in because the “Speak()” function is called on something that’s not a Tadpole_t? Probably not, and this way my world stays much more maintainable.
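To illustrate, here is a sketch of the same Speak logic (returning its text rather than printing it) called on tables that have nothing to do with Tadpole. Frog is a hypothetical constructor used only for illustration; all Speak checks for is a ‘name’ field.

```lua
-- Speak only needs a 'name' field; it never checks what "type" it was given.
local function Speak(atad)
  if atad and atad.name then
    return "My name is: " .. tostring(atad.name)
  end
  return "No Name"
end

-- Hypothetical, unrelated constructor: it just happens to set 'name' too
local function Frog(name)
  return { name = name, legs = 4 }
end

print(Speak(Frog("Kermit")))  -- My name is: Kermit
print(Speak({ name = 42 }))   -- My name is: 42  (not a string, still fine)
print(Speak({}))              -- No Name
print(Speak(nil))             -- No Name
```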