Skip to content

Commit

Permalink
Use batch APIs to create arrays and hashes
Browse files Browse the repository at this point in the history
Basically a port of ruby/json#678

Rather than to allocate the container and push elements one by one,
we accumulate them on a stack and then use the faster batch APIs
to directly create the final container.

The original action stack remains, as we need to keep track of what
we're currently parsing and how big it is.

But for the recursive cases, we no longer need to create a child stack.
  • Loading branch information
byroot committed Nov 8, 2024
1 parent f028dca commit 2ffc3c9
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 73 deletions.
1 change: 1 addition & 0 deletions ext/msgpack/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
have_func("rb_enc_interned_str", "ruby.h") # Ruby 3.0+
have_func("rb_hash_new_capa", "ruby.h") # Ruby 3.2+
have_func("rb_proc_call_with_block", "ruby.h") # CRuby (TruffleRuby doesn't have it)
have_func("rb_hash_bulk_insert", "ruby.h") # Ruby 2.6+ and missing on TruffleRuby 24.0

append_cflags([
"-fvisibility=hidden",
Expand Down
199 changes: 137 additions & 62 deletions ext/msgpack/unpacker.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ static VALUE protected_proc_call(VALUE proc, int argc, VALUE *argv, int *raised)

static int RAW_TYPE_STRING = 256;
static int RAW_TYPE_BINARY = 257;
static int16_t INITIAL_BUFFER_CAPACITY_MAX = SHRT_MAX;

static msgpack_rmem_t s_stack_rmem;

Expand All @@ -60,9 +59,106 @@ static inline VALUE rb_hash_new_capa(long capa)
}
#endif

static inline int16_t initial_buffer_size(long size)
#ifndef HAVE_RB_HASH_BULK_INSERT
void rb_hash_bulk_insert(long count, const VALUE *pairs, VALUE hash)
{
return (size > INITIAL_BUFFER_CAPACITY_MAX) ? INITIAL_BUFFER_CAPACITY_MAX : size;
long index = 0;
while (index < count) {
VALUE name = pairs[index++];
VALUE value = pairs[index++];
rb_hash_aset(hash, name, value);
}
RB_GC_GUARD(hash);
}
#endif

/* rvalue_stack functions */

static inline void _msgpack_unpacker_rvalue_stack_free(msgpack_rvalue_stack_t* value_stack) {
switch (value_stack->type) {
case STACK_TYPE_UNALLOCATED:
break;
case STACK_TYPE_RMEM:
if (!msgpack_rmem_free(&s_stack_rmem, value_stack->data)) {
rb_bug("Failed to free an rmem pointer, memory leak?");
}
break;
case STACK_TYPE_HEAP:
xfree(value_stack->data);
break;
}
memset(value_stack, 0, sizeof(msgpack_rvalue_stack_t));
}

static void _msgpack_unpacker_rvalue_stack_grow(msgpack_rvalue_stack_t* value_stack) {
switch (value_stack->type) {
case STACK_TYPE_UNALLOCATED: {
value_stack->data = msgpack_rmem_alloc(&s_stack_rmem);
value_stack->capacity = MSGPACK_RMEM_PAGE_SIZE / sizeof(VALUE);
value_stack->type = STACK_TYPE_RMEM;
break;
}
case STACK_TYPE_RMEM: {
size_t new_capacity = value_stack->capacity * 2;
VALUE *new_ptr = ALLOC_N(VALUE, new_capacity);
MEMCPY(new_ptr, value_stack->data, VALUE, value_stack->depth);
if (!msgpack_rmem_free(&s_stack_rmem, value_stack->data)) {
rb_bug("Failed to free an rmem pointer, memory leak?");
}
value_stack->type = STACK_TYPE_HEAP;
value_stack->data = new_ptr;
value_stack->capacity = new_capacity;
break;
}
case STACK_TYPE_HEAP: {
size_t new_capacity = value_stack->capacity * 2;
REALLOC_N(value_stack->data, VALUE, new_capacity);
value_stack->capacity = new_capacity;
break;
}
}
}

static inline void _msgpack_unpacker_rvalue_stack_push(msgpack_unpacker_t* uk, VALUE value) {
size_t free_slots = uk->value_stack.capacity - uk->value_stack.depth;

if (RB_UNLIKELY(free_slots == 0)) {
_msgpack_unpacker_rvalue_stack_grow(&uk->value_stack);
free_slots = uk->value_stack.capacity - uk->value_stack.depth;
}

RB_OBJ_WRITE(uk->self, &uk->value_stack.data[uk->value_stack.depth++], value);
}

static inline VALUE _msgpack_unpacker_rvalue_stack_create_array(msgpack_unpacker_t* uk, size_t length) {
VALUE *data = &uk->value_stack.data[uk->value_stack.depth - length];

VALUE array = rb_ary_new_from_values(length, data);

RB_OBJ_WRITE(uk->self, data, array);
uk->value_stack.depth -= (length - 1);

return RB_GC_GUARD(array);
}

static inline VALUE _msgpack_unpacker_rvalue_stack_create_hash(msgpack_unpacker_t* uk, size_t items_count) {
size_t length = items_count / 2;
VALUE *data = &uk->value_stack.data[uk->value_stack.depth - items_count];

VALUE hash = rb_hash_new_capa(length);
rb_hash_bulk_insert(items_count, data, hash);

RB_OBJ_WRITE(uk->self, data, hash);
uk->value_stack.depth -= (items_count - 1);

return RB_GC_GUARD(hash);
}

void msgpack_unpacker_mark_rvalue_stack(msgpack_rvalue_stack_t* value_stack)
{
if (value_stack->data) {
rb_gc_mark_locations(value_stack->data, value_stack->data + value_stack->depth);
}
}

void msgpack_unpacker_static_init(void)
Expand Down Expand Up @@ -108,33 +204,16 @@ static inline void _msgpack_unpacker_free_stack(msgpack_unpacker_stack_t* stack)

void _msgpack_unpacker_destroy(msgpack_unpacker_t* uk)
{
msgpack_unpacker_stack_t *stack;
while ((stack = uk->stack)) {
uk->stack = stack->parent;
_msgpack_unpacker_free_stack(stack);
}

_msgpack_unpacker_free_stack(uk->stack);
_msgpack_unpacker_rvalue_stack_free(&uk->value_stack);
msgpack_buffer_destroy(UNPACKER_BUFFER_(uk));
}

void msgpack_unpacker_mark_stack(msgpack_unpacker_stack_t* stack)
{
while (stack) {
msgpack_unpacker_stack_entry_t* s = stack->data;
msgpack_unpacker_stack_entry_t* send = stack->data + stack->depth;
for(; s < send; s++) {
rb_gc_mark(s->object);
rb_gc_mark(s->key);
}
stack = stack->parent;
}
}

void msgpack_unpacker_mark(msgpack_unpacker_t* uk)
{
rb_gc_mark(uk->last_object);
rb_gc_mark(uk->reading_raw);
msgpack_unpacker_mark_stack(uk->stack);
msgpack_unpacker_mark_rvalue_stack(&uk->value_stack);
/* See MessagePack_Buffer_wrap */
/* msgpack_buffer_mark(UNPACKER_BUFFER_(uk)); */
rb_gc_mark(uk->buffer_ref);
Expand All @@ -149,6 +228,7 @@ void _msgpack_unpacker_reset(msgpack_unpacker_t* uk)

/*memset(uk->stack, 0, sizeof(msgpack_unpacker_t) * uk->stack->depth);*/
uk->stack->depth = 0;
uk->value_stack.depth = 0;
uk->last_object = Qnil;
uk->reading_raw = Qnil;
uk->reading_raw_remaining = 0;
Expand Down Expand Up @@ -186,13 +266,15 @@ static inline int object_complete(msgpack_unpacker_t* uk, VALUE object)
}

uk->last_object = object;
_msgpack_unpacker_rvalue_stack_push(uk, object);
reset_head_byte(uk);
return PRIMITIVE_OBJECT_COMPLETE;
}

