Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cumulative nonce map in archive processor #13406

Merged
merged 2 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/app/archive/archive_lib/processor.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1465,8 +1465,7 @@ module Block = struct
(* This is the only place we adjust the nonce_map -- we want to modify the public key associated with the fee_payer for this user-command to increment its nonce.
Note: Intentionally shadowing `nonce_map` here as we want to pass the updated map. *)
let nonce_map =
Signature_lib.Public_key.Compressed.Map.change
initial_nonce_map
Signature_lib.Public_key.Compressed.Map.change nonce_map
( Mina_base.User_command.fee_payer command
|> Account_id.public_key )
~f:(fun _ ->
Expand Down
1 change: 0 additions & 1 deletion src/app/replayer/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
(executable
(package replayer)
(name replayer)
(flags -w -32-34)
(public_name replayer)
(libraries
;; opam libraries
Expand Down
92 changes: 24 additions & 68 deletions src/app/replayer/replayer.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ type output =
}
[@@deriving yojson]

type command_type = [ `Internal_command | `User_command | `Snapp_command ]

module type Get_command_ids = sig
val run :
Caqti_async.connection
Expand Down Expand Up @@ -203,47 +201,6 @@ let user_command_to_balance_block_data (user_cmd : Sql.User_command.t) =
; secondary_sequence_no = 0
}

let epoch_staking_id_of_state_hash ~logger pool state_hash =
match%map
Caqti_async.Pool.use
(fun db -> Sql.Epoch_data.get_staking_epoch_data_id db state_hash)
pool
with
| Ok staking_epoch_data_id ->
[%log info] "Found staking epoch data id for state hash %s" state_hash ;
staking_epoch_data_id
| Error msg ->
failwithf
"Error retrieving staking epoch data id for state hash %s, error: %s"
state_hash (Caqti_error.show msg) ()

let epoch_next_id_of_state_hash ~logger pool state_hash =
match%map
Caqti_async.Pool.use
(fun db -> Sql.Epoch_data.get_next_epoch_data_id db state_hash)
pool
with
| Ok next_epoch_data_id ->
[%log info] "Found next epoch data id for state hash %s" state_hash ;
next_epoch_data_id
| Error msg ->
failwithf
"Error retrieving next epoch data id for state hash %s, error: %s"
state_hash (Caqti_error.show msg) ()

let epoch_data_of_id ~logger pool epoch_data_id =
match%map
Caqti_async.Pool.use
(fun db -> Sql.Epoch_data.get_epoch_data db epoch_data_id)
pool
with
| Ok { epoch_ledger_hash; epoch_data_seed } ->
[%log info] "Found epoch data for id %d" epoch_data_id ;
({ epoch_ledger_hash; epoch_data_seed } : Sql.Epoch_data.epoch_data)
| Error msg ->
failwithf "Error retrieving epoch data for epoch data id %d, error: %s"
epoch_data_id (Caqti_error.show msg) ()

let process_block_infos_of_state_hash ~logger pool ~state_hash ~start_slot ~f =
match%bind
Caqti_async.Pool.use
Expand Down Expand Up @@ -983,13 +940,6 @@ let try_slot ~logger pool slot =
in
go ~slot ~tries_left:num_tries

let unquoted_string_of_yojson json =
(* Yojson.Safe.to_string produces double-quoted strings
remove those quotes for SQL queries
*)
let s = Yojson.Safe.to_string json in
String.sub s ~pos:1 ~len:(String.length s - 2)

let write_replayer_checkpoint ~logger ~ledger ~last_global_slot_since_genesis =
(* start replaying at the slot after the one we've just finished with *)
let start_slot_since_genesis = Int64.succ last_global_slot_since_genesis in
Expand Down Expand Up @@ -1234,6 +1184,24 @@ let main ~input_file ~output_file_opt ~archive_uri ~set_nonces ~repair_nonces
in
[%compare: int64 * int] (tuple uc1) (tuple uc2) )
in
let checkpoint_interval_i64 =
Option.map checkpoint_interval ~f:Int64.of_int
in
let checkpoint_target =
ref
(Option.map checkpoint_interval_i64 ~f:(fun interval ->
Int64.(input.start_slot_since_genesis + interval) ) )
in
let incr_checkpoint_target () =
Option.iter !checkpoint_target ~f:(fun target ->
match checkpoint_interval_i64 with
| Some interval ->
[%log info] "Checkpoint target was %Ld, setting to %Ld" target
Int64.(target + interval) ;
checkpoint_target := Some Int64.(target + interval)
| None ->
failwith "Expected a checkpoint interval" )
in
(* apply commands in global slot, sequence order *)
let rec apply_commands (internal_cmds : Sql.Internal_command.t list)
(user_cmds : Sql.User_command.t list) ~last_global_slot_since_genesis
Expand Down Expand Up @@ -1281,15 +1249,13 @@ let main ~input_file ~output_file_opt ~archive_uri ~set_nonces ~repair_nonces
$state_hash at global slot since genesis %Ld"
curr_global_slot_since_genesis
in
let checkpoint_interval_i64 =
Option.map checkpoint_interval ~f:Int64.of_int
in
let write_checkpoint_file curr_global_slot_since_genesis =
Option.iter checkpoint_interval_i64 ~f:(fun interval ->
if Int64.(last_global_slot_since_genesis % interval = 0L) then
let write_checkpoint_file () =
Option.iter !checkpoint_target ~f:(fun target ->
if Int64.(last_global_slot_since_genesis >= target) then (
incr_checkpoint_target () ;
Async.don't_wait_for
(write_replayer_checkpoint ~logger ~ledger
~last_global_slot_since_genesis ) )
~last_global_slot_since_genesis ) ) )
in
let run_actions_on_slot_change curr_global_slot_since_genesis =
if
Expand All @@ -1298,7 +1264,7 @@ let main ~input_file ~output_file_opt ~archive_uri ~set_nonces ~repair_nonces
then (
log_ledger_hash_after_last_slot () ;
log_state_hash_on_next_slot curr_global_slot_since_genesis ;
write_checkpoint_file curr_global_slot_since_genesis )
write_checkpoint_file () )
in
let combine_or_run_internal_cmds (ic : Sql.Internal_command.t)
(ics : Sql.Internal_command.t list) =
Expand Down Expand Up @@ -1405,16 +1371,6 @@ let main ~input_file ~output_file_opt ~archive_uri ~set_nonces ~repair_nonces
; ( "available_start_slot"
, `String (Int64.to_string start_slot_since_genesis) )
] ;
let%bind last_block_id =
if Int64.equal input.start_slot_since_genesis 0L then
query_db pool
~f:(fun db -> Sql.Block.genesis_block_id db)
~item:"Genesis block id"
else
query_db pool
~f:(fun db -> Sql.Block.parent_block_id db oldest_block_id)
~item:"Parent block id"
in
[%log info] "At start global slot %Ld, ledger hash"
start_slot_since_genesis
~metadata:[ ("ledger_hash", json_ledger_hash_of_ledger ledger) ] ;
Expand Down