feat(rpc): add GET /lean/v0/events SSE stream (head/block/finalized_checkpoint)#460
feat(rpc): add GET /lean/v0/events SSE stream (head/block/finalized_checkpoint)#460MegaRedHand wants to merge 4 commits into
Conversation
Split the monolithic lib.rs API router into focused modules: core.rs holds the finalized-state/finalized-block/justified-checkpoint handlers and shared response helpers; blocks.rs, fork_choice.rs, and admin.rs each expose pub(crate) routes() -> Router<Store>. build_api_router merges them with .with_state(store). No behavior change.
Add a ChainEvent enum and a broadcast channel owned by the BlockChainServer actor. The store's update_head path emits Head and FinalizedCheckpoint when fork choice moves the head or finalization advances; on_block_core emits Block on import. The sender is threaded as Option<&ChainEventTx> so spec-test and test-driver entry points pass None. Keeps the actor as the sole writer: the flow is strictly one-directional (actor -> broadcast).
Subscribe a fresh broadcast receiver per connection and forward each ChainEvent as a Server-Sent Event. start_rpc_server takes the broadcast sender and attaches it via Extension; main.rs creates the channel and threads it into both BlockChain::spawn and start_rpc_server. RPC stays read-only: it only subscribes, never writes back to the actor.
🤖 Kimi Code ReviewOverall Assessment: Critical Issues1. Panic risk in fork choice head update let new_header = store
.get_block_header(&new_head)
.expect("head block exists");Using Recommendation: Use Security & Correctness2. Silent event drops on serialization failure Some(Ok(Event::default().event(name).json_data(ev).ok()?))If Recommendation: Log the error at 3. H256 serialization format Code Quality & Maintainability4. Incorrect comment in test // Issue the request first so the handler subscribes its receiver before
// we publish — `broadcast::send` errors if there are no live receivers.
5. Redundant finalized checkpoint check if new_finalized.slot > old_finalized.slot || new_finalized.root != old_finalized.rootFinalization is strictly monotonic in slot; the root check is redundant. Not harmful, but suggests uncertainty about invariants. Consider asserting monotonicity instead. 6. Dependency placement Performance & Observability7. Lagged client logging 8. Channel capacity sizing Positive Notes
Suggested Patch (Critical Fix)// crates/blockchain/src/store.rs
if old_head != new_head {
if let Some(header) = store.get_block_header(&new_head) {
let _ = events.send(ChainEvent::Head {
slot: header.slot,
root: new_head,
parent_root: header.parent_root,
});
} else {
tracing::error!(?new_head, "Head block missing from store during fork choice update");
}
}Automated review by Kimi (Moonshot AI) · kimi-k2.5 · custom prompt |
🤖 Codex Code ReviewNo material findings in the diff. The event emission points are side-band only and are placed after the store commit points, so I don’t see a consensus-path regression in fork choice, attestation validation, justification/finalization, XMSS verification, or SSZ handling from these changes. The Open questions / residual risk:
I couldn’t run Automated review by OpenAI Codex · gpt-5.4 · custom prompt |
Greptile SummaryThis PR introduces a Server-Sent Events endpoint (
Confidence Score: 3/5The blockchain actor plumbing and event emission logic are solid, but the RPC handler does not implement the topics filter documented in the PR, meaning every subscriber receives every event type regardless of what they request. The blockchain-side changes (event emission ordering, finalization condition, broadcast channel wiring) are well-reasoned and safe. The RPC handler ships without the topic-filtering behaviour the PR description advertises, creating an API contract mismatch from day one. Additionally, the emitted event name for finalized checkpoints differs from the documented topic name, and there is no SSE keep-alive configured, which would cause idle connections to be dropped by most proxies in production. crates/net/rpc/src/events.rs needs the most attention — topics filtering, keep-alive, and event name alignment all require changes before the endpoint matches its specification.
|
| Filename | Overview |
|---|---|
| crates/net/rpc/src/events.rs | New SSE handler for GET /lean/v0/events; missing topics filtering, no keep-alive, and a naming mismatch between the documented "finalized" topic and the emitted "finalized_checkpoint" event name. |
| crates/blockchain/src/events.rs | New ChainEvent enum and broadcast channel type aliases; clean, well-documented, and correctly sized at 256-capacity. |
| crates/blockchain/src/store.rs | Adds ChainEvent emission at head updates and block import; ordering (block before head) is intentional and correct; finalization condition is sound. |
| bin/ethlambda/src/main.rs | Wires broadcast channel from blockchain actor to RPC server; initial receiver correctly dropped; clone semantics for Sender are appropriate. |
| crates/net/rpc/src/lib.rs | Adds chain_events Extension layer to the API router and merges the new events routes; clean integration. |
| crates/blockchain/src/lib.rs | Threads ChainEventTx into BlockChainServer and on_tick/on_block calls; straightforward plumbing with no logic changes. |
| crates/blockchain/tests/forkchoice_spectests.rs | Updates test call sites to pass None for the new events parameter; mechanical change with no logic impact. |
| crates/blockchain/tests/signature_spectests.rs | Same mechanical None-passing update to on_tick and on_block call sites in signature spec tests. |
| crates/net/rpc/src/test_driver.rs | Updates test-driver on_tick call sites to pass None for events; no behavioral change. |
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant A as BlockChainServer
participant B as store.rs
participant C as broadcast channel
participant D as SSE handler
participant E as SSE Client
A->>B: "on_block(signed_block, events)"
B->>C: "send(Block{slot, root})"
B->>B: "update_head(events)"
B->>C: "send(Head{slot, root, parent_root})"
B->>C: "send(FinalizedCheckpoint{slot, root})"
E->>D: "GET /lean/v0/events"
D->>C: "tx.subscribe()"
C-->>D: "BroadcastStream events"
D-->>E: "event:block data:{...}"
D-->>E: "event:head data:{...}"
D-->>E: "event:finalized_checkpoint data:{...}"
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
participant A as BlockChainServer
participant B as store.rs
participant C as broadcast channel
participant D as SSE handler
participant E as SSE Client
A->>B: "on_block(signed_block, events)"
B->>C: "send(Block{slot, root})"
B->>B: "update_head(events)"
B->>C: "send(Head{slot, root, parent_root})"
B->>C: "send(FinalizedCheckpoint{slot, root})"
E->>D: "GET /lean/v0/events"
D->>C: "tx.subscribe()"
C-->>D: "BroadcastStream events"
D-->>E: "event:block data:{...}"
D-->>E: "event:head data:{...}"
D-->>E: "event:finalized_checkpoint data:{...}"
Prompt To Fix All With AI
Fix the following 4 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 4
crates/net/rpc/src/events.rs:25-46
**`topics` filter advertised but not implemented**
The PR description and endpoint URL document `GET /lean/v0/events?topics=head,block,finalized`, implying clients can selectively subscribe to a subset of event types. The handler ignores any query parameter entirely, so every subscriber receives every event type (`Head`, `Block`, and `FinalizedCheckpoint`) regardless of what they pass in `?topics=`. A monitoring tool subscribing only to `?topics=finalized` will receive `head` and `block` events it never asked for, and a future client that parses the query parameter would behave differently from what's deployed today.
### Issue 2 of 4
crates/net/rpc/src/events.rs:45-46
**Missing SSE keep-alive — idle connections will be dropped by proxies**
`Sse::new(stream)` with no `.keep_alive()` means the TCP connection sits completely idle between chain events. Most HTTP proxies, load balancers, and some browsers close idle connections after 30–60 s. Adding a keep-alive interval ensures the connection stays open during quiet periods.
```suggestion
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(std::time::Duration::from_secs(15))
.text("keep-alive"),
)
}
```
### Issue 3 of 4
crates/net/rpc/src/events.rs:43
**Silently drops events when `json_data` fails, with no log**
`json_data(ev).ok()?` turns a serialization error into `None`, which `filter_map` silently skips. While `ChainEvent` is unlikely to fail serialization today, a future variant that does would produce an invisible event hole. A `tracing::warn!` here keeps the skip explicit rather than surprising.
```suggestion
match Event::default().event(name).json_data(ev) {
Ok(sse_event) => Some(Ok(sse_event)),
Err(err) => {
tracing::warn!(%err, "failed to serialize chain event; skipping");
None
}
}
```
### Issue 4 of 4
crates/net/rpc/src/events.rs:41
**Event name `finalized_checkpoint` doesn't match the documented topic `finalized`**
The PR description documents the topic name as `finalized`, but the SSE `event:` field emitted on the wire is `"finalized_checkpoint"`. If `topics` filtering is ever implemented, a client passing `?topics=finalized` would receive no events for this type because the names won't match. Aligning the wire name to `"finalized"` now avoids a breaking change later.
Reviews (1): Last reviewed commit: "feat(rpc): add GET /lean/v0/events SSE s..." | Re-trigger Greptile
| async fn get_events( | ||
| Extension(tx): Extension<broadcast::Sender<ChainEvent>>, | ||
| ) -> Sse<impl Stream<Item = Result<Event, Infallible>>> { | ||
| let stream = BroadcastStream::new(tx.subscribe()).filter_map(|res| { | ||
| // A slow client falls behind and the broadcast channel overwrites | ||
| // events it never read. Surface that rather than silently dropping. | ||
| let ev = match res { | ||
| Ok(ev) => ev, | ||
| Err(BroadcastStreamRecvError::Lagged(skipped)) => { | ||
| tracing::debug!(skipped, "SSE client lagged; dropped chain events"); | ||
| return None; | ||
| } | ||
| }; | ||
| let name = match &ev { | ||
| ChainEvent::Head { .. } => "head", | ||
| ChainEvent::Block { .. } => "block", | ||
| ChainEvent::FinalizedCheckpoint { .. } => "finalized_checkpoint", | ||
| }; | ||
| Some(Ok(Event::default().event(name).json_data(ev).ok()?)) | ||
| }); | ||
| Sse::new(stream) | ||
| } |
There was a problem hiding this comment.
topics filter advertised but not implemented
The PR description and endpoint URL document GET /lean/v0/events?topics=head,block,finalized, implying clients can selectively subscribe to a subset of event types. The handler ignores any query parameter entirely, so every subscriber receives every event type (Head, Block, and FinalizedCheckpoint) regardless of what they pass in ?topics=. A monitoring tool subscribing only to ?topics=finalized will receive head and block events it never asked for, and a future client that parses the query parameter would behave differently from what's deployed today.
Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/net/rpc/src/events.rs
Line: 25-46
Comment:
**`topics` filter advertised but not implemented**
The PR description and endpoint URL document `GET /lean/v0/events?topics=head,block,finalized`, implying clients can selectively subscribe to a subset of event types. The handler ignores any query parameter entirely, so every subscriber receives every event type (`Head`, `Block`, and `FinalizedCheckpoint`) regardless of what they pass in `?topics=`. A monitoring tool subscribing only to `?topics=finalized` will receive `head` and `block` events it never asked for, and a future client that parses the query parameter would behave differently from what's deployed today.
How can I resolve this? If you propose a fix, please make it concise.| Sse::new(stream) | ||
| } |
There was a problem hiding this comment.
Missing SSE keep-alive — idle connections will be dropped by proxies
Sse::new(stream) with no .keep_alive() means the TCP connection sits completely idle between chain events. Most HTTP proxies, load balancers, and some browsers close idle connections after 30–60 s. Adding a keep-alive interval ensures the connection stays open during quiet periods.
| Sse::new(stream) | |
| } | |
| Sse::new(stream).keep_alive( | |
| axum::response::sse::KeepAlive::new() | |
| .interval(std::time::Duration::from_secs(15)) | |
| .text("keep-alive"), | |
| ) | |
| } |
Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/net/rpc/src/events.rs
Line: 45-46
Comment:
**Missing SSE keep-alive — idle connections will be dropped by proxies**
`Sse::new(stream)` with no `.keep_alive()` means the TCP connection sits completely idle between chain events. Most HTTP proxies, load balancers, and some browsers close idle connections after 30–60 s. Adding a keep-alive interval ensures the connection stays open during quiet periods.
```suggestion
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(std::time::Duration::from_secs(15))
.text("keep-alive"),
)
}
```
How can I resolve this? If you propose a fix, please make it concise.Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
| ChainEvent::Block { .. } => "block", | ||
| ChainEvent::FinalizedCheckpoint { .. } => "finalized_checkpoint", | ||
| }; | ||
| Some(Ok(Event::default().event(name).json_data(ev).ok()?)) |
There was a problem hiding this comment.
Silently drops events when
json_data fails, with no log
json_data(ev).ok()? turns a serialization error into None, which filter_map silently skips. While ChainEvent is unlikely to fail serialization today, a future variant that does would produce an invisible event hole. A tracing::warn! here keeps the skip explicit rather than surprising.
| Some(Ok(Event::default().event(name).json_data(ev).ok()?)) | |
| match Event::default().event(name).json_data(ev) { | |
| Ok(sse_event) => Some(Ok(sse_event)), | |
| Err(err) => { | |
| tracing::warn!(%err, "failed to serialize chain event; skipping"); | |
| None | |
| } | |
| } |
Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/net/rpc/src/events.rs
Line: 43
Comment:
**Silently drops events when `json_data` fails, with no log**
`json_data(ev).ok()?` turns a serialization error into `None`, which `filter_map` silently skips. While `ChainEvent` is unlikely to fail serialization today, a future variant that does would produce an invisible event hole. A `tracing::warn!` here keeps the skip explicit rather than surprising.
```suggestion
match Event::default().event(name).json_data(ev) {
Ok(sse_event) => Some(Ok(sse_event)),
Err(err) => {
tracing::warn!(%err, "failed to serialize chain event; skipping");
None
}
}
```
How can I resolve this? If you propose a fix, please make it concise.| let name = match &ev { | ||
| ChainEvent::Head { .. } => "head", | ||
| ChainEvent::Block { .. } => "block", | ||
| ChainEvent::FinalizedCheckpoint { .. } => "finalized_checkpoint", |
There was a problem hiding this comment.
Event name
finalized_checkpoint doesn't match the documented topic finalized
The PR description documents the topic name as finalized, but the SSE event: field emitted on the wire is "finalized_checkpoint". If topics filtering is ever implemented, a client passing ?topics=finalized would receive no events for this type because the names won't match. Aligning the wire name to "finalized" now avoids a breaking change later.
Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/net/rpc/src/events.rs
Line: 41
Comment:
**Event name `finalized_checkpoint` doesn't match the documented topic `finalized`**
The PR description documents the topic name as `finalized`, but the SSE `event:` field emitted on the wire is `"finalized_checkpoint"`. If `topics` filtering is ever implemented, a client passing `?topics=finalized` would receive no events for this type because the names won't match. Aligning the wire name to `"finalized"` now avoids a breaking change later.
How can I resolve this? If you propose a fix, please make it concise.
🤖 Claude Code ReviewI have enough verified findings to write the review. Here it is: PR #460 —
|
Adds a Server-Sent Events endpoint (
GET /lean/v0/events) that pushes real-time chain events to subscribers. All subscribers receive every event; there is no server-side topic filtering.The three event types are
head,block, andfinalized_checkpoint.Also adds the
ChainEventbroadcast channel in the blockchain actor and wires it throughmain.rsso the RPC server receives events without polling. Enables reactive explorer UIs and monitoring tools. Has unit tests and passed clippy.Stacked on #454.