Merged
1 change: 1 addition & 0 deletions .gitignore
@@ -43,6 +43,7 @@ bench-alloc
bench-group-pushdown
bench-idx-route
bench-join-buildside
bench-join-dup

# Rayforce REPL history
.rayhist.dat
11 changes: 10 additions & 1 deletion Makefile
@@ -132,6 +132,15 @@ bench-join-buildside:
		bench/join_buildside/main.c $(LIB_SRC) $(LIBS) $(RELEASE_LDFLAGS)
	./bench-join-buildside

# Join dup-fallback perf gate.
# Measures post-fix (auto dup-fallback to chained build) vs pre-fix (O(dup²)
# build via the ray_join_no_dup_fallback bypass knob) on catastrophic,
# zero-regression, and moderate-dup cases. Sanitizer-free.
bench-join-dup:
	$(CC) $(RELEASE_CFLAGS) $(DEFS) $(INCLUDES) -o bench-join-dup \
		bench/join_dup/main.c $(LIB_SRC) $(LIBS) $(RELEASE_LDFLAGS)
	./bench-join-dup

# Tests. Depends on $(TARGET) because test/rfl/system/ipc_diff.rfl
# spawns ./$(TARGET) as an IPC server via .sys.exec — both binaries
# must exist on disk and share the build flavour (sanitizers, coverage).
@@ -185,7 +194,7 @@ clean:
	-rm -f cov-*.profraw default.profraw coverage.profdata
	-rm -rf coverage_html

-.PHONY: default debug release lib bench-alloc bench-join-buildside test coverage clean
+.PHONY: default debug release lib bench-alloc bench-join-buildside bench-join-dup test coverage clean

# Header dependencies last: .d fragments only add prerequisites to the
# object targets above, and being last they can't hijack the default goal.
360 changes: 360 additions & 0 deletions bench/bottleneck/join_dup_fallback_compare.md

Large diffs are not rendered by default.

442 changes: 442 additions & 0 deletions bench/join_dup/main.c

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/docs/architecture/pipeline.md
@@ -208,7 +208,7 @@ Hash joins use adaptive radix partitioning to ensure each partition's hash table
The join pipeline:

1. **Partition** — Radix-partition both inputs by hash key bits
-2. **Build** — Build per-partition hash tables (each fits in L2). For inner joins, the executor selects the build side at runtime using actual materialized row counts: the smaller input becomes the build side, keeping hash tables as compact as possible. LEFT, FULL, and ANTI joins always build on the right to preserve left-row semantics. The small-input (chained) path also always builds on the right.
+2. **Build** — Build per-partition hash tables (each fits in L2). For inner joins, the executor selects the build side at runtime using actual materialized row counts: the smaller input becomes the build side, keeping hash tables as compact as possible. LEFT, FULL, and ANTI joins always build on the right to preserve left-row semantics. The small-input (chained) path also always builds on the right. During the per-partition open-addressing build, the executor tracks per-key duplicate counts; when a single key exceeds the duplication threshold (`RADIX_DUP_RUN_MAX = 512`), it abandons the radix attempt and re-runs the whole join through the chained hash table, which is O(n) regardless of duplication. No join (INNER, LEFT, or FULL) can degrade to quadratic build cost on a skewed key.
3. **Probe** — Probe partitions in parallel across worker threads. Inner-join output order is partition- and thread-dependent; it is not guaranteed to be stable.
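
The duplicate tracking described in the Build step can be sketched in isolation. The following is a minimal illustration, not the Rayforce implementation: `sketch_build`, `sketch_demo`, and the `SKETCH_*` names are hypothetical, and partitioning, probing, key comparison, and the chained fallback itself are omitted. It assumes the table layout visible in the `src/ops/join.c` hunk (pairs of `hash, row_idx` in a flat `uint32_t` array with linear probing) and reports when one hash value has more than `dup_max` earlier rows.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

#define SKETCH_EMPTY UINT32_MAX /* hypothetical sentinel, same role as RADIX_HT_EMPTY */

enum { SKETCH_OK = 0, SKETCH_PATHOLOGICAL = 1, SKETCH_OOM = -1 };

/* Build an open-addressing table over n row hashes. Returns
 * SKETCH_PATHOLOGICAL as soon as one hash value already has more than
 * dup_max rows in the table, so the caller can switch to another path. */
static int sketch_build(const uint32_t *hashes, uint32_t n, uint32_t dup_max) {
    assert(n <= (1u << 30));            /* keeps the capacity math in range */
    uint32_t cap = 16;
    while (cap < 2u * n) cap <<= 1;     /* power of two, load factor <= 0.5 */
    uint32_t mask = cap - 1;

    uint32_t *ht = malloc((size_t)cap * 2 * sizeof *ht);
    if (!ht) return SKETCH_OOM;
    for (uint32_t s = 0; s < cap; s++) ht[s * 2 + 1] = SKETCH_EMPTY;

    int rc = SKETCH_OK;
    for (uint32_t i = 0; i < n; i++) {
        uint32_t h = hashes[i];
        uint32_t slot = h & mask;
        uint32_t same = 0;
        /* Walk the run to the first empty slot, counting only slots that
         * hold the same hash; other keys sharing the run are not counted. */
        while (ht[slot * 2 + 1] != SKETCH_EMPTY) {
            same += (ht[slot * 2] == h);
            slot = (slot + 1) & mask;
        }
        if (same > dup_max) {           /* checked once, after the loop */
            rc = SKETCH_PATHOLOGICAL;
            break;
        }
        ht[slot * 2] = h;
        ht[slot * 2 + 1] = i;           /* row index */
    }
    free(ht);
    return rc;
}

/* Usage helper: n rows that either all share one hash or are all distinct. */
static int sketch_demo(uint32_t n, bool single_key) {
    uint32_t *hashes = malloc((size_t)(n ? n : 1) * sizeof *hashes);
    if (!hashes) return SKETCH_OOM;
    for (uint32_t i = 0; i < n; i++) hashes[i] = single_key ? 7u : i;
    int rc = sketch_build(hashes, n, 512);
    free(hashes);
    return rc;
}
```

With a threshold of 512, a single key with 513 rows still builds, because the last row sees exactly 512 earlier rows. A 514th row of the same key trips the check.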

### Per-Thread Heaps
2 changes: 1 addition & 1 deletion docs/docs/queries/joins.md
@@ -214,7 +214,7 @@ All join operations compile to the Rayforce execution DAG. The optimizer and exe

1. **DAG construction** — `inner-join` and `left-join` emit `OP_JOIN` nodes with join type flags. `asof-join` emits `OP_ASOF_JOIN`. `window-join` emits `OP_WINDOW_JOIN`.
2. **Optimizer** — Predicate pushdown moves filters closer to data sources (past `SELECT`/`ALIAS`, `GROUP`, and `EXPAND` nodes); filters on join inputs are not currently pushed across join boundaries. Type inference propagates column types through join boundaries. SIP (Sideways Information Passing) can prune the build side using selection bitmaps.
-3. **Execution** — Equi-joins use a radix-partitioned hash join: the build side is partitioned by hash, then each morsel from the probe side looks up matches in the corresponding partition. For inner joins on the radix-parallel path, the executor picks the build side at runtime using actual materialized row counts — whichever input has fewer rows becomes the build side, reducing hash-table memory and improving cache utilisation. This selection is most effective when the larger side has many rows per key (e.g. a fact table joining a small dimension); on near-unique keys the benefit is small. LEFT, FULL, and ANTI joins always build on the right because their semantics require preserving every left row. The small-input (chained) path also always builds on the right. Output row order for inner joins on the radix-parallel path is partition- and thread-dependent and is not guaranteed to be stable. As-of and window joins use sorted merge with binary search on the temporal column — the as-of executor skips the per-join sort when the inputs carry the `sorted` / `parted` [attributes](attributes.md) described above.
+3. **Execution** — Equi-joins use a radix-partitioned hash join: the build side is partitioned by hash, then each morsel from the probe side looks up matches in the corresponding partition. For inner joins on the radix-parallel path, the executor picks the build side at runtime using actual materialized row counts — whichever input has fewer rows becomes the build side, reducing hash-table memory and improving cache utilisation. This selection is most effective when the larger side has many rows per key (e.g. a fact table joining a small dimension); on near-unique keys the benefit is small. LEFT, FULL, and ANTI joins always build on the right because their semantics require preserving every left row. The small-input (chained) path also always builds on the right. During the per-partition build the executor monitors per-key duplicate counts; if any single key exceeds a threshold it abandons the radix attempt and re-runs the whole join through the chained hash table, which is O(n) regardless of duplication. This ensures that no join — INNER, LEFT, or FULL — degrades to quadratic cost when the build side contains a heavily-duplicated key. It complements build-side selection (which handles INNER joins by choosing the smaller side) by covering LEFT and FULL joins, which cannot swap sides, and any INNER case where the forced-build side happens to be skewed. Output row order for inner joins on the radix-parallel path is partition- and thread-dependent and is not guaranteed to be stable. As-of and window joins use sorted merge with binary search on the temporal column — the as-of executor skips the per-join sort when the inputs carry the `sorted` / `parted` [attributes](attributes.md) described above.
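
The quadratic cost mentioned in the Execution step can be made concrete with a rough count. Assume linear probing in which every duplicate row occupies its own slot, as in the `src/ops/join.c` hunk. Inserting the k-th row of a key then steps past the k-1 rows of that key already in the table. Ignoring collisions with other keys, a key with d rows costs at least the following number of slot visits. The d = 10^6 figure is an illustrative assumption, not a measured case.

```latex
\[
  \sum_{k=1}^{d} (k-1) \;=\; \frac{d(d-1)}{2}
\]
\[
  d = 513:\quad \frac{513 \cdot 512}{2} = 131{,}328
  \qquad\qquad
  d = 10^{6}:\quad \frac{10^{6}\,(10^{6}-1)}{2} \approx 5 \times 10^{11}
\]
```

Here d = 513 is the largest per-key row count that does not trip a threshold of 512, since the check fires only when more than 512 earlier rows share the hash. The worst-case probing work per key is therefore bounded by a constant before the fallback takes over.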

!!! note "Performance note"
For large joins, ensure key columns use efficient types. Symbol columns (`RAY_SYM`) are dictionary-encoded integers and join fastest. String columns (`RAY_STR`) work but require hash comparison of variable-length data.
4 changes: 4 additions & 0 deletions src/ops/internal.h
@@ -622,6 +622,10 @@ extern bool ray_expr_disable;
extern bool ray_opt_no_group_pushdown;
extern bool ray_join_no_build_swap;
extern uint64_t ray_join_build_swaps;
extern bool ray_join_force_dup_fallback;
/* perf-gate bypass: disable the auto dup-fallback to measure the pre-fix O(dup²) build */
extern bool ray_join_no_dup_fallback;
extern uint64_t ray_join_dup_fallbacks;
void ray_expr_stats_init(void);

#define EXPR_MAX_REGS 16
48 changes: 46 additions & 2 deletions src/ops/join.c
@@ -31,6 +31,16 @@
bool ray_join_no_build_swap = false;
/* Diagnostic: how many radix inner-joins built on the smaller (left) side. */
uint64_t ray_join_build_swaps = 0;
/* Test knob: force every radix join to fall back to the chained path, so the
* differential harness can compare radix-build vs chained-build on ordinary data. */
bool ray_join_force_dup_fallback = false;
/* Perf-gate bypass: disable the auto dup-fallback so the differential harness
* can measure the pre-fix O(dup²) build in the same binary. Independent of
* ray_join_force_dup_fallback (which forces the fallback); this disables the
* auto-trip only. */
bool ray_join_no_dup_fallback = false;
/* Diagnostic: radix joins that fell back due to pathological key duplication. */
uint64_t ray_join_dup_fallbacks = 0;

/* ── Hash helper (shared by radix and chained HT join paths) ──────────── */

@@ -444,6 +454,11 @@ static inline bool join_keys_eq(ray_t* const* l_vecs, ray_t* const* r_vecs, uint

#define RADIX_HT_EMPTY UINT32_MAX

/* A build key with more than this many duplicate rows is pathological
* (O(dup²) build); abort to the chained path. Counts same-hash slots, so
* dense moderate keys whose clusters merge into a long run don't trip. */
#define RADIX_DUP_RUN_MAX 512

/* Per-partition single-pass build+probe context.
* Each partition writes to its own local output buffer, then results
* are consolidated into contiguous arrays afterward. */
@@ -463,6 +478,7 @@ typedef struct {
uint32_t* pp_cap; /* capacity per partition */
_Atomic(uint8_t)* matched_right;
_Atomic(uint8_t) had_error; /* set by any partition on OOM */
_Atomic(uint8_t) pathological; /* set on long-run duplication or forced */
} join_radix_bp_ctx_t;

/* Grow per-partition output buffers (matched pair arrays).
@@ -506,6 +522,13 @@ static void join_radix_build_probe_fn(void* raw, uint32_t wid, int64_t task_star
join_radix_part_t* rp = &c->r_parts[p];
join_radix_part_t* lp = &c->l_parts[p];

/* Test knob: force the chained-path fallback. Bail before allocating
* anything (pp headers are still NULL → cleanup-safe). */
if (ray_join_force_dup_fallback) {
atomic_store_explicit(&c->pathological, 1, memory_order_relaxed);
return;
}

if (rp->count == 0) {
/* No right rows — emit unmatched left rows for LEFT/FULL */
if (c->join_type >= 1 && lp->count > 0) {
@@ -579,8 +602,26 @@ static void join_radix_build_probe_fn(void* raw, uint32_t wid, int64_t task_star
     uint32_t slot = h & ht_mask;
     if (i + 4 < rp->count)
         __builtin_prefetch(&ht[(rp->entries[i + 4].hash & ht_mask) * 2], 1, 1);
-    while (ht[slot * 2 + 1] != RADIX_HT_EMPTY)
+    /* Count rows of THIS key (same hash) already inserted — the true
+     * per-key duplication. Total run length would conflate one giant
+     * key (pathological O(dup²) build) with many moderate keys whose
+     * dense clusters merge into a long run (fine); counting same-hash
+     * slots is immune to that collision-merge. Accumulate `same`
+     * branchlessly with NO global read / NO goto in the loop body: an
+     * in-loop trip check makes the compiler clone the probe loop on the
+     * (loop-invariant) bypass knob and pessimise the production variant
+     * (~55% regression at moderate dup, measured). Trip once, after. */
+    uint32_t same = 0;
+    while (ht[slot * 2 + 1] != RADIX_HT_EMPTY) {
+        same += (ht[slot * 2] == h);
         slot = (slot + 1) & ht_mask;
+    }
+    if (same > RADIX_DUP_RUN_MAX && !ray_join_no_dup_fallback) {
+        /* Pathological duplication — abort to the chained path.
+         * `done:` frees ht_hdr and leaves pp buffers cleanup-safe. */
+        atomic_store_explicit(&c->pathological, 1, memory_order_relaxed);
+        goto done;
+    }
     ht[slot * 2] = h;
     ht[slot * 2 + 1] = rp->entries[i].row_idx;
 }
@@ -995,6 +1036,7 @@ ray_t* exec_join(ray_graph_t* g, ray_op_t* op, ray_t* left_table, ray_t* right_t
.part_counts = part_counts, .pp_cap = pp_cap,
.matched_right = matched_right,
.had_error = 0,
.pathological = 0,
};
if (pool && n_rparts > 1)
ray_pool_dispatch_n(pool, join_radix_build_probe_fn, &bp_ctx, n_rparts);
@@ -1005,7 +1047,8 @@ ray_t* exec_join(ray_graph_t* g, ray_op_t* op, ray_t* left_table, ray_t* right_t
/* Check cancellation and errors during build+probe */
bool bp_cancelled = pool_cancelled(pool);
bool bp_error = atomic_load_explicit(&bp_ctx.had_error, memory_order_relaxed);
-if (bp_cancelled || bp_error) {
+bool bp_pathological = atomic_load_explicit(&bp_ctx.pathological, memory_order_relaxed);
+if (bp_cancelled || bp_error || bp_pathological) {
/* Free all per-partition buffers */
for (uint32_t rp2 = 0; rp2 < n_rparts; rp2++) {
if (r_parts[rp2].entries_hdr) scratch_free(r_parts[rp2].entries_hdr);
@@ -1018,6 +1061,7 @@ ray_t* exec_join(ray_graph_t* g, ray_op_t* op, ray_t* left_table, ray_t* right_t
if (matched_right_hdr) { scratch_free(matched_right_hdr); matched_right_hdr = NULL; }
matched_right = NULL;
if (bp_cancelled) return ray_error("cancel", NULL);
if (bp_pathological) ray_join_dup_fallbacks++;
goto chained_ht_fallback;
}
