@@ -1315,6 +1315,7 @@ impl Agent {
13151315 cron_rx : tokio:: sync:: Mutex :: new ( cron_rx) ,
13161316 is_processing_cron : AtomicBool :: new ( false ) ,
13171317 cron_started : AtomicBool :: new ( false ) ,
1318+ cancel_token : Arc :: new ( tokio:: sync:: Mutex :: new ( None ) ) ,
13181319 } )
13191320 }
13201321}
@@ -1371,6 +1372,9 @@ pub struct AgentSession {
13711372 /// The ticker is started lazily on the first `send()` call so that
13721373 /// `tokio::spawn` is always called from within an async runtime context.
13731374 cron_started : AtomicBool ,
1375+ /// Cancellation token for the current operation (send/stream).
1376+ /// Stored so that cancel() can abort ongoing LLM calls.
1377+ cancel_token : Arc < tokio:: sync:: Mutex < Option < tokio_util:: sync:: CancellationToken > > > ,
13741378}
13751379
13761380impl std:: fmt:: Debug for AgentSession {
@@ -1515,7 +1519,19 @@ impl AgentSession {
15151519 None => read_or_recover ( & self . history ) . clone ( ) ,
15161520 } ;
15171521
1518- let result = agent_loop. execute ( & effective_history, prompt, None ) . await ?;
1522+ let cancel_token = tokio_util:: sync:: CancellationToken :: new ( ) ;
1523+ * self . cancel_token . lock ( ) . await = Some ( cancel_token. clone ( ) ) ;
1524+ let result = agent_loop
1525+ . execute_with_session (
1526+ & effective_history,
1527+ prompt,
1528+ Some ( & self . session_id ) ,
1529+ None ,
1530+ Some ( & cancel_token) ,
1531+ )
1532+ . await ;
1533+ * self . cancel_token . lock ( ) . await = None ;
1534+ let result = result?;
15191535
15201536 // Auto-accumulate: only update internal history when no custom
15211537 // history was provided.
@@ -1670,14 +1686,50 @@ impl AgentSession {
16701686 None => read_or_recover ( & self . history ) . clone ( ) ,
16711687 } ;
16721688 let prompt = prompt. to_string ( ) ;
1689+ let session_id = self . session_id . clone ( ) ;
1690+
1691+ let cancel_token = tokio_util:: sync:: CancellationToken :: new ( ) ;
1692+ * self . cancel_token . lock ( ) . await = Some ( cancel_token. clone ( ) ) ;
1693+ let token_clone = cancel_token. clone ( ) ;
16731694
16741695 let handle = tokio:: spawn ( async move {
16751696 let _ = agent_loop
1676- . execute ( & effective_history, & prompt, Some ( tx) )
1697+ . execute_with_session (
1698+ & effective_history,
1699+ & prompt,
1700+ Some ( & session_id) ,
1701+ Some ( tx) ,
1702+ Some ( & token_clone) ,
1703+ )
16771704 . await ;
16781705 } ) ;
16791706
1680- Ok ( ( rx, handle) )
1707+ // Wrap the handle to clear the cancel token when done
1708+ let cancel_token_ref = self . cancel_token . clone ( ) ;
1709+ let wrapped_handle = tokio:: spawn ( async move {
1710+ let _ = handle. await ;
1711+ * cancel_token_ref. lock ( ) . await = None ;
1712+ } ) ;
1713+
1714+ Ok ( ( rx, wrapped_handle) )
1715+ }
1716+
1717+ /// Cancel the current ongoing operation (send/stream).
1718+ ///
1719+ /// If an operation is in progress, this will trigger cancellation of the LLM streaming
1720+ /// and tool execution. The operation will terminate as soon as possible.
1721+ ///
1722+ /// Returns `true` if an operation was cancelled, `false` if no operation was in progress.
1723+ pub async fn cancel ( & self ) -> bool {
1724+ let token = self . cancel_token . lock ( ) . await . clone ( ) ;
1725+ if let Some ( token) = token {
1726+ token. cancel ( ) ;
1727+ tracing:: info!( session_id = %self . session_id, "Cancelled ongoing operation" ) ;
1728+ true
1729+ } else {
1730+ tracing:: debug!( session_id = %self . session_id, "No ongoing operation to cancel" ) ;
1731+ false
1732+ }
16811733 }
16821734
16831735 /// Return a snapshot of the session's conversation history.
0 commit comments