-
Notifications
You must be signed in to change notification settings - Fork 249
Add on_change callbacks for managed variables #1990
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
6c310d0
d36e24e
a87366a
6a7a49d
19c9578
b0705e5
57b1543
dcb5fa6
2c107a2
378c021
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -740,6 +740,13 @@ def configure( | |
| # are started when first accessed via get_variable_provider(). | ||
| if config.variables is not None: | ||
| config.get_variable_provider().start(logfire_instance if config.variables.instrument else None) | ||
| elif config._variable_change_polling_requested: # pyright: ignore[reportPrivateUsage] | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe polling should start if any variable is defined? Then it has a chance of being ready by the first |
||
| # An on_change callback is registered but no variables were explicitly configured. The | ||
| # poller would otherwise only start on the first variable resolution, so callbacks would | ||
| # never fire if nothing is resolved. Trigger lazy provider init (which starts polling) so | ||
| # on_change works without an eager get(). This also restarts the poller after a reconfigure | ||
| # rebuilt the provider as a no-op. Does nothing if no API key is available to resolve against. | ||
| config.get_variable_provider() | ||
|
|
||
| return logfire_instance | ||
|
|
||
|
|
@@ -1017,6 +1024,15 @@ def __init__( | |
| # thus it "shuts down" when it's gc'ed | ||
| self._meter_provider = ProxyMeterProvider(NoOpMeterProvider()) | ||
| self._variable_provider: VariableProvider = NoOpVariableProvider() | ||
| # Listeners for variable config changes. Held on the config (not the provider) so they | ||
| # survive reconfiguration: each provider this config creates is wired to dispatch here. | ||
| self._variables_change_listeners: list[Callable[[set[str]], None]] = [] | ||
| # Set once any variable's `on_change` callback is registered. on_change is only useful | ||
| # if the background poller is running, but the poller is otherwise started lazily on the | ||
| # first variable resolution. This flag lets us start it eagerly when a callback is | ||
| # registered, and restart it after a reconfigure (which rebuilds the provider). It lives | ||
| # on the config so it survives reconfiguration. | ||
| self._variable_change_polling_requested: bool = False | ||
| self._logger_provider = ProxyLoggerProvider(NoOpLoggerProvider()) | ||
| self._otlp_forwarding = OTLPForwardingManager([]) | ||
| # This ensures that we only call OTEL's global set_tracer_provider once to avoid warnings. | ||
|
|
@@ -1489,6 +1505,7 @@ def fix_pid(): # pragma: no cover | |
| options=self.variables, | ||
| server_response_hook=self.advanced.server_response_hook, | ||
| ) | ||
| self._variable_provider.add_on_config_change(self._notify_variables_change_listeners) | ||
| multi_log_processor = SynchronousMultiLogRecordProcessor() | ||
| for processor in log_record_processors: | ||
| multi_log_processor.add_log_record_processor(processor) | ||
|
|
@@ -1670,10 +1687,37 @@ def _lazy_init_variable_provider(self) -> VariableProvider: | |
| options=options, | ||
| server_response_hook=self.advanced.server_response_hook, | ||
| ) | ||
| provider.add_on_config_change(self._notify_variables_change_listeners) | ||
| self._variable_provider = provider | ||
| provider.start(Logfire(config=self)) | ||
| return provider | ||
|
|
||
| def add_variables_change_listener(self, listener: Callable[[set[str]], None]) -> None: | ||
| """Register a listener for variable config changes. | ||
|
|
||
| Registration is idempotent (adding the same listener twice has no effect) and | ||
| survives reconfiguration: every variable provider this config creates dispatches | ||
| its change notifications to the registered listeners. | ||
|
|
||
| Args: | ||
| listener: Called with the set of variable names (including aliases) whose | ||
| configs changed. | ||
| """ | ||
| if listener not in self._variables_change_listeners: | ||
| self._variables_change_listeners.append(listener) | ||
|
|
||
| def _notify_variables_change_listeners(self, changed_names: set[str]) -> None: | ||
| """Dispatch a provider's config-change notification to all registered listeners.""" | ||
| # Snapshot: dispatch runs on the provider's polling thread, and a concurrent | ||
| # registration on another thread shouldn't affect the in-flight dispatch. | ||
| for listener in list(self._variables_change_listeners): | ||
| try: | ||
| listener(changed_names) | ||
| except Exception: | ||
| import logging | ||
|
|
||
| logging.getLogger('logfire').exception('Error in variables change listener') | ||
|
|
||
| def warn_if_not_initialized(self, message: str): | ||
| ignore_no_config_env = os.getenv('LOGFIRE_IGNORE_NO_CONFIG', '') | ||
| ignore_no_config = ignore_no_config_env.lower() in ('1', 'true', 't') or self.ignore_no_config | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2603,6 +2603,7 @@ def var( | |
| description=description, | ||
| ) | ||
| self._variables[name] = variable | ||
| self._config.add_variables_change_listener(self._on_variables_config_change) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think #2034 should be merged first, and then this should change so that Logfire doesn't need to own callbacks. then there'll be 3 layers of callbacks instead of 4. |
||
|
|
||
| from logfire.variables.variable import warn_on_template_inputs_composition_mismatch | ||
|
|
||
|
|
@@ -2740,13 +2741,52 @@ class PromptInputs(BaseModel): | |
| template_mismatch_policy=template_mismatch_policy, | ||
| ) | ||
| self._variables[name] = variable | ||
| self._config.add_variables_change_listener(self._on_variables_config_change) | ||
|
|
||
| from logfire.variables.variable import warn_on_template_inputs_composition_mismatch | ||
|
|
||
| warn_on_template_inputs_composition_mismatch(self._variables, variable) | ||
|
|
||
| return variable | ||
|
|
||
| def _on_variables_config_change(self, changed_names: set[str]) -> None: | ||
| """Dispatch variable config changes to registered variables' on_change callbacks. | ||
|
|
||
| Registered with this instance's `LogfireConfig` (which wires it to every provider it | ||
| creates) the first time a variable is defined. Expands the directly-changed names to | ||
| every registered variable that is *effectively* changed — including variables that | ||
| (transitively) compose a changed variable via `@{ref}@` references — then fires each | ||
| affected variable's callbacks. | ||
| """ | ||
| # Snapshot the registry: dispatch runs on the provider's polling thread, and a | ||
| # concurrent `var()` on another thread mutating the dict mid-iteration would | ||
| # otherwise raise (losing the rest of this change cycle's notifications). | ||
| variables = dict(self._variables) | ||
| if not variables: | ||
| return | ||
|
|
||
| from logfire.variables.variable import expand_config_changes | ||
|
|
||
| provider_config = self.config.get_variable_provider().get_all_variables_config() | ||
| for name in sorted(expand_config_changes(changed_names, provider_config, variables)): | ||
| variables[name]._notify_change() # pyright: ignore[reportPrivateUsage] | ||
|
|
||
| def _ensure_variable_change_polling(self) -> None: | ||
| """Make sure the background poller is running so registered on_change callbacks can fire. | ||
|
|
||
| Called when a variable's `on_change` callback is registered. The remote provider's poller | ||
| is otherwise started lazily on the first variable resolution, so a program that only | ||
| registers callbacks (without ever resolving a variable) would never receive notifications. | ||
| We record the request on the config — so it survives reconfiguration, which rebuilds the | ||
| provider — and, if configuration has already happened, start the provider now (a no-op for | ||
| already-running and for no-op/local providers). | ||
| """ | ||
| self.config._variable_change_polling_requested = True # pyright: ignore[reportPrivateUsage] | ||
| # For an explicitly-configured provider this is already started; for the lazy-init | ||
| # remote path this creates and starts it. Returns a no-op provider when there's nothing | ||
| # to poll (no API key / no variables configured). | ||
| self.config.get_variable_provider() | ||
|
|
||
| def variables_clear(self) -> None: | ||
| """Clear all registered variables from this Logfire instance. | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,8 @@ | |
| VariableAlreadyExistsError, | ||
| VariableNotFoundError, | ||
| VariableProvider, | ||
| changed_config_keys, | ||
| resolution_relevant_config_changed, | ||
| ) | ||
| from logfire.variables.config import VariableConfig, VariablesConfig | ||
|
|
||
|
|
@@ -96,6 +98,7 @@ def create_variable(self, config: VariableConfig) -> VariableConfig: | |
| raise VariableAlreadyExistsError(f"Variable '{config.name}' already exists") | ||
| self._config.variables[config.name] = config | ||
| self._config._invalidate_alias_map() # pyright: ignore[reportPrivateUsage] | ||
| self._notify_config_change(changed_config_keys(config)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Snapshot configs before diffing notifications.
Suggested boundary-copy shape def get_variable_config(self, name: str) -> VariableConfig | None:
...
with self._lock:
- return self._config._get_variable_config(name) # pyright: ignore[reportPrivateUsage]
+ config = self._config._get_variable_config(name) # pyright: ignore[reportPrivateUsage]
+ return config.model_copy(deep=True) if config is not None else None
def create_variable(self, config: VariableConfig) -> VariableConfig:
...
with self._lock:
+ config = config.model_copy(deep=True)
if config.name in self._config.variables:
raise VariableAlreadyExistsError(f"Variable '{config.name}' already exists")
self._config.variables[config.name] = config
self._config._invalidate_alias_map() # pyright: ignore[reportPrivateUsage]
def update_variable(self, name: str, config: VariableConfig) -> VariableConfig:
...
with self._lock:
if name not in self._config.variables:
raise VariableNotFoundError(f"Variable '{name}' not found")
- old_config = self._config.variables[name]
- self._config.variables[name] = config
+ old_config = self._config.variables[name].model_copy(deep=True)
+ config = config.model_copy(deep=True)
+ self._config.variables[name] = config
self._config._invalidate_alias_map() # pyright: ignore[reportPrivateUsage]
def delete_variable(self, name: str) -> None:
...
- old_config = self._config.variables.pop(name)
+ old_config = self._config.variables.pop(name).model_copy(deep=True)Also applies to: 120-124, 139-141 🤖 Prompt for AI Agents |
||
| return config | ||
|
|
||
| def update_variable(self, name: str, config: VariableConfig) -> VariableConfig: | ||
|
|
@@ -114,8 +117,11 @@ def update_variable(self, name: str, config: VariableConfig) -> VariableConfig: | |
| with self._lock: | ||
| if name not in self._config.variables: | ||
| raise VariableNotFoundError(f"Variable '{name}' not found") | ||
| old_config = self._config.variables[name] | ||
| self._config.variables[name] = config | ||
| self._config._invalidate_alias_map() # pyright: ignore[reportPrivateUsage] | ||
| if resolution_relevant_config_changed(old_config, config): | ||
| self._notify_config_change(changed_config_keys(old_config, config)) | ||
| return config | ||
|
|
||
| def delete_variable(self, name: str) -> None: | ||
|
|
@@ -130,5 +136,6 @@ def delete_variable(self, name: str) -> None: | |
| with self._lock: | ||
| if name not in self._config.variables: | ||
| raise VariableNotFoundError(f"Variable '{name}' not found") | ||
| del self._config.variables[name] | ||
| old_config = self._config.variables.pop(name) | ||
| self._config._invalidate_alias_map() # pyright: ignore[reportPrivateUsage] | ||
| self._notify_config_change(changed_config_keys(old_config)) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is basically the same snippet as the first