diff --git a/.changeset/fix-ensindexer-hot-reload.md b/.changeset/fix-ensindexer-hot-reload.md new file mode 100644 index 000000000..0d7354b2d --- /dev/null +++ b/.changeset/fix-ensindexer-hot-reload.md @@ -0,0 +1,6 @@ +--- +"ensindexer": patch +"@ensnode/ponder-sdk": patch +--- + +ENSIndexer in dev mode no longer crashes during hot reloading due to EnsDbWriterWorker failure. diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts index ba0f0bee5..da56acf23 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.test.ts @@ -78,7 +78,7 @@ describe("EnsDbWriterWorker", () => { ); // cleanup - worker.stop(); + await worker.stop(); }); it("throws when stored config is incompatible", async () => { @@ -129,7 +129,7 @@ describe("EnsDbWriterWorker", () => { expect(ensDbClient.upsertEnsIndexerPublicConfig).toHaveBeenCalledWith(mockPublicConfig); // cleanup - worker.stop(); + await worker.stop(); }); it("throws error when worker is already running", async () => { @@ -143,7 +143,7 @@ describe("EnsDbWriterWorker", () => { await expect(worker.run()).rejects.toThrow("EnsDbWriterWorker is already running"); // cleanup - worker.stop(); + await worker.stop(); }); it("throws error when config fetch fails", async () => { @@ -193,7 +193,7 @@ describe("EnsDbWriterWorker", () => { expect(publicConfigBuilder.getPublicConfig).toHaveBeenCalledTimes(1); // cleanup - worker.stop(); + await worker.stop(); }); it("calls pRetry for config fetch with retry logic", async () => { @@ -213,7 +213,7 @@ describe("EnsDbWriterWorker", () => { expect(ensDbClient.upsertEnsIndexerPublicConfig).toHaveBeenCalledWith(mockPublicConfig); // cleanup - worker.stop(); + await worker.stop(); }); }); @@ -230,7 +230,7 @@ describe("EnsDbWriterWorker", () => { const callCountBeforeStop = upsertIndexingStatusSnapshot.mock.calls.length; - worker.stop(); + await worker.stop(); // advance time after stop await vi.advanceTimersByTimeAsync(2000); @@ -255,7 +255,7 @@ describe("EnsDbWriterWorker", () => { expect(worker.isRunning).toBe(true); // act - stop worker - worker.stop(); + await worker.stop(); // assert - not running after stop expect(worker.isRunning).toBe(false); @@ -303,7 +303,7 @@ describe("EnsDbWriterWorker", () => { expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledWith(crossChainSnapshot); // cleanup - worker.stop(); + await worker.stop(); }); it("recovers from errors and continues upserting snapshots", async () => { @@ -361,7 +361,72 @@ describe("EnsDbWriterWorker", () => { expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(3); // cleanup - worker.stop(); + await worker.stop(); + }); + }); + + describe("cancellation - stopRequested and skip-overlap", () => { + it("stop() during run() startup causes run() to reject with AbortError", async () => { + // arrange — make upsertEnsDbVersion block until we release it + let resolveUpsert!: () => void; + const upsertPromise = new Promise((resolve) => { + resolveUpsert = resolve; + }); + const ensDbClient = createMockEnsDbWriter({ + upsertEnsDbVersion: vi.fn().mockReturnValue(upsertPromise), + }); + const worker = createMockEnsDbWriterWorker({ ensDbClient }); + + // act — start run, then stop while it's blocked on upsertEnsDbVersion + const runPromise = worker.run(); + await worker.stop(); + resolveUpsert(); + + // assert — run rejects with AbortError + await expect(runPromise).rejects.toThrow("Worker stop requested"); + + // the interval was never armed + expect(worker.isRunning).toBe(false); + }); + + it("skips overlapping snapshot ticks when a prior upsert is still in flight", async () => { + // arrange — make upsertIndexingStatusSnapshot block until released + let resolveSnapshot!: () => void; + const snapshotPromise = new Promise((resolve) => { + resolveSnapshot = resolve; + }); + const omnichainSnapshot = createMockOmnichainSnapshot(); + const crossChainSnapshot = createMockCrossChainSnapshot({ omnichainSnapshot }); + vi.mocked(buildCrossChainIndexingStatusSnapshotOmnichain).mockReturnValue(crossChainSnapshot); + + const ensDbClient = createMockEnsDbWriter({ + upsertIndexingStatusSnapshot: vi.fn().mockReturnValueOnce(snapshotPromise), + }); + const worker = createMockEnsDbWriterWorker({ + ensDbClient, + indexingStatusBuilder: createMockIndexingStatusBuilder(omnichainSnapshot), + }); + + await worker.run(); + + // act — first tick starts the slow upsert + await vi.advanceTimersByTimeAsync(1000); + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(1); + + // second + third ticks should be skipped (still in flight) + await vi.advanceTimersByTimeAsync(2000); + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(1); + + // release the slow upsert + resolveSnapshot(); + await vi.advanceTimersByTimeAsync(0); + + // next tick should fire again + await vi.advanceTimersByTimeAsync(1000); + expect(ensDbClient.upsertIndexingStatusSnapshot).toHaveBeenCalledTimes(2); + + // cleanup + await worker.stop(); }); }); }); diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts index 38aa6d12d..f044bcaf0 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/ensdb-writer-worker.ts @@ -37,6 +37,21 @@ export class EnsDbWriterWorker { */ private indexingStatusInterval: ReturnType | null = null; + /** + * Tracks the most recently launched snapshot upsert so that {@link stop} + * can wait for any in-flight work to settle before returning. + */ + private inFlightSnapshot: Promise | undefined; + + /** + * Set by {@link stop} to signal an in-progress {@link run} that it should + * bail before scheduling the recurring interval. Required because the + * external `signal` parameter to `run` may not be aborted in every + * cancellation path (e.g. a defensive caller-driven `stop()` in singleton + * cleanup). + */ + private stopRequested = false; + /** * ENSDb Client instance used by the worker to interact with ENSDb. */ @@ -87,22 +102,34 @@ export class EnsDbWriterWorker { * 3) A recurring attempt to upsert serialized representation of * {@link CrossChainIndexingStatusSnapshot} into ENSDb. * + * @param signal Optional AbortSignal that, if aborted, causes `run` to bail + * between its async setup steps. Use this to prevent the worker + * from finishing initialization (and starting the recurring + * interval) after the surrounding API instance has begun + * shutting down. * @throws Error if the worker is already running, or * if the in-memory ENSIndexer Public Config could not be fetched, or * if the in-memory ENSIndexer Public Config is incompatible with the stored config in ENSDb. + * @throws DOMException with `name === "AbortError"` if `signal` is aborted + * or if {@link stop} is called before the recurring interval is scheduled. */ - public async run(): Promise { + public async run(signal?: AbortSignal): Promise { // Do not allow multiple concurrent runs of the worker if (this.isRunning) { throw new Error("EnsDbWriterWorker is already running"); } + this.stopRequested = false; + this.checkCancellation(signal); + // Fetch data required for task 1 and task 2. const inMemoryConfig = await this.getValidatedEnsIndexerPublicConfig(); + this.checkCancellation(signal); // Task 1: upsert ENSDb version into ENSDb. logger.debug({ msg: "Upserting ENSDb version", module: "EnsDbWriterWorker" }); await this.ensDbClient.upsertEnsDbVersion(inMemoryConfig.versionInfo.ensDb); + this.checkCancellation(signal); logger.info({ msg: "Upserted ENSDb version", ensDbVersion: inMemoryConfig.versionInfo.ensDb, @@ -115,16 +142,22 @@ export class EnsDbWriterWorker { module: "EnsDbWriterWorker", }); await this.ensDbClient.upsertEnsIndexerPublicConfig(inMemoryConfig); + this.checkCancellation(signal); logger.info({ msg: "Upserted ENSIndexer public config", module: "EnsDbWriterWorker", }); // Task 3: recurring upsert of Indexing Status Snapshot into ENSDb. - this.indexingStatusInterval = setInterval( - () => this.upsertIndexingStatusSnapshot(), - secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL), - ); + // Skip overlapping ticks so a slow upsert can't pile up concurrent + // ENSDb writes. With skip-overlap there is at most one in-flight + // upsert at a time, which `stop()` then has a single promise to await. + this.indexingStatusInterval = setInterval(() => { + if (this.inFlightSnapshot) return; + this.inFlightSnapshot = this.upsertIndexingStatusSnapshot().finally(() => { + this.inFlightSnapshot = undefined; + }); + }, secondsToMilliseconds(INDEXING_STATUS_RECORD_UPDATE_INTERVAL)); } /** @@ -137,13 +170,36 @@ export class EnsDbWriterWorker { /** * Stop the ENSDb Writer Worker * - * Stops all recurring tasks in the worker. + * Cancels any in-progress {@link run} startup, stops all recurring tasks, + * and waits for any in-flight snapshot upsert to settle. Safe to call when + * not running. */ - public stop(): void { + public async stop(): Promise { + this.stopRequested = true; if (this.indexingStatusInterval) { clearInterval(this.indexingStatusInterval); this.indexingStatusInterval = null; } + if (this.inFlightSnapshot) { + // Errors are already logged inside upsertIndexingStatusSnapshot; swallow here. + await this.inFlightSnapshot.catch(() => {}); + this.inFlightSnapshot = undefined; + } + } + + /** + * Throw an `AbortError` if cancellation has been requested either via the + * caller's `AbortSignal` or via an internal {@link stop} call. + */ + private checkCancellation(signal?: AbortSignal): void { + signal?.throwIfAborted(); + if (this.stopRequested) { + // Match what `AbortSignal.throwIfAborted()` throws — a DOMException + // with `name === "AbortError"` — so callers' `isAbortError` checks + // treat both abort sources (external signal and internal stop) + // identically. + throw new DOMException("Worker stop requested", "AbortError"); + } } /** diff --git a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts index 22fd6a5e9..f9622207e 100644 --- a/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts +++ b/apps/ensindexer/src/lib/ensdb-writer-worker/singleton.ts @@ -1,48 +1,115 @@ import { ensDbClient } from "@/lib/ensdb/singleton"; import { indexingStatusBuilder } from "@/lib/indexing-status-builder/singleton"; import { localPonderClient } from "@/lib/local-ponder-client"; +import { getApiShutdown } from "@/lib/local-ponder-context"; import { logger } from "@/lib/logger"; import { publicConfigBuilder } from "@/lib/public-config-builder/singleton"; import { EnsDbWriterWorker } from "./ensdb-writer-worker"; -let ensDbWriterWorker: EnsDbWriterWorker; +let ensDbWriterWorker: EnsDbWriterWorker | undefined; + +function isAbortError(error: unknown): boolean { + // `fetch` aborts reject with a `DOMException` whose `name === "AbortError"`, + // which is not always `instanceof Error` across runtimes. Check by name. + return ( + typeof error === "object" && + error !== null && + (error as { name?: unknown }).name === "AbortError" + ); +} /** - * Starts the EnsDbWriterWorker in a new asynchronous context. + * Stop the given worker and, if it is still the active singleton, clear the + * singleton reference BEFORE awaiting shutdown. Clearing first ensures that + * if `stop()` causes an in-progress `run()` to reject with AbortError, the + * catch discriminator immediately sees `ensDbWriterWorker !== worker` and + * classifies it as an intentional stop rather than a fatal error. + * Safe to call multiple times. + */ +async function gracefulShutdown(worker: EnsDbWriterWorker, reason: string): Promise { + logger.info({ + msg: `Stopping EnsDbWriterWorker: ${reason}`, + module: "EnsDbWriterWorker", + }); + if (ensDbWriterWorker === worker) { + ensDbWriterWorker = undefined; + } + await worker.stop(); +} + +/** + * Start (or restart) the EnsDbWriterWorker. * - * The worker will run indefinitely until it is stopped via {@link EnsDbWriterWorker.stop}, - * for example in response to a process termination signal or an internal error, at - * which point it will attempt to gracefully shut down. + * Called from `apps/ensindexer/ponder/src/api/index.ts` on every Ponder + * API exec. Ponder re-executes the API entry file on hot reload, but this + * module is cached by vite-node, so module-level state survives across + * reloads. This function therefore must: * - * @throws Error if the worker is already running when this function is called. + * 1. Be idempotent — treat a re-call as "the previous instance is dead, + * replace it" rather than throwing. + * 2. Re-bind reload-scoped resources (e.g. `apiShutdown`) fresh from + * `localPonderContext` on every call. Never hoist them to module + * scope. See `local-ponder-context.ts` for the staleness contract. */ -export function startEnsDbWriterWorker() { - if (typeof ensDbWriterWorker !== "undefined") { - throw new Error("EnsDbWriterWorker has already been initialized"); +export async function startEnsDbWriterWorker(): Promise { + // Defensively reset any prior instance. The apiShutdown.add() callback + // from the previous API exec is the primary cleanup path on hot reload; + // this is a safety net for cases where the callback didn't run (e.g. + // unexpected shutdown ordering). + if (ensDbWriterWorker) { + await gracefulShutdown(ensDbWriterWorker, "stale instance from previous API exec"); } - ensDbWriterWorker = new EnsDbWriterWorker( + const worker = new EnsDbWriterWorker( ensDbClient, publicConfigBuilder, indexingStatusBuilder, localPonderClient, ); + ensDbWriterWorker = worker; + + // Read apiShutdown FRESH via getApiShutdown(). Ponder kills and + // replaces this on every dev-mode hot reload, so this read MUST happen + // inside the function call (not at module scope). + const apiShutdown = getApiShutdown(); + const abortSignal = apiShutdown.abortController.signal; - ensDbWriterWorker - .run() + apiShutdown.add(() => gracefulShutdown(worker, "API shutdown")); + + worker + .run(abortSignal) // Handle any uncaught errors from the worker - .catch((error) => { - // Abort the worker on error to trigger cleanup - ensDbWriterWorker.stop(); + .catch(async (error) => { + // Treat as a clean stop when the error is an AbortError AND the + // worker was intentionally stopped — either because Ponder aborted + // the captured shutdown signal, or because this worker has been + // superseded in the singleton (the defensive stale-instance + // cleanup path inside `startEnsDbWriterWorker` calls + // `worker.stop()` on the old worker, which causes its `run()` to + // throw AbortError without `abortSignal.aborted` necessarily + // being true). Requiring `isAbortError(error)` keeps unrelated + // failures from being silently swallowed. + const intentionallyStopped = abortSignal.aborted || ensDbWriterWorker !== worker; + if (intentionallyStopped && isAbortError(error)) { + await gracefulShutdown(worker, "API shutdown (run aborted)"); + return; + } + + // Real worker error — clean up and fail-fast. The worker is a startup + // invariant for the API layer; leaving the process half-alive (just + // setting `process.exitCode`) would let ensindexer keep serving with a + // dead writer. We can't rethrow because this `.catch()` is on a + // fire-and-forget promise, so a rethrow becomes an unhandled rejection + // instead of reaching a top-level handler — call `process.exit(1)` to + // terminate immediately. + await gracefulShutdown(worker, "uncaught error"); logger.error({ msg: "EnsDbWriterWorker encountered an error", error, }); - // Re-throw the error to ensure the application shuts down with a non-zero exit code. - process.exitCode = 1; - throw error; + process.exit(1); }); } diff --git a/apps/ensindexer/src/lib/local-ponder-client.ts b/apps/ensindexer/src/lib/local-ponder-client.ts index fcfe293a9..6cd153864 100644 --- a/apps/ensindexer/src/lib/local-ponder-client.ts +++ b/apps/ensindexer/src/lib/local-ponder-client.ts @@ -7,7 +7,7 @@ import { LocalPonderClient } from "@ensnode/ponder-sdk"; import { getPluginsAllDatasourceNames } from "@/lib/plugin-helpers"; -import { localPonderContext } from "./local-ponder-context"; +import { getApiShutdown, localPonderContext } from "./local-ponder-context"; const pluginsAllDatasourceNames = getPluginsAllDatasourceNames(config.plugins); const indexedBlockranges = buildIndexedBlockranges(config.namespace, pluginsAllDatasourceNames); @@ -17,4 +17,6 @@ export const localPonderClient = new LocalPonderClient( indexedBlockranges, publicClients, localPonderContext, + // See local-ponder-context.ts for the staleness contract. + () => getApiShutdown().abortController.signal, ); diff --git a/apps/ensindexer/src/lib/local-ponder-context.ts b/apps/ensindexer/src/lib/local-ponder-context.ts index ed66a4088..ce8b6d35a 100644 --- a/apps/ensindexer/src/lib/local-ponder-context.ts +++ b/apps/ensindexer/src/lib/local-ponder-context.ts @@ -1,14 +1,66 @@ -import { deserializePonderAppContext, type PonderAppContext } from "@ensnode/ponder-sdk"; +import { + deserializePonderAppContext, + isPonderAppShutdownManager, + type PonderAppShutdownManager, +} from "@ensnode/ponder-sdk"; + +/** + * Local Ponder Context — wrappers over Ponder's runtime globals. + * + * Stable vs reload-scoped fields: + * + * Ponder's dev mode hot-reloads the API entry file by re-executing it via + * vite-node. On every indexing-file change, Ponder ALSO kills and replaces + * `common.shutdown` and `common.apiShutdown` on `globalThis.PONDER_COMMON` + * (see `ponder/src/bin/commands/dev.ts:95-101`). Modules in our API-side + * dependency graph (this file included) are NOT re-evaluated when only an + * indexing file changes — vite-node only invalidates the changed file's + * dep tree. So any value cached in a module-level closure during the + * original boot becomes stale on the very next reload. + * + * Stable fields (`command`, `localPonderAppUrl`, `logger`) are eagerly + * deserialized once at module load — Ponder does not mutate `options` or + * `logger` on reload. Reload-scoped fields are exposed as FUNCTIONS + * ({@link getApiShutdown}, {@link getShutdown}) rather than properties on + * the context object so that every call site is forced to re-read fresh + * from `globalThis.PONDER_COMMON`. The function call form makes the + * staleness contract visible at the call site — a captured `const sig = + * getApiShutdown().abortController.signal` is obviously caching a + * function result, whereas a captured `localPonderContext.apiShutdown` + * would have looked like an innocent property access. + * + * Contract for callers: NEVER cache the return value of `getApiShutdown` + * or `getShutdown` (or any field on it) in a module-level closure or a + * constructor argument. If you need to attach a listener or read the + * signal across calls, store the GETTER function, not its return value. + */ if (!globalThis.PONDER_COMMON) { throw new Error("PONDER_COMMON must be defined by Ponder at runtime as a global variable."); } +export const localPonderContext = deserializePonderAppContext(globalThis.PONDER_COMMON); + +function readShutdownManager(field: "apiShutdown" | "shutdown"): PonderAppShutdownManager { + const raw = (globalThis.PONDER_COMMON as Record | undefined)?.[field]; + if (!isPonderAppShutdownManager(raw)) { + throw new Error(`globalThis.PONDER_COMMON.${field} is not a valid Ponder shutdown manager.`); + } + return raw; +} + /** - * Local Ponder app context - * - * Represents the {@link PonderAppContext} object provided by Ponder runtime to - * the local Ponder app. Useful for accessing internal Ponder app configuration - * and utilities such as the logger. + * Returns the current `apiShutdown` manager. RELOAD-SCOPED — Ponder + * replaces this on every API hot reload. Always call fresh; never cache. */ -export const localPonderContext = deserializePonderAppContext(globalThis.PONDER_COMMON); +export function getApiShutdown(): PonderAppShutdownManager { + return readShutdownManager("apiShutdown"); +} + +/** + * Returns the current `shutdown` manager. RELOAD-SCOPED — Ponder + * replaces this on every indexing hot reload. Always call fresh; never cache. + */ +export function getShutdown(): PonderAppShutdownManager { + return readShutdownManager("shutdown"); +} diff --git a/packages/ponder-sdk/src/client.test.ts b/packages/ponder-sdk/src/client.test.ts index 3b4bd73d4..043c243bb 100644 --- a/packages/ponder-sdk/src/client.test.ts +++ b/packages/ponder-sdk/src/client.test.ts @@ -322,4 +322,82 @@ describe("Ponder Client", () => { }); }); }); + + describe("getAbortSignal getter", () => { + it("invokes getAbortSignal on every fetch and forwards the signal", async () => { + // Arrange + mockFetch.mockResolvedValue(new Response(null, { status: 200 })); + const signal = new AbortController().signal; + const getAbortSignal = vi.fn(() => signal); + const ponderClient = new PonderClient(new URL("http://localhost:3000"), getAbortSignal); + + // Act + await ponderClient.health(); + await ponderClient.health(); + + // Assert + expect(getAbortSignal).toHaveBeenCalledTimes(2); + expect(mockFetch).toHaveBeenCalledTimes(2); + expect(mockFetch).toHaveBeenNthCalledWith(1, expect.any(URL), { signal }); + expect(mockFetch).toHaveBeenNthCalledWith(2, expect.any(URL), { signal }); + }); + + it("re-reads the signal between fetches so callers get fresh identity", async () => { + // Arrange + mockFetch.mockResolvedValue(new Response(null, { status: 200 })); + const firstSignal = new AbortController().signal; + const secondSignal = new AbortController().signal; + const getAbortSignal = vi + .fn<() => AbortSignal | undefined>() + .mockReturnValueOnce(firstSignal) + .mockReturnValueOnce(secondSignal); + const ponderClient = new PonderClient(new URL("http://localhost:3000"), getAbortSignal); + + // Act + await ponderClient.health(); + await ponderClient.health(); + + // Assert + expect(mockFetch).toHaveBeenNthCalledWith(1, expect.any(URL), { signal: firstSignal }); + expect(mockFetch).toHaveBeenNthCalledWith(2, expect.any(URL), { signal: secondSignal }); + }); + + it("aborting the signal cancels in-flight fetches", async () => { + // Arrange + const abortController = new AbortController(); + mockFetch.mockImplementation( + (_input, init) => + new Promise((_resolve, reject) => { + init?.signal?.addEventListener("abort", () => { + const error = new Error("aborted"); + error.name = "AbortError"; + reject(error); + }); + }), + ); + const ponderClient = new PonderClient( + new URL("http://localhost:3000"), + () => abortController.signal, + ); + + // Act + const pending = ponderClient.health(); + abortController.abort(); + + // Assert + await expect(pending).rejects.toThrowError(/aborted/); + }); + + it("treats getAbortSignal as optional (undefined signal → no abort)", async () => { + // Arrange + mockFetch.mockResolvedValueOnce(new Response(null, { status: 200 })); + const ponderClient = new PonderClient(new URL("http://localhost:3000")); + + // Act + await ponderClient.health(); + + // Assert + expect(mockFetch).toHaveBeenCalledWith(expect.any(URL), { signal: undefined }); + }); + }); }); diff --git a/packages/ponder-sdk/src/client.ts b/packages/ponder-sdk/src/client.ts index 84d740f75..6de4e30bb 100644 --- a/packages/ponder-sdk/src/client.ts +++ b/packages/ponder-sdk/src/client.ts @@ -3,11 +3,24 @@ import { deserializePonderIndexingStatus } from "./deserialize/indexing-status"; import type { PonderIndexingMetrics } from "./indexing-metrics"; import type { PonderIndexingStatus } from "./indexing-status"; +/** + * Returns the current `AbortSignal` to attach to outgoing requests. + * + * Consumers must use a getter (not a captured `AbortSignal`) when the + * underlying signal can change identity over time — e.g. signals derived + * from Ponder's `apiShutdown` manager, which Ponder kills and replaces on + * every dev-mode hot reload. + */ +export type AbortSignalGetter = () => AbortSignal | undefined; + /** * PonderClient for fetching data from Ponder apps. */ export class PonderClient { - constructor(private readonly baseUrl: URL) {} + constructor( + private readonly baseUrl: URL, + private readonly getAbortSignal?: AbortSignalGetter, + ) {} /** * Check Ponder Health @@ -18,7 +31,7 @@ export class PonderClient { */ async health(): Promise { const requestUrl = new URL("/health", this.baseUrl); - const response = await fetch(requestUrl); + const response = await fetch(requestUrl, { signal: this.getAbortSignal?.() }); if (!response.ok) { throw new Error( @@ -35,7 +48,7 @@ export class PonderClient { */ async metrics(): Promise { const requestUrl = new URL("/metrics", this.baseUrl); - const response = await fetch(requestUrl); + const response = await fetch(requestUrl, { signal: this.getAbortSignal?.() }); if (!response.ok) { throw new Error( @@ -56,7 +69,7 @@ export class PonderClient { */ async status(): Promise { const requestUrl = new URL("/status", this.baseUrl); - const response = await fetch(requestUrl); + const response = await fetch(requestUrl, { signal: this.getAbortSignal?.() }); if (!response.ok) { throw new Error( diff --git a/packages/ponder-sdk/src/local-ponder-client.ts b/packages/ponder-sdk/src/local-ponder-client.ts index 0e8e786c5..375509c67 100644 --- a/packages/ponder-sdk/src/local-ponder-client.ts +++ b/packages/ponder-sdk/src/local-ponder-client.ts @@ -1,7 +1,7 @@ import type { BlockNumberRangeWithStartBlock } from "./blockrange"; import type { CachedPublicClient } from "./cached-public-client"; import type { ChainId, ChainIdString } from "./chains"; -import { PonderClient } from "./client"; +import { type AbortSignalGetter, PonderClient } from "./client"; import { deserializeChainId } from "./deserialize/chains"; import { type ChainIndexingMetrics, @@ -82,14 +82,16 @@ export class LocalPonderClient extends PonderClient { * @param ponderPublicClients All cached public clients provided by the local Ponder app * (may include non-indexed chains). * @param ponderAppContext The internal context of the local Ponder app. + * @param getAbortSignal Optional {@link AbortSignalGetter} invoked at fetch time. See its docs for the staleness contract. */ constructor( indexedChainIds: Set, indexedBlockranges: Map, ponderPublicClients: Record, ponderAppContext: PonderAppContext, + getAbortSignal?: AbortSignalGetter, ) { - super(ponderAppContext.localPonderAppUrl); + super(ponderAppContext.localPonderAppUrl, getAbortSignal); this.indexedChainIds = indexedChainIds; diff --git a/packages/ponder-sdk/src/ponder-app-context.ts b/packages/ponder-sdk/src/ponder-app-context.ts index 4e560f1aa..51f3eb2a9 100644 --- a/packages/ponder-sdk/src/ponder-app-context.ts +++ b/packages/ponder-sdk/src/ponder-app-context.ts @@ -12,6 +12,35 @@ export const PonderAppCommands = { export type PonderAppCommand = (typeof PonderAppCommands)[keyof typeof PonderAppCommands]; +/** + * Ponder shutdown manager runtime shape. + * + * Mirrors `ponder/src/internal/shutdown.ts` — the object Ponder publishes + * on `globalThis.PONDER_COMMON.{shutdown,apiShutdown}`. Reload-scoped: + * Ponder kills and replaces these on every dev-mode hot reload, so + * consumers must always read the current instance fresh and never cache + * a captured reference. + */ +export interface PonderAppShutdownManager { + // Ponder awaits the callback's return value internally, so any value + // (sync or async) is accepted. Typed as `unknown` to keep callbacks + // that return `void` assignable without a Biome `noConfusingVoidType` + // violation. + add: (callback: () => unknown) => void; + isKilled: boolean; + abortController: AbortController; +} + +export function isPonderAppShutdownManager(value: unknown): value is PonderAppShutdownManager { + if (typeof value !== "object" || value === null) return false; + const obj = value as Record; + return ( + typeof obj.add === "function" && + typeof obj.isKilled === "boolean" && + obj.abortController instanceof AbortController + ); +} + /** * Ponder app context *