Alertable Predicates? What’s that?

TINN does async on I/O without much fuss.  It even allows you to suspend a ‘fiber’ by putting it to sleep.  You can easily execute a function on a timer, etc.  These are all good tools to have in the parallel programming toolbox.  There is one tool that has been missing for me though.  The fact that I can run things in parallel means that I need a mechanism to synchronize when some of those tasks complete.  I don’t want to introduce callbacks, because that leads down a whole different path.

I want to execute code that looks something like this:

local finalGoal = function()
  waitFor(goal1)
  print("Goal 1 Attained.")
  waitFor(goal2)
  print("Goal 2 Attained..")
  waitFor(goal3)
  print("Goal 3 Attained...")
  print("All Goals Attained!!")

  -- Once all the goals have been met
  -- stop the scheduler.
  stop();
end

The key here is the ‘waitFor()’ function call. You call this function with a ‘predicate’. A predicate is nothing more than a function that returns true or false. If it returns ‘false’ then the condition of the predicate has not been met, and the fiber will not continue. Once the predicate returns ‘true’, the fiber will continue from the point directly after it called ‘waitFor()’. So, to see this sample in its entirety:

-- test_waitFor.lua

local IOProcessor = require("IOProcessor")
local Timer = require("Timer")

local count = 0;


local goal1 = function()
  return count >= 5;
end

local goal2 = function()
  return count >= 10;
end

local goal3 = function()
  return count >= 15;
end

local finalGoal = function()
  waitFor(goal1)
  print("Goal 1 Attained.")
  waitFor(goal2)
  print("Goal 2 Attained..")
  waitFor(goal3)
  print("Goal 3 Attained...")
  print("All Goals Attained!!")

  -- Once all the goals have been met
  -- stop the scheduler.
  stop();
end

-- setup a timer to increment the count
-- every 500 milliseconds
local incrTimer = Timer({Period = 500; OnTime = function(timer) count = count+1 end})

spawn(finalGoal)	

run()

This is a contrived case, but it’s illustrative. Down at the bottom, a timer is set up with an anonymous function. The timer goes off every 500 milliseconds. Each time it does, it executes the associated code, and the count variable is incremented by 1. That’s all the timer needs to be concerned with.

There are three predicate functions set up: goal1, goal2, and goal3. Each of these predicates checks the value of the count variable and returns true or false depending on the criteria of the goal.

There is a separate function, ‘finalGoal’. This one is spawned separately, and it will wait for each of the goals to be met in turn. The order in which it waits does not really matter in this case, because count only ever increases: by the time goal3 is met, goal1 and goal2 are met as well. Once all of the goals are met, the scheduler will be stopped and the program will end.
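Because a predicate is just a function, the three goals could also be folded into a single predicate, which makes the ‘order does not matter’ point explicit. Here is a minimal sketch, assuming a hypothetical helper named allOf (my name for illustration, not part of TINN):

```lua
-- allOf is a hypothetical helper, not part of TINN.
-- It returns a new predicate that is true only when
-- every one of the given predicates is true.
local function allOf(...)
  local predicates = {...}
  return function()
    for _, predicate in ipairs(predicates) do
      if not predicate() then
        return false
      end
    end
    return true
  end
end

local count = 0
local goal1 = function() return count >= 5 end
local goal2 = function() return count >= 10 end
local goal3 = function() return count >= 15 end

local allGoals = allOf(goal1, goal2, goal3)

count = 12
print(allGoals())  -- false: goal3 is not met yet
count = 15
print(allGoals())  -- true: all three goals are met
```

Inside a fiber this could then be a single waitFor(allGoals), at the cost of losing the per-goal progress messages.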

Those predicates can be anything really. They can make calls out to the internet, they can count, they can check the value of any variable, they can check any state of the running process. Whatever you want can be a predicate. Perhaps you want to stop accepting new connections on a web server once a memory limit has been reached. Whatever.
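As a sketch of the memory limit idea, here is a predicate that watches the Lua heap. collectgarbage("count") is standard Lua and reports the memory in use by Lua in kilobytes. The makeMemoryLimit name and the threshold are assumptions for illustration, and a real server would probably want to watch process memory rather than just the Lua heap.

```lua
-- makeMemoryLimit is a hypothetical helper; limitKB is an arbitrary threshold.
local function makeMemoryLimit(limitKB)
  return function()
    -- collectgarbage("count") returns the Lua heap size in kilobytes
    return collectgarbage("count") >= limitKB
  end
end

local memoryLimitReached = makeMemoryLimit(64 * 1024)  -- roughly 64 MB

-- Inside a fiber, under TINN, this would be used along the lines of:
--   waitFor(memoryLimitReached)
--   stopAcceptingConnections()   -- hypothetical function
print(memoryLimitReached())
```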

Having the finalGoal() as a separate fiber is useful because the whole fiber will essentially ‘block’ until the goals are all met. This allows you to have other parallel fibers running at the same time, and they will go along their merry way while finalGoal() is waiting for its conditions to be met.

The implementation of this predicate yielding is pretty straightforward. Within the scheduler, there are only two new functions:

IOProcessor.yieldUntilPredicate = function(self, predicate)
	if self.CurrentFiber~= nil then
		self.CurrentFiber.Predicate = predicate;
		table.insert(self.FibersAwaitingPredicate, self.CurrentFiber)

		return self:yield();
	end

	return false;
end

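IOProcessor.stepPredicates = function(self)
	-- Check every waiting fiber once per pass. The index only
	-- advances when nothing is removed, so one fiber whose
	-- predicate is still false does not block the ones behind it.
	local i = 1
	while i <= #self.FibersAwaitingPredicate do
		local fiber = self.FibersAwaitingPredicate[i];
		if fiber.Predicate() then
			fiber.Predicate = nil;
			table.remove(self.FibersAwaitingPredicate, i)
			self:scheduleFiber(fiber);
		else
			i = i + 1
		end
	end
end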

Then, waitFor() is just a shorthand for yieldUntilPredicate():

waitFor = function(predicate)
  return IOProcessor:yieldUntilPredicate(predicate)
end

The predicates are checked every time through the scheduler loop:

IOProcessor.step = function(self)
	self:stepTimeEvents();
	self:stepFibers();
	self:stepIOEvents();
	self:stepPredicates();
.
.
.

This is essentially the same as installing a very high priority fiber because all code for all predicates will run every time through the scheduler loop. The predicates do not need to be ‘scheduled’ like other tasks. A predicate should probably be a fairly short bit of code that runs quickly, and is not very resource intensive. They should be used sparingly.
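To see the whole mechanism in one place without TINN, here is a stripped-down sketch built on plain Lua coroutines. The names (ready, waiting, spawn, waitFor, step) and the structure are my simplifications, not TINN’s scheduler. It only shows the same idea: a fiber that yields with a predicate is parked, and each pass of the loop re-checks the parked predicates.

```lua
local ready = {}     -- fibers that can run on the next pass
local waiting = {}   -- entries of the form {fiber = co, predicate = fn}
local log = {}

local function spawn(func)
  table.insert(ready, coroutine.create(func))
end

-- Park the calling fiber until predicate() returns true.
local function waitFor(predicate)
  return coroutine.yield(predicate)
end

local function step()
  -- 1. Run every fiber that is currently ready.
  local runnable = ready
  ready = {}
  for _, fiber in ipairs(runnable) do
    local ok, predicate = coroutine.resume(fiber)
    if not ok then error(predicate) end
    if coroutine.status(fiber) ~= "dead" then
      -- The fiber yielded a predicate, so park it.
      table.insert(waiting, {fiber = fiber, predicate = predicate})
    end
  end

  -- 2. Re-check every parked predicate, once per pass.
  local stillWaiting = {}
  for _, entry in ipairs(waiting) do
    if entry.predicate() then
      table.insert(ready, entry.fiber)
    else
      table.insert(stillWaiting, entry)
    end
  end
  waiting = stillWaiting
end

-- Demo: a plain counter stands in for the timer.
local count = 0

spawn(function()
  waitFor(function() return count >= 2 end)
  table.insert(log, "goal 1 at count " .. count)
  waitFor(function() return count >= 4 end)
  table.insert(log, "goal 2 at count " .. count)
end)

while #ready > 0 or #waiting > 0 do
  step()
  count = count + 1
end

for _, line in ipairs(log) do print(line) end
```

In this sketch a fiber resumes on the pass after its predicate first tests true, so the count it observes has already moved on by one. That is a reminder that a predicate only says the condition held when it was checked, not that it still holds when the fiber runs.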

Quite a few years ago, I was involved with the creation of LINQ in the .NET Framework. At that time, I had this nagging notion that if we could only add a ‘when predicate do’ construct to the CLR, that would be a fairly useful thing. I think using predicates for control flow is especially useful for parallel programming. I also think this construct is easier to use and implement than various other forms of barriers and whatnot.
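For what it’s worth, a ‘when predicate do’ can be approximated in a few lines of plain Lua. This is only a sketch: when, stepWhen, and pending are hypothetical names, and the loop at the bottom stands in for a scheduler. Under TINN, something similar could presumably be done by spawning a fiber that calls waitFor() and then runs the action.

```lua
local pending = {}   -- entries of the form {predicate = fn, action = fn}

-- Hypothetical helper: run action once, the first time predicate() is true.
local function when(predicate, action)
  table.insert(pending, {predicate = predicate, action = action})
end

-- Called once per pass of whatever loop drives the program.
local function stepWhen()
  -- Swap the list first, so an action may safely call when() itself.
  local current = pending
  pending = {}
  for _, entry in ipairs(current) do
    if entry.predicate() then
      entry.action()
    else
      table.insert(pending, entry)
    end
  end
end

local count = 0
local fired = {}

when(function() return count >= 3 end,
     function() table.insert(fired, count) end)

for i = 1, 5 do
  count = i
  stepWhen()
end

-- The action fires exactly once, on the pass where count reaches 3.
print(#fired, fired[1])
```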

So, there you have it. TINN supports async IO, timers, yielding on time, and yielding on predicates. Perhaps a predicate can be used to bridge the gap between the normal scheduler, and the event loop that is presented by the typical keyboard and mouse loop.

 
