From 097840dde3656ca8ccab3647d47cbd58cad123e5 Mon Sep 17 00:00:00 2001 From: Andrew Davis <1709934+Savid@users.noreply.github.com> Date: Tue, 3 Feb 2026 16:20:57 +1000 Subject: [PATCH 1/4] feat(gap-detection): detect and process missing blocks in addition to incomplete blocks Previously gap detection only found incomplete blocks (rows where complete=0), missing truly missing blocks (rows that were never inserted). This change: - Add GetMissingBlocksInRange() to state manager using ClickHouse numbers() - Update GapStateProvider interface and GetGaps() to return GapResult with both Incomplete and Missing block lists plus scan duration - Make processBlock public (ProcessBlock) in all processors for gap filling - Add ProcessBlock to BlockProcessor interface - Add Prometheus metrics: GapsDetected, GapsReprocessed, GapScanDuration, GapsFound - Update checkGaps() to handle both types: ReprocessBlock for incomplete, ProcessBlock for missing blocks - Add comprehensive tests for both gap types --- pkg/clickhouse/mock.go | 18 + pkg/common/metrics.go | 22 + pkg/processor/manager.go | 73 ++- pkg/processor/tracker/limiter.go | 61 ++- pkg/processor/tracker/limiter_gap_test.go | 414 +++++++++++++++--- pkg/processor/tracker/processor.go | 5 + .../transaction/simple/block_processing.go | 7 +- .../transaction/structlog/block_processing.go | 7 +- .../structlog_agg/block_processing.go | 7 +- pkg/state/manager.go | 44 ++ 10 files changed, 559 insertions(+), 99 deletions(-) diff --git a/pkg/clickhouse/mock.go b/pkg/clickhouse/mock.go index ff1bc93..f7d876e 100644 --- a/pkg/clickhouse/mock.go +++ b/pkg/clickhouse/mock.go @@ -19,6 +19,7 @@ type MockClient struct { DoFunc func(ctx context.Context, query ch.Query) error QueryUInt64Func func(ctx context.Context, query string, columnName string) (*uint64, error) QueryMinMaxUInt64Func func(ctx context.Context, query string) (minVal, maxVal *uint64, err error) + QueryUInt64SliceFunc func(ctx context.Context, query string, columnName string) ([]uint64, error) // Track calls for assertions Calls []MockCall @@ -54,6 +55,9 @@ func NewMockClient() *MockClient { QueryMinMaxUInt64Func: func(ctx context.Context, query string) (minVal, maxVal *uint64, err error) { return nil, nil, nil }, + QueryUInt64SliceFunc: func(ctx context.Context, query string, columnName string) ([]uint64, error) { + return []uint64{}, nil + }, Calls: make([]MockCall, 0), } } @@ -86,6 +90,20 @@ func (m *MockClient) QueryMinMaxUInt64(ctx context.Context, query string) (minVa return nil, nil, nil } +// QueryUInt64Slice implements ClientInterface. +func (m *MockClient) QueryUInt64Slice(ctx context.Context, query string, columnName string) ([]uint64, error) { + m.Calls = append(m.Calls, MockCall{ + Method: "QueryUInt64Slice", + Args: []any{ctx, query, columnName}, + }) + + if m.QueryUInt64SliceFunc != nil { + return m.QueryUInt64SliceFunc(ctx, query, columnName) + } + + return []uint64{}, nil +} + // Execute implements ClientInterface. func (m *MockClient) Execute(ctx context.Context, query string) error { m.Calls = append(m.Calls, MockCall{ diff --git a/pkg/common/metrics.go b/pkg/common/metrics.go index 4ed8f94..24a56c9 100644 --- a/pkg/common/metrics.go +++ b/pkg/common/metrics.go @@ -233,4 +233,26 @@ var ( Name: "execution_processor_row_buffer_pending_tasks", Help: "Current number of tasks waiting for their rows to be flushed", }, []string{"network", "processor", "table"}) + + // Gap detection metrics. 
+ GapsDetected = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "execution_processor_gaps_detected_total", + Help: "Total number of gaps detected during gap scans", + }, []string{"network", "processor", "gap_type"}) // gap_type: "incomplete" or "missing" + + GapsReprocessed = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "execution_processor_gaps_reprocessed_total", + Help: "Total number of gap blocks reprocessed", + }, []string{"network", "processor", "gap_type", "status"}) // status: "success" or "error" + + GapScanDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "execution_processor_gap_scan_duration_seconds", + Help: "Duration of gap detection scans", + Buckets: prometheus.ExponentialBuckets(0.01, 2, 12), + }, []string{"network", "processor"}) + + GapsFound = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "execution_processor_gaps_found", + Help: "Current count of gaps found in last scan", + }, []string{"network", "processor", "gap_type"}) ) diff --git a/pkg/processor/manager.go b/pkg/processor/manager.go index 4f437e1..2b3e2f7 100644 --- a/pkg/processor/manager.go +++ b/pkg/processor/manager.go @@ -1417,6 +1417,9 @@ func (m *Manager) checkStaleBlocks(ctx context.Context) { } // checkGaps checks for gaps in block processing across all processors and triggers reprocessing. +// It detects two types of gaps: +// - Incomplete: blocks with a row in DB but complete=0 (use ReprocessBlock) +// - Missing: blocks with no row at all (use ProcessBlock) func (m *Manager) checkGaps(ctx context.Context) { if !m.config.GapDetection.Enabled { return @@ -1464,7 +1467,7 @@ func (m *Manager) checkGaps(ctx context.Context) { continue } - gaps, gapErr := limiter.GetGaps( + gapResult, gapErr := limiter.GetGaps( ctx, currentBlock, m.config.GapDetection.LookbackRange, @@ -1476,22 +1479,74 @@ func (m *Manager) checkGaps(ctx context.Context) { continue } - if len(gaps) == 0 { + // Record scan duration metric + common.GapScanDuration.WithLabelValues(m.network.Name, processorName).Observe(gapResult.ScanDuration.Seconds()) + + // Record gap counts + common.GapsFound.WithLabelValues(m.network.Name, processorName, "incomplete").Set(float64(len(gapResult.Incomplete))) + common.GapsFound.WithLabelValues(m.network.Name, processorName, "missing").Set(float64(len(gapResult.Missing))) + + if len(gapResult.Incomplete) > 0 { + common.GapsDetected.WithLabelValues(m.network.Name, processorName, "incomplete").Add(float64(len(gapResult.Incomplete))) + } + + if len(gapResult.Missing) > 0 { + common.GapsDetected.WithLabelValues(m.network.Name, processorName, "missing").Add(float64(len(gapResult.Missing))) + } + + totalGaps := len(gapResult.Incomplete) + len(gapResult.Missing) + if totalGaps == 0 { continue } m.log.WithFields(logrus.Fields{ - "processor": processorName, - "gap_count": len(gaps), - "current_block": currentBlock, - }).Info("Detected gaps, reprocessing") - - for _, gapBlock := range gaps { + "processor": processorName, + "incomplete_count": len(gapResult.Incomplete), + "missing_count": len(gapResult.Missing), + "current_block": currentBlock, + }).Info("Detected gaps, processing") + + // Handle INCOMPLETE blocks -> ReprocessBlock (row exists, just stuck) + for _, gapBlock := range gapResult.Incomplete { if reprocessErr := processor.ReprocessBlock(ctx, gapBlock); reprocessErr != nil { m.log.WithError(reprocessErr).WithFields(logrus.Fields{ "processor": processorName, "block": gapBlock, - }).Warn("Failed to reprocess gap block") + "gap_type": "incomplete", + }).Warn("Failed to 
reprocess incomplete block") + + common.GapsReprocessed.WithLabelValues(m.network.Name, processorName, "incomplete", "error").Inc() + } else { + common.GapsReprocessed.WithLabelValues(m.network.Name, processorName, "incomplete", "success").Inc() + } + } + + // Handle MISSING blocks -> ProcessBlock (no row, needs full processing) + for _, gapBlock := range gapResult.Missing { + // Fetch the block first + block, fetchErr := node.BlockByNumber(ctx, new(big.Int).SetUint64(gapBlock)) + if fetchErr != nil { + m.log.WithError(fetchErr).WithFields(logrus.Fields{ + "processor": processorName, + "block": gapBlock, + "gap_type": "missing", + }).Warn("Failed to fetch missing block") + + common.GapsReprocessed.WithLabelValues(m.network.Name, processorName, "missing", "error").Inc() + + continue + } + + if processErr := processor.ProcessBlock(ctx, block); processErr != nil { + m.log.WithError(processErr).WithFields(logrus.Fields{ + "processor": processorName, + "block": gapBlock, + "gap_type": "missing", + }).Warn("Failed to process missing block") + + common.GapsReprocessed.WithLabelValues(m.network.Name, processorName, "missing", "error").Inc() + } else { + common.GapsReprocessed.WithLabelValues(m.network.Name, processorName, "missing", "success").Inc() } } } diff --git a/pkg/processor/tracker/limiter.go b/pkg/processor/tracker/limiter.go index a1260be..79101a0 100644 --- a/pkg/processor/tracker/limiter.go +++ b/pkg/processor/tracker/limiter.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/big" + "time" "github.com/ethpandaops/execution-processor/pkg/common" "github.com/sirupsen/logrus" @@ -20,9 +21,17 @@ type StateProvider interface { type GapStateProvider interface { StateProvider GetIncompleteBlocksInRange(ctx context.Context, network, processor string, minBlock, maxBlock uint64, limit int) ([]uint64, error) + GetMissingBlocksInRange(ctx context.Context, network, processor string, minBlock, maxBlock uint64, limit int) ([]uint64, error) GetMinMaxStoredBlocks(ctx context.Context, network, processor string) (*big.Int, *big.Int, error) } +// GapResult contains the results of a gap scan. +type GapResult struct { + Incomplete []uint64 // Blocks with row but complete=0 + Missing []uint64 // Blocks with no row at all + ScanDuration time.Duration // Time taken to perform the scan +} + // LimiterConfig holds configuration for the Limiter. type LimiterConfig struct { MaxPendingBlockRange int @@ -216,11 +225,16 @@ func (l *Limiter) ValidateBatchWithinLeash(ctx context.Context, startBlock uint6 return nil } -// GetGaps returns incomplete blocks outside the maxPendingBlockRange window. +// GetGaps returns both incomplete and missing blocks outside the maxPendingBlockRange window. // If lookbackRange is 0, scans from the oldest stored block. // This performs a full-range scan for gap detection, excluding the recent window // that is already handled by IsBlockedByIncompleteBlocks. 
-func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRange uint64, limit int) ([]uint64, error) { +// Returns a GapResult containing: +// - Incomplete: blocks with a row in DB but complete=0 +// - Missing: blocks with no row in DB at all +func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRange uint64, limit int) (*GapResult, error) { + startTime := time.Now() + gapProvider, ok := l.stateProvider.(GapStateProvider) if !ok { return nil, fmt.Errorf("state provider does not support gap detection") @@ -237,7 +251,7 @@ func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRang if minStored == nil { // No blocks stored yet - return nil, nil + return &GapResult{ScanDuration: time.Since(startTime)}, nil } minBlock = minStored.Uint64() @@ -260,16 +274,17 @@ func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRang maxBlock = currentBlock - exclusionWindow - 1 } else { // Current block is within the exclusion window, nothing to scan - return nil, nil + return &GapResult{ScanDuration: time.Since(startTime)}, nil } } // Ensure minBlock doesn't exceed maxBlock if minBlock > maxBlock { - return nil, nil + return &GapResult{ScanDuration: time.Since(startTime)}, nil } - gaps, err := gapProvider.GetIncompleteBlocksInRange( + // Get incomplete blocks (have row, complete=0) + incomplete, err := gapProvider.GetIncompleteBlocksInRange( ctx, l.network, l.processor, minBlock, maxBlock, limit, ) @@ -277,14 +292,36 @@ func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRang return nil, fmt.Errorf("failed to get incomplete blocks in range: %w", err) } - if len(gaps) > 0 { + // Calculate remaining limit for missing blocks + remainingLimit := limit - len(incomplete) + + var missing []uint64 + + if remainingLimit > 0 { + missing, err = gapProvider.GetMissingBlocksInRange( + ctx, l.network, l.processor, + minBlock, maxBlock, remainingLimit, + ) + if err != nil { + return nil, fmt.Errorf("failed to get missing blocks in range: %w", err) + } + } + + result := &GapResult{ + Incomplete: incomplete, + Missing: missing, + ScanDuration: time.Since(startTime), + } + + if len(incomplete) > 0 || len(missing) > 0 { l.log.WithFields(logrus.Fields{ - "min_block": minBlock, - "max_block": maxBlock, - "gap_count": len(gaps), - "first_gap": gaps[0], + "min_block": minBlock, + "max_block": maxBlock, + "incomplete_count": len(incomplete), + "missing_count": len(missing), + "scan_duration": result.ScanDuration, }).Debug("Found gaps in block range") } - return gaps, nil + return result, nil } diff --git a/pkg/processor/tracker/limiter_gap_test.go b/pkg/processor/tracker/limiter_gap_test.go index 45f0c5b..2c73c27 100644 --- a/pkg/processor/tracker/limiter_gap_test.go +++ b/pkg/processor/tracker/limiter_gap_test.go @@ -2,6 +2,7 @@ package tracker import ( "context" + "errors" "math/big" "testing" @@ -14,15 +15,35 @@ import ( type mockGapStateProvider struct { mockStateProvider incompleteBlocksInRange []uint64 + missingBlocksInRange []uint64 minStoredBlock *big.Int maxStoredBlock *big.Int getIncompleteErr error + getMissingErr error getMinMaxErr error + + // Track calls for parameter validation + lastIncompleteCall struct { + network, processor string + minBlock, maxBlock uint64 + limit int + } + lastMissingCall struct { + network, processor string + minBlock, maxBlock uint64 + limit int + } } func (m *mockGapStateProvider) GetIncompleteBlocksInRange( - _ context.Context, _, _ string, _, _ uint64, _ int, + _ context.Context, 
network, processor string, minBlock, maxBlock uint64, limit int, ) ([]uint64, error) { + m.lastIncompleteCall.network = network + m.lastIncompleteCall.processor = processor + m.lastIncompleteCall.minBlock = minBlock + m.lastIncompleteCall.maxBlock = maxBlock + m.lastIncompleteCall.limit = limit + if m.getIncompleteErr != nil { return nil, m.getIncompleteErr } @@ -30,6 +51,22 @@ func (m *mockGapStateProvider) GetIncompleteBlocksInRange( return m.incompleteBlocksInRange, nil } +func (m *mockGapStateProvider) GetMissingBlocksInRange( + _ context.Context, network, processor string, minBlock, maxBlock uint64, limit int, +) ([]uint64, error) { + m.lastMissingCall.network = network + m.lastMissingCall.processor = processor + m.lastMissingCall.minBlock = minBlock + m.lastMissingCall.maxBlock = maxBlock + m.lastMissingCall.limit = limit + + if m.getMissingErr != nil { + return nil, m.getMissingErr + } + + return m.missingBlocksInRange, nil +} + func (m *mockGapStateProvider) GetMinMaxStoredBlocks( _ context.Context, _, _ string, ) (*big.Int, *big.Int, error) { @@ -40,14 +77,13 @@ func (m *mockGapStateProvider) GetMinMaxStoredBlocks( return m.minStoredBlock, m.maxStoredBlock, nil } -func TestGetGaps_FindsMissingBlocks(t *testing.T) { - // Setup: blocks 5,6,7,9,10...100 are complete, block 8 is incomplete - // With maxPendingBlockRange=2 and currentBlock=100, gap scanner looks at blocks up to 97 - // (excluding the window 98-100 that the limiter handles) +func TestGetGaps_FindsBothTypes(t *testing.T) { + // Test that GetGaps returns both incomplete and missing blocks mockProvider := &mockGapStateProvider{ minStoredBlock: big.NewInt(5), maxStoredBlock: big.NewInt(100), - incompleteBlocksInRange: []uint64{8}, + incompleteBlocksInRange: []uint64{8, 15}, + missingBlocksInRange: []uint64{12, 20}, } limiter := NewLimiter(&LimiterDeps{ @@ -61,17 +97,22 @@ func TestGetGaps_FindsMissingBlocks(t *testing.T) { currentBlock := uint64(100) lookbackRange := uint64(0) // Unlimited - // Gap scanner searches [5, 97] (excludes 98-100 handled by limiter) - gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + result, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) require.NoError(t, err) - assert.Equal(t, []uint64{8}, gaps) + require.NotNil(t, result) + assert.Equal(t, []uint64{8, 15}, result.Incomplete) + assert.Equal(t, []uint64{12, 20}, result.Missing) + assert.True(t, result.ScanDuration > 0) } -func TestGetGaps_RespectsLookbackRange(t *testing.T) { +func TestGetGaps_OnlyIncomplete(t *testing.T) { + // Test when only incomplete blocks exist (no missing) mockProvider := &mockGapStateProvider{ - // GetMinMaxStoredBlocks should NOT be called when lookbackRange is set - incompleteBlocksInRange: []uint64{75, 80}, + minStoredBlock: big.NewInt(1), + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{5, 10, 15}, + missingBlocksInRange: []uint64{}, } limiter := NewLimiter(&LimiterDeps{ @@ -82,14 +123,37 @@ func TestGetGaps_RespectsLookbackRange(t *testing.T) { }, LimiterConfig{MaxPendingBlockRange: 2}) ctx := context.Background() - currentBlock := uint64(100) - lookbackRange := uint64(50) // Only look back 50 blocks + result, err := limiter.GetGaps(ctx, 100, 0, 100) + + require.NoError(t, err) + require.NotNil(t, result) + assert.Equal(t, []uint64{5, 10, 15}, result.Incomplete) + assert.Empty(t, result.Missing) +} + +func TestGetGaps_OnlyMissing(t *testing.T) { + // Test when only missing blocks exist (no incomplete) + mockProvider := &mockGapStateProvider{ + minStoredBlock: 
big.NewInt(1), + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{}, + missingBlocksInRange: []uint64{7, 14, 21}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) - // Should query from block 50 (100 - 50) to 100 - gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + ctx := context.Background() + result, err := limiter.GetGaps(ctx, 100, 0, 100) require.NoError(t, err) - assert.Equal(t, []uint64{75, 80}, gaps) + require.NotNil(t, result) + assert.Empty(t, result.Incomplete) + assert.Equal(t, []uint64{7, 14, 21}, result.Missing) } func TestGetGaps_NoGaps(t *testing.T) { @@ -97,6 +161,7 @@ func TestGetGaps_NoGaps(t *testing.T) { minStoredBlock: big.NewInt(1), maxStoredBlock: big.NewInt(100), incompleteBlocksInRange: []uint64{}, + missingBlocksInRange: []uint64{}, } limiter := NewLimiter(&LimiterDeps{ @@ -107,22 +172,44 @@ func TestGetGaps_NoGaps(t *testing.T) { }, LimiterConfig{MaxPendingBlockRange: 2}) ctx := context.Background() - currentBlock := uint64(100) - lookbackRange := uint64(0) - - gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + result, err := limiter.GetGaps(ctx, 100, 0, 100) require.NoError(t, err) - assert.Empty(t, gaps) + require.NotNil(t, result) + assert.Empty(t, result.Incomplete) + assert.Empty(t, result.Missing) } -func TestGetGaps_DoesNotLookBeforeOldestStoredBlock(t *testing.T) { - // Ensure we don't query for blocks before the oldest stored block +func TestGetGaps_ErrorFromGetIncomplete(t *testing.T) { + expectedErr := errors.New("incomplete query failed") mockProvider := &mockGapStateProvider{ - // Oldest stored is 50, so we should only query from 50 onwards - minStoredBlock: big.NewInt(50), + minStoredBlock: big.NewInt(1), + maxStoredBlock: big.NewInt(100), + getIncompleteErr: expectedErr, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + result, err := limiter.GetGaps(ctx, 100, 0, 100) + + require.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), "failed to get incomplete blocks") +} + +func TestGetGaps_ErrorFromGetMissing(t *testing.T) { + expectedErr := errors.New("missing query failed") + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(1), maxStoredBlock: big.NewInt(100), incompleteBlocksInRange: []uint64{}, + getMissingErr: expectedErr, } limiter := NewLimiter(&LimiterDeps{ @@ -133,13 +220,33 @@ func TestGetGaps_DoesNotLookBeforeOldestStoredBlock(t *testing.T) { }, LimiterConfig{MaxPendingBlockRange: 2}) ctx := context.Background() - currentBlock := uint64(100) - lookbackRange := uint64(0) // Unlimited + result, err := limiter.GetGaps(ctx, 100, 0, 100) + + require.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), "failed to get missing blocks") +} - gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) +func TestGetGaps_ErrorFromGetMinMax(t *testing.T) { + expectedErr := errors.New("min/max query failed") + mockProvider := &mockGapStateProvider{ + getMinMaxErr: expectedErr, + } - require.NoError(t, err) - assert.Empty(t, gaps) + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, 
LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + // lookbackRange=0 triggers GetMinMaxStoredBlocks call + result, err := limiter.GetGaps(ctx, 100, 0, 100) + + require.Error(t, err) + assert.Nil(t, result) + assert.Contains(t, err.Error(), "failed to get min stored block") } func TestGetGaps_NoBlocksStored(t *testing.T) { @@ -156,13 +263,12 @@ func TestGetGaps_NoBlocksStored(t *testing.T) { }, LimiterConfig{MaxPendingBlockRange: 2}) ctx := context.Background() - currentBlock := uint64(100) - lookbackRange := uint64(0) - - gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + result, err := limiter.GetGaps(ctx, 100, 0, 100) require.NoError(t, err) - assert.Nil(t, gaps) + require.NotNil(t, result) + assert.Empty(t, result.Incomplete) + assert.Empty(t, result.Missing) } func TestGetGaps_StateProviderDoesNotSupportGapDetection(t *testing.T) { @@ -177,21 +283,157 @@ func TestGetGaps_StateProviderDoesNotSupportGapDetection(t *testing.T) { }, LimiterConfig{MaxPendingBlockRange: 2}) ctx := context.Background() - currentBlock := uint64(100) - lookbackRange := uint64(0) - - gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + result, err := limiter.GetGaps(ctx, 100, 0, 100) require.Error(t, err) assert.Contains(t, err.Error(), "state provider does not support gap detection") - assert.Nil(t, gaps) + assert.Nil(t, result) +} + +func TestGetGaps_MaxPendingBlockRangeZero(t *testing.T) { + // When maxPendingBlockRange is 0, no exclusion window should be applied + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(1), + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{95, 98}, + missingBlocksInRange: []uint64{99}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 0}) + + ctx := context.Background() + result, err := limiter.GetGaps(ctx, 100, 0, 100) + + require.NoError(t, err) + require.NotNil(t, result) + assert.Equal(t, []uint64{95, 98}, result.Incomplete) + assert.Equal(t, []uint64{99}, result.Missing) +} + +func TestGetGaps_CurrentBlockWithinExclusionWindow(t *testing.T) { + // When currentBlock is smaller than or equal to maxPendingBlockRange, + // there's nothing to scan outside the exclusion window + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(1), + maxStoredBlock: big.NewInt(5), + incompleteBlocksInRange: []uint64{2}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 10}) // Larger than currentBlock + + ctx := context.Background() + result, err := limiter.GetGaps(ctx, 5, 0, 100) + + require.NoError(t, err) + require.NotNil(t, result) + // No scanning should occur when currentBlock <= maxPendingBlockRange + assert.Empty(t, result.Incomplete) + assert.Empty(t, result.Missing) +} + +func TestGetGaps_ParameterValidation(t *testing.T) { + // Verify correct parameters are passed to state provider + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(10), + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{}, + missingBlocksInRange: []uint64{}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "testnet", + Processor: "structlog", + }, LimiterConfig{MaxPendingBlockRange: 5}) + + ctx := 
context.Background() + // currentBlock=100, maxPendingBlockRange=5 means maxBlock=94 + // lookbackRange=0 uses minStored=10 + _, err := limiter.GetGaps(ctx, 100, 0, 50) + + require.NoError(t, err) + + // Verify incomplete call parameters + assert.Equal(t, "testnet", mockProvider.lastIncompleteCall.network) + assert.Equal(t, "structlog", mockProvider.lastIncompleteCall.processor) + assert.Equal(t, uint64(10), mockProvider.lastIncompleteCall.minBlock) + assert.Equal(t, uint64(94), mockProvider.lastIncompleteCall.maxBlock) // 100 - 5 - 1 + assert.Equal(t, 50, mockProvider.lastIncompleteCall.limit) + + // Verify missing call parameters + assert.Equal(t, "testnet", mockProvider.lastMissingCall.network) + assert.Equal(t, "structlog", mockProvider.lastMissingCall.processor) + assert.Equal(t, uint64(10), mockProvider.lastMissingCall.minBlock) + assert.Equal(t, uint64(94), mockProvider.lastMissingCall.maxBlock) + assert.Equal(t, 50, mockProvider.lastMissingCall.limit) // Full limit since no incomplete found } -func TestGetGaps_MultipleGaps(t *testing.T) { +func TestGetGaps_LimitSplitBetweenTypes(t *testing.T) { + // Test that remaining limit is correctly calculated for missing blocks mockProvider := &mockGapStateProvider{ minStoredBlock: big.NewInt(1), maxStoredBlock: big.NewInt(100), - incompleteBlocksInRange: []uint64{5, 10, 15, 20, 25}, + incompleteBlocksInRange: []uint64{5, 10, 15}, // 3 incomplete + missingBlocksInRange: []uint64{20, 25}, // Should only get limit-3=7 slots + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + result, err := limiter.GetGaps(ctx, 100, 0, 10) + + require.NoError(t, err) + require.NotNil(t, result) + + // With limit=10 and 3 incomplete, missing should be called with limit=7 + assert.Equal(t, 7, mockProvider.lastMissingCall.limit) +} + +func TestGetGaps_ScanDurationTracked(t *testing.T) { + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(1), + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{}, + missingBlocksInRange: []uint64{}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + result, err := limiter.GetGaps(ctx, 100, 0, 100) + + require.NoError(t, err) + require.NotNil(t, result) + // Duration should be tracked even with no gaps found + assert.True(t, result.ScanDuration >= 0) +} + +func TestGetGaps_RespectsLookbackRange(t *testing.T) { + mockProvider := &mockGapStateProvider{ + // GetMinMaxStoredBlocks should NOT be called when lookbackRange is set + incompleteBlocksInRange: []uint64{75, 80}, + missingBlocksInRange: []uint64{77}, } limiter := NewLimiter(&LimiterDeps{ @@ -203,19 +445,23 @@ func TestGetGaps_MultipleGaps(t *testing.T) { ctx := context.Background() currentBlock := uint64(100) - lookbackRange := uint64(0) + lookbackRange := uint64(50) // Only look back 50 blocks - gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + // With lookbackRange=50 and currentBlock=100, minBlock=50 + // With maxPendingBlockRange=2, maxBlock=97 + result, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) require.NoError(t, err) - assert.Equal(t, []uint64{5, 10, 15, 20, 25}, gaps) + require.NotNil(t, result) + assert.Equal(t, []uint64{75, 80}, 
result.Incomplete) + assert.Equal(t, []uint64{77}, result.Missing) } func TestGetGaps_LookbackRangeGreaterThanCurrentBlock(t *testing.T) { // When lookbackRange is greater than currentBlock, minBlock should be 0 - // With maxPendingBlockRange=2 and currentBlock=10, gap scanner looks at blocks up to 7 mockProvider := &mockGapStateProvider{ incompleteBlocksInRange: []uint64{3}, + missingBlocksInRange: []uint64{5}, } limiter := NewLimiter(&LimiterDeps{ @@ -229,20 +475,47 @@ func TestGetGaps_LookbackRangeGreaterThanCurrentBlock(t *testing.T) { currentBlock := uint64(10) lookbackRange := uint64(100) // Greater than currentBlock - // Gap scanner searches [0, 7] (excludes 8-10 handled by limiter) - gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + result, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) require.NoError(t, err) - assert.Equal(t, []uint64{3}, gaps) + require.NotNil(t, result) + assert.Equal(t, []uint64{3}, result.Incomplete) + assert.Equal(t, []uint64{5}, result.Missing) +} + +func TestGetGaps_MinBlockGreaterThanMaxBlock(t *testing.T) { + // When minBlock > maxBlock, should return empty result + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(95), // Min stored is 95 + maxStoredBlock: big.NewInt(100), + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 10}) // Excludes 90-100 + + ctx := context.Background() + // minBlock=95, maxBlock=100-10-1=89 + // 95 > 89, so no scanning should occur + result, err := limiter.GetGaps(ctx, 100, 0, 100) + + require.NoError(t, err) + require.NotNil(t, result) + assert.Empty(t, result.Incomplete) + assert.Empty(t, result.Missing) } func TestGetGaps_ExcludesMaxPendingBlockRangeWindow(t *testing.T) { - // Verify that gaps within maxPendingBlockRange are NOT returned + // Verify that gaps within maxPendingBlockRange are NOT scanned // because they're already handled by IsBlockedByIncompleteBlocks mockProvider := &mockGapStateProvider{ minStoredBlock: big.NewInt(1), maxStoredBlock: big.NewInt(100), - incompleteBlocksInRange: []uint64{50}, // Gap at block 50, outside the exclusion window + incompleteBlocksInRange: []uint64{50}, + missingBlocksInRange: []uint64{60}, } limiter := NewLimiter(&LimiterDeps{ @@ -253,24 +526,27 @@ func TestGetGaps_ExcludesMaxPendingBlockRangeWindow(t *testing.T) { }, LimiterConfig{MaxPendingBlockRange: 5}) ctx := context.Background() - currentBlock := uint64(100) - lookbackRange := uint64(0) // With maxPendingBlockRange=5, gap scanner searches [1, 94] (excludes 95-100) - // Block 50 should be found since it's outside the exclusion window - gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + result, err := limiter.GetGaps(ctx, 100, 0, 100) require.NoError(t, err) - assert.Equal(t, []uint64{50}, gaps) + require.NotNil(t, result) + assert.Equal(t, []uint64{50}, result.Incomplete) + assert.Equal(t, []uint64{60}, result.Missing) + + // Verify maxBlock was correctly calculated + assert.Equal(t, uint64(94), mockProvider.lastIncompleteCall.maxBlock) + assert.Equal(t, uint64(94), mockProvider.lastMissingCall.maxBlock) } -func TestGetGaps_CurrentBlockWithinExclusionWindow(t *testing.T) { - // When currentBlock is smaller than or equal to maxPendingBlockRange, - // there's nothing to scan outside the exclusion window +func TestGetGaps_LimitExhaustedByIncomplete(t *testing.T) { + // Test when incomplete fills entire limit, 
missing should not be called mockProvider := &mockGapStateProvider{ minStoredBlock: big.NewInt(1), - maxStoredBlock: big.NewInt(5), - incompleteBlocksInRange: []uint64{2}, + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{5, 10, 15, 20, 25}, // 5 items, fills limit + missingBlocksInRange: []uint64{30, 35}, // Should not be called } limiter := NewLimiter(&LimiterDeps{ @@ -278,15 +554,15 @@ func TestGetGaps_CurrentBlockWithinExclusionWindow(t *testing.T) { StateProvider: mockProvider, Network: "mainnet", Processor: "simple", - }, LimiterConfig{MaxPendingBlockRange: 10}) // Larger than currentBlock + }, LimiterConfig{MaxPendingBlockRange: 2}) ctx := context.Background() - currentBlock := uint64(5) - lookbackRange := uint64(0) - - // currentBlock (5) <= maxPendingBlockRange (10), so nothing to scan - gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + result, err := limiter.GetGaps(ctx, 100, 0, 5) // limit=5 require.NoError(t, err) - assert.Nil(t, gaps) + require.NotNil(t, result) + assert.Len(t, result.Incomplete, 5) + + // Missing should be called with limit=0, resulting in empty result + assert.Equal(t, 0, mockProvider.lastMissingCall.limit) } diff --git a/pkg/processor/tracker/processor.go b/pkg/processor/tracker/processor.go index da183ce..ee1e4af 100644 --- a/pkg/processor/tracker/processor.go +++ b/pkg/processor/tracker/processor.go @@ -55,6 +55,7 @@ import ( "context" "time" + "github.com/ethpandaops/execution-processor/pkg/ethereum/execution" "github.com/hibiken/asynq" ) @@ -121,6 +122,10 @@ type BlockProcessor interface { // Used when a block is in ClickHouse (complete=0) but has no Redis tracking. // This can happen due to Redis TTL expiry, Redis restart, or crashes. ReprocessBlock(ctx context.Context, blockNum uint64) error + + // ProcessBlock processes a single block - fetches, marks enqueued, and enqueues tasks. + // This is used for gap filling of missing blocks (blocks with no row in DB). + ProcessBlock(ctx context.Context, block execution.Block) error } // QueueInfo contains information about a processor queue. diff --git a/pkg/processor/transaction/simple/block_processing.go b/pkg/processor/transaction/simple/block_processing.go index 6ed3dd9..631b130 100644 --- a/pkg/processor/transaction/simple/block_processing.go +++ b/pkg/processor/transaction/simple/block_processing.go @@ -138,7 +138,7 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { // Process each block, stopping on first error for _, block := range blocks { - if processErr := p.processBlock(ctx, block); processErr != nil { + if processErr := p.ProcessBlock(ctx, block); processErr != nil { return processErr } } @@ -156,8 +156,9 @@ func (p *Processor) handleBlockNotFound(_ context.Context, nextBlock *big.Int) e return fmt.Errorf("block %s not yet available", nextBlock.String()) } -// processBlock processes a single block - the core logic extracted from the original ProcessNextBlock. -func (p *Processor) processBlock(ctx context.Context, block execution.Block) error { +// ProcessBlock processes a single block - fetches, marks enqueued, and enqueues tasks. +// This is used for both normal processing and gap filling of missing blocks. 
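// Callers outside the normal ProcessNextBlock loop (such as gap filling in the
// manager) are expected to fetch the execution.Block themselves, e.g. via
// node.BlockByNumber, before handing it to ProcessBlock; ReprocessBlock remains
// the entry point for blocks that already have a complete=0 row.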
+func (p *Processor) ProcessBlock(ctx context.Context, block execution.Block) error { blockNumber := block.Number() p.log.WithFields(logrus.Fields{ diff --git a/pkg/processor/transaction/structlog/block_processing.go b/pkg/processor/transaction/structlog/block_processing.go index 19a292b..774b612 100644 --- a/pkg/processor/transaction/structlog/block_processing.go +++ b/pkg/processor/transaction/structlog/block_processing.go @@ -141,7 +141,7 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { // Process each block, stopping on first error for _, block := range blocks { - if processErr := p.processBlock(ctx, block); processErr != nil { + if processErr := p.ProcessBlock(ctx, block); processErr != nil { return processErr } } @@ -171,8 +171,9 @@ func (p *Processor) handleBlockNotFound(ctx context.Context, node execution.Node return fmt.Errorf("block %s not found", nextBlock) } -// processBlock processes a single block - the core logic extracted from the original ProcessNextBlock. -func (p *Processor) processBlock(ctx context.Context, block execution.Block) error { +// ProcessBlock processes a single block - fetches, marks enqueued, and enqueues tasks. +// This is used for both normal processing and gap filling of missing blocks. +func (p *Processor) ProcessBlock(ctx context.Context, block execution.Block) error { blockNumber := block.Number() // Check if this block was recently processed to avoid rapid reprocessing diff --git a/pkg/processor/transaction/structlog_agg/block_processing.go b/pkg/processor/transaction/structlog_agg/block_processing.go index 598e150..5dcd245 100644 --- a/pkg/processor/transaction/structlog_agg/block_processing.go +++ b/pkg/processor/transaction/structlog_agg/block_processing.go @@ -141,7 +141,7 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { // Process each block, stopping on first error for _, block := range blocks { - if processErr := p.processBlock(ctx, block); processErr != nil { + if processErr := p.ProcessBlock(ctx, block); processErr != nil { return processErr } } @@ -171,8 +171,9 @@ func (p *Processor) handleBlockNotFound(ctx context.Context, node execution.Node return fmt.Errorf("block %s not found", nextBlock) } -// processBlock processes a single block - the core logic extracted from the original ProcessNextBlock. -func (p *Processor) processBlock(ctx context.Context, block execution.Block) error { +// ProcessBlock processes a single block - fetches, marks enqueued, and enqueues tasks. +// This is used for both normal processing and gap filling of missing blocks. +func (p *Processor) ProcessBlock(ctx context.Context, block execution.Block) error { blockNumber := block.Number() // Check if this block was recently processed to avoid rapid reprocessing diff --git a/pkg/state/manager.go b/pkg/state/manager.go index cba4c01..f2de16a 100644 --- a/pkg/state/manager.go +++ b/pkg/state/manager.go @@ -719,6 +719,50 @@ func (s *Manager) GetIncompleteBlocksInRange( return blocks, nil } +// GetMissingBlocksInRange returns block numbers that have no row in the database. +// This finds blocks that were never processed, not incomplete blocks. +// Uses ClickHouse's numbers() function to generate a sequence and LEFT JOIN to find gaps. +func (s *Manager) GetMissingBlocksInRange( + ctx context.Context, + network, processor string, + minBlock, maxBlock uint64, + limit int, +) ([]uint64, error) { + // Use numbers() to generate a sequence from minBlock to maxBlock, + // then LEFT JOIN to find blocks that don't exist in storage. 
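	// numbers(offset, count) yields `count` consecutive integers starting at
	// `offset`, so numbers(minBlock, maxBlock-minBlock+1) covers the inclusive
	// range [minBlock, maxBlock] (e.g. minBlock=10, maxBlock=94 gives numbers(10, 85)).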
+ query := fmt.Sprintf(` + SELECT n.number AS block_number + FROM numbers(%d, %d) AS n + LEFT JOIN ( + SELECT DISTINCT block_number + FROM %s FINAL + WHERE processor = '%s' + AND meta_network_name = '%s' + AND block_number >= %d + AND block_number <= %d + ) AS e ON n.number = e.block_number + WHERE e.block_number IS NULL + ORDER BY block_number ASC + LIMIT %d + `, minBlock, maxBlock-minBlock+1, s.storageTable, processor, network, + minBlock, maxBlock, limit) + + s.log.WithFields(logrus.Fields{ + "processor": processor, + "network": network, + "min_block": minBlock, + "max_block": maxBlock, + "limit": limit, + }).Debug("Querying for missing blocks in range") + + blocks, err := s.storageClient.QueryUInt64Slice(ctx, query, "block_number") + if err != nil { + return nil, fmt.Errorf("failed to get missing blocks in range: %w", err) + } + + return blocks, nil +} + func (s *Manager) GetMinMaxStoredBlocks(ctx context.Context, network, processor string) (minBlock, maxBlock *big.Int, err error) { query := fmt.Sprintf(` SELECT min(block_number) as min, max(block_number) as max From 7125f8088ac1d29ae527dee3f03fa8fe87cf5331 Mon Sep 17 00:00:00 2001 From: Andrew Davis <1709934+Savid@users.noreply.github.com> Date: Tue, 3 Feb 2026 17:09:04 +1000 Subject: [PATCH 2/4] feat(tracker): optimize stale block detection with Redis sorted sets and Lua script - Replace O(N) SCAN with O(log N + M) ZRANGEBYSCORE using sorted set - Add Lua script for atomic task completion tracking (2 round trips -> 1) - Add ClearStaleBlocks bulk cleanup method using pipeline - Add pendingBlocksKey sorted set for tracking pending blocks by timestamp - Update RegisterBlock, MarkBlockComplete, ClearBlock to maintain sorted set - Remove unused metaKeyPattern and extractBlockNumFromKey functions - Add comprehensive tests for new functionality including concurrency test --- pkg/processor/tracker/block_tracker.go | 206 ++++++---- pkg/processor/tracker/block_tracker_test.go | 397 ++++++++++++++++++++ 2 files changed, 530 insertions(+), 73 deletions(-) diff --git a/pkg/processor/tracker/block_tracker.go b/pkg/processor/tracker/block_tracker.go index 735f89c..cd51547 100644 --- a/pkg/processor/tracker/block_tracker.go +++ b/pkg/processor/tracker/block_tracker.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "strconv" - "strings" "time" "github.com/redis/go-redis/v9" @@ -20,6 +19,30 @@ const ( DefaultStaleThreshold = 5 * time.Minute ) +// trackTaskCompletionScript is a Lua script for atomic task completion tracking. +// It adds a task to the completed set and returns both the completed count and expected count +// in a single round trip, ensuring atomicity. +// +//nolint:gochecknoglobals // Lua scripts are safely shared across goroutines +var trackTaskCompletionScript = redis.NewScript(` +local completedKey = KEYS[1] +local expectedKey = KEYS[2] +local taskID = ARGV[1] +local ttlSeconds = tonumber(ARGV[2]) + +redis.call('SADD', completedKey, taskID) +redis.call('EXPIRE', completedKey, ttlSeconds) + +local completedCount = redis.call('SCARD', completedKey) +local expectedStr = redis.call('GET', expectedKey) + +if expectedStr == false then + return {completedCount, -1} +end + +return {completedCount, tonumber(expectedStr)} +`) + // BlockCompletionTrackerConfig holds configuration for the BlockCompletionTracker. type BlockCompletionTrackerConfig struct { // StaleThreshold is the time after which a block is considered stale. 
@@ -95,13 +118,14 @@ func (t *BlockCompletionTracker) metaKey(network, processor, mode string, blockN return fmt.Sprintf("%s:block_meta:%s:%s:%s:%d", t.prefix, processor, network, mode, blockNum) } -// metaKeyPattern returns a pattern for scanning block_meta keys. -func (t *BlockCompletionTracker) metaKeyPattern(network, processor, mode string) string { +// pendingBlocksKey returns the key for the sorted set of pending blocks. +// The sorted set uses enqueue timestamps as scores for O(log N) stale detection. +func (t *BlockCompletionTracker) pendingBlocksKey(network, processor, mode string) string { if t.prefix == "" { - return fmt.Sprintf("block_meta:%s:%s:%s:*", processor, network, mode) + return fmt.Sprintf("pending_blocks:%s:%s:%s", processor, network, mode) } - return fmt.Sprintf("%s:block_meta:%s:%s:%s:*", t.prefix, processor, network, mode) + return fmt.Sprintf("%s:pending_blocks:%s:%s:%s", t.prefix, processor, network, mode) } // RegisterBlock initializes tracking for a new block. @@ -116,16 +140,24 @@ func (t *BlockCompletionTracker) RegisterBlock( completedKey := t.completedKey(network, processor, mode, blockNum) expectedKey := t.expectedKey(network, processor, mode, blockNum) metaKey := t.metaKey(network, processor, mode, blockNum) + pendingKey := t.pendingBlocksKey(network, processor, mode) + + now := time.Now() pipe := t.redis.Pipeline() pipe.Del(ctx, completedKey) // Clear old completions pipe.Set(ctx, expectedKey, expectedCount, DefaultBlockMetaTTL) // Set expected count pipe.HSet(ctx, metaKey, map[string]any{ - "enqueued_at": time.Now().Unix(), + "enqueued_at": now.Unix(), "queue": queue, "expected": expectedCount, }) pipe.Expire(ctx, metaKey, DefaultBlockMetaTTL) + // Add to pending blocks sorted set with timestamp as score for O(log N) stale detection + pipe.ZAdd(ctx, pendingKey, redis.Z{ + Score: float64(now.Unix()), + Member: blockNum, + }) _, err := pipe.Exec(ctx) if err != nil { @@ -146,6 +178,7 @@ func (t *BlockCompletionTracker) RegisterBlock( // TrackTaskCompletion records a task completion and checks if block is done. // Returns true if all tasks are now complete. +// Uses a Lua script for atomic completion tracking in a single round trip. 
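// Because SADD, SCARD and the expected-count read run inside one script, two
// tasks finishing at the same time are serialized by Redis and observe distinct
// counts, so at most one caller sees completedCount == expected for a given
// block (this is what the concurrency test asserts).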
func (t *BlockCompletionTracker) TrackTaskCompletion( ctx context.Context, taskID string, @@ -155,25 +188,31 @@ func (t *BlockCompletionTracker) TrackTaskCompletion( completedKey := t.completedKey(network, processor, mode, blockNum) expectedKey := t.expectedKey(network, processor, mode, blockNum) - // Add to completed set (idempotent - same task completing twice is fine) - // Set TTL to ensure cleanup if block never completes - pipe := t.redis.Pipeline() - pipe.SAdd(ctx, completedKey, taskID) - pipe.Expire(ctx, completedKey, DefaultBlockMetaTTL) + // Execute Lua script for atomic task completion tracking + result, err := trackTaskCompletionScript.Run(ctx, t.redis, + []string{completedKey, expectedKey}, + taskID, int(DefaultBlockMetaTTL.Seconds()), + ).Slice() + if err != nil { + return false, fmt.Errorf("failed to track task completion: %w", err) + } - if _, err := pipe.Exec(ctx); err != nil { - return false, fmt.Errorf("failed to add task to completed set: %w", err) + if len(result) != 2 { + return false, fmt.Errorf("unexpected result length from Lua script: %d", len(result)) } - // Get counts - completedCount, err := t.redis.SCard(ctx, completedKey).Result() - if err != nil { - return false, fmt.Errorf("failed to get completed count: %w", err) + completedCount, ok := result[0].(int64) + if !ok { + return false, fmt.Errorf("failed to parse completed count from Lua script result") } - expectedStr, err := t.redis.Get(ctx, expectedKey).Result() - if err == redis.Nil { - // Block not registered - might be old task from before retry, or already cleaned up + expected, ok := result[1].(int64) + if !ok { + return false, fmt.Errorf("failed to parse expected count from Lua script result") + } + + // -1 indicates block not registered (expected key doesn't exist) + if expected == -1 { t.log.WithFields(logrus.Fields{ "block_number": blockNum, "task_id": taskID, @@ -185,15 +224,6 @@ func (t *BlockCompletionTracker) TrackTaskCompletion( return false, nil } - if err != nil { - return false, fmt.Errorf("failed to get expected count: %w", err) - } - - expected, err := strconv.ParseInt(expectedStr, 10, 64) - if err != nil { - return false, fmt.Errorf("failed to parse expected count: %w", err) - } - t.log.WithFields(logrus.Fields{ "block_number": blockNum, "task_id": taskID, @@ -222,8 +252,13 @@ func (t *BlockCompletionTracker) MarkBlockComplete( completedKey := t.completedKey(network, processor, mode, blockNum) expectedKey := t.expectedKey(network, processor, mode, blockNum) metaKey := t.metaKey(network, processor, mode, blockNum) + pendingKey := t.pendingBlocksKey(network, processor, mode) - if err := t.redis.Del(ctx, completedKey, expectedKey, metaKey).Err(); err != nil { + pipe := t.redis.Pipeline() + pipe.Del(ctx, completedKey, expectedKey, metaKey) + pipe.ZRem(ctx, pendingKey, blockNum) + + if _, err := pipe.Exec(ctx); err != nil { // Log but don't fail - keys will expire anyway t.log.WithError(err).WithFields(logrus.Fields{ "block_number": blockNum, @@ -244,45 +279,85 @@ func (t *BlockCompletionTracker) MarkBlockComplete( } // GetStaleBlocks returns blocks that have been processing longer than the stale threshold. +// Uses ZRANGEBYSCORE on the pending blocks sorted set for O(log N + M) complexity. 
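// Scores in the pending sorted set are the RegisterBlock enqueue timestamps
// (unix seconds), so a block is reported stale exactly when
// score <= now - StaleThreshold, e.g. with StaleThreshold=5m anything enqueued
// more than five minutes ago.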
func (t *BlockCompletionTracker) GetStaleBlocks( ctx context.Context, network, processor, mode string, ) ([]uint64, error) { - pattern := t.metaKeyPattern(network, processor, mode) - staleBlocks := make([]uint64, 0) + pendingKey := t.pendingBlocksKey(network, processor, mode) + staleThreshold := time.Now().Add(-t.config.StaleThreshold).Unix() - iter := t.redis.Scan(ctx, 0, pattern, 100).Iterator() - for iter.Next(ctx) { - key := iter.Val() + members, err := t.redis.ZRangeByScore(ctx, pendingKey, &redis.ZRangeBy{ + Min: "-inf", + Max: fmt.Sprintf("%d", staleThreshold), + }).Result() + if err != nil { + return nil, fmt.Errorf("failed to get stale blocks: %w", err) + } + + staleBlocks := make([]uint64, 0, len(members)) - enqueuedAtStr, err := t.redis.HGet(ctx, key, "enqueued_at").Result() + for _, member := range members { + blockNum, err := strconv.ParseUint(member, 10, 64) if err != nil { - t.log.WithError(err).WithField("key", key).Debug("Failed to get enqueued_at") + t.log.WithError(err).WithField("member", member).Debug("Failed to parse block number") continue } - enqueuedAt, err := strconv.ParseInt(enqueuedAtStr, 10, 64) - if err != nil { - t.log.WithError(err).WithField("key", key).Debug("Failed to parse enqueued_at") + staleBlocks = append(staleBlocks, blockNum) + } - continue - } + return staleBlocks, nil +} - if time.Since(time.Unix(enqueuedAt, 0)) > t.config.StaleThreshold { - // Extract block number from key - blockNum := extractBlockNumFromKey(key) - if blockNum != 0 { - staleBlocks = append(staleBlocks, blockNum) - } - } +// ClearStaleBlocks removes all stale blocks and their associated tracking data. +// Uses ZRANGEBYSCORE to identify stale blocks and a pipeline to efficiently delete all related keys. +// Returns the number of blocks cleared. +func (t *BlockCompletionTracker) ClearStaleBlocks( + ctx context.Context, + network, processor, mode string, +) (int, error) { + staleBlocks, err := t.GetStaleBlocks(ctx, network, processor, mode) + if err != nil { + return 0, fmt.Errorf("failed to get stale blocks: %w", err) } - if err := iter.Err(); err != nil { - return nil, fmt.Errorf("failed to scan for stale blocks: %w", err) + if len(staleBlocks) == 0 { + return 0, nil } - return staleBlocks, nil + pendingKey := t.pendingBlocksKey(network, processor, mode) + pipe := t.redis.Pipeline() + + // Collect all keys to delete and members to remove from sorted set + members := make([]any, 0, len(staleBlocks)) + + for _, blockNum := range staleBlocks { + completedKey := t.completedKey(network, processor, mode, blockNum) + expectedKey := t.expectedKey(network, processor, mode, blockNum) + metaKey := t.metaKey(network, processor, mode, blockNum) + + pipe.Del(ctx, completedKey, expectedKey, metaKey) + + members = append(members, blockNum) + } + + // Remove all stale blocks from the sorted set in one operation + pipe.ZRem(ctx, pendingKey, members...) + + if _, err := pipe.Exec(ctx); err != nil { + return 0, fmt.Errorf("failed to clear stale blocks: %w", err) + } + + t.log.WithFields(logrus.Fields{ + "count": len(staleBlocks), + "network": network, + "processor": processor, + "mode": mode, + }).Debug("Cleared stale blocks") + + return len(staleBlocks), nil } // GetBlockStatus returns the completion status of a block. 
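ClearStaleBlocks above has no caller in this patch; the following is a minimal sketch of how a periodic cleanup loop might use it. The loop itself, the package and function names, the ticker interval, and the "forwards" mode value are assumptions for illustration, not part of this change.

package gapmaint // hypothetical package name for illustration

import (
	"context"
	"time"

	"github.com/ethpandaops/execution-processor/pkg/processor/tracker"
	"github.com/sirupsen/logrus"
)

// clearStaleLoop periodically bulk-clears stale block tracking for one
// network/processor pair until the context is cancelled.
func clearStaleLoop(ctx context.Context, t *tracker.BlockCompletionTracker, log logrus.FieldLogger, network, processor string) {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			cleared, err := t.ClearStaleBlocks(ctx, network, processor, "forwards")
			if err != nil {
				log.WithError(err).Warn("Failed to clear stale blocks")

				continue
			}

			if cleared > 0 {
				log.WithField("count", cleared).Info("Cleared stale block tracking")
			}
		}
	}
}

Because ClearStaleBlocks already pipelines the per-block key deletes and the single ZRem, a caller along these lines does not need its own batching.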
@@ -330,8 +405,13 @@ func (t *BlockCompletionTracker) ClearBlock( completedKey := t.completedKey(network, processor, mode, blockNum) expectedKey := t.expectedKey(network, processor, mode, blockNum) metaKey := t.metaKey(network, processor, mode, blockNum) + pendingKey := t.pendingBlocksKey(network, processor, mode) - if err := t.redis.Del(ctx, completedKey, expectedKey, metaKey).Err(); err != nil { + pipe := t.redis.Pipeline() + pipe.Del(ctx, completedKey, expectedKey, metaKey) + pipe.ZRem(ctx, pendingKey, blockNum) + + if _, err := pipe.Exec(ctx); err != nil { return fmt.Errorf("failed to clear block tracking: %w", err) } @@ -345,26 +425,6 @@ func (t *BlockCompletionTracker) ClearBlock( return nil } -// extractBlockNumFromKey extracts the block number from a Redis key. -// Key format: block_meta:{prefix}:{processor}:{network}:{mode}:{blockNum} -// or: block_meta:{processor}:{network}:{mode}:{blockNum} -func extractBlockNumFromKey(key string) uint64 { - parts := strings.Split(key, ":") - if len(parts) < 2 { - return 0 - } - - // Block number is always the last part - blockNumStr := parts[len(parts)-1] - - blockNum, err := strconv.ParseUint(blockNumStr, 10, 64) - if err != nil { - return 0 - } - - return blockNum -} - // HasBlockTracking checks if a block has Redis tracking data. // Returns true if block_meta key exists (block is being tracked). // Used to detect orphaned blocks that are in ClickHouse (complete=0) but have no Redis tracking. diff --git a/pkg/processor/tracker/block_tracker_test.go b/pkg/processor/tracker/block_tracker_test.go index 8fe2d49..7edc2e1 100644 --- a/pkg/processor/tracker/block_tracker_test.go +++ b/pkg/processor/tracker/block_tracker_test.go @@ -3,6 +3,7 @@ package tracker import ( "context" "fmt" + "sync" "testing" "time" @@ -272,3 +273,399 @@ func (m *mockStateProviderForLimiter) MarkBlockComplete( ) error { return nil } + +func TestBlockCompletionTracker_PendingBlocksSortedSet(t *testing.T) { + mr, err := miniredis.Run() + require.NoError(t, err) + + defer mr.Close() + + client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer client.Close() + + tracker := NewBlockCompletionTracker( + client, "", logrus.New(), &mockStateProviderForTracker{}, + BlockCompletionTrackerConfig{StaleThreshold: 5 * time.Minute}, + ) + + ctx := context.Background() + network := "test_network" + processor := "test_processor" + mode := "forwards" + + t.Run("RegisterBlock adds to sorted set", func(t *testing.T) { + blockNum := uint64(100) + + err := tracker.RegisterBlock(ctx, blockNum, 5, network, processor, mode, "test_queue") + require.NoError(t, err) + + // Verify block is in sorted set + pendingKey := fmt.Sprintf("pending_blocks:%s:%s:%s", processor, network, mode) + score, err := client.ZScore(ctx, pendingKey, "100").Result() + require.NoError(t, err) + assert.Greater(t, score, float64(0), "score should be a positive timestamp") + }) + + t.Run("MarkBlockComplete removes from sorted set", func(t *testing.T) { + blockNum := uint64(101) + + err := tracker.RegisterBlock(ctx, blockNum, 1, network, processor, mode, "test_queue") + require.NoError(t, err) + + // Verify block is in sorted set + pendingKey := fmt.Sprintf("pending_blocks:%s:%s:%s", processor, network, mode) + exists, err := client.ZScore(ctx, pendingKey, "101").Result() + require.NoError(t, err) + assert.Greater(t, exists, float64(0)) + + // Mark complete + err = tracker.MarkBlockComplete(ctx, blockNum, network, processor, mode) + require.NoError(t, err) + + // Verify block is removed from sorted set + _, err = 
client.ZScore(ctx, pendingKey, "101").Result() + assert.ErrorIs(t, err, redis.Nil, "block should be removed from sorted set") + }) + + t.Run("ClearBlock removes from sorted set", func(t *testing.T) { + blockNum := uint64(102) + + err := tracker.RegisterBlock(ctx, blockNum, 1, network, processor, mode, "test_queue") + require.NoError(t, err) + + // Verify block is in sorted set + pendingKey := fmt.Sprintf("pending_blocks:%s:%s:%s", processor, network, mode) + _, err = client.ZScore(ctx, pendingKey, "102").Result() + require.NoError(t, err) + + // Clear block + err = tracker.ClearBlock(ctx, blockNum, network, processor, mode) + require.NoError(t, err) + + // Verify block is removed from sorted set + _, err = client.ZScore(ctx, pendingKey, "102").Result() + assert.ErrorIs(t, err, redis.Nil, "block should be removed from sorted set") + }) +} + +func TestBlockCompletionTracker_GetStaleBlocks_SortedSet(t *testing.T) { + ctx := context.Background() + network := "test_network" + processor := "test_processor" + mode := "forwards" + + t.Run("returns empty when no blocks registered", func(t *testing.T) { + mr, err := miniredis.Run() + require.NoError(t, err) + + defer mr.Close() + + client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer client.Close() + + tracker := NewBlockCompletionTracker( + client, "", logrus.New(), &mockStateProviderForTracker{}, + BlockCompletionTrackerConfig{StaleThreshold: 100 * time.Millisecond}, + ) + + staleBlocks, err := tracker.GetStaleBlocks(ctx, network, processor, mode) + require.NoError(t, err) + assert.Empty(t, staleBlocks) + }) + + t.Run("returns stale blocks only", func(t *testing.T) { + mr, err := miniredis.Run() + require.NoError(t, err) + + defer mr.Close() + + client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer client.Close() + + // Use a longer stale threshold to ensure fresh blocks are not marked stale + staleThreshold := 1 * time.Minute + tracker := NewBlockCompletionTracker( + client, "", logrus.New(), &mockStateProviderForTracker{}, + BlockCompletionTrackerConfig{StaleThreshold: staleThreshold}, + ) + + // Register blocks with old timestamps directly in Redis + pendingKey := fmt.Sprintf("pending_blocks:%s:%s:%s", processor, network, mode) + oldTimestamp := float64(time.Now().Add(-5 * time.Minute).Unix()) + // Fresh timestamp should be well within the stale threshold + newTimestamp := float64(time.Now().Unix()) + + // Add stale block (old timestamp - 5 minutes ago, well past 1 minute threshold) + err = client.ZAdd(ctx, pendingKey, redis.Z{Score: oldTimestamp, Member: "200"}).Err() + require.NoError(t, err) + + // Add fresh block (current timestamp - not past 1 minute threshold) + err = client.ZAdd(ctx, pendingKey, redis.Z{Score: newTimestamp, Member: "201"}).Err() + require.NoError(t, err) + + staleBlocks, err := tracker.GetStaleBlocks(ctx, network, processor, mode) + require.NoError(t, err) + + assert.Len(t, staleBlocks, 1) + assert.Contains(t, staleBlocks, uint64(200)) + assert.NotContains(t, staleBlocks, uint64(201)) + }) + + t.Run("blocks become stale after threshold", func(t *testing.T) { + mr, err := miniredis.Run() + require.NoError(t, err) + + defer mr.Close() + + client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer client.Close() + + // Use 1 second threshold for reliable testing + staleThreshold := 1 * time.Second + tracker := NewBlockCompletionTracker( + client, "", logrus.New(), &mockStateProviderForTracker{}, + BlockCompletionTrackerConfig{StaleThreshold: staleThreshold}, + ) + + blockNum := uint64(300) 
+ + err = tracker.RegisterBlock(ctx, blockNum, 1, network, processor, mode, "test_queue") + require.NoError(t, err) + + // Initially not stale (we just registered it, well within 1 second) + staleBlocks, err := tracker.GetStaleBlocks(ctx, network, processor, mode) + require.NoError(t, err) + assert.NotContains(t, staleBlocks, blockNum, "block should not be stale immediately after registration") + + // Wait for stale threshold + time.Sleep(staleThreshold + 100*time.Millisecond) + + // Now stale + staleBlocks, err = tracker.GetStaleBlocks(ctx, network, processor, mode) + require.NoError(t, err) + assert.Contains(t, staleBlocks, blockNum, "block should be stale after threshold") + }) +} + +func TestBlockCompletionTracker_TrackTaskCompletion_LuaScript(t *testing.T) { + mr, err := miniredis.Run() + require.NoError(t, err) + + defer mr.Close() + + client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer client.Close() + + tracker := NewBlockCompletionTracker( + client, "", logrus.New(), &mockStateProviderForTracker{}, + BlockCompletionTrackerConfig{}, + ) + + ctx := context.Background() + network := "test_network" + processor := "test_processor" + mode := "forwards" + blockNum := uint64(500) + + t.Run("returns false when block not registered", func(t *testing.T) { + complete, err := tracker.TrackTaskCompletion(ctx, "task1", blockNum, network, processor, mode) + require.NoError(t, err) + assert.False(t, complete) + }) + + t.Run("tracks completion correctly", func(t *testing.T) { + err := tracker.RegisterBlock(ctx, blockNum, 3, network, processor, mode, "test_queue") + require.NoError(t, err) + + // First task + complete, err := tracker.TrackTaskCompletion(ctx, "task1", blockNum, network, processor, mode) + require.NoError(t, err) + assert.False(t, complete) + + // Second task + complete, err = tracker.TrackTaskCompletion(ctx, "task2", blockNum, network, processor, mode) + require.NoError(t, err) + assert.False(t, complete) + + // Third task - should complete + complete, err = tracker.TrackTaskCompletion(ctx, "task3", blockNum, network, processor, mode) + require.NoError(t, err) + assert.True(t, complete) + }) + + t.Run("is idempotent", func(t *testing.T) { + blockNum2 := uint64(501) + + err := tracker.RegisterBlock(ctx, blockNum2, 2, network, processor, mode, "test_queue") + require.NoError(t, err) + + // Track same task twice + complete, err := tracker.TrackTaskCompletion(ctx, "task1", blockNum2, network, processor, mode) + require.NoError(t, err) + assert.False(t, complete) + + complete, err = tracker.TrackTaskCompletion(ctx, "task1", blockNum2, network, processor, mode) + require.NoError(t, err) + assert.False(t, complete, "duplicate task should not increase count") + + // Second unique task should complete + complete, err = tracker.TrackTaskCompletion(ctx, "task2", blockNum2, network, processor, mode) + require.NoError(t, err) + assert.True(t, complete) + }) +} + +func TestBlockCompletionTracker_ClearStaleBlocks_Bulk(t *testing.T) { + mr, err := miniredis.Run() + require.NoError(t, err) + + defer mr.Close() + + client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer client.Close() + + staleThreshold := 50 * time.Millisecond + tracker := NewBlockCompletionTracker( + client, "", logrus.New(), &mockStateProviderForTracker{}, + BlockCompletionTrackerConfig{StaleThreshold: staleThreshold}, + ) + + ctx := context.Background() + network := "test_network" + processor := "test_processor" + mode := "forwards" + + t.Run("returns 0 when no stale blocks", func(t *testing.T) { + 
cleared, err := tracker.ClearStaleBlocks(ctx, network, processor, mode) + require.NoError(t, err) + assert.Equal(t, 0, cleared) + }) + + t.Run("clears stale blocks and their keys", func(t *testing.T) { + // Register multiple blocks + for i := uint64(600); i < 605; i++ { + err := tracker.RegisterBlock(ctx, i, 1, network, processor, mode, "test_queue") + require.NoError(t, err) + } + + // Wait for blocks to become stale + time.Sleep(staleThreshold + 50*time.Millisecond) + + // Verify blocks are stale + staleBlocks, err := tracker.GetStaleBlocks(ctx, network, processor, mode) + require.NoError(t, err) + assert.Len(t, staleBlocks, 5) + + // Clear stale blocks + cleared, err := tracker.ClearStaleBlocks(ctx, network, processor, mode) + require.NoError(t, err) + assert.Equal(t, 5, cleared) + + // Verify blocks are cleared + staleBlocks, err = tracker.GetStaleBlocks(ctx, network, processor, mode) + require.NoError(t, err) + assert.Empty(t, staleBlocks) + + // Verify keys are deleted + for i := uint64(600); i < 605; i++ { + hasTracking, err := tracker.HasBlockTracking(ctx, i, network, processor, mode) + require.NoError(t, err) + assert.False(t, hasTracking, "block %d should not have tracking", i) + } + }) +} + +func TestBlockCompletionTracker_LuaScript_Concurrent(t *testing.T) { + mr, err := miniredis.Run() + require.NoError(t, err) + + defer mr.Close() + + client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer client.Close() + + tracker := NewBlockCompletionTracker( + client, "", logrus.New(), &mockStateProviderForTracker{}, + BlockCompletionTrackerConfig{}, + ) + + ctx := context.Background() + network := "test_network" + processor := "test_processor" + mode := "forwards" + blockNum := uint64(700) + expectedTasks := 100 + + err = tracker.RegisterBlock(ctx, blockNum, expectedTasks, network, processor, mode, "test_queue") + require.NoError(t, err) + + // Track completions concurrently + var ( + wg sync.WaitGroup + completionCount int + mu sync.Mutex + ) + + for i := 0; i < expectedTasks; i++ { + wg.Add(1) + + go func(taskNum int) { + defer wg.Done() + + taskID := fmt.Sprintf("task%d", taskNum) + + complete, trackErr := tracker.TrackTaskCompletion(ctx, taskID, blockNum, network, processor, mode) + require.NoError(t, trackErr) + + if complete { + mu.Lock() + + completionCount++ + + mu.Unlock() + } + }(i) + } + + wg.Wait() + + // Exactly one goroutine should have seen the completion + assert.Equal(t, 1, completionCount, "exactly one task should report completion") + + // Verify final state + completed, expected, _, err := tracker.GetBlockStatus(ctx, blockNum, network, processor, mode) + require.NoError(t, err) + assert.Equal(t, int64(expectedTasks), completed) + assert.Equal(t, int64(expectedTasks), expected) +} + +func TestBlockCompletionTracker_PendingBlocksKey_WithPrefix(t *testing.T) { + mr, err := miniredis.Run() + require.NoError(t, err) + + defer mr.Close() + + client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer client.Close() + + prefix := "myapp" + tracker := NewBlockCompletionTracker( + client, prefix, logrus.New(), &mockStateProviderForTracker{}, + BlockCompletionTrackerConfig{}, + ) + + ctx := context.Background() + network := "test_network" + processor := "test_processor" + mode := "forwards" + blockNum := uint64(800) + + err = tracker.RegisterBlock(ctx, blockNum, 1, network, processor, mode, "test_queue") + require.NoError(t, err) + + // Verify the key includes the prefix + pendingKey := fmt.Sprintf("%s:pending_blocks:%s:%s:%s", prefix, processor, network, 
mode) + score, err := client.ZScore(ctx, pendingKey, "800").Result() + require.NoError(t, err) + assert.Greater(t, score, float64(0)) +} From 3be8a955e9b4249a136be5f7cab932c51efa91f5 Mon Sep 17 00:00:00 2001 From: Andrew Davis <1709934+Savid@users.noreply.github.com> Date: Wed, 4 Feb 2026 09:48:27 +1000 Subject: [PATCH 3/4] lint --- pkg/processor/tracker/block_tracker_test.go | 59 +++++++++++---------- pkg/processor/tracker/queues_test.go | 4 +- 2 files changed, 35 insertions(+), 28 deletions(-) diff --git a/pkg/processor/tracker/block_tracker_test.go b/pkg/processor/tracker/block_tracker_test.go index 7edc2e1..90c13b6 100644 --- a/pkg/processor/tracker/block_tracker_test.go +++ b/pkg/processor/tracker/block_tracker_test.go @@ -14,6 +14,11 @@ import ( "github.com/stretchr/testify/require" ) +const ( + testNetwork = "test_network" + testProcessor = "test_processor" +) + // mockStateProviderForTracker implements StateProvider for testing. type mockStateProviderForTracker struct { oldestIncomplete *uint64 @@ -51,7 +56,7 @@ func TestBlockCompletionTracker_HasBlockTracking(t *testing.T) { blockNum: 100, setupRedis: func(mr *miniredis.Miniredis, blockNum uint64) { mr.HSet( - fmt.Sprintf("block_meta:test_processor:test_network:forwards:%d", blockNum), + fmt.Sprintf("block_meta:%s:%s:%s:%d", testProcessor, testNetwork, FORWARDS_MODE, blockNum), "enqueued_at", fmt.Sprintf("%d", time.Now().Unix()), ) }, @@ -69,7 +74,7 @@ func TestBlockCompletionTracker_HasBlockTracking(t *testing.T) { setupRedis: func(mr *miniredis.Miniredis, _ uint64) { // Set up tracking for a DIFFERENT block mr.HSet( - "block_meta:test_processor:test_network:forwards:999", + fmt.Sprintf("block_meta:%s:%s:%s:999", testProcessor, testNetwork, FORWARDS_MODE), "enqueued_at", fmt.Sprintf("%d", time.Now().Unix()), ) }, @@ -81,7 +86,7 @@ func TestBlockCompletionTracker_HasBlockTracking(t *testing.T) { setupRedis: func(mr *miniredis.Miniredis, blockNum uint64) { // Note: prefix is empty in this test, so key format doesn't include prefix mr.HSet( - fmt.Sprintf("block_meta:test_processor:test_network:forwards:%d", blockNum), + fmt.Sprintf("block_meta:%s:%s:%s:%d", testProcessor, testNetwork, FORWARDS_MODE, blockNum), "enqueued_at", fmt.Sprintf("%d", time.Now().Unix()), ) }, @@ -108,7 +113,7 @@ func TestBlockCompletionTracker_HasBlockTracking(t *testing.T) { got, err := tracker.HasBlockTracking( context.Background(), tt.blockNum, - "test_network", "test_processor", "forwards", + testNetwork, testProcessor, FORWARDS_MODE, ) require.NoError(t, err) @@ -135,13 +140,13 @@ func TestBlockCompletionTracker_HasBlockTracking_WithPrefix(t *testing.T) { // Set up tracking with prefix blockNum := uint64(100) mr.HSet( - fmt.Sprintf("%s:block_meta:test_processor:test_network:forwards:%d", prefix, blockNum), + fmt.Sprintf("%s:block_meta:%s:%s:%s:%d", prefix, testProcessor, testNetwork, FORWARDS_MODE, blockNum), "enqueued_at", fmt.Sprintf("%d", time.Now().Unix()), ) got, err := tracker.HasBlockTracking( context.Background(), blockNum, - "test_network", "test_processor", "forwards", + testNetwork, testProcessor, FORWARDS_MODE, ) require.NoError(t, err) @@ -163,9 +168,9 @@ func TestBlockCompletionTracker_HasBlockTracking_AfterRegisterBlock(t *testing.T ) blockNum := uint64(100) - network := "test_network" - processor := "test_processor" - mode := "forwards" + network := testNetwork + processor := testProcessor + mode := FORWARDS_MODE // Verify block has no tracking initially hasTracking, err := tracker.HasBlockTracking(context.Background(), blockNum, 
network, processor, mode) @@ -289,9 +294,9 @@ func TestBlockCompletionTracker_PendingBlocksSortedSet(t *testing.T) { ) ctx := context.Background() - network := "test_network" - processor := "test_processor" - mode := "forwards" + network := testNetwork + processor := testProcessor + mode := FORWARDS_MODE t.Run("RegisterBlock adds to sorted set", func(t *testing.T) { blockNum := uint64(100) @@ -350,9 +355,9 @@ func TestBlockCompletionTracker_PendingBlocksSortedSet(t *testing.T) { func TestBlockCompletionTracker_GetStaleBlocks_SortedSet(t *testing.T) { ctx := context.Background() - network := "test_network" - processor := "test_processor" - mode := "forwards" + network := testNetwork + processor := testProcessor + mode := FORWARDS_MODE t.Run("returns empty when no blocks registered", func(t *testing.T) { mr, err := miniredis.Run() @@ -462,9 +467,9 @@ func TestBlockCompletionTracker_TrackTaskCompletion_LuaScript(t *testing.T) { ) ctx := context.Background() - network := "test_network" - processor := "test_processor" - mode := "forwards" + network := testNetwork + processor := testProcessor + mode := FORWARDS_MODE blockNum := uint64(500) t.Run("returns false when block not registered", func(t *testing.T) { @@ -531,9 +536,9 @@ func TestBlockCompletionTracker_ClearStaleBlocks_Bulk(t *testing.T) { ) ctx := context.Background() - network := "test_network" - processor := "test_processor" - mode := "forwards" + network := testNetwork + processor := testProcessor + mode := FORWARDS_MODE t.Run("returns 0 when no stale blocks", func(t *testing.T) { cleared, err := tracker.ClearStaleBlocks(ctx, network, processor, mode) @@ -590,9 +595,9 @@ func TestBlockCompletionTracker_LuaScript_Concurrent(t *testing.T) { ) ctx := context.Background() - network := "test_network" - processor := "test_processor" - mode := "forwards" + network := testNetwork + processor := testProcessor + mode := FORWARDS_MODE blockNum := uint64(700) expectedTasks := 100 @@ -655,9 +660,9 @@ func TestBlockCompletionTracker_PendingBlocksKey_WithPrefix(t *testing.T) { ) ctx := context.Background() - network := "test_network" - processor := "test_processor" - mode := "forwards" + network := testNetwork + processor := testProcessor + mode := FORWARDS_MODE blockNum := uint64(800) err = tracker.RegisterBlock(ctx, blockNum, 1, network, processor, mode, "test_queue") diff --git a/pkg/processor/tracker/queues_test.go b/pkg/processor/tracker/queues_test.go index b9dc614..7d5b1ef 100644 --- a/pkg/processor/tracker/queues_test.go +++ b/pkg/processor/tracker/queues_test.go @@ -6,6 +6,8 @@ import ( "github.com/stretchr/testify/assert" ) +const testProcessorName = "test_processor" + func TestProcessReprocessForwardsQueue(t *testing.T) { tests := []struct { name string @@ -212,7 +214,7 @@ func TestPrefixedProcessReprocessBackwardsQueue_EmptyPrefix(t *testing.T) { func TestQueueConsistency(t *testing.T) { // Ensure all queue functions follow consistent naming conventions - processorName := "test_processor" + processorName := testProcessorName prefix := "prefix" forwardsUnprefixed := ProcessForwardsQueue(processorName) From 0d08eecca42a7778f9a7c42b2bb878d3197210be Mon Sep 17 00:00:00 2001 From: Andrew Davis <1709934+Savid@users.noreply.github.com> Date: Wed, 4 Feb 2026 13:20:44 +1000 Subject: [PATCH 4/4] fix: add IsBlockComplete check to prevent gap detection race condition Add verification that a block is still incomplete in ClickHouse before calling ReprocessBlock in gap detection. 
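In sketch form (illustrative only, using the helpers added in this patch and
with error handling elided), the gap-filling decision for an incomplete block
becomes:

    hasTracking, _ := proc.GetCompletionTracker().
        HasBlockTracking(ctx, block, network, processor, mode)
    isComplete, _ := stateManager.IsBlockComplete(ctx, block, network, processor)
    if !hasTracking && !isComplete {
        _ = proc.ReprocessBlock(ctx, block) // only orphaned, still-incomplete blocks
    }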
This closes a TOCTOU race window where a block could complete (MarkBlockComplete writes complete=1 and cleans up Redis tracking) between when gap detection queries ClickHouse and when it decides to reprocess. Previously, gap detection only checked HasBlockTracking() which returns false after a block completes and Redis is cleaned up, causing already- complete blocks to be unnecessarily reprocessed. - Add IsBlockComplete() to state manager for fresh ClickHouse query - Add completion check in checkGaps() before ReprocessBlock call - Add unit test for race condition logic --- pkg/processor/manager.go | 98 ++++++++++++++----- pkg/processor/manager_test.go | 62 ++++++++++++ pkg/processor/tracker/limiter.go | 47 +++++---- pkg/processor/tracker/limiter_gap_test.go | 9 +- pkg/processor/tracker/processor.go | 3 + .../transaction/simple/block_processing.go | 53 +++++++++- pkg/processor/transaction/simple/processor.go | 17 ++-- .../transaction/structlog/block_processing.go | 47 +++++++++ .../transaction/structlog/processor.go | 17 ++-- .../structlog_agg/block_processing.go | 47 +++++++++ .../transaction/structlog_agg/processor.go | 17 ++-- pkg/state/manager.go | 50 ++++++++-- 12 files changed, 391 insertions(+), 76 deletions(-) diff --git a/pkg/processor/manager.go b/pkg/processor/manager.go index 2b3e2f7..6251eb9 100644 --- a/pkg/processor/manager.go +++ b/pkg/processor/manager.go @@ -74,10 +74,11 @@ type Manager struct { processors map[string]tracker.BlockProcessor // Redis/Asynq for distributed processing - redisClient *r.Client - redisPrefix string - asynqClient *asynq.Client - asynqServer *asynq.Server + redisClient *r.Client + redisPrefix string + asynqClient *asynq.Client + asynqServer *asynq.Server + asynqInspector *asynq.Inspector network *ethereum.Network @@ -121,6 +122,9 @@ func NewManager(log logrus.FieldLogger, config *Config, pool *ethereum.Pool, sta // Initialize Asynq client with its own Redis connection asynqClient := asynq.NewClient(asynqRedisOpt) + // Initialize Asynq inspector for task management (deletion before reprocessing) + asynqInspector := asynq.NewInspector(asynqRedisOpt) + var asynqServer *asynq.Server // Setup queue priorities dynamically based on processors // This will be populated after processors are initialized @@ -145,6 +149,7 @@ func NewManager(log logrus.FieldLogger, config *Config, pool *ethereum.Pool, sta redisPrefix: redisPrefix, asynqClient: asynqClient, asynqServer: asynqServer, + asynqInspector: asynqInspector, stopChan: make(chan struct{}), blockProcessStop: make(chan struct{}), queueHighWaterMarks: make(map[string]int), @@ -369,6 +374,13 @@ func (m *Manager) Stop(ctx context.Context) error { } } + // Close Asynq inspector + if m.asynqInspector != nil { + if err := m.asynqInspector.Close(); err != nil { + m.log.WithError(err).Error("Failed to close Asynq inspector") + } + } + // Wait for all goroutines to complete m.wg.Wait() m.log.Info("All goroutines stopped") @@ -384,13 +396,14 @@ func (m *Manager) initializeProcessors(ctx context.Context) error { m.log.Debug("Transaction structlog processor is enabled, initializing...") processor, err := transaction_structlog.New(&transaction_structlog.Dependencies{ - Log: m.log.WithField("processor", "transaction_structlog"), - Pool: m.pool, - State: m.state, - AsynqClient: m.asynqClient, - RedisClient: m.redisClient, - Network: m.network, - RedisPrefix: m.redisPrefix, + Log: m.log.WithField("processor", "transaction_structlog"), + Pool: m.pool, + State: m.state, + AsynqClient: m.asynqClient, + AsynqInspector: 
m.asynqInspector, + RedisClient: m.redisClient, + Network: m.network, + RedisPrefix: m.redisPrefix, }, &m.config.TransactionStructlog) if err != nil { return fmt.Errorf("failed to create transaction_structlog processor: %w", err) @@ -415,13 +428,14 @@ func (m *Manager) initializeProcessors(ctx context.Context) error { m.log.Debug("Transaction simple processor is enabled, initializing...") processor, err := transaction_simple.New(&transaction_simple.Dependencies{ - Log: m.log.WithField("processor", "transaction_simple"), - Pool: m.pool, - State: m.state, - AsynqClient: m.asynqClient, - RedisClient: m.redisClient, - Network: m.network, - RedisPrefix: m.redisPrefix, + Log: m.log.WithField("processor", "transaction_simple"), + Pool: m.pool, + State: m.state, + AsynqClient: m.asynqClient, + AsynqInspector: m.asynqInspector, + RedisClient: m.redisClient, + Network: m.network, + RedisPrefix: m.redisPrefix, }, &m.config.TransactionSimple) if err != nil { return fmt.Errorf("failed to create transaction_simple processor: %w", err) @@ -446,13 +460,14 @@ func (m *Manager) initializeProcessors(ctx context.Context) error { m.log.Debug("Transaction structlog_agg processor is enabled, initializing...") processor, err := transaction_structlog_agg.New(&transaction_structlog_agg.Dependencies{ - Log: m.log.WithField("processor", "transaction_structlog_agg"), - Pool: m.pool, - State: m.state, - AsynqClient: m.asynqClient, - RedisClient: m.redisClient, - Network: m.network, - RedisPrefix: m.redisPrefix, + Log: m.log.WithField("processor", "transaction_structlog_agg"), + Pool: m.pool, + State: m.state, + AsynqClient: m.asynqClient, + AsynqInspector: m.asynqInspector, + RedisClient: m.redisClient, + Network: m.network, + RedisPrefix: m.redisPrefix, }, &m.config.TransactionStructlogAgg) if err != nil { return fmt.Errorf("failed to create transaction_structlog_agg processor: %w", err) @@ -1508,6 +1523,39 @@ func (m *Manager) checkGaps(ctx context.Context) { // Handle INCOMPLETE blocks -> ReprocessBlock (row exists, just stuck) for _, gapBlock := range gapResult.Incomplete { + // Skip if block already has active Redis tracking (being processed) + hasTracking, trackErr := processor.GetCompletionTracker().HasBlockTracking( + ctx, gapBlock, m.network.Name, processorName, m.config.Mode) + if trackErr != nil { + m.log.WithError(trackErr).WithField("block", gapBlock).Debug("Failed to check block tracking") + } + + if hasTracking { + m.log.WithFields(logrus.Fields{ + "processor": processorName, + "block": gapBlock, + }).Debug("Skipping incomplete block - already has active Redis tracking") + + continue + } + + // Re-verify block is still incomplete in ClickHouse. + // This closes a race window where a block completes between + // the gap scan query and now (Redis tracking cleaned up). 
+ isComplete, completeErr := m.state.IsBlockComplete(ctx, gapBlock, m.network.Name, processorName) + if completeErr != nil { + m.log.WithError(completeErr).WithField("block", gapBlock).Debug("Failed to verify block complete status") + } + + if isComplete { + m.log.WithFields(logrus.Fields{ + "processor": processorName, + "block": gapBlock, + }).Debug("Skipping incomplete block - completed during gap scan (race avoided)") + + continue + } + if reprocessErr := processor.ReprocessBlock(ctx, gapBlock); reprocessErr != nil { m.log.WithError(reprocessErr).WithFields(logrus.Fields{ "processor": processorName, diff --git a/pkg/processor/manager_test.go b/pkg/processor/manager_test.go index 06be9c9..3f98e7e 100644 --- a/pkg/processor/manager_test.go +++ b/pkg/processor/manager_test.go @@ -435,6 +435,68 @@ func TestManager_LeaderElectionDisabled(t *testing.T) { // Race condition tests +// TestGapDetectionRaceConditionLogic tests the race condition where a block +// completes between gap scan and reprocess decision. +// +// The race occurs when: +// 1. Gap detection queries ClickHouse - returns block 100 as incomplete +// 2. Block 100 completes (MarkBlockComplete writes complete=1, cleans Redis) +// 3. Gap detection checks Redis - HasBlockTracking returns false +// 4. Gap detection calls ReprocessBlock - WRONG! Block is already complete +// +// Current (buggy) logic in checkGaps(): +// +// if !hasTracking { ReprocessBlock() } // BUG: doesn't check if complete +// +// Fixed logic: +// +// if !hasTracking && !isComplete { ReprocessBlock() } +func TestGapDetectionRaceConditionLogic(t *testing.T) { + tests := []struct { + name string + hasTracking bool // Redis tracking exists + isComplete bool // Block is complete in ClickHouse + shouldReprocess bool // Expected decision + }{ + { + name: "has tracking - skip (being processed)", + hasTracking: true, + isComplete: false, + shouldReprocess: false, + }, + { + name: "no tracking, incomplete - reprocess (orphaned)", + hasTracking: false, + isComplete: false, + shouldReprocess: true, + }, + { + name: "no tracking, complete - skip (race condition)", + hasTracking: false, + isComplete: true, + shouldReprocess: false, // THIS CASE IS THE BUG - currently would reprocess! + }, + { + name: "has tracking, complete - skip (already being processed)", + hasTracking: true, + isComplete: true, + shouldReprocess: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // FIXED logic (now in checkGaps): + // Only reprocess if no Redis tracking AND not complete in ClickHouse + shouldReprocess := !tt.hasTracking && !tt.isComplete + + assert.Equal(t, tt.shouldReprocess, shouldReprocess, + "block with hasTracking=%v, isComplete=%v should reprocess=%v, but got %v", + tt.hasTracking, tt.isComplete, tt.shouldReprocess, shouldReprocess) + }) + } +} + // TestManager_RaceConditions specifically tests for race conditions in manager. func TestManager_RaceConditions(t *testing.T) { log := logrus.New() diff --git a/pkg/processor/tracker/limiter.go b/pkg/processor/tracker/limiter.go index 79101a0..2342b75 100644 --- a/pkg/processor/tracker/limiter.go +++ b/pkg/processor/tracker/limiter.go @@ -240,40 +240,51 @@ func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRang return nil, fmt.Errorf("state provider does not support gap detection") } + // Get min and max stored blocks to constrain our search range. + // We can only find gaps within blocks that have actually been stored. 
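+ // For example (illustrative numbers): with maxStored=1000, lookbackRange=500
+ // and MaxPendingBlockRange=64, the scan window below works out to [500, 935]:
+ // referenceBlock-lookbackRange up to referenceBlock-MaxPendingBlockRange-1,
+ // clamped to the stored [minStored, maxStored] range.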
+ minStored, maxStored, err := gapProvider.GetMinMaxStoredBlocks(ctx, l.network, l.processor) + if err != nil { + return nil, fmt.Errorf("failed to get min/max stored blocks: %w", err) + } + + if minStored == nil || maxStored == nil { + // No blocks stored yet + return &GapResult{ScanDuration: time.Since(startTime)}, nil + } + + // Use the stored max as reference point, not the chain head. + // We can only find gaps within data we've actually stored. + referenceBlock := maxStored.Uint64() + var minBlock uint64 if lookbackRange == 0 { // Unlimited: scan from oldest stored block - minStored, _, err := gapProvider.GetMinMaxStoredBlocks(ctx, l.network, l.processor) - if err != nil { - return nil, fmt.Errorf("failed to get min stored block: %w", err) - } - - if minStored == nil { - // No blocks stored yet - return &GapResult{ScanDuration: time.Since(startTime)}, nil - } - minBlock = minStored.Uint64() } else { - // Limited: scan from currentBlock - lookbackRange - if currentBlock > lookbackRange { - minBlock = currentBlock - lookbackRange + // Limited: scan from referenceBlock - lookbackRange + if referenceBlock > lookbackRange { + minBlock = referenceBlock - lookbackRange + } + + // Constrain to actual stored range - can't find gaps before the first stored block + if minBlock < minStored.Uint64() { + minBlock = minStored.Uint64() } } // Calculate maxBlock to exclude the window handled by IsBlockedByIncompleteBlocks. // The limiter already handles blocks within [currentBlock - maxPendingBlockRange, currentBlock], - // so we only scan up to (currentBlock - maxPendingBlockRange - 1) to avoid double work. - maxBlock := currentBlock + // so we only scan up to (referenceBlock - maxPendingBlockRange - 1) to avoid double work. + maxBlock := referenceBlock if l.config.MaxPendingBlockRange > 0 { exclusionWindow := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated in config - if currentBlock > exclusionWindow { - maxBlock = currentBlock - exclusionWindow - 1 + if referenceBlock > exclusionWindow { + maxBlock = referenceBlock - exclusionWindow - 1 } else { - // Current block is within the exclusion window, nothing to scan + // Reference block is within the exclusion window, nothing to scan return &GapResult{ScanDuration: time.Since(startTime)}, nil } } diff --git a/pkg/processor/tracker/limiter_gap_test.go b/pkg/processor/tracker/limiter_gap_test.go index 2c73c27..b857a2d 100644 --- a/pkg/processor/tracker/limiter_gap_test.go +++ b/pkg/processor/tracker/limiter_gap_test.go @@ -246,7 +246,7 @@ func TestGetGaps_ErrorFromGetMinMax(t *testing.T) { require.Error(t, err) assert.Nil(t, result) - assert.Contains(t, err.Error(), "failed to get min stored block") + assert.Contains(t, err.Error(), "failed to get min/max stored blocks") } func TestGetGaps_NoBlocksStored(t *testing.T) { @@ -431,7 +431,9 @@ func TestGetGaps_ScanDurationTracked(t *testing.T) { func TestGetGaps_RespectsLookbackRange(t *testing.T) { mockProvider := &mockGapStateProvider{ - // GetMinMaxStoredBlocks should NOT be called when lookbackRange is set + // minStoredBlock is always needed now to constrain the search range + minStoredBlock: big.NewInt(50), // Matches the calculated min from lookback + maxStoredBlock: big.NewInt(100), incompleteBlocksInRange: []uint64{75, 80}, missingBlocksInRange: []uint64{77}, } @@ -459,9 +461,12 @@ func TestGetGaps_RespectsLookbackRange(t *testing.T) { func TestGetGaps_LookbackRangeGreaterThanCurrentBlock(t *testing.T) { // When lookbackRange is greater than currentBlock, minBlock should be 0 + // 
but constrained to minStoredBlock mockProvider := &mockGapStateProvider{ incompleteBlocksInRange: []uint64{3}, missingBlocksInRange: []uint64{5}, + minStoredBlock: big.NewInt(0), // Set min stored to 0 so gaps at 3,5 are valid + maxStoredBlock: big.NewInt(10), } limiter := NewLimiter(&LimiterDeps{ diff --git a/pkg/processor/tracker/processor.go b/pkg/processor/tracker/processor.go index ee1e4af..9f0322f 100644 --- a/pkg/processor/tracker/processor.go +++ b/pkg/processor/tracker/processor.go @@ -126,6 +126,9 @@ type BlockProcessor interface { // ProcessBlock processes a single block - fetches, marks enqueued, and enqueues tasks. // This is used for gap filling of missing blocks (blocks with no row in DB). ProcessBlock(ctx context.Context, block execution.Block) error + + // GetCompletionTracker returns the block completion tracker for checking tracking status. + GetCompletionTracker() *BlockCompletionTracker } // QueueInfo contains information about a processor queue. diff --git a/pkg/processor/transaction/simple/block_processing.go b/pkg/processor/transaction/simple/block_processing.go index 631b130..5d8786d 100644 --- a/pkg/processor/transaction/simple/block_processing.go +++ b/pkg/processor/transaction/simple/block_processing.go @@ -265,6 +265,45 @@ func (p *Processor) ProcessBlock(ctx context.Context, block execution.Block) err return nil } +// deleteTaskFromMainQueue attempts to delete a task from the main processing queue. +// Returns nil if deleted, not found, or active (acceptable to skip active tasks). +func (p *Processor) deleteTaskFromMainQueue(taskID string) error { + if p.asynqInspector == nil { + return nil + } + + var mainQueue string + if p.processingMode == tracker.BACKWARDS_MODE { + mainQueue = p.getProcessBackwardsQueue() + } else { + mainQueue = p.getProcessForwardsQueue() + } + + err := p.asynqInspector.DeleteTask(mainQueue, taskID) + if err == nil { + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "queue": mainQueue, + }).Debug("Deleted task from main queue before reprocess") + + return nil + } + + // Task not found or queue not found - fine, proceed + if errors.Is(err, asynq.ErrTaskNotFound) || errors.Is(err, asynq.ErrQueueNotFound) { + return nil + } + + // Active tasks can't be deleted - that's OK, they'll complete naturally + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "queue": mainQueue, + "error": err, + }).Debug("Could not delete task from main queue (may be active)") + + return nil +} + // ReprocessBlock re-enqueues tasks for an orphaned block. // Used when a block is in ClickHouse (complete=0) but has no Redis tracking. // TaskID deduplication ensures no duplicate tasks are created. 
@@ -330,6 +369,13 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { return fmt.Errorf("failed to create task: %w", err) } + // Try to delete existing task from main queue before re-enqueueing + var deletedFromMain bool + + if delErr := p.deleteTaskFromMainQueue(taskID); delErr == nil { + deletedFromMain = true + } + // Enqueue to the high-priority reprocess queue err = p.EnqueueTask(ctx, task, asynq.Queue(queue), @@ -349,9 +395,10 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { } p.log.WithFields(logrus.Fields{ - "block_number": blockNum, - "tx_count": len(block.Transactions()), - "queue": queue, + "block_number": blockNum, + "tx_count": len(block.Transactions()), + "queue": queue, + "deleted_from_main": deletedFromMain, }).Info("Reprocessed orphaned block to high-priority queue") return nil diff --git a/pkg/processor/transaction/simple/processor.go b/pkg/processor/transaction/simple/processor.go index 2550aa7..f8b0ada 100644 --- a/pkg/processor/transaction/simple/processor.go +++ b/pkg/processor/transaction/simple/processor.go @@ -24,13 +24,14 @@ var _ tracker.BlockProcessor = (*Processor)(nil) // Dependencies contains the dependencies needed for the processor. type Dependencies struct { - Log logrus.FieldLogger - Pool *ethereum.Pool - Network *ethereum.Network - State *state.Manager - AsynqClient *asynq.Client - RedisClient *redis.Client - RedisPrefix string + Log logrus.FieldLogger + Pool *ethereum.Pool + Network *ethereum.Network + State *state.Manager + AsynqClient *asynq.Client + AsynqInspector *asynq.Inspector + RedisClient *redis.Client + RedisPrefix string } // Processor handles simple transaction processing. @@ -42,6 +43,7 @@ type Processor struct { config *Config network *ethereum.Network asynqClient *asynq.Client + asynqInspector *asynq.Inspector processingMode string redisPrefix string @@ -120,6 +122,7 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { config: config, network: deps.Network, asynqClient: deps.AsynqClient, + asynqInspector: deps.AsynqInspector, processingMode: tracker.FORWARDS_MODE, // Default mode redisPrefix: deps.RedisPrefix, Limiter: limiter, diff --git a/pkg/processor/transaction/structlog/block_processing.go b/pkg/processor/transaction/structlog/block_processing.go index 774b612..22b104b 100644 --- a/pkg/processor/transaction/structlog/block_processing.go +++ b/pkg/processor/transaction/structlog/block_processing.go @@ -364,6 +364,45 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution return enqueuedCount + skippedCount, nil } +// deleteTaskFromMainQueue attempts to delete a task from the main processing queue. +// Returns nil if deleted, not found, or active (acceptable to skip active tasks). 
+func (p *Processor) deleteTaskFromMainQueue(taskID string) error { + if p.asynqInspector == nil { + return nil + } + + var mainQueue string + if p.processingMode == tracker.BACKWARDS_MODE { + mainQueue = p.getProcessBackwardsQueue() + } else { + mainQueue = p.getProcessForwardsQueue() + } + + err := p.asynqInspector.DeleteTask(mainQueue, taskID) + if err == nil { + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "queue": mainQueue, + }).Debug("Deleted task from main queue before reprocess") + + return nil + } + + // Task not found or queue not found - fine, proceed + if errors.Is(err, asynq.ErrTaskNotFound) || errors.Is(err, asynq.ErrQueueNotFound) { + return nil + } + + // Active tasks can't be deleted - that's OK, they'll complete naturally + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "queue": mainQueue, + "error": err, + }).Debug("Could not delete task from main queue (may be active)") + + return nil +} + // ReprocessBlock re-enqueues tasks for an orphaned block. // Used when a block is in ClickHouse (complete=0) but has no Redis tracking. // TaskID deduplication ensures no duplicate tasks are created. @@ -414,6 +453,8 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { var skippedCount int + var deletedCount int + for index, tx := range block.Transactions() { payload := &ProcessPayload{ BlockNumber: *block.Number(), @@ -438,6 +479,11 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { return fmt.Errorf("failed to create task for tx %s: %w", tx.Hash().String(), err) } + // Try to delete existing task from main queue before re-enqueueing + if delErr := p.deleteTaskFromMainQueue(taskID); delErr == nil { + deletedCount++ + } + // Enqueue to the high-priority reprocess queue err = p.EnqueueTask(ctx, task, asynq.Queue(queue), @@ -462,6 +508,7 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { "expected_count": expectedCount, "enqueued_count": enqueuedCount, "skipped_count": skippedCount, + "deleted_count": deletedCount, "queue": queue, }).Info("Reprocessed orphaned block to high-priority queue") diff --git a/pkg/processor/transaction/structlog/processor.go b/pkg/processor/transaction/structlog/processor.go index f58e69d..34e1281 100644 --- a/pkg/processor/transaction/structlog/processor.go +++ b/pkg/processor/transaction/structlog/processor.go @@ -30,13 +30,14 @@ var _ tracker.BlockProcessor = (*Processor)(nil) // Dependencies contains the dependencies needed for the processor. type Dependencies struct { - Log logrus.FieldLogger - Pool *ethereum.Pool - Network *ethereum.Network - State *state.Manager - AsynqClient *asynq.Client - RedisClient *redis.Client - RedisPrefix string + Log logrus.FieldLogger + Pool *ethereum.Pool + Network *ethereum.Network + State *state.Manager + AsynqClient *asynq.Client + AsynqInspector *asynq.Inspector + RedisClient *redis.Client + RedisPrefix string } // Processor handles transaction structlog processing. 
@@ -48,6 +49,7 @@ type Processor struct { config *Config network *ethereum.Network asynqClient *asynq.Client + asynqInspector *asynq.Inspector processingMode string redisPrefix string @@ -127,6 +129,7 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { clickhouse: clickhouseClient, config: config, asynqClient: deps.AsynqClient, + asynqInspector: deps.AsynqInspector, processingMode: tracker.FORWARDS_MODE, // Default mode redisPrefix: deps.RedisPrefix, Limiter: limiter, diff --git a/pkg/processor/transaction/structlog_agg/block_processing.go b/pkg/processor/transaction/structlog_agg/block_processing.go index 5dcd245..8030daa 100644 --- a/pkg/processor/transaction/structlog_agg/block_processing.go +++ b/pkg/processor/transaction/structlog_agg/block_processing.go @@ -364,6 +364,45 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution return enqueuedCount + skippedCount, nil } +// deleteTaskFromMainQueue attempts to delete a task from the main processing queue. +// Returns nil if deleted, not found, or active (acceptable to skip active tasks). +func (p *Processor) deleteTaskFromMainQueue(taskID string) error { + if p.asynqInspector == nil { + return nil + } + + var mainQueue string + if p.processingMode == tracker.BACKWARDS_MODE { + mainQueue = p.getProcessBackwardsQueue() + } else { + mainQueue = p.getProcessForwardsQueue() + } + + err := p.asynqInspector.DeleteTask(mainQueue, taskID) + if err == nil { + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "queue": mainQueue, + }).Debug("Deleted task from main queue before reprocess") + + return nil + } + + // Task not found or queue not found - fine, proceed + if errors.Is(err, asynq.ErrTaskNotFound) || errors.Is(err, asynq.ErrQueueNotFound) { + return nil + } + + // Active tasks can't be deleted - that's OK, they'll complete naturally + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "queue": mainQueue, + "error": err, + }).Debug("Could not delete task from main queue (may be active)") + + return nil +} + // ReprocessBlock re-enqueues tasks for an orphaned block. // Used when a block is in ClickHouse (complete=0) but has no Redis tracking. // TaskID deduplication ensures no duplicate tasks are created. 
@@ -414,6 +453,8 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { var skippedCount int + var deletedCount int + for index, tx := range block.Transactions() { payload := &ProcessPayload{ BlockNumber: *block.Number(), @@ -438,6 +479,11 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { return fmt.Errorf("failed to create task for tx %s: %w", tx.Hash().String(), err) } + // Try to delete existing task from main queue before re-enqueueing + if delErr := p.deleteTaskFromMainQueue(taskID); delErr == nil { + deletedCount++ + } + // Enqueue to the high-priority reprocess queue err = p.EnqueueTask(ctx, task, asynq.Queue(queue), @@ -462,6 +508,7 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { "expected_count": expectedCount, "enqueued_count": enqueuedCount, "skipped_count": skippedCount, + "deleted_count": deletedCount, "queue": queue, }).Info("Reprocessed orphaned block to high-priority queue") diff --git a/pkg/processor/transaction/structlog_agg/processor.go b/pkg/processor/transaction/structlog_agg/processor.go index 4a765a7..7f7c03b 100644 --- a/pkg/processor/transaction/structlog_agg/processor.go +++ b/pkg/processor/transaction/structlog_agg/processor.go @@ -30,13 +30,14 @@ var _ tracker.BlockProcessor = (*Processor)(nil) // Dependencies contains the dependencies needed for the processor. type Dependencies struct { - Log logrus.FieldLogger - Pool *ethereum.Pool - Network *ethereum.Network - State *state.Manager - AsynqClient *asynq.Client - RedisClient *redis.Client - RedisPrefix string + Log logrus.FieldLogger + Pool *ethereum.Pool + Network *ethereum.Network + State *state.Manager + AsynqClient *asynq.Client + AsynqInspector *asynq.Inspector + RedisClient *redis.Client + RedisPrefix string } // insertRow wraps CallFrameRow with additional context needed for batched inserts. @@ -58,6 +59,7 @@ type Processor struct { config *Config network *ethereum.Network asynqClient *asynq.Client + asynqInspector *asynq.Inspector processingMode string redisPrefix string @@ -137,6 +139,7 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { clickhouse: clickhouseClient, config: config, asynqClient: deps.AsynqClient, + asynqInspector: deps.AsynqInspector, processingMode: tracker.FORWARDS_MODE, // Default mode redisPrefix: deps.RedisPrefix, Limiter: limiter, diff --git a/pkg/state/manager.go b/pkg/state/manager.go index f2de16a..5e55925 100644 --- a/pkg/state/manager.go +++ b/pkg/state/manager.go @@ -721,7 +721,7 @@ func (s *Manager) GetIncompleteBlocksInRange( // GetMissingBlocksInRange returns block numbers that have no row in the database. // This finds blocks that were never processed, not incomplete blocks. -// Uses ClickHouse's numbers() function to generate a sequence and LEFT JOIN to find gaps. +// Uses ClickHouse's numbers() function to generate a sequence and NOT IN to find gaps. func (s *Manager) GetMissingBlocksInRange( ctx context.Context, network, processor string, @@ -729,19 +729,21 @@ func (s *Manager) GetMissingBlocksInRange( limit int, ) ([]uint64, error) { // Use numbers() to generate a sequence from minBlock to maxBlock, - // then LEFT JOIN to find blocks that don't exist in storage. + // then NOT IN to find blocks that don't exist in storage. + // NOTE: We use NOT IN instead of LEFT JOIN + IS NULL because ClickHouse's + // UInt64 columns can't be NULL - LEFT JOIN produces 0 instead of NULL for + // non-matching rows, which breaks the IS NULL check. 
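+ // numbers(offset, count) yields `count` consecutive integers starting at `offset`,
+ // so numbers(minBlock, maxBlock-minBlock+1) covers [minBlock, maxBlock] inclusive
+ // (e.g. numbers(100, 5) generates 100..104).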
query := fmt.Sprintf(` - SELECT n.number AS block_number - FROM numbers(%d, %d) AS n - LEFT JOIN ( + SELECT number AS block_number + FROM numbers(%d, %d) + WHERE number NOT IN ( SELECT DISTINCT block_number FROM %s FINAL WHERE processor = '%s' AND meta_network_name = '%s' AND block_number >= %d AND block_number <= %d - ) AS e ON n.number = e.block_number - WHERE e.block_number IS NULL + ) ORDER BY block_number ASC LIMIT %d `, minBlock, maxBlock-minBlock+1, s.storageTable, processor, network, @@ -830,6 +832,40 @@ func (s *Manager) IsBlockRecentlyProcessed(ctx context.Context, blockNumber uint return count != nil && *count > 0, nil } +// IsBlockComplete checks if a block is marked complete in ClickHouse. +// Uses FINAL to get the latest state after ReplacingMergeTree deduplication. +// This is used to prevent race conditions where a block completes between +// gap detection scan and the reprocess decision. +func (s *Manager) IsBlockComplete(ctx context.Context, blockNumber uint64, network, processor string) (bool, error) { + query := fmt.Sprintf(` + SELECT toUInt64(complete) as complete + FROM %s FINAL + WHERE processor = '%s' + AND meta_network_name = '%s' + AND block_number = %d + LIMIT 1 + `, s.storageTable, processor, network, blockNumber) + + s.log.WithFields(logrus.Fields{ + "network": network, + "processor": processor, + "block_number": blockNumber, + "table": s.storageTable, + }).Debug("Checking if block is complete") + + complete, err := s.storageClient.QueryUInt64(ctx, query, "complete") + if err != nil { + return false, fmt.Errorf("failed to check block complete status: %w", err) + } + + // No row found = not complete + if complete == nil { + return false, nil + } + + return *complete == 1, nil +} + // GetHeadDistance calculates the distance between current processing block and the relevant head. func (s *Manager) GetHeadDistance(ctx context.Context, processor, network, mode string, executionHead *big.Int) (distance int64, headType string, err error) { // Get the current processing block (what would be next to process)