polyflux/evloop.lua

-- module implementing nestable asynchronous event loop objects. the module
-- itself is an event loop object, representing the main event loop.

local M

local function new(...)
	local new = setmetatable({}, M)
	new.event_queue = {}
	new.tasks = {}
	new.named_tasks = {}
	for i = 1, select("#", ...) do
		new:wrap(select(i, ...))
	end
	return new
end

M = new()
M.__index = M

M.new = new

-- create a new task in the event loop.
function M:wrap(fn, ...)
	local name = false
	if type(fn) == "table" then
		name = fn.name
		fn = fn[1]
	end
	table.insert(self.event_queue, {coroutine.create(fn), name, ...})
end

-- send an event: evloop:queue(event, args...)
function M:queue(...)
	assert(type((...)))
	table.insert(self.event_queue, {...})
end

-- destroy a named task
function M:kill(name)
	if self.named_tasks[name] then
		self.tasks[self.named_tasks[name]] = nil
		self.named_tasks[name] = nil
	end
end

local function event_filter(timeout, ...)
	local filter = {}
	for i = 1, select("#", ...) do
		filter[select(i, ...)] = true
	end
	if type(timeout) == "string" then
		filter[timeout] = true
		timeout = nil
	end
	return filter, timeout
end

-- poll for an event: evloop.poll([timeout,] [events...])
-- returns: event, args...
function M.poll(...)
	local filter, timeout = event_filter(...)
	local e
	local t = 0
	repeat
		e = coroutine.yield(true)
		if e[1] == "timer" then
			t = t + e[2]
		end
		if timeout and t >= timeout then
			return
		end
	until filter[e[1]] or (not timeout and not next(filter))
	return unpack(e)
end

-- like evloop.poll but returns an iterator
function M.events(...)
	local filter, timeout = event_filter(...)
	local t = 0
	local function iter()
		if timeout and t >= timeout then
			t = t - timeout
			return false
		end
		local e = coroutine.yield(true)
		if e[1] == "timer" then
			t = t + e[2]
		end
		if (timeout or next(filter)) and not filter[e[1]] then
			return iter()
		else
			return unpack(e)
		end
	end
	return iter
end

function M.sleep(secs)
	M.poll(secs)
end

function M:quit(...)
	self:queue("quit", ...)
end

function M.debug_sleep(secs)
	local t = 0
	M.queue "debug.pause"
	while t < secs do
		local _, dt = M.poll "debug.timer"
		t = t + dt
	end
	M.queue "debug.unpause"
end

local function resume_task(co, ...)
	if coroutine.status(co) == "dead" then
		return false
	end

	local ok, ret = coroutine.resume(co, ...)
	if not ok then
		-- TODO: make this better somehow
		io.stderr:write(debug.traceback(co, ret)..'\n\n')
		error "error in event loop"
	end

	return coroutine.status(co) ~= "dead"
end

-- run each task in the event loop at once
function M:run()
	while true do
		::send_events::
		local q = self.event_queue
		self.event_queue = {}
		for _, e in ipairs(q) do
			if type(e[1]) == "thread" then
				-- start a new task
				local name = e[2]
				if resume_task(e[1], unpack(e, 3)) then
					self.tasks[e[1]] = true
					if name then self.named_tasks[name] = e[1] end
				end
			elseif e[1] == "quit" then
				return unpack(e, 2)
			else
				for task in pairs(self.tasks) do
					self.tasks[task] = resume_task(task, e) or nil
				end
			end
		end
		if #self.event_queue == 0 then
			if not next(self.tasks) then return end
			assert(coroutine.running(), "event queue depleted!")
			table.insert(self.event_queue, coroutine.yield(true))
		end
	end
end

return M