5 changes: 5 additions & 0 deletions .changeset/fix-batch-queue-processing.md
@@ -0,0 +1,5 @@
---
"@trigger.dev/redis-worker": patch
---

Fix slow batch queue processing by removing spurious cooloff on concurrency blocks and fixing a race condition where retry attempt counts were not atomically updated during message re-queue.
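A minimal sketch of the second fix, using toy in-memory stand-ins for the queue and items hash (the names and shapes here are illustrative, not the real redis-worker internals):

```ts
type StoredMessage = { id: string; attempt: number };

// Toy stand-ins: a set of claimable message ids and their stored payloads.
const queue = new Set<string>();
const items = new Map<string, StoredMessage>();

function claimMessage(id: string): StoredMessage | undefined {
  // A consumer reads whatever attempt count is stored at claim time.
  return queue.delete(id) ? items.get(id) : undefined;
}

// Old, racy re-queue: the message becomes claimable before its attempt
// count is bumped, so a fast consumer can read the stale value.
function requeueRacy(msg: StoredMessage): void {
  queue.add(msg.id); // step 1: claimable again
  // <-- another consumer can claimMessage(msg.id) here and see msg.attempt
  items.set(msg.id, { ...msg, attempt: msg.attempt + 1 }); // step 2: too late
}

// Fixed re-queue: payload and queue membership change together. In the real
// code this happens inside one Lua script, so the updated JSON has to be
// passed into the release call rather than written in a follow-up hset.
function requeueAtomic(msg: StoredMessage): void {
  items.set(msg.id, { ...msg, attempt: msg.attempt + 1 });
  queue.add(msg.id);
}
```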
2 changes: 1 addition & 1 deletion apps/webapp/app/env.server.ts
@@ -549,7 +549,7 @@ const EnvironmentSchema = z
BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(100),
BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200),
BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"),
- BATCH_CONCURRENCY_LIMIT_DEFAULT: z.coerce.number().int().default(1),
+ BATCH_CONCURRENCY_LIMIT_DEFAULT: z.coerce.number().int().default(5),

REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
REALTIME_STREAM_MAX_LENGTH: z.coerce.number().int().default(1000),
4 changes: 1 addition & 3 deletions internal-packages/run-engine/src/batch-queue/index.ts
@@ -150,9 +150,7 @@ export class BatchQueue {
visibilityTimeoutMs: 60_000, // 1 minute for batch item processing
startConsumers: false, // We control when to start
cooloff: {
- enabled: true,
- threshold: 5,
- periodMs: 5_000,
+ enabled: false,
},
// Worker queue configuration - FairQueue routes all messages to our single worker queue
workerQueue: {
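For reference, a sketch of the cooloff option as this diff implies it (the union shape is an assumption inferred from the two configurations in this diff, not the package's documented type): with `enabled: false`, a queue that comes up blocked is simply retried on the next consumer pass instead of being parked for `periodMs`.

```ts
// Assumed shape, inferred from the before/after configs in this change.
type CooloffOptions =
  | { enabled: false }
  | { enabled: true; threshold: number; periodMs: number };

// Before: 5 consecutive empty/blocked attempts parked the queue for 5s.
const before: CooloffOptions = { enabled: true, threshold: 5, periodMs: 5_000 };

// After: batch queues routinely sit at their concurrency limit, so parking
// them just delayed otherwise-ready work.
const after: CooloffOptions = { enabled: false };
```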
16 changes: 9 additions & 7 deletions packages/redis-worker/src/fair-queue/index.ts
@@ -925,8 +925,11 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
if (this.concurrencyManager) {
const availableCapacity = await this.concurrencyManager.getAvailableCapacity(descriptor);
if (availableCapacity === 0) {
- // Queue at max concurrency, back off to avoid repeated attempts
- this.#incrementCooloff(queueId);
+ // Queue at max concurrency - don't increment cooloff here.
+ // The outer loop already handles this case (concurrency blocked)
+ // and explicitly avoids cooloff for it. Cooloff here causes
+ // spurious 5s stalls when capacity races between the tenant
+ // pre-check and this per-queue check.
return 0;
}
maxClaimCount = Math.min(maxClaimCount, availableCapacity);
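A simplified sketch of the claim path this comment describes (the function names are illustrative stubs, not the real `FairQueue` internals): the tenant-level pre-check and this per-queue check can disagree when capacity is consumed between them, and the correct response is to claim nothing and let the outer loop handle the blocked queue.

```ts
// Hypothetical stand-ins for the concurrency manager and the Redis claim.
declare function getAvailableCapacity(queueId: string): Promise<number>;
declare function claimUpTo(queueId: string, count: number): Promise<number>;

async function tryClaim(queueId: string, want: number): Promise<number> {
  const capacity = await getAvailableCapacity(queueId); // may have raced to 0
  if (capacity === 0) {
    // Concurrency-blocked: the outer loop re-polls once a slot frees up.
    // Entering cooloff here would stack a flat 5s stall on top of that,
    // which is exactly the spurious delay this change removes.
    return 0;
  }
  return claimUpTo(queueId, Math.min(want, capacity));
}
```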
@@ -1228,19 +1231,18 @@
attempt: storedMessage.attempt + 1,
};

- // Release with delay (and ensure queue is in master queue)
+ // Release with delay, passing the updated message data so the Lua script
+ // atomically writes the incremented attempt count when re-queuing.
await this.visibilityManager.release(
storedMessage.id,
queueId,
queueKey,
queueItemsKey,
masterQueueKey,
- Date.now() + nextDelay
+ Date.now() + nextDelay,
+ JSON.stringify(updatedMessage)
);

- // Update message in items hash with new attempt count
- await this.redis.hset(queueItemsKey, storedMessage.id, JSON.stringify(updatedMessage));

// Release concurrency
if (this.concurrencyManager) {
await this.concurrencyManager.release(descriptor, storedMessage.id);
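The removed `hset` looks harmless, but the window between release and update had a concrete effect on retry accounting. A sketch of why (the `maxAttempts` gating shown here is an assumed simplification, not code from this repo):

```ts
type Stored = { id: string; attempt: number };

// Assumed retry gate: stop once the stored attempt count hits the budget.
function shouldRetry(msg: Stored, maxAttempts = 3): boolean {
  return msg.attempt < maxAttempts;
}

// Timeline with the old two-step re-queue:
//   t0  consumer A fails { attempt: 2 } and releases it back   (step 1)
//   t1  consumer B claims it and still reads { attempt: 2 }    <-- stale
//   t2  consumer A writes { attempt: 3 }                       (step 2, too late)
// At t1, shouldRetry returns true for what is really the third failure, so a
// message could exceed its retry budget. Writing the updated JSON inside the
// release script closes the t0-t2 window.
```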
100 changes: 100 additions & 0 deletions packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
@@ -1182,4 +1182,104 @@ describe("FairQueue", () => {
}
);
});

describe("concurrency block should not trigger cooloff", () => {
redisTest(
"should not enter cooloff when queue hits concurrency limit",
{ timeout: 15000 },
async ({ redisOptions }) => {
const processed: string[] = [];
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });

const scheduler = new DRRScheduler({
redis: redisOptions,
keys,
quantum: 10,
maxDeficit: 100,
});

const queue = new TestFairQueueHelper(redisOptions, keys, {
scheduler,
payloadSchema: TestPayloadSchema,
shardCount: 1,
consumerCount: 1,
consumerIntervalMs: 20,
visibilityTimeoutMs: 5000,
cooloff: {
periodMs: 5000, // Long cooloff - if triggered, messages would stall
threshold: 1, // Enter cooloff after just 1 increment
},
concurrencyGroups: [
{
name: "tenant",
extractGroupId: (q) => q.tenantId,
getLimit: async () => 1, // Only 1 concurrent per tenant
defaultLimit: 1,
},
],
startConsumers: false,
});

// Hold first message to keep concurrency slot occupied
let releaseFirst: (() => void) | undefined;
const firstBlocking = new Promise<void>((resolve) => {
releaseFirst = resolve;
});
let firstStarted = false;

queue.onMessage(async (ctx) => {
if (ctx.message.payload.value === "msg-0") {
firstStarted = true;
// Block this message to saturate concurrency
await firstBlocking;
}
processed.push(ctx.message.payload.value);
await ctx.complete();
});

// Enqueue 3 messages to same tenant
for (let i = 0; i < 3; i++) {
await queue.enqueue({
queueId: "tenant:t1:queue:q1",
tenantId: "t1",
payload: { value: `msg-${i}` },
});
}

queue.start();

// Wait for first message to start processing (blocking the concurrency slot)
await vi.waitFor(
() => {
expect(firstStarted).toBe(true);
},
{ timeout: 5000 }
);

// Release the first message so others can proceed
releaseFirst!();

// All 3 messages should process within a reasonable time.
// If cooloff was incorrectly triggered, this would take 5+ seconds.
const startTime = Date.now();
await vi.waitFor(
() => {
expect(processed).toHaveLength(3);
},
{ timeout: 5000 }
);
const elapsed = Date.now() - startTime;

// Should complete well under the 5s cooloff period
expect(elapsed).toBeLessThan(3000);

// Cooloff states should be empty (no spurious cooloffs)
const cacheSizes = queue.fairQueue.getCacheSizes();
expect(cacheSizes.cooloffStatesSize).toBe(0);

await queue.close();
}
);
});

});
21 changes: 16 additions & 5 deletions packages/redis-worker/src/fair-queue/visibility.ts
@@ -284,7 +284,8 @@ export class VisibilityManager {
queueKey: string,
queueItemsKey: string,
masterQueueKey: string,
- score?: number
+ score?: number,
+ updatedData?: string
): Promise<void> {
const shardId = this.#getShardForQueue(queueId);
const inflightKey = this.keys.inflightKey(shardId);
@@ -293,7 +294,7 @@
const messageScore = score ?? Date.now();

// Use Lua script to atomically:
- // 1. Get message data from in-flight
+ // 1. Get message data from in-flight (or use updatedData if provided)
// 2. Remove from in-flight
// 3. Add back to queue
// 4. Update master queue to ensure queue is picked up
@@ -306,7 +307,8 @@
member,
messageId,
messageScore.toString(),
- queueId
+ queueId,
+ updatedData ?? ""
);

this.logger.debug("Message released", {
@@ -434,7 +436,8 @@
member,
messageId,
score.toString(),
- queueId
+ queueId,
+ ""
);

// Track reclaimed message for concurrency release
@@ -680,6 +683,7 @@
local member = ARGV[1]
local messageId = ARGV[2]
local score = tonumber(ARGV[3])
local queueId = ARGV[4]
+ local updatedData = ARGV[5]

-- Get message data from in-flight
local payload = redis.call('HGET', inflightDataKey, messageId)
@@ -688,6 +692,12 @@
if not payload then
return 0
end

+ -- Use updatedData if provided (e.g. incremented attempt count for retries),
+ -- otherwise use the original in-flight data
+ if updatedData and updatedData ~= "" then
+   payload = updatedData
+ end

-- Remove from in-flight
redis.call('ZREM', inflightKey, member)
redis.call('HDEL', inflightDataKey, messageId)
@@ -816,7 +826,8 @@ declare module "@internal/redis" {
member: string,
messageId: string,
score: string,
- queueId
+ queueId,
+ updatedData: string
): Promise<number>;

releaseMessageBatch(
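For context, this is how a script with the new ARGV slot is typically registered with ioredis (`defineCommand` is the real ioredis API; the command name matches the declaration above, but the key list and count are assumptions, since the diff truncates the KEYS side of the script):

```ts
import Redis from "ioredis";

// Stand-in for the full Lua source shown above.
declare const releaseMessageLua: string;

const redis = new Redis();

redis.defineCommand("releaseMessage", {
  // Assumed keys: inflight zset, inflight data hash, queue zset,
  // queue items hash, and master queue zset.
  numberOfKeys: 5,
  lua: releaseMessageLua,
});

// With the module augmentation above, the call site then reads:
// await redis.releaseMessage(...keys, member, messageId, score, queueId, updatedData);
```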