Skip to content

Commit c0194c9

Browse files
committed
IGNITE-28352 Calcite. User defined sql function miss entries are written under the same tx lock
1 parent 09857d4 commit c0194c9

10 files changed

Lines changed: 352 additions & 7 deletions

File tree

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@ private ExecutionContext<?> baseInboxContext(UUID nodeId, UUID qryId, long fragm
361361
NoOpIoTracker.INSTANCE,
362362
0,
363363
ImmutableMap.of(),
364+
null,
364365
null);
365366
}
366367
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,15 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
145145
/** */
146146
private Object[] correlations = new Object[16];
147147

148+
/** Modified entries holder. */
149+
@Nullable private final TxAwareModifiedEntriesHolder mofiedEntriesHolder;
150+
148151
/**
149152
* @param qctx Parent base query context.
150153
* @param qryId Query ID.
151154
* @param fragmentDesc Partitions information.
152155
* @param params Parameters.
156+
* @param mofiedEntriesHolder Modified entries holder.
153157
*/
154158
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
155159
public ExecutionContext(
@@ -166,7 +170,8 @@ public ExecutionContext(
166170
IoTracker ioTracker,
167171
long timeout,
168172
Map<String, Object> params,
169-
@Nullable Collection<QueryTxEntry> qryTxEntries
173+
@Nullable Collection<QueryTxEntry> qryTxEntries,
174+
@Nullable TxAwareModifiedEntriesHolder mofiedEntriesHolder
170175
) {
171176
super(qctx);
172177

@@ -183,6 +188,7 @@ public ExecutionContext(
183188
this.params = params;
184189
this.timeout = timeout;
185190
this.qryTxEntries = qryTxEntries;
191+
this.mofiedEntriesHolder = mofiedEntriesHolder;
186192

187193
startTs = U.currentTimeMillis();
188194

@@ -421,12 +427,19 @@ public void execute(RunnableX task, Consumer<Throwable> onError) {
421427

422428
executor.execute(qryId, fragmentId(), () -> {
423429
try {
430+
if (mofiedEntriesHolder != null)
431+
mofiedEntriesHolder.store(qryTxEntries);
432+
424433
if (!isCancelled())
425434
task.run();
426435
}
427436
catch (Throwable e) {
428437
onError.accept(e);
429438
}
439+
finally {
440+
if (mofiedEntriesHolder != null)
441+
mofiedEntriesHolder.detach();
442+
}
430443
});
431444
}
432445

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.ignite.internal.processors.query.calcite.exec;
1919

20+
import java.util.Collection;
2021
import java.util.Collections;
2122
import java.util.Iterator;
2223
import java.util.List;
@@ -37,6 +38,7 @@
3738
import org.apache.ignite.cache.query.QueryCancelledException;
3839
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
3940
import org.apache.ignite.cluster.ClusterNode;
41+
import org.apache.ignite.configuration.TransactionConfiguration;
4042
import org.apache.ignite.events.CacheQueryReadEvent;
4143
import org.apache.ignite.events.EventType;
4244
import org.apache.ignite.internal.GridKernalContext;
@@ -78,6 +80,7 @@
7880
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
7981
import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
8082
import org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse;
83+
import org.apache.ignite.internal.processors.query.calcite.message.QueryTxEntry;
8184
import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
8285
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
8386
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
@@ -205,6 +208,13 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
205208
/** */
206209
private final Map<String, FragmentPlan> fragmentPlanCache = new GridBoundedConcurrentLinkedHashMap<>(1024);
207210

211+
/**
212+
* Transaction modified entries holder.
213+
*
214+
* @see TransactionConfiguration#isTxAwareQueriesEnabled()
215+
*/
216+
private TxAwareModifiedEntriesHolder mofiedEntriesHolder;
217+
208218
/**
209219
* @param ctx Kernal.
210220
*/
@@ -477,13 +487,15 @@ public void injectService(InjectResourcesService injectSvc) {
477487
eventManager().addDiscoveryEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
478488

479489
iteratorsHolder().init();
490+
mofiedEntriesHolder = new TxAwareModifiedEntriesHolder(U.isTxAwareQueriesEnabled(ctx));
480491
}
481492

482493
/** {@inheritDoc} */
483494
@Override public void tearDown() {
484495
eventManager().removeDiscoveryEventListener(discoLsnr, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
485496

486497
iteratorsHolder().tearDown();
498+
mofiedEntriesHolder = null;
487499
}
488500

489501
/** */
@@ -515,7 +527,8 @@ private FragmentPlan prepareFragment(BaseQueryContext ctx, String jsonFragment)
515527
case DML:
516528
ListFieldsQueryCursor<?> cur = mapAndExecutePlan(
517529
qry,
518-
(MultiStepPlan)plan
530+
(MultiStepPlan)plan,
531+
mofiedEntriesHolder
519532
);
520533

521534
cur.iterator().hasNext();
@@ -525,7 +538,8 @@ private FragmentPlan prepareFragment(BaseQueryContext ctx, String jsonFragment)
525538
case QUERY:
526539
return mapAndExecutePlan(
527540
qry,
528-
(MultiStepPlan)plan
541+
(MultiStepPlan)plan,
542+
mofiedEntriesHolder
529543
);
530544

531545
case EXPLAIN:
@@ -577,7 +591,8 @@ private FieldsQueryCursor<List<?>> executeDdl(RootQuery<Row> qry, DdlPlan plan)
577591
/** */
578592
private ListFieldsQueryCursor<?> mapAndExecutePlan(
579593
RootQuery<Row> qry,
580-
MultiStepPlan plan
594+
MultiStepPlan plan,
595+
TxAwareModifiedEntriesHolder mofiedEntriesHolder
581596
) {
582597
qry.mapping();
583598

@@ -626,6 +641,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
626641
MemoryTracker qryMemoryTracker = qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());
627642

628643
final GridNearTxLocal userTx = Commons.queryTransaction(qry.context(), ctx.cache().context());
644+
final Collection<QueryTxEntry> writeEntrs = userTx == null ?
645+
mofiedEntriesHolder.retrieve() : ExecutionContext.transactionChanges(userTx.writeEntries());
629646

630647
ExecutionContext<Row> ectx = new ExecutionContext<>(
631648
qry.context(),
@@ -641,7 +658,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
641658
createIoTracker(locNodeId, qry.localQueryId()),
642659
timeout,
643660
qryParams,
644-
userTx == null ? null : ExecutionContext.transactionChanges(userTx.writeEntries()));
661+
writeEntrs,
662+
mofiedEntriesHolder);
645663

646664
Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(),
647665
exchangeService(), failureProcessor()).go(fragment.root());
@@ -901,7 +919,8 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) {
901919
createIoTracker(nodeId, msg.originatingQueryId()),
902920
msg.timeout(),
903921
Commons.parametersMap(msg.parameters()),
904-
msg.queryTransactionEntries()
922+
msg.queryTransactionEntries(),
923+
null
905924
);
906925

907926
executeFragment(qry, fragmentPlan, ectx);
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.ignite.internal.processors.query.calcite.exec;
18+
19+
import java.util.Collection;
20+
import java.util.Collections;
21+
import org.apache.ignite.internal.processors.query.calcite.message.QueryTxEntry;
22+
import org.jetbrains.annotations.Nullable;
23+
24+
/** Per thread modified entries holder. */
25+
public class TxAwareModifiedEntriesHolder {
26+
/** Transaction modified entries holder. */
27+
@Nullable private final ThreadLocal<Collection<QueryTxEntry>> holder;
28+
29+
/** */
30+
public TxAwareModifiedEntriesHolder(boolean txAware) {
31+
if (txAware)
32+
holder = new ThreadLocal<>();
33+
else
34+
holder = null;
35+
}
36+
37+
/** Store entries if applicable. */
38+
public void store(Collection<QueryTxEntry> items) {
39+
if (holder != null)
40+
holder.set(items);
41+
}
42+
43+
/** Retirieve entries if applicable. */
44+
public Collection<QueryTxEntry> retrieve() {
45+
if (holder != null)
46+
return holder.get();
47+
else
48+
return Collections.emptyList();
49+
}
50+
51+
/** Detach stored entries. */
52+
public void detach() {
53+
if (holder != null)
54+
holder.remove();
55+
}
56+
}

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public class LogicalRelImplementorTest extends GridCommonAbstractTest {
133133
NoOpIoTracker.INSTANCE,
134134
0,
135135
null,
136+
null,
136137
null
137138
) {
138139
@Override public ColocationGroup group(long srcId) {

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeSortedIndexTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ private RuntimeSortedIndex<Object[]> generate(RelDataType rowType, final List<In
121121
NoOpIoTracker.INSTANCE,
122122
0,
123123
null,
124+
null,
124125
null),
125126
RelCollations.of(ImmutableIntList.copyOf(idxCols)),
126127
(o1, o2) -> {

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,7 @@ protected ExecutionContext<Object[]> executionContext(UUID nodeId, UUID qryId, l
362362
NoOpIoTracker.INSTANCE,
363363
0,
364364
ImmutableMap.of(),
365+
null,
365366
null
366367
);
367368
}

0 commit comments

Comments
 (0)