From 4dfb50056d377dacd4ecd65313f11cd6183f69ba Mon Sep 17 00:00:00 2001 From: chatton Date: Mon, 8 Jun 2026 15:23:21 +0100 Subject: [PATCH] fix: increase P2P pubsub max message size to match blob size limit Move DefaultMaxBlobSize from block/internal/common to pkg/blobsize so the P2P layer can import it. Pass it to pubsub.WithMaxMessageSize when creating FloodSub, preventing fullnode desync when blocks exceed the default 1 MB libp2p limit. --- CHANGELOG.md | 4 ++ apps/testapp/Dockerfile | 2 +- block/internal/common/consts.go | 20 --------- block/internal/da/client.go | 6 +-- block/internal/executing/executor.go | 3 +- .../submitting/batching_strategy_test.go | 12 +++--- block/internal/submitting/da_submitter.go | 3 +- block/internal/submitting/submitter.go | 5 ++- block/internal/syncing/syncer.go | 5 ++- .../syncing/syncer_forced_inclusion_test.go | 9 ++-- pkg/blobsize/blobsize.go | 21 ++++++++++ pkg/p2p/client.go | 3 +- pkg/p2p/client_test.go | 42 +++++++++++++++++++ 13 files changed, 94 insertions(+), 41 deletions(-) create mode 100644 pkg/blobsize/blobsize.go diff --git a/CHANGELOG.md b/CHANGELOG.md index de3cc9ee78..a524130dae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Increase P2P pubsub max message size to match `DefaultMaxBlobSize`, preventing fullnode desync on large blocks + ### Changes - Add max bytes contraints in simple solo sequnecer [#3312](https://github.com/evstack/ev-node/pull/3312) diff --git a/apps/testapp/Dockerfile b/apps/testapp/Dockerfile index e41335f7b6..99d8da796f 100644 --- a/apps/testapp/Dockerfile +++ b/apps/testapp/Dockerfile @@ -29,7 +29,7 @@ COPY . . WORKDIR /ev-node/apps/testapp # 125829120 = 120 MB -RUN go build -ldflags "-X github.com/evstack/ev-node/block/internal/common.defaultMaxBlobSizeStr=125829120" -o /go/bin/testapp . +RUN go build -ldflags "-X github.com/evstack/ev-node/pkg/blobsize.defaultMaxBlobSizeStr=125829120" -o /go/bin/testapp . ## prep the final image. # diff --git a/block/internal/common/consts.go b/block/internal/common/consts.go index 840b2faa97..805d0c79aa 100644 --- a/block/internal/common/consts.go +++ b/block/internal/common/consts.go @@ -1,21 +1 @@ package common - -import "strconv" - -// defaultMaxBlobSizeStr holds the string representation of the default blob -// size limit. Override at link time via: -// -// go build -ldflags "-X github.com/evstack/ev-node/block/internal/common.defaultMaxBlobSizeStr=125829120" -var defaultMaxBlobSizeStr = "5242880" // 5 MB - -// DefaultMaxBlobSize is the max blob size limit used for blob submission. -var DefaultMaxBlobSize uint64 - -func init() { - v, err := strconv.ParseUint(defaultMaxBlobSizeStr, 10, 64) - if err != nil || v == 0 { - DefaultMaxBlobSize = 5 * 1024 * 1024 // 5 MB fallback - return - } - DefaultMaxBlobSize = v -} diff --git a/block/internal/da/client.go b/block/internal/da/client.go index 35fd50b91d..455d151d9f 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -12,7 +12,7 @@ import ( "github.com/celestiaorg/go-square/v3/share" "github.com/rs/zerolog" - "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/pkg/blobsize" blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" datypes "github.com/evstack/ev-node/pkg/da/types" ) @@ -161,7 +161,7 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace blobs := make([]*blobrpc.Blob, len(data)) for i, raw := range data { - if uint64(len(raw)) > common.DefaultMaxBlobSize { + if uint64(len(raw)) > blobsize.DefaultMaxBlobSize { return datypes.ResultSubmit{ BaseResult: datypes.BaseResult{ Code: datypes.StatusTooBig, @@ -559,7 +559,7 @@ func extractBlobData(resp *blobrpc.SubscriptionResponse) [][]byte { continue } data := blob.Data() - if len(data) == 0 || uint64(len(data)) > common.DefaultMaxBlobSize { + if len(data) == 0 || uint64(len(data)) > blobsize.DefaultMaxBlobSize { continue } blobs = append(blobs, data) diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index f5be5e1b40..753ec9b94b 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -19,6 +19,7 @@ import ( "github.com/evstack/ev-node/block/internal/common" coreexecutor "github.com/evstack/ev-node/core/execution" coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/pkg/blobsize" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/raft" @@ -664,7 +665,7 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { func (e *Executor) RetrieveBatch(ctx context.Context) (*BatchData, error) { req := coresequencer.GetNextBatchRequest{ Id: []byte(e.genesis.ChainID), - MaxBytes: common.DefaultMaxBlobSize, + MaxBytes: blobsize.DefaultMaxBlobSize, LastBatchData: [][]byte{}, // Can be populated if needed for sequencer context } diff --git a/block/internal/submitting/batching_strategy_test.go b/block/internal/submitting/batching_strategy_test.go index 2246291d12..1e6b1386e4 100644 --- a/block/internal/submitting/batching_strategy_test.go +++ b/block/internal/submitting/batching_strategy_test.go @@ -7,13 +7,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/pkg/blobsize" "github.com/evstack/ev-node/pkg/config" ) func TestImmediateStrategy(t *testing.T) { strategy := &ImmediateStrategy{} - maxBlobSize := common.DefaultMaxBlobSize + maxBlobSize := blobsize.DefaultMaxBlobSize tests := []struct { name string @@ -50,7 +50,7 @@ func TestImmediateStrategy(t *testing.T) { } func TestSizeBasedStrategy(t *testing.T) { - maxBlobSize := common.DefaultMaxBlobSize + maxBlobSize := blobsize.DefaultMaxBlobSize tests := []struct { name string @@ -120,7 +120,7 @@ func TestSizeBasedStrategy(t *testing.T) { func TestTimeBasedStrategy(t *testing.T) { maxDelay := 6 * time.Second - maxBlobSize := common.DefaultMaxBlobSize + maxBlobSize := blobsize.DefaultMaxBlobSize tests := []struct { name string @@ -174,7 +174,7 @@ func TestTimeBasedStrategy(t *testing.T) { } func TestAdaptiveStrategy(t *testing.T) { - maxBlobSize := common.DefaultMaxBlobSize + maxBlobSize := blobsize.DefaultMaxBlobSize sizeThreshold := 0.8 maxDelay := 6 * time.Second @@ -306,7 +306,7 @@ func TestNewBatchingStrategy(t *testing.T) { } func TestBatchingStrategiesComparison(t *testing.T) { - maxBlobSize := common.DefaultMaxBlobSize + maxBlobSize := blobsize.DefaultMaxBlobSize pendingCount := uint64(10) totalSize := maxBlobSize / 2 timeSinceLastSubmit := 3 * time.Second diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index 83f56d9cb5..4d4b018cdf 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -16,6 +16,7 @@ import ( "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/block/internal/da" + "github.com/evstack/ev-node/pkg/blobsize" "github.com/evstack/ev-node/pkg/config" pkgda "github.com/evstack/ev-node/pkg/da" datypes "github.com/evstack/ev-node/pkg/da/types" @@ -50,7 +51,7 @@ func defaultRetryPolicy(maxAttempts int, maxDuration time.Duration) retryPolicy MaxAttempts: maxAttempts, MinBackoff: initialBackoff, MaxBackoff: maxDuration, - MaxBlobBytes: common.DefaultMaxBlobSize, + MaxBlobBytes: blobsize.DefaultMaxBlobSize, } } diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 25dcd781a1..fbb6f302e5 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -16,6 +16,7 @@ import ( "github.com/evstack/ev-node/block/internal/common" coreexecutor "github.com/evstack/ev-node/core/execution" coresequencer "github.com/evstack/ev-node/core/sequencer" + "github.com/evstack/ev-node/pkg/blobsize" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/signer" @@ -219,7 +220,7 @@ func (s *Submitter) daSubmissionLoop() { shouldSubmit := s.batchingStrategy.ShouldSubmit( uint64(len(headers)), totalSize, - common.DefaultMaxBlobSize, + blobsize.DefaultMaxBlobSize, timeSinceLastSubmit, ) @@ -279,7 +280,7 @@ func (s *Submitter) daSubmissionLoop() { shouldSubmit := s.batchingStrategy.ShouldSubmit( uint64(len(signedDataList)), totalSize, - common.DefaultMaxBlobSize, + blobsize.DefaultMaxBlobSize, timeSinceLastSubmit, ) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 40e3c9523f..e552c890ba 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -20,6 +20,7 @@ import ( "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/block/internal/da" coreexecutor "github.com/evstack/ev-node/core/execution" + "github.com/evstack/ev-node/pkg/blobsize" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/raft" @@ -907,7 +908,7 @@ func (s *Syncer) gracePeriodForEpoch(epochStart, epochEnd uint64) uint64 { s.forcedInclusionMu.RUnlock() avgBytes := totalBytes / heightCount - threshold := uint64(math.Round(fullnessThreshold * float64(common.DefaultMaxBlobSize))) + threshold := uint64(math.Round(fullnessThreshold * float64(blobsize.DefaultMaxBlobSize))) var extra uint64 if avgBytes > threshold { @@ -991,7 +992,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, } // Skip intrinsically invalid txs so the sequencer isn't blamed for dropping them. - filterStatuses, filterErr := s.exec.FilterTxs(ctx, event.Txs, common.DefaultMaxBlobSize, executionInfo.MaxGas, true) + filterStatuses, filterErr := s.exec.FilterTxs(ctx, event.Txs, blobsize.DefaultMaxBlobSize, executionInfo.MaxGas, true) if filterErr != nil { return fmt.Errorf("failed to filter forced inclusion txs: %w", filterErr) } diff --git a/block/internal/syncing/syncer_forced_inclusion_test.go b/block/internal/syncing/syncer_forced_inclusion_test.go index 3c15fde125..6148c835d2 100644 --- a/block/internal/syncing/syncer_forced_inclusion_test.go +++ b/block/internal/syncing/syncer_forced_inclusion_test.go @@ -15,6 +15,7 @@ import ( "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/block/internal/da" "github.com/evstack/ev-node/core/execution" + "github.com/evstack/ev-node/pkg/blobsize" "github.com/evstack/ev-node/pkg/config" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" @@ -480,7 +481,7 @@ func TestGracePeriodForEpoch_LightBlocks(t *testing.T) { func TestGracePeriodForEpoch_FullBlocks(t *testing.T) { s := &Syncer{daBlockBytes: make(map[uint64]uint64)} for h := uint64(0); h <= 4; h++ { - s.daBlockBytes[h] = common.DefaultMaxBlobSize + s.daBlockBytes[h] = blobsize.DefaultMaxBlobSize } grace := s.gracePeriodForEpoch(0, 4) require.GreaterOrEqual(t, grace, baseGracePeriodEpochs) @@ -490,7 +491,7 @@ func TestGracePeriodForEpoch_FullBlocks(t *testing.T) { // avgBytes=1.6·M, threshold=0.8·M → extra=1 → grace=base+1. func TestGracePeriodForEpoch_ExtendedUnderHighCongestion(t *testing.T) { s := &Syncer{daBlockBytes: make(map[uint64]uint64)} - congested := uint64(float64(common.DefaultMaxBlobSize) * 1.6) + congested := uint64(float64(blobsize.DefaultMaxBlobSize) * 1.6) for h := uint64(0); h <= 2; h++ { s.daBlockBytes[h] = congested } @@ -501,7 +502,7 @@ func TestGracePeriodForEpoch_ExtendedUnderHighCongestion(t *testing.T) { // TestGracePeriodForEpoch_CappedAtMax verifies the grace period never exceeds maxGracePeriodEpochs. func TestGracePeriodForEpoch_CappedAtMax(t *testing.T) { s := &Syncer{daBlockBytes: make(map[uint64]uint64)} - huge := common.DefaultMaxBlobSize * 100 + huge := blobsize.DefaultMaxBlobSize * 100 for h := uint64(0); h <= 4; h++ { s.daBlockBytes[h] = huge } @@ -526,7 +527,7 @@ func TestVerifyForcedInclusionTxs_DynamicGrace_CongestedEpochGetsExtraTime(t *te mockFIEmpty(client, 2, 9) // avgBytes = 1.6·M → extra=1 → gracePeriodForEpoch(0,1)=2 → graceBoundary=5. - blockBytes := uint64(float64(common.DefaultMaxBlobSize) * 1.6) + blockBytes := uint64(float64(blobsize.DefaultMaxBlobSize) * 1.6) d0 := makeData("tchain", 1, 1) d0.Txs[0] = types.Tx(make([]byte, blockBytes)) diff --git a/pkg/blobsize/blobsize.go b/pkg/blobsize/blobsize.go new file mode 100644 index 0000000000..a2b268b986 --- /dev/null +++ b/pkg/blobsize/blobsize.go @@ -0,0 +1,21 @@ +package blobsize + +import "strconv" + +// defaultMaxBlobSizeStr holds the string representation of the default blob +// size limit. Override at link time via: +// +// go build -ldflags "-X github.com/evstack/ev-node/pkg/blobsize.defaultMaxBlobSizeStr=125829120" +var defaultMaxBlobSizeStr = "5242880" // 5 MB + +// DefaultMaxBlobSize is the max blob size limit used for blob submission. +var DefaultMaxBlobSize uint64 + +func init() { + v, err := strconv.ParseUint(defaultMaxBlobSizeStr, 10, 64) + if err != nil || v == 0 { + DefaultMaxBlobSize = 5 * 1024 * 1024 // 5 MB fallback + return + } + DefaultMaxBlobSize = v +} diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index e0d8ddd189..bb0830780a 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -24,6 +24,7 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/rs/zerolog" + "github.com/evstack/ev-node/pkg/blobsize" "github.com/evstack/ev-node/pkg/config" rollhash "github.com/evstack/ev-node/pkg/hash" ) @@ -465,7 +466,7 @@ func (c *Client) tryConnect(ctx context.Context, peer peer.AddrInfo) { func (c *Client) setupGossiping(ctx context.Context) error { var err error - c.ps, err = pubsub.NewFloodSub(ctx, c.host) + c.ps, err = pubsub.NewFloodSub(ctx, c.host, pubsub.WithMaxMessageSize(int(blobsize.DefaultMaxBlobSize))) if err != nil { return err } diff --git a/pkg/p2p/client_test.go b/pkg/p2p/client_test.go index 568b152ea4..c99ef17833 100644 --- a/pkg/p2p/client_test.go +++ b/pkg/p2p/client_test.go @@ -82,6 +82,48 @@ func TestNewClientWithHost(t *testing.T) { } +func TestPubSubMaxMessageSize(t *testing.T) { + require := require.New(t) + logger := zerolog.Nop() + + ctx := t.Context() + clients := startTestNetwork(ctx, t, 2, map[int]hostDescr{ + 1: {conns: []int{0}}, + }, logger) + defer clients.Close() + clients.WaitForDHT() + + const topic = "test-large-msg" + + subTopic, err := clients[1].PubSub().Join(topic) + require.NoError(err) + sub, err := subTopic.Subscribe() + require.NoError(err) + + // allow subscription to propagate + time.Sleep(200 * time.Millisecond) + + tp, err := clients[0].PubSub().Join(topic) + require.NoError(err) + + // 1.5 MB — exceeds the default 1 MB limit but fits within our configured limit + msgSize := 1.5 * 1024 * 1024 + payload := make([]byte, int(msgSize)) + for i := range payload { + payload[i] = byte(i % 256) + } + + err = tp.Publish(ctx, payload) + require.NoError(err) + + recvCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + msg, err := sub.Next(recvCtx) + require.NoError(err) + require.Equal(payload, msg.Data) +} + func TestClientStartup(t *testing.T) { assert := assert.New(t) // create temp config dir