Parallel Conversations

I have been tinkering with words of late. I’ve added ‘waitFor’, ‘when’ and ‘whenever’ to the TINN lexicon, and my programs are becoming simpler to write, more organized, and easier to describe.

Recently I added another set of words: ‘waitSignal’, ‘signalOne’, and ‘signalAll’. If you were using another system, you might call these ‘events’, but really they’re just signaling.

Here’s how I might use them:

local Application = require("Application")(true)

local function waiter(num)
  num = num or 0

  local function closure()
    print(string.format("WAITED: %d",num))
  end

  return closure;
end

local function main()

  for i=1,4 do
    onSignal(waiter(i), "waiting")
  end

  -- signal only one of them
  signalOne("waiting")

  -- sleep a bit giving other tasks
  -- a chance to run
  sleep(500)

  -- signal the rest of them
  signalAll("waiting")
  sleep(2000)
	
  print("SLEEP AFTER signalAll")
end

run(main)

First, main() spawns 4 tasks, each of which waits on the ‘waiting’ signal. With ‘signalOne’, only 1 of the 4 waiting tasks is awakened and continues execution. With ‘signalAll’, the rest of the waiting tasks are all scheduled to run, and thus continue from where they left off.

The core primitive for signaling is the ‘waitSignal()’ function. This will essentially suspend the currently running task until the specified signal has been given. The ‘onSignal()’ function is a convenience which will spawn a separate task which will do the waiting and execute the specified function at the appropriate time.
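To make the relationship between these words concrete, here is a minimal sketch in plain Lua coroutines. It is not TINN's implementation: the `ready` and `waiting` tables and the `runReady()` loop are stand-ins I made up for illustration, and there is no timer or IO handling. It only shows how ‘onSignal’ can be built from a spawn plus ‘waitSignal’.

```lua
-- Toy cooperative scheduler. An illustration only, not TINN's code.
local ready = {}     -- FIFO of tasks (coroutines) that may run
local waiting = {}   -- signal name -> FIFO of suspended tasks

local function spawn(func)
  local co = coroutine.create(func)
  table.insert(ready, co)
  return co
end

-- Suspend the current task until 'name' is signaled.
-- Must be called from inside a spawned task.
local function waitSignal(name)
  local co = coroutine.running()
  waiting[name] = waiting[name] or {}
  table.insert(waiting[name], co)
  coroutine.yield()
end

-- Wake the longest-waiting task, if there is one.
local function signalOne(name)
  local list = waiting[name]
  if list and #list > 0 then
    table.insert(ready, table.remove(list, 1))
    return true
  end
  return false
end

-- Wake every task currently waiting on 'name'.
local function signalAll(name)
  local list = waiting[name] or {}
  waiting[name] = nil
  for _, co in ipairs(list) do
    table.insert(ready, co)
  end
  return #list
end

-- Convenience: spawn a task that waits, then calls func.
local function onSignal(func, name)
  return spawn(function()
    waitSignal(name)
    func()
  end)
end

-- Hypothetical stand-in for the scheduler loop: run until nothing is ready.
local function runReady()
  while #ready > 0 do
    local co = table.remove(ready, 1)
    assert(coroutine.resume(co))
  end
end

-- Usage, mirroring the example above
local log = {}
for i = 1, 4 do
  onSignal(function() log[#log + 1] = i end, "waiting")
end
runReady()                      -- each task runs until it parks in waitSignal

signalOne("waiting"); runReady()
local afterOne = #log           -- only one task has run its function

signalAll("waiting"); runReady()
print(afterOne, #log)
```

In this sketch signaling only moves tasks onto the ready list; nothing runs until the scheduler loop gets control again, which is why the original example sleeps after each signal.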

If you were using another language, these might be ‘onEvent()’ and ‘emit()’. Of course it’s really easy to code it up that way: just alias the functions to whatever names you like. You could even do:

local function on(eventName, func)
  return onSignal(func, eventName)
end

So, these are some more words that are in the parallel programming arsenal. In totality, the set is:

Time related

  • sleep
  • delay
  • periodic

Predicates

  • waitFor
  • when
  • whenever

Signaling

  • onSignal
  • signalOne
  • signalAll
  • waitSignal

Task and scheduler

  • run
  • stop
  • spawn
  • yield

With these words in hand, doing my multi-task programming is becoming easier by the moment. I don’t worry about the lower level multi-tasking primitives such as mutexes, locks, and the like. I’m not really that concerned with timers, thread pools, or any of the other machinery that makes this all happen. I just write my simple code.

One thing has struck me though. This is all great for a single threaded environment. Basically cooperative multi-tasking. In this age of super fast CPUs, it turns out that going single threaded is just fine. In many cases it’s actually better than going preemptive multi-threaded because you don’t necessarily incur as much OS thread level context switching.

I’d like to go one step further though. A long time ago we didn’t have L1/L2 caches on the CPUs, then they came along, got better, and now are fairly large. As a programmer, my concern for them is fairly minimal. I don’t do much to ensure that the caches are used properly. I just write my code, and either the compiler, or the CPU itself deals with all the mechanics. Same goes with prefetching instructions and the like. I don’t write my code to make that job any easier, it just happens automagically.

Here I am contemplating the same thing about threads. As my CPU has increasingly more cores, I am wondering if CPU cycles should be viewed the same way we viewed levels of memory in the past. Is there a C1/C2 level of core execution? That is, most things should stay on the same thread because switching to a different core is costly. Something very low level should decide when that switching occurs.

My application/runtime can know a lot about what I’m going to do because the whole thing is laid out in script. After some analysis, perhaps I can decide “this thing is going to sleep for more than 500 ms, therefore I’m going to put it over here on this other thread, which is actually sleeping.” In that way, I can reduce my resource usage because I won’t be polling the timer every time through the scheduling loop. I can be smarter.

If I had such smartness, then when I have 16 – 100 cores, I’ll be able to easily take advantage of those multiple cores without having to truly understand and navigate that madness on my own. Let the machine figure it out, it’s much better at that than I am.

In the meanwhile, I’ve got these multi-tasking primitives which are making my life a lot easier when it comes to programming fairly complex systems.


Leap Motion Event Filtering

So, I’m always trying to make my code simpler. Easier to understand, easier to maintain. With the Leap, or any input device, you are faced with a continuous stream of data. In many situations, you’d like to just filter through stuff and only deal with certain types of data. In one sense, you need a simple stream based query processor.

I had the beginnings of such a thing, and I’ve now separated out the pieces. In the case of the Leap Motion, you are faced with streams of data that might look like this:

{
	"id":1237560,
	"r":[[0.444044,0.663489,-0.602169],[0.184129,-0.725287,-0.663367],[-0.876882,0.183687,-0.444227]],
	"s":762.482,
	"t":[5336.48,-24560.1,5768.29],
	"timestamp":13587071004,

	"hands":[{
		"id":4,
		"direction":[-0.0793992,0.899586,-0.427785],
		"palmNormal":[-0.16208,-0.432144,-0.886711],
		"palmPosition":[27.138,227.235,80.2504],
		"palmVelocity":[-136.716,-134.926,-359.534],
		"sphereCenter":[9.15823,202.468,9.29922],
		"sphereRadius":106.122,
		"r":[[0.989305,-0.132062,-0.0619254],[0.117032,0.97208,-0.203384],[0.0870557,0.193962,0.977139]],
		"s":1.45151,
		"t":[-18.2708,21.6366,-106.687]
	}],
	
	"pointables":[
		{
			"direction":[0.196259,0.670762,-0.715235],
			"handId":4,
			"id":7,
			"length":68.7964,
			"tipPosition":[61.3422,285.46,38.3742],
			"tipVelocity":[-184.398,-119.405,-322.679],
			"tool":false
		},
		{
			"direction":[0.0324904,0.792378,-0.609165],
			"handId":4,
			"id":3,
			"length":76.8893,
			"tipPosition":[14.7425,304.766,41.4163],
			"tipVelocity":[-229.246,-95.6285,-323.667],
			"tool":false
		}
	]
}

This data is representative of a single ‘frame’ coming off the device. As you can see, it’s hierarchical. That is, for every discrete frame, I get a grouping of hand(s) information as well as pointables and possibly gestures. This is how the Leap Motion software packages things up.

In some applications, what I really want is a stream of events, not presented hierarchically. Additionally, I want to read that stream of events and easily filter it looking for particular patterns. Basically, I’d like to be able to write a program like the following, which does nothing more than print the hand information. In particular, I’m looking for the radius of the sphere, as well as the sphere center.

package.path = package.path..";../?.lua"

local LeapInterface = require("LeapInterface");
local FrameEnumerator = require("FrameEnumerator");
local EventEnumerator = require("EventEnumerator");

local leap, err = LeapInterface();

assert(leap, "Error Loading Leap Interface: "..tostring(err));

local printHand = function(hand)
  local c = hand.sphereCenter;
  local r = hand.sphereRadius;
  local n = hand.palmNormal;
  print(string.format("HAND: %3.2f  [%3.2f %3.2f %3.2f]", r, c[1], c[2], c[3]));
end

-- Only allow the hand events to come through
local handfilter = function(event)
  return event.palmNormal ~= nil
end

local main = function()
  for event in EventEnumerator(FrameEnumerator(leap), handfilter) do
    printHand(event);
  end	
end

run(main);

Easy to digest. Looking at the main() function, there is an iterator chain. The core is the ‘FrameEnumerator(leap)’. This is the source. That is wrapped by the EventEnumerator() iterator, which consumes the frame enumerator and applies the ‘handfilter()’ function to what comes out of it.

Looking at the FrameEnumerator:

-- FrameEnumerator.lua
local JSON = require("dkjson");
local ffi = require("ffi");

local FrameEnumerator = function(interface)
  local closure = function()
    for rawframe in interface:RawFrames() do
      local frame = JSON.decode(ffi.string(rawframe.Data, rawframe.DataLength));
      return frame;
    end
  end	

  return closure;
end

return FrameEnumerator

Not very much code at all, and borrowed from the FrameObserver object in a previous article. This enumerator has the sole purpose of turning the raw strings that arrive from the WebSocket interface into a stream of discrete table objects. It does that by calling JSON.decode(), which takes a JSON formatted string and turns it into a Lua table object. It just hands those out until it can’t hand out any more.

The EventEnumerator is a little bit more complex, but not much. Its sole purpose in life is to take Lua tables, which presumably have the format of these Leap Motion frames, and turn them into discrete events, basically flattening the hierarchical data structure. In addition to flattening the data structure, you can apply the filter, thus throwing away bits of the frame that are not going to be processed any further:

-- EventEnumerator.lua
local Collections = require("Collections");

local EventEnumerator = function(frameemitter, filterfunc)
  local eventqueue = Collections.Queue.new();

  local addevent = function(event)
    if filterfunc then
      if filterfunc(event) then
        eventqueue:Enqueue(event);
      end
    else
      eventqueue:Enqueue(event);
    end
  end


  local closure = function()
    -- If the event queue is empty, then grab the
    -- next frame from the emitter, and turn it into
    -- discrete events.
    while eventqueue:Len() < 1 do
      local frame = frameemitter();
      if frame == nil then
        return nil
      end

      if frame.hands ~= nil then
        for _,hand in ipairs(frame.hands) do
          addevent(hand);
        end
      end

      if frame.pointables ~= nil then
        for _,pointable in ipairs(frame.pointables) do
          addevent(pointable);
        end
      end

      if frame.gestures then
        for _,gesture in ipairs(frame.gestures) do
          addevent(gesture);
        end
      end						
    end

    return eventqueue:Dequeue();
  end	

  return closure;
end

return EventEnumerator

Stitch it all together and the first program actually works: you get a stream of ‘hand’ events. The beauty of this system is that you can string further iterators together in the same way. Of course, at the lowest level, you can easily filter to find hands, fingers, tools, gestures, and the like. You might also realize something else special about this. The ‘filter’ is just Lua code, so it can be as complex or as simple as you want. In this particular case, I just wanted to check for the existence of a single field. If I thought the amount of data was too much, and I wanted to cut it in half, I could have easily kept a counter and only returned every other event. You could go further, and not just return true/false, but alter the event in some way as well. But the true power comes from composition. Rather than making a more complex single filter, I’d rather just make more filters and string them together.
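Here is a rough sketch of what stringing filters together might look like. None of this is in the code above: ‘FilterEnumerator’, ‘everyOther’ and ‘fromList’ are names I made up for illustration, and the event list is fake data standing in for a live Leap stream. The only assumption is that a source is a function that returns the next event, or nil when it runs dry, just like the enumerators above.

```lua
-- Wrap any enumerator with a filter; only events that pass come out.
-- (Hypothetical helper, not part of the modules shown above.)
local function FilterEnumerator(source, filterfunc)
  return function()
    while true do
      local event = source()
      if event == nil then
        return nil
      end
      if filterfunc(event) then
        return event
      end
    end
  end
end

-- The same test as handfilter() in the first program.
local function isHand(event)
  return event.palmNormal ~= nil
end

-- A stateful filter: keeps a counter and passes every other event.
local function everyOther()
  local count = 0
  return function(event)
    count = count + 1
    return count % 2 == 1
  end
end

-- Stand-in source that hands out items from a table, one per call.
local function fromList(list)
  local i = 0
  return function()
    i = i + 1
    return list[i]
  end
end

-- Fake events: hands have a palmNormal, pointables have a tipPosition.
local events = {
  {id = 1, palmNormal = {0, -1, 0}},
  {id = 2, tipPosition = {0, 0, 0}},
  {id = 3, palmNormal = {0, -1, 0}},
  {id = 4, palmNormal = {0, -1, 0}},
  {id = 5, tipPosition = {1, 1, 1}},
  {id = 6, palmNormal = {0, -1, 0}},
}

-- Two simple filters strung together: hands only, then every other one.
local hands = FilterEnumerator(fromList(events), isHand)
local thinned = FilterEnumerator(hands, everyOther())

local seen = {}
for event in thinned do
  seen[#seen + 1] = event.id
end

print(table.concat(seen, " "))
```

Each filter stays trivial on its own, and the order matters: here the counter only sees hand events, so it halves the hands rather than halving the raw stream.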

Using this simple concept of Enumeration, I can imagine performing a whole bunch of simple, and even complex operations, simply by tying the proper enumerators, observers, queues and the like together.