3232import com .datastax .oss .driver .internal .core .control .ControlConnection ;
3333import com .datastax .oss .driver .internal .core .util .concurrent .CompletableFutures ;
3434import com .datastax .oss .driver .shaded .guava .common .annotations .VisibleForTesting ;
35+ import com .datastax .oss .driver .shaded .guava .common .collect .ImmutableList ;
3536import com .datastax .oss .driver .shaded .guava .common .collect .ImmutableMap ;
3637import com .datastax .oss .driver .shaded .guava .common .collect .ImmutableSet ;
3738import com .datastax .oss .driver .shaded .guava .common .collect .Iterators ;
@@ -74,6 +75,121 @@ public class DefaultTopologyMonitor implements TopologyMonitor {
7475 private static final String NATIVE_PORT = "native_port" ;
7576 private static final String NATIVE_TRANSPORT_PORT = "native_transport_port" ;
7677
78+ /**
79+ * The columns we actually read from {@code system.local}. Used to intersect with the full column
80+ * list returned by the first {@code SELECT *} response, so that subsequent projected queries only
81+ * fetch columns the driver uses.
82+ *
83+ * <p>Includes DSE-specific columns; absent columns are silently ignored by the intersection step.
84+ */
85+ @ VisibleForTesting
86+ static final ImmutableSet <String > LOCAL_COLUMNS_OF_INTEREST =
87+ ImmutableSet .of (
88+ // Topology / addressing
89+ "broadcast_address" ,
90+ "broadcast_port" ,
91+ "listen_address" ,
92+ "listen_port" ,
93+ "rpc_address" ,
94+ "rpc_port" ,
95+ "native_address" ,
96+ "native_transport_address" ,
97+ "native_transport_port" ,
98+ "native_transport_port_ssl" ,
99+ // Node metadata
100+ "data_center" ,
101+ "rack" ,
102+ "release_version" ,
103+ "tokens" ,
104+ "partitioner" ,
105+ "host_id" ,
106+ "schema_version" ,
107+ // DSE-specific
108+ "dse_version" ,
109+ "graph" ,
110+ "workload" ,
111+ "workloads" ,
112+ "server_id" ,
113+ "storage_port" ,
114+ "storage_port_ssl" ,
115+ "jmx_port" );
116+
117+ /**
118+ * The columns we actually read from {@code system.peers}. Mirrors {@link
119+ * #LOCAL_COLUMNS_OF_INTEREST} but replaces {@code listen_address}/{@code listen_port} with the
120+ * {@code peer} column used as a broadcast-address fallback and peer-row identifier.
121+ */
122+ @ VisibleForTesting
123+ static final ImmutableSet <String > PEERS_COLUMNS_OF_INTEREST =
124+ ImmutableSet .of (
125+ // Peer identifier / broadcast address fallback
126+ "peer" ,
127+ // Topology / addressing
128+ "broadcast_address" ,
129+ "broadcast_port" ,
130+ "rpc_address" ,
131+ "rpc_port" ,
132+ "native_address" ,
133+ "native_transport_address" ,
134+ "native_transport_port" ,
135+ "native_transport_port_ssl" ,
136+ // Node metadata
137+ "data_center" ,
138+ "rack" ,
139+ "release_version" ,
140+ "tokens" ,
141+ "partitioner" ,
142+ "host_id" ,
143+ "schema_version" ,
144+ // DSE-specific
145+ "dse_version" ,
146+ "graph" ,
147+ "workload" ,
148+ "workloads" ,
149+ "server_id" ,
150+ "storage_port" ,
151+ "storage_port_ssl" ,
152+ "jmx_port" );
153+
154+ /**
155+ * The columns we actually read from {@code system.peers_v2} (Cassandra ≥ 4.0). Replaces {@code
156+ * rpc_address} with {@code native_address}/{@code native_port} as the primary RPC endpoint
157+ * columns, and adds {@code peer_port}.
158+ */
159+ @ VisibleForTesting
160+ static final ImmutableSet <String > PEERS_V2_COLUMNS_OF_INTEREST =
161+ ImmutableSet .of (
162+ // Peer identifier
163+ "peer" ,
164+ "peer_port" ,
165+ // Primary RPC endpoint (peers_v2-specific)
166+ "native_address" ,
167+ "native_port" ,
168+ // Topology / addressing
169+ "broadcast_address" ,
170+ "broadcast_port" ,
171+ "rpc_address" ,
172+ "native_transport_address" ,
173+ "native_transport_port" ,
174+ "native_transport_port_ssl" ,
175+ // Node metadata
176+ "data_center" ,
177+ "rack" ,
178+ "release_version" ,
179+ "tokens" ,
180+ "partitioner" ,
181+ "host_id" ,
182+ "schema_version" ,
183+ // DSE-specific
184+ "dse_version" ,
185+ "graph" ,
186+ "workload" ,
187+ "workloads" ,
188+ "server_id" ,
189+ "storage_port" ,
190+ "storage_port_ssl" ,
191+ "jmx_port" );
192+
77193 private final String logPrefix ;
78194 protected final InternalDriverContext context ;
79195 private final ControlConnection controlConnection ;
@@ -84,6 +200,14 @@ public class DefaultTopologyMonitor implements TopologyMonitor {
84200 @ VisibleForTesting volatile boolean isSchemaV2 ;
85201 @ VisibleForTesting volatile int port = -1 ;
86202
203+ // Column name caches: null means "not yet learned — use SELECT *".
204+ // Populated on the first successful response as the intersection of the server's column list
205+ // and the *_COLUMNS_OF_INTEREST set, so subsequent queries project only columns the driver reads.
206+ // Reset to null on reconnect.
207+ private volatile List <String > localColumns = null ;
208+ private volatile List <String > peersColumns = null ;
209+ private volatile List <String > peersV2Columns = null ;
210+
87211 public DefaultTopologyMonitor (InternalDriverContext context ) {
88212 this .logPrefix = context .getSessionName ();
89213 this .context = context ;
@@ -97,6 +221,63 @@ public DefaultTopologyMonitor(InternalDriverContext context) {
97221 this .isSchemaV2 = true ;
98222 }
99223
224+ /**
225+ * Resets all column name caches to null, causing the next query to use {@code SELECT *} and
226+ * re-learn the available columns from the response. Should be called on reconnect.
227+ */
228+ @ Override
229+ public void resetColumnCaches () {
230+ localColumns = null ;
231+ peersColumns = null ;
232+ peersV2Columns = null ;
233+ }
234+
235+ /**
236+ * Returns a new list containing only the elements of {@code serverColumns} that are present in
237+ * {@code needed}, preserving the server-response order. Returns an empty list (never {@code
238+ * null}) if no columns match.
239+ *
240+ * <p>This is used when populating the column caches from a {@code SELECT *} response: rather than
241+ * caching all server columns, we cache only the subset the driver actually reads, so that
242+ * subsequent projected queries skip unused columns (e.g. large collection columns the driver
243+ * never inspects).
244+ */
245+ private static List <String > intersectWithNeeded (
246+ List <String > serverColumns , ImmutableSet <String > needed ) {
247+ return serverColumns .stream ().filter (needed ::contains ).collect (ImmutableList .toImmutableList ());
248+ }
249+
250+ /**
251+ * Builds a {@code SELECT} query string.
252+ *
253+ * @param columns the column names to project, in the order they will appear in the query, or
254+ * {@code null} to use {@code SELECT *}
255+ * @param table the table name (e.g. {@code "system.local"})
256+ * @return the query string without a trailing WHERE clause
257+ */
258+ private String buildQuery (List <String > columns , String table ) {
259+ String projection = (columns == null ) ? "*" : String .join (", " , columns );
260+ return "SELECT " + projection + " FROM " + table ;
261+ }
262+
263+ /**
264+ * Builds a {@code SELECT} query string with a WHERE clause.
265+ *
266+ * @param columns the column names to project, in the order they will appear in the query, or
267+ * {@code null} to use {@code SELECT *}
268+ * @param table the table name
269+ * @param where the WHERE clause (without the {@code WHERE} keyword)
270+ * @return the full query string
271+ */
272+ private String buildQuery (List <String > columns , String table , String where ) {
273+ return buildQuery (columns , table ) + " WHERE " + where ;
274+ }
275+
276+ /** Returns the peers column cache appropriate for the current schema version. */
277+ private List <String > getPeerColumnsCache () {
278+ return isSchemaV2 ? peersV2Columns : peersColumns ;
279+ }
280+
100281 @ Override
101282 public CompletionStage <Void > init () {
102283 if (closeFuture .isDone ()) {
@@ -127,12 +308,12 @@ public CompletionStage<Optional<NodeInfo>> refreshNode(Node node) {
127308 } else if (node .getBroadcastAddress ().isPresent ()) {
128309 CompletionStage <AdminResult > query ;
129310 if (isSchemaV2 ) {
311+ // Use SELECT * for narrow WHERE-clause queries: projecting a single-row result gives
312+ // negligible benefit, and the fixed WHERE form is easier to prime in test infrastructure.
130313 query =
131314 query (
132315 channel ,
133- "SELECT * FROM "
134- + getPeerTableName ()
135- + " WHERE peer = :address and peer_port = :port" ,
316+ buildQuery (null , getPeerTableName (), "peer = :address and peer_port = :port" ),
136317 ImmutableMap .of (
137318 "address" ,
138319 node .getBroadcastAddress ().get ().getAddress (),
@@ -142,12 +323,12 @@ public CompletionStage<Optional<NodeInfo>> refreshNode(Node node) {
142323 query =
143324 query (
144325 channel ,
145- "SELECT * FROM " + getPeerTableName () + " WHERE peer = :address" ,
326+ buildQuery ( null , getPeerTableName (), " peer = :address") ,
146327 ImmutableMap .of ("address" , node .getBroadcastAddress ().get ().getAddress ()));
147328 }
148329 return query .thenApply (result -> firstPeerRowAsNodeInfo (result , localEndPoint ));
149330 } else {
150- return query (channel , "SELECT * FROM " + getPeerTableName ())
331+ return query (channel , buildQuery ( getPeerColumnsCache (), getPeerTableName () ))
151332 .thenApply (result -> findInPeers (result , node .getHostId (), localEndPoint ));
152333 }
153334 }
@@ -160,7 +341,7 @@ public CompletionStage<Optional<NodeInfo>> getNewNodeInfo(InetSocketAddress broa
160341 LOG .debug ("[{}] Fetching info for new node {}" , logPrefix , broadcastRpcAddress );
161342 DriverChannel channel = controlConnection .channel ();
162343 EndPoint localEndPoint = channel .getEndPoint ();
163- return query (channel , "SELECT * FROM " + getPeerTableName ())
344+ return query (channel , buildQuery ( getPeerColumnsCache (), getPeerTableName () ))
164345 .thenApply (result -> findInPeers (result , broadcastRpcAddress , localEndPoint ));
165346 }
166347
@@ -170,9 +351,13 @@ public CompletionStage<NodeInfo> getChannelNodeInfo(DriverChannel channel) {
170351 return CompletableFutures .failedFuture (new IllegalStateException ("closed" ));
171352 }
172353 EndPoint localEndPoint = channel .getEndPoint ();
173- return query (channel , "SELECT * FROM system.local WHERE key='local'" )
354+ return query (channel , buildQuery ( localColumns , " system.local" , " key='local'") )
174355 .thenApply (
175356 result -> {
357+ if (localColumns == null && !result .getColumnNames ().isEmpty ()) {
358+ localColumns =
359+ intersectWithNeeded (result .getColumnNames (), LOCAL_COLUMNS_OF_INTEREST );
360+ }
176361 Iterator <AdminRow > iterator = result .iterator ();
177362 if (!iterator .hasNext ()) {
178363 throw new IllegalStateException (
@@ -197,8 +382,9 @@ public CompletionStage<Iterable<NodeInfo>> refreshNodeList() {
197382 savePort (channel );
198383
199384 CompletionStage <AdminResult > localQuery =
200- query (channel , "SELECT * FROM system.local WHERE key='local'" );
201- CompletionStage <AdminResult > peersV2Query = query (channel , "SELECT * FROM system.peers_v2" );
385+ query (channel , buildQuery (localColumns , "system.local" , "key='local'" ));
386+ CompletionStage <AdminResult > peersV2Query =
387+ query (channel , buildQuery (peersV2Columns , "system.peers_v2" ));
202388 CompletableFuture <AdminResult > peersQuery = new CompletableFuture <>();
203389
204390 peersV2Query .whenComplete (
@@ -215,19 +401,31 @@ public CompletionStage<Iterable<NodeInfo>> refreshNodeList() {
215401 && error .message .contains ("Unknown keyspace/cf pair (system.peers_v2)" ))) {
216402 this .isSchemaV2 = false ; // We should not attempt this query in the future.
217403 CompletableFutures .completeFrom (
218- query (channel , "SELECT * FROM system.peers" ), peersQuery );
404+ query (channel , buildQuery ( peersColumns , " system.peers") ), peersQuery );
219405 return ;
220406 }
221407 }
222408 peersQuery .completeExceptionally (t );
223409 } else {
410+ if (peersV2Columns == null && !r .getColumnNames ().isEmpty ()) {
411+ peersV2Columns =
412+ intersectWithNeeded (r .getColumnNames (), PEERS_V2_COLUMNS_OF_INTEREST );
413+ }
224414 peersQuery .complete (r );
225415 }
226416 });
227417
228418 return localQuery .thenCombine (
229419 peersQuery ,
230420 (controlNodeResult , peersResult ) -> {
421+ if (localColumns == null && !controlNodeResult .getColumnNames ().isEmpty ()) {
422+ localColumns =
423+ intersectWithNeeded (controlNodeResult .getColumnNames (), LOCAL_COLUMNS_OF_INTEREST );
424+ }
425+ if (!isSchemaV2 && peersColumns == null && !peersResult .getColumnNames ().isEmpty ()) {
426+ peersColumns =
427+ intersectWithNeeded (peersResult .getColumnNames (), PEERS_COLUMNS_OF_INTEREST );
428+ }
231429 List <NodeInfo > nodeInfos = new ArrayList <>();
232430 AdminRow localRow = controlNodeResult .iterator ().next ();
233431 InetSocketAddress localBroadcastRpcAddress =
0 commit comments