-
Notifications
You must be signed in to change notification settings - Fork 18
feat: add polling and cache sources for FDv2 #261
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
kinyoklion
wants to merge
2
commits into
main
Choose a base branch
from
rlamb/sdk-2184/fdv2-polling-cache-sources
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
109 changes: 109 additions & 0 deletions
109
packages/common_client/lib/src/data_sources/fdv2/cache_initializer.dart
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; | ||
|
|
||
| import 'flag_eval_mapper.dart'; | ||
| import 'payload.dart'; | ||
| import 'selector.dart'; | ||
| import 'source.dart'; | ||
| import 'source_result.dart'; | ||
|
|
||
| /// The shape of a cache hit: parsed evaluation results plus the | ||
| /// environment ID that was current when the cache was written. | ||
| typedef CachedFlags = ({ | ||
| Map<String, LDEvaluationResult> flags, | ||
| String? environmentId, | ||
| }); | ||
|
|
||
| /// Reads cached flag state for [context] from persistence. Returns | ||
| /// null on a cache miss, an unreadable entry, or a parse failure. | ||
| typedef CachedFlagsReader = Future<CachedFlags?> Function(LDContext context); | ||
|
|
||
| /// One-shot initializer that brings the SDK up from its persistence | ||
| /// cache. The cache is read once; retries are not meaningful for a | ||
| /// local read. | ||
| /// | ||
| /// On cache hit, emits a [ChangeSetResult] with `persist: false` (the | ||
| /// data came from the cache; writing it back is a no-op) and an empty | ||
| /// selector (the cache does not track server-side selector state). | ||
| /// The payload type is [PayloadType.full]: a cache load is a complete | ||
| /// snapshot, not a delta. | ||
| /// | ||
| /// On cache miss, emits a [ChangeSetResult] with [PayloadType.none] so | ||
| /// the initializer chain advances rather than terminating. The cache | ||
| /// is best-effort, not a source of truth. | ||
| final class CacheInitializer implements Initializer { | ||
| final CachedFlagsReader _reader; | ||
| final LDContext _context; | ||
| final LDLogger _logger; | ||
| final DateTime Function() _now; | ||
|
|
||
| bool _closed = false; | ||
|
|
||
| CacheInitializer({ | ||
| required CachedFlagsReader reader, | ||
| required LDContext context, | ||
| required LDLogger logger, | ||
| DateTime Function()? now, | ||
| }) : _reader = reader, | ||
| _context = context, | ||
| _logger = logger.subLogger('CacheInitializer'), | ||
| _now = now ?? DateTime.now; | ||
|
|
||
| @override | ||
| Future<FDv2SourceResult> run() async { | ||
| if (_closed) return _shutdown(); | ||
|
|
||
| final CachedFlags? cached; | ||
| try { | ||
| cached = await _reader(_context); | ||
| } catch (err) { | ||
| _logger.warn('Cache read failed (${err.runtimeType}); ' | ||
| 'treating as miss'); | ||
| return _miss(); | ||
| } | ||
|
|
||
| if (_closed) return _shutdown(); | ||
|
|
||
| if (cached == null) { | ||
| return _miss(); | ||
| } | ||
|
|
||
| final updates = <Update>[]; | ||
| cached.flags.forEach((key, evalResult) { | ||
| updates.add(Update( | ||
| kind: flagEvalKind, | ||
| key: key, | ||
| version: evalResult.version, | ||
| object: LDEvaluationResultSerialization.toJson(evalResult), | ||
| )); | ||
| }); | ||
|
|
||
| return ChangeSetResult( | ||
| payload: Payload( | ||
| type: PayloadType.full, | ||
| selector: Selector.empty, | ||
| updates: updates, | ||
| ), | ||
| environmentId: cached.environmentId, | ||
| freshness: _now(), | ||
| persist: false, | ||
| ); | ||
| } | ||
|
|
||
| @override | ||
| void close() { | ||
| _closed = true; | ||
| } | ||
|
|
||
| ChangeSetResult _miss() => ChangeSetResult( | ||
| payload: const Payload( | ||
| type: PayloadType.none, | ||
| updates: [], | ||
| ), | ||
| freshness: _now(), | ||
| persist: false, | ||
| ); | ||
|
|
||
| StatusResult _shutdown() => FDv2SourceResults.shutdown( | ||
| message: 'Cache initializer closed before completion', | ||
| ); | ||
| } |
28 changes: 28 additions & 0 deletions
28
packages/common_client/lib/src/data_sources/fdv2/calculate_poll_delay.dart
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| /// Computes how long to wait before the next poll, given when the SDK | ||
| /// last received a fresh response and the configured polling interval. | ||
| /// | ||
| /// Returns the time remaining in the interval relative to [freshness]. | ||
| /// If [freshness] is null (no successful poll yet) or older than the | ||
| /// interval (we're overdue), returns either the full interval or zero | ||
| /// respectively. | ||
| /// | ||
| /// Caps the returned delay at [interval] so a freshness timestamp from | ||
| /// the future (clock skew, manually adjusted system time) cannot push | ||
| /// the next poll arbitrarily far out. | ||
| Duration calculatePollDelay({ | ||
| required DateTime now, | ||
| required Duration interval, | ||
| DateTime? freshness, | ||
| }) { | ||
| if (freshness == null) { | ||
| return interval; | ||
| } | ||
| final elapsed = now.difference(freshness); | ||
| if (elapsed.isNegative) { | ||
| return interval; | ||
| } | ||
| if (elapsed >= interval) { | ||
| return Duration.zero; | ||
| } | ||
| return interval - elapsed; | ||
| } | ||
110 changes: 110 additions & 0 deletions
110
packages/common_client/lib/src/data_sources/fdv2/polling_initializer.dart
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,110 @@ | ||
| import 'dart:async'; | ||
|
|
||
| import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; | ||
|
|
||
| import 'selector.dart'; | ||
| import 'source.dart'; | ||
| import 'source_result.dart'; | ||
|
|
||
| /// A single FDv2 poll. The orchestrator wires this to | ||
| /// [FDv2PollingBase.pollOnce]; tests inject scripted functions. | ||
| typedef PollFunction = Future<FDv2SourceResult> Function({Selector basis}); | ||
|
|
||
| /// A function that delays for the given duration. | ||
| typedef DelayFunction = Future<void> Function(Duration duration); | ||
|
|
||
| Future<void> _defaultDelay(Duration duration) => Future.delayed(duration); | ||
|
|
||
| /// One-shot polling initializer. | ||
| /// | ||
| /// Calls the injected [PollFunction] up to [_maxAttempts] times. Treats: | ||
| /// | ||
| /// - [ChangeSetResult] as success — returned immediately. | ||
| /// - [SourceState.terminalError], [SourceState.goodbye], and | ||
| /// [SourceState.shutdown] as terminal — returned immediately without | ||
| /// further retries. | ||
| /// - [SourceState.interrupted] as transient — retried after | ||
| /// [_retryDelay] up to the attempt limit. After the limit, the last | ||
| /// interrupted result is converted into a [SourceState.terminalError] | ||
| /// so the orchestrator stops retrying at this layer of the chain. | ||
| /// | ||
| /// Calling [close] before [run] completes signals an abort: any pending | ||
| /// retry delay returns immediately and [run] resolves to a | ||
| /// [SourceState.shutdown] result. | ||
| final class FDv2PollingInitializer implements Initializer { | ||
| static const int _maxAttempts = 3; | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we formalized this into the spec, but for web based/web potential SDKs, I think it makes sense. |
||
| static const Duration _retryDelay = Duration(seconds: 1); | ||
|
|
||
| final PollFunction _poll; | ||
| final SelectorGetter _selectorGetter; | ||
| final LDLogger _logger; | ||
| final DelayFunction _delay; | ||
| final Completer<void> _closedSignal = Completer<void>(); | ||
|
|
||
| FDv2PollingInitializer({ | ||
| required PollFunction poll, | ||
| required SelectorGetter selectorGetter, | ||
| required LDLogger logger, | ||
| DelayFunction? delay, | ||
| }) : _poll = poll, | ||
| _selectorGetter = selectorGetter, | ||
| _logger = logger.subLogger('FDv2PollingInitializer'), | ||
| _delay = delay ?? _defaultDelay; | ||
|
|
||
| @override | ||
| Future<FDv2SourceResult> run() async { | ||
| StatusResult? lastInterrupted; | ||
|
|
||
| for (var attempt = 1; attempt <= _maxAttempts; attempt++) { | ||
| if (_closedSignal.isCompleted) { | ||
| return _shutdownResult(); | ||
| } | ||
|
|
||
| final result = await _poll(basis: _selectorGetter()); | ||
|
|
||
| if (_closedSignal.isCompleted) { | ||
| return _shutdownResult(); | ||
| } | ||
|
|
||
| switch (result) { | ||
| case ChangeSetResult(): | ||
| return result; | ||
| case StatusResult(state: SourceState.interrupted): | ||
| lastInterrupted = result; | ||
| _logger.warn( | ||
| 'Polling initializer attempt $attempt/$_maxAttempts interrupted: ' | ||
| '${result.message}'); | ||
| if (attempt < _maxAttempts) { | ||
| await _waitForRetry(); | ||
| } | ||
| case StatusResult(): | ||
| // terminalError, goodbye, or shutdown -- pass through. | ||
| return result; | ||
| } | ||
| } | ||
|
|
||
| // All attempts produced interrupted. Escalate so the orchestrator | ||
| // can fall through to the next source rather than retry forever. | ||
| return FDv2SourceResults.terminalError( | ||
| message: 'Polling initializer exhausted $_maxAttempts attempts; ' | ||
| 'last error: ${lastInterrupted?.message}', | ||
| statusCode: lastInterrupted?.statusCode, | ||
| fdv1Fallback: lastInterrupted?.fdv1Fallback ?? false, | ||
| ); | ||
| } | ||
|
|
||
| @override | ||
| void close() { | ||
| if (!_closedSignal.isCompleted) { | ||
| _closedSignal.complete(); | ||
| } | ||
| } | ||
|
|
||
| Future<void> _waitForRetry() async { | ||
| await Future.any([_delay(_retryDelay), _closedSignal.future]); | ||
| } | ||
|
|
||
| StatusResult _shutdownResult() => FDv2SourceResults.shutdown( | ||
| message: 'Polling initializer closed before completion', | ||
| ); | ||
| } | ||
130 changes: 130 additions & 0 deletions
130
packages/common_client/lib/src/data_sources/fdv2/polling_synchronizer.dart
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,130 @@ | ||
| import 'dart:async'; | ||
|
|
||
| import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart'; | ||
|
|
||
| import 'calculate_poll_delay.dart'; | ||
| import 'polling_initializer.dart' show PollFunction; | ||
| import 'source.dart'; | ||
| import 'source_result.dart'; | ||
|
|
||
| /// Constructs a [Timer] that fires once after [duration] and invokes | ||
| /// [callback]. Tests inject a fake to control time. | ||
| typedef TimerFactory = Timer Function( | ||
| Duration duration, void Function() callback); | ||
|
|
||
| Timer _defaultTimerFactory(Duration duration, void Function() callback) => | ||
| Timer(duration, callback); | ||
|
|
||
| /// Long-lived polling synchronizer. | ||
| /// | ||
| /// Exposes a single-subscription [Stream] of [FDv2SourceResult]s. On | ||
| /// subscription, polls immediately and then schedules the next poll | ||
| /// using [calculatePollDelay] over the freshness of the most recent | ||
| /// successful result. Cancelling the subscription (or calling [close]) | ||
| /// stops the timer and closes the stream. | ||
| /// | ||
| /// Each emission carries whatever the underlying poll returned, including | ||
| /// transient interrupted statuses. The orchestrator decides how to react. | ||
| final class FDv2PollingSynchronizer implements Synchronizer { | ||
| final PollFunction _poll; | ||
| final SelectorGetter _selectorGetter; | ||
| final Duration _interval; | ||
| final TimerFactory _timerFactory; | ||
| final DateTime Function() _now; | ||
| final LDLogger _logger; | ||
|
|
||
| late final StreamController<FDv2SourceResult> _controller; | ||
| Timer? _timer; | ||
| bool _closed = false; | ||
| DateTime? _lastFreshness; | ||
|
|
||
| FDv2PollingSynchronizer({ | ||
| required PollFunction poll, | ||
| required SelectorGetter selectorGetter, | ||
| required Duration interval, | ||
| required LDLogger logger, | ||
| TimerFactory? timerFactory, | ||
| DateTime Function()? now, | ||
| }) : _poll = poll, | ||
| _selectorGetter = selectorGetter, | ||
| _interval = interval, | ||
| _timerFactory = timerFactory ?? _defaultTimerFactory, | ||
| _now = now ?? DateTime.now, | ||
| _logger = logger.subLogger('FDv2PollingSynchronizer') { | ||
| _controller = StreamController<FDv2SourceResult>( | ||
| onListen: _onListen, | ||
| onCancel: _onCancel, | ||
| ); | ||
| } | ||
|
|
||
| @override | ||
| Stream<FDv2SourceResult> get results => _controller.stream; | ||
|
|
||
| @override | ||
| void close() { | ||
| if (_closed) return; | ||
| _closed = true; | ||
| _timer?.cancel(); | ||
| _timer = null; | ||
| if (!_controller.isClosed) { | ||
| _controller.add( | ||
| FDv2SourceResults.shutdown(message: 'Polling synchronizer closed')); | ||
| _controller.close(); | ||
| } | ||
| } | ||
|
|
||
| void _onListen() { | ||
| // Kick off the first poll immediately. Subsequent polls are | ||
| // scheduled from inside _doPoll via the timer. | ||
| Future<void>.microtask(_doPoll); | ||
| } | ||
|
|
||
| Future<void> _onCancel() async { | ||
| if (_closed) return; | ||
| _closed = true; | ||
| _timer?.cancel(); | ||
| _timer = null; | ||
| // Don't emit shutdown -- the subscriber asked for cancellation. | ||
| } | ||
|
|
||
| Future<void> _doPoll() async { | ||
| if (_closed) return; | ||
| final FDv2SourceResult result; | ||
| try { | ||
| result = await _poll(basis: _selectorGetter()); | ||
| } catch (err) { | ||
| // PollFunction is the FDv2PollingBase.pollOnce contract, which | ||
| // already converts errors to StatusResult. A throw here means | ||
| // someone wired a non-conforming function; treat defensively. | ||
| _logger.error('Poll function threw unexpectedly: ${err.runtimeType}'); | ||
| if (!_closed && !_controller.isClosed) { | ||
| _controller.add(FDv2SourceResults.interrupted( | ||
| message: 'Polling source raised unexpectedly')); | ||
| } | ||
| _scheduleNext(); | ||
| return; | ||
| } | ||
|
|
||
| if (_closed) return; | ||
|
|
||
| if (result is ChangeSetResult && result.freshness != null) { | ||
| _lastFreshness = result.freshness; | ||
| } | ||
|
|
||
| if (!_controller.isClosed) { | ||
| _controller.add(result); | ||
| } | ||
|
|
||
| _scheduleNext(); | ||
| } | ||
|
|
||
| void _scheduleNext() { | ||
| if (_closed) return; | ||
| final delay = calculatePollDelay( | ||
| now: _now(), | ||
| interval: _interval, | ||
| freshness: _lastFreshness, | ||
| ); | ||
| _timer = _timerFactory(delay, _doPoll); | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now we are always going always poll immediately on a cache initializer. This is used for the synchronizer though.