Robert-Cunningham/nozzle

Quickstart   •   Recipes   •   Reference

Nozzle is a small TypeScript library for transforming async iterables, especially streamed text from LLMs.

It helps when provider chunks are not the chunks your app wants: parse structured markers, split text into useful pieces, fan out one stream to multiple consumers, extract generated sections, or smooth token timing without waiting for the whole response.

Quickstart

npm i nozzle-js # or pnpm / bun / yarn

Nozzle has ESM and CJS builds and works with any sync or async iterable.

import { nz } from "nozzle-js"

const pacedWords = nz(llmTextStream)
  .splitAfter(" ")
  .compact()
  .minInterval(40)

for await (const word of pacedWords) {
  process.stdout.write(word)
}

Recipes

Parse structured markers as they stream

const stream = await openai.chat.completions.create({ ...args, stream: true })

const parts = nz(stream)
  .map((chunk) => chunk.choices[0]?.delta?.content ?? "")
  .parse(/img-(\w+)/g, (match) => ({ type: "image", id: match[1] }))

// yields: "Here is ", { type: "image", id: "abc123" }, " for you"

Branch one stream for UI and storage

const [displayStream, storageStream] = nz(stream).tee(2)

const displayPromise = (async () => {
  for await (const chunk of displayStream) process.stdout.write(chunk)
})()

const [, consumed] = await Promise.all([displayPromise, storageStream.consume()])
conversation.push({ role: "assistant", content: consumed.string() })

Extract a generated section

const code = await nz(stream)
  .after("```ts\n")
  .before("```")
  .tap(sendToPreview)
  .consume()

await saveSnippet(code.string())

Smooth chunky provider output

const smoothStream = nz(stream)
  .splitAfter(" ")
  .compact()
  .minInterval(40)

Reference

Elements

asyncMap

nz(["hello", "world"]).asyncMap(async x => x.toUpperCase()) // => "HELLO", "WORLD"
nz(["api/users", "api/posts"]).asyncMap(async url => fetch(url).then(r => r.json())) // => [userData], [postsData]
nz(urls).asyncMap(fetchJson, { concurrency: 4 })

Transforms each value from the input stream using the provided async function. Applies the async function to each item as soon as it comes off the iterator and yields results as they complete, allowing multiple function calls to run concurrently.

Details
function asyncMap<T, U, R = any>(iterator: AsyncIterable<T, R>, fn: (value: T) => Promise<U>, options?: AsyncMapOptions): AsyncGenerator<U, R>;

Parameters

Parameter Type Description
iterator AsyncIterable<T, R> An asynchronous iterable of values.
fn (value: T) => Promise<U> An async function that transforms each value.
options AsyncMapOptions Optional configuration.

filter

nz(["Hello", "Hi", "World"]).filter(chunk => chunk.length > 5) // => "Hello", "World"

Filters the input stream based on a predicate function.

Details
function filter<T, R = any>(iterator: AsyncIterable<T, R>, predicate: (chunk: T) => boolean): AsyncGenerator<T, R, undefined>;

Parameters

Parameter Type Description
iterator AsyncIterable<T, R> An asynchronous iterable of values.
predicate (chunk: T) => boolean A function that returns true for items to keep.

find

await nz(["apple", "banana", "cherry"]).find(chunk => chunk.startsWith("b")) // => "banana"

Finds the first value from the input stream that matches the predicate.

Details
function find<T>(iterator: AsyncIterable<T>, predicate: (chunk: T) => boolean): Promise<T>;

Parameters

Parameter Type Description
iterator AsyncIterable<T> An asynchronous iterable of values.
predicate (chunk: T) => boolean A function that returns true for the item to find.

map

nz(["hello", "world"]).map(x => x.toUpperCase()) // => "HELLO", "WORLD"

Transforms each value from the input stream using the provided function.

Details
function map<T, U, R = any>(iterator: AsyncIterable<T, R>, fn: (value: T) => U): AsyncGenerator<U, R, undefined>;

Parameters

Parameter Type Description
iterator AsyncIterable<T, R> An asynchronous iterable of values.
fn (value: T) => U A function that transforms each value.

Indexing

at

await nz(["a", "b", "c", "d", "e"]).at(2) // => "c"
await nz(["a", "b", "c", "d", "e"]).at(-1) // => "e"

Returns the element at the specified index in the input stream. Supports negative indices to count from the end.

Details
function at<T>(iterator: AsyncIterable<T>, index: number): Promise<T>;

Parameters

Parameter Type Description
iterator AsyncIterable<T> An asynchronous iterable of values.
index number The index to access. Negative values count from the end.

first

await nz(["Hello", "World", "!"]).first() // => "Hello"

Returns the first value from the input stream.

Details
function first<T>(iterator: AsyncIterable<T>): Promise<T>;

Parameters

Parameter Type Description
iterator AsyncIterable<T> An asynchronous iterable of values.

head

nz(["Hello", "World", "!"]).head() // => "Hello"

Yields only the first value from the input stream.

Details
function head<T>(iterator: AsyncIterable<T>): AsyncGenerator<T, any, any>;

Parameters

Parameter Type Description
iterator AsyncIterable<T> An asynchronous iterable of values.

See also: at, tail, initial, last

initial

nz(["Hello", "World", "!"]).initial() // => "Hello", "World"

Yields all values except the last from the input stream.

Details
function initial<T>(iterator: AsyncIterable<T>): AsyncGenerator<T, any, any>;

Parameters

Parameter Type Description
iterator AsyncIterable<T> An asynchronous iterable of values.

last

await nz(["Hello", "World", "!"]).last() // => "!"

Returns the last value from the input stream.

Details
function last<T>(iterator: AsyncIterable<T>): Promise<T>;

Parameters

Parameter Type Description
iterator AsyncIterable<T> An asynchronous iterable of values.

slice

nz(["a", "b", "c", "d", "e"]).slice(1, 3) // => "b", "c"
nz(["a", "b", "c", "d", "e"]).slice(-2) // => "d", "e"

Yields a slice of the input stream between start and end indices. Supports negative indices by maintaining an internal buffer.

Details
function slice<T>(iterator: AsyncIterable<T>, start: number, end?: number): AsyncGenerator<T>;

Parameters

Parameter Type Description
iterator AsyncIterable<T> The async iterable to slice
start number Starting index (inclusive). Negative values count from end.
end number Ending index (exclusive). Negative values count from end. If undefined, slices to end.
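
The internal buffer matters for negative indices because a stream's length is unknown until it ends, so something like slice(-2) cannot yield anything early. Below is a minimal sketch of that idea in plain TypeScript. The name takeLastSketch is hypothetical and this is not Nozzle's implementation; it only shows why a bounded buffer is enough.

```typescript
// Hypothetical helper (not part of Nozzle): yields the last `count` items of a stream.
async function* takeLastSketch<T>(source: AsyncIterable<T>, count: number): AsyncGenerator<T> {
  const kept: T[] = [] // bounded buffer: never holds more than `count` items
  for await (const item of source) {
    kept.push(item)
    if (kept.length > count) kept.shift() // drop the oldest item
  }
  // Only once the source has ended do we know which items are the last `count`.
  yield* kept
}
```

Fed "a", "b", "c", "d", "e" with a count of 2, this yields "d" and "e", matching the slice(-2) example above. Nothing is emitted until the source finishes.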

tail

nz(["Hello", "World", "!"]).tail() // => "World", "!"

Yields all values except the first from the input stream.

Details
function tail<T>(iterator: AsyncIterable<T>): AsyncGenerator<T, any, any>;

Parameters

Parameter Type Description
iterator AsyncIterable<T> An asynchronous iterable of values.

Filtering

compact

nz(["Hello", "", "World", ""]).compact() // => "Hello", "World"

Filters out empty strings from the input stream.

Details
function compact(iterator: AsyncIterable<string>): AsyncGenerator<string>;

Parameters

Parameter Type Description
iterator AsyncIterable<string> An asynchronous iterable of strings.

takeUntil

nz([1, 2, 3, 4]).takeUntil(n => n === 3) // => 1, 2

Yields values until the predicate matches, excluding the matching value.

Details
function takeUntil<T, R = any>(source: AsyncIterable<T, R>, predicate: (value: T) => boolean): AsyncGenerator<T, R, undefined>;

Parameters

Parameter Type Description
source AsyncIterable<T, R> An asynchronous iterable of values.
predicate (value: T) => boolean A function that returns true for the value that should stop the stream.

takeWhile

nz([1, 2, 3, 1]).takeWhile(n => n < 3) // => 1, 2

Yields values while the predicate matches, excluding the first non-matching value.

Details
function takeWhile<T, R = any>(source: AsyncIterable<T, R>, predicate: (value: T) => boolean): AsyncGenerator<T, R, undefined>;

Parameters

Parameter Type Description
source AsyncIterable<T, R> An asynchronous iterable of values.
predicate (value: T) => boolean A function that returns true for values to keep.
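
takeWhile and takeUntil describe the same loop with opposite predicates. The sketch below illustrates this in plain TypeScript, assuming only the behaviour documented above. The names takeWhileSketch and takeUntilSketch are hypothetical, not Nozzle internals.

```typescript
// Hypothetical sketch: stop at the first value that fails the predicate.
async function* takeWhileSketch<T>(
  source: AsyncIterable<T>,
  predicate: (value: T) => boolean,
): AsyncGenerator<T> {
  for await (const value of source) {
    if (!predicate(value)) return // the non-matching value is not yielded
    yield value
  }
}

// takeUntil is the same loop with the predicate negated.
function takeUntilSketch<T>(
  source: AsyncIterable<T>,
  predicate: (value: T) => boolean,
): AsyncGenerator<T> {
  return takeWhileSketch(source, (value) => !predicate(value))
}
```

Returning from inside a for await loop also closes the source iterator, so later values are never pulled.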

Splitting

after

nz(["a", "b", "c", "d", "e"]).after(/bc/) // => "d", "e"

Emit everything after the accumulated prefix that matches pattern.

Built on: scan(source, regex) skipping until first match, then yielding everything after

Details
function after(source: StringIterable, pattern: string | RegExp): AsyncGenerator<string>;

Parameters

Parameter Type Description
source StringIterable stream or iterable to scan
pattern string | RegExp string or RegExp that marks the cut-off

before

nz(["a", "b", "c", "d", "e"]).before("cd") // => "a", "b"

Emit everything before the accumulated prefix that contains separator.

Built on: scan(source, regex) taking text until first match

Details
function before(source: StringIterable, separator: string | RegExp): AsyncGenerator<string>;

Parameters

Parameter Type Description
source StringIterable stream or iterable to scan
separator string | RegExp string or RegExp that marks the cut-off

chunk

nz(["a", "b", "c", "d", "e", "f"]).chunk(3) // => "abc", "def"

Groups input tokens into chunks of the specified size and yields the joined result. Takes N input items and yields N/size output items, where each output is the concatenation of size input items.

Details
function chunk<R = any>(source: AsyncIterable<string, R>, size: number): AsyncGenerator<string, R, undefined>;

Parameters

Parameter Type Description
source AsyncIterable<string, R> The async iterable source of strings (tokens).
size number The number of input tokens to group together in each output chunk.

split

nz(["hello,world,test"]).split(",") // => "hello", "world", "test"

Takes incoming chunks, merges them, and then splits them by a string or RegExp separator.

Built on: scan(source, regex) accumulating text between matches

Details
function split(source: AsyncIterable<string>, separator: string | RegExp): AsyncGenerator<string>;

Parameters

Parameter Type Description
source AsyncIterable<string> The async iterable source of strings.
separator string | RegExp The string or RegExp separator to split by.

splitAfter

nz(["hello,world,test"]).splitAfter(",") // => "hello,", "world,", "test"

Takes incoming chunks, merges them, and then splits them by a string or RegExp separator, keeping the separator at the end of each part (except the last).

Built on: scan(source, regex) with separator appended to each segment

Details
function splitAfter(source: AsyncIterable<string>, separator: string | RegExp): AsyncGenerator<string>;

Parameters

Parameter Type Description
source AsyncIterable<string> The async iterable source of strings.
separator string | RegExp The string or RegExp separator to split by.
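
"Merges them, and then splits them" means a separator can be found even when it straddles two incoming chunks. Here is a minimal sketch for the string-separator case only. The name splitAfterSketch is hypothetical, and dropping an empty trailing part is an assumption of the sketch, not documented Nozzle behaviour.

```typescript
// Hypothetical sketch of splitAfter for a non-empty string separator.
async function* splitAfterSketch(
  source: AsyncIterable<string>,
  separator: string,
): AsyncGenerator<string> {
  if (separator.length === 0) throw new Error("separator must be non-empty")
  let buffered = "" // text received so far that has not been emitted
  for await (const chunk of source) {
    buffered += chunk
    let cut = buffered.indexOf(separator)
    while (cut !== -1) {
      const end = cut + separator.length
      yield buffered.slice(0, end) // the separator stays on the end of the part
      buffered = buffered.slice(end)
      cut = buffered.indexOf(separator)
    }
  }
  // Assumption: the final part is emitted only if it is non-empty.
  if (buffered.length > 0) yield buffered
}
```

With chunks "hel", "lo,wo", "rld," and "test" and the separator ",", the sketch yields "hello,", "world," and "test", even though no input chunk contains a whole word.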

splitBefore

nz(["hello,world,test"]).splitBefore(",") // => "hello", ",world", ",test"

Takes incoming chunks, merges them, and then splits them by a string or RegExp separator, keeping the separator at the beginning of each part (except the first).

Built on: scan(source, regex) with separator prepended to each segment after first

Details
function splitBefore(source: AsyncIterable<string>, separator: string | RegExp): AsyncGenerator<string>;

Parameters

Parameter Type Description
source AsyncIterable<string> The async iterable source of strings.
separator string | RegExp The string or RegExp separator to split by.

Accumulation

accumulate

nz(["This ", "is ", "a ", "test!"]).accumulate() // => "This ", "This is ", "This is a ", "This is a test!"

Yields a cumulative prefix of the input stream.

Details
function accumulate(iterator: AsyncIterable<string>): AsyncGenerator<string>;

Parameters

Parameter Type Description
iterator AsyncIterable<string> An asynchronous iterable of strings.

diff

nz(["This ", "This is ", "This is a ", "This is a test!"]).diff().value() // => "This ", "is ", "a ", "test!"

Yields the difference between the current and previous string in the input stream.

Details
function diff(iterator: AsyncIterable<string>): AsyncGenerator<string>;

Parameters

Parameter Type Description
iterator AsyncIterable<string> An asynchronous iterable of strings.
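
accumulate and diff are inverses of each other: one turns deltas into growing snapshots, the other turns snapshots back into deltas. The sketch below shows both in plain TypeScript. The names accumulateSketch and diffSketch are hypothetical, and the diff half assumes each string extends the previous one.

```typescript
// Hypothetical sketch: deltas in, cumulative snapshots out.
async function* accumulateSketch(source: AsyncIterable<string>): AsyncGenerator<string> {
  let total = ""
  for await (const piece of source) {
    total += piece
    yield total
  }
}

// Hypothetical sketch: cumulative snapshots in, deltas out.
async function* diffSketch(source: AsyncIterable<string>): AsyncGenerator<string> {
  let previous = ""
  for await (const snapshot of source) {
    // Assumption: `snapshot` starts with `previous`, so the new text is its suffix.
    yield snapshot.slice(previous.length)
    previous = snapshot
  }
}
```

Piping a stream through accumulateSketch and then diffSketch gives back the original pieces, which is handy when a provider sends full snapshots but the UI wants increments.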

reduce

nz([1, 2, 3, 4]).reduce((acc, n) => acc + n, 0) // => 1, 3, 6, 10

Yields progressive accumulated values using a reducer function.

Details
function reduce<T, A>(source: AsyncIterable<T>, reducer: (accumulator: A, current: T, index: number) => A, initial: A): AsyncGenerator<A>;

Parameters

Parameter Type Description
source AsyncIterable<T> An asynchronous iterable of values.
reducer (accumulator: A, current: T, index: number) => A A function that combines the accumulator with each value.
initial A The initial accumulator value.
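
Unlike Array.prototype.reduce, which returns one final value, the example above yields every intermediate accumulator. A minimal sketch of that behaviour follows. The name reduceSketch is hypothetical; this mirrors the documented signature but is not Nozzle's source.

```typescript
// Hypothetical sketch: yields the accumulator after each input value.
async function* reduceSketch<T, A>(
  source: AsyncIterable<T>,
  reducer: (accumulator: A, current: T, index: number) => A,
  initial: A,
): AsyncGenerator<A> {
  let accumulator = initial // the initial value itself is never yielded
  let index = 0
  for await (const current of source) {
    accumulator = reducer(accumulator, current, index++)
    yield accumulator
  }
}
```

For the inputs 1, 2, 3, 4 with a summing reducer and an initial value of 0, the sketch yields 1, 3, 6, 10.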

Transformation

flatMap

nz(["hi", "ok"]).flatMap(word => word.split("")) // => "h", "i", "o", "k"
nz([1, 2, 3]).flatMap(n => Array(n).fill(n)) // => 1, 2, 2, 3, 3, 3

Transforms each value from the input stream into zero or more output values.

Details
function flatMap<T, U, R = any>(source: AsyncIterable<T, R>, fn: (value: T) => Iterable<U, any, any> | AsyncIterable<U, any, any>): AsyncGenerator<U, R, undefined>;

Parameters

Parameter Type Description
source AsyncIterable<T, R> An asynchronous iterable of values.
fn (value: T) => Iterable<U, any, any> | AsyncIterable<U, any, any> A function that returns sync or async iterable values for each input.

flatten

nz([["a", "b"], ["c", "d"], ["e"]]).flatten() // => "a", "b", "c", "d", "e"

Flattens nested arrays or iterables into a single stream.

Details
function flatten<T>(src: Iterable<Iterable<T> | T[]>): AsyncGenerator<T>;

Parameters

Parameter Type Description
src Iterable<Iterable<T> | T[]> The source iterable containing nested arrays or iterables.

Regex

These functions use JavaScript regular expressions to search, match, and transform streaming text.

Nozzle handles matching patterns across chunk boundaries by buffering text internally. Non-matching text is yielded as soon as it's certain not to be part of a match, while potential matches are held back until confirmed.
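
To make the hold-back idea concrete, here is a sketch for the simplest case: replacing a literal string rather than a regex. After each chunk, only the longest tail that could still be the start of a match is kept, and everything before it is emitted. The names partialMatchLengthSketch and replaceLiteralSketch are hypothetical, and Nozzle's regex engine is more general than this.

```typescript
// Length of the longest suffix of `text` that is also a proper prefix of `needle`.
function partialMatchLengthSketch(text: string, needle: string): number {
  const max = Math.min(text.length, needle.length - 1)
  for (let length = max; length > 0; length--) {
    if (text.endsWith(needle.slice(0, length))) return length
  }
  return 0
}

// Hypothetical sketch: streaming replacement of a non-empty literal string.
async function* replaceLiteralSketch(
  source: AsyncIterable<string>,
  needle: string,
  replacement: string,
): AsyncGenerator<string> {
  if (needle.length === 0) throw new Error("needle must be non-empty")
  let held = "" // text that might still turn out to be part of a match
  for await (const chunk of source) {
    held += chunk
    let found = held.indexOf(needle)
    while (found !== -1) {
      if (found > 0) yield held.slice(0, found) // text before the match
      yield replacement
      held = held.slice(found + needle.length)
      found = held.indexOf(needle)
    }
    // Emit everything that can no longer be the start of a match.
    const keep = partialMatchLengthSketch(held, needle)
    const safe = held.slice(0, held.length - keep)
    if (safe.length > 0) yield safe
    held = held.slice(held.length - keep)
  }
  if (held.length > 0) yield held // stream ended before the partial match completed
}
```

With chunks "say he", "llo wor" and "ld he", and "hello" replaced by "HI", the sketch yields "say ", "HI", " wor", "ld " and finally "he". The trailing "he" is held until the stream ends because it might have become another "hello".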

Unsupported Features

These features throw an error because they cannot work reliably with streaming:

Feature Example Why
Lookaheads (?=...), (?!...) Content to look ahead may not have arrived yet
Lookbehinds (?<=...), (?<!...) Content to look behind may have already been yielded
Backreferences \1, \k<name> Referenced group may span chunks or be partially buffered
Multiline mode /pattern/m ^/$ would behave inconsistently at arbitrary chunk boundaries

Patterns That Delay Output

Some patterns force nozzle to buffer text longer than you might expect:

  • Open-ended quantifiers at pattern end: /hello.+/g buffers everything after "hello" until the stream ends—.+ can always match more. Use a delimiter instead: /hello[^!]+/g matches until !, allowing earlier output.

  • Alternations with shared prefixes: /cat|caterpillar/g buffers "cat" until enough text arrives to rule out "caterpillar". Put longer alternatives first: /caterpillar|cat/g.

  • Optional suffixes: /items?/g buffers "item" to check for a trailing "s". This is usually fine, but /data.*?end/g buffers from "data" until "end" appears.

Global vs Non-Global

  • Global (/pattern/g): Finds all matches throughout the stream
  • Non-global (/pattern/): Finds only the first match, then passes remaining text through unchanged
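
The built-in String.prototype.replace draws the same line on a complete string, which makes for a quick way to preview what a pattern will do. This snippet uses only standard JavaScript, not Nozzle; the variable names are illustrative.

```typescript
const sampleSketch = "cat, cat, cat"

// Without the g flag only the first match is replaced.
const firstOnlySketch = sampleSketch.replace(/cat/, "dog") // "dog, cat, cat"

// With the g flag every match is replaced.
const everyMatchSketch = sampleSketch.replace(/cat/g, "dog") // "dog, dog, dog"

console.log(firstOnlySketch)
console.log(everyMatchSketch)
```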

match

nz(["a", "b", "b", "a"]).match(/a([ab]*)a/g) // => ["abba", "bb"] (match arrays with capture groups)

Extracts matches of a regex pattern from the input stream.

Uses earliestPossibleMatchIndex to efficiently skip tokens as soon as we know they don't match the regex, while holding back potential matches until we can determine if they match.

Built on: scan(input, regex).filter(x => 'match' in x).map(x => x.match)

Details
function match(input: AsyncIterable<string>, regex: RegExp): AsyncGenerator<RegExpExecArray>;

Parameters

Parameter Type Description
input AsyncIterable<string> An asynchronous iterable of strings.
regex RegExp The regular expression pattern to match.

parse

// Extract UUIDs from text
nz(["Now I'm taking uuid-asdf-flkj and adding it to uuid-fslkj-alkjlsf."])
  .parse(/uuid-(?<id>\w+)-\w+/g, m => ({ id: m.groups!.id }))
// yields: "Now I'm taking ", { id: "asdf" }, " and adding it to ", { id: "fslkj" }, "."

// Parse numbers from text
nz(["The answer is 42 and also 123"])
  .parse(/\d+/g, m => parseInt(m[0], 10))
// yields: "The answer is ", 42, " and also ", 123

Parses input for regex matches, yielding text as-is and transforming matches.

This transform is useful for extracting structured data from text streams. Non-matching text is passed through as strings, while matches are transformed using the provided function.

Details
function parse<T>(input: AsyncIterable<string>, regex: RegExp, transform: (match: RegExpExecArray) => T): AsyncGenerator<string | T>;

Parameters

Parameter Type Description
input AsyncIterable<string> An asynchronous iterable of strings.
regex RegExp The regular expression pattern to match.
transform (match: RegExpExecArray) => T A function that transforms each match into a desired type.

replace

nz(["a", "b", "b", "a"]).replace(/a[ab]*a/g, "X") // => "X"

Replaces matches of a regex pattern with a replacement string in the input stream.

Uses earliestPossibleMatchIndex to efficiently yield tokens as soon as we know they don't match the regex, while holding back potential matches until we can determine if they should be replaced.

Built on: scan(input, regex).map(x => 'text' in x ? x.text : x.match[0].replace(regex, replacement))

Details
function replace(input: AsyncIterable<string>, regex: RegExp, replacement: string): AsyncGenerator<string>;

Parameters

Parameter Type Description
input AsyncIterable<string> An asynchronous iterable of strings.
regex RegExp The regular expression pattern to match.
replacement string The string to replace matches with.

scan

nz(["hello world"]).scan(/\w+/g)
// yields: { match: [...] }, { text: " " }, { match: [...] }

nz(["Now I'm taking uuid-asdf-flkj..."]).scan(/uuid-(\w+)-\w+/g)
// yields: { text: "Now I'm taking " }, { match: [...] }, { text: "..." }

Scans input for regex matches, yielding interleaved text and match results.

This is the foundational regex transform that other transforms build on. It efficiently yields non-matching text as soon as we're certain it can't match, while holding back potential matches until we can determine their boundaries.

Note: Empty text strings are never yielded.

Details
function scan(input: AsyncIterable<string>, regex: RegExp): AsyncGenerator<ScanResult>;

Parameters

Parameter Type Description
input AsyncIterable<string> An asynchronous iterable of strings.
regex RegExp The regular expression pattern to match.

Timing

minInterval

nz(["a", "b", "c"]).minInterval(100) // => "a" (0ms), "b" (100ms), "c" (200ms)

Enforces a minimum delay between adjacent tokens in a stream. The first token is yielded immediately, then subsequent tokens are delayed to ensure at least delayMs milliseconds pass between each yield.

Details
function minInterval<T>(source: AsyncIterable<T>, delayMs: number): AsyncGenerator<T>;

Parameters

Parameter Type Description
source AsyncIterable<T> The async iterable source of tokens.
delayMs number The minimum delay in milliseconds between adjacent tokens.
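
One way to picture the timing rule is to record when the last value went out and sleep for whatever remains of the interval. The following is a sketch under that assumption. The name minIntervalSketch is hypothetical and Nozzle's own implementation may differ.

```typescript
// Hypothetical sketch: at least `delayMs` between consecutive yields.
async function* minIntervalSketch<T>(
  source: AsyncIterable<T>,
  delayMs: number,
): AsyncGenerator<T> {
  let lastYield = -Infinity // so the first value never waits
  for await (const value of source) {
    const wait = lastYield + delayMs - Date.now()
    if (wait > 0) await new Promise<void>((resolve) => setTimeout(resolve, wait))
    lastYield = Date.now()
    yield value
  }
}
```

Because the sketch only sleeps for the remaining time, a source that is already slower than delayMs passes through without extra delay.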

throttle

nz(["a", "b", "c", "d"]).throttle(100, chunks => chunks.join("")) // => "a" (0ms), "bcd" (100ms)

Throttles the output from a source, with special timing behavior:

  • The first chunk is yielded immediately
  • Subsequent chunks are batched and yielded together after the interval
  • If no chunks arrive during an interval, the next chunk is yielded immediately when it arrives

Details
function throttle<T, R = any>(source: AsyncIterable<T, R>, intervalMs: number, merge: (values: T[]) => T): AsyncGenerator<T, R>;

Parameters

Parameter Type Description
source AsyncIterable<T, R> The async iterable source of values.
intervalMs number The throttling interval in milliseconds.
merge (values: T[]) => T A function that combines the values batched during an interval into a single output value.

Buffering

batch

nz([1, 2, 3, 4, 5]).batch(2) // => [1, 2], [3, 4], [5]
nz(["a", "b", "c"]).batch(2).map(xs => xs.join("")) // => "ab", "c"

Groups input values into arrays of the specified size.

Details
function batch<T, R = any>(source: AsyncIterable<T, R>, size: number): AsyncGenerator<T[], R, undefined>;

Parameters

Parameter Type Description
source AsyncIterable<T, R> The async iterable source of values.
size number The number of input values to include in each batch.
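
The grouping logic is small enough to sketch in full. The version below follows the example above, including the shorter final batch. The name batchSketch is hypothetical; this is an illustration, not Nozzle's source.

```typescript
// Hypothetical sketch: group values into arrays of `size`.
async function* batchSketch<T>(source: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
  let current: T[] = []
  for await (const value of source) {
    current.push(value)
    if (current.length === size) {
      yield current
      current = [] // fresh array, so a yielded batch is never mutated later
    }
  }
  if (current.length > 0) yield current // trailing partial batch, e.g. [5]
}
```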

buffer

nz(["a", "b", "c"]).tap(x => console.log(`consumed: ${x}`)).buffer(2).tap(x => console.log(`yielded: ${x}`)) // => consumed: a, consumed: b, yielded: a, consumed: c, yielded: b, yielded: c

Buffers up to N items from the source iterator, consuming them eagerly and yielding them on demand. If n is undefined, buffers unlimited items.

The buffer() function "slurps up" as much of the input iterator as it can as fast as it can, storing items in an internal buffer. When items are requested from the buffer, they are yielded from this pre-filled buffer. This creates a decoupling between the consumption rate and the production rate.

Details
function buffer<T, R = any>(source: AsyncIterable<T, R>, n?: number): AsyncGenerator<T, R>;

Parameters

Parameter Type Description
source AsyncIterable<T, R> The async iterable source of values.
n number The maximum number of items to buffer. If undefined, buffers unlimited items.

Side Effects

tap

nz(["Hello", "World", "!"]).tap(x => console.log(`yielded: ${x}`)) // => "Hello", "World", "!" (logs: yielded: Hello, yielded: World, yielded: !)

Executes a side effect for each value without modifying the stream.

Details
function tap<T, R = any>(iterator: AsyncIterable<T, R>, fn: (value: T) => void): AsyncGenerator<T, R, undefined>;

Parameters

Parameter Type Description
iterator AsyncIterable<T, R> An asynchronous iterable of values.
fn (value: T) => void A function to execute for each value.

tee

const [stream1, stream2] = nz(["a", "b", "c"]).tee(2) // => Two independent streams of "a", "b", "c"

Splits a single iterator into N independent iterables.

Details
function tee<T, R = any>(iterator: AsyncIterator<T, R>, n: number): AsyncGenerator<T, R, any>[];

Parameters

Parameter Type Description
iterator AsyncIterator<T, R> The source async iterator to split.
n number Number of independent iterables to create.
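
"Independent" here means each branch sees every value, regardless of how fast the others read. One common way to achieve this is a queue per branch, sketched below. The name teeSketch is hypothetical, error propagation is left out, and nothing here is taken from Nozzle's implementation.

```typescript
// Hypothetical sketch: fan one async iterator out to `n` branches via per-branch queues.
function teeSketch<T>(source: AsyncIterator<T>, n: number): AsyncGenerator<T>[] {
  const queues: T[][] = Array.from({ length: n }, () => [])
  let finished = false

  async function* branch(queue: T[]): AsyncGenerator<T> {
    while (true) {
      if (queue.length > 0) {
        yield queue.shift() as T // serve already-fetched values first
      } else if (finished) {
        return
      } else {
        // Our queue is empty: pull one value and hand a copy to every branch.
        const result = await source.next()
        if (result.done) finished = true
        else for (const q of queues) q.push(result.value)
      }
    }
  }

  return queues.map((queue) => branch(queue))
}
```

The trade-off of this design is memory: if one branch reads far ahead of another, the slower branch's queue grows until it catches up.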

Error Handling

recover

nz(stream).recover(() => ["[stream failed]"])
nz(stream).recover(() => []) // swallow the error and end

Catches an upstream error and optionally yields replacement values.

Details
function recover<T, R = any>(source: AsyncIterable<T, R>, handler: (error: unknown) => RecoverResult<T>): AsyncGenerator<T, R, undefined>;

Parameters

Parameter Type Description
source AsyncIterable<T, R> An asynchronous iterable of values.
handler (error: unknown) => RecoverResult<T> A function that returns replacement values, or nothing to end the stream.

unwrap

nz(["hello", "world"]).wrap().unwrap() // => "hello", "world"

Unwraps results from wrap() back into a normal iterator that throws/returns/yields. The opposite of wrap() - takes {value, return, error} objects and converts them back to normal iterator behavior.

Details
function unwrap<T, R = any>(iterator: AsyncIterable<{ error?: any; return?: R; value?: T }>): AsyncGenerator<T, R, any>;

Parameters

Parameter Type Description
iterator AsyncIterable<{ error?: any; return?: R; value?: T }> An asynchronous iterable of wrapped result objects.

wrap

nz(["hello", "world"]).wrap() // => {value: "hello"}, {value: "world"}, {return: undefined}

Wraps an iterator to catch any errors and return them in a result object format. Instead of throwing, errors are yielded as {error}, successful values as {value}, and the final return value as {return}.

Details
function wrap<T>(iterator: AsyncIterable<T>): AsyncGenerator<{ error?: unknown; return?: any; value?: T }>;

Parameters

Parameter Type Description
iterator AsyncIterable<T> An asynchronous iterable.
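
The wrap/unwrap pair can be pictured as turning the three ways an iterator can finish a step (yield, return, throw) into plain objects and back. Below is a minimal sketch that assumes only the {value, return, error} shape described above. The names WrappedSketch, wrapSketch and unwrapSketch are hypothetical.

```typescript
type WrappedSketch<T, R> = { value?: T; return?: R; error?: unknown }

// Hypothetical sketch: never throws; reports every outcome as an object.
async function* wrapSketch<T, R>(source: AsyncIterator<T, R>): AsyncGenerator<WrappedSketch<T, R>> {
  try {
    while (true) {
      const step = await source.next()
      if (step.done) {
        yield { return: step.value }
        return
      }
      yield { value: step.value }
    }
  } catch (error) {
    yield { error } // the failure becomes the last item instead of an exception
  }
}

// Hypothetical sketch: converts the objects back into normal iterator behaviour.
async function* unwrapSketch<T, R>(
  source: AsyncIterable<WrappedSketch<T, R>>,
): AsyncGenerator<T, R | undefined> {
  for await (const item of source) {
    if ("error" in item) throw item.error
    if ("return" in item) return item.return
    yield item.value as T
  }
  return undefined
}
```

Between the two calls the stream consists of ordinary values, so transforms that know nothing about errors can inspect or reorder them safely.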

Return Values

mapReturn

nz(["a", "b"]).mapReturn(returnValue => returnValue?.toString() ?? "default") // => "a", "b" (with mapped return value)

Maps the return type of an iterator while preserving all yielded values unchanged.

Details
function mapReturn<T, R, U>(iterator: AsyncIterable<T, R>, fn: (value: R) => U): AsyncGenerator<T, U, undefined>;

Parameters

Parameter Type Description
iterator AsyncIterable<T, R> An asynchronous iterable.
fn (value: R) => U A function that transforms the return value.

Conversion

consume

const consumed = await nz(["a", "b"]).consume()
consumed.list()   // => ["a", "b"]
consumed.return() // => undefined (or iterator's return value)

Consumes an async iterator completely, collecting both yielded values and the return value.

Returns a ConsumedPipeline which provides access to both yielded values and return values:

  • .list() - Returns all yielded values as an array (T[])
  • .return() - Returns the iterator's return value (R)

Details
function consume<T, R>(iterator: AsyncIterable<T, R>): Promise<ConsumedPipeline<T, R>>;

Parameters

Parameter Type Description
iterator AsyncIterable<T, R> An asynchronous iterable to consume
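
A for await loop discards a generator's return value, so collecting it requires calling next() by hand. The sketch below shows that mechanism in plain TypeScript. The names consumeSketch and countedSketch are hypothetical, and the result here is a plain object rather than Nozzle's ConsumedPipeline.

```typescript
// Hypothetical sketch: drain an iterator, keeping both yielded values and the return value.
async function consumeSketch<T, R>(
  source: AsyncIterator<T, R>,
): Promise<{ list: T[]; returned: R }> {
  const list: T[] = []
  while (true) {
    const step = await source.next()
    if (step.done) return { list, returned: step.value }
    list.push(step.value)
  }
}

// Example source with a meaningful return value.
async function* countedSketch(): AsyncGenerator<string, number> {
  yield "a"
  yield "b"
  return 2 // a for await loop would silently drop this
}
```

Calling consumeSketch(countedSketch()) resolves to an object whose list is ["a", "b"] and whose returned value is 2.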

fromList

nz(["Hello", "World", "!"]) // => "Hello", "World", "!"

Converts an array to an async iterator.

Details
function fromList<T>(list: T[]): AsyncGenerator<T>;

Parameters

Parameter Type Description
list T[] An array of values.

Functions

aperture

nz([1, 2, 3, 4, 5]).aperture(3) // => [1, 2, 3], [2, 3, 4], [3, 4, 5]

Creates a sliding window of size n over the input stream, yielding arrays of consecutive elements.

Details
function aperture<T, R = any>(source: Iterable<T, R>, n: number): AsyncGenerator<T[], R>;

Parameters

Parameter Type Description
source Iterable<T, R> An iterable to create windows over.
n number The size of each window.
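
A sliding window only needs to remember the most recent n values. Here is a minimal sketch of that, with the hypothetical name apertureSketch. It assumes a stream shorter than n yields no windows, which the documentation above does not specify.

```typescript
// Hypothetical sketch: overlapping windows of `n` consecutive values.
async function* apertureSketch<T>(source: AsyncIterable<T>, n: number): AsyncGenerator<T[]> {
  const recent: T[] = []
  for await (const value of source) {
    recent.push(value)
    if (recent.length > n) recent.shift() // slide the window forward by one
    if (recent.length === n) yield [...recent] // copy, so later shifts don't change it
  }
}
```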

window

// Simple passthrough with lookahead
nz([1, 2, 3, 4]).window(({ current, upcoming, done }) => {
  if (!done && upcoming.length === 0) {
    return { value: current, advance: 0 } // peek ahead
  }
  return { value: current } // advance by 1 (default)
})

Provides a windowed view of the stream with lookahead/lookbehind capabilities.

Details
function window<T, U, R = any>(source: Iterable<T, R>, fn: (ctx: { current: T; done: boolean; index: number; past: T[]; upcoming: T[] }) => { advance?: number; value: U }, options?: { maxPast?: number }): AsyncGenerator<U, R>;

Parameters

Parameter Type Description
source Iterable<T, R> The async iterable to window over
fn (ctx: { current: T; done: boolean; index: number; past: T[]; upcoming: T[] }) => { advance?: number; value: U } Callback receiving context and returning value and advance amount
options { maxPast?: number } Optional configuration

Testing

Clone the repository and install its dependencies:

git clone https://github.com/Robert-Cunningham/nozzle
cd nozzle
npm i

Then run the tests:

npm run test

License

This library is licensed under the MIT license.
