diff --git a/src/apps/interlink/receiver.lua b/src/apps/interlink/receiver.lua index 07afa17965..8cfefd1e65 100644 --- a/src/apps/interlink/receiver.lua +++ b/src/apps/interlink/receiver.lua @@ -5,11 +5,22 @@ module(...,package.seeall) local shm = require("core.shm") local interlink = require("lib.interlink") -local Receiver = {name="apps.interlink.Receiver"} +local Receiver = { + name = "apps.interlink.Receiver", + config = { + queue = {}, + size = {default=1024} + } +} -function Receiver:new (queue) +function Receiver:new (conf) + local self = { + attached = false, + queue = conf.queue, + size = conf.size + } packet.enable_group_freelist() - return setmetatable({attached=false, queue=queue}, {__index=Receiver}) + return setmetatable(self, {__index=Receiver}) end function Receiver:link () @@ -17,7 +28,7 @@ function Receiver:link () if not self.attached then self.shm_name = "group/interlink/"..queue..".interlink" self.backlink = "interlink/receiver/"..queue..".interlink" - self.interlink = interlink.attach_receiver(self.shm_name) + self.interlink = interlink.attach_receiver(self.shm_name, self.size) shm.alias(self.backlink, self.shm_name) self.attached = true end diff --git a/src/apps/interlink/transmitter.lua b/src/apps/interlink/transmitter.lua index 9e4afd8ba5..e3f8448776 100644 --- a/src/apps/interlink/transmitter.lua +++ b/src/apps/interlink/transmitter.lua @@ -5,11 +5,22 @@ module(...,package.seeall) local shm = require("core.shm") local interlink = require("lib.interlink") -local Transmitter = {name="apps.interlink.Transmitter"} +local Transmitter = { + name = "apps.interlink.Transmitter", + config = { + queue = {}, + size = {default=1024} + } +} -function Transmitter:new (queue) +function Transmitter:new (conf) + local self = { + attached = false, + queue = conf.queue, + size = conf.size + } packet.enable_group_freelist() - return setmetatable({attached=false, queue=queue}, {__index=Transmitter}) + return setmetatable(self, {__index=Transmitter}) end 
function Transmitter:link () @@ -17,7 +28,7 @@ function Transmitter:link () if not self.attached then self.shm_name = "group/interlink/"..queue..".interlink" self.backlink = "interlink/transmitter/"..queue..".interlink" - self.interlink = interlink.attach_transmitter(self.shm_name) + self.interlink = interlink.attach_transmitter(self.shm_name, self.size) shm.alias(self.backlink, self.shm_name) self.attached = true end diff --git a/src/core/group_freelist.lua b/src/core/group_freelist.lua index 142a2fbc18..2d233a99b3 100644 --- a/src/core/group_freelist.lua +++ b/src/core/group_freelist.lua @@ -17,12 +17,11 @@ local band = bit.band -- -- NB: assumes 32-bit wide loads/stores are atomic (as is the fact on x86_64)! --- Group freelist holds up to SIZE chunks of chunksize packets each +-- Group freelist holds up to n chunks of chunksize packets each chunksize = 2048 --- (SIZE=1024)*(chunksize=2048) == roughly two million packets -local SIZE = 1024 -- must be a power of two -local MAX = SIZE - 1 +-- (default_size=1024)*(chunksize=2048) == roughly two million packets +local default_size = 1024 -- must be a power of two local CACHELINE = 64 -- XXX - make dynamic local INT = ffi.sizeof("uint32_t") @@ -35,47 +34,53 @@ struct group_freelist_chunk { ffi.cdef([[ struct group_freelist { - uint32_t enqueue_pos[1]; - uint8_t pad_enqueue_pos[]]..CACHELINE-1*INT..[[]; + uint32_t enqueue_pos[1], enqueue_mask; + uint8_t pad_enqueue_pos[]]..CACHELINE-2*INT..[[]; - uint32_t dequeue_pos[1]; - uint8_t pad_dequeue_pos[]]..CACHELINE-1*INT..[[]; + uint32_t dequeue_pos[1], dequeue_mask; + uint8_t pad_dequeue_pos[]]..CACHELINE-2*INT..[[]; - struct group_freelist_chunk chunk[]]..SIZE..[[]; + uint32_t size, state[1]; - uint32_t state[1]; + struct group_freelist_chunk chunk[?]; } __attribute__((packed, aligned(]]..CACHELINE..[[)))]]) -- Group freelists states local CREATE, INIT, READY = 0, 1, 2 -function freelist_create (name) - local fl = shm.create(name, "struct group_freelist") +function 
freelist_create (name, size) + size = size or default_size + assert(band(size, size-1) == 0, "size is not a power of two") + + local fl = shm.create(name, "struct group_freelist", size) if sync.cas(fl.state, CREATE, INIT) then - for i = 0, MAX do + fl.size = size + local mask = size - 1 + fl.enqueue_mask, fl.dequeue_mask = mask, mask + for i = 0, fl.size-1 do fl.chunk[i].sequence[0] = i end - fl.state[0] = READY + assert(sync.cas(fl.state, INIT, READY)) + return fl else - waitfor(function () return fl.state[0] == READY end) + shm.unmap(fl) + return freelist_open(name) end - return fl end function freelist_open (name, readonly) - local fl = shm.open(name, "struct group_freelist", readonly) + local fl = shm.open(name, "struct group_freelist", 'read-only', 1) waitfor(function () return fl.state[0] == READY end) - return fl -end - -local function mask (i) - return band(i, MAX) + local size = fl.size + shm.unmap(fl) + return shm.open(name, "struct group_freelist", readonly, size) end function start_add (fl) local pos = fl.enqueue_pos[0] + local mask = fl.enqueue_mask while true do - local chunk = fl.chunk[mask(pos)] + local chunk = fl.chunk[band(pos, mask)] local seq = chunk.sequence[0] local dif = seq - pos if dif == 0 then @@ -93,13 +98,14 @@ end function start_remove (fl) local pos = fl.dequeue_pos[0] + local mask = fl.dequeue_mask while true do - local chunk = fl.chunk[mask(pos)] + local chunk = fl.chunk[band(pos, mask)] local seq = chunk.sequence[0] local dif = seq - (pos+1) if dif == 0 then if sync.cas(fl.dequeue_pos, pos, pos+1) then - return chunk, pos+MAX+1 + return chunk, pos+mask+1 end elseif dif < 0 then return @@ -114,8 +120,25 @@ function finish (chunk, seq) chunk.sequence[0] = seq end +local function occupied_chunks (fl) + local enqueue, dequeue = fl.enqueue_pos[0], fl.dequeue_pos[0] + if dequeue > enqueue then + return enqueue + fl.size - dequeue + else + return enqueue - dequeue + end +end + +-- Register struct group_freelist as an abstract SHM object 
type so that +-- the group freelist can be recognized by shm.open_frame and described +-- with tostring(). +shm.register('group_freelist', {open=freelist_open}) +ffi.metatype("struct group_freelist", {__tostring = function (fl) + return ("%d/%d"):format(occupied_chunks(fl)*chunksize, fl.size*chunksize) +end}) + function selftest () - local fl = freelist_create("test_freelist") + local fl = freelist_create("test.group_freelist") assert(not start_remove(fl)) -- empty local w1, sw1 = start_add(fl) @@ -133,13 +156,13 @@ function selftest () finish(r2, sr2) assert(not start_remove(fl)) -- empty - for i=1,SIZE do + for i=1,fl.size do local w, sw = start_add(fl) assert(w) finish(w, sw) end assert(not start_add(fl)) -- full - for i=1,SIZE do + for i=1,fl.size do local r, sr = start_remove(fl) assert(r) finish(r, sr) @@ -148,7 +171,7 @@ function selftest () local w = {} for _=1,10000 do - for _=1,math.random(SIZE) do + for _=1,math.random(fl.size) do local w1, sw = start_add(fl) if not w1 then break end finish(w1, sw) @@ -160,4 +183,9 @@ function selftest () finish(r, sr) end end + + local flro = freelist_open("test.group_freelist", 'read-only') + assert(flro.size == fl.size) + local objsize = ffi.sizeof("struct group_freelist", fl.size) + assert(ffi.C.memcmp(fl, flro, objsize) == 0) end \ No newline at end of file diff --git a/src/core/packet.lua b/src/core/packet.lua index f282357f91..7c9a7e34ca 100644 --- a/src/core/packet.lua +++ b/src/core/packet.lua @@ -52,24 +52,28 @@ end -- Freelist containing empty packets ready for use. 
-local max_packets = 1e6 +local default_max_packets = 1e6 ffi.cdef([[ struct freelist { int nfree; int max; - struct packet *list[]]..max_packets..[[]; + struct packet *list[?]; }; ]]) -local function freelist_create(name) - local fl = shm.create(name, "struct freelist") +local function freelist_create(name, max_packets) + max_packets = max_packets or default_max_packets + local fl = shm.create(name, "struct freelist", max_packets) fl.max = max_packets return fl end local function freelist_open(name, readonly) - return shm.open(name, "struct freelist", readonly) + local fl = shm.open(name, "struct freelist", 'read-only', 1) + local max = fl.max + shm.unmap(fl) + return shm.open(name, "struct freelist", readonly, max) end local function freelist_full(freelist) @@ -104,14 +108,21 @@ local packets_allocated = 0 local packets_fl, group_fl -- Call to ensure packet freelist is enabled. -function initialize () - packets_fl = freelist_create("engine/packets.freelist") +function initialize (max_packets) + if packets_fl then + assert(packets_fl.nfree == 0, "freelist is already in use") + shm.unmap(packets_fl) + shm.unlink("engine/packets.freelist") + end + packets_fl = freelist_create("engine/packets.freelist", max_packets) end -- Call to ensure group freelist is enabled. -function enable_group_freelist () +function enable_group_freelist (nchunks) if not group_fl then - group_fl = group_freelist.freelist_create("group/packets.freelist") + group_fl = group_freelist.freelist_create( + "group/packets.group_freelist", nchunks + ) end end @@ -147,12 +158,9 @@ function reclaim_step () end end --- Register struct freelist as an abstract SHM object type so that the group +-- Register struct freelist as an abstract SHM object type so that the -- freelist can be recognized by shm.open_frame and described with tostring(). 
-shm.register( - 'freelist', - {open = function (name) return shm.open(name, "struct freelist") end} -) +shm.register('freelist', {open=freelist_open}) ffi.metatype("struct freelist", {__tostring = function (freelist) return ("%d/%d"):format(freelist.nfree, freelist.max) end}) @@ -176,7 +184,7 @@ end -- process termination. function shutdown (pid) local in_group, group_fl = pcall( - group_freelist.freelist_open, "/"..pid.."/group/packets.freelist" + group_freelist.freelist_open, "/"..pid.."/group/packets.group_freelist" ) if in_group then local packets_fl = freelist_open("/"..pid.."/engine/packets.freelist") @@ -308,7 +316,7 @@ end function preallocate_step() assert(packets_allocated + packet_allocation_step - <= max_packets - group_fl_chunksize, + <= packets_fl.max - group_fl_chunksize, "packet allocation overflow") for i=1, packet_allocation_step do @@ -319,6 +327,12 @@ function preallocate_step() end function selftest () + initialize(10000) + assert(packets_fl.max == 10000) + allocate() + local ok, err = pcall(initialize) + assert(not ok and err:match("freelist is already in use")) + assert(is_aligned(0, 1)) assert(is_aligned(1, 1)) assert(is_aligned(2, 1)) diff --git a/src/core/shm.lua b/src/core/shm.lua index 37f65af4ac..989fe1750f 100644 --- a/src/core/shm.lua +++ b/src/core/shm.lua @@ -16,13 +16,13 @@ root = os.getenv("SNABB_SHM_ROOT") or "/var/run/snabb" mappings = {} -- Map an object into memory. -local function map (name, type, readonly, create) +local function map (name, type, readonly, create, ...) local path = resolve(name) local mapmode = readonly and 'read' or 'read, write' local ctype = ffi.typeof(type) - local size = ffi.sizeof(ctype) + local size = ffi.sizeof(ctype, ...) 
local stat = S.stat(root..'/'..path) - if stat and stat.size ~= size then + if stat and stat.size < size then print(("shm warning: resizing %s from %d to %d bytes") :format(path, stat.size, size)) end @@ -38,7 +38,7 @@ local function map (name, type, readonly, create) if create then assert(fd:ftruncate(size), "shm: ftruncate failed") else - assert(fd:fstat().size == size, "shm: unexpected size") + assert(fd:fstat().size >= size, "shm: unexpected size") end local mem, err = S.mmap(nil, size, mapmode, "shared", fd, 0) fd:close() @@ -47,12 +47,12 @@ local function map (name, type, readonly, create) return ffi.cast(ffi.typeof("$&", ctype), mem) end -function create (name, type) - return map(name, type, false, true) +function create (name, type, ...) + return map(name, type, false, true, ...) end -function open (name, type, readonly) - return map(name, type, readonly, false) +function open (name, type, readonly, ...) + return map(name, type, readonly, false, ...) end function exists (name) @@ -215,6 +215,17 @@ function selftest () unmap(p1) assert(not exists(name)) + -- Checking parameterized types + print("checking parameterized types..") + local name = "shm/selftest/parameterized" + local p1 = create(name, "struct { int x; int xs[?]; }", 10) + local p2 = open(name, "struct { int x; int xs[?]; }", 'read-only', 10) + p1.xs[9] = 42 + assert(p2.xs[9] == 42) + unmap(p2) + unmap(p1) + assert(unlink(name)) + -- Test that we can open and cleanup many objects print("checking many objects..") local path = 'shm/selftest/manyobj' diff --git a/src/lib/interlink.lua b/src/lib/interlink.lua index d3f0b066ff..1dc26fb0db 100644 --- a/src/lib/interlink.lua +++ b/src/lib/interlink.lua @@ -63,24 +63,23 @@ local band = require("bit").band local waitfor = require("core.lib").waitfor local sync = require("core.sync") -local SIZE = 1024 local CACHELINE = 64 -- XXX - make dynamic -local INT = ffi.sizeof("int") - -assert(band(SIZE, SIZE-1) == 0, "SIZE is not a power of two") +local INT = 
ffi.sizeof("uint32_t") -- Based on MCRingBuffer, see -- http://www.cse.cuhk.edu.hk/%7Epclee/www/pubs/ipdps10.pdf -ffi.cdef([[ struct interlink { - int read, write, state[1]; - char pad1[]]..CACHELINE-3*INT..[[]; - int lwrite, nread; - char pad2[]]..CACHELINE-2*INT..[[]; - int lread, nwrite; - char pad3[]]..CACHELINE-2*INT..[[]; - struct packet *packets[]]..SIZE..[[]; -} __attribute__((packed, aligned(]]..CACHELINE..[[)))]]) +ffi.cdef([[ + struct interlink { + uint32_t read, write, size, state[1]; + char pad1[]]..CACHELINE-4*INT..[[]; + uint32_t lwrite, nread, rmask; + char pad2[]]..CACHELINE-3*INT..[[]; + uint32_t lread, nwrite, wmask; + char pad3[]]..CACHELINE-3*INT..[[]; + struct packet *packets[?]; + } __attribute__((packed, aligned(]]..CACHELINE..[[))) +]]) -- The life cycle of an interlink is managed using a state machine. This is -- necessary because we allow receiving and transmitting processes to attach @@ -92,13 +91,15 @@ ffi.cdef([[ struct interlink { -- once the former receiver has detached while the transmitter stays attached -- throughout, and vice-versa. -- --- Interlinks can be in one of five states: +-- Interlinks can be in one of seven states: -local FREE = 0 -- Implicit initial state due to 0 value. -local RXUP = 1 -- Receiver has attached. -local TXUP = 2 -- Transmitter has attached. -local DXUP = 3 -- Both ends have attached. -local DOWN = 4 -- Both ends have detached; must be re-allocated. +local INIT = 0 -- Implicit initial state due to 0 value. +local CONF = 1 -- Queue size is being configured. +local FREE = 2 -- Queue is in free state, ready to attach. +local RXUP = 3 -- Receiver has attached. +local TXUP = 4 -- Transmitter has attached. +local DXUP = 5 -- Both ends have attached. +local DOWN = 6 -- Both ends have detached; must be re-allocated. -- If at any point both ends have detached from an interlink it stays in the -- DOWN state until it is deallocated. @@ -107,7 +108,9 @@ local DOWN = 4 -- Both ends have detached; must be re-allocated. 
-- -- Who Change Why -- ------ ------------- --------------------------------------------------- --- (any) none -> FREE A process creates the queue (initial state). +-- (any) none -> INIT A process creates the queue (initial state). +-- (any) INIT -> CONF A process has started configuring the queue. +-- (any) CONF -> FREE A process has initialized and configured the queue. -- recv. FREE -> RXUP Receiver attaches to free queue. -- recv. TXUP -> DXUP Receiver attaches to queue with ready transmitter. -- recv. DXUP -> TXUP Receiver detaches from queue. @@ -121,6 +124,10 @@ local DOWN = 4 -- Both ends have detached; must be re-allocated. -- -- Who Change Why *PROHIBITED* -- ------ ----------- -------------------------------------------------------- +-- recv. INIT->RXUP Can not attach to uninitialized queue. +-- trans. INIT->TXUP Can not attach to uninitialized queue. +-- recv. CONF->RXUP Can not attach to unconfigured queue. +-- trans. CONF->TXUP Can not attach to unconfigured queue. -- (any) FREE->DEAD Cannot shutdown before having attached. -- (any) *->FREE Cannot transition to FREE except by reallocating. -- recv. TXUP->DEAD Receiver cannot mutate queue after it has detached. @@ -130,15 +137,24 @@ local DOWN = 4 -- Both ends have detached; must be re-allocated. -- (any) DXUP->DOWN Cannot shutdown queue while it is in use. -- (any) DOWN->* Cannot transition from DOWN (must create new queue.) -local function attach (name, initialize) +local function attach (name, size, transitions) + assert(band(size, size-1) == 0, "size is not a power of two") local r local first_try = true waitfor( function () -- Create/open the queue. - r = shm.create(name, "struct interlink") - -- Return if we succeed to initialize it. - if initialize(r) then return true end + r = shm.create(name, "struct interlink", size) + -- Initialize queue and configure its size + -- (only one process can set size). 
+ if sync.cas(r.state, INIT, CONF) then + r.size = size + local mask = size - 1 + r.rmask, r.wmask = mask, mask + assert(sync.cas(r.state, CONF, FREE)) + end + -- Return if we succeed to attach. + if transitions(r) then return true end -- We failed; handle error and try again. shm.unmap(r) if first_try then @@ -147,20 +163,22 @@ local function attach (name, initialize) end end ) + -- Make sure we agree on the queue size. + assert(r.size == size, "interlink: queue size mismatch on: "..name) -- Ready for action :) return r end -function attach_receiver (name) - return attach(name, +function attach_receiver (name, size) + return attach(name, size, -- Attach to free queue as receiver (FREE -> RXUP) -- or queue with ready transmitter (TXUP -> DXUP.) function (r) return sync.cas(r.state, FREE, RXUP) or sync.cas(r.state, TXUP, DXUP) end) end -function attach_transmitter (name) - return attach(name, +function attach_transmitter (name, size) + return attach(name, size, -- Attach to free queue as transmitter (FREE -> TXUP) -- or queue with ready receiver (RXUP -> DXUP.) function (r) return sync.cas(r.state, FREE, TXUP) @@ -206,12 +224,12 @@ end -- Queue operations follow below. 
-local function NEXT (i) - return band(i + 1, SIZE - 1) +local function NEXT (mask, i) + return band(i + 1, mask) end function full (r) - local after_nwrite = NEXT(r.nwrite) + local after_nwrite = NEXT(r.wmask, r.nwrite) if after_nwrite == r.lread then if after_nwrite == r.read then return true @@ -222,7 +240,7 @@ end function insert (r, p) r.packets[r.nwrite] = p - r.nwrite = NEXT(r.nwrite) + r.nwrite = NEXT(r.wmask, r.nwrite) end function push (r) @@ -241,7 +259,7 @@ end function extract (r) local p = r.packets[r.nread] - r.nread = NEXT(r.nread) + r.nread = NEXT(r.rmask, r.nread) return p end @@ -258,24 +276,29 @@ end shm.register('interlink', getfenv()) function open (name, readonly) - return shm.open(name, "struct interlink", readonly) + local r = shm.open(name, "struct interlink", 'read-only', 1) + local size = r.size + shm.unmap(r) + return shm.open(name, "struct interlink", readonly, size) end local function describe (r) local function queue_fill (r) - local read, write = r.read, r.write - return read > write and write + SIZE - read or write - read + local read, write, size = r.read, r.write, r.size + return read > write and write + size - read or write - read end local function status (r) return ({ - [FREE] = "initializing", + [INIT] = "being initialized", + [CONF] = "being configured", + [FREE] = "free to attach", [RXUP] = "waiting for transmitter", [TXUP] = "waiting for receiver", [DXUP] = "in active use", [DOWN] = "deallocating" })[r.state[0]] end - return ("%d/%d (%s)"):format(queue_fill(r), SIZE - 1, status(r)) + return ("%d/%d (%s)"):format(queue_fill(r), r.size - 1, status(r)) end -ffi.metatype(ffi.typeof("struct interlink"), {__tostring=describe}) +ffi.metatype("struct interlink", {__tostring=describe}) diff --git a/src/lib/scheduling.lua b/src/lib/scheduling.lua index ef4bf81b28..e3a4769e01 100644 --- a/src/lib/scheduling.lua +++ b/src/lib/scheduling.lua @@ -21,6 +21,7 @@ local scheduling_opts = { jit_opt = {default=default_jit_opt}, -- JIT options. 
cpu = {}, -- CPU index (integer). real_time = {}, -- Boolean. + max_packets = {}, -- Positive integer. ingress_drop_monitor = {}, -- Action string: one of 'flush' or 'warn'. profile = {default=true}, -- Boolean. busywait = {default=true}, -- Boolean. @@ -54,6 +55,10 @@ function sched_apply.real_time (real_time) end end +function sched_apply.max_packets (max_packets) + packet.initialize(max_packets) +end + function sched_apply.busywait (busywait) engine.busywait = busywait end