Multitasking single threaded UI – Gaming meets networking

There are two worlds that I want to collide. There is the active UI gaming world, then there’s the more passive networking server world. The difference between these two worlds is the kind of control loop and scheduler that each typically uses.

The scheduler for the networking server is roughly:

while true do
  waitFor(networkingEvent)
  
  doNetworkingStuff()
end

This is great, and works quite well to limit the amount of resources required to run the server, because most of the time it’s sitting around idle, waiting for an event. This allows you to serve up web pages with a much smaller machine than if the loop were running flat out, as it would with, say, a “polling” API.

The game loop is a bit different. On Windows it looks something like this:

while true do
  ffi.fill(msg, ffi.sizeof("MSG"))
  local peeked = User32.PeekMessageA(msg, nil, 0, 0, User32.PM_REMOVE);
			
  if peeked ~= 0 then
    -- do regular Windows message processing
    local res = User32.TranslateMessage(msg)		
    User32.DispatchMessageA(msg)
  end

  doOtherStuffLikeRunningGamePhysics();
end

So, when I want to run a networking game, where I take some input from the internet, and incorporate that into the gameplay, I’ve got a basic problem. Who’s on top? Which loop is going to be THE loop?

The answer lies in the fact that the TINN scheduler is a basic infinite loop, and you can modify what occurs within that loop. Last time around, I showed how the waitFor() function can be used to insert a ‘predicate’ into the scheduler’s primary loop. So, perhaps I can recast the gaming loop as a predicate and insert it into the scheduler?
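
To make the idea of a predicate living inside the scheduler's loop concrete, here is a toy scheduler in plain Lua. It is only a sketch built on coroutines, not TINN's actual scheduler; the names spawn, waitFor and run are borrowed for illustration, and their behavior here is an assumption.

```lua
-- A toy cooperative scheduler, NOT TINN's implementation.
-- Each fiber is a coroutine; waitFor(pred) parks the calling fiber
-- until pred() returns true on some later pass of the main loop.
local ready = {}     -- coroutines that can run on the next pass
local waiting = {}   -- entries of the form {co = coroutine, pred = function}

local function spawn(fn)
  ready[#ready + 1] = coroutine.create(fn)
end

local function waitFor(pred)
  -- hand control back to the scheduler along with the predicate
  coroutine.yield(pred)
end

local function run()
  local passes = 0
  while #ready > 0 or #waiting > 0 do
    passes = passes + 1

    -- every parked predicate is checked once per pass
    local stillWaiting = {}
    for _, w in ipairs(waiting) do
      if w.pred() then
        ready[#ready + 1] = w.co
      else
        stillWaiting[#stillWaiting + 1] = w
      end
    end
    waiting = stillWaiting

    -- run each ready fiber until it yields or finishes
    local runnable = ready
    ready = {}
    for _, co in ipairs(runnable) do
      local ok, pred = coroutine.resume(co)
      if not ok then
        error(pred)
      end
      if coroutine.status(co) == "suspended" then
        waiting[#waiting + 1] = {co = co, pred = pred}
      end
    end
  end
  return passes
end

-- Demo: a fiber that waits until its predicate has been polled 3 times.
local log = {}
local counter = 0
spawn(function()
  waitFor(function()
    counter = counter + 1
    return counter >= 3
  end)
  log[#log + 1] = "done after " .. counter .. " checks"
end)

local passes = run()
print(log[1], passes)
```

The point to notice is that the predicate is called by the scheduler itself, once per pass, which is what makes it a candidate for hosting a game loop body.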

I have a ‘GameWindow’ class that takes care of creating a window, showing it on the screen and dealing with drawing. This window has a run() function which has the typical gaming infinite loop. I have modified this code to recast things in such a way that I can use waitFor() as the primary looping mechanism. The modifications make it look like this:

local appFinish = function(win)

  win.IsRunning = true
  local msg = ffi.new("MSG")

  local closure = function()
    ffi.fill(msg, ffi.sizeof("MSG"))
    local peeked = User32.PeekMessageA(msg, nil, 0, 0, User32.PM_REMOVE);
			
    if peeked ~= 0 then
      local res = User32.TranslateMessage(msg)
      User32.DispatchMessageA(msg)

      if msg.message == User32.WM_QUIT then
        return win:OnQuit()
      end
    end

    if not win.IsRunning then
      return true;
    end
  end

  return closure;
end

local runWindow = function(self)
	
  self:show()
  self:update()

  -- Start the FrameTimer
  local period = 1000/self.FrameRate;
  self.FrameTimer = Timer({Delay=period, Period=period, OnTime =self:handleFrameTick()})

  -- wait here until the application window is closed
  waitFor(appFinish(self))

  -- cancel the frame timer
  self.FrameTimer:cancel();
end

GameWindow.run = function(self)
  -- spawn the fiber that will wait
  -- for messages to finish
  Task:spawn(runWindow, self);

  -- set quanta to 0 so we don't waste time
  -- in i/o processing if there's nothing there
  Task:setMessageQuanta(0);
	
  Task:start()
end

Start with the run() function, where only three things need to occur. First, spawn ‘runWindow’ in its own fiber. I want this routine to run in a fiber so that it is cooperative with other parts of the system that might be running.

Second, call ‘setMessageQuanta(0)’. This is a critical piece to get a ‘gaming loop’. This quanta is the amount of time the IO processing part of the scheduler will spend waiting for an IO event to occur. This time will be spent every time through the primary scheduler’s loop. If the value is set to 0, then effectively we have a nice runaway infinite loop for the scheduler. IO events will still be processed, but we won’t spend any time waiting for them to occur.

This has the effect of providing maximum CPU timeslice to various other waiting fibers. If this value is anything other than 0, let’s say ‘5’ for example, then the inner loop of the scheduler will slow to a crawl, providing no better than 60 iterations of the loop per second. That is not enough time slice for a game. Setting it to 0 allows more like 3000 iterations of the loop per second, which gives more time to other fibers.
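
The effect of the quanta can be modeled with a simulated clock. The sketch below is plain Lua and does not call TINN. It assumes that a timed wait with nothing pending lasts the full quanta, rounded up to an OS timer tick of 15.625 ms, and that each pass costs about 0.3 ms of other work. Both numbers are assumptions chosen to land near the figures quoted above, not measurements.

```lua
-- Toy model of the scheduler's main loop with a simulated clock.
-- Nothing here calls TINN; all numbers are illustrative assumptions.
local function passesPerSecond(quantaMs, workMs, tickMs)
  local clock, passes = 0, 0
  while clock < 1000 do
    -- IO wait: with no IO pending, the whole quanta is spent waiting.
    -- Assumption: timed waits round up to the OS timer tick (tickMs).
    local waitMs = 0
    if quantaMs > 0 then
      waitMs = math.ceil(quantaMs / tickMs) * tickMs
    end
    clock = clock + waitMs + workMs
    passes = passes + 1
  end
  return passes
end

local idleWait = passesPerSecond(5, 0.3, 15.625)  -- quanta of 5
local noWait = passesPerSecond(0, 0.3, 15.625)    -- quanta of 0
print(idleWait, noWait)
```

Under these assumptions a quanta of 5 gives a few dozen passes per second, while a quanta of 0 gives a few thousand, limited only by how long the other fibers take.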

That’s the trick of this integration right there. Just set the messageQuanta to 0, and away you go. The third step, Task:start(), sets the scheduler running. To finish this out, take a look at the runWindow() function. Here just a couple of things are set in place. First, a timer is created. This timer will ultimately end up calling a ‘tick()’ function that the user can specify.

The other thing of note is the use of waitFor(appFinish(self)). This fiber will block here until the predicate returned by “appFinish()” returns true.

So, finally, the appFinish() predicate, what does that do?

Well, I’ll be darned if it isn’t the essence of the typical game window loop, at least the Windows message handling part of it. Remember that a predicate that is injected to the scheduler using “waitFor()” is executed every time through the scheduler’s loop, so the scheduler’s loop is essentially the same as the outer loop of a typical game.
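
The shape of appFinish(), a function that builds and returns a predicate, can be tried out without Windows at all. In this sketch a plain Lua table stands in for the message queue, and the message names, the win table and the dispatch callback are all hypothetical stand-ins.

```lua
-- A stand-in for the Windows message queue: a plain Lua table.
-- The message names and the win table are hypothetical.
local function makePump(win, queue, dispatch)
  win.IsRunning = true

  -- The returned closure is the predicate: it handles at most one
  -- message per call and returns true only when the app should stop.
  return function()
    local msg = table.remove(queue, 1)  -- plays the role of PeekMessage with PM_REMOVE
    if msg then
      dispatch(msg)
      if msg == "QUIT" then
        win.IsRunning = false
      end
    end
    return not win.IsRunning
  end
end

local handled = {}
local win = {}
local pump = makePump(win, {"KEYDOWN", "MOUSEMOVE", "QUIT"}, function(m)
  handled[#handled + 1] = m
end)

-- Stand-in for the scheduler: call the predicate once per pass.
local passes = 0
repeat
  passes = passes + 1
until pump()
print(passes, table.concat(handled, ","))
```

Because the predicate drains one message per call and the scheduler calls it every pass, the pair behaves like the classic outer game loop.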

With all this in place, you can finally do the following:


local GameWindow = require "GameWindow"
local StopWatch = require "StopWatch"

local sw = StopWatch();

-- The routine that gets called for any
-- mouse activity messages
function mouseinteraction(msg, wparam, lparam)
	print(string.format("Mouse: 0x%x", msg))
end

function keyboardinteraction(msg, wparam, lparam)
	print(string.format("Keyboard: 0x%x", msg))
end


function randomColor()
	local r = math.random(0,255)
	local g = math.random(0,255)
	local b = math.random(0,255)
	local color = RGB(r,g,b)

	return color
end

function randomline(win)
	local x1 = math.random() * win.Width
	local y1 = 40 + (math.random() * (win.Height - 40))
	local x2 = math.random() * win.Width
	local y2 = 40 + (math.random() * (win.Height - 40))

	local color = randomColor()
	local ctxt = win.GDIContext;

	ctxt:SetDCPenColor(color)

	ctxt:MoveTo(x1, y1)
	ctxt:LineTo(x2, y2)
end

function randomrect(win)
	local width = math.random(2,40)
	local height = math.random(2,40)
	local x = math.random(0,win.Width-1-width)
	local y = math.random(0, win.Height-1-height)
	local right = x + width
	local bottom = y + height
	local brushColor = randomColor()

	local ctxt = win.GDIContext;

	ctxt:SetDCBrushColor(brushColor)
	--ctxt:RoundRect(x, y, right, bottom, 0, 0)
	ctxt:Rectangle(x, y, right, bottom)
end


function ontick(win, tickCount)

	for i=1,30 do
		randomrect(win)
		randomline(win)
	end

	local stats = string.format("Seconds: %f  Frame: %d  FPS: %f", sw:Seconds(), tickCount, tickCount/sw:Seconds())
	win.GDIContext:Text(stats)
end



local appwin = GameWindow({
		Title = "Game Window",
		KeyboardInteractor = keyboardinteraction,
		MouseInteractor = mouseinteraction,
		FrameRate = 24,
		OnTickDelegate = ontick,
		Extent = {1024,768},
		})


appwin:run()

This will put a window on the screen, and draw some lines and rectangles at a frame rate of roughly 24 frames per second.
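
The frame rate comes from the timer period computed in runWindow(): 1000/24 is roughly 41.7 milliseconds between ticks. Here is a small sketch of a fixed-period timer driven by a simulated clock. The makeFrameTimer helper is hypothetical and is not how TINN's Timer is implemented.

```lua
-- Fixed-period frame timer driven by a simulated millisecond clock.
local function makeFrameTimer(frameRate, onTick)
  local period = 1000 / frameRate   -- ms between frames
  local nextDue = period
  local tickCount = 0

  -- Call this once per scheduler pass with the current time.
  return function(nowMs)
    while nowMs >= nextDue do
      tickCount = tickCount + 1
      onTick(tickCount)
      nextDue = nextDue + period
    end
    return tickCount
  end
end

local frames = 0
local poll = makeFrameTimer(24, function(n) frames = n end)

-- Simulate just over one second of passes, one pass per millisecond.
for now = 1, 1010 do
  poll(now)
end
print(frames)
```

The timer only fires when a pass of the loop notices that the period has elapsed, which is one reason the real frame rate is "roughly" 24 rather than exactly 24.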

And thus, the two worlds have been melded together. I’m not doing any networking in this particular case, but adding it is no different than doing it for any other networking application. The same scheduler is still at play, and everything else will work as expected.

The cool thing about this is the integration of these two worlds does not require the introduction of multiple threads of execution. Everything is still within the same single threaded context that TINN normally runs with. If real threads are required, they can easily be added and communicated with using the computicles that are within TINN.


Pinging Peers – Creating Network Meshes

I have the need to create a mesh of network nodes on occasion. The configuration of the mesh is typically a set of VMs running on some cloud service or another. The code running on these VMs may perform some basic task, probably similar across all the VMs. They do need to communicate information with each other, and to do that, they need to maintain a list of their peers and the latencies between them, and the like.

So, I started modeling this thing called the Border Gateway Protocol (BGP), which is one of the ancient low-level protocols of the internet. I model the protocol by first creating an object, BGPPeer. The intention of this object is to act as both the host and client of the fundamental mesh communications. It looks like this:

-- BGProtocol.lua
--[[
	Implementation of Border Gateway Protocol (BGP)
--]]

local ffi = require("ffi");
local Timer = require("Timer")
local IOCPSocket = require("IOCPSocket");
local SocketUtils = require("SocketUtils")
local Network = require("Network")

--[[
	BGP Relies on the presence of a fully connected graph
	The Protocol object is initialized by reading a list of nodes
	and creating the right channels between each of the nodes.
--]]

--[[
print("ComputerNameNetBIOS: ", Network.getHostName(ffi.C.ComputerNameNetBIOS))
print("ComputerNameDnsHostname: ", Network.getHostName(ffi.C.ComputerNameDnsHostname))
print("ComputerNameDnsDomain: ", Network.getHostName(ffi.C.ComputerNameDnsDomain))
print("ComputerNameDnsFullyQualified: ", Network.getHostName(ffi.C.ComputerNameDnsFullyQualified))
print("ComputerNamePhysicalNetBIOS: ", Network.getHostName(ffi.C.ComputerNamePhysicalNetBIOS))
print("ComputerNamePhysicalDnsHostname: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsHostname))
print("ComputerNamePhysicalDnsDomain: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsDomain))
print("ComputerNamePhysicalDnsFullyQualified: ", Network.getHostName(ffi.C.ComputerNamePhysicalDnsFullyQualified))
--]]

local selfhost = Network.getHostName(ffi.C.ComputerNameDnsHostname):lower()
--print("Self Host: ",selfhost)

local BGPPeer = {
  HeartbeatInterval = 50 * 1000;	-- send out a heartbeat on this interval (milliseconds)
  UdpPort = 1313;					-- port used to communicate UDP
  EliminateSelf = true;			-- don't consider self to be a peer
}
setmetatable(BGPPeer, {
  __call = function(self, ...)
    return self:create(...)
  end,
})

local BGPPeer_mt = {
  __index = BGPPeer,
}

BGPPeer.init = function(self, config)
  local err
  local obj = {
    Config = config,
    Peers = {},
  }

  -- if there is config information, then use
  -- it to setup sockets to the peer
  if config then
    -- create the client socket
    obj.Name = config.name;
    obj.UdpSender, err = IOCPSocket:create(AF_INET, SOCK_DGRAM, 0);
    obj.UdpPeerAddress, err = SocketUtils.CreateSocketAddress(config.host, config.udpport or BGPPeer.UdpPort)


    obj.UdpPeerAddressLen = ffi.sizeof(obj.UdpPeerAddress);
  end

  setmetatable(obj, BGPPeer_mt)

  return obj;
end

BGPPeer.create = function(self, configfile)
  -- load the configuration file
  local fs, err = io.open(configfile, "rb")
	
  if not fs then return nil, "could not load file" end

  local configdata = fs:read("*all")
  fs:close();
	
  local func, err = loadstring(configdata)
  if not func then return nil, err end
  local config = func()

  -- create self, so peers can be added
  local obj = self:init()

  -- add the udp socket
  -- Setup the server socket
  obj.UdpListener, err = IOCPSocket:create(AF_INET, SOCK_DGRAM, 0);
  if not obj.UdpListener then
    return nil, err;
  end

  local success, err = obj.UdpListener:bindToPort(BGPPeer.UdpPort);

  if not success then
    return nil, err;
  end

  -- for each of the peers within the config 
  -- create a new peer, and add it to the 
  -- list of peers for the first object
  for _,peerconfig in ipairs(config) do
		--print("PEER: ", peerconfig.name, peerconfig.host)
    if peerconfig.name:lower() ~= selfhost then
      obj:createPeer(peerconfig)
    end
  end

  -- Need to setup a listener for the UDP port

  -- add the heartbeat timer
  obj.HeartbeatTimer = Timer({
		Delay =  BGPPeer.HeartbeatInterval;
		Period = BGPPeer.HeartbeatInterval;
		OnTime = obj:onHeartbeat();})

  -- start a fiber which will run the UDP listener loop
  spawn(obj:handleUdpListen())

  return obj;
end

BGPPeer.onHeartbeat = function(self)
  local message = "HELLO FROM: "..Network:getHostName();
  local closure = function(timer)

    for peername, peer in pairs(self.Peers) do
      local bytessent, err = peer.UdpSender:sendTo(peer.UdpPeerAddress, peer.UdpPeerAddressLen,
        message, #message);

    end
  end

  return closure
end

BGPPeer.addPeer = function(self, peer)
  self.Peers[peer.Name] = peer
end

BGPPeer.createPeer = function(self, peerconfig)
  self:addPeer(BGPPeer:init(peerconfig));
end

local count = 0;
BGPPeer.onReceiveHeartBeat = function(self, buff, bytesread)
  count = count + 1
  print("onReceiveHeartBeat: ", count, ffi.string(buff, bytesread));
end


BGPPeer.handleUdpListen = function(self)
  local bufflen = 1500;
  local buff = ffi.new("uint8_t[1500]");
  local from = sockaddr_in();
  local fromLen = ffi.sizeof(from);


  local closure = function()

    while true do
      local bytesread, err = self.UdpListener:receiveFrom(from, fromLen, buff, bufflen);

      if not bytesread then
        print("receiveFrom ERROR: ", err)
        return false, err;
      end

      self:onReceiveHeartBeat(buff, bytesread)
    end
  end

  return closure
end

return BGPPeer

There are a couple things of note here. One of the basic problems I need to solve is the fact that this is not a basic client/server app, but rather a peer to peer setup. As such, each peer both listens on a port, and sends data.
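
One thing the listing does not show is the configuration file itself. Since create() runs the file through loadstring() and iterates the result with ipairs(), it must be a Lua chunk that returns an array of tables. The sketch below guesses at that shape. The field names (name, host, udpport) are the ones the listing reads, while the node names, addresses and the loadPeers helper are made up for illustration.

```lua
-- What a peer configuration chunk might look like. The field names
-- (name, host, udpport) are the ones create() and init() read; the
-- node names and addresses below are made up.
local configdata = [[
return {
  {name = "node-a", host = "10.0.0.1"},
  {name = "node-b", host = "10.0.0.2", udpport = 1414},
  {name = "node-c", host = "10.0.0.3"},
}
]]

-- loadstring exists in Lua 5.1 and LuaJIT; newer Lua versions use load.
local compile = loadstring or load

-- Hypothetical helper: parse the chunk and build a name -> address table.
local function loadPeers(text, selfhost, defaultPort)
  local func, err = compile(text)
  if not func then
    return nil, err
  end

  local peers = {}
  for _, peerconfig in ipairs(func()) do
    -- skip our own entry, as create() does
    if peerconfig.name:lower() ~= selfhost then
      peers[peerconfig.name] = {
        host = peerconfig.host,
        port = peerconfig.udpport or defaultPort,
      }
    end
  end
  return peers
end

local peers = assert(loadPeers(configdata, "node-a", 1313))
print(peers["node-b"].port, peers["node-c"].port)
```

Keeping the config as Lua means no parser is needed, at the cost of executing whatever the file contains, so it should only be used with files you trust.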

A typical network ‘server’ app may block on listening, and not perform any actions until it receives a request from a ‘client’. In my case, I don’t want to block, but I do want to initiate a ‘blocking’ loop in parallel with some non-blocking stuff.

In this BGPPeer, there is the handleUdpListen() function. This function is the heart of the Udp listening loop, or the “server” side of the peer communication. It runs concurrently with everything else because the constructor spawns it in its own fiber:

  spawn(obj:handleUdpListen())

In this particular case, handleUdpListen() builds a closure (another function), and that closure is what actually gets returned and spawned in its own fiber. The closure is pretty straightforward.

  local closure = function()

    while true do
      local bytesread, err = self.UdpListener:receiveFrom(from, fromLen, buff, bufflen);

      if not bytesread then
        print("receiveFrom ERROR: ", err)
        return false, err;
      end

      self:onReceiveHeartBeat(buff, bytesread)
    end
  end

Basically, just run a loop forever, waiting to receiveFrom some Udp packet. Since in TINN all IO calls are cooperative, while this receiveFrom is ‘blocked’, the current coroutine is actually yielding, so that other work can continue. That’s great, that’s exactly what I want. So, now the listening side is set up.
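
Here is a toy version of that "blocked but really yielding" behavior in plain Lua. The inbox table stands in for the socket, and the receive() function and packet strings are hypothetical; TINN's real receiveFrom does its yielding inside the IO layer.

```lua
-- Toy model of a cooperative "blocking" receive, not TINN's code.
-- The inbox table stands in for a UDP socket's pending packets.
local inbox = {}

-- Looks blocking to the caller, but yields while nothing is there.
local function receive()
  while #inbox == 0 do
    coroutine.yield()
  end
  return table.remove(inbox, 1)
end

local received = {}
local listener = coroutine.create(function()
  while true do
    local packet = receive()
    received[#received + 1] = packet
  end
end)

local otherWork = 0

-- Stand-in for the scheduler loop: give the listener a turn, then
-- do other work. A packet "arrives" on passes 3 and 5.
for pass = 1, 6 do
  if pass == 3 or pass == 5 then
    inbox[#inbox + 1] = "HELLO FROM: peer" .. pass
  end
  assert(coroutine.resume(listener))
  otherWork = otherWork + 1
end
print(#received, otherWork)
```

The listener's loop reads as if it blocks forever, yet the other work still gets a turn on every pass.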

How about the sending side? I want to send out a formatted packet on a regular interval. Since TINN has a Timer object, this is a pretty straightforward matter. Within the constructor of the BGPPeer, we find the following:

  -- add the heartbeat timer
  obj.HeartbeatTimer = Timer({
		Delay =  BGPPeer.HeartbeatInterval;
		Period = BGPPeer.HeartbeatInterval;
		OnTime = obj:onHeartbeat();})

With this setup, the timer will call the closure returned by ‘onHeartbeat()’ once every “Period”, which is given in milliseconds. The default is 50 seconds (50 * 1000), but it can be configured to any value.
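
One detail that is easy to miss is that obj:onHeartbeat() is called right there in the table constructor, and it is the function it returns that the timer keeps. The sketch below shows the same pattern with a toy timer on a simulated clock. The makeTimer helper only mimics the Delay/Period/OnTime shape and is not TINN's Timer.

```lua
-- Toy periodic timer driven by a simulated clock (milliseconds).
-- It mimics the Delay/Period/OnTime shape; it is not TINN's Timer.
local function makeTimer(params)
  local timer = {nextDue = params.Delay, fired = 0}

  function timer:poll(nowMs)
    while nowMs >= self.nextDue do
      self.fired = self.fired + 1
      params.OnTime(self)
      self.nextDue = self.nextDue + params.Period
    end
  end

  return timer
end

local peer = {beats = 0}

-- Called once, up front; it returns the function the timer will call.
function peer:onHeartbeat()
  return function(timer)
    self.beats = self.beats + 1
  end
end

local interval = 50 * 1000
local hb = makeTimer({
  Delay = interval,
  Period = interval,
  OnTime = peer:onHeartbeat(),  -- note the call: this passes the closure
})

-- Advance the simulated clock by three minutes in one-second steps.
for now = 1000, 180 * 1000, 1000 do
  hb:poll(now)
end
print(peer.beats)
```

Returning a closure is what lets the callback keep hold of self without the timer needing to know anything about peers.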

As we saw from the last article on Timers, this timer object will start automatically, and run cooperatively with anything else running in the process. That’s great. So, onHeartbeat just needs to send out information to all its peers whenever it is called.

BGPPeer.onHeartbeat = function(self)
  local message = "HELLO FROM: "..Network:getHostName();
  local closure = function(timer)

    for peername, peer in pairs(self.Peers) do
      local bytessent, err = peer.UdpSender:sendTo(peer.UdpPeerAddress, peer.UdpPeerAddressLen,
        message, #message);

    end
  end

  return closure
end

This becomes a simple matter of traversing the list of peers, and using the Udp socket that was setup with each one of them to send out the packet we want to send out.
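
The heartbeat above ignores the values returned by sendTo. If you wanted to know which peers could not be reached, one option is to collect the errors while traversing. This is a sketch with stub senders in place of real sockets; the one-argument sendTo(message) signature is simplified and hypothetical, and broadcast is not part of the BGPPeer code.

```lua
-- Broadcast one message to every peer, collecting failures.
-- The sender objects are stubs standing in for UDP sockets; their
-- sendTo(message) signature is simplified and hypothetical.
local function makeStubSender(shouldFail)
  local sender = {sent = {}}
  function sender:sendTo(message)
    if shouldFail then
      return nil, "unreachable"
    end
    self.sent[#self.sent + 1] = message
    return #message
  end
  return sender
end

local function broadcast(peers, message)
  local failures = {}
  for peername, peer in pairs(peers) do
    local bytessent, err = peer.UdpSender:sendTo(message)
    if not bytessent then
      failures[peername] = err
    end
  end
  return failures
end

local peers = {
  ["node-b"] = {UdpSender = makeStubSender(false)},
  ["node-c"] = {UdpSender = makeStubSender(true)},
}

local failures = broadcast(peers, "HELLO FROM: node-a")
print(failures["node-c"])
```

A table of failures like this could later feed whatever logic decides that a peer has dropped out of the mesh.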

What’s that altogether then? We saw the spawn that sent the Udp listener off in a cooperative manner. And just now I showed the timer going in parallel. The two combined give you a model where you can both listen and send ‘at the same time’.

It looks so easy, and that’s the power of TINN, the scheduler, and Lua’s coroutines. It’s fairly easy to write such code without having to worry overly much about old school things like mutexes, thread context switching, and the like. Just write your code however you want, sprinkle in a few ‘spawn()’ functions here and there, and call it a day. This is how easy it is to write network programs that can walk and chew gum at the same time.

Next time around, I want to write the simple handler that can deal with Win32 message looping and peer networking at the same time. Same general mechanisms, just a cooperative User32 message pump to incorporate.