Skip to content

Audio group duration + Frame/Producer API cleanup + decoder warm-up#1007

Open
kixelated wants to merge 4 commits intomainfrom
audio-group-duration-cleanup
Open

Audio group duration + Frame/Producer API cleanup + decoder warm-up#1007
kixelated wants to merge 4 commits intomainfrom
audio-group-duration-cleanup

Conversation

@kixelated
Copy link
Collaborator

Summary

  • Remove keyframe from Frame — group boundaries are now an explicit OrderedProducer API instead of a frame field that was ignored by Frame::encode().
  • Add OrderedProducer::keyframe() for explicit group boundaries (video importers call this before writing IDR/keyframes).
  • Add OrderedProducer::with_max_group_duration() for automatic ~100ms audio groups — opus/aac importers no longer create one QUIC stream per frame (~50 streams/sec → ~10 streams/sec).
  • Add OrderedFrame return type from OrderedConsumer::read() with inlined timestamp/payload plus group sequence number and frame index (0 = keyframe).
  • Rename JS maxLatencygroupDuration in the publish audio encoder for clarity.
  • Add audio decoder warm-up in JS watch: first group primes the decoder without emitting samples, reducing initial audio glitches.

Test plan

  • just check passes (all Rust + JS/TS linting and type checks)
  • Manual test: publish audio via opus/aac and verify groups are ~100ms instead of per-frame
  • Manual test: publish video and verify keyframe-based group boundaries still work
  • Manual test: watch audio and verify warm-up group doesn't produce audible output

🤖 Generated with Claude Code

- Remove `keyframe: bool` from `Frame` — group boundaries are now an
  explicit `OrderedProducer` API, not a frame field.
- Add `OrderedProducer::keyframe()` for explicit group boundaries (video).
- Add `OrderedProducer::with_max_group_duration()` for automatic ~100ms
  audio groups instead of one QUIC stream per frame.
- Replace `OrderedConsumer::read() -> Frame` with `-> OrderedFrame` that
  inlines timestamp/payload and adds group sequence + frame index.
- Update all importers: opus/aac use auto-grouping, avc3/hev1/av01 use
  explicit `keyframe()`, fmp4 keeps inline group management.
- Rename JS `maxLatency` → `groupDuration` in publish audio encoder.
- Add decoder warm-up in JS watch: first audio group primes the decoder
  without emitting samples.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 27, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 92bdcc1 and 6a07465.

📒 Files selected for processing (1)
  • js/watch/src/audio/decoder.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • js/watch/src/audio/decoder.ts

Walkthrough

Renames Encoder's maxLatency to groupDuration and uses it for keyframe-timing. Removes the public keyframe field from Frame and adds OrderedFrame with group and frame metadata; consumer/consumer APIs switch to return/consume OrderedFrame. OrderedProducer gains with_max_group_duration and keyframe() and manages groups automatically by duration or explicit keyframe calls. Import paths stop embedding keyframe flags in frames and call track.keyframe() instead. Legacy audio decoder path adds a warm-up that drops the first three output frames.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the three main changes: audio group duration management, Frame/Producer API cleanup, and decoder warm-up.
Description check ✅ Passed The description is well-detailed and directly related to the changeset, explaining the removal of the keyframe field, new OrderedProducer API methods, OrderedFrame addition, JS renaming, and decoder warm-up.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch audio-group-duration-cleanup

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
js/publish/src/audio/encoder.ts (1)

17-29: Good rename from maxLatency to groupDuration for semantic clarity.

The new name better reflects the purpose: controlling the maximum duration of audio groups rather than a latency setting.

Minor: As per coding guidelines, consider using interface instead of type for defining object shapes in TypeScript:

-export type EncoderProps = {
+export interface EncoderProps {
   enabled?: boolean | Signal<boolean>;
   source?: Source | Signal<Source | undefined>;
   // ...
-};
+}

