Computicles – Simplifying Chaining

Using the “exec()” method, I can chain computicles together, but the code is kind of raw. There is one addition I left out of the equation last time around, and that the hydrate/rehydrate of a computicle variable itself. Now it looks like this:

elseif dtype == "table" then
  if getmetatable(data) == Computicle_mt then
    -- package up a computicle
    datastr = string.format("Computicle:init(TINNThread:StringToPointer(%s),TINNThread:StringToPointer(%s));", 
      TINNThread:PointerToString(data.Heap:getNativeHandle()), 
      TINNThread:PointerToString(data.IOCP:getNativeHandle()));
  elseif getmetatable(data) == getmetatable(self.IOCP) then
    -- The data is an iocompletion port, so handle it specially
    datastr = string.format("IOCompletionPort:init(TINNThread:StringToPointer(%s))",
      TINNThread:PointerToString(data:getNativeHandle()));
  else
    -- get a json string representation of the table
    datastr = string.format("[[ %s ]]",JSON.encode(data, {indent=true}));
  end

This is just showing the types that are descended from the ‘table’ type. This includes IOCompletionPort, and ‘Computicle’. Anything other than these two will be serialized as fairly straight forward key/value pair tables. With this in place, I can now do the following.

-- Setup the splitter that will dispatch to the leaf nodes
local splitter = Computicle:load("comp_splittercode");

-- Setup the source, which will be originating messages
local source = Computicle:load("comp_sourcecode");
source.sink = splitter;

The code for the comp_sourcecode computicle looks like this:


RunOnce = function(sink)
  print("comp_sourcecode, RunOnce...")
  for i = 1, 10 do
    sink:postMessage(i);
  end
end

OnIdle = function(counter)
  print("comp_sourcecode, OnIdle(): ", counter)
  if sink ~= nil then 
    RunOnce(sink);

    sink = nil;
    exit();
  end
end

This computicle implements the OnIdle() function, as it will use that to determine when I can send messages off to the sink. If the sink hasn’t been set yet, then it will do nothing. If it has been set, then it will execute the ‘RunOnce()’ function, sending the sink a bunch of messages.

After performing the RunOnce() task, the computicle will simply exit(), which == SELFICLE:quit() == SELFICLE:postMessage(Computicle.Messages.QUIT);

And that’s the end of that computicle. The splitter has become much simpler as well.

local ffi = require("ffi");


OnMessage = function(msg)
	msg = ffi.cast("ComputicleMsg *", msg);
	local Message = msg.Message;
--print("comp_splittercode, OnMessage(): ", Message);

	if sink1 then
		sink1:postMessage(Message);
	end	

	if sink2 then
		sink2:postMessage(Message);
	end
end

Here, the splitter assumes there are two sinks. If either of them exists, they will receive the message that was passed into the splitter. If they don’t exist, then the message will be dropped. Of course, other behaviors, such as holding on the messages, could be implemented as well.

In this case, the computicle is not exited internally. This is because the splitter itself does not know when it is finished, all it knows is that when it receives a message, it is supposed to pass that message on to its two sinks.

The sink code is equally simple.

local ffi = require("ffi");

OnMessage = function(msg)
  msg = ffi.cast("ComputicleMsg *", msg);
  local Message = msg.Message;

  print(msg.Message*10);
end

Here again, just implement the ‘OnMessage()’ function, and the comp_msgpump code will call it automatically. And again, the sink does not know when it is done, so it does not explicitly exit.

Pulling it all together, the exiting logic becomes more clear:

local Computicle = require("Computicle");

-- Setup the leaf nodes to receive messages
local sink1 = Computicle:load("comp_sinkcode");
local sink2 = Computicle:load("comp_sinkcode");


-- Setup the splitter that will dispatch to the leaf nodes
local splitter = Computicle:load("comp_splittercode");

splitter.sink1 = sink1;
splitter.sink2 = sink2;


-- Setup the source, which will be originating messages
local source = Computicle:load("comp_sourcecode");
source.sink = splitter;


-- Close everything down from the outside
print("FINISH source: ", source:waitForFinish());

-- tell the splitter to quit
splitter:quit();
print("FINISH splitter:", splitter:waitForFinish());

-- the two sinks will receive the quit message from the splitter
-- so, just wait for them to quit.
print("Finish Sink 1: ", sink1:waitForFinish());
print("Finish Sink 2: ", sink2:waitForFinish());

There is an order in which computicles are created, and assigned, which stitches them together. The computicles could actually be constructed in a “suspended state”, but that would not help matters too much.

At the end, the quit() sequence can clearly be seen. First, wait for the source to be finished. Then tell the splitter to quit(). This is not an interrupt, so whatever was in its queue previously will flow through before the QUIT is processed. Then finally, do the same thing on the sinks, after the splitter has finished.

All messages flow, and all processing finishes cleanly. So, this is one way to perform the exit mechanism from the outside, as well as from the inside.

Adding this bit of ease in programming did not change the fundamentals of the computicle. It still has a single communciation mechanism (the queue). It still performs computation, it still communicates with the outside world. The new code saves the error prone process of type casting and hydrating objects that the system already knows about. This in turn makes the usage pattern that much easier to deal with.

The comp_msgpump was added in a fairly straight forward way. When a computicle is constructed, there is a bit of code (a Prolog) that is placed before the user’s code, and a bit of code (Epilog) placed after it. Those two bits of code look like this:

Computicle = {
	Prolog = [[
TINNThread = require("TINNThread");
Computicle = require("Computicle");
IOCompletionPort = require("IOCompletionPort");
Heap = require("Heap");

exit = function()
    SELFICLE:quit();
end
]];

Epilog = [[
require("comp_msgpump");
]];

Computicle.createThreadChunk = function(self, codechunk, params, codeparams)
	local res = {};

	-- What we want to load before any other code is executed
	-- This is typically some 'require' statements.
	table.insert(res, Computicle.Prolog);


	-- Package up the parameters that may want to be passed in
	local paramname = "_params";

	table.insert(res, string.format("%s = {};", paramname));
	table.insert(res, self:packParams(params, paramname));
	table.insert(res, self:packParams(codeparams, paramname));

	-- Create the Computicle instance that is running within 
	-- the Computicle
	table.insert(res, 
		string.format("SELFICLE = Computicle:init(%s.HeapHandle, %s.IOCPHandle);", paramname, paramname));


	-- Stuff in the user's code
	table.insert(res, codechunk);

	
	-- What we want to execute after the user's code is loaded
	-- By default, this will be a message pump
	table.insert(res, Computicle.Epilog);

	return table.concat(res, '\n');
end

If an application needs a different prolog or epilog, it’s fairly easy to change them either by changing the file that is being read in, or by changing the string itself: Computicle.Prolog = [[print(“Hello, World”)]]

And so it goes. I’ve never had it so easy creating multi-threaded bits of computation and stitching those bits together.

Advertisements


Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s