diff --git a/src/lib/integration_test_cloud_engine/graphql_polling_log_engine.ml b/src/lib/integration_test_cloud_engine/graphql_polling_log_engine.ml index 3aa906d20d1f..bd1b720a598c 100644 --- a/src/lib/integration_test_cloud_engine/graphql_polling_log_engine.ml +++ b/src/lib/integration_test_cloud_engine/graphql_polling_log_engine.ml @@ -22,40 +22,40 @@ module Node = Kubernetes_network.Node or_error_list_fold ls ~init:[] ~f:(fun t el -> let%map h = f el in h :: t ) *) -(* - let log_filter_of_event_type ev_existential = - let open Event_type in - let (Event_type ev_type) = ev_existential in - let (module Ty) = event_type_module ev_type in - match Ty.parse with - | From_error_log _ -> - [ "jsonPayload.level=(\"Warn\" OR \"Error\" OR \"Faulty_peer\" OR \ - \"Fatal\")" - ] - | From_daemon_log (struct_id, _) -> - let filter = - Printf.sprintf "jsonPayload.event_id=\"%s\"" - (Structured_log_events.string_of_id struct_id) - in - [ filter ] - | From_puppeteer_log (id, _) -> - let filter = - Printf.sprintf "jsonPayload.puppeteer_event_type=\"%s\"" id - in - [ "jsonPayload.puppeteer_script_event=true"; filter ] - - let all_event_types_log_filter = - let event_filters = - List.map Event_type.all_event_types ~f:log_filter_of_event_type - in - let nest s = "(" ^ s ^ ")" in - let disjunction = - event_filters - |> List.map ~f:(fun filter -> - nest (filter |> List.map ~f:nest |> String.concat ~sep:" AND ") ) - |> String.concat ~sep:" OR " - in - [ disjunction ] *) + +let log_filter_of_event_type ev_existential = + let open Event_type in + let (Event_type ev_type) = ev_existential in + let (module Ty) = event_type_module ev_type in + match Ty.parse with + | From_error_log _ -> + [ "jsonPayload.level=(\"Warn\" OR \"Error\" OR \"Faulty_peer\" OR \ + \"Fatal\")" + ] + | From_daemon_log (struct_id, _) -> + let filter = + Printf.sprintf "jsonPayload.event_id=\"%s\"" + (Structured_log_events.string_of_id struct_id) + in + [ filter ] + | From_puppeteer_log (id, _) -> + let filter = + Printf.sprintf "jsonPayload.puppeteer_event_type=\"%s\"" id + in + [ "jsonPayload.puppeteer_script_event=true"; filter ] + +let all_event_types_log_filter = + let event_filters = + List.map Event_type.all_event_types ~f:log_filter_of_event_type + in + let nest s = "(" ^ s ^ ")" in + let disjunction = + event_filters + |> List.map ~f:(fun filter -> + nest (filter |> List.map ~f:nest |> String.concat ~sep:" AND ") ) + |> String.concat ~sep:" OR " + in + [ disjunction ] type t = { logger : Logger.t @@ -110,21 +110,27 @@ let parse_event_from_log_entry ~logger ~network log_entry = in (node, event) -let poll_for_logs ~logger ~network = +let get_filtered_log_entries ~logger ~network ~last_log_index_seen = let open Deferred.Or_error.Let_syntax in let allpods = Kubernetes_network.all_pods network |> Core.String.Map.data in let logs = Deferred.Or_error.List.fold allpods ~init:[] ~f:(fun acc node -> - let%bind node_logs = Kubernetes_network.Node.get_logs ~logger node in + let%bind node_logs = + Kubernetes_network.Node.get_filtered_log_entries ~logger + ~last_log_index_seen node + in return (List.append acc node_logs) ) in logs -let rec poll_for_logs_in_background ~logger ~network ~event_writer = +let rec poll_get_filtered_log_entries ~last_log_index_seen ~logger ~network + ~event_writer = if not (Pipe.is_closed event_writer) then ( [%log spam] "Polling all testnet nodes for logs" ; let%bind log_entries = - Deferred.map (poll_for_logs ~logger ~network) ~f:Or_error.ok_exn + Deferred.map + (get_filtered_log_entries ~logger ~network ~last_log_index_seen) + ~f:Or_error.ok_exn in if List.length log_entries > 0 then [%log spam] "Parsing events from $n logs" @@ -132,7 +138,10 @@ let rec poll_for_logs_in_background ~logger ~network ~event_writer = else [%log spam] "No logs were pulled" ; let%bind () = Deferred.List.iter ~how:`Sequential log_entries ~f:(fun log_entry -> - ( match log_entry |> parse_event_from_log_entry ~logger ~network with + ( match + log_entry |> Yojson.Safe.from_string + |> parse_event_from_log_entry ~logger ~network + with | Ok a -> Pipe.write_without_pushback_if_open event_writer a | Error e -> @@ -141,22 +150,56 @@ let rec poll_for_logs_in_background ~logger ~network ~event_writer = Deferred.unit ) in let%bind () = after (Time.Span.of_ms 10000.0) in - poll_for_logs_in_background ~logger ~network ~event_writer ) + let new_index = last_log_index_seen + List.length log_entries in + poll_get_filtered_log_entries ~logger ~network ~event_writer + ~last_log_index_seen:new_index ) else Deferred.unit +let start_filtered_log ~logger ~network ~log_filter = + let allpods = Kubernetes_network.all_pods network |> Core.String.Map.data in + let logs = + Deferred.Or_error.List.iter allpods ~f:(fun node -> + Kubernetes_network.Node.start_filtered_log ~logger ~log_filter node ) + in + logs + +let rec poll_start_filtered_log ~log_filter ~logger ~network = + let open Deferred.Let_syntax in + [%log spam] + "Polling all testnet nodes to get them to start their filtered logs" ; + let%bind res = start_filtered_log ~log_filter ~logger ~network in + match res with + | Ok () -> + Deferred.return () + | Error _ -> + poll_start_filtered_log ~log_filter ~logger ~network + +let poll_for_logs_in_background ~log_filter ~logger ~network ~event_writer = + [%log info] "Attempting to start the filtered log in all testnet nodes" ; + let _ = poll_start_filtered_log ~log_filter ~logger ~network in + [%log info] + "Filtered logs in all testnet nodes successfully started. Will now poll \ + for log entries" ; + let _ = + poll_get_filtered_log_entries ~logger ~network ~event_writer + ~last_log_index_seen:0 + in + [%log info] "poll_for_logs_in_background will now exit" ; + Deferred.unit + let create ~logger ~(network : Kubernetes_network.t) = let open Deferred.Or_error.Let_syntax in - (* let log_filter = - let mina_container_filter = "resource.labels.container_name=\"mina\"" in - let filters = - [ network.testnet_log_filter; mina_container_filter ] - @ all_event_types_log_filter - in - String.concat filters ~sep:"\n" - in *) + let log_filter = + let mina_container_filter = "resource.labels.container_name=\"mina\"" in + let filters = + [ network.testnet_log_filter; mina_container_filter ] + @ all_event_types_log_filter + in + filters + in let event_reader, event_writer = Pipe.create () in let background_job = - poll_for_logs_in_background ~logger ~network ~event_writer + poll_for_logs_in_background ~log_filter ~logger ~network ~event_writer in return { logger; event_reader; event_writer; background_job } diff --git a/src/lib/integration_test_cloud_engine/kubernetes_network.ml b/src/lib/integration_test_cloud_engine/kubernetes_network.ml index 117ccbc7cf0d..be5f5cf66b02 100644 --- a/src/lib/integration_test_cloud_engine/kubernetes_network.ml +++ b/src/lib/integration_test_cloud_engine/kubernetes_network.ml @@ -660,25 +660,37 @@ module Node = struct set_snark_worker ~logger t ~new_snark_pub_key |> Deferred.bind ~f:Malleable_error.or_hard_error - (* TODO: this is a complete stub of course, since the actual graphql endpoint it would hit doesn't exist yet *) - let get_logs ~logger t = - [%log info] "Getting logs of node" ~metadata:(logger_metadata t) ; - Deferred.Or_error.return - [ Yojson.Safe.from_string ""; Yojson.Safe.from_string "" ] - (* let open Deferred.Or_error.Let_syntax in - [%log info] "Getting logs of node" - ~metadata:(logger_metadata t) ; - let query_obj = Graphql.Query_latest_logs.(make @@ makeVariables ()) in - let%bind query_result_obj = - exec_graphql_request ~logger ~node:t ~query_name:"query_logs" query_obj - in - [%log info] "get_logs, finished exec_graphql_request" ; - let new_loglines = query_result_obj.newloglines |> Array.to_list in - return new_loglines *) - - (* let must_get_logs ~logger t = - get_logs ~logger t |> Deferred.bind ~f:Malleable_error.or_hard_error - *) + let start_filtered_log ~logger ~log_filter t = + let open Deferred.Or_error.Let_syntax in + [%log info] "Starting filtered log" ~metadata:(logger_metadata t) ; + let query_obj = + Graphql.StartFilteredLog.(make @@ makeVariables ~filter:log_filter ()) + in + let%bind query_result_obj = + exec_graphql_request ~logger ~node:t ~query_name:"query_logs" query_obj + in + [%log debug] "start_filtered_log, finished exec_graphql_request" ; + let returned_result = query_result_obj.startFilteredLog in + (* returned_result should just be the node echoing the log filter back, if it's successful *) + if not @@ String.is_empty returned_result then return () + else Deferred.Or_error.errorf "start_filtered_log did not seem to succeed" + + let get_filtered_log_entries ~logger ~last_log_index_seen t = + let open Deferred.Or_error.Let_syntax in + [%log info] "Getting logs from node, starting from log entry number %d" + last_log_index_seen ~metadata:(logger_metadata t) ; + let query_obj = + Graphql.GetFilteredLogEntries.( + make @@ makeVariables ~offset:last_log_index_seen ()) + in + let%bind query_result_obj = + exec_graphql_request ~logger ~node:t ~query_name:"query_logs" query_obj + in + [%log info] "get_logs, finished exec_graphql_request" ; + let new_loglines = + query_result_obj.getFilteredLogEntries |> Array.to_list + in + return new_loglines let dump_archive_data ~logger (t : t) ~data_file = (* this function won't work if `t` doesn't happen to be an archive node *) diff --git a/src/lib/integration_test_lib/intf.ml b/src/lib/integration_test_lib/intf.ml index 35c8fc5620b3..38f840c61ae5 100644 --- a/src/lib/integration_test_lib/intf.ml +++ b/src/lib/integration_test_lib/intf.ml @@ -151,10 +151,17 @@ module Engine = struct -> public_key:Signature_lib.Public_key.Compressed.t -> account_data Malleable_error.t - val get_logs : + val get_filtered_log_entries : logger:Logger.t + -> last_log_index_seen:int -> t - -> Yojson.Safe.t list Async_kernel.Deferred.Or_error.t + -> string list Async_kernel.Deferred.Or_error.t + + val start_filtered_log : + logger:Logger.t + -> log_filter:string list + -> t + -> unit Async_kernel.Deferred.Or_error.t val get_peer_id : logger:Logger.t