From 9c3de9b5539c18f25352b6b065c986aad7b73076 Mon Sep 17 00:00:00 2001 From: Anton Date: Sat, 13 Jun 2026 16:54:31 +0200 Subject: [PATCH 1/9] feat(join): dup-fallback routing + force knob + counter (no auto-trigger yet) --- src/ops/internal.h | 2 ++ src/ops/join.c | 22 +++++++++++++++++++++- test/test_join_buildside.c | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 1 deletion(-) diff --git a/src/ops/internal.h b/src/ops/internal.h index a886ab2c..71179028 100644 --- a/src/ops/internal.h +++ b/src/ops/internal.h @@ -622,6 +622,8 @@ extern bool ray_expr_disable; extern bool ray_opt_no_group_pushdown; extern bool ray_join_no_build_swap; extern uint64_t ray_join_build_swaps; +extern bool ray_join_force_dup_fallback; +extern uint64_t ray_join_dup_fallbacks; void ray_expr_stats_init(void); #define EXPR_MAX_REGS 16 diff --git a/src/ops/join.c b/src/ops/join.c index 1af0999e..1bbecdd4 100644 --- a/src/ops/join.c +++ b/src/ops/join.c @@ -31,6 +31,11 @@ bool ray_join_no_build_swap = false; /* Diagnostic: how many radix inner-joins built on the smaller (left) side. */ uint64_t ray_join_build_swaps = 0; +/* Test knob: force every radix join to fall back to the chained path, so the + * differential harness can compare radix-build vs chained-build on ordinary data. */ +bool ray_join_force_dup_fallback = false; +/* Diagnostic: radix joins that fell back due to pathological key duplication. */ +uint64_t ray_join_dup_fallbacks = 0; /* ── Hash helper (shared by radix and chained HT join paths) ──────────── */ @@ -444,6 +449,10 @@ static inline bool join_keys_eq(ray_t* const* l_vecs, ray_t* const* r_vecs, uint #define RADIX_HT_EMPTY UINT32_MAX +/* A per-partition open-addressing build whose linear-probe run exceeds this + * is pathologically duplicated (O(dup²) build); abort to the chained path. */ +#define RADIX_DUP_RUN_MAX 512 + /* Per-partition single-pass build+probe context. 
* Each partition writes to its own local output buffer, then results * are consolidated into contiguous arrays afterward. */ @@ -463,6 +472,7 @@ typedef struct { uint32_t* pp_cap; /* capacity per partition */ _Atomic(uint8_t)* matched_right; _Atomic(uint8_t) had_error; /* set by any partition on OOM */ + _Atomic(uint8_t) pathological; /* set on long-run duplication or forced */ } join_radix_bp_ctx_t; /* Grow per-partition output buffers (matched pair arrays). @@ -506,6 +516,13 @@ static void join_radix_build_probe_fn(void* raw, uint32_t wid, int64_t task_star join_radix_part_t* rp = &c->r_parts[p]; join_radix_part_t* lp = &c->l_parts[p]; + /* Test knob: force the chained-path fallback. Bail before allocating + * anything (pp headers are still NULL → cleanup-safe). */ + if (ray_join_force_dup_fallback) { + atomic_store_explicit(&c->pathological, 1, memory_order_relaxed); + return; + } + if (rp->count == 0) { /* No right rows — emit unmatched left rows for LEFT/FULL */ if (c->join_type >= 1 && lp->count > 0) { @@ -995,6 +1012,7 @@ ray_t* exec_join(ray_graph_t* g, ray_op_t* op, ray_t* left_table, ray_t* right_t .part_counts = part_counts, .pp_cap = pp_cap, .matched_right = matched_right, .had_error = 0, + .pathological = 0, }; if (pool && n_rparts > 1) ray_pool_dispatch_n(pool, join_radix_build_probe_fn, &bp_ctx, n_rparts); @@ -1005,7 +1023,8 @@ ray_t* exec_join(ray_graph_t* g, ray_op_t* op, ray_t* left_table, ray_t* right_t /* Check cancellation and errors during build+probe */ bool bp_cancelled = pool_cancelled(pool); bool bp_error = atomic_load_explicit(&bp_ctx.had_error, memory_order_relaxed); - if (bp_cancelled || bp_error) { + bool bp_pathological = atomic_load_explicit(&bp_ctx.pathological, memory_order_relaxed); + if (bp_cancelled || bp_error || bp_pathological) { /* Free all per-partition buffers */ for (uint32_t rp2 = 0; rp2 < n_rparts; rp2++) { if (r_parts[rp2].entries_hdr) scratch_free(r_parts[rp2].entries_hdr); @@ -1018,6 +1037,7 @@ ray_t* 
exec_join(ray_graph_t* g, ray_op_t* op, ray_t* left_table, ray_t* right_t if (matched_right_hdr) { scratch_free(matched_right_hdr); matched_right_hdr = NULL; } matched_right = NULL; if (bp_cancelled) return ray_error("cancel", NULL); + if (bp_pathological) ray_join_dup_fallbacks++; goto chained_ht_fallback; } diff --git a/test/test_join_buildside.c b/test/test_join_buildside.c index 86d004c2..0e952745 100644 --- a/test/test_join_buildside.c +++ b/test/test_join_buildside.c @@ -604,9 +604,42 @@ static test_result_t test_jb_left_bigger_no_swap(void) { return rr; } +/* ── Forced dup-fallback on ordinary data ────────────────────────────────── + * The force knob makes every radix partition bail to the chained path before + * allocating anything. On ordinary low-duplication data the chained-build + * result must be multiset-identical to the radix-build result, AND the + * dup-fallback counter must advance (proving the routing fired). + * ──────────────────────────────────────────────────────────────────────── */ +static test_result_t test_jb_force_fallback_ordinary(void) { + ray_heap_init(); + (void)ray_sym_init(); + + int64_t n_r = RAY_PARALLEL_THRESHOLD + 5000, n_l = 4000; /* right > threshold → radix */ + int64_t* rv = malloc((size_t)n_r*sizeof(int64_t)); + int64_t* lv = malloc((size_t)n_l*sizeof(int64_t)); + TEST_ASSERT(rv && lv, "malloc key arrays"); + for (int64_t i=0;i before; + ray_join_force_dup_fallback = false; /* → radix */ + ray_t* radix = jb_inner_join(lt,"lk",rt,"rk"); + test_result_t rr = jb_results_equal(chained, radix); + if (rr.status == TEST_PASS && !fired) + rr = (test_result_t){ TEST_FAIL, "forced dup-fallback did not fire" }; + ray_release(chained); ray_release(radix); ray_release(lt); ray_release(rt); + free(lv); free(rv); ray_sym_destroy(); ray_heap_destroy(); + return rr; +} + /* ── Entry table ─────────────────────────────────────────────────────────── */ const test_entry_t join_buildside_entries[] = { + { 
"join_buildside/force_fallback_ordinary", test_jb_force_fallback_ordinary, NULL, NULL }, { "join_buildside/baseline_radix_inner", test_jb_baseline_radix_inner, NULL, NULL }, { "join_buildside/swap_inner_matches", test_jb_swap_inner_matches, NULL, NULL }, { "join_buildside/many_to_many", test_jb_many_to_many, NULL, NULL }, From 9c84a35fcafdcd22797a22f24a287f94a336699f Mon Sep 17 00:00:00 2001 From: Anton Date: Sat, 13 Jun 2026 17:22:02 +0200 Subject: [PATCH 2/9] feat(join): auto-fallback to chained path on pathological build-side duplication --- src/ops/join.c | 10 +++++++++- test/test_join_buildside.c | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/src/ops/join.c b/src/ops/join.c index 1bbecdd4..3eef83e2 100644 --- a/src/ops/join.c +++ b/src/ops/join.c @@ -596,8 +596,16 @@ static void join_radix_build_probe_fn(void* raw, uint32_t wid, int64_t task_star uint32_t slot = h & ht_mask; if (i + 4 < rp->count) __builtin_prefetch(&ht[(rp->entries[i + 4].hash & ht_mask) * 2], 1, 1); - while (ht[slot * 2 + 1] != RADIX_HT_EMPTY) + uint32_t run = 0; + while (ht[slot * 2 + 1] != RADIX_HT_EMPTY) { slot = (slot + 1) & ht_mask; + if (++run > RADIX_DUP_RUN_MAX) { + /* Pathological duplication — abort to the chained path. + * `done:` frees ht_hdr and leaves pp buffers cleanup-safe. */ + atomic_store_explicit(&c->pathological, 1, memory_order_relaxed); + goto done; + } + } ht[slot * 2] = h; ht[slot * 2 + 1] = rp->entries[i].row_idx; } diff --git a/test/test_join_buildside.c b/test/test_join_buildside.c index 0e952745..56789dcc 100644 --- a/test/test_join_buildside.c +++ b/test/test_join_buildside.c @@ -636,10 +636,45 @@ static test_result_t test_jb_force_fallback_ordinary(void) { return rr; } +/* A join whose BUILD side has heavy per-key duplication (run > RADIX_DUP_RUN_MAX) + * must auto-fall-back to the chained path and produce correct results. 
+ * + * The radix path builds on the SMALLER side (INNER build-side swap), so to trip + * the build-loop run counter the duplication must live on that smaller side: + * the left side here has only 4 distinct keys (~2000 rows/key → run > 512), and + * stays smaller than the right (so it is the build side), while the right exceeds + * the parallel threshold so the radix path is selected. The right side is low-dup with + * few matching keys, keeping the PRE-FIX quadratic build and the join output + * small (sub-0.1s, no catastrophic blow-up). */ +static test_result_t test_jb_auto_fallback_dup(void) { + ray_heap_init(); + (void)ray_sym_init(); + int64_t n_r = RAY_PARALLEL_THRESHOLD + 5000, n_l = 8000; + int64_t* rv = malloc(n_r*sizeof(int64_t)); int64_t* lv = malloc(n_l*sizeof(int64_t)); + TEST_ASSERT(rv && lv, "malloc"); + for (int64_t i=0;i 512 → trips */ + ray_t* rt = jb_table1("rk", rv, n_r); + ray_t* lt = jb_table1("lk", lv, n_l); + uint64_t before = ray_join_dup_fallbacks; + ray_t* got = jb_inner_join(lt,"lk",rt,"rk"); /* knob off; auto-trip expected */ + bool fired = ray_join_dup_fallbacks > before; + ray_join_force_dup_fallback = true; /* oracle: forced chained */ + ray_t* oracle = jb_inner_join(lt,"lk",rt,"rk"); + ray_join_force_dup_fallback = false; + test_result_t rr = jb_results_equal(got, oracle); + if (rr.status == TEST_PASS && !fired) + rr = (test_result_t){ TEST_FAIL, "expected auto dup-fallback to fire" }; + ray_release(got); ray_release(oracle); ray_release(lt); ray_release(rt); + free(lv); free(rv); ray_sym_destroy(); ray_heap_destroy(); + return rr; +} + /* ── Entry table ─────────────────────────────────────────────────────────── */ const test_entry_t join_buildside_entries[] = { { "join_buildside/force_fallback_ordinary", test_jb_force_fallback_ordinary, NULL, NULL }, + { "join_buildside/auto_fallback_dup", test_jb_auto_fallback_dup, NULL, NULL }, { "join_buildside/baseline_radix_inner", test_jb_baseline_radix_inner, NULL, NULL }, { 
"join_buildside/swap_inner_matches", test_jb_swap_inner_matches, NULL, NULL }, { "join_buildside/many_to_many", test_jb_many_to_many, NULL, NULL }, From 7f059e380e362a6cb5a3f00898084aac74cd1eb1 Mon Sep 17 00:00:00 2001 From: Anton Date: Sat, 13 Jun 2026 17:45:18 +0200 Subject: [PATCH 3/9] =?UTF-8?q?test(join):=20dup-fallback=20edges=20?= =?UTF-8?q?=E2=80=94=20left/full,=20inner-no-swap,=20no-trip=20control,=20?= =?UTF-8?q?boundary,=20not-sticky?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/test_join_buildside.c | 301 +++++++++++++++++++++++++++++++++++++ 1 file changed, 301 insertions(+) diff --git a/test/test_join_buildside.c b/test/test_join_buildside.c index 56789dcc..f24f9a38 100644 --- a/test/test_join_buildside.c +++ b/test/test_join_buildside.c @@ -89,6 +89,39 @@ static ray_t* jb_inner_join(ray_t* lt, const char* lkey, return result; } +/* ── Join-type-parameterized helper ──────────────────────────────────────── + * jb_join: identical to jb_inner_join but threads an explicit join_type + * (0=INNER, 1=LEFT, 2=FULL) into ray_join. Used by the dup-fallback edge + * fixtures, which must exercise LEFT/FULL (where build is always the right + * side) as well as INNER. jb_results_equal already tolerates NULL cells + * from LEFT/FULL unmatched rows. 
+ * ──────────────────────────────────────────────────────────────────────── */ +static ray_t* jb_join(ray_t* lt, const char* lkey, + ray_t* rt, const char* rkey, uint8_t join_type) { + ray_graph_t* g = ray_graph_new(lt); + if (!g) return ray_error("oom", "jb_join: graph alloc"); + + ray_op_t* lt_node = ray_const_table(g, lt); + ray_op_t* rt_node = ray_const_table(g, rt); + ray_op_t* lk_op = ray_scan(g, lkey); + ray_op_t* rk_op = ray_scan(g, rkey); + + if (!lt_node || !rt_node || !lk_op || !rk_op) { + ray_graph_free(g); + return ray_error("oom", "jb_join: node alloc"); + } + + ray_op_t* lk_arr[1] = { lk_op }; + ray_op_t* rk_arr[1] = { rk_op }; + ray_op_t* jn = ray_join(g, lt_node, lk_arr, rt_node, rk_arr, 1, join_type); + if (!jn) { ray_graph_free(g); return ray_error("oom", "jb_join: join node"); } + + jn = ray_optimize(g, jn); + ray_t* result = ray_execute(g, jn); + ray_graph_free(g); + return result; +} + /* ── Two-column table helper ────────────────────────────────────────────── * jb_table2: allocate a two-column I64 table with column names n0/n1. * v0[]/v1[] must have `n` elements each. Caller owns the returned table. @@ -289,6 +322,39 @@ static test_result_t jb_results_equal(ray_t* a, ray_t* b) { return result; } +/* ── Dup-fallback differential wrapper ────────────────────────────────────── + * jb_diff_dup: run a join with the force knob OFF (auto path); assert the + * dup-fallback counter advanced iff expect_trip. Then run with the force + * knob ON (forced-chained oracle) and assert multiset equality. Reused by + * every dup-fallback edge fixture. Does NOT init/destroy the heap — caller + * owns the session (so not_sticky can chain two joins in one heap). 
+ * ──────────────────────────────────────────────────────────────────────── */ +static test_result_t jb_diff_dup(ray_t* lt, const char* lkey, + ray_t* rt, const char* rkey, + uint8_t join_type, bool expect_trip) { + uint64_t before = ray_join_dup_fallbacks; + ray_join_force_dup_fallback = false; /* auto path */ + ray_t* got = jb_join(lt, lkey, rt, rkey, join_type); + bool fired = ray_join_dup_fallbacks > before; + + ray_join_force_dup_fallback = true; /* forced-chained oracle */ + ray_t* oracle = jb_join(lt, lkey, rt, rkey, join_type); + ray_join_force_dup_fallback = false; + + if (!got || RAY_IS_ERR(got) || !oracle || RAY_IS_ERR(oracle)) { + ray_release(got); ray_release(oracle); + return (test_result_t){ TEST_FAIL, "jb_diff_dup: join returned error" }; + } + + test_result_t rr = jb_results_equal(got, oracle); + if (rr.status == TEST_PASS && fired != expect_trip) + rr = (test_result_t){ TEST_FAIL, + expect_trip ? "expected dup-fallback to trip" + : "dup-fallback tripped unexpectedly" }; + ray_release(got); ray_release(oracle); + return rr; +} + /* ── Baseline test ───────────────────────────────────────────────────────── * Build a right-side table larger than RAY_PARALLEL_THRESHOLD to trigger * the radix path. Run the join twice: once with the no-swap knob set @@ -670,6 +736,235 @@ static test_result_t test_jb_auto_fallback_dup(void) { return rr; } +/* ── LEFT join, duplicated build side ────────────────────────────────────── + * For LEFT (join_type=1) the build-side swap never fires (swap is INNER-only), + * so the BUILD side is ALWAYS the physical right. To enter the radix path the + * right must be > RAY_PARALLEL_THRESHOLD; to trip the dup run-length guard the + * right (build) must be heavily duplicated. + * + * right = 70536 rows, key i%64 → ~1102 rows/key → run > 512 → trips. + * left = 4000 rows: a small matched subset (keys 0..7, even i) plus a large + * left-only remainder (key i%64+100) → unmatched LEFT rows emitted + * with NULL right cell. 
The narrow matched key set keeps the join + * output bounded (~280K rows) so the forced-chained oracle stays fast. + * expect_trip = true. Build side = right (70536, dup). + * ──────────────────────────────────────────────────────────────────────── */ +static test_result_t test_jb_dup_left_join(void) { + ray_heap_init(); + (void)ray_sym_init(); + + int64_t n_r = RAY_PARALLEL_THRESHOLD + 5000; /* 70536 > threshold → radix */ + int64_t n_l = 4000; + int64_t* rv = malloc((size_t)n_r * sizeof(int64_t)); + int64_t* lv = malloc((size_t)n_l * sizeof(int64_t)); + TEST_ASSERT(rv && lv, "malloc key arrays"); + for (int64_t i = 0; i < n_r; i++) rv[i] = i % 64; /* build: ~1102/key → trips */ + for (int64_t i = 0; i < n_l; i++) + lv[i] = (i < 256) ? (i % 8) : (i % 64 + 100); /* 256 matched (keys 0..7) + left-only */ + + ray_t* rt = jb_table1("rk", rv, n_r); + ray_t* lt = jb_table1("lk", lv, n_l); + test_result_t rr = jb_diff_dup(lt, "lk", rt, "rk", /*join_type=*/1, /*expect_trip=*/true); + + ray_release(lt); ray_release(rt); + free(lv); free(rv); + ray_sym_destroy(); ray_heap_destroy(); + return rr; +} + +/* ── FULL join, duplicated build side ────────────────────────────────────── + * Same shape as dup_left_join but join_type=2 (FULL OUTER). Build side is the + * physical right (70536, key i%64 → ~1102/key → trips). Left has a narrow + * matched subset (keys 0..7) plus left-only keys (i%64+100); right keys 8..63 + * have no left match, so both unmatched-left and unmatched-right rows appear. + * expect_trip = true. Build side = right (70536, dup). 
+ * ──────────────────────────────────────────────────────────────────────── */ +static test_result_t test_jb_dup_full_join(void) { + ray_heap_init(); + (void)ray_sym_init(); + + int64_t n_r = RAY_PARALLEL_THRESHOLD + 5000; + int64_t n_l = 4000; + int64_t* rv = malloc((size_t)n_r * sizeof(int64_t)); + int64_t* lv = malloc((size_t)n_l * sizeof(int64_t)); + TEST_ASSERT(rv && lv, "malloc key arrays"); + for (int64_t i = 0; i < n_r; i++) rv[i] = i % 64; /* build: ~1102/key → trips */ + for (int64_t i = 0; i < n_l; i++) + lv[i] = (i < 256) ? (i % 8) : (i % 64 + 100); /* 256 matched (0..7) + left-only */ + + ray_t* rt = jb_table1("rk", rv, n_r); + ray_t* lt = jb_table1("lk", lv, n_l); + test_result_t rr = jb_diff_dup(lt, "lk", rt, "rk", /*join_type=*/2, /*expect_trip=*/true); + + ray_release(lt); ray_release(rt); + free(lv); free(rv); + ray_sym_destroy(); ray_heap_destroy(); + return rr; +} + +/* ── INNER join, no-swap, duplicated build side ──────────────────────────── + * With ray_join_no_build_swap = true the INNER swap is suppressed, so the + * BUILD side is the physical right. right = 70536 key i%64 (~1102/key, trips), + * left = 4000 key i%8 (probe, narrow matched set to bound output). Knob reset + * after the run. expect_trip = true. Build side = right (70536, dup). + * ──────────────────────────────────────────────────────────────────────── */ +static test_result_t test_jb_dup_inner_no_swap(void) { + ray_heap_init(); + (void)ray_sym_init(); + + int64_t n_r = RAY_PARALLEL_THRESHOLD + 5000; + int64_t n_l = 4000; + int64_t* rv = malloc((size_t)n_r * sizeof(int64_t)); + int64_t* lv = malloc((size_t)n_l * sizeof(int64_t)); + TEST_ASSERT(rv && lv, "malloc key arrays"); + for (int64_t i = 0; i < n_r; i++) rv[i] = i % 64; /* build: ~1102/key → trips */ + for (int64_t i = 0; i < n_l; i++) + lv[i] = (i < 256) ? 
(i % 8) : (i % 8 + 1000); /* 256 matched, rest non-matching */ + + ray_t* rt = jb_table1("rk", rv, n_r); + ray_t* lt = jb_table1("lk", lv, n_l); + + ray_join_no_build_swap = true; /* build = right */ + test_result_t rr = jb_diff_dup(lt, "lk", rt, "rk", /*join_type=*/0, /*expect_trip=*/true); + ray_join_no_build_swap = false; /* reset */ + + ray_release(lt); ray_release(rt); + free(lv); free(rv); + ray_sym_destroy(); ray_heap_destroy(); + return rr; +} + +/* ── INNER join, low duplication, no trip ────────────────────────────────── + * The zero-regression correctness path: a near-unique build side never + * produces a probe run > RADIX_DUP_RUN_MAX, so the auto path stays on radix + * (counter does NOT advance) and still matches the forced-chained oracle. + * + * Both sides near-unique so whichever becomes the build side after the INNER + * swap is fine: right = 70536 key i%70000 (~1/key), left = 4000 key i%4000. + * expect_trip = false. Build side = left (4000) after swap, near-unique. 
+ * ──────────────────────────────────────────────────────────────────────── */ +static test_result_t test_jb_no_trip_low_dup(void) { + ray_heap_init(); + (void)ray_sym_init(); + + int64_t n_r = RAY_PARALLEL_THRESHOLD + 5000; + int64_t n_l = 4000; + int64_t* rv = malloc((size_t)n_r * sizeof(int64_t)); + int64_t* lv = malloc((size_t)n_l * sizeof(int64_t)); + TEST_ASSERT(rv && lv, "malloc key arrays"); + for (int64_t i = 0; i < n_r; i++) rv[i] = i % 70000; /* near-unique */ + for (int64_t i = 0; i < n_l; i++) lv[i] = i % 4000; /* unique */ + + ray_t* rt = jb_table1("rk", rv, n_r); + ray_t* lt = jb_table1("lk", lv, n_l); + test_result_t rr = jb_diff_dup(lt, "lk", rt, "rk", /*join_type=*/0, /*expect_trip=*/false); + + ray_release(lt); ray_release(rt); + free(lv); free(rv); + ray_sym_destroy(); ray_heap_destroy(); + return rr; +} + +/* ── Trip boundary ───────────────────────────────────────────────────────── + * Two siblings straddling RADIX_DUP_RUN_MAX (512), both via INNER no-swap so + * the build side is the physical right (the dup side). The build loop's run + * counter tracks the open-addressing linear-probe run length, which for M + * duplicates of one key reaches ~M (same-hash duplicates chain from the same + * start slot) PLUS inter-key collisions within the partition — so the run can + * exceed M somewhat. Sizes are chosen with margin on both sides: + * trips: right = 70536 key i%64 → ~1102/key → run ≫ 512 → trips. + * no-trip: right = 70536 key i%2000 → ~35/key → run ≪ 512 → no trip. + * Both must match the forced-chained oracle; counter fires only on the dup case. + * Build side = right (70536) for both. 
+ * ──────────────────────────────────────────────────────────────────────── */ +static test_result_t test_jb_trip_boundary(void) { + ray_heap_init(); + (void)ray_sym_init(); + + int64_t n_r = RAY_PARALLEL_THRESHOLD + 5000; /* 70536 */ + int64_t n_l = 4000; + int64_t* rv = malloc((size_t)n_r * sizeof(int64_t)); + int64_t* lv = malloc((size_t)n_l * sizeof(int64_t)); + TEST_ASSERT(rv && lv, "malloc key arrays"); + for (int64_t i = 0; i < n_l; i++) lv[i] = i % 2000; /* probe, low dup */ + + ray_join_no_build_swap = true; /* build = right */ + + /* Above boundary: ~1102/key → trips. */ + for (int64_t i = 0; i < n_r; i++) rv[i] = i % 64; + ray_t* rt_hi = jb_table1("rk", rv, n_r); + ray_t* lt_hi = jb_table1("lk", lv, n_l); + test_result_t rr = jb_diff_dup(lt_hi, "lk", rt_hi, "rk", /*join_type=*/0, /*expect_trip=*/true); + ray_release(lt_hi); ray_release(rt_hi); + + /* Below boundary: ~35/key → no trip. */ + if (rr.status == TEST_PASS) { + for (int64_t i = 0; i < n_r; i++) rv[i] = i % 2000; + ray_t* rt_lo = jb_table1("rk", rv, n_r); + ray_t* lt_lo = jb_table1("lk", lv, n_l); + rr = jb_diff_dup(lt_lo, "lk", rt_lo, "rk", /*join_type=*/0, /*expect_trip=*/false); + ray_release(lt_lo); ray_release(rt_lo); + } + + ray_join_no_build_swap = false; /* reset */ + free(lv); free(rv); + ray_sym_destroy(); ray_heap_destroy(); + return rr; +} + +/* ── Not sticky ──────────────────────────────────────────────────────────── + * The pathological flag is per-join (reset each exec_join), not a sticky + * global. Run a tripping join then a non-tripping join in the SAME heap + * session and assert the dup-fallback counter advanced by EXACTLY 1. + * + * Both INNER no-swap (build = right). Trip: right key i%64 (~1102/key). + * No-trip: right key i%70000 (near-unique). Probe key i%8 (narrow, bounded + * output). 
+ * ──────────────────────────────────────────────────────────────────────── */ +static test_result_t test_jb_not_sticky(void) { + ray_heap_init(); + (void)ray_sym_init(); + + int64_t n_r = RAY_PARALLEL_THRESHOLD + 5000; + int64_t n_l = 4000; + int64_t* rv = malloc((size_t)n_r * sizeof(int64_t)); + int64_t* lv = malloc((size_t)n_l * sizeof(int64_t)); + TEST_ASSERT(rv && lv, "malloc key arrays"); + for (int64_t i = 0; i < n_l; i++) + lv[i] = (i < 256) ? (i % 8) : (i % 8 + 1000); /* 256 matched, rest non-matching */ + + ray_join_no_build_swap = true; /* build = right */ + uint64_t before = ray_join_dup_fallbacks; + + /* Pathological run first. */ + for (int64_t i = 0; i < n_r; i++) rv[i] = i % 64; /* ~1102/key → trips */ + ray_t* rt_a = jb_table1("rk", rv, n_r); + ray_t* lt_a = jb_table1("lk", lv, n_l); + ray_t* got_a = jb_join(lt_a, "lk", rt_a, "rk", 0); + ray_release(got_a); ray_release(lt_a); ray_release(rt_a); + + /* Low-dup run second, same session. */ + for (int64_t i = 0; i < n_r; i++) rv[i] = i % 70000; /* no trip */ + ray_t* rt_b = jb_table1("rk", rv, n_r); + ray_t* lt_b = jb_table1("lk", lv, n_l); + ray_t* got_b = jb_join(lt_b, "lk", rt_b, "rk", 0); + ray_release(got_b); ray_release(lt_b); ray_release(rt_b); + + uint64_t advanced = ray_join_dup_fallbacks - before; + + ray_join_no_build_swap = false; /* reset */ + free(lv); free(rv); + ray_sym_destroy(); ray_heap_destroy(); + + if (advanced != 1) { + snprintf(ray_test_fail_buf, sizeof ray_test_fail_buf, + "expected dup-fallback counter to advance by exactly 1, got %llu", + (unsigned long long)advanced); + return (test_result_t){ TEST_FAIL, ray_test_fail_buf }; + } + return (test_result_t){ TEST_PASS, NULL }; +} + /* ── Entry table ─────────────────────────────────────────────────────────── */ const test_entry_t join_buildside_entries[] = { @@ -684,5 +979,11 @@ const test_entry_t join_buildside_entries[] = { { "join_buildside/near_equal_no_swap", test_jb_near_equal_no_swap, NULL, NULL }, { 
"join_buildside/multi_key", test_jb_multi_key, NULL, NULL }, { "join_buildside/left_bigger_no_swap", test_jb_left_bigger_no_swap, NULL, NULL }, + { "join_buildside/dup_left_join", test_jb_dup_left_join, NULL, NULL }, + { "join_buildside/dup_full_join", test_jb_dup_full_join, NULL, NULL }, + { "join_buildside/dup_inner_no_swap", test_jb_dup_inner_no_swap, NULL, NULL }, + { "join_buildside/no_trip_low_dup", test_jb_no_trip_low_dup, NULL, NULL }, + { "join_buildside/trip_boundary", test_jb_trip_boundary, NULL, NULL }, + { "join_buildside/not_sticky", test_jb_not_sticky, NULL, NULL }, { NULL, NULL, NULL, NULL }, }; From 9b2785859f8c083fa854930ec922c7e33fb00516 Mon Sep 17 00:00:00 2001 From: Anton Date: Sat, 13 Jun 2026 18:01:40 +0200 Subject: [PATCH 4/9] feat(join): bypass knob to disable dup-fallback for perf measurement --- src/ops/internal.h | 2 ++ src/ops/join.c | 7 ++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/ops/internal.h b/src/ops/internal.h index 71179028..90c3fb48 100644 --- a/src/ops/internal.h +++ b/src/ops/internal.h @@ -623,6 +623,8 @@ extern bool ray_opt_no_group_pushdown; extern bool ray_join_no_build_swap; extern uint64_t ray_join_build_swaps; extern bool ray_join_force_dup_fallback; +/* perf-gate bypass: disable the auto dup-fallback to measure the pre-fix O(dup²) build */ +extern bool ray_join_no_dup_fallback; extern uint64_t ray_join_dup_fallbacks; void ray_expr_stats_init(void); diff --git a/src/ops/join.c b/src/ops/join.c index 3eef83e2..5e5a7e2c 100644 --- a/src/ops/join.c +++ b/src/ops/join.c @@ -34,6 +34,11 @@ uint64_t ray_join_build_swaps = 0; /* Test knob: force every radix join to fall back to the chained path, so the * differential harness can compare radix-build vs chained-build on ordinary data. */ bool ray_join_force_dup_fallback = false; +/* Perf-gate bypass: disable the auto dup-fallback so the differential harness + * can measure the pre-fix O(dup²) build in the same binary. 
Independent of + * ray_join_force_dup_fallback (which forces the fallback); this disables the + * auto-trip only. */ +bool ray_join_no_dup_fallback = false; /* Diagnostic: radix joins that fell back due to pathological key duplication. */ uint64_t ray_join_dup_fallbacks = 0; @@ -599,7 +604,7 @@ static void join_radix_build_probe_fn(void* raw, uint32_t wid, int64_t task_star uint32_t run = 0; while (ht[slot * 2 + 1] != RADIX_HT_EMPTY) { slot = (slot + 1) & ht_mask; - if (++run > RADIX_DUP_RUN_MAX) { + if (++run > RADIX_DUP_RUN_MAX && !ray_join_no_dup_fallback) { /* Pathological duplication — abort to the chained path. * `done:` frees ht_hdr and leaves pp buffers cleanup-safe. */ atomic_store_explicit(&c->pathological, 1, memory_order_relaxed); From 87e35b6748eec4dee1a1adaefed4996823b859ac Mon Sep 17 00:00:00 2001 From: Anton Date: Sat, 13 Jun 2026 18:21:36 +0200 Subject: [PATCH 5/9] bench: join dup-fallback perf gate --- .gitignore | 1 + Makefile | 11 +- bench/bottleneck/join_dup_fallback_compare.md | 204 ++++++++ bench/join_dup/main.c | 442 ++++++++++++++++++ 4 files changed, 657 insertions(+), 1 deletion(-) create mode 100644 bench/bottleneck/join_dup_fallback_compare.md create mode 100644 bench/join_dup/main.c diff --git a/.gitignore b/.gitignore index d6b30bbc..b44b9e15 100644 --- a/.gitignore +++ b/.gitignore @@ -43,6 +43,7 @@ bench-alloc bench-group-pushdown bench-idx-route bench-join-buildside +bench-join-dup # Rayforce REPL history .rayhist.dat diff --git a/Makefile b/Makefile index 97e80173..32b1bd70 100644 --- a/Makefile +++ b/Makefile @@ -132,6 +132,15 @@ bench-join-buildside: bench/join_buildside/main.c $(LIB_SRC) $(LIBS) $(RELEASE_LDFLAGS) ./bench-join-buildside +# Join dup-fallback perf gate. +# Measures post-fix (auto dup-fallback to chained build) vs pre-fix (O(dup²) +# build via the ray_join_no_dup_fallback bypass knob) on catastrophic, +# zero-regression, and moderate-dup cases. Sanitizer-free. 
+bench-join-dup: + $(CC) $(RELEASE_CFLAGS) $(DEFS) $(INCLUDES) -o bench-join-dup \ + bench/join_dup/main.c $(LIB_SRC) $(LIBS) $(RELEASE_LDFLAGS) + ./bench-join-dup + # Tests. Depends on $(TARGET) because test/rfl/system/ipc_diff.rfl # spawns ./$(TARGET) as an IPC server via .sys.exec — both binaries # must exist on disk and share the build flavour (sanitizers, coverage). @@ -185,7 +194,7 @@ clean: -rm -f cov-*.profraw default.profraw coverage.profdata -rm -rf coverage_html -.PHONY: default debug release lib bench-alloc bench-join-buildside test coverage clean +.PHONY: default debug release lib bench-alloc bench-join-buildside bench-join-dup test coverage clean # Header dependencies last: .d fragments only add prerequisites to the # object targets above, and being last they can't hijack the default goal. diff --git a/bench/bottleneck/join_dup_fallback_compare.md b/bench/bottleneck/join_dup_fallback_compare.md new file mode 100644 index 00000000..d99055af --- /dev/null +++ b/bench/bottleneck/join_dup_fallback_compare.md @@ -0,0 +1,204 @@ +# Join dup-fallback perf gate — post-fix vs pre-fix + +Measurement record for the radix-join duplicate-key build fallback. **No +verdict here** — the controller makes the call. This document records the +environment, the sanitizer-free proof, per-case medians (post-fix vs pre-fix), +the zero-regression control delta, the moderate-dup trip-or-not finding, the +mechanism counters, and the raw reps. + +## What is being measured + +The radix per-partition open-addressing build trips `RADIX_DUP_RUN_MAX` (512, +in `src/ops/join.c`) when a linear-probe run grows too long, sets the +`pathological` flag, and falls back to the chained-HT path (O(n) build). The +diagnostic counter is `ray_join_dup_fallbacks`. + +- **post-fix** = auto-fallback ON (`ray_join_no_dup_fallback = false`): the + build trips on pathological duplication and falls back to the chained path. 
+- **pre-fix** = auto-fallback DISABLED via the bypass knob + (`ray_join_no_dup_fallback = true`): the build loop does **not** trip and + runs the old quadratic O(dup²) open-addressing build. This reproduces the + pre-fix behaviour in the same binary for a differential measurement. + +The two knobs are independent: `ray_join_force_dup_fallback` *forces* the +fallback; `ray_join_no_dup_fallback` *disables the auto-trip*. The gate uses +only the latter. + +Driver: `bench/join_dup/main.c`, target `make bench-join-dup`. Mirrors +`bench/join_buildside` (release flags, CLOCK_MONOTONIC around `ray_execute` +only, tables built once, graph rebuilt per rep, medians, interleaved post/pre). + +## Environment + +- Host: Intel Core i7-6700 @ 3.40 GHz, 8 logical cores, Linux 6.8.0. +- Uptime at run: 38 days; load average at the recorded run **1.16** (quiet box; + earlier attempts at load ~2-4 were discarded). +- Build: `RELEASE_CFLAGS` = `-O3 -march=native -funroll-loops` etc. **No + sanitizers** — `-fsanitize=...` appears only in `DEBUG_CFLAGS`/`DEBUG_LDFLAGS`; + the `bench-join-dup` recipe uses `RELEASE_CFLAGS`/`RELEASE_LDFLAGS` only. + Commit: `9b278585` (`feat(join): bypass knob to disable dup-fallback ...`). +- `NREPS=9`, `PREFIX_SLOW_REPS=3`, `RADIX_DUP_RUN_MAX=512`, + `RAY_PARALLEL_THRESHOLD=65536`. + +### Sanitizer-free proof + +The `bench-join-dup` Makefile recipe compiles with `$(RELEASE_CFLAGS)` / +`$(RELEASE_LDFLAGS)`; the only `-fsanitize` occurrences in the Makefile are in +`DEBUG_CFLAGS` (line ~25) and `DEBUG_LDFLAGS` (line ~57), neither of which the +release/bench path references. The compile command logged for the recorded run +contains no `-fsanitize` token. + +## Fixtures + +Output cardinality is deliberately bounded so the **build** cost — not output +materialisation — is what is timed. (An all-key-7 ⋈ all-key-7 inner join would +emit ~10^11 rows.) 
The catastrophic build side carries `CAT_DUP = 60000` rows +of the duplicated key 7 (one pathological partition → O(dup²) pre-fix build in +the few-seconds range; all 10M sharing one key would be ~10^14 ops / hours), +with the remaining rows distinct. Probe sides use distinct **non-matching** +(negative) keys except where a bounded match is wanted. + +| case | build side | probe side | join | output | +|------|-----------|-----------|------|--------| +| CATASTROPHIC-INNER | right 10M, 60K dup key 7 (`no_swap`) | left 10K nonmatch | INNER | 0 | +| CATASTROPHIC-LEFT | right 10M, 60K dup key 7 | left 100K nonmatch | LEFT | 100K | +| ZERO-REGRESSION | right 10M unique (`no_swap`) | left 10K matching | INNER | 10K | +| MODERATE-DUP-100 | right 10M key i%100000 (~100/key, `no_swap`) | left 10K nonmatch | INNER | 0 | +| MODERATE-DUP-10 | right 10M key i%1000000 (~10/key, `no_swap`) | left 10K nonmatch | INNER | 0 | + +`no_swap` = `ray_join_no_build_swap = true` forces the build onto the duplicated +big right side (INNER would otherwise swap to build the smaller left). LEFT +always builds the right side. 
+ +## Per-case medians (recorded run, load 1.16) + +``` +case side reps median_ms min_ms fb_delta rows_out +-------------------- -------- ----- -------------- ------------ ------------ ---------- +CATASTROPHIC-INNER post-fix 9 157.114 154.019 9 0 + pre-fix 3 1689.741 1646.636 0 0 + pre/post median speedup = 10.75x (delta = +1532.626 ms) +CATASTROPHIC-LEFT post-fix 9 162.287 153.018 9 100000 + pre-fix 3 1585.564 1570.288 0 100000 + pre/post median speedup = 9.77x (delta = +1423.276 ms) +ZERO-REGRESSION post-fix 9 54.997 53.177 0 10000 + pre-fix 9 56.676 52.374 0 10000 + pre/post median speedup = 1.03x (delta = +1.679 ms) +MODERATE-DUP-100 post-fix 9 222.924 215.589 9 0 + pre-fix 9 178.034 173.373 0 0 + pre/post median speedup = 0.80x (delta = -44.890 ms) +MODERATE-DUP-10 post-fix 9 70.611 67.465 0 0 + pre-fix 9 70.925 65.627 0 0 + pre/post median speedup = 1.00x (delta = +0.314 ms) +``` + +## Findings + +### Catastrophic cases (headline) + +- **CATASTROPHIC-INNER**: post-fix 157.1 ms vs pre-fix 1689.7 ms → + **10.75x** speedup (delta +1532.6 ms). Auto-fallback trips on all 9 post-fix + reps; pre-fix (bypass) never trips and pays the O(dup²) build. +- **CATASTROPHIC-LEFT** (new coverage; LEFT always builds right): post-fix + 162.3 ms vs pre-fix 1585.6 ms → **9.77x** speedup (delta +1423.3 ms). Trips + 9/9 post-fix. + +Pre-fix reps are capped at 3 (each ~1.6 s); medians are over those 3 reps and +are tight (INNER pre-fix raw: 1646.6 / 1722.9 / 1689.7; LEFT pre-fix raw: +1723.9 / 1585.6 / 1570.3). At `CAT_DUP = 60000` the pre-fix build is ~1.6 s, +not the worst-case hours a full-10M single-key partition would cost; the +speedup scales super-linearly with the duplicate count, so this is a lower +bound on the real-world catastrophic win. + +### Zero-regression control (THE regression check) + +- post-fix 54.997 ms vs pre-fix 56.676 ms → **delta +1.679 ms (1.03x)**; + min-of-9 essentially tied (53.177 vs 52.374). 
Unique keys → run length 1 → + **0 trips on both sides** over 9 reps each. The added `++run` increment and + the `&& !ray_join_no_dup_fallback` branch in the build loop cost ~nothing + (within rep-to-rep noise). **No regression.** + +### MODERATE-DUP finding (the key tuning question) + +- **MODERATE-DUP-100 (~100 rows/key build side): TRIPS PREMATURELY.** + post-fix trips on **all 9 reps** (counter +9), pre-fix 0. Because the trip + fired, post-fix actually ran the *chained* path and was **slower** than the + radix build: post-fix 222.9 ms vs pre-fix (radix) 178.0 ms → **0.80x** + (post-fix +44.9 ms slower). So at ~100 rows/key the radix build is the + better path, yet the threshold-512 trip diverts it to the chained path. This + is the inter-key slot-collision merge effect noted in the plan: runs from + distinct keys colliding into the same slot region merge into a single run + that crosses 512 even though no single key has 512 duplicates. +- **MODERATE-DUP-10 (~10 rows/key build side): stays radix, no premature + trip.** 0 trips both sides; post-fix 70.6 ms vs pre-fix 70.9 ms → **1.00x**. + +**Where does the threshold start tripping?** Between ~10/key (clean, no trip) +and ~100/key (trips on every rep). At 10M rows the i%100000 layout (~100/key) +already merges collided runs past 512; i%1000000 (~10/key) does not. The +controller should decide whether `RADIX_DUP_RUN_MAX = 512` is too low — the +data shows a real, repeatable ~20% pessimization at ~100/key from a premature +trip. (See "open question" below.) + +### Open question for the controller + +The premature trip at ~100/key costs ~20% on that workload. Raising +`RADIX_DUP_RUN_MAX` would push the trip point higher (fewer false trips at +moderate dup) but also lets the O(dup²) build run longer before bailing on a +truly catastrophic key. The right threshold trades "false trip pessimization at +moderate dup" against "wasted quadratic work before the trip fires on a +pathological key". 
This gate does not pick a number; it quantifies the +pessimization (~20% at ~100/key, 0% at ~10/key). + +## Mechanism counters (`ray_join_dup_fallbacks` deltas) + +``` + CATASTROPHIC-INNER post-fix trips=9 (over 9 reps) pre-fix trips=0 (over 3 reps) + CATASTROPHIC-LEFT post-fix trips=9 (over 9 reps) pre-fix trips=0 (over 3 reps) + ZERO-REGRESSION post-fix trips=0 (over 9 reps) pre-fix trips=0 (over 9 reps) + MODERATE-DUP-100 post-fix trips=9 (over 9 reps) pre-fix trips=0 (over 9 reps) + MODERATE-DUP-10 post-fix trips=0 (over 9 reps) pre-fix trips=0 (over 9 reps) +``` + +The bench asserts (aborts on failure): catastrophic cases MUST trip on +post-fix; the bypass knob MUST NEVER trip on any pre-fix run; ZERO-REGRESSION +MUST NOT trip on post-fix; output cardinality MUST match between post-fix and +pre-fix per case. All assertions held. MODERATE-DUP trips are reported, not +fatal (a moderate trip is a finding, not a failure). + +## Raw per-rep (ms) — recorded run + +``` +CATASTROPHIC-INNER post-fix 274.522 180.032 160.607 160.558 154.733 157.114 155.195 156.845 154.019 + pre-fix 1646.636 1722.854 1689.741 +CATASTROPHIC-LEFT post-fix 153.018 179.249 169.712 165.443 156.114 162.287 178.747 156.854 158.000 + pre-fix 1723.889 1585.564 1570.288 +ZERO-REGRESSION post-fix 54.399 55.941 59.121 57.023 54.997 53.991 54.861 53.177 57.796 + pre-fix 57.364 61.260 54.656 53.978 52.374 54.837 56.736 56.676 58.386 +MODERATE-DUP-100 post-fix 218.622 222.924 225.888 237.473 226.248 226.465 220.627 215.589 221.830 + pre-fix 205.957 178.034 184.833 174.068 181.399 175.219 173.373 173.825 181.938 +MODERATE-DUP-10 post-fix 70.611 67.465 70.183 71.268 79.178 73.148 69.635 71.079 69.162 + pre-fix 72.167 72.890 71.644 68.296 68.295 68.505 65.627 71.800 70.925 +``` + +(The first post-fix rep of CATASTROPHIC-INNER is warm-up-inflated, 274.5 ms; +CATASTROPHIC-LEFT's first rep, 153.0 ms, is not — it is that case's fastest rep. +Medians are taken over all 9 reps and are stable; see the median column above.)
+ +## Stability — second run (load ~1.6–1.9) + +A confirmatory second run reproduced every finding (post-fix medians shown; +pre-fix omitted for brevity, same magnitudes): + +``` +CATASTROPHIC-INNER post-fix 139.630 ms pre/post speedup 11.33x trips 9/9 +CATASTROPHIC-LEFT post-fix 141.204 ms pre/post speedup 11.22x trips 9/9 +ZERO-REGRESSION post-fix 55.945 ms delta -2.950 ms (0.95x) trips 0/0 +MODERATE-DUP-100 post-fix 218.370 ms pre/post 0.81x (post +42.3 ms slower) trips 9/9 +MODERATE-DUP-10 post-fix 71.304 ms pre/post 1.11x trips 0/0 +``` + +Cross-run agreement: catastrophic speedups ~10–11x both runs; zero-regression +delta within ±3 ms (noise, both signs across runs); MODERATE-DUP-100 trips on +all 9 reps in both runs and is ~20% slower post-fix in both (0.80x / 0.81x); +MODERATE-DUP-10 never trips in either run. The premature-trip finding at +~100/key is fully repeatable. diff --git a/bench/join_dup/main.c b/bench/join_dup/main.c new file mode 100644 index 00000000..1ef450be --- /dev/null +++ b/bench/join_dup/main.c @@ -0,0 +1,442 @@ +/* Join dup-fallback perf gate. + * Build: make bench-join-dup + * + * The radix per-partition open-addressing build trips RADIX_DUP_RUN_MAX (512) + * when a linear-probe run grows too long, sets the `pathological` flag, and + * falls back to the chained-HT path (O(n) build). Without that trip the build + * is O(dup²) on a pathologically duplicated build side. + * + * This gate measures post-fix (auto-fallback ON) vs pre-fix (auto-fallback + * DISABLED via the ray_join_no_dup_fallback bypass knob) in one binary. + * + * Cases: + * CATASTROPHIC-INNER right=10M (CAT_DUP=60K rows of key 7, rest distinct), left=10K nonmatch, + * INNER, ray_join_no_build_swap=true → build the dup'd 10M right. + * post-fix trips → chained; pre-fix runs O(dup²). Headline. + * CATASTROPHIC-LEFT same right side, left=100K nonmatch, LEFT join (build=right + * always, no swap). New-coverage headline. + * ZERO-REGRESSION right=10M key i (unique), left=10K, INNER → never trips. 
+ * post-fix ≈ pre-fix (the added ++run/branch costs ~0). + * Counter must stay UNCHANGED. THE regression check. + * MODERATE-DUP-100 right=10M key i%100000 (~100/key) build side + * (ray_join_no_build_swap=true), left=10K, INNER. + * Must NOT trip prematurely (counter unchanged) and stay + * radix (post-fix ≈ pre-fix). If it trips at ~100/key, + * that is a FINDING (threshold may be too low). + * MODERATE-DUP-10 same but right key i%1000000 (~10/key), cleaner moderate. + * + * Mechanism: assert ray_join_dup_fallbacks advances on the catastrophic cases, + * never advances on any pre-fix (bypass) run, and stays unchanged on the + * ZERO-REGRESSION post-fix run (all three abort on failure); for the moderate + * cases a trip is reported, not fatal (moderate trip is a tuning finding). + * + * Timing: CLOCK_MONOTONIC around ray_execute only. Tables built once outside + * the timed loop; graph rebuilt per rep. The pre-fix catastrophic build is + * SLOW (~seconds), so pre-fix reps on catastrophic cases are capped (PREFIX_SLOW + * _REPS); medians of fewer reps, noted in output. + */ +#if defined(__APPLE__) +# define _DARWIN_C_SOURCE +#else +# define _POSIX_C_SOURCE 200809L +#endif + +#include +#include "mem/heap.h" +#include "ops/ops.h" +#include "ops/internal.h" +#include "table/sym.h" +#include +#include +#include +#include +#include +#include + +/* ---------- timing ---------- + * now_ms: current CLOCK_MONOTONIC reading in milliseconds + * (tv_sec * 1e3 + tv_nsec * 1e-6). The clock_gettime return value is not checked. */ +static double now_ms(void) { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return (double)ts.tv_sec * 1e3 + (double)ts.tv_nsec * 1e-6; +} + +/* ---------- median/min (qsort on small N) ---------- + * cmp_double: ascending qsort comparator, (x > y) - (x < y). + * medianN: copies arr into a 64-slot stack buffer, sorts the copy (arr itself is + * not modified) and returns the element at sorted index n/2 — the exact median + * for odd n, the upper-middle element for even n. 
There is no bounds check, so + * n must be in 1..64; callers in this file pass NREPS or PREFIX_SLOW_REPS. + * minN: smallest element of arr[0..n); reads arr[0] unconditionally, so n >= 1. */ +static int cmp_double(const void* a, const void* b) { + double x = *(const double*)a, y = *(const double*)b; + return (x > y) - (x < y); +} +static double medianN(double arr[], int n) { + double tmp[64]; + memcpy(tmp, arr, (size_t)n * sizeof(double)); + qsort(tmp, (size_t)n, sizeof(double), cmp_double); + return tmp[n / 2]; +} +static double minN(double arr[], int n) { + double m = arr[0]; + for (int i = 1; i < n; i++) if (arr[i] < m) m = arr[i]; + return m; +} + +/* ---------- build a single-column I64
table ---------- */ +static ray_t* make_table1(const char* name, const int64_t* vals, int64_t n) { + ray_t* col = ray_vec_from_raw(RAY_I64, vals, n); + if (!col || RAY_IS_ERR(col)) { + fprintf(stderr, "make_table1: ray_vec_from_raw failed (%s, n=%lld)\n", + name, (long long)n); + abort(); + } + ray_t* tbl = ray_table_new(1); + int64_t sym = ray_sym_intern(name, strlen(name)); + tbl = ray_table_add_col(tbl, sym, col); + ray_release(col); + if (!tbl || RAY_IS_ERR(tbl)) { + fprintf(stderr, "make_table1: table_add_col failed (%s)\n", name); + abort(); + } + return tbl; +} + +/* ---------- run one join rep, return exec wall ms + output rows ---------- */ +static double run_join_rep(ray_t* lt, ray_t* rt, int join_type, int64_t* rows_out) { + ray_graph_t* g = ray_graph_new(lt); + if (!g) { fprintf(stderr, "run_join_rep: graph alloc\n"); abort(); } + + ray_op_t* lt_node = ray_const_table(g, lt); + ray_op_t* rt_node = ray_const_table(g, rt); + ray_op_t* lk_op = ray_scan(g, "lk"); + ray_op_t* rk_op = ray_scan(g, "rk"); + if (!lt_node || !rt_node || !lk_op || !rk_op) { + fprintf(stderr, "run_join_rep: node alloc\n"); abort(); + } + + ray_op_t* lk_arr[1] = { lk_op }; + ray_op_t* rk_arr[1] = { rk_op }; + ray_op_t* jn = ray_join(g, lt_node, lk_arr, rt_node, rk_arr, 1, join_type); + if (!jn) { fprintf(stderr, "run_join_rep: join node\n"); abort(); } + + jn = ray_optimize(g, jn); + + double t0 = now_ms(); + ray_t* result = ray_execute(g, jn); + double t1 = now_ms(); + + if (!result || RAY_IS_ERR(result)) { + fprintf(stderr, "run_join_rep: execute returned error\n"); abort(); + } + if (result->type != RAY_TABLE) { + fprintf(stderr, "run_join_rep: result not a table (type=%d)\n", + result->type); abort(); + } + if (rows_out) *rows_out = ray_table_nrows(result); + ray_release(result); + ray_graph_free(g); + return t1 - t0; +} + +#define NREPS 9 /* normal rep count (post-fix, and non-slow pre-fix) */ +#define PREFIX_SLOW_REPS 3 /* capped reps for the SLOW pre-fix O(dup²) builds */ + 
+/* join_type encoding (matches ray_join): 0=inner, 1=left, ... */ +#define JT_INNER 0 +#define JT_LEFT 1 + +typedef struct { + const char* name; + int post_n; /* reps actually taken on post-fix side */ + int pre_n; /* reps actually taken on pre-fix side */ + double post_ms[NREPS]; /* auto-fallback ON (no_dup=false) */ + double pre_ms[NREPS]; /* auto-fallback OFF (no_dup=true) */ + int64_t rows_post; + int64_t rows_pre; + uint64_t fb_post_delta; /* dup-fallback counter delta over post-fix reps */ + uint64_t fb_pre_delta; /* dup-fallback counter delta over pre-fix reps */ + bool no_build_swap; /* knob: force build on right */ + int join_type; +} case_result_t; + +/* Run a case. post-fix uses NREPS; pre-fix uses pre_reps (capped for slow). */ +static void run_case(const char* name, ray_t* lt, ray_t* rt, + int join_type, bool no_build_swap, + int pre_reps, case_result_t* cr) { + cr->name = name; + cr->rows_post = -1; + cr->rows_pre = -1; + cr->no_build_swap = no_build_swap; + cr->join_type = join_type; + cr->post_n = NREPS; + cr->pre_n = pre_reps; + + printf("Running case %-20s (post-fix %d reps, pre-fix %d reps)...\n", + name, NREPS, pre_reps); + fflush(stdout); + + /* ---- post-fix: auto-fallback ON ---- */ + ray_join_no_dup_fallback = false; + uint64_t fb0 = ray_join_dup_fallbacks; + for (int rep = 0; rep < NREPS; rep++) { + ray_join_no_build_swap = no_build_swap; + int64_t rows = -1; + cr->post_ms[rep] = run_join_rep(lt, rt, join_type, &rows); + cr->rows_post = rows; + ray_join_no_build_swap = false; + } + cr->fb_post_delta = ray_join_dup_fallbacks - fb0; + + /* ---- pre-fix: auto-fallback DISABLED (O(dup²) on catastrophic) ---- */ + ray_join_no_dup_fallback = true; + uint64_t fb1 = ray_join_dup_fallbacks; + for (int rep = 0; rep < pre_reps; rep++) { + ray_join_no_build_swap = no_build_swap; + int64_t rows = -1; + cr->pre_ms[rep] = run_join_rep(lt, rt, join_type, &rows); + cr->rows_pre = rows; + ray_join_no_build_swap = false; + } + cr->fb_pre_delta = 
ray_join_dup_fallbacks - fb1; + ray_join_no_dup_fallback = false; + + printf(" dup-fallback counter: post-fix delta=%llu pre-fix delta=%llu\n", + (unsigned long long)cr->fb_post_delta, + (unsigned long long)cr->fb_pre_delta); + printf(" rows_out: post=%lld pre=%lld\n", + (long long)cr->rows_post, (long long)cr->rows_pre); + if (cr->rows_post != cr->rows_pre) { + fprintf(stderr, "CARDINALITY MISMATCH case %s: post=%lld pre=%lld\n", + name, (long long)cr->rows_post, (long long)cr->rows_pre); + abort(); + } + fflush(stdout); +} + +/* Key generators. Probe sides use NONMATCH/UNIQUE so the join OUTPUT stays + * bounded while the BUILD side carries the duplication under test — otherwise + * an all-key-7 ⋈ all-key-7 inner join emits 10^11 rows and the build cost is + * swamped by output materialisation. */ +#define GEN_UNIQUE 0 /* v[i] = i (distinct, ≥0) */ +#define GEN_CONST7 1 /* v[i] = 7 (all-dup) */ +#define GEN_NONMATCH (-2)/* v[i] = -1 - i (distinct, <0; never matches a ≥0 build key) */ +/* mod ≥ 2 → v[i] = i % mod */ +static int64_t* gen(int64_t n, int64_t mode) { + int64_t* v = (int64_t*)malloc((size_t)n * sizeof(int64_t)); + if (!v) { fprintf(stderr, "OOM gen(%lld)\n", (long long)n); abort(); } + if (mode == GEN_UNIQUE) { + for (int64_t i = 0; i < n; i++) v[i] = i; + } else if (mode == GEN_CONST7) { + for (int64_t i = 0; i < n; i++) v[i] = 7; + } else if (mode == GEN_NONMATCH) { + for (int64_t i = 0; i < n; i++) v[i] = -1 - i; + } else { + for (int64_t i = 0; i < n; i++) v[i] = i % mode; + } + return v; +} + +/* Build side for the CATASTROPHIC cases: n rows total, of which the first + * `dup` rows all share key 7 (one pathological partition → O(dup²) pre-fix + * build), and the remaining rows are distinct (key = 1000 + i, all ≠ 7 and + * disjoint from the negative non-matching probe keys). `dup` is kept in the + * tens-of-thousands so the pre-fix O(dup²) build runs in seconds, not hours + * (all 10M sharing key 7 would be ~10^14 ops). 
*/ +#define CAT_DUP 60000L +static int64_t* gen_cat(int64_t n, int64_t dup) { + int64_t* v = (int64_t*)malloc((size_t)n * sizeof(int64_t)); + if (!v) { fprintf(stderr, "OOM gen_cat(%lld)\n", (long long)n); abort(); } + for (int64_t i = 0; i < n; i++) v[i] = (i < dup) ? 7 : (1000 + i); + return v; +} + +int main(void) { + ray_heap_init(); + (void)ray_sym_init(); + ray_join_no_build_swap = false; + ray_join_no_dup_fallback = false; + + printf("=== bench-join-dup (dup-fallback perf gate) ===\n"); + fflush(stdout); + +#if defined(__linux__) + { + FILE* f = fopen("/proc/loadavg", "r"); + if (f) { + char buf[128] = {0}; + if (fgets(buf, sizeof(buf), f)) { printf("load: %s", buf); fflush(stdout); } + fclose(f); + } + } +#endif + printf("NREPS=%d PREFIX_SLOW_REPS=%d RADIX_DUP_RUN_MAX=%d RAY_PARALLEL_THRESHOLD=%d\n\n", + NREPS, PREFIX_SLOW_REPS, 512, (int)RAY_PARALLEL_THRESHOLD); + fflush(stdout); + + const int64_t N10M = 10000000L; + + case_result_t cr[5]; + + /* ---- 0: CATASTROPHIC-INNER (headline) ---------------------------------- + * build = right 10M, first CAT_DUP rows key 7, rest distinct (no_swap forces + * build on the dup'd side); probe = left 10K distinct negative keys → 0 matches + * → output 0. Output is bounded; the pathological BUILD is what we time. */ + printf("Building CATASTROPHIC-INNER tables (build=right 10M, %ld dup key 7, probe=left 10K nonmatch)...\n", + (long)CAT_DUP); + fflush(stdout); + { int64_t* rv = gen_cat(N10M, CAT_DUP); int64_t* lv = gen(10000L, GEN_NONMATCH); + ray_t* rt = make_table1("rk", rv, N10M); + ray_t* lt = make_table1("lk", lv, 10000L); + free(rv); free(lv); + run_case("CATASTROPHIC-INNER", lt, rt, JT_INNER, /*no_swap=*/true, + PREFIX_SLOW_REPS, &cr[0]); + ray_release(lt); ray_release(rt); + } + + /* ---- 1: CATASTROPHIC-LEFT (new-coverage headline) ---------------------- + * LEFT join always builds the right side (no swap). 
build = right 10M, first + * CAT_DUP rows key 7, rest distinct; probe = left 100K distinct negative keys → all unmatched → + * output = 100K rows (left + null right).
Bounded. */ + printf("Building CATASTROPHIC-LEFT tables (build=right 10M, %ld dup key 7, probe=left 100K nonmatch)...\n", + (long)CAT_DUP); + fflush(stdout); + { int64_t* rv = gen_cat(N10M, CAT_DUP); int64_t* lv = gen(100000L, GEN_NONMATCH); + ray_t* rt = make_table1("rk", rv, N10M); + ray_t* lt = make_table1("lk", lv, 100000L); + free(rv); free(lv); + run_case("CATASTROPHIC-LEFT", lt, rt, JT_LEFT, /*no_swap=*/false, + PREFIX_SLOW_REPS, &cr[1]); + ray_release(lt); ray_release(rt); + } + + /* ---- 2: ZERO-REGRESSION (the regression check) ------------------------- + * build = right 10M unique (no_swap=true so the big side is built and the + * per-row ++run/branch runs 10M times); probe = left 10K = keys [0,10K) + * → 10K matches → output 10K. Unique keys → run length 1 → never trips. + * post-fix ≈ pre-fix proves the added branch costs ~nothing. */ + printf("Building ZERO-REGRESSION tables (build=right 10M unique, probe=left 10K matching)...\n"); + fflush(stdout); + { int64_t* rv = gen(N10M, GEN_UNIQUE); int64_t* lv = gen(10000L, GEN_UNIQUE); + ray_t* rt = make_table1("rk", rv, N10M); + ray_t* lt = make_table1("lk", lv, 10000L); + free(rv); free(lv); + run_case("ZERO-REGRESSION", lt, rt, JT_INNER, /*no_swap=*/true, + NREPS, &cr[2]); + ray_release(lt); ray_release(rt); + } + + /* ---- 3: MODERATE-DUP-100 (no-premature-trip) --------------------------- + * build = right 10M key i%100000 (~100/key); probe = left 10K nonmatch → + * output 0. Must NOT trip at ~100/key and must stay radix. 
*/ + printf("Building MODERATE-DUP-100 tables (build=right 10M key i%%100000 ~100/key)...\n"); + fflush(stdout); + { int64_t* rv = gen(N10M, 100000L); int64_t* lv = gen(10000L, GEN_NONMATCH); + ray_t* rt = make_table1("rk", rv, N10M); + ray_t* lt = make_table1("lk", lv, 10000L); + free(rv); free(lv); + run_case("MODERATE-DUP-100", lt, rt, JT_INNER, /*no_swap=*/true, + NREPS, &cr[3]); + ray_release(lt); ray_release(rt); + } + + /* ---- 4: MODERATE-DUP-10 (cleaner moderate) ----------------------------- + * build = right 10M key i%1000000 (~10/key); probe = left 10K nonmatch. */ + printf("Building MODERATE-DUP-10 tables (build=right 10M key i%%1000000 ~10/key)...\n"); + fflush(stdout); + { int64_t* rv = gen(N10M, 1000000L); int64_t* lv = gen(10000L, GEN_NONMATCH); + ray_t* rt = make_table1("rk", rv, N10M); + ray_t* lt = make_table1("lk", lv, 10000L); + free(rv); free(lv); + run_case("MODERATE-DUP-10", lt, rt, JT_INNER, /*no_swap=*/true, + NREPS, &cr[4]); + ray_release(lt); ray_release(rt); + } + + /* --------------------------------------------------------------- + * Results table (median + min, post-fix vs pre-fix) + * --------------------------------------------------------------- */ + printf("\n"); + printf("%-20s %-8s %5s %14s %12s %12s %s\n", + "case", "side", "reps", "median_ms", "min_ms", "fb_delta", "rows_out"); + printf("%-20s %-8s %5s %14s %12s %12s %s\n", + "--------------------", "--------", "-----", + "--------------", "------------", "------------", "----------"); + for (int ci = 0; ci < 5; ci++) { + case_result_t* c = &cr[ci]; + double med_post = medianN(c->post_ms, c->post_n); + double min_post = minN(c->post_ms, c->post_n); + double med_pre = medianN(c->pre_ms, c->pre_n); + double min_pre = minN(c->pre_ms, c->pre_n); + printf("%-20s %-8s %5d %14.3f %12.3f %12llu %lld\n", + c->name, "post-fix", c->post_n, med_post, min_post, + (unsigned long long)c->fb_post_delta, (long long)c->rows_post); + printf("%-20s %-8s %5d %14.3f %12.3f %12llu %lld\n", + "", 
"pre-fix", c->pre_n, med_pre, min_pre, + (unsigned long long)c->fb_pre_delta, (long long)c->rows_pre); + /* speedup / delta line */ + double speedup = med_post > 0 ? med_pre / med_post : 0; + printf("%-20s %-8s pre/post median speedup = %.2fx (delta = %+.3f ms)\n", + "", "", speedup, med_pre - med_post); + } + + /* --------------------------------------------------------------- + * Mechanism summary + * --------------------------------------------------------------- */ + printf("\n--- mechanism (ray_join_dup_fallbacks deltas) ---\n"); + for (int ci = 0; ci < 5; ci++) { + case_result_t* c = &cr[ci]; + printf(" %-20s post-fix trips=%llu (over %d reps) pre-fix trips=%llu (over %d reps)\n", + c->name, + (unsigned long long)c->fb_post_delta, c->post_n, + (unsigned long long)c->fb_pre_delta, c->pre_n); + } + + /* Catastrophic cases MUST trip on post-fix (auto-fallback fires). */ + if (cr[0].fb_post_delta == 0) { + fprintf(stderr, "MECHANISM FAILURE: CATASTROPHIC-INNER post-fix did not trip\n"); + abort(); + } + if (cr[1].fb_post_delta == 0) { + fprintf(stderr, "MECHANISM FAILURE: CATASTROPHIC-LEFT post-fix did not trip\n"); + abort(); + } + /* Pre-fix (bypass) must NEVER trip — the whole point of the knob. */ + for (int ci = 0; ci < 5; ci++) { + if (cr[ci].fb_pre_delta != 0) { + fprintf(stderr, "MECHANISM FAILURE: %s pre-fix tripped despite bypass (delta=%llu)\n", + cr[ci].name, (unsigned long long)cr[ci].fb_pre_delta); + abort(); + } + } + /* ZERO-REGRESSION must NOT trip on either side. */ + if (cr[2].fb_post_delta != 0) { + fprintf(stderr, "MECHANISM FAILURE: ZERO-REGRESSION post-fix tripped (delta=%llu)\n", + (unsigned long long)cr[2].fb_post_delta); + abort(); + } + /* MODERATE-DUP trips are a FINDING, not a failure — report prominently. */ + printf("\n--- MODERATE-DUP finding ---\n"); + printf(" MODERATE-DUP-100 (~100/key build): post-fix trips=%llu → %s\n", + (unsigned long long)cr[3].fb_post_delta, + cr[3].fb_post_delta ? 
"TRIPPED PREMATURELY (threshold 512 may be too low)" + : "stayed radix (no premature trip)"); + printf(" MODERATE-DUP-10 (~10/key build): post-fix trips=%llu → %s\n", + (unsigned long long)cr[4].fb_post_delta, + cr[4].fb_post_delta ? "TRIPPED PREMATURELY" + : "stayed radix (no premature trip)"); + + /* --------------------------------------------------------------- + * Raw per-rep numbers + * --------------------------------------------------------------- */ + printf("\n--- raw per-rep ms ---\n"); + for (int ci = 0; ci < 5; ci++) { + case_result_t* c = &cr[ci]; + printf("%-20s %-8s", c->name, "post-fix"); + for (int r = 0; r < c->post_n; r++) printf(" %9.3f", c->post_ms[r]); + printf("\n"); + printf("%-20s %-8s", "", "pre-fix"); + for (int r = 0; r < c->pre_n; r++) printf(" %9.3f", c->pre_ms[r]); + printf("\n"); + } + + printf("\nDone.\n"); + fflush(stdout); + + ray_sym_destroy(); + ray_heap_destroy(); + return 0; +} From 460d1eb3dc8260138cc84b9990a74156df55a6bc Mon Sep 17 00:00:00 2001 From: Anton Date: Sat, 13 Jun 2026 19:12:23 +0200 Subject: [PATCH 6/9] =?UTF-8?q?fix(join):=20count=20same-hash=20(per-key)?= =?UTF-8?q?=20dup=20not=20total=20run=20=E2=80=94=20no=20premature=20fallb?= =?UTF-8?q?ack=20on=20dense=20moderate=20dup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bench/bottleneck/join_dup_fallback_compare.md | 129 ++++++++++++++++++ src/ops/join.c | 29 ++-- 2 files changed, 149 insertions(+), 9 deletions(-) diff --git a/bench/bottleneck/join_dup_fallback_compare.md b/bench/bottleneck/join_dup_fallback_compare.md index d99055af..93538f8c 100644 --- a/bench/bottleneck/join_dup_fallback_compare.md +++ b/bench/bottleneck/join_dup_fallback_compare.md @@ -202,3 +202,132 @@ delta within ±3 ms (noise, both signs across runs); MODERATE-DUP-100 trips on all 9 reps in both runs and is ~20% slower post-fix in both (0.80x / 0.81x); MODERATE-DUP-10 never trips in either run. 
The premature-trip finding at ~100/key is fully repeatable. + +## RE-TUNE (load factor 0.25 → 0.125) — DID NOT FIX moderate-100 + +**Hypothesis tested:** lowering the per-partition build-HT load factor (more +empty slots between key clusters) would keep inter-key linear-probe runs from +merging, so the `RADIX_DUP_RUN_MAX = 512` trigger would measure true per-key +duplication. Prediction: MODERATE-DUP-100 stops tripping (back on radix, the +~20% regression gone), while catastrophic still trips and control stays neutral. + +**Change measured:** `ht_target = rp->count * 2` (load 0.5) → `* 4` (load 0.25) +and then `* 8` (load 0.125), in `join_radix_build_probe_fn` (`src/ops/join.c`). +Only the radix per-partition build HT; chained path untouched. Sanitizer-free +both rebuilds (`nm bench-join-dup | grep -ci asan` → 0). + +**Result: the load factor does NOT fix the premature trip. MODERATE-DUP-100 +still trips 9/9 and stays ~20–30% slower at both 0.25 and 0.125.** + +``` +case LF post median pre median speedup trips note +----------------- ----- ----------- ---------- -------- ------ -------------------- +MODERATE-DUP-100 0.5 222.924 ms 178.034 ms 0.80x 9/9 baseline (regression) +MODERATE-DUP-100 0.25 243.845 ms 169.416 ms 0.69x 9/9 STILL trips, no better +MODERATE-DUP-100 0.125 214.468 ms 151.806 ms 0.71x 9/9 STILL trips, no better +CATASTROPHIC-INNER 0.25 137.681 ms 1505.251 ms 10.93x 9/9 still trips, ~11x kept +CATASTROPHIC-INNER 0.125 155.143 ms 1532.125 ms 9.88x 9/9 still trips, ~10x kept +CATASTROPHIC-LEFT 0.25 139.548 ms 1517.261 ms 10.87x 9/9 ✓ still trips, ~11x kept +CATASTROPHIC-LEFT 0.125 159.599 ms 1536.236 ms 9.63x 9/9 ✓ still trips, ~10x kept +ZERO-REGRESSION 0.25 58.314 ms 62.648 ms 1.07x 0/0 neutral, no trip +ZERO-REGRESSION 0.125 73.479 ms 74.788 ms 1.02x 0/0 neutral, no trip +MODERATE-DUP-10 0.25 76.224 ms 74.969 ms 0.98x 0/0 neutral, no trip +MODERATE-DUP-10 0.125 75.919 ms 76.757 ms 1.01x 0/0 neutral, no trip +``` + +(Runs at load ~1.9–2.0; honest — 
slightly above the 1.16 baseline, hence the +absolute post-fix figures shift run-to-run, but the *speedup ratios* and the +*trip counts* — the load-independent signals — are unchanged.) + +**Zero-regression bar:** met at LOW (~10/key: neutral, no trip) and CATASTROPHIC +(~1100/key+: still trips, ~10x kept). **NOT met at MODERATE (~100/key): trips +9/9 and is ~20–30% slower regardless of load factor (0.5, 0.25, 0.125 all the +same).** Both 4× and 8× capacity were tried per the escalation path; neither +shifted the trip — so the change was reverted (source left at load 0.5). + +**Why the load factor cannot fix this (root cause, confirmed in code):** the +radix build HT (`join_radix_build_probe_fn`, ~line 605) is **row-granular, not +key-granular** — every duplicate row of a key gets its own slot +(`ht[slot*2]=h; ht[slot*2+1]=row_idx`), with no key dedup. The `run` counter is +the linear-probe distance to insert ONE row, and the probe loop walks every +occupied slot until an EMPTY sentinel (it does not skip non-matching hashes). So +a key with 100 rows occupies ~100 contiguous slots (row N collides with rows +0..N-1). Increasing capacity only inserts EMPTY gaps *between distinct keys' +home slots*, but `i%100000` over 10M rows spreads ~100000 dense keys whose +~100-row clusters still butt up against each other within each radix partition; +several adjacent clusters with no intervening EMPTY chain into a single >512 run +at insert time. Empties do not separate *same-key* rows — those are always +contiguous — and at moderate dup the cumulative cross-cluster run crosses 512 +before an EMPTY breaks it, at every load factor tested. The trip therefore +measures *clustered slot occupancy*, not per-key duplication, and load factor is +the wrong lever. + +**Handing back to the controller (per the STOP-at-8× directive).** The +collision-merge is structural, not load-driven. 
Candidate directions (controller +decides — this gate does not pick): +- change the trigger from "linear-probe run length" to a *per-key* count + (dedup keys in the build HT and count rows-per-key, or check the home-slot + hash group only) so the signal reflects true duplication, not slot packing; +- raise `RADIX_DUP_RUN_MAX` above the moderate-dup cross-cluster run length + (trades false trips at moderate dup against more quadratic work before the + trip fires on a genuinely catastrophic key — the original open question); +- gate the fallback on a different pathology signal (e.g. distinct-key estimate + vs row count) rather than probe-run length. + +--- + +## FIX: same-hash-count trigger + +**The change.** The build trip (`join_radix_build_probe_fn`, ~line 600) no +longer counts the TOTAL linear-probe run (occupied slots scanned to insert one +row). It now counts SAME-HASH slots — the rows of THIS key already inserted = +true per-key duplication, immune to the collision-merge that conflated one giant +key (pathological, O(dup²)) with dense moderate keys whose clusters butt +together. The same-hash count is accumulated branchlessly inside the probe loop +(`same += (ht[slot*2] == h)`) and the threshold + bypass-knob check +(`same > RADIX_DUP_RUN_MAX && !ray_join_no_dup_fallback`) is done ONCE per insert +after the loop, not inside it. + +**Why the trip check is outside the loop (subtle, measured).** A first cut put +`ht[slot*2]==h && ++same > MAX && !ray_join_no_dup_fallback` inside the `while`. +That counted correctly (moderate-100 stopped tripping) but REGRESSED +moderate-100 timing ~55% (post-fix 254 ms vs pre-fix 164 ms). Cause confirmed in +the disassembly: the loop-invariant global `ray_join_no_dup_fallback` made the +compiler CLONE the probe loop into two variants split on its value, and the +production variant (fallback ON) got the worse codegen (extra LEAs, the same-hash +compare scheduled poorly). 
Hoisting the knob check out of the `&&` did not help — +the clone persisted. Accumulating `same` branchlessly with NO global read and NO +`goto` in the loop body collapses it back to a single tight loop; the trip +becomes a once-per-insert integer compare. This restored neutrality. + +**Re-measured (NREPS=9, RADIX_DUP_RUN_MAX=512, system load < 2, medians of 3 runs).** +post-fix = auto-fallback ON (production); pre-fix = bypass knob (pure radix, no +trigger). Neutral ⇔ post ≈ pre. + +| case | post-fix median | pre-fix median | speedup | post trips | verdict | +|--------------------|----------------:|---------------:|--------:|-----------:|------------------| +| CATASTROPHIC-INNER | ~134 ms | ~2200 ms | ~16× | 9/9 | trips, big win | +| CATASTROPHIC-LEFT | ~137 ms | ~2230 ms | ~16× | 9/9 | trips, big win | +| ZERO-REGRESSION | ~52 ms | ~52 ms | ~1.0× | 0/9 | neutral, no trip | +| MODERATE-DUP-100 | ~230 ms | ~230 ms | ~0.99× | 0/9 | NEUTRAL, no trip | +| MODERATE-DUP-10 | ~71 ms | ~70 ms | ~0.98× | 0/9 | neutral, no trip | + +(CATASTROPHIC pre-fix is now even slower — the prior in-loop early-exit on trip +is gone, so the bypassed build runs the full O(dup²) to completion; immaterial, +the production post-fix path trips and never pays it.) + +**The headline.** MODERATE-DUP-100 (~100 rows/key build side) now **trips 0/9** +(stays radix) AND post-fix ≈ pre-fix (delta −1.7 ms, noise). Both regressions — +the ~20–30% premature fallback (every load factor, total-run trigger) and the +~55% codegen slowdown from the naive in-loop same-hash cut — are GONE. The same-hash +count makes the trigger MORE precise: a run that tripped only via collision-merge +but has low per-key dup no longer trips. 
+ +**Zero-regression bar across the dup spectrum:** +- low / unique (ZERO-REGRESSION): neutral, no trip ✓ +- moderate ~10/key (MODERATE-DUP-10): neutral, no trip ✓ +- moderate ~100/key (MODERATE-DUP-100): neutral, no trip ✓ ← was the failure +- catastrophic (CATASTROPHIC-INNER/LEFT): trips, ~16× speedup preserved ✓ + +Suite: `make test` → 3451/3453 pass (2 skipped, 0 failed); `join_buildside` +18/18, `join` 57/57 — all dup trip/no-trip fixtures unchanged. asan: 0. diff --git a/src/ops/join.c b/src/ops/join.c index 5e5a7e2c..d64d38fc 100644 --- a/src/ops/join.c +++ b/src/ops/join.c @@ -454,8 +454,9 @@ static inline bool join_keys_eq(ray_t* const* l_vecs, ray_t* const* r_vecs, uint #define RADIX_HT_EMPTY UINT32_MAX -/* A per-partition open-addressing build whose linear-probe run exceeds this - * is pathologically duplicated (O(dup²) build); abort to the chained path. */ +/* A build key with more than this many duplicate rows is pathological + * (O(dup²) build); abort to the chained path. Counts same-hash slots, so + * dense moderate keys whose clusters merge into a long run don't trip. */ #define RADIX_DUP_RUN_MAX 512 /* Per-partition single-pass build+probe context. @@ -601,15 +602,25 @@ static void join_radix_build_probe_fn(void* raw, uint32_t wid, int64_t task_star uint32_t slot = h & ht_mask; if (i + 4 < rp->count) __builtin_prefetch(&ht[(rp->entries[i + 4].hash & ht_mask) * 2], 1, 1); - uint32_t run = 0; + /* Count rows of THIS key (same hash) already inserted — the true + * per-key duplication. Total run length would conflate one giant + * key (pathological O(dup²) build) with many moderate keys whose + * dense clusters merge into a long run (fine); counting same-hash + * slots is immune to that collision-merge. 
Accumulate `same` + * branchlessly with NO global read / NO goto in the loop body: an + * in-loop trip check makes the compiler clone the probe loop on the + * (loop-invariant) bypass knob and pessimise the production variant + * (~55% regression at moderate dup, measured). Trip once, after. */ + uint32_t same = 0; while (ht[slot * 2 + 1] != RADIX_HT_EMPTY) { + same += (ht[slot * 2] == h); slot = (slot + 1) & ht_mask; - if (++run > RADIX_DUP_RUN_MAX && !ray_join_no_dup_fallback) { - /* Pathological duplication — abort to the chained path. - * `done:` frees ht_hdr and leaves pp buffers cleanup-safe. */ - atomic_store_explicit(&c->pathological, 1, memory_order_relaxed); - goto done; - } + } + if (same > RADIX_DUP_RUN_MAX && !ray_join_no_dup_fallback) { + /* Pathological duplication — abort to the chained path. + * `done:` frees ht_hdr and leaves pp buffers cleanup-safe. */ + atomic_store_explicit(&c->pathological, 1, memory_order_relaxed); + goto done; } ht[slot * 2] = h; ht[slot * 2 + 1] = rp->entries[i].row_idx; From 971766d49eb1621a366773349cfaa12bbe885234 Mon Sep 17 00:00:00 2001 From: Anton Date: Sat, 13 Jun 2026 19:17:49 +0200 Subject: [PATCH 7/9] =?UTF-8?q?bench:=20dup-fallback=20verdict=20=E2=80=94?= =?UTF-8?q?=20WIN=20(16x=20catastrophic=20incl.=20LEFT,=20moderate=20neutr?= =?UTF-8?q?al)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bench/bottleneck/join_dup_fallback_compare.md | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/bench/bottleneck/join_dup_fallback_compare.md b/bench/bottleneck/join_dup_fallback_compare.md index 93538f8c..f566a694 100644 --- a/bench/bottleneck/join_dup_fallback_compare.md +++ b/bench/bottleneck/join_dup_fallback_compare.md @@ -331,3 +331,30 @@ but has low per-key dup no longer trips. Suite: `make test` → 3451/3453 pass (2 skipped, 0 failed); `join_buildside` 18/18, `join` 57/57 — all dup trip/no-trip fixtures unchanged. asan: 0. 
+ +## CONTROLLER VERDICT: WIN — merge + +After two trigger iterations (the load-factor hypothesis was disproven; the +same-hash per-key counter is the correct signal), the win-or-revert bar is met +across the full duplication spectrum: + +- **Catastrophic (one giant build key): ~16× faster** — INNER-no-swap and + **LEFT** both drop ~2200ms → ~135ms. LEFT is the headline new coverage: + piece 1's INNER-only build-side swap cannot help it; this fix does. +- **Moderate (~100 rows/key): neutral, no trip** — the premature-fallback + regression that the load-factor approach couldn't fix is GONE. The same-hash + counter measures true per-key duplication, immune to the dense-cluster + collision-merge that inflated the total-run-length signal. +- **Near-unique control + ~10/key: neutral, no trip** — the per-insert + branchless `same += (hash==h)` and single post-loop check cost nothing + (the loop-cloning codegen trap from reading the knob in the hot loop was + found and avoided). +- Full suite is green under ASan+UBSan (3451/3453, 2 pre-existing skips); the + abort/fallback paths are exercised; differential multiset equality (auto-fallback + vs forced-chained) holds for INNER/LEFT/FULL. + +The trigger is correctness-safe by construction: falling back is always +correct (the chained path is the trusted reference), so even an over-eager +trip on a rare hash collision only costs a path choice, never a wrong result. +RADIX_DUP_RUN_MAX=512 cleanly separates moderate from catastrophic; ANTI joins +use a separate exec path (own chained build) and are unaffected. 
From ad52c94ff4e25fbfb5f607be09ef94547369a5c8 Mon Sep 17 00:00:00 2001 From: Anton Date: Sat, 13 Jun 2026 19:20:56 +0200 Subject: [PATCH 8/9] docs: radix join falls back to chained build on pathological key duplication --- docs/docs/architecture/pipeline.md | 2 +- docs/docs/queries/joins.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/docs/architecture/pipeline.md b/docs/docs/architecture/pipeline.md index aeb2287b..9805359e 100644 --- a/docs/docs/architecture/pipeline.md +++ b/docs/docs/architecture/pipeline.md @@ -208,7 +208,7 @@ Hash joins use adaptive radix partitioning to ensure each partition's hash table The join pipeline: 1. **Partition** — Radix-partition both inputs by hash key bits -2. **Build** — Build per-partition hash tables (each fits in L2). For inner joins, the executor selects the build side at runtime using actual materialized row counts: the smaller input becomes the build side, keeping hash tables as compact as possible. LEFT, FULL, and ANTI joins always build on the right to preserve left-row semantics. The small-input (chained) path also always builds on the right. +2. **Build** — Build per-partition hash tables (each fits in L2). For inner joins, the executor selects the build side at runtime using actual materialized row counts: the smaller input becomes the build side, keeping hash tables as compact as possible. LEFT, FULL, and ANTI joins always build on the right to preserve left-row semantics. The small-input (chained) path also always builds on the right. During the per-partition open-addressing build, the executor tracks per-key duplicate counts; when a single key exceeds the duplication threshold (`RADIX_DUP_RUN_MAX = 512`), it aborts and retries that partition with the chained hash table, which is O(n) regardless of duplication. No join (INNER, LEFT, or FULL) can degrade to quadratic build cost on a skewed key. 3. **Probe** — Probe partitions in parallel across worker threads. 
Inner-join output order is partition- and thread-dependent; it is not guaranteed to be stable. ### Per-Thread Heaps diff --git a/docs/docs/queries/joins.md b/docs/docs/queries/joins.md index 3b674c55..270cd58c 100644 --- a/docs/docs/queries/joins.md +++ b/docs/docs/queries/joins.md @@ -214,7 +214,7 @@ All join operations compile to the Rayforce execution DAG. The optimizer and exe 1. **DAG construction** — `inner-join` and `left-join` emit `OP_JOIN` nodes with join type flags. `asof-join` emits `OP_ASOF_JOIN`. `window-join` emits `OP_WINDOW_JOIN`. 2. **Optimizer** — Predicate pushdown moves filters closer to data sources (past `SELECT`/`ALIAS`, `GROUP`, and `EXPAND` nodes); filters on join inputs are not currently pushed across join boundaries. Type inference propagates column types through join boundaries. SIP (Sideways Information Passing) can prune the build side using selection bitmaps. -3. **Execution** — Equi-joins use a radix-partitioned hash join: the build side is partitioned by hash, then each morsel from the probe side looks up matches in the corresponding partition. For inner joins on the radix-parallel path, the executor picks the build side at runtime using actual materialized row counts — whichever input has fewer rows becomes the build side, reducing hash-table memory and improving cache utilisation. This selection is most effective when the larger side has many rows per key (e.g. a fact table joining a small dimension); on near-unique keys the benefit is small. LEFT, FULL, and ANTI joins always build on the right because their semantics require preserving every left row. The small-input (chained) path also always builds on the right. Output row order for inner joins on the radix-parallel path is partition- and thread-dependent and is not guaranteed to be stable. 
As-of and window joins use sorted merge with binary search on the temporal column — the as-of executor skips the per-join sort when the inputs carry the `sorted` / `parted` [attributes](attributes.md) described above. +3. **Execution** — Equi-joins use a radix-partitioned hash join: the build side is partitioned by hash, then each morsel from the probe side looks up matches in the corresponding partition. For inner joins on the radix-parallel path, the executor picks the build side at runtime using actual materialized row counts — whichever input has fewer rows becomes the build side, reducing hash-table memory and improving cache utilisation. This selection is most effective when the larger side has many rows per key (e.g. a fact table joining a small dimension); on near-unique keys the benefit is small. LEFT, FULL, and ANTI joins always build on the right because their semantics require preserving every left row. The small-input (chained) path also always builds on the right. During the per-partition build the executor monitors per-key duplicate counts; if any single key exceeds a threshold it aborts the open-addressing build for that partition and retries using the chained hash table, which is O(n) regardless of duplication. This ensures that no join — INNER, LEFT, or FULL — degrades to quadratic cost when the build side contains a heavily-duplicated key. It complements build-side selection (which handles INNER joins by choosing the smaller side) by covering LEFT and FULL joins, which cannot swap sides, and any INNER case where the forced-build side happens to be skewed. Output row order for inner joins on the radix-parallel path is partition- and thread-dependent and is not guaranteed to be stable. As-of and window joins use sorted merge with binary search on the temporal column — the as-of executor skips the per-join sort when the inputs carry the `sorted` / `parted` [attributes](attributes.md) described above. !!! 
note "Performance note" For large joins, ensure key columns use efficient types. Symbol columns (`RAY_SYM`) are dictionary-encoded integers and join fastest. String columns (`RAY_STR`) work but require hash comparison of variable-length data. From a2e8e095822e9183465480b7a84da7d88b2f5e77 Mon Sep 17 00:00:00 2001 From: Anton Date: Sat, 13 Jun 2026 19:30:16 +0200 Subject: [PATCH 9/9] docs: clarify dup-fallback re-runs the whole join via chained, not per-partition --- docs/docs/architecture/pipeline.md | 2 +- docs/docs/queries/joins.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/docs/architecture/pipeline.md b/docs/docs/architecture/pipeline.md index 9805359e..f90488dd 100644 --- a/docs/docs/architecture/pipeline.md +++ b/docs/docs/architecture/pipeline.md @@ -208,7 +208,7 @@ Hash joins use adaptive radix partitioning to ensure each partition's hash table The join pipeline: 1. **Partition** — Radix-partition both inputs by hash key bits -2. **Build** — Build per-partition hash tables (each fits in L2). For inner joins, the executor selects the build side at runtime using actual materialized row counts: the smaller input becomes the build side, keeping hash tables as compact as possible. LEFT, FULL, and ANTI joins always build on the right to preserve left-row semantics. The small-input (chained) path also always builds on the right. During the per-partition open-addressing build, the executor tracks per-key duplicate counts; when a single key exceeds the duplication threshold (`RADIX_DUP_RUN_MAX = 512`), it aborts and retries that partition with the chained hash table, which is O(n) regardless of duplication. No join (INNER, LEFT, or FULL) can degrade to quadratic build cost on a skewed key. +2. **Build** — Build per-partition hash tables (each fits in L2). 
For inner joins, the executor selects the build side at runtime using actual materialized row counts: the smaller input becomes the build side, keeping hash tables as compact as possible. LEFT, FULL, and ANTI joins always build on the right to preserve left-row semantics. The small-input (chained) path also always builds on the right. During the per-partition open-addressing build, the executor tracks per-key duplicate counts; when a single key exceeds the duplication threshold (`RADIX_DUP_RUN_MAX = 512`), it abandons the radix attempt and re-runs the whole join through the chained hash table, which is O(n) regardless of duplication. No join (INNER, LEFT, or FULL) can degrade to quadratic build cost on a skewed key. 3. **Probe** — Probe partitions in parallel across worker threads. Inner-join output order is partition- and thread-dependent; it is not guaranteed to be stable. ### Per-Thread Heaps diff --git a/docs/docs/queries/joins.md b/docs/docs/queries/joins.md index 270cd58c..be83e411 100644 --- a/docs/docs/queries/joins.md +++ b/docs/docs/queries/joins.md @@ -214,7 +214,7 @@ All join operations compile to the Rayforce execution DAG. The optimizer and exe 1. **DAG construction** — `inner-join` and `left-join` emit `OP_JOIN` nodes with join type flags. `asof-join` emits `OP_ASOF_JOIN`. `window-join` emits `OP_WINDOW_JOIN`. 2. **Optimizer** — Predicate pushdown moves filters closer to data sources (past `SELECT`/`ALIAS`, `GROUP`, and `EXPAND` nodes); filters on join inputs are not currently pushed across join boundaries. Type inference propagates column types through join boundaries. SIP (Sideways Information Passing) can prune the build side using selection bitmaps. -3. **Execution** — Equi-joins use a radix-partitioned hash join: the build side is partitioned by hash, then each morsel from the probe side looks up matches in the corresponding partition. 
For inner joins on the radix-parallel path, the executor picks the build side at runtime using actual materialized row counts — whichever input has fewer rows becomes the build side, reducing hash-table memory and improving cache utilisation. This selection is most effective when the larger side has many rows per key (e.g. a fact table joining a small dimension); on near-unique keys the benefit is small. LEFT, FULL, and ANTI joins always build on the right because their semantics require preserving every left row. The small-input (chained) path also always builds on the right. During the per-partition build the executor monitors per-key duplicate counts; if any single key exceeds a threshold it aborts the open-addressing build for that partition and retries using the chained hash table, which is O(n) regardless of duplication. This ensures that no join — INNER, LEFT, or FULL — degrades to quadratic cost when the build side contains a heavily-duplicated key. It complements build-side selection (which handles INNER joins by choosing the smaller side) by covering LEFT and FULL joins, which cannot swap sides, and any INNER case where the forced-build side happens to be skewed. Output row order for inner joins on the radix-parallel path is partition- and thread-dependent and is not guaranteed to be stable. As-of and window joins use sorted merge with binary search on the temporal column — the as-of executor skips the per-join sort when the inputs carry the `sorted` / `parted` [attributes](attributes.md) described above. +3. **Execution** — Equi-joins use a radix-partitioned hash join: the build side is partitioned by hash, then each morsel from the probe side looks up matches in the corresponding partition. For inner joins on the radix-parallel path, the executor picks the build side at runtime using actual materialized row counts — whichever input has fewer rows becomes the build side, reducing hash-table memory and improving cache utilisation. 
This selection is most effective when the larger side has many rows per key (e.g. a fact table joining a small dimension); on near-unique keys the benefit is small. LEFT, FULL, and ANTI joins always build on the right because their semantics require preserving every left row. The small-input (chained) path also always builds on the right. During the per-partition build the executor monitors per-key duplicate counts; if any single key exceeds a threshold it abandons the radix attempt and re-runs the whole join through the chained hash table, which is O(n) regardless of duplication. This ensures that no join — INNER, LEFT, or FULL — degrades to quadratic cost when the build side contains a heavily-duplicated key. It complements build-side selection (which handles INNER joins by choosing the smaller side) by covering LEFT and FULL joins, which cannot swap sides, and any INNER case where the forced-build side happens to be skewed. Output row order for inner joins on the radix-parallel path is partition- and thread-dependent and is not guaranteed to be stable. As-of and window joins use sorted merge with binary search on the temporal column — the as-of executor skips the per-join sort when the inputs carry the `sorted` / `parted` [attributes](attributes.md) described above. !!! note "Performance note" For large joins, ensure key columns use efficient types. Symbol columns (`RAY_SYM`) are dictionary-encoded integers and join fastest. String columns (`RAY_STR`) work but require hash comparison of variable-length data.