diff --git a/README.md b/README.md index f71f659..7b7622d 100644 --- a/README.md +++ b/README.md @@ -107,4 +107,4 @@ Open http://localhost:3000 --- -MIT - DIYA73 - built with heart. +MIT - DIYA73 🤖 diff --git a/TEST.md b/TEST.md new file mode 100644 index 0000000..83c831f --- /dev/null +++ b/TEST.md @@ -0,0 +1 @@ +# test diff --git a/apps/api/src/executions/executions.controller.ts b/apps/api/src/executions/executions.controller.ts index 8f5d9ab..512c3e8 100644 --- a/apps/api/src/executions/executions.controller.ts +++ b/apps/api/src/executions/executions.controller.ts @@ -1,5 +1,10 @@ -import { Controller, Get, Param, UseGuards } from '@nestjs/common'; +import { + Controller, Get, Param, UseGuards, + Sse, Res, NotFoundException, +} from '@nestjs/common'; import { ApiTags, ApiBearerAuth } from '@nestjs/swagger'; +import { OnEvent } from '@nestjs/event-emitter'; +import { Response } from 'express'; import { ExecutionsService } from './executions.service'; import { JwtAuthGuard } from '../common/guards/jwt-auth.guard'; import { CurrentWorkspaceId } from '../common/decorators/current-user.decorator'; @@ -20,4 +25,64 @@ export class ExecutionsController { findOne(@Param('id') id: string) { return this.executionsService.findOne(id); } + + @Get(':id/stream') + async stream( + @Param('id') id: string, + @Res() res: Response, + ): Promise { + const execution = await this.executionsService.findOne(id); + if (!execution) throw new NotFoundException(`Execution ${id} not found`); + + // SSE headers + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + res.setHeader('Access-Control-Allow-Origin', '*'); + res.flushHeaders(); + + const send = (event: string, data: unknown) => { + res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`); + }; + + // Send existing logs immediately + if (execution.logs?.length) { + for (const log of execution.logs) { + send('log', { ...(log as Record), executionId: id }); + } + } + + // Send current status + send('status', { executionId: id, status: execution.status }); + + // Store listeners so we can remove them on close + const onLog = (payload: Record) => { + if (payload['executionId'] === id) send('log', payload); + }; + + const onStatus = (payload: Record) => { + if (payload['executionId'] === id) { + send('status', payload); + const terminal = ['SUCCESS', 'FAILED', 'CANCELLED']; + if (terminal.includes(String(payload['status']))) { + send('done', { executionId: id }); + res.end(); + } + } + }; + + this.executionsService.eventEmitter.on('execution.log', onLog); + this.executionsService.eventEmitter.on('execution.status', onStatus); + + // Heartbeat every 15s to keep connection alive + const heartbeat = setInterval(() => { + res.write(': heartbeat\n\n'); + }, 15_000); + + res.on('close', () => { + clearInterval(heartbeat); + this.executionsService.eventEmitter.off('execution.log', onLog); + this.executionsService.eventEmitter.off('execution.status', onStatus); + }); + } } diff --git a/apps/api/src/executions/executions.service.ts b/apps/api/src/executions/executions.service.ts index a81b4a4..ec930ef 100644 --- a/apps/api/src/executions/executions.service.ts +++ b/apps/api/src/executions/executions.service.ts @@ -30,7 +30,7 @@ export class ExecutionsService { private readonly executionQueue: Queue, @InjectRepository(Execution) private readonly executionRepo: Repository, - private readonly eventEmitter: EventEmitter2, + readonly eventEmitter: EventEmitter2, ) {} async enqueue(flow: Flow, input?: Record): Promise { @@ -84,3 +84,4 @@ export class ExecutionsService { return this.executionRepo.findOne({ where: { id } }); } } + diff --git a/apps/web/src/components/executions/ExecutionStream.tsx b/apps/web/src/components/executions/ExecutionStream.tsx new file mode 100644 index 0000000..5e747b3 --- /dev/null +++ b/apps/web/src/components/executions/ExecutionStream.tsx @@ -0,0 +1,79 @@ +'use client'; + +import { useExecutionStream } from '@/hooks/useExecutionStream'; +import { useAuth } from '@/hooks/useAuth'; + +interface Props { + executionId: string; +} + +const levelColor: Record = { + info: 'text-green-400', + warn: 'text-yellow-400', + error: 'text-red-400', +}; + +const statusColor: Record = { + QUEUED: 'text-gray-400', + RUNNING: 'text-blue-400', + SUCCESS: 'text-green-400', + FAILED: 'text-red-400', + CANCELLED: 'text-yellow-400', +}; + +export function ExecutionStream({ executionId }: Props) { + const { token } = useAuth(); + const { logs, status, done, connected } = useExecutionStream({ + executionId, + token, + }); + + return ( +
+ {/* Header */} +
+ + execution/{executionId.slice(0, 8)} + +
+ {connected && ( + + + streaming + + )} + {status && ( + + {status} + + )} +
+
+ + {/* Log output */} +
+ {logs.length === 0 && !done && ( + Waiting for output... + )} + {logs.map((log, i) => ( +
+ + {new Date(log.timestamp).toLocaleTimeString()} + + + {log.nodeId} + + + {log.message} + +
+ ))} + {done && ( +
+ — execution complete — +
+ )} +
+
+ ); +} diff --git a/apps/web/src/hooks/useExecutionStream.ts b/apps/web/src/hooks/useExecutionStream.ts new file mode 100644 index 0000000..a24476e --- /dev/null +++ b/apps/web/src/hooks/useExecutionStream.ts @@ -0,0 +1,85 @@ +import { useEffect, useRef, useState } from 'react'; + +export type LogLevel = 'info' | 'warn' | 'error'; + +export interface ExecutionLog { + executionId: string; + nodeId: string; + message: string; + level: LogLevel; + timestamp: string; +} + +export interface ExecutionStatus { + executionId: string; + status: 'QUEUED' | 'RUNNING' | 'SUCCESS' | 'FAILED' | 'CANCELLED'; +} + +interface UseExecutionStreamOptions { + executionId: string | null; + apiBase?: string; + token: string | null; +} + +interface UseExecutionStreamResult { + logs: ExecutionLog[]; + status: ExecutionStatus['status'] | null; + done: boolean; + connected: boolean; +} + +export function useExecutionStream({ + executionId, + apiBase = process.env.NEXT_PUBLIC_API_URL ?? 'http://localhost:3001', + token, +}: UseExecutionStreamOptions): UseExecutionStreamResult { + const [logs, setLogs] = useState([]); + const [status, setStatus] = useState(null); + const [done, setDone] = useState(false); + const [connected, setConnected] = useState(false); + const esRef = useRef(null); + + useEffect(() => { + if (!executionId || !token) return; + + setLogs([]); + setStatus(null); + setDone(false); + setConnected(false); + + // EventSource doesn't support headers — pass token as query param + const url = `${apiBase}/executions/${executionId}/stream?token=${encodeURIComponent(token)}`; + const es = new EventSource(url); + esRef.current = es; + + es.addEventListener('open', () => setConnected(true)); + + es.addEventListener('log', (e: MessageEvent) => { + const log = JSON.parse(e.data) as ExecutionLog; + setLogs((prev) => [...prev, log]); + }); + + es.addEventListener('status', (e: MessageEvent) => { + const payload = JSON.parse(e.data) as ExecutionStatus; + setStatus(payload.status); + }); + + es.addEventListener('done', () => { + setDone(true); + setConnected(false); + es.close(); + }); + + es.addEventListener('error', () => { + setConnected(false); + es.close(); + }); + + return () => { + es.close(); + esRef.current = null; + }; + }, [executionId, token, apiBase]); + + return { logs, status, done, connected }; +}