static inline int object_complete_symbol(msgpack_unpacker_t* uk, VALUE object)
{
uk->last_object = object;
_msgpack_unpacker_rvalue_stack_push(uk, object);
reset_head_byte(uk);
return PRIMITIVE_OBJECT_COMPLETE;
}
Expand All @@ -212,12 +294,14 @@ static inline int object_complete_ext(msgpack_unpacker_t* uk, int ext_type, VALU
if(proc != Qnil) {
VALUE obj;
VALUE arg = (str == Qnil ? rb_str_buf_new(0) : str);

int raised;
obj = protected_proc_call(proc, 1, &arg, &raised);
if (raised) {
uk->last_object = rb_errinfo();
return PRIMITIVE_RECURSIVE_RAISED;
}

return object_complete(uk, obj);
}

Expand All @@ -235,7 +319,7 @@ static inline msgpack_unpacker_stack_entry_t* _msgpack_unpacker_stack_entry_top(
return &uk->stack->data[uk->stack->depth-1];
}

static inline int _msgpack_unpacker_stack_push(msgpack_unpacker_t* uk, enum stack_type_t type, size_t count, VALUE object)
static inline int _msgpack_unpacker_stack_push(msgpack_unpacker_t* uk, enum stack_type_t type, size_t count)
{
reset_head_byte(uk);

Expand All @@ -245,22 +329,20 @@ static inline int _msgpack_unpacker_stack_push(msgpack_unpacker_t* uk, enum stac

msgpack_unpacker_stack_entry_t* next = &uk->stack->data[uk->stack->depth];
next->count = count;
next->size = count;
next->type = type;
next->object = object;
next->key = Qnil;

uk->stack->depth++;
return PRIMITIVE_CONTAINER_START;
}

static inline VALUE msgpack_unpacker_stack_pop(msgpack_unpacker_t* uk)
static inline size_t msgpack_unpacker_stack_pop(msgpack_unpacker_t* uk)
{
return --uk->stack->depth;
}

static inline bool msgpack_unpacker_stack_is_empty(msgpack_unpacker_t* uk)
{
return uk->stack->depth == 0;
return uk->stack->depth == 0 || _msgpack_unpacker_stack_entry_top(uk)->type == STACK_TYPE_RECURSIVE;
}

#ifdef USE_CASE_RANGE
Expand Down Expand Up @@ -290,9 +372,7 @@ static inline bool is_reading_map_key(msgpack_unpacker_t* uk)
{
if(uk->stack->depth > 0) {
msgpack_unpacker_stack_entry_t* top = _msgpack_unpacker_stack_entry_top(uk);
if(top->type == STACK_TYPE_MAP_KEY) {
return true;
}
return top->type == STACK_TYPE_MAP && (top->count % 2 == 0);
}
return false;
}
Expand Down Expand Up @@ -343,17 +423,17 @@ static inline int read_raw_body_begin(msgpack_unpacker_t* uk, int raw_type)
reset_head_byte(uk);
uk->reading_raw_remaining = 0;

msgpack_unpacker_stack_t* child_stack = _msgpack_unpacker_new_stack();
child_stack->parent = uk->stack;
uk->stack = child_stack;

_msgpack_unpacker_stack_push(uk, STACK_TYPE_RECURSIVE, 1);
size_t value_stack_depth = uk->value_stack.depth;
int raised;
obj = protected_proc_call(proc, 1, &uk->self, &raised);
uk->stack = child_stack->parent;
_msgpack_unpacker_free_stack(child_stack);
uk->value_stack.depth = value_stack_depth;
msgpack_unpacker_stack_pop(uk);

if (raised) {
uk->last_object = rb_errinfo();
_msgpack_unpacker_rvalue_stack_push(uk, Qnil);
return PRIMITIVE_RECURSIVE_RAISED;
}

Expand Down Expand Up @@ -418,14 +498,14 @@ static int read_primitive(msgpack_unpacker_t* uk)
if(count == 0) {
return object_complete(uk, rb_ary_new());
}
return _msgpack_unpacker_stack_push(uk, STACK_TYPE_ARRAY, count, rb_ary_new2(initial_buffer_size(count)));
return _msgpack_unpacker_stack_push(uk, STACK_TYPE_ARRAY, count);

SWITCH_RANGE(b, 0x80, 0x8f) // FixMap
int count = b & 0x0f;
if(count == 0) {
return object_complete(uk, rb_hash_new());
}
return _msgpack_unpacker_stack_push(uk, STACK_TYPE_MAP_KEY, count*2, rb_hash_new_capa(initial_buffer_size(count)));
return _msgpack_unpacker_stack_push(uk, STACK_TYPE_MAP, count*2);

SWITCH_RANGE(b, 0xc0, 0xdf) // Variable
switch(b) {
Expand Down Expand Up @@ -648,7 +728,7 @@ static int read_primitive(msgpack_unpacker_t* uk)
if(count == 0) {
return object_complete(uk, rb_ary_new());
}
return _msgpack_unpacker_stack_push(uk, STACK_TYPE_ARRAY, count, rb_ary_new2(initial_buffer_size(count)));
return _msgpack_unpacker_stack_push(uk, STACK_TYPE_ARRAY, count);
}

case 0xdd: // array 32
Expand All @@ -658,7 +738,7 @@ static int read_primitive(msgpack_unpacker_t* uk)
if(count == 0) {
return object_complete(uk, rb_ary_new());
}
return _msgpack_unpacker_stack_push(uk, STACK_TYPE_ARRAY, count, rb_ary_new2(initial_buffer_size(count)));
return _msgpack_unpacker_stack_push(uk, STACK_TYPE_ARRAY, count);
}

case 0xde: // map 16
Expand All @@ -668,7 +748,7 @@ static int read_primitive(msgpack_unpacker_t* uk)
if(count == 0) {
return object_complete(uk, rb_hash_new());
}
return _msgpack_unpacker_stack_push(uk, STACK_TYPE_MAP_KEY, count*2, rb_hash_new_capa(initial_buffer_size(count)));
return _msgpack_unpacker_stack_push(uk, STACK_TYPE_MAP, count*2);
}

