Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable interlink queue, freelist, and group freelist sizes #1489

Merged
merged 15 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions src/apps/interlink/receiver.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,30 @@ module(...,package.seeall)
local shm = require("core.shm")
local interlink = require("lib.interlink")

local Receiver = {name="apps.interlink.Receiver"}
local Receiver = {
name = "apps.interlink.Receiver",
config = {
queue = {},
size = {default=1024}
}
}

function Receiver:new (queue)
function Receiver:new (conf)
local self = {
attached = false,
queue = conf.queue,
size = conf.size
}
packet.enable_group_freelist()
return setmetatable({attached=false, queue=queue}, {__index=Receiver})
return setmetatable(self, {__index=Receiver})
end

function Receiver:link ()
local queue = self.queue or self.appname
if not self.attached then
self.shm_name = "group/interlink/"..queue..".interlink"
self.backlink = "interlink/receiver/"..queue..".interlink"
self.interlink = interlink.attach_receiver(self.shm_name)
self.interlink = interlink.attach_receiver(self.shm_name, self.size)
shm.alias(self.backlink, self.shm_name)
self.attached = true
end
Expand Down
19 changes: 15 additions & 4 deletions src/apps/interlink/transmitter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,30 @@ module(...,package.seeall)
local shm = require("core.shm")
local interlink = require("lib.interlink")

local Transmitter = {name="apps.interlink.Transmitter"}
local Transmitter = {
name = "apps.interlink.Transmitter",
config = {
queue = {},
size = {default=1024}
}
}

function Transmitter:new (queue)
function Transmitter:new (conf)
local self = {
attached = false,
queue = conf.queue,
size = conf.size
}
packet.enable_group_freelist()
return setmetatable({attached=false, queue=queue}, {__index=Transmitter})
return setmetatable(self, {__index=Transmitter})
end

function Transmitter:link ()
local queue = self.queue or self.appname
if not self.attached then
self.shm_name = "group/interlink/"..queue..".interlink"
self.backlink = "interlink/transmitter/"..queue..".interlink"
self.interlink = interlink.attach_transmitter(self.shm_name)
self.interlink = interlink.attach_transmitter(self.shm_name, self.size)
shm.alias(self.backlink, self.shm_name)
self.attached = true
end
Expand Down
86 changes: 57 additions & 29 deletions src/core/group_freelist.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ local band = bit.band
--
-- NB: assumes 32-bit wide loads/stores are atomic (as is the fact on x86_64)!

-- Group freelist holds up to SIZE chunks of chunksize packets each
-- Group freelist holds up to n chunks of chunksize packets each
chunksize = 2048

-- (SIZE=1024)*(chunksize=2048) == roughly two million packets
local SIZE = 1024 -- must be a power of two
local MAX = SIZE - 1
-- (default_size=1024)*(chunksize=2048) == roughly two million packets
local default_size = 1024 -- must be a power of two

