diff --git a/README.md b/README.md index 13690df4..c24cd50d 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,8 @@ They implement the following public methods: * `policyConfig` - SQS only - configuration for queue access policies (see [SQS Policy Configuration](#sqs-policy-configuration) for more information); * `deletionConfig` - automatic cleanup of resources; * `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information); - * `logMessages` - add logs for processed messages. + * `logMessages` - add debug logs for processed messages. When enabled, logs structured metadata including message id, type, timestamps, and queue name. For privacy reasons, the full message payload is not logged by default. See [Message Logging](#message-logging) for more details. + * `messageMetadataField` - which field in the message contains metadata for logging purposes (by default it is `metadata`). * `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading). * `messageDeduplicationConfig` - configuration for store-based message deduplication on publisher level. For more details on setting this up, see [Publisher-level store-based-message deduplication](#publisher-level-store-based-message-deduplication). * `enablePublisherDeduplication` - enable store-based publisher-level deduplication. For more details on setting this up, see [Publisher-level store-based-message deduplication](#publisher-level-store-based-message-deduplication). @@ -110,7 +111,8 @@ Multi-schema consumers support multiple message types via handler configs. They * `consumerOverrides` – available only for SQS consumers; * `deadLetterQueue` - available only for SQS and SNS consumers (see [Dead Letter Queue](#dead-letter-queue) for more information); * `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information); - * `logMessages` - add logs for processed messages. + * `logMessages` - add debug logs for processed messages. When enabled, logs structured metadata including message id, type, timestamps, and queue name. For privacy reasons, the full message payload is not logged by default. To include custom message data in logs, configure `messageLogFormatter` on your handlers. See [Message Logging](#message-logging) for more details. + * `messageMetadataField` - which field in the message contains metadata for logging purposes (by default it is `metadata`). * `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading). * `concurrentConsumersAmount` - configuration for specifying the number of concurrent consumers to create. Available only for SQS and SNS consumers * `messageDeduplicationConfig` - configuration for store-based message deduplication on consumer level. For more details on setting this up, see [Consumer-level store-based-message deduplication](#consumer-level-store-based-message-deduplication). @@ -173,7 +175,9 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer< preHandlerBarrier: async (message) => { // do barrier check here return true - } + }, + // Optional: customize what message data is logged (see Message Logging section) + messageLogFormatter: (message) => ({ id: message.id, type: message.type }), }, ) .addConfig(PERMISSIONS_REMOVE_MESSAGE_SCHEMA, @@ -642,6 +646,56 @@ const result = await myConsumer.handlerSpy.waitForMessageWithId('1') expect(result.processingResult).toEqual({ status: 'consumed' }) ``` +## Message Logging + +When `logMessages` is enabled, processed messages are logged at the `debug` level with structured metadata. For privacy reasons, the full message payload is **not logged by default** to avoid exposing sensitive data. + +### What is logged by default + +Each log entry includes processed message metadata: +- `messageId` - unique identifier of the message +- `messageType` - type of the message +- `queueName` - name of the queue or topic +- `messageTimestamp` - when the message was originally sent +- `messageProcessingStartTimestamp` - when processing started +- `messageProcessingEndTimestamp` - when processing completed +- `messageDeduplicationId` - deduplication id (if deduplication is enabled) +- `messageMetadata` - contents of the metadata field (configurable via `messageMetadataField`) +- `processingResult` - outcome of processing (e.g., `{ status: 'consumed' }` or `{ status: 'published' }`) + +### Custom message logging with messageLogFormatter + +If you need to include additional message data in logs, you can configure a `messageLogFormatter` on your handler. This formatter receives the message and returns the data to be logged: + +```typescript +new MessageHandlerConfigBuilder() + .addConfig( + MY_MESSAGE_SCHEMA, + async (message, context) => { + // handler logic + return { result: 'success' } + }, + { + // Only log specific fields, excluding sensitive data + messageLogFormatter: (message) => ({ + id: message.id, + type: message.type, + // Exclude sensitive fields like email, password, etc. + }), + }, + ) + .build() +``` + +When a `messageLogFormatter` is provided, its output is included in the log under the `message` key alongside the processed message metadata. + +### Configuration options + +| Option | Default | Description | +|--------|---------|-------------| +| `logMessages` | `false` | Enable debug logging for processed messages | +| `messageMetadataField` | `'metadata'` | Field in the message containing metadata to include in logs | + ## Payload Offloading Payload offloading allows you to manage large message payloads by storing them in external storage, bypassing any message size restrictions imposed by queue systems. @@ -773,6 +827,7 @@ It needs to implement the following methods: - `messageProcessingStartTimestamp` - the timestamp when the processing of the message started - `messageProcessingEndTimestamp` - the timestamp when the processing of the message finished - `messageDeduplicationId` - the deduplication id of the message, in case deduplication is enabled + - `messageMetadata` - contents of the message metadata field (configurable via `messageMetadataField`) See [@message-queue-toolkit/metrics](packages/metrics/README.md) for concrete implementations diff --git a/packages/amqp/lib/AbstractAmqpConsumer.ts b/packages/amqp/lib/AbstractAmqpConsumer.ts index 43192b26..1cbb8d4f 100644 --- a/packages/amqp/lib/AbstractAmqpConsumer.ts +++ b/packages/amqp/lib/AbstractAmqpConsumer.ts @@ -6,13 +6,13 @@ import type { ParseMessageResult, PreHandlingOutputs, Prehandler, + ProcessedMessageMetadata, QueueConsumer, QueueConsumerOptions, TransactionObservabilityManager, } from '@message-queue-toolkit/core' import { HandlerContainer, isMessageError, parseMessage } from '@message-queue-toolkit/core' import type { ChannelModel, Message } from 'amqplib' - import type { AMQPConsumerDependencies, AMQPQueueCreationConfig, @@ -157,10 +157,6 @@ export abstract class AbstractAmqpConsumer< // @ts-expect-error const uniqueTransactionKey = parsedMessage[this.messageIdField] this.transactionObservabilityManager?.start(transactionSpanId, uniqueTransactionKey) - if (this.logMessages) { - const resolvedLogMessage = this.resolveMessageLog(parsedMessage, messageType) - this.logMessage(resolvedLogMessage) - } this.internalProcessMessage(parsedMessage, messageType) .then((result) => { if (result.result === 'success') { @@ -267,9 +263,17 @@ export abstract class AbstractAmqpConsumer< return this._messageSchemaContainer.resolveSchema(message) } - protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown { - const handler = this.handlerContainer.resolveHandler(messageType) - return handler.messageLogFormatter(message) + protected override resolveMessageLog( + processedMessageMetadata: ProcessedMessageMetadata, + ): unknown | null { + if (!processedMessageMetadata.message || !processedMessageMetadata.messageType) { + return null + } + const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType) + if (!handler.messageLogFormatter) { + return null + } + return handler.messageLogFormatter(processedMessageMetadata.message) } // eslint-disable-next-line max-params diff --git a/packages/amqp/lib/AbstractAmqpPublisher.ts b/packages/amqp/lib/AbstractAmqpPublisher.ts index 6366d5ae..e9b7aefd 100644 --- a/packages/amqp/lib/AbstractAmqpPublisher.ts +++ b/packages/amqp/lib/AbstractAmqpPublisher.ts @@ -88,12 +88,6 @@ export abstract class AbstractAmqpPublisher< return } - if (this.logMessages) { - const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' - const resolvedLogMessage = this.resolveMessageLog(message, messageType) - this.logMessage(resolvedLogMessage) - } - message = this.updateInternalProperties(message) try { diff --git a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts index a15a2056..0ee623cc 100644 --- a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts @@ -95,22 +95,21 @@ describe('AmqpPermissionConsumer', () => { await newConsumer.close() - expect(logger.loggedMessages.length).toBe(6) + expect(logger.loggedMessages.length).toBe(4) expect(logger.loggedMessages).toMatchObject([ 'Propagating new connection across 0 receivers', - { - id: '1', - messageType: 'add', - }, 'timestamp not defined, adding it automatically', - expect.any(Object), { - id: '1', - messageType: 'add', - timestamp: expect.any(String), + processedMessageMetadata: expect.objectContaining({ + processingResult: { status: 'published' }, + }), }, { - processedMessageMetadata: expect.any(String), + processedMessageMetadata: expect.objectContaining({ + messageId: '1', + messageType: 'add', + processingResult: { status: 'consumed' }, + }), }, ]) }) @@ -164,6 +163,7 @@ describe('AmqpPermissionConsumer', () => { id: '1', messageType: 'add', }), + messageMetadata: undefined, }, ]) }) diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index a7db562f..3822aa82 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -47,13 +47,20 @@ describe('PermissionPublisher', () => { publisher.publish(message) await waitAndRetry(() => { - return logger.loggedMessages.length === 2 + return logger.loggedMessages.length === 3 }) - expect(logger.loggedMessages[1]).toEqual({ - id: '1', - messageType: 'add', - }) + expect(logger.loggedMessages).toMatchObject([ + 'Propagating new connection across 0 receivers', + 'timestamp not defined, adding it automatically', + { + processedMessageMetadata: expect.objectContaining({ + messageId: '1', + messageType: 'add', + processingResult: { status: 'published' }, + }), + }, + ]) }) }) diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index 4a9e6a39..2da2573e 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -114,6 +114,10 @@ export abstract class AbstractQueueService< * Used to know the store-based message deduplication options */ protected readonly messageDeduplicationOptionsField: string + /** + * Used to know where metadata is stored - for debug logging purposes only + */ + protected readonly messageMetadataField: string protected readonly errorReporter: ErrorReporter public readonly logger: CommonLogger protected readonly messageIdField: string @@ -157,6 +161,7 @@ export abstract class AbstractQueueService< this.messageDeduplicationIdField = options.messageDeduplicationIdField ?? 'deduplicationId' this.messageDeduplicationOptionsField = options.messageDeduplicationOptionsField ?? 'deduplicationOptions' + this.messageMetadataField = options.messageMetadataField ?? 'metadata' this.creationConfig = options.creationConfig this.locatorConfig = options.locatorConfig this.deletionConfig = options.deletionConfig @@ -239,15 +244,36 @@ export abstract class AbstractQueueService< /** * Format message for logging */ - protected resolveMessageLog(message: MessagePayloadSchemas, _messageType: string): unknown { - return message + protected resolveMessageLog( + _processedMessageMetadata: ProcessedMessageMetadata, + ): unknown | null { + return null } - /** - * Log preformatted and potentially presanitized message payload - */ - protected logMessage(messageLogEntry: unknown) { - this.logger.debug(messageLogEntry) + protected logMessageProcessed( + processedMessageMetadata: ProcessedMessageMetadata, + ) { + const processedMessageMetadataLog = { + processingResult: processedMessageMetadata.processingResult, + messageId: processedMessageMetadata.messageId, + messageType: processedMessageMetadata.messageType, + queueName: processedMessageMetadata.queueName, + messageTimestamp: processedMessageMetadata.messageTimestamp, + messageDeduplicationId: processedMessageMetadata.messageDeduplicationId, + messageProcessingStartTimestamp: processedMessageMetadata.messageProcessingStartTimestamp, + messageProcessingEndTimestamp: processedMessageMetadata.messageProcessingEndTimestamp, + messageMetadata: stringValueSerializer(processedMessageMetadata.messageMetadata), + } + + const resolvedMessageLog = this.resolveMessageLog(processedMessageMetadata) + + this.logger.debug( + { + processedMessageMetadata: processedMessageMetadataLog, + ...(resolvedMessageLog ? { message: resolvedMessageLog } : {}), + }, + `Finished processing message ${processedMessageMetadata.messageId}`, + ) } protected handleError(err: unknown, context?: Record) { @@ -284,8 +310,8 @@ export abstract class AbstractQueueService< messageType, ) - const debugLoggingEnabled = this.logMessages && this.logger.isLevelEnabled('debug') - if (!debugLoggingEnabled && !this.messageMetricsManager) return + const debugMessageLoggingEnabled = this.logMessages && this.logger.isLevelEnabled('debug') + if (!debugMessageLoggingEnabled && !this.messageMetricsManager) return const processedMessageMetadata = this.resolveProcessedMessageMetadata( message, @@ -295,11 +321,8 @@ export abstract class AbstractQueueService< params.queueName, messageId, ) - if (debugLoggingEnabled) { - this.logger.debug( - { processedMessageMetadata: stringValueSerializer(processedMessageMetadata) }, - `Finished processing message ${processedMessageMetadata.messageId}`, - ) + if (debugMessageLoggingEnabled) { + this.logMessageProcessed(processedMessageMetadata) } if (this.messageMetricsManager) { this.messageMetricsManager.registerProcessedMessage(processedMessageMetadata) @@ -321,8 +344,13 @@ export abstract class AbstractQueueService< const messageType = message ? this.resolveMessageTypeFromMessage(message) : undefined const messageDeduplicationId = message && this.messageDeduplicationIdField in message - ? // @ts-ignore - message[this.messageDeduplicationId] + ? // @ts-expect-error + message[this.messageDeduplicationIdField] + : undefined + const messageMetadata = + message && this.messageMetadataField in message + ? // @ts-expect-error + message[this.messageMetadataField] : undefined return { @@ -335,6 +363,7 @@ export abstract class AbstractQueueService< messageDeduplicationId, messageProcessingStartTimestamp, messageProcessingEndTimestamp, + messageMetadata, } } diff --git a/packages/core/lib/queues/HandlerContainer.ts b/packages/core/lib/queues/HandlerContainer.ts index cbfb8c0a..9beeefc4 100644 --- a/packages/core/lib/queues/HandlerContainer.ts +++ b/packages/core/lib/queues/HandlerContainer.ts @@ -55,8 +55,6 @@ export type Prehandler void, ) => void -export const defaultLogFormatter = (message: MessagePayloadSchema) => message - export type HandlerConfigOptions< MessagePayloadSchema extends object, ExecutionContext, @@ -98,7 +96,7 @@ export class MessageHandlerConfig< PrehandlerOutput, BarrierOutput > - public readonly messageLogFormatter: LogFormatter + public readonly messageLogFormatter?: LogFormatter public readonly preHandlerBarrier?: BarrierCallback< MessagePayloadSchema, ExecutionContext, @@ -126,7 +124,7 @@ export class MessageHandlerConfig< this.definition = eventDefinition this.messageType = options?.messageType this.handler = handler - this.messageLogFormatter = options?.messageLogFormatter ?? defaultLogFormatter + this.messageLogFormatter = options?.messageLogFormatter this.preHandlerBarrier = options?.preHandlerBarrier this.preHandlers = options?.preHandlers ?? [] } diff --git a/packages/core/lib/types/queueOptionsTypes.ts b/packages/core/lib/types/queueOptionsTypes.ts index 10507448..94a3cf37 100644 --- a/packages/core/lib/types/queueOptionsTypes.ts +++ b/packages/core/lib/types/queueOptionsTypes.ts @@ -62,6 +62,11 @@ export type ProcessedMessageMetadata } export interface MessageMetricsManager { @@ -113,6 +118,7 @@ export type CommonQueueOptions = { messageTimestampField?: string messageDeduplicationIdField?: string messageDeduplicationOptionsField?: string + messageMetadataField?: string handlerSpy?: HandlerSpy | HandlerSpyParams | boolean logMessages?: boolean deletionConfig?: DeletionConfig diff --git a/packages/core/test/queues/HandlerContainer.spec.ts b/packages/core/test/queues/HandlerContainer.spec.ts index fad743cf..a20c2a9e 100644 --- a/packages/core/test/queues/HandlerContainer.spec.ts +++ b/packages/core/test/queues/HandlerContainer.spec.ts @@ -106,20 +106,6 @@ describe('MessageHandlerConfigBuilder', () => { expect(config.messageLogFormatter).toBe(messageLogFormatter) expect(config.preHandlers).toEqual([]) }) - - it('should use default log formatter when not provided', () => { - const handler = () => Promise.resolve({ result: 'success' as const }) - - const config = new MessageHandlerConfig(USER_MESSAGE_SCHEMA, handler) - - const testMessage: UserMessage = { - type: 'user.created', - userId: '123', - email: 'test@example.com', - } - - expect(config.messageLogFormatter(testMessage)).toEqual(testMessage) - }) }) }) diff --git a/packages/core/vitest.config.ts b/packages/core/vitest.config.ts index 7ed17ad9..a1ab6c53 100644 --- a/packages/core/vitest.config.ts +++ b/packages/core/vitest.config.ts @@ -18,7 +18,7 @@ export default defineConfig({ thresholds: { lines: 42, functions: 51, - branches: 48, + branches: 47, statements: 42, }, }, diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts index 68fff02f..5be4fff8 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts @@ -2,6 +2,7 @@ import { type Either, type ErrorResolver, isError } from '@lokalise/node-core' import type { MessageInvalidFormatError, MessageValidationError, + ProcessedMessageMetadata, ResolvedMessage, } from '@message-queue-toolkit/core' import { @@ -861,9 +862,17 @@ export abstract class AbstractPubSubConsumer< ) } - protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown { - const handler = this.handlerContainer.resolveHandler(messageType) - return handler.messageLogFormatter(message) + protected override resolveMessageLog( + processedMessageMetadata: ProcessedMessageMetadata, + ): unknown | null { + if (!processedMessageMetadata.message || !processedMessageMetadata.messageType) { + return null + } + const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType) + if (!handler.messageLogFormatter) { + return null + } + return handler.messageLogFormatter(processedMessageMetadata.message) } protected override isDeduplicationEnabledForMessage(message: MessagePayloadType): boolean { diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts index d9484b43..5927afa0 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts @@ -68,12 +68,6 @@ export abstract class AbstractPubSubPublisher const messageProcessingStartTimestamp = Date.now() const parsedMessage = messageSchemaResult.result.parse(message) - if (this.logMessages) { - const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' - const resolvedLogMessage = this.resolveMessageLog(message, messageType) - this.logMessage(resolvedLogMessage) - } - message = this.updateInternalProperties(message) const maybeOffloadedPayloadMessage = await this.offloadMessagePayloadIfNeeded(message, () => { // Calculate message size for PubSub @@ -163,8 +157,4 @@ export abstract class AbstractPubSubPublisher protected override resolveSchema(message: MessagePayloadType) { return this.messageSchemaContainer.resolveSchema(message) } - - protected override resolveMessageLog(message: MessagePayloadType, _messageType: string): unknown { - return message - } } diff --git a/packages/sns/lib/sns/AbstractSnsPublisher.ts b/packages/sns/lib/sns/AbstractSnsPublisher.ts index cc32d0e3..f6c14ad4 100644 --- a/packages/sns/lib/sns/AbstractSnsPublisher.ts +++ b/packages/sns/lib/sns/AbstractSnsPublisher.ts @@ -125,12 +125,6 @@ export abstract class AbstractSnsPublisher const topicName = this.locatorConfig?.topicName ?? this.creationConfig?.topic?.Name ?? 'unknown' - if (this.logMessages) { - const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' - const resolvedLogMessage = this.resolveMessageLog(message, messageType) - this.logMessage(resolvedLogMessage) - } - const updatedMessage = this.updateInternalProperties(message) // Resolve FIFO options from original message BEFORE offloading diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts index 0437a445..81a5c1b3 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts @@ -5,6 +5,7 @@ import { SetQueueAttributesCommand, } from '@aws-sdk/client-sqs' import type { Either, ErrorResolver } from '@lokalise/node-core' +import type { ProcessedMessageMetadata } from '@message-queue-toolkit/core' import { type BarrierResult, type DeadLetterQueueOptions, @@ -25,7 +26,6 @@ import { import type { ConsumerOptions } from 'sqs-consumer' import { Consumer } from 'sqs-consumer' import type { ZodSchema } from 'zod/v4' - import type { SQSMessage } from '../types/MessageTypes.ts' import { hasOffloadedPayload } from '../utils/messageUtils.ts' import { deleteSqs, initSqs } from '../utils/sqsInitter.ts' @@ -401,10 +401,7 @@ export abstract class AbstractSqsConsumer< // @ts-expect-error const uniqueTransactionKey = parsedMessage[this.messageIdField] this.transactionObservabilityManager?.start(transactionSpanId, uniqueTransactionKey) - if (this.logMessages) { - const resolvedLogMessage = this.resolveMessageLog(parsedMessage, messageType) - this.logMessage(resolvedLogMessage) - } + const result: Either<'retryLater' | Error, 'success'> = await this.internalProcessMessage( parsedMessage, messageType, @@ -852,9 +849,17 @@ export abstract class AbstractSqsConsumer< ) } - protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown { - const handler = this.handlerContainer.resolveHandler(messageType) - return handler.messageLogFormatter(message) + protected override resolveMessageLog( + processedMessageMetadata: ProcessedMessageMetadata, + ): unknown | null { + if (!processedMessageMetadata.message || !processedMessageMetadata.messageType) { + return null + } + const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType) + if (!handler.messageLogFormatter) { + return null + } + return handler.messageLogFormatter(processedMessageMetadata.message) } protected override resolveMessage(message: SQSMessage) { diff --git a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts index 620d821c..f4ed02d4 100644 --- a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts +++ b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts @@ -117,12 +117,6 @@ export abstract class AbstractSqsPublisher const messageProcessingStartTimestamp = Date.now() const parsedMessage = messageSchemaResult.result.parse(message) - if (this.logMessages) { - const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' - const resolvedLogMessage = this.resolveMessageLog(message, messageType) - this.logMessage(resolvedLogMessage) - } - message = this.updateInternalProperties(message) // Resolve FIFO options from original message BEFORE offloading diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index 7c75a2e5..507b8202 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -443,15 +443,16 @@ describe('SqsPermissionConsumer', () => { await newConsumer.handlerSpy.waitForMessageWithId('1', 'consumed') - expect(logger.loggedMessages.length).toBe(2) + expect(logger.loggedMessages.length).toBe(1) expect(logger.loggedMessages).toMatchObject([ { - id: '1', - messageType: 'add', - timestamp: expect.any(String), - }, - { - processedMessageMetadata: expect.any(String), + processedMessageMetadata: expect.objectContaining({ + messageId: '1', + messageType: 'add', + processingResult: { + status: 'consumed', + }, + }), }, ]) await newConsumer.close() @@ -511,6 +512,9 @@ describe('SqsPermissionConsumer', () => { messageProcessingStartTimestamp: expect.any(Number), messageProcessingEndTimestamp: expect.any(Number), queueName: SqsPermissionConsumer.QUEUE_NAME, + messageMetadata: { + schemaVersions: '1.0.0', + }, message: expect.objectContaining({ id: '1', messageType: 'add',