case 0xdf: // map 32
Expand All @@ -678,7 +758,7 @@ static int read_primitive(msgpack_unpacker_t* uk)
if(count == 0) {
return object_complete(uk, rb_hash_new());
}
return _msgpack_unpacker_stack_push(uk, STACK_TYPE_MAP_KEY, count*2, rb_hash_new_capa(initial_buffer_size(count)));
return _msgpack_unpacker_stack_push(uk, STACK_TYPE_MAP, count*2);
}

default:
Expand Down Expand Up @@ -766,28 +846,23 @@ int msgpack_unpacker_read(msgpack_unpacker_t* uk, size_t target_stack_depth)
container_completed:
{
msgpack_unpacker_stack_entry_t* top = _msgpack_unpacker_stack_entry_top(uk);
switch(top->type) {
case STACK_TYPE_ARRAY:
rb_ary_push(top->object, uk->last_object);
break;
case STACK_TYPE_MAP_KEY:
top->key = uk->last_object;
top->type = STACK_TYPE_MAP_VALUE;
break;
case STACK_TYPE_MAP_VALUE:
if(uk->symbolize_keys && rb_type(top->key) == T_STRING) {
/* here uses rb_str_intern instead of rb_intern so that Ruby VM can GC unused symbols */
rb_hash_aset(top->object, rb_str_intern(top->key), uk->last_object);
} else {
rb_hash_aset(top->object, top->key, uk->last_object);
}
top->type = STACK_TYPE_MAP_KEY;
break;
}
size_t count = --top->count;

if(count == 0) {
object_complete(uk, top->object);
switch(top->type) {
case STACK_TYPE_ARRAY:
_msgpack_unpacker_rvalue_stack_create_array(uk, top->size);
break;
case STACK_TYPE_MAP:
_msgpack_unpacker_rvalue_stack_create_hash(uk, top->size);
break;
case STACK_TYPE_RECURSIVE:
object_complete(uk, _msgpack_unpacker_rvalue_stack_pop(uk));
return PRIMITIVE_OBJECT_COMPLETE;
break;
}

object_complete(uk, _msgpack_unpacker_rvalue_stack_pop(uk));
if(msgpack_unpacker_stack_pop(uk) <= target_stack_depth) {
return PRIMITIVE_OBJECT_COMPLETE;
}
Expand Down
Loading

0 comments on commit 2ffc3c9

Please sign in to comment.