Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions js/src/logger-misc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
initDataset,
initExperiment,
initLogger,
login,
NOOP_SPAN,
permalink,
BraintrustState,
Expand All @@ -27,6 +28,55 @@ configureNode();
const { extractAttachments, deepCopyEvent, validateTags } =
_exportsForTestingOnly;

const TEST_API_KEY = "___TEST_API_KEY__THIS_IS_NOT_REAL___";

type HttpLogger = ReturnType<BraintrustState["httpLogger"]>;

function getOnFlushError(bgLogger: HttpLogger) {
return (
bgLogger as unknown as {
onFlushError?: (error: unknown) => void;
}
).onFlushError;
}

function mockFlushFailure(state: BraintrustState, error: Error): HttpLogger {
vi.spyOn(console, "warn").mockImplementation(() => {});
vi.spyOn(state.apiConn(), "get_json").mockResolvedValue({
logs3_payload_max_bytes: null,
});

const bgLogger = state.httpLogger();
bgLogger.numTries = 1;
vi.spyOn(
bgLogger as unknown as {
submitLogsRequest: () => Promise<void>;
},
"submitLogsRequest",
).mockRejectedValue(error);
return bgLogger;
}

function mockExperimentRegister(state: BraintrustState) {
vi.spyOn(state.appConn(), "post_json").mockImplementation(async (path) => {
if (path === "api/experiment/register") {
return {
project: {
id: "test-project-id",
name: "test-project",
},
experiment: {
id: "test-experiment-id",
name: "test-experiment",
created: "2024-01-01T00:00:00.000Z",
},
};
}

throw new Error(`Unexpected app connection request: ${path}`);
});
}

describe("validateTags", () => {
test("accepts valid tags", () => {
expect(() => validateTags(["foo", "bar", "baz"])).not.toThrow();
Expand Down Expand Up @@ -562,3 +612,86 @@ test("disable logging", async () => {
expect(submitLogsRequestSpy).toHaveBeenCalledTimes(1);
expect(submittedItems.length).toEqual(0);
});

describe("onFlushError wiring", () => {
beforeEach(() => {
vi.restoreAllMocks();
_exportsForTestingOnly.simulateLogoutForTests();
});

afterEach(() => {
vi.restoreAllMocks();
_exportsForTestingOnly.simulateLogoutForTests();
});

test("initLogger updates an existing background logger callback", async () => {
const state = new BraintrustState({});
await state.login({
apiKey: TEST_API_KEY,
appUrl: "https://braintrust.dev",
});
const flushError = new Error("initLogger flush failure");
const onFlushError = vi.fn();

mockFlushFailure(state, flushError);

const logger = initLogger({
projectName: "test-project",
projectId: "test-project-id",
onFlushError,
state,
});

const span = logger.startSpan({ name: "test-span" });
span.end();
await logger.flush();

expect(onFlushError).toHaveBeenCalledTimes(1);
expect(onFlushError).toHaveBeenCalledWith(flushError);
});

test("login fast path updates an existing background logger callback", async () => {
const state = await _exportsForTestingOnly.simulateLoginForTests();
const onFlushError = vi.fn();
const bgLogger = state.httpLogger();

await login({
apiKey: state.loginToken!,
appUrl: state.appUrl!,
orgName: state.orgName!,
onFlushError,
});

expect(getOnFlushError(bgLogger)).toBe(onFlushError);
expect(getOnFlushError(state.httpLogger())).toBe(onFlushError);
});

test("initExperiment updates an existing background logger callback", async () => {
const state = new BraintrustState({});
await state.login({
apiKey: TEST_API_KEY,
appUrl: "https://braintrust.dev",
});
const flushError = new Error("initExperiment flush failure");
const onFlushError = vi.fn();

mockFlushFailure(state, flushError);
mockExperimentRegister(state);

const experiment = initExperiment({
project: "test-project",
projectId: "test-project-id",
experiment: "test-experiment",
onFlushError,
repoInfo: {},
state,
});

const span = experiment.startSpan({ name: "test-span" });
span.end();
await experiment.flush();

expect(onFlushError).toHaveBeenCalledTimes(1);
expect(onFlushError).toHaveBeenCalledWith(flushError);
});
});
41 changes: 39 additions & 2 deletions js/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,19 @@ export class BraintrustState {
this.bgLogger().setMaskingFunction(maskingFunction);
}

public setOnFlushError(onFlushError?: (error: unknown) => void): void {
if (onFlushError === undefined) {
return;
}

this.loginParams.onFlushError = onFlushError;
if (this._bgLogger.hasSucceeded) {
this._bgLogger.get().setOnFlushError(onFlushError);
}
}

public async login(loginParams: LoginOptions & { forceLogin?: boolean }) {
this.setOnFlushError(loginParams.onFlushError);
if (this.apiUrl && !loginParams.forceLogin) {
return;
}
Expand Down Expand Up @@ -2759,6 +2771,14 @@ class HTTPBackgroundLogger implements BackgroundLogger {
this.maskingFunction = maskingFunction;
}

setOnFlushError(onFlushError?: (error: unknown) => void): void {
if (onFlushError === undefined) {
return;
}

this.onFlushError = onFlushError;
}

pendingFlushBytes(): number {
return this._pendingBytes;
}
Expand Down Expand Up @@ -3399,6 +3419,7 @@ export function init<IsOpen extends boolean = false>(

const state = stateArg ?? _globalState;

state.setOnFlushError(options.onFlushError);
// Ensure unlimited queue for init() calls (experiments)
// Experiments should never drop data
state.enforceQueueSizeLimit(false);
Expand Down Expand Up @@ -3917,6 +3938,7 @@ export function initLogger<IsAsyncFlush extends boolean = true>(

const state = stateArg ?? _globalState;

state.setOnFlushError(options.onFlushError);
// Enable queue size limit enforcement for initLogger() calls
// This ensures production observability doesn't OOM customer processes
state.enforceQueueSizeLimit(true);
Expand Down Expand Up @@ -4447,6 +4469,7 @@ export async function login(
state.loginToken,
);
checkUpdatedParam("orgName", options.orgName, state.orgName);
state.setOnFlushError(options.onFlushError);
return state;
}

Expand Down Expand Up @@ -5518,15 +5541,28 @@ export class ObjectFetcher<RecordType> implements AsyncIterable<
): AsyncGenerator<WithTransactionId<RecordType>> {
const state = await this.getState();
const objectId = await this.id;
const limit = batchSize ?? DEFAULT_FETCH_BATCH_SIZE;
const batchLimit = batchSize ?? DEFAULT_FETCH_BATCH_SIZE;
const internalLimit = (
this._internal_btql as { limit?: number } | undefined
)?.limit;
const limit =
batchSize !== undefined ? batchSize : (internalLimit ?? batchLimit);
const internalBtqlWithoutReservedQueryKeys = Object.fromEntries(
Object.entries(this._internal_btql ?? {}).filter(
([key]) =>
key !== "cursor" &&
key !== "limit" &&
key !== "select" &&
key !== "from",
),
);
let cursor = undefined;
let iterations = 0;
while (true) {
const resp = await state.apiConn().post(
`btql`,
{
query: {
...this._internal_btql,
select: [
{
op: "star",
Expand All @@ -5547,6 +5583,7 @@ export class ObjectFetcher<RecordType> implements AsyncIterable<
},
cursor,
limit,
...internalBtqlWithoutReservedQueryKeys,
},
use_columnstore: false,
brainstore_realtime: true,
Expand Down
Loading
Loading