Skip to content

Commit 3c3238c

Browse files
authored
DRILL-8545: Disable HashAgg for collect_to_list_varchar due to ordering requirements (#3042)
1 parent e1a06f3 commit 3c3238c

3 files changed

Lines changed: 89 additions & 37 deletions

File tree

exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharAggFunction.java renamed to exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharFunction.java

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -29,40 +29,47 @@
2929
/**
3030
* Aggregate function which collects incoming VarChar column values into the list.
3131
*/
32-
@FunctionTemplate(name = "collect_to_list_varchar",
33-
scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE,
34-
isInternal = true)
35-
public class CollectToListVarcharAggFunction implements DrillAggFunc {
32+
public class CollectToListVarcharFunction {
33+
public static final String NAME = "collect_to_list_varchar";
3634

37-
@Param NullableVarCharHolder input;
38-
@Output BaseWriter.ComplexWriter writer;
39-
@Workspace ObjectHolder writerHolder;
35+
@FunctionTemplate(name = NAME,
36+
scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE,
37+
isInternal = true)
38+
public static class CollectToListVarcharAggFunction implements DrillAggFunc {
4039

41-
@Override
42-
public void setup() {
43-
writerHolder = new ObjectHolder();
44-
}
40+
@Param NullableVarCharHolder input;
41+
@Output BaseWriter.ComplexWriter writer;
42+
@Workspace ObjectHolder writerHolder;
43+
44+
private CollectToListVarcharAggFunction() {
45+
}
4546

46-
@Override
47-
public void add() {
48-
org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter listWriter;
49-
if (writerHolder.obj == null) {
50-
writerHolder.obj = writer.rootAsList();
47+
@Override
48+
public void setup() {
49+
writerHolder = new ObjectHolder();
5150
}
5251

53-
listWriter = (org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter) writerHolder.obj;
52+
@Override
53+
public void add() {
54+
org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter listWriter;
55+
if (writerHolder.obj == null) {
56+
writerHolder.obj = writer.rootAsList();
57+
}
58+
59+
listWriter = (org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter) writerHolder.obj;
5460

55-
if (input.isSet > 0) {
56-
listWriter.varChar().writeVarChar(input.start, input.end, input.buffer);
61+
if (input.isSet > 0) {
62+
listWriter.varChar().writeVarChar(input.start, input.end, input.buffer);
63+
}
5764
}
58-
}
5965

60-
@Override
61-
public void output() {
62-
}
66+
@Override
67+
public void output() {
68+
}
6369

64-
@Override
65-
public void reset() {
66-
writerHolder.obj = null;
70+
@Override
71+
public void reset() {
72+
writerHolder.obj = null;
73+
}
6774
}
6875
}

exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
import com.google.common.collect.Lists;
2121
import org.apache.calcite.util.ImmutableBitSet;
22+
import org.apache.drill.exec.expr.fn.impl.CollectToListVarcharFunction;
2223
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
2324
import org.apache.drill.exec.planner.logical.RelOptHelper;
2425
import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
2526
import org.apache.calcite.rel.InvalidRelException;
2627
import org.apache.calcite.rel.RelNode;
28+
import org.apache.calcite.rel.core.AggregateCall;
2729
import org.apache.calcite.plan.RelOptRule;
2830
import org.apache.calcite.plan.RelOptRuleCall;
2931
import org.apache.calcite.plan.RelTrait;
@@ -58,6 +60,10 @@ public void onMatch(RelOptRuleCall call) {
5860
final DrillAggregateRel aggregate = call.rel(0);
5961
final RelNode input = call.rel(1);
6062

63+
if (hasIncompatibleAggCalls(aggregate)) {
64+
return;
65+
}
66+
6167
if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0) {
6268
// currently, don't use HashAggregate if any of the logical aggrs contains DISTINCT or
6369
// if there are no grouping keys
@@ -168,4 +174,35 @@ private void createTransformRequest(RelOptRuleCall call, DrillAggregateRel aggre
168174
call.transformTo(newAgg);
169175
}
170176

177+
/**
178+
* Evaluates the logical aggregate expressions to determine if any are
179+
* incompatible with the Hash Aggregate physical operator.
180+
* <p>
181+
* While Hash Aggregate is generally performant for many aggregation types,
182+
* certain functions (such as {@code collect_to_list_varchar}) may require specific
183+
* data ordering or memory management patterns that the Hash Aggregate
184+
* implementation does not provide.
185+
* </p>
186+
* <p>
187+
* <b>Current Incompatibilities:</b>
188+
* <ul>
189+
* <li>{@link org.apache.drill.exec.expr.fn.impl.CollectToListVarcharFunction}: Excluded from
190+
* HashAgg because it requires data ordering and cannot be processed efficiently in an unordered
191+
* fashion. {@code SortAggPrule} is intended to handle this, ensuring deterministic results.
192+
* </li>
193+
* </ul>
194+
* </p>
195+
* @param aggregate The logical aggregate relational nodes containing the
196+
* list of {@link AggregateCall}s to inspect.
197+
* @return {@code true} if at least one aggregation call is incompatible
198+
* with HashAgg; {@code false} otherwise.
199+
*/
200+
private boolean hasIncompatibleAggCalls(DrillAggregateRel aggregate) {
201+
for (AggregateCall aggCall : aggregate.getAggCallList()) {
202+
if (CollectToListVarcharFunction.NAME.equalsIgnoreCase(aggCall.getAggregation().getName())) {
203+
return true;
204+
}
205+
}
206+
return false;
207+
}
171208
}

exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,29 +1101,37 @@ public void testCollectListHashAgg() throws Exception {
11011101
public void testCollectToListVarcharStreamAgg() throws Exception {
11021102
try {
11031103
client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
1104+
client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), true);
11041105
testBuilder()
1105-
.sqlQuery("select collect_to_list_varchar(`date`) as l from " +
1106-
"(select * from cp.`store/json/clicks.json` limit 2)")
1106+
.sqlQuery("select collect_to_list_varchar(`clicks`.`trans_id`) as ids from " +
1107+
"cp.`store/json/clicks.json` clicks group by `clicks`.`user_info`.`device`")
11071108
.unOrdered()
1108-
.baselineColumns("l")
1109-
.baselineValues(listOf("2014-04-26", "2014-04-20"))
1109+
.baselineColumns("ids")
1110+
.baselineValues(listOf("31920", "32383", "32359"))
1111+
.baselineValues(listOf("31026"))
1112+
.baselineValues(listOf("33848"))
11101113
.go();
11111114
} finally {
11121115
client.resetSession(PlannerSettings.HASHAGG.getOptionName());
1116+
client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
11131117
}
11141118
}
11151119

1120+
/**
1121+
* The current implementation of {@link org.apache.drill.exec.expr.fn.impl.CollectToListVarcharFunction}
1122+
* requires ordered input data. Because the Hash Aggregator does not maintain input order,
1123+
* it looks like there is no efficient way to process these values correctly within that operator
1124+
* by the function. {@code SortAggPrule} is intended to handle this to ensure deterministic
1125+
* results.
1126+
*/
11161127
@Test
11171128
public void testCollectToListVarcharHashAgg() throws Exception {
11181129
try {
1130+
thrown.expect(UserRemoteException.class);
1131+
thrown.expectMessage(containsString("SYSTEM ERROR: CannotPlanException"));
11191132
client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
1120-
testBuilder()
1121-
.sqlQuery("select collect_to_list_varchar(`date`) as l from " +
1122-
"(select * from cp.`store/json/clicks.json` limit 2) group by 'a'")
1123-
.unOrdered()
1124-
.baselineColumns("l")
1125-
.baselineValues(listOf("2014-04-26", "2014-04-20"))
1126-
.go();
1133+
run("select collect_to_list_varchar(`clicks`.`trans_id`) as ids from" +
1134+
" cp.`store/json/clicks.json` clicks group by `clicks`.`user_info`.`device`");
11271135
} finally {
11281136
client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
11291137
}

0 commit comments

Comments
 (0)