Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ Optional env:
- `GET /federation/followers`
- `GET /federation/following`
- `GET /federation/liked`
- `GET /federation/exchange/status` (connection summary and recent exchange events)
- `POST /federation/exchange/subscriptions` (discover remote actor, persist metadata, send signed `Follow`)
- `GET /federation/exchange/subscriptions`
- `POST /federation/exchange/poll` (manual remote outbox poll)
Expand Down Expand Up @@ -131,10 +132,13 @@ Exchange targets must be explicit. Use `https://exchange.lefine.pro`, an actor U
}'

./ctl request POST /federation/exchange/poll '{}'
./ctl request GET /federation/exchange/status
./ctl request GET /federation/exchange/subscriptions
./ctl request GET /federation/issues
```

`GET /federation/exchange/status` is the live observability endpoint for a Lefine connection. It reports subscription counts, accepted/pending/rejected state, `lastInboxAt`, `lastPollAt`, persisted issue count, processed outbox items, and recent events such as `follow.sent`, `inbox.follow.accept`, `inbox.issue.received`, and `poll.completed`.

When a polled `Create(Ticket)` has no GitHub URL in the Ticket payload, `projectRepoUrl` or `DOCKER_GIT_EXCHANGE_PROJECT_REPO_URL` is required for the automatic docker-git project/agent run.

1. Read actor profile (contains `inbox/outbox/followers/following/liked`):
Expand Down
51 changes: 51 additions & 0 deletions packages/api/src/api/contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,57 @@ export type ExchangePollResult = {
readonly failedItems: number
}

export type FederationExchangeEventKind =
| "follow.sent"
| "inbox.follow.accept"
| "inbox.follow.reject"
| "inbox.issue.received"
| "poll.completed"

export type FederationExchangeEvent = {
readonly id: string
readonly kind: FederationExchangeEventKind
readonly occurredAt: string
readonly subscriptionId?: string | undefined
readonly target?: string | undefined
readonly queue?: string | undefined
readonly status?: FollowStatus | undefined
readonly issueId?: string | undefined
readonly remoteActor?: string | undefined
readonly totalItems?: number | undefined
readonly newItems?: number | undefined
readonly processedItems?: number | undefined
readonly failedItems?: number | undefined
}

export type FederationExchangeStatusSubscription = {
readonly id: string
readonly target: string
readonly queue?: string | undefined
readonly status: FollowStatus
readonly remoteActor?: string | undefined
readonly remoteInbox?: string | undefined
readonly remoteOutbox?: string | undefined
readonly createdAt: string
readonly updatedAt: string
}

export type FederationExchangeStatus = {
readonly publicActor: string
readonly summary: {
readonly subscriptions: number
readonly accepted: number
readonly pending: number
readonly rejected: number
readonly issues: number
readonly processedOutboxItems: number
readonly lastInboxAt?: string | undefined
readonly lastPollAt?: string | undefined
}
readonly subscriptions: ReadonlyArray<FederationExchangeStatusSubscription>
readonly recentEvents: ReadonlyArray<FederationExchangeEvent>
}

export type ApiEventType =
| "snapshot"
| "project.created"
Expand Down
9 changes: 9 additions & 0 deletions packages/api/src/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import {
listFollowSubscriptions,
makeFederationActorDocument,
makeFederationContext,
makeFederationExchangeStatus,
makeFederationFollowersCollection,
makeFederationFollowingCollection,
makeFederationLikedCollection,
Expand Down Expand Up @@ -824,6 +825,14 @@ export const makeRouter = () => {
return yield* _(activityJsonResponse(makeFederationLikedCollection(context), 200))
}).pipe(Effect.catchAll(errorResponse))
),
HttpRouter.get(
"/federation/exchange/status",
Effect.gen(function*(_) {
const request = yield* _(HttpServerRequest.HttpServerRequest)
const context = yield* _(resolveFederationContext(request))
return yield* _(jsonResponse(makeFederationExchangeStatus(context), 200))
}).pipe(Effect.catchAll(errorResponse))
),
HttpRouter.post(
"/federation/exchange/subscriptions",
Effect.gen(function*(_) {
Expand Down
136 changes: 135 additions & 1 deletion packages/api/src/services/federation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import type {
CreateProjectRequest,
ExchangePollRequest,
ExchangeSubscribeRequest,
FederationExchangeEvent,
FederationExchangeStatus,
FederationInboxResult,
FederationIssueRecord,
FollowStatus,
Expand All @@ -47,6 +49,7 @@ type StoredFederationState = {
readonly issues: ReadonlyArray<FederationIssueRecord>
readonly follows: ReadonlyArray<FollowSubscription>
readonly processedOutboxItems: ReadonlyArray<string>
readonly exchangeEvents?: ReadonlyArray<FederationExchangeEvent> | undefined
readonly localActorKeys?: LocalActorKeys | undefined
}

Expand Down Expand Up @@ -97,12 +100,14 @@ const jsonLdContentType = "application/ld+json; profile=\"https://www.w3.org/ns/
const activityAcceptHeader = `${jsonLdContentType}, ${activityJsonContentType}, application/json`
const defaultExchangeQueue = "code"
const stateVersion = 1 as const
const exchangeEventLimit = 100

const issueStore: Map<string, FederationIssueRecord> = new Map()
const followStore: Map<string, FollowSubscription> = new Map()
const followByActivityId: Map<string, string> = new Map()
const followByActorObject: Map<string, string> = new Map()
const processedOutboxItems: Set<string> = new Set()
let exchangeEvents: ReadonlyArray<FederationExchangeEvent> = []
let localActorKeys: LocalActorKeys | null = null
let stateLoaded = false

Expand All @@ -120,6 +125,33 @@ const asNonEmptyString = (value: unknown): string | null =>
const readOptionalString = (record: JsonRecord, key: string): string | undefined =>
asNonEmptyString(record[key]) ?? undefined

type FederationExchangeEventDraft = Omit<FederationExchangeEvent, "id" | "occurredAt"> & {
readonly occurredAt?: string | undefined
}

const exchangeSubscriptionTarget = (subscription: FollowSubscription): string =>
subscription.subscriptionName ?? subscription.remoteActor ?? subscription.object

const findExchangeSubscriptionByActor = (actor: string | undefined): FollowSubscription | undefined =>
actor === undefined
? undefined
: [...followStore.values()].find((subscription) =>
subscription.remoteOutbox !== undefined &&
(subscription.remoteActor === actor || subscription.object === actor)
)

const recordExchangeEvent = (event: FederationExchangeEventDraft): FederationExchangeEvent => {
const { occurredAt, ...details } = event
const stored: FederationExchangeEvent = {
id: randomUUID(),
occurredAt: occurredAt ?? nowIso(),
...details
}
exchangeEvents = [...exchangeEvents, stored].slice(-exchangeEventLimit)
persistFederationStateBestEffort()
return stored
}

const readRequiredString = (
record: JsonRecord,
key: string,
Expand Down Expand Up @@ -302,6 +334,7 @@ const serializeState = (): StoredFederationState => ({
issues: [...issueStore.values()],
follows: [...followStore.values()],
processedOutboxItems: [...processedOutboxItems],
exchangeEvents,
...(localActorKeys === null ? {} : { localActorKeys })
})

Expand Down Expand Up @@ -377,6 +410,7 @@ const hydrateState = (state: StoredFederationState): void => {
followByActivityId.clear()
followByActorObject.clear()
processedOutboxItems.clear()
exchangeEvents = []

for (const issue of state.issues ?? []) {
issueStore.set(issue.issueId, issue)
Expand All @@ -387,6 +421,7 @@ const hydrateState = (state: StoredFederationState): void => {
for (const item of state.processedOutboxItems ?? []) {
processedOutboxItems.add(item)
}
exchangeEvents = [...(state.exchangeEvents ?? [])].slice(-exchangeEventLimit)
localActorKeys = state.localActorKeys ?? null
}

Expand Down Expand Up @@ -740,6 +775,22 @@ const ingestCreateTicket = (
return issue
})

const recordIssueReceivedEvent = (
issue: FederationIssueRecord,
options: IngestOptions
): void => {
const remoteActor = issue.actor ?? issue.ticket.attributedTo
const subscription = options.subscription ?? findExchangeSubscriptionByActor(remoteActor)
recordExchangeEvent({
kind: "inbox.issue.received",
subscriptionId: subscription?.id,
target: subscription === undefined ? undefined : exchangeSubscriptionTarget(subscription),
queue: subscription?.queue,
issueId: issue.issueId,
remoteActor: subscription?.remoteActor ?? remoteActor
})
}

// CHANGE: support ForgeFed issue inputs and ActivityPub inbox transitions in API mode.
// WHY: issue #233 requires ForgeFed/ActivityPub subscription and task intake.
// QUOTE(ТЗ): "Осталось forgefed допподержать" + "Законнектишь к exchange"
Expand Down Expand Up @@ -768,23 +819,34 @@ export const ingestFederationInbox = (

if (hasType(record, "Offer")) {
const issue = yield* _(ingestOfferTicket(record))
recordIssueReceivedEvent(issue, options)
return { kind: "issue.offer", issue }
}

if (hasType(record, "Create")) {
const issue = yield* _(ingestCreateTicket(record, options))
recordIssueReceivedEvent(issue, options)
return { kind: "issue.create", issue }
}

if (hasType(record, "Ticket")) {
const issue = yield* _(ingestDirectTicket(record))
recordIssueReceivedEvent(issue, options)
return { kind: "issue.ticket", issue }
}

if (hasType(record, "Accept") || hasType(record, "Reject")) {
const subscription = yield* _(resolveFollowFromInbox(record))
const status: FollowStatus = hasType(record, "Accept") ? "accepted" : "rejected"
const updated = updateFollowStatus(subscription, status)
recordExchangeEvent({
kind: status === "accepted" ? "inbox.follow.accept" : "inbox.follow.reject",
subscriptionId: updated.id,
target: exchangeSubscriptionTarget(updated),
queue: updated.queue,
status: updated.status,
remoteActor: updated.remoteActor
})
return status === "accepted"
? { kind: "follow.accept", subscription: updated }
: { kind: "follow.reject", subscription: updated }
Expand Down Expand Up @@ -1242,6 +1304,14 @@ export const ensureExchangeSubscription = (
if (inbox !== undefined) {
yield* _(sendJsonLd(context, inbox, activity).pipe(Effect.ignore))
}
recordExchangeEvent({
kind: "follow.sent",
subscriptionId: subscription.id,
target: exchangeSubscriptionTarget(subscription),
queue: subscription.queue,
status: subscription.status,
remoteActor: subscription.remoteActor
})

return { subscription, activity }
})
Expand All @@ -1255,12 +1325,65 @@ export const listFollowSubscriptions = (): ReadonlyArray<FollowSubscription> =>
export const listExchangeSubscriptions = (): ReadonlyArray<FollowSubscription> =>
listFollowSubscriptions().filter((subscription) => subscription.remoteOutbox !== undefined)

const latestIso = (values: ReadonlyArray<string | undefined>): string | undefined =>
values
.filter((value): value is string => value !== undefined && value.length > 0)
.sort()
.at(-1)

export const makeFederationExchangeStatus = (
context: FederationContext
): FederationExchangeStatus => {
const subscriptions = listExchangeSubscriptions()
const recentEvents = [...exchangeEvents].sort((left, right) => right.occurredAt.localeCompare(left.occurredAt))
const accepted = subscriptions.filter((subscription) => subscription.status === "accepted").length
const pending = subscriptions.filter((subscription) => subscription.status === "pending").length
const rejected = subscriptions.filter((subscription) => subscription.status === "rejected").length
const inboxEventTimes = exchangeEvents
.filter((event) => event.kind === "inbox.follow.accept" || event.kind === "inbox.follow.reject" || event.kind === "inbox.issue.received")
.map((event) => event.occurredAt)
const acceptedTransitionTimes = subscriptions
.filter((subscription) => subscription.status === "accepted" || subscription.status === "rejected")
.map((subscription) => subscription.updatedAt)

return {
publicActor: context.actorId,
summary: {
subscriptions: subscriptions.length,
accepted,
pending,
rejected,
issues: issueStore.size,
processedOutboxItems: processedOutboxItems.size,
lastInboxAt: latestIso([...inboxEventTimes, ...acceptedTransitionTimes]),
lastPollAt: latestIso(
exchangeEvents
.filter((event) => event.kind === "poll.completed")
.map((event) => event.occurredAt)
)
},
subscriptions: subscriptions.map((subscription) => ({
id: subscription.id,
target: exchangeSubscriptionTarget(subscription),
queue: subscription.queue,
status: subscription.status,
remoteActor: subscription.remoteActor,
remoteInbox: subscription.remoteInbox,
remoteOutbox: subscription.remoteOutbox,
createdAt: subscription.createdAt,
updatedAt: subscription.updatedAt
})),
recentEvents
}
}

export const clearFederationState = (): void => {
issueStore.clear()
followStore.clear()
followByActivityId.clear()
followByActorObject.clear()
processedOutboxItems.clear()
exchangeEvents = []
localActorKeys = null
stateLoaded = true
}
Expand Down Expand Up @@ -1377,8 +1500,19 @@ export const pollExchangeOutboxes = (
}
}

const polledAt = nowIso()
recordExchangeEvent({
kind: "poll.completed",
occurredAt: polledAt,
target: request.target,
totalItems,
newItems,
processedItems,
failedItems
})

return {
polledAt: nowIso(),
polledAt,
subscriptions: subscriptions.length,
totalItems,
newItems,
Expand Down
Loading