Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Increase P2P pubsub max message size to match `DefaultMaxBlobSize`, preventing fullnode desync on large blocks

### Changes

- Add max bytes contraints in simple solo sequnecer [#3312](https://github.com/evstack/ev-node/pull/3312)
Expand Down
2 changes: 1 addition & 1 deletion apps/testapp/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM golang:1.26 AS base

#hadolint ignore=DL3018
RUN apt-get update && \

Check failure on line 4 in apps/testapp/Dockerfile

View workflow job for this annotation

GitHub Actions / lint / hadolint

DL3008 warning: Pin versions in apt get install. Instead of `apt-get install <package>` use `apt-get install <package>=<version>`
apt-get install -y --no-install-recommends \
build-essential \
ca-certificates \
Expand All @@ -21,7 +21,7 @@
# Dependencies are only re-downloaded when go.mod or go.sum change.
COPY go.mod go.sum ./
COPY apps/testapp/go.mod apps/testapp/go.sum ./apps/testapp/
RUN go mod download && (cd apps/testapp && go mod download)

Check failure on line 24 in apps/testapp/Dockerfile

View workflow job for this annotation

GitHub Actions / lint / hadolint

DL3003 warning: Use WORKDIR to switch to a directory

# Copy the rest of the source and build.
COPY . .
Expand All @@ -29,7 +29,7 @@
WORKDIR /ev-node/apps/testapp

# 125829120 = 120 MB
RUN go build -ldflags "-X github.com/evstack/ev-node/block/internal/common.defaultMaxBlobSizeStr=125829120" -o /go/bin/testapp .
RUN go build -ldflags "-X github.com/evstack/ev-node/pkg/blobsize.defaultMaxBlobSizeStr=125829120" -o /go/bin/testapp .

## prep the final image.
#
Expand Down
20 changes: 0 additions & 20 deletions block/internal/common/consts.go
Original file line number Diff line number Diff line change
@@ -1,21 +1 @@
package common

import "strconv"

// defaultMaxBlobSizeStr holds the string representation of the default blob
// size limit. Override at link time via:
//
// go build -ldflags "-X github.com/evstack/ev-node/block/internal/common.defaultMaxBlobSizeStr=125829120"
var defaultMaxBlobSizeStr = "5242880" // 5 MB

// DefaultMaxBlobSize is the max blob size limit used for blob submission.
var DefaultMaxBlobSize uint64

func init() {
v, err := strconv.ParseUint(defaultMaxBlobSizeStr, 10, 64)
if err != nil || v == 0 {
DefaultMaxBlobSize = 5 * 1024 * 1024 // 5 MB fallback
return
}
DefaultMaxBlobSize = v
}
6 changes: 3 additions & 3 deletions block/internal/da/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/celestiaorg/go-square/v3/share"
"github.com/rs/zerolog"

"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/pkg/blobsize"
blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
datypes "github.com/evstack/ev-node/pkg/da/types"
)
Expand Down Expand Up @@ -161,7 +161,7 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace

