Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/fix-ensindexer-hot-reload.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"ensindexer": patch
"@ensnode/ponder-sdk": patch
---

ENSIndexer in dev mode no longer crashes during hot reloading due to EnsDbWriterWorker failure.
Comment thread
shrugs marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ describe("EnsDbWriterWorker", () => {
);

// cleanup
worker.stop();
await worker.stop();
});

it("throws when stored config is incompatible", async () => {
Expand Down Expand Up @@ -129,7 +129,7 @@ describe("EnsDbWriterWorker", () => {
expect(ensDbClient.upsertEnsIndexerPublicConfig).toHaveBeenCalledWith(mockPublicConfig);

// cleanup
worker.stop();
await worker.stop();
});

it("throws error when worker is already running", async () => {
Expand All @@ -143,7 +143,7 @@ describe("EnsDbWriterWorker", () => {
await expect(worker.run()).rejects.toThrow("EnsDbWriterWorker is already running");

// cleanup
worker.stop();
await worker.stop();
});

it("throws error when config fetch fails", async () => {
Expand Down Expand Up @@ -193,7 +193,7 @@ describe("EnsDbWriterWorker", () => {
expect(publicConfigBuilder.getPublicConfig).toHaveBeenCalledTimes(1);

// cleanup
worker.stop();
await worker.stop();
});

it("calls pRetry for config fetch with retry logic", async () => {
Expand All @@ -213,7 +213,7 @@ describe("EnsDbWriterWorker", () => {
expect(ensDbClient.upsertEnsIndexerPublicConfig).toHaveBeenCalledWith(mockPublicConfig);

// cleanup
worker.stop();
await worker.stop();
});
});

Expand All @@ -230,7 +230,7 @@ describe("EnsDbWriterWorker", () => {

const callCountBeforeStop = upsertIndexingStatusSnapshot.mock.calls.length;

worker.stop();
await worker.stop();

// advance time after stop
await vi.advanceTimersByTimeAsync(2000);
Expand All @@ -255,7 +255,7 @@ describe("EnsDbWriterWorker", () => {
expect(worker.isRunning).toBe(true);

// act - stop worker
worker.stop();
await worker.stop();

// assert - not running after stop
expect(worker.isRunning).toBe(false);
Expand Down Expand Up @@ -303,7 +303,7 @@ describe("EnsDbWriterWorker", () => {
expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledWith(crossChainSnapshot);

// cleanup
worker.stop();
await worker.stop();
});

it("recovers from errors and continues upserting snapshots", async () => {
Expand Down Expand Up @@ -361,7 +361,72 @@ describe("EnsDbWriterWorker", () => {
expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(3);

// cleanup
worker.stop();
await worker.stop();
});
});

describe("cancellation - stopRequested and skip-overlap", () => {
it("stop() during run() startup causes run() to reject with AbortError", async () => {
// arrange — make upsertEnsDbVersion block until we release it
let resolveUpsert!: () => void;
const upsertPromise = new Promise<void>((resolve) => {
resolveUpsert = resolve;
});
const ensDbClient = createMockEnsDbWriter({
upsertEnsDbVersion: vi.fn().mockReturnValue(upsertPromise),
});
const worker = createMockEnsDbWriterWorker({ ensDbClient });

// act — start run, then stop while it's blocked on upsertEnsDbVersion
const runPromise = worker.run();
await worker.stop();
resolveUpsert();

// assert — run rejects with AbortError
await expect(runPromise).rejects.toThrow("Worker stop requested");

// the interval was never armed
expect(worker.isRunning).toBe(false);
});

it("skips overlapping snapshot ticks when a prior upsert is still in flight", async () => {
// arrange — make upsertIndexingStatusSnapshot block until released
let resolveSnapshot!: () => void;
const snapshotPromise = new Promise<void>((resolve) => {
resolveSnapshot = resolve;
});
const omnichainSnapshot = createMockOmnichainSnapshot();
const crossChainSnapshot = createMockCrossChainSnapshot({ omnichainSnapshot });
vi.mocked(buildCrossChainIndexingStatusSnapshotOmnichain).mockReturnValue(crossChainSnapshot);

const ensDbClient = createMockEnsDbWriter({
upsertIndexingStatusSnapshot: vi.fn().mockReturnValueOnce(snapshotPromise),
});
const worker = createMockEnsDbWriterWorker({
ensDbClient,
indexingStatusBuilder: createMockIndexingStatusBuilder(omnichainSnapshot),
});

await worker.run();

// act — first tick starts the slow upsert
await vi.advanceTimersByTimeAsync(1000);
expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(1);

// second + third ticks should be skipped (still in flight)
await vi.advanceTimersByTimeAsync(2000);
expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(1);

// release the slow upsert
resolveSnapshot();
await vi.advanceTimersByTimeAsync(0);

// next tick should fire again
await vi.advanceTimersByTimeAsync(1000);
expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(2);

// cleanup
await worker.stop();
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ export class EnsDbWriterWorker {
*/
private indexingStatusInterval: ReturnType<typeof setInterval> | null = null;

/**
* Tracks the most recently launched snapshot upsert so that {@link stop}
* can wait for any in-flight work to settle before returning.
*/
private inFlightSnapshot: Promise<unknown> | undefined;

Comment thread
shrugs marked this conversation as resolved.
/**
* Set by {@link stop} to signal an in-progress {@link run} that it should
* bail before scheduling the recurring interval. Required because the
* external `signal` parameter to `run` may not be aborted in every
* cancellation path (e.g. a defensive caller-driven `stop()` in singleton
* cleanup).
*/
private stopRequested = false;

/**
* ENSDb Client instance used by the worker to interact with ENSDb.
*/
Expand Down Expand Up @@ -87,22 +102,34 @@ export class EnsDbWriterWorker {
* 3) A recurring attempt to upsert serialized representation of
* {@link CrossChainIndexingStatusSnapshot} into ENSDb.
*
* @param signal Optional AbortSignal that, if aborted, causes `run` to bail
* between its async setup steps. Use this to prevent the worker
* from finishing initialization (and starting the recurring
* interval) after the surrounding API instance has begun
* shutting down.
* @throws Error if the worker is already running, or
* if the in-memory ENSIndexer Public Config could not be fetched, or
* if the in-memory ENSIndexer Public Config is incompatible with the stored config in ENSDb.
* @throws DOMException with `name === "AbortError"` if `signal` is aborted
* or if {@link stop} is called before the recurring interval is scheduled.
*/
public async run(): Promise<void> {
public async run(signal?: AbortSignal): Promise<void> {
// Do not allow multiple concurrent runs of the worker
if (this.isRunning) {
throw new Error("EnsDbWriterWorker is already running");
}

this.stopRequested = false;
this.checkCancellation(signal);

// Fetch data required for task 1 and task 2.
const inMemoryConfig = await this.getValidatedEnsIndexerPublicConfig();
this.checkCancellation(signal);

// Task 1: upsert ENSDb version into ENSDb.
logger.debug({ msg: "Upserting ENSDb version", module: "EnsDbWriterWorker" });
await this.ensDbClient.upsertEnsDbVersion(inMemoryConfig.versionInfo.ensDb);
this.checkCancellation(signal);
logger.info({
msg: "Upserted ENSDb version",
ensDbVersion: inMemoryConfig.versionInfo.ensDb,
Expand All @@ -115,16 +142,22 @@ export class EnsDbWriterWorker {
module: "EnsDbWriterWorker",
});
await this.ensDbClient.upsertEnsIndexerPublicConfig(inMemoryConfig);
this.checkCancellation(signal);
logger.info({
msg: "Upserted ENSIndexer public config",
module: "EnsDbWriterWorker",
});

// Task 3: recurring upsert of Indexing Status Snapshot into ENSDb.
this.indexingStatusInterval = setInterval(
() => this.upsertIndexingStatusSnapshot(),
secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL),
);
// Skip overlapping ticks so a slow upsert can't pile up concurrent
// ENSDb writes. With skip-overlap there is at most one in-flight
// upsert at a time, which `stop()` then has a single promise to await.
this.indexingStatusInterval = setInterval(() => {
if (this.inFlightSnapshot) return;
this.inFlightSnapshot = this.upsertIndexingStatusSnapshot().finally(() => {
this.inFlightSnapshot = undefined;
});
}, secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL));
Comment thread
shrugs marked this conversation as resolved.
Comment thread
shrugs marked this conversation as resolved.
}

/**
Expand All @@ -137,13 +170,36 @@ export class EnsDbWriterWorker {
/**
* Stop the ENSDb Writer Worker
*
* Stops all recurring tasks in the worker.
* Cancels any in-progress {@link run} startup, stops all recurring tasks,
* and waits for any in-flight snapshot upsert to settle. Safe to call when
* not running.
*/
public stop(): void {
public async stop(): Promise<void> {
this.stopRequested = true;
if (this.indexingStatusInterval) {
clearInterval(this.indexingStatusInterval);
this.indexingStatusInterval = null;
}
if (this.inFlightSnapshot) {
// Errors are already logged inside upsertIndexingStatusSnapshot; swallow here.
await this.inFlightSnapshot.catch(() => {});
this.inFlightSnapshot = undefined;
Comment thread
shrugs marked this conversation as resolved.
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/**
* Throw an `AbortError` if cancellation has been requested either via the
* caller's `AbortSignal` or via an internal {@link stop} call.
*/
private checkCancellation(signal?: AbortSignal): void {
signal?.throwIfAborted();
if (this.stopRequested) {
// Match what `AbortSignal.throwIfAborted()` throws — a DOMException
// with `name === "AbortError"` — so callers' `isAbortError` checks
// treat both abort sources (external signal and internal stop)
// identically.
throw new DOMException("Worker stop requested", "AbortError");
}
}

/**
Expand Down
Loading
Loading