Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ets:update_counter #1

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions libs/estdlib/src/ets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
insert/2,
lookup/2,
lookup_element/3,
delete/2
delete/2,
update_counter/3,
update_counter/4
]).

-export_type([
Expand Down Expand Up @@ -63,7 +65,7 @@ new(_Name, _Options) ->
%% @doc Insert an entry into an ets table.
%% @end
%%-----------------------------------------------------------------------------
-spec insert(Table :: table(), Entry :: tuple()) -> true.
-spec insert(Table :: table(), Entry :: tuple() | [tuple()]) -> true.
insert(_Table, _Entry) ->
erlang:nif_error(undefined).

Expand Down Expand Up @@ -101,3 +103,41 @@ lookup_element(_Table, _Key, _Pos) ->
-spec delete(Table :: table(), Key :: term()) -> true.
delete(_Table, _Key) ->
erlang:nif_error(undefined).

%%-----------------------------------------------------------------------------
%% @param Table a reference to the ets table
%% @param Key the key used to look up the entry expecting to contain a tuple of integers or a single integer
%% @param Params the increment value or a tuple {Pos, Increment} or {Pos, Increment, Treshold, SetValue},
%% where Pos is an integer (1-based index) specifying the position in the tuple to increment. Value is clamped to SetValue if it exceeds Threshold after update.
%% @returns the updated element's value after performing the increment, or the default value if applicable
%% @doc Updates a counter value at Key in the table. If Params is a single integer, it increments the direct integer value at Key or the first integer in a tuple. If Params is a tuple {Pos, Increment}, it increments the integer at the specified position Pos in the tuple stored at Key.
%% @end
%%-----------------------------------------------------------------------------
-spec update_counter(
Table :: table(),
Key :: term(),
Params ::
integer() | {pos_integer(), integer()} | {pos_integer(), integer(), integer(), integer()}
) -> integer().
update_counter(_Table, _Key, _Params) ->
erlang:nif_error(undefined).

%%-----------------------------------------------------------------------------
%% @param Table a reference to the ets table
%% @param Key the key used to look up the entry expecting to contain a tuple of integers or a single integer
%% @param Params the increment value or a tuple {Pos, Increment} or {Pos, Increment, Treshold, SetValue},
%% where Pos is an integer (1-based index) specifying the position in the tuple to increment. If after incrementation value exceeds the Treshold, it is set to SetValue.
%% @param Default the default value used if the entry at Key doesn't exist or doesn't contain a valid tuple with a sufficient size or integer at Pos
%% @returns the updated element's value after performing the increment, or the default value if applicable
%% @doc Updates a counter value at Key in the table. If Params is a single integer, it increments the direct integer value at Key or the first integer in a tuple. If Params is a tuple {Pos, Increment}, it increments the integer at the specified position Pos in the tuple stored at Key. If the needed element does not exist, uses Default value as a fallback.
%% @end
%%-----------------------------------------------------------------------------
-spec update_counter(
Table :: table(),
Key :: term(),
Params ::
integer() | {pos_integer(), integer()} | {pos_integer(), integer(), integer(), integer()},
Default :: integer()
) -> integer().
update_counter(_Table, _Key, _Params, _Default) ->
erlang:nif_error(undefined).
182 changes: 164 additions & 18 deletions src/libAtomVM/ets.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,9 @@ static void ets_delete_all_tables(struct Ets *ets, GlobalContext *global)
ets_delete_tables_internal(ets, true_pred, NULL, global);
}

EtsErrorCode ets_insert(term ref, term entry, Context *ctx)
EtsErrorCode ets_table_insert(struct EtsTable *ets_table, term entry, Context *ctx)
{
struct EtsTable *ets_table = term_is_atom(ref) ? ets_get_table_by_name(&ctx->global->ets, ref, TableAccessWrite) : ets_get_table_by_ref(&ctx->global->ets, term_to_ref_ticks(ref), TableAccessWrite);
if (ets_table == NULL) {
return EtsTableNotFound;
}

if (ets_table->access_type != EtsAccessPublic && ets_table->owner_process_id != ctx->process_id) {
SMP_UNLOCK(ets_table);
return EtsPermissionDenied;
}

Expand All @@ -271,39 +265,66 @@ EtsErrorCode ets_insert(term ref, term entry, Context *ctx)

Heap *heap = malloc(sizeof(Heap));
if (IS_NULL_PTR(heap)) {
SMP_UNLOCK(ets_table);
return EtsAllocationFailure;
}
size_t size = (size_t) memory_estimate_usage(entry);
if (memory_init_heap(heap, size) != MEMORY_GC_OK) {
free(heap);
SMP_UNLOCK(ets_table);
return EtsAllocationFailure;
}

term new_entry = memory_copy_term_tree(heap, entry);
term key = term_get_tuple_element(new_entry, (int) ets_table->keypos);

EtsErrorCode ret = EtsOk;
EtsErrorCode result = EtsOk;
EtsHashtableErrorCode res = ets_hashtable_insert(ets_table->hashtable, key, new_entry, EtsHashtableAllowOverwrite, heap, ctx->global);
if (UNLIKELY(res != EtsHashtableOk)) {
ret = EtsAllocationFailure;
result = EtsAllocationFailure;
}

SMP_UNLOCK(ets_table);
return result;
}

return ret;
EtsErrorCode ets_table_insert_list(struct EtsTable *ets_table, term list, Context *ctx)
{
while (term_is_nonempty_list(list)) {
term tuple = term_get_list_head(list);
if (!term_is_tuple(tuple) && term_get_tuple_arity(tuple) < 1) {
return EtsBadEntry;
}
EtsErrorCode result = ets_table_insert(ets_table, tuple, ctx);
if (result != EtsOk) {
AVM_ABORT(); // Abort because operation might not be atomic.
}

list = term_get_list_tail(list);
}

return EtsOk;
}

EtsErrorCode ets_lookup(term ref, term key, term *ret, Context *ctx)
EtsErrorCode ets_insert(term ref, term entry, Context *ctx)
{
struct EtsTable *ets_table = term_is_atom(ref) ? ets_get_table_by_name(&ctx->global->ets, ref, TableAccessRead) : ets_get_table_by_ref(&ctx->global->ets, term_to_ref_ticks(ref), TableAccessRead);
struct EtsTable *ets_table = term_is_atom(ref) ? ets_get_table_by_name(&ctx->global->ets, ref, TableAccessWrite) : ets_get_table_by_ref(&ctx->global->ets, term_to_ref_ticks(ref), TableAccessWrite);
if (ets_table == NULL) {
return EtsTableNotFound;
}
EtsErrorCode result = EtsBadEntry;

if (term_is_tuple(entry) && term_get_tuple_arity(entry) > 0) {
result = ets_table_insert(ets_table, entry, ctx);
} else if (term_is_list(entry)) {
result = ets_table_insert_list(ets_table, entry, ctx);
}

SMP_UNLOCK(ets_table);

return result;
}

EtsErrorCode ets_table_lookup(struct EtsTable *ets_table, term key, term *ret, Context *ctx)
{
if (ets_table->access_type == EtsAccessPrivate && ets_table->owner_process_id != ctx->process_id) {
SMP_UNLOCK(ets_table);
return EtsPermissionDenied;
}

Expand All @@ -316,17 +337,28 @@ EtsErrorCode ets_lookup(term ref, term key, term *ret, Context *ctx)
size_t size = (size_t) memory_estimate_usage(res);
// allocate [object]
if (UNLIKELY(memory_ensure_free_opt(ctx, size + CONS_SIZE, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
SMP_UNLOCK(ets_table);
return EtsAllocationFailure;
}
term new_res = memory_copy_term_tree(&ctx->heap, res);
*ret = term_list_prepend(new_res, term_nil(), &ctx->heap);
}
SMP_UNLOCK(ets_table);

return EtsOk;
}

EtsErrorCode ets_lookup(term ref, term key, term *ret, Context *ctx)
{
struct EtsTable *ets_table = term_is_atom(ref) ? ets_get_table_by_name(&ctx->global->ets, ref, TableAccessRead) : ets_get_table_by_ref(&ctx->global->ets, term_to_ref_ticks(ref), TableAccessRead);
if (ets_table == NULL) {
return EtsTableNotFound;
}

EtsErrorCode result = ets_table_lookup(ets_table, key, ret, ctx);
SMP_UNLOCK(ets_table);

return result;
}

EtsErrorCode ets_lookup_element(term ref, term key, size_t pos, term *ret, Context *ctx)
{
if (UNLIKELY(pos == 0)) {
Expand Down Expand Up @@ -388,3 +420,117 @@ EtsErrorCode ets_delete(term ref, term key, term *ret, Context *ctx)

return EtsOk;
}

bool operation_to_tuple4(term operation, term *position, term *increment, term *threshold, term *set_value)
{
if (term_is_integer(operation)) {
*increment = operation;
*position = term_from_int(2);
*threshold = term_invalid_term();
*set_value = term_invalid_term();
return true;
}

if (!term_is_tuple(operation)) {
return false;
}
int n = term_get_tuple_arity(operation);
if (n != 2 && n != 4) {
return false;
}

term pos = term_get_tuple_element(operation, 0);
term incr = term_get_tuple_element(operation, 1);
if (!term_is_integer(pos) || !term_is_integer(incr)) {
return false;
}

if (n == 2) {
*position = pos;
*increment = incr;
*threshold = term_invalid_term();
*set_value = term_invalid_term();
return true;
}

term tresh = term_get_tuple_element(operation, 2);
term set_val = term_get_tuple_element(operation, 3);
if (!term_is_integer(tresh) || !term_is_integer(set_val)) {
return false;
}

*position = pos;
*increment = incr;
*threshold = tresh;
*set_value = set_val;
return true;
}

EtsErrorCode ets_update_counter(term ref, term key, term operation, term default_value, term *ret, Context *ctx)
{
struct EtsTable *ets_table = term_is_atom(ref) ? ets_get_table_by_name(&ctx->global->ets, ref, TableAccessWrite) : ets_get_table_by_ref(&ctx->global->ets, term_to_ref_ticks(ref), TableAccessWrite);
if (ets_table == NULL) {
return EtsTableNotFound;
}
term to_insert = term_invalid_term();
term list = term_invalid_term();
EtsErrorCode result = ets_table_lookup(ets_table, key, &list, ctx);
if (result != EtsOk) {
SMP_UNLOCK(ets_table);
return result;
}
if (term_is_nil(list)) {
if (term_is_invalid_term(default_value)) {
SMP_UNLOCK(ets_table);
return EtsBadEntry;
}
to_insert = default_value;
} else {
to_insert = term_get_list_head(list);
}

if (!(term_is_tuple(to_insert))) {
SMP_UNLOCK(ets_table);
return EtsBadEntry;
}
term position_term, increment_term, threshold_term, set_value_term;

if (!operation_to_tuple4(operation, &position_term, &increment_term, &threshold_term, &set_value_term)) {
SMP_UNLOCK(ets_table);
return EtsBadEntry;
}
int arity = term_get_tuple_arity(to_insert);
int position = term_to_int(position_term) - 1;
if (arity <= position || position < 1) {
SMP_UNLOCK(ets_table);
return EtsBadEntry;
}

term elem = term_get_tuple_element(to_insert, position);
if (!term_is_integer(elem)) {
SMP_UNLOCK(ets_table);
return EtsBadEntry;
}
int increment = term_to_int(increment_term);
// We don't check overflow here.
int elem_value = term_to_int(elem) + increment;
if (!term_is_invalid_term(threshold_term) && !term_is_invalid_term(set_value_term)) {
int threshold = term_to_int(threshold_term);
int set_value = term_to_int(set_value_term);

if (increment >= 0 && elem_value > threshold) {
elem_value = set_value;
} else if (increment < 0 && elem_value < threshold) {
elem_value = set_value;
}
}

elem = term_from_int(elem_value);
term_put_tuple_element(to_insert, position, elem);
EtsErrorCode insert_result = ets_table_insert(ets_table, to_insert, ctx);
if (insert_result == EtsOk) {
*ret = elem;
}
SMP_UNLOCK(ets_table);
return insert_result;
}
2 changes: 1 addition & 1 deletion src/libAtomVM/ets.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ EtsErrorCode ets_insert(term ref, term entry, Context *ctx);
EtsErrorCode ets_lookup(term ref, term key, term *ret, Context *ctx);
EtsErrorCode ets_lookup_element(term ref, term key, size_t pos, term *ret, Context *ctx);
EtsErrorCode ets_delete(term ref, term key, term *ret, Context *ctx);

EtsErrorCode ets_update_counter(term ref, term key, term value, term pos, term *ret, Context *ctx);
#ifdef __cplusplus
}
#endif
Expand Down
40 changes: 36 additions & 4 deletions src/libAtomVM/nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ static term nif_ets_insert(Context *ctx, int argc, term argv[]);
static term nif_ets_lookup(Context *ctx, int argc, term argv[]);
static term nif_ets_lookup_element(Context *ctx, int argc, term argv[]);
static term nif_ets_delete(Context *ctx, int argc, term argv[]);
static term nif_ets_update_counter(Context *ctx, int argc, term argv[]);
static term nif_erlang_pid_to_list(Context *ctx, int argc, term argv[]);
static term nif_erlang_ref_to_list(Context *ctx, int argc, term argv[]);
static term nif_erlang_fun_to_list(Context *ctx, int argc, term argv[]);
Expand Down Expand Up @@ -689,6 +690,12 @@ static const struct Nif ets_delete_nif =
.nif_ptr = nif_ets_delete
};

static const struct Nif ets_update_counter_nif =
{
.base.type = NIFFunctionType,
.nif_ptr = nif_ets_update_counter
};

static const struct Nif atomvm_add_avm_pack_binary_nif =
{
.base.type = NIFFunctionType,
Expand Down Expand Up @@ -3310,10 +3317,6 @@ static term nif_ets_insert(Context *ctx, int argc, term argv[])
VALIDATE_VALUE(ref, is_ets_table_id);

term entry = argv[1];
VALIDATE_VALUE(entry, term_is_tuple);
if (term_get_tuple_arity(entry) < 1) {
RAISE_ERROR(BADARG_ATOM);
}

EtsErrorCode result = ets_insert(ref, entry, ctx);
switch (result) {
Expand Down Expand Up @@ -3406,6 +3409,35 @@ static term nif_ets_delete(Context *ctx, int argc, term argv[])
}
}

static term nif_ets_update_counter(Context *ctx, int argc, term argv[])
{
term ref = argv[0];
VALIDATE_VALUE(ref, is_ets_table_id);

term key = argv[1];
term operation = argv[2];
term default_value = term_invalid_term();
if (argc == 4) {
default_value = argv[3];
VALIDATE_VALUE(default_value, term_is_tuple);
term_put_tuple_element(default_value, 0, key);
}
term ret = term_invalid_term();
EtsErrorCode result = ets_update_counter(ref, key, operation, default_value, &ret, ctx);
switch (result) {
case EtsOk:
return ret;
case EtsTableNotFound:
case EtsPermissionDenied:
case EtsBadEntry:
RAISE_ERROR(BADARG_ATOM);
case EtsAllocationFailure:
RAISE_ERROR(MEMORY_ATOM);
default:
AVM_ABORT();
}
}

static term nif_erts_debug_flat_size(Context *ctx, int argc, term argv[])
{
UNUSED(ctx);
Expand Down
2 changes: 2 additions & 0 deletions src/libAtomVM/nifs.gperf
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ ets:insert/2, &ets_insert_nif
ets:lookup/2, &ets_lookup_nif
ets:lookup_element/3, &ets_lookup_element_nif
ets:delete/2, &ets_delete_nif
ets:update_counter/3, &ets_update_counter_nif
ets:update_counter/4, &ets_update_counter_nif
atomvm:add_avm_pack_binary/2, &atomvm_add_avm_pack_binary_nif
atomvm:add_avm_pack_file/2, &atomvm_add_avm_pack_file_nif
atomvm:close_avm_pack/2, &atomvm_close_avm_pack_nif
Expand Down
Loading