local CACHELINE = 64 -- XXX - make dynamic
local INT = ffi.sizeof("uint32_t")
Expand All @@ -35,47 +34,53 @@ struct group_freelist_chunk {

ffi.cdef([[
struct group_freelist {
uint32_t enqueue_pos[1];
uint8_t pad_enqueue_pos[]]..CACHELINE-1*INT..[[];
uint32_t enqueue_pos[1], enqueue_mask;
uint8_t pad_enqueue_pos[]]..CACHELINE-2*INT..[[];

uint32_t dequeue_pos[1];
uint8_t pad_dequeue_pos[]]..CACHELINE-1*INT..[[];
uint32_t dequeue_pos[1], dequeue_mask;
uint8_t pad_dequeue_pos[]]..CACHELINE-2*INT..[[];

struct group_freelist_chunk chunk[]]..SIZE..[[];
uint32_t size, state[1];

uint32_t state[1];
struct group_freelist_chunk chunk[?];
} __attribute__((packed, aligned(]]..CACHELINE..[[)))]])

-- Group freelists states
local CREATE, INIT, READY = 0, 1, 2

function freelist_create (name)
local fl = shm.create(name, "struct group_freelist")
function freelist_create (name, size)
size = size or default_size
assert(band(size, size-1) == 0, "size is not a power of two")

local fl = shm.create(name, "struct group_freelist", size)
if sync.cas(fl.state, CREATE, INIT) then
for i = 0, MAX do
fl.size = size
local mask = size - 1
fl.enqueue_mask, fl.dequeue_mask = mask, mask
for i = 0, fl.size-1 do
fl.chunk[i].sequence[0] = i
end
fl.state[0] = READY
assert(sync.cas(fl.state, INIT, READY))
return fl
else
waitfor(function () return fl.state[0] == READY end)
shm.unmap(fl)
return freelist_open(name)
end
return fl
end

function freelist_open (name, readonly)
local fl = shm.open(name, "struct group_freelist", readonly)
local fl = shm.open(name, "struct group_freelist", 'read-only', 1)
waitfor(function () return fl.state[0] == READY end)
return fl
end

local function mask (i)
return band(i, MAX)
local size = fl.size
shm.unmap(fl)
return shm.open(name, "struct group_freelist", readonly, size)
end

function start_add (fl)
local pos = fl.enqueue_pos[0]
local mask = fl.enqueue_mask
while true do
local chunk = fl.chunk[mask(pos)]
local chunk = fl.chunk[band(pos, mask)]
local seq = chunk.sequence[0]
local dif = seq - pos
if dif == 0 then
Expand All @@ -93,13 +98,14 @@ end

function start_remove (fl)
local pos = fl.dequeue_pos[0]
local mask = fl.dequeue_mask
while true do
local chunk = fl.chunk[mask(pos)]
local chunk = fl.chunk[band(pos, mask)]
local seq = chunk.sequence[0]
local dif = seq - (pos+1)
if dif == 0 then
if sync.cas(fl.dequeue_pos, pos, pos+1) then
return chunk, pos+MAX+1
return chunk, pos+mask+1
end
elseif dif < 0 then
return
Expand All @@ -114,8 +120,25 @@ function finish (chunk, seq)
chunk.sequence[0] = seq
end

local function occupied_chunks (fl)
local enqueue, dequeue = fl.enqueue_pos[0], fl.dequeue_pos[0]
if dequeue > enqueue then
return enqueue + fl.size - dequeue
else
return enqueue - dequeue
end
end

-- Register struct group_freelist as an abstract SHM object type so that
-- the group freelist can be recognized by shm.open_frame and described
-- with tostring().
shm.register('group_freelist', {open=freelist_open})
ffi.metatype("struct group_freelist", {__tostring = function (fl)
return ("%d/%d"):format(occupied_chunks(fl)*chunksize, fl.size*chunksize)
end})

function selftest ()
local fl = freelist_create("test_freelist")
local fl = freelist_create("test.group_freelist")
assert(not start_remove(fl)) -- empty

local w1, sw1 = start_add(fl)
Expand All @@ -133,13 +156,13 @@ function selftest ()
finish(r2, sr2)
assert(not start_remove(fl)) -- empty

for i=1,SIZE do
for i=1,fl.size do
local w, sw = start_add(fl)
assert(w)
finish(w, sw)
end
assert(not start_add(fl)) -- full
for i=1,SIZE do
for i=1,fl.size do
local r, sr = start_remove(fl)
assert(r)
finish(r, sr)
Expand All @@ -148,7 +171,7 @@ function selftest ()

local w = {}
for _=1,10000 do
for _=1,math.random(SIZE) do
for _=1,math.random(fl.size) do
local w1, sw = start_add(fl)
if not w1 then break end
finish(w1, sw)
Expand All @@ -160,4 +183,9 @@ function selftest ()
finish(r, sr)
end
end

local flro = freelist_open("test.group_freelist", 'read-only')
assert(flro.size == fl.size)
local objsize = ffi.sizeof("struct group_freelist", fl.size)
assert(ffi.C.memcmp(fl, flro, objsize) == 0)
end
46 changes: 30 additions & 16 deletions src/core/packet.lua
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,28 @@ end

-- Freelist containing empty packets ready for use.

local max_packets = 1e6
local default_max_packets = 1e6

ffi.cdef([[
struct freelist {
int nfree;
int max;
struct packet *list[]]..max_packets..[[];
struct packet *list[?];
};
]])

local function freelist_create(name)
local fl = shm.create(name, "struct freelist")
local function freelist_create(name, max_packets)
max_packets = max_packets or default_max_packets
local fl = shm.create(name, "struct freelist", max_packets)
fl.max = max_packets
return fl
end

local function freelist_open(name, readonly)
return shm.open(name, "struct freelist", readonly)
local fl = shm.open(name, "struct freelist", 'read-only', 1)
local max = fl.max
shm.unmap(fl)
return shm.open(name, "struct freelist", readonly, max)
end

local function freelist_full(freelist)
Expand Down Expand Up @@ -104,14 +108,21 @@ local packets_allocated = 0
local packets_fl, group_fl

-- Call to ensure packet freelist is enabled.
function initialize ()
packets_fl = freelist_create("engine/packets.freelist")
function initialize (max_packets)
if packets_fl then
assert(packets_fl.nfree == 0, "freelist is already in use")
shm.unmap(packets_fl)
shm.unlink("engine/packets.freelist")
end
packets_fl = freelist_create("engine/packets.freelist", max_packets)
end

-- Call to ensure group freelist is enabled.
function enable_group_freelist ()
function enable_group_freelist (nchunks)
if not group_fl then
group_fl = group_freelist.freelist_create("group/packets.freelist")
group_fl = group_freelist.freelist_create(
"group/packets.group_freelist", nchunks
)
end
end

Expand Down Expand Up @@ -147,12 +158,9 @@ function reclaim_step ()
end
end

-- Register struct freelist as an abstract SHM object type so that the group
-- Register struct freelist as an abstract SHM object type so that the
-- freelist can be recognized by shm.open_frame and described with tostring().
shm.register(
'freelist',
{open = function (name) return shm.open(name, "struct freelist") end}
)
shm.register('freelist', {open=freelist_open})
ffi.metatype("struct freelist", {__tostring = function (freelist)
return ("%d/%d"):format(freelist.nfree, freelist.max)
end})
Expand All @@ -176,7 +184,7 @@ end
-- process termination.
function shutdown (pid)
local in_group, group_fl = pcall(
group_freelist.freelist_open, "/"..pid.."/group/packets.freelist"
group_freelist.freelist_open, "/"..pid.."/group/packets.group_freelist"
)
if in_group then
local packets_fl = freelist_open("/"..pid.."/engine/packets.freelist")
Expand Down Expand Up @@ -308,7 +316,7 @@ end

function preallocate_step()
assert(packets_allocated + packet_allocation_step
<= max_packets - group_fl_chunksize,
<= packets_fl.max - group_fl_chunksize,
"packet allocation overflow")

for i=1, packet_allocation_step do
Expand All @@ -319,6 +327,12 @@ function preallocate_step()
end

function selftest ()
initialize(10000)
assert(packets_fl.max == 10000)
allocate()
local ok, err = pcall(initialize)
assert(not ok and err:match("freelist is already in use"))

assert(is_aligned(0, 1))
assert(is_aligned(1, 1))
assert(is_aligned(2, 1))
Expand Down
Loading