18 changes: 18 additions & 0 deletions pkg/clickhouse/mock.go
@@ -19,6 +19,7 @@ type MockClient struct {
DoFunc func(ctx context.Context, query ch.Query) error
QueryUInt64Func func(ctx context.Context, query string, columnName string) (*uint64, error)
QueryMinMaxUInt64Func func(ctx context.Context, query string) (minVal, maxVal *uint64, err error)
QueryUInt64SliceFunc func(ctx context.Context, query string, columnName string) ([]uint64, error)

// Track calls for assertions
Calls []MockCall
@@ -54,6 +55,9 @@ func NewMockClient() *MockClient {
QueryMinMaxUInt64Func: func(ctx context.Context, query string) (minVal, maxVal *uint64, err error) {
return nil, nil, nil
},
QueryUInt64SliceFunc: func(ctx context.Context, query string, columnName string) ([]uint64, error) {
return []uint64{}, nil
},
Calls: make([]MockCall, 0),
}
}
@@ -86,6 +90,20 @@ func (m *MockClient) QueryMinMaxUInt64(ctx context.Context, query string) (minVa
return nil, nil, nil
}

// QueryUInt64Slice implements ClientInterface.
func (m *MockClient) QueryUInt64Slice(ctx context.Context, query string, columnName string) ([]uint64, error) {
m.Calls = append(m.Calls, MockCall{
Method: "QueryUInt64Slice",
Args: []any{ctx, query, columnName},
})

if m.QueryUInt64SliceFunc != nil {
return m.QueryUInt64SliceFunc(ctx, query, columnName)
}

return []uint64{}, nil
}

// Execute implements ClientInterface.
func (m *MockClient) Execute(ctx context.Context, query string) error {
m.Calls = append(m.Calls, MockCall{
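The new hook follows the mock's established override pattern: a default stub in `NewMockClient`, call tracking in the interface method, and a per-test override. A minimal sketch of a test driving it — assuming a same-package test file (`package clickhouse`, as the file path suggests); the query string and returned values are illustrative:

```go
package clickhouse

import (
	"context"
	"testing"
)

func TestQueryUInt64Slice_Override(t *testing.T) {
	mock := NewMockClient()
	mock.QueryUInt64SliceFunc = func(ctx context.Context, query, columnName string) ([]uint64, error) {
		return []uint64{100, 101, 105}, nil // e.g. block numbers returned by a gap-scan query
	}

	got, err := mock.QueryUInt64Slice(context.Background(), "SELECT block_number FROM blocks", "block_number")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(got) != 3 {
		t.Fatalf("want 3 values, got %d", len(got))
	}

	// Every invocation is recorded in Calls for later assertions.
	last := mock.Calls[len(mock.Calls)-1]
	if last.Method != "QueryUInt64Slice" {
		t.Fatalf("want QueryUInt64Slice tracked, got %s", last.Method)
	}
}
```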
22 changes: 22 additions & 0 deletions pkg/common/metrics.go
@@ -233,4 +233,26 @@ var (
Name: "execution_processor_row_buffer_pending_tasks",
Help: "Current number of tasks waiting for their rows to be flushed",
}, []string{"network", "processor", "table"})

// Gap detection metrics.
GapsDetected = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "execution_processor_gaps_detected_total",
Help: "Total number of gaps detected during gap scans",
}, []string{"network", "processor", "gap_type"}) // gap_type: "incomplete" or "missing"

GapsReprocessed = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "execution_processor_gaps_reprocessed_total",
Help: "Total number of gap blocks reprocessed",
}, []string{"network", "processor", "gap_type", "status"}) // status: "success" or "error"

GapScanDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "execution_processor_gap_scan_duration_seconds",
Help: "Duration of gap detection scans",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 12),
}, []string{"network", "processor"})

GapsFound = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "execution_processor_gaps_found",
Help: "Current count of gaps found in last scan",
}, []string{"network", "processor", "gap_type"})
)
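Since these collectors are registered at package level via `promauto`, their wiring can be sanity-checked with client_golang's `testutil` helpers. A sketch, assuming a same-package test; the label values are illustrative, and the before/after delta guards against counter state shared across tests:

```go
package common

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestGapMetricsWiring(t *testing.T) {
	// Grab one labeled child of the CounterVec and read it back after an Add.
	c := GapsDetected.WithLabelValues("mainnet", "transaction_structlog", "missing")

	before := testutil.ToFloat64(c)
	c.Add(3)

	if delta := testutil.ToFloat64(c) - before; delta != 3 {
		t.Fatalf("want delta of 3, got %v", delta)
	}
}
```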
169 changes: 136 additions & 33 deletions pkg/processor/manager.go
@@ -74,10 +74,11 @@ type Manager struct {
processors map[string]tracker.BlockProcessor

// Redis/Asynq for distributed processing
redisClient *r.Client
redisPrefix string
asynqClient *asynq.Client
asynqServer *asynq.Server
redisClient *r.Client
redisPrefix string
asynqClient *asynq.Client
asynqServer *asynq.Server
asynqInspector *asynq.Inspector

network *ethereum.Network

Expand Down Expand Up @@ -121,6 +122,9 @@ func NewManager(log logrus.FieldLogger, config *Config, pool *ethereum.Pool, sta
// Initialize Asynq client with its own Redis connection
asynqClient := asynq.NewClient(asynqRedisOpt)

// Initialize Asynq inspector for task management (deletion before reprocessing)
asynqInspector := asynq.NewInspector(asynqRedisOpt)

var asynqServer *asynq.Server
// Setup queue priorities dynamically based on processors
// This will be populated after processors are initialized
@@ -145,6 +149,7 @@
redisPrefix: redisPrefix,
asynqClient: asynqClient,
asynqServer: asynqServer,
asynqInspector: asynqInspector,
stopChan: make(chan struct{}),
blockProcessStop: make(chan struct{}),
queueHighWaterMarks: make(map[string]int),
@@ -369,6 +374,13 @@ func (m *Manager) Stop(ctx context.Context) error {
}
}

// Close Asynq inspector
if m.asynqInspector != nil {
if err := m.asynqInspector.Close(); err != nil {
m.log.WithError(err).Error("Failed to close Asynq inspector")
}
}

// Wait for all goroutines to complete
m.wg.Wait()
m.log.Info("All goroutines stopped")
@@ -384,13 +396,14 @@ func (m *Manager) initializeProcessors(ctx context.Context) error {
m.log.Debug("Transaction structlog processor is enabled, initializing...")

processor, err := transaction_structlog.New(&transaction_structlog.Dependencies{
Log: m.log.WithField("processor", "transaction_structlog"),
Pool: m.pool,
State: m.state,
AsynqClient: m.asynqClient,
RedisClient: m.redisClient,
Network: m.network,
RedisPrefix: m.redisPrefix,
Log: m.log.WithField("processor", "transaction_structlog"),
Pool: m.pool,
State: m.state,
AsynqClient: m.asynqClient,
AsynqInspector: m.asynqInspector,
RedisClient: m.redisClient,
Network: m.network,
RedisPrefix: m.redisPrefix,
}, &m.config.TransactionStructlog)
if err != nil {
return fmt.Errorf("failed to create transaction_structlog processor: %w", err)
@@ -415,13 +428,14 @@ func (m *Manager) initializeProcessors(ctx context.Context) error {
m.log.Debug("Transaction simple processor is enabled, initializing...")

processor, err := transaction_simple.New(&transaction_simple.Dependencies{
Log: m.log.WithField("processor", "transaction_simple"),
Pool: m.pool,
State: m.state,
AsynqClient: m.asynqClient,
RedisClient: m.redisClient,
Network: m.network,
RedisPrefix: m.redisPrefix,
Log: m.log.WithField("processor", "transaction_simple"),
Pool: m.pool,
State: m.state,
AsynqClient: m.asynqClient,
AsynqInspector: m.asynqInspector,
RedisClient: m.redisClient,
Network: m.network,
RedisPrefix: m.redisPrefix,
}, &m.config.TransactionSimple)
if err != nil {
return fmt.Errorf("failed to create transaction_simple processor: %w", err)
@@ -446,13 +460,14 @@ func (m *Manager) initializeProcessors(ctx context.Context) error {
m.log.Debug("Transaction structlog_agg processor is enabled, initializing...")

processor, err := transaction_structlog_agg.New(&transaction_structlog_agg.Dependencies{
Log: m.log.WithField("processor", "transaction_structlog_agg"),
Pool: m.pool,
State: m.state,
AsynqClient: m.asynqClient,
RedisClient: m.redisClient,
Network: m.network,
RedisPrefix: m.redisPrefix,
Log: m.log.WithField("processor", "transaction_structlog_agg"),
Pool: m.pool,
State: m.state,
AsynqClient: m.asynqClient,
AsynqInspector: m.asynqInspector,
RedisClient: m.redisClient,
Network: m.network,
RedisPrefix: m.redisPrefix,
}, &m.config.TransactionStructlogAgg)
if err != nil {
return fmt.Errorf("failed to create transaction_structlog_agg processor: %w", err)
@@ -1417,6 +1432,9 @@ func (m *Manager) checkStaleBlocks(ctx context.Context) {
}

// checkGaps checks for gaps in block processing across all processors and triggers reprocessing.
// It detects two types of gaps:
// - Incomplete: blocks with a row in DB but complete=0 (use ReprocessBlock)
// - Missing: blocks with no row at all (use ProcessBlock)
func (m *Manager) checkGaps(ctx context.Context) {
if !m.config.GapDetection.Enabled {
return
@@ -1464,7 +1482,7 @@ func (m *Manager) checkGaps(ctx context.Context) {
continue
}

gaps, gapErr := limiter.GetGaps(
gapResult, gapErr := limiter.GetGaps(
ctx,
currentBlock,
m.config.GapDetection.LookbackRange,
@@ -1476,22 +1494,107 @@ func (m *Manager) checkGaps(ctx context.Context) {
continue
}

if len(gaps) == 0 {
// Record scan duration metric
common.GapScanDuration.WithLabelValues(m.network.Name, processorName).Observe(gapResult.ScanDuration.Seconds())

// Record gap counts
common.GapsFound.WithLabelValues(m.network.Name, processorName, "incomplete").Set(float64(len(gapResult.Incomplete)))
common.GapsFound.WithLabelValues(m.network.Name, processorName, "missing").Set(float64(len(gapResult.Missing)))

if len(gapResult.Incomplete) > 0 {
common.GapsDetected.WithLabelValues(m.network.Name, processorName, "incomplete").Add(float64(len(gapResult.Incomplete)))
}

if len(gapResult.Missing) > 0 {
common.GapsDetected.WithLabelValues(m.network.Name, processorName, "missing").Add(float64(len(gapResult.Missing)))
}

totalGaps := len(gapResult.Incomplete) + len(gapResult.Missing)
if totalGaps == 0 {
continue
}

m.log.WithFields(logrus.Fields{
"processor": processorName,
"gap_count": len(gaps),
"current_block": currentBlock,
}).Info("Detected gaps, reprocessing")
"processor": processorName,
"incomplete_count": len(gapResult.Incomplete),
"missing_count": len(gapResult.Missing),
"current_block": currentBlock,
}).Info("Detected gaps, processing")

// Handle INCOMPLETE blocks -> ReprocessBlock (row exists, just stuck)
for _, gapBlock := range gapResult.Incomplete {
// Skip if block already has active Redis tracking (being processed)
hasTracking, trackErr := processor.GetCompletionTracker().HasBlockTracking(
ctx, gapBlock, m.network.Name, processorName, m.config.Mode)
if trackErr != nil {
m.log.WithError(trackErr).WithField("block", gapBlock).Debug("Failed to check block tracking")
}

if hasTracking {
m.log.WithFields(logrus.Fields{
"processor": processorName,
"block": gapBlock,
}).Debug("Skipping incomplete block - already has active Redis tracking")

continue
}

// Re-verify block is still incomplete in ClickHouse.
// This closes a race window where a block completes between
// the gap scan query and now (Redis tracking cleaned up).
isComplete, completeErr := m.state.IsBlockComplete(ctx, gapBlock, m.network.Name, processorName)
if completeErr != nil {
m.log.WithError(completeErr).WithField("block", gapBlock).Debug("Failed to verify block complete status")
}

if isComplete {
m.log.WithFields(logrus.Fields{
"processor": processorName,
"block": gapBlock,
}).Debug("Skipping incomplete block - completed during gap scan (race avoided)")

continue
}

for _, gapBlock := range gaps {
if reprocessErr := processor.ReprocessBlock(ctx, gapBlock); reprocessErr != nil {
m.log.WithError(reprocessErr).WithFields(logrus.Fields{
"processor": processorName,
"block": gapBlock,
}).Warn("Failed to reprocess gap block")
"gap_type": "incomplete",
}).Warn("Failed to reprocess incomplete block")

common.GapsReprocessed.WithLabelValues(m.network.Name, processorName, "incomplete", "error").Inc()
} else {
common.GapsReprocessed.WithLabelValues(m.network.Name, processorName, "incomplete", "success").Inc()
}
}

// Handle MISSING blocks -> ProcessBlock (no row, needs full processing)
for _, gapBlock := range gapResult.Missing {
// Fetch the block first
block, fetchErr := node.BlockByNumber(ctx, new(big.Int).SetUint64(gapBlock))
if fetchErr != nil {
m.log.WithError(fetchErr).WithFields(logrus.Fields{
"processor": processorName,
"block": gapBlock,
"gap_type": "missing",
}).Warn("Failed to fetch missing block")

common.GapsReprocessed.WithLabelValues(m.network.Name, processorName, "missing", "error").Inc()

continue
}

if processErr := processor.ProcessBlock(ctx, block); processErr != nil {
m.log.WithError(processErr).WithFields(logrus.Fields{
"processor": processorName,
"block": gapBlock,
"gap_type": "missing",
}).Warn("Failed to process missing block")

common.GapsReprocessed.WithLabelValues(m.network.Name, processorName, "missing", "error").Inc()
} else {
common.GapsReprocessed.WithLabelValues(m.network.Name, processorName, "missing", "success").Inc()
}
}
}
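Note the switch from a flat `gaps` slice to a structured `gapResult` with separate incomplete/missing buckets and a scan duration. The result type itself is not part of this diff; inferred purely from how `checkGaps` consumes it, it presumably looks something like this sketch (package placement and comments are guesses):

```go
package limiter

import "time"

// GapResult — inferred shape only, not the real definition.
// Field names and types come from checkGaps' usage in this PR.
type GapResult struct {
	Incomplete   []uint64      // rows exist but complete=0 -> ReprocessBlock
	Missing      []uint64      // no row at all             -> ProcessBlock
	ScanDuration time.Duration // observed into GapScanDuration
}
```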
62 changes: 62 additions & 0 deletions pkg/processor/manager_test.go
@@ -435,6 +435,68 @@ func TestManager_LeaderElectionDisabled(t *testing.T) {

// Race condition tests

// TestGapDetectionRaceConditionLogic tests the race condition where a block
// completes between gap scan and reprocess decision.
//
// The race occurs when:
// 1. Gap detection queries ClickHouse - returns block 100 as incomplete
// 2. Block 100 completes (MarkBlockComplete writes complete=1, cleans Redis)
// 3. Gap detection checks Redis - HasBlockTracking returns false
// 4. Gap detection calls ReprocessBlock - WRONG! Block is already complete
//
// Previous (buggy) logic in checkGaps():
//
// if !hasTracking { ReprocessBlock() } // BUG: didn't check if complete
//
// Fixed logic:
//
// if !hasTracking && !isComplete { ReprocessBlock() }
func TestGapDetectionRaceConditionLogic(t *testing.T) {
tests := []struct {
name string
hasTracking bool // Redis tracking exists
isComplete bool // Block is complete in ClickHouse
shouldReprocess bool // Expected decision
}{
{
name: "has tracking - skip (being processed)",
hasTracking: true,
isComplete: false,
shouldReprocess: false,
},
{
name: "no tracking, incomplete - reprocess (orphaned)",
hasTracking: false,
isComplete: false,
shouldReprocess: true,
},
{
name: "no tracking, complete - skip (race condition)",
hasTracking: false,
isComplete: true,
shouldReprocess: false, // THIS CASE WAS THE BUG - the old logic would reprocess!
},
{
name: "has tracking, complete - skip (already being processed)",
hasTracking: true,
isComplete: true,
shouldReprocess: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// FIXED logic (now in checkGaps):
// Only reprocess if no Redis tracking AND not complete in ClickHouse
shouldReprocess := !tt.hasTracking && !tt.isComplete

assert.Equal(t, tt.shouldReprocess, shouldReprocess,
"block with hasTracking=%v, isComplete=%v should reprocess=%v, but got %v",
tt.hasTracking, tt.isComplete, tt.shouldReprocess, shouldReprocess)
})
}
}

// TestManager_RaceConditions specifically tests for race conditions in manager.
func TestManager_RaceConditions(t *testing.T) {
log := logrus.New()
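One observation on the table test above: it re-derives the gate expression (`!hasTracking && !isComplete`) inline rather than calling production code, so the two can silently drift. A hypothetical follow-up (not in this PR) would hoist the decision into a pure helper that both `checkGaps` and the test call:

```go
// Hypothetical helper, not in this diff: the reprocess gate for incomplete
// blocks, extracted from checkGaps so tests exercise the production expression.
func shouldReprocessIncomplete(hasTracking, isComplete bool) bool {
	return !hasTracking && !isComplete
}
```

The test body would then assert against `shouldReprocessIncomplete(tt.hasTracking, tt.isComplete)` instead of restating the boolean.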