Optimize At the Right Time and Level

It takes a long time to do the simplest things correctly. You want to receive some bytes on a socket?
int recv(SOCKET s,char *buf,int len,int flags);

No problem right? How about that socket? How did that get setup? Is it right for low latency, big buffers, small buffers? what? And what about that buffer? and lord knows which are the best flags to use? First question I guess is, ‘for what purpose’?

For the longest time, I have known that my IP sockets were not the most performant. Through rounds of trying to get low level networking, all the way up through http handling working correctly, I deliberately made some things extremely brain dead simple.

One of those things was my readline() function. this is the workhorse of http processing. Previously, I would literally read a single character at a time, looking for those line terminators. Which meant making that recv() call a bazillion times during normal http processing.

Well, it has worked great, reduces the surface area to help flesh out various bugs, and allowed me to finally implement the IO Completion Port based networking stack that I now have. It works well enough. But, it’s time to optimize. I am now about to add various types of streams such as compressed, encrypted, and the like. For that I need to actually buffer up chunks at a time, not read single characters at a time.

So, I finally optimized this code.

function IOCPNetStream:refillReadingBuffer()
	print("IOCPNetStream:RefillReadingBuffer(): ",self.ReadingBuffer:BytesReadyToBeRead());

	-- if the buffer already has data in it
	-- then just return the number of bytes that
	-- are currently ready to be read.
	if self.ReadingBuffer:BytesReadyToBeRead() > 0 then
		return self.ReadingBuffer:BytesReadyToBeRead();
	end

	-- If there are currently no bytes ready, then we need
	-- to refill the buffer from the source
	local err
	local bytesread

	bytesread, err = self.Socket:receive(self.ReadingBuffer.Buffer, self.ReadingBuffer.Length)

	print("-- LOADED BYTES: ", bytesread, err);

	if not bytesread then
		return false, err;
	end

	if bytesread == 0 then
		return false, "eof"
	end

	self.ReadingBuffer:Reset()
	self.ReadingBuffer.BytesWritten = bytesread

	return bytesread
end

Basically, whenever some bytes are needed within the stream, the refillReadingBuffer() function is called. If there are still bytes sitting in the buffer, then those bytes are used. If it’s empty, then new bytes are read in. The buffer can be whatever size you want, so if your app is better off reading 1500 bytes at a time, then make it so. If you find performance is best at 4096 bytes per read, then set it up that way.

Now that pre-fetching is occuring, all operations related to reading bytes from the source MUST go through this buffer, or bytes will be missed. So, the readLine() function looks like this now:

function IOCPNetStream:readByte()
	-- First see if we can get a byte out of the
	-- Reading buffer
	local abyte,err = self.ReadingBuffer:ReadByte()

	if abyte then
		return abyte
	end

	-- If we did not get a byte out of the reading buffer
	-- try refilling the buffer, and try again
	local bytesread, err = self:refillReadingBuffer()

	if bytesread then
		abyte, err = self.ReadingBuffer:ReadByte()
		return abyte, err
	end

	-- If there was an error
	-- then return that error immediately
	print("-- IOCPNetStream:readByte, ERROR: ", err)
	return false, err
end

function IOCPNetStream:readOneLine(buffer, size)
	local nchars = 0;
	local ptr = ffi.cast("uint8_t *", buff);
	local bytesread
	local byteread
	local err

	while nchars < size do
		byteread, err = self:readByte();

		if not byteread then
			-- err is either "eof" or some other socket error
			break
		else
			if byteread == 0 then
				break
			elseif byteread == LF then
				--io.write("LF]\n")
				break
			elseif byteread == CR then
				-- swallow it and continue on
				--io.write("[CR")
			else
				-- Just a regular character, so save it
				buffer[nchars] = byteread
				nchars = nchars+1
			end
		end
	end

	if err and err ~= "eof" then
		return nil, err
	end

	return nchars
end

The readLine() is still reading a byte at a time, but this time, instead of going to the underlying socket directly, it’s calling the stream’s version of readByte(), which in turn is reading a single byte from the pre-fetched buffer. That’s a lot faster than making the recv() system call, and when the buffer runs out of bytes, it will be automatically refilled, until there is a socket error.

Well, that’s just great. The performance boost is one thing, but there is another benefit. Now that I am reading in controllable chunks at a time, I could up the size to say 16K for the first chunk read for an http request. That would likely get all the data for whatever headers there are included in the buffer up front. From there I could do simple pointer offset/size objects, instead of creating actual strings for every line (creates copies). That would be a great optimization.

I can wait for further optimizations. Getting the pre-fetch working, and continuing to work with IOCP is a more important optimization at this time.

And so it goes. Don’t optimize too early until you really understand the performance characteristics of your system. Then, only belatedly, do what minimal amount you can to achieve the specific goal you are trying to achieve before moving on.


Managing Socket Pools

There are many times when I need to manage a set of network connections. One situation is when I’m the ‘client’ side of a connection. Ideally, I would have a pool of connections ready to go at a moment’s notice, because TCP/IP startup times can take a while. Additionally, I’d like to reuse connections that have already been established, for the same reason. So, what to do.