blobs := make([]*blobrpc.Blob, len(data))
for i, raw := range data {
if uint64(len(raw)) > common.DefaultMaxBlobSize {
if uint64(len(raw)) > blobsize.DefaultMaxBlobSize {
return datypes.ResultSubmit{
BaseResult: datypes.BaseResult{
Code: datypes.StatusTooBig,
Expand Down Expand Up @@ -559,7 +559,7 @@ func extractBlobData(resp *blobrpc.SubscriptionResponse) [][]byte {
continue
}
data := blob.Data()
if len(data) == 0 || uint64(len(data)) > common.DefaultMaxBlobSize {
if len(data) == 0 || uint64(len(data)) > blobsize.DefaultMaxBlobSize {
continue
}
blobs = append(blobs, data)
Expand Down
3 changes: 2 additions & 1 deletion block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/evstack/ev-node/block/internal/common"
coreexecutor "github.com/evstack/ev-node/core/execution"
coresequencer "github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/pkg/blobsize"
"github.com/evstack/ev-node/pkg/config"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/raft"
Expand Down Expand Up @@ -664,7 +665,7 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
func (e *Executor) RetrieveBatch(ctx context.Context) (*BatchData, error) {
req := coresequencer.GetNextBatchRequest{
Id: []byte(e.genesis.ChainID),
MaxBytes: common.DefaultMaxBlobSize,
MaxBytes: blobsize.DefaultMaxBlobSize,
LastBatchData: [][]byte{}, // Can be populated if needed for sequencer context
}

Expand Down
12 changes: 6 additions & 6 deletions block/internal/submitting/batching_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/pkg/blobsize"
"github.com/evstack/ev-node/pkg/config"
)

func TestImmediateStrategy(t *testing.T) {
strategy := &ImmediateStrategy{}
maxBlobSize := common.DefaultMaxBlobSize
maxBlobSize := blobsize.DefaultMaxBlobSize

tests := []struct {
name string
Expand Down Expand Up @@ -50,7 +50,7 @@ func TestImmediateStrategy(t *testing.T) {
}

func TestSizeBasedStrategy(t *testing.T) {
maxBlobSize := common.DefaultMaxBlobSize
maxBlobSize := blobsize.DefaultMaxBlobSize

tests := []struct {
name string
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestSizeBasedStrategy(t *testing.T) {

func TestTimeBasedStrategy(t *testing.T) {
maxDelay := 6 * time.Second
maxBlobSize := common.DefaultMaxBlobSize
maxBlobSize := blobsize.DefaultMaxBlobSize

tests := []struct {
name string
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestTimeBasedStrategy(t *testing.T) {
}

func TestAdaptiveStrategy(t *testing.T) {
maxBlobSize := common.DefaultMaxBlobSize
maxBlobSize := blobsize.DefaultMaxBlobSize
sizeThreshold := 0.8
maxDelay := 6 * time.Second

Expand Down Expand Up @@ -306,7 +306,7 @@ func TestNewBatchingStrategy(t *testing.T) {
}

func TestBatchingStrategiesComparison(t *testing.T) {
maxBlobSize := common.DefaultMaxBlobSize
maxBlobSize := blobsize.DefaultMaxBlobSize
pendingCount := uint64(10)
totalSize := maxBlobSize / 2
timeSinceLastSubmit := 3 * time.Second
Expand Down
3 changes: 2 additions & 1 deletion block/internal/submitting/da_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/block/internal/da"
"github.com/evstack/ev-node/pkg/blobsize"
"github.com/evstack/ev-node/pkg/config"
pkgda "github.com/evstack/ev-node/pkg/da"
datypes "github.com/evstack/ev-node/pkg/da/types"
Expand Down Expand Up @@ -50,7 +51,7 @@ func defaultRetryPolicy(maxAttempts int, maxDuration time.Duration) retryPolicy
MaxAttempts: maxAttempts,
MinBackoff: initialBackoff,
MaxBackoff: maxDuration,
MaxBlobBytes: common.DefaultMaxBlobSize,
MaxBlobBytes: blobsize.DefaultMaxBlobSize,
}
}

Expand Down
5 changes: 3 additions & 2 deletions block/internal/submitting/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/evstack/ev-node/block/internal/common"
coreexecutor "github.com/evstack/ev-node/core/execution"
coresequencer "github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/pkg/blobsize"
"github.com/evstack/ev-node/pkg/config"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/signer"
Expand Down Expand Up @@ -219,7 +220,7 @@ func (s *Submitter) daSubmissionLoop() {
shouldSubmit := s.batchingStrategy.ShouldSubmit(
uint64(len(headers)),
totalSize,
common.DefaultMaxBlobSize,
blobsize.DefaultMaxBlobSize,
timeSinceLastSubmit,
)

Expand Down Expand Up @@ -279,7 +280,7 @@ func (s *Submitter) daSubmissionLoop() {
shouldSubmit := s.batchingStrategy.ShouldSubmit(
uint64(len(signedDataList)),
totalSize,
common.DefaultMaxBlobSize,
blobsize.DefaultMaxBlobSize,
timeSinceLastSubmit,
)

Expand Down
5 changes: 3 additions & 2 deletions block/internal/syncing/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/block/internal/da"
coreexecutor "github.com/evstack/ev-node/core/execution"
"github.com/evstack/ev-node/pkg/blobsize"
"github.com/evstack/ev-node/pkg/config"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/raft"
Expand Down Expand Up @@ -907,7 +908,7 @@ func (s *Syncer) gracePeriodForEpoch(epochStart, epochEnd uint64) uint64 {
s.forcedInclusionMu.RUnlock()

avgBytes := totalBytes / heightCount
threshold := uint64(math.Round(fullnessThreshold * float64(common.DefaultMaxBlobSize)))
threshold := uint64(math.Round(fullnessThreshold * float64(blobsize.DefaultMaxBlobSize)))

var extra uint64
if avgBytes > threshold {
Expand Down Expand Up @@ -991,7 +992,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64,
}

// Skip intrinsically invalid txs so the sequencer isn't blamed for dropping them.
filterStatuses, filterErr := s.exec.FilterTxs(ctx, event.Txs, common.DefaultMaxBlobSize, executionInfo.MaxGas, true)
filterStatuses, filterErr := s.exec.FilterTxs(ctx, event.Txs, blobsize.DefaultMaxBlobSize, executionInfo.MaxGas, true)
if filterErr != nil {
return fmt.Errorf("failed to filter forced inclusion txs: %w", filterErr)
}
Expand Down
9 changes: 5 additions & 4 deletions block/internal/syncing/syncer_forced_inclusion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/block/internal/da"
"github.com/evstack/ev-node/core/execution"
"github.com/evstack/ev-node/pkg/blobsize"
"github.com/evstack/ev-node/pkg/config"
datypes "github.com/evstack/ev-node/pkg/da/types"
"github.com/evstack/ev-node/pkg/genesis"
Expand Down Expand Up @@ -480,7 +481,7 @@ func TestGracePeriodForEpoch_LightBlocks(t *testing.T) {
func TestGracePeriodForEpoch_FullBlocks(t *testing.T) {
s := &Syncer{daBlockBytes: make(map[uint64]uint64)}
for h := uint64(0); h <= 4; h++ {
s.daBlockBytes[h] = common.DefaultMaxBlobSize
s.daBlockBytes[h] = blobsize.DefaultMaxBlobSize
}
grace := s.gracePeriodForEpoch(0, 4)
require.GreaterOrEqual(t, grace, baseGracePeriodEpochs)
Expand All @@ -490,7 +491,7 @@ func TestGracePeriodForEpoch_FullBlocks(t *testing.T) {
// avgBytes=1.6·M, threshold=0.8·M → extra=1 → grace=base+1.
func TestGracePeriodForEpoch_ExtendedUnderHighCongestion(t *testing.T) {
s := &Syncer{daBlockBytes: make(map[uint64]uint64)}
congested := uint64(float64(common.DefaultMaxBlobSize) * 1.6)
congested := uint64(float64(blobsize.DefaultMaxBlobSize) * 1.6)
for h := uint64(0); h <= 2; h++ {
s.daBlockBytes[h] = congested
}
Expand All @@ -501,7 +502,7 @@ func TestGracePeriodForEpoch_ExtendedUnderHighCongestion(t *testing.T) {
// TestGracePeriodForEpoch_CappedAtMax verifies the grace period never exceeds maxGracePeriodEpochs.
func TestGracePeriodForEpoch_CappedAtMax(t *testing.T) {
s := &Syncer{daBlockBytes: make(map[uint64]uint64)}
huge := common.DefaultMaxBlobSize * 100
huge := blobsize.DefaultMaxBlobSize * 100
for h := uint64(0); h <= 4; h++ {
s.daBlockBytes[h] = huge
}
Expand All @@ -526,7 +527,7 @@ func TestVerifyForcedInclusionTxs_DynamicGrace_CongestedEpochGetsExtraTime(t *te
mockFIEmpty(client, 2, 9)

// avgBytes = 1.6·M → extra=1 → gracePeriodForEpoch(0,1)=2 → graceBoundary=5.
blockBytes := uint64(float64(common.DefaultMaxBlobSize) * 1.6)
blockBytes := uint64(float64(blobsize.DefaultMaxBlobSize) * 1.6)

d0 := makeData("tchain", 1, 1)
d0.Txs[0] = types.Tx(make([]byte, blockBytes))
Expand Down
21 changes: 21 additions & 0 deletions pkg/blobsize/blobsize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package blobsize

import "strconv"

// defaultMaxBlobSizeStr holds the string representation of the default blob
// size limit. Override at link time via:
//
// go build -ldflags "-X github.com/evstack/ev-node/pkg/blobsize.defaultMaxBlobSizeStr=125829120"
var defaultMaxBlobSizeStr = "5242880" // 5 MB

// DefaultMaxBlobSize is the max blob size limit used for blob submission.
var DefaultMaxBlobSize uint64

func init() {
v, err := strconv.ParseUint(defaultMaxBlobSizeStr, 10, 64)
if err != nil || v == 0 {
DefaultMaxBlobSize = 5 * 1024 * 1024 // 5 MB fallback
return
}
DefaultMaxBlobSize = v
}
Comment on lines +14 to +21

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify where DefaultMaxBlobSize is cast to int and confirm no other guards exist.
rg -nP 'WithMaxMessageSize\s*\(\s*int\s*\(\s*blobsize\.DefaultMaxBlobSize\s*\)\s*\)' -C 2
rg -nP '\bDefaultMaxBlobSize\b' --type=go -C 2
rg -nP 'defaultMaxBlobSizeStr' -C 2

Repository: evstack/ev-node

Length of output: 13381


Guard DefaultMaxBlobSize against uint64int overflow before pubsub.WithMaxMessageSize.

pkg/blobsize/blobsize.go parses defaultMaxBlobSizeStr into DefaultMaxBlobSize and only checks err != nil || v == 0, while pkg/p2p/client.go later does pubsub.WithMaxMessageSize(int(blobsize.DefaultMaxBlobSize)). If defaultMaxBlobSizeStr is set to a value > maxInt, the uint64int cast can wrap/truncate and produce an invalid pubsub max message size.

Suggested fix
 package blobsize
 
 import "strconv"
 
+const fallbackMaxBlobSize uint64 = 5 * 1024 * 1024 // 5 MB
+
 // defaultMaxBlobSizeStr holds the string representation of the default blob
 // size limit. Override at link time via:
 //
 //	go build -ldflags "-X github.com/evstack/ev-node/pkg/blobsize.defaultMaxBlobSizeStr=125829120"
 //
 var defaultMaxBlobSizeStr = "5242880" // 5 MB
 
 // DefaultMaxBlobSize is the max blob size limit used for blob submission.
 var DefaultMaxBlobSize uint64
 
 func init() {
 	v, err := strconv.ParseUint(defaultMaxBlobSizeStr, 10, 64)
 	if err != nil || v == 0 {
-		DefaultMaxBlobSize = 5 * 1024 * 1024 // 5 MB fallback
+		DefaultMaxBlobSize = fallbackMaxBlobSize
 		return
 	}
+	maxInt := uint64(^uint(0) >> 1)
+	if v > maxInt {
+		DefaultMaxBlobSize = maxInt
+		return
+	}
 	DefaultMaxBlobSize = v
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/blobsize/blobsize.go` around lines 14 - 21, The init() in
pkg/blobsize/blobsize.go must guard DefaultMaxBlobSize against uint64→int
overflow: after parsing defaultMaxBlobSizeStr into v, verify v is non-zero and v
<= uint64(math.MaxInt) (or equivalent maxInt) before assigning
DefaultMaxBlobSize; if it exceeds max int, set DefaultMaxBlobSize to a safe
fallback or clamp to int(maxInt) so that later cast in pkg/p2p/client.go where
pubsub.WithMaxMessageSize(int(blobsize.DefaultMaxBlobSize)) cannot
wrap/truncate. Ensure you reference DefaultMaxBlobSize, init(), and
defaultMaxBlobSizeStr when making the change.

Source: Coding guidelines

3 changes: 2 additions & 1 deletion pkg/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"github.com/multiformats/go-multiaddr"
"github.com/rs/zerolog"

"github.com/evstack/ev-node/pkg/blobsize"
"github.com/evstack/ev-node/pkg/config"
rollhash "github.com/evstack/ev-node/pkg/hash"
)
Expand Down Expand Up @@ -465,7 +466,7 @@

func (c *Client) setupGossiping(ctx context.Context) error {
var err error
c.ps, err = pubsub.NewFloodSub(ctx, c.host)
c.ps, err = pubsub.NewFloodSub(ctx, c.host, pubsub.WithMaxMessageSize(int(blobsize.DefaultMaxBlobSize)))

Check failure

Code scanning / CodeQL

Incorrect conversion between integer types High

Incorrect conversion of an unsigned 64-bit integer from
strconv.ParseUint
to a lower bit size type int without an upper bound check.
if err != nil {
return err
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/p2p/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,48 @@ func TestNewClientWithHost(t *testing.T) {

}

func TestPubSubMaxMessageSize(t *testing.T) {
require := require.New(t)
logger := zerolog.Nop()

ctx := t.Context()
clients := startTestNetwork(ctx, t, 2, map[int]hostDescr{
1: {conns: []int{0}},
}, logger)
defer clients.Close()
clients.WaitForDHT()

const topic = "test-large-msg"

subTopic, err := clients[1].PubSub().Join(topic)
require.NoError(err)
sub, err := subTopic.Subscribe()
require.NoError(err)

// allow subscription to propagate
time.Sleep(200 * time.Millisecond)

tp, err := clients[0].PubSub().Join(topic)
require.NoError(err)

// 1.5 MB — exceeds the default 1 MB limit but fits within our configured limit
msgSize := 1.5 * 1024 * 1024
payload := make([]byte, int(msgSize))
for i := range payload {
payload[i] = byte(i % 256)
}

err = tp.Publish(ctx, payload)
require.NoError(err)

recvCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

msg, err := sub.Next(recvCtx)
require.NoError(err)
require.Equal(payload, msg.Data)
}

func TestClientStartup(t *testing.T) {
assert := assert.New(t)
// create temp config dir
Expand Down
Loading