Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart';

import 'flag_eval_mapper.dart';
import 'payload.dart';
import 'selector.dart';
import 'source.dart';
import 'source_result.dart';

/// The shape of a cache hit: parsed evaluation results plus the
/// environment ID that was current when the cache was written.
typedef CachedFlags = ({
Map<String, LDEvaluationResult> flags,
String? environmentId,
});

/// Reads cached flag state for [context] from persistence. Returns
/// null on a cache miss, an unreadable entry, or a parse failure.
typedef CachedFlagsReader = Future<CachedFlags?> Function(LDContext context);

/// One-shot initializer that brings the SDK up from its persistence
/// cache. The cache is read once; retries are not meaningful for a
/// local read.
///
/// On cache hit, emits a [ChangeSetResult] with `persist: false` (the
/// data came from the cache; writing it back is a no-op) and an empty
/// selector (the cache does not track server-side selector state).
/// The payload type is [PayloadType.full]: a cache load is a complete
/// snapshot, not a delta.
///
/// On cache miss, emits a [ChangeSetResult] with [PayloadType.none] so
/// the initializer chain advances rather than terminating. The cache
/// is best-effort, not a source of truth.
final class CacheInitializer implements Initializer {
final CachedFlagsReader _reader;
final LDContext _context;
final LDLogger _logger;
final DateTime Function() _now;

bool _closed = false;

CacheInitializer({
required CachedFlagsReader reader,
required LDContext context,
required LDLogger logger,
DateTime Function()? now,
}) : _reader = reader,
_context = context,
_logger = logger.subLogger('CacheInitializer'),
_now = now ?? DateTime.now;

@override
Future<FDv2SourceResult> run() async {
if (_closed) return _shutdown();

final CachedFlags? cached;
try {
cached = await _reader(_context);
} catch (err) {
_logger.warn('Cache read failed (${err.runtimeType}); '
'treating as miss');
return _miss();
}

if (_closed) return _shutdown();

if (cached == null) {
return _miss();
}

final updates = <Update>[];
cached.flags.forEach((key, evalResult) {
updates.add(Update(
kind: flagEvalKind,
key: key,
version: evalResult.version,
object: LDEvaluationResultSerialization.toJson(evalResult),
));
});

return ChangeSetResult(
payload: Payload(
type: PayloadType.full,
selector: Selector.empty,
updates: updates,
),
environmentId: cached.environmentId,
freshness: _now(),
persist: false,
);
}

@override
void close() {
_closed = true;
}

ChangeSetResult _miss() => ChangeSetResult(
payload: const Payload(
type: PayloadType.none,
updates: [],
),
freshness: _now(),
persist: false,
);

StatusResult _shutdown() => FDv2SourceResults.shutdown(
message: 'Cache initializer closed before completion',
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/// Computes how long to wait before the next poll, given when the SDK
/// last received a fresh response and the configured polling interval.
///
/// Returns the time remaining in the interval relative to [freshness].
/// If [freshness] is null (no successful poll yet) or older than the
/// interval (we're overdue), returns either the full interval or zero
/// respectively.
///
/// Caps the returned delay at [interval] so a freshness timestamp from
/// the future (clock skew, manually adjusted system time) cannot push
/// the next poll arbitrarily far out.
Duration calculatePollDelay({
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now we are always going always poll immediately on a cache initializer. This is used for the synchronizer though.

required DateTime now,
required Duration interval,
DateTime? freshness,
}) {
if (freshness == null) {
return interval;
}
final elapsed = now.difference(freshness);
if (elapsed.isNegative) {
return interval;
}
if (elapsed >= interval) {
return Duration.zero;
}
return interval - elapsed;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import 'dart:async';

import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart';

import 'selector.dart';
import 'source.dart';
import 'source_result.dart';

/// A single FDv2 poll. The orchestrator wires this to
/// [FDv2PollingBase.pollOnce]; tests inject scripted functions.
typedef PollFunction = Future<FDv2SourceResult> Function({Selector basis});

/// A function that delays for the given duration.
typedef DelayFunction = Future<void> Function(Duration duration);

Future<void> _defaultDelay(Duration duration) => Future.delayed(duration);

/// One-shot polling initializer.
///
/// Calls the injected [PollFunction] up to [_maxAttempts] times. Treats:
///
/// - [ChangeSetResult] as success — returned immediately.
/// - [SourceState.terminalError], [SourceState.goodbye], and
/// [SourceState.shutdown] as terminal — returned immediately without
/// further retries.
/// - [SourceState.interrupted] as transient — retried after
/// [_retryDelay] up to the attempt limit. After the limit, the last
/// interrupted result is converted into a [SourceState.terminalError]
/// so the orchestrator stops retrying at this layer of the chain.
///
/// Calling [close] before [run] completes signals an abort: any pending
/// retry delay returns immediately and [run] resolves to a
/// [SourceState.shutdown] result.
final class FDv2PollingInitializer implements Initializer {
static const int _maxAttempts = 3;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we formalized this into the spec, but for web based/web potential SDKs, I think it makes sense.

static const Duration _retryDelay = Duration(seconds: 1);

final PollFunction _poll;
final SelectorGetter _selectorGetter;
final LDLogger _logger;
final DelayFunction _delay;
final Completer<void> _closedSignal = Completer<void>();

FDv2PollingInitializer({
required PollFunction poll,
required SelectorGetter selectorGetter,
required LDLogger logger,
DelayFunction? delay,
}) : _poll = poll,
_selectorGetter = selectorGetter,
_logger = logger.subLogger('FDv2PollingInitializer'),
_delay = delay ?? _defaultDelay;

@override
Future<FDv2SourceResult> run() async {
StatusResult? lastInterrupted;

for (var attempt = 1; attempt <= _maxAttempts; attempt++) {
if (_closedSignal.isCompleted) {
return _shutdownResult();
}

final result = await _poll(basis: _selectorGetter());

if (_closedSignal.isCompleted) {
return _shutdownResult();
}

switch (result) {
case ChangeSetResult():
return result;
case StatusResult(state: SourceState.interrupted):
lastInterrupted = result;
_logger.warn(
'Polling initializer attempt $attempt/$_maxAttempts interrupted: '
'${result.message}');
if (attempt < _maxAttempts) {
await _waitForRetry();
}
case StatusResult():
// terminalError, goodbye, or shutdown -- pass through.
return result;
}
}

// All attempts produced interrupted. Escalate so the orchestrator
// can fall through to the next source rather than retry forever.
return FDv2SourceResults.terminalError(
message: 'Polling initializer exhausted $_maxAttempts attempts; '
'last error: ${lastInterrupted?.message}',
statusCode: lastInterrupted?.statusCode,
fdv1Fallback: lastInterrupted?.fdv1Fallback ?? false,
);
}

@override
void close() {
if (!_closedSignal.isCompleted) {
_closedSignal.complete();
}
}

Future<void> _waitForRetry() async {
await Future.any([_delay(_retryDelay), _closedSignal.future]);
}

StatusResult _shutdownResult() => FDv2SourceResults.shutdown(
message: 'Polling initializer closed before completion',
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import 'dart:async';

import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart';

import 'calculate_poll_delay.dart';
import 'polling_initializer.dart' show PollFunction;
import 'source.dart';
import 'source_result.dart';

/// Constructs a [Timer] that fires once after [duration] and invokes
/// [callback]. Tests inject a fake to control time.
typedef TimerFactory = Timer Function(
Duration duration, void Function() callback);

Timer _defaultTimerFactory(Duration duration, void Function() callback) =>
Timer(duration, callback);

/// Long-lived polling synchronizer.
///
/// Exposes a single-subscription [Stream] of [FDv2SourceResult]s. On
/// subscription, polls immediately and then schedules the next poll
/// using [calculatePollDelay] over the freshness of the most recent
/// successful result. Cancelling the subscription (or calling [close])
/// stops the timer and closes the stream.
///
/// Each emission carries whatever the underlying poll returned, including
/// transient interrupted statuses. The orchestrator decides how to react.
final class FDv2PollingSynchronizer implements Synchronizer {
final PollFunction _poll;
final SelectorGetter _selectorGetter;
final Duration _interval;
final TimerFactory _timerFactory;
final DateTime Function() _now;
final LDLogger _logger;

late final StreamController<FDv2SourceResult> _controller;
Timer? _timer;
bool _closed = false;
DateTime? _lastFreshness;

FDv2PollingSynchronizer({
required PollFunction poll,
required SelectorGetter selectorGetter,
required Duration interval,
required LDLogger logger,
TimerFactory? timerFactory,
DateTime Function()? now,
}) : _poll = poll,
_selectorGetter = selectorGetter,
_interval = interval,
_timerFactory = timerFactory ?? _defaultTimerFactory,
_now = now ?? DateTime.now,
_logger = logger.subLogger('FDv2PollingSynchronizer') {
_controller = StreamController<FDv2SourceResult>(
onListen: _onListen,
onCancel: _onCancel,
);
}

@override
Stream<FDv2SourceResult> get results => _controller.stream;

@override
void close() {
if (_closed) return;
_closed = true;
_timer?.cancel();
_timer = null;
if (!_controller.isClosed) {
_controller.add(
FDv2SourceResults.shutdown(message: 'Polling synchronizer closed'));
_controller.close();
}
}

void _onListen() {
// Kick off the first poll immediately. Subsequent polls are
// scheduled from inside _doPoll via the timer.
Future<void>.microtask(_doPoll);
}

Future<void> _onCancel() async {
if (_closed) return;
_closed = true;
_timer?.cancel();
_timer = null;
// Don't emit shutdown -- the subscriber asked for cancellation.
}

Future<void> _doPoll() async {
if (_closed) return;
final FDv2SourceResult result;
try {
result = await _poll(basis: _selectorGetter());
} catch (err) {
// PollFunction is the FDv2PollingBase.pollOnce contract, which
// already converts errors to StatusResult. A throw here means
// someone wired a non-conforming function; treat defensively.
_logger.error('Poll function threw unexpectedly: ${err.runtimeType}');
if (!_closed && !_controller.isClosed) {
_controller.add(FDv2SourceResults.interrupted(
message: 'Polling source raised unexpectedly'));
}
_scheduleNext();
return;
}

if (_closed) return;

if (result is ChangeSetResult && result.freshness != null) {
_lastFreshness = result.freshness;
}

if (!_controller.isClosed) {
_controller.add(result);
}

_scheduleNext();
}

void _scheduleNext() {
if (_closed) return;
final delay = calculatePollDelay(
now: _now(),
interval: _interval,
freshness: _lastFreshness,
);
_timer = _timerFactory(delay, _doPoll);
}
}
Loading
Loading