From 31b3e37fd388cf34fe887352b42ddc21d99e2d5c Mon Sep 17 00:00:00 2001 From: hangleang Date: Fri, 1 Nov 2024 14:14:45 +0700 Subject: [PATCH] fix: rebase conflicts, reconstruction runtime error fix(wip): resolved conflicts after rebase fix(wip): resolved code conflicts fix: failed kzg_commitment_inclusion_proof tests and remove unused imports fix: subscribe data column subnets if eip7594 scheduled fix: kzg_inclusion_proofs in deneb and electra fix: validate block with DA checking update snapshot-tests, eth2_libp2p fix: missing check in spec test chore: sort custody subnets fix: reconstruction runtime error --- Cargo.lock | 418 ++++++++---------- Cargo.toml | 15 +- ad_hoc_bench/src/main.rs | 2 +- .../src/attestation_verifier.rs | 5 +- binary_utils/src/lib.rs | 14 +- block_producer/src/block_producer.rs | 2 +- consensus-spec-tests | 2 +- eip_7594/src/lib.rs | 126 ++++-- eth2_cache_utils/src/generic.rs | 2 +- eth2_libp2p | 2 +- fork_choice_control/Cargo.toml | 2 - fork_choice_control/src/controller.rs | 19 +- fork_choice_control/src/messages.rs | 1 - fork_choice_control/src/misc.rs | 7 +- fork_choice_control/src/mutator.rs | 157 +++---- fork_choice_control/src/spec_tests.rs | 65 ++- fork_choice_control/src/storage.rs | 4 +- fork_choice_control/src/tasks.rs | 6 +- fork_choice_store/Cargo.toml | 2 +- fork_choice_store/src/error.rs | 11 +- fork_choice_store/src/lib.rs | 2 +- fork_choice_store/src/misc.rs | 26 +- fork_choice_store/src/store.rs | 106 +++-- grandine-snapshot-tests | 2 +- grandine/Cargo.toml | 1 + grandine/src/grandine_args.rs | 1 - grandine/src/main.rs | 2 - helper_functions/src/error.rs | 2 + helper_functions/src/misc.rs | 9 +- helper_functions/src/signing.rs | 18 +- http_api/Cargo.toml | 2 +- http_api/src/standard.rs | 50 ++- metrics/Cargo.toml | 1 + operation_pools/Cargo.toml | 3 - .../attestation_packer.rs | 2 +- p2p/Cargo.toml | 1 - p2p/src/block_sync_service.rs | 63 ++- p2p/src/messages.rs | 1 + p2p/src/network.rs | 340 ++++---------- p2p/src/sync_manager.rs | 185 ++++---- prometheus_metrics/src/metrics.rs | 29 +- runtime/src/runtime.rs | 20 +- slashing_protection/src/lib.rs | 2 +- types/src/config.rs | 19 +- types/src/deneb/consts.rs | 6 +- types/src/eip7594.rs | 12 +- validator/Cargo.toml | 2 - validator/src/validator.rs | 62 --- 48 files changed, 826 insertions(+), 1005 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f243d1e1..a1060ab0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -523,34 +523,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" -[[package]] -name = "axum" -version = "0.6.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" -dependencies = [ - "async-trait", - "axum-core 0.3.4", - "bitflags 1.3.2", - "bytes", - "futures-util", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.31", - "itoa", - "matchit", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "rustversion", - "serde", - "sync_wrapper 0.1.2", - "tower 0.4.13", - "tower-layer", - "tower-service", -] - [[package]] name = "axum" version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" dependencies = [ "async-trait", - "axum-core 0.4.5", + "axum-core", "bytes", "futures-util", "http 1.1.0", "http-body 1.0.1", "http-body-util", "hyper 1.5.0", "hyper-util", "itoa", "matchit", "memchr", "mime", "percent-encoding", "pin-project-lite", "rustversion", "serde", "serde_json", "serde_path_to_error", "serde_urlencoded",
"sync_wrapper 1.0.1", "tokio", - "tower 0.5.1", + "tower", "tower-layer", "tower-service", "tracing", ] -[[package]] -name = "axum-core" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" -dependencies = [ - "async-trait", - "bytes", - "futures-util", - "http 0.2.12", - "http-body 0.4.6", - "mime", - "rustversion", - "tower-layer", - "tower-service", -] - [[package]] name = "axum-core" version = "0.4.5" @@ -629,8 +584,8 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73c3220b188aea709cf1b6c5f9b01c3bd936bb08bd2b5184a12b35ac8131b1f9" dependencies = [ - "axum 0.7.7", - "axum-core 0.4.5", + "axum", + "axum-core", "bytes", "futures-util", "headers", @@ -641,7 +596,7 @@ dependencies = [ "pin-project-lite", "serde", "serde_html_form", - "tower 0.5.1", + "tower", "tower-layer", "tower-service", "tracing", @@ -779,7 +734,7 @@ dependencies = [ "bitflags 2.6.0", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.10.5", "lazy_static", "lazycell", "proc-macro2 1.0.89", @@ -1043,7 +998,7 @@ dependencies = [ "itertools 0.12.1", "log", "prometheus_metrics", - "reqwest 0.11.27", + "reqwest", "serde", "serde_json", "serde_utils", @@ -2068,18 +2023,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "educe" -version = "0.4.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f0042ff8246a363dbe77d2ceedb073339e85a804b9a47636c6e016a9a32c05f" -dependencies = [ - "enum-ordinalize", - "proc-macro2 1.0.89", - "quote 1.0.37", - "syn 1.0.109", -] - [[package]] name = "eip_2335" version = "0.0.0" @@ -2248,19 +2191,6 @@ dependencies = [ "syn 2.0.85", ] -[[package]] -name = "enum-ordinalize" -version = "3.1.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bf1fa3f06bbff1ea5b1a9c7b14aa992a39657db60a2759457328d7e058f49ee" -dependencies = [ - "num-bigint", - "num-traits", - "proc-macro2 1.0.89", - "quote 1.0.37", - "syn 2.0.85", -] - [[package]] name = "enumset" version = "1.1.5" @@ -2357,7 +2287,7 @@ dependencies = [ "itertools 0.12.1", "log", "prometheus_metrics", - "reqwest 0.11.27", + "reqwest", "ssz", "std_ext", "thiserror", @@ -2391,7 +2321,7 @@ dependencies = [ "memoffset", "panics", "prometheus_metrics", - "reqwest 0.11.27", + "reqwest", "serde", "serde_json", "ssz", @@ -2426,9 +2356,7 @@ dependencies = [ "async-channel 1.9.0", "async-std", "asynchronous-codec", - "base64 0.22.1", "bls", - "byteorder", "bytes", "delay_map 0.4.0", "dirs", @@ -2440,16 +2368,13 @@ dependencies = [ "exit-future", "fnv", "futures", - "futures-ticker", "futures-timer", - "getrandom", "gossipsub", "grandine_version", "helper_functions", "hex", "hex_fmt", "itertools 0.12.1", - "lazy_static", "libp2p", "libp2p-mplex", "lru", @@ -2477,7 +2402,6 @@ dependencies = [ "tokio", "tokio-io-timeout", "tokio-util", - "tracing", "try_from_iterator", "typenum", "types", @@ -2704,9 +2628,7 @@ dependencies = [ "derive_more 1.0.0", "drain_filter_polyfill", "duplicate", - "educe", "eip_7594", - "eth2_cache_utils", "eth2_libp2p", "execution_engine", "factory", @@ -2726,7 +2648,7 @@ dependencies = [ "panics", "parking_lot 0.12.3", "prometheus_metrics", - "reqwest 0.11.27", + "reqwest", "serde", "serde-aux", "serde_utils", @@ -2756,7 +2678,6 @@ dependencies = [ "crossbeam-skiplist", "derivative", "derive_more 1.0.0", - "educe", "eip_7594", "eth2_libp2p", "execution_engine", @@ -2769,6 +2690,7 @@ dependencies = [ 
"kzg_utils", "log", "prometheus_metrics", + "serde", "ssz", "state_cache", "static_assertions", @@ -3091,7 +3013,6 @@ dependencies = [ "regex", "serde", "sha2 0.10.8", - "smallvec", "tracing", "void", "web-time", @@ -3140,7 +3061,7 @@ dependencies = [ "predefined_chains", "prometheus_metrics", "rayon", - "reqwest 0.11.27", + "reqwest", "reth-libmdbx", "runtime", "serde", @@ -3202,6 +3123,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap 2.6.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.4.1" @@ -3577,12 +3517,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "http-range-header" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" - [[package]] name = "http_api" version = "0.0.0" @@ -3590,7 +3524,7 @@ dependencies = [ "anyhow", "arithmetic", "attestation_verifier", - "axum 0.7.7", + "axum", "axum-extra", "block_producer", "bls", @@ -3605,7 +3539,6 @@ dependencies = [ "enum-iterator", "eth1", "eth1_api", - "eth2_cache_utils", "eth2_libp2p", "execution_engine", "factory", @@ -3622,6 +3555,7 @@ dependencies = [ "keymanager", "liveness_tracker", "log", + "mediatype", "metrics", "mime", "num_cpus", @@ -3631,7 +3565,7 @@ dependencies = [ "parse-display", "predefined_chains", "prometheus_metrics", - "reqwest 0.11.27", + "reqwest", "serde", "serde-aux", "serde_json", @@ -3665,7 +3599,7 @@ name = "http_api_utils" version = "0.0.0" dependencies = [ "anyhow", - "axum 0.7.7", + "axum", "features", "hex-literal", "http-body-util", @@ -3676,7 +3610,7 @@ dependencies = [ "prometheus_metrics", "test-case", "thiserror", - "tower 0.4.13", + "tower", "tower-http", "tracing", "types", @@ -3738,7 +3672,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -3761,6 +3695,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", + "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", "httparse", @@ -3773,16 +3708,20 @@ dependencies = [ ] [[package]] -name = "hyper-tls" -version = "0.5.0" +name = "hyper-rustls" +version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" dependencies = [ - "bytes", - "hyper 0.14.31", - "native-tls", + "futures-util", + "http 1.1.0", + "hyper 1.5.0", + "hyper-util", + "rustls", + "rustls-pki-types", "tokio", - "tokio-native-tls", + "tokio-rustls", + "tower-service", ] [[package]] @@ -4023,7 +3962,7 @@ dependencies = [ "ipnet", "log", "rtnetlink", - "system-configuration", + "system-configuration 0.5.1", "tokio", "windows 0.51.1", ] @@ -4373,7 +4312,7 @@ dependencies = [ "hex-literal", "itertools 0.12.1", "log", - "reqwest 0.11.27", + "reqwest", "serde", "serde_json", "signer", @@ -4481,25 +4420,6 @@ version = "0.2.161" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" -[[package]] -name = "libffi" -version = "3.2.0" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce826c243048e3d5cec441799724de52e2d42f820468431fc3fceee2341871e2" -dependencies = [ - "libc", - "libffi-sys", -] - -[[package]] -name = "libffi-sys" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f36115160c57e8529781b4183c2bb51fdc1f6d6d1ed345591d84be7703befb3c" -dependencies = [ - "cc", -] - [[package]] name = "libloading" version = "0.8.5" @@ -4507,7 +4427,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -5123,6 +5043,12 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "mediatype" +version = "0.19.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8878cd8d1b3c8c8ae4b2ba0a36652b7cf192f618a599a7fbdfa25cffd4ea72dd" + [[package]] name = "memchr" version = "2.7.4" @@ -5143,7 +5069,7 @@ name = "metrics" version = "0.0.0" dependencies = [ "anyhow", - "axum 0.7.7", + "axum", "bls", "build-time", "chrono", @@ -5157,12 +5083,13 @@ dependencies = [ "http_api_utils", "jemalloc-ctl", "log", + "num_threads", "p2p", "prometheus", "prometheus-client", "prometheus_metrics", "psutil", - "reqwest 0.11.27", + "reqwest", "serde", "serde_json", "std_ext", @@ -5473,6 +5400,15 @@ dependencies = [ "libc", ] +[[package]] +name = "num_threads" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9" +dependencies = [ + "libc", +] + [[package]] name = "object" version = "0.36.5" @@ -5579,7 +5515,6 @@ dependencies = [ "conv", "dedicated_executor", "eth1_api", - "eth2_cache_utils", "eth2_libp2p", "features", "fork_choice_control", @@ -5636,7 +5571,6 @@ dependencies = [ "enum-iterator", "eth1_api", "eth2_libp2p", - "execution_engine", "features", "fork_choice_control", "futures", @@ -6531,46 +6465,6 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3a8614ee435691de62bcffcf4a66d91b3594bf1428a5722e79103249a095690" -[[package]] -name = "reqwest" -version = "0.11.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" -dependencies = [ - "base64 0.21.7", - "bytes", - "encoding_rs", - "futures-core", - "futures-util", - "h2", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.31", - "hyper-tls 0.5.0", - "ipnet", - "js-sys", - "log", - "mime", - "native-tls", - "once_cell", - "percent-encoding", - "pin-project-lite", - "rustls-pemfile 1.0.4", - "serde", - "serde_json", - "serde_urlencoded", - "sync_wrapper 0.1.2", - "system-configuration", - "tokio", - "tokio-native-tls", - "tower-service", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", - "winreg", -] - [[package]] name = "reqwest" version = "0.12.9" @@ -6579,13 +6473,17 @@ checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" dependencies = [ "base64 0.22.1", "bytes", + "encoding_rs", + "futures-channel", "futures-core", "futures-util", + "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", "http-body-util", "hyper 1.5.0", - "hyper-tls 0.6.0", + "hyper-rustls", + "hyper-tls", "hyper-util", "ipnet", "js-sys", @@ 
-6595,11 +6493,12 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls-pemfile 2.2.0", + "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", "sync_wrapper 1.0.1", + "system-configuration 0.6.1", "tokio", "tokio-native-tls", "tower-service", @@ -6622,28 +6521,26 @@ dependencies = [ [[package]] name = "reth-libmdbx" -version = "0.1.0-alpha.16" -source = "git+https://github.com/paradigmxyz/reth.git?rev=2d01f3608697eed05357fb847e25ad33ab59d702#2d01f3608697eed05357fb847e25ad33ab59d702" +version = "1.0.6" +source = "git+https://github.com/paradigmxyz/reth.git?rev=c228fe15808c3acbf18dc3af1a03ef5cbdcda07a#c228fe15808c3acbf18dc3af1a03ef5cbdcda07a" dependencies = [ "bitflags 2.6.0", "byteorder", - "derive_more 0.99.18", + "derive_more 1.0.0", "indexmap 2.6.0", - "libc", - "libffi", "parking_lot 0.12.3", "reth-mdbx-sys", "thiserror", + "tracing", ] [[package]] name = "reth-mdbx-sys" -version = "0.1.0-alpha.16" -source = "git+https://github.com/paradigmxyz/reth.git?rev=2d01f3608697eed05357fb847e25ad33ab59d702#2d01f3608697eed05357fb847e25ad33ab59d702" +version = "1.0.6" +source = "git+https://github.com/paradigmxyz/reth.git?rev=c228fe15808c3acbf18dc3af1a03ef5cbdcda07a#c228fe15808c3acbf18dc3af1a03ef5cbdcda07a" dependencies = [ "bindgen 0.69.5", "cc", - "libc", ] [[package]] @@ -6876,15 +6773,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rustls-pemfile" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" -dependencies = [ - "base64 0.21.7", -] - [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -7119,11 +7007,11 @@ dependencies = [ [[package]] name = "serde_qs" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0431a35568651e363364210c91983c1da5eb29404d9f0928b67d4ebcfa7d330c" +checksum = "cd34f36fe4c5ba9654417139a9b3a20d2e1de6012ee678ad14d240c22c78d8d6" dependencies = [ - "axum 0.6.20", + "axum", "futures", "percent-encoding", "serde", @@ -7335,7 +7223,7 @@ dependencies = [ "log", "prometheus_metrics", "rayon", - "reqwest 0.11.27", + "reqwest", "serde", "serde-aux", "serde_json", @@ -7839,17 +7727,16 @@ dependencies = [ [[package]] name = "sysinfo" -version = "0.30.13" +version = "0.31.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a5b4ddaee55fb2bea2bf0e5000747e5f5c0de765e5a5ff87f4cd106439f4bb3" +checksum = "355dbe4f8799b304b05e1b0f05fc59b2a18d36645cf169607da45bde2f69a1be" dependencies = [ - "cfg-if", "core-foundation-sys", "libc", + "memchr", "ntapi", - "once_cell", "rayon", - "windows 0.52.0", + "windows 0.57.0", ] [[package]] @@ -7860,7 +7747,18 @@ checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" dependencies = [ "bitflags 1.3.2", "core-foundation", - "system-configuration-sys", + "system-configuration-sys 0.5.0", +] + +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags 2.6.0", + "core-foundation", + "system-configuration-sys 0.6.0", ] [[package]] @@ -7873,6 +7771,16 @@ dependencies = [ "libc", ] +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + 
"core-foundation-sys", + "libc", +] + [[package]] name = "take_mut" version = "0.2.2" @@ -8126,6 +8034,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.16" @@ -8187,22 +8106,6 @@ dependencies = [ "winnow", ] -[[package]] -name = "tower" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" -dependencies = [ - "futures-core", - "futures-util", - "pin-project", - "pin-project-lite", - "tokio", - "tower-layer", - "tower-service", - "tracing", -] - [[package]] name = "tower" version = "0.5.1" @@ -8221,17 +8124,15 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.4.4" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ "bitflags 2.6.0", "bytes", - "futures-core", - "futures-util", - "http 0.2.12", - "http-body 0.4.6", - "http-range-header", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", "pin-project-lite", "tower-layer", "tower-service", @@ -8539,7 +8440,7 @@ name = "validator" version = "0.0.0" dependencies = [ "anyhow", - "axum 0.7.7", + "axum", "axum-extra", "block_producer", "bls", @@ -8550,9 +8451,7 @@ dependencies = [ "derive_more 1.0.0", "directories", "doppelganger_protection", - "educe", "eip_7594", - "eth1", "eth1_api", "eth2_libp2p", "factory", @@ -8575,7 +8474,7 @@ dependencies = [ "prometheus_metrics", "rand", "rayon", - "reqwest 0.11.27", + "reqwest", "serde", "serde_json", "serde_utils", @@ -8792,7 +8691,7 @@ dependencies = [ "once_cell", "parking_lot 0.12.3", "pin-project", - "reqwest 0.12.9", + "reqwest", "rlp", "secp256k1", "serde", @@ -8867,11 +8766,11 @@ dependencies = [ [[package]] name = "windows" -version = "0.52.0" +version = "0.57.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +checksum = "12342cb4d8e3b046f3d80effd474a7a02447231330ef77d71daa6fbc40681143" dependencies = [ - "windows-core 0.52.0", + "windows-core 0.57.0", "windows-targets 0.52.6", ] @@ -8893,17 +8792,60 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ed2439a290666cd67ecce2b0ffaad89c2a56b976b736e6ece670297897832d" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-result 0.1.2", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-implement" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" +dependencies = [ + "proc-macro2 1.0.89", + "quote 1.0.37", + "syn 2.0.85", +] + +[[package]] +name = "windows-interface" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" +dependencies = [ + "proc-macro2 1.0.89", + "quote 1.0.37", + "syn 2.0.85", +] + [[package]] name = "windows-registry" 
version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" dependencies = [ - "windows-result", + "windows-result 0.2.0", "windows-strings", "windows-targets 0.52.6", ] +[[package]] +name = "windows-result" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e383302e8ec8515204254685643de10811af0ed97ea37210dc26fb0032647f8" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-result" version = "0.2.0" @@ -8919,7 +8861,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" dependencies = [ - "windows-result", + "windows-result 0.2.0", "windows-targets 0.52.6", ] diff --git a/Cargo.toml b/Cargo.toml index 52aa0303..23d187d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -353,12 +353,12 @@ jemalloc-ctl = '0.5.4' jemallocator = '0.5.4' jwt-simple = { version = '0.12.6', default-features = false, features = ['pure-rust'] } kzg = { git = 'https://github.com/grandinetech/rust-kzg.git', branch = 'integration-raw-2' } -lazy_static = '1.4.0' -libmdbx = { git = 'https://github.com/paradigmxyz/reth.git', package = 'reth-libmdbx', rev = '2d01f3608697eed05357fb847e25ad33ab59d702' } +libmdbx = { git = 'https://github.com/paradigmxyz/reth.git', package = 'reth-libmdbx', rev = 'c228fe15808c3acbf18dc3af1a03ef5cbdcda07a' } libp2p = { version = '0.54', default-features = false, features = ['metrics', 'dns', 'ecdsa', 'identify', 'macros', 'noise', 'plaintext', 'secp256k1', 'serde', 'tcp', 'tokio', 'yamux', 'quic', 'upnp'] } libp2p-mplex = '0.42.0' log = '0.4.20' lru = '0.12.2' +mediatype = '0.19' memoffset = '0.9.0' mime = '0.3.17' nonzero_ext = '0.3.0' @@ -366,6 +366,7 @@ num-bigint = '0.4.4' num-integer = '0.1.45' num-traits = '0.2.17' num_cpus = '1.16.0' +num_threads = '0.1' once_cell = '1.19.0' openssl = '0.10.63' parking_lot = '0.12.1' @@ -389,7 +390,7 @@ rc-box = '1.2.0' refinery = { version = '0.8.12', features = ['rusqlite']} regex = '1.10.3' replace_with = '0.1.7' -reqwest = { version = '0.11.24', features = ['blocking', 'json', 'native-tls-vendored'] } +reqwest = { version = '0.12', features = ['blocking', 'json', 'native-tls-vendored'] } rusqlite = { version = '0.30.0', features = ['bundled'] } rust-kzg-blst = { git = 'https://github.com/grandinetech/rust-kzg.git', branch = 'integration-raw-2' } scrypt = '0.11.0' @@ -397,7 +398,7 @@ semver = '1.0.21' serde = { version = '1.0.196', features = ['derive', 'rc'] } serde-aux = '4.4.0' serde_json = { version = '1.0.113', features = ['preserve_order'] } -serde_qs = { version = '0.12.0', features = ['axum'] } +serde_qs = { version = '0.13', features = ['axum'] } serde_repr = '0.1.18' serde_with = '3.6.0' serde_yaml = '0.9.31' @@ -414,7 +415,7 @@ snap = '1.1.1' static_assertions = '1.1.0' strum = { version = '0.26.1', features = ['derive'] } syn = { version = '2.0.48', features = ['full'] } -sysinfo = '0.30.5' +sysinfo = '0.31' tap = '1.0.1' tempfile = '3.9.0' test-case = '3.3.1' @@ -426,8 +427,8 @@ tokio = { version = '1.36.0', features = ['fs', 'macros', 'rt-multi-thread', 'si tokio-io-timeout = '1.2.0' tokio-stream = { version = '0.1.14', features = ['sync'] } tokio-util = { version = "0.7", features = ["codec", "compat", "time"] } -tower = { version = '0.4.13', features = ['timeout'] } -tower-http = { version = '0.4.4', features = ['cors', 'trace'] } +tower = { version = '0.5', 
features = ['timeout'] } +tower-http = { version = '0.5', features = ['cors', 'trace'] } tracing = '0.1.40' triomphe = '0.1.11' tynm = '0.1.9' diff --git a/ad_hoc_bench/src/main.rs b/ad_hoc_bench/src/main.rs index 3254d347..275a4553 100644 --- a/ad_hoc_bench/src/main.rs +++ b/ad_hoc_bench/src/main.rs @@ -245,7 +245,7 @@ impl From for BlockParameters { } fn main() -> Result<()> { - binary_utils::initialize_logger(module_path!(), false)?; + binary_utils::initialize_logger(module_path!(), false, false)?; binary_utils::initialize_rayon()?; print_jemalloc_stats()?; diff --git a/attestation_verifier/src/attestation_verifier.rs b/attestation_verifier/src/attestation_verifier.rs index 970d96d3..08e08b0c 100644 --- a/attestation_verifier/src/attestation_verifier.rs +++ b/attestation_verifier/src/attestation_verifier.rs @@ -444,7 +444,7 @@ fn attestation_batch_triples<'a, P: Preset>( .map(|attestation| { let triple = match attestation { Attestation::Phase0(attestation) => { - let indexed_attestation = phase0::get_indexed_attestation(state, attestation)?; + let indexed_attestation = phase0::get_indexed_attestation(state, &attestation)?; let mut triple = Triple::default(); @@ -458,7 +458,8 @@ fn attestation_batch_triples<'a, P: Preset>( triple } Attestation::Electra(attestation) => { - let indexed_attestation = electra::get_indexed_attestation(state, attestation)?; + let indexed_attestation = + electra::get_indexed_attestation(state, &attestation)?; let mut triple = Triple::default(); diff --git a/binary_utils/src/lib.rs b/binary_utils/src/lib.rs index 48836d81..1c637ffb 100644 --- a/binary_utils/src/lib.rs +++ b/binary_utils/src/lib.rs @@ -7,7 +7,11 @@ use log::LevelFilter; use logging::PEER_LOG_METRICS; use rayon::ThreadPoolBuilder; -pub fn initialize_logger(module_path: &str, always_write_style: bool) -> Result<()> { +pub fn initialize_logger( + module_path: &str, + always_write_style: bool, + parse_env: bool, +) -> Result<()> { let mut builder = Builder::new(); builder @@ -64,6 +68,14 @@ pub fn initialize_logger(module_path: &str, always_write_style: bool) -> Result< builder.write_style(WriteStyle::Always); } + if parse_env { + let env = Env::new() + .filter("GRANDINE_LOG") + .write_style("GRANDINE_LOG_STYLE"); + + builder.parse_env(env); + } + let env = Env::new() .filter("GRANDINE_LOG") .write_style("GRANDINE_LOG_STYLE"); diff --git a/block_producer/src/block_producer.rs b/block_producer/src/block_producer.rs index 158802df..d525d93c 100644 --- a/block_producer/src/block_producer.rs +++ b/block_producer/src/block_producer.rs @@ -837,7 +837,7 @@ impl BlockBuildContext { .filter_map(|attestation| { operation_pools::convert_to_electra_attestation(attestation).ok() }) - .chunk_by(|attestation| attestation.data); + .group_by(|attestation| attestation.data); let attestations = attestations .into_iter() diff --git a/consensus-spec-tests b/consensus-spec-tests index c38c07c5..09c1e41b 160000 --- a/consensus-spec-tests +++ b/consensus-spec-tests @@ -1 +1 @@ -Subproject commit c38c07c525c3919ec2a84db2c9674e81a6c21eb3 +Subproject commit 09c1e41b722216efa9b1c6390169b984f0870052 diff --git a/eip_7594/src/lib.rs b/eip_7594/src/lib.rs index 7bc7c172..e8731ba6 100644 --- a/eip_7594/src/lib.rs +++ b/eip_7594/src/lib.rs @@ -16,12 +16,13 @@ use try_from_iterator::TryFromIterator as _; use typenum::Unsigned; use types::{ combined::SignedBeaconBlock, + config::Config, deneb::primitives::{Blob, KzgProof}, eip7594::{ - BlobCommitmentsInclusionProof, Cell, ColumnIndex, DataColumnSidecar, MatrixEntry, - NumberOfColumns, 
DATA_COLUMN_SIDECAR_SUBNET_COUNT, SAMPLES_PER_SLOT, + BlobCommitmentsInclusionProof, Cell, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, + MatrixEntry, NumberOfColumns, }, - phase0::primitives::{NodeId, SubnetId}, + phase0::primitives::NodeId, preset::Preset, traits::{PostDenebBeaconBlockBody, PostElectraBeaconBlockBody, SignedBeaconBlock as _}, }; @@ -128,9 +129,9 @@ pub fn verify_kzg_proofs( data_column_sidecar: &DataColumnSidecar
<P>
, metrics: &Option<Arc<Metrics>>, ) -> Result { - if let Some(metrics) = metrics.as_ref() { - let _timer = metrics.data_column_sidecar_verification_times.start_timer(); - } + let _timer = metrics + .as_ref() + .map(|metrics| metrics.data_column_sidecar_verification_times.start_timer()); let DataColumnSidecar { index, @@ -174,11 +175,11 @@ pub fn verify_sidecar_inclusion_proof( data_column_sidecar: &DataColumnSidecar
<P>
, metrics: &Option>, ) -> bool { - if let Some(metrics) = metrics.as_ref() { - let _timer = metrics + let _timer = metrics.as_ref().map(|metrics| { + metrics .data_column_sidecar_inclusion_proof_verification - .start_timer(); - } + .start_timer() + }); let DataColumnSidecar { kzg_commitments, @@ -202,8 +203,9 @@ pub fn verify_sidecar_inclusion_proof( pub fn get_custody_subnets( node_id: NodeId, custody_subnet_count: u64, -) -> impl Iterator { - assert!(custody_subnet_count <= DATA_COLUMN_SIDECAR_SUBNET_COUNT); + config: &Arc, +) -> impl Iterator { + assert!(custody_subnet_count <= config.data_column_sidecar_subnet_count); let mut subnet_ids = vec![]; let mut current_id = node_id; @@ -222,7 +224,7 @@ pub fn get_custody_subnets( ]; let output_prefix_u64 = u64::from_le_bytes(output_prefix); - let subnet_id = output_prefix_u64 % DATA_COLUMN_SIDECAR_SUBNET_COUNT; + let subnet_id = output_prefix_u64 % config.data_column_sidecar_subnet_count; if !subnet_ids.contains(&subnet_id) { subnet_ids.push(subnet_id); @@ -235,23 +237,29 @@ pub fn get_custody_subnets( current_id = current_id + Uint256::one(); } - subnet_ids.into_iter() + subnet_ids.into_iter().sorted() } pub fn get_custody_columns( node_id: NodeId, custody_subnet_count: u64, + config: &Arc, ) -> impl Iterator { - get_custody_subnets(node_id, custody_subnet_count) - .flat_map(|subnet_id| get_columns_index_for_subnet(subnet_id)) + get_custody_subnets(node_id, custody_subnet_count, &config) + .flat_map(|subnet_id| get_columns_index_for_subnet(subnet_id, &config)) .sorted() } -fn get_columns_index_for_subnet(subnet_id: SubnetId) -> impl Iterator { - let columns_per_subnet = NumberOfColumns::U64 / DATA_COLUMN_SIDECAR_SUBNET_COUNT; +fn get_columns_index_for_subnet( + subnet_id: DataColumnSubnetId, + config: &Arc, +) -> impl Iterator { + let data_column_sidecar_subnet_count = config.data_column_sidecar_subnet_count(); + let columns_per_subnet = NumberOfColumns::U64 / data_column_sidecar_subnet_count; (0..columns_per_subnet) - .map(move |column_index| (DATA_COLUMN_SIDECAR_SUBNET_COUNT * column_index + subnet_id)) + .map(move |column_index| (data_column_sidecar_subnet_count * column_index + subnet_id)) + .sorted() } pub fn compute_matrix_for_data_column_sidecar( @@ -287,9 +295,9 @@ pub fn compute_matrix( blobs: Vec, metrics: &Option>, ) -> Result> { - if let Some(metrics) = metrics.as_ref() { - let _timer = metrics.data_column_sidecar_computation.start_timer(); - } + let _timer = metrics + .as_ref() + .map(|metrics| metrics.data_column_sidecar_computation.start_timer()); let kzg_settings = settings(); @@ -319,9 +327,9 @@ pub fn recover_matrix( blob_count: usize, metrics: &Option>, ) -> Result> { - if let Some(metrics) = metrics.as_ref() { - let _timer = metrics.columns_reconstruction_time.start_timer(); - } + let _timer = metrics + .as_ref() + .map(|metrics| metrics.columns_reconstruction_time.start_timer()); let mut matrix = vec![]; for blob_index in 0..blob_count { @@ -407,9 +415,10 @@ pub fn get_data_column_sidecars( signed_block: &SignedBeaconBlock
<P>
, cells_and_kzg_proofs: Vec<([Cell; CELLS_PER_EXT_BLOB], [KzgProof; CELLS_PER_EXT_BLOB])>, ) -> Result>> { + let signed_block_header = signed_block.to_header(); + let mut sidecars: Vec> = Vec::new(); if let Some(post_deneb_beacon_block_body) = signed_block.message().body().post_deneb() { - let signed_block_header = signed_block.to_header(); let kzg_commitments = post_deneb_beacon_block_body.blob_kzg_commitments(); if kzg_commitments.is_empty() { @@ -424,8 +433,15 @@ pub fn get_data_column_sidecars( commitments_length: kzg_commitments.len(), } ); - let kzg_commitments_inclusion_proof = - kzg_commitment_inclusion_proof(post_deneb_beacon_block_body); + + let kzg_commitments_inclusion_proof = signed_block + .message() + .body() + .post_electra() + .map(|post_electra_beacon_block_body| { + electra_kzg_commitment_inclusion_proof(post_electra_beacon_block_body) + }) + .unwrap_or_else(|| kzg_commitment_inclusion_proof(post_deneb_beacon_block_body)); for column_index in 0..NumberOfColumns::U64 { let column_cells: Vec = (0..blob_count) @@ -454,7 +470,7 @@ pub fn get_data_column_sidecars( } fn kzg_commitment_inclusion_proof( - body: &(impl PostDenebBeaconBlockBody
<P>
+ PostElectraBeaconBlockBody
<P>
+ ?Sized), + body: &(impl PostDenebBeaconBlockBody
<P>
+ ?Sized), ) -> BlobCommitmentsInclusionProof { let mut proof = BlobCommitmentsInclusionProof::default(); @@ -476,10 +492,44 @@ fn kzg_commitment_inclusion_proof( hashing::hash_256_256(body.graffiti(), body.proposer_slashings().hash_tree_root()), ), hashing::hash_256_256( + hashing::hash_256_256(body.attester_slashings_root(), body.attestations_root()), hashing::hash_256_256( - body.attester_slashings().hash_tree_root(), - body.attestations().hash_tree_root(), + body.deposits().hash_tree_root(), + body.voluntary_exits().hash_tree_root(), ), + ), + ); + + proof +} + +fn electra_kzg_commitment_inclusion_proof( + body: &(impl PostElectraBeaconBlockBody
<P>
+ ?Sized), +) -> BlobCommitmentsInclusionProof { + let mut proof = BlobCommitmentsInclusionProof::default(); + + proof[0] = body.bls_to_execution_changes().hash_tree_root(); + + proof[1] = hashing::hash_256_256( + body.sync_aggregate().hash_tree_root(), + body.execution_payload().hash_tree_root(), + ); + + proof[2] = hashing::hash_256_256( + hashing::hash_256_256(body.execution_requests().hash_tree_root(), ZERO_HASHES[0]), + ZERO_HASHES[1], + ); + + proof[3] = hashing::hash_256_256( + hashing::hash_256_256( + hashing::hash_256_256( + body.randao_reveal().hash_tree_root(), + body.eth1_data().hash_tree_root(), + ), + hashing::hash_256_256(body.graffiti(), body.proposer_slashings().hash_tree_root()), + ), + hashing::hash_256_256( + hashing::hash_256_256(body.attester_slashings_root(), body.attestations_root()), hashing::hash_256_256( body.deposits().hash_tree_root(), body.voluntary_exits().hash_tree_root(), @@ -496,7 +546,7 @@ fn kzg_commitment_inclusion_proof( * This helper demonstrates how to calculate the number of columns to query per slot when * allowing given number of failures, assuming uniform random selection without replacement. */ -pub fn get_extended_sample_count(allowed_failures: u64) -> Result { +pub fn get_extended_sample_count(allowed_failures: u64, config: &Config) -> Result { // check that `allowed_failures` within the accepted range [0 -> NUMBER_OF_COLUMNS // 2] // missing chunks for more than a half is the worst case let worst_case_missing = NumberOfColumns::U64 / 2 + 1; @@ -525,11 +575,11 @@ pub fn get_extended_sample_count(allowed_failures: u64) -> Result { 0, NumberOfColumns::U64, worst_case_missing, - SAMPLES_PER_SLOT, + config.samples_per_slot(), ); // number of unique column IDs - let mut sample_count = SAMPLES_PER_SLOT; + let mut sample_count = config.samples_per_slot(); while sample_count <= NumberOfColumns::U64 { if hypergeom_cdf( allowed_failures, @@ -548,6 +598,8 @@ pub fn get_extended_sample_count(allowed_failures: u64) -> Result { #[cfg(test)] mod tests { + use std::sync::Arc; + use duplicate::duplicate_item; use helper_functions::predicates::{index_at_commitment_depth, is_valid_merkle_branch}; use serde::Deserialize; @@ -556,6 +608,7 @@ mod tests { use test_generator::test_resources; use typenum::Unsigned as _; use types::{ + config::Config, deneb::containers::BeaconBlockBody as DenebBeaconBlockBody, phase0::primitives::NodeId, preset::{Mainnet, Minimal, Preset}, @@ -590,8 +643,9 @@ mod tests { result, } = case.yaml::("meta"); + let config = Arc::new(Config::default()); assert_eq!( - get_custody_columns(node_id, custody_subnet_count).collect::>(), + get_custody_columns(node_id, custody_subnet_count, &config).collect::>(), result ); } @@ -605,7 +659,7 @@ mod tests { } #[duplicate_item( - glob function_name preset; + glob function_name preset; ["consensus-spec-tests/tests/mainnet/eip7594/merkle_proof/single_merkle_proof/BeaconBlockBody/blob_kzg_commitments_*"] [kzg_commitment_inclusion_proof_mainnet] [Mainnet]; ["consensus-spec-tests/tests/minimal/eip7594/merkle_proof/single_merkle_proof/BeaconBlockBody/blob_kzg_commitments_*"] [kzg_commitment_inclusion_proof_minimal] [Minimal]; )] diff --git a/eth2_cache_utils/src/generic.rs b/eth2_cache_utils/src/generic.rs index cef365d4..0020427c 100644 --- a/eth2_cache_utils/src/generic.rs +++ b/eth2_cache_utils/src/generic.rs @@ -162,7 +162,7 @@ pub fn blob_sidecars( .skip_while(|path| path < Path::new(low.as_str())) .take_while(|path| path < Path::new(high.as_str())) .map(|path| case.ssz_uncompressed::<_, Arc>>(config, 
path)) - .chunks(|blob| blob.signed_block_header.message.slot) + .group_by(|blob| blob.signed_block_header.message.slot) .into_iter() .map(|(slot, blobs)| (slot, blobs.collect_vec())) .collect::>(); diff --git a/eth2_libp2p b/eth2_libp2p index b92dcbfc..637988e6 160000 --- a/eth2_libp2p +++ b/eth2_libp2p @@ -1 +1 @@ -Subproject commit b92dcbfc0c4593c33706697217c6bb7230786713 +Subproject commit 637988e626f4677755521b6cdb56746736bc12c1 diff --git a/fork_choice_control/Cargo.toml b/fork_choice_control/Cargo.toml index ce46bdb2..dbd3947e 100644 --- a/fork_choice_control/Cargo.toml +++ b/fork_choice_control/Cargo.toml @@ -16,7 +16,6 @@ database = { workspace = true } derivative = { workspace = true } derive_more = { workspace = true } drain_filter_polyfill = { workspace = true } -educe = { workspace = true } eip_7594 = { workspace = true } eth2_libp2p = { workspace = true } execution_engine = { workspace = true } @@ -53,7 +52,6 @@ types = { workspace = true } [dev-dependencies] bytesize = { workspace = true } duplicate = { workspace = true } -eth2_cache_utils = { workspace = true } factory = { workspace = true } fs-err = { workspace = true } serde-aux = { workspace = true } diff --git a/fork_choice_control/src/controller.rs b/fork_choice_control/src/controller.rs index 70404645..1e19d232 100644 --- a/fork_choice_control/src/controller.rs +++ b/fork_choice_control/src/controller.rs @@ -24,7 +24,8 @@ use eth2_libp2p::{GossipId, PeerId}; use execution_engine::{ExecutionEngine, PayloadStatusV1}; use fork_choice_store::{ AggregateAndProofOrigin, AttestationItem, AttestationOrigin, AttesterSlashingOrigin, - BlobSidecarOrigin, BlockOrigin, DataColumnSidecarOrigin, StateCacheProcessor, Store, StoreConfig, + BlobSidecarOrigin, BlockOrigin, DataColumnSidecarOrigin, StateCacheProcessor, Store, + StoreConfig, }; use futures::channel::{mpsc::Sender as MultiSender, oneshot::Sender as OneshotSender}; use genesis::AnchorCheckpointProvider; @@ -39,10 +40,7 @@ use types::{ deneb::containers::BlobSidecar, eip7594::{ColumnIndex, DataColumnSidecar}, nonstandard::ValidationOutcome, - phase0::{ - containers::{Attestation, AttesterSlashing, SignedAggregateAndProof}, - primitives::{ExecutionBlockHash, Slot, SubnetId}, - }, + phase0::primitives::{ExecutionBlockHash, Slot, SubnetId}, preset::Preset, traits::SignedBeaconBlock as _, }; @@ -248,8 +246,15 @@ where ) } - pub fn on_api_data_column_sidecar(&self, data_column_sidecar: Arc>) { - self.spawn_data_column_sidecar_task(data_column_sidecar, DataColumnSidecarOrigin::Api) + pub fn on_api_data_column_sidecar( + &self, + data_column_sidecar: Arc>, + sender: Option>>, + ) { + self.spawn_data_column_sidecar_task( + data_column_sidecar, + DataColumnSidecarOrigin::Api(sender), + ) } pub fn on_api_block( diff --git a/fork_choice_control/src/messages.rs b/fork_choice_control/src/messages.rs index e49e7036..e7cff67d 100644 --- a/fork_choice_control/src/messages.rs +++ b/fork_choice_control/src/messages.rs @@ -194,7 +194,6 @@ pub enum P2pMessage { DataColumnsNeeded(Vec, Slot, Option), FinalizedCheckpoint(Checkpoint), HeadState(#[cfg_attr(test, derivative(Debug = "ignore"))] Arc>), - ReverifyGossipAttestation(Arc>, SubnetId, GossipId), DataColumnsReconstructed(Vec>>, Slot), } diff --git a/fork_choice_control/src/misc.rs b/fork_choice_control/src/misc.rs index e2d24758..07702fa5 100644 --- a/fork_choice_control/src/misc.rs +++ b/fork_choice_control/src/misc.rs @@ -7,8 +7,6 @@ use eth2_libp2p::GossipId; use fork_choice_store::{ AggregateAndProofAction, AggregateAndProofOrigin, 
AttestationAction, AttestationItem, AttestationValidationError, BlobSidecarOrigin, BlockOrigin, ChainLink, DataColumnSidecarOrigin, - AggregateAndProofAction, AggregateAndProofOrigin, AttestationAction, AttestationOrigin, - BlobSidecarOrigin, BlockOrigin, ChainLink, DataColumnSidecarOrigin, }; use serde::Serialize; use strum::IntoStaticStr; @@ -16,10 +14,7 @@ use types::{ combined::{SignedAggregateAndProof, SignedBeaconBlock}, deneb::containers::BlobSidecar, eip7594::DataColumnSidecar, - phase0::{ - containers::{Attestation, SignedAggregateAndProof}, - primitives::ValidatorIndex, - }, + phase0::primitives::ValidatorIndex, preset::Preset, }; diff --git a/fork_choice_control/src/mutator.rs b/fork_choice_control/src/mutator.rs index ff23b4b1..65590f5c 100644 --- a/fork_choice_control/src/mutator.rs +++ b/fork_choice_control/src/mutator.rs @@ -80,8 +80,8 @@ use crate::{ storage::Storage, tasks::{ AttestationTask, BlobSidecarTask, BlockAttestationsTask, BlockTask, CheckpointStateTask, - DataColumnSidecarTask, PersistBlobSidecarsTask, PersistDataColumnSidecarsTask, PreprocessStateTask, - ReconstructDataColumnSidecarsTask, + DataColumnSidecarTask, PersistBlobSidecarsTask, PersistDataColumnSidecarsTask, + PreprocessStateTask, ReconstructDataColumnSidecarsTask, }, thread_pool::{Spawn, ThreadPool}, unbounded_sink::UnboundedSink, @@ -274,8 +274,8 @@ where persisted_data_column_ids, ); } - MutatorMessage::PreprocessedBeaconState { block_root, state } => { - self.handle_preprocessed_beacon_state(block_root, &state); + MutatorMessage::PreprocessedBeaconState { state } => { + self.prepare_execution_payload_for_next_slot(&state); } MutatorMessage::NotifiedForkChoiceUpdate { wait_group, @@ -542,23 +542,19 @@ where .chain_config() .is_eip7594_fork(misc::compute_epoch_at_slot::
<P>
(slot)) { - let parent = self - .store - .chain_link(pending_block.block.message().parent_root()) - .expect("block data availability check should be done after block parent presence check"); + //let parent = self + // .store + // .chain_link(pending_block.block.message().parent_root()) + // .expect("block data availability check should be done after block parent presence check"); - let missing_column_indices = - self.store.indices_of_missing_data_columns(&parent.block); + let missing_column_indices = self.store.indices_of_missing_data_columns(&block); if missing_column_indices.len() * 2 < NumberOfColumns::USIZE || !self.store.is_forward_synced() { - let available_columns = - self.store.available_columns_at_block(&parent.block); + let available_columns = self.store.available_columns_at_block(&block); - if let Some(post_deneb_block_body) = - parent.block.message().body().post_deneb() - { + if let Some(post_deneb_block_body) = block.message().body().post_deneb() { let blob_count = post_deneb_block_body.blob_kzg_commitments().len(); // check if it is supernode, and obtaining columns more than half @@ -1423,72 +1419,83 @@ where block: Arc>, full_matrix: Vec, ) -> Result<()> { - // the node MUST expose the new column as if it had received it over the network. - // If the node is subscribed to the subnet corresponding to the column, - // it MUST send the reconstructed DataColumnSidecar to its topic mesh neighbors. - // If instead the node is not subscribed to the corresponding subnet, - // it SHOULD still expose the availability of the DataColumnSidecar as part of the gossip emission process. - // See - // - // first, convert the matrix into full data column sidecars - let mut cells_and_kzg_proofs: Vec<( - [Cell; NumberOfColumns::USIZE], - [KzgProof; NumberOfColumns::USIZE], - )> = vec![]; - for entry in full_matrix.into_iter() { - let MatrixEntry { - cell, - kzg_proof, - column_index, - row_index, - } = entry; - cells_and_kzg_proofs[row_index as usize].0[column_index as usize] = cell; - cells_and_kzg_proofs[row_index as usize].1[column_index as usize] = kzg_proof; - } - let data_column_sidecars = - eip_7594::get_data_column_sidecars(&block, cells_and_kzg_proofs)?; - - // then, accept/store those missing data column sidecars - let missing_column_indices = self.store.indices_of_missing_data_columns(&block); - let columns_to_store = data_column_sidecars - .into_iter() - .filter_map(|sidecar| { - if missing_column_indices.contains(&sidecar.index) { - Some(Arc::new(sidecar)) - } else { - None + if let Some(post_deneb_body) = block.message().body().post_deneb() { + let blob_count = post_deneb_body.blob_kzg_commitments().len(); + + // the node MUST expose the new column as if it had received it over the network. + // If the node is subscribed to the subnet corresponding to the column, + // it MUST send the reconstructed DataColumnSidecar to its topic mesh neighbors. + // If instead the node is not subscribed to the corresponding subnet, + // it SHOULD still expose the availability of the DataColumnSidecar as part of the gossip emission process. 
+ // See + // + // first, convert the matrix into full data column sidecars + let default_cell = Cell::default(); + let mut cells_and_kzg_proofs: Vec<( + [Cell; NumberOfColumns::USIZE], + [KzgProof; NumberOfColumns::USIZE], + )> = vec![ + ( + core::array::from_fn(|_| default_cell.clone()), + [KzgProof::repeat_byte(u8::MAX); NumberOfColumns::USIZE], + ); + blob_count + ]; + for entry in full_matrix { + let MatrixEntry { + cell, + kzg_proof, + column_index, + row_index, + } = entry; + + cells_and_kzg_proofs[row_index as usize].0[column_index as usize] = cell; + cells_and_kzg_proofs[row_index as usize].1[column_index as usize] = kzg_proof; + } + + let data_column_sidecars = + eip_7594::get_data_column_sidecars(&block, cells_and_kzg_proofs)?; + + // then, accept/store those missing data column sidecars + let missing_column_indices = self.store.indices_of_missing_data_columns(&block); + let columns_to_store = data_column_sidecars + .into_iter() + .filter_map(|sidecar| { + if missing_column_indices.contains(&sidecar.index) { + Some(Arc::new(sidecar)) + } else { + None + } + }) + .collect::<Vec<_>>(); + + if !columns_to_store.is_empty() { + info!( + "storing reconstructed data column sidecars (indexes: [{}], block: {}, slot: {})", + columns_to_store.iter().map(|c| c.index).join(", "), + block.message().hash_tree_root(), + block.message().slot(), + ); + + for data_column_sidecar in columns_to_store.iter() { + debug!( + "storing reconstructed data column sidecar (index: {}, column: {data_column_sidecar:?})", + data_column_sidecar.index, + ); + self.accept_data_column_sidecar(&wait_group, data_column_sidecar.clone_arc()); } - }) - .collect::<Vec<_>>(); - info!( - "storing reconstructed data column sidecars (indexes: [{}], block: {}, slot: {})", - columns_to_store.iter().map(|c| c.index).join(", "), - block.message().hash_tree_root(), - block.message().slot(), - ); - for data_column_sidecar in columns_to_store.iter() { - debug!( - "storing reconstructed data column sidecar (index: {}, column: {data_column_sidecar:?}", - data_column_sidecar.index, - ); - self.accept_data_column_sidecar(&wait_group, data_column_sidecar.clone_arc()); + // after that, publish/propagate those columns on the respective subnets + P2pMessage::DataColumnsReconstructed(columns_to_store, block.message().slot()) + .send(&self.p2p_tx); + } else { + warn!("no missing columns, all are available after reconstruction"); + } } - // after that, publish/propagate those columns on the respective subnets - P2pMessage::DataColumnsReconstructed(columns_to_store, block.message().slot()) - .send(&self.p2p_tx); Ok(()) } - fn handle_preprocessed_beacon_state(&mut self, block_root: H256, state: &Arc<BeaconState<P>>) { - self.store_mut() - .insert_preprocessed_state(block_root, state.clone_arc()); - self.update_store_snapshot(); - - self.prepare_execution_payload_for_next_slot(state); - } - fn handle_notified_forkchoice_update_result( &mut self, wait_group: &W, @@ -2696,7 +2703,7 @@ where debug!("pruned old blob sidecards from storage up to slot {up_to_slot}"); } Err(error) => { - error!("pruning old blob sidecards from storage failed: {error:?}") + error!("pruning old blob sidecars from storage failed: {error:?}") } } })?; diff --git a/fork_choice_control/src/spec_tests.rs b/fork_choice_control/src/spec_tests.rs index 6b48612b..8428e5c8 100644 --- a/fork_choice_control/src/spec_tests.rs +++ b/fork_choice_control/src/spec_tests.rs @@ -4,6 +4,7 @@ use clock::Tick; use duplicate::duplicate_item; use execution_engine::PayloadStatusWithBlockHash; use 
helper_functions::misc; +use itertools::izip; use serde::Deserialize; use spec_test_utils::Case; use ssz::ContiguousList; @@ -13,14 +14,17 @@ use test_generator::test_resources; use types::{ combined::{Attestation, AttesterSlashing, BeaconBlock, BeaconState, SignedBeaconBlock}, config::Config, - deneb::primitives::{Blob, KzgProof}, + deneb::{ + containers::BlobSidecar, + primitives::{Blob, KzgProof}, + }, nonstandard::{Phase, TimedPowBlock}, phase0::{ containers::Checkpoint, primitives::{Slot, UnixSeconds, H256}, }, preset::{Mainnet, Minimal, Preset}, - traits::{BeaconState as _, PostDenebBeaconBlockBody, SignedBeaconBlock as _}, + traits::{BeaconState as _, SignedBeaconBlock as _}, }; use crate::helpers::Context; @@ -174,34 +178,49 @@ fn run_case(config: &Arc, case: Case) { proofs, valid, } => { - type BlobBundle
<P>
= ContiguousList<Blob<P>,
<P as Preset>
::MaxBlobsPerBlock>; + let mut expected_blob_count = 0; let block = case.ssz::<_, Arc>>(config.as_ref(), block); - let blobs = blobs - .map(|path| case.ssz_default::>(path)) - .into_iter() - .flatten(); + if let Some(body) = block.message().body().post_deneb() { + type BlobBundle
<P>
= ContiguousList<Blob<P>,
<P as Preset>
::MaxBlobsPerBlock>; + + let blobs = blobs.map(|path| case.ssz_default::>(path)); + let signed_block_header = block.to_header(); - let proofs = proofs.into_iter().flatten(); + for (index, blob, kzg_proof, kzg_commitment) in izip!( + 0.., + blobs.unwrap_or_default(), + proofs.unwrap_or_default(), + body.blob_kzg_commitments().iter().copied(), + ) { + // TODO(feature/deneb): Constructing proofs and sidecars is unnecessary. + // Consider mocking `retrieve_blobs_and_proofs` + // from `consensus-specs` using something like + // `TestExecutionEngine`. + let kzg_commitment_inclusion_proof = if let Some(post_electra_body) = + block.message().body().post_electra() + { + misc::electra_kzg_commitment_inclusion_proof(post_electra_body, index) + .expect("inclusion proof should be constructed successfully") + } else { + misc::deneb_kzg_commitment_inclusion_proof(body, index) + .expect("inclusion proof should be constructed successfully") + }; - // TODO(feature/deneb): Constructing proofs and sidecars is unnecessary. - // Consider mocking `retrieve_blobs_and_proofs` - // from `consensus-specs` using something like - // `TestExecutionEngine`. - let blob_sidecars = misc::construct_blob_sidecars(&block, blobs, proofs) - .expect("blob sidecars should be constructed successfully"); + let blob_sidecar = BlobSidecar { + index, + blob, + kzg_commitment, + kzg_proof, + signed_block_header, + kzg_commitment_inclusion_proof, + }; - let expected_blob_count = block - .message() - .body() - .post_deneb() - .map(PostDenebBeaconBlockBody::blob_kzg_commitments) - .map(|contiguous_list| contiguous_list.len()) - .unwrap_or_default(); + context.on_blob_sidecar(blob_sidecar); + } - for blob_sidecar in blob_sidecars { - context.on_blob_sidecar(blob_sidecar); + expected_blob_count = body.blob_kzg_commitments().len(); } if !valid && expected_blob_count > 0 { diff --git a/fork_choice_control/src/storage.rs b/fork_choice_control/src/storage.rs index 9e5b1597..0291a739 100644 --- a/fork_choice_control/src/storage.rs +++ b/fork_choice_control/src/storage.rs @@ -942,7 +942,7 @@ impl PrefixableKey for SlotBlobId { } #[derive(Display)] -#[display(fmt = "{}{_0:x}{_1}", Self::PREFIX)] +#[display("{}{_0:x}{_1}", Self::PREFIX)] pub struct DataColumnSidecarByColumnIndex(pub H256, pub ColumnIndex); impl DataColumnSidecarByColumnIndex { @@ -950,7 +950,7 @@ impl DataColumnSidecarByColumnIndex { } #[derive(Display)] -#[display(fmt = "{}{_0:020}{_1:x}{_2}", Self::PREFIX)] +#[display("{}{_0:020}{_1:x}{_2}", Self::PREFIX)] pub struct SlotColumnIndex(pub Slot, pub H256, pub ColumnIndex); impl SlotColumnIndex { diff --git a/fork_choice_control/src/tasks.rs b/fork_choice_control/src/tasks.rs index 2b2c6663..0159a6c6 100644 --- a/fork_choice_control/src/tasks.rs +++ b/fork_choice_control/src/tasks.rs @@ -10,7 +10,7 @@ use execution_engine::{ExecutionEngine, NullExecutionEngine}; use features::Feature; use fork_choice_store::{ AggregateAndProofOrigin, AttestationItem, AttestationOrigin, AttesterSlashingOrigin, - BlobSidecarOrigin, BlockAction, BlockOrigin, DataColumnSidecarOrigin, StateCacheProcessor, + BlobSidecarOrigin, BlockAction, BlockOrigin, DataColumnSidecarOrigin, StateCacheProcessor, Store, }; use futures::channel::mpsc::Sender as MultiSender; @@ -23,8 +23,8 @@ use prometheus_metrics::Metrics; use types::{ combined::{AttesterSlashing, SignedAggregateAndProof, SignedBeaconBlock}, deneb::containers::BlobSidecar, + eip7594::DataColumnSidecar, nonstandard::{RelativeEpoch, ValidationOutcome}, - eip7594::{ColumnIndex, DataColumnSidecar}, phase0::{ 
containers::Checkpoint, primitives::{Slot, H256}, @@ -494,7 +494,7 @@ pub struct ReconstructDataColumnSidecarsTask { impl Run for ReconstructDataColumnSidecarsTask { fn run(self) { let Self { - store_snapshot, + store_snapshot: _, mutator_tx, wait_group, block, diff --git a/fork_choice_store/Cargo.toml b/fork_choice_store/Cargo.toml index 14e7c295..725b7797 100644 --- a/fork_choice_store/Cargo.toml +++ b/fork_choice_store/Cargo.toml @@ -13,7 +13,6 @@ clock = { workspace = true } crossbeam-skiplist = { workspace = true } derivative = { workspace = true } derive_more = { workspace = true } -educe = { workspace = true } eip_7594 = { workspace = true } eth2_libp2p = { workspace = true } execution_engine = { workspace = true } @@ -26,6 +25,7 @@ itertools = { workspace = true } kzg_utils = { workspace = true } log = { workspace = true } prometheus_metrics = { workspace = true } +serde = { workspace = true } ssz = { workspace = true } state_cache = { workspace = true } static_assertions = { workspace = true } diff --git a/fork_choice_store/src/error.rs b/fork_choice_store/src/error.rs index d3191f7d..56f3c3cf 100644 --- a/fork_choice_store/src/error.rs +++ b/fork_choice_store/src/error.rs @@ -7,11 +7,8 @@ use types::{ bellatrix::containers::PowBlock, combined::{Attestation, SignedAggregateAndProof, SignedBeaconBlock}, deneb::containers::BlobSidecar, - eip7594::DataColumnSidecar, - phase0::{ - containers::{Attestation, SignedAggregateAndProof}, - primitives::{Slot, SubnetId, ValidatorIndex}, - }, + eip7594::{DataColumnSidecar, DataColumnSubnetId}, + phase0::primitives::{Slot, SubnetId, ValidatorIndex}, preset::{Mainnet, Preset}, }; @@ -117,8 +114,8 @@ pub enum Error { )] DataColumnSidecarOnIncorrectSubnet { data_column_sidecar: Arc>, - expected: SubnetId, - actual: SubnetId, + expected: DataColumnSubnetId, + actual: DataColumnSubnetId, }, #[error( "data_column sidecar has incorrect proposer index \ diff --git a/fork_choice_store/src/lib.rs b/fork_choice_store/src/lib.rs index dafb11a6..e5e5a19a 100644 --- a/fork_choice_store/src/lib.rs +++ b/fork_choice_store/src/lib.rs @@ -80,7 +80,7 @@ pub use crate::{ AggregateAndProofAction, AggregateAndProofOrigin, ApplyBlockChanges, ApplyTickChanges, AttestationAction, AttestationItem, AttestationOrigin, AttestationValidationError, AttesterSlashingOrigin, BlobSidecarAction, BlobSidecarOrigin, BlockAction, BlockOrigin, - ChainLink, DataColumnSidecarAction, DataColumnSidecarOrigin, PartialBlockAction, + ChainLink, DataColumnSidecarAction, DataColumnSidecarOrigin, PartialBlockAction, PayloadAction, ValidAttestation, }, segment::Segment, diff --git a/fork_choice_store/src/misc.rs b/fork_choice_store/src/misc.rs index cc59aa43..e7b7609b 100644 --- a/fork_choice_store/src/misc.rs +++ b/fork_choice_store/src/misc.rs @@ -7,7 +7,6 @@ use std::sync::Arc; use anyhow::{Error as AnyhowError, Result}; use derivative::Derivative; use derive_more::Debug; -use educe::Educe; use eth2_libp2p::{GossipId, PeerId}; use features::Feature; use futures::channel::{mpsc::Sender, oneshot::Sender as OneshotSender}; @@ -23,8 +22,8 @@ use types::{ Attestation, AttestingIndices, BeaconState, SignedAggregateAndProof, SignedBeaconBlock, }, deneb::containers::BlobSidecar, + eip7594::{DataColumnSidecar, DataColumnSubnetId}, nonstandard::{PayloadStatus, Publishable, ValidationOutcome}, - eip7594::DataColumnSidecar, phase0::{ containers::{AttestationData, Checkpoint}, primitives::{Epoch, ExecutionBlockHash, Gwei, Slot, SubnetId, ValidatorIndex, H256}, @@ -231,6 +230,7 @@ impl BlockOrigin { 
#[derive(Debug, AsRefStr)] pub enum AggregateAndProofOrigin { Gossip(I), + GossipBatch(I), Api(OneshotSender>), } @@ -247,7 +247,7 @@ impl AggregateAndProofOrigin { #[must_use] pub fn split(self) -> (Option, Option>>) { match self { - Self::Gossip(gossip_id) => (Some(gossip_id), None), + Self::Gossip(gossip_id) | Self::GossipBatch(gossip_id) => (Some(gossip_id), None), Self::Api(sender) => (None, Some(sender)), } } @@ -255,7 +255,7 @@ impl AggregateAndProofOrigin { #[must_use] pub fn gossip_id(self) -> Option { match self { - Self::Gossip(gossip_id) => Some(gossip_id), + Self::Gossip(gossip_id) | Self::GossipBatch(gossip_id) => Some(gossip_id), Self::Api(_) => None, } } @@ -263,7 +263,7 @@ impl AggregateAndProofOrigin { #[must_use] pub const fn gossip_id_ref(&self) -> Option<&I> { match self { - Self::Gossip(gossip_id) => Some(gossip_id), + Self::Gossip(gossip_id) | Self::GossipBatch(gossip_id) => Some(gossip_id), Self::Api(_) => None, } } @@ -272,13 +272,14 @@ impl AggregateAndProofOrigin { pub const fn verify_signatures(&self) -> bool { match self { Self::Gossip(_) | Self::Api(_) => true, + Self::GossipBatch(_) => false, } } #[must_use] pub const fn send_to_validator(&self) -> bool { match self { - Self::Gossip(_) | Self::Api(_) => true, + Self::Gossip(_) | Self::GossipBatch(_) | Self::Api(_) => true, } } @@ -287,6 +288,7 @@ impl AggregateAndProofOrigin { pub const fn metrics_label(&self) -> &str { match self { Self::Gossip(_) => "Gossip", + Self::GossipBatch(_) => "GossipBatch", Self::Api(_) => "Api", } } @@ -539,8 +541,8 @@ impl BlobSidecarOrigin { #[derive(Debug)] pub enum DataColumnSidecarOrigin { - Api, - Gossip(SubnetId, GossipId), + Api(Option>>), + Gossip(DataColumnSubnetId, GossipId), Requested(PeerId), Own, } @@ -550,7 +552,7 @@ impl DataColumnSidecarOrigin { pub fn gossip_id(self) -> Option { match self { Self::Gossip(_, gossip_id) => Some(gossip_id), - Self::Api | Self::Own | Self::Requested(_) => None, + Self::Api(_) | Self::Own | Self::Requested(_) => None, } } @@ -559,15 +561,15 @@ impl DataColumnSidecarOrigin { match self { Self::Gossip(_, gossip_id) => Some(gossip_id.source), Self::Requested(peer_id) => Some(*peer_id), - Self::Api | Self::Own => None, + Self::Api(_) | Self::Own => None, } } #[must_use] - pub const fn subnet_id(&self) -> Option { + pub const fn subnet_id(&self) -> Option { match self { Self::Gossip(subnet_id, _) => Some(*subnet_id), - Self::Api | Self::Own | Self::Requested(_) => None, + Self::Api(_) | Self::Own | Self::Requested(_) => None, } } } diff --git a/fork_choice_store/src/store.rs b/fork_choice_store/src/store.rs index 3d39c50f..eba1b661 100644 --- a/fork_choice_store/src/store.rs +++ b/fork_choice_store/src/store.rs @@ -23,9 +23,8 @@ use helper_functions::{ use im::{hashmap, hashmap::HashMap, ordmap, vector, HashSet, OrdMap, Vector}; use itertools::{izip, Either, EitherOrBoth, Itertools as _}; use log::{error, warn}; -use primitive_types::H384; use prometheus_metrics::Metrics; -use ssz::SszHash as _; +use ssz::{ContiguousList, SszHash as _}; use std_ext::ArcExt as _; use tap::Pipe as _; use transition_functions::{ @@ -62,17 +61,16 @@ use crate::{ misc::{ AggregateAndProofAction, AggregateAndProofOrigin, ApplyBlockChanges, ApplyTickChanges, AttestationAction, AttestationItem, AttestationValidationError, AttesterSlashingOrigin, - BlobSidecarAction, BlobSidecarOrigin, BlockAction, BranchPoint, ChainLink, DataColumnSidecarAction, - Difference, DifferenceAtLocation, DissolvedDifference, LatestMessage, Location, - PartialAttestationAction, 
PartialBlockAction, PayloadAction, Score, SegmentId, + BlobSidecarAction, BlobSidecarOrigin, BlockAction, BranchPoint, ChainLink, + DataColumnSidecarAction, Difference, DifferenceAtLocation, DissolvedDifference, + LatestMessage, Location, PartialAttestationAction, PayloadAction, Score, SegmentId, UnfinalizedBlock, ValidAttestation, }, segment::{Position, Segment}, state_cache_processor::StateCacheProcessor, store_config::StoreConfig, supersets::MultiPhaseAggregateAndProofSets as AggregateAndProofSupersets, - validations::validate_merge_block, - DataColumnSidecarOrigin, + validate_merge_block, DataColumnSidecarOrigin, PartialBlockAction, }; /// [`Store`] from the Fork Choice specification. @@ -1060,25 +1058,34 @@ impl Store

{ return Some(BlockAction::Ignore(false)); } - // > [Modified in EIP7594] Check if blob data is available - // - // If not, this block MAY be queued and subsequently considered when blob data becomes available - if self - .chain_config - .is_eip7594_fork(accessors::get_current_epoch(&state)) - { - let missing_indices = self.indices_of_missing_data_columns(&parent.block); + None + } - if missing_indices.len() * 2 >= NumberOfColumns::USIZE && self.is_forward_synced() { - return Some(BlockAction::DelayUntilBlobs(block)); - } - } else { - if !self.indices_of_missing_blobs(&block).is_empty() { - return Some(BlockAction::DelayUntilBlobs(block)); - } + pub fn validate_block_for_gossip( + &self, + block: &Arc<SignedBeaconBlock<P>>, + state_transition_for_gossip: impl FnOnce(&ChainLink<P>) -> Result<Option<BlockAction<P>>>, + ) -> Result<Option<BlockAction<P>>> { + let block_root = block.message().hash_tree_root(); + let block_action = self.validate_gossip_rules(block, block_root); + + if let Some(action) = block_action { + return Ok(Some(action)); } - None + // > Parent block must be known + let Some(parent) = self.chain_link(block.message().parent_root()) else { + return Ok(Some(BlockAction::DelayUntilParent(block.clone_arc()))); + }; + + // > Check the block is valid and compute the post-state + let block_action = state_transition_for_gossip(parent)?; + + if let Some(action) = block_action { + return Ok(Some(action)); + } + + Ok(None) } pub fn validate_block_with_custom_state_transition(
@@ -1096,9 +1103,36 @@ impl<P: Preset> Store<P> { return Ok(action); } + // > Parent block must be known + let Some(parent) = self.chain_link(block.message().parent_root()) else { + return Ok(BlockAction::DelayUntilParent(block.clone_arc())); + }; + + // > [Modified in EIP7594] Check if blob data is available + // + // If not, this block MAY be queued and subsequently considered when blob data becomes available + if self + .chain_config + .is_eip7594_fork(misc::compute_epoch_at_slot::<P>(block.message().slot())) + { + let missing_indices = self.indices_of_missing_data_columns(block); + + // Half or more of the columns are still missing, so the data cannot be considered available yet. + if missing_indices.len() * 2 >= NumberOfColumns::USIZE && self.is_forward_synced() { + return Ok(BlockAction::DelayUntilBlobs(block.clone_arc())); + } + } else { + if !self.indices_of_missing_blobs(block).is_empty() { + return Ok(BlockAction::DelayUntilBlobs(block.clone_arc())); + } + } + // > Check the block is valid and compute the post-state let (state, block_action) = state_transition(block_root, parent)?; - + + if let Some(action) = block_action { + return Ok(action); + } + let attester_slashing_results = block .message() .body()
@@ -1864,9 +1898,8 @@ impl<P: Preset> Store<P>

{ let block_header = data_column_sidecar.signed_block_header.message; let mut state = self - .preprocessed_states - .before_or_at_slot(block_header.parent_root, block_header.slot) - .cloned() + .state_cache + .before_or_at_slot(self, block_header.parent_root, block_header.slot) .unwrap_or_else(|| { self.chain_link(block_header.parent_root) .or_else(|| self.chain_link_before_or_at(block_header.slot)) @@ -1884,7 +1917,10 @@ impl Store

{ // [REJECT] The sidecar is for the correct subnet -- i.e. compute_subnet_for_data_column_sidecar(sidecar.index) == subnet_id. if let Some(subnet_id) = origin.subnet_id() { - let expected = misc::compute_subnet_for_data_column_sidecar(data_column_sidecar.index); + let expected = misc::compute_subnet_for_data_column_sidecar( + data_column_sidecar.index, + &self.chain_config, + ); ensure!( subnet_id == expected, @@ -2777,7 +2813,7 @@ impl Store

{ for (segment_id, group) in &self .propagate_and_dissolve_differences(differences)? .into_iter() - .chunk_by(|dissolved_difference| dissolved_difference.segment_id) + .group_by(|dissolved_difference| dissolved_difference.segment_id) { let segment = &mut self.unfinalized[&segment_id]; @@ -3324,10 +3360,8 @@ impl Store

{ (0..NumberOfColumns::U64) .into_iter() .filter_map(|index| { - self.data_column_cache.get(DataColumnIdentifier { - block_root, - index - }) + self.data_column_cache + .get(DataColumnIdentifier { block_root, index }) }) .collect() } @@ -3353,9 +3387,9 @@ impl Store

{ ); metrics.set_collection_length( module_path!(), - &type_name, - "data_column_store", - self.data_column_cache.size() + &type_name, + "data_column_store", + self.data_column_cache.size(), ); metrics.set_collection_length( module_path!(), diff --git a/grandine-snapshot-tests b/grandine-snapshot-tests index 6cdef250..449b3b11 160000 --- a/grandine-snapshot-tests +++ b/grandine-snapshot-tests @@ -1 +1 @@ -Subproject commit 6cdef2501cf7d71ec96a871486c1ccfc63fb58ac +Subproject commit 449b3b119540b6b393a28fcde94e4ee2e1d435e1 diff --git a/grandine/Cargo.toml b/grandine/Cargo.toml index 2b4bbf12..a8eb9046 100644 --- a/grandine/Cargo.toml +++ b/grandine/Cargo.toml @@ -73,6 +73,7 @@ test-case = { workspace = true } [features] logger-always-write-style = [] +logger-parse-env = [] # `preset-any` and `network-any` should not be passed to Cargo. # They only exist to avoid duplicating lists of features. diff --git a/grandine/src/grandine_args.rs b/grandine/src/grandine_args.rs index 80e38f52..24c0c48b 100644 --- a/grandine/src/grandine_args.rs +++ b/grandine/src/grandine_args.rs @@ -3,7 +3,6 @@ #![allow(clippy::doc_markdown)] use core::{ - fmt::Display, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, num::{NonZeroU16, NonZeroU64}, ops::Not as _, diff --git a/grandine/src/main.rs b/grandine/src/main.rs index bf9fcb22..0ceaf63e 100644 --- a/grandine/src/main.rs +++ b/grandine/src/main.rs @@ -224,8 +224,6 @@ impl Context { storage_config.eth1_database()? }; - Feature::DebugP2p.enable(); - log::info!("CHAIN CONFIG: {chain_config:#?}"); let eth1_chain = Eth1Chain::new( diff --git a/helper_functions/src/error.rs b/helper_functions/src/error.rs index 1af76fa8..12434db4 100644 --- a/helper_functions/src/error.rs +++ b/helper_functions/src/error.rs @@ -51,6 +51,8 @@ pub enum SignatureKind { AggregateAndProof, #[display("attestation signature")] Attestation, + #[display("blob sidecar signature")] + BlobSidecar, #[display("block signature in blob sidecar")] BlockInBlobSidecar, #[display("block signature")] diff --git a/helper_functions/src/misc.rs b/helper_functions/src/misc.rs index 33f1d14f..ef6a5261 100644 --- a/helper_functions/src/misc.rs +++ b/helper_functions/src/misc.rs @@ -23,7 +23,7 @@ use types::{ Blob, BlobCommitmentInclusionProof, BlobIndex, KzgCommitment, KzgProof, VersionedHash, }, }, - eip7594::{ColumnIndex, DATA_COLUMN_SIDECAR_SUBNET_COUNT}, + eip7594::{ColumnIndex, DataColumnSubnetId}, phase0::{ consts::{ AttestationSubnetCount, BLS_WITHDRAWAL_PREFIX, ETH1_ADDRESS_WITHDRAWAL_PREFIX, @@ -211,8 +211,11 @@ pub fn compute_subnet_for_blob_sidecar(blob_index: BlobIndex) -> SubnetId { // source: https://github.com/ethereum/consensus-specs/pull/3574/files/cebf78a83e6fc8fa237daf4264b9ca0fe61473f4#diff-96cf4db15bede3d60f04584fb25339507c35755959159cdbe19d760ca92de109R106 #[must_use] -pub fn compute_subnet_for_data_column_sidecar(column_index: ColumnIndex) -> SubnetId { - (column_index % DATA_COLUMN_SIDECAR_SUBNET_COUNT) +pub fn compute_subnet_for_data_column_sidecar( + column_index: ColumnIndex, + config: &Config, +) -> DataColumnSubnetId { + (column_index % config.data_column_sidecar_subnet_count()) .try_into() .unwrap() } diff --git a/helper_functions/src/signing.rs b/helper_functions/src/signing.rs index 72c8a87e..bd53e7b8 100644 --- a/helper_functions/src/signing.rs +++ b/helper_functions/src/signing.rs @@ -22,7 +22,8 @@ use types::{ BlindedBeaconBlock, }, config::Config, - deneb::containers::BeaconBlock as DenebBeaconBlock, + deneb::{consts::DOMAIN_BLOB_SIDECAR, containers::BeaconBlock as 
DenebBeaconBlock}, + eip7594::DataColumnSidecar, electra::containers::{ AggregateAndProof as ElectraAggregateAndProof, BeaconBlock as ElectraBeaconBlock, },
@@ -418,6 +419,21 @@ impl<P: Preset> SignForSingleFork<P> for VoluntaryExit { } } +// quite possibly incorrect +impl<P: Preset> SignForSingleFork<P> for DataColumnSidecar<P> { + const DOMAIN_TYPE: DomainType = DOMAIN_BLOB_SIDECAR; + const SIGNATURE_KIND: SignatureKind = SignatureKind::BlobSidecar; + + fn epoch(&self) -> Epoch { + misc::compute_epoch_at_slot::<P>(self.signed_block_header.message.slot) + } + + fn signing_root(&self, config: &Config, beacon_state: &(impl BeaconState<P> + ?Sized)) -> H256 { + let domain = accessors::get_domain(config, beacon_state, Self::DOMAIN_TYPE, None); + misc::compute_signing_root(self, domain) + } +} + /// impl<P: Preset> SignForSingleForkAtSlot<P>

for H256 { const DOMAIN_TYPE: DomainType = DOMAIN_SYNC_COMMITTEE; diff --git a/http_api/Cargo.toml b/http_api/Cargo.toml index cf608631..3acb69f1 100644 --- a/http_api/Cargo.toml +++ b/http_api/Cargo.toml @@ -28,6 +28,7 @@ http_api_utils = { workspace = true } itertools = { workspace = true } liveness_tracker = { workspace = true } log = { workspace = true } +mediatype = { workspace = true } metrics = { workspace = true } mime = { workspace = true } operation_pools = { workspace = true } @@ -64,7 +65,6 @@ database = { workspace = true } dedicated_executor = { workspace = true } deposit_tree = { workspace = true } eth1 = { workspace = true } -eth2_cache_utils = { workspace = true } execution_engine = { workspace = true } factory = { workspace = true } fork_choice_store = { workspace = true } diff --git a/http_api/src/standard.rs b/http_api/src/standard.rs index 0454fecb..1bfa3e41 100644 --- a/http_api/src/standard.rs +++ b/http_api/src/standard.rs @@ -1433,7 +1433,7 @@ pub async fn submit_pool_attestations( let grouped_by_target = attestations .into_iter() .enumerate() - .chunk_by(|(_, attestation)| attestation.data().target); + .group_by(|(_, attestation)| attestation.data().target); let (targets, target_attestations): (Vec<_>, Vec<_>) = grouped_by_target .into_iter() @@ -2703,10 +2703,18 @@ async fn publish_beacon_block_with_gossip_checks_data_column_sidecars { - publish_block_to_network_with_data_column_sidecars(block, data_column_sidecars, api_to_p2p_tx); + publish_block_to_network_with_data_column_sidecars( + block, + data_column_sidecars, + api_to_p2p_tx, + ); } Ok(Some(ValidationOutcome::Ignore(true))) => { - publish_block_to_network_with_data_column_sidecars(block, data_column_sidecars, api_to_p2p_tx); + publish_block_to_network_with_data_column_sidecars( + block, + data_column_sidecars, + api_to_p2p_tx, + ); return Ok(Some(StatusCode::ACCEPTED)); } Ok(Some(ValidationOutcome::Ignore(false))) => { @@ -2743,7 +2751,7 @@ fn publish_block_to_network_with_data_column_sidecars( data_column_sidecars: &[Arc>], api_to_p2p_tx: &UnboundedSender>, ) { - ApiToP2p::PublishDataColumnSidecars(data_column_sidecars.clone_arc()).send(api_to_p2p_tx); + ApiToP2p::PublishDataColumnSidecars(data_column_sidecars.to_vec()).send(api_to_p2p_tx); ApiToP2p::PublishBeaconBlock(block).send(api_to_p2p_tx); } @@ -2775,7 +2783,7 @@ async fn publish_signed_block_with_data_column_sidecar( let status_code = match receiver.next().await.transpose() { Ok(Some(ValidationOutcome::Accept)) => StatusCode::OK, - Ok(Some(ValidationOutcome::Ignore)) => { + Ok(Some(ValidationOutcome::Ignore(_))) => { // We log only the root with `info!` because this is not an exceptional case. // Vouch submits blocks it constructs to all beacon nodes it is connected to. // The blocks often reach our application through gossip faster than through the API. 
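The publishing helper above sends the data column sidecars before the block itself, presumably so that a peer receiving the block can begin data-availability checks without waiting. A self-contained sketch of that ordering (the channel and message types are simplified stand-ins, not the real ApiToP2p plumbing):

use std::sync::mpsc;

#[derive(Clone)]
struct Sidecar(u64);
struct Block(u64);

enum Message {
    Sidecars(Vec<Sidecar>),
    Block(Block),
}

// Sidecars are queued first, matching the order used by
// publish_block_to_network_with_data_column_sidecars above.
fn publish_with_sidecars(block: Block, sidecars: &[Sidecar], tx: &mpsc::Sender<Message>) {
    tx.send(Message::Sidecars(sidecars.to_vec())).ok();
    tx.send(Message::Block(block)).ok();
}

fn main() {
    let (tx, rx) = mpsc::channel();
    publish_with_sidecars(Block(1), &[Sidecar(0), Sidecar(1)], &tx);
    assert!(matches!(rx.recv(), Ok(Message::Sidecars(_))));
}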
@@ -2912,18 +2920,22 @@ async fn publish_signed_block_v2_with_data_column_sidecar( return Ok(status_code); } } - + let (sender, mut receiver) = futures::channel::mpsc::channel(1); controller.on_api_block(block.clone_arc(), sender); let status_code = match receiver.next().await.transpose() { - Ok(Some(accept_or_ignore_status)) => { - match accept_or_ignore_status { + Ok(Some(accept_or_ignore_status)) => { + match accept_or_ignore_status { ValidationOutcome::Accept => match broadcast_validation { BroadcastValidation::Gossip => StatusCode::OK, BroadcastValidation::Consensus => { - publish_block_to_network_with_data_column_sidecars(block, &data_column_sidecars, &api_to_p2p_tx); + publish_block_to_network_with_data_column_sidecars( + block, + &data_column_sidecars, + &api_to_p2p_tx, + ); StatusCode::OK } BroadcastValidation::ConsensusAndEquivocation => { @@ -2931,10 +2943,14 @@ async fn publish_signed_block_v2_with_data_column_sidecar( return Err(Error::InvalidBlock(anyhow!("block exibits equivocation"))); } - publish_block_to_network_with_data_column_sidecars(block, &data_column_sidecars, &api_to_p2p_tx); + publish_block_to_network_with_data_column_sidecars( + block, + &data_column_sidecars, + &api_to_p2p_tx, + ); StatusCode::OK } - } + }, ValidationOutcome::Ignore(publishable) => { // We log only the root with `info!` because this is not an exceptional case. // Vouch submits blocks it constructs to all beacon nodes it is connected to. @@ -2948,7 +2964,11 @@ async fn publish_signed_block_v2_with_data_column_sidecar( if broadcast_validation == BroadcastValidation::Gossip { StatusCode::ACCEPTED } else if publishable { - publish_block_to_network_with_data_column_sidecars(block, &data_column_sidecars, &api_to_p2p_tx); + publish_block_to_network_with_data_column_sidecars( + block, + &data_column_sidecars, + &api_to_p2p_tx, + ); StatusCode::ACCEPTED } else { return Err(Error::UnableToPublishBlock); @@ -2963,7 +2983,6 @@ async fn publish_signed_block_v2_with_data_column_sidecar( } else { return Err(Error::UnableToPublishBlock); } - } Err(error) => { warn!("received invalid block through HTTP API (block: {block:?}, error: {error})"); @@ -3077,7 +3096,9 @@ async fn submit_data_column_sidecars( ) -> Result<(), Error> { let results: Result> = data_column_sidecars .iter() - .map(|data_column_sidecar| submit_data_column_sidecar(controller.clone_arc(), data_column_sidecar.clone_arc())) + .map(|data_column_sidecar| { + submit_data_column_sidecar(controller.clone_arc(), data_column_sidecar.clone_arc()) + }) .collect::>() .collect::>() .await @@ -3099,7 +3120,6 @@ async fn submit_data_column_sidecars( Ok(()) } - fn send_attester_slashing_event( attester_slashing: AttesterSlashing

, event_channels: &EventChannels, diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml index 40806959..69be48e3 100644 --- a/metrics/Cargo.toml +++ b/metrics/Cargo.toml @@ -22,6 +22,7 @@ helper_functions = { workspace = true } http_api_utils = { workspace = true } jemalloc-ctl = { workspace = true } log = { workspace = true } +num_threads = { workspace = true } p2p = { workspace = true } prometheus = { workspace = true } # `prometheus-client` is only needed for libp2p metrics. diff --git a/operation_pools/Cargo.toml b/operation_pools/Cargo.toml index db7a550d..610907b2 100644 --- a/operation_pools/Cargo.toml +++ b/operation_pools/Cargo.toml @@ -37,8 +37,5 @@ tynm = { workspace = true } typenum = { workspace = true } types = { workspace = true } -[dev-dependencies] -eth2_cache_utils = { workspace = true } - [features] eth2-cache = [] diff --git a/operation_pools/src/attestation_agg_pool/attestation_packer.rs b/operation_pools/src/attestation_agg_pool/attestation_packer.rs index e108ab17..d57c0b00 100644 --- a/operation_pools/src/attestation_agg_pool/attestation_packer.rs +++ b/operation_pools/src/attestation_agg_pool/attestation_packer.rs @@ -196,7 +196,7 @@ impl AttestationPacker

{ .take_while(|_| !self.deadline_reached()) .filter(|aggregate| self.is_valid_for_inclusion(aggregate)) .map(|aggregate| (aggregate.data, aggregate)) - .chunk_by(|&(data, _)| data) + .group_by(|&(data, _)| data) .into_iter() .map(|(_, group)| group.map(|(_, y)| y.clone()).collect()) .collect_vec() diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 9edbf679..e5521ba0 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -17,7 +17,6 @@ derive_more = { workspace = true } enum-iterator = { workspace = true } eth1_api = { workspace = true } eth2_libp2p = { workspace = true } -execution_engine = { workspace = true } features = { workspace = true } fork_choice_control = { workspace = true } futures = { workspace = true } diff --git a/p2p/src/block_sync_service.rs b/p2p/src/block_sync_service.rs index 0e5485ff..8eebbaf6 100644 --- a/p2p/src/block_sync_service.rs +++ b/p2p/src/block_sync_service.rs @@ -8,7 +8,6 @@ use eth2_libp2p::{ rpc::{RPCError, StatusMessage}, NetworkGlobals, PeerAction, PeerId, ReportSource, }; -use features::Feature; use fork_choice_control::SyncMessage; use futures::{ channel::mpsc::{UnboundedReceiver, UnboundedSender}, @@ -28,7 +27,6 @@ use tokio_stream::wrappers::IntervalStream; use types::{ deneb::containers::BlobIdentifier, eip7594::DataColumnIdentifier, - nonstandard::Phase, phase0::primitives::{Slot, H256}, preset::Preset, }; @@ -250,11 +248,11 @@ impl BlockSyncService

{ P2pToSync::RemovePeer(peer_id) => { let batches_to_retry = self.sync_manager.remove_peer(&peer_id); if self.retry_sync_batches(batches_to_retry).is_err() { - features::log!(DebugP2p, "Batch could not retried while removing peer: {peer_id}"); + error!("Batch could not be retried while removing peer: {peer_id}"); } } P2pToSync::RequestFailed(peer_id, request_id, error) => { - features::log!(DebugP2p, "peer {peer_id} responded on request_id: {request_id} with error: {error:?}"); + error!("peer {peer_id} responded to request_id: {request_id} with error: {error:?}"); if !self.is_forward_synced { let batches_to_retry = if let RPCError::ErrorResponse(_code, _reason) = error { @@ -264,7 +262,7 @@ impl<P: Preset> BlockSyncService<P>

}; if self.retry_sync_batches(batches_to_retry).is_err() { - features::log!(DebugP2p, "Batch could not retired when request failed"); + error!("Batch could not be retried when request failed"); } } } @@ -389,7 +387,7 @@ impl<P: Preset> BlockSyncService<P>

{ peer_id, .. } = batch; - + SyncToP2p::ReportPeer( peer_id, PeerAction::MidToleranceError, @@ -402,29 +400,24 @@ impl BlockSyncService

{ // TODO(feature/das): we should reconstruct the batch by: // - [ ] filter out the columns that are already received or accepted, // - [x] filter out peer that are their head slot is less than start slot - SyncTarget::DataColumnSidecar(columns) => self - .sync_manager - .map_peer_custody_columns(&columns, start_slot, None, Some(peer_id)) - .map(|peer_custody_columns_mapping| { - peer_custody_columns_mapping - .into_iter() - .map(|(new_peer_id, peer_custody_columns)| { - ( - request_id, - SyncBatch { - target: SyncTarget::DataColumnSidecar( - peer_custody_columns, - ), - direction, - peer_id: new_peer_id, - start_slot, - count, - }, - ) - }) - .collect_vec() - }) - .ok(), + SyncTarget::DataColumnSidecar(columns) => Some( + self.sync_manager + .map_peer_custody_columns(&columns, start_slot, None, Some(peer_id)) + .into_iter() + .map(|(new_peer_id, peer_custody_columns)| { + ( + request_id, + SyncBatch { + target: SyncTarget::DataColumnSidecar(peer_custody_columns), + direction, + peer_id: new_peer_id, + start_slot, + count, + }, + ) + }) + .collect_vec(), + ), SyncTarget::Block | SyncTarget::BlobSidecar => self .sync_manager .random_peer_with_head_slot_filtered(start_slot) @@ -601,8 +594,7 @@ impl BlockSyncService

{ ); if slot < data_column_serve_range_slot { - features::log!( - DebugP2p, + info!( "Ignoring needed data column sidecar request: slot: {slot} < data_column_serve_range_slot: {data_column_serve_range_slot}" ); return Ok(()); @@ -614,7 +606,7 @@ impl BlockSyncService

{ let columns_indices = identifiers.iter().map(|id| id.index).collect(); let peer_custody_columns_mapping = self.sync_manager - .map_peer_custody_columns(&columns_indices, slot, peer_id, None)?; + .map_peer_custody_columns(&columns_indices, slot, peer_id, None); for (peer_id, columns) in peer_custody_columns_mapping { if !columns.is_empty() { @@ -759,12 +751,7 @@ impl BlockSyncService

{ if !was_forward_synced && is_forward_synced { SyncToP2p::SubscribeToCoreTopics.send(&self.sync_to_p2p_tx); - let head_slot = self.controller.snapshot().head_slot(); - if self - .controller - .chain_config() - .is_eip7594_fork(misc::compute_epoch_at_slot::

(head_slot)) - { + if self.controller.chain_config().is_eip7594_enabled() { SyncToP2p::SubscribeToDataColumnTopics.send(&self.sync_to_p2p_tx); } diff --git a/p2p/src/messages.rs b/p2p/src/messages.rs index 26f7aa2a..44b7c8aa 100644 --- a/p2p/src/messages.rs +++ b/p2p/src/messages.rs @@ -117,6 +117,7 @@ impl SyncToMetrics { pub enum SyncToP2p { PruneReceivedBlocks, + ReportPeer(PeerId, PeerAction, ReportSource, PeerReportReason), RequestDataColumnsByRange(RequestId, PeerId, Slot, u64, Vec), RequestDataColumnsByRoot(RequestId, PeerId, Vec), RequestBlobsByRange(RequestId, PeerId, Slot, u64), diff --git a/p2p/src/network.rs b/p2p/src/network.rs index 2fee4a72..1170fcd7 100644 --- a/p2p/src/network.rs +++ b/p2p/src/network.rs @@ -333,15 +333,9 @@ impl Network

{ } P2pMessage::DataColumnsNeeded(identifiers, slot, peer_id) => { if let Some(peer_id) = peer_id { - self.log( - Level::Debug, - format_args!("data columns needed: {identifiers:?} from {peer_id}"), - ); + debug!("data columns needed: {identifiers:?} from {peer_id}"); } else { - self.log( - Level::Debug, - format_args!("data columns needed: {identifiers:?}"), - ); + debug!("data columns needed: {identifiers:?}"); } let peer_id = self.ensure_peer_connected(peer_id); @@ -365,19 +359,14 @@ impl Network

{ self.prune_received_blob_sidecars(finalized_checkpoint.epoch); self.prune_received_block_roots(finalized_checkpoint.epoch); self.prune_received_data_column_sidecars(finalized_checkpoint.epoch); - - P2pToSync::FinalizedEpoch(self.controller.finalized_epoch()).send(&self.channels.p2p_to_sync_tx); } P2pMessage::HeadState(_state) => { // This message is only used in tests } P2pMessage::DataColumnsReconstructed(data_column_sidecars, slot) => { - self.log( - Level::Debug, - format_args!( - "propagating data column sidecars after reconstructed (indexes: [{}], slot: {slot})", - data_column_sidecars.iter().map(|c| c.index).join(", "), - ) + debug!( + "propagating data column sidecars after reconstruction (indexes: [{}], slot: {slot})", + data_column_sidecars.iter().map(|c| c.index).join(", "), ); self.publish_data_column_sidecars(data_column_sidecars); } @@ -465,9 +454,6 @@ impl<P: Preset> Network<P>

{ SyncToP2p::SubscribeToDataColumnTopics => { self.subscribe_to_data_column_topics(); } - SyncToP2p::PruneReceivedBlocks => { - self.received_block_roots = HashMap::new(); - } } }, @@ -602,26 +588,27 @@ impl Network

{ } fn publish_data_column_sidecars(&self, data_column_sidecars: Vec>>) { + let chain_config = self.controller.chain_config(); let messages = data_column_sidecars .into_iter() .map(|data_column_sidecar| { PubsubMessage::DataColumnSidecar(Box::new(( - misc::compute_subnet_for_data_column_sidecar(data_column_sidecar.index), + misc::compute_subnet_for_data_column_sidecar( + data_column_sidecar.index, + &chain_config, + ), data_column_sidecar, ))) }) .collect::>(); - self.log( - Level::Debug, - format_args!( - "publishing [{}]", - messages - .iter() - .map(ToString::to_string) - .collect::>() - .join("; ") - ), + debug!( + "publishing [{}]", + messages + .iter() + .map(ToString::to_string) + .collect::>() + .join("; ") ); self.publish_batch(messages); @@ -872,10 +859,7 @@ impl Network

{ self.update_peer_count(); } NetworkEvent::RPCFailed { peer_id, id, error } => { - self.log( - Level::Debug, - format_args!("request {id:?} to peer {peer_id} failed: {error}"), - ); + debug!("request {id:?} to peer {peer_id} failed: {error}"); P2pToSync::RequestFailed(peer_id, id, error).send(&self.channels.p2p_to_sync_tx); } NetworkEvent::RequestReceived { @@ -884,10 +868,7 @@ impl Network

{ request, } => { if let Err(error) = self.handle_request(peer_id, id, request) { - self.log( - Level::Warn, - format_args!("response with error: {error} while handling request: {id:?} from peer: {peer_id}"), - ); + warn!("response with error: {error} while handling request: {id:?} from peer: {peer_id}"); } } NetworkEvent::ResponseReceived { @@ -947,20 +928,6 @@ impl Network

{ Ok(()) } - Request::LightClientOptimisticUpdate => { - self.log_with_feature(format_args!( - "received LightClientOptimisticUpdate request (peer_id: {peer_id})", - )); - - Ok(()) - } - Request::LightClientFinalityUpdate => { - self.log_with_feature(format_args!( - "received LightClientFinalityUpdate request (peer_id: {peer_id})", - )); - - Ok(()) - } Request::BlobsByRange(request) => { self.handle_blobs_by_range_request(peer_id, peer_request_id, request) } @@ -990,7 +957,7 @@ impl Network

{ debug!( "sending Status response (peer_request_id: {peer_request_id:?}, peer_id: {peer_id}, \ - local: {local:?})", + local: {local:?})", ); self.respond(peer_id, peer_request_id, Response::

::Status(local)); @@ -1029,8 +996,8 @@ impl Network

{ debug!( "sending BeaconBlocksByRange response chunk \ - (peer_request_id: {peer_request_id:?}, peer_id: {peer_id}, \ - slot: {}, root: {root:?})", + (peer_request_id: {peer_request_id:?}, peer_id: {peer_id}, \ + slot: {}, root: {root:?})", block.message().slot(), ); @@ -1087,18 +1054,10 @@ impl Network

{ let network_to_service_tx = self.network_to_service_tx.clone(); - let connected_peers = self.network_globals.connected_peers(); - let target_peers = self.target_peers; - if start_slot < blobs_serve_range_slot { - log( - Level::Debug, - connected_peers, - target_peers, - format_args!( - "sending BlobSidecarsByRange response with resource unavailable error \ - (peer_request_id: {peer_request_id:?}, peer_id: {peer_id})", - ), + debug!( + "sending BlobSidecarsByRange response with resource unavailable error \ + (peer_request_id: {peer_request_id:?}, peer_id: {peer_id})", ); ServiceInboundMessage::SendErrorResponse( @@ -1114,14 +1073,9 @@ impl Network

{ let blob_sidecars = controller.blob_sidecars_by_range(start_slot..end_slot)?; for blob_sidecar in blob_sidecars { - log( - Level::Debug, - connected_peers, - target_peers, - format_args!( - "sending BlobSidecarsByRange response chunk \ + debug!( + "sending BlobSidecarsByRange response chunk \ (peer_request_id: {peer_request_id:?}, peer_id: {peer_id}, blob_sidecar: {blob_sidecar:?})", - ), ); ServiceInboundMessage::SendResponse( @@ -1132,12 +1086,7 @@ impl Network

{ .send(&network_to_service_tx); } - log( - Level::Debug, - connected_peers, - target_peers, - "terminating BlobSidecarsByRange response stream", - ); + debug!("terminating BlobSidecarsByRange response stream"); ServiceInboundMessage::SendResponse( peer_id, @@ -1160,12 +1109,7 @@ impl Network

{ peer_request_id: PeerRequestId, request: DataColumnsByRootRequest, ) { - self.log( - Level::Debug, - format_args!( - "received DataColumnsByRoot request (peer_id: {peer_id}, request: {request:?})" - ), - ); + debug!("received DataColumnsByRoot request (peer_id: {peer_id}, request: {request:?})"); let DataColumnsByRootRequest { data_column_ids } = request; @@ -1173,9 +1117,6 @@ impl Network

{ let network_to_service_tx = self.network_to_service_tx.clone(); // TODO(feature/eip7549): MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS - let connected_peers = self.network_globals.connected_peers(); - let target_peers = self.target_peers; - self.dedicated_executor .spawn(async move { // > Clients MAY limit the number of blocks and sidecars in the response. @@ -1186,14 +1127,9 @@ impl Network

{ let data_column_sidecars = controller.data_column_sidecars_by_ids(data_column_ids)?; for data_column_sidecar in data_column_sidecars { - log( - Level::Debug, - connected_peers, - target_peers, - format_args!( - "sending DataColumnsSidecarsByRoot response chunk \ + debug!( + "sending DataColumnsSidecarsByRoot response chunk \ (peer_request_id: {peer_request_id:?}, peer_id: {peer_id}, data_column_sidecar: {data_column_sidecar:?})", - ), ); ServiceInboundMessage::SendResponse( @@ -1204,12 +1140,7 @@ impl Network

{ .send(&network_to_service_tx); } - log( - Level::Debug, - connected_peers, - target_peers, - "terminating DataColumnsByRoot response stream", - ); + debug!("terminating DataColumnsByRoot response stream"); ServiceInboundMessage::SendResponse( peer_id, @@ -1229,12 +1160,7 @@ impl Network

{ peer_request_id: PeerRequestId, request: DataColumnsByRangeRequest, ) -> Result<()> { - self.log( - Level::Debug, - format_args!( - "received DataColumnsByRange request (peer_id: {peer_id}, request: {request:?})" - ), - ); + debug!("received DataColumnsByRange request (peer_id: {peer_id}, request: {request:?})"); let DataColumnsByRangeRequest { start_slot, @@ -1270,18 +1196,11 @@ impl Network

{ .min(current_slot); let network_to_service_tx = self.network_to_service_tx.clone(); - let connected_peers = self.network_globals.connected_peers(); - let target_peers = self.target_peers; if start_slot < data_column_serve_range_slot { - log( - Level::Debug, - connected_peers, - target_peers, - format_args!( - "sending DataColumnsSidecarsByRange response with resource unavailable error \ - (peer_request_id: {peer_request_id:?}, peer_id: {peer_id})", - ), + debug!( + "sending DataColumnsSidecarsByRange response with resource unavailable error \ + (peer_request_id: {peer_request_id:?}, peer_id: {peer_id})", ); ServiceInboundMessage::SendErrorResponse( @@ -1300,14 +1219,9 @@ impl Network

{ data_column_sidecars.sort_by_key(|sidecar| (sidecar.slot(), sidecar.index)); for data_column_sidecar in data_column_sidecars { - log( - Level::Debug, - connected_peers, - target_peers, - format_args!( - "sending DataColumnsSidecarsByRange response chunk \ + debug!( + "sending DataColumnsSidecarsByRange response chunk \ (peer_request_id: {peer_request_id:?}, peer_id: {peer_id}, data_column_sidecar: {data_column_sidecar:?})", - ), ); ServiceInboundMessage::SendResponse( @@ -1318,12 +1232,7 @@ impl Network

{ .send(&network_to_service_tx); } - log( - Level::Debug, - connected_peers, - target_peers, - "terminating DataColumnsByRange response stream", - ); + debug!("terminating DataColumnsByRange response stream"); ServiceInboundMessage::SendResponse( peer_id, @@ -1371,8 +1280,8 @@ impl Network

{ debug!( "sending BlobSidecarsByRoot response chunk \ - (peer_request_id: {peer_request_id:?}, peer_id: {peer_id}, \ - slot: {}, id: {blob_identifier:?})", + (peer_request_id: {peer_request_id:?}, peer_id: {peer_id}, \ + slot: {}, id: {blob_identifier:?})", blob_sidecar.slot(), ); @@ -1424,8 +1333,8 @@ impl Network

{ for block in blocks.into_iter().map(WithStatus::value) { debug!( "sending BeaconBlocksByRoot response chunk \ - (peer_request_id: {peer_request_id:?}, peer_id: {peer_id}, \ - slot: {}, root: {:?})", + (peer_request_id: {peer_request_id:?}, peer_id: {peer_id}, \ + slot: {}, root: {:?})", block.message().slot(), block.message().hash_tree_root(), ); @@ -1468,17 +1377,14 @@ impl Network

{ let blob_sidecar_slot = blob_sidecar.signed_block_header.message.slot; let blob_identifier = blob_sidecar.as_ref().into(); - self.log( - Level::Debug, - format_args!( - "received BlobsByRange response chunk \ - (request_id: {request_id}, peer_id: {peer_id}, slot: {blob_sidecar_slot}, blob: {blob_sidecar:?})", - ), + debug!( + "received BlobsByRange response chunk \ + (request_id: {request_id}, peer_id: {peer_id}, slot: {blob_sidecar_slot}, blob: {blob_sidecar:?})", ); - self.log_with_feature(format_args!( + info!( "received blob from RPC (request_id: {request_id}, peer_id: {peer_id}, slot: {blob_sidecar_slot}, blob_id: {blob_identifier:?})", - )); + ); if self.register_new_received_blob_sidecar(blob_identifier, blob_sidecar_slot) { let block_seen = self @@ -1502,12 +1408,9 @@ impl Network

{ let blob_sidecar_slot = blob_sidecar.signed_block_header.message.slot; let blob_identifier = blob_sidecar.as_ref().into(); - self.log( - Level::Debug, - format_args!( - "received BlobsByRoot response chunk \ - (request_id: {request_id}, peer_id: {peer_id}, slot: {blob_sidecar_slot}, blob: {blob_sidecar:?})", - ), + debug!( + "received BlobsByRoot response chunk \ + (request_id: {request_id}, peer_id: {peer_id}, slot: {blob_sidecar_slot}, blob: {blob_sidecar:?})", ); if self.register_new_received_blob_sidecar(blob_identifier, blob_sidecar_slot) { @@ -1532,12 +1435,9 @@ impl Network

{ let block_slot = block.message().slot(); let block_root = block.message().hash_tree_root(); - self.log( - Level::Debug, - format_args!( - "received BeaconBlocksByRange response chunk \ - (request_id: {request_id}, peer_id: {peer_id}, slot: {block_slot}, block: {block:?})", - ), + debug!( + "received BeaconBlocksByRange response chunk \ + (request_id: {request_id}, peer_id: {peer_id}, slot: {block_slot}, block: {block:?})", ); if self.register_new_received_block(block_root, block_slot) { @@ -1558,17 +1458,14 @@ impl Network

{ let block_slot = block.message().slot(); let block_root = block.message().hash_tree_root(); - self.log( - Level::Debug, - format_args!( - "received BeaconBlocksByRoot response chunk \ - (request_id: {request_id}, peer_id: {peer_id}, slot: {block_slot}, block: {block:?})", - ), + debug!( + "received BeaconBlocksByRoot response chunk \ + (request_id: {request_id}, peer_id: {peer_id}, slot: {block_slot}, block: {block:?})", ); - self.log_with_feature(format_args!( + info!( "received beacon block from RPC (request_id: {request_id}, peer_id: {peer_id}, slot: {block_slot}, root: {block_root})", - )); + ); P2pToSync::BlockByRootRequestFinished(block_root) .send(&self.channels.p2p_to_sync_tx); @@ -1600,32 +1497,13 @@ impl Network

{ // TODO(Altair Light Client Sync Protocol) debug!("received LightClientOptimisticUpdate response (peer_id: {peer_id})"); } - Response::LightClientOptimisticUpdate(_) => { - self.log( - Level::Debug, - format_args!( - "received LightClientOptimisticUpdate response chunk (peer_id: {peer_id})", - ), - ); - } - Response::LightClientFinalityUpdate(_) => { - self.log( - Level::Debug, - format_args!( - "received LightClientFinalityUpdate response chunk (peer_id: {peer_id})", - ), - ); - } Response::DataColumnsByRange(Some(data_column_sidecar)) => { let data_column_sidecar_slot = data_column_sidecar.signed_block_header.message.slot; let data_column_identifier = data_column_sidecar.as_ref().into(); - self.log( - Level::Debug, - format_args!( - "received DataColumnsByRange response chunk \ - (request_id: {request_id}, peer_id: {peer_id}, slot: {data_column_sidecar_slot}, data_column: {data_column_sidecar:?})", - ), + debug!( + "received DataColumnsByRange response chunk \ + (request_id: {request_id}, peer_id: {peer_id}, slot: {data_column_sidecar_slot}, data_column: {data_column_sidecar:?})", ); if self.register_new_received_data_column_sidecar( @@ -1637,9 +1515,7 @@ impl Network

{ } } Response::DataColumnsByRange(None) => { - self.log_with_feature(format_args!( - "peer {peer_id} terminated DataColumnsByRange response stream for request_id: {request_id}", - )); + debug!("peer {peer_id} terminated DataColumnsByRange response stream for request_id: {request_id}"); P2pToSync::DataColumnsByRangeRequestFinished(request_id) .send(&self.channels.p2p_to_sync_tx); @@ -1648,12 +1524,9 @@ impl Network

{ let data_column_sidecar_slot = data_column_sidecar.signed_block_header.message.slot; let data_column_identifier = data_column_sidecar.as_ref().into(); - self.log( - Level::Debug, - format_args!( - "received DataColumnsByRoot response chunk \ - (request_id: {request_id}, peer_id: {peer_id}, slot: {data_column_sidecar_slot}, data_column: {data_column_sidecar:?})", - ), + debug!( + "received DataColumnsByRoot response chunk \ + (request_id: {request_id}, peer_id: {peer_id}, slot: {data_column_sidecar_slot}, data_column: {data_column_sidecar:?})", ); if self.register_new_received_data_column_sidecar( @@ -1676,9 +1549,7 @@ impl Network

{ .send(&self.channels.p2p_to_sync_tx); } Response::DataColumnsByRoot(None) => { - self.log_with_feature(format_args!( - "peer {peer_id} terminated DataColumnsByRoot response stream for request_id: {request_id}", - )); + debug!("peer {peer_id} terminated DataColumnsByRoot response stream for request_id: {request_id}"); } } } @@ -1759,9 +1630,9 @@ impl Network

{ let data_column_identifier: DataColumnIdentifier = data_column_sidecar.as_ref().into(); - self.log_with_feature(format_args!( + info!( "received data column sidecar as gossip in subnet {subnet_id}: {data_column_identifier:?} from {source}", - )); + ); let chain_config = self.controller.chain_config().as_ref(); let epoch = misc::compute_epoch_at_slot::

(data_column_sidecar.slot()); @@ -1773,10 +1644,10 @@ impl Network

{ GossipId { source, message_id }, ); } else { - self.log_with_feature(format_args!( + info!( "ignoring pre-eip7594 data column sidecar in slot {} / {epoch}", data_column_sidecar.slot(), - )); + ); } } PubsubMessage::AggregateAndProofAttestation(aggregate_and_proof) => { @@ -1955,7 +1826,7 @@ impl Network

{ } } - fn check_status(&mut self, local: &StatusMessage, remote: StatusMessage, peer_id: PeerId) { + fn check_status(&self, local: &StatusMessage, remote: StatusMessage, peer_id: PeerId) { if local.fork_digest != remote.fork_digest { warn!( "local fork digest doesn't match remote fork digest \ @@ -2132,12 +2003,7 @@ impl Network

{ ), }; - self.log( - Level::Debug, - format_args!( - "sending DataColumnsByRange request (request_id: {request_id} peer_id: {peer_id}, request: {request:?})", - ), - ); + debug!("sending DataColumnsByRange request (request_id: {request_id} peer_id: {peer_id}, request: {request:?})"); self.request(peer_id, request_id, Request::DataColumnsByRange(request)); } @@ -2154,13 +2020,7 @@ impl Network

{ .collect::>(); if data_column_identifiers.is_empty() { - self.log( - Level::Debug, - format_args!( - "cannot request DataColumnSidecarsByRoot: all requested data column sidecars have been received", - ), - ); - + debug!("cannot request DataColumnSidecarsByRoot: all requested data column sidecars have been received"); return; } @@ -2170,12 +2030,7 @@ impl Network

{ .expect("length is under maximum"), ); - self.log( - Level::Debug, - format_args!( - "sending DataColumnSidecarsByRoot request (request_id: {request_id}, peer_id: {peer_id}, request: {request:?})", - ), - ); + debug!("sending DataColumnSidecarsByRoot request (request_id: {request_id}, peer_id: {peer_id}, request: {request:?})"); self.request(peer_id, request_id, Request::DataColumnsByRoot(request)); } @@ -2306,26 +2161,6 @@ impl Network

{ .is_none() } - /// Log a message with peer count information. - fn log(&self, level: Level, message: impl Display) { - log( - level, - self.network_globals.connected_peers(), - self.target_peers, - message, - ); - } - - fn log_with_feature(&self, message: impl Display) { - features::log!( - DebugP2p, - "[Peers: {}/{}] {}", - self.network_globals.connected_peers(), - self.target_peers, - message, - ); - } - fn ensure_peer_connected(&self, peer_id: Option) -> Option { peer_id .filter(|peer_id| self.network_globals.is_peer_connected(peer_id)) @@ -2355,6 +2190,7 @@ impl Network

{ ); metrics.set_collection_length( + module_path!(), &type_name, "received_data_column_sidecars", self.received_data_column_sidecars.len(), @@ -2362,30 +2198,6 @@ impl Network

{ } } - /// Checks all custody column subnets for peers. Returns `true` if there is at least one peer in - /// every custody column subnet. - fn check_good_peers_on_column_subnets(&self, epoch: Epoch) -> bool { - let chain_config = self.controller.chain_config(); - - if chain_config.is_eip7594_fork(epoch) { - self.network_globals.custody_subnets().all(|subnet_id| { - let peer_count = self - .network_globals - .peers - .read() - .good_custody_subnet_peer(subnet_id) - .count(); - - self.metrics.as_ref().map(|metrics| { - metrics.set_column_subnet_peers(&subnet_id.to_string(), peer_count) - }); - peer_count > 0 - }) - } else { - true - } - } - const fn start_of_epoch(epoch: Epoch) -> Slot { misc::compute_start_slot_at_epoch::

(epoch) } diff --git a/p2p/src/sync_manager.rs b/p2p/src/sync_manager.rs index 8084792a..870eb234 100644 --- a/p2p/src/sync_manager.rs +++ b/p2p/src/sync_manager.rs @@ -7,7 +7,7 @@ use cached::{Cached as _, TimedSizedCache}; use eth2_libp2p::{rpc::StatusMessage, NetworkGlobals, PeerId}; use helper_functions::misc; use itertools::Itertools; -use log::{log, Level}; +use log::{debug, log, warn, Level}; use prometheus_metrics::Metrics; use rand::{prelude::SliceRandom, seq::IteratorRandom as _, thread_rng}; use thiserror::Error; @@ -21,7 +21,7 @@ use types::{ }; use crate::{ - back_sync::Data, block_sync_service::SyncDirection, misc::RequestId, + block_sync_service::SyncDirection, misc::RequestId, range_and_root_requests::RangeAndRootRequests, }; @@ -42,7 +42,7 @@ impl From<&StatusMessage> for ChainId { const BATCHES_PER_PEER: usize = 1; /// TODO(feature/das): set to only 1 epoch per request because rate limiting by peer -const EPOCHS_PER_REQUEST: u64 = 1; // max 32 +const EPOCHS_PER_REQUEST: u64 = 2; // max 32 const GREEDY_MODE_BATCH_MULTIPLIER: usize = 3; const GREEDY_MODE_PEER_LIMIT: usize = 2; const MAX_SYNC_DISTANCE_IN_SLOTS: u64 = 10000; @@ -109,16 +109,13 @@ impl SyncManager { } pub fn add_peer(&mut self, peer_id: PeerId, status: StatusMessage) { - self.log( - Level::Debug, - format_args!("add peer (peer_id: {peer_id}, status: {status:?})"), - ); + debug!("add peer (peer_id: {peer_id}, status: {status:?})"); self.peers.insert(peer_id, status); } pub fn remove_peer(&mut self, peer_id: &PeerId) -> Vec<(RequestId, SyncBatch)> { - self.log_with_feature(format_args!("remove peer (peer_id: {peer_id})")); + debug!("remove peer (peer_id: {peer_id})"); self.peers.remove(peer_id); self.block_requests @@ -141,9 +138,7 @@ impl SyncManager { request_id: RequestId, batch: SyncBatch, ) { - self.log_with_feature(format_args!( - "retrying request_id: {old_request_id} with (request_id: {request_id}, batch {batch:?})", - )); + debug!("retrying request_id: {old_request_id} with (request_id: {request_id}, batch {batch:?})"); let target = batch.target.clone(); match target { @@ -204,10 +199,7 @@ impl SyncManager { count, }; - self.log( - Level::Debug, - format_args!("back sync batch built: {batch:?})"), - ); + debug!("back sync batch built: {batch:?})"); sync_batches.push(batch); @@ -216,10 +208,7 @@ impl SyncManager { } } - self.log( - Level::Debug, - format_args!("new back sync batches count: {}", sync_batches.len(),), - ); + debug!("new back sync batches count: {}", sync_batches.len()); sync_batches } @@ -237,7 +226,7 @@ impl SyncManager { let sync_start_slot = { if local_head_slot <= self.last_sync_head { - self.log(Level::Debug, "local head not progressing"); + debug!("local head not progressing"); self.sequential_redownloads += 1; redownloads_increased = true; @@ -267,24 +256,27 @@ impl SyncManager { }; if remote_head_slot <= local_head_slot { - self.log_with_feature(format_args!( + debug!( "remote peers have no new slots \ (local_head_slot: {local_head_slot}, remote_head_slot: {remote_head_slot})", - )); + ); return Ok(vec![]); } - self.log_with_feature(format_args!( - "sequential redownloads: {}", - self.sequential_redownloads - )); - self.log_with_feature(format_args!("local finalized slot: {local_finalized_slot}")); - self.log_with_feature(format_args!("local head slot: {local_head_slot}")); - self.log_with_feature(format_args!("last sync head: {}", self.last_sync_head)); - self.log_with_feature(format_args!("remote head slot: {remote_head_slot}")); - self.log_with_feature(format_args!("last sync 
range: {:?}", self.last_sync_range)); - self.log_with_feature(format_args!("sync start slot: {sync_start_slot}")); + self.log( + Level::Debug, + format_args!( + "sequential redownloads: {}, \ + local finalized slot: {local_finalized_slot}, \ + local head slot: {local_head_slot}, \ + last sync head: {}, \ + remote head slot: {remote_head_slot}, \ + last sync range: {:?}, \ + sync start slot: {sync_start_slot},", + self.sequential_redownloads, self.last_sync_head, self.last_sync_range, + ), + ); self.last_sync_head = local_head_slot; @@ -353,7 +345,7 @@ impl SyncManager { start_slot, Some(peer_id), None, - )?; + ); for (peer_id, columns) in peer_custody_columns_mapping { sync_batches.push(SyncBatch { @@ -409,27 +401,24 @@ impl SyncManager { batch: SyncBatch, columns: &Vec, ) { - self.log_with_feature(format_args!( + debug!( "add data column request by range (request_id: {}, peer_id: {}, range: {:?}, columns: [{}])", request_id, batch.peer_id, (batch.start_slot..(batch.start_slot + batch.count)), columns.iter().join(", "), - )); + ); self.data_column_requests .add_request_by_range(request_id, batch) } pub fn add_blob_request_by_range(&mut self, request_id: RequestId, batch: SyncBatch) { - self.log( - Level::Debug, - format_args!( - "add blob request by range (request_id: {}, peer_id: {}, range: {:?})", - request_id, - batch.peer_id, - (batch.start_slot..(batch.start_slot + batch.count)), - ), + debug!( + "add blob request by range (request_id: {}, peer_id: {}, range: {:?})", + request_id, + batch.peer_id, + (batch.start_slot..(batch.start_slot + batch.count)), ); self.blob_requests.add_request_by_range(request_id, batch) @@ -440,9 +429,7 @@ impl SyncManager { blob_identifiers: Vec, peer_id: PeerId, ) -> Vec { - self.log(Level::Debug, format_args!( - "add blobs request by root (blob_identifiers: {blob_identifiers:?}, peer_id: {peer_id})", - )); + debug!("add blobs request by root (blob_identifiers: {blob_identifiers:?}, peer_id: {peer_id})"); blob_identifiers .into_iter() @@ -455,9 +442,7 @@ impl SyncManager { data_column_identifiers: Vec, peer_id: PeerId, ) -> Vec { - self.log_with_feature(format_args!( - "add data column request by root (identifiers: {data_column_identifiers:?}, peer_id: {peer_id})", - )); + debug!("add data column request by root (identifiers: {data_column_identifiers:?}, peer_id: {peer_id})"); data_column_identifiers .into_iter() @@ -469,26 +454,18 @@ impl SyncManager { } pub fn add_block_request_by_range(&mut self, request_id: RequestId, batch: SyncBatch) { - self.log( - Level::Debug, - format_args!( - "add block request by range (request_id: {}, peer_id: {}, range: {:?})", - request_id, - batch.peer_id, - (batch.start_slot..(batch.start_slot + batch.count)), - ), + debug!( + "add block request by range (request_id: {}, peer_id: {}, range: {:?})", + request_id, + batch.peer_id, + (batch.start_slot..(batch.start_slot + batch.count)), ); self.block_requests.add_request_by_range(request_id, batch) } pub fn add_block_request_by_root(&mut self, block_root: H256, peer_id: PeerId) -> bool { - self.log( - Level::Debug, - format_args!( - "add block request by root (block_root: {block_root:?}, peer_id: {peer_id})", - ), - ); + debug!("add block request by root (block_root: {block_root:?}, peer_id: {peer_id})"); self.block_requests.add_request_by_root(block_root, peer_id) } @@ -516,10 +493,7 @@ impl SyncManager { } pub fn blobs_by_range_request_finished(&mut self, request_id: RequestId) { - self.log( - Level::Debug, - format_args!("request blob sidecars by range finished 
(request_id: {request_id})",), - ); + debug!("request blob sidecars by range finished (request_id: {request_id})"); self.blob_requests.request_by_range_finished(request_id) } @@ -530,12 +504,9 @@ impl SyncManager { peer_id: PeerId, request_id: RequestId, ) { - self.log( - Level::Debug, - format_args!( - "received blob sidecar by root (blob_identifier: {blob_identifier:?}, \ + debug!( + "received blob sidecar by root (blob_identifier: {blob_identifier:?}, \ request_id: {request_id}, peer_id: {peer_id})", - ), ); self.blob_requests @@ -543,25 +514,17 @@ impl SyncManager { } pub fn blocks_by_range_request_finished(&mut self, request_id: RequestId) { - self.log( - Level::Debug, - format_args!("request blocks by range finished (request_id: {request_id})"), - ); + debug!("request blocks by range finished (request_id: {request_id})"); self.block_requests.request_by_range_finished(request_id) } pub fn block_by_root_request_finished(&self, block_root: H256) { - self.log( - Level::Debug, - format_args!("request block by root finished (block_root: {block_root:?})"), - ); + debug!("request block by root finished (block_root: {block_root:?})"); } pub fn data_columns_by_range_request_finished(&mut self, request_id: RequestId) { - self.log_with_feature(format_args!( - "request data columns by range finished (request_id: {request_id:?})", - )); + debug!("request data columns by range finished (request_id: {request_id:?})"); self.data_column_requests .request_by_range_finished(request_id) @@ -573,10 +536,10 @@ impl SyncManager { peer_id: PeerId, request_id: RequestId, ) { - self.log_with_feature(format_args!( + debug!( "received data column sidecar by root (data_column_identifier: {data_column_identifier:?}, \ request_id: {request_id}, peer_id: {peer_id})", - )); + ); self.data_column_requests .chunk_by_root_received(&data_column_identifier, &peer_id) @@ -613,7 +576,7 @@ impl SyncManager { self.find_chain_to_sync().map(|chain_id| { let peers_to_sync = self.chain_peers_with_head_slot_filtered(&chain_id, &min_head_slot); - self.log_with_feature(format_args!("peers to sync count: {}", peers_to_sync.len())); + debug!("peers to sync count: {}", peers_to_sync.len()); peers_to_sync }) @@ -622,12 +585,9 @@ impl SyncManager { fn find_chain_to_sync(&mut self) -> Option { match self.chain_with_max_peer_count() { Some(chain_id) => { - self.log( - Level::Debug, - format_args!( - "selected chain to sync (finalized root {:?}, finalized epoch {})", - chain_id.finalized_root, chain_id.finalized_epoch, - ), + debug!( + "selected chain to sync (finalized root {:?}, finalized epoch {})", + chain_id.finalized_root, chain_id.finalized_epoch, ); Some(chain_id) @@ -638,7 +598,7 @@ impl SyncManager { .map(|instant| instant.elapsed() > NOT_ENOUGH_PEERS_MESSAGE_COOLDOWN) .unwrap_or(true) { - self.log(Level::Debug, "waiting for more peers to join to start sync"); + debug!("waiting for more peers to join to start sync"); self.not_enough_peers_message_shown_at = Some(Instant::now()); } @@ -765,21 +725,23 @@ impl SyncManager { min_head_slot: Slot, prioritized_peer: Option, ignore_peer: Option, - ) -> Result>> { + ) -> HashMap> { let mut peer_columns_mapping = HashMap::new(); for column_index in custody_columns { - let custodial_peer = self - .get_random_custodial_peer( - *column_index, - min_head_slot, - prioritized_peer, - ignore_peer, - ) - .ok_or(Error::NoCustodyPeers { - column_index: *column_index, - min_head_slot, - })?; + let Some(custodial_peer) = self.get_random_custodial_peer( + *column_index, + min_head_slot, + 
prioritized_peer, + ignore_peer, + ) else { + warn!("No custodial peer for column_index: {column_index} with head slot greater than {min_head_slot}"); + continue; + }; let peer_custody_columns = peer_columns_mapping .entry(custodial_peer) @@ -788,7 +750,7 @@ impl SyncManager { peer_custody_columns.push(*column_index); } - Ok(peer_columns_mapping) + peer_columns_mapping } pub fn expired_blob_range_batches( @@ -874,8 +836,9 @@ pub(crate) enum Error { #[cfg(test)] mod tests { use slog::{o, Drain}; + use std::sync::Arc; use test_case::test_case; - use types::{eip7594::CUSTODY_REQUIREMENT, phase0::primitives::H32, preset::Minimal}; + use types::{config::Config as ChainConfig, phase0::primitives::H32, preset::Minimal}; use super::*; @@ -937,7 +900,13 @@ mod tests { }; let log = build_log(slog::Level::Debug, false); - let network_globals = NetworkGlobals::new_test_globals(vec![], CUSTODY_REQUIREMENT, &log); + let chain_config = Arc::new(ChainConfig::default()); + let network_globals = NetworkGlobals::new_test_globals( + vec![], + chain_config.custody_requirement, + &log, + &chain_config, + ); let mut sync_manager = SyncManager::new(network_globals.into()); sync_manager.add_peer(PeerId::random(), peer_status); diff --git a/prometheus_metrics/src/metrics.rs index dcdcb577..2cd91b46 100644 --- a/prometheus_metrics/src/metrics.rs +++ b/prometheus_metrics/src/metrics.rs @@ -63,7 +63,7 @@ pub struct Metrics { pub data_column_sidecar_kzg_verification_single: Histogram, // TODO? pub data_column_sidecar_kzg_verification_batch: Histogram, pub beacon_custody_columns_count_total: IntCounter, // TODO - + // Extra Network stats gossip_block_slot_start_delay_time: Histogram, @@ -191,7 +191,7 @@ pub struct Metrics { pub jemalloc_bytes_retained: IntGauge, // Tick delay metrics - tick_delay_times: GaugeVec, + tick_delay_times: GaugeVec, } impl Metrics { @@ -799,7 +799,7 @@ impl Metrics { tick_delay_times: GaugeVec::new( opts!("TICK_DELAY_TIMES", "Tick delay times"), &["tick"], - )?, + )?, }) } @@ -829,20 +829,19 @@ impl Metrics { self.received_aggregated_attestation_subsets.clone(), ))?; default_registry.register(Box::new(self.column_subnet_peers.clone()))?; - default_registry.register(Box::new(self.data_column_sidecars_submitted_for_processing.clone()))?; - default_registry.register(Box::new(self.verified_gossip_data_column_sidecar.clone()))?; - default_registry.register(Box::new( - self.data_column_sidecar_verification_times.clone(), - ))?; - default_registry.register(Box::new(self.reconstructed_columns.clone(),))?; default_registry.register(Box::new( - self.columns_reconstruction_time.clone(), + self.data_column_sidecars_submitted_for_processing.clone(), ))?; + default_registry.register(Box::new(self.verified_gossip_data_column_sidecar.clone()))?; default_registry.register(Box::new( - self.data_column_sidecar_computation.clone(), + self.data_column_sidecar_verification_times.clone(), ))?; + default_registry.register(Box::new(self.reconstructed_columns.clone()))?; + default_registry.register(Box::new(self.columns_reconstruction_time.clone()))?; + default_registry.register(Box::new(self.data_column_sidecar_computation.clone()))?; default_registry.register(Box::new( - self.data_column_sidecar_inclusion_proof_verification.clone(), + self.data_column_sidecar_inclusion_proof_verification + .clone(), ))?; default_registry.register(Box::new(
diff --git a/prometheus_metrics/src/metrics.rs b/prometheus_metrics/src/metrics.rs
index dcdcb577..2cd91b46 100644
--- a/prometheus_metrics/src/metrics.rs
+++ b/prometheus_metrics/src/metrics.rs
@@ -63,7 +63,7 @@ pub struct Metrics {
     pub data_column_sidecar_kzg_verification_single: Histogram, // TODO?
     pub data_column_sidecar_kzg_verification_batch: Histogram,
     pub beacon_custody_columns_count_total: IntCounter, // TODO
-
+
     // Extra Network stats
     gossip_block_slot_start_delay_time: Histogram,

@@ -191,7 +191,7 @@ pub struct Metrics {
     pub jemalloc_bytes_retained: IntGauge,

     // Tick delay metrics
-    tick_delay_times: GaugeVec,
+    tick_delay_times: GaugeVec,
 }

 impl Metrics {
@@ -799,7 +799,7 @@ impl Metrics {
             tick_delay_times: GaugeVec::new(
                 opts!("TICK_DELAY_TIMES", "Tick delay times"),
                 &["tick"],
-            )?,
+            )?,
         })
     }

@@ -829,20 +829,19 @@ impl Metrics {
             self.received_aggregated_attestation_subsets.clone(),
         ))?;
         default_registry.register(Box::new(self.column_subnet_peers.clone()))?;
-        default_registry.register(Box::new(self.data_column_sidecars_submitted_for_processing.clone()))?;
-        default_registry.register(Box::new(self.verified_gossip_data_column_sidecar.clone()))?;
-        default_registry.register(Box::new(
-            self.data_column_sidecar_verification_times.clone(),
-        ))?;
-        default_registry.register(Box::new(self.reconstructed_columns.clone(),))?;
         default_registry.register(Box::new(
-            self.columns_reconstruction_time.clone(),
+            self.data_column_sidecars_submitted_for_processing.clone(),
         ))?;
+        default_registry.register(Box::new(self.verified_gossip_data_column_sidecar.clone()))?;
         default_registry.register(Box::new(
-            self.data_column_sidecar_computation.clone(),
+            self.data_column_sidecar_verification_times.clone(),
         ))?;
+        default_registry.register(Box::new(self.reconstructed_columns.clone()))?;
+        default_registry.register(Box::new(self.columns_reconstruction_time.clone()))?;
+        default_registry.register(Box::new(self.data_column_sidecar_computation.clone()))?;
         default_registry.register(Box::new(
-            self.data_column_sidecar_inclusion_proof_verification.clone(),
+            self.data_column_sidecar_inclusion_proof_verification
+                .clone(),
         ))?;
         default_registry.register(Box::new(
             self.data_column_sidecar_kzg_verification_single.clone(),
@@ -850,9 +849,7 @@ impl Metrics {
         default_registry.register(Box::new(
             self.data_column_sidecar_kzg_verification_batch.clone(),
         ))?;
-        default_registry.register(Box::new(
-            self.beacon_custody_columns_count_total.clone(),
-        ))?;
+        default_registry.register(Box::new(self.beacon_custody_columns_count_total.clone()))?;
         default_registry.register(Box::new(self.gossip_block_slot_start_delay_time.clone()))?;
         default_registry.register(Box::new(self.mutator_attestations.clone()))?;
         default_registry.register(Box::new(self.mutator_aggregate_and_proofs.clone()))?;
@@ -1107,7 +1104,7 @@ impl Metrics {
             }
         }
     }
-
+
     // Extra Network stats
     pub fn observe_block_duration_to_slot(&self, block_slot_timestamp: UnixSeconds) {
         match helpers::duration_from_now_to(block_slot_timestamp) {
diff --git a/runtime/src/runtime.rs b/runtime/src/runtime.rs
index 8d636a4e..3de35e6f 100644
--- a/runtime/src/runtime.rs
+++ b/runtime/src/runtime.rs
@@ -551,15 +551,7 @@ pub async fn run_after_genesis(
     let block_sync_database = if in_memory {
         Database::in_memory()
     } else {
-        Database::persistent(
-            "sync",
-            directories
-                .store_directory
-                .clone()
-                .unwrap_or_default()
-                .join("sync"),
-            db_size,
-        )?
+        storage_config.sync_database()?
     };

     let mut block_sync_service = BlockSyncService::new(
diff --git a/slashing_protection/src/lib.rs b/slashing_protection/src/lib.rs
index 3a456536..10741251 100644
--- a/slashing_protection/src/lib.rs
+++ b/slashing_protection/src/lib.rs
@@ -505,7 +505,7 @@ impl SlashingProtector {
             .map_err(Into::into)
     }

-    fn validate_and_store_proposal(
+    pub fn validate_and_store_proposal(
         &mut self,
         proposal: BlockProposal,
         pubkey: PublicKeyBytes,
diff --git a/types/src/config.rs b/types/src/config.rs
index 514eb8ac..c800a795 100644
--- a/types/src/config.rs
+++ b/types/src/config.rs
@@ -69,6 +69,7 @@ pub struct Config {
     pub electra_fork_version: Version,
     #[serde(with = "serde_utils::string_or_native")]
     pub eip7594_fork_epoch: Epoch,
+    pub eip7594_fork_version: Version,

     // Time parameters
     #[serde(with = "serde_utils::string_or_native")]
@@ -198,7 +199,8 @@ impl Default for Config {
             electra_fork_epoch: FAR_FUTURE_EPOCH,
             electra_fork_version: H32(hex!("05000000")),
             eip7594_fork_epoch: FAR_FUTURE_EPOCH,
-
+            eip7594_fork_version: H32(hex!("06000000")),
+
             // Time parameters
             eth1_follow_distance: 2048,
             min_validator_withdrawability_delay: 256,
@@ -315,6 +317,7 @@ impl Config {
             capella_fork_version: H32(hex!("03000001")),
             deneb_fork_version: H32(hex!("04000001")),
             electra_fork_version: H32(hex!("05000001")),
+            eip7594_fork_version: H32(hex!("06000001")),

             // Time parameters
             eth1_follow_distance: 16,
@@ -672,7 +675,7 @@ impl Config {
             Phase::Bellatrix => self.bellatrix_fork_epoch,
             Phase::Capella => self.capella_fork_epoch,
             Phase::Deneb => self.deneb_fork_epoch,
-Phase::Electra => self.electra_fork_epoch,
+            Phase::Electra => self.electra_fork_epoch,
         }
     }

@@ -722,6 +725,18 @@ impl Config {
         self.eip7594_fork_epoch != FAR_FUTURE_EPOCH
     }

+    pub const fn data_column_sidecar_subnet_count(&self) -> u64 {
+        self.data_column_sidecar_subnet_count
+    }
+
+    pub const fn samples_per_slot(&self) -> u64 {
+        self.samples_per_slot
+    }
+
+    pub const fn custody_requirement(&self) -> u64 {
+        self.custody_requirement
+    }
+
     fn fork_slots<P: Preset>(&self) -> impl Iterator<Item = (Phase, Option<Slot>)> + '_ {
         enum_iterator::all().map(|phase| (phase, self.fork_slot::<P>(phase)))
     }
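The types/src/config.rs hunks above move the PeerDAS parameters onto `Config` behind const getters and give EIP-7594 its own fork version, replacing the hard-coded consts that the next hunk deletes from types/src/eip7594.rs. A minimal sketch of the pattern, assuming a trimmed-down config struct; the values mirror the consts removed by this patch (4, 128, 8), everything else is illustrative:

    struct Config {
        custody_requirement: u64,
        data_column_sidecar_subnet_count: u64,
        samples_per_slot: u64,
    }

    impl Config {
        // Const getters keep call sites terse while letting each network
        // override the values through its own Config instance.
        const fn custody_requirement(&self) -> u64 {
            self.custody_requirement
        }

        const fn samples_per_slot(&self) -> u64 {
            self.samples_per_slot
        }
    }

    fn main() {
        let config = Config {
            custody_requirement: 4,
            data_column_sidecar_subnet_count: 128,
            samples_per_slot: 8,
        };

        // Call sites read per-network values instead of crate-level consts.
        assert_eq!(config.custody_requirement(), 4);
        assert_eq!(config.samples_per_slot(), 8);
        assert_eq!(config.data_column_sidecar_subnet_count, 128);
    }
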
diff --git a/types/src/deneb/consts.rs b/types/src/deneb/consts.rs
index 02ed3dfa..9cd57137 100644
--- a/types/src/deneb/consts.rs
+++ b/types/src/deneb/consts.rs
@@ -1,9 +1,13 @@
+use ethereum_types::H32;
 use hex_literal::hex;
 use ssz::MerkleElements;
 use typenum::{U32, U6};

-use crate::{deneb::primitives::KzgCommitment, preset::Preset};
+use crate::{deneb::primitives::KzgCommitment, phase0::primitives::DomainType, preset::Preset};

+// TODO(feature/deneb): Remove `DOMAIN_BLOB_SIDECAR` and everything that uses it.
+//                      Blob sidecars are no longer signed starting with v1.4.0-beta.5.
+pub const DOMAIN_BLOB_SIDECAR: DomainType = H32(hex!("0b000000"));
 pub const VERSIONED_HASH_VERSION_KZG: &[u8] = &hex!("01");

 // TODO(feature/deneb): Can `BlobSidecarSubnetCount` be a `const`?
diff --git a/types/src/eip7594.rs b/types/src/eip7594.rs
index 9d6ddaec..b45eabf9 100644
--- a/types/src/eip7594.rs
+++ b/types/src/eip7594.rs
@@ -19,18 +19,14 @@ type BytesPerCell = Prod;
 pub type CellIndex = u64;
 pub type RowIndex = u64;
 pub type ColumnIndex = u64;
-pub type Cell<P> = Box<ByteVector<BytesPerCell<P>>>;
-pub type NumberOfColumns = U128;
+pub type DataColumnSubnetId = u64;

-pub type KzgCommitmentsInclusionProofDepth = U4;
+pub type NumberOfColumns = U128;

+type KzgCommitmentsInclusionProofDepth = U4;
+pub type Cell<P> = Box<ByteVector<BytesPerCell<P>>>;
 pub type BlobCommitmentsInclusionProof = ContiguousVector<H256, KzgCommitmentsInclusionProofDepth>;

-// TODO(feature/das): convert to type const
-pub const CUSTODY_REQUIREMENT: u64 = 4;
-pub const DATA_COLUMN_SIDECAR_SUBNET_COUNT: u64 = 128;
-pub const SAMPLES_PER_SLOT: u64 = 8;
-
 #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Deserialize, Serialize, Ssz)]
 #[serde(deny_unknown_fields)]
 pub struct DataColumnIdentifier {
diff --git a/validator/Cargo.toml b/validator/Cargo.toml
index 8b528676..7c6b6d3e 100644
--- a/validator/Cargo.toml
+++ b/validator/Cargo.toml
@@ -19,9 +19,7 @@ derivative = { workspace = true }
 derive_more = { workspace = true }
 directories = { workspace = true }
 doppelganger_protection = { workspace = true }
-educe = { workspace = true }
 eip_7594 = { workspace = true }
-eth1 = { workspace = true }
 eth1_api = { workspace = true }
 eth2_libp2p = { workspace = true }
 features = { workspace = true }
diff --git a/validator/src/validator.rs b/validator/src/validator.rs
index 0f4d3a49..d68b0369 100644
--- a/validator/src/validator.rs
+++ b/validator/src/validator.rs
@@ -830,21 +830,6 @@ impl Validator {
             }
         };

-        // Check before broadcasting to avoid slashing. See:
-        //
-        let control_flow = self
-            .validate_and_store_block(
-                &beacon_block,
-                &slot_head.beacon_state,
-                public_key.to_bytes(),
-                slot_head.current_epoch(),
-            )
-            .await?;
-
-        if control_flow.is_break() {
-            return Ok(());
-        }
-
         info!(
             "validator {} proposing beacon block with root {:?} in slot {}",
             proposer_index,
@@ -1283,53 +1268,6 @@ impl Validator {
         }
     }

-    async fn validate_and_store_block(
-        &self,
-        block: &SignedBeaconBlock<P>,
-        state: &BeaconState<P>,
-        pubkey: PublicKeyBytes,
-        current_epoch: Epoch,
-    ) -> Result<ControlFlow<()>> {
-        let proposal = BlockProposal {
-            slot: block.message().slot(),
-            signing_root: Some(block.message().signing_root(&self.chain_config, state)),
-        };
-
-        debug!("validating beacon block proposal: {block:?}");
-
-        let validation_outcome = {
-            // Tracking slashing protector metrics could be moved to slashing protector methods
-            // but here we additionally collect locking times
-            let _timer = self.metrics.as_ref().map(|metrics| {
-                metrics
-                    .validator_proposal_slashing_protector_times
-                    .start_timer()
-            });
-
-            self.slashing_protector
-                .lock()
-                .await
-                .validate_and_store_proposal(proposal, pubkey, current_epoch)?
-        };
-
-        let control_flow = match validation_outcome {
-            SlashingValidationOutcome::Accept => ControlFlow::Continue(()),
-            SlashingValidationOutcome::Ignore => {
-                warn!("slashing protector ignored duplicate beacon block: {block:?}");
-                ControlFlow::Break(())
-            }
-            SlashingValidationOutcome::Reject(error) => {
-                warn!(
-                    "slashing protector rejected slashable beacon block \
-                    (error: {error}, block: {block:?})",
-                );
-                ControlFlow::Break(())
-            }
-        };
-
-        Ok(control_flow)
-    }
-
     async fn attest_gossip_block(&mut self, wait_group: &W, head: ChainLink<P>) -> Result<()> {
         let Some(last_tick) = self.last_tick else {
             return Ok(());