diff --git a/packages/common_client/lib/src/data_sources/fdv2/cache_initializer.dart b/packages/common_client/lib/src/data_sources/fdv2/cache_initializer.dart new file mode 100644 index 0000000..2b60c42 --- /dev/null +++ b/packages/common_client/lib/src/data_sources/fdv2/cache_initializer.dart @@ -0,0 +1,109 @@ +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; + +import 'flag_eval_mapper.dart'; +import 'payload.dart'; +import 'selector.dart'; +import 'source.dart'; +import 'source_result.dart'; + +/// The shape of a cache hit: parsed evaluation results plus the +/// environment ID that was current when the cache was written. +typedef CachedFlags = ({ + Map flags, + String? environmentId, +}); + +/// Reads cached flag state for [context] from persistence. Returns +/// null on a cache miss, an unreadable entry, or a parse failure. +typedef CachedFlagsReader = Future Function(LDContext context); + +/// One-shot initializer that brings the SDK up from its persistence +/// cache. The cache is read once; retries are not meaningful for a +/// local read. +/// +/// On cache hit, emits a [ChangeSetResult] with `persist: false` (the +/// data came from the cache; writing it back is a no-op) and an empty +/// selector (the cache does not track server-side selector state). +/// The payload type is [PayloadType.full]: a cache load is a complete +/// snapshot, not a delta. +/// +/// On cache miss, emits a [ChangeSetResult] with [PayloadType.none] so +/// the initializer chain advances rather than terminating. The cache +/// is best-effort, not a source of truth. +final class CacheInitializer implements Initializer { + final CachedFlagsReader _reader; + final LDContext _context; + final LDLogger _logger; + final DateTime Function() _now; + + bool _closed = false; + + CacheInitializer({ + required CachedFlagsReader reader, + required LDContext context, + required LDLogger logger, + DateTime Function()? now, + }) : _reader = reader, + _context = context, + _logger = logger.subLogger('CacheInitializer'), + _now = now ?? DateTime.now; + + @override + Future run() async { + if (_closed) return _shutdown(); + + final CachedFlags? cached; + try { + cached = await _reader(_context); + } catch (err) { + _logger.warn('Cache read failed (${err.runtimeType}); ' + 'treating as miss'); + return _miss(); + } + + if (_closed) return _shutdown(); + + if (cached == null) { + return _miss(); + } + + final updates = []; + cached.flags.forEach((key, evalResult) { + updates.add(Update( + kind: flagEvalKind, + key: key, + version: evalResult.version, + object: LDEvaluationResultSerialization.toJson(evalResult), + )); + }); + + return ChangeSetResult( + payload: Payload( + type: PayloadType.full, + selector: Selector.empty, + updates: updates, + ), + environmentId: cached.environmentId, + freshness: _now(), + persist: false, + ); + } + + @override + void close() { + _closed = true; + } + + ChangeSetResult _miss() => ChangeSetResult( + payload: const Payload( + type: PayloadType.none, + updates: [], + ), + freshness: _now(), + persist: false, + ); + + StatusResult _shutdown() => FDv2SourceResults.shutdown( + message: 'Cache initializer closed before completion', + ); +} diff --git a/packages/common_client/lib/src/data_sources/fdv2/calculate_poll_delay.dart b/packages/common_client/lib/src/data_sources/fdv2/calculate_poll_delay.dart new file mode 100644 index 0000000..7729091 --- /dev/null +++ b/packages/common_client/lib/src/data_sources/fdv2/calculate_poll_delay.dart @@ -0,0 +1,28 @@ +/// Computes how long to wait before the next poll, given when the SDK +/// last received a fresh response and the configured polling interval. +/// +/// Returns the time remaining in the interval relative to [freshness]. +/// If [freshness] is null (no successful poll yet) or older than the +/// interval (we're overdue), returns either the full interval or zero +/// respectively. +/// +/// Caps the returned delay at [interval] so a freshness timestamp from +/// the future (clock skew, manually adjusted system time) cannot push +/// the next poll arbitrarily far out. +Duration calculatePollDelay({ + required DateTime now, + required Duration interval, + DateTime? freshness, +}) { + if (freshness == null) { + return interval; + } + final elapsed = now.difference(freshness); + if (elapsed.isNegative) { + return interval; + } + if (elapsed >= interval) { + return Duration.zero; + } + return interval - elapsed; +} diff --git a/packages/common_client/lib/src/data_sources/fdv2/polling_initializer.dart b/packages/common_client/lib/src/data_sources/fdv2/polling_initializer.dart new file mode 100644 index 0000000..ecca23e --- /dev/null +++ b/packages/common_client/lib/src/data_sources/fdv2/polling_initializer.dart @@ -0,0 +1,110 @@ +import 'dart:async'; + +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; + +import 'selector.dart'; +import 'source.dart'; +import 'source_result.dart'; + +/// A single FDv2 poll. The orchestrator wires this to +/// [FDv2PollingBase.pollOnce]; tests inject scripted functions. +typedef PollFunction = Future Function({Selector basis}); + +/// A function that delays for the given duration. +typedef DelayFunction = Future Function(Duration duration); + +Future _defaultDelay(Duration duration) => Future.delayed(duration); + +/// One-shot polling initializer. +/// +/// Calls the injected [PollFunction] up to [_maxAttempts] times. Treats: +/// +/// - [ChangeSetResult] as success — returned immediately. +/// - [SourceState.terminalError], [SourceState.goodbye], and +/// [SourceState.shutdown] as terminal — returned immediately without +/// further retries. +/// - [SourceState.interrupted] as transient — retried after +/// [_retryDelay] up to the attempt limit. After the limit, the last +/// interrupted result is converted into a [SourceState.terminalError] +/// so the orchestrator stops retrying at this layer of the chain. +/// +/// Calling [close] before [run] completes signals an abort: any pending +/// retry delay returns immediately and [run] resolves to a +/// [SourceState.shutdown] result. +final class FDv2PollingInitializer implements Initializer { + static const int _maxAttempts = 3; + static const Duration _retryDelay = Duration(seconds: 1); + + final PollFunction _poll; + final SelectorGetter _selectorGetter; + final LDLogger _logger; + final DelayFunction _delay; + final Completer _closedSignal = Completer(); + + FDv2PollingInitializer({ + required PollFunction poll, + required SelectorGetter selectorGetter, + required LDLogger logger, + DelayFunction? delay, + }) : _poll = poll, + _selectorGetter = selectorGetter, + _logger = logger.subLogger('FDv2PollingInitializer'), + _delay = delay ?? _defaultDelay; + + @override + Future run() async { + StatusResult? lastInterrupted; + + for (var attempt = 1; attempt <= _maxAttempts; attempt++) { + if (_closedSignal.isCompleted) { + return _shutdownResult(); + } + + final result = await _poll(basis: _selectorGetter()); + + if (_closedSignal.isCompleted) { + return _shutdownResult(); + } + + switch (result) { + case ChangeSetResult(): + return result; + case StatusResult(state: SourceState.interrupted): + lastInterrupted = result; + _logger.warn( + 'Polling initializer attempt $attempt/$_maxAttempts interrupted: ' + '${result.message}'); + if (attempt < _maxAttempts) { + await _waitForRetry(); + } + case StatusResult(): + // terminalError, goodbye, or shutdown -- pass through. + return result; + } + } + + // All attempts produced interrupted. Escalate so the orchestrator + // can fall through to the next source rather than retry forever. + return FDv2SourceResults.terminalError( + message: 'Polling initializer exhausted $_maxAttempts attempts; ' + 'last error: ${lastInterrupted?.message}', + statusCode: lastInterrupted?.statusCode, + fdv1Fallback: lastInterrupted?.fdv1Fallback ?? false, + ); + } + + @override + void close() { + if (!_closedSignal.isCompleted) { + _closedSignal.complete(); + } + } + + Future _waitForRetry() async { + await Future.any([_delay(_retryDelay), _closedSignal.future]); + } + + StatusResult _shutdownResult() => FDv2SourceResults.shutdown( + message: 'Polling initializer closed before completion', + ); +} diff --git a/packages/common_client/lib/src/data_sources/fdv2/polling_synchronizer.dart b/packages/common_client/lib/src/data_sources/fdv2/polling_synchronizer.dart new file mode 100644 index 0000000..137b717 --- /dev/null +++ b/packages/common_client/lib/src/data_sources/fdv2/polling_synchronizer.dart @@ -0,0 +1,130 @@ +import 'dart:async'; + +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; + +import 'calculate_poll_delay.dart'; +import 'polling_initializer.dart' show PollFunction; +import 'source.dart'; +import 'source_result.dart'; + +/// Constructs a [Timer] that fires once after [duration] and invokes +/// [callback]. Tests inject a fake to control time. +typedef TimerFactory = Timer Function( + Duration duration, void Function() callback); + +Timer _defaultTimerFactory(Duration duration, void Function() callback) => + Timer(duration, callback); + +/// Long-lived polling synchronizer. +/// +/// Exposes a single-subscription [Stream] of [FDv2SourceResult]s. On +/// subscription, polls immediately and then schedules the next poll +/// using [calculatePollDelay] over the freshness of the most recent +/// successful result. Cancelling the subscription (or calling [close]) +/// stops the timer and closes the stream. +/// +/// Each emission carries whatever the underlying poll returned, including +/// transient interrupted statuses. The orchestrator decides how to react. +final class FDv2PollingSynchronizer implements Synchronizer { + final PollFunction _poll; + final SelectorGetter _selectorGetter; + final Duration _interval; + final TimerFactory _timerFactory; + final DateTime Function() _now; + final LDLogger _logger; + + late final StreamController _controller; + Timer? _timer; + bool _closed = false; + DateTime? _lastFreshness; + + FDv2PollingSynchronizer({ + required PollFunction poll, + required SelectorGetter selectorGetter, + required Duration interval, + required LDLogger logger, + TimerFactory? timerFactory, + DateTime Function()? now, + }) : _poll = poll, + _selectorGetter = selectorGetter, + _interval = interval, + _timerFactory = timerFactory ?? _defaultTimerFactory, + _now = now ?? DateTime.now, + _logger = logger.subLogger('FDv2PollingSynchronizer') { + _controller = StreamController( + onListen: _onListen, + onCancel: _onCancel, + ); + } + + @override + Stream get results => _controller.stream; + + @override + void close() { + if (_closed) return; + _closed = true; + _timer?.cancel(); + _timer = null; + if (!_controller.isClosed) { + _controller.add( + FDv2SourceResults.shutdown(message: 'Polling synchronizer closed')); + _controller.close(); + } + } + + void _onListen() { + // Kick off the first poll immediately. Subsequent polls are + // scheduled from inside _doPoll via the timer. + Future.microtask(_doPoll); + } + + Future _onCancel() async { + if (_closed) return; + _closed = true; + _timer?.cancel(); + _timer = null; + // Don't emit shutdown -- the subscriber asked for cancellation. + } + + Future _doPoll() async { + if (_closed) return; + final FDv2SourceResult result; + try { + result = await _poll(basis: _selectorGetter()); + } catch (err) { + // PollFunction is the FDv2PollingBase.pollOnce contract, which + // already converts errors to StatusResult. A throw here means + // someone wired a non-conforming function; treat defensively. + _logger.error('Poll function threw unexpectedly: ${err.runtimeType}'); + if (!_closed && !_controller.isClosed) { + _controller.add(FDv2SourceResults.interrupted( + message: 'Polling source raised unexpectedly')); + } + _scheduleNext(); + return; + } + + if (_closed) return; + + if (result is ChangeSetResult && result.freshness != null) { + _lastFreshness = result.freshness; + } + + if (!_controller.isClosed) { + _controller.add(result); + } + + _scheduleNext(); + } + + void _scheduleNext() { + if (_closed) return; + final delay = calculatePollDelay( + now: _now(), + interval: _interval, + freshness: _lastFreshness, + ); + _timer = _timerFactory(delay, _doPoll); + } +} diff --git a/packages/common_client/test/data_sources/fdv2/cache_initializer_test.dart b/packages/common_client/test/data_sources/fdv2/cache_initializer_test.dart new file mode 100644 index 0000000..d900651 --- /dev/null +++ b/packages/common_client/test/data_sources/fdv2/cache_initializer_test.dart @@ -0,0 +1,151 @@ +import 'package:launchdarkly_common_client/src/data_sources/fdv2/cache_initializer.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/payload.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_result.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:test/test.dart'; + +LDEvaluationResult _evalResult({int version = 1, bool value = true}) => + LDEvaluationResult( + version: version, + detail: LDEvaluationDetail( + value ? LDValue.ofBool(true) : LDValue.ofBool(false), 0, null), + ); + +LDContext _ctx() => LDContextBuilder().kind('user', 'alice').build(); + +CachedFlagsReader _staticReader(CachedFlags? value) => + (LDContext _) async => value; + +CachedFlagsReader _throwingReader(Object error) => (LDContext _) async { + throw error; + }; + +void main() { + final logger = LDLogger(level: LDLogLevel.error); + + test('cache hit emits a full ChangeSetResult with updates and persist=false', + () async { + final init = CacheInitializer( + reader: _staticReader(( + flags: { + 'flag-a': _evalResult(version: 7), + 'flag-b': _evalResult(version: 9, value: false), + }, + environmentId: 'env-xyz', + )), + context: _ctx(), + logger: logger, + ); + + final result = await init.run(); + + expect(result, isA()); + final cs = result as ChangeSetResult; + expect(cs.persist, isFalse); + expect(cs.payload.type, equals(PayloadType.full)); + expect(cs.payload.selector.isEmpty, isTrue); + expect(cs.environmentId, equals('env-xyz')); + expect(cs.payload.updates, hasLength(2)); + + final byKey = {for (final u in cs.payload.updates) u.key: u}; + expect(byKey['flag-a']?.version, equals(7)); + expect(byKey['flag-a']?.kind, equals('flag-eval')); + expect(byKey['flag-a']?.deleted, isFalse); + expect(byKey['flag-a']?.object, isNotNull); + expect(byKey['flag-b']?.version, equals(9)); + }); + + test('cache hit Updates round-trip through LDEvaluationResultSerialization', + () async { + // Confirms the cache initializer writes the same JSON shape that the + // protocol handler's flag_eval mapper expects on the read side. + final original = _evalResult(version: 42); + final init = CacheInitializer( + reader: _staticReader((flags: {'k': original}, environmentId: null)), + context: _ctx(), + logger: logger, + ); + + final result = await init.run(); + final cs = result as ChangeSetResult; + final update = cs.payload.updates.single; + + final reconstructed = + LDEvaluationResultSerialization.fromJson(update.object!); + expect(reconstructed, equals(original)); + }); + + test('cache miss emits a none-type ChangeSetResult so the chain advances', + () async { + final init = CacheInitializer( + reader: _staticReader(null), + context: _ctx(), + logger: logger, + ); + + final result = await init.run(); + + expect(result, isA()); + final cs = result as ChangeSetResult; + expect(cs.payload.type, equals(PayloadType.none)); + expect(cs.payload.updates, isEmpty); + expect(cs.persist, isFalse); + expect(cs.environmentId, isNull); + }); + + test('reader throws is treated as a cache miss (does not propagate)', + () async { + final init = CacheInitializer( + reader: _throwingReader(StateError('persistence corrupt')), + context: _ctx(), + logger: logger, + ); + + final result = await init.run(); + + expect(result, isA()); + expect((result as ChangeSetResult).payload.type, equals(PayloadType.none)); + }); + + test('close before run returns shutdown without invoking reader', () async { + var readerCalled = false; + final init = CacheInitializer( + reader: (LDContext _) async { + readerCalled = true; + return null; + }, + context: _ctx(), + logger: logger, + ); + init.close(); + + final result = await init.run(); + + expect((result as StatusResult).state, equals(SourceState.shutdown)); + expect(readerCalled, isFalse); + }); + + test('close is idempotent', () { + final init = CacheInitializer( + reader: _staticReader(null), + context: _ctx(), + logger: logger, + ); + init.close(); + expect(() => init.close(), returnsNormally); + }); + + test('freshness is set from the now function', () async { + final fixedNow = DateTime.utc(2026, 4, 28, 15, 30); + final init = CacheInitializer( + reader: _staticReader( + (flags: {}, environmentId: null)), + context: _ctx(), + logger: logger, + now: () => fixedNow, + ); + + final result = await init.run(); + expect((result as ChangeSetResult).freshness, equals(fixedNow)); + }); +} diff --git a/packages/common_client/test/data_sources/fdv2/calculate_poll_delay_test.dart b/packages/common_client/test/data_sources/fdv2/calculate_poll_delay_test.dart new file mode 100644 index 0000000..9ffbf50 --- /dev/null +++ b/packages/common_client/test/data_sources/fdv2/calculate_poll_delay_test.dart @@ -0,0 +1,65 @@ +import 'package:launchdarkly_common_client/src/data_sources/fdv2/calculate_poll_delay.dart'; +import 'package:test/test.dart'; + +void main() { + final t0 = DateTime.utc(2026, 1, 1, 12, 0, 0); + const interval = Duration(seconds: 30); + + test('null freshness returns the full interval', () { + expect( + calculatePollDelay(now: t0, interval: interval, freshness: null), + equals(interval), + ); + }); + + test('freshness equal to now returns the full interval', () { + expect( + calculatePollDelay(now: t0, interval: interval, freshness: t0), + equals(interval), + ); + }); + + test('freshness within the interval returns the time remaining', () { + expect( + calculatePollDelay( + now: t0, + interval: interval, + freshness: t0.subtract(const Duration(seconds: 10)), + ), + equals(const Duration(seconds: 20)), + ); + }); + + test('freshness exactly one interval ago returns zero', () { + expect( + calculatePollDelay( + now: t0, + interval: interval, + freshness: t0.subtract(interval), + ), + equals(Duration.zero), + ); + }); + + test('freshness older than the interval returns zero', () { + expect( + calculatePollDelay( + now: t0, + interval: interval, + freshness: t0.subtract(const Duration(minutes: 5)), + ), + equals(Duration.zero), + ); + }); + + test('freshness in the future is clamped to the full interval', () { + expect( + calculatePollDelay( + now: t0, + interval: interval, + freshness: t0.add(const Duration(minutes: 5)), + ), + equals(interval), + ); + }); +} diff --git a/packages/common_client/test/data_sources/fdv2/polling_initializer_test.dart b/packages/common_client/test/data_sources/fdv2/polling_initializer_test.dart new file mode 100644 index 0000000..7e8990a --- /dev/null +++ b/packages/common_client/test/data_sources/fdv2/polling_initializer_test.dart @@ -0,0 +1,231 @@ +import 'dart:async'; + +import 'package:launchdarkly_common_client/src/data_sources/fdv2/payload.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/polling_initializer.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/selector.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_result.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:test/test.dart'; + +/// Builds a [PollFunction] that returns scripted results in order. Each +/// call shifts the head off the list. The function records the basis it +/// was invoked with for assertions. +class ScriptedPoll { + final List _results; + final List _basisSeen = []; + + ScriptedPoll(List results) : _results = List.of(results); + + List get basisSeen => List.unmodifiable(_basisSeen); + + Future call({Selector basis = Selector.empty}) async { + _basisSeen.add(basis); + if (_results.isEmpty) { + throw StateError('ScriptedPoll exhausted'); + } + return _results.removeAt(0); + } +} + +/// A delay function that does nothing (instant). Tests that need to +/// observe close-during-delay behavior use [HoldingDelay] instead. +Future instantDelay(Duration _) async {} + +/// A delay function whose returned future never completes on its own. +/// Used to model "still waiting for retry" so the test can call close() +/// and observe the early termination. +class HoldingDelay { + final _completer = Completer(); + + Future call(Duration duration) => _completer.future; +} + +ChangeSetResult _changeSet({String flagKey = 'k', int version = 1}) => + ChangeSetResult( + payload: Payload( + type: PayloadType.full, + updates: [ + Update( + kind: 'flag-eval', + key: flagKey, + version: version, + object: const {}) + ], + ), + persist: true, + freshness: DateTime.utc(2026, 1, 1), + ); + +void main() { + final logger = LDLogger(level: LDLogLevel.error); + + test('first poll succeeds returns ChangeSetResult immediately', () async { + final poll = ScriptedPoll([_changeSet()]); + final init = FDv2PollingInitializer( + poll: poll.call, + selectorGetter: () => Selector.empty, + logger: logger, + delay: instantDelay, + ); + + final result = await init.run(); + + expect(result, isA()); + expect(poll.basisSeen, hasLength(1)); + }); + + test('terminal status (terminalError) returns immediately, no retry', + () async { + final poll = ScriptedPoll([ + FDv2SourceResults.terminalError(message: 'forbidden', statusCode: 403) + ]); + final init = FDv2PollingInitializer( + poll: poll.call, + selectorGetter: () => Selector.empty, + logger: logger, + delay: instantDelay, + ); + + final result = await init.run(); + + expect((result as StatusResult).state, equals(SourceState.terminalError)); + expect(poll.basisSeen, hasLength(1)); + }); + + test('goodbye returns immediately, no retry', () async { + final poll = + ScriptedPoll([FDv2SourceResults.goodbyeResult(message: 'maintenance')]); + final init = FDv2PollingInitializer( + poll: poll.call, + selectorGetter: () => Selector.empty, + logger: logger, + delay: instantDelay, + ); + + final result = await init.run(); + + expect((result as StatusResult).state, equals(SourceState.goodbye)); + expect(poll.basisSeen, hasLength(1)); + }); + + test('retries on interrupted up to 3 attempts then succeeds', () async { + final poll = ScriptedPoll([ + FDv2SourceResults.interrupted(message: 'transient'), + FDv2SourceResults.interrupted(message: 'transient'), + _changeSet(flagKey: 'after-retries'), + ]); + final init = FDv2PollingInitializer( + poll: poll.call, + selectorGetter: () => Selector.empty, + logger: logger, + delay: instantDelay, + ); + + final result = await init.run(); + + expect(result, isA()); + expect(poll.basisSeen, hasLength(3)); + }); + + test( + 'all 3 attempts interrupted converts to terminalError carrying the ' + 'last context', () async { + final poll = ScriptedPoll([ + FDv2SourceResults.interrupted(message: 'first', statusCode: 503), + FDv2SourceResults.interrupted(message: 'second', statusCode: 503), + FDv2SourceResults.interrupted( + message: 'third', statusCode: 503, fdv1Fallback: true), + ]); + final init = FDv2PollingInitializer( + poll: poll.call, + selectorGetter: () => Selector.empty, + logger: logger, + delay: instantDelay, + ); + + final result = await init.run(); + + final status = result as StatusResult; + expect(status.state, equals(SourceState.terminalError)); + expect(status.statusCode, equals(503)); + expect(status.fdv1Fallback, isTrue); + expect(status.message, contains('third')); + expect(poll.basisSeen, hasLength(3)); + }); + + test('selector is read lazily before each poll', () async { + var calls = 0; + final selectors = [ + Selector.empty, + Selector(state: 'sel-after-first', version: 1), + Selector(state: 'sel-after-second', version: 2), + ]; + final poll = ScriptedPoll([ + FDv2SourceResults.interrupted(message: 'a'), + FDv2SourceResults.interrupted(message: 'b'), + _changeSet(), + ]); + final init = FDv2PollingInitializer( + poll: poll.call, + selectorGetter: () => selectors[calls++], + logger: logger, + delay: instantDelay, + ); + + await init.run(); + + expect(poll.basisSeen[0].isEmpty, isTrue); + expect(poll.basisSeen[1].state, equals('sel-after-first')); + expect(poll.basisSeen[2].state, equals('sel-after-second')); + }); + + test('close during retry delay returns shutdown status', () async { + final holdingDelay = HoldingDelay(); + final poll = ScriptedPoll([ + FDv2SourceResults.interrupted(message: 'wait'), + _changeSet(), // never reached + ]); + final init = FDv2PollingInitializer( + poll: poll.call, + selectorGetter: () => Selector.empty, + logger: logger, + delay: holdingDelay.call, + ); + + final runFuture = init.run(); + // Yield so the initializer reaches the retry delay. + await Future.delayed(Duration.zero); + init.close(); + + final result = await runFuture; + expect((result as StatusResult).state, equals(SourceState.shutdown)); + // Only the first poll ran; the retry was aborted. + expect(poll.basisSeen, hasLength(1)); + }); + + test('close before run returns shutdown without polling', () async { + final poll = ScriptedPoll([_changeSet()]); + final init = FDv2PollingInitializer( + poll: poll.call, + selectorGetter: () => Selector.empty, + logger: logger, + delay: instantDelay, + ); + init.close(); + + final result = await init.run(); + expect((result as StatusResult).state, equals(SourceState.shutdown)); + expect(poll.basisSeen, isEmpty); + }); + + test('close is idempotent', () { + final init = FDv2PollingInitializer( + poll: ScriptedPoll([_changeSet()]).call, + selectorGetter: () => Selector.empty, + logger: logger, + delay: instantDelay, + ); + init.close(); + expect(() => init.close(), returnsNormally); + }); +} diff --git a/packages/common_client/test/data_sources/fdv2/polling_synchronizer_test.dart b/packages/common_client/test/data_sources/fdv2/polling_synchronizer_test.dart new file mode 100644 index 0000000..7db66c7 --- /dev/null +++ b/packages/common_client/test/data_sources/fdv2/polling_synchronizer_test.dart @@ -0,0 +1,324 @@ +import 'dart:async'; + +import 'package:launchdarkly_common_client/src/data_sources/fdv2/payload.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/polling_synchronizer.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/selector.dart'; +import 'package:launchdarkly_common_client/src/data_sources/fdv2/source_result.dart'; +import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; +import 'package:test/test.dart'; + +class ScriptedPoll { + final List _results; + final List basisSeen = []; + + ScriptedPoll(List results) : _results = List.of(results); + + int get callCount => basisSeen.length; + + Future call({Selector basis = Selector.empty}) async { + basisSeen.add(basis); + if (_results.isEmpty) { + // Don't throw -- a test may not predict exactly how many polls + // fire before tearing down. Emit a benign interrupted instead. + return FDv2SourceResults.interrupted(message: 'no more scripted'); + } + return _results.removeAt(0); + } +} + +/// A timer factory that records each requested delay and lets the test +/// trigger the callback on demand. Cancellation marks the timer +/// inactive but the request stays in the history list, so tests can +/// still inspect what was originally scheduled. +class FakeTimerFactory { + final List timers = []; + + Timer call(Duration duration, void Function() callback) { + final t = FakeTimer(duration, callback); + timers.add(t); + return t; + } + + /// Fires the most recently scheduled active timer. + void fireLatest() { + for (var i = timers.length - 1; i >= 0; i--) { + if (timers[i].isActive) { + timers[i].fire(); + return; + } + } + fail('no active timer to fire'); + } + + /// The duration of the most recent scheduling request, regardless of + /// whether it has since been cancelled. + Duration? get latestRequestedDelay => + timers.isEmpty ? null : timers.last.duration; +} + +class FakeTimer implements Timer { + final Duration duration; + final void Function() _callback; + bool _cancelled = false; + bool _fired = false; + + FakeTimer(this.duration, this._callback); + + void fire() { + if (_cancelled || _fired) return; + _fired = true; + _callback(); + } + + @override + void cancel() { + _cancelled = true; + } + + @override + bool get isActive => !_cancelled && !_fired; + + @override + int get tick => 0; +} + +ChangeSetResult _changeSet({DateTime? freshness, String? selectorState}) => + ChangeSetResult( + payload: Payload( + type: PayloadType.full, + selector: selectorState != null + ? Selector(state: selectorState, version: 1) + : Selector.empty, + updates: const [], + ), + persist: true, + freshness: freshness ?? DateTime.utc(2026, 1, 1), + ); + +void main() { + final logger = LDLogger(level: LDLogLevel.error); + const interval = Duration(seconds: 30); + + test('polls immediately on subscribe and emits the result', () async { + final poll = ScriptedPoll([_changeSet()]); + final timerFactory = FakeTimerFactory(); + final sync = FDv2PollingSynchronizer( + poll: poll.call, + selectorGetter: () => Selector.empty, + interval: interval, + logger: logger, + timerFactory: timerFactory.call, + ); + + final emissions = []; + final sub = sync.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + expect(emissions, hasLength(1)); + expect(emissions[0], isA()); + expect(poll.callCount, equals(1)); + + await sub.cancel(); + sync.close(); + }); + + test('schedules the next poll using the freshness of the last result', + () async { + final freshness = DateTime.utc(2026, 1, 1, 12, 0, 0); + final now = DateTime.utc(2026, 1, 1, 12, 0, 10); // 10s after freshness + final poll = ScriptedPoll([_changeSet(freshness: freshness)]); + final timerFactory = FakeTimerFactory(); + + final sync = FDv2PollingSynchronizer( + poll: poll.call, + selectorGetter: () => Selector.empty, + interval: interval, + logger: logger, + timerFactory: timerFactory.call, + now: () => now, + ); + + final sub = sync.results.listen((_) {}); + await Future.delayed(Duration.zero); + + expect( + timerFactory.latestRequestedDelay, equals(const Duration(seconds: 20))); + + await sub.cancel(); + sync.close(); + }); + + test('subsequent timer fire triggers another poll and another emission', + () async { + final poll = ScriptedPoll([ + _changeSet(selectorState: 'first'), + _changeSet(selectorState: 'second'), + ]); + final timerFactory = FakeTimerFactory(); + + final sync = FDv2PollingSynchronizer( + poll: poll.call, + selectorGetter: () => Selector.empty, + interval: interval, + logger: logger, + timerFactory: timerFactory.call, + ); + + final emissions = []; + final sub = sync.results.listen(emissions.add); + + await Future.delayed(Duration.zero); + expect(emissions, hasLength(1)); + + timerFactory.fireLatest(); + await Future.delayed(Duration.zero); + + expect(emissions, hasLength(2)); + expect( + (emissions[1] as ChangeSetResult).payload.selector.state, + equals('second'), + ); + + await sub.cancel(); + sync.close(); + }); + + test('interrupted result is emitted but does not advance freshness', + () async { + final freshness = DateTime.utc(2026, 1, 1, 12, 0, 0); + var nowCallCount = 0; + final nows = [ + DateTime.utc(2026, 1, 1, 12, 0, 5), // after first poll + DateTime.utc(2026, 1, 1, 12, 0, 5), // after second poll (interrupted) + ]; + final poll = ScriptedPoll([ + _changeSet(freshness: freshness), + FDv2SourceResults.interrupted(message: 'transient'), + ]); + final timerFactory = FakeTimerFactory(); + + final sync = FDv2PollingSynchronizer( + poll: poll.call, + selectorGetter: () => Selector.empty, + interval: interval, + logger: logger, + timerFactory: timerFactory.call, + now: () => nows[nowCallCount++], + ); + + final emissions = []; + final sub = sync.results.listen(emissions.add); + await Future.delayed(Duration.zero); + + expect(emissions, hasLength(1)); + expect( + timerFactory.latestRequestedDelay, equals(const Duration(seconds: 25))); + + timerFactory.fireLatest(); + await Future.delayed(Duration.zero); + + expect(emissions, hasLength(2)); + expect(emissions[1], isA()); + // Freshness was NOT updated by the interrupted result -- the next + // delay is still computed against the original freshness. + expect( + timerFactory.latestRequestedDelay, equals(const Duration(seconds: 25))); + + await sub.cancel(); + sync.close(); + }); + + test('selector is read lazily before each poll', () async { + final selectors = [ + Selector.empty, + Selector(state: 'second-basis', version: 1), + ]; + var idx = 0; + final poll = ScriptedPoll([_changeSet(), _changeSet()]); + final timerFactory = FakeTimerFactory(); + + final sync = FDv2PollingSynchronizer( + poll: poll.call, + selectorGetter: () => selectors[idx++], + interval: interval, + logger: logger, + timerFactory: timerFactory.call, + ); + + final sub = sync.results.listen((_) {}); + await Future.delayed(Duration.zero); + timerFactory.fireLatest(); + await Future.delayed(Duration.zero); + + expect(poll.basisSeen[0].isEmpty, isTrue); + expect(poll.basisSeen[1].state, equals('second-basis')); + + await sub.cancel(); + sync.close(); + }); + + test('close cancels the pending timer and emits shutdown then closes', + () async { + final poll = ScriptedPoll([_changeSet()]); + final timerFactory = FakeTimerFactory(); + final sync = FDv2PollingSynchronizer( + poll: poll.call, + selectorGetter: () => Selector.empty, + interval: interval, + logger: logger, + timerFactory: timerFactory.call, + ); + + final emissions = []; + final doneCompleter = Completer(); + sync.results.listen(emissions.add, onDone: doneCompleter.complete); + + await Future.delayed(Duration.zero); + expect(emissions, hasLength(1)); + + sync.close(); + await doneCompleter.future; + + expect(emissions, hasLength(2)); + expect((emissions[1] as StatusResult).state, equals(SourceState.shutdown)); + expect(timerFactory.timers.last.isActive, isFalse); + }); + + test('subscription cancel stops polling without emitting shutdown', () async { + final poll = ScriptedPoll([_changeSet(), _changeSet(), _changeSet()]); + final timerFactory = FakeTimerFactory(); + final sync = FDv2PollingSynchronizer( + poll: poll.call, + selectorGetter: () => Selector.empty, + interval: interval, + logger: logger, + timerFactory: timerFactory.call, + ); + + final emissions = []; + final sub = sync.results.listen(emissions.add); + await Future.delayed(Duration.zero); + expect(emissions, hasLength(1)); + + await sub.cancel(); + final pollsBeforeFire = poll.callCount; + if (timerFactory.latestRequestedDelay != null && + timerFactory.timers.last.isActive) { + timerFactory.fireLatest(); + await Future.delayed(Duration.zero); + } + expect(poll.callCount, equals(pollsBeforeFire)); + }); + + test('close is idempotent', () { + final sync = FDv2PollingSynchronizer( + poll: ScriptedPoll([_changeSet()]).call, + selectorGetter: () => Selector.empty, + interval: interval, + logger: logger, + timerFactory: FakeTimerFactory().call, + ); + sync.close(); + expect(() => sync.close(), returnsNormally); + }); +}