diff --git a/packages/api/README.md b/packages/api/README.md index 86dccecb..6b5f506a 100644 --- a/packages/api/README.md +++ b/packages/api/README.md @@ -96,6 +96,7 @@ Optional env: - `GET /federation/followers` - `GET /federation/following` - `GET /federation/liked` +- `GET /federation/exchange/status` (connection summary and recent exchange events) - `POST /federation/exchange/subscriptions` (discover remote actor, persist metadata, send signed `Follow`) - `GET /federation/exchange/subscriptions` - `POST /federation/exchange/poll` (manual remote outbox poll) @@ -131,10 +132,13 @@ Exchange targets must be explicit. Use `https://exchange.lefine.pro`, an actor U }' ./ctl request POST /federation/exchange/poll '{}' +./ctl request GET /federation/exchange/status ./ctl request GET /federation/exchange/subscriptions ./ctl request GET /federation/issues ``` +`GET /federation/exchange/status` is the live observability endpoint for a Lefine connection. It reports subscription counts, accepted/pending/rejected state, `lastInboxAt`, `lastPollAt`, persisted issue count, processed outbox items, and recent events such as `follow.sent`, `inbox.follow.accept`, `inbox.issue.received`, and `poll.completed`. + When a polled `Create(Ticket)` has no GitHub URL in the Ticket payload, `projectRepoUrl` or `DOCKER_GIT_EXCHANGE_PROJECT_REPO_URL` is required for the automatic docker-git project/agent run. 1. Read actor profile (contains `inbox/outbox/followers/following/liked`): diff --git a/packages/api/src/api/contracts.ts b/packages/api/src/api/contracts.ts index a21a5b72..a5b76218 100644 --- a/packages/api/src/api/contracts.ts +++ b/packages/api/src/api/contracts.ts @@ -662,6 +662,57 @@ export type ExchangePollResult = { readonly failedItems: number } +export type FederationExchangeEventKind = + | "follow.sent" + | "inbox.follow.accept" + | "inbox.follow.reject" + | "inbox.issue.received" + | "poll.completed" + +export type FederationExchangeEvent = { + readonly id: string + readonly kind: FederationExchangeEventKind + readonly occurredAt: string + readonly subscriptionId?: string | undefined + readonly target?: string | undefined + readonly queue?: string | undefined + readonly status?: FollowStatus | undefined + readonly issueId?: string | undefined + readonly remoteActor?: string | undefined + readonly totalItems?: number | undefined + readonly newItems?: number | undefined + readonly processedItems?: number | undefined + readonly failedItems?: number | undefined +} + +export type FederationExchangeStatusSubscription = { + readonly id: string + readonly target: string + readonly queue?: string | undefined + readonly status: FollowStatus + readonly remoteActor?: string | undefined + readonly remoteInbox?: string | undefined + readonly remoteOutbox?: string | undefined + readonly createdAt: string + readonly updatedAt: string +} + +export type FederationExchangeStatus = { + readonly publicActor: string + readonly summary: { + readonly subscriptions: number + readonly accepted: number + readonly pending: number + readonly rejected: number + readonly issues: number + readonly processedOutboxItems: number + readonly lastInboxAt?: string | undefined + readonly lastPollAt?: string | undefined + } + readonly subscriptions: ReadonlyArray + readonly recentEvents: ReadonlyArray +} + export type ApiEventType = | "snapshot" | "project.created" diff --git a/packages/api/src/http.ts b/packages/api/src/http.ts index cef4ce1f..d55e46e8 100644 --- a/packages/api/src/http.ts +++ b/packages/api/src/http.ts @@ -70,6 +70,7 @@ import { listFollowSubscriptions, makeFederationActorDocument, makeFederationContext, + makeFederationExchangeStatus, makeFederationFollowersCollection, makeFederationFollowingCollection, makeFederationLikedCollection, @@ -824,6 +825,14 @@ export const makeRouter = () => { return yield* _(activityJsonResponse(makeFederationLikedCollection(context), 200)) }).pipe(Effect.catchAll(errorResponse)) ), + HttpRouter.get( + "/federation/exchange/status", + Effect.gen(function*(_) { + const request = yield* _(HttpServerRequest.HttpServerRequest) + const context = yield* _(resolveFederationContext(request)) + return yield* _(jsonResponse(makeFederationExchangeStatus(context), 200)) + }).pipe(Effect.catchAll(errorResponse)) + ), HttpRouter.post( "/federation/exchange/subscriptions", Effect.gen(function*(_) { diff --git a/packages/api/src/services/federation.ts b/packages/api/src/services/federation.ts index 202742ad..2449a307 100644 --- a/packages/api/src/services/federation.ts +++ b/packages/api/src/services/federation.ts @@ -22,6 +22,8 @@ import type { CreateProjectRequest, ExchangePollRequest, ExchangeSubscribeRequest, + FederationExchangeEvent, + FederationExchangeStatus, FederationInboxResult, FederationIssueRecord, FollowStatus, @@ -47,6 +49,7 @@ type StoredFederationState = { readonly issues: ReadonlyArray readonly follows: ReadonlyArray readonly processedOutboxItems: ReadonlyArray + readonly exchangeEvents?: ReadonlyArray | undefined readonly localActorKeys?: LocalActorKeys | undefined } @@ -97,12 +100,14 @@ const jsonLdContentType = "application/ld+json; profile=\"https://www.w3.org/ns/ const activityAcceptHeader = `${jsonLdContentType}, ${activityJsonContentType}, application/json` const defaultExchangeQueue = "code" const stateVersion = 1 as const +const exchangeEventLimit = 100 const issueStore: Map = new Map() const followStore: Map = new Map() const followByActivityId: Map = new Map() const followByActorObject: Map = new Map() const processedOutboxItems: Set = new Set() +let exchangeEvents: ReadonlyArray = [] let localActorKeys: LocalActorKeys | null = null let stateLoaded = false @@ -120,6 +125,33 @@ const asNonEmptyString = (value: unknown): string | null => const readOptionalString = (record: JsonRecord, key: string): string | undefined => asNonEmptyString(record[key]) ?? undefined +type FederationExchangeEventDraft = Omit & { + readonly occurredAt?: string | undefined +} + +const exchangeSubscriptionTarget = (subscription: FollowSubscription): string => + subscription.subscriptionName ?? subscription.remoteActor ?? subscription.object + +const findExchangeSubscriptionByActor = (actor: string | undefined): FollowSubscription | undefined => + actor === undefined + ? undefined + : [...followStore.values()].find((subscription) => + subscription.remoteOutbox !== undefined && + (subscription.remoteActor === actor || subscription.object === actor) + ) + +const recordExchangeEvent = (event: FederationExchangeEventDraft): FederationExchangeEvent => { + const { occurredAt, ...details } = event + const stored: FederationExchangeEvent = { + id: randomUUID(), + occurredAt: occurredAt ?? nowIso(), + ...details + } + exchangeEvents = [...exchangeEvents, stored].slice(-exchangeEventLimit) + persistFederationStateBestEffort() + return stored +} + const readRequiredString = ( record: JsonRecord, key: string, @@ -302,6 +334,7 @@ const serializeState = (): StoredFederationState => ({ issues: [...issueStore.values()], follows: [...followStore.values()], processedOutboxItems: [...processedOutboxItems], + exchangeEvents, ...(localActorKeys === null ? {} : { localActorKeys }) }) @@ -377,6 +410,7 @@ const hydrateState = (state: StoredFederationState): void => { followByActivityId.clear() followByActorObject.clear() processedOutboxItems.clear() + exchangeEvents = [] for (const issue of state.issues ?? []) { issueStore.set(issue.issueId, issue) @@ -387,6 +421,7 @@ const hydrateState = (state: StoredFederationState): void => { for (const item of state.processedOutboxItems ?? []) { processedOutboxItems.add(item) } + exchangeEvents = [...(state.exchangeEvents ?? [])].slice(-exchangeEventLimit) localActorKeys = state.localActorKeys ?? null } @@ -740,6 +775,22 @@ const ingestCreateTicket = ( return issue }) +const recordIssueReceivedEvent = ( + issue: FederationIssueRecord, + options: IngestOptions +): void => { + const remoteActor = issue.actor ?? issue.ticket.attributedTo + const subscription = options.subscription ?? findExchangeSubscriptionByActor(remoteActor) + recordExchangeEvent({ + kind: "inbox.issue.received", + subscriptionId: subscription?.id, + target: subscription === undefined ? undefined : exchangeSubscriptionTarget(subscription), + queue: subscription?.queue, + issueId: issue.issueId, + remoteActor: subscription?.remoteActor ?? remoteActor + }) +} + // CHANGE: support ForgeFed issue inputs and ActivityPub inbox transitions in API mode. // WHY: issue #233 requires ForgeFed/ActivityPub subscription and task intake. // QUOTE(ТЗ): "Осталось forgefed допподержать" + "Законнектишь к exchange" @@ -768,16 +819,19 @@ export const ingestFederationInbox = ( if (hasType(record, "Offer")) { const issue = yield* _(ingestOfferTicket(record)) + recordIssueReceivedEvent(issue, options) return { kind: "issue.offer", issue } } if (hasType(record, "Create")) { const issue = yield* _(ingestCreateTicket(record, options)) + recordIssueReceivedEvent(issue, options) return { kind: "issue.create", issue } } if (hasType(record, "Ticket")) { const issue = yield* _(ingestDirectTicket(record)) + recordIssueReceivedEvent(issue, options) return { kind: "issue.ticket", issue } } @@ -785,6 +839,14 @@ export const ingestFederationInbox = ( const subscription = yield* _(resolveFollowFromInbox(record)) const status: FollowStatus = hasType(record, "Accept") ? "accepted" : "rejected" const updated = updateFollowStatus(subscription, status) + recordExchangeEvent({ + kind: status === "accepted" ? "inbox.follow.accept" : "inbox.follow.reject", + subscriptionId: updated.id, + target: exchangeSubscriptionTarget(updated), + queue: updated.queue, + status: updated.status, + remoteActor: updated.remoteActor + }) return status === "accepted" ? { kind: "follow.accept", subscription: updated } : { kind: "follow.reject", subscription: updated } @@ -1242,6 +1304,14 @@ export const ensureExchangeSubscription = ( if (inbox !== undefined) { yield* _(sendJsonLd(context, inbox, activity).pipe(Effect.ignore)) } + recordExchangeEvent({ + kind: "follow.sent", + subscriptionId: subscription.id, + target: exchangeSubscriptionTarget(subscription), + queue: subscription.queue, + status: subscription.status, + remoteActor: subscription.remoteActor + }) return { subscription, activity } }) @@ -1255,12 +1325,65 @@ export const listFollowSubscriptions = (): ReadonlyArray => export const listExchangeSubscriptions = (): ReadonlyArray => listFollowSubscriptions().filter((subscription) => subscription.remoteOutbox !== undefined) +const latestIso = (values: ReadonlyArray): string | undefined => + values + .filter((value): value is string => value !== undefined && value.length > 0) + .sort() + .at(-1) + +export const makeFederationExchangeStatus = ( + context: FederationContext +): FederationExchangeStatus => { + const subscriptions = listExchangeSubscriptions() + const recentEvents = [...exchangeEvents].sort((left, right) => right.occurredAt.localeCompare(left.occurredAt)) + const accepted = subscriptions.filter((subscription) => subscription.status === "accepted").length + const pending = subscriptions.filter((subscription) => subscription.status === "pending").length + const rejected = subscriptions.filter((subscription) => subscription.status === "rejected").length + const inboxEventTimes = exchangeEvents + .filter((event) => event.kind === "inbox.follow.accept" || event.kind === "inbox.follow.reject" || event.kind === "inbox.issue.received") + .map((event) => event.occurredAt) + const acceptedTransitionTimes = subscriptions + .filter((subscription) => subscription.status === "accepted" || subscription.status === "rejected") + .map((subscription) => subscription.updatedAt) + + return { + publicActor: context.actorId, + summary: { + subscriptions: subscriptions.length, + accepted, + pending, + rejected, + issues: issueStore.size, + processedOutboxItems: processedOutboxItems.size, + lastInboxAt: latestIso([...inboxEventTimes, ...acceptedTransitionTimes]), + lastPollAt: latestIso( + exchangeEvents + .filter((event) => event.kind === "poll.completed") + .map((event) => event.occurredAt) + ) + }, + subscriptions: subscriptions.map((subscription) => ({ + id: subscription.id, + target: exchangeSubscriptionTarget(subscription), + queue: subscription.queue, + status: subscription.status, + remoteActor: subscription.remoteActor, + remoteInbox: subscription.remoteInbox, + remoteOutbox: subscription.remoteOutbox, + createdAt: subscription.createdAt, + updatedAt: subscription.updatedAt + })), + recentEvents + } +} + export const clearFederationState = (): void => { issueStore.clear() followStore.clear() followByActivityId.clear() followByActorObject.clear() processedOutboxItems.clear() + exchangeEvents = [] localActorKeys = null stateLoaded = true } @@ -1377,8 +1500,19 @@ export const pollExchangeOutboxes = ( } } + const polledAt = nowIso() + recordExchangeEvent({ + kind: "poll.completed", + occurredAt: polledAt, + target: request.target, + totalItems, + newItems, + processedItems, + failedItems + }) + return { - polledAt: nowIso(), + polledAt, subscriptions: subscriptions.length, totalItems, newItems, diff --git a/packages/api/tests/federation.test.ts b/packages/api/tests/federation.test.ts index c525785d..3d009a59 100644 --- a/packages/api/tests/federation.test.ts +++ b/packages/api/tests/federation.test.ts @@ -12,6 +12,7 @@ import { listFollowSubscriptions, makeFederationActorDocument, makeFederationContext, + makeFederationExchangeStatus, makeFederationFollowingCollection, pollExchangeOutboxes } from "../src/services/federation.js" @@ -304,6 +305,23 @@ describe("federation service", () => { expect(created.subscription.queue).toBe("code") expect(listExchangeSubscriptions()).toHaveLength(1) + const pendingStatus = makeFederationExchangeStatus(context) + expect(pendingStatus.summary.pending).toBe(1) + expect(pendingStatus.recentEvents.map((event) => event.kind)).toContain("follow.sent") + + yield* _( + ingestFederationInbox({ + type: "Accept", + actor: "https://exchange.lefine.pro/actor/code", + object: created.activity.id + }) + ) + + const acceptedStatus = makeFederationExchangeStatus(context) + expect(acceptedStatus.summary.accepted).toBe(1) + expect(acceptedStatus.summary.lastInboxAt).toBeDefined() + expect(acceptedStatus.recentEvents.map((event) => event.kind)).toContain("inbox.follow.accept") + const firstPoll = yield* _(pollExchangeOutboxes({ runTasks: false }, context)) expect(firstPoll.newItems).toBe(1) expect(firstPoll.processedItems).toBe(1) @@ -312,10 +330,52 @@ describe("federation service", () => { expect(issues).toHaveLength(1) expect(issues[0]?.issueId).toBe("https://exchange.lefine.pro/orders/111") + const polledStatus = makeFederationExchangeStatus(context) + const polledEventKinds = polledStatus.recentEvents.map((event) => event.kind) + expect(polledStatus.summary.issues).toBe(1) + expect(polledStatus.summary.processedOutboxItems).toBe(1) + expect(polledStatus.summary.lastPollAt).toBe(firstPoll.polledAt) + expect(polledEventKinds).toContain("inbox.issue.received") + expect(polledEventKinds).toContain("poll.completed") + expect(polledStatus.recentEvents.find((event) => event.kind === "poll.completed")).toMatchObject({ + totalItems: 1, + newItems: 1, + processedItems: 1, + failedItems: 0 + }) + const secondPoll = yield* _(pollExchangeOutboxes({ runTasks: false }, context)) expect(secondPoll.newItems).toBe(0) } finally { globalThis.fetch = previousFetch } })) + + it.effect("bounds federation exchange event history", () => + Effect.gen(function*(_) { + clearFederationState() + + const context = yield* _( + makeFederationContext({ + publicOrigin: "https://social.provercoder.ai" + }) + ) + + for (let index = 0; index < 105; index += 1) { + yield* _( + ingestFederationInbox({ + type: "Ticket", + id: `https://tracker.example/issues/${index}`, + attributedTo: "https://origin.example/users/alice", + summary: `Issue ${index}`, + content: "Confirm bounded exchange event history." + }) + ) + } + + const status = makeFederationExchangeStatus(context) + expect(status.summary.issues).toBe(105) + expect(status.recentEvents).toHaveLength(100) + expect(status.recentEvents.map((event) => event.kind)).toContain("inbox.issue.received") + })) })