diff --git a/lark_oapi/channel/channel.py b/lark_oapi/channel/channel.py index 8d4cfa1e..71c695b2 100644 --- a/lark_oapi/channel/channel.py +++ b/lark_oapi/channel/channel.py @@ -853,9 +853,7 @@ def stop(self, *, join_timeout: float = 5.0) -> None: def _stop_private_ws_client(self, ws: Any) -> None: disconnect = getattr(ws, "_disconnect", None) try: - from lark_oapi.ws import client as ws_client_module - - ws_loop = getattr(ws_client_module, "loop", None) + ws_loop = getattr(ws, "_loop", None) if callable(disconnect) and ws_loop is not None: if ws_loop.is_running(): try: diff --git a/lark_oapi/core/const.py b/lark_oapi/core/const.py index 5f4f840f..becc6cc6 100644 --- a/lark_oapi/core/const.py +++ b/lark_oapi/core/const.py @@ -1,6 +1,6 @@ # Info PROJECT = "oapi-sdk-python" -VERSION = "1.7.0" +VERSION = "1.8.0" # Domain FEISHU_DOMAIN = "https://open.feishu.cn" diff --git a/lark_oapi/ws/client.py b/lark_oapi/ws/client.py index 8ee99183..556e478e 100644 --- a/lark_oapi/ws/client.py +++ b/lark_oapi/ws/client.py @@ -28,12 +28,6 @@ from lark_oapi.ws.pb.google.protobuf.internal.containers import RepeatedCompositeFieldContainer from lark_oapi.ws.pb.pbbp2_pb2 import Frame -try: - loop = asyncio.get_event_loop() -except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - def _get_by_key(headers: RepeatedCompositeFieldContainer, key: str) -> str: for header in headers: @@ -139,6 +133,11 @@ def __init__(self, # modules (e.g. FeishuChannel) pass ``["channel"]`` here. self._user_agent: str = build_user_agent(source=source, extra_tags=extra_ua_tags) self._conn: Optional[websockets.WebSocketClientProtocol] = None + # Event loop this client is driven on. Set when start()/start_async() + # begins; higher-level wrappers (e.g. FeishuChannel) read it to stop the + # connection loop. Replaces the former module-level ``loop`` global so + # nothing is captured at import time (see issues #96 / #133). + self._loop: Optional[asyncio.AbstractEventLoop] = None self._conn_url: str = "" self._service_id: str = "" self._conn_id: str = "" @@ -162,21 +161,64 @@ def __init__(self, logger.setLevel(log_level.value) def start(self) -> None: + """Start the client (blocking) on a freshly created event loop. + + Backward-compatible entry point for synchronous callers (plain scripts, + or ``FeishuChannel`` driving this client from a worker thread). When an + event loop is already running in the current thread — e.g. inside an + async framework such as FastAPI or aiohttp — this client cannot own the + loop; callers must ``await client.start_async()`` instead, so we raise a + clear, actionable error rather than the cryptic native asyncio message. + """ try: - loop.run_until_complete(self._connect()) + asyncio.get_running_loop() + except RuntimeError: + pass # no running loop in this thread — safe to own one + else: + raise RuntimeError( + "ws.Client.start() cannot be called while an event loop is " + "running in the current thread; use 'await client.start_async()' " + "instead (e.g. inside FastAPI/aiohttp or any async framework)." + ) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + self._loop = loop + try: + loop.run_until_complete(self.start_async()) + finally: + # On success start_async() blocks for the connection's lifetime, so + # this only runs when it exits early (e.g. connect failed with no + # reconnect) — close the loop we created so repeated start() calls in + # the same thread don't leak event loops. + loop.close() + + async def start_async(self) -> None: + """Start the client on the caller's running event loop (awaitable). + + Async counterpart of :meth:`start`. Await it from within a running loop; + connection, reconnect and event dispatch behave identically to the + synchronous path — it simply runs on the caller's loop instead of owning + its own. It blocks (never returns) for the lifetime of the connection, + so run it as a task if startup must continue:: + + asyncio.create_task(client.start_async()) + """ + self._loop = asyncio.get_running_loop() + try: + await self._connect() except ClientException as e: logger.error(self._fmt_log("connect failed, err: {}", e)) raise e except Exception as e: logger.error(self._fmt_log("connect failed, err: {}", e)) - loop.run_until_complete(self._disconnect()) + await self._disconnect() if self._auto_reconnect: - loop.run_until_complete(self._reconnect()) + await self._reconnect() else: raise e - loop.create_task(self._ping_loop()) - loop.run_until_complete(_select()) + asyncio.get_running_loop().create_task(self._ping_loop()) + await _select() async def _ping_loop(self): while True: @@ -208,7 +250,7 @@ async def _connect(self) -> None: self._service_id = service_id logger.info(self._fmt_log("connected to {}", conn_url)) - loop.create_task(self._receive_message_loop()) + asyncio.get_running_loop().create_task(self._receive_message_loop()) except InvalidHandshake as e: _parse_ws_conn_exception(e) finally: @@ -220,7 +262,7 @@ async def _receive_message_loop(self): if self._conn is None: raise ConnectionClosedException("connection is closed") msg = await self._conn.recv() - loop.create_task(self._handle_message(msg)) + asyncio.get_running_loop().create_task(self._handle_message(msg)) except Exception as e: logger.error(self._fmt_log("receive message loop exit, err: {}", e)) await self._disconnect() diff --git a/lark_oapi/ws/tests/test_async_start.py b/lark_oapi/ws/tests/test_async_start.py new file mode 100644 index 00000000..cfb2bd3c --- /dev/null +++ b/lark_oapi/ws/tests/test_async_start.py @@ -0,0 +1,139 @@ +import asyncio + +import pytest + +from lark_oapi.ws import client as ws_client +from lark_oapi.ws.exception import ClientException + + +class _FakeConn: + async def close(self): + pass + + async def recv(self): + # Never used directly by these tests; present so the fake looks like a + # real connection to any code path that pokes at it. + await asyncio.sleep(3600) + + +def test_no_module_level_event_loop_global(): + # Importing the client must not create or capture a process-wide loop at + # import time. A lingering module global is the root cause behind #133. + assert not hasattr(ws_client, "loop") + + +async def test_start_async_runs_on_caller_loop(monkeypatch): + client = ws_client.Client("app_id", "app_secret") + captured = {} + + async def fake_connect(): + captured["connect_loop"] = asyncio.get_running_loop() + client._conn = _FakeConn() + + async def fake_select(): + # Stand in for the long-lived idle wait so start_async does not block + # the test forever. + return + + async def fake_ping_loop(): + captured["ping_loop"] = asyncio.get_running_loop() + + monkeypatch.setattr(client, "_connect", fake_connect) + monkeypatch.setattr(ws_client, "_select", fake_select) + monkeypatch.setattr(client, "_ping_loop", fake_ping_loop) + + caller = asyncio.get_running_loop() + await client.start_async() + # Give the scheduled ping task a chance to run on the caller loop. + await asyncio.sleep(0) + + assert captured["connect_loop"] is caller + assert captured["ping_loop"] is caller + assert client._conn is not None + + +def test_sync_start_delegates_to_start_async(monkeypatch): + # Synchronous test with NO running loop, modelling a plain worker thread / + # script (e.g. channel.py's executor worker). start() must build/acquire a + # loop of its own and run start_async() to completion. + client = ws_client.Client("app_id", "app_secret") + ran = {} + + async def fake_start_async(): + ran["called"] = True + ran["loop"] = asyncio.get_running_loop() + + monkeypatch.setattr(client, "start_async", fake_start_async) + + try: + client.start() + + assert ran["called"] is True + assert ran["loop"] is not None + finally: + # Avoid leaking a loop into subsequent tests in this process. + asyncio.set_event_loop(None) + + +async def test_sync_start_in_running_loop_raises_clear_error(): + # The test body itself runs inside a live loop, modelling a mistaken sync + # start() call from within an async framework (#96). The error must point + # the user at start_async(), not the opaque native message. + client = ws_client.Client("app_id", "app_secret") + + with pytest.raises(RuntimeError) as err: + client.start() + + assert "start_async" in str(err.value) + + +async def test_start_async_propagates_client_exception(monkeypatch): + client = ws_client.Client("app_id", "app_secret") + rc = {"n": 0} + + async def fake_connect(): + raise ClientException(403, "forbidden") + + async def fake_reconnect(): + rc["n"] += 1 + + monkeypatch.setattr(client, "_connect", fake_connect) + monkeypatch.setattr(client, "_reconnect", fake_reconnect) + + with pytest.raises(ClientException): + await client.start_async() + + # Credential/auth failures bubble up untouched and never trigger reconnect. + assert rc["n"] == 0 + + +async def test_start_async_generic_error_triggers_reconnect(monkeypatch): + client = ws_client.Client("app_id", "app_secret", auto_reconnect=True) + rc = {"n": 0} + + async def fake_connect(): + raise RuntimeError("net") + + async def fake_disconnect(): + return + + async def fake_reconnect(): + rc["n"] += 1 + + async def fake_select(): + return + + async def fake_ping_loop(): + return + + monkeypatch.setattr(client, "_connect", fake_connect) + monkeypatch.setattr(client, "_disconnect", fake_disconnect) + monkeypatch.setattr(client, "_reconnect", fake_reconnect) + monkeypatch.setattr(ws_client, "_select", fake_select) + monkeypatch.setattr(client, "_ping_loop", fake_ping_loop) + + await client.start_async() + + # A generic (non-ClientException) failure under auto_reconnect must run the + # disconnect + reconnect path exactly once. + assert rc["n"] == 1 diff --git a/lark_oapi/ws/tests/test_websockets_compat.py b/lark_oapi/ws/tests/test_websockets_compat.py index 538d1ee9..3fc35e12 100644 --- a/lark_oapi/ws/tests/test_websockets_compat.py +++ b/lark_oapi/ws/tests/test_websockets_compat.py @@ -17,6 +17,12 @@ async def close(self): pass +async def _noop_receive_loop(): + # _connect() schedules the receive loop on the running loop via + # create_task; these tests only assert connect kwargs, so neutralize it. + return + + def test_parse_ws_connection_exception_reads_new_invalid_status_response_headers(): exc = RuntimeError("handshake failed") exc.response = SimpleNamespace( @@ -98,11 +104,7 @@ async def fake_connect(uri, *, proxy=True): lambda: "ws://example.test/callback?device_id=device&service_id=42", ) monkeypatch.setattr(ws_client.websockets, "connect", fake_connect) - monkeypatch.setattr( - ws_client.loop, - "create_task", - lambda coro: coro.close() if hasattr(coro, "close") else None, - ) + monkeypatch.setattr(client, "_receive_message_loop", _noop_receive_loop) await client._connect() await client._disconnect() @@ -128,11 +130,7 @@ async def fake_connect(uri): lambda: "ws://example.test/callback?device_id=device&service_id=42", ) monkeypatch.setattr(ws_client.websockets, "connect", fake_connect) - monkeypatch.setattr( - ws_client.loop, - "create_task", - lambda coro: coro.close() if hasattr(coro, "close") else None, - ) + monkeypatch.setattr(client, "_receive_message_loop", _noop_receive_loop) await client._connect() await client._disconnect() diff --git a/samples/ws/async_sample.py b/samples/ws/async_sample.py new file mode 100644 index 00000000..fe1275e0 --- /dev/null +++ b/samples/ws/async_sample.py @@ -0,0 +1,63 @@ +import asyncio + +import lark_oapi as lark +from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTrigger, P2CardActionTriggerResponse +from lark_oapi.event.callback.model.p2_url_preview_get import P2URLPreviewGet, P2URLPreviewGetResponse + + +def do_p2_im_message_receive_v1(data: lark.im.v1.P2ImMessageReceiveV1) -> None: + print(f'[ do_p2_im_message_receive_v1 access ], data: {lark.JSON.marshal(data, indent=4)}') + + +def do_message_event(data: lark.CustomizedEvent) -> None: + print(f'[ do_customized_event access ], type: message, data: {lark.JSON.marshal(data, indent=4)}') + + +# 新版卡片回调,卡片回传交互 card.action.trigger +def do_card_action_trigger(data: P2CardActionTrigger) -> P2CardActionTriggerResponse: + print(lark.JSON.marshal(data)) + resp = { + "toast": { + "type": "info", + "content": "卡片回传成功 from python sdk" + } + } + return P2CardActionTriggerResponse(resp) + + +def do_url_preview_get(data: P2URLPreviewGet) -> P2URLPreviewGetResponse: + print(lark.JSON.marshal(data)) + resp = { + "inline": { + "title": "链接预览测试", + } + } + return P2URLPreviewGetResponse(resp) + + +event_handler = lark.EventDispatcherHandler.builder("", "") \ + .register_p2_im_message_receive_v1(do_p2_im_message_receive_v1) \ + .register_p2_card_action_trigger(do_card_action_trigger) \ + .register_p2_url_preview_get(do_url_preview_get) \ + .register_p1_customized_event("这里填入你要自定义订阅的 event 的 key,例如 out_approval", do_message_event) \ + .build() + + +# 异步入口:在已有运行中事件循环的框架里,用 await client.start_async() 原生启动, +# 不再需要为 sync 的 client.start() 单开线程,也不会触发 +# "RuntimeError: This event loop is already running"。 +async def main(): + cli = lark.ws.Client(lark.APP_ID, lark.APP_SECRET, + event_handler=event_handler, log_level=lark.LogLevel.DEBUG) + # start_async() 会一直运行(连接 + 收发 + ping),随调用方的事件循环存活。 + await cli.start_async() + + +# 集成到异步 Web 框架时(如 FastAPI),在 startup 钩子里后台跑,不阻塞对外服务: +# +# @app.on_event("startup") +# async def _startup(): +# cli = lark.ws.Client(lark.APP_ID, lark.APP_SECRET, event_handler=event_handler) +# asyncio.create_task(cli.start_async()) +if __name__ == "__main__": + asyncio.run(main())