Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 9 additions & 26 deletions dev/diffs/3.5.8.diff
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ index 7af826583bd..3c3def1eb67 100644
assert(shuffleMergeJoins.size == 1)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 44c8cb92fc3..e29cb93ecda 100644
index 44c8cb92fc3..f098beeca26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
Expand Down Expand Up @@ -825,19 +825,7 @@ index 44c8cb92fc3..e29cb93ecda 100644
checkAnswer(shjNonCodegenDF, Seq.empty)
}
}
@@ -1470,7 +1491,10 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
"/*+ BROADCAST(t2) */ t1.k as k"
}
val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan
- assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
+ assert(collect(plan) {
+ case _: BroadcastNestedLoopJoinExec | _: CometBroadcastNestedLoopJoinExec =>
+ true
+ }.size === 1)
// No extra shuffle before aggregation
assert(collect(plan) { case _: ShuffleExchangeExec => true }.size === 0)
}
@@ -1486,7 +1510,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1486,7 +1507,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
// Have shuffle before aggregation
Expand All @@ -847,17 +835,12 @@ index 44c8cb92fc3..e29cb93ecda 100644
}

def getJoinQuery(selectExpr: String, joinType: String): String = {
@@ -1514,10 +1539,16 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
"/*+ BROADCAST(right_t) */ k1 as k0"
@@ -1515,9 +1537,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
- assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
- assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 3)
+ assert(collect(plan) {
+ case _: BroadcastNestedLoopJoinExec | _: CometBroadcastNestedLoopJoinExec =>
+ true
+ }.size === 1)
+ assert(collect(plan) {
+ case _: SortMergeJoinExec => true
+ case _: CometSortMergeJoinExec => true
+ }.size === 3)
Expand All @@ -867,7 +850,7 @@ index 44c8cb92fc3..e29cb93ecda 100644
}

// Test output ordering is not preserved
@@ -1526,9 +1557,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1526,9 +1551,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
Expand All @@ -882,7 +865,7 @@ index 44c8cb92fc3..e29cb93ecda 100644
}

// Test singe partition
@@ -1538,7 +1572,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1538,7 +1566,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
|FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
|""".stripMargin)
val plan = fullJoinDF.queryExecution.executedPlan
Expand All @@ -892,7 +875,7 @@ index 44c8cb92fc3..e29cb93ecda 100644
checkAnswer(fullJoinDF, Row(100))
}
}
@@ -1611,6 +1646,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1611,6 +1640,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
Seq(semiJoinDF, antiJoinDF).foreach { df =>
assert(collect(df.queryExecution.executedPlan) {
case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true
Expand All @@ -902,7 +885,7 @@ index 44c8cb92fc3..e29cb93ecda 100644
}.size == 1)
}
}
@@ -1655,14 +1693,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1655,14 +1687,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan

test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") {
def check(plan: SparkPlan): Unit = {
Expand All @@ -925,7 +908,7 @@ index 44c8cb92fc3..e29cb93ecda 100644
}
dupStreamSideColTest("SHUFFLE_HASH", check)
}
@@ -1798,7 +1842,8 @@ class ThreadLeakInSortMergeJoinSuite
@@ -1798,7 +1836,8 @@ class ThreadLeakInSortMergeJoinSuite
sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
}

Expand Down
35 changes: 9 additions & 26 deletions dev/diffs/4.0.2.diff
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ index 53e47f428c3..a55d8f0c161 100644
assert(shuffleMergeJoins.size == 1)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index aaac0ebc9aa..276c592ec88 100644
index aaac0ebc9aa..fbef0774d46 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
Expand Down Expand Up @@ -972,19 +972,7 @@ index aaac0ebc9aa..276c592ec88 100644
checkAnswer(shjNonCodegenDF, Seq.empty)
}
}
@@ -1473,7 +1494,10 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
"/*+ BROADCAST(t2) */ t1.k as k"
}
val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan
- assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
+ assert(collect(plan) {
+ case _: BroadcastNestedLoopJoinExec | _: CometBroadcastNestedLoopJoinExec =>
+ true
+ }.size === 1)
// No extra shuffle before aggregation
assert(collect(plan) { case _: ShuffleExchangeExec => true }.size === 0)
}
@@ -1489,7 +1513,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1489,7 +1510,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
// Have shuffle before aggregation
Expand All @@ -994,17 +982,12 @@ index aaac0ebc9aa..276c592ec88 100644
}

def getJoinQuery(selectExpr: String, joinType: String): String = {
@@ -1517,10 +1542,16 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
"/*+ BROADCAST(right_t) */ k1 as k0"
@@ -1518,9 +1540,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
- assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
- assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 3)
+ assert(collect(plan) {
+ case _: BroadcastNestedLoopJoinExec | _: CometBroadcastNestedLoopJoinExec =>
+ true
+ }.size === 1)
+ assert(collect(plan) {
+ case _: SortMergeJoinExec => true
+ case _: CometSortMergeJoinExec => true
+ }.size === 3)
Expand All @@ -1014,7 +997,7 @@ index aaac0ebc9aa..276c592ec88 100644
}

// Test output ordering is not preserved
@@ -1529,9 +1560,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1529,9 +1554,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
Expand All @@ -1029,7 +1012,7 @@ index aaac0ebc9aa..276c592ec88 100644
}

// Test singe partition
@@ -1541,7 +1575,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1541,7 +1569,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
|FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
|""".stripMargin)
val plan = fullJoinDF.queryExecution.executedPlan
Expand All @@ -1039,7 +1022,7 @@ index aaac0ebc9aa..276c592ec88 100644
checkAnswer(fullJoinDF, Row(100))
}
}
@@ -1614,6 +1649,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1614,6 +1643,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
Seq(semiJoinDF, antiJoinDF).foreach { df =>
assert(collect(df.queryExecution.executedPlan) {
case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true
Expand All @@ -1049,7 +1032,7 @@ index aaac0ebc9aa..276c592ec88 100644
}.size == 1)
}
}
@@ -1658,14 +1696,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1658,14 +1690,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan

