Skip to content

Commit 4c311d1

Browse files
authored
perf: Mark more operators as FFI safe to avoid deep copies (#3765)
* mark more operators as ffi safe
* convert CometLocalTableScanExec to extend CometSink
* scalastyle
1 parent a5a9f44 commit 4c311d1

4 files changed

Lines changed: 10 additions & 28 deletions

File tree

spark/src/main/scala/org/apache/comet/serde/operator/CometSink.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import org.apache.comet.serde.QueryPlanSerde.{serializeDataType, supportedDataType}
 abstract class CometSink[T <: SparkPlan] extends CometOperatorSerde[T] {
 
   /** Whether the data produced by the Comet operator is FFI safe */
-  def isFfiSafe: Boolean = false
+  def isFfiSafe: Boolean = true
 
   override def enabledConfig: Option[ConfigEntry[Boolean]] = None

@@ -90,8 +90,6 @@ abstract class CometSink[T <: SparkPlan] extends CometOperatorSerde[T] {
 
 object CometExchangeSink extends CometSink[SparkPlan] {
 
-  override def isFfiSafe: Boolean = true
-
   override def convert(
       op: SparkPlan,
       builder: Operator.Builder,

spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -279,13 +279,6 @@ case class CometBroadcastExchangeExec(
 
 object CometBroadcastExchangeExec extends CometSink[BroadcastExchangeExec] {
 
-  /**
-   * Exchange data is FFI safe because there is no use of mutable buffers involved.
-   *
-   * Source of broadcast exchange batches is ArrowStreamReader.
-   */
-  override def isFfiSafe: Boolean = true
-
   override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
     CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED)

spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
 
 package org.apache.spark.sql.comet
 
-import scala.jdk.CollectionConverters._
-
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -34,9 +32,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 import com.google.common.base.Objects
 
 import org.apache.comet.{CometConf, ConfigEntry}
-import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
 import org.apache.comet.serde.OperatorOuterClass.Operator
-import org.apache.comet.serde.QueryPlanSerde.serializeDataType
+import org.apache.comet.serde.operator.CometSink
 
 case class CometLocalTableScanExec(
     originalPlan: LocalTableScanExec,
@@ -106,24 +103,14 @@ case class CometLocalTableScanExec(
   override def hashCode(): Int = Objects.hashCode(originalPlan, originalPlan.schema, output)
 }
 
-object CometLocalTableScanExec extends CometOperatorSerde[LocalTableScanExec] {
+object CometLocalTableScanExec extends CometSink[LocalTableScanExec] {
+
+  // uses CometArrowConverters, which re-uses arrays
+  override def isFfiSafe: Boolean = false
 
   override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
     CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED)
 
-  override def convert(
-      op: LocalTableScanExec,
-      builder: Operator.Builder,
-      childOp: Operator*): Option[Operator] = {
-    val scanTypes = op.output.flatten(attr => serializeDataType(attr.dataType))
-    val scanBuilder = OperatorOuterClass.Scan
-      .newBuilder()
-      .setSource(op.getClass.getSimpleName)
-      .addAllFields(scanTypes.asJava)
-      .setArrowFfiSafe(false)
-    Some(builder.setScan(scanBuilder).build())
-  }
-
   override def createExec(nativeOp: Operator, op: LocalTableScanExec): CometNativeExec = {
     CometScanWrapper(nativeOp, CometLocalTableScanExec(op, op.rows, op.output))
   }

spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ case class CometSparkToColumnarExec(child: SparkPlan)
 }
 
 object CometSparkToColumnarExec extends CometSink[SparkPlan] with DataTypeSupport {
+
+  // uses CometArrowConverters, which re-uses arrays
+  override def isFfiSafe: Boolean = false
+
   override def createExec(
       nativeOp: OperatorOuterClass.Operator,
       op: SparkPlan): CometNativeExec = {

0 commit comments

Comments
 (0)