From 5aec12052cf291a906fd3f118e28ae073653ea13 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Mon, 8 Jun 2026 19:42:38 -0600 Subject: [PATCH] =?UTF-8?q?feat(bench):=20EOPS=20on=20the=20CANONICAL=20lo?= =?UTF-8?q?op=20system=20=E2=80=94=20Supervisor=20+=20observe()=20analyst?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stops hand-rolling. Brings the Supervisor depth/breadth drivers forward (the general agentic primitive, d5aa85a) and makes them truly canonical: the depth STEERER is now agent-eval's observe() (makeFinding + ChatClient + the derived_from_judge firewall), not a hand-rolled chat call. The worker runs through the keystone (createSupervisor + Scope + Agent.act + scope.spawn shots), metered by the conserved budget pool (equal-k by construction), journaled. This is the loop system we built, end to end — depth (continue over one artifact, observe()-steered) vs breadth (parallel best-of), over the live EOPS gym. The +13.4pp branch hand-rolled its analyst; this is the first run with the REAL analyst. Fixed the executor-rename drift (LeafExecutor→Executor etc, #190). Smoke (n=2, deepseek-v4-pro) runs clean through the Supervisor; full n in flight. Replaces the throwaway flat-loop eops-gate prototype for the science. --- bench/src/agentic-eops.ts | 73 +++++++ bench/src/agentic-run.mts | 84 ++++++++ bench/src/agentic.ts | 425 ++++++++++++++++++++++++++++++++++++++ bench/src/gym-agent.ts | 404 ++++++++++++++++++++++++++++++++++++ 4 files changed, 986 insertions(+) create mode 100644 bench/src/agentic-eops.ts create mode 100644 bench/src/agentic-run.mts create mode 100644 bench/src/agentic.ts create mode 100644 bench/src/gym-agent.ts diff --git a/bench/src/agentic-eops.ts b/bench/src/agentic-eops.ts new file mode 100644 index 0000000..86c82b9 --- /dev/null +++ b/bench/src/agentic-eops.ts @@ -0,0 +1,73 @@ +/** + * EnterpriseOps as an `AgenticSurface` — the EOPS instance of the general agentic primitive. + * + * This is ALL the EOPS-specific code: open = seed an isolated gym DB; tools = the gym's MCP tool + * schemas; call = an MCP tools/call; score = the deterministic SQL verifiers; close = delete the DB. + * Reuses the primitives proven against the live container in gym-agent.ts. A new domain (Commit0, + * AppWorld, terminal-bench) ships its own file like this one — the drivers in agentic.ts never change. + */ + +import type { AgenticSurface, AgenticTask, AgenticTool, ArtifactHandle, SurfaceScore } from './agentic' +import { callTool, deleteDb, type GymServer, type GymVerifier, loadTools, runVerifiers, seed } from './gym-agent' + +interface EopsMeta { + servers: GymServer[] + verifiers: GymVerifier[] + selectedTools: string[] +} + +function metaOf(task: AgenticTask): EopsMeta { + const m = task.meta as EopsMeta | undefined + if (!m || !Array.isArray(m.servers) || !Array.isArray(m.verifiers)) { + throw new Error(`eops surface: task ${task.id} missing meta.servers/verifiers`) + } + return m +} + +/** Build an `AgenticTask` from an EnterpriseOps HF row (the EOPS-shaped fields go in `meta`). */ +export function eopsTaskFromRow(row: { + task_id: string + system_prompt: string + user_prompt: string + selected_tools: string[] + gym_servers_config: string | GymServer[] + verifiers: string | GymVerifier[] +}): AgenticTask { + const arr = (v: string | T[]): T[] => (typeof v === 'string' ? (JSON.parse(v) as T[]) : v) + return { + id: row.task_id, + systemPrompt: row.system_prompt, + userPrompt: row.user_prompt, + meta: { + servers: arr(row.gym_servers_config), + verifiers: arr(row.verifiers), + selectedTools: row.selected_tools, + } satisfies EopsMeta, + } +} + +/** The EnterpriseOps surface. `gymDbsDir` = the unzipped gym_dbs.zip (resolves seed_database_file). */ +export function createEopsSurface(gymDbsDir: string): AgenticSurface { + return { + name: 'eops', + async open(task: AgenticTask): Promise { + // Clone the server config and seed a fresh isolated DB (seed() stamps server._database_id). + const server: GymServer = { ...metaOf(task).servers[0]! } + await seed(server, gymDbsDir) + return { id: server._database_id ?? task.id, surface: 'eops', ctx: server } + }, + async tools(task: AgenticTask, handle: ArtifactHandle): Promise { + return loadTools(handle.ctx as GymServer, metaOf(task).selectedTools) + }, + call(handle: ArtifactHandle, name: string, args: Record): Promise { + return callTool(handle.ctx as GymServer, name, args) + }, + async score(task: AgenticTask, handle: ArtifactHandle): Promise { + const server = handle.ctx as GymServer + return runVerifiers({ servers: [server], verifiers: metaOf(task).verifiers } as never) + }, + close(handle: ArtifactHandle): Promise { + return deleteDb(handle.ctx as GymServer) + }, + } +} diff --git a/bench/src/agentic-run.mts b/bench/src/agentic-run.mts new file mode 100644 index 0000000..d3a7e17 --- /dev/null +++ b/bench/src/agentic-run.mts @@ -0,0 +1,84 @@ +/** + * Run the general agentic primitive over EnterpriseOps — depth (sequential, same artifact) and + * breadth (parallel), both through the keystone Supervisor. Reports the depth progress-over-shots + * curve and a depth-vs-breadth comparison at matched compute. + * + * export TANGLE_API_KEY=… EOPS_GYM_DBS_DIR= # itsm gym on :8006 + * TASKS=4 MAX_SHOTS=5 WIDTH=5 INNER_TURNS=4 WORKER_MODEL=gpt-4.1 tsx src/agentic-run.mts + */ + +import { type AgenticOptions, type AgenticTask, runAgentic } from './agentic' +import { createEopsSurface, eopsTaskFromRow } from './agentic-eops' + +const must = (k: string): string => { + const v = process.env[k] + if (!v) throw new Error(`env ${k} is required`) + return v +} + +async function loadItsmTasks(n: number): Promise { + const url = + 'https://datasets-server.huggingface.co/rows?dataset=ServiceNow-AI%2FEnterpriseOps-Gym' + + `&config=oracle&split=itsm&offset=0&length=${n}` + const res = await fetch(url) + if (!res.ok) throw new Error(`HF rows ${res.status}`) + const body = (await res.json()) as { rows?: Array<{ row: Parameters[0] }> } + return (body.rows ?? []).slice(0, n).map(({ row }) => eopsTaskFromRow(row)) +} + +async function main(): Promise { + const nTasks = Number(process.env.TASKS ?? 4) + const maxShots = Number(process.env.MAX_SHOTS ?? 5) + const width = Number(process.env.WIDTH ?? 5) + const opts: AgenticOptions = { + routerBaseUrl: process.env.ROUTER_BASE ?? 'https://router.tangle.tools/v1', + routerKey: must('TANGLE_API_KEY'), + model: process.env.WORKER_MODEL ?? 'gpt-4.1', + temperature: Number(process.env.TEMPERATURE ?? 0.7), + innerTurns: Number(process.env.INNER_TURNS ?? 4), + } + const surface = createEopsSurface(must('EOPS_GYM_DBS_DIR')) + const tasks = await loadItsmTasks(nTasks) + console.log(`agentic primitive over EOPS: ${tasks.length} itsm tasks, ${opts.model}, maxShots=${maxShots}, width=${width}, innerTurns=${opts.innerTurns}\n`) + + const rows: Array<{ depth: number; breadth: number; cD: number; cB: number; prog: number[] }> = [] + for (const [i, task] of tasks.entries()) { + let depth: Awaited> + try { + depth = await runAgentic({ ...opts, surface, task, mode: 'depth', budget: maxShots }) + } catch (e) { + console.log(` task ${i}: DEPTH failed — ${e instanceof Error ? e.message.slice(0, 110) : e}`) + continue + } + // Breadth at matched compute: widen until cumulative completions ≥ depth's. + let breadthScore = 0 + let cB = 0 + let w = 0 + while (cB < depth.completions && w < width * 3) { + const step = Math.max(1, Math.min(width, Math.ceil((depth.completions - cB) / (opts.innerTurns ?? 4)))) + const b = await runAgentic({ ...opts, surface, task, mode: 'breadth', budget: step }) + cB += b.completions + w += step + if (b.score > breadthScore) breadthScore = b.score + } + rows.push({ depth: depth.score, breadth: breadthScore, cD: depth.completions, cB, prog: depth.progression }) + console.log( + ` task ${i} ${task.id.slice(0, 24)}: DEPTH ${(depth.score * 100).toFixed(0)}% [progress ${depth.progression.map((s) => (s * 100).toFixed(0)).join('→')}] ` + + `${depth.completions} comp vs BREADTH ${(breadthScore * 100).toFixed(0)}% (best-of-${w}, ${cB} comp)`, + ) + } + + if (rows.length === 0) throw new Error('no scoreable tasks') + const mean = (f: (r: (typeof rows)[number]) => number) => rows.reduce((a, r) => a + f(r), 0) / rows.length + const d = mean((r) => r.depth) + const b = mean((r) => r.breadth) + console.log(`\n=== n=${rows.length}, equal compute (~${mean((r) => r.cD).toFixed(0)} vs ${mean((r) => r.cB).toFixed(0)} comp) ===`) + console.log(`DEPTH (continue, same artifact): ${(d * 100).toFixed(1)}%`) + console.log(`BREADTH (parallel best-of): ${(b * 100).toFixed(1)}%`) + console.log(`VERDICT: depth ${d >= b ? 'BEATS' : 'loses to'} breadth by ${((d - b) * 100).toFixed(1)}pp at equal compute`) +} + +main().catch((e) => { + console.error(e instanceof Error ? (e.stack ?? e.message) : String(e)) + process.exit(1) +}) diff --git a/bench/src/agentic.ts b/bench/src/agentic.ts new file mode 100644 index 0000000..c91f5e3 --- /dev/null +++ b/bench/src/agentic.ts @@ -0,0 +1,425 @@ +/** + * The general agentic primitive — sequential (depth) and parallel (breadth) over a shared, + * checkable artifact, driven through the keystone Supervisor as one recursive `Agent.act`. + * + * The domain lives behind ONE seam — `AgenticSurface` (open an artifact, list tools, call a tool, + * score the artifact, close it). EnterpriseOps implements it (seed a gym DB, MCP tools, SQL + * verifier); Commit0/AppWorld/terminal-bench implement it the same way (a repo workspace, shell + * tools, the test suite). The drivers below are domain-blind: they run over any surface. + * + * Two shapes, the agent's POMDP rollout as the unit: + * - DEPTH one persistent artifact carried across shots. Each shot the agent works the tool loop; + * between shots a trace-analyst (selector≠judge: reads the trajectory, never the score) + * steers the resumed session toward what's unfinished. shot n stands on shot n-1's + * artifact state + history. This is continuation — long-horizon, same artifact. + * - BREADTH K independent artifacts, each a fresh rollout, the deployable verifier picks the best. + * + * Both are an `Agent` whose `act` spawns leaf shots through `scope.spawn` and reacts via + * `scope.next()` — so the conserved budget pool meters them (equal-k by construction), the journal + * records the tree, and the same primitive nests. `runAgentic` runs the chosen driver through + * `createSupervisor().run`. The leaf (one shot over a handle) is resolved per-spawn from a + * surface-closed registry — the open `Executor` seam, not bespoke per-benchmark glue. + */ + +import { createChatClient } from '@tangle-network/agent-eval' +import { + createSupervisor, + InMemoryResultBlobStore, + InMemorySpawnJournal, + observe, +} from '@tangle-network/agent-runtime/loops' +import type { + Agent, + AgentSpec, + Budget, + ExecutorContext, + ExecutorRegistry, + Executor, + ExecutorFactory, + ExecutorResult, + Outcome, + Scope, + Settled, + Spend, +} from '@tangle-network/agent-runtime/loops' + +// ── The general surface seam (the only thing a new benchmark implements) ───────── + +export interface AgenticTask { + readonly id: string + readonly systemPrompt: string + readonly userPrompt: string + /** Opaque domain payload the surface reads (EOPS: servers/verifiers/tools). Drivers never read it. */ + readonly meta?: Record +} + +export interface ArtifactHandle { + readonly id: string + readonly surface: string + /** Opaque per-artifact context the surface stashes (EOPS: the seeded gym server + db id). */ + readonly ctx?: unknown +} + +export interface AgenticTool { + readonly type: 'function' + readonly function: { name: string; description?: string; parameters: Record } +} + +export interface SurfaceScore { + passes: number + total: number + /** Checks excluded as malformed (data defect, not the agent). `total === 0` ⇒ unscoreable. */ + errored: number +} + +/** A stateful, checkable environment an agent operates over with tools. Open behind one interface. */ +export interface AgenticSurface { + readonly name: string + open(task: AgenticTask): Promise + tools(task: AgenticTask, handle: ArtifactHandle): Promise + call(handle: ArtifactHandle, name: string, args: Record): Promise + score(task: AgenticTask, handle: ArtifactHandle): Promise + close(handle: ArtifactHandle): Promise +} + +export interface AgenticOptions { + routerBaseUrl: string + routerKey: string + model: string + temperature?: number + /** Turns the agent may take within ONE shot before the driver intervenes. */ + innerTurns?: number +} + +// ── The unit: one agentic shot (a bounded tool loop) over a handle ─────────────── + +type Msg = Record +interface ToolCall { + id: string + function: { name: string; arguments: string } +} + +interface ShotTask { + task: AgenticTask + handle?: ArtifactHandle // present ⇒ DEPTH (shared artifact); absent ⇒ BREADTH (open own) + messages?: Msg[] // carried conversation (depth); fresh when absent + steer?: string // analyst-derived steer injected before this shot (depth) +} + +interface ShotOut { + messages: Msg[] + completions: number + toolCalls: number + toolErrors: number +} + +const taskNudge = + 'Use the available tools to bring the artifact to the required final state. Address EVERY distinct ' + + 'change the request implies. After each tool result, check what remains and continue. Re-read the ' + + 'values you set to confirm they took. Reply DONE only once every required change is made and verified.' + +/** One shot: run the agent's tool loop (≤ innerTurns) over the handle, mutating the artifact via + * `surface.call`, carrying `messages`. Returns the updated conversation + counts. */ +async function runShot( + surface: AgenticSurface, + task: AgenticTask, + handle: ArtifactHandle, + tools: AgenticTool[], + messages: Msg[], + opts: AgenticOptions, +): Promise { + const innerTurns = opts.innerTurns ?? 4 + let completions = 0 + let toolCalls = 0 + let toolErrors = 0 + for (let t = 0; t < innerTurns; t += 1) { + const res = await fetch(`${opts.routerBaseUrl.replace(/\/$/, '')}/chat/completions`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${opts.routerKey}` }, + body: JSON.stringify({ model: opts.model, messages, tools, tool_choice: 'auto', temperature: opts.temperature ?? 0.7 }), + }) + if (!res.ok) throw new Error(`router ${res.status}: ${(await res.text()).slice(0, 200)}`) + completions += 1 + const data = (await res.json()) as { choices?: Array<{ message?: { content?: string; tool_calls?: ToolCall[] } }> } + const msg = data.choices?.[0]?.message + if (!msg) break + const calls = msg.tool_calls ?? [] + messages.push({ role: 'assistant', content: msg.content ?? '', ...(calls.length ? { tool_calls: calls } : {}) }) + if (calls.length === 0) break + for (const call of calls) { + toolCalls += 1 + let args: Record = {} + try { + args = JSON.parse(call.function.arguments || '{}') + } catch { + toolErrors += 1 + } + let out: string + try { + out = await surface.call(handle, call.function.name, args) + if (out.startsWith('ERROR:')) toolErrors += 1 + } catch (e) { + toolErrors += 1 + out = `ERROR: ${e instanceof Error ? e.message : String(e)}` + } + messages.push({ role: 'tool', tool_call_id: call.id, content: out }) + } + } + return { messages, completions, toolCalls, toolErrors } +} + +/** The trace-analyst (selector≠judge): reads ONLY the trajectory + task, never the score. */ +/** The depth STEERER, on the CANONICAL analyst: agent-eval's `observe()` (makeFinding + + * ChatClient + the derived_from_judge firewall) reads the agent's tool-call trajectory + * (behavior, never the score) and returns findings; we steer on their recommended_actions. + * The trajectory (calls + RESULTS) rides in `output` so the analyst sees what actually + * happened, not just tool names. No actionable findings ⇒ COMPLETE (depth self-terminates). */ +async function analyze(task: AgenticTask, messages: Msg[], opts: AgenticOptions): Promise { + const trajectory = messages + .filter((m) => m.role === 'assistant' || m.role === 'tool') + .map((m) => { + if (m.role === 'tool') return `RESULT ${String(m.content).slice(0, 280)}` + const calls = (m.tool_calls as ToolCall[] | undefined)?.map((c) => `${c.function.name}(${c.function.arguments})`).join(', ') + return calls ? `CALL ${calls}` : `SAY ${String(m.content).slice(0, 200)}` + }) + .join('\n') + .slice(0, 7000) + const chat = createChatClient({ transport: 'router', apiKey: opts.routerKey, baseUrl: opts.routerBaseUrl, defaultModel: opts.model }) + const obs = await observe( + { task: task.userPrompt, output: trajectory, trace: messages, outcome: 'failed' }, + { chat, model: opts.model }, + ) + // The steer = the analyst's recommended actions for the agent. Empty ⇒ nothing left to do. + const steer = obs.findings + .map((f) => f.recommended_action) + .filter((a): a is string => typeof a === 'string' && a.trim().length > 0) + .join('\n') + .trim() + return steer || 'COMPLETE' +} + +// ── Leaf executors (one shot / one analyst), resolved per-spawn from the surface ── + +interface ShotResult { + messages: Msg[] + score: number + passes: number + total: number + completions: number + toolErrors: number +} + +const spend = (iterations: number): Spend => ({ iterations, tokens: { input: 0, output: 0 }, usd: 0, ms: 0 }) + +/** Resolve a shot: if `handle` given, operate on the SHARED artifact (depth); else open+score+close + * an OWN artifact (breadth). Always scores the artifact's final state as the deployable verdict. */ +function shotExecutor(surface: AgenticSurface, opts: AgenticOptions): Executor { + let artifact: ExecutorResult | undefined + return { + runtime: 'agentic-shot', + async execute(task: unknown): Promise> { + const t = task as ShotTask + const own = !t.handle + const handle = t.handle ?? (await surface.open(t.task)) + try { + const tools = await surface.tools(t.task, handle) + const messages: Msg[] = t.messages ?? [ + { role: 'system', content: t.task.systemPrompt }, + { role: 'user', content: `${t.task.userPrompt}\n\n${taskNudge}` }, + ] + if (t.steer) messages.push({ role: 'user', content: t.steer }) + const shot = await runShot(surface, t.task, handle, tools, messages, opts) + const s = await surface.score(t.task, handle) + const score = s.total > 0 ? s.passes / s.total : 0 + const out: ShotResult = { messages: shot.messages, score, passes: s.passes, total: s.total, completions: shot.completions, toolErrors: shot.toolErrors } + artifact = { + outRef: `shot:${handle.id}:${shot.completions}:${s.passes}/${s.total}`, + out, + verdict: { valid: s.total > 0 && s.passes === s.total, score }, + spent: spend(shot.completions), + } + return artifact + } finally { + if (own) await surface.close(handle) + } + }, + teardown: () => Promise.resolve({ destroyed: true }), + resultArtifact() { + if (!artifact) throw new Error('shotExecutor: resultArtifact before execute') + return artifact + }, + } +} + +function analystExecutor(opts: AgenticOptions): Executor { + let artifact: ExecutorResult | undefined + return { + runtime: 'agentic-analyst', + async execute(task: unknown): Promise> { + const t = task as { task: AgenticTask; messages: Msg[] } + const findings = await analyze(t.task, t.messages, opts) + artifact = { outRef: `analyst:${findings.length}`, out: findings, spent: spend(1) } + return artifact + }, + teardown: () => Promise.resolve({ destroyed: true }), + resultArtifact() { + if (!artifact) throw new Error('analystExecutor: resultArtifact before execute') + return artifact + }, + } +} + +/** Registry dispatching on the child's role tag — fresh executor per spawn (no shared-instance race). */ +function agenticRegistry(surface: AgenticSurface, opts: AgenticOptions): ExecutorRegistry { + return { + register() { + throw new Error('agenticRegistry: register unsupported') + }, + resolve(spec: AgentSpec) { + const role = (spec.profile.metadata as { role?: string } | undefined)?.role + const factory: ExecutorFactory = (_s: AgentSpec, _ctx: ExecutorContext) => + (role === 'analyst' ? analystExecutor(opts) : shotExecutor(surface, opts)) as Executor + return { succeeded: true as const, value: factory } + }, + } +} + +function leaf(name: string, role: 'shot' | 'analyst'): Agent> { + const agent = { + name, + executorSpec: { profile: { name, metadata: { role } }, harness: null } as unknown as AgentSpec, + act(): Promise> { + throw new Error(`agentic: spawned leaf "${name}" run as a driver`) + }, + } + return agent as Agent> +} + +/** Drain exactly one settlement (the just-spawned child). */ +async function drainOne(scope: Scope>): Promise>> { + const s = await scope.next() + if (!s) throw new Error('agentic: spawned child never settled') + return s +} + +// ── The result + the two drivers (domain-blind Agents run by the Supervisor) ───── + +export interface AgenticRunResult { + mode: 'depth' | 'breadth' + score: number + resolved: boolean + completions: number + /** DEPTH: score after each shot — the progress-over-rounds curve. BREADTH: best-so-far per rollout. */ + progression: number[] + shots: number +} + +const perChild = (innerTurns: number): Budget => ({ maxIterations: innerTurns + 1, maxTokens: 1_000_000 }) + +/** DEPTH: one persistent artifact, carried across analyst-steered shots. */ +function depthDriver(surface: AgenticSurface, task: AgenticTask, opts: AgenticOptions, cfg: { maxShots: number }): Agent> { + const innerTurns = opts.innerTurns ?? 4 + let pendingSteer: string | undefined // analyst-derived steer carried between shots + return { + name: 'depth', + async act(_t, scope): Promise> { + const handle = await surface.open(task) + const progression: number[] = [] + let messages: Msg[] | undefined + let completions = 0 + let shots = 0 + try { + for (shots = 0; shots < cfg.maxShots; shots += 1) { + const child = leaf(`shot:${shots}`, 'shot') + const steer = shots === 0 ? undefined : pendingSteer + const res = scope.spawn(child, { task, handle, messages, steer } as ShotTask, { budget: perChild(innerTurns), label: `shot:${shots}` }) + if (!res.ok) break + const settled = await drainOne(scope) + if (settled.kind === 'down') break + const out = settled.out as unknown as ShotResult + messages = out.messages + completions += out.completions + progression.push(out.score) + if (out.score >= 1 || shots === cfg.maxShots - 1) break + // Analyst reads the trajectory (firewalled) → steer the resumed session. + const aChild = leaf(`analyst:${shots}`, 'analyst') + const aRes = scope.spawn(aChild, { task, messages }, { budget: perChild(1), label: `analyst:${shots}` }) + if (!aRes.ok) break + const aSettled = await drainOne(scope) + completions += 1 + if (aSettled.kind === 'down') break + const findings = aSettled.out as unknown as string + if (/^\s*COMPLETE\b/i.test(findings)) break + pendingSteer = `A reviewer flagged unfinished items:\n${findings}\n\nAddress each with the tools, verify they took, then continue.` + } + const final = await surface.score(task, handle) + const score = final.total > 0 ? final.passes / final.total : 0 + return { kind: 'done', deliverable: { mode: 'depth', score, resolved: final.total > 0 && final.passes === final.total, completions, progression, shots: shots + 1 } } + } finally { + await surface.close(handle) + } + }, + } +} + +/** BREADTH: K independent rollouts (each own artifact), verifier picks the best. */ +function breadthDriver(surface: AgenticSurface, task: AgenticTask, opts: AgenticOptions, cfg: { width: number }): Agent> { + const innerTurns = opts.innerTurns ?? 4 + return { + name: 'breadth', + async act(_t, scope): Promise> { + let opened = 0 + for (let k = 0; k < cfg.width; k += 1) { + const res = scope.spawn(leaf(`rollout:${k}`, 'shot'), { task } as ShotTask, { budget: perChild(innerTurns), label: `rollout:${k}` }) + if (res.ok) opened += 1 + } + if (opened === 0) return { kind: 'blocked', blockers: ['breadth: pool admitted no rollout'] } + let best = -1 + let bestResolved = false + let completions = 0 + const progression: number[] = [] + for (let s = await scope.next(); s !== null; s = await scope.next()) { + if (s.kind === 'down') continue + const out = s.out as unknown as ShotResult + completions += out.completions + if (out.score > best) best = out.score + if (out.total > 0 && out.passes === out.total) bestResolved = true + progression.push(best) + } + if (best < 0) return { kind: 'blocked', blockers: ['breadth: every rollout went down'] } + return { kind: 'done', deliverable: { mode: 'breadth', score: best, resolved: bestResolved, completions, progression, shots: opened } } + }, + } +} + +export interface RunAgenticOptions extends AgenticOptions { + surface: AgenticSurface + task: AgenticTask + mode: 'depth' | 'breadth' + /** depth: max shots; breadth: rollout width. */ + budget: number + rootBudget?: Budget +} + +/** Run the chosen driver through the keystone Supervisor — `Agent.act` over a conserved-budget Scope. */ +export async function runAgentic(opts: RunAgenticOptions): Promise { + const driver = + opts.mode === 'depth' + ? depthDriver(opts.surface, opts.task, opts, { maxShots: opts.budget }) + : breadthDriver(opts.surface, opts.task, opts, { width: opts.budget }) + const supervisor = createSupervisor>() + const root: Budget = opts.rootBudget ?? { maxIterations: opts.budget * ((opts.innerTurns ?? 4) + 2), maxTokens: 1_000_000_000 } + const result = await supervisor.run(driver, undefined, { + budget: root, + runId: `agentic:${opts.mode}:${opts.task.id}`, + journal: new InMemorySpawnJournal(), + blobs: new InMemoryResultBlobStore(), + executors: agenticRegistry(opts.surface, opts), + maxDepth: 3, + }) + if (result.kind !== 'winner' || result.out.kind !== 'done') { + const reason = result.kind === 'winner' ? `blocked: ${(result.out as { blockers?: string[] }).blockers?.join('; ')}` : `no-winner: ${result.reason}` + throw new Error(`runAgentic(${opts.mode}) produced no result — ${reason}`) + } + return result.out.deliverable as AgenticRunResult +} diff --git a/bench/src/gym-agent.ts b/bench/src/gym-agent.ts new file mode 100644 index 0000000..f26ce4a --- /dev/null +++ b/bench/src/gym-agent.ts @@ -0,0 +1,404 @@ +/** + * An agentic EnterpriseOps shot — a real tool-calling loop, NOT a one-shot plan. + * + * The agent gets the gym's MCP tools as OpenAI function-tools and loops: completion → tool_calls + * → execute each against the gym (mutating its seeded DB) → feed results back → repeat, until it + * stops calling tools or hits maxTurns. This is the "shot" the gate's leaf should be on an agentic + * domain: the agent sees tool RESULTS and adapts, so it can do multi-step work (look up an id, then + * use it) that a blind single completion cannot. Score = the deterministic verifiers over the final + * DB state. Each shot seeds its OWN isolated database (its own database_id), so n shots run + * concurrently without colliding. + * + * Self-contained against the live gym (router + HTTP), no sandbox: an agentic shot here is a + * tool-calling loop, which the router serves directly. The same seed/replay/verify protocol the + * judge proved (scripts/enterpriseops_gym_judge.py) — lifted to a live loop instead of a transcript + * replay. + */ + +import { readFile } from 'node:fs/promises' +import { isAbsolute, join } from 'node:path' + +export interface GymServer { + mcp_server_name: string + mcp_server_url: string + seed_database_file: string + context?: Record + /** The per-rollout isolated database id, attached after seeding (every call targets it). */ + _database_id?: string +} + +export interface GymVerifier { + name: string + gym_name: string + validation_config: { query: string; expected_value: unknown; comparison_type: string } +} + +export interface GymTask { + id: string + systemPrompt: string + userPrompt: string + selectedTools: string[] + servers: GymServer[] + verifiers: GymVerifier[] +} + +export interface GymShotResult { + passes: number + total: number + score: number + resolved: boolean + turns: number + toolCalls: number + toolErrors: number + /** Verifiers excluded because their SQL was rejected by the server (data defect, not the agent). */ + verifierErrors: number + /** True when NO verifier ran (every one was malformed) — the task is unscoreable; skip it. */ + unscoreable: boolean +} + +export interface OpenAiTool { + type: 'function' + function: { name: string; description?: string; parameters: Record } +} + +interface ToolCall { + id: string + function: { name: string; arguments: string } +} + +const headersFor = (server: GymServer): Record => ({ + 'content-type': 'application/json', + accept: 'application/json, text/event-stream', + ...(server.context ?? {}), + ...(server._database_id ? { 'x-database-id': server._database_id } : {}), +}) + +function parseBody(raw: string): unknown { + const text = raw.trim() + if (!text) return {} + if (text.startsWith('event:') || text.startsWith('data:') || text.includes('\ndata:')) { + const matches = [...text.matchAll(/data:\s*(\{[\s\S]*?\})\s*(?:\n|$)/g)] + if (matches.length > 0) return JSON.parse(matches[matches.length - 1]![1]!) + } + return JSON.parse(text) +} + +async function httpPost(url: string, payload: unknown, headers: Record, method = 'POST'): Promise { + const res = await fetch(url, { method, headers, body: JSON.stringify(payload) }) + if (!res.ok) throw new HttpError(res.status, (await res.text()).slice(0, 200)) + return res.text() +} + +class HttpError extends Error { + constructor( + readonly status: number, + body: string, + ) { + super(`HTTP ${status}: ${body}`) + } +} + +/** Seed an isolated DB for this shot from the task's snapshot; sets `server._database_id`. */ +export async function seed(server: GymServer, gymDbsDir: string): Promise { + if (!server.seed_database_file) throw new Error(`server ${server.mcp_server_name} has no seed_database_file`) + const path = isAbsolute(server.seed_database_file) ? server.seed_database_file : join(gymDbsDir, server.seed_database_file) + const sql = await readFile(path, 'utf8') + const dbId = `gate_${Math.random().toString(16).slice(2, 14)}` + const url = `${server.mcp_server_url.replace(/\/$/, '')}/api/seed-database` + await httpPost(url, { database_id: dbId, name: `gate_${dbId}`, description: 'agentic shot', sql_content: sql }, { + 'content-type': 'application/json', + }) + server._database_id = dbId +} + +export async function deleteDb(server: GymServer): Promise { + if (!server._database_id) return + const url = `${server.mcp_server_url.replace(/\/$/, '')}/api/delete-database` + await httpPost(url, { database_id: server._database_id }, { 'content-type': 'application/json' }, 'DELETE').catch(() => {}) +} + +/** Fetch the gym's MCP tool schemas and project them into OpenAI function-tools, filtered to the + * task's selected_tools (the allow-list the agent may call). */ +export async function loadTools(server: GymServer, selected: string[]): Promise { + const url = `${server.mcp_server_url.replace(/\/$/, '')}/mcp` + const raw = await httpPost(url, { jsonrpc: '2.0', id: 1, method: 'tools/list', params: {} }, headersFor(server)) + const body = parseBody(raw) as { result?: { tools?: Array<{ name: string; description?: string; inputSchema?: Record }> } } + const allow = new Set(selected) + const tools: OpenAiTool[] = [] + for (const t of body.result?.tools ?? []) { + if (!allow.has(t.name)) continue + tools.push({ + type: 'function', + function: { + name: t.name, + ...(t.description ? { description: t.description.slice(0, 1024) } : {}), + parameters: (t.inputSchema as Record) ?? { type: 'object', properties: {} }, + }, + }) + } + return tools +} + +export async function callTool(server: GymServer, name: string, args: Record): Promise { + const url = `${server.mcp_server_url.replace(/\/$/, '')}/mcp` + const raw = await httpPost( + url, + { jsonrpc: '2.0', id: Date.now(), method: 'tools/call', params: { name, arguments: args } }, + headersFor(server), + ) + const body = parseBody(raw) as { result?: { content?: Array<{ text?: string }>; isError?: boolean }; error?: unknown } + if (body.error) return `ERROR: ${JSON.stringify(body.error).slice(0, 400)}` + const content = body.result?.content?.map((c) => c.text ?? '').join('\n') ?? JSON.stringify(body.result ?? {}) + return content.slice(0, 4000) +} + +/** Run every verifier. A verifier whose SQL the server rejects (malformed query in the task data — + * e.g. a column that doesn't exist) is EXCLUDED from `total`, never counted as a fail: that is a + * data defect, not the agent's. `total` is the count that actually ran; a task where every + * verifier errors is unscoreable (`total === 0`) and the caller skips it. */ +export async function runVerifiers(task: GymTask): Promise<{ passes: number; total: number; errored: number }> { + let passes = 0 + let total = 0 + let errored = 0 + for (const v of task.verifiers) { + const server = task.servers.find((s) => s.mcp_server_name === v.gym_name) ?? task.servers[0]! + const url = `${server.mcp_server_url.replace(/\/$/, '')}/api/sql-runner` + try { + const raw = await httpPost(url, { query: v.validation_config.query, database_id: server._database_id }, headersFor(server)) + const body = parseBody(raw) as { data?: Array> } + const row = body.data?.[0] + const actual = row ? Object.values(row)[0] : undefined + total += 1 + if (compare(actual, v.validation_config.expected_value, v.validation_config.comparison_type)) passes += 1 + } catch (e) { + // A transport failure (server down) IS fatal; a 4xx (bad verifier SQL) is excluded. + if (e instanceof HttpError && e.status >= 400 && e.status < 500) errored += 1 + else throw e + } + } + return { passes, total, errored } +} + +function compare(actual: unknown, expected: unknown, cmp: string): boolean { + const a = Number(actual) + const e = Number(expected) + const numeric = !Number.isNaN(a) && !Number.isNaN(e) + if (cmp === 'equals') return numeric ? a === e : String(actual) === String(expected) + if (cmp === 'greater_than') return a > e + if (cmp === 'less_than') return a < e + if (cmp === 'contains') return String(actual).includes(String(expected)) + throw new Error(`unsupported comparison_type ${cmp}`) +} + +export interface RunGymShotOptions { + routerBaseUrl: string + routerKey: string + model: string + gymDbsDir: string + maxTurns?: number + temperature?: number +} + +const taskNudge = + 'Use the available tools to bring the system to the required final state. Address EVERY distinct ' + + 'change the request implies, not just the first. After each tool result, check what remains and ' + + 'continue. Re-read the values you set back to confirm they took. Only reply DONE once every ' + + 'required change is made and verified — partial completion is a failure.' + +interface TurnCounts { + completions: number + toolCalls: number + toolErrors: number +} + +/** Run the agent's tool-calling loop until it stops calling tools or hits `maxTurns`. Mutates the + * carried `messages` (the live session) and the server's seeded DB. One router completion per + * iteration — `completions` is the compute unit. Shared by the breadth shot and the depth driver. */ +async function runAgenticTurns( + server: GymServer, + messages: Array>, + tools: OpenAiTool[], + opts: RunGymShotOptions, + maxTurns: number, +): Promise { + let completions = 0 + let toolCalls = 0 + let toolErrors = 0 + for (let t = 0; t < maxTurns; t += 1) { + const res = await fetch(`${opts.routerBaseUrl.replace(/\/$/, '')}/chat/completions`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${opts.routerKey}` }, + body: JSON.stringify({ model: opts.model, messages, tools, tool_choice: 'auto', temperature: opts.temperature ?? 0.7 }), + }) + if (!res.ok) throw new Error(`router ${res.status}: ${(await res.text()).slice(0, 200)}`) + completions += 1 + const data = (await res.json()) as { choices?: Array<{ message?: { content?: string; tool_calls?: ToolCall[] } }> } + const msg = data.choices?.[0]?.message + if (!msg) break + const calls = msg.tool_calls ?? [] + messages.push({ role: 'assistant', content: msg.content ?? '', ...(calls.length ? { tool_calls: calls } : {}) }) + if (calls.length === 0) break // the agent stopped calling tools + for (const call of calls) { + toolCalls += 1 + let args: Record = {} + try { + args = JSON.parse(call.function.arguments || '{}') + } catch { + toolErrors += 1 + } + let result: string + try { + result = await callTool(server, call.function.name, args) + if (result.startsWith('ERROR:')) toolErrors += 1 + } catch (e) { + toolErrors += 1 + result = `ERROR: ${e instanceof Error ? e.message : String(e)}` + } + messages.push({ role: 'tool', tool_call_id: call.id, content: result }) + } + } + return { completions, toolCalls, toolErrors } +} + +/** Run ONE breadth rollout (a stateless attempt from the seed): seed → tool-calling loop → verify → + * cleanup. `turns` reports the router-completion count (the compute unit). */ +export async function runGymAgentShot(task: GymTask, opts: RunGymShotOptions): Promise { + const server = task.servers[0]! + await seed(server, opts.gymDbsDir) + try { + const tools = await loadTools(server, task.selectedTools) + const messages: Array> = [ + { role: 'system', content: task.systemPrompt }, + { role: 'user', content: `${task.userPrompt}\n\n${taskNudge}` }, + ] + const c = await runAgenticTurns(server, messages, tools, opts, opts.maxTurns ?? 12) + const { passes, total, errored } = await runVerifiers(task) + return { + passes, + total, + score: total > 0 ? passes / total : 0, + resolved: total > 0 && passes === total, + turns: c.completions, + toolCalls: c.toolCalls, + toolErrors: c.toolErrors, + verifierErrors: errored, + unscoreable: total === 0, + } + } finally { + await deleteDb(server) + } +} + +/** + * The trace-analyst (selector ≠ judge firewall): reads ONLY the agent's trajectory + the task + * statement — never the verifier SQL or expected values — and reports what required work still + * appears unfinished. Steering off this keeps the driver's signal trace-derived, not judge-derived. + */ +async function analyzeTrajectory( + task: GymTask, + messages: Array>, + opts: RunGymShotOptions, +): Promise { + const trace = messages + .filter((m) => m.role === 'assistant' || m.role === 'tool') + .map((m) => { + if (m.role === 'tool') return `RESULT ${String(m.content).slice(0, 280)}` + const calls = (m.tool_calls as ToolCall[] | undefined) + ?.map((c) => `${c.function.name}(${c.function.arguments})`) + .join(', ') + return calls ? `CALL ${calls}` : `SAY ${String(m.content).slice(0, 200)}` + }) + .join('\n') + .slice(0, 7000) + const sys = + "You audit an agent's work. From ONLY the task and the agent's tool-call trajectory below, list " + + 'every required change that does NOT yet appear done or verified. Judge from observed tool ' + + 'RESULTS, not from the agent\'s intent. Be specific. If everything required appears done and ' + + 'verified, reply exactly COMPLETE.' + const res = await fetch(`${opts.routerBaseUrl.replace(/\/$/, '')}/chat/completions`, { + method: 'POST', + headers: { 'content-type': 'application/json', authorization: `Bearer ${opts.routerKey}` }, + body: JSON.stringify({ + model: opts.model, + messages: [ + { role: 'system', content: sys }, + { role: 'user', content: `TASK:\n${task.userPrompt}\n\nTRAJECTORY:\n${trace}\n\nWhat required work is still unfinished?` }, + ], + temperature: 0.3, + }), + }) + if (!res.ok) throw new Error(`analyst router ${res.status}: ${(await res.text()).slice(0, 160)}`) + const data = (await res.json()) as { choices?: Array<{ message?: { content?: string } }> } + return data.choices?.[0]?.message?.content ?? '' +} + +export interface DriverRolloutResult extends GymShotResult { + /** Driver shots taken (agent-work + analyst-steer cycles). */ + shots: number + /** Total router completions (agent turns + analyst calls) — the compute unit for equal-k. */ + completions: number + analystCalls: number +} + +/** + * A DEPTH rollout: ONE persistent session (the same seeded DB and the same conversation carried + * forward) driven over multiple shots. Each shot the agent works a bounded number of turns; between + * shots the trace-analyst reads the trajectory and the driver injects a steer toward what's + * unfinished, then the session RESUMES — shot n stands on shot n-1's DB state + history. Stops when + * the analyst returns COMPLETE or the shot budget is spent. This is continuation (workspace + info + * carried), the thing the fresh-box steering test never exercised. + */ +export async function runGymDriverRollout( + task: GymTask, + opts: RunGymShotOptions, + cfg: { maxShots: number; innerTurns: number }, +): Promise { + const server = task.servers[0]! + await seed(server, opts.gymDbsDir) + let completions = 0 + let toolCalls = 0 + let toolErrors = 0 + let analystCalls = 0 + let shots = 0 + try { + const tools = await loadTools(server, task.selectedTools) + const messages: Array> = [ + { role: 'system', content: task.systemPrompt }, + { role: 'user', content: `${task.userPrompt}\n\n${taskNudge}` }, + ] + for (shots = 0; shots < cfg.maxShots; shots += 1) { + const c = await runAgenticTurns(server, messages, tools, opts, cfg.innerTurns) + completions += c.completions + toolCalls += c.toolCalls + toolErrors += c.toolErrors + if (shots === cfg.maxShots - 1) break + const findings = await analyzeTrajectory(task, messages, opts) + analystCalls += 1 + completions += 1 + if (/^\s*COMPLETE\b/i.test(findings)) break + messages.push({ + role: 'user', + content: + `A reviewer audited your work so far and flagged unfinished items:\n${findings}\n\n` + + 'Address each one using the tools, verify the values took, then continue.', + }) + } + const { passes, total, errored } = await runVerifiers(task) + return { + passes, + total, + score: total > 0 ? passes / total : 0, + resolved: total > 0 && passes === total, + turns: completions, + toolCalls, + toolErrors, + verifierErrors: errored, + unscoreable: total === 0, + shots: shots + 1, + completions, + analystCalls, + } + } finally { + await deleteDb(server) + } +}