Skip to content

feat(shuffleV2): support multi-CN via serve-all-buckets mode#24940

Open
aunjgr wants to merge 14 commits into
matrixorigin:mainfrom
aunjgr:feat/multi-cn-shuffle-v2
Open

feat(shuffleV2): support multi-CN via serve-all-buckets mode#24940
aunjgr wants to merge 14 commits into
matrixorigin:mainfrom
aunjgr:feat/multi-cn-shuffle-v2

Conversation

@aunjgr

@aunjgr aunjgr commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #24097

What this PR does / why we need it:

Shuffle v2 (in-process ShufflePoolV2) was only used on single-CN because its Call() method only served the current worker's bucket. On multi-CN source scopes where Mcpu=1 (maxHolders==1), the single worker wrote rows to all N buckets but only drained bucket 0 — data for remote CNs was stranded in the pool.

Fix: When maxHolders == 1, Call() uses getAnyFullBatch() / getAnyLastBatch() to serve batches from all buckets. This lets the downstream Dispatch operator route each batch to the correct target CN based on ShuffleIDX.

Compile changes: The multi-CN constructor paths (constructShuffleOperatorForJoin and constructShuffleArgForGroup) now create v2 shuffle operators instead of v1. Single-CN paths (compileShuffleJoinV2 / compileShuffleGroupV2) are unchanged.

remoterun.go: Added vm.ShuffleV2 serialization/deserialization.

How scattering/gathering works:

Source CN (Mcpu=1):
  [child ops] -> [v2 Shuffle] -> [Dispatch]
                    |                 |
                    | set ShuffleIDX  | routes bat to register[ShuffleIDX]
                    | serve all N     |
                    | buckets         +- LocalRegs  -> PipelineSpool (same CN)
                    |                 +- RemoteRegs -> MoRPC (remote CN)

Target CN: MergeReceiver -> [HashJoin / Group]

🤖 Generated with Claude Code

@aunjgr aunjgr requested a review from ouyuanning as a code owner June 11, 2026 10:41
@qodo-code-review

Copy link
Copy Markdown

Qodo reviews are paused for this user.

Troubleshooting steps vary by plan Learn more →

On a Teams plan?
Reviews resume once this user has a paid seat and their Git account is linked in Qodo.
Link Git account →

Using GitHub Enterprise Server, GitLab Self-Managed, or Bitbucket Data Center?
These require an Enterprise plan - Contact us
Contact us →

aunjgr added 2 commits June 15, 2026 23:26
When maxHolders == 1 (single worker on multi-CN source scope),
Call() serves full batches from all buckets instead of only
CurrentShuffleIdx. This lets dispatch route data to the correct
target CNs.

Also swap multi-CN compile constructors to use v2 operators:
- constructShuffleOperatorForJoin: v1 -> v2
- constructShuffleArgForGroup: v1 -> v2

Add vm.ShuffleV2 serialization/deserialization in remoterun.go.
- Prepare defaults maxHolders to 1; dupOperator overrides with SetMaxHolders
- allStop checks stoppers >= holders instead of stoppers == maxHolders
- Call() serves all buckets when maxHolders == 1 (single worker)
- hashShuffle returns const/constNull directly instead of through pool
- Remove DEDUP hash join gate in compileJoin
- Add getAnyFullBatch/getAnyLastBatch to ShufflePoolV2
aunjgr added 3 commits June 22, 2026 15:28
- compileJoin: remove mcpu check and V1 compileShuffleJoin fallback;
  always route to compileShuffleJoinV2
- compileShuffleJoinV2: keep fast path for simple single-CN case (paired
  scopes, no dispatch overhead); all other cases (multi-CN, merged scopes,
  mismatched parallelism) go through newShuffleJoinScopeList + V2-specific
  join construction (shuffleV2=true)
- newShuffleJoinScopeList: always use constructShuffleOperatorForJoinV2
  instead of switching between V1 (single-CN) and V2 (multi-CN)

Together these changes eliminate every remaining V1 code path from
production shuffle joins and groups.
The old len(leftscopes) != len(rightscopes) assertion panicked when a
probe side (e.g. table scan → 1 scope with Mcpu=dop) was joined with a
build side that was itself the result of a previous shuffled join and
thus had dop scopes each with Mcpu=1 (from newShuffleJoinScopeList).

Fix: remove the assertion; the fast path now requires equal scope counts
as part of its condition.  When scopes don't match, the general path
(newShuffleJoinScopeList) handles arbitrary scope configurations.
aunjgr added 3 commits June 23, 2026 12:41
…in general path

In the general path (newShuffleJoinScopeList with multiple scopes per CN),
ShuffleIdx=-1 breaks the 1-to-1 hash-build/hash-join pairing because each
scope receives different partition data from the dispatch routing
(ShuffleToRegIndex or ShuffleToMultiMatchedReg).  A hash join could
receive the JoinMap from a different scope's hash build, producing wrong
results.

Fix: use per-scope ShuffleIdx=i (shuffleV2=false) in the general path.
The shuffle operators themselves remain V2 (non-blocking, pipelined).
The fast path (single CN, single scope) keeps ShuffleIdx=-1 since there's
exactly one hash-build/hash-join pair with no ambiguity.
Deleted (all zero callers):
- constructShuffleOperatorForJoin (V1) — dead
- constructShuffleArgForGroup (V1) — dead
- compileShuffleJoin (V1) — dead
- compileShuffleGroup (V1) — dead

Renamed (drop V2 suffix):
- constructShuffleOperatorForJoinV2 → constructShuffleOperatorForJoin
- constructShuffleArgForGroupV2 → constructShuffleArgForGroup
- compileShuffleJoinV2 → compileShuffleJoin
- compileShuffleGroupV2 → compileShuffleGroup
- Test helpers: newCompileForShuffleGroupV2Test etc → newCompileForShuffleGroup etc
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kind/enhancement kind/feature size/M Denotes a PR that changes [100,499] lines

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants