Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions lark_oapi/channel/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -853,9 +853,7 @@ def stop(self, *, join_timeout: float = 5.0) -> None:
def _stop_private_ws_client(self, ws: Any) -> None:
disconnect = getattr(ws, "_disconnect", None)
try:
from lark_oapi.ws import client as ws_client_module

ws_loop = getattr(ws_client_module, "loop", None)
ws_loop = getattr(ws, "_loop", None)
if callable(disconnect) and ws_loop is not None:
if ws_loop.is_running():
try:
Expand Down
2 changes: 1 addition & 1 deletion lark_oapi/core/const.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Info
PROJECT = "oapi-sdk-python"
VERSION = "1.7.0"
VERSION = "1.8.0"

# Domain
FEISHU_DOMAIN = "https://open.feishu.cn"
Expand Down
68 changes: 55 additions & 13 deletions lark_oapi/ws/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@
from lark_oapi.ws.pb.google.protobuf.internal.containers import RepeatedCompositeFieldContainer
from lark_oapi.ws.pb.pbbp2_pb2 import Frame

try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)


def _get_by_key(headers: RepeatedCompositeFieldContainer, key: str) -> str:
for header in headers:
Expand Down Expand Up @@ -139,6 +133,11 @@ def __init__(self,
# modules (e.g. FeishuChannel) pass ``["channel"]`` here.
self._user_agent: str = build_user_agent(source=source, extra_tags=extra_ua_tags)
self._conn: Optional[websockets.WebSocketClientProtocol] = None
# Event loop this client is driven on. Set when start()/start_async()
# begins; higher-level wrappers (e.g. FeishuChannel) read it to stop the
# connection loop. Replaces the former module-level ``loop`` global so
# nothing is captured at import time (see issues #96 / #133).
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._conn_url: str = ""
self._service_id: str = ""
self._conn_id: str = ""
Expand All @@ -162,21 +161,64 @@ def __init__(self,
logger.setLevel(log_level.value)

def start(self) -> None:
"""Start the client (blocking) on a freshly created event loop.

Backward-compatible entry point for synchronous callers (plain scripts,
or ``FeishuChannel`` driving this client from a worker thread). When an
event loop is already running in the current thread — e.g. inside an
async framework such as FastAPI or aiohttp — this client cannot own the
loop; callers must ``await client.start_async()`` instead, so we raise a
clear, actionable error rather than the cryptic native asyncio message.
"""
try:
loop.run_until_complete(self._connect())
asyncio.get_running_loop()
except RuntimeError:
pass # no running loop in this thread — safe to own one
else:
raise RuntimeError(
"ws.Client.start() cannot be called while an event loop is "
"running in the current thread; use 'await client.start_async()' "
"instead (e.g. inside FastAPI/aiohttp or any async framework)."
)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self._loop = loop
try:
loop.run_until_complete(self.start_async())
finally:
# On success start_async() blocks for the connection's lifetime, so
# this only runs when it exits early (e.g. connect failed with no
# reconnect) — close the loop we created so repeated start() calls in
# the same thread don't leak event loops.
loop.close()

async def start_async(self) -> None:
"""Start the client on the caller's running event loop (awaitable).

Async counterpart of :meth:`start`. Await it from within a running loop;
connection, reconnect and event dispatch behave identically to the
synchronous path — it simply runs on the caller's loop instead of owning
its own. It blocks (never returns) for the lifetime of the connection,
so run it as a task if startup must continue::

asyncio.create_task(client.start_async())
"""
self._loop = asyncio.get_running_loop()
try:
await self._connect()
except ClientException as e:
logger.error(self._fmt_log("connect failed, err: {}", e))
raise e
except Exception as e:
logger.error(self._fmt_log("connect failed, err: {}", e))
loop.run_until_complete(self._disconnect())
await self._disconnect()
if self._auto_reconnect:
loop.run_until_complete(self._reconnect())
await self._reconnect()
else:
raise e

loop.create_task(self._ping_loop())
loop.run_until_complete(_select())
asyncio.get_running_loop().create_task(self._ping_loop())
await _select()

async def _ping_loop(self):
while True:
Expand Down Expand Up @@ -208,7 +250,7 @@ async def _connect(self) -> None:
self._service_id = service_id

logger.info(self._fmt_log("connected to {}", conn_url))
loop.create_task(self._receive_message_loop())
asyncio.get_running_loop().create_task(self._receive_message_loop())
except InvalidHandshake as e:
_parse_ws_conn_exception(e)
finally:
Expand All @@ -220,7 +262,7 @@ async def _receive_message_loop(self):
if self._conn is None:
raise ConnectionClosedException("connection is closed")
msg = await self._conn.recv()
loop.create_task(self._handle_message(msg))
asyncio.get_running_loop().create_task(self._handle_message(msg))
except Exception as e:
logger.error(self._fmt_log("receive message loop exit, err: {}", e))
await self._disconnect()
Expand Down
139 changes: 139 additions & 0 deletions lark_oapi/ws/tests/test_async_start.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import asyncio

import pytest

from lark_oapi.ws import client as ws_client
from lark_oapi.ws.exception import ClientException


class _FakeConn:
async def close(self):
pass

async def recv(self):
# Never used directly by these tests; present so the fake looks like a
# real connection to any code path that pokes at it.
await asyncio.sleep(3600)


def test_no_module_level_event_loop_global():
# Importing the client must not create or capture a process-wide loop at
# import time. A lingering module global is the root cause behind #133.
assert not hasattr(ws_client, "loop")


async def test_start_async_runs_on_caller_loop(monkeypatch):
client = ws_client.Client("app_id", "app_secret")
captured = {}

async def fake_connect():
captured["connect_loop"] = asyncio.get_running_loop()
client._conn = _FakeConn()

async def fake_select():
# Stand in for the long-lived idle wait so start_async does not block
# the test forever.
return

async def fake_ping_loop():
captured["ping_loop"] = asyncio.get_running_loop()

monkeypatch.setattr(client, "_connect", fake_connect)
monkeypatch.setattr(ws_client, "_select", fake_select)
monkeypatch.setattr(client, "_ping_loop", fake_ping_loop)

caller = asyncio.get_running_loop()
await client.start_async()
# Give the scheduled ping task a chance to run on the caller loop.
await asyncio.sleep(0)

assert captured["connect_loop"] is caller
assert captured["ping_loop"] is caller
assert client._conn is not None


def test_sync_start_delegates_to_start_async(monkeypatch):
# Synchronous test with NO running loop, modelling a plain worker thread /
# script (e.g. channel.py's executor worker). start() must build/acquire a
# loop of its own and run start_async() to completion.
client = ws_client.Client("app_id", "app_secret")
ran = {}

async def fake_start_async():
ran["called"] = True
ran["loop"] = asyncio.get_running_loop()

monkeypatch.setattr(client, "start_async", fake_start_async)

try:
client.start()

assert ran["called"] is True
assert ran["loop"] is not None
finally:
# Avoid leaking a loop into subsequent tests in this process.
asyncio.set_event_loop(None)


async def test_sync_start_in_running_loop_raises_clear_error():
# The test body itself runs inside a live loop, modelling a mistaken sync
# start() call from within an async framework (#96). The error must point
# the user at start_async(), not the opaque native message.
client = ws_client.Client("app_id", "app_secret")

with pytest.raises(RuntimeError) as err:
client.start()

assert "start_async" in str(err.value)


async def test_start_async_propagates_client_exception(monkeypatch):
client = ws_client.Client("app_id", "app_secret")
rc = {"n": 0}

async def fake_connect():
raise ClientException(403, "forbidden")

async def fake_reconnect():
rc["n"] += 1

monkeypatch.setattr(client, "_connect", fake_connect)
monkeypatch.setattr(client, "_reconnect", fake_reconnect)

with pytest.raises(ClientException):
await client.start_async()

# Credential/auth failures bubble up untouched and never trigger reconnect.
assert rc["n"] == 0


async def test_start_async_generic_error_triggers_reconnect(monkeypatch):
client = ws_client.Client("app_id", "app_secret", auto_reconnect=True)
rc = {"n": 0}

async def fake_connect():
raise RuntimeError("net")

async def fake_disconnect():
return

async def fake_reconnect():
rc["n"] += 1

async def fake_select():
return

async def fake_ping_loop():
return

monkeypatch.setattr(client, "_connect", fake_connect)
monkeypatch.setattr(client, "_disconnect", fake_disconnect)
monkeypatch.setattr(client, "_reconnect", fake_reconnect)
monkeypatch.setattr(ws_client, "_select", fake_select)
monkeypatch.setattr(client, "_ping_loop", fake_ping_loop)

await client.start_async()

# A generic (non-ClientException) failure under auto_reconnect must run the
# disconnect + reconnect path exactly once.
assert rc["n"] == 1
18 changes: 8 additions & 10 deletions lark_oapi/ws/tests/test_websockets_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ async def close(self):
pass


async def _noop_receive_loop():
# _connect() schedules the receive loop on the running loop via
# create_task; these tests only assert connect kwargs, so neutralize it.
return


def test_parse_ws_connection_exception_reads_new_invalid_status_response_headers():
exc = RuntimeError("handshake failed")
exc.response = SimpleNamespace(
Expand Down Expand Up @@ -98,11 +104,7 @@ async def fake_connect(uri, *, proxy=True):
lambda: "ws://example.test/callback?device_id=device&service_id=42",
)
monkeypatch.setattr(ws_client.websockets, "connect", fake_connect)
monkeypatch.setattr(
ws_client.loop,
"create_task",
lambda coro: coro.close() if hasattr(coro, "close") else None,
)
monkeypatch.setattr(client, "_receive_message_loop", _noop_receive_loop)

await client._connect()
await client._disconnect()
Expand All @@ -128,11 +130,7 @@ async def fake_connect(uri):
lambda: "ws://example.test/callback?device_id=device&service_id=42",
)
monkeypatch.setattr(ws_client.websockets, "connect", fake_connect)
monkeypatch.setattr(
ws_client.loop,
"create_task",
lambda coro: coro.close() if hasattr(coro, "close") else None,
)
monkeypatch.setattr(client, "_receive_message_loop", _noop_receive_loop)

await client._connect()
await client._disconnect()
Expand Down
63 changes: 63 additions & 0 deletions samples/ws/async_sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import asyncio

import lark_oapi as lark
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTrigger, P2CardActionTriggerResponse
from lark_oapi.event.callback.model.p2_url_preview_get import P2URLPreviewGet, P2URLPreviewGetResponse


def do_p2_im_message_receive_v1(data: lark.im.v1.P2ImMessageReceiveV1) -> None:
print(f'[ do_p2_im_message_receive_v1 access ], data: {lark.JSON.marshal(data, indent=4)}')


def do_message_event(data: lark.CustomizedEvent) -> None:
print(f'[ do_customized_event access ], type: message, data: {lark.JSON.marshal(data, indent=4)}')


# 新版卡片回调,卡片回传交互 card.action.trigger
def do_card_action_trigger(data: P2CardActionTrigger) -> P2CardActionTriggerResponse:
print(lark.JSON.marshal(data))
resp = {
"toast": {
"type": "info",
"content": "卡片回传成功 from python sdk"
}
}
return P2CardActionTriggerResponse(resp)


def do_url_preview_get(data: P2URLPreviewGet) -> P2URLPreviewGetResponse:
print(lark.JSON.marshal(data))
resp = {
"inline": {
"title": "链接预览测试",
}
}
return P2URLPreviewGetResponse(resp)


event_handler = lark.EventDispatcherHandler.builder("", "") \
.register_p2_im_message_receive_v1(do_p2_im_message_receive_v1) \
.register_p2_card_action_trigger(do_card_action_trigger) \
.register_p2_url_preview_get(do_url_preview_get) \
.register_p1_customized_event("这里填入你要自定义订阅的 event 的 key,例如 out_approval", do_message_event) \
.build()


# 异步入口:在已有运行中事件循环的框架里,用 await client.start_async() 原生启动,
# 不再需要为 sync 的 client.start() 单开线程,也不会触发
# "RuntimeError: This event loop is already running"。
async def main():
cli = lark.ws.Client(lark.APP_ID, lark.APP_SECRET,
event_handler=event_handler, log_level=lark.LogLevel.DEBUG)
# start_async() 会一直运行(连接 + 收发 + ping),随调用方的事件循环存活。
await cli.start_async()


# 集成到异步 Web 框架时(如 FastAPI),在 startup 钩子里后台跑,不阻塞对外服务:
#
# @app.on_event("startup")
# async def _startup():
# cli = lark.ws.Client(lark.APP_ID, lark.APP_SECRET, event_handler=event_handler)
# asyncio.create_task(cli.start_async())
if __name__ == "__main__":
asyncio.run(main())