diff --git a/Cargo.lock b/Cargo.lock index 764d89b0..344b3206 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,6 +63,17 @@ dependencies = [ "subtle", ] +[[package]] +name = "ahash" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" +dependencies = [ + "getrandom 0.2.17", + "once_cell", + "version_check", +] + [[package]] name = "ahash" version = "0.8.12" @@ -211,7 +222,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43d68f2d516162846c1238e755a7c4d131b892b70cc70c471a8e3ca3ed818fce" dependencies = [ - "ahash", + "ahash 0.8.12", "ark-ff 0.5.0", "ark-poly", "ark-serialize 0.5.0", @@ -358,7 +369,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "579305839da207f02b89cd1679e50e67b4331e2f9294a57693e5051b7703fe27" dependencies = [ - "ahash", + "ahash 0.8.12", "ark-ff 0.5.0", "ark-serialize 0.5.0", "ark-std 0.5.0", @@ -738,10 +749,10 @@ version = "0.72.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" dependencies = [ - "bitflags", + "bitflags 2.11.1", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.13.0", "proc-macro2", "quote", "regex", @@ -781,6 +792,12 @@ dependencies = [ "hex-conservative", ] +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.11.1" @@ -1542,7 +1559,7 @@ dependencies = [ "hashbrown 0.14.5", "lock_api", "once_cell", - "parking_lot_core", + "parking_lot_core 0.9.12", ] [[package]] @@ -1568,7 +1585,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ccc2776f0c61eca1ca32528f85548abd1a4be8fb53d1b21c013e4f18da1e7090" dependencies = [ "data-encoding", - "syn 1.0.109", + "syn 2.0.117", ] [[package]] @@ -1754,7 +1771,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e0e367e4e7da84520dedcac1901e4da967309406d1e51017ae1abfb97adbd38" dependencies = [ - "bitflags", + "bitflags 2.11.1", "block2", "libc", "objc2", @@ -2103,6 +2120,8 @@ dependencies = [ "ethlambda-network-api", "ethlambda-storage", "ethlambda-types", + "ethp2p-broadcast", + "ethp2p-transport", "ethrex-common", "ethrex-p2p", "ethrex-rlp", @@ -2113,6 +2132,7 @@ dependencies = [ "libssz-derive", "libssz-merkle", "libssz-types", + "prost 0.13.5", "rand 0.8.6", "sha2", "snap", @@ -2208,6 +2228,48 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "ethp2p-broadcast" +version = "0.0.0" +source = "git+https://github.com/lambdaclass/ethp2p-rs?rev=ba3ed8b490a853a54343741f5d1def158e1bb82c#ba3ed8b490a853a54343741f5d1def158e1bb82c" +dependencies = [ + "bytes", + "ethp2p-protocol", + "futures", + "prost 0.13.5", + "prost-build", + "reed-solomon-erasure", + "sha2", + "tokio", + "tokio-stream", + "tracing", +] + +[[package]] +name = "ethp2p-protocol" +version = "0.0.0" +source = "git+https://github.com/lambdaclass/ethp2p-rs?rev=ba3ed8b490a853a54343741f5d1def158e1bb82c#ba3ed8b490a853a54343741f5d1def158e1bb82c" +dependencies = [ + "prost 0.13.5", + "prost-build", +] + +[[package]] +name = "ethp2p-transport" +version = "0.0.0" +source = "git+https://github.com/lambdaclass/ethp2p-rs?rev=ba3ed8b490a853a54343741f5d1def158e1bb82c#ba3ed8b490a853a54343741f5d1def158e1bb82c" +dependencies = [ + "ethp2p-broadcast", + "futures", + "prost 0.13.5", + "quinn", + "rcgen", + "rustls", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "ethrex-blockchain" version = "8.0.0" @@ -2386,7 +2448,7 @@ dependencies = [ "ethrex-rlp", "ethrex-trie", "hex", - "lru", + "lru 0.16.4", "qfilter", "rayon", "rustc-hash", @@ -2573,6 +2635,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flate2" version = "1.1.9" @@ -2836,7 +2904,7 @@ version = "0.20.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b88256088d75a56f8ecfa070513a775dd9107f6530ef14919dac831af9cfe2b" dependencies = [ - "bitflags", + "bitflags 2.11.1", "libc", "libgit2-sys", "log", @@ -2900,6 +2968,15 @@ dependencies = [ "byteorder", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash 0.7.8", +] + [[package]] name = "hashbrown" version = "0.14.5" @@ -3038,7 +3115,7 @@ dependencies = [ "ipconfig", "moka", "once_cell", - "parking_lot", + "parking_lot 0.12.5", "rand 0.9.4", "resolv-conf", "smallvec", @@ -3439,7 +3516,7 @@ version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90807d610575744524d9bdc69f3885d96f0e6c3354565b0828354a7ff2a262b8" dependencies = [ - "ahash", + "ahash 0.8.12", "clap", "crossbeam-channel 0.5.15", "crossbeam-utils 0.8.21", @@ -3464,6 +3541,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if 1.0.4", +] + [[package]] name = "ipconfig" version = "0.3.4" @@ -3929,7 +4015,7 @@ dependencies = [ "multiaddr", "multihash", "multistream-select", - "parking_lot", + "parking_lot 0.12.5", "pin-project", "quick-protobuf", "rand 0.8.6", @@ -3970,7 +4056,7 @@ dependencies = [ "hickory-resolver", "libp2p-core", "libp2p-identity", - "parking_lot", + "parking_lot 0.12.5", "smallvec", "tracing", ] @@ -4440,7 +4526,7 @@ dependencies = [ "futures-rustls", "libp2p-core", "libp2p-identity", - "parking_lot", + "parking_lot 0.12.5", "pin-project-lite", "rw-stream-sink", "soketto", @@ -4607,6 +4693,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lru" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a" +dependencies = [ + "hashbrown 0.12.3", +] + [[package]] name = "lru" version = "0.16.4" @@ -4800,7 +4895,7 @@ dependencies = [ "crossbeam-epoch 0.9.18", "crossbeam-utils 0.8.21", "equivalent", - "parking_lot", + "parking_lot 0.12.5", "portable-atomic", "smallvec", "tagptr", @@ -4837,7 +4932,7 @@ source = "git+https://github.com/leanEthereum/leanVM.git?rev=e2592df#e2592df4e30 dependencies = [ "itertools 0.14.0", "mt-utils", - "num-bigint 0.3.3", + "num-bigint 0.4.6", "parallel", "paste", "rand 0.10.1", @@ -4853,7 +4948,7 @@ dependencies = [ "itertools 0.14.0", "mt-field", "mt-utils", - "num-bigint 0.3.3", + "num-bigint 0.4.6", "paste", "rand 0.10.1", "serde", @@ -4969,6 +5064,12 @@ dependencies = [ "unsigned-varint", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "multistream-select" version = "0.14.0" @@ -5017,7 +5118,7 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ce3636fa715e988114552619582b530481fd5ef176a1e5c1bf024077c2c9445" dependencies = [ - "bitflags", + "bitflags 2.11.1", "libc", "log", "netlink-packet-core", @@ -5056,7 +5157,7 @@ version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" dependencies = [ - "bitflags", + "bitflags 2.11.1", "cfg-if 1.0.4", "cfg_aliases", "libc", @@ -5068,7 +5169,7 @@ version = "0.31.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf20d2fde8ff38632c426f1165ed7436270b44f199fc55284c38276f9db47c3d" dependencies = [ - "bitflags", + "bitflags 2.11.1", "cfg-if 1.0.4", "cfg_aliases", "libc", @@ -5249,7 +5350,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3e0adef53c21f888deb4fa59fc59f7eb17404926ee8a6f59f5df0fd7f9f3272" dependencies = [ - "bitflags", + "bitflags 2.11.1", "objc2", ] @@ -5667,6 +5768,17 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.5" @@ -5674,7 +5786,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.12", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if 1.0.4", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -5685,7 +5811,7 @@ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" dependencies = [ "cfg-if 1.0.4", "libc", - "redox_syscall", + "redox_syscall 0.5.18", "smallvec", "windows-link", ] @@ -5764,6 +5890,16 @@ dependencies = [ "sha2", ] +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset", + "indexmap", +] + [[package]] name = "pin-project" version = "1.1.13" @@ -5889,7 +6025,7 @@ dependencies = [ "inferno", "num", "paste", - "prost", + "prost 0.14.3", ] [[package]] @@ -5972,7 +6108,7 @@ dependencies = [ "fnv", "lazy_static", "memchr", - "parking_lot", + "parking_lot 0.12.5", "protobuf", "thiserror 2.0.18", ] @@ -5985,7 +6121,7 @@ checksum = "cca3d75b4566b9a29fe1ed623587fb058e826eb329a0be4b7c4da1ebb2d7a6ca" dependencies = [ "dtoa", "itoa", - "parking_lot", + "parking_lot 0.12.5", "prometheus-client-derive-encode", ] @@ -6008,7 +6144,7 @@ checksum = "4b45fcc2344c680f5025fe57779faef368840d0bd1f42f216291f0dc4ace4744" dependencies = [ "bit-set", "bit-vec", - "bitflags", + "bitflags 2.11.1", "num-traits", "rand 0.9.4", "rand_chacha 0.9.0", @@ -6019,6 +6155,16 @@ dependencies = [ "unarray", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive 0.13.5", +] + [[package]] name = "prost" version = "0.14.3" @@ -6026,7 +6172,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.14.3", +] + +[[package]] +name = "prost-build" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +dependencies = [ + "heck", + "itertools 0.14.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost 0.13.5", + "prost-types", + "regex", + "syn 2.0.117", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.117", ] [[package]] @@ -6036,12 +6215,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.117", ] +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +dependencies = [ + "prost 0.13.5", +] + [[package]] name = "protobuf" version = "3.7.2" @@ -6413,13 +6601,35 @@ dependencies = [ "zk-alloc", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags", + "bitflags 2.11.1", +] + +[[package]] +name = "reed-solomon-erasure" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7263373d500d4d4f505d43a2a662d475a894aa94503a1ee28e9188b5f3960d4f" +dependencies = [ + "libm", + "lru 0.7.8", + "parking_lot 0.11.2", + "smallvec", + "spin 0.9.8", ] [[package]] @@ -6709,7 +6919,7 @@ version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" dependencies = [ - "bitflags", + "bitflags 2.11.1", "errno", "libc", "linux-raw-sys", @@ -7517,7 +7727,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a13f3d0daba03132c0aa9767f98351b3488edc2c100cda2d2ec2b04f3d8d3c8b" dependencies = [ - "bitflags", + "bitflags 2.11.1", "core-foundation", "system-configuration-sys", ] @@ -7559,7 +7769,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom 0.3.4", + "getrandom 0.4.2", "once_cell", "rustix", "windows-sys 0.61.2", @@ -7740,7 +7950,7 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot", + "parking_lot 0.12.5", "pin-project-lite", "signal-hook-registry", "socket2 0.6.3", @@ -7847,7 +8057,7 @@ version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" dependencies = [ - "bitflags", + "bitflags 2.11.1", "bytes", "futures-util", "http", @@ -8301,7 +8511,7 @@ version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ - "bitflags", + "bitflags 2.11.1", "hashbrown 0.15.5", "indexmap", "semver 1.0.28", @@ -8795,7 +9005,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", - "bitflags", + "bitflags 2.11.1", "indexmap", "log", "serde", @@ -8899,7 +9109,7 @@ dependencies = [ "futures", "log", "nohash-hasher", - "parking_lot", + "parking_lot 0.12.5", "pin-project", "rand 0.8.6", "static_assertions", @@ -8914,7 +9124,7 @@ dependencies = [ "futures", "log", "nohash-hasher", - "parking_lot", + "parking_lot 0.12.5", "pin-project", "rand 0.9.4", "static_assertions", diff --git a/crates/net/p2p/Cargo.toml b/crates/net/p2p/Cargo.toml index d766b6a8..1675cdc0 100644 --- a/crates/net/p2p/Cargo.toml +++ b/crates/net/p2p/Cargo.toml @@ -45,5 +45,23 @@ libssz-types.workspace = true sha2 = "0.10" +# ethp2p-rs erasure-coded broadcast (experimental, off by default). +# Pinned to a github.com/lambdaclass/ethp2p-rs commit. ethp2p-rs is an +# INTERNAL repo: building with the `ethp2p` feature requires git credentials +# for it (CI already sets `CARGO_NET_GIT_FETCH_WITH_CLI=true`) plus `protoc` +# on the build host (ethp2p-broadcast compiles its .proto via prost-build). +# The default (feature-off) build needs neither. +ethp2p-broadcast = { git = "https://github.com/lambdaclass/ethp2p-rs", rev = "ba3ed8b490a853a54343741f5d1def158e1bb82c", optional = true } +ethp2p-transport = { git = "https://github.com/lambdaclass/ethp2p-rs", rev = "ba3ed8b490a853a54343741f5d1def158e1bb82c", optional = true } +# Needed to serialize the RS preamble (a prost message) before publishing. +# Version-matched to ethp2p-broadcast's prost so cargo unifies them. +prost = { version = "0.13", optional = true } + +[features] +# Experimental: also broadcast gossip via ethp2p-rs erasure-coded +# broadcast over a parallel QUIC network, alongside libp2p gossipsub. +# Off by default; ethlambda <-> ethlambda only. Requires `protoc` to build. +ethp2p = ["dep:ethp2p-broadcast", "dep:ethp2p-transport", "dep:prost"] + [dev-dependencies] hex.workspace = true diff --git a/crates/net/p2p/src/ethp2p/mod.rs b/crates/net/p2p/src/ethp2p/mod.rs new file mode 100644 index 00000000..0030be46 --- /dev/null +++ b/crates/net/p2p/src/ethp2p/mod.rs @@ -0,0 +1,454 @@ +//! Experimental ethp2p-rs erasure-coded broadcast adapter. +//! +//! This module is **feature-gated** behind `ethp2p` and **off by +//! default**. It lets ethlambda *also* broadcast gossip through +//! [ethp2p-rs](https://github.com/lambdaclass/ethp2p-rs)'s Reed-Solomon +//! broadcast engine, over a **parallel QUIC network** alongside libp2p +//! gossipsub (which is unaffected). It is ethlambda↔ethlambda only and +//! makes no spec-conformance or interop claim — see +//! `docs/ethlambda-integration-plan.md` in ethp2p-rs. +//! +//! This module wires the broadcast engine into ethlambda's P2P actor: +//! the publish handlers tee gossip here, a background task drives the +//! engine and forwards reconstructed messages back into the actor, and +//! peers are derived from the static bootnode set. It has been validated +//! by compilation + the isolated round-trip test below; end-to-end +//! validation on a multi-node devnet is the remaining step. +//! +//! ## Model +//! +//! ethlambda uses **static bootnodes** (no dynamic discovery), so the peer +//! set is known at startup: [`Ethp2pBroadcast::start`] binds a QUIC +//! endpoint, dials the known peers, and subscribes the gossip channels. +//! Inbound peers register themselves via the QUIC transport's stream +//! preface, so only one side needs to dial. + +use std::fmt; +use std::io; +use std::net::SocketAddr; + +use ethlambda_network_api::P2PToBlockChainRef; +use ethlambda_types::attestation::{SignedAggregatedAttestation, SignedAttestation}; +use ethlambda_types::block::SignedBlock; +use ethp2p_broadcast::engine::{DeliveredMessage, Engine, StepResult, rs_relay_factory}; +use ethp2p_broadcast::strategy::config::RsConfig; +use ethp2p_broadcast::strategy::rs::encode::encode as rs_encode; +use ethp2p_broadcast::strategy::rs::state::RsStrategy; +use ethp2p_transport::QuicNet; +use libssz::SszDecode; +use prost::Message as _; +use sha2::{Digest, Sha256}; +use spawned_concurrency::tasks::ActorRef; +use tokio::sync::mpsc; +use tracing::{error, info, trace, warn}; + +use crate::gossipsub::decompress_message; +use crate::{P2PServer, WrappedEthp2pDelivery}; + +/// ethp2p channel ids — mirror the gossipsub topic kinds. +pub(crate) const CHANNEL_BLOCK: &str = "block"; +pub(crate) const CHANNEL_AGGREGATION: &str = "aggregation"; +pub(crate) const CHANNEL_ATTESTATION: &str = "attestation"; + +/// The set of channels every ethlambda node subscribes to. +pub(crate) fn all_channels() -> Vec { + vec![ + CHANNEL_BLOCK.to_string(), + CHANNEL_AGGREGATION.to_string(), + CHANNEL_ATTESTATION.to_string(), + ] +} + +/// Capacity of the reconstructed-message delivery channel. +const DELIVERY_CAPACITY: usize = 256; + +/// Derive a stable ethp2p peer id (`u64`) from a secp256k1 compressed +/// public key: the first 8 bytes of its SHA-256, big-endian. +/// +/// Deterministic across restarts (same key → same id). ethp2p uses a +/// `u64` peer-id space, distinct from libp2p's multi-byte `PeerId`; this +/// is the bridge. Collision probability is negligible at devnet scale. +#[must_use] +pub fn derive_peer_id(compressed_pubkey: &[u8]) -> u64 { + let digest = Sha256::digest(compressed_pubkey); + let mut bytes = [0_u8; 8]; + bytes.copy_from_slice(&digest[..8]); + u64::from_be_bytes(bytes) +} + +/// Errors from the broadcast adapter. +#[derive(Debug)] +pub enum Ethp2pError { + /// Reed-Solomon encoding of the payload failed. + Encode(String), + /// The broadcast engine rejected the operation. + Engine(String), +} + +impl fmt::Display for Ethp2pError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Encode(e) => write!(f, "ethp2p encode: {e}"), + Self::Engine(e) => write!(f, "ethp2p engine: {e}"), + } + } +} + +impl std::error::Error for Ethp2pError {} + +/// A handle to the ethp2p broadcast engine running over a parallel QUIC +/// network. Driven by the caller via [`Ethp2pBroadcast::run_one_step`]. +pub struct Ethp2pBroadcast { + engine: Engine, + config: RsConfig, + local_addr: SocketAddr, +} + +impl fmt::Debug for Ethp2pBroadcast { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Ethp2pBroadcast") + .field("local_addr", &self.local_addr) + .finish_non_exhaustive() + } +} + +impl Ethp2pBroadcast { + /// Bind a QUIC endpoint for `local_peer`, then for each mesh peer + /// `(id, addr)`: QUIC-dial it if `addr` is `Some` (peers with `None` + /// are expected to dial us — the QUIC connection is bidirectional, so + /// only one side dials), and engine-connect *all* of them so each node + /// announces its channel subscriptions via the BCAST handshake. + /// Finally subscribe the given `channels`. + /// + /// Returns the handle plus the receiver of reconstructed + /// [`DeliveredMessage`]s; the caller forwards those into the same + /// pipeline gossipsub feeds (Phase 2). + pub async fn start( + local_peer: u64, + bind_addr: SocketAddr, + peers: &[(u64, Option)], + channels: &[String], + config: RsConfig, + ) -> io::Result<(Self, mpsc::Receiver)> { + let net = QuicNet::bind(local_peer, bind_addr)?; + let local_addr = net.local_addr()?; + + // QUIC-level dial of peers we're responsible for dialing; peers + // with no address dial us and self-register via the stream preface. + for (peer, addr) in peers { + if let Some(addr) = addr { + net.connect(*peer, *addr).await?; + } + } + + let (delivered_tx, delivered_rx) = mpsc::channel(DELIVERY_CAPACITY); + let mut engine = Engine::new(local_peer, net, delivered_tx); + + for channel in channels { + engine + .subscribe(channel.clone(), rs_relay_factory(config)) + .map_err(|e| io::Error::other(e.to_string()))?; + } + // Engine-level handshake to every mesh peer (announces our + // subscriptions so peers will dispatch chunks to us, and vice + // versa). Fire-and-forget: the transport delivers once the QUIC + // connection is established in either direction. + for (peer, _) in peers { + engine + .connect(*peer) + .map_err(|e| io::Error::other(e.to_string()))?; + } + + Ok(( + Self { + engine, + config, + local_addr, + }, + delivered_rx, + )) + } + + /// The QUIC socket address this endpoint is bound to (useful when + /// binding to an ephemeral port). + #[must_use] + pub fn local_addr(&self) -> SocketAddr { + self.local_addr + } + + /// Publish an opaque payload (e.g. an ssz+snappy gossip blob) on + /// `channel` under `message_id`. Wraps the Reed-Solomon encode + + /// origin-session setup the engine requires. + pub fn publish_bytes( + &mut self, + channel: &str, + message_id: &str, + payload: &[u8], + ) -> Result<(), Ethp2pError> { + let (preamble, _shards) = + rs_encode(payload, &self.config).map_err(|e| Ethp2pError::Encode(format!("{e:?}")))?; + let mut preamble_bytes = Vec::with_capacity(preamble.encoded_len()); + preamble + .encode(&mut preamble_bytes) + .map_err(|e| Ethp2pError::Encode(e.to_string()))?; + let strategy = RsStrategy::new_origin(payload, self.config) + .map_err(|e| Ethp2pError::Encode(format!("{e:?}")))?; + self.engine + .publish( + &channel.to_string(), + message_id.to_string(), + strategy, + preamble_bytes, + ) + .map_err(|e| Ethp2pError::Engine(e.to_string())) + } + + /// Process one inbound network event (drive the engine forward). + /// Callers loop this; reconstructed messages arrive on the receiver + /// returned by [`Ethp2pBroadcast::start`]. + /// + /// # Errors + /// Returns the engine's error if event processing fails. + pub async fn run_one_step(&mut self) -> Result { + self.engine + .run_one_step() + .await + .map_err(|e| Ethp2pError::Engine(e.to_string())) + } +} + +/// Parameters to construct the broadcast engine, computed in `build_swarm` +/// and carried to `P2P::spawn` where the engine task is spawned. +pub(crate) struct Ethp2pParams { + pub(crate) local_peer: u64, + pub(crate) bind_addr: SocketAddr, + pub(crate) peers: Vec<(u64, Option)>, + pub(crate) channels: Vec, + pub(crate) config: RsConfig, +} + +impl Ethp2pParams { + /// Construct with the default channel set and Reed-Solomon config. + pub(crate) fn new( + local_peer: u64, + bind_addr: SocketAddr, + peers: Vec<(u64, Option)>, + ) -> Self { + Self { + local_peer, + bind_addr, + peers, + channels: all_channels(), + config: RsConfig::default(), + } + } +} + +/// A request to publish a gossip payload over ethp2p, sent from the +/// P2PServer's publish handlers to the engine task. +pub(crate) struct PublishCmd { + pub(crate) channel: String, + pub(crate) message_id: String, + pub(crate) payload: Vec, +} + +/// Stable per-message id for an ethp2p session: hex of the SSZ bytes' +/// SHA-256. Unique per message and independent of the transport. +pub(crate) fn message_id(ssz_bytes: &[u8]) -> String { + let digest = Sha256::digest(ssz_bytes); + digest.iter().map(|b| format!("{b:02x}")).collect() +} + +/// Long-lived task that owns the broadcast engine: drives it forward, +/// services publish requests, and forwards reconstructed messages into +/// the P2P actor (so they flow through the same consensus pipeline as +/// gossipsub). +pub(crate) async fn run_engine_task( + params: Ethp2pParams, + mut publish_rx: mpsc::UnboundedReceiver, + actor: ActorRef, +) { + let (mut broadcast, mut delivered_rx) = match Ethp2pBroadcast::start( + params.local_peer, + params.bind_addr, + ¶ms.peers, + ¶ms.channels, + params.config, + ) + .await + { + Ok(pair) => pair, + Err(e) => { + error!(%e, "ethp2p: failed to start broadcast engine; ethp2p disabled"); + return; + } + }; + info!( + local_peer = params.local_peer, + peers = params.peers.len(), + bind = %params.bind_addr, + "ethp2p broadcast engine started" + ); + + loop { + tokio::select! { + r = broadcast.run_one_step() => { + if let Err(e) = r { + warn!(%e, "ethp2p: run_one_step error"); + } + } + cmd = publish_rx.recv() => { + match cmd { + Some(cmd) => { + if let Err(e) = + broadcast.publish_bytes(&cmd.channel, &cmd.message_id, &cmd.payload) + { + warn!(%e, channel = %cmd.channel, "ethp2p: publish failed"); + } + } + // Publish sender dropped (the P2P actor stopped). The + // engine has no driver left; shut the task down rather + // than spin. + None => { + info!("ethp2p: publish channel closed; stopping engine task"); + break; + } + } + } + delivered = delivered_rx.recv() => { + match delivered { + Some(delivered) => { + if let Err(e) = actor + .recipient::() + .send(WrappedEthp2pDelivery(delivered)) + { + warn!(%e, "ethp2p: failed to forward delivery to P2P actor"); + } + } + None => { + warn!("ethp2p: delivery channel closed; stopping engine task"); + break; + } + } + } + } + } +} + +/// Decode an ethp2p-reconstructed payload (snappy-compressed SSZ, same +/// wire form as gossipsub) and hand it to the same blockchain handlers +/// the gossipsub path uses. Dual delivery is safe: block import and +/// attestation handling are idempotent. +pub(crate) fn dispatch_delivered(blockchain: &P2PToBlockChainRef, channel: &str, payload: &[u8]) { + let uncompressed = match decompress_message(payload) { + Ok(bytes) => bytes, + Err(e) => { + error!(%e, channel, "ethp2p: failed to decompress delivered payload"); + return; + } + }; + match channel { + CHANNEL_BLOCK => match SignedBlock::from_ssz_bytes(&uncompressed) { + Ok(block) => { + let _ = blockchain + .new_block(block) + .inspect_err(|e| error!(%e, "ethp2p: failed to forward block to blockchain")); + } + Err(e) => error!(?e, "ethp2p: failed to decode delivered block"), + }, + CHANNEL_AGGREGATION => match SignedAggregatedAttestation::from_ssz_bytes(&uncompressed) { + Ok(agg) => { + let _ = blockchain.new_aggregated_attestation(agg).inspect_err( + |e| error!(%e, "ethp2p: failed to forward aggregation to blockchain"), + ); + } + Err(e) => error!(?e, "ethp2p: failed to decode delivered aggregation"), + }, + CHANNEL_ATTESTATION => match SignedAttestation::from_ssz_bytes(&uncompressed) { + Ok(att) => { + let _ = blockchain.new_attestation(att).inspect_err( + |e| error!(%e, "ethp2p: failed to forward attestation to blockchain"), + ); + } + Err(e) => error!(?e, "ethp2p: failed to decode delivered attestation"), + }, + other => trace!(channel = other, "ethp2p: delivered on unknown channel"), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::{Ipv4Addr, SocketAddr}; + use std::time::Duration; + + fn loopback() -> SocketAddr { + SocketAddr::from((Ipv4Addr::LOCALHOST, 0)) + } + + fn pseudorandom(len: usize, seed: u64) -> Vec { + let mut state = seed.wrapping_add(0x9E37_79B9_7F4A_7C15); + let mut out = Vec::with_capacity(len); + for _ in 0..len { + state ^= state << 13; + state ^= state >> 7; + state ^= state << 17; + out.push((state & 0xff) as u8); + } + out + } + + #[test] + fn peer_id_derivation_is_stable_and_distinct() { + let key_a = [1_u8; 33]; + let key_b = [2_u8; 33]; + assert_eq!(derive_peer_id(&key_a), derive_peer_id(&key_a)); + assert_ne!(derive_peer_id(&key_a), derive_peer_id(&key_b)); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn round_trip_block_sized_payload_over_quic() { + let config = RsConfig::default(); + let channels = vec!["block".to_string()]; + let payload = pseudorandom(64 * 1024, 0xDEAD_BEEF); + + // Relay binds and announces to origin (origin will dial it). + let (mut relay, mut relay_rx) = + Ethp2pBroadcast::start(2, loopback(), &[(1, None)], &channels, config) + .await + .expect("relay start"); + let relay_addr = relay.local_addr(); + let (mut origin, mut _origin_rx) = + Ethp2pBroadcast::start(1, loopback(), &[(2, Some(relay_addr))], &channels, config) + .await + .expect("origin start"); + + let driver = async { + let mut published = false; + let mut steps = 0_u32; + loop { + tokio::select! { + r = origin.run_one_step() => { r.expect("origin step"); } + r = relay.run_one_step() => { r.expect("relay step"); } + Some(msg) = relay_rx.recv() => return msg, + } + steps += 1; + if !published && steps >= 2 { + origin + .publish_bytes("block", "msg-1", &payload) + .expect("publish"); + published = true; + } + } + }; + + match tokio::time::timeout(Duration::from_secs(20), driver).await { + Ok(msg) => { + assert_eq!(msg.channel_id, "block"); + assert_eq!(msg.message_id, "msg-1"); + assert_eq!(msg.payload, payload, "payload must round-trip over QUIC"); + } + Err(_) => panic!("round trip timed out after 20s"), + } + } +} diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index c257006b..045aaa66 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -154,7 +154,19 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte .cloned() .unwrap_or_else(|| attestation_subnet_topic(subnet_id)); + #[cfg(feature = "ethp2p")] + let ethp2p_payload = compressed.clone(); + server.swarm_handle.publish(topic, compressed); + + #[cfg(feature = "ethp2p")] + if let Some(tx) = server.ethp2p_publish.as_ref() { + let _ = tx.send(crate::ethp2p::PublishCmd { + channel: crate::ethp2p::CHANNEL_ATTESTATION.to_string(), + message_id: crate::ethp2p::message_id(&ssz_bytes), + payload: ethp2p_payload, + }); + } info!( %slot, validator, @@ -182,10 +194,24 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlock) { metrics::observe_gossip_block_size(ssz_bytes.len(), compressed.len()); + // Experimental: also broadcast the same bytes via ethp2p (no-op + // unless the `ethp2p` feature is built and the engine is running). + #[cfg(feature = "ethp2p")] + let ethp2p_payload = compressed.clone(); + // Publish to gossipsub server .swarm_handle .publish(server.block_topic.clone(), compressed); + + #[cfg(feature = "ethp2p")] + if let Some(tx) = server.ethp2p_publish.as_ref() { + let _ = tx.send(crate::ethp2p::PublishCmd { + channel: crate::ethp2p::CHANNEL_BLOCK.to_string(), + message_id: crate::ethp2p::message_id(&ssz_bytes), + payload: ethp2p_payload, + }); + } info!( %slot, proposer, @@ -210,10 +236,22 @@ pub async fn publish_aggregated_attestation( metrics::observe_gossip_aggregation_size(ssz_bytes.len(), compressed.len()); + #[cfg(feature = "ethp2p")] + let ethp2p_payload = compressed.clone(); + // Publish to the aggregation topic server .swarm_handle .publish(server.aggregation_topic.clone(), compressed); + + #[cfg(feature = "ethp2p")] + if let Some(tx) = server.ethp2p_publish.as_ref() { + let _ = tx.send(crate::ethp2p::PublishCmd { + channel: crate::ethp2p::CHANNEL_AGGREGATION.to_string(), + message_id: crate::ethp2p::message_id(&ssz_bytes), + payload: ethp2p_payload, + }); + } info!( %slot, target_slot = attestation.data.target.slot, diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index de34e469..ce7547cc 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -48,6 +48,8 @@ use crate::{ swarm_adapter::SwarmHandle, }; +#[cfg(feature = "ethp2p")] +pub mod ethp2p; mod gossipsub; pub mod metrics; mod req_resp; @@ -180,6 +182,17 @@ pub struct BuiltSwarm { pub(crate) block_topic: libp2p::gossipsub::IdentTopic, pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic, pub(crate) bootnode_addrs: HashMap, + /// Parameters for the experimental ethp2p broadcast engine (Phase 2). + #[cfg(feature = "ethp2p")] + pub(crate) ethp2p: crate::ethp2p::Ethp2pParams, +} + +/// Extract the 33-byte compressed secp256k1 public key from a libp2p +/// identity public key, for deriving a stable ethp2p `u64` peer id. +#[cfg(feature = "ethp2p")] +fn secp256k1_compressed(pk: &PublicKey) -> Option<[u8; 33]> { + let secp: secp256k1::PublicKey = pk.clone().try_into().ok()?; + Some(secp.to_bytes()) } /// Build and configure the libp2p swarm, dial bootnodes, subscribe to topics. @@ -237,6 +250,12 @@ pub fn build_swarm( secp256k1::SecretKey::try_from_bytes(config.node_key).expect("invalid node key"); let identity = libp2p::identity::Keypair::from(secp256k1::Keypair::from(secret_key)); + // ethp2p: derive this node's u64 broadcast peer id from its secp256k1 key. + #[cfg(feature = "ethp2p")] + let ethp2p_local_peer = crate::ethp2p::derive_peer_id( + &secp256k1_compressed(&identity.public()).expect("local node key is secp256k1"), + ); + // Use the same `protocol_version` string as zeam let identify = libp2p::identify::Behaviour::new(libp2p::identify::Config::new( "/ipfs/0.1.0".to_owned(), @@ -263,11 +282,26 @@ pub fn build_swarm( .build(); let local_peer_id = *swarm.local_peer_id(); let mut bootnode_addrs = HashMap::new(); + // ethp2p: parallel QUIC mesh peers, derived from the same bootnodes. + // Each peer's ethp2p QUIC port is the gossipsub QUIC port + 1; we dial + // them (Some(addr)) since ethlambda's bootnode topology is static. + #[cfg(feature = "ethp2p")] + let mut ethp2p_peers: Vec<(u64, Option)> = Vec::new(); for bootnode in config.bootnodes { let peer_id = PeerId::from_public_key(&bootnode.public_key); if peer_id == local_peer_id { continue; } + #[cfg(feature = "ethp2p")] + if let Some(pubkey) = secp256k1_compressed(&bootnode.public_key) { + ethp2p_peers.push(( + crate::ethp2p::derive_peer_id(&pubkey), + Some(SocketAddr::new( + bootnode.ip, + bootnode.quic_port.wrapping_add(1), + )), + )); + } let addr = Multiaddr::empty() .with(bootnode.ip.into()) .with(Protocol::Udp(bootnode.quic_port)) @@ -338,6 +372,17 @@ pub fn build_swarm( info!(socket=%config.listening_socket, "P2P node started"); + // ethp2p: bind the parallel QUIC endpoint on gossipsub port + 1. + #[cfg(feature = "ethp2p")] + let ethp2p = crate::ethp2p::Ethp2pParams::new( + ethp2p_local_peer, + SocketAddr::new( + config.listening_socket.ip(), + config.listening_socket.port().wrapping_add(1), + ), + ethp2p_peers, + ); + Ok(BuiltSwarm { swarm, attestation_topics, @@ -345,6 +390,8 @@ pub fn build_swarm( block_topic, aggregation_topic, bootnode_addrs, + #[cfg(feature = "ethp2p")] + ethp2p, }) } @@ -361,6 +408,10 @@ impl P2P { let (swarm_stream, swarm_handle) = swarm_adapter::start_swarm_adapter(built.swarm, node_names.clone()); + #[cfg(feature = "ethp2p")] + let (ethp2p_tx, ethp2p_rx) = + tokio::sync::mpsc::unbounded_channel::(); + let server = P2PServer { swarm_handle, store, @@ -375,9 +426,21 @@ impl P2P { range_sync_state: None, bootnode_addrs: built.bootnode_addrs, node_names, + #[cfg(feature = "ethp2p")] + ethp2p_publish: Some(ethp2p_tx), }; let handle = server.start(); spawn_listener(handle.context(), swarm_stream.map(WrappedSwarmEvent)); + + // Spawn the experimental ethp2p broadcast engine task, forwarding + // reconstructed messages back into this actor. + #[cfg(feature = "ethp2p")] + tokio::spawn(crate::ethp2p::run_engine_task( + built.ethp2p, + ethp2p_rx, + handle.clone(), + )); + P2P { handle } } @@ -393,6 +456,16 @@ impl Message for WrappedSwarmEvent { type Result = (); } +/// Wrapper for a message reconstructed by the ethp2p broadcast engine, +/// forwarded from the engine task into the actor so it flows through the +/// same consensus pipeline as gossipsub. +#[cfg(feature = "ethp2p")] +pub(crate) struct WrappedEthp2pDelivery(pub(crate) ethp2p_broadcast::engine::DeliveredMessage); +#[cfg(feature = "ethp2p")] +impl Message for WrappedEthp2pDelivery { + type Result = (); +} + /// P2P actor state. pub struct P2PServer { pub(crate) swarm_handle: SwarmHandle, @@ -412,6 +485,13 @@ pub struct P2PServer { pub(crate) range_sync_state: Option, bootnode_addrs: HashMap, node_names: HashMap, + + /// Command sender to the experimental ethp2p broadcast engine task. + /// `None` when the engine isn't running (feature off path). The + /// publish handlers tee gossip here. + #[cfg(feature = "ethp2p")] + pub(crate) ethp2p_publish: + Option>, } impl P2PServer { @@ -504,6 +584,20 @@ impl Handler for P2PServer { } } +#[cfg(feature = "ethp2p")] +impl Handler for P2PServer { + async fn handle(&mut self, msg: WrappedEthp2pDelivery, _ctx: &Context) { + let delivered = msg.0; + if let Some(ref blockchain) = self.blockchain { + crate::ethp2p::dispatch_delivered( + blockchain, + &delivered.channel_id, + &delivered.payload, + ); + } + } +} + impl Handler for P2PServer { async fn handle(&mut self, msg: FetchBlock, _ctx: &Context) { let root = msg.root;