Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions packages/jumpstarter/jumpstarter/client/lease.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async def _acquire(self):
# lease ready
if condition_true(result.conditions, "Ready"):
logger.debug("Lease %s acquired", self.name)
spinner.update_status(f"Lease {self.name} acquired successfully!")
spinner.update_status(f"Lease {self.name} acquired successfully!", force=True)
self.exporter_name = result.exporter
break

Expand Down Expand Up @@ -332,6 +332,8 @@ def __init__(self, lease_name: str | None = None):
self.start_time = None
self._should_show_spinner = self._is_terminal_available() and not self._is_non_interactive()
self._current_message = None
self._last_log_time = None
self._log_throttle_interval = timedelta(minutes=5)

def _is_non_interactive(self) -> bool:
"""Check if the user desires a NONINTERACTIVE environment."""
Expand Down Expand Up @@ -359,18 +361,32 @@ def __exit__(self, exc_type, exc_val, exc_tb):
if self.spinner:
self.spinner.stop()

def update_status(self, message: str):
"""Update the spinner status message."""
def update_status(self, message: str, force: bool = False):
"""Update the spinner status message.

:param message: The status message to display
:param force: If True, always log the message even when throttling (default: False)
"""
if self.spinner and self._should_show_spinner:
self._current_message = f"[blue]{message}[/blue]"
elapsed = datetime.now() - self.start_time
elapsed_str = str(elapsed).split(".")[0] # Remove microseconds
self.spinner.update(f"{self._current_message} [dim]({elapsed_str})[/dim]")
else:
# Log info message when no console is available
elapsed = datetime.now() - self.start_time
elapsed_str = str(elapsed).split(".")[0] # Remove microseconds
logger.info(f"{message} ({elapsed_str})")
# Throttle updates to at most every 5 minutes unless forced
now = datetime.now()
should_log = (
force
or self._last_log_time is None
or (now - self._last_log_time) >= self._log_throttle_interval
)

if should_log:
elapsed = now - self.start_time
elapsed_str = str(elapsed).split(".")[0] # Remove microseconds
logger.info(f"{message} ({elapsed_str})")
self._last_log_time = now

def tick(self):
"""Update the spinner with current elapsed time without changing the message."""
Expand Down
100 changes: 100 additions & 0 deletions packages/jumpstarter/jumpstarter/client/lease_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,103 @@ def test_start_time_initialization_in_context(self):
with spinner:
assert spinner.start_time is not None
assert isinstance(spinner.start_time, datetime)

def test_throttling_first_update_logged(self, caplog):
"""Test that the first update is always logged when console is not available."""
with patch.object(LeaseAcquisitionSpinner, '_is_terminal_available', return_value=False):
spinner = LeaseAcquisitionSpinner("test-lease")
spinner.start_time = datetime.now()

with caplog.at_level(logging.INFO):
spinner.update_status("First message")

assert "First message" in caplog.text
assert spinner._last_log_time is not None

def test_throttling_second_update_within_interval_not_logged(self, caplog):
"""Test that updates within 5 minutes are not logged."""
with patch.object(LeaseAcquisitionSpinner, '_is_terminal_available', return_value=False):
spinner = LeaseAcquisitionSpinner("test-lease")
spinner.start_time = datetime.now()
spinner._last_log_time = datetime.now() - timedelta(minutes=2) # 2 minutes ago

with caplog.at_level(logging.INFO):
spinner.update_status("Second message")

# Should not log because only 2 minutes have passed
assert "Second message" not in caplog.text

def test_throttling_update_after_interval_logged(self, caplog):
"""Test that updates after 5 minutes are logged."""
with patch.object(LeaseAcquisitionSpinner, '_is_terminal_available', return_value=False):
spinner = LeaseAcquisitionSpinner("test-lease")
spinner.start_time = datetime.now()
spinner._last_log_time = datetime.now() - timedelta(minutes=6) # 6 minutes ago

with caplog.at_level(logging.INFO):
spinner.update_status("After interval message")

assert "After interval message" in caplog.text
assert spinner._last_log_time is not None

def test_throttling_forced_update_always_logged(self, caplog):
"""Test that forced updates are always logged regardless of throttle interval."""
with patch.object(LeaseAcquisitionSpinner, '_is_terminal_available', return_value=False):
spinner = LeaseAcquisitionSpinner("test-lease")
spinner.start_time = datetime.now()
spinner._last_log_time = datetime.now() - timedelta(minutes=1) # 1 minute ago

with caplog.at_level(logging.INFO):
spinner.update_status("Forced message", force=True)

assert "Forced message" in caplog.text
assert spinner._last_log_time is not None

def test_throttling_multiple_updates_only_logs_when_needed(self, caplog):
"""Test that multiple rapid updates only log at appropriate intervals."""
with patch.object(LeaseAcquisitionSpinner, '_is_terminal_available', return_value=False):
spinner = LeaseAcquisitionSpinner("test-lease")
spinner.start_time = datetime.now()

with caplog.at_level(logging.INFO):
# First update should be logged
spinner.update_status("Message 1")
assert "Message 1" in caplog.text

# Set last log time to recent
spinner._last_log_time = datetime.now() - timedelta(minutes=1)

# Second update should not be logged (within interval)
spinner.update_status("Message 2")
assert "Message 2" not in caplog.text

# Third update should not be logged (within interval)
spinner.update_status("Message 3")
assert "Message 3" not in caplog.text

# Set last log time to past the interval
spinner._last_log_time = datetime.now() - timedelta(minutes=6)

# Fourth update should be logged (past interval)
spinner.update_status("Message 4")
assert "Message 4" in caplog.text

def test_throttling_not_applied_when_console_available(self):
"""Test that throttling is not applied when console is available."""
with patch.object(LeaseAcquisitionSpinner, '_is_terminal_available', return_value=True):
spinner = LeaseAcquisitionSpinner("test-lease")
spinner.start_time = datetime.now()

mock_spinner = Mock()
spinner.spinner = mock_spinner

# Multiple updates should all call update() regardless of throttle
spinner.update_status("Message 1")
spinner.update_status("Message 2")
spinner.update_status("Message 3")

# All should be called even if we set a recent last_log_time
spinner._last_log_time = datetime.now() - timedelta(minutes=1)
spinner.update_status("Message 4")

assert mock_spinner.update.call_count == 4
Loading