Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ When making larger scale changes, check with the developer before committing to

The SDK has three categories of threads:

**FFI callback thread** — The Rust FFI layer calls `LivekitFfiCallback` from a Rust-managed thread (typically a Tokio runtime thread). This single entry point deserializes the `FfiEvent` and calls `FfiClient::pushEvent`, which:
**FFI callback thread** — The Rust FFI layer calls `ffiEventCallback` from a Rust-managed thread (typically a Tokio runtime thread). This single entry point deserializes the `FfiEvent` and calls `FfiClient::pushEvent`, which:
1. Completes any pending async `std::promise` matched by `async_id`.
2. Invokes all registered `FfiClient` listeners (including `Room::onEvent`).

Expand Down
13 changes: 11 additions & 2 deletions src/ffi_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "ffi_client.h"

#include <cassert>
#include <csignal>

#include "data_track.pb.h"
#include "e2ee.pb.h"
Expand Down Expand Up @@ -167,7 +168,7 @@ bool FfiClient::initialize(bool capture_logs) {
return false;
}
initialized_.store(true, std::memory_order_release);
livekit_ffi_initialize(&LivekitFfiCallback, capture_logs, LIVEKIT_BUILD_FLAVOR, LIVEKIT_BUILD_VERSION);
livekit_ffi_initialize(&ffiEventCallback, capture_logs, LIVEKIT_BUILD_FLAVOR, LIVEKIT_BUILD_VERSION);
return true;
}

Expand Down Expand Up @@ -250,11 +251,19 @@ void FfiClient::pushEvent(const proto::FfiEvent& event) const {
}
}

void LivekitFfiCallback(const uint8_t* buf, size_t len) {
extern "C" LIVEKIT_INTERNAL_API void ffiEventCallback(const uint8_t* buf, size_t len) {
proto::FfiEvent event;
event.ParseFromArray(buf,
static_cast<int>(len)); // TODO: this fixes for now, what if len exceeds int?

// We are in a unrecoverable state, terminate the process
if (event.has_panic()) {
LK_LOG_CRITICAL("FFI Panic: {}", event.panic().message());
livekit::detail::getLogger()->flush(); // Flush the logger to ensure all messages are written
std::raise(SIGTERM);
return;
}

FfiClient::instance().pushEvent(event);
}

Expand Down
5 changes: 2 additions & 3 deletions src/ffi_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <cstdint>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <optional>
Expand Down Expand Up @@ -59,7 +58,7 @@ extern "C" void livekit_ffi_initialize(FfiCallbackFn cb, bool capture_logs, cons

extern "C" void livekit_ffi_dispose();

extern "C" void LivekitFfiCallback(const uint8_t* buf, size_t len);
extern "C" LIVEKIT_INTERNAL_API void ffiEventCallback(const uint8_t* buf, size_t len);

// The FfiClient is used to communicate with the FFI interface of the Rust SDK
// We use the generated protocol messages to facilitate the communication.
Expand Down Expand Up @@ -195,7 +194,7 @@ class LIVEKIT_INTERNAL_API FfiClient {
std::atomic<AsyncId> next_async_id_{1};

void pushEvent(const proto::FfiEvent& event) const;
friend void LivekitFfiCallback(const uint8_t* buf, size_t len);
friend void ffiEventCallback(const uint8_t* buf, size_t len);
std::atomic<bool> initialized_{false};
};
} // namespace livekit
45 changes: 45 additions & 0 deletions src/tests/unit/test_ffi_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,29 @@
#include <gtest/gtest.h>
#include <livekit/livekit.h>

#include <csignal>
#include <stdexcept>
#include <string>
#include <unordered_set>

#include "ffi.pb.h"
#include "ffi_client.h"

namespace livekit::test {

namespace {

volatile bool g_sigterm_received = false;

// Has to be registered globally per csignal API
void handleSignal(int signal) {
if (signal == SIGTERM) {
g_sigterm_received = true;
}
}

} // namespace

class FfiClientTest : public ::testing::Test {
protected:
void SetUp() override {
Expand Down Expand Up @@ -140,6 +155,36 @@ TEST_F(FfiClientTest, ListenerRegistrationSurvivesShutdownReinitCycle) {
EXPECT_NO_THROW(FfiClient::instance().removeListener(id));
}

TEST_F(FfiClientTest, PanicEvent) {
// Wire up a signal handler to ensure the panic event raises SIGTERM
// (and that users can handle it)
g_sigterm_received = false;
auto previous_handler = std::signal(SIGTERM, handleSignal);
ASSERT_NE(previous_handler, SIG_ERR);

// Wire up a listener to ensure the panic event doesn't make it through
// (matches Python SDK)
bool listener_called = false;
const auto id =
FfiClient::instance().addListener([&listener_called](const proto::FfiEvent&) { listener_called = true; });

proto::FfiEvent event;
event.mutable_panic()->set_message("rust panic");
std::string bytes;
ASSERT_TRUE(event.SerializeToString(&bytes));

testing::internal::CaptureStderr();
ffiEventCallback(reinterpret_cast<const std::uint8_t*>(bytes.data()), bytes.size());
const std::string stderr_output = testing::internal::GetCapturedStderr();

ASSERT_NE(std::signal(SIGTERM, previous_handler), SIG_ERR);
FfiClient::instance().removeListener(id);

EXPECT_TRUE(g_sigterm_received);
EXPECT_FALSE(listener_called);
EXPECT_NE(stderr_output.find("FFI Panic: rust panic"), std::string::npos);
}

// ---------------------------------------------------------------------------
// These tests ensure FfiClient methods throw in various error conditions
// ---------------------------------------------------------------------------
Expand Down
Loading