Skip to content

Commit

Permalink
No commit message
Browse files Browse the repository at this point in the history
  • Loading branch information
aingerson committed Aug 29, 2023
1 parent 1fdb698 commit 6573cd0
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 29 deletions.
2 changes: 1 addition & 1 deletion include/ofi_atomic_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ extern "C" {
#define OFI_DECLARE_ATOMIC_Q(entrytype, name) \
struct name ## _entry { \
ofi_atomic64_t seq; \
bool noop; \
uint64_t noop; \
entrytype buf; \
}; \
struct name { \
Expand Down
23 changes: 16 additions & 7 deletions prov/shm/src/smr_fifo.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,30 @@
* Writes are protected with atomics.
* Reads are not protected and assumed to be protected with user locking
*/
struct smr_fifo {

/*
 * Non-atomic bookkeeping for an smr_fifo: the ring geometry plus the
 * reader's cursor. Embedded as the first member of struct smr_fifo.
 */
struct smr_fifo_hdr {
/* Number of slots in the ring; smr_fifo_init() asserts it is a power of two. */
int64_t size;
/* size - 1; ANDed with a position to obtain the index into entries[]. */
int64_t size_mask;
/*
 * Position of the next slot to read. Starts at 0 and is advanced by
 * smr_fifo_read() with a plain (non-atomic) increment, so concurrent
 * readers must be serialized by the caller.
 */
int64_t read_pos;
};

/*
 * Ring buffer of uintptr_t values. Writers claim slots through the atomic
 * write_pos counter; the reader walks the ring using hdr.read_pos.
 * A slot value of 0 means "empty", so 0 cannot be stored as a payload.
 */
struct smr_fifo {
/* Ring size, index mask and read cursor (see struct smr_fifo_hdr). */
struct smr_fifo_hdr hdr;

/*
 * Pads hdr out to 64 bytes in total, so the members below start at byte
 * offset 64 of the structure.
 * NOTE(review): presumably intended to keep the reader-side header and the
 * writer-side atomics on separate cache lines -- confirm the 64-byte
 * assumption for the target platforms.
 */
uint8_t pad[64 - sizeof(struct smr_fifo_hdr)];

/* Monotonic write counter, initialized to 0; smr_fifo_commit() atomically
 * increments it to claim the slot at (value & hdr.size_mask). */
ofi_atomic64_t write_pos;
/* Count of free slots: initialized to the ring size and incremented by
 * smr_fifo_read() each time an entry is consumed. */
ofi_atomic64_t free;

/* Flexible array of hdr.size slots, zeroed by smr_fifo_init(); the
 * allocation must reserve sizeof(uintptr_t) * size bytes past the struct. */
uintptr_t entries[];
};

static inline void smr_fifo_init(struct smr_fifo *queue, uint64_t size)
{
assert(size == roundup_power_of_two(size));
queue->size = size;
queue->size_mask = size - 1;
queue->read_pos = 0;
queue->hdr.size = size;
queue->hdr.size_mask = size - 1;
queue->hdr.read_pos = 0;
ofi_atomic_initialize64(&queue->write_pos, 0);
ofi_atomic_initialize64(&queue->free, size);
memset(queue->entries, 0, sizeof(*queue->entries) * size);
Expand All @@ -77,7 +86,7 @@ static inline int smr_fifo_commit(struct smr_fifo *queue, uintptr_t val)
break;
}
write = ofi_atomic_inc64(&queue->write_pos) - 1;//TODO add atomic to remove sub
queue->entries[write & queue->size_mask] = val;
queue->entries[write & queue->hdr.size_mask] = val;
return FI_SUCCESS;
}

Expand All @@ -86,11 +95,11 @@ static inline uintptr_t smr_fifo_read(struct smr_fifo *queue)
{
uintptr_t val;

val = queue->entries[queue->read_pos & queue->size_mask];
val = queue->entries[queue->hdr.read_pos & queue->hdr.size_mask];
if (!val)
return 0;

queue->entries[queue->read_pos++ & queue->size_mask] = 0;
queue->entries[queue->hdr.read_pos++ & queue->hdr.size_mask] = 0;
ofi_atomic_inc64(&queue->free);
return val;
}
Expand Down
36 changes: 34 additions & 2 deletions prov/shm/src/smr_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,22 +108,44 @@ size_t smr_calculate_size_offsets(size_t tx_count, size_t rx_count,
tx_size = roundup_power_of_two(tx_count);
rx_size = roundup_power_of_two(rx_count);

printf("size of smr %lu\n", sizeof(struct smr_region));
/* Align cmd_queue offset to a 64-byte boundary. */
cmd_queue_offset = ofi_get_aligned_size(sizeof(struct smr_region), 16);
//cmd_queue_offset = sizeof(struct smr_region);
cmd_queue_offset = ofi_get_aligned_size(sizeof(struct smr_region), 64);
conn_queue_offset = cmd_queue_offset + sizeof(struct smr_fifo) +
sizeof(uintptr_t) * rx_size;
conn_queue_offset = ofi_get_aligned_size(conn_queue_offset, 64);
cmd_pool_offset = conn_queue_offset + sizeof(struct smr_conn_queue) +
sizeof(struct smr_conn_req) * SMR_MAX_PEERS;
cmd_pool_offset = ofi_get_aligned_size(cmd_pool_offset, 64);

inject_pool_offset = cmd_pool_offset +
freestack_size(sizeof(struct smr_cmd), tx_size); //double for RMA?
inject_pool_offset = ofi_get_aligned_size(inject_pool_offset, 64);

sar_pool_offset = inject_pool_offset +
freestack_size(sizeof(struct smr_inject_buf), rx_size);
sar_pool_offset = ofi_get_aligned_size(sar_pool_offset, 64);

peer_data_offset = sar_pool_offset +
freestack_size(sizeof(struct smr_sar_buf), SMR_MAX_PEERS);
peer_data_offset = ofi_get_aligned_size(peer_data_offset, 64);

ep_name_offset = peer_data_offset + sizeof(struct smr_peer_data) *
SMR_MAX_PEERS;
ep_name_offset = ofi_get_aligned_size(ep_name_offset, 64);

sock_name_offset = ep_name_offset + SMR_NAME_MAX;
sock_name_offset = ofi_get_aligned_size(sock_name_offset, 64);

printf("command queue offset %lu\n", cmd_queue_offset % 64);
printf("conn queue offset %lu\n", conn_queue_offset % 64);
printf("inject pool offset %lu\n", inject_pool_offset % 64);
printf("sar pool offset %lu\n", sar_pool_offset % 64);
printf("peer data offset %lu\n", peer_data_offset % 64);

printf("ep name offset %lu\n", ep_name_offset % 64);
printf("sock name offset %lu\n", sock_name_offset % 64);

if (cq_offset)
*cq_offset = cmd_queue_offset;
Expand Down Expand Up @@ -273,7 +295,7 @@ int smr_create(const struct fi_provider *prov, struct smr_map *map,

*smr = mapped_addr;

ofi_atomic_initialize32(&(*smr)->signal, 0);
//ofi_atomic_initialize32(&(*smr)->signal, 0);

(*smr)->map = map;
(*smr)->version = SMR_VERSION;
Expand Down Expand Up @@ -327,6 +349,16 @@ int smr_create(const struct fi_provider *prov, struct smr_map *map,

/* Must be set last to signal full initialization to peers */
(*smr)->pid = getpid();

// printf("addr version\t%p\n", &(*smr)->version);
// printf("addr resv\t%p\n", &(*smr)->resv);
// printf("addr flags\t%p\n", &(*smr)->flags);
// printf("addr pid\t%p\n", &(*smr)->pid);
// printf("addr cma_cap_peer\t%p\n", &(*smr)->cma_cap_peer);
// printf("addr cma_cap_self\t%p\n", &(*smr)->cma_cap_self);
// printf("addr xpmem_cap_self\t%p\n", &(*smr)->xpmem_cap_self);
// printf("addr max_sar_buf_per_peer\t%p\n", &(*smr)->max_sar_buf_per_peer);

return 0;

remove:
Expand Down
45 changes: 26 additions & 19 deletions prov/shm/src/smr_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ struct smr_peer_data {
struct smr_addr addr;
bool sar;//1 for in progress, 0 for no current sar
uint32_t name_sent;
//struct xpmem_client xpmem;
struct xpmem_client xpmem;
uintptr_t local_region;
} __attribute__ ((aligned(16)));

Expand Down Expand Up @@ -235,30 +235,37 @@ struct smr_region {
uint8_t version;
uint8_t resv;
uint16_t flags;
int pid;
int pid; //8

uint8_t cma_cap_peer;
uint8_t cma_cap_self;
uint32_t max_sar_buf_per_peer;
// uint8_t xpmem_cap_self;
// struct xpmem_pinfo xpmem_self;
// struct xpmem_pinfo xpmem_peer;
void *base_addr;

ofi_atomic32_t signal;
uint8_t xpmem_cap_self;
uint8_t pad1;
uint32_t max_sar_buf_per_peer; //8

struct smr_map *map;
void *base_addr; //8

size_t total_size;
struct smr_map *map; //8

uint32_t total_size;
/* offsets from start of smr_region */
size_t cmd_queue_offset;
size_t conn_queue_offset;
size_t cmd_pool_offset;
size_t inject_pool_offset;
size_t sar_pool_offset;//todo move sar to sender?
size_t peer_data_offset;
size_t name_offset;
size_t sock_name_offset;
uint32_t cmd_queue_offset;
uint32_t conn_queue_offset;
uint32_t cmd_pool_offset;

uint32_t inject_pool_offset;
uint32_t sar_pool_offset;//todo move sar to sender?
uint32_t peer_data_offset;
uint32_t name_offset;

uint32_t sock_name_offset;
uint32_t pad_explicit;

//uint8_t pad2[64];
//uint32_t fail;//on icx

//struct xpmem_pinfo xpmem_self;//32
//struct xpmem_pinfo xpmem_peer;//32
};

struct smr_inject_buf {
Expand Down

0 comments on commit 6573cd0

Please sign in to comment.