Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ They implement the following public methods:
* `policyConfig` - SQS only - configuration for queue access policies (see [SQS Policy Configuration](#sqs-policy-configuration) for more information);
* `deletionConfig` - automatic cleanup of resources;
* `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information);
* `logMessages` - add logs for processed messages.
* `logMessages` - add debug logs for processed messages. When enabled, logs structured metadata including message id, type, timestamps, and queue name. For privacy reasons, the full message payload is not logged by default. See [Message Logging](#message-logging) for more details.
* `messageMetadataField` - which field in the message contains metadata for logging purposes (by default it is `metadata`).
* `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading).
* `messageDeduplicationConfig` - configuration for store-based message deduplication on publisher level. For more details on setting this up, see [Publisher-level store-based-message deduplication](#publisher-level-store-based-message-deduplication).
* `enablePublisherDeduplication` - enable store-based publisher-level deduplication. For more details on setting this up, see [Publisher-level store-based-message deduplication](#publisher-level-store-based-message-deduplication).
Expand Down Expand Up @@ -110,7 +111,8 @@ Multi-schema consumers support multiple message types via handler configs. They
* `consumerOverrides` – available only for SQS consumers;
* `deadLetterQueue` - available only for SQS and SNS consumers (see [Dead Letter Queue](#dead-letter-queue) for more information);
* `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information);
* `logMessages` - add logs for processed messages.
* `logMessages` - add debug logs for processed messages. When enabled, logs structured metadata including message id, type, timestamps, and queue name. For privacy reasons, the full message payload is not logged by default. To include custom message data in logs, configure `messageLogFormatter` on your handlers. See [Message Logging](#message-logging) for more details.
* `messageMetadataField` - which field in the message contains metadata for logging purposes (by default it is `metadata`).
* `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading).
* `concurrentConsumersAmount` - configuration for specifying the number of concurrent consumers to create. Available only for SQS and SNS consumers
* `messageDeduplicationConfig` - configuration for store-based message deduplication on consumer level. For more details on setting this up, see [Consumer-level store-based-message deduplication](#consumer-level-store-based-message-deduplication).
Expand Down Expand Up @@ -173,7 +175,9 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer<
preHandlerBarrier: async (message) => {
// do barrier check here
return true
}
},
// Optional: customize what message data is logged (see Message Logging section)
messageLogFormatter: (message) => ({ id: message.id, type: message.type }),
},
)
.addConfig(PERMISSIONS_REMOVE_MESSAGE_SCHEMA,
Expand Down Expand Up @@ -642,6 +646,56 @@ const result = await myConsumer.handlerSpy.waitForMessageWithId('1')
expect(result.processingResult).toEqual({ status: 'consumed' })
```

## Message Logging

When `logMessages` is enabled, processed messages are logged at the `debug` level with structured metadata. For privacy reasons, the full message payload is **not logged by default** to avoid exposing sensitive data.

### What is logged by default

Each log entry includes processed message metadata:
- `messageId` - unique identifier of the message
- `messageType` - type of the message
- `queueName` - name of the queue or topic
- `messageTimestamp` - when the message was originally sent
- `messageProcessingStartTimestamp` - when processing started
- `messageProcessingEndTimestamp` - when processing completed
- `messageDeduplicationId` - deduplication id (if deduplication is enabled)
- `messageMetadata` - contents of the metadata field (configurable via `messageMetadataField`)
- `processingResult` - outcome of processing (e.g., `{ status: 'consumed' }` or `{ status: 'published' }`)

### Custom message logging with messageLogFormatter

If you need to include additional message data in logs, you can configure a `messageLogFormatter` on your handler. This formatter receives the message and returns the data to be logged:

```typescript
new MessageHandlerConfigBuilder<SupportedMessages, ExecutionContext>()
.addConfig(
MY_MESSAGE_SCHEMA,
async (message, context) => {
// handler logic
return { result: 'success' }
},
{
// Only log specific fields, excluding sensitive data
messageLogFormatter: (message) => ({
id: message.id,
type: message.type,
// Exclude sensitive fields like email, password, etc.
}),
},
)
.build()
```

When a `messageLogFormatter` is provided, its output is included in the log under the `message` key alongside the processed message metadata.

### Configuration options

| Option | Default | Description |
|--------|---------|-------------|
| `logMessages` | `false` | Enable debug logging for processed messages |
| `messageMetadataField` | `'metadata'` | Field in the message containing metadata to include in logs |

## Payload Offloading
Payload offloading allows you to manage large message payloads by storing them in external storage, bypassing any message size restrictions imposed by queue systems.

Expand Down Expand Up @@ -773,6 +827,7 @@ It needs to implement the following methods:
- `messageProcessingStartTimestamp` - the timestamp when the processing of the message started
- `messageProcessingEndTimestamp` - the timestamp when the processing of the message finished
- `messageDeduplicationId` - the deduplication id of the message, in case deduplication is enabled
- `messageMetadata` - contents of the message metadata field (configurable via `messageMetadataField`)

See [@message-queue-toolkit/metrics](packages/metrics/README.md) for concrete implementations

Expand Down
20 changes: 12 additions & 8 deletions packages/amqp/lib/AbstractAmqpConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import type {
ParseMessageResult,
PreHandlingOutputs,
Prehandler,
ProcessedMessageMetadata,
QueueConsumer,
QueueConsumerOptions,
TransactionObservabilityManager,
} from '@message-queue-toolkit/core'
import { HandlerContainer, isMessageError, parseMessage } from '@message-queue-toolkit/core'
import type { ChannelModel, Message } from 'amqplib'

import type {
AMQPConsumerDependencies,
AMQPQueueCreationConfig,
Expand Down Expand Up @@ -157,10 +157,6 @@ export abstract class AbstractAmqpConsumer<
// @ts-expect-error
const uniqueTransactionKey = parsedMessage[this.messageIdField]
this.transactionObservabilityManager?.start(transactionSpanId, uniqueTransactionKey)
if (this.logMessages) {
const resolvedLogMessage = this.resolveMessageLog(parsedMessage, messageType)
this.logMessage(resolvedLogMessage)
}
this.internalProcessMessage(parsedMessage, messageType)
.then((result) => {
if (result.result === 'success') {
Expand Down Expand Up @@ -267,9 +263,17 @@ export abstract class AbstractAmqpConsumer<
return this._messageSchemaContainer.resolveSchema(message)
}

protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown {
const handler = this.handlerContainer.resolveHandler(messageType)
return handler.messageLogFormatter(message)
protected override resolveMessageLog(
processedMessageMetadata: ProcessedMessageMetadata<MessagePayloadType>,
): unknown | null {
if (!processedMessageMetadata.message || !processedMessageMetadata.messageType) {
return null
}
const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType)
if (!handler.messageLogFormatter) {
return null
}
return handler.messageLogFormatter(processedMessageMetadata.message)
}

// eslint-disable-next-line max-params
Expand Down
6 changes: 0 additions & 6 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,6 @@ export abstract class AbstractAmqpPublisher<
return
}

if (this.logMessages) {
const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown'
const resolvedLogMessage = this.resolveMessageLog(message, messageType)
this.logMessage(resolvedLogMessage)
}

message = this.updateInternalProperties(message)

try {
Expand Down
20 changes: 10 additions & 10 deletions packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,21 @@ describe('AmqpPermissionConsumer', () => {

await newConsumer.close()

expect(logger.loggedMessages.length).toBe(6)
expect(logger.loggedMessages.length).toBe(4)
expect(logger.loggedMessages).toMatchObject([
'Propagating new connection across 0 receivers',
{
id: '1',
messageType: 'add',
},
'timestamp not defined, adding it automatically',
expect.any(Object),
{
id: '1',
messageType: 'add',
timestamp: expect.any(String),
processedMessageMetadata: expect.objectContaining({
processingResult: { status: 'published' },
}),
},
{
processedMessageMetadata: expect.any(String),
processedMessageMetadata: expect.objectContaining({
messageId: '1',
messageType: 'add',
processingResult: { status: 'consumed' },
}),
},
])
})
Expand Down Expand Up @@ -164,6 +163,7 @@ describe('AmqpPermissionConsumer', () => {
id: '1',
messageType: 'add',
}),
messageMetadata: undefined,
},
])
})
Expand Down
17 changes: 12 additions & 5 deletions packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,20 @@ describe('PermissionPublisher', () => {
publisher.publish(message)

await waitAndRetry(() => {
return logger.loggedMessages.length === 2
return logger.loggedMessages.length === 3
})

expect(logger.loggedMessages[1]).toEqual({
id: '1',
messageType: 'add',
})
expect(logger.loggedMessages).toMatchObject([
'Propagating new connection across 0 receivers',
'timestamp not defined, adding it automatically',
{
processedMessageMetadata: expect.objectContaining({
messageId: '1',
messageType: 'add',
processingResult: { status: 'published' },
}),
},
])
})
})

Expand Down
61 changes: 45 additions & 16 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ export abstract class AbstractQueueService<
* Used to know the store-based message deduplication options
*/
protected readonly messageDeduplicationOptionsField: string
/**
* Used to know where metadata is stored - for debug logging purposes only
*/
protected readonly messageMetadataField: string
protected readonly errorReporter: ErrorReporter
public readonly logger: CommonLogger
protected readonly messageIdField: string
Expand Down Expand Up @@ -157,6 +161,7 @@ export abstract class AbstractQueueService<
this.messageDeduplicationIdField = options.messageDeduplicationIdField ?? 'deduplicationId'
this.messageDeduplicationOptionsField =
options.messageDeduplicationOptionsField ?? 'deduplicationOptions'
this.messageMetadataField = options.messageMetadataField ?? 'metadata'
this.creationConfig = options.creationConfig
this.locatorConfig = options.locatorConfig
this.deletionConfig = options.deletionConfig
Expand Down Expand Up @@ -239,15 +244,36 @@ export abstract class AbstractQueueService<
/**
* Format message for logging
*/
protected resolveMessageLog(message: MessagePayloadSchemas, _messageType: string): unknown {
return message
protected resolveMessageLog(
_processedMessageMetadata: ProcessedMessageMetadata<MessagePayloadSchemas>,
): unknown | null {
return null
}

/**
* Log preformatted and potentially presanitized message payload
*/
protected logMessage(messageLogEntry: unknown) {
this.logger.debug(messageLogEntry)
protected logMessageProcessed(
processedMessageMetadata: ProcessedMessageMetadata<MessagePayloadSchemas>,
) {
const processedMessageMetadataLog = {
processingResult: processedMessageMetadata.processingResult,
messageId: processedMessageMetadata.messageId,
messageType: processedMessageMetadata.messageType,
queueName: processedMessageMetadata.queueName,
messageTimestamp: processedMessageMetadata.messageTimestamp,
messageDeduplicationId: processedMessageMetadata.messageDeduplicationId,
messageProcessingStartTimestamp: processedMessageMetadata.messageProcessingStartTimestamp,
messageProcessingEndTimestamp: processedMessageMetadata.messageProcessingEndTimestamp,
messageMetadata: stringValueSerializer(processedMessageMetadata.messageMetadata),
}

const resolvedMessageLog = this.resolveMessageLog(processedMessageMetadata)

this.logger.debug(
{
processedMessageMetadata: processedMessageMetadataLog,
...(resolvedMessageLog ? { message: resolvedMessageLog } : {}),
},
`Finished processing message ${processedMessageMetadata.messageId}`,
)
}

protected handleError(err: unknown, context?: Record<string, unknown>) {
Expand Down Expand Up @@ -284,8 +310,8 @@ export abstract class AbstractQueueService<
messageType,
)

const debugLoggingEnabled = this.logMessages && this.logger.isLevelEnabled('debug')
if (!debugLoggingEnabled && !this.messageMetricsManager) return
const debugMessageLoggingEnabled = this.logMessages && this.logger.isLevelEnabled('debug')
if (!debugMessageLoggingEnabled && !this.messageMetricsManager) return

const processedMessageMetadata = this.resolveProcessedMessageMetadata(
message,
Expand All @@ -295,11 +321,8 @@ export abstract class AbstractQueueService<
params.queueName,
messageId,
)
if (debugLoggingEnabled) {
this.logger.debug(
{ processedMessageMetadata: stringValueSerializer(processedMessageMetadata) },
`Finished processing message ${processedMessageMetadata.messageId}`,
)
if (debugMessageLoggingEnabled) {
this.logMessageProcessed(processedMessageMetadata)
}
if (this.messageMetricsManager) {
this.messageMetricsManager.registerProcessedMessage(processedMessageMetadata)
Expand All @@ -321,8 +344,13 @@ export abstract class AbstractQueueService<
const messageType = message ? this.resolveMessageTypeFromMessage(message) : undefined
const messageDeduplicationId =
message && this.messageDeduplicationIdField in message
? // @ts-ignore
message[this.messageDeduplicationId]
? // @ts-expect-error
message[this.messageDeduplicationIdField]
: undefined
const messageMetadata =
message && this.messageMetadataField in message
? // @ts-expect-error
message[this.messageMetadataField]
: undefined

return {
Expand All @@ -335,6 +363,7 @@ export abstract class AbstractQueueService<
messageDeduplicationId,
messageProcessingStartTimestamp,
messageProcessingEndTimestamp,
messageMetadata,
}
}

Expand Down
6 changes: 2 additions & 4 deletions packages/core/lib/queues/HandlerContainer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ export type Prehandler<MessagePayloadSchema extends object, ExecutionContext, Pr
next: (result: PrehandlerResult) => void,
) => void

export const defaultLogFormatter = <MessagePayloadSchema>(message: MessagePayloadSchema) => message

export type HandlerConfigOptions<
MessagePayloadSchema extends object,
ExecutionContext,
Expand Down Expand Up @@ -98,7 +96,7 @@ export class MessageHandlerConfig<
PrehandlerOutput,
BarrierOutput
>
public readonly messageLogFormatter: LogFormatter<MessagePayloadSchema>
public readonly messageLogFormatter?: LogFormatter<MessagePayloadSchema>
public readonly preHandlerBarrier?: BarrierCallback<
MessagePayloadSchema,
ExecutionContext,
Expand Down Expand Up @@ -126,7 +124,7 @@ export class MessageHandlerConfig<
this.definition = eventDefinition
this.messageType = options?.messageType
this.handler = handler
this.messageLogFormatter = options?.messageLogFormatter ?? defaultLogFormatter
this.messageLogFormatter = options?.messageLogFormatter
this.preHandlerBarrier = options?.preHandlerBarrier
this.preHandlers = options?.preHandlers ?? []
}
Expand Down
6 changes: 6 additions & 0 deletions packages/core/lib/types/queueOptionsTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ export type ProcessedMessageMetadata<MessagePayloadSchemas extends object = obje
* ID used for the message deduplication, in case it's enabled
*/
messageDeduplicationId?: string

/**
* Message metadata (see ConsumerMessageMetadataType)
*/
messageMetadata?: Record<string, unknown>
}

export interface MessageMetricsManager<MessagePayloadSchemas extends object = object> {
Expand Down Expand Up @@ -113,6 +118,7 @@ export type CommonQueueOptions = {
messageTimestampField?: string
messageDeduplicationIdField?: string
messageDeduplicationOptionsField?: string
messageMetadataField?: string
handlerSpy?: HandlerSpy<object> | HandlerSpyParams | boolean
logMessages?: boolean
deletionConfig?: DeletionConfig
Expand Down
Loading
Loading