I have this SocketPool construct, which manages a set of network streams (tcp/ip connections). The code looks like this:

local Collections = require "Collections"
local NetStream = require "NetStream"

local SocketPool_t = {}
local SocketPool_mt = {
  __index = SocketPool_t;
}

local SocketPool = function(params)
  params = params or {host="localhost", port=80, reserve=2, timeout=60*2}

  local obj = {
    Connections = Collections.Queue.new();
    Hostname = params.host or "localhost";
    Port = tonumber(params.port) or 80;
    Reserve = params.reserve or 2;
    Timeout = params.timeout,
  }
  setmetatable(obj, SocketPool_mt);

  obj:Cycle()  -- Reserve initial set of connections
  return obj
end

function SocketPool_t:CreateNewConnection()
  local connection, err = NetStream.Open(self.Hostname, self.Port)

  if not connection then
    return nil, err
  end
  connection:SetIdleInterval(self.Timeout);
  self:AddConnection(connection);
  
  return connection
end

function SocketPool_t:CleanoutSockets()
  local qLen = self.Connections:Len()

  -- Check each connection to see if it's still connected
  -- and if it has run past its idle time
  while qLen > 0 do
    local connection = self.Connections:Dequeue()
    self:AddConnection(connection)

    qLen = qLen - 1
  end
end

function SocketPool_t:Cycle()
  self:CleanoutSockets();

  -- Fill back up to the reserve level
  while self.Connections:Len() < self.Reserve do
    local connection, err = self:CreateNewConnection()
    if not connection then
      return false, err
    end
  end
  return true
end

function SocketPool_t:AddConnection(connection)
  if not connection:IsConnected() or connection:IsIdle() then
    return false
  end
  self.Connections:Enqueue(connection)
  return true
end

function SocketPool_t:GetConnection()
  self:Cycle();
  return self.Connections:Dequeue()
end

return SocketPool

Well, that’s a bit of a mouthful, but really it’s fairly straight forward. The usage is like this:

local sockpool = SocketPool({host="www.google.com", port=80, reserve=2, timeout=120})

while running do
  local conn = sockpool:GetConnection();
  conn:Send("Hello, World");
  sockpool:AddConnection(conn);
end

Basically, create an instance of a socket pool, connecting to ‘www.google.com:80’. Each time through the loop, I get a connection from the pool, do something with it, and return the connection back to the pool.

This is really convenient because the pool will take care of getting rid of the socket if it has already been closed, by either end, and creating new sockets when needed. In this particular case, I have setup the pool such that I will always have 2 sockets in reserve. That means that as long as I’m asking for one at a time, I’m always going to get a socket that’s already connected and ready to go.

I have put in a timeout of two minutes (120 seconds). That way, when I go to get a new socket, the pool will first get rid of those sockets that have been sitting around for too long. If it has fallen below the reserve mark (2), then it will refill with fresh new connections. The timeout is very useful to have because when you’re using tcp/ip based sockets out on the internet, there are numerous reasons why having a stale socket sitting around is not a good thing. So, dumping them out of your usage pool with some frequency ensures that what remains is fresh and will not suffer from various ailments such as being silently closed by some intervening router.

I’ve used a couple of things here like the NetStream, and the Queue, which aren’t standard fare, but you can get the general idea.

There is one little item that I found to be challening to really do correctly, on Win32 at least. The function: connection:IsConnected() is supposed to determine whether a tcp/ip socket is currently connected or not. On Windows, this is a surprisingly non-obvious thing to determine.

Here’s the socket code for it:

GetConnectionTime = function(self)
  local poptvalue = ffi.new('int[1]')
  local poptsize = ffi.new('int[1]',ffi.sizeof('int'))
  local size = ffi.sizeof('int')

  local success, err = WinSock.getsockopt(self.Handle, SOL_SOCKET, SO_CONNECT_TIME, poptvalue, poptsize)
		
  if not success then
    return nil, err
  end

  return poptvalue[0];		
end,

IsConnected = function(self)
  success, err = self:GetConnectionTime()
  if success and success >= 0 then
    return true
  end

  return false
end,

The key is the GetConnectionTime() function. This call will tell you how long a socket has been connected. There are two ways in which this call can fail. The first is that the socket was never actually connected to anything. In this case, getsockopt() will succeed, but the connection time will be ‘-1’. The second case is that the socket was connected at some point, but subsequently disconnected. In this case, the getsockopt() will return nil, WSAENOTSOCK (or some other error).

The IsConnected() function takes advantage of these return values, assuming the only valid state is that the connection time is greater than or equal to 0. All other cases indicate the socket is not connected.

This is a fairly quick and easy test. Perhaps a more robust situation might be to use io completion ports, and catch the state transition there. But, this is a nice quick and dirty mechanism that you can use at any time, without having to get your feet into the io completion world.

So, there you have it. Fairly easy management of client side tcp/ip connections. By using such a mechanism, my code is much more compact and clean, and I don’t have to worry about managing my socket connections.