Poor Man’s Event Driven IO

A few months ago I set out to explore the possibility of creating a networking stack that is as performant as node.js, but does not rely on a separate library such as libuv. Why do such a thing? One reason is simply the educational benefit; the other is that although I really like node.js, I believe there is room in the world for a moral equivalent implemented in Lua. Of course there are already a couple of attempts at the same thing, and I’ve mentioned them before. In most of the cases I’ve seen, though, they rely heavily on a library written in C to do the real heavy lifting.

At the core of the event driven model of network programming is a scheduler that suspends a ‘thread’ whenever it would otherwise block waiting for some sort of IO to become available. This is the crux of what libuv provides to the user.
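
Before getting to the IO part, it helps to picture the simplest possible cooperative scheduler: a loop that keeps resuming coroutines until they finish. The sketch below is illustrative only; the names used here (tasks, spawn, step) don’t come from the code discussed later in this post.

local tasks = {}

local function spawn(func)
  table.insert(tasks, coroutine.create(func))
end

local function step()
  -- give every live coroutine one turn, discarding finished ones
  for i = #tasks, 1, -1 do
    if coroutine.status(tasks[i]) == "dead" then
      table.remove(tasks, i)
    else
      coroutine.resume(tasks[i])
    end
  end
  return #tasks > 0
end

spawn(function()
  for i = 1, 3 do
    print("working...", i)
    coroutine.yield()   -- give the other tasks a chance to run
  end
end)

-- run until every task has finished
while step() do end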

Here’s a simple example of the problem:

-- somewhere in my code:
local err = readsocket(sock, buff, bufflen)
if err == "wouldblock" then
  -- ideally yield and come back here
  -- when input is available to be read
end

-- continue doing whatever I was doing

It seems so simple; you’d think you could just do:

coroutine.yield()

and call it a day. But you can’t. Instead, you’d have to cook up a loop that keeps retrying the call until it would no longer block:

while true do
  local bytesread, err = readsocket(sock, buff, bufflen)
  if bytesread then
    break
  end

  if err ~= "wouldblock" then
    return nil, err
  end

  coroutine.yield();
end

-- continue doing whatever I was doing

This will work, but it has the unfortunate side effect of spinning the CPU relentlessly for no good reason. This is essentially polling: trying the readsocket call over and over and letting that determine the state of things. It would be far more efficient if the kernel, or some other mechanism, told me when the socket was actually ready to be read from without blocking; then I’d only do the read when I knew it would not block. No spinning, and I could spend the rest of the time sleeping the process.

So, how to do this notification thing? Well, there are many mechanisms, such as select(), poll(), epoll(), and WSAPoll(). I’m going to focus on the Windows-supplied WSAPoll(), because it closely mirrors the classic poll() interface, and the same approach translates readily to epoll(), which is what people typically use on Linux. On Windows, the most efficient way to do this is to use IO completion ports, but as soon as you step into that, you’re in a world that requires multiple threads, and that’s suddenly much more complex to deal with. So, I’ll sacrifice some absolute performance on Windows and go for the simplest implementation possible.

First of all, here’s the code in its entirety:


local ffi = require("ffi");
local bit = require("bit")
local band = bit.band
local bor = bit.bor

local WinSock = require("WinSock_Utils");


local SocketIoPool_t = {}
local SocketIoPool_mt = {
  __index = SocketIoPool_t,
}

local SocketIoPool = function(capacity)
  capacity = capacity or 1
	
  if capacity < 1 then return nil end

  local obj = {
    fdarray = ffi.new("WSAPOLLFD[?]", capacity);

    Capacity = capacity,	-- How many slots are available
    Handles = {},
    Sockets = {},
  }
  setmetatable(obj, SocketIoPool_mt);

  obj:ClearAllSlots();

  return obj
end

SocketIoPool_t.ClearSlot = function(self, slotnumber)
  if not slotnumber or slotnumber < 0 or slotnumber >= self.Capacity then
    return false
  end

  self.fdarray[slotnumber].fd = -1;
  self.fdarray[slotnumber].events = 0;
  self.fdarray[slotnumber].revents = 0;

  return true
end

SocketIoPool_t.ClearAllSlots = function(self)
  for i=1,self.Capacity do
    self:ClearSlot(i-1);
  end
end

SocketIoPool_t.GetOpenSlot = function(self)
  -- traverse each of the slots in the array, looking for
  -- one that is not currently watching a socket (fd == -1)
  for i=0, self.Capacity-1 do
    if self.fdarray[i].fd < 0 then
      return i
    end
  end

  return nil, "no open slots"
end

SocketIoPool_t.AddSocket = function(self, sock, events)
  local slot, err = self:GetOpenSlot()
--print("AddSocket, slot: ", slot, err)
  if not slot then
    return nil, err
  end

  events = events or bor(POLLWRNORM, POLLIN);

  self.fdarray[slot].fd = sock.Handle;
  self.fdarray[slot].events = events;
  self.fdarray[slot].revents = 0;

  self.Handles[sock.Handle] = sock;
  self.Sockets[sock.Handle] = slot;

  return slot
end

SocketIoPool_t.RemoveSocket = function(self, sock)
  -- remove the associated handle
  self.Handles[sock.Handle] = nil;

  -- remove the associated socket
  self:ClearSlot(self.Sockets[sock.Handle]);

  self.Sockets[sock.Handle] = nil;
end


SocketIoPool_t.Cycle = function(self, eventqueue, timeout)
  timeout = timeout or 0

  local success, err = WinSock.WSAPoll(self.fdarray, self.Capacity, timeout);

--print("Cycle: ", success, err, timeout);

  if not success then
    return nil, err
  end

  -- Go through each of the slots looking
  -- for the sockets that are ready for activity
  for i=0, self.Capacity-1 do
    if self.fdarray[i].fd > 0 then
      if self.fdarray[i].revents > 0 then
        self.Handles[self.fdarray[i].fd].fdarray.revents = self.fdarray[i].revents;
        eventqueue:Enqueue(self.Handles[self.fdarray[i].fd]);
        self.fdarray[i].revents = 0;
      end
    end
  end

  return success
end

return SocketIoPool

The core of the whole mess relies on a single system function call, WSAPoll():

  local success, err = WinSock.WSAPoll(self.fdarray, self.Capacity, timeout);

What this routine does is take an array of socket descriptors and tell you which ones are ready for the actions you’ve asked about. In most cases, you’ll register to know when a socket is available for reading or writing. Errors and hangups are reported for free.

The first parameter to this call is an array of structures. This structure looks like this:

typedef struct pollfd {
    SOCKET		fd;
    int16_t		events;
    int16_t		revents;
} WSAPOLLFD;

That is, the socket descriptor, and the events you want to ‘listen’ for. Upon return from the WSAPoll() call, each of these structures will contain the event information in the ‘revents’ field. You can set the ‘fd’ field to ‘-1’ if you’re not using this particular slot.
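
For reference, here is a rough sketch of how these raw declarations might look through the LuaJIT FFI. The WinSock_Utils module used above presumably wraps something close to this (and takes care of calling WSAStartup() first); the SOCKET typedef is simplified, and sock.Handle stands in for a socket handle obtained elsewhere.

local ffi = require("ffi")

ffi.cdef[[
typedef uintptr_t SOCKET;

typedef struct pollfd {
    SOCKET   fd;
    int16_t  events;
    int16_t  revents;
} WSAPOLLFD;

int __stdcall WSAPoll(WSAPOLLFD *fdArray, unsigned long fds, int timeout);
]]

local ws2_32 = ffi.load("ws2_32")

-- Watch a single, already created socket for normal read readiness,
-- waiting at most 100 milliseconds.  POLLRDNORM is 0x0100 in winsock2.h.
local fds = ffi.new("WSAPOLLFD[1]")
fds[0].fd = sock.Handle      -- a socket handle from elsewhere (assumed)
fds[0].events = 0x0100       -- POLLRDNORM
fds[0].revents = 0
local nready = ws2_32.WSAPoll(fds, 1, 100)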

So, there’s a little bit of a challenge here. You can’t just go to the system and say: watchsocket(fd, events); you have to use this array of structures, which means some management is required. The SocketIoPool code allocates an array of structures with a fixed capacity, and then manages that array through AddSocket() and RemoveSocket(). If you call AddSocket() and there isn’t an open slot, it will return an error, and that socket will not be watched.
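
To make the bookkeeping concrete, here is roughly how the pool might be used on its own, outside of any scheduler. The module path and the sock variable are assumptions; the pool only cares that sock carries a Handle field.

local SocketIoPool = require("SocketIoPool")

local pool = SocketIoPool(16)            -- capacity for 16 watched sockets

local slot, err = pool:AddSocket(sock)   -- watch for read/write by default
if not slot then
  print("pool is full, socket not watched:", err)
end

-- ... later, when the socket is no longer interesting
pool:RemoveSocket(sock)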

The Cycle() function is meant to be called from within a scheduler loop. Each time it is called, it will take the sockets that are ready for IO operations and place them on the event queue that is passed in.

Ah, now things get interesting. This is where the break from callbacks starts. You could argue that the “Enqueue()” is in fact a callback, and you’d be right. But, given that queues are so useful, and can easily support other callback models, I figured it was good to start with them from the beginning. Of course, if you wanted to change this to a simple callback instead, you could easily do that.
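
For instance, a callback-style consumer of that queue could look something like the sketch below. The handlers table and the DispatchIoEvents name are hypothetical; the queue is only assumed to have the same Len()/Dequeue() interface used elsewhere in this post.

-- hypothetical callback dispatch layered on top of the event queue
local handlers = {}    -- handlers[sock.Handle] = function(sock) ... end

local function DispatchIoEvents(queue)
  for i = 1, queue:Len() do
    local sock = queue:Dequeue()
    if sock and handlers[sock.Handle] then
      handlers[sock.Handle](sock)
    end
  end
end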

The relevant part of the scheduler looks like this:

EventScheduler_t.YieldForIo = function(self, sock, iotype)
--print("EventScheduler_t.YieldForIo()");
--print("-- Current Fiber: ", self.CurrentFiber);

  -- Try to add the socket to the event pool
  local success, err = self.IoEventPool:AddSocket(sock, iotype)

  if success then
    -- associate a fiber with a socket
    self.EventFibers[sock.Handle] = self.CurrentFiber;

    -- Keep a list of fibers that are awaiting io
    self.FibersAwaitingEvent[self.CurrentFiber] = true;
  else
    -- failed to add socket to event pool
  end

  -- Whether we were successful or not in adding the socket
  -- to the pool, perform a yield() so the world can move on.
  yield();
end

EventScheduler_t.ProcessEventQueue = function(self, queue)
  for i=1,queue:Len() do
    local sock = queue:Dequeue();

    if sock then
      local fiber = self.EventFibers[sock.Handle];
      if fiber then
        self:ScheduleFiber(fiber);
        self.EventFibers[sock.Handle] = nil;
        self.FibersAwaitingEvent[fiber] = nil;
        self.IoEventPool:RemoveSocket(sock);
      else
        print("EventScheduler_t.ProcessEventQueue(), No Fiber waiting to process.")
        -- remove the socket from the watch list
      end
    else
      print("EventScheduler_t.ProcessEventQueue(), No sock found in queue");
    end
  end
end

while self.ContinueRunning do
  -- First check if there are any io events
  local success, err = self.IoEventPool:Cycle(ioeventqueue, 500);
  --print("Event Pool: ", success, err, ioeventqueue:Len())
  if success then
    if success > 0 then
      -- schedule the io operations
      self:ProcessEventQueue(ioeventqueue);
    end
  else
    --print("Event Pool ERROR: ", err);
  end

  -- ... the rest of the loop resumes the regularly scheduled
  -- fibers, and sleeps when there is nothing else to do ...
end

With this bit of kit, instead of simply calling “coroutine.yield()” whenever you get a ‘wouldblock’ error, you call “YieldForIo()”. This takes the currently running ‘fiber’, sets it aside from the regularly scheduled coroutines, and only resumes it once there has been IO activity on the socket in question.

This frees up the rest of the scheduler to deal with other routines, and if there are no routines, it can simply sleep for a bit, which will make it relatively easy on the CPU.
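
To see the difference in practice, here is the busy loop from the beginning of the post, rewritten against this scheduler. It’s only a sketch: it assumes the scheduler instance is reachable as ‘scheduler’, that readsocket() is the same hypothetical call used earlier, and that POLLIN is the same event flag used in the pool code above.

local function ReadWhenReady(scheduler, sock, buff, bufflen)
  while true do
    local bytesread, err = readsocket(sock, buff, bufflen)
    if bytesread then
      return bytesread
    end

    if err ~= "wouldblock" then
      return nil, err
    end

    -- Park this fiber until WSAPoll() reports the socket as readable;
    -- no CPU time is burned while waiting.
    scheduler:YieldForIo(sock, POLLIN)
  end
end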

Well, there you have it. Almost a complete system, with the core written entirely in Lua, without any external dependencies other than the core OS. This is a good thing because it means libuv, or whatever other code you’d normally rely on to perform this task, can be left behind; you can do it all in pure Lua code, which makes my life simpler.

One of the benefits of doing this all in Lua is that on much smaller devices, particularly ones that don’t have a ton of memory or even a multi-threaded environment, I can get the benefits of a highly performant networking stack, just like on the big machines. This is fairly important, I think.

I’ve been playing with this model for a while, and have quite a few examples building up around it. I’ll have to put out the all-encompassing guide that pulls it all together, but for now, this is pretty much the last piece of the puzzle.
