Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 91 additions & 26 deletions common/batch/batch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
// packs consecutive L2 blocks into a chunk, seals with blob sidecars, and exposes query/delete APIs.
type BatchCache struct {
mu sync.RWMutex
initMu sync.Mutex
ctx context.Context
initDone bool

Expand Down Expand Up @@ -162,30 +163,49 @@ func (bc *BatchCache) Init() error {

// Initialize BatchCache parent batch information
// prevStateRoot should be the parent batch's postStateRoot
bc.parentBatchHeader = headerBytes
bc.prevStateRoot, err = headerBytes.PostStateRoot()
prevStateRoot, err := headerBytes.PostStateRoot()
if err != nil {
return fmt.Errorf("get post state root err: %w", err)
}
bc.lastPackedBlockHeight, err = headerBytes.LastBlockNumber()
lastPackedBlockHeight, err := headerBytes.LastBlockNumber()
if err != nil {
store, err := bc.rollupContract.BatchDataStore(nil, fi)
if err != nil {
return err
}
bc.lastPackedBlockHeight = store.BlockNumber.Uint64()
lastPackedBlockHeight = store.BlockNumber.Uint64()
}
bc.currentBlockNumber = bc.lastPackedBlockHeight
bc.totalL1MessagePopped, err = headerBytes.TotalL1MessagePopped()
totalL1MessagePopped, err := headerBytes.TotalL1MessagePopped()
if err != nil {
return fmt.Errorf("get total l1 message popped err: %w", err)
}
bc.mu.Lock()
bc.parentBatchHeader = headerBytes
bc.prevStateRoot = prevStateRoot
bc.lastPackedBlockHeight = lastPackedBlockHeight
bc.currentBlockNumber = lastPackedBlockHeight
bc.totalL1MessagePopped = totalL1MessagePopped
bc.mu.Unlock()
log.Info("Start assemble batch", "start batch", fi.Uint64(), "end batch", ci.Uint64())
return nil
}

func (bc *BatchCache) isInitialized() bool {
bc.mu.RLock()
defer bc.mu.RUnlock()
return bc.initDone
}

func (bc *BatchCache) setInitialized() {
bc.mu.Lock()
defer bc.mu.Unlock()
bc.initDone = true
}

func (bc *BatchCache) InitFromRollupByRange() error {
if bc.initDone {
bc.initMu.Lock()
defer bc.initMu.Unlock()
if bc.isInitialized() {
return nil
}
err := bc.Init()
Expand All @@ -196,13 +216,15 @@ func (bc *BatchCache) InitFromRollupByRange() error {
if err != nil {
return err
}
bc.initDone = true
bc.setInitialized()
log.Info("Initialized batch cache success")
return nil
}

func (bc *BatchCache) InitAndSyncFromDatabase() error {
if bc.initDone {
bc.initMu.Lock()
defer bc.initMu.Unlock()
if bc.isInitialized() {
return nil
}
err := bc.updateBatchConfigFromGov()
Expand All @@ -217,11 +239,11 @@ func (bc *BatchCache) InitAndSyncFromDatabase() error {
batches, headers, indices, err := bc.batchStorage.LoadAllSealedBatchesAndHeader()
if err != nil {
log.Error("Failed to load sealed batch headers from storage", "error", err)
return bc.DeleteBatchStorageAndInitFromRollup()
return bc.deleteBatchStorageAndInitFromRollupLocked()
}

if len(batches) == 0 {
return bc.InitAndSyncFromRollup()
return bc.initAndSyncFromRollupLocked()
}
maxIndex := indices[0]
for _, idx := range indices {
Expand All @@ -238,20 +260,20 @@ func (bc *BatchCache) InitAndSyncFromDatabase() error {
batchStorage, exist := batches[i]
if !exist || !bytes.Equal(batchHash[:], batchStorage.Hash.Bytes()) {
// batch not contiguous or batch is invalid
return bc.DeleteBatchStorageAndInitFromRollup()
return bc.deleteBatchStorageAndInitFromRollupLocked()
}
}

latestHeaderBytes := headers[maxIndex]
prevStateRoot, err := latestHeaderBytes.PostStateRoot()
if err != nil {
log.Error("Get post state root failed", "err", err)
return bc.DeleteBatchStorageAndInitFromRollup()
return bc.deleteBatchStorageAndInitFromRollupLocked()
}
totalL1MessagePopped, err := latestHeaderBytes.TotalL1MessagePopped()
if err != nil {
log.Error("Get total l1 message popped failed", "err", err)
return bc.DeleteBatchStorageAndInitFromRollup()
return bc.deleteBatchStorageAndInitFromRollupLocked()
}
lastPackedBlockHeight, err := latestHeaderBytes.LastBlockNumber()
if err != nil {
Expand All @@ -266,17 +288,18 @@ func (bc *BatchCache) InitAndSyncFromDatabase() error {
log.Error("Batch index is out of range",
"latestBatchIndex", latestBatchIndex,
"commitIndex", ci.Uint64(), "finalizeIndex", fi.Uint64())
return bc.DeleteBatchStorageAndInitFromRollup()
return bc.deleteBatchStorageAndInitFromRollupLocked()
}
store, err := bc.rollupContract.BatchDataStore(nil, new(big.Int).SetUint64(latestBatchIndex))
if err != nil {
log.Error("Failed to load latest batch index from rollup",
"error", err,
"batchIndex", latestBatchIndex)
return bc.DeleteBatchStorageAndInitFromRollup()
return bc.deleteBatchStorageAndInitFromRollupLocked()
}
lastPackedBlockHeight = store.BlockNumber.Uint64()
}
bc.mu.Lock()
bc.lastPackedBlockHeight = lastPackedBlockHeight
bc.sealedBatches = batches
bc.sealedBatchHeaders = headers
Expand All @@ -286,12 +309,19 @@ func (bc *BatchCache) InitAndSyncFromDatabase() error {
bc.totalL1MessagePopped = totalL1MessagePopped

bc.initDone = true
bc.mu.Unlock()
log.Info("Sync sealed batch from database success", "count", len(batches))
return nil
}

func (bc *BatchCache) InitAndSyncFromRollup() error {
if bc.initDone {
bc.initMu.Lock()
defer bc.initMu.Unlock()
return bc.initAndSyncFromRollupLocked()
}

func (bc *BatchCache) initAndSyncFromRollupLocked() error {
if bc.isInitialized() {
return nil
}
bc.replayL1CommittedBatches.Store(true)
Expand Down Expand Up @@ -336,7 +366,7 @@ func (bc *BatchCache) InitAndSyncFromRollup() error {
}
log.Info("Assemble batch success", "batch index", i, "last batch index", ci.Uint64())
}
bc.initDone = true
bc.setInitialized()
log.Info("Initialized batch cache success")
return nil
}
Expand Down Expand Up @@ -752,13 +782,14 @@ func (bc *BatchCache) SealBatch(sequencerSets []byte, blockTimestamp uint64, rep
copy(batchHeaderCopy, batchHeader)
bc.sealedBatchHeaders[batchIndex] = &batchHeaderCopy

err = bc.batchStorage.StoreSealedBatch(batchIndex, sealedBatch)
// Persist batch data, header and indices in one atomic write so the stored
// snapshot can never be partially updated.
err = bc.batchStorage.StoreSealedBatchAndHeader(batchIndex, sealedBatch, &batchHeaderCopy)
if err != nil {
log.Error("failed to store sealed batch", "err", err)
}
err = bc.batchStorage.StoreSealedBatchHeader(batchIndex, &batchHeaderCopy)
if err != nil {
log.Error("failed to store sealed batch header", "err", err)
log.Error("failed to store sealed batch and header", "batch_index", batchIndex, "err", err)
delete(bc.sealedBatches, batchIndex)
delete(bc.sealedBatchHeaders, batchIndex)
return 0, BatchHeaderBytes{}, false, fmt.Errorf("failed to store sealed batch and header for batch %d: %w", batchIndex, err)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
// Update parent batch information for next batch
bc.parentBatchHeader = &batchHeaderCopy
Expand Down Expand Up @@ -1258,6 +1289,34 @@ func (bc *BatchCache) Delete(batchIndex uint64) error {
return nil
}

// DeleteUntil removes every sealed batch and header with index <= maxIndex from
// both the in-memory maps and persistent storage. Finalize cleanup must use this
// range-based form: the finalize target can jump (multiple submitters, several
// batches finalized at once), so deleting a single index leaves stale lower
// indices behind and punches holes into the persisted indices snapshot, which
// breaks the contiguity assumption of the startup load path.
func (bc *BatchCache) DeleteUntil(maxIndex uint64) error {
bc.initMu.Lock()
defer bc.initMu.Unlock()
if err := bc.batchStorage.DeleteSealedBatchesUpTo(maxIndex); err != nil {
return err
}

bc.mu.Lock()
defer bc.mu.Unlock()
for idx := range bc.sealedBatches {
if idx <= maxIndex {
delete(bc.sealedBatches, idx)
}
}
for idx := range bc.sealedBatchHeaders {
if idx <= maxIndex {
delete(bc.sealedBatchHeaders, idx)
}
}
return nil
}

// logSealedBatch logs the details of the sealed batch for debugging purposes.
func (bc *BatchCache) logSealedBatch(batchHeader BatchHeaderBytes, batchHash common.Hash, blockCount uint16, blobCount int) {
version, err := batchHeader.Version()
Expand Down Expand Up @@ -1297,7 +1356,7 @@ func (bc *BatchCache) logSealedBatch(batchHeader BatchHeaderBytes, batchHash com
}

func (bc *BatchCache) AssembleCurrentBatchHeader() error {
if !bc.initDone {
if !bc.isInitialized() {
return errors.New("batch has not been initialized, should wait")
}
callOpts := &bind.CallOpts{
Expand Down Expand Up @@ -1512,11 +1571,17 @@ func maxUint64(values ...uint64) uint64 {
}

func (bc *BatchCache) DeleteBatchStorageAndInitFromRollup() error {
bc.initMu.Lock()
defer bc.initMu.Unlock()
return bc.deleteBatchStorageAndInitFromRollupLocked()
}

func (bc *BatchCache) deleteBatchStorageAndInitFromRollupLocked() error {
// should delete invalid batch data and batch header bytes
err := bc.batchStorage.DeleteAllSealedBatches()
if err != nil {
return err
}
// batch not contiguous or batch is invalid
return bc.InitAndSyncFromRollup()
return bc.initAndSyncFromRollupLocked()
}
Loading
Loading