Skip to content

Commit 0c358d2

Browse files
committed
fix: Include non-replica nodes in LWT PRESERVE_REPLICA_ORDER query plan
The PreserveReplicaOrderIterator in TokenAwarePolicy previously returned only replica nodes in the query plan. When replicas were unavailable (e.g. prepared statements before parameter binding, or replicas going down after plan construction), the query plan could be empty or insufficient, causing "No node was available" errors. The fix adds a third pass that, once all eligible replicas have been returned, appends the remaining non-replica nodes from the child policy's query plan. When no replica could be returned at all (all DOWN/IGNORED), the existing fallback to the child policy's full query plan is preserved; when the replica list is empty, the child policy's query plan is now returned directly. Fixes #833
1 parent 1245eca commit 0c358d2

2 files changed

Lines changed: 226 additions & 41 deletions

File tree

driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@
3838
import java.util.ArrayList;
3939
import java.util.Collection;
4040
import java.util.Collections;
41+
import java.util.HashSet;
4142
import java.util.Iterator;
4243
import java.util.List;
44+
import java.util.Set;
4345
import java.util.concurrent.ThreadLocalRandom;
4446

4547
/**
@@ -70,20 +72,18 @@
7072
* <h3>Lightweight Transaction (LWT) Routing</h3>
7173
*
7274
* <p>For {@linkplain Statement#isLWT() lightweight transaction} queries, this policy provides
73-
* specialized replica-only routing to optimize LWT performance and avoid contention. When LWT
74-
* routing is enabled (the default), the query plan contains <strong>only replicas</strong> for the
75-
* target partition, ordered by datacenter locality:
75+
* specialized routing to optimize LWT performance and avoid contention. When LWT routing is enabled
76+
* (the default), the query plan prioritizes replicas for the target partition, ordered by
77+
* datacenter locality, followed by non-replica nodes for failover:
7678
*
7779
* <ul>
7880
* <li>Local replicas first: replicas for which the child policy reports {@link HostDistance#LOCAL
7981
* LOCAL} distance are returned first, in the order provided by cluster metadata (preserving
8082
* primary replica ordering from the token ring).
8183
* <li>Remote replicas second: remaining replicas (typically in remote datacenters) are appended,
8284
* but only if they are up and not ignored by the child policy.
83-
* <li>Replica-only routing when possible: under normal conditions, LWT query plans target only
84-
* replicas for the partition in order to reduce coordinator forwarding overhead and improve
85-
* performance. When replica information is unavailable, the driver falls back to the child
86-
* policy as described in the fallback behavior below, which may include non-replica hosts.
85+
* <li>Non-replica nodes: remaining nodes from the child policy's query plan are appended after
86+
* all replicas, ensuring the query plan always includes all available nodes for failover.
8787
* </ul>
8888
*
8989
* <p><strong>Rack awareness</strong> is intentionally <em>not</em> applied to LWT replica ordering.
@@ -243,36 +243,38 @@ protected Host computeNext() {
243243

244244
/**
245245
* An iterator that returns replicas first, with local replicas prioritized (preserving primary
246-
* replica order), then remote replicas. Used for LWT queries to ensure replica-only routing and
247-
* minimize coordinator forwarding overhead. DOWN and IGNORED hosts are filtered out.
246+
* replica order), then remote replicas, then non-replica nodes from the child policy. DOWN and
247+
* IGNORED hosts are filtered out from replicas.
248248
*
249-
* <p>Query plan follows a three-pass strategy:
249+
* <p>Query plan follows a three-pass strategy, plus a fallback mode of the third pass (last item):
250250
*
251251
* <ol>
252252
* <li><strong>Local replicas:</strong> Returns UP replicas marked as LOCAL by the child policy,
253253
* in the order provided by cluster metadata (preserving primary replica order).
254254
* <li><strong>Remote replicas:</strong> Returns UP replicas marked as REMOTE by the child
255255
* policy.
256-
* <li><strong>Child policy fallback:</strong> If no suitable replicas are available (for
257-
* example, all are DOWN or IGNORED and thus none are returned), falls back to the child
258-
* policy's query plan for the remaining hosts. The child policy's plan is used as-is and
259-
* may include hosts that were already considered by this iterator.
256+
* <li><strong>Non-replica nodes:</strong> Returns remaining nodes from the child policy's query
257+
* plan, skipping any hosts already returned as replicas. This ensures all available nodes
258+
* are included in the query plan for failover.
259+
* <li><strong>Child policy fallback:</strong> If no suitable replicas were returned at all (for
260+
* example, all are DOWN or IGNORED), falls back to the child policy's full query plan.
260261
* </ol>
261262
*/
262263
private class PreserveReplicaOrderIterator extends AbstractIterator<Host> {
263264
private final Iterator<Host> replicasIterator;
265+
private final List<Host> replicas;
264266
private final String keyspace;
265267
private final Statement statement;
266268
private List<Host> nonLocalReplicas;
267269
private Iterator<Host> nonLocalReplicasIterator;
268-
private boolean hasReturnedReplicas;
270+
private Set<Host> returnedHosts;
269271
private Iterator<Host> childIterator;
270272

271-
public PreserveReplicaOrderIterator(
272-
String keyspace, Statement statement, Iterator<Host> replicasIterator) {
273+
public PreserveReplicaOrderIterator(String keyspace, Statement statement, List<Host> replicas) {
273274
this.keyspace = keyspace;
274275
this.statement = statement;
275-
this.replicasIterator = replicasIterator;
276+
this.replicas = replicas;
277+
this.replicasIterator = replicas.iterator();
276278
}
277279

278280
@Override
@@ -289,7 +291,8 @@ protected Host computeNext() {
289291

290292
switch (distance) {
291293
case LOCAL:
292-
hasReturnedReplicas = true;
294+
if (returnedHosts == null) returnedHosts = new HashSet<>();
295+
returnedHosts.add(host);
293296
return host;
294297
case REMOTE:
295298
// Collect remote replicas for second pass
@@ -307,21 +310,31 @@ protected Host computeNext() {
307310
if (nonLocalReplicasIterator == null) {
308311
nonLocalReplicasIterator = nonLocalReplicas.iterator();
309312
}
310-
if (nonLocalReplicasIterator.hasNext()) {
311-
hasReturnedReplicas = true;
312-
return nonLocalReplicasIterator.next();
313+
while (nonLocalReplicasIterator.hasNext()) {
314+
Host host = nonLocalReplicasIterator.next();
315+
if (returnedHosts == null) returnedHosts = new HashSet<>();
316+
returnedHosts.add(host);
317+
return host;
313318
}
314319
}
315320

316-
// Third pass: fallback to child policy if no suitable replicas were returned
317-
// This handles cases where all replicas are empty, DOWN or IGNORED
318-
if (!hasReturnedReplicas) {
319-
if (childIterator == null) {
320-
childIterator = childPolicy.newQueryPlan(keyspace, statement);
321+
// Third pass: return remaining nodes from child policy
322+
if (childIterator == null) {
323+
childIterator = childPolicy.newQueryPlan(keyspace, statement);
324+
}
325+
while (childIterator.hasNext()) {
326+
Host host = childIterator.next();
327+
// Skip hosts we already returned as replicas
328+
if (returnedHosts != null && returnedHosts.contains(host)) {
329+
continue;
321330
}
322-
if (childIterator.hasNext()) {
323-
return childIterator.next();
331+
// If we returned some replicas, also skip the replicas that were NOT returned by the
332+
// earlier passes (i.e. those filtered out there, such as DOWN/IGNORED hosts). If no
333+
// replicas were returned at all, allow full child policy fallback including replica hosts.
334+
if (returnedHosts != null && replicas.contains(host)) {
335+
continue;
324336
}
337+
return host;
325338
}
326339

327340
return endOfData();
@@ -477,7 +490,10 @@ private Iterator<Host> newQueryPlanRegular(
477490

478491
private Iterator<Host> newQueryPlanPreserveReplicaOrder(
479492
String keyspace, Statement statement, List<Host> replicas) {
480-
return new PreserveReplicaOrderIterator(keyspace, statement, replicas.iterator());
493+
if (replicas.isEmpty()) {
494+
return childPolicy.newQueryPlan(keyspace, statement);
495+
}
496+
return new PreserveReplicaOrderIterator(keyspace, statement, replicas);
481497
}
482498

483499
@Override

0 commit comments

Comments
 (0)