Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ function assertChunksMatchResult(result, chunks) {
}

for (const [label, create] of [
["Bash", () => new Bash()],
["BashTool", () => new BashTool()],
["Bash", (options) => new Bash(options)],
["BashTool", (options) => new BashTool(options)],
]) {
describe(`${label} streaming output`, () => {
it("sync rejects Promise-returning onOutput", () => {
Expand Down Expand Up @@ -147,6 +147,24 @@ for (const [label, create] of [
assert.equal(result.stdout, "after-error\n");
});

it("sync rejects same-instance execute from onOutput", () => {
const shell = create();

assert.throws(
() =>
shell.executeSync(SCRIPT, {
onOutput() {
shell.executeSync("echo nested");
},
}),
/onOutput cannot re-enter the same Bash instance/,
);

const result = shell.executeSync("echo after-reentry");
assert.equal(result.exitCode, 0);
assert.equal(result.stdout, "after-reentry\n");
});

it("sync callback errors do not clear future explicit cancel", () => {
const shell = create();

Expand Down Expand Up @@ -205,6 +223,29 @@ for (const [label, create] of [
assert.equal(result.stdout, "");
});

it("async rejects same-instance fs handle from onOutput", async () => {
const shell = create({
files: {
"/workspace/memory.md": "hello\n",
},
});
const fs = shell.fs();

await assert.rejects(
() =>
shell.execute(SCRIPT, {
onOutput() {
fs.readFile("/workspace/memory.md");
},
}),
/onOutput cannot re-enter the same Bash instance/,
);

const result = await shell.execute("cat /workspace/memory.md");
assert.equal(result.exitCode, 0);
assert.equal(result.stdout, "hello\n");
});

it("async rejects async onOutput", async () => {
const shell = create();

Expand Down
70 changes: 61 additions & 9 deletions crates/bashkit-js/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use napi_derive::napi;
use std::collections::HashMap;
use std::path::Path;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex as StdMutex};
use tokio::sync::Mutex;

Expand Down Expand Up @@ -56,6 +56,34 @@ fn callback_semaphore() -> &'static tokio::sync::Semaphore {
SEM.get_or_init(|| tokio::sync::Semaphore::new(MAX_CONCURRENT_TOOL_CALLBACKS))
}

// Decision: reject same-instance onOutput re-entry at the binding boundary so
// sync paths fail with a JS error instead of deadlocking or panicking.
const ON_OUTPUT_REENTRY_ERROR: &str = "onOutput cannot re-enter the same Bash instance; use collected output or another Bash instance for live access";

struct OnOutputReentryScope {
depth: Arc<AtomicUsize>,
}

impl OnOutputReentryScope {
fn enter(depth: Arc<AtomicUsize>) -> Self {
depth.fetch_add(1, Ordering::SeqCst);
Self { depth }
}
}

impl Drop for OnOutputReentryScope {
fn drop(&mut self) {
self.depth.fetch_sub(1, Ordering::SeqCst);
}
}

fn reject_on_output_reentry(state: &Arc<SharedState>) -> napi::Result<()> {
if state.on_output_reentry_depth.load(Ordering::SeqCst) > 0 {
return Err(napi::Error::from_reason(ON_OUTPUT_REENTRY_ERROR));
}
Ok(())
}

// ============================================================================
// MontyObject <-> JSON conversion
// ============================================================================
Expand Down Expand Up @@ -483,6 +511,7 @@ fn build_sync_output_callback(
cancelled: Arc<AtomicBool>,
callback_requested_cancel: Arc<AtomicBool>,
callback_error: Arc<StdMutex<Option<String>>>,
on_output_reentry_depth: Arc<AtomicUsize>,
) -> OutputCallback {
Box::new(move |stdout_chunk, stderr_chunk| {
let has_error = callback_error
Expand All @@ -507,6 +536,7 @@ fn build_sync_output_callback(
}
};

let _reentry_scope = OnOutputReentryScope::enter(on_output_reentry_depth.clone());
match callback.call((stdout_chunk.to_string(), stderr_chunk.to_string())) {
Ok(Some(err)) => {
record_callback_error(
Expand All @@ -533,6 +563,7 @@ fn build_async_output_callback(
tsfn: Arc<OutputTsfn>,
cancelled: Arc<AtomicBool>,
callback_requested_cancel: Arc<AtomicBool>,
on_output_reentry_depth: Arc<AtomicUsize>,
) -> (OutputCallback, Arc<StdMutex<Option<String>>>) {
let callback_error = Arc::new(StdMutex::new(None));
let callback_error_output = callback_error.clone();
Expand All @@ -551,12 +582,14 @@ fn build_async_output_callback(
let stdout = stdout_chunk.to_string();
let stderr = stderr_chunk.to_string();
let tsfn = tsfn.clone();
let on_output_reentry_depth = on_output_reentry_depth.clone();
let (tx, rx) = std::sync::mpsc::channel();

// OutputCallback in core bashkit is synchronous. Dispatch onto the
// shared callback runtime, then block until JS finishes so callback
// errors abort execution immediately and chunk ordering stays stable.
callback_runtime().spawn(async move {
let _reentry_scope = OnOutputReentryScope::enter(on_output_reentry_depth);
let result: Result<Option<String>, String> = tsfn
.call_async((stdout, stderr))
.await
Expand Down Expand Up @@ -733,6 +766,7 @@ struct SharedState {
inner: Mutex<RustBash>,
rt: Mutex<tokio::runtime::Runtime>,
cancelled: Arc<AtomicBool>,
on_output_reentry_depth: Arc<AtomicUsize>,
username: Option<String>,
hostname: Option<String>,
max_commands: Option<u32>,
Expand Down Expand Up @@ -768,10 +802,14 @@ type ExternalHandlerArc = Arc<
/// Clone `Arc<SharedState>`, then use the runtime to block on a future that
/// captures only the cloned Arc. This avoids holding raw `&self` across
/// `block_on` boundaries.
fn block_on_with<Fut, T>(state: &Arc<SharedState>, f: impl FnOnce(Arc<SharedState>) -> Fut) -> T
fn block_on_with<Fut, T>(
state: &Arc<SharedState>,
f: impl FnOnce(Arc<SharedState>) -> Fut,
) -> napi::Result<T>
where
Fut: std::future::Future<Output = T>,
Fut: std::future::Future<Output = napi::Result<T>>,
{
reject_on_output_reentry(state)?;
let s = state.clone();
let rt_guard = s.rt.blocking_lock();
let s2 = state.clone();
Expand Down Expand Up @@ -829,6 +867,7 @@ impl Bash {
cancelled.clone(),
callback_requested_cancel.clone(),
callback_error.clone(),
s.on_output_reentry_depth.clone(),
);
execute_rust_bash(
&mut bash,
Expand All @@ -848,6 +887,7 @@ impl Bash {
/// Execute bash commands asynchronously, returning a Promise.
#[napi]
pub async fn execute(&self, commands: String) -> napi::Result<ExecResult> {
reject_on_output_reentry(&self.state)?;
let s = self.state.clone();
let mut bash = s.inner.lock().await;
execute_rust_bash(&mut bash, &commands, None, None, None, None).await
Expand All @@ -862,19 +902,22 @@ impl Bash {
commands: String,
on_output: napi::bindgen_prelude::Function<'env, (String, String), Option<String>>,
) -> napi::Result<napi::bindgen_prelude::PromiseRaw<'env, ExecResult>> {
reject_on_output_reentry(&self.state)?;
let raw_env = on_output.value().env;
let tsfn = create_output_tsfn(on_output)?;
let state = self.state.clone();
let promise = napi::bindgen_prelude::execute_tokio_future(
raw_env,
async move {
reject_on_output_reentry(&state)?;
let mut bash = state.inner.lock().await;
let cancelled = bash.cancellation_token();
let callback_requested_cancel = Arc::new(AtomicBool::new(false));
let (output_callback, callback_error) = build_async_output_callback(
tsfn,
cancelled.clone(),
callback_requested_cancel.clone(),
state.on_output_reentry_depth.clone(),
);
execute_rust_bash(
&mut bash,
Expand Down Expand Up @@ -1183,10 +1226,11 @@ impl Bash {

/// Get a `JsFileSystem` handle for direct VFS operations.
#[napi]
pub fn fs(&self) -> JsFileSystem {
JsFileSystem {
pub fn fs(&self) -> napi::Result<JsFileSystem> {
reject_on_output_reentry(&self.state)?;
Ok(JsFileSystem {
state: self.state.clone(),
}
})
}
}

Expand Down Expand Up @@ -1256,6 +1300,7 @@ impl BashTool {
cancelled.clone(),
callback_requested_cancel.clone(),
callback_error.clone(),
s.on_output_reentry_depth.clone(),
);
execute_rust_bash(
&mut bash,
Expand All @@ -1275,6 +1320,7 @@ impl BashTool {
/// Execute bash commands asynchronously, returning a Promise.
#[napi]
pub async fn execute(&self, commands: String) -> napi::Result<ExecResult> {
reject_on_output_reentry(&self.state)?;
let s = self.state.clone();
let mut bash = s.inner.lock().await;
execute_rust_bash(&mut bash, &commands, None, None, None, None).await
Expand All @@ -1289,19 +1335,22 @@ impl BashTool {
commands: String,
on_output: napi::bindgen_prelude::Function<'env, (String, String), Option<String>>,
) -> napi::Result<napi::bindgen_prelude::PromiseRaw<'env, ExecResult>> {
reject_on_output_reentry(&self.state)?;
let raw_env = on_output.value().env;
let tsfn = create_output_tsfn(on_output)?;
let state = self.state.clone();
let promise = napi::bindgen_prelude::execute_tokio_future(
raw_env,
async move {
reject_on_output_reentry(&state)?;
let mut bash = state.inner.lock().await;
let cancelled = bash.cancellation_token();
let callback_requested_cancel = Arc::new(AtomicBool::new(false));
let (output_callback, callback_error) = build_async_output_callback(
tsfn,
cancelled.clone(),
callback_requested_cancel.clone(),
state.on_output_reentry_depth.clone(),
);
execute_rust_bash(
&mut bash,
Expand Down Expand Up @@ -1654,10 +1703,11 @@ impl BashTool {

/// Get a `JsFileSystem` handle for direct VFS operations.
#[napi]
pub fn fs(&self) -> JsFileSystem {
JsFileSystem {
pub fn fs(&self) -> napi::Result<JsFileSystem> {
reject_on_output_reentry(&self.state)?;
Ok(JsFileSystem {
state: self.state.clone(),
}
})
}
}

Expand Down Expand Up @@ -2072,6 +2122,7 @@ fn shared_state_from_opts(
.map_err(|e| napi::Error::from_reason(format!("Failed to create runtime: {e}")))?,
),
cancelled: Arc::new(AtomicBool::new(false)),
on_output_reentry_depth: Arc::new(AtomicUsize::new(0)),
username: opts.username.clone(),
hostname: opts.hostname.clone(),
max_commands: opts.max_commands,
Expand Down Expand Up @@ -2105,6 +2156,7 @@ fn shared_state_from_opts(
inner: Mutex::new(bash),
rt: Mutex::new(rt),
cancelled,
on_output_reentry_depth: tmp.on_output_reentry_depth,
username: opts.username,
hostname: opts.hostname,
max_commands: opts.max_commands,
Expand Down
4 changes: 2 additions & 2 deletions crates/bashkit-js/wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ export interface ExecuteOptions {
*
* Limitation: do not call back into the same `Bash` / `BashTool` instance
* from this handler (`execute*`, `readFile`, `fs()`, etc.). The current
* binding delivers chunks while that instance is still mid-execution, so
* same-instance re-entry can deadlock or panic.
* binding rejects same-instance re-entry to avoid deadlocks and runtime
* panics.
*/
onOutput?: OnOutput;
}
Expand Down
3 changes: 3 additions & 0 deletions crates/bashkit-python/bashkit/_bashkit.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,9 @@ class Bash:
python: Enable embedded Python (``python3`` builtin).
external_functions: Function names callable from Python code.
external_handler: Async callback for external function calls.
The callback must not call back into the same ``Bash`` instance
via live methods like ``read_file()``, ``fs()``, or
``execute()``; those re-entrant calls are rejected.
files: Dict mapping VFS paths to file contents or lazy callables.
mounts: List of real host directory mount configs.
custom_builtins: Constructor-time Python callbacks exposed as
Expand Down
Loading
Loading