diff --git a/.env.docker b/.env.docker index 20a6c6a7..2f3bd0f4 100644 --- a/.env.docker +++ b/.env.docker @@ -1,4 +1,4 @@ DB_CONNECTION_STRING="postgres://postgres:xmtp@db:5432/postgres?sslmode=disable" -XMTP_GRPC_ADDRESS="node:5556" +XMTP_GRPC_ADDRESS="xnet-100:5050" LOG_ENCODING=console -API_PORT="8080" \ No newline at end of file +API_PORT="8080" diff --git a/.env.local b/.env.local index 868cbfd6..f628ca3f 100644 --- a/.env.local +++ b/.env.local @@ -1,4 +1,4 @@ DB_CONNECTION_STRING="postgres://postgres:xmtp@localhost:25432/postgres?sslmode=disable" -XMTP_GRPC_ADDRESS="localhost:25556" +XMTP_GRPC_ADDRESS="localhost:5556" LOG_ENCODING=console API_PORT="8080" \ No newline at end of file diff --git a/.github/actions/setup-xnet/action.yml b/.github/actions/setup-xnet/action.yml new file mode 100644 index 00000000..b970dc82 --- /dev/null +++ b/.github/actions/setup-xnet/action.yml @@ -0,0 +1,38 @@ +name: "Setup xnet" +description: "Install Nix and xnet-cli for XMTP network testing" + +inputs: + github-token: + description: "GitHub token for Nix installation" + required: true + cachix-auth-token: + description: "Cachix auth token for cache pushes" + required: false + +runs: + using: "composite" + steps: + - uses: cachix/install-nix-action@v31 + with: + github_access_token: ${{ inputs.github-token }} + extra_nix_config: | + accept-flake-config = true + extra-trusted-public-keys = xmtp.cachix.org-1:nFPFrqLQ9kjYQKiWL7gKq6llcNEeaV4iI+Ka1F+Tmq0= + extra-substituters = https://xmtp.cachix.org + - uses: cachix/cachix-action@v16 + with: + name: xmtp + authToken: ${{ inputs.cachix-auth-token }} + - name: Install xnet-cli + shell: bash + run: ./dev/install-xnet + - name: Setup DNS + shell: bash + run: | + sudo mkdir -p /etc/systemd/resolved.conf.d + sudo tee /etc/systemd/resolved.conf.d/xmtp.conf </dev/null 2>&1; then + xnet delete +fi diff --git a/dev/install-xnet b/dev/install-xnet new file mode 100755 index 00000000..87b08723 --- /dev/null +++ b/dev/install-xnet @@ -0,0 +1,52 @@ +#!/bin/bash +set -euo pipefail + +XNET_COMMIT="001ce0a76a8727319e3209b1bd509792388d67d2" +XNET_FLAKE="github:xmtp/libxmtp/${XNET_COMMIT}" +XNET_REF="${XNET_FLAKE}#xnet-cli" +XNET_LOCKED_PREFIX="${XNET_FLAKE}" + +have_command() { command -v "$1" >/dev/null 2>&1; } + +ensure_nix_profile_path() { + local nix_path + + for nix_path in \ + "$HOME/.nix-profile/bin" \ + "$HOME/.local/state/nix/profile/bin" \ + "/nix/var/nix/profiles/default/bin" + do + if [[ -d "$nix_path" ]] && [[ ":$PATH:" != *":$nix_path:"* ]]; then + PATH="$nix_path:$PATH" + fi + done +} + +ensure_nix_profile_path + +if ! have_command nix; then + echo "Nix is required to install xnet-cli." >&2 + exit 1 +fi + +if nix profile list --json --no-pretty | jq -e --arg locked_prefix "$XNET_LOCKED_PREFIX" ' + .elements + | [to_entries[] + | select( + .value.active + and ((.value.attrPath // "") | endswith(".xnet-cli")) + ) + ] as $xnet_entries + | ($xnet_entries | length) > 0 + and all( + $xnet_entries[]; + ((.value.url // "") | startswith($locked_prefix)) + ) +' >/dev/null; then + echo "xnet-cli already matches pinned commit $XNET_COMMIT" + exit 0 +fi + +echo "Installing xnet-cli from $XNET_REF" +nix profile remove xnet-cli >/dev/null 2>&1 || true +nix --accept-flake-config profile add "$XNET_REF" diff --git a/dev/integration b/dev/integration index 174c7ab4..5f59459c 100755 --- a/dev/integration +++ b/dev/integration @@ -1,6 +1,47 @@ #!/bin/bash set -eou pipefail -docker compose up -d node -docker compose up integration +SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)" + +# message-source controls both how the XMTP client writes messages and what +# listener topology the notification server is expected to exercise: +# - v3-direct: +# client writes directly to node-go and the notification server listens to the +# same v3 node with the v3 listener. +# - v4-with-migrator: +# client writes to node-go, the migrator forwards those messages into xmtpd, +# and the notification server listens to xmtpd with the v4 listener. +# - v4-direct: +# client writes to xmtpd after cutover + d14n activation, and the +# notification server also listens directly to xmtpd with the v4 listener. +MESSAGE_SOURCE="${1:-v3-direct}" +source "$SCRIPT_DIR/xnet-env" "$MESSAGE_SOURCE" + +"$SCRIPT_DIR/down" +"$SCRIPT_DIR/up" "$MESSAGE_SOURCE" + +echo "message-source=$MESSAGE_SOURCE listener_type=$LISTENER_TYPE listener_address=$XMTP_LISTENER_ADDRESS migrator_running=$XMTP_MIGRATOR_RUNNING xmtp_client_url=$XMTP_CLIENT_URL xmtp_gateway_url=${XMTP_GATEWAY_URL:-}" + +docker compose build -q notification_server integration + +# Get CoreDNS container IP for DNS resolution inside containers +COREDNS_IP=$(docker inspect xnet-coredns -f '{{.NetworkSettings.Networks.xnet.IPAddress}}') +export COREDNS_IP + +docker compose up -d notification_server +notification_server_container_id="$(docker compose ps -q notification_server)" +for _ in $(seq 1 30); do + if [ "$(docker inspect --format '{{if .State.Health}}{{.State.Health.Status}}{{end}}' "$notification_server_container_id")" = "healthy" ]; then + break + fi + sleep 1 +done + +if [ "$(docker inspect --format '{{if .State.Health}}{{.State.Health.Status}}{{end}}' "$notification_server_container_id")" != "healthy" ]; then + echo "notification_server did not become healthy for message-source $MESSAGE_SOURCE" >&2 + docker compose logs notification_server >&2 || true + exit 1 +fi + +docker compose run --rm --use-aliases integration npx vitest run exit $? diff --git a/dev/up b/dev/up index 4ad2582e..0544de25 100755 --- a/dev/up +++ b/dev/up @@ -1,4 +1,89 @@ #!/bin/bash set -eou pipefail -docker compose up -d node +SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)" +ensure_nix_profile_path() { + local nix_path + + for nix_path in \ + "$HOME/.nix-profile/bin" \ + "$HOME/.local/state/nix/profile/bin" \ + "/nix/var/nix/profiles/default/bin" + do + if [[ -d "$nix_path" ]] && [[ ":$PATH:" != *":$nix_path:"* ]]; then + PATH="$nix_path:$PATH" + fi + done +} + +ensure_xnet_cli() { + "$SCRIPT_DIR/install-xnet" + ensure_nix_profile_path + + if ! command -v xnet-cli >/dev/null 2>&1; then + echo "xnet-cli is required. Install Nix and rerun ./dev/up, or install xnet-cli manually." >&2 + exit 1 + fi +} + +xnet() { "$SCRIPT_DIR/xnet" "$@"; } +is_node_running() { docker inspect xnet-100 --format '{{.State.Running}}' 2>/dev/null | grep -q '^true$'; } +is_node_registered() { xnet addresses 2>&1 | grep -q "xnet-100"; } +is_v3_node_running() { docker inspect xnet-node --format '{{.State.Running}}' 2>/dev/null | grep -q '^true$'; } + +MESSAGE_SOURCE="${1:-v3-direct}" +source "$SCRIPT_DIR/xnet-env" "$MESSAGE_SOURCE" + +ensure_xnet_cli +xnet up --paused + +if [[ "$XMTP_CLIENT_URL" == "http://xnet-node:5556" ]]; then + for _ in $(seq 1 30); do + if is_v3_node_running; then + break + fi + sleep 1 + done + + if ! is_v3_node_running; then + echo "xnet-node did not become ready for message-source $MESSAGE_SOURCE" >&2 + exit 1 + fi +fi + +if [[ "$XMTP_NEEDS_XMTPD_NODE" == "true" ]]; then + xnet node add --migrator + + if [[ "$XMTP_SHOULD_ENABLE_D14N" == "true" ]]; then + xnet migrate + xnet activate-d14n + fi + + for _ in $(seq 1 30); do + if is_node_running; then + break + fi + sleep 1 + done + + if ! is_node_running; then + echo "xnet-100 did not become ready for message-source $MESSAGE_SOURCE" >&2 + exit 1 + fi + + if [[ "$XMTP_SHOULD_ENABLE_D14N" == "true" ]]; then + for _ in $(seq 1 90); do + if is_node_registered; then + break + fi + sleep 1 + done + + if ! is_node_registered; then + echo "xnet-100 did not become registered for message-source $MESSAGE_SOURCE" >&2 + exit 1 + fi + fi +fi + +docker compose up -d db diff --git a/dev/xnet b/dev/xnet new file mode 100755 index 00000000..a8d2e010 --- /dev/null +++ b/dev/xnet @@ -0,0 +1,6 @@ +#!/bin/bash +set -eou pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +xnet-cli -c "$SCRIPT_DIR/../xnet.toml" "$@" diff --git a/dev/xnet-env b/dev/xnet-env new file mode 100755 index 00000000..d31987ea --- /dev/null +++ b/dev/xnet-env @@ -0,0 +1,40 @@ +#!/bin/bash + +MESSAGE_SOURCE="${1:-v3-direct}" + +case "$MESSAGE_SOURCE" in + v3-direct) + export MESSAGE_SOURCE + export LISTENER_TYPE="v3" + export XMTP_LISTENER_ADDRESS="xnet-node:5556" + export XMTP_CLIENT_URL="http://xnet-node:5556" + export XMTP_GATEWAY_URL="" + export XMTP_NEEDS_XMTPD_NODE="false" + export XMTP_SHOULD_ENABLE_D14N="false" + export XMTP_MIGRATOR_RUNNING="false" + ;; + v4-with-migrator) + export MESSAGE_SOURCE + export LISTENER_TYPE="v4" + export XMTP_LISTENER_ADDRESS="xnet-100:5050" + export XMTP_CLIENT_URL="http://xnet-node:5556" + export XMTP_GATEWAY_URL="" + export XMTP_NEEDS_XMTPD_NODE="true" + export XMTP_SHOULD_ENABLE_D14N="false" + export XMTP_MIGRATOR_RUNNING="true" + ;; + v4-direct) + export MESSAGE_SOURCE + export LISTENER_TYPE="v4" + export XMTP_LISTENER_ADDRESS="xnet-100:5050" + export XMTP_CLIENT_URL="http://xnet-100:5050" + export XMTP_GATEWAY_URL="http://xnet-gateway:5050" + export XMTP_NEEDS_XMTPD_NODE="true" + export XMTP_SHOULD_ENABLE_D14N="true" + export XMTP_MIGRATOR_RUNNING="false" + ;; + *) + echo "Unknown message-source: $MESSAGE_SOURCE (use v3-direct, v4-with-migrator, or v4-direct)" >&2 + return 1 2>/dev/null || exit 1 + ;; +esac diff --git a/docker-compose.yml b/docker-compose.yml index c6d55109..b326de62 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,50 +1,36 @@ services: - node: - image: ghcr.io/xmtp/node-go:main - platform: linux/amd64 - environment: - - GOWAKU-NODEKEY=8a30dcb604b0b53627a5adc054dbf434b446628d4bd1eccc681d223f0550ce67 - command: - - --store.enable - - --store.db-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable - - --store.reader-db-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable - - --mls-store.db-connection-string=postgres://postgres:xmtp@mlsdb:5432/postgres?sslmode=disable - - --mls-validation.grpc-address=validation:50051 - - --api.enable-mls - - --wait-for-db=30s - ports: - - 25555:5555 - - 25556:5556 - depends_on: - - db - - mlsdb - - validation - - validation: - image: ghcr.io/xmtp/mls-validation-service:main - platform: linux/amd64 - db: image: postgres:13 environment: POSTGRES_PASSWORD: xmtp ports: - 25432:5432 - - mlsdb: - image: postgres:13 - environment: - POSTGRES_PASSWORD: xmtp + networks: + - xnet notification_server: build: context: . - env_file: .env.docker + environment: + DB_CONNECTION_STRING: "postgres://postgres:xmtp@db:5432/postgres?sslmode=disable" + LOG_ENCODING: console + API_PORT: "8080" + dns: + - ${COREDNS_IP:-172.18.0.2} depends_on: - - node + - db + healthcheck: + test: ["CMD-SHELL", "wget -q -O /dev/null http://localhost:8080/readyz"] + interval: 2s + timeout: 2s + retries: 30 + networks: + - xnet command: - --xmtp-listener - --api + - --listener-type=${LISTENER_TYPE:-v4} + - --xmtp-address=${XMTP_LISTENER_ADDRESS:-xnet-100:5050} - --http-delivery - --http-delivery-address=http://integration:7777/post - --api-port=8080 @@ -57,13 +43,22 @@ services: - 7777 volumes: - ./integration/src:/usr/app/src:ro + dns: + - ${COREDNS_IP:-172.18.0.2} depends_on: - - node - - notification_server + notification_server: + condition: service_healthy environment: - - XMTP_NODE_URL=http://node:5556 + - XMTP_NODE_URL=${XMTP_CLIENT_URL:-http://xnet-node:5556} - NOTIFICATION_SERVER_URL=http://notification_server:8080 + - XMTP_GATEWAY_URL=${XMTP_GATEWAY_URL:-} + networks: + - xnet command: - npx - vitest - run + +networks: + xnet: + external: true diff --git a/docs/v4-migration-guide.md b/docs/v4-migration-guide.md new file mode 100644 index 00000000..43926a0a --- /dev/null +++ b/docs/v4-migration-guide.md @@ -0,0 +1,56 @@ +# Decentralized Network Migration Guide + +## Who is this for + +This guide is for developers already running a notification server (either this code directly or a fork) in a deployed environment. This guide walks you through how to handle the change without interrupting deliveries for clients. + +## What's changing + +On [CUTOVER DATE] all XMTP clients will stop writing to the current XMTP `v3` network and begin writing to the decentralized `v4` network. The current version of the notification server is configured to read from either network. Before the cutover date, you will need to configure your notification server to read from a `v4` endpoint or message deliveries will stop. + +## Preparing for the migration + +The latest version of the notification server is designed to smooth over the transition by translating messages from `v4` formats to `v3` formats. This means that **no client changes are required to upgrade your notification server to the latest version**. Even if you are connected to the `v4` network, legacy clients can receive messages in the format they already expect. + +## Required changes before the cutover + + +### 1. Update your client + +[TODO: Describe changes to client methods for receiving v4 payload format notifications] + +### 2. Switch your development environment to `testnet` + +If you currently use the `dev` environment for local development and test versions of your application, you should configure those clients to use the `testnet` network instead. + +Once you have rolled out that change, configure the notification server you use for your development environment with the following flags + +- Set the `--listener-type` command line flag or `LISTENER_TYPE` environment variable to `v4`. This tells the notification server to expect payloads from the XMTP network to arrive in a `v4` format +- Set the `--xmtp-address` command line flag or `XMTP_GRPC_ADDRESS` environment variable to `https://grpc.testnet.xmtp.network:443`. This tells the notification server to connect to the new testnet nodes. + +Testnet is a wholly new environment and old messages do not carry over. Any clients previously on the `dev` network will need to sign in again and start with a clean messaging history. + +### 3. Switch your production environment to `mainnet` + +In mainnet, XMTP is running a continuous migration of new messages from `v3` to `v4` up until the cutover. That means that your notification server can start receiving and forwarding messages from the `v4` network today, before clients start talking directly to the `v4` network on [CUTOVER DATE]. + +For the notification server you use in your production environment, make the following configuration changes: + +- Set the `--listener-type` command line flag or `LISTENER_TYPE` environment variable to `v4`. This tells the notification server to expect payloads from the XMTP network to arrive in a `v4` format. +- Set the `--xmtp-address` command line flag or `XMTP_GRPC_ADDRESS` environment variable to `https://grpc.mainnet.xmtp.network:443`. This tells the notification server to connect to mainnet. + +## Performance and latency + +The `v4` network has comparable latency and throughput to `v3`. Until [CUTOVER DATE] messages sent to `v3` production must travel from `production` -> migration service -> `mainnet`. That additional hop adds approximately 2 seconds of delay before a message is received by the push server. After [CUTOVER DATE] latency will return to normal. + +## Breaking changes + +- Previous versions of the example notification server did not validate whether subscribed topics matched the expected XMTP topic format. You could subscribe to `/foo` previously and it would be accepted. Version 2.0 and above will assert that all subscribed topics must match the expected topic format and error on subscriptions that contain invalid topics. This is mostly relevant if you are writing integration tests against your notification server. + +## Under the hood changes + +As XMTP moves from the `v3` network to the `v4` decentralized backend there are a few important changes to the protocol that impact the internals of the notification server. + +- New binary format for topics (convertible in both directions). A database migration upgrades all previously saved topics. +- New wrapper envelope types (`v4` envelopes can be converted back to `v3` envelopes) +- New URL to conenct to the `mainnet` (f.k.a `production`) and `testnet` (f.k.a `dev`) environments diff --git a/integration/Dockerfile b/integration/Dockerfile index e56b11f1..ecde1a64 100644 --- a/integration/Dockerfile +++ b/integration/Dockerfile @@ -1,5 +1,8 @@ -FROM node:22-slim AS base +FROM node:22-bookworm-slim AS node + +FROM ubuntu:24.04 AS base RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* +COPY --from=node /usr/local /usr/local WORKDIR /usr/app # Install dependencies into temp directory diff --git a/integration/src/config.ts b/integration/src/config.ts index 7d88f96a..d39afbe9 100644 --- a/integration/src/config.ts +++ b/integration/src/config.ts @@ -10,4 +10,5 @@ function assertEnvVar(key: string): string { export const config = { nodeUrl: assertEnvVar("XMTP_NODE_URL"), notificationServerUrl: assertEnvVar("NOTIFICATION_SERVER_URL"), + gatewayHost: process.env.XMTP_GATEWAY_URL || undefined, } as const; diff --git a/integration/src/index.test.ts b/integration/src/index.test.ts index 1a0812aa..ab4f6823 100644 --- a/integration/src/index.test.ts +++ b/integration/src/index.test.ts @@ -1,20 +1,40 @@ import Koa from "koa"; import { bodyParser } from "@koa/bodyparser"; +import type { Client as XmtpClient } from "@xmtp/node-sdk"; import { expect, test, afterAll, describe } from "vitest"; -import { createNotificationClient, randomClient } from "."; +import { createNotificationClient, randomClient, sleep } from "."; import type { NotificationResponse } from "./types"; const PORT = 7777; describe("notifications", () => { - let onRequest = (req: NotificationResponse) => - console.log("No request handler set for", req); + const pendingResolvers = new Map< + string, + (body: NotificationResponse) => void + >(); + const pendingNotifications = new Map(); // Set up a Koa server to receive messages from the HttpDelivery service const app = new Koa(); app.use(bodyParser()); app.use(async (ctx) => { - onRequest(ctx.request.body as NotificationResponse); + const body = ctx.request.body as NotificationResponse; + const installationId = body.installation?.id; + if (!installationId) { + console.log("Notification missing installation id"); + ctx.status = 200; + return; + } + + const resolver = pendingResolvers.get(installationId); + if (resolver) { + pendingResolvers.delete(installationId); + resolver(body); + } else { + const queue = pendingNotifications.get(installationId) ?? []; + queue.push(body); + pendingNotifications.set(installationId, queue); + } ctx.status = 200; }); const server = app.listen(PORT); @@ -23,17 +43,80 @@ describe("notifications", () => { server.close(); }); - const waitForNextRequest = ( + const waitForNotification = ( + installationId: string, timeoutMs: number, ): Promise => new Promise((resolve, reject) => { - onRequest = (body) => resolve(body); - setTimeout(reject, timeoutMs); + const queued = pendingNotifications.get(installationId); + if (queued && queued.length > 0) { + const [nextNotification, ...rest] = queued; + if (rest.length > 0) { + pendingNotifications.set(installationId, rest); + } else { + pendingNotifications.delete(installationId); + } + resolve(nextNotification); + return; + } + + const timer = setTimeout(() => { + pendingResolvers.delete(installationId); + reject( + new Error(`Timed out waiting for notification for ${installationId}`), + ); + }, timeoutMs); + + pendingResolvers.set(installationId, (body) => { + clearTimeout(timer); + resolve(body); + }); }); + const expectNoNotification = async ( + installationId: string, + timeoutMs: number, + ) => { + const result = await waitForNotification(installationId, timeoutMs).catch( + () => "timeout", + ); + expect(result).toEqual("timeout"); + }; + + const waitForConversationCount = async ( + client: XmtpClient, + expectedCount: number, + timeoutMs: number, + ) => { + const start = Date.now(); + let lastError: unknown; + + while (Date.now() - start < timeoutMs) { + try { + await client.conversations.syncAll(); + } catch (error) { + lastError = error; + } + + const conversations = await client.conversations.list(); + if (conversations.length >= expectedCount) { + return conversations; + } + + await new Promise((resolve) => setTimeout(resolve, 250)); + } + + if (lastError) { + throw lastError; + } + + throw new Error(`Timed out waiting for ${expectedCount} conversations`); + }; + test("conversation invites", async () => { const alix = await randomClient(); const bo = await randomClient(); + const alixNotificationClient = createNotificationClient(); await alixNotificationClient.registerInstallation({ installationId: alix.installationId, @@ -56,7 +139,7 @@ describe("notifications", () => { ], }); - const notificationPromise = waitForNextRequest(10000); + const notificationPromise = waitForNotification(alix.installationId, 30000); // Bo creates a DM with alix, which sends a welcome to alix's installation await bo.conversations.createDm(alix.inboxId); const notification = await notificationPromise; @@ -87,8 +170,7 @@ describe("notifications", () => { const boGroup = await bo.conversations.createGroup([alix.inboxId]); expect((await alix.conversations.list()).length).toEqual(0); - await alix.conversations.syncAll(); - const alixGroups = await alix.conversations.list(); + const alixGroups = await waitForConversationCount(alix, 1, 15000); expect(alixGroups.length).toEqual(1); const alixGroup = alixGroups[0]; @@ -113,7 +195,7 @@ describe("notifications", () => { ], }); - const notificationPromise = waitForNextRequest(10000); + const notificationPromise = waitForNotification(alix.installationId, 15000); await alixGroup.sendText("This should never be delivered"); await boGroup.sendText("This should be delivered"); @@ -125,4 +207,127 @@ describe("notifications", () => { expect(notification.subscription.is_silent).toBe(false); expect(notification.installation.delivery_mechanism.token).toEqual("token"); }); + + test("selective unsubscribe", async () => { + const alix = await randomClient(); + const bo = await randomClient(); + const notifClient = createNotificationClient(); + + // Register alix + await notifClient.registerInstallation({ + installationId: alix.installationId, + deliveryMechanism: { + deliveryMechanismType: { value: "token", case: "apnsDeviceToken" }, + }, + }); + + // Bo creates two groups with alix + const group1 = await bo.conversations.createGroup([alix.inboxId]); + const group2 = await bo.conversations.createGroup([alix.inboxId]); + const alixGroups = await waitForConversationCount(alix, 2, 15000); + + // Subscribe to both group topics with HMAC keys + const hmacKeys = alix.conversations.hmacKeys(); + await notifClient.subscribeWithMetadata({ + installationId: alix.installationId, + subscriptions: alixGroups.map((g) => ({ + topic: g.topic, + isSilent: false, + hmacKeys: hmacKeys[g.id]?.map((v) => ({ + thirtyDayPeriodsSinceEpoch: Number(v.epoch), + key: Uint8Array.from(v.key), + })), + })), + }); + + // Unsubscribe from group1 while keeping group2 active + await notifClient.unsubscribe({ + installationId: alix.installationId, + topics: [group1.topic], + }); + + // Send messages to both groups — only group2 should be delivered + const notificationPromise = waitForNotification(alix.installationId, 15000); + await group1.sendText("Should NOT be delivered"); + await group2.sendText("Should be delivered"); + + const notification = await notificationPromise; + expect(notification.message.content_topic).toEqual(group2.topic); + await expectNoNotification(alix.installationId, 3000); + }); + + test("group message sender filtering", async () => { + const alix = await randomClient(); + const bo = await randomClient(); + const notifClient = createNotificationClient(); + + // Register alix + await notifClient.registerInstallation({ + installationId: alix.installationId, + deliveryMechanism: { + deliveryMechanismType: { value: "token", case: "apnsDeviceToken" }, + }, + }); + + // Bo creates group, invites alix + const boGroup = await bo.conversations.createGroup([alix.inboxId]); + const alixGroups = await waitForConversationCount(alix, 1, 15000); + const alixGroup = alixGroups[0]; + + // Alix subscribes with HMAC keys + const hmacKeys = alix.conversations.hmacKeys(); + await notifClient.subscribeWithMetadata({ + installationId: alix.installationId, + subscriptions: [ + { + topic: alixGroup.topic, + isSilent: false, + hmacKeys: hmacKeys[alixGroup.id]?.map((v) => ({ + thirtyDayPeriodsSinceEpoch: Number(v.epoch), + key: Uint8Array.from(v.key), + })), + }, + ], + }); + + // Both send messages — only bo's should be delivered + const notificationPromise = waitForNotification(alix.installationId, 15000); + await alixGroup.sendText("From alix — should NOT be delivered"); + await boGroup.sendText("From bo — should be delivered"); + + const notification = await notificationPromise; + expect(notification.message.content_topic).toEqual(alixGroup.topic); + expect(notification.idempotency_key).toBeTypeOf("string"); + await expectNoNotification(alix.installationId, 3000); + }); + + test("unregister stops notifications", async () => { + const alix = await randomClient(); + const bo = await randomClient(); + const notifClient = createNotificationClient(); + + // Register alix and subscribe to welcome topic + await notifClient.registerInstallation({ + installationId: alix.installationId, + deliveryMechanism: { + deliveryMechanismType: { value: "token", case: "apnsDeviceToken" }, + }, + }); + const welcomeTopic = `/xmtp/mls/1/w-${alix.installationId}/proto`; + await notifClient.subscribeWithMetadata({ + installationId: alix.installationId, + subscriptions: [{ topic: welcomeTopic, isSilent: true }], + }); + + // Unregister alix + await notifClient.deleteInstallation({ + installationId: alix.installationId, + }); + + // Bo creates group with alix — should NOT trigger notification + await bo.conversations.createGroup([alix.inboxId]); + + // Wait briefly and verify no notification arrived + await expectNoNotification(alix.installationId, 5000); + }); }); diff --git a/integration/src/index.ts b/integration/src/index.ts index 1f0552f7..1951b41d 100644 --- a/integration/src/index.ts +++ b/integration/src/index.ts @@ -3,7 +3,7 @@ import { type Signer, type ClientOptions, IdentifierKind, - createBackend, + LogLevel, } from "@xmtp/node-sdk"; import { createWalletClient, http, toBytes } from "viem"; import { mainnet } from "viem/chains"; @@ -46,11 +46,14 @@ export async function randomClient() { }; const encKey = getRandomValues(new Uint8Array(32)); + console.log(config); const opts: ClientOptions = { apiUrl: config.nodeUrl, env: "local", dbEncryptionKey: encKey, + loggingLevel: LogLevel.Info, dbPath: `/tmp/test-${wallet.account.address}.db3`, + ...(config.gatewayHost ? { gatewayHost: config.gatewayHost } : {}), }; return await Client.create(signer, opts); } @@ -63,6 +66,10 @@ export function createNotificationClient() { return createClient(Notifications, transport); } +export function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + export async function subscribeToTopics( // The installationId we want to apply the subscription to installationId: string, diff --git a/pkg/api/api.go b/pkg/api/api.go index e21b5415..171ab442 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "net" "net/http" "time" @@ -29,6 +30,7 @@ type ApiServer struct { port int listener net.Listener listenerType interfaces.ListenerType + readyCheck func() bool } func NewApiServer(logger *zap.Logger, opts options.ApiOptions, installations interfaces.Installations, subscriptions interfaces.Subscriptions, listenerType interfaces.ListenerType) *ApiServer { @@ -54,10 +56,15 @@ func (s *ApiServer) SetListener(listener net.Listener) error { return nil } +func (s *ApiServer) SetReadyCheck(readyCheck func() bool) { + s.readyCheck = readyCheck +} + func (s *ApiServer) Start() { mux := http.NewServeMux() path, handler := notificationsv1connect.NewNotificationsHandler(s) mux.Handle(path, handler) + mux.HandleFunc("/readyz", s.handleReady) addr := fmt.Sprintf(":%d", s.port) if s.listener != nil { @@ -284,3 +291,14 @@ func convertDeliveryMechanism(mechanism *proto.DeliveryMechanism) *interfaces.De return &interfaces.DeliveryMechanism{Kind: interfaces.FCM, Token: fcmToken} } } + +func (s *ApiServer) handleReady(w http.ResponseWriter, _ *http.Request) { + if s.readyCheck != nil && !s.readyCheck() { + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = io.WriteString(w, "listener not ready") + return + } + + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, "ok") +} diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index 4f1cc18d..9e8373c1 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "io" "net" "net/http" "testing" @@ -16,9 +17,9 @@ import ( "github.com/xmtp/example-notification-server-go/mocks" "github.com/xmtp/example-notification-server-go/pkg/interfaces" "github.com/xmtp/example-notification-server-go/pkg/options" - "github.com/xmtp/example-notification-server-go/pkg/testutils" proto "github.com/xmtp/example-notification-server-go/pkg/proto/notifications/v1" protoconnect "github.com/xmtp/example-notification-server-go/pkg/proto/notifications/v1/notificationsv1connect" + "github.com/xmtp/example-notification-server-go/pkg/testutils" topicpkg "github.com/xmtp/xmtpd/pkg/topic" ) @@ -36,7 +37,6 @@ type testContext struct { apiServer *ApiServer } - func matchTopics(expected ...*topicpkg.Topic) interface{} { return mock.MatchedBy(func(actual []*topicpkg.Topic) bool { if len(actual) != len(expected) { @@ -489,7 +489,6 @@ func TestRegisterInstallation_WithPayloadFormatV4_OnV4Listener_Succeeds(t *testi func TestRegisterInstallation_WithUnspecified_DefaultsToV3(t *testing.T) { ctx := setupTest(t) - validUntil := time.Now() ctx.installationsMock.On( "Register", @@ -519,3 +518,34 @@ func TestRegisterInstallation_WithUnspecified_DefaultsToV3(t *testing.T) { require.NoError(t, err) require.Equal(t, INSTALLATION_ID, result.Msg.InstallationId) } + +func Test_Readyz_DefaultsToOk(t *testing.T) { + ctx := setupTest(t) + + resp, err := ctx.httpClient.Get(fmt.Sprintf("http://127.0.0.1:%d/readyz", ctx.apiServer.port)) + require.NoError(t, err) + defer func() { + require.NoError(t, resp.Body.Close()) + }() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.Equal(t, "ok", string(body)) +} + +func Test_Readyz_ReflectsReadyCheck(t *testing.T) { + ctx := setupTest(t) + ctx.apiServer.SetReadyCheck(func() bool { return false }) + + resp, err := ctx.httpClient.Get(fmt.Sprintf("http://127.0.0.1:%d/readyz", ctx.apiServer.port)) + require.NoError(t, err) + defer func() { + require.NoError(t, resp.Body.Close()) + }() + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) + require.Equal(t, "listener not ready", string(body)) +} diff --git a/pkg/xmtp/common.go b/pkg/xmtp/common.go index e0f8bc8a..b2a24d3b 100644 --- a/pkg/xmtp/common.go +++ b/pkg/xmtp/common.go @@ -25,6 +25,7 @@ func cappedBackoff(sleepTime time.Duration) time.Duration { type NotificationListener interface { Start() Stop() + Ready() bool } // deliveryDispatcher handles shared delivery logic for both V3 and V4 listeners diff --git a/pkg/xmtp/v3_listener.go b/pkg/xmtp/v3_listener.go index d65b7c99..1e197c23 100644 --- a/pkg/xmtp/v3_listener.go +++ b/pkg/xmtp/v3_listener.go @@ -7,6 +7,7 @@ import ( "encoding/hex" "io" "strings" + "sync/atomic" "time" "github.com/xmtp/example-notification-server-go/pkg/interfaces" @@ -29,6 +30,7 @@ type Listener struct { clientVersion string appVersion string dispatcher deliveryDispatcher + ready atomic.Bool } func NewListener( @@ -74,17 +76,32 @@ func (l *Listener) Start() { } func (l *Listener) Stop() { + l.ready.Store(false) l.cancelFunc() } +func (l *Listener) Ready() bool { + return l.ready.Load() +} + func (l *Listener) startMessageListener() { l.logger.Info("starting message listener") - var stream v1.MessageApi_SubscribeAllClient - var err error + defer close(l.messageChannel) + sleepTime := STARTING_SLEEP_TIME for { - stream, err = l.xmtpClient.SubscribeAll(l.ctx, &v1.SubscribeAllRequest{}) + select { + case <-l.ctx.Done(): + return + default: + } + + stream, err := l.xmtpClient.SubscribeAll(l.ctx, &v1.SubscribeAllRequest{}) if err != nil { + if l.ctx.Err() != nil { + return + } + l.logger.Error("error connecting to stream", zap.Error(err)) time.Sleep(sleepTime) sleepTime = cappedBackoff(sleepTime) @@ -93,35 +110,43 @@ func (l *Listener) startMessageListener() { } continue } - streamLoop: - for { - select { - case <-l.ctx.Done(): - close(l.messageChannel) - return - default: - msg, err := stream.Recv() - if err == io.EOF { - l.logger.Info("stream closed") - break streamLoop - } - if err != nil { - l.logger.Warn("error reading from stream", zap.Error(err)) - // Wait 100ms to avoid hammering the API and getting rate limited - time.Sleep(sleepTime) - sleepTime = cappedBackoff(sleepTime) - if err = l.refreshClient(); err != nil { - l.logger.Error("error refreshing client", zap.Error(err)) - } - break streamLoop - } + l.ready.Store(true) + if l.consumeMessageStream(stream, &sleepTime) { + return + } + } +} + +func (l *Listener) consumeMessageStream(stream v1.MessageApi_SubscribeAllClient, sleepTime *time.Duration) bool { + defer l.ready.Store(false) - if msg != nil { - // Reset the sleep time on first successful message - sleepTime = STARTING_SLEEP_TIME - l.messageChannel <- msg + for { + select { + case <-l.ctx.Done(): + return true + default: + msg, err := stream.Recv() + if err == io.EOF { + l.logger.Info("stream closed") + return false + } + + if err != nil { + l.logger.Warn("error reading from stream", zap.Error(err)) + // Wait 100ms to avoid hammering the API and getting rate limited + time.Sleep(*sleepTime) + *sleepTime = cappedBackoff(*sleepTime) + if err = l.refreshClient(); err != nil { + l.logger.Error("error refreshing client", zap.Error(err)) } + return false + } + + if msg != nil { + // Reset the sleep time on first successful message + *sleepTime = STARTING_SLEEP_TIME + l.messageChannel <- msg } } } diff --git a/pkg/xmtp/v3_listener_test.go b/pkg/xmtp/v3_listener_test.go index e9d2af12..d6e13763 100644 --- a/pkg/xmtp/v3_listener_test.go +++ b/pkg/xmtp/v3_listener_test.go @@ -14,13 +14,14 @@ import ( "github.com/xmtp/example-notification-server-go/pkg/interfaces" "github.com/xmtp/example-notification-server-go/pkg/options" "github.com/xmtp/example-notification-server-go/pkg/subscriptions" - topics "github.com/xmtp/example-notification-server-go/pkg/topics" "github.com/xmtp/example-notification-server-go/pkg/testutils" + topics "github.com/xmtp/example-notification-server-go/pkg/topics" v1 "github.com/xmtp/xmtpd/pkg/proto/message_api/v1" + "google.golang.org/grpc" ) const ( - XMTP_ADDRESS = "localhost:25556" + XMTP_ADDRESS = "localhost:5556" INSTALLATION_ID = "test_installation" INSTALLATION_ID_2 = "test_installation_2" TEST_TOPIC = "/xmtp/mls/1/w-abcdef0123456789/proto" @@ -130,3 +131,70 @@ func Test_MultipleDeliveries(t *testing.T) { require.Equal(t, TEST_TOPIC, sendReqs[0].Topic) require.Equal(t, TEST_TOPIC, sendReqs[1].Topic) } + +type subscribeAllOnlyMessageAPIClient struct { + subscribeAll func(context.Context, *v1.SubscribeAllRequest, ...grpc.CallOption) (grpc.ServerStreamingClient[v1.Envelope], error) +} + +func (c *subscribeAllOnlyMessageAPIClient) Publish(context.Context, *v1.PublishRequest, ...grpc.CallOption) (*v1.PublishResponse, error) { + panic("unexpected Publish call") +} + +func (c *subscribeAllOnlyMessageAPIClient) Subscribe(context.Context, *v1.SubscribeRequest, ...grpc.CallOption) (grpc.ServerStreamingClient[v1.Envelope], error) { + panic("unexpected Subscribe call") +} + +func (c *subscribeAllOnlyMessageAPIClient) Subscribe2(context.Context, ...grpc.CallOption) (grpc.BidiStreamingClient[v1.SubscribeRequest, v1.Envelope], error) { + panic("unexpected Subscribe2 call") +} + +func (c *subscribeAllOnlyMessageAPIClient) SubscribeAll(ctx context.Context, req *v1.SubscribeAllRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[v1.Envelope], error) { + return c.subscribeAll(ctx, req, opts...) +} + +func (c *subscribeAllOnlyMessageAPIClient) Query(context.Context, *v1.QueryRequest, ...grpc.CallOption) (*v1.QueryResponse, error) { + panic("unexpected Query call") +} + +func (c *subscribeAllOnlyMessageAPIClient) BatchQuery(context.Context, *v1.BatchQueryRequest, ...grpc.CallOption) (*v1.BatchQueryResponse, error) { + panic("unexpected BatchQuery call") +} + +func Test_StartMessageListenerStopsOnCanceledSubscribe(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + listener := &Listener{ + ctx: ctx, + cancelFunc: cancel, + logger: testutils.TestLogger(t), + messageChannel: make(chan *v1.Envelope), + xmtpClient: &subscribeAllOnlyMessageAPIClient{ + subscribeAll: func(ctx context.Context, _ *v1.SubscribeAllRequest, _ ...grpc.CallOption) (grpc.ServerStreamingClient[v1.Envelope], error) { + <-ctx.Done() + return nil, ctx.Err() + }, + }, + } + + done := make(chan struct{}) + go func() { + listener.startMessageListener() + close(done) + }() + + cancel() + + select { + case <-done: + case <-time.After(500 * time.Millisecond): + t.Fatal("startMessageListener did not exit after context cancellation") + } + + select { + case _, ok := <-listener.messageChannel: + require.False(t, ok, "messageChannel should be closed when listener exits") + case <-time.After(100 * time.Millisecond): + t.Fatal("messageChannel was not closed after listener exit") + } +} diff --git a/pkg/xmtp/v4_listener.go b/pkg/xmtp/v4_listener.go index 096490be..fa7778d7 100644 --- a/pkg/xmtp/v4_listener.go +++ b/pkg/xmtp/v4_listener.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "sync" + "sync/atomic" "time" "github.com/xmtp/example-notification-server-go/pkg/interfaces" @@ -33,6 +34,7 @@ type V4Listener struct { subscriptions interfaces.Subscriptions clientVersion string appVersion string + ready atomic.Bool } func NewV4Listener( @@ -79,6 +81,7 @@ func (l *V4Listener) Start() { } func (l *V4Listener) Stop() { + l.ready.Store(false) l.cancelFunc() l.connMu.Lock() defer l.connMu.Unlock() @@ -88,6 +91,10 @@ func (l *V4Listener) Stop() { } } +func (l *V4Listener) Ready() bool { + return l.ready.Load() +} + func (l *V4Listener) startEnvelopeListener() { l.logger.Info("starting V4 envelope listener") sleepTime := STARTING_SLEEP_TIME @@ -109,34 +116,43 @@ func (l *V4Listener) startEnvelopeListener() { } continue } - streamLoop: - for { - select { - case <-l.ctx.Done(): - close(l.envelopeChannel) - return - default: - resp, err := stream.Recv() - if err == io.EOF { - l.logger.Info("V4 stream closed") - break streamLoop - } - if err != nil { - l.logger.Error("error reading from V4 stream", zap.Error(err)) - time.Sleep(sleepTime) - sleepTime = cappedBackoff(sleepTime) - if err = l.refreshV4Client(); err != nil { - l.logger.Error("error refreshing V4 client", zap.Error(err)) - } - break streamLoop + l.ready.Store(true) + if l.consumeEnvelopeStream(stream, &sleepTime) { + return + } + } +} + +func (l *V4Listener) consumeEnvelopeStream(stream notificationApi.NotificationApi_SubscribeAllEnvelopesClient, sleepTime *time.Duration) bool { + defer l.ready.Store(false) + + for { + select { + case <-l.ctx.Done(): + close(l.envelopeChannel) + return true + default: + resp, err := stream.Recv() + if err == io.EOF { + l.logger.Info("V4 stream closed") + return false + } + + if err != nil { + l.logger.Error("error reading from V4 stream", zap.Error(err)) + time.Sleep(*sleepTime) + *sleepTime = cappedBackoff(*sleepTime) + if err = l.refreshV4Client(); err != nil { + l.logger.Error("error refreshing V4 client", zap.Error(err)) } + return false + } - if resp != nil { - sleepTime = STARTING_SLEEP_TIME - for _, env := range resp.GetEnvelopes() { - l.envelopeChannel <- env - } + if resp != nil { + *sleepTime = STARTING_SLEEP_TIME + for _, env := range resp.GetEnvelopes() { + l.envelopeChannel <- env } } } @@ -222,7 +238,7 @@ func (l *V4Listener) processOriginatorEnvelope(env *envelopesProto.OriginatorEnv ) continue } - + logger.Info("delivering notification") if err = l.dispatcher.deliver(req); err != nil { logger.Error("error delivering V4 request", zap.Error(err)) } diff --git a/pkg/xmtp/v4_listener_test.go b/pkg/xmtp/v4_listener_test.go index 9fd9f01c..ee1433fe 100644 --- a/pkg/xmtp/v4_listener_test.go +++ b/pkg/xmtp/v4_listener_test.go @@ -35,7 +35,7 @@ func TestV4Listener_NewAndStop(t *testing.T) { l, err := NewV4Listener(ctx, logger, options.XmtpOptions{ ListenerEnabled: true, - GrpcAddress: "localhost:25556", + GrpcAddress: "localhost:5556", NumWorkers: 5, }, instSvc, subsSvc, []interfaces.Delivery{mockDelivery}, "test", "test") require.NoError(t, err) diff --git a/xnet.toml b/xnet.toml new file mode 100644 index 00000000..2663db33 --- /dev/null +++ b/xnet.toml @@ -0,0 +1,24 @@ +[xnet] +use_standard_ports = true + +[xmtpd] +image = "ghcr.io/xmtp/xmtpd" +version = "04-01-implement_api_schema_changes" + +[gateway] +image = "ghcr.io/xmtp/xmtpd-gateway" +version = "04-01-implement_api_schema_changes" + +[v3] +image = "ghcr.io/xmtp/node-go" +version = "main" + +[validation] +image = "ghcr.io/xmtp/mls-validation-service" +version = "main" + +[contracts] +version = "v2026.02.10-1" + +# Nodes are added manually via: xnet-cli node add --migrator +# This is done in dev/up after `xnet-cli up --paused`