Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,4 @@ Open http://localhost:3000

---

MIT - DIYA73 - built with heart.
MIT - DIYA73 🤖
1 change: 1 addition & 0 deletions TEST.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# test
67 changes: 66 additions & 1 deletion apps/api/src/executions/executions.controller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { Controller, Get, Param, UseGuards } from '@nestjs/common';
import {
Controller, Get, Param, UseGuards,
Sse, Res, NotFoundException,
} from '@nestjs/common';
import { ApiTags, ApiBearerAuth } from '@nestjs/swagger';
import { OnEvent } from '@nestjs/event-emitter';
import { Response } from 'express';
import { ExecutionsService } from './executions.service';
import { JwtAuthGuard } from '../common/guards/jwt-auth.guard';
import { CurrentWorkspaceId } from '../common/decorators/current-user.decorator';
Expand All @@ -20,4 +25,64 @@ export class ExecutionsController {
findOne(@Param('id') id: string) {
return this.executionsService.findOne(id);
}

@Get(':id/stream')
async stream(
@Param('id') id: string,
@Res() res: Response,
): Promise<void> {
const execution = await this.executionsService.findOne(id);
if (!execution) throw new NotFoundException(`Execution ${id} not found`);

// SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
res.flushHeaders();

const send = (event: string, data: unknown) => {
res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
};

// Send existing logs immediately
if (execution.logs?.length) {
for (const log of execution.logs) {
send('log', { ...(log as Record<string, unknown>), executionId: id });
}
}

// Send current status
send('status', { executionId: id, status: execution.status });

// Store listeners so we can remove them on close
const onLog = (payload: Record<string, unknown>) => {
if (payload['executionId'] === id) send('log', payload);
};

const onStatus = (payload: Record<string, unknown>) => {
if (payload['executionId'] === id) {
send('status', payload);
const terminal = ['SUCCESS', 'FAILED', 'CANCELLED'];
if (terminal.includes(String(payload['status']))) {
send('done', { executionId: id });
res.end();
}
}
};

this.executionsService.eventEmitter.on('execution.log', onLog);
this.executionsService.eventEmitter.on('execution.status', onStatus);

// Heartbeat every 15s to keep connection alive
const heartbeat = setInterval(() => {
res.write(': heartbeat\n\n');
}, 15_000);

res.on('close', () => {
clearInterval(heartbeat);
this.executionsService.eventEmitter.off('execution.log', onLog);
this.executionsService.eventEmitter.off('execution.status', onStatus);
});
}
}
3 changes: 2 additions & 1 deletion apps/api/src/executions/executions.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class ExecutionsService {
private readonly executionQueue: Queue,
@InjectRepository(Execution)
private readonly executionRepo: Repository<Execution>,
private readonly eventEmitter: EventEmitter2,
readonly eventEmitter: EventEmitter2,
) {}

async enqueue(flow: Flow, input?: Record<string, unknown>): Promise<Execution> {
Expand Down Expand Up @@ -84,3 +84,4 @@ export class ExecutionsService {
return this.executionRepo.findOne({ where: { id } });
}
}

79 changes: 79 additions & 0 deletions apps/web/src/components/executions/ExecutionStream.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
'use client';

import { useExecutionStream } from '@/hooks/useExecutionStream';
import { useAuth } from '@/hooks/useAuth';

interface Props {
executionId: string;
}

const levelColor: Record<string, string> = {
info: 'text-green-400',
warn: 'text-yellow-400',
error: 'text-red-400',
};

const statusColor: Record<string, string> = {
QUEUED: 'text-gray-400',
RUNNING: 'text-blue-400',
SUCCESS: 'text-green-400',
FAILED: 'text-red-400',
CANCELLED: 'text-yellow-400',
};

export function ExecutionStream({ executionId }: Props) {
const { token } = useAuth();
const { logs, status, done, connected } = useExecutionStream({
executionId,
token,
});

return (
<div className="flex flex-col h-full bg-gray-950 rounded-lg border border-gray-800">
{/* Header */}
<div className="flex items-center justify-between px-4 py-2 border-b border-gray-800">
<span className="text-xs font-mono text-gray-400">
execution/{executionId.slice(0, 8)}
</span>
<div className="flex items-center gap-2">
{connected && (
<span className="flex items-center gap-1 text-xs text-blue-400">
<span className="w-1.5 h-1.5 rounded-full bg-blue-400 animate-pulse" />
streaming
</span>
)}
{status && (
<span className={`text-xs font-medium ${statusColor[status] ?? 'text-gray-400'}`}>
{status}
</span>
)}
</div>
</div>

{/* Log output */}
<div className="flex-1 overflow-y-auto p-4 font-mono text-xs space-y-1">
{logs.length === 0 && !done && (
<span className="text-gray-600">Waiting for output...</span>
)}
{logs.map((log, i) => (
<div key={i} className="flex gap-3">
<span className="text-gray-600 shrink-0">
{new Date(log.timestamp).toLocaleTimeString()}
</span>
<span className="text-gray-500 shrink-0 w-24 truncate">
{log.nodeId}
</span>
<span className={levelColor[log.level] ?? 'text-gray-300'}>
{log.message}
</span>
</div>
))}
{done && (
<div className="pt-2 border-t border-gray-800 text-gray-600">
— execution complete —
</div>
)}
</div>
</div>
);
}
85 changes: 85 additions & 0 deletions apps/web/src/hooks/useExecutionStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { useEffect, useRef, useState } from 'react';

export type LogLevel = 'info' | 'warn' | 'error';

export interface ExecutionLog {
executionId: string;
nodeId: string;
message: string;
level: LogLevel;
timestamp: string;
}

export interface ExecutionStatus {
executionId: string;
status: 'QUEUED' | 'RUNNING' | 'SUCCESS' | 'FAILED' | 'CANCELLED';
}

interface UseExecutionStreamOptions {
executionId: string | null;
apiBase?: string;
token: string | null;
}

interface UseExecutionStreamResult {
logs: ExecutionLog[];
status: ExecutionStatus['status'] | null;
done: boolean;
connected: boolean;
}

export function useExecutionStream({
executionId,
apiBase = process.env.NEXT_PUBLIC_API_URL ?? 'http://localhost:3001',
token,
}: UseExecutionStreamOptions): UseExecutionStreamResult {
const [logs, setLogs] = useState<ExecutionLog[]>([]);
const [status, setStatus] = useState<ExecutionStatus['status'] | null>(null);
const [done, setDone] = useState(false);
const [connected, setConnected] = useState(false);
const esRef = useRef<EventSource | null>(null);

useEffect(() => {
if (!executionId || !token) return;

setLogs([]);
setStatus(null);
setDone(false);
setConnected(false);

// EventSource doesn't support headers — pass token as query param
const url = `${apiBase}/executions/${executionId}/stream?token=${encodeURIComponent(token)}`;
const es = new EventSource(url);
esRef.current = es;

es.addEventListener('open', () => setConnected(true));

es.addEventListener('log', (e: MessageEvent<string>) => {
const log = JSON.parse(e.data) as ExecutionLog;
setLogs((prev) => [...prev, log]);
});

es.addEventListener('status', (e: MessageEvent<string>) => {
const payload = JSON.parse(e.data) as ExecutionStatus;
setStatus(payload.status);
});

es.addEventListener('done', () => {
setDone(true);
setConnected(false);
es.close();
});

es.addEventListener('error', () => {
setConnected(false);
es.close();
});

return () => {
es.close();
esRef.current = null;
};
}, [executionId, token, apiBase]);

return { logs, status, done, connected };
}
Loading