diff --git a/AGENTS.md b/AGENTS.md index 6eaae54d..c03c458a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -40,7 +40,7 @@ When making larger scale changes, check with the developer before committing to The SDK has three categories of threads: -**FFI callback thread** — The Rust FFI layer calls `LivekitFfiCallback` from a Rust-managed thread (typically a Tokio runtime thread). This single entry point deserializes the `FfiEvent` and calls `FfiClient::pushEvent`, which: +**FFI callback thread** — The Rust FFI layer calls `ffiEventCallback` from a Rust-managed thread (typically a Tokio runtime thread). This single entry point deserializes the `FfiEvent` and calls `FfiClient::pushEvent`, which: 1. Completes any pending async `std::promise` matched by `async_id`. 2. Invokes all registered `FfiClient` listeners (including `Room::onEvent`). diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 1ae3ddf4..0966816f 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -17,6 +17,7 @@ #include "ffi_client.h" #include +#include #include "data_track.pb.h" #include "e2ee.pb.h" @@ -167,7 +168,7 @@ bool FfiClient::initialize(bool capture_logs) { return false; } initialized_.store(true, std::memory_order_release); - livekit_ffi_initialize(&LivekitFfiCallback, capture_logs, LIVEKIT_BUILD_FLAVOR, LIVEKIT_BUILD_VERSION); + livekit_ffi_initialize(&ffiEventCallback, capture_logs, LIVEKIT_BUILD_FLAVOR, LIVEKIT_BUILD_VERSION); return true; } @@ -250,11 +251,19 @@ void FfiClient::pushEvent(const proto::FfiEvent& event) const { } } -void LivekitFfiCallback(const uint8_t* buf, size_t len) { +extern "C" LIVEKIT_INTERNAL_API void ffiEventCallback(const uint8_t* buf, size_t len) { proto::FfiEvent event; event.ParseFromArray(buf, static_cast(len)); // TODO: this fixes for now, what if len exceeds int? + // We are in a unrecoverable state, terminate the process + if (event.has_panic()) { + LK_LOG_CRITICAL("FFI Panic: {}", event.panic().message()); + livekit::detail::getLogger()->flush(); // Flush the logger to ensure all messages are written + std::raise(SIGTERM); + return; + } + FfiClient::instance().pushEvent(event); } diff --git a/src/ffi_client.h b/src/ffi_client.h index 5136e49e..5ea0a89a 100644 --- a/src/ffi_client.h +++ b/src/ffi_client.h @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -59,7 +58,7 @@ extern "C" void livekit_ffi_initialize(FfiCallbackFn cb, bool capture_logs, cons extern "C" void livekit_ffi_dispose(); -extern "C" void LivekitFfiCallback(const uint8_t* buf, size_t len); +extern "C" LIVEKIT_INTERNAL_API void ffiEventCallback(const uint8_t* buf, size_t len); // The FfiClient is used to communicate with the FFI interface of the Rust SDK // We use the generated protocol messages to facilitate the communication. @@ -195,7 +194,7 @@ class LIVEKIT_INTERNAL_API FfiClient { std::atomic next_async_id_{1}; void pushEvent(const proto::FfiEvent& event) const; - friend void LivekitFfiCallback(const uint8_t* buf, size_t len); + friend void ffiEventCallback(const uint8_t* buf, size_t len); std::atomic initialized_{false}; }; } // namespace livekit diff --git a/src/tests/unit/test_ffi_client.cpp b/src/tests/unit/test_ffi_client.cpp index d240654b..f6982ebb 100644 --- a/src/tests/unit/test_ffi_client.cpp +++ b/src/tests/unit/test_ffi_client.cpp @@ -17,7 +17,9 @@ #include #include +#include #include +#include #include #include "ffi.pb.h" @@ -25,6 +27,19 @@ namespace livekit::test { +namespace { + +volatile bool g_sigterm_received = false; + +// Has to be registered globally per csignal API +void handleSignal(int signal) { + if (signal == SIGTERM) { + g_sigterm_received = true; + } +} + +} // namespace + class FfiClientTest : public ::testing::Test { protected: void SetUp() override { @@ -140,6 +155,36 @@ TEST_F(FfiClientTest, ListenerRegistrationSurvivesShutdownReinitCycle) { EXPECT_NO_THROW(FfiClient::instance().removeListener(id)); } +TEST_F(FfiClientTest, PanicEvent) { + // Wire up a signal handler to ensure the panic event raises SIGTERM + // (and that users can handle it) + g_sigterm_received = false; + auto previous_handler = std::signal(SIGTERM, handleSignal); + ASSERT_NE(previous_handler, SIG_ERR); + + // Wire up a listener to ensure the panic event doesn't make it through + // (matches Python SDK) + bool listener_called = false; + const auto id = + FfiClient::instance().addListener([&listener_called](const proto::FfiEvent&) { listener_called = true; }); + + proto::FfiEvent event; + event.mutable_panic()->set_message("rust panic"); + std::string bytes; + ASSERT_TRUE(event.SerializeToString(&bytes)); + + testing::internal::CaptureStderr(); + ffiEventCallback(reinterpret_cast(bytes.data()), bytes.size()); + const std::string stderr_output = testing::internal::GetCapturedStderr(); + + ASSERT_NE(std::signal(SIGTERM, previous_handler), SIG_ERR); + FfiClient::instance().removeListener(id); + + EXPECT_TRUE(g_sigterm_received); + EXPECT_FALSE(listener_called); + EXPECT_NE(stderr_output.find("FFI Panic: rust panic"), std::string::npos); +} + // --------------------------------------------------------------------------- // These tests ensure FfiClient methods throw in various error conditions // ---------------------------------------------------------------------------