Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CA-386582: Always create exporting thread for observers #5294

Merged
merged 3 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion ocaml/libs/tracing/tracing.ml
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ module Export = struct
|> List.concat_map (fun x -> TracerProvider.get_endpoints x)
|> List.iter (export_to_endpoint parent span_list)

let main () =
let create_exporter () =
enable_span_garbage_collector () ;
Thread.create
(fun () ->
Expand All @@ -832,6 +832,21 @@ module Export = struct
done
)
()

let exporter = ref None

let lock = Mutex.create ()

let main () =
Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () ->
match !exporter with
| None ->
let tid = create_exporter () in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are creating an exporter - but assign to a tid - which is a thread ID? Naming could be more congruent such that the function creating a value is related to the variable holding that value. We have here exporter_created, create_exporter and tid which refer to related values but you can't tell this from their names.

Copy link
Contributor Author

@Vincent-lau Vincent-lau Dec 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tid is the thread id of the exporter, it is a temporary variable holding the value of that id. It's just easier to have this temporary variable rather than storing the value in exporter directly, which can be a bit awkward since it is a option value, and what we want to return is the actual value without the option

exporter := Some tid ;
tid
| Some tid ->
tid
)
end
end

Expand Down
4 changes: 2 additions & 2 deletions ocaml/xapi/xapi_cluster_host.ml
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ let resync_host ~__context ~host =
(* Note that join_internal and enable both use the clustering lock *)
Client.Client.Cluster_host.enable ~rpc ~session_id ~self
) ;
Xapi_observer.initialise_component ~__context
Xapi_observer.initialise_observer ~__context
Xapi_observer.Component.Xapi_clusterd ;
let verify = Stunnel_client.get_verify_by_default () in
set_tls_config ~__context ~self ~verify
Expand Down Expand Up @@ -302,7 +302,7 @@ let enable ~__context ~self =
"Cluster_host.enable: xapi-clusterd not running - attempting to start" ;
Xapi_clustering.Daemon.enable ~__context
) ;
Xapi_observer.initialise_component ~__context
Xapi_observer.initialise_observer ~__context
Xapi_observer.Component.Xapi_clusterd ;
let verify = Stunnel_client.get_verify_by_default () in
set_tls_config ~__context ~self ~verify ;
Expand Down
62 changes: 33 additions & 29 deletions ocaml/xapi/xapi_observer.ml
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,16 @@ module Xapi_cluster = struct
end
end

(* We startup the observer for clusterd separately in cluster_host so that
(* We start up the observer for clusterd only if clusterd has been enabled
otherwise we initialise clusterd separately in cluster_host so that
there is no need to restart xapi in order for clusterd to be observed.
This does mean that observer will always be enabled for clusterd. *)
let startup_components =
List.filter (( <> ) Component.Xapi_clusterd) Component.all
let startup_components () =
List.filter
(function
| Component.Xapi_clusterd -> !Xapi_clustering.Daemon.enabled | _ -> true
)
Component.all

let get_forwarder c =
let module Forwarder = ( val match c with
Expand All @@ -268,7 +273,7 @@ let observed_hosts_of ~__context hosts =
match hosts with [] -> Db.Host.get_all ~__context | hosts -> hosts

let observed_components_of components =
match components with [] -> startup_components | components -> components
match components with [] -> startup_components () | components -> components

let assert_valid_hosts ~__context hosts =
List.iter
Expand Down Expand Up @@ -428,38 +433,37 @@ let load ~__context component =
)
all

let initialise_component ~__context =
let comp_left = ref (List.map (fun c -> (c, 1)) Component.all) in
fun component ->
load ~__context component ;
List.assoc_opt component !comp_left
|> Option.iter (fun _ ->
comp_left := List.remove_assoc component !comp_left ;
set_trace_log_dir ~__context !Xapi_globs.trace_log_dir component ;
set_export_interval ~__context !Xapi_globs.export_interval component ;
set_max_spans ~__context !Xapi_globs.max_spans component ;
set_max_traces ~__context !Xapi_globs.max_traces component ;
set_max_file_size ~__context
!Xapi_globs.max_observer_file_size
component ;
set_host_id ~__context (Helpers.get_localhost_uuid ()) component ;
set_compress_tracing_files ~__context
!Xapi_globs.compress_tracing_files
component ;
if component = Component.Xapi then
Tracing.Export.set_service_name "xapi" ;
init ~__context component
)
(** Called only when there is a component associated with an observer*)
let initialise_observer_component ~__context component =
load ~__context component

(** Called only once for the same component *)
let initialise_observer_meta ~__context component =
set_trace_log_dir ~__context !Xapi_globs.trace_log_dir component ;
set_export_interval ~__context !Xapi_globs.export_interval component ;
set_max_spans ~__context !Xapi_globs.max_spans component ;
set_max_traces ~__context !Xapi_globs.max_traces component ;
set_max_file_size ~__context !Xapi_globs.max_observer_file_size component ;
set_host_id ~__context (Helpers.get_localhost_uuid ()) component ;
set_compress_tracing_files ~__context
!Xapi_globs.compress_tracing_files
component ;
init ~__context component

let initialise_observer ~__context component =
initialise_observer_meta ~__context component ;
initialise_observer_component ~__context component

let initialise ~__context =
let init_one = initialise_component ~__context in
List.iter (initialise_observer_meta ~__context) (startup_components ()) ;
Db.Observer.get_all ~__context
|> List.iter (fun self ->
Db.Observer.get_components ~__context ~self
|> List.map Component.of_string
|> observed_components_of
|> List.iter init_one
)
|> List.iter (initialise_observer_component ~__context)
) ;
Tracing.Export.set_service_name "xapi"

let set_hosts ~__context ~self ~value =
assert_valid_hosts ~__context value ;
Expand Down
2 changes: 1 addition & 1 deletion ocaml/xapi/xapi_observer.mli
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ val observed_hosts_of :

val initialise : __context:Context.t -> unit

val initialise_component : __context:Context.t -> Component.t -> unit
val initialise_observer : __context:Context.t -> Component.t -> unit

val create :
__context:Context.t
Expand Down