test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") {
def check(plan: SparkPlan): Unit = {
Expand All @@ -1072,7 +1055,7 @@ index aaac0ebc9aa..276c592ec88 100644
}
dupStreamSideColTest("SHUFFLE_HASH", check)
}
@@ -1801,7 +1845,8 @@ class ThreadLeakInSortMergeJoinSuite
@@ -1801,7 +1839,8 @@ class ThreadLeakInSortMergeJoinSuite
sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
}

Expand Down
20 changes: 2 additions & 18 deletions dev/diffs/4.1.2.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1043,18 +1043,6 @@ index 885512d4d19..113ae17ad9f 100644
checkAnswer(shjNonCodegenDF, Seq.empty)
}
}
@@ -1485,7 +1507,10 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
"/*+ BROADCAST(t2) */ t1.k as k"
}
val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan
- assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
+ assert(collect(plan) {
+ case _: BroadcastNestedLoopJoinExec | _: CometBroadcastNestedLoopJoinExec =>
+ true
+ }.size === 1)
// No extra shuffle before aggregation
assert(collect(plan) { case _: ShuffleExchangeExec => true }.size === 0)
}
@@ -1501,7 +1523,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
Expand All @@ -1065,16 +1053,12 @@ index 885512d4d19..113ae17ad9f 100644
}

def getJoinQuery(selectExpr: String, joinType: String): String = {
@@ -1530,10 +1556,16 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1530,9 +1553,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
- assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
- assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 3)
+ assert(collect(plan) {
+ case _: BroadcastNestedLoopJoinExec | _: CometBroadcastNestedLoopJoinExec =>
+ true
+ }.size === 1)
+ assert(collect(plan) {
+ case _: SortMergeJoinExec => true
+ case _: CometSortMergeJoinExec => true
+ }.size === 3)
Expand Down
1 change: 0 additions & 1 deletion docs/source/user-guide/latest/operators.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ not supported by Comet will fall back to regular Spark execution.
| BatchScanExec | Yes | Supports Parquet files and Apache Iceberg Parquet scans. See the [Comet Compatibility Guide] for more information. |
| BroadcastExchangeExec | Yes | |
| BroadcastHashJoinExec | Yes | |
| BroadcastNestedLoopJoinExec | Yes | Falls back to Spark when the preserved side is broadcast (e.g. LEFT OUTER with BROADCAST on the left). |
| ExpandExec | Yes | |
| FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. |
| FilterExec | Yes | |
Expand Down
31 changes: 15 additions & 16 deletions docs/source/user-guide/latest/understanding-comet-plans.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,22 +202,21 @@ by role. Names match what is shown in the plan output.
These run natively in DataFusion. When several appear consecutively in a plan,
they execute as a single fused native block.

