1414
1515-behaviour (gen_server ).
1616
17+ % % TODO: delete these exports after csrt_logger experiment concluded
1718-export ([
1819 new /0 ,
1920 tnow /0
3738
3839% % Context API
3940-export ([
40- create_context /5 ,
41+ create_context /2 ,
4142 create_coordinator_context /2 ,
4243 create_resource /1 ,
4344 create_worker_context /3 ,
4950 set_context_dbname /2 ,
5051 set_context_handler_fun /1 ,
5152 set_context_handler_fun /2 ,
53+ set_context_handler_fun /3 ,
5254 set_context_username /1 ,
5355 set_context_username /2
5456]).
156158
157159- record (st , {}).
158160
161+ - record (rpc_worker , {
162+ mod :: atom () | '_' ,
163+ func :: atom () | '_' ,
164+ from :: {pid (), reference ()} | '_'
165+ }).
166+
167+ - record (coordinator , {
168+ mod :: atom () | '_' ,
169+ func :: atom () | '_' ,
170+ method :: atom () | '_' ,
171+ path :: binary () | '_'
172+ }).
173+
159174- record (rctx , {
160175 % % Metadata
161176 started_at = ? MODULE :tnow (),
162177 updated_at = ? MODULE :tnow (),
163178 pid_ref ,
164- mfa ,
165179 nonce ,
166- from ,
167- type = unknown , % % unknown/background/system/rpc/coordinator/fabric_rpc/etc_rpc/etc
180+ type , % % #coordinator{}/#rpc_worker{}/#replication_worker{}/#compaction_worker
168181 dbname ,
169182 username ,
170- path ,
171183
172184 % % Stats counters
173185 db_open = 0 ,
@@ -216,9 +228,7 @@ create_pid_ref() ->
216228 throw ({epidexist , PidRef0 }),
217229 close_pid_ref (PidRef0 )
218230 end ,
219- PidRef = {self (), make_ref ()},
220- set_pid_ref (PidRef ),
221- PidRef .
231+ {self (), make_ref ()}.
222232
223233close_pid_ref () ->
224234 close_pid_ref (get_pid_ref ()).
@@ -279,9 +289,9 @@ active_int(all) ->
279289 select_by_type (all ).
280290
281291select_by_type (coordinators ) ->
282- ets :select (? MODULE , ets :fun2ms (fun (# rctx {type = { coordinator , _ , _ }} = R ) -> R end ));
292+ ets :select (? MODULE , ets :fun2ms (fun (# rctx {type = # coordinator { }} = R ) -> R end ));
283293select_by_type (workers ) ->
284- ets :select (? MODULE , ets :fun2ms (fun (# rctx {type = { worker , _ , _ }} = R ) -> R end ));
294+ ets :select (? MODULE , ets :fun2ms (fun (# rctx {type = # rpc_worker { }} = R ) -> R end ));
285295select_by_type (all ) ->
286296 ets :tab2list (? MODULE ).
287297
@@ -295,7 +305,7 @@ find_by_pidref(PidRef) ->
295305 [R || # rctx {} = R <- ets :match_object (? MODULE , # rctx {pid_ref = PidRef , _ = '_' })].
296306
297307find_workers_by_pidref (PidRef ) ->
298- [R || # rctx {} = R <- ets :match_object (? MODULE , # rctx {from = PidRef , _ = '_' })].
308+ [R || # rctx {} = R <- ets :match_object (? MODULE , # rctx {type = # rpc_worker { from = PidRef } , _ = '_' })].
299309
300310field (# rctx {pid_ref = Val }, pid_ref ) -> Val ;
301311% % NOTE: Pros and cons to doing these convert functions here
@@ -304,13 +314,12 @@ field(#rctx{pid_ref=Val}, pid_ref) -> Val;
304314% % be jiffy:encode'able. The tricky bit is dynamically encoding the group_by
305315% % structure provided by the caller of *_by aggregator functions below.
306316% % For now, we just always return jiffy:encode'able data types.
307- field (# rctx {mfa = Val }, mfa ) -> convert_mfa (Val );
308317field (# rctx {nonce = Val }, nonce ) -> Val ;
309- field (# rctx {from = Val }, from ) -> Val ;
318+ % % field(#rctx{from=Val}, from) -> Val;
310319field (# rctx {type = Val }, type ) -> convert_type (Val );
311320field (# rctx {dbname = Val }, dbname ) -> Val ;
312321field (# rctx {username = Val }, username ) -> Val ;
313- field (# rctx {path = Val }, path ) -> Val ;
322+ % % field(#rctx{path=Val}, path) -> Val;
314323field (# rctx {db_open = Val }, db_open ) -> Val ;
315324field (# rctx {docs_read = Val }, docs_read ) -> Val ;
316325field (# rctx {rows_read = Val }, rows_read ) -> Val ;
@@ -381,42 +390,19 @@ sorted_by(KeyFun, ValFun, AggFun) -> shortened(sorted(group_by(KeyFun, ValFun, A
381390% % Conversion API for outputting JSON
382391% %
383392
384- convert_mfa (MFA ) when is_list (MFA ) ->
385- list_to_binary (MFA );
386- convert_mfa ({M0 , F0 , A0 }) ->
393+ convert_type (# coordinator {method = Verb0 , path = Path , mod = M0 , func = F0 }) ->
387394 M = atom_to_binary (M0 ),
388395 F = atom_to_binary (F0 ),
389- A = integer_to_binary (A0 ),
390- <<M /binary , " :" , F /binary , " /" , A /binary >>;
391- convert_mfa (null ) ->
392- null ;
393- convert_mfa (undefined ) ->
394- null .
395-
396- convert_type (Atom ) when is_atom (Atom ) ->
397- atom_to_binary (Atom );
398- convert_type ({coordinator , Verb0 , Atom0 }) when is_atom (Atom0 ) ->
399- Verb = atom_to_binary (Verb0 ),
400- Atom = atom_to_binary (Atom0 ),
401- <<" coordinator:" , Verb /binary , " :" , Atom /binary >>;
402- convert_type ({coordinator , Verb0 , Path0 }) ->
403396 Verb = atom_to_binary (Verb0 ),
404- Path = list_to_binary (Path0 ),
405- <<" coordinator:" , Verb /binary , " :" , Path /binary >>;
406- convert_type ({worker , M0 , F0 }) ->
397+ <<" coordinator-{" , M /binary , " :" , F /binary , " }:" , Verb /binary , " :" , Path /binary >>;
398+ convert_type (# rpc_worker {mod = M0 , func = F0 , from = From0 }) ->
407399 M = atom_to_binary (M0 ),
408400 F = atom_to_binary (F0 ),
409- <<" worker:" , M /binary , " :" , F /binary >>;
410- convert_type (null ) ->
411- null ;
401+ From = convert_pidref (From0 ),
402+ <<" rpc_worker-{" , From /binary , " }:" , M /binary , " :" , F /binary >>;
412403convert_type (undefined ) ->
413404 null .
414405
415- convert_path (undefined ) ->
416- null ;
417- convert_path (Path ) when is_binary (Path ) ->
418- Path .
419-
420406convert_pidref ({Parent0 , ParentRef0 }) ->
421407 Parent = convert_pid (Parent0 ),
422408 ParentRef = convert_ref (ParentRef0 ),
@@ -437,11 +423,8 @@ to_json(#rctx{}=Rctx) ->
437423 updated_at => Rctx # rctx .updated_at ,
438424 started_at => Rctx # rctx .started_at ,
439425 pid_ref => convert_pidref (Rctx # rctx .pid_ref ),
440- mfa => convert_mfa (Rctx # rctx .mfa ),
441426 nonce => Rctx # rctx .nonce ,
442- from => convert_pidref (Rctx # rctx .from ),
443427 dbname => Rctx # rctx .dbname ,
444- path => convert_path (Rctx # rctx .path ),
445428 username => Rctx # rctx .username ,
446429 db_open => Rctx # rctx .db_open ,
447430 docs_read => Rctx # rctx .docs_read ,
@@ -464,44 +447,40 @@ to_json(#rctx{}=Rctx) ->
464447create_resource (# rctx {} = Rctx ) ->
465448 catch ets :insert (? MODULE , Rctx ).
466449
467- create_worker_context (From , {M ,F ,_A } = MFA , Nonce ) ->
450+ create_worker_context (From , {M ,F ,_A }, Nonce ) ->
468451 case is_enabled () of
469452 true ->
470- create_context (MFA , {worker , M , F }, null , From , Nonce );
453+ Type = # rpc_worker {from = From , mod = M , func = F },
454+ create_context (Type , Nonce );
471455 false ->
472456 false
473457 end .
474458
475- create_coordinator_context (# httpd {} = Req , Path0 ) ->
459+ create_coordinator_context (# httpd {method = Verb , nonce = Nonce } , Path0 ) ->
476460 case is_enabled () of
477461 true ->
478- # httpd {
479- method = Verb ,
480- nonce = Nonce
481- % %path_parts = Parts
482- } = Req ,
483- % %Path = list_to_binary([$/ | io_lib:format("~p", [Parts])]),
484462 Path = list_to_binary ([$/ | Path0 ]),
485- Type = { coordinator , Verb , init },
486- create_context (null , Type , Path , null , Nonce );
463+ Type = # coordinator { method = Verb , path = Path },
464+ create_context (Type , Nonce );
487465 false ->
488466 false
489467 end .
490468
491- create_context (MFA , Type , Path , From , Nonce ) ->
492- PidRef = create_pid_ref (),
493- Rctx = # rctx {
494- from = From ,
495- pid_ref = PidRef ,
496- mfa = MFA ,
497- nonce = Nonce ,
498- path = Path ,
499- type = Type
500- },
469+ create_context (Type , Nonce ) ->
470+ Rctx = new_context (Type , Nonce ),
471+ set_pid_ref (Rctx # rctx .pid_ref ),
501472 erlang :put (? DELTA_TZ , Rctx ),
502473 create_resource (Rctx ),
503474 track (Rctx ),
504- PidRef .
475+ Rctx # rctx .pid_ref .
476+
477+ % % Might be useful to export this but the internal worker types aren't exported
478+ new_context (Type , Nonce ) ->
479+ # rctx {
480+ nonce = Nonce ,
481+ pid_ref = create_pid_ref (),
482+ type = Type
483+ }.
505484
506485set_context_dbname (DbName ) ->
507486 set_context_dbname (DbName , get_pid_ref ()).
@@ -512,17 +491,28 @@ set_context_dbname(DbName, PidRef) ->
512491 is_enabled () andalso update_element (PidRef , [{# rctx .dbname , DbName }]).
513492
514493set_context_handler_fun (Fun ) when is_function (Fun ) ->
515- set_context_handler_fun (Fun , get_pid_ref ()).
516- set_context_handler_fun (_ , undefined ) ->
494+ case is_enabled () of
495+ false ->
496+ ok ;
497+ true ->
498+ FProps = erlang :fun_info (Fun ),
499+ Mod = proplists :get_value (module , FProps ),
500+ Func = proplists :get_value (name , FProps ),
501+ set_context_handler_fun (Mod , Func )
502+ end .
503+
504+ set_context_handler_fun (Mod , Func ) when is_atom (Mod ) andalso is_atom (Func ) ->
505+ set_context_handler_fun (Mod , Func , get_pid_ref ()).
506+
507+ set_context_handler_fun (_ , _ , undefined ) ->
517508 ok ;
518- set_context_handler_fun (Fun , PidRef ) when is_function ( Fun ) ->
509+ set_context_handler_fun (Mod , Func , PidRef ) when is_atom ( Mod ) andalso is_atom ( Func ) ->
519510 case is_enabled () of
520511 false ->
521512 ok ;
522513 true ->
523- FunName = erlang :fun_to_list (Fun ),
524- # rctx {type = {coordinator , Verb , _ }} = get_resource (),
525- Update = [{# rctx .type , {coordinator , Verb , FunName }}],
514+ # rctx {type = # coordinator {}= Coordinator } = get_resource (),
515+ Update = [{# rctx .type , Coordinator # coordinator {mod = Mod , func = Func }}],
526516 update_element (PidRef , Update )
527517 end .
528518
@@ -796,9 +786,9 @@ should_log(#rctx{}, true) ->
796786 true ;
797787should_log (# rctx {}, false ) ->
798788 false ;
799- should_log (# rctx {type = { coordinator , _ , _ }}, coordinator ) ->
789+ should_log (# rctx {type = # coordinator { }}, coordinator ) ->
800790 true ;
801- should_log (# rctx {type = { worker , fabric_rpc , FName }}, _ ) ->
791+ should_log (# rctx {type = # rpc_worker { mod = fabric_rpc , func = FName }}, _ ) ->
802792 case conf_get (" log_fabric_rpc" ) of
803793 " true" ->
804794 true ;
0 commit comments