diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 17fb575b81..1972ab891b 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -448,7 +448,7 @@ jobs: directives: dtrace - image: ghcr.io/ponylang/ponyc-ci-x86-64-unknown-linux-ubuntu24.04-builder:20250115 debugger: lldb - directives: pool_memalign + directives: pool_message_passing - image: ghcr.io/ponylang/ponyc-ci-x86-64-unknown-linux-ubuntu24.04-builder:20250115 debugger: lldb directives: runtimestats diff --git a/CMakeLists.txt b/CMakeLists.txt index ac77e995b6..386f6cb236 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -104,6 +104,11 @@ if(PONY_USE_POOL_MEMALIGN) add_compile_options(-DUSE_POOL_MEMALIGN) endif() +if(PONY_USE_POOL_MESSAGE_PASSING) + set(PONY_OUTPUT_SUFFIX "${PONY_OUTPUT_SUFFIX}-pool_message_passing") + add_compile_options(-DUSE_POOL_MESSAGE_PASSING) +endif() + # LibPonyC tests assume that our outputs are two directories above the root directory. set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_DEBUG "${CMAKE_BINARY_DIR}/../debug${PONY_OUTPUT_SUFFIX}") set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_RELEASE "${CMAKE_BINARY_DIR}/../release${PONY_OUTPUT_SUFFIX}") diff --git a/Makefile b/Makefile index 84e619c1bd..a8b436e6b9 100644 --- a/Makefile +++ b/Makefile @@ -148,6 +148,8 @@ define USE_CHECK PONY_USES += -DPONY_USE_RUNTIMESTATS_MESSAGES=true else ifeq ($1,pool_memalign) PONY_USES += -DPONY_USE_POOL_MEMALIGN=true + else ifeq ($1,pool_message_passing) + PONY_USES += -DPONY_USE_POOL_MESSAGE_PASSING=true else $$(error ERROR: Unknown use option specified: $1) endif diff --git a/benchmark/libponyrt/mem/pool.cc b/benchmark/libponyrt/mem/pool.cc index 8e7ba7e132..fb673692f0 100644 --- a/benchmark/libponyrt/mem/pool.cc +++ b/benchmark/libponyrt/mem/pool.cc @@ -4,13 +4,6 @@ #define LARGE_ALLOC ponyint_pool_adjust_size(POOL_MAX + 1) -/// When we mmap, pull at least this many bytes. -#ifdef PLATFORM_IS_ILP32 -#define POOL_MMAP (16 * 1024 * 1024) // 16 MB -#else -#define POOL_MMAP (128 * 1024 * 1024) // 128 MB -#endif - typedef char block_t[32]; class PoolBench: public ::benchmark::Fixture diff --git a/src/libponyrt/CMakeLists.txt b/src/libponyrt/CMakeLists.txt index f8f13fd10d..9ca34dfa69 100644 --- a/src/libponyrt/CMakeLists.txt +++ b/src/libponyrt/CMakeLists.txt @@ -40,6 +40,7 @@ set(_c_src mem/pagemap.c mem/pool.c mem/pool_memalign.c + mem/pool_message_passing.c options/options.c platform/ponyassert.c platform/threads.c diff --git a/src/libponyrt/mem/alloc.c b/src/libponyrt/mem/alloc.c index e2f5837358..fd1adcd7a5 100644 --- a/src/libponyrt/mem/alloc.c +++ b/src/libponyrt/mem/alloc.c @@ -1,6 +1,8 @@ #ifdef __linux__ #define _GNU_SOURCE #endif +#include "ponyassert.h" +#include "alloc.h" #include #include #include @@ -55,11 +57,90 @@ void* ponyint_virt_alloc(size_t bytes) return p; } +#ifdef POOL_USE_MESSAGE_PASSING +void* ponyint_virt_alloc_aligned(size_t size) +{ + // size must be a multiple of POOL_MMAP. + pony_assert((size & (POOL_MMAP - 1)) == 0); + + // NOTE: This can likely be made more efficient depending on the system to + // avoid the extra allocation/freeing to get alingment. + + // overallocate to ensure we can align. + size_t bytes = size + POOL_MMAP; + + void* p; + bool ok = true; + +#if defined(PLATFORM_IS_WINDOWS) + // This is not supported on Windows at the moment. 
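+  // (A hypothetical Windows port could reserve with VirtualAlloc2 and a
+  // MEM_ADDRESS_REQUIREMENTS alignment parameter; trimming the slack the way
+  // the POSIX path does below is not possible, because VirtualFree cannot
+  // release a sub-range of a reservation.)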
+ pony_assert(false); + p = VirtualAlloc(NULL, bytes, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); + if(p == NULL) + ok = false; +#elif defined(PLATFORM_IS_POSIX_BASED) +#if defined(PLATFORM_IS_LINUX) || defined(PLATFORM_IS_EMSCRIPTEN) + p = mmap(0, bytes, PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE, -1, 0); +#elif defined(PLATFORM_IS_MACOSX) + p = mmap(0, bytes, PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANON, -1, 0); +#elif defined(PLATFORM_IS_DRAGONFLY) + p = mmap(0, bytes, PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANON, -1, 0); +#elif defined(PLATFORM_IS_OPENBSD) + p = mmap(0, bytes, PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANON, -1, 0); +#elif defined(PLATFORM_IS_BSD) +#ifndef MAP_ALIGNED_SUPER +#define MAP_ALIGNED_SUPER 0 +#endif + p = mmap(0, bytes, PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANON | MAP_ALIGNED_SUPER, -1, 0); +#endif + if(p == MAP_FAILED) + ok = false; + + void* oldp = p; + + // Align the pointer to first POOL_MMAP multiple in allocation. + p = (void*)(((uintptr_t)p + POOL_MMAP) & ~(POOL_MMAP - 1)); + + // Free the memory before the aligned pointer. + ponyint_virt_free(oldp, ((uintptr_t)p) - ((uintptr_t)oldp)); + + // Free the memory after the aligned pointer region end. + void* endp = (void*)((uintptr_t)p + size); + ponyint_virt_free(endp, ((uintptr_t)oldp + bytes) - ((uintptr_t)endp)); + + // ensure that the pointer is correctly aligned and the size is correct. + pony_assert((((uintptr_t)endp) - ((uintptr_t)p)) == size); + pony_assert((((uintptr_t)p) & (POOL_MMAP - 1)) == 0); +#endif + + if(!ok) + { + perror("out of memory: "); + abort(); + } + + return p; +} +#endif + void ponyint_virt_free(void* p, size_t bytes) { #if defined(PLATFORM_IS_WINDOWS) VirtualFree(p, 0, MEM_RELEASE); #elif defined(PLATFORM_IS_POSIX_BASED) - munmap(p, bytes); + if (0 == bytes) + return; + + int r = munmap(p, bytes); + if(0 != r) + { + perror("unable to free memory: "); + abort(); + } #endif } diff --git a/src/libponyrt/mem/alloc.h b/src/libponyrt/mem/alloc.h index af5bd25473..f6f194def7 100644 --- a/src/libponyrt/mem/alloc.h +++ b/src/libponyrt/mem/alloc.h @@ -1,11 +1,21 @@ #ifndef mem_alloc_h #define mem_alloc_h +#include "pool.h" + /** * Allocates memory in the virtual address space. */ void* ponyint_virt_alloc(size_t bytes); +#ifdef POOL_USE_MESSAGE_PASSING +/** + * Allocates memory in the virtual address space aligned to POOL_MMAP. + * All allocations are required to be a multiple of POOL_MMAP. + */ +void* ponyint_virt_alloc_aligned(size_t bytes); +#endif + /** * Deallocates a chunk of memory that was previously allocated with * ponyint_virt_alloc. diff --git a/src/libponyrt/mem/pool.c b/src/libponyrt/mem/pool.c index 266d2b9e6f..c873b2dc60 100644 --- a/src/libponyrt/mem/pool.c +++ b/src/libponyrt/mem/pool.c @@ -30,17 +30,6 @@ #include #endif -/// When we mmap, pull at least this many bytes. -#ifdef PLATFORM_IS_ILP32 -# define POOL_MMAP (16 * 1024 * 1024) // 16 MB -#else -# ifdef PLATFORM_IS_WINDOWS -# define POOL_MMAP (16 * 1024 * 1024) // 16 MB -# else -# define POOL_MMAP (128 * 1024 * 1024) // 128 MB -# endif -#endif - /// An item on a per-size thread-local free list. 
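+/// Free entries are linked through the freed memory itself, so the list needs
+/// no separate bookkeeping allocation.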
typedef struct pool_item_t { diff --git a/src/libponyrt/mem/pool.h b/src/libponyrt/mem/pool.h index d294d16062..e73f1c3af5 100644 --- a/src/libponyrt/mem/pool.h +++ b/src/libponyrt/mem/pool.h @@ -8,15 +8,28 @@ #if defined(USE_POOL_MEMALIGN) # define POOL_USE_MEMALIGN +#elif defined(USE_POOL_MESSAGE_PASSING) +# define POOL_USE_MESSAGE_PASSING #else # define POOL_USE_DEFAULT #endif +/// When we mmap, pull at least this many bytes. +#ifdef PLATFORM_IS_ILP32 +# define POOL_MMAP (16 * 1024 * 1024) // 16 MB +#else +# ifdef PLATFORM_IS_WINDOWS +# define POOL_MMAP (16 * 1024 * 1024) // 16 MB +# else +# define POOL_MMAP (128 * 1024 * 1024) // 128 MB +# endif +#endif + PONY_EXTERN_C_BEGIN /* Because of the way free memory is reused as its own linked list container, * the minimum allocation size is 32 bytes for the default pool implementation - * and 16 bytes for the memalign pool implementation. + * and 16 bytes for the memalign and message passing pool implementations. */ #ifndef POOL_USE_DEFAULT @@ -101,6 +114,13 @@ size_t ponyint_pool_adjust_size(size_t size); #define POOL_SIZE(INDEX) \ ((size_t)1 << (POOL_MIN_BITS + INDEX)) +#ifdef POOL_USE_MESSAGE_PASSING +typedef struct pool_remote_allocs_t pool_remote_allocs_t; +pool_remote_allocs_t* ponyint_initialize_pool_message_passing(); +void ponyint_pool_gather_receive_remote_allocations(pool_remote_allocs_t* remote_allocs); +void ponyint_pool_receive_remote_allocations(pool_remote_allocs_t* remote_allocs); +#endif + #ifdef USE_RUNTIMESTATS #define POOL_ALLOC_SIZE(TYPE) \ POOL_SIZE(POOL_INDEX(sizeof(TYPE))) diff --git a/src/libponyrt/mem/pool_message_passing.c b/src/libponyrt/mem/pool_message_passing.c new file mode 100644 index 0000000000..529acb3614 --- /dev/null +++ b/src/libponyrt/mem/pool_message_passing.c @@ -0,0 +1,789 @@ +#include "pool.h" +#include "alloc.h" +#include "../ds/fun.h" +#include "../ds/hash.h" +#include "../sched/cpu.h" +#include "ponyassert.h" +#include +#include +#include +#include + +#include + +/// Allocations this size and above are aligned on this size. This is needed +/// so that the pagemap for the heap is aligned. +#define POOL_ALIGN_INDEX (POOL_ALIGN_BITS - POOL_MIN_BITS) +#define POOL_MMAP_MASK (~(POOL_MMAP - 1)) +#define POOL_MMAP_ALIGN(x) ((void*)(((uintptr_t)x) & POOL_MMAP_MASK)) + +#ifdef POOL_USE_MESSAGE_PASSING + +#if defined(PLATFORM_IS_WINDOWS) +pony_static_assert(false, "pool message passing is not currently implemented for windows!"); +#endif + +#ifdef USE_ADDRESS_SANITIZER +#include +#endif + +#ifdef USE_VALGRIND +#include +#include +#endif + + +static size_t memallocmap_hash(void* alloc) +{ + return ponyint_hash_ptr(alloc); +} + +static bool memallocmap_cmp(void* a, void* b) +{ + return a == b; +} + +DECLARE_HASHMAP(memallocmap, memallocmap_t, void); + +DEFINE_HASHMAP(memallocmap, memallocmap_t, void, memallocmap_hash, + memallocmap_cmp, NULL); + +/// An item on a per-size free list. +typedef struct pool_item_t +{ + struct pool_item_t* next; +} pool_item_t; + +/// A per-size thread-local free list header. +typedef struct pool_local_t +{ + pool_item_t* pool; + size_t length; + char* start; + char* end; +} pool_local_t; + +/// An item on a list of free blocks. +typedef struct pool_block_t +{ + struct pool_block_t* next; + size_t size; +} pool_block_t; + +/// A list of free blocks header. 
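+/// `head` is kept sorted by ascending block size; `largest_size` mirrors the
+/// size of the last (largest) block so lookups can bail out early, and
+/// `total_size` tracks how much free block memory this thread currently holds.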
+typedef struct pool_block_header_t +{ + pool_block_t* head; + size_t total_size; + size_t largest_size; +} pool_block_header_t; + +#ifndef PONY_NDEBUG +// only used as part of a pony_assert +static size_t pool_sizeclasses[POOL_COUNT] = +{ + POOL_MIN << 0, + POOL_MIN << 1, + POOL_MIN << 2, + POOL_MIN << 3, + POOL_MIN << 4, + POOL_MIN << 5, + POOL_MIN << 6, + POOL_MIN << 7, + POOL_MIN << 8, + POOL_MIN << 9, + POOL_MIN << 10, + POOL_MIN << 11, + POOL_MIN << 12, + POOL_MIN << 13, + POOL_MIN << 14, + POOL_MIN << 15, + POOL_MIN << 16, +}; +#endif + +typedef struct pool_remote_allocs_t +{ + pool_item_t* pool_remote[POOL_COUNT]; + pool_block_t* pool_remote_blocks; +} pool_remote_allocs_t; + +static __pony_thread_local pool_local_t pool_local[POOL_COUNT]; +static __pony_thread_local pool_block_header_t pool_local_block_header; +static __pony_thread_local memallocmap_t memallocmap; +static __pony_thread_local void* low_virt_alloc = (void*)-1; +static __pony_thread_local void* high_virt_alloc = NULL; + +// include it after defining `pool_item_t`/`pool_block_t` +#include "pool_sorting.h" + +// check if the pointer is within the virtual allocation range or not +// if it is, it's possibly a local allocation, if not, it's definitely a remote allocation +// this is likely to be a less helpful check in non-numa environments +static bool possibly_a_local_allocation(void* p) +{ + return (p >= low_virt_alloc) && (p < high_virt_alloc); +} + +static bool is_a_local_allocation(void* p, bool* prev_alloc_pool_addr_is_local, void** prev_alloc_pool_addr) +{ + // round down to the nearest POOL_MMAP boundary + void* curr_alloc_pool_addr = POOL_MMAP_ALIGN(p); + + // fast path + // it's a local allocation because the previous allocation was in the same mmap'd range + if (*prev_alloc_pool_addr_is_local && *prev_alloc_pool_addr == curr_alloc_pool_addr) + { + *prev_alloc_pool_addr = curr_alloc_pool_addr; + *prev_alloc_pool_addr_is_local = true; + return true; + } + + // fast path + // it's a remote allocation because the pointer is definitely not a local allocation + if (!possibly_a_local_allocation(curr_alloc_pool_addr)) + { + *prev_alloc_pool_addr = curr_alloc_pool_addr; + *prev_alloc_pool_addr_is_local = false; + return false; + } + + // slow path + // it may or may not be a local allocation; need to check the memalloc map to be sure + + // check if it's in the memallocmap to confirm if it's a local allocation or not + size_t index = -1; + bool is_local = (NULL != memallocmap_get(&memallocmap, curr_alloc_pool_addr, &index)); + + *prev_alloc_pool_addr = curr_alloc_pool_addr; + *prev_alloc_pool_addr_is_local = is_local; + + return is_local; +} + +static void pool_block_insert(pool_block_t* block) +{ + pool_block_t* next = pool_local_block_header.head; + pool_block_t* prev = NULL; + + while(NULL != next) + { + if(block->size <= next->size) + break; + + prev = next; + next = next->next; + } + + block->next = next; + + if(NULL != prev) + prev->next = block; + else + pool_local_block_header.head = block; +} + +static void* pool_block_get(size_t size) +{ + pool_block_t* head_block = pool_local_block_header.head; + if(pool_local_block_header.largest_size >= size) + { + pool_block_t* block = head_block; + pool_block_t* prev = NULL; + + while(NULL != block) + { + if(block->size > size) + { + // Use size bytes from the end of the block. This allows us to keep the + // block info inside the block instead of using another data structure. 
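+        // For example, carving 2 MB out of a 128 MB block leaves a 126 MB
+        // block in place and hands back its trailing 2 MB.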
+ size_t rem = block->size - size; + block->size = rem; + pool_local_block_header.total_size -= size; + + if((NULL != prev) && (prev->size > block->size)) + { + // If we are now smaller than the previous block, move us forward in + // the list. + if(NULL == block->next) + pool_local_block_header.largest_size = prev->size; + + // Remove the block from the list. + if(NULL != prev) + prev->next = block->next; + else + pool_local_block_header.head = block->next; + + pool_block_insert(block); + } else if(NULL == block->next) { + pool_local_block_header.largest_size = rem; + } + + return (char*)block + rem; + } else if(block->size == size) { + if(NULL == block->next) + { + pool_local_block_header.largest_size = + (NULL == prev) ? 0 : prev->size; + } + + // Remove the block from the list. + if(NULL != prev) + prev->next = block->next; + else + pool_local_block_header.head = block->next; + + // Return the block pointer itself. + pool_local_block_header.total_size -= size; + return block; + } + + prev = block; + block = block->next; + } + + // If we didn't find any suitable block, something has gone really wrong. + pony_assert(false); + } + + // no block big enough available + return NULL; +} + +static void add_alloc_to_memallocmap(void* p, size_t size) +{ + // chop up the allocation into POOL_MMAP sized blocks and put them into the map + // this is so that we can keep track of the allocations and free them later + while (size > 0) + { + memallocmap_put(&memallocmap, p); + p = (char*)p + POOL_MMAP; + size -= POOL_MMAP; + } +} + +static void* pool_alloc_pages(size_t size) +{ + // Try to get a block from the pool_local_block_header + void* p = pool_block_get(size); + + if(NULL != p) + return p; + + // We have no free blocks big enough. + // make sure we allocate enough memory to satisfy the request rounded up to the next POOL_MMAP boundary + size_t alloc_size = (size + POOL_MMAP) & POOL_MMAP_MASK; + + pool_block_t* block = (pool_block_t*)ponyint_virt_alloc_aligned(alloc_size); + +#ifdef USE_ADDRESS_SANITIZER + ASAN_POISON_MEMORY_REGION(block, alloc_size); +#endif + + // track the virtual allocation range + if ((void*)block < low_virt_alloc) + low_virt_alloc = block; + + if ((void*)block + alloc_size > high_virt_alloc) + high_virt_alloc = (void*)block + alloc_size; + +#ifdef USE_ADDRESS_SANITIZER + ASAN_UNPOISON_MEMORY_REGION(block, sizeof(pool_block_t)); +#endif + + // add the whole virtual allocation into the pool_local_block_header + block->size = alloc_size; + block->next = NULL; + pool_block_insert(block); + pool_local_block_header.total_size += alloc_size; + if(pool_local_block_header.largest_size < alloc_size) + pool_local_block_header.largest_size = alloc_size; + + // put the allocation into the map (this will initialize it or potentially + // resize it and allocate from the block) + add_alloc_to_memallocmap(block, alloc_size); + + p = pool_block_get(size); + + pony_assert(NULL != p); + + return p; +} + +static void pool_free_pages(void* p, size_t size) +{ +#ifdef USE_ADDRESS_SANITIZER + ASAN_UNPOISON_MEMORY_REGION(p, sizeof(pool_block_t)); +#endif + + pool_block_t* block = (pool_block_t*)p; + block->next = NULL; + block->size = size; + + pool_block_insert(block); + + pool_local_block_header.total_size += size; + if(pool_local_block_header.largest_size < size) + pool_local_block_header.largest_size = size; +} + +static void* pool_get(size_t index) +{ + // Try per-size thread-local free list first. 
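+  // The list is LIFO and strictly thread-local, so this path needs no
+  // synchronization.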
+ pool_local_t* local = &pool_local[index]; + pool_item_t* p = local->pool; + + if(NULL != p) + { + local->pool = p->next; + local->length--; + return p; + } + + size_t sizeclass = POOL_MIN << index; + + pony_assert(sizeclass == pool_sizeclasses[index]); + + if(sizeclass < POOL_ALIGN) + { + // Check our per-size local-local free block. + if(local->start < local->end) + { + void* p = local->start; + local->start += sizeclass; + return p; + } + + // Use the pool allocator to get a block POOL_ALIGN bytes in size + // and treat it as a free block. + // ?? do we want to take the block from the pool_local_block_header only when we're planning on splitting it into tiny pieces? + char* mem = (char*)pool_get(POOL_ALIGN_INDEX); + local->start = mem + sizeclass; + local->end = mem + POOL_ALIGN; + return mem; + } + + // Pull size bytes from the list of free blocks. Don't use a size-specific + // free block. + return pool_alloc_pages(sizeclass); +} + +void* ponyint_pool_alloc(size_t index) +{ + void* p = pool_get(index); + +#ifdef USE_ADDRESS_SANITIZER + // TODO: if we know the original size requested, we can only unpoison that + // instead of the whole sizeclass block size + ASAN_UNPOISON_MEMORY_REGION(p, POOL_MIN << index); +#endif + +#ifdef USE_VALGRIND + VALGRIND_HG_CLEAN_MEMORY(p, POOL_SIZE(index)); + VALGRIND_MALLOCLIKE_BLOCK(p, POOL_SIZE(index), 0, 0); +#endif + + return p; +} + +void ponyint_pool_free(size_t index, void* p) +{ +#ifdef USE_VALGRIND + VALGRIND_HG_CLEAN_MEMORY(p, POOL_SIZE(index)); +#endif + + pony_assert(index < POOL_COUNT); + +#ifdef USE_ADDRESS_SANITIZER + ASAN_POISON_MEMORY_REGION(p, POOL_MIN << index); + ASAN_UNPOISON_MEMORY_REGION(p, sizeof(pool_item_t)); +#endif + + pool_item_t* lp = (pool_item_t*)p; + pool_local_t* local = &pool_local[index]; + lp->next = local->pool; + local->pool = lp; + local->length++; + +#ifdef USE_VALGRIND + VALGRIND_FREELIKE_BLOCK(p, 0); +#endif +} + +static void* pool_alloc_size(size_t size) +{ + void* p = pool_alloc_pages(size); + +#ifdef USE_VALGRIND + VALGRIND_HG_CLEAN_MEMORY(p, size); + VALGRIND_MALLOCLIKE_BLOCK(p, size, 0, 0); +#endif + + return p; +} + +void* ponyint_pool_alloc_size(size_t size) +{ + size_t index = ponyint_pool_index(size); + + if(index < POOL_COUNT) + return ponyint_pool_alloc(index); + + size = ponyint_pool_adjust_size(size); + void* p = pool_alloc_size(size); + +#ifdef USE_ADDRESS_SANITIZER + // TODO: if we know the original size requested, we can only unpoison that + // instead of the whole sizeclass block size + ASAN_UNPOISON_MEMORY_REGION(p, size); +#endif + + return p; +} + +static void pool_free_size(size_t size, void* p) +{ +#ifdef USE_VALGRIND + VALGRIND_HG_CLEAN_MEMORY(p, size); +#endif + +#ifdef USE_ADDRESS_SANITIZER + ASAN_POISON_MEMORY_REGION(p, size); +#endif + + pool_free_pages(p, size); + + +#ifdef USE_VALGRIND + VALGRIND_FREELIKE_BLOCK(p, 0); +#endif +} + +void ponyint_pool_free_size(size_t size, void* p) +{ + size_t index = ponyint_pool_index(size); + + if(index < POOL_COUNT) + return ponyint_pool_free(index, p); + + size = ponyint_pool_adjust_size(size); + pool_free_size(size, p); +} + +void* ponyint_pool_realloc_size(size_t old_size, size_t new_size, void* p) +{ + // Can only reuse the old pointer if the old index/adjusted size is equal to + // the new one, not greater. 
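+  // For example, growing a 40 byte allocation to 60 bytes returns the same
+  // pointer, since both sizes round up to the same 64 byte size class.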
+ + if(NULL == p) + return ponyint_pool_alloc_size(new_size); + + size_t old_index = ponyint_pool_index(old_size); + size_t new_index = ponyint_pool_index(new_size); + size_t old_adj_size = 0; + + void* new_p; + + if(new_index < POOL_COUNT) + { + if(old_index == new_index) + return p; + + new_p = ponyint_pool_alloc(new_index); + } else { + size_t new_adj_size = ponyint_pool_adjust_size(new_size); + + if(old_index >= POOL_COUNT) + { + old_adj_size = ponyint_pool_adjust_size(old_size); + + if(old_adj_size == new_adj_size) + return p; + } + + new_p = pool_alloc_size(new_adj_size); + +#ifdef USE_ADDRESS_SANITIZER + // TODO: if we know the original size requested, we can only unpoison that + // instead of the whole sizeclass block size + ASAN_UNPOISON_MEMORY_REGION(new_p, new_adj_size); +#endif + } + + memcpy(new_p, p, old_size < new_size ? old_size : new_size); + + if(old_index < POOL_COUNT) + ponyint_pool_free(old_index, p); + else + pool_free_size(old_adj_size, p); + + return new_p; +} + +void ponyint_pool_thread_cleanup() +{ + // TODO: in theory we should be able to free all the virtual allocations we've made + // to reset everything for a clean shutdown +} + +static pool_item_t* gather_remote_allocs_to_send(pool_local_t* local) +{ + pool_item_t* p = local->pool; + pool_item_t* next = NULL; + pool_item_t* prev = NULL; + pool_item_t* allocs_to_send = NULL; + void* prev_alloc_pool_addr = NULL; + bool prev_alloc_pool_addr_is_local = false; + + while (NULL != p) + { + next = p->next; + + if (is_a_local_allocation(p, &prev_alloc_pool_addr_is_local, &prev_alloc_pool_addr)) + { + prev = p; + } else { + if (NULL == prev) + { + local->pool = next; + } else { + prev->next = next; + } + + // add the allocation to the allocs_to_send + p->next = allocs_to_send; + allocs_to_send = p; + } + + p = next; + } + return allocs_to_send; +} + +static pool_block_t* gather_remote_blocks_to_send() +{ + pool_block_t* block = pool_local_block_header.head; + pool_block_t* next = NULL; + pool_block_t* prev = NULL; + pool_block_t* blocks_to_send = NULL; + void* prev_alloc_pool_addr = NULL; + bool prev_alloc_pool_addr_is_local = false; + + while(NULL != block) + { + next = block->next; + + if (is_a_local_allocation(block, &prev_alloc_pool_addr_is_local, &prev_alloc_pool_addr)) + { + prev = block; + } else { + if(NULL == block->next) + { + pool_local_block_header.largest_size = + (NULL == prev) ? 0 : prev->size; + } + + pool_local_block_header.total_size -= block->size; + + // Remove the block from the list. 
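+      // It will circulate back to its owning scheduler thread via a
+      // SCHED_POOL_ALLOCS message rather than being unmapped here.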
+ if(NULL != prev) + prev->next = next; + else + pool_local_block_header.head = next; + + block->next = blocks_to_send; + blocks_to_send = block; + } + + block = next; + } + + return blocks_to_send; +} + +void ponyint_pool_receive_remote_allocations(pool_remote_allocs_t* remote_allocs) +{ + for (int i = 0; i < POOL_COUNT; i++) + { + pool_item_t* p = remote_allocs->pool_remote[i]; + + if (NULL == p) + continue; + + while (NULL != p && NULL != p->next) + { + p = p->next; + } + + pool_local_t* local = &pool_local[i]; + p->next = local->pool; + local->pool = remote_allocs->pool_remote[i]; + + remote_allocs->pool_remote[i] = NULL; + } + + pool_block_t* block = remote_allocs->pool_remote_blocks; + pool_block_t* next = NULL; + void* prev_alloc_pool_addr = NULL; + void* temp_alloc_pool_addr = NULL; + bool prev_alloc_pool_addr_is_local = false; + bool temp_alloc_pool_addr_is_local = false; + + while(NULL != block) + { + if (is_a_local_allocation(block, &prev_alloc_pool_addr_is_local, &prev_alloc_pool_addr)) + { + temp_alloc_pool_addr = prev_alloc_pool_addr; + temp_alloc_pool_addr_is_local = prev_alloc_pool_addr_is_local; + + if(block->size + block == block->next && is_a_local_allocation(block->next, &temp_alloc_pool_addr_is_local, &temp_alloc_pool_addr)) + { + // coalesce the blocks if they're adjacent to each other and both local allocations + block->size += block->next->size; + block->next = block->next->next; + } else { + // insert the block if they cannot be coalesced + next = block->next; + + pool_block_insert(block); + + pool_local_block_header.total_size += block->size; + if(pool_local_block_header.largest_size < block->size) + pool_local_block_header.largest_size = block->size; + + block = next; + } + } else { + // don't coalesce blocks that don't belong to this scheduler but insert them into the pool for use + next = block->next; + + pool_block_insert(block); + + pool_local_block_header.total_size += block->size; + if(pool_local_block_header.largest_size < block->size) + pool_local_block_header.largest_size = block->size; + + block = next; + } + } + + remote_allocs->pool_remote_blocks = NULL; +} + +void ponyint_pool_gather_receive_remote_allocations(pool_remote_allocs_t* remote_allocs) +{ + // scan the remote allocations and add them to the local pool if they're in the virtual allocation range for this thread + for (int i = 0; i < POOL_COUNT; i++) + { + pool_local_t* local = &pool_local[i]; + + // get local ones that need sending + pool_item_t* allocs_to_send = gather_remote_allocs_to_send(local); + + pool_item_t* p = remote_allocs->pool_remote[i]; + pool_item_t* next = NULL; + pool_item_t* prev = NULL; + void* prev_alloc_pool_addr = NULL; + bool prev_alloc_pool_addr_is_local = false; + + while (NULL != p) + { + next = p->next; + + if (is_a_local_allocation(p, &prev_alloc_pool_addr_is_local, &prev_alloc_pool_addr)) + { + if (NULL == prev) + { + remote_allocs->pool_remote[i] = next; + } else { + prev->next = next; + } + + p->next = local->pool; + local->pool = p; + local->length++; + } else { + prev = p; + } + + p = next; + } + + // append allocs to send to the end of the remote_allocs list + if (NULL == prev) + { + remote_allocs->pool_remote[i] = allocs_to_send; + } else { + prev->next = allocs_to_send; + } + + // sort by address before sending so that it's more efficient to process them on the receiving side + remote_allocs->pool_remote[i] = sort_pool_item_list_by_address(remote_allocs->pool_remote[i]); + } + + pool_block_t* remote_blocks = gather_remote_blocks_to_send(); + + 
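+  // Walk the incoming remote block list: blocks that fall inside this thread's
+  // own mmap ranges are reclaimed (coalescing with an adjacent local block
+  // where possible); everything else stays in the list and keeps circulating.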
pool_block_t* block = remote_allocs->pool_remote_blocks; + pool_block_t* next = NULL; + pool_block_t* prev = NULL; + void* prev_alloc_pool_addr = NULL; + void* temp_alloc_pool_addr = NULL; + bool prev_alloc_pool_addr_is_local = false; + bool temp_alloc_pool_addr_is_local = false; + + while(NULL != block) + { + if (is_a_local_allocation(block, &prev_alloc_pool_addr_is_local, &prev_alloc_pool_addr)) + { + temp_alloc_pool_addr = prev_alloc_pool_addr; + temp_alloc_pool_addr_is_local = prev_alloc_pool_addr_is_local; + + if(block->size + block == block->next && is_a_local_allocation(block->next, &temp_alloc_pool_addr_is_local, &temp_alloc_pool_addr)) + { + // coalesce the blocks if they're adjacent to each other and both local allocations + block->size += block->next->size; + block->next = block->next->next; + } else { + // insert the block if they cannot be coalesced + next = block->next; + + if(NULL == prev) + { + remote_allocs->pool_remote_blocks = next; + } else { + prev->next = next; + } + + pool_block_insert(block); + + pool_local_block_header.total_size += block->size; + if(pool_local_block_header.largest_size < block->size) + pool_local_block_header.largest_size = block->size; + + block = next; + } + } else { + next = block->next; + prev = block; + block = next; + } + } + + // append allocs to send to the end of the remote_allocs list + if (NULL == prev) + { + remote_allocs->pool_remote_blocks = remote_blocks; + } else { + prev->next = remote_blocks; + } + + // sort by address before sending so that it's more efficient to process them on the receiving side + remote_allocs->pool_remote_blocks = sort_pool_block_list_by_address(remote_allocs->pool_remote_blocks); +} + +pool_remote_allocs_t* ponyint_initialize_pool_message_passing() +{ + return POOL_ALLOC(pool_remote_allocs_t); +} + +#endif diff --git a/src/libponyrt/mem/pool_sorting.h b/src/libponyrt/mem/pool_sorting.h new file mode 100644 index 0000000000..1deb17d213 --- /dev/null +++ b/src/libponyrt/mem/pool_sorting.h @@ -0,0 +1,208 @@ +/* + * Shamelessly stolen/adapted from Simon Tatham from: + * https://www.chiark.greenend.org.uk/~sgtatham/algorithms/listsort.c + * + * This file is copyright 2001 Simon Tatham. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL SIMON TATHAM BE LIABLE FOR + * ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +static pool_item_t* sort_pool_item_list_by_address(pool_item_t *list) +{ + pool_item_t *p, *q, *e, *tail; + int32_t insize, nmerges, psize, qsize, i; + + /* + * Silly special case: if `list' was passed in as NULL, return + * NULL immediately. 
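+ *
+ * Sorting by node address also means consecutive entries tend to come from
+ * the same mmap region, which keeps the receiver on its fast path when
+ * classifying each entry as local or remote.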
+ */ + if (NULL == list) + return NULL; + + insize = 1; + + while (true) + { + p = list; + list = NULL; + tail = NULL; + + nmerges = 0; /* count number of merges we do in this pass */ + + while (NULL != p) + { + nmerges++; /* there exists a merge to be done */ + /* step `insize' places along from p */ + q = p; + psize = 0; + for (i = 0; i < insize; i++) + { + psize++; + q = q->next; + if (NULL == q) + break; + } + + /* if q hasn't fallen off end, we have two lists to merge */ + qsize = insize; + + /* now we have two lists; merge them */ + while (psize > 0 || (qsize > 0 && (NULL != q))) + { + /* decide whether next element of merge comes from p or q */ + if (psize == 0) + { + /* p is empty; e must come from q. */ + e = q; + q = q->next; + qsize--; + } else if (qsize == 0 || !q) { + /* q is empty; e must come from p. */ + e = p; + p = p->next; + psize--; + } else if (p <= q) { + /* First element of p is lower (or same); + * e must come from p. */ + e = p; + p = p->next; + psize--; + } else { + /* First element of q is lower; e must come from q. */ + e = q; + q = q->next; + qsize--; + } + + /* add the next element to the merged list */ + if (NULL != tail) + tail->next = e; + else + list = e; + + tail = e; + } + + /* now p has stepped `insize' places along, and q has too */ + p = q; + } + + tail->next = NULL; + + /* If we have done only one merge, we're finished. */ + if (nmerges <= 1) /* allow for nmerges==0, the empty list case */ + return list; + + /* Otherwise repeat, merging lists twice the size */ + insize *= 2; + } +} + +static pool_block_t* sort_pool_block_list_by_address(pool_block_t *list) +{ + pool_block_t *p, *q, *e, *tail; + int32_t insize, nmerges, psize, qsize, i; + + /* + * Silly special case: if `list' was passed in as NULL, return + * NULL immediately. + */ + if (NULL == list) + return NULL; + + insize = 1; + + while (true) + { + p = list; + list = NULL; + tail = NULL; + + nmerges = 0; /* count number of merges we do in this pass */ + + while (NULL != p) + { + nmerges++; /* there exists a merge to be done */ + /* step `insize' places along from p */ + q = p; + psize = 0; + for (i = 0; i < insize; i++) + { + psize++; + q = q->next; + if (NULL == q) + break; + } + + /* if q hasn't fallen off end, we have two lists to merge */ + qsize = insize; + + /* now we have two lists; merge them */ + while (psize > 0 || (qsize > 0 && (NULL != q))) + { + /* decide whether next element of merge comes from p or q */ + if (psize == 0) + { + /* p is empty; e must come from q. */ + e = q; + q = q->next; + qsize--; + } else if (qsize == 0 || !q) { + /* q is empty; e must come from p. */ + e = p; + p = p->next; + psize--; + } else if (p <= q) { + /* First element of p is lower (or same); + * e must come from p. */ + e = p; + p = p->next; + psize--; + } else { + /* First element of q is lower; e must come from q. */ + e = q; + q = q->next; + qsize--; + } + + /* add the next element to the merged list */ + if (NULL != tail) + tail->next = e; + else + list = e; + + tail = e; + } + + /* now p has stepped `insize' places along, and q has too */ + p = q; + } + + tail->next = NULL; + + /* If we have done only one merge, we're finished. 
*/ + if (nmerges <= 1) /* allow for nmerges==0, the empty list case */ + return list; + + /* Otherwise repeat, merging lists twice the size */ + insize *= 2; + } +} \ No newline at end of file diff --git a/src/libponyrt/sched/scheduler.c b/src/libponyrt/sched/scheduler.c index cb5a7c40a6..2c7d8454d4 100644 --- a/src/libponyrt/sched/scheduler.c +++ b/src/libponyrt/sched/scheduler.c @@ -20,6 +20,10 @@ #define PONY_SCHED_BLOCK_THRESHOLD 1000000 +#ifdef POOL_USE_MESSAGE_PASSING +#define POOL_ALLOC_INTERVAL 2000000000 +#endif + static DECLARE_THREAD_FN(run_thread); typedef enum @@ -31,7 +35,10 @@ typedef enum SCHED_TERMINATE = 40, SCHED_UNMUTE_ACTOR = 50, SCHED_NOISY_ASIO = 51, - SCHED_UNNOISY_ASIO = 52 + SCHED_UNNOISY_ASIO = 52, +#ifdef POOL_USE_MESSAGE_PASSING + SCHED_POOL_ALLOCS = 60 +#endif } sched_msg_t; // Scheduler global data. @@ -52,6 +59,11 @@ static PONY_ATOMIC(bool) pinned_actor_scheduler_suspended_check; static scheduler_t* pinned_actor_scheduler; static __pony_thread_local scheduler_t* this_scheduler; +#ifdef POOL_USE_MESSAGE_PASSING +static pool_remote_allocs_t* pool_remote_allocs; +static uint64_t last_pool_alloc_share_tsc; +#endif + #if defined(USE_SCHEDULER_SCALING_PTHREADS) static pthread_mutex_t sched_mut; @@ -463,6 +475,55 @@ static void handle_sched_unblock(scheduler_t* sched) pony_assert(sched->block_count <= scheduler_count); } +#ifdef POOL_USE_MESSAGE_PASSING +static bool maybe_send_pool_message_passing_allocs(scheduler_t* sched, pool_remote_allocs_t* remote_allocs) +{ + // only scheduler 0 should be calling this function + pony_assert(0 == sched->index); + + uint64_t current_tsc = ponyint_cpu_tick(); + + // at least approximately 1000 milliseconds has passed since the last time + if(POOL_ALLOC_INTERVAL < ponyint_cpu_tick_diff(last_pool_alloc_share_tsc, current_tsc)) + { + last_pool_alloc_share_tsc = current_tsc; + + uint32_t current_active_scheduler_count = get_active_scheduler_count(); + bool is_pinned_actor_scheduler_suspended = atomic_load_explicit(&pinned_actor_scheduler_suspended, memory_order_relaxed); + + if(1 < current_active_scheduler_count || !is_pinned_actor_scheduler_suspended) + { + // only send allocs if there are other schedulers awake + + // tell the pool message passing implementation to do whatever it needs to + // for sending pool allocations back to owning scheduler threads + ponyint_pool_gather_receive_remote_allocations(remote_allocs); + + if(1 < current_active_scheduler_count) + { + // send the allocs to scheduler 1 if it is awake + send_msg(sched->index, 1, SCHED_POOL_ALLOCS, (intptr_t)remote_allocs); + + // make sure scheduler 1 is awake in case it went to sleep in the meantime + ponyint_sched_maybe_wakeup(sched->index, 2); + } else { + // send the allocs to the pinned actor thread if scheduler 1 is not awake (or doesn't exist) + send_msg_pinned_actor_thread(sched->index, SCHED_POOL_ALLOCS, (intptr_t)remote_allocs); + + // make sure pinned actor thread is awake in case it went to sleep in the meantime + wake_suspended_pinned_actor_thread(); + } + + // sent the allocs + return true; + } + } + + // didn't send the allocs + return false; +} +#endif + static bool read_msg(scheduler_t* sched, pony_actor_t* actor) { #ifdef USE_RUNTIMESTATS @@ -563,6 +624,53 @@ static bool read_msg(scheduler_t* sched, pony_actor_t* actor) break; } +#ifdef POOL_USE_MESSAGE_PASSING + case SCHED_POOL_ALLOCS: + { + pony_assert(PONY_UNKNOWN_SCHEDULER_INDEX != sched->index); + // tell the pool message passing implementation to do whatever it needs to + // for receiving pool 
allocations from other scheduler threads + pool_remote_allocs_t* remote_allocs = (pool_remote_allocs_t*)m->i; + if (0 != sched->index) + { + // if we're not scheduler 0, we need to keep passing the remote_allocs along + // after adding our own allocations to it + ponyint_pool_gather_receive_remote_allocations(remote_allocs); + + uint32_t current_active_scheduler_count = get_active_scheduler_count(); + uint32_t next_sched_index = (sched->index + 1) % current_active_scheduler_count; + bool is_pinned_actor_scheduler_suspended = atomic_load_explicit(&pinned_actor_scheduler_suspended, memory_order_relaxed); + + if(0 == next_sched_index && !is_pinned_actor_scheduler_suspended) + { + // send the allocs to the pinned actor thread if we would otherwise send to scheduler 0 + send_msg_pinned_actor_thread(sched->index, SCHED_POOL_ALLOCS, (intptr_t)remote_allocs); + + // make sure pinned actor thread is awake in case it went to sleep in the meantime + wake_suspended_pinned_actor_thread(); + } else { + // send the allocs to the next active scheduler (don't want to send to + // a sleeping scheduler because we don't want to wake them up only for this) + send_msg(sched->index, next_sched_index, SCHED_POOL_ALLOCS, (intptr_t)remote_allocs); + + // make sure the next scheduler is awake in case it went to sleep in the meantime + ponyint_sched_maybe_wakeup(sched->index, next_sched_index + 1); + } + } else { + // if we're scheduler 0, we send again only if it's been long enough + if(!maybe_send_pool_message_passing_allocs(sched, remote_allocs)) + { + // didn't send allocs to other schedulers + // need to add the remote_allocs to the local pool first + // and then wait until it's time to send again + ponyint_pool_receive_remote_allocations(remote_allocs); + pool_remote_allocs = remote_allocs; + } + } + break; + } +#endif + default: {} } } @@ -1095,6 +1203,10 @@ static void run(scheduler_t* sched) if(sched->index == 0) { pause_cycle_detection = false; last_cd_tsc = 0; +#ifdef POOL_USE_MESSAGE_PASSING + last_pool_alloc_share_tsc = ponyint_cpu_tick(); + pool_remote_allocs = ponyint_initialize_pool_message_passing(); +#endif } pony_actor_t* actor = pop_global(sched); @@ -1152,6 +1264,20 @@ static void run(scheduler_t* sched) // but is necessary in case the signal to wake a thread was missed signal_suspended_threads(current_active_scheduler_count, sched->index); } + +#ifdef POOL_USE_MESSAGE_PASSING + // we only want to do pool message passing shenanigans if we haven't already + if (NULL != pool_remote_allocs) + { + if(maybe_send_pool_message_passing_allocs(sched, pool_remote_allocs)) + { + // sent the allocations to other schedulers + // set to null so we don't try to send the same allocs again + // and as a way to know we're waiting to receive allocs back from other schedulers + pool_remote_allocs = NULL; + } + } +#endif } // In response to reading a message, we might have unmuted an actor and @@ -1215,7 +1341,7 @@ static void run(scheduler_t* sched) // the extra scheduler threads would keep being woken up and then go back // to sleep over and over again. if(ponyint_mutemap_size(&sched->mute_mapping) > 0) - ponyint_sched_maybe_wakeup(sched->index); + ponyint_sched_maybe_wakeup(sched->index, scheduler_count); pony_assert(!ponyint_actor_is_pinned(actor)); @@ -1243,7 +1369,7 @@ static void run(scheduler_t* sched) // parallel. // Try and wake up a sleeping scheduler thread to help with the load. // If there isn't enough work, they'll go back to sleep. 
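+      // (The second argument bounds how many scheduler threads may end up
+      // awake; scheduler_count places no limit beyond the configured maximum.)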
- ponyint_sched_maybe_wakeup(sched->index); + ponyint_sched_maybe_wakeup(sched->index, scheduler_count); } } else { // We aren't rescheduling, so run the next actor. This may be NULL if our @@ -1376,9 +1502,9 @@ static void perhaps_suspend_pinned_actor_scheduler( /** * Run a custom scheduler thread for pinned actors until termination. - * This thread does not partiticpate in most normal scheduler messaging - * like CNF/ACK/block/unblock/suspend/noisy/unnoisy. it does participate in - * muting messages and termination messages. + * This thread does not partiticpate in some normal scheduler messaging + * like block/unblock/noisy/unnoisy. it does participate in + * muting messages, CNF/ACK, POOL_ALLOCS and termination messages. */ static void run_pinned_actors() { @@ -1885,13 +2011,13 @@ void ponyint_sched_unnoisy_asio(int32_t from) // Maybe wake up a scheduler thread if possible void ponyint_sched_maybe_wakeup_if_all_asleep(int32_t current_scheduler_id) { - uint32_t current_active_scheduler_count = get_active_scheduler_count(); + uint32_t current_active_scheduler_count = -1; // wake up threads if the current active count is 0 // keep trying until successful to avoid deadlock while((current_active_scheduler_count = get_active_scheduler_count()) == 0) { - ponyint_sched_maybe_wakeup(current_scheduler_id); + ponyint_sched_maybe_wakeup(current_scheduler_id, 1); current_active_scheduler_count = get_active_scheduler_count(); @@ -1914,12 +2040,12 @@ void ponyint_sched_maybe_wakeup_if_all_asleep(int32_t current_scheduler_id) } // Maybe wake up a scheduler thread if possible -void ponyint_sched_maybe_wakeup(int32_t current_scheduler_id) +void ponyint_sched_maybe_wakeup(int32_t current_scheduler_id, uint32_t max_schedulers_required) { uint32_t current_active_scheduler_count = get_active_scheduler_count(); // if we have some schedulers that are sleeping, wake one up - if((current_active_scheduler_count < scheduler_count) && + if((current_active_scheduler_count < max_schedulers_required) && #if defined(USE_SCHEDULER_SCALING_PTHREADS) // try to acquire mutex if using pthreads !pthread_mutex_trylock(&sched_mut) @@ -1935,7 +2061,7 @@ void ponyint_sched_maybe_wakeup(int32_t current_scheduler_id) // in case the count changed between the while check and now current_active_scheduler_count = get_active_scheduler_count(); - if(current_active_scheduler_count < scheduler_count) + if(current_active_scheduler_count < max_schedulers_required) { // increment active_scheduler_count to wake a new scheduler up current_active_scheduler_count++; diff --git a/src/libponyrt/sched/scheduler.h b/src/libponyrt/sched/scheduler.h index 4fefd6c051..1060ee4487 100644 --- a/src/libponyrt/sched/scheduler.h +++ b/src/libponyrt/sched/scheduler.h @@ -150,7 +150,7 @@ void ponyint_sched_noisy_asio(int32_t from); void ponyint_sched_unnoisy_asio(int32_t from); // Try and wake up a sleeping scheduler thread to help with load -void ponyint_sched_maybe_wakeup(int32_t current_scheduler_id); +void ponyint_sched_maybe_wakeup(int32_t current_scheduler_id, uint32_t max_schedulers_required); // Try and wake up a sleeping scheduler thread only if all scheduler // threads are asleep