Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- CreateEnum
CREATE TYPE "PrFixType" AS ENUM ('MERGE_CONFLICT', 'CI_FAILURE', 'REVIEW_FEEDBACK', 'OTHER');

-- AlterTable: Add type column to PrFixQueueItem
ALTER TABLE "PrFixQueueItem" ADD COLUMN "type" "PrFixType" NOT NULL DEFAULT 'OTHER';

-- CreateIndex: Priority ordering for queue consumption
CREATE INDEX "PrFixQueueItem_type_idx" ON "PrFixQueueItem"("type");
CREATE INDEX "PrFixQueueItem_status_type_idx" ON "PrFixQueueItem"("status", "type");
12 changes: 12 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,22 @@ enum PrFixStatus {
IGNORED
}

enum PrFixType {
MERGE_CONFLICT
CI_FAILURE
REVIEW_FEEDBACK
OTHER
}


model PrFixQueueItem {
id String @id @default(cuid())
repo String
pr Int
issue Int?
branch String?
lane PrFixLane @default(NORMAL)
type PrFixType @default(OTHER)
reason String @db.Text
feedback String[] @default([])
evidenceKeys String[] @default([])
Expand All @@ -109,6 +118,9 @@ model PrFixQueueItem {
@@index([status])
@@index([lane])
@@index([status, lane])

@@index([type])
@@index([status, type])
}

model PrFixHistory {
Expand Down
3 changes: 2 additions & 1 deletion src/app/api/pr-fix-queue/queued/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export async function GET(request: Request) {
const { searchParams } = new URL(request.url);
const lane = searchParams.get("lane");
const includeBlocked = searchParams.get("include_blocked") === "true";
const prioritizeByType = searchParams.get("prioritize_by_type") !== "false"; // default true

if (lane) {
const normalized = lane.trim().toUpperCase().replace(/-/g, "_");
Expand All @@ -21,7 +22,7 @@ export async function GET(request: Request) {
}
}

const items = await listQueuedPrFixItems(asPrFixQueueClient(prisma), { lane, includeBlocked });
const items = await listQueuedPrFixItems(asPrFixQueueClient(prisma), { lane, includeBlocked, prioritizeByType });
return NextResponse.json(items);
} catch (error) {
console.error("Failed to list PR fix queue:", error);
Expand Down
29 changes: 28 additions & 1 deletion src/app/api/pr-followup/sync/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { NextRequest, NextResponse } from "next/server";
import { prisma, asPrFixQueueClient } from "@/lib/prisma";
import { authorizeRequest } from "@/lib/auth";
import { getTrackedRepos } from "@/lib/config";
import { processPrFollowupEvents, isAllowedBotAuthor } from "@/lib/pr-followup-ingestion";
import { processPrFollowupEvents, isAllowedBotAuthor, ingestMergeConflict, clearResolvedConflictItems } from "@/lib/pr-followup-ingestion";

/**
* PR Follow-up Sync Endpoint (Pull-based)
Expand Down Expand Up @@ -32,6 +32,7 @@ interface GithubPR {
merged_at: string | null;
draft: boolean;
mergeable_state?: string;
mergeable?: string; // "CONFLICTING", "MERGEABLE", "UNKNOWN"
}

/**
Expand Down Expand Up @@ -227,6 +228,32 @@ export async function POST(request: NextRequest) {
linkedIssue,
});
}

// Detect merge conflicts (CONFLICTING mergeable status)
if (pr.mergeable && pr.mergeable.toUpperCase() === "CONFLICTING") {
const conflictKey = await ingestMergeConflict(asPrFixQueueClient(prisma), {
repoFullName,
prNumber: pr.number,
branch: pr.head.ref ?? null,
url: pr.url,
title: pr.title,
author: pr.user.login,
mergeable: pr.mergeable,
linkedIssue,
});
if (conflictKey) {
totalEnqueued++;
}
} else {
// Clear resolved conflict items if PR is no longer conflicting
if (pr.mergeable) {
await clearResolvedConflictItems(asPrFixQueueClient(prisma), {
repoFullName,
prNumber: pr.number,
mergeable: pr.mergeable,
});
}
}
}
}

Expand Down
32 changes: 27 additions & 5 deletions src/lib/pr-fix-queue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { normalizePrFixLane, normalizePrFixStatus, PrFixLane, PrFixStatus } from "@/types";
import { normalizePrFixLane, normalizePrFixStatus, normalizePrFixType, PrFixLane, PrFixStatus, PrFixType, PR_FIX_TYPE_PRIORITY } from "@/types";

export type PrFixQueueClient = {
prFixQueueItem: {
Expand All @@ -17,6 +17,7 @@ export interface EnqueuePrFixInput {
repo: string;
pr: number;
lane?: string | null;
type?: string | null;
reason: string;
feedback: string;
evidenceKey: string;
Expand Down Expand Up @@ -52,6 +53,7 @@ export function parseEnqueuePrFixInput(body: unknown): EnqueuePrFixInput | { err
repo: input.repo.trim(),
pr: Number(input.pr),
lane: typeof input.lane === "string" ? input.lane : undefined,
type: typeof input.type === "string" ? input.type : undefined,
reason: input.reason.trim(),
feedback: input.feedback.trim(),
evidenceKey: input.evidenceKey.trim(),
Expand Down Expand Up @@ -105,6 +107,7 @@ function metadataPatch(input: EnqueuePrFixInput): Record<string, string | number

export async function enqueuePrFixItem(client: PrFixQueueClient, input: EnqueuePrFixInput) {
const lane = normalizePrFixLane(input.lane);
const type = normalizePrFixType(input.type);
const status: PrFixStatus = lane === "NEEDS_HUMAN" ? "BLOCKED" : "QUEUED";

return client.$transaction(async (tx) => {
Expand All @@ -114,6 +117,7 @@ export async function enqueuePrFixItem(client: PrFixQueueClient, input: EnqueueP
where: { id: existing.id },
data: {
lane,
type,
status,
reason: input.reason,
feedback: uniqueAppend(existing.feedback ?? [], input.feedback, 12),
Expand All @@ -132,6 +136,7 @@ export async function enqueuePrFixItem(client: PrFixQueueClient, input: EnqueueP
repo: input.repo,
pr: input.pr,
lane,
type,
status,
reason: input.reason,
feedback: [input.feedback],
Expand All @@ -146,13 +151,28 @@ export async function enqueuePrFixItem(client: PrFixQueueClient, input: EnqueueP
});
}

export async function listQueuedPrFixItems(client: PrFixQueueClient, options: { lane?: string | null; includeBlocked?: boolean } = {}) {
export async function listQueuedPrFixItems(client: PrFixQueueClient, options: { lane?: string | null; includeBlocked?: boolean; prioritizeByType?: boolean } = {}) {
const lane = options.lane ? normalizePrFixLane(options.lane) : undefined;
const status = options.includeBlocked ? { in: ["QUEUED", "BLOCKED"] } : "QUEUED";
return client.prFixQueueItem.findMany({

const items = await client.prFixQueueItem.findMany({
where: { status, ...(lane ? { lane } : {}) },
orderBy: [{ queuedAt: "asc" }, { repo: "asc" }, { pr: "asc" }],
});

// Sort by type priority first, then by queuedAt
if (options.prioritizeByType !== false) {
items.sort((a, b) => {
const aPriority = PR_FIX_TYPE_PRIORITY[normalizePrFixType(a.type)] ?? 3;
const bPriority = PR_FIX_TYPE_PRIORITY[normalizePrFixType(b.type)] ?? 3;
if (aPriority !== bPriority) return aPriority - bPriority;
// Within same type, oldest first
return new Date(a.queuedAt).getTime() - new Date(b.queuedAt).getTime();
});
} else {
items.sort((a, b) => new Date(a.queuedAt).getTime() - new Date(b.queuedAt).getTime());
}

return items;
}

export async function markPrFixItem(client: PrFixQueueClient, input: MarkPrFixInput) {
Expand All @@ -168,8 +188,10 @@ export async function markPrFixItem(client: PrFixQueueClient, input: MarkPrFixIn
}

export function toAgentQueuePrFixItem(item: any) {
const fixType = normalizePrFixType(item.type);
return {
type: "pr-review-fix",
fixType,
id: item.id,
repo: item.repo,
pr: item.pr,
Expand All @@ -186,6 +208,6 @@ export function toAgentQueuePrFixItem(item: any) {
author: item.author,
queuedAt: item.queuedAt,
updatedAt: item.updatedAt,
rankingReason: "queued PR review-fix item",
rankingReason: `queued PR review-fix item (${fixType})`,
};
}
105 changes: 104 additions & 1 deletion src/lib/pr-followup-ingestion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ export function classifyFeedback(content: string): FeedbackClassification {
* Format: {eventType}:{source}:{identifier}
*/
export function computeEvidenceKey(
eventType: "comment" | "review" | "check_run" | "merge_state",
eventType: "comment" | "review" | "check_run" | "merge_state" | "merge_conflict",
sourceId: string, // comment ID, review ID, check run ID
repoFullName: string,
prNumber: number,
Expand Down Expand Up @@ -202,6 +202,7 @@ export async function ingestCommentEvent(
repo: opts.repoFullName,
pr: opts.prNumber,
lane,
type: "REVIEW_FEEDBACK",
reason: `PR comment: ${classification === "needs_human" ? "ambiguous feedback" : "actionable feedback"}`,
feedback: opts.commentBody,
evidenceKey,
Expand Down Expand Up @@ -251,6 +252,7 @@ export async function ingestReviewEvent(
repo: opts.repoFullName,
pr: opts.prNumber,
lane,
type: "REVIEW_FEEDBACK",
reason: `PR review: CHANGES_REQUESTED`,
feedback: opts.reviewBody,
evidenceKey,
Expand Down Expand Up @@ -302,6 +304,7 @@ export async function ingestCheckRunEvent(
repo: opts.repoFullName,
pr: opts.prNumber,
lane,
type: "CI_FAILURE",
reason: `Failing check: ${opts.checkName} (${opts.conclusion})`,
feedback: opts.checkDetails ?? `Check "${opts.checkName}" concluded ${opts.conclusion}`,
evidenceKey,
Expand Down Expand Up @@ -343,10 +346,16 @@ export async function ingestMergeStateEvent(

const evidenceKey = computeEvidenceKey("merge_state", opts.mergeStateStatus, opts.repoFullName, opts.prNumber);

// Determine type based on merge state status
const mergeType = ["dirty", "conflicting"].includes(opts.mergeStateStatus.toLowerCase())
? "MERGE_CONFLICT"
: "OTHER";

await enqueuePrFixItem(client, {
repo: opts.repoFullName,
pr: opts.prNumber,
lane: "NORMAL",
type: mergeType,
reason: `Merge state change: ${opts.mergeStateStatus}`,
feedback: `PR merge state is now ${opts.mergeStateStatus}`,
evidenceKey,
Expand All @@ -360,6 +369,100 @@ export async function ingestMergeStateEvent(
return evidenceKey;
}


// ─── Merge Conflict Detection ───────────────────────────────────────────────

/**
* Detect and enqueue merge conflict items for PRs with mergeable=CONFLICTING.
* This is the primary function for surfacing merge conflicts as PR review-fix items.
*
* Returns the evidence key if a new item was enqueued, null if skipped (not conflicting,
* not eligible, or already queued).
*/
export async function ingestMergeConflict(
client: PrFixQueueClient,
opts: {
repoFullName: string;
prNumber: number;
branch: string | null;
url: string;
title: string;
author: string | null;
mergeable: string; // "CONFLICTING", "MERGEABLE", "UNKNOWN"
linkedIssue?: number | null;
},
): Promise<string | null> {
// Only CONFLICTING PRs trigger merge conflict items
if (opts.mergeable.toUpperCase() !== "CONFLICTING") return null;

// Check author eligibility
if (!isAllowedBotAuthor(opts.author)) return null;

// Check repo owner eligibility
if (!isAllowedBranchOwner(opts.repoFullName)) return null;

const evidenceKey = computeEvidenceKey("merge_conflict", "conflicting", opts.repoFullName, opts.prNumber);

await enqueuePrFixItem(client, {
repo: opts.repoFullName,
pr: opts.prNumber,
lane: "NORMAL",
type: "MERGE_CONFLICT",
reason: `Merge conflict detected: PR is CONFLICTING`,
feedback: `PR has merge conflicts and needs rebase. Use \`git rebase\` to resolve, not patch.`,
evidenceKey,
issue: opts.linkedIssue,
branch: opts.branch,
url: opts.url,
title: opts.title,
author: opts.author,
});

return evidenceKey;
}

/**
* Clear merge conflict items for PRs that are no longer conflicting.
* This handles idempotent cleanup — items are marked FIXED when the PR
* becomes mergeable or is closed.
*/
export async function clearResolvedConflictItems(
client: PrFixQueueClient,
opts: {
repoFullName: string;
prNumber: number;
mergeable: string; // "MERGEABLE", "CONFLICTING", "UNKNOWN"
},
): Promise<boolean> {
// Only clear when PR is no longer conflicting
if (opts.mergeable.toUpperCase() === "CONFLICTING") return false;

const existing = await client.prFixQueueItem.findUnique({
where: { repo_pr: { repo: opts.repoFullName, pr: opts.prNumber } },
});

if (!existing) return false;

// Only clear if it's a merge conflict item and still queued
if (existing.type !== "MERGE_CONFLICT" || existing.status !== "QUEUED") return false;

await client.prFixQueueItem.update({
where: { id: existing.id },
data: { status: "FIXED" },
});

await client.prFixHistory.create({
data: {
itemId: existing.id,
action: "mark",
status: "FIXED",
note: `PR is now ${opts.mergeable} — conflict resolved`,
},
});

return true;
}

// ─── Bulk Sync ──────────────────────────────────────────────────────────────

/**
Expand Down
30 changes: 30 additions & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,36 @@ export type PrFixStatus = "QUEUED" | "FIXED" | "BLOCKED" | "STALE" | "IGNORED";

export const VALID_PR_FIX_LANES: PrFixLane[] = ["NORMAL", "ESCALATED", "NEEDS_HUMAN"];
export const VALID_PR_FIX_STATUSES: PrFixStatus[] = ["QUEUED", "FIXED", "BLOCKED", "STALE", "IGNORED"];
// ─── PR Fix Type Constants ───────────────────────────────────────────────────

export type PrFixType = "MERGE_CONFLICT" | "CI_FAILURE" | "REVIEW_FEEDBACK" | "OTHER";

export const VALID_PR_FIX_TYPES: PrFixType[] = ["MERGE_CONFLICT", "CI_FAILURE", "REVIEW_FEEDBACK", "OTHER"];

export function isValidPrFixType(type: string): type is PrFixType {
return VALID_PR_FIX_TYPES.includes(type as PrFixType);
}

export function normalizePrFixType(type?: string | null): PrFixType {
if (!type) return "OTHER";
const normalized = type.trim().toUpperCase().replace(/-/g, "_");
if (normalized === "MERGE_CONFLICT" || normalized === "MERGECONFLICT") return "MERGE_CONFLICT";
if (normalized === "CI_FAILURE" || normalized === "CIFAILURE") return "CI_FAILURE";
if (normalized === "REVIEW_FEEDBACK" || normalized === "REVIEWFEEDBACK") return "REVIEW_FEEDBACK";
return isValidPrFixType(normalized) ? normalized : "OTHER";
}

/**
* Priority ordering for PR fix queue items.
* Lower number = higher priority.
*/
export const PR_FIX_TYPE_PRIORITY: Record<PrFixType, number> = {
MERGE_CONFLICT: 0,
CI_FAILURE: 1,
REVIEW_FEEDBACK: 2,
OTHER: 3,
};


export function isValidPrFixLane(lane: string): lane is PrFixLane {
return VALID_PR_FIX_LANES.includes(lane as PrFixLane);
Expand Down