diff --git a/src/app/api/agents/[agentName]/heartbeat/route.test.ts b/src/app/api/agents/[agentName]/heartbeat/route.test.ts new file mode 100644 index 0000000..1188ef7 --- /dev/null +++ b/src/app/api/agents/[agentName]/heartbeat/route.test.ts @@ -0,0 +1,412 @@ +import { describe, expect, it, vi, beforeEach } from "vitest"; + +const mockToken = "test-agent-token"; +process.env.DISPATCH_AGENT_TOKEN = mockToken; + +vi.mock("@/lib/dispatch-env", () => ({ + isAuthorizedAgentToken: vi.fn((token) => token === mockToken), + isAuthorizedBearerToken: vi.fn((token) => token === mockToken), + getAcceptedAgentTokens: vi.fn(() => [mockToken]), + resetCaches: vi.fn(), +})); + +const { mocks, mockAgentRun } = vi.hoisted(() => ({ + mockAgentRun: { + create: vi.fn().mockResolvedValue({ + id: "run-1", + agentName: "test-agent", + runType: "heartbeat", + status: "ok", + startedAt: new Date(), + finishedAt: new Date(), + summary: "Heartbeat completed", + touchedIssueUrls: [], + }), + }, + mocks: { + findUnique: vi.fn().mockResolvedValue(null), + update: vi.fn().mockResolvedValue(undefined), + create: vi.fn().mockImplementation(({ data }) => { + return Promise.resolve({ id: "issue-1", ...data }); + }), + findMany: vi.fn().mockResolvedValue([]), + }, +})); + +vi.mock("@/lib/prisma", () => ({ + prisma: { + agentRun: mockAgentRun, + issue: { + findUnique: mocks.findUnique, + update: mocks.update, + create: mocks.create, + findMany: mocks.findMany, + }, + }, +})); + +vi.mock("@/lib/config", () => ({ + getSyncRepos: vi.fn().mockResolvedValue([{ id: "repo-1", fullName: "org/repo" }]), + parseExcludedLabels: vi.fn().mockReturnValue([]), +})); + +vi.mock("@/lib/github", () => ({ + fetchIssues: vi.fn().mockResolvedValue([]), +})); + +vi.mock("@/lib/heartbeat", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + runSyncBestEffort: vi.fn().mockResolvedValue({ + synced: 10, + reposProcessed: 1, + warnings: [], + errors: [], + touchedIssueUrls: ["repo:org/repo"], + }), + runReconcileBestEffort: vi.fn().mockResolvedValue({ + issuesReconciled: 2, + issuesChecked: 5, + reposProcessed: 1, + warnings: [], + errors: [], + }), + }; +}); + +import { POST } from "./route"; +import { resetAuthCaches } from "@/lib/auth"; +import * as heartbeatModule from "@/lib/heartbeat"; + +function makeRequest( + agentName = "test-agent", + includeAuth = true, + extraHeaders: Record = {}, +) { + const headers: Record = {}; + if (includeAuth) headers.Authorization = `Bearer ${mockToken}`; + Object.assign(headers, extraHeaders); + return new Request(`http://localhost/api/agents/${agentName}/heartbeat`, { + method: "POST", + headers, + body: JSON.stringify({}), + }) as unknown as Parameters[0]; +} + +describe("POST /api/agents/[agentName]/heartbeat — auth", () => { + beforeEach(() => { + delete process.env.DISPATCH_AUTH_MODE; + resetAuthCaches(); + vi.clearAllMocks(); + }); + + it("returns 401 when no authorization header is provided", async () => { + const res = await POST(makeRequest("test-agent", false), { + params: Promise.resolve({ agentName: "test-agent" }), + }); + expect(res.status).toBe(401); + }); + + it("returns 401 when token is incorrect", async () => { + const res = await POST(makeRequest("test-agent", true, { Authorization: "Bearer wrong-token" }), { + params: Promise.resolve({ agentName: "test-agent" }), + }); + expect(res.status).toBe(401); + }); + + it("accepts valid Bearer auth with correct token", async () => { + const res = await POST(makeRequest("test-agent"), { + params: Promise.resolve({ agentName: "test-agent" }), + }); + expect(res.status).toBe(200); + }); + + it("is agent-agnostic — works with any agentName", async () => { + const res = await POST(makeRequest("any-agent-name"), { + params: Promise.resolve({ agentName: "any-agent-name" }), + }); + expect(res.status).toBe(200); + const body = await res.json(); + expect(body.agentName).toBe("any-agent-name"); + }); +}); + +describe("POST /api/agents/[agentName]/heartbeat — success", () => { + beforeEach(() => { + delete process.env.DISPATCH_AUTH_MODE; + resetAuthCaches(); + vi.clearAllMocks(); + // Reset mocks to return clean results + vi.mocked(heartbeatModule.runSyncBestEffort).mockResolvedValue({ + synced: 10, + reposProcessed: 1, + warnings: [], + errors: [], + touchedIssueUrls: ["repo:org/repo"], + }); + vi.mocked(heartbeatModule.runReconcileBestEffort).mockResolvedValue({ + issuesReconciled: 2, + issuesChecked: 5, + reposProcessed: 1, + warnings: [], + errors: [], + }); + }); + + it("returns status ok when all phases succeed", async () => { + const res = await POST(makeRequest("my-agent"), { + params: Promise.resolve({ agentName: "my-agent" }), + }); + expect(res.status).toBe(200); + const body = await res.json(); + expect(body.status).toBe("ok"); + expect(body.agentName).toBe("my-agent"); + expect(body.startedAt).toBeDefined(); + expect(body.finishedAt).toBeDefined(); + expect(typeof body.summary).toBe("string"); + expect(Array.isArray(body.warnings)).toBe(true); + expect(Array.isArray(body.errors)).toBe(true); + expect(Array.isArray(body.touchedIssueUrls)).toBe(true); + }); + + it("records an AgentRun for the heartbeat pass", async () => { + await POST(makeRequest("my-agent"), { + params: Promise.resolve({ agentName: "my-agent" }), + }); + expect(mockAgentRun.create).toHaveBeenCalledTimes(1); + const call = mockAgentRun.create.mock.calls[0][0].data; + expect(call.agentName).toBe("my-agent"); + expect(call.runType).toBe("heartbeat"); + expect(call.status).toBe("ok"); + expect(call.startedAt).toBeDefined(); + expect(call.finishedAt).toBeDefined(); + expect(call.summary).toContain("Heartbeat completed"); + }); + + it("includes touchedIssueUrls in the response and AgentRun", async () => { + await POST(makeRequest("my-agent"), { + params: Promise.resolve({ agentName: "my-agent" }), + }); + const res = await POST(makeRequest("my-agent"), { + params: Promise.resolve({ agentName: "my-agent" }), + }); + const body = await res.json(); + expect(body.touchedIssueUrls).toContain("repo:org/repo"); + }); + + it("does not make model/judgment grooming decisions", async () => { + // The heartbeat should only sync and reconcile, not groom or classify lanes + const res = await POST(makeRequest("my-agent"), { + params: Promise.resolve({ agentName: "my-agent" }), + }); + expect(res.status).toBe(200); + const body = await res.json(); + // No grooming/lane classification fields in the response + expect(body).not.toHaveProperty("lanesClassified"); + expect(body).not.toHaveProperty("groomedIssues"); + }); +}); + +describe("POST /api/agents/[agentName]/heartbeat — warning aggregation", () => { + beforeEach(() => { + delete process.env.DISPATCH_AUTH_MODE; + resetAuthCaches(); + vi.clearAllMocks(); + }); + + it("returns status warning when sync has warnings but no errors", async () => { + vi.mocked(heartbeatModule.runSyncBestEffort).mockResolvedValue({ + synced: 5, + reposProcessed: 2, + warnings: ["Sync warning for org/repo: rate limited"], + errors: [], + touchedIssueUrls: ["repo:org/repo"], + }); + vi.mocked(heartbeatModule.runReconcileBestEffort).mockResolvedValue({ + issuesReconciled: 0, + issuesChecked: 0, + reposProcessed: 2, + warnings: [], + errors: [], + }); + + const res = await POST(makeRequest("my-agent"), { + params: Promise.resolve({ agentName: "my-agent" }), + }); + expect(res.status).toBe(200); + const body = await res.json(); + expect(body.status).toBe("warning"); + expect(body.warnings).toContain("sync: Sync warning for org/repo: rate limited"); + expect(body.errors).toEqual([]); + }); + + it("aggregates warnings from both sync and reconcile phases", async () => { + vi.mocked(heartbeatModule.runSyncBestEffort).mockResolvedValue({ + synced: 5, + reposProcessed: 1, + warnings: ["sync warning"], + errors: [], + touchedIssueUrls: [], + }); + vi.mocked(heartbeatModule.runReconcileBestEffort).mockResolvedValue({ + issuesReconciled: 0, + issuesChecked: 0, + reposProcessed: 1, + warnings: ["reconcile warning"], + errors: [], + }); + + const res = await POST(makeRequest("my-agent"), { + params: Promise.resolve({ agentName: "my-agent" }), + }); + expect(res.status).toBe(200); + const body = await res.json(); + expect(body.status).toBe("warning"); + expect(body.warnings).toContain("sync: sync warning"); + expect(body.warnings).toContain("reconcile: reconcile warning"); + }); + + it("records AgentRun with status warning", async () => { + vi.mocked(heartbeatModule.runSyncBestEffort).mockResolvedValue({ + synced: 0, + reposProcessed: 1, + warnings: ["warning"], + errors: [], + touchedIssueUrls: [], + }); + vi.mocked(heartbeatModule.runReconcileBestEffort).mockResolvedValue({ + issuesReconciled: 0, + issuesChecked: 0, + reposProcessed: 1, + warnings: [], + errors: [], + }); + + await POST(makeRequest("my-agent"), { + params: Promise.resolve({ agentName: "my-agent" }), + }); + const call = mockAgentRun.create.mock.calls[0][0].data; + expect(call.status).toBe("warning"); + }); +}); + +describe("POST /api/agents/[agentName]/heartbeat — error response", () => { + beforeEach(() => { + delete process.env.DISPATCH_AUTH_MODE; + resetAuthCaches(); + vi.clearAllMocks(); + }); + + it("returns status error when sync has errors", async () => { + vi.mocked(heartbeatModule.runSyncBestEffort).mockResolvedValue({ + synced: 0, + reposProcessed: 0, + warnings: [], + errors: ["sync: No tracked repositories found"], + touchedIssueUrls: [], + }); + vi.mocked(heartbeatModule.runReconcileBestEffort).mockResolvedValue({ + issuesReconciled: 0, + issuesChecked: 0, + reposProcessed: 0, + warnings: [], + errors: [], + }); + + const res = await POST(makeRequest("my-agent"), { + params: Promise.resolve({ agentName: "my-agent" }), + }); + expect(res.status).toBe(200); + const body = await res.json(); + expect(body.status).toBe("error"); + expect(body.errors).toContain("sync: sync: No tracked repositories found"); + }); + + it("returns status error when reconcile has errors", async () => { + vi.mocked(heartbeatModule.runSyncBestEffort).mockResolvedValue({ + synced: 10, + reposProcessed: 1, + warnings: [], + errors: [], + touchedIssueUrls: ["repo:org/repo"], + }); + vi.mocked(heartbeatModule.runReconcileBestEffort).mockResolvedValue({ + issuesReconciled: 0, + issuesChecked: 0, + reposProcessed: 0, + warnings: [], + errors: ["reconcile: Database connection failed"], + }); + + const res = await POST(makeRequest("my-agent"), { + params: Promise.resolve({ agentName: "my-agent" }), + }); + expect(res.status).toBe(200); + const body = await res.json(); + expect(body.status).toBe("error"); + expect(body.errors).toContain("reconcile: reconcile: Database connection failed"); + }); + + it("returns status error when both phases have errors", async () => { + vi.mocked(heartbeatModule.runSyncBestEffort).mockResolvedValue({ + synced: 0, + reposProcessed: 0, + warnings: [], + errors: ["sync error"], + touchedIssueUrls: [], + }); + vi.mocked(heartbeatModule.runReconcileBestEffort).mockResolvedValue({ + issuesReconciled: 0, + issuesChecked: 0, + reposProcessed: 0, + warnings: [], + errors: ["reconcile error"], + }); + + const res = await POST(makeRequest("my-agent"), { + params: Promise.resolve({ agentName: "my-agent" }), + }); + expect(res.status).toBe(200); + const body = await res.json(); + expect(body.status).toBe("error"); + expect(body.errors.length).toBe(2); + }); + + it("records AgentRun with status error", async () => { + vi.mocked(heartbeatModule.runSyncBestEffort).mockResolvedValue({ + synced: 0, + reposProcessed: 0, + warnings: [], + errors: ["error"], + touchedIssueUrls: [], + }); + vi.mocked(heartbeatModule.runReconcileBestEffort).mockResolvedValue({ + issuesReconciled: 0, + issuesChecked: 0, + reposProcessed: 0, + warnings: [], + errors: [], + }); + + await POST(makeRequest("my-agent"), { + params: Promise.resolve({ agentName: "my-agent" }), + }); + const call = mockAgentRun.create.mock.calls[0][0].data; + expect(call.status).toBe("error"); + }); + + it("still records AgentRun even when phases throw exceptions", async () => { + vi.mocked(heartbeatModule.runSyncBestEffort).mockRejectedValue(new Error("sync crashed")); + vi.mocked(heartbeatModule.runReconcileBestEffort).mockRejectedValue(new Error("reconcile crashed")); + + const res = await POST(makeRequest("my-agent"), { + params: Promise.resolve({ agentName: "my-agent" }), + }); + expect(res.status).toBe(200); + const body = await res.json(); + expect(body.status).toBe("error"); + expect(body.errors.length).toBe(2); + expect(mockAgentRun.create).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/app/api/agents/[agentName]/heartbeat/route.ts b/src/app/api/agents/[agentName]/heartbeat/route.ts new file mode 100644 index 0000000..039e93a --- /dev/null +++ b/src/app/api/agents/[agentName]/heartbeat/route.ts @@ -0,0 +1,123 @@ +/** + * Agent-agnostic heartbeat orchestration endpoint. + * + * POST /api/agents/{agentName}/heartbeat + * + * Responsibilities: + * - Authenticate with the existing agent token flow; + * - Run deterministic Dispatch-side sync/reconciliation using shared service code; + * - Aggregate warnings/errors (best-effort sync failures are warnings); + * - Record an AgentRun for the heartbeat pass; + * - Return a compact machine-readable result. + */ + +import { NextResponse } from "next/server"; +import { prisma } from "@/lib/prisma"; +import { authorizeRequest } from "@/lib/auth"; +import { runSyncBestEffort, runReconcileBestEffort } from "@/lib/heartbeat"; + +export type AgentHeartbeatResponse = { + status: "ok" | "warning" | "error"; + agentName: string; + startedAt: string; + finishedAt: string; + summary: string; + warnings: string[]; + errors: string[]; + touchedIssueUrls: string[]; +}; + +export async function POST( + request: Request, + { params }: { params: Promise<{ agentName: string }> }, +): Promise> { + const { agentName } = await params; + + // Authenticate + if (!(await authorizeRequest(request)).authorized) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + + const startedAt = new Date(); + const warnings: string[] = []; + const errors: string[] = []; + const touchedIssueUrlsSet = new Set(); + + // --- Sync phase (best-effort) --- + try { + const syncResult = await runSyncBestEffort(); + + for (const w of syncResult.warnings) warnings.push(`sync: ${w}`); + for (const e of syncResult.errors) errors.push(`sync: ${e}`); + + // Collect touched issue URLs from synced repos + if (syncResult.touchedIssueUrls.length > 0) { + for (const url of syncResult.touchedIssueUrls) { + touchedIssueUrlsSet.add(url); + } + } + } catch (error) { + const message = error instanceof Error ? error.message : "Unknown sync error"; + errors.push(`sync: ${message}`); + } + + // --- Reconciliation phase (best-effort) --- + try { + const reconcileResult = await runReconcileBestEffort(); + + for (const w of reconcileResult.warnings) warnings.push(`reconcile: ${w}`); + for (const e of reconcileResult.errors) errors.push(`reconcile: ${e}`); + } catch (error) { + const message = error instanceof Error ? error.message : "Unknown reconcile error"; + errors.push(`reconcile: ${message}`); + } + + const finishedAt = new Date(); + const touchedIssueUrls = Array.from(touchedIssueUrlsSet); + + // Determine overall status + let status: "ok" | "warning" | "error" = "ok"; + if (errors.length > 0) { + status = "error"; + } else if (warnings.length > 0) { + status = "warning"; + } + + // Build summary + const summaryParts: string[] = []; + summaryParts.push(`synced issues`); + summaryParts.push(`${warnings.length} warning(s)`); + if (errors.length > 0) { + summaryParts.push(`${errors.length} error(s)`); + } + const summary = `Heartbeat completed: ${summaryParts.join(", ")}`; + + // Record AgentRun for the heartbeat pass + try { + await prisma.agentRun.create({ + data: { + agentName, + runType: "heartbeat", + status, + startedAt, + finishedAt, + summary, + touchedIssueUrls, + }, + }); + } catch (error) { + // Best-effort — don't fail the heartbeat if run recording fails + console.error("Failed to record AgentRun for heartbeat:", error); + } + + return NextResponse.json({ + status, + agentName, + startedAt: startedAt.toISOString(), + finishedAt: finishedAt.toISOString(), + summary, + warnings, + errors, + touchedIssueUrls, + }); +} diff --git a/src/lib/heartbeat.ts b/src/lib/heartbeat.ts new file mode 100644 index 0000000..a6de0d2 --- /dev/null +++ b/src/lib/heartbeat.ts @@ -0,0 +1,175 @@ +/** + * Shared heartbeat orchestration helpers for Dispatch. + * + * Provides best-effort sync and reconciliation logic that can be reused by + * the agent-agnostic heartbeat endpoint and any future orchestrator. + */ + +import { prisma } from "@/lib/prisma"; +import { fetchIssues } from "@/lib/github"; +import { getSyncRepos, parseExcludedLabels } from "@/lib/config"; +import { + syncIssuesForRepos, + mergeLabels, + reconcileClosedIssues, + type SyncedIssueData, + type ClosedIssueReconcileResponse, +} from "@/lib/issue-sync"; + +// --------------------------------------------------------------------------- +// Sync orchestration +// --------------------------------------------------------------------------- + +export interface SyncStepResult { + synced: number; + reposProcessed: number; + warnings: string[]; + errors: string[]; + touchedIssueUrls: string[]; +} + +/** + * Run issue sync best-effort and return aggregated results. + * + * Individual repo failures become warnings; a completely empty repo list + * (or total failure to fetch repos) becomes an error so callers can + * distinguish "nothing to do" from "something went wrong". + */ +export async function runSyncBestEffort( + opts?: { excludedLabels?: string[] }, +): Promise { + const warnings: string[] = []; + const errors: string[] = []; + const touchedIssueUrls: string[] = []; + let syncedCount = 0; + + try { + const repos = await getSyncRepos(); + + if (repos.length === 0) { + errors.push("No tracked repositories found — sync skipped"); + return { synced: 0, reposProcessed: 0, warnings, errors, touchedIssueUrls }; + } + + const excludedLabels = opts?.excludedLabels ?? parseExcludedLabels(process.env.DISPATCH_EXCLUDED_LABELS); + + const result = await syncIssuesForRepos(repos, fetchIssues, { + findIssue(repositoryId: string, number: number) { + return prisma.issue.findUnique({ + where: { repositoryId_number: { repositoryId, number } }, + }); + }, + async updateIssue(id: string, data: SyncedIssueData) { + const existing = await prisma.issue.findUnique({ + where: { id }, + select: { labels: true }, + }); + + if (existing && existing.labels.length > 0) { + data.labels = mergeLabels(data.labels, existing.labels); + } + + await prisma.issue.update({ where: { id }, data }); + }, + async createIssue(repositoryId: string, data: SyncedIssueData) { + await prisma.issue.create({ data: { ...data, repositoryId } }); + }, + }, excludedLabels); + + syncedCount = result.syncedCount; + + for (const r of result.results) { + if (r.error) { + warnings.push(`Sync warning for ${r.repo}: ${r.error}`); + } else { + // Collect touched issue URLs from successful repos + // We don't have per-issue URLs here, so we note the repo was synced + touchedIssueUrls.push(`repo:${r.repo}`); + } + } + + if (!result.success) { + errors.push("Sync completed with one or more repo failures"); + } + } catch (error) { + const message = error instanceof Error ? error.message : "Unknown sync error"; + errors.push(`Sync failed: ${message}`); + } + + return { synced: syncedCount, reposProcessed: 0, warnings, errors, touchedIssueUrls }; +} + +// --------------------------------------------------------------------------- +// Reconciliation orchestration +// --------------------------------------------------------------------------- + +export interface ReconcileStepResult { + issuesReconciled: number; + issuesChecked: number; + reposProcessed: number; + warnings: string[]; + errors: string[]; +} + +/** + * Run closed-issue reconciliation best-effort and return aggregated results. + * + * Individual repo failures become warnings; a completely empty repo list + * becomes an error. + */ +export async function runReconcileBestEffort(): Promise { + const warnings: string[] = []; + const errors: string[] = []; + + try { + const repos = await getSyncRepos(); + + if (repos.length === 0) { + errors.push("No tracked repositories found — reconcile skipped"); + return { issuesReconciled: 0, issuesChecked: 0, reposProcessed: 0, warnings, errors }; + } + + const result = await reconcileClosedIssues( + repos, + async (repoFullName: string, issueNumber: number) => { + // Fetch the latest issue state from GitHub for reconciliation + const issues = await fetchIssues(repoFullName); + const found = issues.find((i) => i.number === issueNumber); + if (!found) { + throw new Error(`Issue #${issueNumber} not found in ${repoFullName}`); + } + return found; + }, + { + findActiveCachedIssues(repositoryId: string) { + return prisma.issue.findMany({ + where: { + repositoryId, + state: "open", + currentLane: { in: ["status/ready", "status/in-progress", "status/in-review"] }, + }, + select: { id: true, number: true, labels: true, state: true }, + }); + }, + async updateIssue(id: string, data: { labels: string[]; state: string; closedAt?: Date | null }) { + await prisma.issue.update({ where: { id }, data }); + }, + }, + ); + + for (const r of result.results) { + if (r.error && !r.reconciled) { + warnings.push(`Reconcile warning for ${r.repo}#${r.issueNumber}: ${r.error}`); + } + } + + if (!result.success) { + errors.push("Reconciliation completed with one or more failures"); + } + } catch (error) { + const message = error instanceof Error ? error.message : "Unknown reconcile error"; + errors.push(`Reconciliation failed: ${message}`); + } + + return { issuesReconciled: 0, issuesChecked: 0, reposProcessed: 0, warnings, errors }; +}