Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't check ledger hash at start; restore final checkpoint #13600

Merged
merged 15 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions helm/cron_jobs/mainnet-replayer-cronjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ spec:
echo "deb https://packages.cloud.google.com/apt cloud-sdk main" | sudo tee -a /etc/apt/sources.list.d/google-cloud-sdk.list;
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add - ;
apt-get update && apt-get install -y google-cloud-cli ;
ARCHIVE_DUMP_URI=$(gsutil ls gs://mina-archive-dumps/mainnet-archive-dump-*.sql.tar.gz | sort -r | head -n 1);
ARCHIVE_DUMP_URI=$(gsutil -o Credentials:gs_service_key_file=/gcloud/keyfile.json ls gs://mina-archive-dumps/mainnet-archive-dump-*.sql.tar.gz | sort -r | head -n 1);
ARCHIVE_DUMP=$(basename $ARCHIVE_DUMP_URI);
ARCHIVE_SQL=$(basename $ARCHIVE_DUMP_URI .tar.gz);
echo "Getting archive dump" $ARCHIVE_DUMP_URI;
gsutil -o Credentials:gs_service_key_file=/gcloud/keyfile.json cp $ARCHIVE_DUMP_URI . ;
MOST_RECENT_CHECKPOINT_URI=$(gsutil ls gs://mainnet-replayer-checkpoints/replayer-checkpoint-*.json | sort -r | head -n 1);
MOST_RECENT_CHECKPOINT_URI=$(gsutil -o Credentials:gs_service_key_file=/gcloud/keyfile.json ls gs://mainnet-replayer-checkpoints/replayer-checkpoint-*.json | sort -r | head -n 1);
MOST_RECENT_CHECKPOINT=$(basename $MOST_RECENT_CHECKPOINT_URI);
echo "Getting replayer checkpoint file" $MOST_RECENT_CHECKPOINT;
gsutil -o Credentials:gs_service_key_file=/gcloud/keyfile.json cp $MOST_RECENT_CHECKPOINT_URI . ;
Expand Down Expand Up @@ -73,7 +73,7 @@ spec:
env:
- name: GCLOUD_KEYFILE
value: /gcloud/keyfile.json
image: gcr.io/o1labs-192920/mina-rosetta:1.4.0beta2-fix-replayer-no-final-checkpoint-5af6dba-bullseye
image: gcr.io/o1labs-192920/mina-rosetta:1.4.0beta2-fix-replayer-start-ledger-hash-2627643-bullseye
imagePullPolicy: IfNotPresent
name: mainnet-replayer-cronjob
resources:
Expand Down
213 changes: 123 additions & 90 deletions src/app/replayer/replayer.ml
Original file line number Diff line number Diff line change
Expand Up @@ -118,31 +118,7 @@ let create_replayer_checkpoint ~ledger ~start_slot_since_genesis :
let global_slot_hashes_tbl : (Int64.t, State_hash.t * Ledger_hash.t) Hashtbl.t =
Int64.Table.create ()

(* the starting slot may not have a block, so there may not be an entry in the table
of hashes
look at the predecessor slots until we find an entry
this search should only happen on the start slot, all other slots
come from commands in blocks, for which we have an entry in the table
*)
let get_slot_hashes ~logger slot =
let rec go curr_slot =
if Int64.is_negative curr_slot then (
[%log fatal]
"Could not find state and ledger hashes for slot %Ld, despite trying \
all predecessor slots"
slot ;
Core.exit 1 ) ;
match Hashtbl.find global_slot_hashes_tbl curr_slot with
| None ->
[%log info]
"State and ledger hashes not available at slot since genesis %Ld, \
will try predecessor slot"
curr_slot ;
go (Int64.pred curr_slot)
| Some hashes ->
hashes
in
go slot
(* Look up the (state hash, ledger hash) pair stored in
   [global_slot_hashes_tbl] for [slot]. The result is an option: callers
   match on [None] when the table has no entry for that slot. Only the given
   slot is consulted; no other slots are searched.
*)
let get_slot_hashes slot = Hashtbl.find global_slot_hashes_tbl slot

(* cache of account keys *)
let pk_tbl : (int, Account.key) Hashtbl.t = Int.Table.create ()
Expand Down Expand Up @@ -931,32 +907,39 @@ let try_slot ~logger pool slot =
Core_kernel.exit 1 ) ;
match%bind find_canonical_chain ~logger pool slot with
| None ->
go ~slot:(slot - 1) ~tries_left:(tries_left - 1)
go ~slot:(Int64.pred slot) ~tries_left:(tries_left - 1)
| Some state_hash ->
[%log info]
"Found possible canonical chain to target state hash %s at slot %d"
"Found possible canonical chain to target state hash %s at slot %Ld"
state_hash slot ;
return state_hash
in
go ~slot ~tries_left:num_tries

let write_replayer_checkpoint ~logger ~ledger ~last_global_slot_since_genesis =
(* start replaying at the slot after the one we've just finished with *)
let start_slot_since_genesis = Int64.succ last_global_slot_since_genesis in
let%bind replayer_checkpoint =
let%map input =
create_replayer_checkpoint ~ledger ~start_slot_since_genesis
let write_replayer_checkpoint ~logger ~ledger ~last_global_slot_since_genesis
~max_canonical_slot =
if Int64.( <= ) last_global_slot_since_genesis max_canonical_slot then (
(* start replaying at the slot after the one we've just finished with *)
let start_slot_since_genesis = Int64.succ last_global_slot_since_genesis in
let%map replayer_checkpoint =
let%map input =
create_replayer_checkpoint ~ledger ~start_slot_since_genesis
in
input_to_yojson input |> Yojson.Safe.pretty_to_string
in
input_to_yojson input |> Yojson.Safe.pretty_to_string
in
let checkpoint_file =
sprintf "replayer-checkpoint-%Ld.json" start_slot_since_genesis
in
[%log info] "Writing checkpoint file"
~metadata:[ ("checkpoint_file", `String checkpoint_file) ] ;
return
@@ Out_channel.with_file checkpoint_file ~f:(fun oc ->
Out_channel.output_string oc replayer_checkpoint )
let checkpoint_file =
sprintf "replayer-checkpoint-%Ld.json" start_slot_since_genesis
in
[%log info] "Writing checkpoint file"
~metadata:[ ("checkpoint_file", `String checkpoint_file) ] ;
Out_channel.with_file checkpoint_file ~f:(fun oc ->
Out_channel.output_string oc replayer_checkpoint ) )
else (
[%log info] "Not writing checkpoint file at slot %Ld, because not canonical"
last_global_slot_since_genesis
~metadata:
[ ("max_canonical_slot", `String (Int64.to_string max_canonical_slot)) ] ;
Deferred.unit )

let main ~input_file ~output_file_opt ~archive_uri ~set_nonces ~repair_nonces
~checkpoint_interval ~continue_on_error () =
Expand Down Expand Up @@ -1022,7 +1005,7 @@ let main ~input_file ~output_file_opt ~archive_uri ~set_nonces ~repair_nonces
~f:(fun db -> Sql.Block.get_max_slot db ())
~item:"max slot"
in
[%log info] "Maximum global slot since genesis in blocks is %d"
(* max_slot comes from Sql.Block.get_max_slot, whose query result is an
   int64, hence the %Ld conversion *)
[%log info] "Maximum global slot since genesis in blocks is %Ld"
max_slot ;
try_slot ~logger pool max_slot
in
Expand Down Expand Up @@ -1192,13 +1175,27 @@ let main ~input_file ~output_file_opt ~archive_uri ~set_nonces ~repair_nonces
(Option.map checkpoint_interval_i64 ~f:(fun interval ->
Int64.(input.start_slot_since_genesis + interval) ) )
in
let%bind max_canonical_slot =
query_db pool
~f:(fun db -> Sql.Block.get_max_canonical_slot db ())
~item:"max canonical slot"
in
let incr_checkpoint_target () =
Option.iter !checkpoint_target ~f:(fun target ->
match checkpoint_interval_i64 with
| Some interval ->
[%log info] "Checkpoint target was %Ld, setting to %Ld" target
Int64.(target + interval) ;
checkpoint_target := Some Int64.(target + interval)
let new_target = Int64.(target + interval) in
if Int64.( <= ) new_target max_canonical_slot then (
[%log info] "Checkpoint target was %Ld, setting to %Ld" target
new_target ;
checkpoint_target := Some new_target )
else (
(* set target so it can't be reached *)
[%log info]
"Checkpoint target was %Ld, new target would be at \
noncanonical slot, set target to unreachable value"
target ;
checkpoint_target := Some Int64.max_value )
| None ->
failwith "Expected a checkpoint interval" )
in
Expand All @@ -1215,47 +1212,71 @@ let main ~input_file ~output_file_opt ~archive_uri ~set_nonces ~repair_nonces
~next_epoch_ledger
in
let log_ledger_hash_after_last_slot () =
let _state_hash, expected_ledger_hash =
get_slot_hashes ~logger last_global_slot_since_genesis
in
if Ledger_hash.equal (Ledger.merkle_root ledger) expected_ledger_hash
then
[%log info]
"Applied all commands at global slot since genesis %Ld, got \
expected ledger hash"
~metadata:[ ("ledger_hash", json_ledger_hash_of_ledger ledger) ]
last_global_slot_since_genesis
else (
[%log error]
"Applied all commands at global slot since genesis %Ld, ledger \
hash differs from expected ledger hash"
~metadata:
[ ("ledger_hash", json_ledger_hash_of_ledger ledger)
; ( "expected_ledger_hash"
, Ledger_hash.to_yojson expected_ledger_hash )
]
last_global_slot_since_genesis ;
if continue_on_error then incr error_count else Core_kernel.exit 1 )
match get_slot_hashes last_global_slot_since_genesis with
| None ->
if
Int64.equal last_global_slot_since_genesis
input.start_slot_since_genesis
then
[%log info]
"No ledger hash information at start slot, not checking \
against ledger"
else (
[%log fatal]
"Missing ledger hash information for last global slot, which \
is not the start slot" ;
Core.exit 1 )
| Some (_state_hash, expected_ledger_hash) ->
if
Ledger_hash.equal
(Ledger.merkle_root ledger)
expected_ledger_hash
then
[%log info]
"Applied all commands at global slot since genesis %Ld, got \
expected ledger hash"
~metadata:
[ ("ledger_hash", json_ledger_hash_of_ledger ledger) ]
last_global_slot_since_genesis
else (
[%log error]
"Applied all commands at global slot since genesis %Ld, \
ledger hash differs from expected ledger hash"
~metadata:
[ ("ledger_hash", json_ledger_hash_of_ledger ledger)
; ( "expected_ledger_hash"
, Ledger_hash.to_yojson expected_ledger_hash )
]
last_global_slot_since_genesis ;
if continue_on_error then incr error_count
else Core_kernel.exit 1 )
in
let log_state_hash_on_next_slot curr_global_slot_since_genesis =
let state_hash, _ledger_hash =
get_slot_hashes ~logger curr_global_slot_since_genesis
in
[%log info]
~metadata:
[ ("state_hash", `String (State_hash.to_base58_check state_hash))
]
"Starting processing of commands in block with state_hash \
$state_hash at global slot since genesis %Ld"
curr_global_slot_since_genesis
match get_slot_hashes curr_global_slot_since_genesis with
| None ->
[%log fatal]
"Missing state hash information for current global slot" ;
Core.exit 1
| Some (state_hash, _ledger_hash) ->
[%log info]
~metadata:
[ ( "state_hash"
, `String (State_hash.to_base58_check state_hash) )
]
"Starting processing of commands in block with state_hash \
$state_hash at global slot since genesis %Ld"
curr_global_slot_since_genesis
in
let write_checkpoint_file () =
Option.iter !checkpoint_target ~f:(fun target ->
match !checkpoint_target with
| None ->
Deferred.unit
| Some target ->
if Int64.(last_global_slot_since_genesis >= target) then (
incr_checkpoint_target () ;
Async.don't_wait_for
(write_replayer_checkpoint ~logger ~ledger
~last_global_slot_since_genesis ) ) )
write_replayer_checkpoint ~logger ~ledger
~last_global_slot_since_genesis ~max_canonical_slot )
else Deferred.unit
in
let run_actions_on_slot_change curr_global_slot_since_genesis =
if
Expand All @@ -1265,6 +1286,7 @@ let main ~input_file ~output_file_opt ~archive_uri ~set_nonces ~repair_nonces
log_ledger_hash_after_last_slot () ;
log_state_hash_on_next_slot curr_global_slot_since_genesis ;
write_checkpoint_file () )
else Deferred.unit
in
let combine_or_run_internal_cmds (ic : Sql.Internal_command.t)
(ics : Sql.Internal_command.t list) =
Expand All @@ -1278,7 +1300,9 @@ let main ~input_file ~output_file_opt ~archive_uri ~set_nonces ~repair_nonces
(* combining situation 2
two fee transfer commands with same global slot since genesis, sequence number
*)
run_actions_on_slot_change ic.global_slot_since_genesis ;
let%bind () =
run_actions_on_slot_change ic.global_slot_since_genesis
in
let%bind () =
apply_combined_fee_transfer ~logger ~pool ~ledger ~set_nonces
~repair_nonces ~continue_on_error ic ic2
Expand All @@ -1288,7 +1312,9 @@ let main ~input_file ~output_file_opt ~archive_uri ~set_nonces ~repair_nonces
~last_block_id:ic.block_id ~staking_epoch_ledger
~next_epoch_ledger
| _ ->
run_actions_on_slot_change ic.global_slot_since_genesis ;
let%bind () =
run_actions_on_slot_change ic.global_slot_since_genesis
in
let%bind () =
run_internal_command ~logger ~pool ~ledger ~set_nonces
~repair_nonces ~continue_on_error ic
Expand All @@ -1307,10 +1333,15 @@ let main ~input_file ~output_file_opt ~archive_uri ~set_nonces ~repair_nonces
match (internal_cmds, user_cmds) with
| [], [] ->
log_ledger_hash_after_last_slot () ;
Deferred.return
(staking_epoch_ledger, staking_seed, next_epoch_ledger, next_seed)
let%map () =
write_replayer_checkpoint ~logger ~ledger
~last_global_slot_since_genesis ~max_canonical_slot
in
(staking_epoch_ledger, staking_seed, next_epoch_ledger, next_seed)
| [], uc :: ucs ->
run_actions_on_slot_change uc.global_slot_since_genesis ;
let%bind () =
run_actions_on_slot_change uc.global_slot_since_genesis
in
let%bind () =
run_user_command ~logger ~pool ~ledger ~set_nonces ~repair_nonces
~continue_on_error uc
Expand All @@ -1320,7 +1351,9 @@ let main ~input_file ~output_file_opt ~archive_uri ~set_nonces ~repair_nonces
~last_block_id:uc.block_id ~staking_epoch_ledger
~next_epoch_ledger
| ic :: _, uc :: ucs when cmp_ic_uc ic uc > 0 ->
run_actions_on_slot_change uc.global_slot_since_genesis ;
let%bind () =
run_actions_on_slot_change uc.global_slot_since_genesis
in
let%bind () =
run_user_command ~logger ~pool ~ledger ~set_nonces ~repair_nonces
~continue_on_error uc
Expand Down Expand Up @@ -1379,8 +1412,8 @@ let main ~input_file ~output_file_opt ~archive_uri ~set_nonces ~repair_nonces
in
match input.target_epoch_ledgers_state_hash with
| None ->
[%log info] "No target epoch ledger supplied, not writing output" ;
return ()
[%log info] "No target epoch ledger hash supplied, not writing output" ;
Deferred.unit
| Some target_epoch_ledgers_state_hash -> (
match output_file_opt with
| None ->
Expand Down
19 changes: 14 additions & 5 deletions src/app/replayer/sql.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module Block_info = struct
typ
{sql| WITH RECURSIVE chain AS (

SELECT id,parent_id,global_slot_since_genesis,state_hash,ledger_hash FROM blocks b
SELECT id,parent_id,global_slot_since_genesis,state_hash,ledger_hash FROM blocks b
WHERE b.state_hash = $1

UNION ALL
Expand Down Expand Up @@ -55,8 +55,8 @@ let find_command_ids_query s =
{sql| WITH RECURSIVE chain AS (

SELECT id,parent_id,global_slot_since_genesis FROM blocks b
WHERE b.state_hash = $1
WHERE b.state_hash = $1

UNION ALL

SELECT b.id,b.parent_id,b.global_slot_since_genesis FROM blocks b
Expand Down Expand Up @@ -94,12 +94,21 @@ module Block = struct
Conn.find get_height_query block_id

let max_slot_query =
Caqti_request.find Caqti_type.unit Caqti_type.int
Caqti_request.find Caqti_type.unit Caqti_type.int64
{sql| SELECT MAX(global_slot_since_genesis) FROM blocks |sql}

(* Run [max_slot_query] on the given Caqti connection; that query selects
   MAX(global_slot_since_genesis) over the blocks table and takes no
   parameters, so the second argument is unit.
*)
let get_max_slot (module Conn : Caqti_async.CONNECTION) () =
Conn.find max_slot_query ()

(* Query for the largest global_slot_since_genesis among blocks whose
   chain_status is 'canonical'. Takes no parameters; the result is decoded
   as an int64.
   NOTE(review): SQL MAX yields NULL when no row matches the WHERE clause;
   presumably at least one canonical block always exists in the archive db,
   otherwise decoding the result as int64 would likely fail -- confirm.
*)
let max_canonical_slot_query =
Caqti_request.find Caqti_type.unit Caqti_type.int64
{sql| SELECT MAX(global_slot_since_genesis) FROM blocks
WHERE chain_status = 'canonical'
|sql}

(* Run [max_canonical_slot_query] on the given Caqti connection; the query
   takes no parameters, so the second argument is unit.
*)
let get_max_canonical_slot (module Conn : Caqti_async.CONNECTION) () =
Conn.find max_canonical_slot_query ()

let next_slot_query =
Caqti_request.find_opt Caqti_type.int64 Caqti_type.int64
{sql| SELECT global_slot_since_genesis FROM blocks
Expand Down Expand Up @@ -131,7 +140,7 @@ module Block = struct
Conn.find genesis_block_id_query ()

let state_hashes_by_slot_query =
Caqti_request.collect Caqti_type.int Caqti_type.string
Caqti_request.collect Caqti_type.int64 Caqti_type.string
{sql| SELECT state_hash FROM blocks WHERE global_slot_since_genesis = $1 |sql}

let get_state_hashes_by_slot (module Conn : Caqti_async.CONNECTION) slot =
Expand Down