diff --git a/client-sdk-rust b/client-sdk-rust index 3a3f42d..6e47737 160000 --- a/client-sdk-rust +++ b/client-sdk-rust @@ -1 +1 @@ -Subproject commit 3a3f42d7403a648c40920d60f3cf6f1e4b808aea +Subproject commit 6e477379611bf9bd728be5418d7c3309b642ba9c diff --git a/examples/simple_room/main.cpp b/examples/simple_room/main.cpp index 71b0aff..ef29ac4 100644 --- a/examples/simple_room/main.cpp +++ b/examples/simple_room/main.cpp @@ -319,7 +319,7 @@ int main(int argc, char *argv[]) { << " Creation time (ms): " << info.creation_time << "\n"; // Setup Audio Source / Track - auto audioSource = std::make_shared(44100, 1, 10); + auto audioSource = std::make_shared(44100, 1, 0); auto audioTrack = LocalAudioTrack::createLocalAudioTrack("micTrack", audioSource); @@ -385,6 +385,8 @@ int main(int argc, char *argv[]) { // Shutdown the audio / video capture threads. media.stopMic(); media.stopCamera(); + media.stopSpeaker(); + media.shutdownRenderer(); // Drain any queued tasks that might still try to update the renderer / // speaker diff --git a/examples/simple_room/sdl_media_manager.cpp b/examples/simple_room/sdl_media_manager.cpp index 6cd9d22..ac6dad7 100644 --- a/examples/simple_room/sdl_media_manager.cpp +++ b/examples/simple_room/sdl_media_manager.cpp @@ -31,6 +31,7 @@ SDLMediaManager::~SDLMediaManager() { stopMic(); stopCamera(); stopSpeaker(); + shutdownRenderer(); } bool SDLMediaManager::ensureSDLInit(Uint32 flags) { diff --git a/examples/simple_room/sdl_video_renderer.cpp b/examples/simple_room/sdl_video_renderer.cpp index 1008f38..7ba3a78 100644 --- a/examples/simple_room/sdl_video_renderer.cpp +++ b/examples/simple_room/sdl_video_renderer.cpp @@ -22,6 +22,8 @@ using namespace livekit; +constexpr int kMaxFPS = 60; + SDLVideoRenderer::SDLVideoRenderer() = default; SDLVideoRenderer::~SDLVideoRenderer() { shutdown(); } @@ -95,6 +97,16 @@ void SDLVideoRenderer::render() { return; } + // Throttle rendering to kMaxFPS + const auto now = std::chrono::steady_clock::now(); + if (last_render_time_.time_since_epoch().count() != 0) { + const auto min_interval = std::chrono::microseconds(1'000'000 / kMaxFPS); + if (now - last_render_time_ < min_interval) { + return; + } + } + last_render_time_ = now; + // 3) Read a frame from VideoStream (blocking until one is available) livekit::VideoFrameEvent vfe; bool gotFrame = stream_->read(vfe); diff --git a/examples/simple_room/sdl_video_renderer.h b/examples/simple_room/sdl_video_renderer.h index 6e666ea..fb0d41e 100644 --- a/examples/simple_room/sdl_video_renderer.h +++ b/examples/simple_room/sdl_video_renderer.h @@ -49,4 +49,5 @@ class SDLVideoRenderer { std::shared_ptr stream_; int width_ = 0; int height_ = 0; + std::chrono::steady_clock::time_point last_render_time_{}; }; diff --git a/include/livekit/audio_source.h b/include/livekit/audio_source.h index 92401de..4fbc58c 100644 --- a/include/livekit/audio_source.h +++ b/include/livekit/audio_source.h @@ -41,8 +41,34 @@ class AudioSource { * @param sample_rate Sample rate in Hz. * @param num_channels Number of channels. * @param queue_size_ms Max buffer duration for the internal queue in ms. + * + * Buffering behavior: + * ------------------- + * - queue_size_ms == 0 (recommended for real-time capture): + * Disables internal buffering entirely. Audio frames are forwarded + * directly to WebRTC sinks and consumed synchronously. + * + * This mode is optimized for real-time audio capture driven by hardware + * media callbacks (e.g. microphone capture). The caller is expected to + * provide fixed-size real-time frames (typically 10 ms per call). + * + * Because the native side consumes frames immediately, this mode + * minimizes latency and jitter and is the best choice for live capture + * scenarios. + * + * - queue_size_ms > 0 (buffered / blocking mode): + * Enables an internal queue that buffers audio up to the specified + * duration. Frames are accumulated and flushed asynchronously once the buffer + * reaches its threshold. + * + * This mode is intended for non-real-time producers (e.g. TTS engines, + * file-based audio, or agents generating audio faster or slower than + * real-time). The buffering layer smooths timing and allows the audio to + * be streamed out in real time even if the producer is bursty. + * + * queue_size_ms must be a multiple of 10. */ - AudioSource(int sample_rate, int num_channels, int queue_size_ms = 1000); + AudioSource(int sample_rate, int num_channels, int queue_size_ms = 0); virtual ~AudioSource() = default; AudioSource(const AudioSource &) = delete; @@ -86,19 +112,32 @@ class AudioSource { * callback arrives (recommended for production unless the caller needs * explicit timeout control). * - * Notes: - * - This is a blocking call. - * - timeout_ms == 0 (infinite wait) is the safest mode because it - * guarantees the callback completes before the function returns, which in - * turn guarantees that the audio buffer lifetime is fully protected. The - * caller does not need to manage or extend the frame lifetime manually. + * Blocking semantics: + * The blocking behavior of this call depends on the buffering mode selected + * at construction time: + * + * - queue_size_ms == 0 (real-time capture mode): + * Frames are consumed synchronously by the native layer. The FFI callback + * is invoked immediately as part of the capture call, so this function + * returns quickly. + * + * This mode relies on the caller being paced by a real-time media + * callback (e.g. audio hardware interrupt / capture thread). It provides the + * lowest possible latency and is ideal for live microphone capture. + * + * - queue_size_ms > 0 (buffered / non-real-time mode): + * Frames are queued internally and flushed asynchronously. This function + * will block until the buffered audio corresponding to this frame has + * been consumed by the native side and the FFI callback fires. * - * - May throw std::runtime_error if: - * • the FFI reports an error + * This mode is best suited for non-real-time audio producers (such as TTS + * engines or agents) that generate audio independently of real-time + * pacing, while still streaming audio out in real time. * - * - The underlying FFI request *must* eventually produce a callback for - * each frame. If the FFI layer is misbehaving or the event loop is stalled, - * a timeout may occur in bounded-wait mode. + * Safety notes: + * May throw std::runtime_error if: + * - the FFI reports an error + * - a timeout occurs in bounded-wait mode */ void captureFrame(const AudioFrame &frame, int timeout_ms = 20); diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 811cebb..4f2cbe7 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -15,6 +15,7 @@ */ #include +#include #include "build.h" #include "e2ee.pb.h" @@ -37,6 +38,99 @@ std::string bytesToString(const std::vector &b) { return std::string(reinterpret_cast(b.data()), b.size()); } +// Helper to log errors and throw +inline void logAndThrow(const std::string &error_msg) { + std::cerr << "LiveKit SDK Error: " << error_msg << std::endl; + throw std::runtime_error(error_msg); +} + +std::optional ExtractAsyncId(const proto::FfiEvent &event) { + using E = proto::FfiEvent; + switch (event.message_case()) { + case E::kConnect: + return event.connect().async_id(); + case E::kDisconnect: + return event.disconnect().async_id(); + case E::kDispose: + return event.dispose().async_id(); + case E::kPublishTrack: + return event.publish_track().async_id(); + case E::kUnpublishTrack: + return event.unpublish_track().async_id(); + case E::kPublishData: + return event.publish_data().async_id(); + case E::kPublishTranscription: + return event.publish_transcription().async_id(); + case E::kCaptureAudioFrame: + return event.capture_audio_frame().async_id(); + case E::kSetLocalMetadata: + return event.set_local_metadata().async_id(); + case E::kSetLocalName: + return event.set_local_name().async_id(); + case E::kSetLocalAttributes: + return event.set_local_attributes().async_id(); + case E::kGetStats: + return event.get_stats().async_id(); + case E::kGetSessionStats: + return event.get_session_stats().async_id(); + case E::kPublishSipDtmf: + return event.publish_sip_dtmf().async_id(); + case E::kChatMessage: + return event.chat_message().async_id(); + case E::kPerformRpc: + return event.perform_rpc().async_id(); + + // low-level data stream callbacks + case E::kSendStreamHeader: + return event.send_stream_header().async_id(); + case E::kSendStreamChunk: + return event.send_stream_chunk().async_id(); + case E::kSendStreamTrailer: + return event.send_stream_trailer().async_id(); + + // high-level + case E::kByteStreamReaderReadAll: + return event.byte_stream_reader_read_all().async_id(); + case E::kByteStreamReaderWriteToFile: + return event.byte_stream_reader_write_to_file().async_id(); + case E::kByteStreamOpen: + return event.byte_stream_open().async_id(); + case E::kByteStreamWriterWrite: + return event.byte_stream_writer_write().async_id(); + case E::kByteStreamWriterClose: + return event.byte_stream_writer_close().async_id(); + case E::kSendFile: + return event.send_file().async_id(); + + case E::kTextStreamReaderReadAll: + return event.text_stream_reader_read_all().async_id(); + case E::kTextStreamOpen: + return event.text_stream_open().async_id(); + case E::kTextStreamWriterWrite: + return event.text_stream_writer_write().async_id(); + case E::kTextStreamWriterClose: + return event.text_stream_writer_close().async_id(); + case E::kSendText: + return event.send_text().async_id(); + case E::kSendBytes: + return event.send_bytes().async_id(); + + // NOT async completion: + case E::kRoomEvent: + case E::kTrackEvent: + case E::kVideoStreamEvent: + case E::kAudioStreamEvent: + case E::kByteStreamReaderEvent: + case E::kTextStreamReaderEvent: + case E::kRpcMethodInvocation: + case E::kLogs: + case E::kPanic: + case E::MESSAGE_NOT_SET: + default: + return std::nullopt; + } +} + } // namespace FfiClient::~FfiClient() { @@ -70,7 +164,7 @@ bool FfiClient::isInitialized() const noexcept { FfiClient::ListenerId FfiClient::AddListener(const FfiClient::Listener &listener) { std::lock_guard guard(lock_); - FfiClient::ListenerId id = nextListenerId++; + FfiClient::ListenerId id = next_listener_id++; listeners_[id] = listener; return id; } @@ -110,34 +204,33 @@ FfiClient::sendRequest(const proto::FfiRequest &request) const { } void FfiClient::PushEvent(const proto::FfiEvent &event) const { - std::vector> to_complete; + std::unique_ptr to_complete; + std::vector listeners_copy; { std::lock_guard guard(lock_); - for (auto it = pending_.begin(); it != pending_.end();) { - if ((*it)->matches(event)) { - to_complete.push_back(std::move(*it)); - it = pending_.erase(it); - } else { - ++it; + + // Complete pending future if this event is a callback with async_id + if (auto async_id = ExtractAsyncId(event)) { + auto it = pending_by_id_.find(*async_id); + if (it != pending_by_id_.end() && it->second && + it->second->matches(event)) { + to_complete = std::move(it->second); + pending_by_id_.erase(it); } } - } - - // Run handlers outside lock - for (auto &p : to_complete) { - p->complete(event); - } - // Notify listeners. Note, we copy the listeners here to avoid calling into - // the listeners under the lock, which could potentially cause deadlock. - std::vector listeners_copy; - { - std::lock_guard guard(lock_); + // Snapshot listeners listeners_copy.reserve(listeners_.size()); - for (auto &[_, listener] : listeners_) { - listeners_copy.push_back(listener); + for (const auto &kv : listeners_) { + listeners_copy.push_back(kv.second); } } + // Run handler outside lock + if (to_complete) { + to_complete->complete(event); + } + + // Notify listeners outside lock for (auto &listener : listeners_copy) { listener(event); } @@ -150,17 +243,39 @@ void LivekitFfiCallback(const uint8_t *buf, size_t len) { FfiClient::instance().PushEvent(event); } +FfiClient::AsyncId FfiClient::generateAsyncId() { + return next_async_id_.fetch_add(1, std::memory_order_relaxed); +} + +bool FfiClient::cancelPendingByAsyncId(AsyncId async_id) { + std::unique_ptr to_cancel; + { + std::lock_guard guard(lock_); + auto it = pending_by_id_.find(async_id); + if (it != pending_by_id_.end()) { + to_cancel = std::move(it->second); + pending_by_id_.erase(it); + } + } + if (to_cancel) { + to_cancel->cancel(); + return true; + } + return false; +} + template std::future FfiClient::registerAsync( - std::function match, + AsyncId async_id, std::function match, std::function &)> handler) { auto pending = std::make_unique>(); + pending->async_id = async_id; auto fut = pending->promise.get_future(); pending->match = std::move(match); pending->handler = std::move(handler); { std::lock_guard guard(lock_); - pending_.push_back(std::move(pending)); + pending_by_id_.emplace(async_id, std::move(pending)); } return fut; } @@ -170,10 +285,35 @@ std::future FfiClient::connectAsync(const std::string &url, const std::string &token, const RoomOptions &options) { + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, + // match lambda: is this the connect event with our async_id? + [async_id](const proto::FfiEvent &event) { + return event.has_connect() && event.connect().async_id() == async_id; + }, + // handler lambda: fill the promise with RoomInfo or an exception + [](const proto::FfiEvent &event, + std::promise &pr) { + const auto &connectCb = event.connect(); + if (!connectCb.error().empty()) { + pr.set_exception( + std::make_exception_ptr(std::runtime_error(connectCb.error()))); + return; + } + + pr.set_value(connectCb); + }); + + // Build and send the request proto::FfiRequest req; auto *connect = req.mutable_connect(); connect->set_url(url); connect->set_token(token); + connect->set_request_async_id(async_id); auto *opts = connect->mutable_options(); opts->set_auto_subscribe(options.auto_subscribe); opts->set_dynacast(options.dynacast); @@ -245,50 +385,29 @@ FfiClient::connectAsync(const std::string &url, const std::string &token, } } } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_connect()) { - throw std::runtime_error("FfiResponse missing connect"); - } - const AsyncId async_id = resp.connect().async_id(); - - // Now we register an async op that completes with RoomInfo - return registerAsync( - // match lambda: is this the connect event with our async_id? - [async_id](const proto::FfiEvent &event) { - return event.has_connect() && event.connect().async_id() == async_id; - }, - // handler lambda: fill the promise with RoomInfo or an exception - [](const proto::FfiEvent &event, - std::promise &pr) { - const auto &connectCb = event.connect(); - if (!connectCb.error().empty()) { - pr.set_exception( - std::make_exception_ptr(std::runtime_error(connectCb.error()))); - return; - } + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_connect()) { + logAndThrow("FfiResponse missing connect"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } - pr.set_value(connectCb); - }); + return fut; } // Track APIs Implementation std::future> FfiClient::getTrackStatsAsync(uintptr_t track_handle) { - proto::FfiRequest req; - auto *get_stats_req = req.mutable_get_stats(); - get_stats_req->set_track_handle(track_handle); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_get_stats()) { - throw std::runtime_error("FfiResponse missing get_stats"); - } - - const AsyncId async_id = resp.get_stats().async_id(); + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); - // Register pending op: - // - match: event.has_get_stats() && ids equal - // - handler: convert proto stats to C++ wrapper + fulfill promise - return registerAsync>( + // Register the async handler BEFORE sending the request + auto fut = registerAsync>( + async_id, // match [async_id](const proto::FfiEvent &event) { return event.has_get_stats() && @@ -312,6 +431,24 @@ FfiClient::getTrackStatsAsync(uintptr_t track_handle) { } pr.set_value(std::move(stats_vec)); }); + + // Build and send the request + proto::FfiRequest req; + auto *get_stats_req = req.mutable_get_stats(); + get_stats_req->set_track_handle(track_handle); + get_stats_req->set_request_async_id(async_id); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_get_stats()) { + logAndThrow("FfiResponse missing get_stats"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } // Participant APIs Implementation @@ -319,19 +456,12 @@ std::future FfiClient::publishTrackAsync(std::uint64_t local_participant_handle, std::uint64_t track_handle, const TrackPublishOptions &options) { - proto::FfiRequest req; - auto *msg = req.mutable_publish_track(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_track_handle(track_handle); - auto optionProto = toProto(options); - msg->mutable_options()->CopyFrom(optionProto); + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_publish_track()) { - throw std::runtime_error("FfiResponse missing publish_track"); - } - const AsyncId async_id = resp.publish_track().async_id(); - return registerAsync( + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, // Match: is this our PublishTrackCallback? [async_id](const proto::FfiEvent &event) { return event.has_publish_track() && @@ -358,23 +488,39 @@ FfiClient::publishTrackAsync(std::uint64_t local_participant_handle, proto::OwnedTrackPublication pub = cb.publication(); pr.set_value(std::move(pub)); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_publish_track(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_track_handle(track_handle); + msg->set_request_async_id(async_id); + auto optionProto = toProto(options); + msg->mutable_options()->CopyFrom(optionProto); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_publish_track()) { + logAndThrow("FfiResponse missing publish_track"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future FfiClient::unpublishTrackAsync(std::uint64_t local_participant_handle, const std::string &track_sid, bool stop_on_unpublish) { - proto::FfiRequest req; - auto *msg = req.mutable_unpublish_track(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_track_sid(track_sid); - msg->set_stop_on_unpublish(stop_on_unpublish); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_unpublish_track()) { - throw std::runtime_error("FfiResponse missing unpublish_track"); - } - const AsyncId async_id = resp.unpublish_track().async_id(); - return registerAsync( + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &event) { return event.has_unpublish_track() && event.unpublish_track().async_id() == async_id; @@ -388,6 +534,26 @@ FfiClient::unpublishTrackAsync(std::uint64_t local_participant_handle, } pr.set_value(); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_unpublish_track(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_track_sid(track_sid); + msg->set_stop_on_unpublish(stop_on_unpublish); + msg->set_request_async_id(async_id); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_unpublish_track()) { + logAndThrow("FfiResponse missing unpublish_track"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future FfiClient::publishDataAsync( @@ -395,23 +561,12 @@ std::future FfiClient::publishDataAsync( std::uint64_t data_len, bool reliable, const std::vector &destination_identities, const std::string &topic) { - proto::FfiRequest req; - auto *msg = req.mutable_publish_data(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_data_ptr(reinterpret_cast(data_ptr)); - msg->set_data_len(data_len); - msg->set_reliable(reliable); - msg->set_topic(topic); - for (const auto &id : destination_identities) { - msg->add_destination_identities(id); - } + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_publish_data()) { - throw std::runtime_error("FfiResponse missing publish_data"); - } - const AsyncId async_id = resp.publish_data().async_id(); - return registerAsync( + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &event) { return event.has_publish_data() && event.publish_data().async_id() == async_id; @@ -425,26 +580,43 @@ std::future FfiClient::publishDataAsync( } pr.set_value(); }); -} -std::future FfiClient::publishSipDtmfAsync( - std::uint64_t local_participant_handle, std::uint32_t code, - const std::string &digit, - const std::vector &destination_identities) { + // Build and send the request proto::FfiRequest req; - auto *msg = req.mutable_publish_sip_dtmf(); + auto *msg = req.mutable_publish_data(); msg->set_local_participant_handle(local_participant_handle); - msg->set_code(code); - msg->set_digit(digit); + msg->set_data_ptr(reinterpret_cast(data_ptr)); + msg->set_data_len(data_len); + msg->set_reliable(reliable); + msg->set_topic(topic); + msg->set_request_async_id(async_id); for (const auto &id : destination_identities) { msg->add_destination_identities(id); } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_publish_sip_dtmf()) { - throw std::runtime_error("FfiResponse missing publish_sip_dtmf"); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_publish_data()) { + logAndThrow("FfiResponse missing publish_data"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; } - const AsyncId async_id = resp.publish_sip_dtmf().async_id(); - return registerAsync( + + return fut; +} + +std::future FfiClient::publishSipDtmfAsync( + std::uint64_t local_participant_handle, std::uint32_t code, + const std::string &digit, + const std::vector &destination_identities) { + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &event) { return event.has_publish_sip_dtmf() && event.publish_sip_dtmf().async_id() == async_id; @@ -458,21 +630,40 @@ std::future FfiClient::publishSipDtmfAsync( } pr.set_value(); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_publish_sip_dtmf(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_code(code); + msg->set_digit(digit); + msg->set_request_async_id(async_id); + for (const auto &id : destination_identities) { + msg->add_destination_identities(id); + } + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_publish_sip_dtmf()) { + logAndThrow("FfiResponse missing publish_sip_dtmf"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future FfiClient::setLocalMetadataAsync(std::uint64_t local_participant_handle, const std::string &metadata) { - proto::FfiRequest req; - auto *msg = req.mutable_set_local_metadata(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_metadata(metadata); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_set_local_metadata()) { - throw std::runtime_error("FfiResponse missing set_local_metadata"); - } - const AsyncId async_id = resp.set_local_metadata().async_id(); - return registerAsync( + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &event) { return event.has_set_local_metadata() && event.set_local_metadata().async_id() == async_id; @@ -486,24 +677,36 @@ FfiClient::setLocalMetadataAsync(std::uint64_t local_participant_handle, } pr.set_value(); }); -} -std::future -FfiClient::captureAudioFrameAsync(std::uint64_t source_handle, - const proto::AudioFrameBufferInfo &buffer) { + // Build and send the request proto::FfiRequest req; - auto *msg = req.mutable_capture_audio_frame(); - msg->set_source_handle(source_handle); - msg->mutable_buffer()->CopyFrom(buffer); + auto *msg = req.mutable_set_local_metadata(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_metadata(metadata); + msg->set_request_async_id(async_id); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_capture_audio_frame()) { - throw std::runtime_error("FfiResponse missing capture_audio_frame"); + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_set_local_metadata()) { + logAndThrow("FfiResponse missing set_local_metadata"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; } - const AsyncId async_id = resp.capture_audio_frame().async_id(); + return fut; +} - return registerAsync( +std::future +FfiClient::captureAudioFrameAsync(std::uint64_t source_handle, + const proto::AudioFrameBufferInfo &buffer) { + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, // match predicate [async_id](const proto::FfiEvent &event) { return event.has_capture_audio_frame() && @@ -519,6 +722,25 @@ FfiClient::captureAudioFrameAsync(std::uint64_t source_handle, } pr.set_value(); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_capture_audio_frame(); + msg->set_source_handle(source_handle); + msg->set_request_async_id(async_id); + msg->mutable_buffer()->CopyFrom(buffer); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_capture_audio_frame()) { + logAndThrow("FfiResponse missing capture_audio_frame"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future @@ -527,21 +749,12 @@ FfiClient::performRpcAsync(std::uint64_t local_participant_handle, const std::string &method, const std::string &payload, std::optional response_timeout_ms) { - proto::FfiRequest req; - auto *msg = req.mutable_perform_rpc(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_destination_identity(destination_identity); - msg->set_method(method); - msg->set_payload(payload); - if (response_timeout_ms.has_value()) { - msg->set_response_timeout_ms(*response_timeout_ms); - } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_perform_rpc()) { - throw std::runtime_error("FfiResponse missing perform_rpc"); - } - const AsyncId async_id = resp.perform_rpc().async_id(); - return registerAsync( + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, // match predicate [async_id](const proto::FfiEvent &event) { return event.has_perform_rpc() && @@ -558,6 +771,30 @@ FfiClient::performRpcAsync(std::uint64_t local_participant_handle, } pr.set_value(cb.payload()); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_perform_rpc(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_destination_identity(destination_identity); + msg->set_method(method); + msg->set_payload(payload); + msg->set_request_async_id(async_id); + if (response_timeout_ms.has_value()) { + msg->set_response_timeout_ms(*response_timeout_ms); + } + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_perform_rpc()) { + logAndThrow("FfiResponse missing perform_rpc"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future FfiClient::sendStreamHeaderAsync( @@ -565,22 +802,12 @@ std::future FfiClient::sendStreamHeaderAsync( const proto::DataStream::Header &header, const std::vector &destination_identities, const std::string &sender_identity) { - proto::FfiRequest req; - auto *msg = req.mutable_send_stream_header(); - msg->set_local_participant_handle(local_participant_handle); - *msg->mutable_header() = header; - msg->set_sender_identity(sender_identity); - for (const auto &id : destination_identities) { - msg->add_destination_identities(id); - } - - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_send_stream_header()) { - throw std::runtime_error("FfiResponse missing send_stream_header"); - } - const AsyncId async_id = resp.send_stream_header().async_id(); + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); - return registerAsync( + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &e) { return e.has_send_stream_header() && e.send_stream_header().async_id() == async_id; @@ -594,28 +821,42 @@ std::future FfiClient::sendStreamHeaderAsync( } pr.set_value(); }); -} -std::future FfiClient::sendStreamChunkAsync( - std::uint64_t local_participant_handle, - const proto::DataStream::Chunk &chunk, - const std::vector &destination_identities, - const std::string &sender_identity) { + // Build and send the request proto::FfiRequest req; - auto *msg = req.mutable_send_stream_chunk(); + auto *msg = req.mutable_send_stream_header(); msg->set_local_participant_handle(local_participant_handle); - *msg->mutable_chunk() = chunk; + *msg->mutable_header() = header; msg->set_sender_identity(sender_identity); + msg->set_request_async_id(async_id); for (const auto &id : destination_identities) { msg->add_destination_identities(id); } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_send_stream_chunk()) { - throw std::runtime_error("FfiResponse missing send_stream_chunk"); + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_send_stream_header()) { + logAndThrow("FfiResponse missing send_stream_header"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; } - const AsyncId async_id = resp.send_stream_chunk().async_id(); - return registerAsync( + + return fut; +} + +std::future FfiClient::sendStreamChunkAsync( + std::uint64_t local_participant_handle, + const proto::DataStream::Chunk &chunk, + const std::vector &destination_identities, + const std::string &sender_identity) { + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &e) { return e.has_send_stream_chunk() && e.send_stream_chunk().async_id() == async_id; @@ -629,25 +870,41 @@ std::future FfiClient::sendStreamChunkAsync( } pr.set_value(); }); -} -std::future -FfiClient::sendStreamTrailerAsync(std::uint64_t local_participant_handle, - const proto::DataStream::Trailer &trailer, - const std::string &sender_identity) { + // Build and send the request proto::FfiRequest req; - auto *msg = req.mutable_send_stream_trailer(); + auto *msg = req.mutable_send_stream_chunk(); msg->set_local_participant_handle(local_participant_handle); - *msg->mutable_trailer() = trailer; + *msg->mutable_chunk() = chunk; msg->set_sender_identity(sender_identity); + msg->set_request_async_id(async_id); + for (const auto &id : destination_identities) { + msg->add_destination_identities(id); + } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_send_stream_trailer()) { - throw std::runtime_error("FfiResponse missing send_stream_trailer"); + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_send_stream_chunk()) { + logAndThrow("FfiResponse missing send_stream_chunk"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; } - const AsyncId async_id = resp.send_stream_trailer().async_id(); - return registerAsync( + return fut; +} + +std::future +FfiClient::sendStreamTrailerAsync(std::uint64_t local_participant_handle, + const proto::DataStream::Trailer &trailer, + const std::string &sender_identity) { + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &e) { return e.has_send_stream_trailer() && e.send_stream_trailer().async_id() == async_id; @@ -661,6 +918,26 @@ FfiClient::sendStreamTrailerAsync(std::uint64_t local_participant_handle, } pr.set_value(); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_send_stream_trailer(); + msg->set_local_participant_handle(local_participant_handle); + *msg->mutable_trailer() = trailer; + msg->set_sender_identity(sender_identity); + msg->set_request_async_id(async_id); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_send_stream_trailer()) { + logAndThrow("FfiResponse missing send_stream_trailer"); + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } } // namespace livekit diff --git a/src/ffi_client.h b/src/ffi_client.h index 36cf72c..667100e 100644 --- a/src/ffi_client.h +++ b/src/ffi_client.h @@ -17,11 +17,13 @@ #ifndef LIVEKIT_FFI_CLIENT_H #define LIVEKIT_FFI_CLIENT_H +#include #include #include #include #include #include +#include #include #include "livekit/stats.h" @@ -147,9 +149,11 @@ class FfiClient { // Base class for type-erased pending ops struct PendingBase { + AsyncId async_id = 0; // Client-generated async ID for cancellation virtual ~PendingBase() = default; virtual bool matches(const proto::FfiEvent &event) const = 0; virtual void complete(const proto::FfiEvent &event) = 0; + virtual void cancel() = 0; // Cancel the pending operation }; template struct Pending : PendingBase { std::promise promise; @@ -163,17 +167,35 @@ class FfiClient { void complete(const proto::FfiEvent &event) override { handler(event, promise); } + + void cancel() override { + try { + promise.set_exception(std::make_exception_ptr( + std::runtime_error("Async operation cancelled"))); + } catch (const std::future_error &) { + // already satisfied + } + } }; template std::future registerAsync( - std::function match, + AsyncId async_id, std::function match, std::function &)> handler); + // Generate a unique client-side async ID for request correlation + AsyncId generateAsyncId(); + + // Cancel a pending async operation by its async_id. Returns true if found and + // removed. + bool cancelPendingByAsyncId(AsyncId async_id); + std::unordered_map listeners_; - ListenerId nextListenerId = 1; + std::atomic next_listener_id{1}; mutable std::mutex lock_; - mutable std::vector> pending_; + mutable std::unordered_map> + pending_by_id_; + std::atomic next_async_id_{1}; void PushEvent(const proto::FfiEvent &event) const; friend void LivekitFfiCallback(const uint8_t *buf, size_t len);