| Node | Spark equivalent |
| ------------------------------ | ----------------------------------------------- |
| `CometProject` | `ProjectExec` |
| `CometFilter` | `FilterExec` |
| `CometSort` | `SortExec` |
| `CometLocalLimit` | `LocalLimitExec` |
| `CometGlobalLimit` | `GlobalLimitExec` |
| `CometExpand` | `ExpandExec` |
| `CometExplode` | `GenerateExec` (for `explode` and `posexplode`) |
| `CometHashAggregate` | `HashAggregateExec`, `ObjectHashAggregateExec` |
| `CometHashJoin` | `ShuffledHashJoinExec` |
| `CometBroadcastHashJoin` | `BroadcastHashJoinExec` |
| `CometBroadcastNestedLoopJoin` | `BroadcastNestedLoopJoinExec` |
| `CometSortMergeJoin` | `SortMergeJoinExec` |
| `CometWindow` | `WindowExec` |
| `CometTakeOrderedAndProject` | `TakeOrderedAndProjectExec` |
| Node | Spark equivalent |
| ---------------------------- | ----------------------------------------------- |
| `CometProject` | `ProjectExec` |
| `CometFilter` | `FilterExec` |
| `CometSort` | `SortExec` |
| `CometLocalLimit` | `LocalLimitExec` |
| `CometGlobalLimit` | `GlobalLimitExec` |
| `CometExpand` | `ExpandExec` |
| `CometExplode` | `GenerateExec` (for `explode` and `posexplode`) |
| `CometHashAggregate` | `HashAggregateExec`, `ObjectHashAggregateExec` |
| `CometHashJoin` | `ShuffledHashJoinExec` |
| `CometBroadcastHashJoin` | `BroadcastHashJoinExec` |
| `CometSortMergeJoin` | `SortMergeJoinExec` |
| `CometWindow` | `WindowExec` |
| `CometTakeOrderedAndProject` | `TakeOrderedAndProjectExec` |

### JVM-Side Operators

Expand Down
1 change: 0 additions & 1 deletion native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ fn op_name(op: &OpStruct) -> &'static str {
OpStruct::Explode(_) => "Explode",
OpStruct::CsvScan(_) => "CsvScan",
OpStruct::ShuffleScan(_) => "ShuffleScan",
OpStruct::BroadcastNestedLoopJoin(_) => "BroadcastNestedLoopJoin",
}
}

Expand Down
52 changes: 0 additions & 52 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ use arrow::row::{OwnedRow, RowConverter, SortField};
use datafusion::common::utils::SingleRowListArrayBuilder;
use datafusion::common::UnnestOptions;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::joins::NestedLoopJoinExec;
use datafusion::physical_plan::limit::GlobalLimitExec;
use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
use datafusion_comet_proto::spark_expression::ListLiteral;
Expand Down Expand Up @@ -1229,57 +1228,6 @@ impl PhysicalPlanner {
Arc::new(SparkPlan::new(spark_plan.plan_id, aggregate, vec![child])),
))
}

OpStruct::BroadcastNestedLoopJoin(bnlj) => {
let (join_params, scans, shuffle_scans) = self.parse_join_parameters(
inputs,
children,
&[],
&[],
bnlj.join_type,
&bnlj.condition,
partition_count,
)?;

let left = Arc::clone(&join_params.left.native_plan);
let right = Arc::clone(&join_params.right.native_plan);

let nested_loop_join = Arc::new(NestedLoopJoinExec::try_new(
left,
right,
join_params.join_filter,
&join_params.join_type,
None,
)?);

if bnlj.build_side == BuildSide::BuildRight as i32 {
let swapped_join = nested_loop_join.as_ref().swap_inputs()?;
let mut additional_native_plans = vec![];
if swapped_join.as_any().is::<ProjectionExec>() {
additional_native_plans.push(Arc::clone(swapped_join.children()[0]));
}
Ok((
scans,
shuffle_scans,
Arc::new(SparkPlan::new_with_additional(
spark_plan.plan_id,
swapped_join,
vec![join_params.left, join_params.right],
additional_native_plans,
)),
))
} else {
Ok((
scans,
shuffle_scans,
Arc::new(SparkPlan::new(
spark_plan.plan_id,
nested_loop_join,
vec![join_params.left, join_params.right],
)),
))
}
}
OpStruct::Limit(limit) => {
assert_eq!(children.len(), 1);
let num = limit.limit;
Expand Down
1 change: 0 additions & 1 deletion native/core/src/execution/planner/operator_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,5 @@ fn get_operator_type(spark_operator: &Operator) -> Option<OperatorType> {
OpStruct::Explode(_) => None, // Not yet in OperatorType enum
OpStruct::CsvScan(_) => Some(OperatorType::CsvScan),
OpStruct::ShuffleScan(_) => None, // Not yet in OperatorType enum
OpStruct::BroadcastNestedLoopJoin(_) => None,
}
}
Loading
Loading