Commit a9ed7e4

well it compiles
QuiteStochastic committed Jun 7, 2023
1 parent b4c598c commit a9ed7e4
Showing 3 changed files with 132 additions and 70 deletions.
src/lib/integration_test_cloud_engine/graphql_polling_log_engine.ml: 92 additions & 49 deletions
@@ -22,40 +22,40 @@ module Node = Kubernetes_network.Node
or_error_list_fold ls ~init:[] ~f:(fun t el ->
let%map h = f el in
h :: t ) *)
(*
let log_filter_of_event_type ev_existential =
let open Event_type in
let (Event_type ev_type) = ev_existential in
let (module Ty) = event_type_module ev_type in
match Ty.parse with
| From_error_log _ ->
[ "jsonPayload.level=(\"Warn\" OR \"Error\" OR \"Faulty_peer\" OR \
\"Fatal\")"
]
| From_daemon_log (struct_id, _) ->
let filter =
Printf.sprintf "jsonPayload.event_id=\"%s\""
(Structured_log_events.string_of_id struct_id)
in
[ filter ]
| From_puppeteer_log (id, _) ->
let filter =
Printf.sprintf "jsonPayload.puppeteer_event_type=\"%s\"" id
in
[ "jsonPayload.puppeteer_script_event=true"; filter ]
let all_event_types_log_filter =
let event_filters =
List.map Event_type.all_event_types ~f:log_filter_of_event_type
in
let nest s = "(" ^ s ^ ")" in
let disjunction =
event_filters
|> List.map ~f:(fun filter ->
nest (filter |> List.map ~f:nest |> String.concat ~sep:" AND ") )
|> String.concat ~sep:" OR "
in
[ disjunction ] *)

let log_filter_of_event_type ev_existential =
let open Event_type in
let (Event_type ev_type) = ev_existential in
let (module Ty) = event_type_module ev_type in
match Ty.parse with
| From_error_log _ ->
[ "jsonPayload.level=(\"Warn\" OR \"Error\" OR \"Faulty_peer\" OR \
\"Fatal\")"
]
| From_daemon_log (struct_id, _) ->
let filter =
Printf.sprintf "jsonPayload.event_id=\"%s\""
(Structured_log_events.string_of_id struct_id)
in
[ filter ]
| From_puppeteer_log (id, _) ->
let filter =
Printf.sprintf "jsonPayload.puppeteer_event_type=\"%s\"" id
in
[ "jsonPayload.puppeteer_script_event=true"; filter ]

let all_event_types_log_filter =
let event_filters =
List.map Event_type.all_event_types ~f:log_filter_of_event_type
in
let nest s = "(" ^ s ^ ")" in
let disjunction =
event_filters
|> List.map ~f:(fun filter ->
nest (filter |> List.map ~f:nest |> String.concat ~sep:" AND ") )
|> String.concat ~sep:" OR "
in
[ disjunction ]
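
For orientation, here is a minimal standalone sketch of the nesting that all_event_types_log_filter performs, using two hypothetical per-event filter lists in place of the real Event_type values:

(* Sketch only: hypothetical filter lists standing in for
   List.map Event_type.all_event_types ~f:log_filter_of_event_type. *)
open Core

let example_event_filters =
  [ [ {|jsonPayload.level=("Warn" OR "Error")|} ]
  ; [ "jsonPayload.puppeteer_script_event=true"
    ; {|jsonPayload.puppeteer_event_type="node_offline"|}
    ]
  ]

let example_disjunction =
  let nest s = "(" ^ s ^ ")" in
  example_event_filters
  |> List.map ~f:(fun filter ->
         nest (filter |> List.map ~f:nest |> String.concat ~sep:" AND ") )
  |> String.concat ~sep:" OR "

(* example_disjunction evaluates to:
   ((jsonPayload.level=("Warn" OR "Error"))) OR ((jsonPayload.puppeteer_script_event=true) AND (jsonPayload.puppeteer_event_type="node_offline")) *)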

type t =
{ logger : Logger.t
@@ -110,29 +110,38 @@ let parse_event_from_log_entry ~logger ~network log_entry =
in
(node, event)

let poll_for_logs ~logger ~network =
let get_filtered_log_entries ~logger ~network ~last_log_index_seen =
let open Deferred.Or_error.Let_syntax in
let allpods = Kubernetes_network.all_pods network |> Core.String.Map.data in
let logs =
Deferred.Or_error.List.fold allpods ~init:[] ~f:(fun acc node ->
let%bind node_logs = Kubernetes_network.Node.get_logs ~logger node in
let%bind node_logs =
Kubernetes_network.Node.get_filtered_log_entries ~logger
~last_log_index_seen node
in
return (List.append acc node_logs) )
in
logs

let rec poll_for_logs_in_background ~logger ~network ~event_writer =
let rec poll_get_filtered_log_entries ~last_log_index_seen ~logger ~network
~event_writer =
if not (Pipe.is_closed event_writer) then (
[%log spam] "Polling all testnet nodes for logs" ;
let%bind log_entries =
Deferred.map (poll_for_logs ~logger ~network) ~f:Or_error.ok_exn
Deferred.map
(get_filtered_log_entries ~logger ~network ~last_log_index_seen)
~f:Or_error.ok_exn
in
if List.length log_entries > 0 then
[%log spam] "Parsing events from $n logs"
~metadata:[ ("n", `Int (List.length log_entries)) ]
else [%log spam] "No logs were pulled" ;
let%bind () =
Deferred.List.iter ~how:`Sequential log_entries ~f:(fun log_entry ->
( match log_entry |> parse_event_from_log_entry ~logger ~network with
( match
log_entry |> Yojson.Safe.from_string
|> parse_event_from_log_entry ~logger ~network
with
| Ok a ->
Pipe.write_without_pushback_if_open event_writer a
| Error e ->
@@ -141,22 +150,56 @@ let rec poll_for_logs_in_background ~logger ~network ~event_writer =
Deferred.unit )
in
let%bind () = after (Time.Span.of_ms 10000.0) in
poll_for_logs_in_background ~logger ~network ~event_writer )
let new_index = last_log_index_seen + List.length log_entries in
poll_get_filtered_log_entries ~logger ~network ~event_writer
~last_log_index_seen:new_index )
else Deferred.unit
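
The heart of the new engine is a cursor-based polling loop: each round asks the nodes for entries past the last index seen, then advances the cursor by however many entries came back. A stripped-down sketch of just that bookkeeping, with a hypothetical fetch function standing in for the GraphQL call:

(* Sketch, assuming Core/Async and ppx_let as in this module; [fetch] and
   [handle] are hypothetical stand-ins for get_filtered_log_entries and the
   event-writer plumbing above. The real loop additionally stops once the
   event pipe is closed. *)
open Core
open Async

let rec poll_with_cursor ~(fetch : offset:int -> string list Deferred.t)
    ~(handle : string -> unit) ~offset =
  let%bind entries = fetch ~offset in
  List.iter entries ~f:handle ;
  (* Only ask for entries we have not seen yet on the next round. *)
  let offset = offset + List.length entries in
  let%bind () = after (Time.Span.of_ms 10000.0) in
  poll_with_cursor ~fetch ~handle ~offset

This mirrors poll_get_filtered_log_entries above, minus the Or_error handling and logging.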

let start_filtered_log ~logger ~network ~log_filter =
let allpods = Kubernetes_network.all_pods network |> Core.String.Map.data in
let logs =
Deferred.Or_error.List.iter allpods ~f:(fun node ->
Kubernetes_network.Node.start_filtered_log ~logger ~log_filter node )
in
logs

let rec poll_start_filtered_log ~log_filter ~logger ~network =
let open Deferred.Let_syntax in
[%log spam]
"Polling all testnet nodes to get them to start their filtered logs" ;
let%bind res = start_filtered_log ~log_filter ~logger ~network in
match res with
| Ok () ->
Deferred.return ()
| Error _ ->
poll_start_filtered_log ~log_filter ~logger ~network

let poll_for_logs_in_background ~log_filter ~logger ~network ~event_writer =
[%log info] "Attempting to start the filtered log in all testnet nodes" ;
let _ = poll_start_filtered_log ~log_filter ~logger ~network in
[%log info]
"Filtered logs in all testnet nodes successfully started. Will now poll \
for log entries" ;
let _ =
poll_get_filtered_log_entries ~logger ~network ~event_writer
~last_log_index_seen:0
in
[%log info] "poll_for_logs_in_background will now exit" ;
Deferred.unit

let create ~logger ~(network : Kubernetes_network.t) =
let open Deferred.Or_error.Let_syntax in
(* let log_filter =
let mina_container_filter = "resource.labels.container_name=\"mina\"" in
let filters =
[ network.testnet_log_filter; mina_container_filter ]
@ all_event_types_log_filter
in
String.concat filters ~sep:"\n"
in *)
let log_filter =
let mina_container_filter = "resource.labels.container_name=\"mina\"" in
let filters =
[ network.testnet_log_filter; mina_container_filter ]
@ all_event_types_log_filter
in
filters
in
let event_reader, event_writer = Pipe.create () in
let background_job =
poll_for_logs_in_background ~logger ~network ~event_writer
poll_for_logs_in_background ~log_filter ~logger ~network ~event_writer
in
return { logger; event_reader; event_writer; background_job }
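
For context, a minimal sketch of how a caller might drain the engine that create returns; the consumer function and f are hypothetical, while the (node, event) pairs are exactly what the background job writes into event_writer above:

(* Hypothetical consumer, assuming Async's Pipe is in scope as in this file:
   read (node, event) pairs until the background job closes the pipe. *)
let consume_events t ~f =
  Pipe.iter_without_pushback t.event_reader ~f:(fun (node, event) -> f node event)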

src/lib/integration_test_cloud_engine/kubernetes_network.ml: 31 additions & 19 deletions
@@ -660,25 +660,37 @@ module Node = struct
set_snark_worker ~logger t ~new_snark_pub_key
|> Deferred.bind ~f:Malleable_error.or_hard_error

(* TODO: this is a complete stub of course, since the actual graphql endpoint it would hit doesn't exist yet *)
let get_logs ~logger t =
[%log info] "Getting logs of node" ~metadata:(logger_metadata t) ;
Deferred.Or_error.return
[ Yojson.Safe.from_string ""; Yojson.Safe.from_string "" ]
(* let open Deferred.Or_error.Let_syntax in
[%log info] "Getting logs of node"
~metadata:(logger_metadata t) ;
let query_obj = Graphql.Query_latest_logs.(make @@ makeVariables ()) in
let%bind query_result_obj =
exec_graphql_request ~logger ~node:t ~query_name:"query_logs" query_obj
in
[%log info] "get_logs, finished exec_graphql_request" ;
let new_loglines = query_result_obj.newloglines |> Array.to_list in
return new_loglines *)

(* let must_get_logs ~logger t =
get_logs ~logger t |> Deferred.bind ~f:Malleable_error.or_hard_error
*)
let start_filtered_log ~logger ~log_filter t =
let open Deferred.Or_error.Let_syntax in
[%log info] "Starting filtered log" ~metadata:(logger_metadata t) ;
let query_obj =
Graphql.StartFilteredLog.(make @@ makeVariables ~filter:log_filter ())
in
let%bind query_result_obj =
exec_graphql_request ~logger ~node:t ~query_name:"query_logs" query_obj
in
[%log debug] "start_filtered_log, finished exec_graphql_request" ;
let returned_result = query_result_obj.startFilteredLog in
(* returned_result should just be the node echoing the log filter back, if it's successful *)
if not @@ String.is_empty returned_result then return ()
else Deferred.Or_error.errorf "start_filtered_log did not seem to succeed"

let get_filtered_log_entries ~logger ~last_log_index_seen t =
let open Deferred.Or_error.Let_syntax in
[%log info] "Getting logs from node, starting from log entry number %d"
last_log_index_seen ~metadata:(logger_metadata t) ;
let query_obj =
Graphql.GetFilteredLogEntries.(
make @@ makeVariables ~offset:last_log_index_seen ())
in
let%bind query_result_obj =
exec_graphql_request ~logger ~node:t ~query_name:"query_logs" query_obj
in
[%log info] "get_logs, finished exec_graphql_request" ;
let new_loglines =
query_result_obj.getFilteredLogEntries |> Array.to_list
in
return new_loglines
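
Unlike the old get_logs stub, which handed back Yojson.Safe.t values, these entries come back as raw strings; decoding is left to the caller, as graphql_polling_log_engine now does with Yojson.Safe.from_string. A small hedged helper one might write on the caller side, with a hypothetical name:

(* Hypothetical caller-side helper: decode one raw line returned by
   get_filtered_log_entries, turning parse failures into an Or_error. *)
let decode_log_line (line : string) : Yojson.Safe.t Or_error.t =
  Or_error.try_with (fun () -> Yojson.Safe.from_string line)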

let dump_archive_data ~logger (t : t) ~data_file =
(* this function won't work if `t` doesn't happen to be an archive node *)
src/lib/integration_test_lib/intf.ml: 9 additions & 2 deletions
@@ -151,10 +151,17 @@ module Engine = struct
-> public_key:Signature_lib.Public_key.Compressed.t
-> account_data Malleable_error.t

val get_logs :
val get_filtered_log_entries :
logger:Logger.t
-> last_log_index_seen:int
-> t
-> Yojson.Safe.t list Async_kernel.Deferred.Or_error.t
-> string list Async_kernel.Deferred.Or_error.t

val start_filtered_log :
logger:Logger.t
-> log_filter:string list
-> t
-> unit Async_kernel.Deferred.Or_error.t
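
Taken together, the new interface functions are meant to be used as a pair: start the filtered log on a node once, then page through its entries by offset. A hedged sketch against this interface, where Node is any module satisfying it and node is a value of its t:

(* Illustrative only: fetch the first page of filtered log entries from a
   node, assuming its filtered log starts successfully. *)
let tail_filtered_log ~logger ~log_filter node =
  let open Async_kernel.Deferred.Or_error.Let_syntax in
  let%bind () = Node.start_filtered_log ~logger ~log_filter node in
  Node.get_filtered_log_entries ~logger ~last_log_index_seen:0 node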

val get_peer_id :
logger:Logger.t
