refactor(rtzr): FlushSentinel-based segment control and type safety improvements #4565
- Add STT.on_vad_event() default no-op hook (documented)
- Forward VAD events from AgentActivity to the active STT (best-effort)
- RTZR STT can optionally consume VAD events for endpointing via use_vad_event
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.
📝 Walkthrough
Adds typing support and a py.typed marker for the RTZR plugin, tightens type annotations and token validation in the RTZR API client, and significantly refactors the STT streaming pipeline with connection state management, idle handling, and event-based transcript processing (epd_time default increased).
Sequence Diagram(s)
sequenceDiagram
participant Client
participant STT as STT Instance
participant WS as WebSocket
participant Idle as Idle Watchdog
participant Events as Event Channel
Client->>STT: start(audio stream)
activate STT
Note over STT: _send_audio_task (lazy connect)
STT->>STT: _ensure_connected()
STT->>WS: open connection
WS-->>STT: connected
par Audio send
loop audio chunks
STT->>STT: build frame (AudioByteStream)
STT->>WS: send audio frame
end
and Receive processing
loop server events
WS->>STT: transcript/event
STT->>STT: _process_transcript_event()
STT->>Events: emit SpeechEvent (INTERIM/FINAL/END)
end
end
Idle->>Idle: monitor inactivity
alt idle timeout
Idle->>STT: trigger _end_segment()
STT->>WS: send EOS
end
Client->>STT: stop()
STT->>WS: close
deactivate STT
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 31f25c4bd5
async def _process_vad_events(self) -> None:
    try:
        while True:
            ev = await self._vad_event_queue.get()
            if ev.type == agents_vad.VADEventType.START_OF_SPEECH:
Add a termination path for VAD event processing
The VAD task runs an infinite while True loop awaiting self._vad_event_queue.get() and never exits on its own. Because _run awaits asyncio.gather(send_task, vad_task), once the input channel closes and send_task finishes, the stream will still hang waiting on this VAD task, leaving _run (and the stream) stuck until it is externally cancelled. This means consumers that rely on natural end-of-stream (e.g., after end_input) will hang unless they manually cancel; consider closing the queue or adding a sentinel/exit condition tied to input shutdown.
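One way to give the consumer a natural exit is a stop sentinel pushed when the send side finishes. The sketch below is a minimal, self-contained illustration, not the plugin's code: `_STOP`, `process_vad_events`, `send_audio`, and `run` are hypothetical names, and the events are plain strings standing in for VAD events.

```python
import asyncio

_STOP = object()  # hypothetical sentinel marking end of input


async def process_vad_events(queue: asyncio.Queue, handled: list) -> None:
    """Consume events until the stop sentinel arrives."""
    while True:
        ev = await queue.get()
        if ev is _STOP:
            return  # natural exit once input is closed
        handled.append(ev)


async def send_audio(queue: asyncio.Queue, frames: list) -> None:
    """Stand-in for the send task; signals shutdown when input is exhausted."""
    try:
        for _ in frames:
            await asyncio.sleep(0)  # pretend to send one frame
    finally:
        queue.put_nowait(_STOP)  # unblock the consumer even if sending fails


async def run(events: list, frames: list) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    handled: list = []
    for ev in events:
        queue.put_nowait(ev)
    # gather() now returns because both tasks have an exit path.
    await asyncio.gather(send_audio(queue, frames), process_vad_events(queue, handled))
    return handled
```

Because the sentinel is enqueued in a `finally` block, the consumer is released whether the send side ends normally or raises.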
for frame in ev.frames:
    payload = frame.data.tobytes()
    async with self._send_lock:
        await ws.send_bytes(payload)
Resample VAD frames before sending to RTZR
In _handle_vad_start the code forwards ev.frames directly to the websocket. These frames come from the VAD pipeline (raw rtc.AudioFrames forwarded in audio_recognition.push_audio) and bypass the STT resampling path that normally converts input to self._rtzr_stt._params.sample_rate. If the room audio sample rate differs from the RTZR configuration (common 48 kHz input vs 8 kHz RTZR), the service receives audio at the wrong rate and transcripts/endpointing become unreliable. Consider resampling ev.frames or using the already-resampled audio from the STT input queue.
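To make the rate mismatch concrete, here is a rough sketch of converting mono 16-bit PCM between sample rates before sending. It uses plain linear interpolation with no anti-aliasing filter, so it is illustrative only; real code would more likely reuse the STT's existing resampling path. `resample_linear` is a hypothetical helper, not part of the plugin.

```python
from array import array


def resample_linear(pcm: bytes, src_rate: int, dst_rate: int) -> bytes:
    """Resample mono 16-bit PCM by linear interpolation (illustrative quality only)."""
    if src_rate == dst_rate:
        return pcm
    src = array("h")
    src.frombytes(pcm)
    if not src:
        return b""
    dst_len = max(1, len(src) * dst_rate // src_rate)
    step = src_rate / dst_rate  # source samples advanced per output sample
    out = array("h")
    for i in range(dst_len):
        pos = i * step
        lo = int(pos)
        hi = min(lo + 1, len(src) - 1)
        frac = pos - lo
        # Blend the two neighbouring source samples.
        out.append(int(round(src[lo] * (1 - frac) + src[hi] * frac)))
    return out.tobytes()
```

With 48 kHz input and an 8 kHz target, six input samples map to one output sample, which is the conversion the forwarded VAD frames would otherwise skip.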
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@livekit-agents/livekit/agents/stt/stt.py`:
- Around line 238-249: The type annotation on the method on_vad_event currently
uses a quoted forward reference ("VADEvent"); remove the unnecessary quotes so
the signature becomes def on_vad_event(self, ev: VADEvent) -> None: since the
module uses from __future__ import annotations. Update the annotation only (no
behavior changes), keep the docstring and return, and ensure the VADEvent symbol
is still available/imported where referenced.
In `@livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/rtzrapi.py`:
- Around line 157-163: The token refresh check in the method that returns the
access token uses an inverted threshold and only refreshes after a token has
been expired for over an hour; update the condition that checks
token["expire_at"] to refresh proactively (use token["expire_at"] < time.time()
+ 3600) so the code calls self._refresh_token() within the last hour of
validity; ensure you still handle token is None and raise RTZRAPIError("Failed
to obtain RTZR access token") if refresh fails (references: variable token, key
"expire_at", method _refresh_token, and RTZRAPIError).
🧹 Nitpick comments (1)
livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/stt.py (1)
197-469: Consider bounding `_pending_speech_frames` to avoid unbounded growth.
If VAD start events are delayed or dropped, buffered audio could grow without limit. A small cap with backpressure logging would prevent memory spikes.
♻️ Optional mitigation
-_pending_speech_frames: deque[bytes] = deque()
+_MAX_PENDING_FRAMES = 200
+_pending_speech_frames: deque[bytes] = deque(maxlen=_MAX_PENDING_FRAMES)
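Note that `deque(maxlen=...)` silently evicts the oldest entry once full, so any logging of drops has to be added around it. A minimal sketch of a capped buffer that counts evictions follows; `PendingFrames` and its `dropped` counter are hypothetical names, and the cap of 200 is taken from the suggestion above.

```python
from collections import deque

MAX_PENDING_FRAMES = 200  # cap taken from the suggested mitigation


class PendingFrames:
    """Bounded frame buffer that keeps the newest frames and counts evictions."""

    def __init__(self, maxlen: int = MAX_PENDING_FRAMES) -> None:
        self._frames: deque[bytes] = deque(maxlen=maxlen)
        self.dropped = 0  # how many old frames were evicted

    def push(self, frame: bytes) -> None:
        if len(self._frames) == self._frames.maxlen:
            self.dropped += 1  # append() below will evict the oldest frame
        self._frames.append(frame)

    def drain(self) -> list[bytes]:
        """Return buffered frames oldest-first and clear the buffer."""
        frames = list(self._frames)
        self._frames.clear()
        return frames
```

The `dropped` counter is the natural place to hook a warning log if evictions should be visible.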
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- CONTRIBUTING.md
- livekit-agents/livekit/agents/stt/stt.py
- livekit-agents/livekit/agents/voice/agent_activity.py
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/__init__.py
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/py.typed
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/rtzrapi.py
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/stt.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Format code with ruff
Run ruff linter and auto-fix issues
Run mypy type checker in strict mode
Maintain line length of 100 characters maximum
Ensure Python 3.9+ compatibility
Use Google-style docstrings
Files:
- livekit-agents/livekit/agents/voice/agent_activity.py
- livekit-agents/livekit/agents/stt/stt.py
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/__init__.py
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/stt.py
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/rtzrapi.py
🧠 Learnings (1)
📚 Learning: 2026-01-16T07:44:56.353Z
Learnt from: CR
Repo: livekit/agents PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-16T07:44:56.353Z
Learning: Follow the Plugin System pattern where plugins in livekit-plugins/ are separate packages registered via the Plugin base class
Applied to files:
CONTRIBUTING.md
🧬 Code graph analysis (3)
livekit-agents/livekit/agents/voice/agent_activity.py (4)
- livekit-agents/livekit/agents/voice/agent_session.py (1): stt(1256-1257)
- livekit-agents/livekit/agents/voice/agent.py (1): stt(508-518)
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/stt.py (1): on_vad_event(145-188)
- livekit-agents/livekit/agents/stt/stt.py (1): on_vad_event(238-249)
livekit-agents/livekit/agents/stt/stt.py (5)
- livekit-agents/livekit/agents/voice/agent_activity.py (1): vad(2792-2793)
- livekit-agents/livekit/agents/voice/agent_session.py (1): vad(1268-1269)
- livekit-agents/livekit/agents/voice/agent.py (1): vad(560-570)
- livekit-agents/livekit/agents/vad.py (1): VADEvent(26-68)
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/stt.py (1): on_vad_event(145-188)
livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/stt.py (3)
- livekit-agents/livekit/agents/stt/stt.py (4): on_vad_event(238-249), stream(224-232), STT(101-264), SpeechEvent(72-76)
- livekit-agents/livekit/agents/vad.py (2): VADEvent(26-68), VADEventType(19-22)
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/rtzrapi.py (1): build_config(238-265)
🪛 GitHub Check: ruff
livekit-agents/livekit/agents/stt/stt.py
[failure] 238-238: Ruff (UP037)
livekit-agents/livekit/agents/stt/stt.py:238:32: UP037 Remove quotes from type annotation
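The reason the quotes are redundant can be shown in a few lines: with `from __future__ import annotations`, annotations are stored as strings and not evaluated when the function is defined, so a name defined later in the module still works unquoted. The classes below are stand-ins, not the real `STT` or `VADEvent`.

```python
from __future__ import annotations


class STT:
    def on_vad_event(self, ev: VADEvent) -> None:  # no quotes needed
        """Default no-op hook."""
        return None


class VADEvent:  # stand-in type, deliberately defined after its first use
    pass
```

Inspecting `STT.on_vad_event.__annotations__` shows the annotation kept as the string `"VADEvent"`, which is why UP037 treats the explicit quotes as noise.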
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: type-check (3.13)
- GitHub Check: type-check (3.9)
- GitHub Check: unit-tests
🔇 Additional comments (7)
livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/py.typed (1)
1-1: No action needed. The `py.typed` marker file is correctly placed and will be automatically included in the package distribution by hatchling when it parses `packages = ["livekit"]` in the build configuration. This follows the established pattern used across 50+ existing plugins in the codebase, all of which use the same configuration without requiring explicit `py.typed` entries in `pyproject.toml`.
Likely an incorrect or invalid review comment.
livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/__init__.py (1)
18-20: Nice clarity on `__init__` return type.
Clean, explicit typing without behavioral impact.
CONTRIBUTING.md (1)
84-85: Good update to the mypy plugin list.
Keeps local type-checking aligned with the RTZR plugin.
livekit-agents/livekit/agents/voice/agent_activity.py (1)
1226-1231: Solid defensive VAD hook propagation.
The try/except guard keeps the pipeline resilient while enabling observability.
Also applies to: 1249-1254, 1272-1277
livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/rtzrapi.py (1)
45-72: Typing + validation hardening looks good.
The structured token parsing, keyword validation, and context manager typing are clean improvements.
Also applies to: 142-151, 176-184, 248-249
livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/stt.py (2)
64-195: VAD hook integration and stream registration look solid.
Nice alignment with the new STT hook and multi-stream coordination.
471-569: Recv loop changes are clear and consistent.
The transcript event emission and error handling read well.
c69fa98 to 03e2cfa
@coderabbitai full review
✅ Actions performed: Full review triggered.
…dency
- Replace VAD-event-driven WebSocket lifecycle with FlushSentinel pattern
- Add _StreamState enum for proper lifecycle management (IDLE/ACTIVE/CLOSING/CLOSED)
- Add lazy connection (_ensure_connected) on first audio frame
- Add segment termination (_end_segment): EOS → await recv → cleanup → ready for next
- Add idle watchdog for auto-closing stale connections (25s timeout)
- Revert on_vad_event hook from base STT class (no longer needed)
- Revert VAD forwarding from agent_activity.py (no longer needed)
- Remove unused self._logger from rtzrapi.py
- Update epd_time default: 0.3 → 0.8 seconds
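The lifecycle in this commit message can be sketched as a small state machine. This is a simplified model under stated assumptions, not the plugin's implementation: the allowed-transition table is inferred from the IDLE/ACTIVE/CLOSING/CLOSED description, and `SegmentLifecycle`, `on_audio_frame`, `on_flush`, and `connects` are hypothetical names. The idle watchdog is omitted.

```python
import enum


class StreamState(enum.Enum):
    IDLE = "idle"
    ACTIVE = "active"
    CLOSING = "closing"
    CLOSED = "closed"


# Assumed transition table; the real plugin may allow a different set.
_ALLOWED = {
    StreamState.IDLE: {StreamState.ACTIVE, StreamState.CLOSED},
    StreamState.ACTIVE: {StreamState.CLOSING, StreamState.CLOSED},
    StreamState.CLOSING: {StreamState.IDLE, StreamState.CLOSED},
    StreamState.CLOSED: set(),
}


class SegmentLifecycle:
    def __init__(self) -> None:
        self.state = StreamState.IDLE
        self.connects = 0  # counts simulated WebSocket connections

    def _transition(self, new: StreamState) -> None:
        if new not in _ALLOWED[self.state]:
            raise ValueError(f"invalid transition {self.state} -> {new}")
        self.state = new

    def on_audio_frame(self) -> None:
        """Lazy connect: only the first frame of a segment opens a connection."""
        if self.state is StreamState.IDLE:
            self.connects += 1
            self._transition(StreamState.ACTIVE)

    def on_flush(self) -> None:
        """End the segment: EOS, wait for the final transcript, then back to IDLE."""
        if self.state is not StreamState.ACTIVE:
            return  # nothing to end
        self._transition(StreamState.CLOSING)
        self._transition(StreamState.IDLE)

    def close(self) -> None:
        if self.state is not StreamState.CLOSED:
            self._transition(StreamState.CLOSED)
```

Keeping the transitions in one table makes an unexpected state change fail loudly instead of leaving a half-open segment.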
03e2cfa to c2831a9
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/stt.py`:
- Around line 281-321: In _send_audio_task, has_ended is currently persistent
across iterations causing a FlushSentinel received before a WS to bleed into the
next item; make the flag per-iteration by moving its initialization inside the
async for loop (or explicitly resetting has_ended to False whenever you don't
have an active WS after calling _ensure_connected), so that a flush only
triggers _end_segment() for the input item that carried the FlushSentinel;
target symbols: _send_audio_task, has_ended, _input_ch, _ensure_connected, _ws,
and _end_segment.
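The per-iteration flag can be illustrated with a simplified, synchronous model of the send loop. This is a sketch under assumptions: `FlushSentinel` here is a local stand-in, the connection is reduced to a boolean, and the function records action names instead of touching a WebSocket.

```python
from typing import List, Union


class FlushSentinel:
    """Local stand-in for the framework's flush marker."""


def send_audio(items: List[Union[bytes, FlushSentinel]]) -> List[str]:
    actions: List[str] = []
    connected = False
    for item in items:
        has_ended = False  # reset for every input item, so a flush cannot leak forward
        if isinstance(item, FlushSentinel):
            has_ended = True
        else:
            if not connected:
                connected = True  # lazy connect on first audio
                actions.append("connect")
            actions.append("send")
        if has_ended and connected:
            actions.append("end_segment")
            connected = False
    return actions
```

With a flush arriving before any audio, the result is connect, send, send, end_segment. A flag that persisted across iterations would instead end the segment right after the first frame.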
♻️ Duplicate comments (1)
livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/rtzrapi.py (1)
154-162: Refresh threshold may allow expired tokens for up to an hour.
If `expire_at` is an epoch timestamp, the current check refreshes only after the token has already been expired for an hour. Consider refreshing within the final hour instead.
🐛 Proposed fix
- if token is None or token["expire_at"] < time.time() - 3600:
+ if token is None or token["expire_at"] < time.time() + 3600:
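The difference between the two thresholds is easiest to see as a pure function with an injectable clock. This sketch assumes `expire_at` is an epoch timestamp in seconds, which the comment above treats as unconfirmed. `Token`, `REFRESH_MARGIN_S`, and `needs_refresh` are hypothetical names.

```python
import time
from typing import Optional, TypedDict


class Token(TypedDict):
    access_token: str
    expire_at: float  # assumed to be an epoch timestamp in seconds


REFRESH_MARGIN_S = 3600.0  # refresh during the final hour of validity


def needs_refresh(token: Optional[Token], now: Optional[float] = None) -> bool:
    """Return True when there is no token or it expires within the margin."""
    now = time.time() if now is None else now
    return token is None or token["expire_at"] < now + REFRESH_MARGIN_S
```

Under this check a token with 30 minutes left is refreshed, while one with two hours left is reused. The original `- 3600` form would keep serving a token until it had been expired for an hour.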
📒 Files selected for processing (5)
- CONTRIBUTING.md
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/__init__.py
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/py.typed
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/rtzrapi.py
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/stt.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Format code with ruff
Run ruff linter and auto-fix issues
Run mypy type checker in strict mode
Maintain line length of 100 characters maximum
Ensure Python 3.9+ compatibility
Use Google-style docstrings
Files:
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/__init__.py
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/stt.py
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/rtzrapi.py
🧬 Code graph analysis (1)
livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/stt.py (5)
- livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/rtzrapi.py (2): build_config(235-262), connect_websocket(205-233)
- livekit-agents/livekit/agents/utils/log.py (1): log_exceptions(9-41)
- livekit-agents/livekit/agents/utils/audio.py (1): AudioByteStream(41-157)
- livekit-agents/livekit/agents/types.py (1): TimedString(94-116)
- livekit-agents/livekit/agents/_exceptions.py (1): APIStatusError(45-81)
🔇 Additional comments (16)
livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/py.typed (1)
1-1: LGTM: `py.typed` marker added correctly.
No concerns here.
CONTRIBUTING.md (1)
84-84: Mypy scope update looks good.
Including the RTZR plugin in the local type-check command keeps contributor steps aligned with CI.
livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/__init__.py (1)
18-20: Explicit `__init__` return annotation is fine.
Improves type clarity without changing runtime behavior.
livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/stt.py (8)
17-71: State/idle constants integration looks good.
The new stream state enum and idle timing constants make the lifecycle model clearer.
76-113: STT option wiring and defaults look consistent.
The updated defaults are applied cleanly through `_STTOptions`.
148-159: SpeechStream state fields are well-initialized.
The added connection and idle tracking fields are set up cleanly.
160-201: Connection setup and error mapping look solid.
The config build and error translation are cohesive and readable.
203-215: Run-loop cleanup ordering looks safe.
The finalization sequence should shut down cleanly.
216-279: Lifecycle helpers are clear and cohesive.
Lazy connect, EOS handling, and idle watchdog responsibilities are well-separated.
322-411: Transcript parsing helpers look good.
The extraction of word timing and transcript parsing improves readability.
414-443: Recv loop integration looks good.
Event parsing and error checks are cleanly integrated.
livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/rtzrapi.py (5)
7-48: Token TypedDict addition looks good.
Improves type safety for cached token handling.
50-101: Keyword formatting validation is robust.
The stricter parsing and bounds checks are well-structured.
141-153: Async context-manager typing looks good.
Clearer typing for `__aenter__`/`__aexit__`.
175-182: Token payload validation is solid.
The stricter structure checks reduce downstream surprises.
235-246: build_config keyword typing update is fine.
The signature now matches the supported keyword formats.
FlushSentinel received without active WS could leave has_ended=True, causing incorrect _end_segment() call on next audio frame.
Summary
Refactor RTZR STT plugin with FlushSentinel-based WebSocket lifecycle management and improved type safety.
Motivation
- `_FlushSentinel` pattern
Changes
RTZR STT Plugin (`livekit-plugins-rtzr`)
Architecture refactor (`stt.py`):
- `_StreamState` enum for lifecycle management (IDLE → ACTIVE → CLOSING → IDLE cycle)
- Lazy connection (`_ensure_connected`): WebSocket connects on first audio frame
- Segment termination (`_end_segment`): EOS → await final transcript → cleanup → ready for next
- `_send_audio_task()` with `has_ended` pattern for FlushSentinel handling
- `_process_transcript_event()`
Type safety (`rtzrapi.py`):
- `_Token` TypedDict for token response validation
- `__aenter__`/`__aexit__`
- `_format_keywords()` type signature
- Remove unused `self._logger` instance variable
Other:
- `py.typed` marker for PEP 561 compliance
- `epd_time` default: 0.3 → 0.8 seconds
- `__init__`
How to test