From 60f0f0ef5e32f334e5ad0586b7f3d10abd6e8c93 Mon Sep 17 00:00:00 2001 From: didi Date: Mon, 1 Jun 2026 22:11:50 +0300 Subject: [PATCH 1/4] test: trigger codewatch review --- TEST.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 TEST.md diff --git a/TEST.md b/TEST.md new file mode 100644 index 0000000..83c831f --- /dev/null +++ b/TEST.md @@ -0,0 +1 @@ +# test From 8c7153a73c44eb38c80979eae2879828f8b9eeb4 Mon Sep 17 00:00:00 2001 From: didi Date: Sun, 31 May 2026 15:44:59 +0300 Subject: [PATCH 2/4] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f71f659..524c710 100644 --- a/README.md +++ b/README.md @@ -107,4 +107,4 @@ Open http://localhost:3000 --- -MIT - DIYA73 - built with heart. +MIT - DIYA73 From 2ea8f354a4cb4c7fd7a1b967d01f01fb64bacb43 Mon Sep 17 00:00:00 2001 From: didi Date: Wed, 3 Jun 2026 21:35:16 +0300 Subject: [PATCH 3/4] Update license line with emoji --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 524c710..7b7622d 100644 --- a/README.md +++ b/README.md @@ -107,4 +107,4 @@ Open http://localhost:3000 --- -MIT - DIYA73 +MIT - DIYA73 🤖 From 95cc106155812a75d4662bf03aea530584f6e38d Mon Sep 17 00:00:00 2001 From: didi Date: Sun, 14 Jun 2026 17:43:36 +0300 Subject: [PATCH 4/4] =?UTF-8?q?feat:=20SSE=20streaming=20for=20execution?= =?UTF-8?q?=20logs=20=E2=80=94=20GET=20/executions/:id/stream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/executions/executions.controller.ts | 67 ++++++++++++++- apps/api/src/executions/executions.service.ts | 3 +- .../components/executions/ExecutionStream.tsx | 79 +++++++++++++++++ apps/web/src/hooks/useExecutionStream.ts | 85 +++++++++++++++++++ 4 files changed, 232 insertions(+), 2 deletions(-) create mode 100644 apps/web/src/components/executions/ExecutionStream.tsx create mode 100644 apps/web/src/hooks/useExecutionStream.ts diff --git a/apps/api/src/executions/executions.controller.ts b/apps/api/src/executions/executions.controller.ts index 8f5d9ab..512c3e8 100644 --- a/apps/api/src/executions/executions.controller.ts +++ b/apps/api/src/executions/executions.controller.ts @@ -1,5 +1,10 @@ -import { Controller, Get, Param, UseGuards } from '@nestjs/common'; +import { + Controller, Get, Param, UseGuards, + Sse, Res, NotFoundException, +} from '@nestjs/common'; import { ApiTags, ApiBearerAuth } from '@nestjs/swagger'; +import { OnEvent } from '@nestjs/event-emitter'; +import { Response } from 'express'; import { ExecutionsService } from './executions.service'; import { JwtAuthGuard } from '../common/guards/jwt-auth.guard'; import { CurrentWorkspaceId } from '../common/decorators/current-user.decorator'; @@ -20,4 +25,64 @@ export class ExecutionsController { findOne(@Param('id') id: string) { return this.executionsService.findOne(id); } + + @Get(':id/stream') + async stream( + @Param('id') id: string, + @Res() res: Response, + ): Promise { + const execution = await this.executionsService.findOne(id); + if (!execution) throw new NotFoundException(`Execution ${id} not found`); + + // SSE headers + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + res.setHeader('Access-Control-Allow-Origin', '*'); + res.flushHeaders(); + + const send = (event: string, data: unknown) => { + res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`); + }; + + // Send existing logs immediately + if (execution.logs?.length) { + for (const log of execution.logs) { + send('log', { ...(log as Record), executionId: id }); + } + } + + // Send current status + send('status', { executionId: id, status: execution.status }); + + // Store listeners so we can remove them on close + const onLog = (payload: Record) => { + if (payload['executionId'] === id) send('log', payload); + }; + + const onStatus = (payload: Record) => { + if (payload['executionId'] === id) { + send('status', payload); + const terminal = ['SUCCESS', 'FAILED', 'CANCELLED']; + if (terminal.includes(String(payload['status']))) { + send('done', { executionId: id }); + res.end(); + } + } + }; + + this.executionsService.eventEmitter.on('execution.log', onLog); + this.executionsService.eventEmitter.on('execution.status', onStatus); + + // Heartbeat every 15s to keep connection alive + const heartbeat = setInterval(() => { + res.write(': heartbeat\n\n'); + }, 15_000); + + res.on('close', () => { + clearInterval(heartbeat); + this.executionsService.eventEmitter.off('execution.log', onLog); + this.executionsService.eventEmitter.off('execution.status', onStatus); + }); + } } diff --git a/apps/api/src/executions/executions.service.ts b/apps/api/src/executions/executions.service.ts index a81b4a4..ec930ef 100644 --- a/apps/api/src/executions/executions.service.ts +++ b/apps/api/src/executions/executions.service.ts @@ -30,7 +30,7 @@ export class ExecutionsService { private readonly executionQueue: Queue, @InjectRepository(Execution) private readonly executionRepo: Repository, - private readonly eventEmitter: EventEmitter2, + readonly eventEmitter: EventEmitter2, ) {} async enqueue(flow: Flow, input?: Record): Promise { @@ -84,3 +84,4 @@ export class ExecutionsService { return this.executionRepo.findOne({ where: { id } }); } } + diff --git a/apps/web/src/components/executions/ExecutionStream.tsx b/apps/web/src/components/executions/ExecutionStream.tsx new file mode 100644 index 0000000..5e747b3 --- /dev/null +++ b/apps/web/src/components/executions/ExecutionStream.tsx @@ -0,0 +1,79 @@ +'use client'; + +import { useExecutionStream } from '@/hooks/useExecutionStream'; +import { useAuth } from '@/hooks/useAuth'; + +interface Props { + executionId: string; +} + +const levelColor: Record = { + info: 'text-green-400', + warn: 'text-yellow-400', + error: 'text-red-400', +}; + +const statusColor: Record = { + QUEUED: 'text-gray-400', + RUNNING: 'text-blue-400', + SUCCESS: 'text-green-400', + FAILED: 'text-red-400', + CANCELLED: 'text-yellow-400', +}; + +export function ExecutionStream({ executionId }: Props) { + const { token } = useAuth(); + const { logs, status, done, connected } = useExecutionStream({ + executionId, + token, + }); + + return ( +
+ {/* Header */} +
+ + execution/{executionId.slice(0, 8)} + +
+ {connected && ( + + + streaming + + )} + {status && ( + + {status} + + )} +
+
+ + {/* Log output */} +
+ {logs.length === 0 && !done && ( + Waiting for output... + )} + {logs.map((log, i) => ( +
+ + {new Date(log.timestamp).toLocaleTimeString()} + + + {log.nodeId} + + + {log.message} + +
+ ))} + {done && ( +
+ — execution complete — +
+ )} +
+
+ ); +} diff --git a/apps/web/src/hooks/useExecutionStream.ts b/apps/web/src/hooks/useExecutionStream.ts new file mode 100644 index 0000000..a24476e --- /dev/null +++ b/apps/web/src/hooks/useExecutionStream.ts @@ -0,0 +1,85 @@ +import { useEffect, useRef, useState } from 'react'; + +export type LogLevel = 'info' | 'warn' | 'error'; + +export interface ExecutionLog { + executionId: string; + nodeId: string; + message: string; + level: LogLevel; + timestamp: string; +} + +export interface ExecutionStatus { + executionId: string; + status: 'QUEUED' | 'RUNNING' | 'SUCCESS' | 'FAILED' | 'CANCELLED'; +} + +interface UseExecutionStreamOptions { + executionId: string | null; + apiBase?: string; + token: string | null; +} + +interface UseExecutionStreamResult { + logs: ExecutionLog[]; + status: ExecutionStatus['status'] | null; + done: boolean; + connected: boolean; +} + +export function useExecutionStream({ + executionId, + apiBase = process.env.NEXT_PUBLIC_API_URL ?? 'http://localhost:3001', + token, +}: UseExecutionStreamOptions): UseExecutionStreamResult { + const [logs, setLogs] = useState([]); + const [status, setStatus] = useState(null); + const [done, setDone] = useState(false); + const [connected, setConnected] = useState(false); + const esRef = useRef(null); + + useEffect(() => { + if (!executionId || !token) return; + + setLogs([]); + setStatus(null); + setDone(false); + setConnected(false); + + // EventSource doesn't support headers — pass token as query param + const url = `${apiBase}/executions/${executionId}/stream?token=${encodeURIComponent(token)}`; + const es = new EventSource(url); + esRef.current = es; + + es.addEventListener('open', () => setConnected(true)); + + es.addEventListener('log', (e: MessageEvent) => { + const log = JSON.parse(e.data) as ExecutionLog; + setLogs((prev) => [...prev, log]); + }); + + es.addEventListener('status', (e: MessageEvent) => { + const payload = JSON.parse(e.data) as ExecutionStatus; + setStatus(payload.status); + }); + + es.addEventListener('done', () => { + setDone(true); + setConnected(false); + es.close(); + }); + + es.addEventListener('error', () => { + setConnected(false); + es.close(); + }); + + return () => { + es.close(); + esRef.current = null; + }; + }, [executionId, token, apiBase]); + + return { logs, status, done, connected }; +}