diff --git a/go.mod b/go.mod index f765d084..58037ba2 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,8 @@ require ( github.com/testcontainers/testcontainers-go/modules/postgres v0.42.0 go.opentelemetry.io/otel v1.43.0 go.opentelemetry.io/otel/metric v1.43.0 + go.opentelemetry.io/otel/sdk v1.43.0 + go.opentelemetry.io/otel/sdk/metric v1.43.0 go.opentelemetry.io/otel/trace v1.43.0 google.golang.org/genproto v0.0.0-20260420184626-e10c466a9529 google.golang.org/protobuf v1.36.11 diff --git a/go.sum b/go.sum index dae644ea..f2e7ed5c 100644 --- a/go.sum +++ b/go.sum @@ -250,6 +250,8 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= diff --git a/opentelemetry/config.go b/opentelemetry/config.go index 93a13fca..9ccb200a 100644 --- a/opentelemetry/config.go +++ b/opentelemetry/config.go @@ -6,7 +6,15 @@ import ( "go.opentelemetry.io/otel/trace" ) -const instrumentationName = "github.com/get-eventually/go-eventually/opentelemetry" +// InstrumentationName is the instrumentation scope name used by the Tracer +// and Meter exposed by this package. +const InstrumentationName = "github.com/get-eventually/go-eventually/opentelemetry" + +// MetricUnitMilliseconds is the [OpenTelemetry-compatible unit] used by the +// duration histograms exposed by this package. +// +// [OpenTelemetry-compatible unit]: https://ucum.org/ucum +const MetricUnitMilliseconds = "ms" type config struct { MeterProvider metric.MeterProvider @@ -14,11 +22,11 @@ type config struct { } func (c config) meter() metric.Meter { - return c.MeterProvider.Meter(instrumentationName) + return c.MeterProvider.Meter(InstrumentationName) } func (c config) tracer() trace.Tracer { - return c.TracerProvider.Tracer(instrumentationName) + return c.TracerProvider.Tracer(InstrumentationName) } // Option specifies instrumentation configuration options. diff --git a/opentelemetry/config_test.go b/opentelemetry/config_test.go new file mode 100644 index 00000000..b278d454 --- /dev/null +++ b/opentelemetry/config_test.go @@ -0,0 +1,48 @@ +package opentelemetry_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/get-eventually/go-eventually/event" + otelex "github.com/get-eventually/go-eventually/opentelemetry" + "github.com/get-eventually/go-eventually/version" +) + +func TestWithTracerProvider_OverridesDefault(t *testing.T) { + h := newHarness(t) + + ies, err := otelex.NewInstrumentedEventStore( + event.NewInMemoryStore(), + otelex.WithTracerProvider(h.tracer), + otelex.WithMeterProvider(h.meter), + ) + require.NoError(t, err) + + _, err = ies.Append(t.Context(), "stream", version.Any) + require.NoError(t, err) + + spans := h.endedSpans() + require.Len(t, spans, 1, "expected a span to be recorded by the supplied tracer provider") + assert.Equal(t, otelex.InstrumentationName, spans[0].InstrumentationScope().Name) +} + +func TestWithMeterProvider_OverridesDefault(t *testing.T) { + h := newHarness(t) + + ies, err := otelex.NewInstrumentedEventStore( + event.NewInMemoryStore(), + otelex.WithTracerProvider(h.tracer), + otelex.WithMeterProvider(h.meter), + ) + require.NoError(t, err) + + _, err = ies.Append(t.Context(), "stream", version.Any) + require.NoError(t, err) + + sm := h.collectScopeMetrics(t) + assert.Equal(t, otelex.InstrumentationName, sm.Scope.Name) + assert.NotEmpty(t, sm.Metrics, "expected metrics on the supplied meter provider") +} diff --git a/opentelemetry/event_store.go b/opentelemetry/event_store.go index 7b0132c7..458a0f1d 100644 --- a/opentelemetry/event_store.go +++ b/opentelemetry/event_store.go @@ -22,6 +22,21 @@ const ( EventStoreNumEventsKey attribute.Key = "event_store.num_events" ) +// Span names emitted by the InstrumentedEventStore instrumentation. +const ( + EventStoreStreamSpanName = "event.Store.Stream" + EventStoreAppendSpanName = "event.Store.Append" +) + +// Metric names and descriptions exposed by the InstrumentedEventStore instrumentation. +const ( + EventStoreStreamDurationMetricName = "eventually.event_store.stream.duration.milliseconds" + EventStoreStreamDurationMetricDescription = "Duration in milliseconds of event.Store.Stream operations performed." + + EventStoreAppendDurationMetricName = "eventually.event_store.append.duration.milliseconds" + EventStoreAppendDurationMetricDescription = "Duration in milliseconds of event.Store.Append operations performed." +) + var _ event.Store = new(InstrumentedEventStore) // InstrumentedEventStore is a wrapper type over an event.Store @@ -41,17 +56,17 @@ func (ies *InstrumentedEventStore) registerMetrics(meter metric.Meter) error { var err error if ies.streamDuration, err = meter.Int64Histogram( - "eventually.event_store.stream.duration.milliseconds", - metric.WithUnit("ms"), - metric.WithDescription("Duration in milliseconds of event.Store.Stream operations performed."), + EventStoreStreamDurationMetricName, + metric.WithUnit(MetricUnitMilliseconds), + metric.WithDescription(EventStoreStreamDurationMetricDescription), ); err != nil { return fmt.Errorf("opentelemetry.InstrumentedEventStore: failed to register metric, %w", err) } if ies.appendDuration, err = meter.Int64Histogram( - "eventually.event_store.append.duration.milliseconds", - metric.WithUnit("ms"), - metric.WithDescription("Duration in milliseconds of event.Store.Append operations performed."), + EventStoreAppendDurationMetricName, + metric.WithUnit(MetricUnitMilliseconds), + metric.WithDescription(EventStoreAppendDurationMetricDescription), ); err != nil { return fmt.Errorf("opentelemetry.InstrumentedEventStore: failed to register metric, %w", err) } @@ -96,7 +111,7 @@ func (ies *InstrumentedEventStore) Stream( } return event.NewStream(func(yield func(event.Persisted) bool) error { - ctx, span := ies.tracer.Start(ctx, "event.Store.Stream", trace.WithAttributes(attributes...)) + ctx, span := ies.tracer.Start(ctx, EventStoreStreamSpanName, trace.WithAttributes(attributes...)) start := time.Now() inner := ies.eventStore.Stream(ctx, id, selector) @@ -146,7 +161,7 @@ func (ies *InstrumentedEventStore) Append( EventStoreNumEventsKey.Int(len(events)), } - ctx, span := ies.tracer.Start(ctx, "event.Store.Append", trace.WithAttributes(attributes...)) + ctx, span := ies.tracer.Start(ctx, EventStoreAppendSpanName, trace.WithAttributes(attributes...)) start := time.Now() defer func() { diff --git a/opentelemetry/event_store_test.go b/opentelemetry/event_store_test.go new file mode 100644 index 00000000..f50cf243 --- /dev/null +++ b/opentelemetry/event_store_test.go @@ -0,0 +1,456 @@ +package opentelemetry_test + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "github.com/get-eventually/go-eventually/event" + "github.com/get-eventually/go-eventually/message" + "github.com/get-eventually/go-eventually/opentelemetry" + "github.com/get-eventually/go-eventually/version" +) + +type noopMessage struct{ id int } + +func (noopMessage) Name() string { return "noop" } + +var _ message.Message = noopMessage{} + +const testStreamID event.StreamID = "test-stream" + +func appendEnvelopes(t *testing.T, store event.Store, n int) { + t.Helper() + + envelopes := make([]event.Envelope, 0, n) + for i := range n { + envelopes = append(envelopes, event.Envelope{Message: noopMessage{id: i}}) + } + + _, err := store.Append(t.Context(), testStreamID, version.Any, envelopes...) + require.NoError(t, err) +} + +func drainStream(t *testing.T, stream *event.Stream) { + t.Helper() + + count := 0 + for range stream.Iter() { + count++ + } + + require.NoError(t, stream.Err()) +} + +func TestNewInstrumentedEventStore_Succeeds(t *testing.T) { + h := newHarness(t) + + ies, err := opentelemetry.NewInstrumentedEventStore(event.NewInMemoryStore(), h.options()...) + require.NoError(t, err) + require.NotNil(t, ies) +} + +func TestStream_DelegatesAndYieldsEvents(t *testing.T) { + h := newHarness(t) + + inner := event.NewInMemoryStore() + appendEnvelopes(t, inner, 3) + + ies, err := opentelemetry.NewInstrumentedEventStore(inner, h.options()...) + require.NoError(t, err) + + stream := ies.Stream(t.Context(), testStreamID, version.SelectFromBeginning) + + got := make([]int, 0, 3) + + for evt := range stream.Iter() { + msg, ok := evt.Message.(noopMessage) + require.True(t, ok) + + got = append(got, msg.id) + } + + require.NoError(t, stream.Err()) + assert.Equal(t, []int{0, 1, 2}, got) +} + +func TestStream_RecordsSpan(t *testing.T) { + h := newHarness(t) + + inner := event.NewInMemoryStore() + appendEnvelopes(t, inner, 2) + + ies, err := opentelemetry.NewInstrumentedEventStore(inner, h.options()...) + require.NoError(t, err) + + drainStream(t, ies.Stream(t.Context(), testStreamID, version.Selector{From: 1})) + + spans := h.endedSpans() + require.Len(t, spans, 1) + + span := spans[0] + assert.Equal(t, opentelemetry.EventStoreStreamSpanName, span.Name()) + assert.Equal(t, codes.Unset, span.Status().Code, "successful stream should not set error status") + assert.Contains(t, span.Attributes(), opentelemetry.EventStreamIDKey.String(string(testStreamID))) + assert.Contains(t, span.Attributes(), opentelemetry.EventStreamVersionSelectorKey.Int64(1)) +} + +func TestStream_RecordsHistogram(t *testing.T) { + h := newHarness(t) + + inner := event.NewInMemoryStore() + appendEnvelopes(t, inner, 1) + + ies, err := opentelemetry.NewInstrumentedEventStore(inner, h.options()...) + require.NoError(t, err) + + drainStream(t, ies.Stream(t.Context(), testStreamID, version.SelectFromBeginning)) + + sm := h.collectScopeMetrics(t) + + want := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{Name: opentelemetry.InstrumentationName}, + Metrics: []metricdata.Metrics{ + { + Name: opentelemetry.EventStoreStreamDurationMetricName, + Description: opentelemetry.EventStoreStreamDurationMetricDescription, + Unit: opentelemetry.MetricUnitMilliseconds, + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(opentelemetry.ErrorAttribute.Bool(false)), + }, + }, + }, + }, + }, + } + + got := metricdata.ScopeMetrics{ + Scope: sm.Scope, + Metrics: []metricdata.Metrics{findMetric(t, &sm, opentelemetry.EventStoreStreamDurationMetricName)}, + } + + metricdatatest.AssertEqual(t, want, got, + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue(), + metricdatatest.IgnoreExemplars(), + ) +} + +func TestStream_DurationCoversIteration(t *testing.T) { + h := newHarness(t) + + inner := event.NewInMemoryStore() + appendEnvelopes(t, inner, 3) + + slow := &slowEventStore{inner: inner, delay: 50 * time.Millisecond} + + ies, err := opentelemetry.NewInstrumentedEventStore(slow, h.options()...) + require.NoError(t, err) + + drainStream(t, ies.Stream(t.Context(), testStreamID, version.SelectFromBeginning)) + + sm := h.collectScopeMetrics(t) + m := findMetric(t, &sm, opentelemetry.EventStoreStreamDurationMetricName) + + data, ok := m.Data.(metricdata.Histogram[int64]) + require.True(t, ok, "expected Histogram[int64] data type") + require.Len(t, data.DataPoints, 1) + + // 3 events x 50ms sleep = ~150ms. Assert the recorded sum is at least + // 100ms to allow for scheduling jitter while still proving iteration + // time is included. + assert.GreaterOrEqual(t, data.DataPoints[0].Sum, int64(100), + "recorded duration should cover full iteration, got %d ms", data.DataPoints[0].Sum) +} + +func TestStream_ProducerError_RecordsErrorOnSpanAndMetric(t *testing.T) { + h := newHarness(t) + + wantErr := errors.New("store exploded") + inner := &errorEventStore{streamErr: wantErr} + + ies, err := opentelemetry.NewInstrumentedEventStore(inner, h.options()...) + require.NoError(t, err) + + stream := ies.Stream(t.Context(), testStreamID, version.SelectFromBeginning) + for range stream.Iter() { + t.Fatalf("no events should be yielded when the producer errors before iteration") + } + + require.ErrorIs(t, stream.Err(), wantErr) + + spans := h.endedSpans() + require.Len(t, spans, 1) + + span := spans[0] + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, wantErr.Error(), span.Status().Description) + assert.True(t, hasExceptionEvent(span), "span should have recorded an exception event") + + sm := h.collectScopeMetrics(t) + m := findMetric(t, &sm, opentelemetry.EventStoreStreamDurationMetricName) + + data, ok := m.Data.(metricdata.Histogram[int64]) + require.True(t, ok) + require.Len(t, data.DataPoints, 1) + + val, ok := data.DataPoints[0].Attributes.Value(opentelemetry.ErrorAttribute) + require.True(t, ok, "error attribute not found on data point") + assert.True(t, val.AsBool(), "error attribute should be true") +} + +func TestStream_ConsumerAbandonment_NoErrorRecorded(t *testing.T) { + h := newHarness(t) + + inner := event.NewInMemoryStore() + appendEnvelopes(t, inner, 10) + + ies, err := opentelemetry.NewInstrumentedEventStore(inner, h.options()...) + require.NoError(t, err) + + stream := ies.Stream(t.Context(), testStreamID, version.SelectFromBeginning) + count := 0 + + for range stream.Iter() { + count++ + if count == 3 { + break + } + } + + require.NoError(t, stream.Err(), "abandonment is not a failure") + + spans := h.endedSpans() + require.Len(t, spans, 1) + assert.Equal(t, codes.Unset, spans[0].Status().Code) + + sm := h.collectScopeMetrics(t) + m := findMetric(t, &sm, opentelemetry.EventStoreStreamDurationMetricName) + + data, ok := m.Data.(metricdata.Histogram[int64]) + require.True(t, ok) + require.Len(t, data.DataPoints, 1) + + val, ok := data.DataPoints[0].Attributes.Value(opentelemetry.ErrorAttribute) + require.True(t, ok) + assert.False(t, val.AsBool(), "error attribute should be false on consumer abandonment") +} + +func TestStream_SpanIsChildOfParent(t *testing.T) { + h := newHarness(t) + + inner := event.NewInMemoryStore() + appendEnvelopes(t, inner, 1) + + ies, err := opentelemetry.NewInstrumentedEventStore(inner, h.options()...) + require.NoError(t, err) + + ctx, parent := h.tracer.Tracer("test").Start(t.Context(), "parent") + + drainStream(t, ies.Stream(ctx, testStreamID, version.SelectFromBeginning)) + + parent.End() + + spans := h.endedSpans() + require.Len(t, spans, 2) + + // Spans are emitted in completion order: the inner Stream span finishes + // first (inside the Iter range), then the parent. + streamSpan, parentSpan := spans[0], spans[1] + if streamSpan.Name() != opentelemetry.EventStoreStreamSpanName { + streamSpan, parentSpan = spans[1], spans[0] + } + + assert.Equal(t, opentelemetry.EventStoreStreamSpanName, streamSpan.Name()) + assert.Equal(t, "parent", parentSpan.Name()) + assert.Equal(t, parentSpan.SpanContext().SpanID(), streamSpan.Parent().SpanID()) +} + +func TestAppend_DelegatesAndReturnsNewVersion(t *testing.T) { + h := newHarness(t) + + inner := event.NewInMemoryStore() + + ies, err := opentelemetry.NewInstrumentedEventStore(inner, h.options()...) + require.NoError(t, err) + + newVersion, err := ies.Append( + t.Context(), + testStreamID, + version.Any, + event.Envelope{Message: noopMessage{id: 0}}, + event.Envelope{Message: noopMessage{id: 1}}, + ) + require.NoError(t, err) + assert.Equal(t, version.Version(2), newVersion) + + stream := inner.Stream(t.Context(), testStreamID, version.SelectFromBeginning) + count := 0 + + for range stream.Iter() { + count++ + } + + require.NoError(t, stream.Err()) + assert.Equal(t, 2, count) +} + +func TestAppend_RecordsSpan_CheckExact(t *testing.T) { + h := newHarness(t) + + inner := event.NewInMemoryStore() + + ies, err := opentelemetry.NewInstrumentedEventStore(inner, h.options()...) + require.NoError(t, err) + + _, err = ies.Append( + t.Context(), + testStreamID, + version.CheckExact(0), + event.Envelope{Message: noopMessage{id: 0}}, + event.Envelope{Message: noopMessage{id: 1}}, + event.Envelope{Message: noopMessage{id: 2}}, + ) + require.NoError(t, err) + + spans := h.endedSpans() + require.Len(t, spans, 1) + + span := spans[0] + assert.Equal(t, opentelemetry.EventStoreAppendSpanName, span.Name()) + assert.Equal(t, codes.Unset, span.Status().Code) + assert.Contains(t, span.Attributes(), opentelemetry.EventStreamIDKey.String(string(testStreamID))) + assert.Contains(t, span.Attributes(), opentelemetry.EventStreamExpectedVersionKey.Int64(0)) + assert.Contains(t, span.Attributes(), opentelemetry.EventStoreNumEventsKey.Int(3)) +} + +func TestAppend_RecordsSpan_CheckAny(t *testing.T) { + h := newHarness(t) + + inner := event.NewInMemoryStore() + + ies, err := opentelemetry.NewInstrumentedEventStore(inner, h.options()...) + require.NoError(t, err) + + _, err = ies.Append(t.Context(), testStreamID, version.Any) + require.NoError(t, err) + + spans := h.endedSpans() + require.Len(t, spans, 1) + assert.Contains(t, spans[0].Attributes(), opentelemetry.EventStreamExpectedVersionKey.Int64(-1)) +} + +func TestAppend_RecordsHistogram(t *testing.T) { + h := newHarness(t) + + inner := event.NewInMemoryStore() + + ies, err := opentelemetry.NewInstrumentedEventStore(inner, h.options()...) + require.NoError(t, err) + + _, err = ies.Append(t.Context(), testStreamID, version.Any, event.Envelope{Message: noopMessage{}}) + require.NoError(t, err) + + sm := h.collectScopeMetrics(t) + + want := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{Name: opentelemetry.InstrumentationName}, + Metrics: []metricdata.Metrics{ + { + Name: opentelemetry.EventStoreAppendDurationMetricName, + Description: opentelemetry.EventStoreAppendDurationMetricDescription, + Unit: opentelemetry.MetricUnitMilliseconds, + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(opentelemetry.ErrorAttribute.Bool(false)), + }, + }, + }, + }, + }, + } + + got := metricdata.ScopeMetrics{ + Scope: sm.Scope, + Metrics: []metricdata.Metrics{findMetric(t, &sm, opentelemetry.EventStoreAppendDurationMetricName)}, + } + + metricdatatest.AssertEqual(t, want, got, + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue(), + metricdatatest.IgnoreExemplars(), + ) +} + +func TestAppend_Error_RecordsErrorOnSpanAndMetric(t *testing.T) { + h := newHarness(t) + + wantErr := errors.New("append failed") + inner := &errorEventStore{appendErr: wantErr} + + ies, err := opentelemetry.NewInstrumentedEventStore(inner, h.options()...) + require.NoError(t, err) + + _, err = ies.Append(t.Context(), testStreamID, version.Any) + require.ErrorIs(t, err, wantErr) + + spans := h.endedSpans() + require.Len(t, spans, 1) + + span := spans[0] + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, wantErr.Error(), span.Status().Description) + assert.True(t, hasExceptionEvent(span), "span should have recorded an exception event") + + sm := h.collectScopeMetrics(t) + m := findMetric(t, &sm, opentelemetry.EventStoreAppendDurationMetricName) + + data, ok := m.Data.(metricdata.Histogram[int64]) + require.True(t, ok) + require.Len(t, data.DataPoints, 1) + + val, ok := data.DataPoints[0].Attributes.Value(opentelemetry.ErrorAttribute) + require.True(t, ok) + assert.True(t, val.AsBool(), "error attribute should be true on failed Append") +} + +func TestAppend_SpanIsChildOfParent(t *testing.T) { + h := newHarness(t) + + inner := event.NewInMemoryStore() + + ies, err := opentelemetry.NewInstrumentedEventStore(inner, h.options()...) + require.NoError(t, err) + + ctx, parent := h.tracer.Tracer("test").Start(t.Context(), "parent") + + _, err = ies.Append(ctx, testStreamID, version.Any) + require.NoError(t, err) + + parent.End() + + spans := h.endedSpans() + require.Len(t, spans, 2) + + appendSpan, parentSpan := spans[0], spans[1] + if appendSpan.Name() != opentelemetry.EventStoreAppendSpanName { + appendSpan, parentSpan = spans[1], spans[0] + } + + assert.Equal(t, opentelemetry.EventStoreAppendSpanName, appendSpan.Name()) + assert.Equal(t, "parent", parentSpan.Name()) + assert.Equal(t, parentSpan.SpanContext().SpanID(), appendSpan.Parent().SpanID()) +} diff --git a/opentelemetry/helpers_test.go b/opentelemetry/helpers_test.go new file mode 100644 index 00000000..aa0be144 --- /dev/null +++ b/opentelemetry/helpers_test.go @@ -0,0 +1,170 @@ +package opentelemetry_test + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + + "github.com/get-eventually/go-eventually/aggregate" + "github.com/get-eventually/go-eventually/event" + otelex "github.com/get-eventually/go-eventually/opentelemetry" + "github.com/get-eventually/go-eventually/version" +) + +const exceptionEventName = "exception" + +type harness struct { + spanRecorder *tracetest.SpanRecorder + tracer *sdktrace.TracerProvider + metricReader *sdkmetric.ManualReader + meter *sdkmetric.MeterProvider +} + +func newHarness(t *testing.T) *harness { + t.Helper() + + spanRecorder := tracetest.NewSpanRecorder() + tracerProvider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(spanRecorder)) + + reader := sdkmetric.NewManualReader() + meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + + t.Cleanup(func() { + ctx := context.Background() + require.NoError(t, tracerProvider.Shutdown(ctx)) + require.NoError(t, meterProvider.Shutdown(ctx)) + }) + + return &harness{ + spanRecorder: spanRecorder, + tracer: tracerProvider, + metricReader: reader, + meter: meterProvider, + } +} + +func (h *harness) options() []otelex.Option { + return []otelex.Option{ + otelex.WithTracerProvider(h.tracer), + otelex.WithMeterProvider(h.meter), + } +} + +func (h *harness) endedSpans() []sdktrace.ReadOnlySpan { + return h.spanRecorder.Ended() +} + +func (h *harness) collectScopeMetrics(t *testing.T) metricdata.ScopeMetrics { + t.Helper() + + var rm metricdata.ResourceMetrics + + require.NoError(t, h.metricReader.Collect(context.Background(), &rm)) + require.Len(t, rm.ScopeMetrics, 1, "expected exactly one ScopeMetrics entry") + + return rm.ScopeMetrics[0] +} + +func findMetric(t *testing.T, sm *metricdata.ScopeMetrics, name string) metricdata.Metrics { + t.Helper() + + for _, m := range sm.Metrics { + if m.Name == name { + return m + } + } + + t.Fatalf("metric %q not found in scope %q", name, sm.Scope.Name) + + return metricdata.Metrics{} +} + +func hasExceptionEvent(span sdktrace.ReadOnlySpan) bool { + for _, evt := range span.Events() { + if evt.Name == exceptionEventName { + return true + } + } + + return false +} + +type errorEventStore struct { + streamErr error + appendErr error +} + +func (s *errorEventStore) Stream(_ context.Context, _ event.StreamID, _ version.Selector) *event.Stream { + return event.NewStream(func(_ func(event.Persisted) bool) error { + return s.streamErr + }) +} + +func (s *errorEventStore) Append( + _ context.Context, + _ event.StreamID, + _ version.Check, + _ ...event.Envelope, +) (version.Version, error) { + return 0, s.appendErr +} + +type slowEventStore struct { + inner event.Store + delay time.Duration +} + +func (s *slowEventStore) Stream(ctx context.Context, id event.StreamID, selector version.Selector) *event.Stream { + inner := s.inner.Stream(ctx, id, selector) + + return event.NewStream(func(yield func(event.Persisted) bool) error { + for evt := range inner.Iter() { + time.Sleep(s.delay) + + if !yield(evt) { + return nil + } + } + + return inner.Err() + }) +} + +func (s *slowEventStore) Append( + ctx context.Context, + id event.StreamID, + expected version.Check, + events ...event.Envelope, +) (version.Version, error) { + return s.inner.Append(ctx, id, expected, events...) +} + +type errorRepository[I aggregate.ID, T aggregate.Root[I]] struct { + getErr error + saveErr error + result T +} + +func (r *errorRepository[I, T]) Get(_ context.Context, _ I) (T, error) { + return r.result, r.getErr +} + +func (r *errorRepository[I, T]) Save(_ context.Context, _ T) error { + return r.saveErr +} + +func newUUID(t *testing.T) uuid.UUID { + t.Helper() + + id, err := uuid.NewRandom() + require.NoError(t, err) + + return id +} diff --git a/opentelemetry/repository.go b/opentelemetry/repository.go index b79baa54..03274334 100644 --- a/opentelemetry/repository.go +++ b/opentelemetry/repository.go @@ -21,6 +21,21 @@ const ( AggregateIDAttribute attribute.Key = "aggregate.id" ) +// Span names emitted by the InstrumentedRepository instrumentation. +const ( + RepositoryGetSpanName = "aggregate.Repository.Get" + RepositorySaveSpanName = "aggregate.Repository.Save" +) + +// Metric names and descriptions exposed by the InstrumentedRepository instrumentation. +const ( + RepositoryGetDurationMetricName = "eventually.repository.get.duration.milliseconds" + RepositoryGetDurationMetricDescription = "Duration in milliseconds of aggregate.Repository.Get operations performed." + + RepositorySaveDurationMetricName = "eventually.repository.save.duration.milliseconds" + RepositorySaveDurationMetricDescription = "Duration in milliseconds of aggregate.Repository.Save operations performed." +) + // InstrumentedRepository is a wrapper type over an aggregate.Repository // instance to provide instrumentation, in the form of metrics and traces // using OpenTelemetry. @@ -39,17 +54,17 @@ func (ir *InstrumentedRepository[I, T]) registerMetrics(meter metric.Meter) erro var err error if ir.getDuration, err = meter.Int64Histogram( - "eventually.repository.get.duration.milliseconds", - metric.WithUnit("ms"), - metric.WithDescription("Duration in milliseconds of aggregate.Repository.Get operations performed."), + RepositoryGetDurationMetricName, + metric.WithUnit(MetricUnitMilliseconds), + metric.WithDescription(RepositoryGetDurationMetricDescription), ); err != nil { return fmt.Errorf("opentelemetry.InstrumentedRepository: failed to register metric, %w", err) } if ir.saveDuration, err = meter.Int64Histogram( - "eventually.repository.save.duration.milliseconds", - metric.WithUnit("ms"), - metric.WithDescription("Duration in milliseconds of aggregate.Repository.Save operations performed."), + RepositorySaveDurationMetricName, + metric.WithUnit(MetricUnitMilliseconds), + metric.WithDescription(RepositorySaveDurationMetricDescription), ); err != nil { return fmt.Errorf("opentelemetry.InstrumentedRepository: failed to register metric, %w", err) } @@ -98,7 +113,7 @@ func (ir *InstrumentedRepository[I, T]) Get(ctx context.Context, id I) (result T AggregateIDAttribute.String(id.String()), ) - ctx, span := ir.tracer.Start(ctx, "aggregate.Repository.Get", trace.WithAttributes(spanAttributes...)) + ctx, span := ir.tracer.Start(ctx, RepositoryGetSpanName, trace.WithAttributes(spanAttributes...)) start := time.Now() defer func() { @@ -135,7 +150,7 @@ func (ir *InstrumentedRepository[I, T]) Save(ctx context.Context, root T) (err e AggregateVersionAttribute.Int64(int64(root.Version())), ) - ctx, span := ir.tracer.Start(ctx, "aggregate.Repository.Save", trace.WithAttributes(spanAttributes...)) + ctx, span := ir.tracer.Start(ctx, RepositorySaveSpanName, trace.WithAttributes(spanAttributes...)) start := time.Now() defer func() { diff --git a/opentelemetry/repository_test.go b/opentelemetry/repository_test.go new file mode 100644 index 00000000..9e673d1d --- /dev/null +++ b/opentelemetry/repository_test.go @@ -0,0 +1,400 @@ +package opentelemetry_test + +import ( + "errors" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "github.com/get-eventually/go-eventually/aggregate" + "github.com/get-eventually/go-eventually/event" + "github.com/get-eventually/go-eventually/internal/user" + otelex "github.com/get-eventually/go-eventually/opentelemetry" +) + +var testBirthDate = time.Date(1990, 1, 1, 0, 0, 0, 0, time.UTC) + +func newUserRepository() aggregate.Repository[uuid.UUID, *user.User] { + return aggregate.NewEventSourcedRepository(event.NewInMemoryStore(), user.Type) +} + +func newTestUser(t *testing.T, id uuid.UUID) *user.User { + t.Helper() + + usr, err := user.Create(id, "Jane", "Doe", "jane@doe.com", testBirthDate, time.Now()) + require.NoError(t, err) + + return usr +} + +func TestNewInstrumentedRepository_Succeeds(t *testing.T) { + h := newHarness(t) + + repo, err := otelex.NewInstrumentedRepository(user.Type, newUserRepository(), h.options()...) + require.NoError(t, err) + require.NotNil(t, repo) +} + +func TestGet_DelegatesAndReturnsRoot(t *testing.T) { + h := newHarness(t) + + inner := newUserRepository() + + repo, err := otelex.NewInstrumentedRepository(user.Type, inner, h.options()...) + require.NoError(t, err) + + id := newUUID(t) + usr := newTestUser(t, id) + + require.NoError(t, repo.Save(t.Context(), usr)) + + got, err := repo.Get(t.Context(), id) + require.NoError(t, err) + assert.Equal(t, usr, got) +} + +func TestGet_RecordsSpan_WithVersionAttributeOnSuccess(t *testing.T) { + h := newHarness(t) + + inner := newUserRepository() + + id := newUUID(t) + usr := newTestUser(t, id) + require.NoError(t, inner.Save(t.Context(), usr)) + + repo, err := otelex.NewInstrumentedRepository(user.Type, inner, h.options()...) + require.NoError(t, err) + + _, err = repo.Get(t.Context(), id) + require.NoError(t, err) + + spans := h.endedSpans() + require.Len(t, spans, 1) + + span := spans[0] + assert.Equal(t, otelex.RepositoryGetSpanName, span.Name()) + assert.Equal(t, codes.Unset, span.Status().Code) + assert.Contains(t, span.Attributes(), otelex.AggregateTypeAttribute.String(user.Type.Name)) + assert.Contains(t, span.Attributes(), otelex.AggregateIDAttribute.String(id.String())) + assert.Contains(t, span.Attributes(), otelex.AggregateVersionAttribute.Int64(int64(usr.Version()))) +} + +func TestGet_RecordsHistogram_WithAggregateTypeAttribute(t *testing.T) { + h := newHarness(t) + + inner := newUserRepository() + + id := newUUID(t) + usr := newTestUser(t, id) + require.NoError(t, inner.Save(t.Context(), usr)) + + repo, err := otelex.NewInstrumentedRepository(user.Type, inner, h.options()...) + require.NoError(t, err) + + _, err = repo.Get(t.Context(), id) + require.NoError(t, err) + + sm := h.collectScopeMetrics(t) + + want := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{Name: otelex.InstrumentationName}, + Metrics: []metricdata.Metrics{ + { + Name: otelex.RepositoryGetDurationMetricName, + Description: otelex.RepositoryGetDurationMetricDescription, + Unit: otelex.MetricUnitMilliseconds, + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelex.AggregateTypeAttribute.String(user.Type.Name), + otelex.ErrorAttribute.Bool(false), + ), + }, + }, + }, + }, + }, + } + + got := metricdata.ScopeMetrics{ + Scope: sm.Scope, + Metrics: []metricdata.Metrics{findMetric(t, &sm, otelex.RepositoryGetDurationMetricName)}, + } + + metricdatatest.AssertEqual(t, want, got, + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue(), + metricdatatest.IgnoreExemplars(), + ) +} + +func TestGet_Error_RecordsErrorOnSpanAndMetric(t *testing.T) { + h := newHarness(t) + + inner := &errorRepository[uuid.UUID, *user.User]{getErr: aggregate.ErrRootNotFound} + + repo, err := otelex.NewInstrumentedRepository(user.Type, inner, h.options()...) + require.NoError(t, err) + + id := newUUID(t) + + _, err = repo.Get(t.Context(), id) + require.ErrorIs(t, err, aggregate.ErrRootNotFound) + + spans := h.endedSpans() + require.Len(t, spans, 1) + + span := spans[0] + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, aggregate.ErrRootNotFound.Error(), span.Status().Description) + assert.True(t, hasExceptionEvent(span), "span should have recorded an exception event") + + // aggregate.version should NOT be present on the span when Get fails. + for _, attr := range span.Attributes() { + assert.NotEqual(t, otelex.AggregateVersionAttribute, attr.Key, + "aggregate.version should not be set on the span when Get fails") + } + + sm := h.collectScopeMetrics(t) + m := findMetric(t, &sm, otelex.RepositoryGetDurationMetricName) + + data, ok := m.Data.(metricdata.Histogram[int64]) + require.True(t, ok) + require.Len(t, data.DataPoints, 1) + + val, ok := data.DataPoints[0].Attributes.Value(otelex.ErrorAttribute) + require.True(t, ok) + assert.True(t, val.AsBool(), "error attribute should be true on failed Get") +} + +func TestGet_SpanIsChildOfParent(t *testing.T) { + h := newHarness(t) + + inner := newUserRepository() + + id := newUUID(t) + usr := newTestUser(t, id) + require.NoError(t, inner.Save(t.Context(), usr)) + + repo, err := otelex.NewInstrumentedRepository(user.Type, inner, h.options()...) + require.NoError(t, err) + + ctx, parent := h.tracer.Tracer("test").Start(t.Context(), "parent") + + _, err = repo.Get(ctx, id) + require.NoError(t, err) + + parent.End() + + spans := h.endedSpans() + require.Len(t, spans, 2) + + getSpan, parentSpan := spans[0], spans[1] + if getSpan.Name() != otelex.RepositoryGetSpanName { + getSpan, parentSpan = spans[1], spans[0] + } + + assert.Equal(t, otelex.RepositoryGetSpanName, getSpan.Name()) + assert.Equal(t, "parent", parentSpan.Name()) + assert.Equal(t, parentSpan.SpanContext().SpanID(), getSpan.Parent().SpanID()) +} + +func TestSave_DelegatesAndPersistsRoot(t *testing.T) { + h := newHarness(t) + + inner := newUserRepository() + + repo, err := otelex.NewInstrumentedRepository(user.Type, inner, h.options()...) + require.NoError(t, err) + + id := newUUID(t) + usr := newTestUser(t, id) + + require.NoError(t, repo.Save(t.Context(), usr)) + + got, err := inner.Get(t.Context(), id) + require.NoError(t, err) + assert.Equal(t, usr.AggregateID(), got.AggregateID()) +} + +func TestSave_RecordsSpan(t *testing.T) { + h := newHarness(t) + + inner := newUserRepository() + + repo, err := otelex.NewInstrumentedRepository(user.Type, inner, h.options()...) + require.NoError(t, err) + + id := newUUID(t) + usr := newTestUser(t, id) + + require.NoError(t, repo.Save(t.Context(), usr)) + + spans := h.endedSpans() + require.Len(t, spans, 1) + + span := spans[0] + assert.Equal(t, otelex.RepositorySaveSpanName, span.Name()) + assert.Equal(t, codes.Unset, span.Status().Code) + assert.Contains(t, span.Attributes(), otelex.AggregateTypeAttribute.String(user.Type.Name)) + assert.Contains(t, span.Attributes(), otelex.AggregateIDAttribute.String(id.String())) + assert.Contains(t, span.Attributes(), otelex.AggregateVersionAttribute.Int64(int64(usr.Version()))) +} + +func TestSave_RecordsHistogram(t *testing.T) { + h := newHarness(t) + + inner := newUserRepository() + + repo, err := otelex.NewInstrumentedRepository(user.Type, inner, h.options()...) + require.NoError(t, err) + + id := newUUID(t) + usr := newTestUser(t, id) + require.NoError(t, repo.Save(t.Context(), usr)) + + sm := h.collectScopeMetrics(t) + + want := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{Name: otelex.InstrumentationName}, + Metrics: []metricdata.Metrics{ + { + Name: otelex.RepositorySaveDurationMetricName, + Description: otelex.RepositorySaveDurationMetricDescription, + Unit: otelex.MetricUnitMilliseconds, + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + otelex.AggregateTypeAttribute.String(user.Type.Name), + otelex.ErrorAttribute.Bool(false), + ), + }, + }, + }, + }, + }, + } + + got := metricdata.ScopeMetrics{ + Scope: sm.Scope, + Metrics: []metricdata.Metrics{findMetric(t, &sm, otelex.RepositorySaveDurationMetricName)}, + } + + metricdatatest.AssertEqual(t, want, got, + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue(), + metricdatatest.IgnoreExemplars(), + ) +} + +func TestSave_Error_RecordsErrorOnSpanAndMetric(t *testing.T) { + h := newHarness(t) + + wantErr := errors.New("save failed") + inner := &errorRepository[uuid.UUID, *user.User]{saveErr: wantErr} + + repo, err := otelex.NewInstrumentedRepository(user.Type, inner, h.options()...) + require.NoError(t, err) + + id := newUUID(t) + usr := newTestUser(t, id) + + err = repo.Save(t.Context(), usr) + require.ErrorIs(t, err, wantErr) + + spans := h.endedSpans() + require.Len(t, spans, 1) + + span := spans[0] + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, wantErr.Error(), span.Status().Description) + assert.True(t, hasExceptionEvent(span)) + + sm := h.collectScopeMetrics(t) + m := findMetric(t, &sm, otelex.RepositorySaveDurationMetricName) + + data, ok := m.Data.(metricdata.Histogram[int64]) + require.True(t, ok) + require.Len(t, data.DataPoints, 1) + + val, ok := data.DataPoints[0].Attributes.Value(otelex.ErrorAttribute) + require.True(t, ok) + assert.True(t, val.AsBool()) +} + +func TestSave_SpanIsChildOfParent(t *testing.T) { + h := newHarness(t) + + inner := newUserRepository() + + repo, err := otelex.NewInstrumentedRepository(user.Type, inner, h.options()...) + require.NoError(t, err) + + id := newUUID(t) + usr := newTestUser(t, id) + + ctx, parent := h.tracer.Tracer("test").Start(t.Context(), "parent") + + require.NoError(t, repo.Save(ctx, usr)) + + parent.End() + + spans := h.endedSpans() + require.Len(t, spans, 2) + + saveSpan, parentSpan := spans[0], spans[1] + if saveSpan.Name() != otelex.RepositorySaveSpanName { + saveSpan, parentSpan = spans[1], spans[0] + } + + assert.Equal(t, otelex.RepositorySaveSpanName, saveSpan.Name()) + assert.Equal(t, "parent", parentSpan.Name()) + assert.Equal(t, parentSpan.SpanContext().SpanID(), saveSpan.Parent().SpanID()) +} + +func TestRepository_AttributeSlicesDoNotAlias(t *testing.T) { + h := newHarness(t) + + inner := newUserRepository() + + id1 := newUUID(t) + id2 := newUUID(t) + + usr1, err := user.Create(id1, "Jane", "Doe", "jane@doe.com", testBirthDate, time.Now()) + require.NoError(t, err) + require.NoError(t, inner.Save(t.Context(), usr1)) + + usr2, err := user.Create(id2, "John", "Doe", "john@doe.com", testBirthDate, time.Now()) + require.NoError(t, err) + require.NoError(t, inner.Save(t.Context(), usr2)) + + repo, err := otelex.NewInstrumentedRepository(user.Type, inner, h.options()...) + require.NoError(t, err) + + _, err = repo.Get(t.Context(), id1) + require.NoError(t, err) + + _, err = repo.Get(t.Context(), id2) + require.NoError(t, err) + + spans := h.endedSpans() + require.Len(t, spans, 2) + + assert.Contains(t, spans[0].Attributes(), otelex.AggregateIDAttribute.String(id1.String())) + assert.NotContains(t, spans[0].Attributes(), otelex.AggregateIDAttribute.String(id2.String())) + + assert.Contains(t, spans[1].Attributes(), otelex.AggregateIDAttribute.String(id2.String())) + assert.NotContains(t, spans[1].Attributes(), otelex.AggregateIDAttribute.String(id1.String())) +}