,

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/publish/src/audio/encoder.ts` around lines 17 - 29, Replace the exported
type alias with an interface: change the declaration of EncoderProps from a
"type" to an "interface" while keeping the same fields (enabled, source, muted,
volume, groupDuration, container) and their exact types so code consuming
EncoderProps (e.g., any references to EncoderProps in encoder.ts) remains
compatible with the new interface form.
rs/moq-mux/src/import/aac.rs (1)

113-117: Consider extracting the shared audio group duration constant.

Both opus.rs and aac.rs define identical MAX_GROUP_DURATION constants (100ms). Consider extracting this to a shared location to ensure consistency and make it easier to tune audio grouping behavior across all audio codecs.

// In a shared module, e.g., rs/moq-mux/src/import/mod.rs or a constants module:
pub const AUDIO_MAX_GROUP_DURATION: hang::container::Timestamp = 
    hang::container::Timestamp::from_millis_unchecked(100);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/import/aac.rs` around lines 113 - 117, Both aac.rs and opus.rs
define an identical MAX_GROUP_DURATION constant; extract it into a shared
constant (e.g., AUDIO_MAX_GROUP_DURATION) in a common module (for example
rs/moq-mux/src/import/mod.rs or a new constants module) and replace the local
MAX_GROUP_DURATION definitions with references to that shared constant; update
uses in functions like the OrderedProducer::with_max_group_duration call (and
any other occurrences of MAX_GROUP_DURATION) to use the new
AUDIO_MAX_GROUP_DURATION symbol so all codecs share the same 100ms value.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@js/publish/src/audio/encoder.ts`:
- Around line 17-29: Replace the exported type alias with an interface: change
the declaration of EncoderProps from a "type" to an "interface" while keeping
the same fields (enabled, source, muted, volume, groupDuration, container) and
their exact types so code consuming EncoderProps (e.g., any references to
EncoderProps in encoder.ts) remains compatible with the new interface form.

In `@rs/moq-mux/src/import/aac.rs`:
- Around line 113-117: Both aac.rs and opus.rs define an identical
MAX_GROUP_DURATION constant; extract it into a shared constant (e.g.,
AUDIO_MAX_GROUP_DURATION) in a common module (for example
rs/moq-mux/src/import/mod.rs or a new constants module) and replace the local
MAX_GROUP_DURATION definitions with references to that shared constant; update
uses in functions like the OrderedProducer::with_max_group_duration call (and
any other occurrences of MAX_GROUP_DURATION) to use the new
AUDIO_MAX_GROUP_DURATION symbol so all codecs share the same 100ms value.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a3a1120 and 0ee9102.

📒 Files selected for processing (14)
  • js/publish/src/audio/encoder.ts
  • js/watch/src/audio/decoder.ts
  • rs/hang/examples/subscribe.rs
  • rs/hang/examples/video.rs
  • rs/hang/src/container/consumer.rs
  • rs/hang/src/container/frame.rs
  • rs/hang/src/container/producer.rs
  • rs/libmoq/src/consume.rs
  • rs/moq-mux/src/import/aac.rs
  • rs/moq-mux/src/import/av01.rs
  • rs/moq-mux/src/import/avc3.rs
  • rs/moq-mux/src/import/fmp4.rs
  • rs/moq-mux/src/import/hev1.rs
  • rs/moq-mux/src/import/opus.rs
💤 Files with no reviewable changes (2)
  • rs/hang/src/container/frame.rs
  • rs/moq-mux/src/import/fmp4.rs

- Remove redundant `pending_keyframe` flag — `keyframe()` already sets
  `self.group` to None which triggers new group creation in `write()`.
- Move auto-close check to after writing so the group closes immediately
  once the next frame is estimated to exceed the limit, with a safety
  check at the start in case the estimate was wrong.
- Replace `prev_timestamp` with `group_frames` counter to estimate
  average frame duration, which handles B-frames correctly.
- Convert `with_max_group_duration` to a builder-style setter on
  `new()` for easier future extension.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rs/hang/src/container/producer.rs`:
