-
Notifications
You must be signed in to change notification settings - Fork 429
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Async pools #3433
Async pools #3433
Conversation
This comment has been minimized.
This comment has been minimized.
Codecov Report
@@ Coverage Diff @@
## master #3433 +/- ##
==========================================
+ Coverage 80.78% 80.83% +0.04%
==========================================
Files 415 415
Lines 32306 32266 -40
==========================================
- Hits 26100 26083 -17
+ Misses 6206 6183 -23
Continue to review full report at Codecov.
|
c909eac
to
5a687a9
Compare
This comment has been minimized.
This comment has been minimized.
5a687a9
to
976ac65
Compare
This comment has been minimized.
This comment has been minimized.
976ac65
to
60b4a88
Compare
This comment has been minimized.
This comment has been minimized.
c04f26a
to
7671aea
Compare
This comment has been minimized.
This comment has been minimized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code looks good in general, my main concern is the low test coverage, the workers are tested only for storing messages one by one. A load test of this PR for the PM and MUC scenarios with MAM enabled should be enough here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks pretty nice!
InsertResult = | ||
case MessageCount of | ||
MaxSize -> | ||
mongoose_rdbms:execute(HostType, insert_mam_messages, lists:append(Rows)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this line could be covered with batch_size = 1
case InsertResult of | ||
{updated, _Count} -> ok; | ||
{error, Reason} -> | ||
mongoose_metrics:update(HostType, modMamDropped, MessageCount), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we would need a drop test to test that the metric is incremented. Maybe not in this PR
src/mongoose_batch_worker.erl
Outdated
flush_queue_len => length(State#state.flush_queue)}), | ||
NewState = do_run_flush(State#state{flush_queue = lists:reverse(Queue), | ||
flush_interval_tref = undefined}), | ||
erlang:garbage_collect(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wondering if that would collect Queue
(something says to me it will still keep Queue, because the caller is still referencing it).
There is another way to collect for gen_server: is to return {noreply,NewState,hibernate}
after a flush, could be more efficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hibernate more efficient, not at all, because it will not only GC the heap, but also collapse the whole process memory to a PCB, which will then require a new heap when the process is woken up, which will be a very small heap because hibernation imposes a shrink as well, that will very likely need to be grown once the queue starts filling... no way.
But I'll make sure the queue is GC'ed here, hold on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You were right, it wasn't collected. But got how to, see ee83696 😁
617c016
to
ee83696
Compare
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
294568a
to
ee83696
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks ok to me, only the load test concerns remain.
ee83696
to
89acc2b
Compare
This comment has been minimized.
This comment has been minimized.
src/mongoose_batch_worker.erl
Outdated
|
||
% Don't leak the tasks to logs, can contain private information | ||
format_status(_Opt, [_PDict, State | _]) -> | ||
[{data, [{"State", State#state{flush_queue = []}}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flush_queue = censored
;)
actually, setting it to a valid value here (i.e. []) could be pretty confusing, when people try to call for status.
could be = hidden or sanitized
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
censored returns a dialyzer error here, but can add a type for that in the typedef
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sad :(
Actually, just actually, why not to convert State
into a map in format status? :)
It is much easier to read ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a super canonical example for format_status was actually converting state into a proplist :)
(it is what I think about when see that gen-server callback).
89acc2b
to
d4f68a1
Compare
d4f68a1
to
92a3089
Compare
This comment has been minimized.
This comment has been minimized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Brilliant. Merging after a load test done by Nelson :)
92a3089
to
19e818d
Compare
This comment has been minimized.
This comment has been minimized.
This is a gen_server, whose purpose is to be part of a worker pool. The server accumulates tasks to take action to, until the number of tasks reaches a configured threshold, or, a timeout is fired since the first task that was requested in this batch (this is to ensure that no queue awaits too long for action). Note that we don't trigger timeouts unless the queue is not empty, to avoid waking up an unused worker. The worker also takes a callback, and a map with any metadata the callback may need. When is time to flush a queue, it is this callback that will be ran, taking the queue in the order in which it has been submitted, and the optionally provided metadata. Most of the code for this worker is learnt from MAM's async workers, with some changes or learnt lessons: * Note that this worker sets info logs on start and terminate. * There is also a debug log on queue flushes. * The terminate callback flushes the task queue if there's any task pending, to try to avoid losing tasks inadvertently. * The format_status callback prevents printing the actual tasks on crashes or system state lookups, as the tasks might contain data private to clients under any GDPR-like legal constrains. * Garbage collection is triggered after every flush, first to ensure saving memory, but second, to ensure the release of any binaries this worker might be holding.
This module implements a worker pool abstraction to handle batch workers. It will start a transient supervisor under ejabberd_sup, that will spawn a pool using the `worker_pool` library. This will then supervise the three supervisors that `worker_pool` supervises (time-manager, queue-manager, and worker-sup). One of these, 'worker-sup', will in turn spawn all the mongoose_batch_worker workers with the correct configuration.
First of all, remove all the old modules implementing the mam rdbms async pools. Then, create a new module called mod_mam_rdbms_arch_async, that abstract what the previous two where doing, by using the modules created in the two previous commits. This module starts the pools with all the correct parameters, and registers the hooks that will then call `worker_pool` using the `hash_worker` strategy, to ensure MAM entries to a single archive are all processed in parallel. Note a few lessons here: * Selecting the worker isn't sound based on the `rem` operator, because the distribution of archives over workers is not at all ensured to be uniform. `worker_pool` uses `erlang:phash2/2`, which ensures such uniform distribution. * The callbacks for the batch workers abstract all the metrics. * mod_mam_meta parsing has breaking changes: this is what was reported as broken in the previous implementation. Now there's an entire toml section called `async_writer`, that can be enabled, disabled and configured both globally for mam, or in a fine-grained manner within the `pm` and `muc` sections.
The purpose of this is for the supervisor to be able to dynamically generate data that will be passed to the workers, an example being, the supervisor might create ets tables. We keep this in the context of the supervisor and its workers, to keep the abstraction isolated to this supervision tree: an ets table may not make any sense outside of this supervision tree, we might want the ets table not to even be named, so that nobody outside of the tree will have access to it, only the workers will get an opaque reference. We will also want the ets tables to be cleaned when the supervision tree dies, as the data might not make sense on a newly restarted tree anymore, or we will want the tree to die fast if the ets table is lost. This init callback can also set global configuration flags, or send messages, or log something, or define new metrics, or many other things that for example the current MongooseIM init callbacks for `mongoose_wpool` are already doing.
The whole operation to build lists and append them and generate an atom in the end is quite expensive: not only it generates a lot of lists and copies of lists that later need to be garbage collected, but also, the `list_to_atom` operation is a concurrent bottleneck: the atom table is not optimised for inserts, only for reads, this one needs to grab locks and check if the atom already existed and so on. Instead, store the name on a persistent_term record, and simply fetch that when needed, making the hotter part of the code, finding the pool to submit a task to, faster.
Note that running garbage collection within a function will not collect the references this function has, so the queue will actually not be cleared after being flushed as it was expected. Instead, we use the option `async` for the GC. This will make this process end its reductions, and once it has been preempted, it will be scheduled for garbage collection, and thereafter, a message will be delivered notifying him of so.
19e818d
to
6479445
Compare
small_tests_24 / small_tests / 6479445 small_tests_23 / small_tests / 6479445 dynamic_domains_pgsql_mnesia_23 / pgsql_mnesia / 6479445 dynamic_domains_pgsql_mnesia_24 / pgsql_mnesia / 6479445 dynamic_domains_mysql_redis_24 / mysql_redis / 6479445 dynamic_domains_mssql_mnesia_24 / odbc_mssql_mnesia / 6479445 mam_SUITE:rdbms_async_cache_mam_all:mam06:retract_message{error,{{archive_size,1,[{times,200,0}]},
[{mongoose_helper,do_wait_until,2,
[{file,"/home/circleci/project/big_tests/tests/mongoose_helper.erl"},
{line,377}]},
{mam_SUITE,'-retract_message/1-fun-0-',4,
[{file,"/home/circleci/project/big_tests/tests/mam_SUITE.erl"},
{line,1927}]},
{escalus_story,story,4,
[{file,"/home/circleci/project/big_tests/_build/default/lib/escalus/src/escalus_story.erl"},
{line,72}]},
{test_server,ts_tc,3,[{file,"test_server.erl"},{line,1783}]},
{test_server,run_test_case_eval1,6,
[{file,"test_server.erl"},{line,1292}]},
{test_server,run_test_case_eval,9,
[{file,"test_server.erl"},{line,1224}]}]}} ldap_mnesia_24 / ldap_mnesia / 6479445 ldap_mnesia_23 / ldap_mnesia / 6479445 internal_mnesia_24 / internal_mnesia / 6479445 elasticsearch_and_cassandra_24 / elasticsearch_and_cassandra_mnesia / 6479445 pgsql_mnesia_24 / pgsql_mnesia / 6479445 pgsql_mnesia_23 / pgsql_mnesia / 6479445 mysql_redis_24 / mysql_redis / 6479445 riak_mnesia_24 / riak_mnesia / 6479445 dynamic_domains_mssql_mnesia_24 / odbc_mssql_mnesia / 6479445 service_domain_db_SUITE:db:db_keeps_syncing_after_cluster_join{error,{test_case_failed,{[<<"example1.com">>],
[<<"example1.com">>,<<"example2.com">>]}}} service_domain_db_SUITE:db:rest_with_auth:rest_delete_domain_cleans_data_from_mam{error,
{timeout_when_waiting_for_stanza,
[{escalus_client,wait_for_stanza,
[{client,
<<"bob_rest_delete_domain_cleans_data_from_mam_1935@example.org/res1">>,
escalus_tcp,<0.29327.1>,
[{event_manager,<0.29321.1>},
{server,<<"example.org">>},
{username,
<<"bob_rest_delete_domain_cleans_data_from_mam_1935">>},
{resource,<<"res1">>}],
[{event_client,
[{event_manager,<0.29321.1>},
{server,<<"example.org">>},
{username,
<<"bob_rest_delete_domain_cleans_data_from_mam_1935">>},
{resource,<<"res1">>}]},
{resource,<<"res1">>},
{username,
<<"bob_rest_delete_domain_cleans_data_from_mam_1935">>},
{server,<<"example.org">>},
{host,<<"localhost">>},
{port,5232},
{auth,{escalus_auth,auth_plain}},
{wspath,undefined},
{username,
<<"bob_rest_delete_domain_cleans_data_from_mam_1935">>},
{server,<<"example.org">>},
{host,<<"localhost">>},
{password,<<"makota3">>},
{port,5232},
{stream_id,<<"7a54345859a684a1">>}]},
5000],
[{file,
"/home/circleci/project/big_tests/_build/default/lib/escalus/src/escalus_client.erl"},
{line,136}]},
{service_domain_db_SUITE,
'-rest_delete_domain_cleans_data_from_mam/1-fun-0-',5,
[{file,
... service_domain_db_SUITE:db:rest_without_auth:rest_delete_domain_cleans_data_from_mam{error,
{timeout_when_waiting_for_stanza,
[{escalus_client,wait_for_stanza,
[{client,
<<"bob_rest_delete_domain_cleans_data_from_mam_1936@example.org/res1">>,
escalus_tcp,<0.29923.1>,
[{event_manager,<0.29917.1>},
{server,<<"example.org">>},
{username,
<<"bob_rest_delete_domain_cleans_data_from_mam_1936">>},
{resource,<<"res1">>}],
[{event_client,
[{event_manager,<0.29917.1>},
{server,<<"example.org">>},
{username,
<<"bob_rest_delete_domain_cleans_data_from_mam_1936">>},
{resource,<<"res1">>}]},
{resource,<<"res1">>},
{username,
<<"bob_rest_delete_domain_cleans_data_from_mam_1936">>},
{server,<<"example.org">>},
{host,<<"localhost">>},
{port,5232},
{auth,{escalus_auth,auth_plain}},
{wspath,undefined},
{username,
<<"bob_rest_delete_domain_cleans_data_from_mam_1936">>},
{server,<<"example.org">>},
{host,<<"localhost">>},
{password,<<"makota3">>},
{port,5232},
{stream_id,<<"bc16d79a34247558">>}]},
5000],
[{file,
"/home/circleci/project/big_tests/_build/default/lib/escalus/src/escalus_client.erl"},
{line,136}]},
{service_domain_db_SUITE,
'-rest_delete_domain_cleans_data_from_mam/1-fun-0-',5,
[{file,
... dynamic_domains_mssql_mnesia_24 / odbc_mssql_mnesia / 6479445 mam_SUITE:rdbms_async_cache_mam_all:mam06:retract_wrong_message{error,{{archive_size,1,[{times,200,0}]},
[{mongoose_helper,do_wait_until,2,
[{file,"/home/circleci/project/big_tests/tests/mongoose_helper.erl"},
{line,377}]},
{mam_SUITE,'-retract_message/1-fun-0-',4,
[{file,"/home/circleci/project/big_tests/tests/mam_SUITE.erl"},
{line,1927}]},
{escalus_story,story,4,
[{file,"/home/circleci/project/big_tests/_build/default/lib/escalus/src/escalus_story.erl"},
{line,72}]},
{test_server,ts_tc,3,[{file,"test_server.erl"},{line,1783}]},
{test_server,run_test_case_eval1,6,
[{file,"test_server.erl"},{line,1292}]},
{test_server,run_test_case_eval,9,
[{file,"test_server.erl"},{line,1224}]}]}} mam_SUITE:rdbms_async_cache_mam_all:mam06:querying_for_all_messages_with_jid{error,{test_case_failed,"Respond size is 7, 17 is expected."}} vcard_SUITE:ro_full:retrieve_own_card{error,{test_case_failed,"Expected <<\"alice\">> got undefined\n"}} dynamic_domains_mssql_mnesia_24 / odbc_mssql_mnesia / 6479445 |
This PR learns from the MAM async workers, and tries to separate two different concepts into two different abstractions: one, is the concept of a pool of workers that accumulate tasks, and act on the tasks every time a batch size is reached, or a timeout since the first submitted task is triggered; the second, is how MAM acts on the batches.
I attempt here to implement the first concept in a very generic way, using the new modules
mongoose_batch_worker
andmongoose_async_pools
, as an abstraction that takes the batch size, the flush timeout, and a callback to be run on the accumulated queues. This is then used by a new mam async module, but the goal is, that it can be used as well in for example a future inbox async module, or any other extension that would benefit from batching (I know of quite a few myself!).Note that in the case of inbox, the batches would accumulate inbox changes for a user and then actually only submit the last one to the DB, as, note, unlike mam being an append-only operation, inbox is an update-only operation, so applying all inbox changes sequentially is equivalent to applying the last one.
NOTE: please do read the commit messages, as they contain really detailed information on the code submitted on each of them.