- Around line 72-118: The OrderedProducer can leave an active group unfinished
when dropped; implement Drop for OrderedProducer to check self.group (and
self.group.is_some()) and call finish() on the taken group to finalize it;
specifically, in Drop for OrderedProducer, if let Some(mut group) =
self.group.take() { group.finish().ok(); } (ensure you reference the
OrderedProducer struct, its group field, and call GroupProducer::finish or the
group's finish() method) so the final group is always closed during teardown.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0ee9102 and cf67ac1.

📒 Files selected for processing (3)
  • rs/hang/src/container/producer.rs
  • rs/moq-mux/src/import/aac.rs
  • rs/moq-mux/src/import/opus.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • rs/moq-mux/src/import/aac.rs

Comment on lines 72 to 118
pub fn write(&mut self, frame: Frame) -> Result<(), Error> {
tracing::trace!(?frame, "write frame");

if frame.keyframe {
if let Some(mut group) = self.group.take() {
group.finish()?;
}

// Make sure this frame's timestamp doesn't go backwards relative to the last keyframe.
// We can't really enforce this for frames generally because b-frames suck.
if let Some(keyframe) = self.keyframe
&& frame.timestamp < keyframe
{
return Err(Error::TimestampBackwards);
}

self.keyframe = Some(frame.timestamp);
// Safety check: close the group if this frame already exceeds the max duration.
if let (Some(max_duration), Some(group_start)) = (self.max_group_duration, self.group_start)
&& self.group.is_some()
&& frame.timestamp.checked_sub(group_start).unwrap_or(Timestamp::ZERO) >= max_duration
&& let Some(mut group) = self.group.take()
{
group.finish()?;
}

let mut group = match self.group.take() {
Some(group) => group,
None if frame.keyframe => self.track.append_group()?,
// The first frame must be a keyframe.
None => return Err(Error::MissingKeyframe),
};
// Start a new group if needed (first frame, after keyframe(), or after auto-close).
if self.group.is_none() {
let group = self.track.append_group()?;
self.group = Some(group);
self.group_start = Some(frame.timestamp);
self.group_frames = 0;
}

// Encode the frame.
let mut group = self.group.take().expect("group should exist");
frame.encode(&mut group)?;

self.group.replace(group);

Ok(())
}
self.group_frames += 1;

// Estimate the next frame's timestamp and close the group now if it would exceed the limit.
// avg_frame_duration = elapsed / group_frames
// estimated_next_elapsed = elapsed + avg_frame_duration
// Rearranged to avoid division: elapsed * (frames + 1) >= max_duration * frames
if let (Some(max_duration), Some(group_start)) = (self.max_group_duration, self.group_start) {
let elapsed = frame
.timestamp
.checked_sub(group_start)
.unwrap_or(Timestamp::ZERO)
.as_micros();
let max = max_duration.as_micros();

if elapsed * (self.group_frames as u128 + 1) >= max * self.group_frames as u128
&& let Some(mut group) = self.group.take()
{
group.finish()?;
}
}

/// An explicit way to end the current group.
///
/// This is useful to flush when you know the next frame will be a keyframe.
pub fn flush(&mut self) -> Result<(), Error> {
self.group.take().ok_or(Error::MissingKeyframe)?.finish()?;
Ok(())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find moq_lite::GroupProducer definition and Drop implementation
echo "=== Checking moq_lite::GroupProducer structure and Drop impl ==="
fd -e rs -p 'moq.lite' | head -20
rg -A 10 'pub struct GroupProducer' --type rs 2>/dev/null || echo "Pattern not found in search"

# Check for Drop impl on GroupProducer
rg -B 2 -A 10 'impl Drop for GroupProducer' --type rs 2>/dev/null || echo "No Drop impl found for GroupProducer"

# Check finish() method signature
rg -B 2 -A 5 'fn finish\(' --type rs 2>/dev/null | head -50

Repository: moq-dev/moq

Length of output: 813


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Verify current OrderedProducer teardown/finalization paths
echo "=== Checking OrderedProducer definition and Drop impl ==="
rg -B 5 -A 30 'pub struct OrderedProducer' rs/hang/src/container/producer.rs

echo ""
echo "=== Checking for existing Drop impl on OrderedProducer ==="
rg 'impl Drop for OrderedProducer' rs/hang/src/container/producer.rs || echo "No Drop impl found"

echo ""
echo "=== Checking for keyframe() and finish() calls in OrderedProducer ==="
rg -n 'fn keyframe|group\.finish' rs/hang/src/container/producer.rs

Repository: moq-dev/moq

Length of output: 1703


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check importers/callers that may rely on implicit teardown
echo "=== Checking OrderedProducer usage in importers and callers ==="
find rs -name '*.rs' -type f | xargs rg -l 'OrderedProducer' | head -10

echo ""
echo "=== Sample usages of OrderedProducer ==="
rg -B 3 -A 5 'OrderedProducer::new\(|OrderedProducer {' rs --type rs | head -80

Repository: moq-dev/moq

Length of output: 530


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Search for GroupProducer definition to understand lifecycle
echo "=== GroupProducer definition ==="
rg -B 3 -A 20 'pub struct GroupProducer' --type rs

echo ""
echo "=== Check if GroupProducer implements Drop or has finish() ==="
rg 'pub fn finish|impl.*Drop' --type rs | grep -i group

Repository: moq-dev/moq

Length of output: 131


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check actual usage patterns of OrderedProducer in callers
echo "=== OrderedProducer usage in opus.rs ==="
rg -B 2 -A 10 'OrderedProducer' rs/moq-mux/src/import/opus.rs

echo ""
echo "=== Check if any caller explicitly finalizes ==="
rg -B 5 'OrderedProducer' rs/moq-mux/src/import/aac.rs | head -40

Repository: moq-dev/moq

Length of output: 1517


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check if finish() is a required method for GroupProducer
echo "=== GroupProducer finish method ==="
rg -B 5 -A 8 'fn finish' rs/moq-lite/src --type rs

echo ""
echo "=== Check moq_lite module structure ==="
fd -e rs moq-lite | xargs grep -l 'pub struct GroupProducer'

Repository: moq-dev/moq

Length of output: 134


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# List moq-lite structure
echo "=== moq-lite source files ==="
fd . rs/moq-lite/src -type f -name '*.rs' | head -20

# Find GroupProducer definition with simpler search
echo ""
echo "=== Find GroupProducer ==="
rg 'struct GroupProducer' rs/

Repository: moq-dev/moq

Length of output: 272


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Look at actual moq-lite track module
echo "=== Check moq-lite track-related files ==="
find rs/moq-lite/src -name '*.rs' -exec grep -l 'GroupProducer' {} \;

echo ""
echo "=== Look for GroupProducer in track/group files ==="
find rs/moq-lite/src -name '*track*' -o -name '*group*' | xargs cat 2>/dev/null | head -100

Repository: moq-dev/moq

Length of output: 2867


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check how OrderedProducer is actually dropped/used in examples and tests
echo "=== Check video.rs example for OrderedProducer usage ==="
rg -B 3 -A 10 'OrderedProducer' rs/hang/examples/video.rs

echo ""
echo "=== Check if OrderedProducer lifetime is managed at stream level ==="
rg -B 5 'struct Opus|struct Aac' rs/moq-mux/src/import/opus.rs | head -30

Repository: moq-dev/moq

Length of output: 1057


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Search for GroupProducer definition directly
echo "=== Finding GroupProducer definition ==="
find rs -name '*.rs' -exec grep -l 'pub struct GroupProducer' {} \;

echo ""
echo "=== Look at track.rs for GroupProducer ==="
head -200 rs/moq-lite/src/model/track.rs | tail -100

Repository: moq-dev/moq

Length of output: 2813


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check full track.rs file size and content
echo "=== track.rs file size and GroupProducer ==="
wc -l rs/moq-lite/src/model/track.rs

echo ""
echo "=== Search for pub struct in track.rs ==="
grep -n 'pub struct' rs/moq-lite/src/model/track.rs

echo ""
echo "=== Extract GroupProducer struct ==="
sed -n '/pub struct GroupProducer/,/^}/p' rs/moq-lite/src/model/track.rs | head -50

Repository: moq-dev/moq

Length of output: 298


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check if GroupProducer has finish() method and Drop impl
echo "=== Search for finish() in track.rs ==="
grep -n 'fn finish' rs/moq-lite/src/model/track.rs

echo ""
echo "=== Search for Drop impl in track.rs ==="
grep -n 'impl Drop\|impl.*Drop' rs/moq-lite/src/model/track.rs

echo ""
echo "=== Full content of track.rs to understand GroupProducer ==="
wc -l rs/moq-lite/src/model/track.rs
cat rs/moq-lite/src/model/track.rs | head -300

Repository: moq-dev/moq

Length of output: 182


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check GroupProducer definition in group.rs
echo "=== GroupProducer in group.rs ==="
wc -l rs/moq-lite/src/model/group.rs

echo ""
echo "=== Full group.rs content (small file) ==="
cat rs/moq-lite/src/model/group.rs

Repository: moq-dev/moq

Length of output: 6260


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Verify if finish() is mandatory for GroupProducer
echo "=== Check GroupProducer::finish() signature and behavior ==="
grep -n 'pub fn finish\|fn finish' rs/moq-lite/src/model/group.rs

echo ""
echo "=== Check for Drop impl on GroupProducer ==="
grep -n 'impl Drop\|Drop for\|impl.*Drop' rs/moq-lite/src/model/group.rs

echo ""
echo "=== Check what finish() does ==="
sed -n '/pub fn finish/,/^[[:space:]]*}/p' rs/moq-lite/src/model/group.rs

Repository: moq-dev/moq

Length of output: 207


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check if there are any tests showing what happens with unfinished groups
echo "=== Search for tests or examples with OrderedProducer and unfinished groups ==="
rg -B 3 -A 5 'OrderedProducer|group.*finish' rs/hang/examples rs/hang/src --type rust | head -100

echo ""
echo "=== Check if video.rs example explicitly calls finish() ==="
cat rs/hang/examples/video.rs | grep -A 20 'OrderedProducer'

Repository: moq-dev/moq

Length of output: 5714


Add Drop impl to explicitly finalize active groups on teardown.

OrderedProducer.group can remain open after write() returns, especially with duration-based grouping. If the producer is dropped before the next group boundary, the final group remains unfinished. GroupProducer has no Drop impl and relies on explicit finish() calls. Without finalization, the last group's frames become inaccessible to consumers—causing tail-frame loss and potential consumer hangs. The video.rs example and production decoders (opus.rs, aac.rs) all drop OrderedProducer implicitly, exposing this gap.

Proposed fix
+impl Drop for OrderedProducer {
+	fn drop(&mut self) {
+		if let Some(mut group) = self.group.take() {
+			if let Err(err) = group.finish() {
+				tracing::warn!(?err, "failed to finish active group on drop");
+			}
+		}
+	}
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/hang/src/container/producer.rs` around lines 72 - 118, The OrderedProducer
can leave an active group unfinished when dropped; implement Drop for
OrderedProducer to check self.group (and self.group.is_some()) and call finish()
on the taken group to finalize it; specifically, in Drop for OrderedProducer, if
let Some(mut group) = self.group.take() { group.finish().ok(); } (ensure you
reference the OrderedProducer struct, its group field, and call
GroupProducer::finish or the group's finish() method) so the final group is
always closed during teardown.

kixelated and others added 2 commits February 28, 2026 18:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant