diff --git a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml
index d7a56c91f597..a4710371d965 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml
+++ b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml
@@ -131,6 +131,10 @@
io
com.google.bqjdbc.shaded.io
+
+ io.opentelemetry.api.*
+ io.opentelemetry.context.*
+
@@ -277,6 +281,16 @@
httpcore5
+
+
+ io.opentelemetry
+ opentelemetry-api
+
+
+ io.opentelemetry
+ opentelemetry-context
+
+
com.google.truth
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java
index 17471e252205..7bb9bfa409c1 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java
@@ -41,6 +41,8 @@
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.http.HttpTransportOptions;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.Tracer;
import java.io.IOException;
import java.io.InputStream;
import java.sql.CallableStatement;
@@ -138,6 +140,9 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
Long connectionPoolSize;
Long listenerPoolSize;
String partnerToken;
+ boolean enableOpenTelemetry;
+ OpenTelemetry customOpenTelemetry;
+ Tracer tracer = OpenTelemetry.noop().getTracer("");
BigQueryConnection(String url) throws IOException {
this(url, DataSource.fromUrl(url));
@@ -242,6 +247,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
this.connectionPoolSize = ds.getConnectionPoolSize();
this.listenerPoolSize = ds.getListenerPoolSize();
this.partnerToken = ds.getPartnerToken();
+ this.enableOpenTelemetry = ds.getEnableOpenTelemetry();
+ this.customOpenTelemetry = ds.getCustomOpenTelemetry();
this.headerProvider = createHeaderProvider();
this.bigQuery = getBigQueryConnection();
@@ -935,6 +942,14 @@ private BigQuery getBigQueryConnection() {
bigQueryOptions.setTransportOptions(this.httpTransportOptions);
}
+ OpenTelemetry openTelemetry =
+ BigQueryJdbcOpenTelemetry.getOpenTelemetry(
+ this.enableOpenTelemetry, this.customOpenTelemetry);
+ if (this.enableOpenTelemetry) {
+ this.tracer = BigQueryJdbcOpenTelemetry.getTracer(openTelemetry);
+ bigQueryOptions.setOpenTelemetryTracer(this.tracer);
+ }
+
BigQueryOptions options = bigQueryOptions.setHeaderProvider(this.headerProvider).build();
options.setDefaultJobCreationMode(
this.useStatelessQueryMode
@@ -1083,4 +1098,8 @@ public CallableStatement prepareCall(
}
return prepareCall(sql);
}
+
+ public Tracer getTracer() {
+ return this.tracer;
+ }
}
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java
index 930fc42af2bc..02bfd3cc164c 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java
@@ -20,6 +20,7 @@
import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException;
import io.grpc.LoadBalancerRegistry;
import io.grpc.internal.PickFirstLoadBalancerProvider;
+import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Driver;
@@ -121,9 +122,14 @@ public Connection connect(String url, Properties info) throws SQLException {
LOG.finest("++enter++");
try {
if (acceptsURL(url)) {
- // strip 'jdbc:' from the URL, add any extra properties
+ Properties connectInfo = info == null ? new Properties() : (Properties) info.clone();
+ OpenTelemetry customOpenTelemetry = null;
+ if (connectInfo.containsKey("customOpenTelemetry")) {
+ customOpenTelemetry = (OpenTelemetry) connectInfo.remove("customOpenTelemetry");
+ }
String connectionUri =
- BigQueryJdbcUrlUtility.appendPropertiesToURL(url.substring(5), this.toString(), info);
+ BigQueryJdbcUrlUtility.appendPropertiesToURL(
+ url.substring(5), this.toString(), connectInfo);
try {
BigQueryJdbcUrlUtility.parseUrl(connectionUri);
} catch (BigQueryJdbcRuntimeException e) {
@@ -131,6 +137,7 @@ public Connection connect(String url, Properties info) throws SQLException {
}
DataSource ds = DataSource.fromUrl(connectionUri);
+ ds.setCustomOpenTelemetry(customOpenTelemetry);
// LogLevel
String logLevelStr = ds.getLogLevel();
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java
new file mode 100644
index 000000000000..811158bb7dad
--- /dev/null
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.jdbc;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.Tracer;
+
+public class BigQueryJdbcOpenTelemetry {
+
+ private static final String INSTRUMENTATION_SCOPE_NAME = "com.google.cloud.bigquery.jdbc";
+
+  /**
+   * Returns the OpenTelemetry instance to use: a no-op instance when tracing is disabled, the
+   * caller-supplied custom instance when one is provided, otherwise the GlobalOpenTelemetry
+   * singleton registered by the application.
+   */
+ public static OpenTelemetry getOpenTelemetry(
+ boolean enableOpenTelemetry, OpenTelemetry customOpenTelemetry) {
+ if (!enableOpenTelemetry) {
+ return OpenTelemetry.noop();
+ }
+
+ if (customOpenTelemetry != null) {
+ return customOpenTelemetry;
+ }
+
+ return GlobalOpenTelemetry.get();
+ }
+
+ /** Gets a Tracer for the JDBC driver instrumentation scope. */
+ public static Tracer getTracer(OpenTelemetry openTelemetry) {
+ return openTelemetry.getTracer(INSTRUMENTATION_SCOPE_NAME);
+ }
+}
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java
index 5b89cf27eecf..23040f345dda 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java
@@ -162,6 +162,8 @@ protected boolean removeEldestEntry(Map.Entry> eldes
static final int DEFAULT_SWA_APPEND_ROW_COUNT_VALUE = 1000;
static final String SWA_ACTIVATION_ROW_COUNT_PROPERTY_NAME = "SWA_ActivationRowCount";
static final int DEFAULT_SWA_ACTIVATION_ROW_COUNT_VALUE = 3;
+ static final String ENABLE_OPENTELEMETRY_PROPERTY_NAME = "EnableOpenTelemetry";
+ static final boolean DEFAULT_ENABLE_OPENTELEMETRY_VALUE = false;
private static final BigQueryJdbcCustomLogger LOG =
new BigQueryJdbcCustomLogger(BigQueryJdbcUrlUtility.class.getName());
static final String FILTER_TABLES_ON_DEFAULT_DATASET_PROPERTY_NAME =
@@ -607,6 +609,12 @@ protected boolean removeEldestEntry(Map.Entry> eldes
.setDescription(
"Reason for the request, which is passed as the x-goog-request-reason"
+ " header.")
+ .build(),
+ BigQueryConnectionProperty.newBuilder()
+ .setName(ENABLE_OPENTELEMETRY_PROPERTY_NAME)
+ .setDescription(
+ "Enables or disables OpenTelemetry features in the Driver. Disabled by default.")
+ .setDefaultValue(String.valueOf(DEFAULT_ENABLE_OPENTELEMETRY_VALUE))
.build())));
private static final List NETWORK_PROPERTIES =
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java
index ca579d1d0c1b..9dbb79ea2151 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java
@@ -57,6 +57,12 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.Uninterruptibles;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
import java.lang.ref.ReferenceQueue;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -117,6 +123,7 @@ public class BigQueryStatement extends BigQueryNoOpsStatement {
private int fetchSize;
private String scriptQuery;
private Map extraLabels = new HashMap<>();
+ protected Context otelContext = null;
private BigQueryReadClient bigQueryReadClient = null;
private final BigQuery bigQuery;
@@ -233,40 +240,49 @@ private BigQuerySettings generateBigQuerySettings() {
*/
@Override
public ResultSet executeQuery(String sql) throws SQLException {
- // TODO: write method to return state variables to original state.
LOG.finest("++enter++");
- logQueryExecutionStart(sql);
- try {
- QueryJobConfiguration jobConfiguration =
- setDestinationDatasetAndTableInJobConfig(getJobConfig(sql).build());
- runQuery(sql, jobConfiguration);
- } catch (InterruptedException ex) {
- throw new BigQueryJdbcException(ex);
- }
+ return withTracing(
+ "BigQueryStatement.executeQuery",
+ (span) -> {
+ span.setAttribute("db.statement", sql);
+ logQueryExecutionStart(sql);
+ try {
+ QueryJobConfiguration jobConfiguration =
+ setDestinationDatasetAndTableInJobConfig(getJobConfig(sql).build());
+ runQuery(sql, jobConfiguration);
+ } catch (InterruptedException ex) {
+ throw new BigQueryJdbcException(ex);
+ }
- if (!isSingularResultSet()) {
- throw new BigQueryJdbcException(
- "Query returned more than one or didn't return any ResultSet.");
- }
- // This contains all the other assertions spec required on this method
- return getCurrentResultSet();
+ if (!isSingularResultSet()) {
+ throw new BigQueryJdbcException(
+ "Query returned more than one or didn't return any ResultSet.");
+ }
+ // This contains all the other assertions spec required on this method
+ return getCurrentResultSet();
+ });
}
@Override
public long executeLargeUpdate(String sql) throws SQLException {
LOG.finest("++enter++");
- logQueryExecutionStart(sql);
- try {
- QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql);
- runQuery(sql, jobConfiguration.build());
- } catch (InterruptedException ex) {
- throw new BigQueryJdbcRuntimeException(ex);
- }
- if (this.currentUpdateCount == -1) {
- throw new BigQueryJdbcException(
- "Update query expected to return affected row count. Double check query type.");
- }
- return this.currentUpdateCount;
+ return withTracing(
+ "BigQueryStatement.executeLargeUpdate",
+ (span) -> {
+ span.setAttribute("db.statement", sql);
+ logQueryExecutionStart(sql);
+ try {
+ QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql);
+ runQuery(sql, jobConfiguration.build());
+ } catch (InterruptedException ex) {
+ throw new BigQueryJdbcRuntimeException(ex);
+ }
+ if (this.currentUpdateCount == -1) {
+ throw new BigQueryJdbcException(
+ "Update query expected to return affected row count. Double check query type.");
+ }
+ return this.currentUpdateCount;
+ });
}
@Override
@@ -288,18 +304,23 @@ int checkUpdateCount(long updateCount) {
@Override
public boolean execute(String sql) throws SQLException {
LOG.finest("++enter++");
- logQueryExecutionStart(sql);
- try {
- QueryJobConfiguration jobConfiguration = getJobConfig(sql).build();
- // If Large Results are enabled, ensure query type is SELECT
- if (isLargeResultsEnabled() && getQueryType(jobConfiguration, null) == SqlType.SELECT) {
- jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration);
- }
- runQuery(sql, jobConfiguration);
- } catch (InterruptedException ex) {
- throw new BigQueryJdbcRuntimeException(ex);
- }
- return getCurrentResultSet() != null;
+ return withTracing(
+ "BigQueryStatement.execute",
+ (span) -> {
+ span.setAttribute("db.statement", sql);
+ logQueryExecutionStart(sql);
+ try {
+ QueryJobConfiguration jobConfiguration = getJobConfig(sql).build();
+ // If Large Results are enabled, ensure query type is SELECT
+ if (isLargeResultsEnabled() && getQueryType(jobConfiguration, null) == SqlType.SELECT) {
+ jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration);
+ }
+ runQuery(sql, jobConfiguration);
+ } catch (InterruptedException ex) {
+ throw new BigQueryJdbcRuntimeException(ex);
+ }
+ return getCurrentResultSet() != null;
+ });
}
StatementType getStatementType(QueryJobConfiguration queryJobConfiguration) throws SQLException {
@@ -810,77 +831,84 @@ Thread populateArrowBufferedQueue(
LOG.finest("++enter++");
Runnable arrowStreamProcessor =
- () -> {
- long rowsRead = 0;
- int retryCount = 0;
- try {
- // Use the first stream to perform reading.
- String streamName = readSession.getStreams(0).getName();
-
- while (true) {
- try {
- ReadRowsRequest readRowsRequest =
- ReadRowsRequest.newBuilder()
- .setReadStream(streamName)
- .setOffset(rowsRead)
- .build();
-
- // Process each block of rows as they arrive and decode using our simple row reader.
- com.google.api.gax.rpc.ServerStream stream =
- bqReadClient.readRowsCallable().call(readRowsRequest);
- for (ReadRowsResponse response : stream) {
- if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
- break;
+ Context.current()
+ .wrap(
+ () -> {
+ long rowsRead = 0;
+ int retryCount = 0;
+ try {
+ // Use the first stream to perform reading.
+ String streamName = readSession.getStreams(0).getName();
+
+ while (true) {
+ try {
+ ReadRowsRequest readRowsRequest =
+ ReadRowsRequest.newBuilder()
+ .setReadStream(streamName)
+ .setOffset(rowsRead)
+ .build();
+
+ // Process each block of rows as they arrive and decode using our simple row
+ // reader.
+ com.google.api.gax.rpc.ServerStream stream =
+ bqReadClient.readRowsCallable().call(readRowsRequest);
+ for (ReadRowsResponse response : stream) {
+ if (Thread.currentThread().isInterrupted()
+ || queryTaskExecutor.isShutdown()) {
+ break;
+ }
+
+ ArrowRecordBatch currentBatch = response.getArrowRecordBatch();
+ Uninterruptibles.putUninterruptibly(
+ arrowBatchWrapperBlockingQueue,
+ BigQueryArrowBatchWrapper.of(currentBatch));
+ rowsRead += response.getRowCount();
+ }
+ break;
+ } catch (com.google.api.gax.rpc.ApiException e) {
+ if (e.getStatusCode().getCode()
+ == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) {
+ LOG.warning("Read session expired or not found: %s", e.getMessage());
+ enqueueError(arrowBatchWrapperBlockingQueue, e);
+ break;
+ }
+ if (retryCount >= MAX_RETRY_COUNT) {
+ LOG.log(
+ Level.SEVERE,
+ "\n"
+ + Thread.currentThread().getName()
+ + " Interrupted @ arrowStreamProcessor, max retries exceeded",
+ e);
+ enqueueError(arrowBatchWrapperBlockingQueue, e);
+ break;
+ }
+ retryCount++;
+ LOG.warning(
+ "Connection interrupted during arrow stream read, retrying. attempt: %d",
+ retryCount);
+ Thread.sleep(RETRY_DELAY_MS);
+ }
+ }
+
+ } catch (InterruptedException e) {
+ LOG.log(
+ Level.WARNING,
+ "\n"
+ + Thread.currentThread().getName()
+ + " Interrupted @ arrowStreamProcessor",
+ e);
+ enqueueError(arrowBatchWrapperBlockingQueue, e);
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ LOG.log(
+ Level.WARNING,
+ "\n" + Thread.currentThread().getName() + " Error @ arrowStreamProcessor",
+ e);
+ enqueueError(arrowBatchWrapperBlockingQueue, e);
+ } finally { // logic needed for graceful shutdown
+ enqueueEndOfStream(arrowBatchWrapperBlockingQueue);
}
-
- ArrowRecordBatch currentBatch = response.getArrowRecordBatch();
- Uninterruptibles.putUninterruptibly(
- arrowBatchWrapperBlockingQueue, BigQueryArrowBatchWrapper.of(currentBatch));
- rowsRead += response.getRowCount();
- }
- break;
- } catch (com.google.api.gax.rpc.ApiException e) {
- if (e.getStatusCode().getCode()
- == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) {
- LOG.warning("Read session expired or not found: %s", e.getMessage());
- enqueueError(arrowBatchWrapperBlockingQueue, e);
- break;
- }
- if (retryCount >= MAX_RETRY_COUNT) {
- LOG.log(
- Level.SEVERE,
- "\n"
- + Thread.currentThread().getName()
- + " Interrupted @ arrowStreamProcessor, max retries exceeded",
- e);
- enqueueError(arrowBatchWrapperBlockingQueue, e);
- break;
- }
- retryCount++;
- LOG.warning(
- "Connection interrupted during arrow stream read, retrying. attempt: %d",
- retryCount);
- Thread.sleep(RETRY_DELAY_MS);
- }
- }
-
- } catch (InterruptedException e) {
- LOG.log(
- Level.WARNING,
- "\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor",
- e);
- enqueueError(arrowBatchWrapperBlockingQueue, e);
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- LOG.log(
- Level.WARNING,
- "\n" + Thread.currentThread().getName() + " Error @ arrowStreamProcessor",
- e);
- enqueueError(arrowBatchWrapperBlockingQueue, e);
- } finally { // logic needed for graceful shutdown
- enqueueEndOfStream(arrowBatchWrapperBlockingQueue);
- }
- };
+ });
Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(arrowStreamProcessor);
populateBufferWorker.start();
@@ -1029,53 +1057,72 @@ Thread runNextPageTaskAsync(
// calls
populateFirstPage(result, rpcResponseQueue);
+ Context asyncContext = (this.otelContext != null) ? this.otelContext : Context.current();
+
// This thread makes the RPC calls and paginates
Runnable nextPageTask =
- () -> {
- String currentPageToken = firstPageToken;
- TableResult currentResults = result;
- TableId destinationTable = null;
- if (firstPageToken != null) {
- destinationTable = getDestinationTable(jobId);
- }
-
- try {
- while (currentPageToken != null) {
- // do not process further pages and shutdown
- if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
- LOG.warning(
- "%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName());
- break;
+ asyncContext.wrap(
+ () -> {
+ Tracer tracer = getSafeTracer();
+ String currentPageToken = firstPageToken;
+ TableResult currentResults = result;
+ TableId destinationTable = null;
+ if (firstPageToken != null) {
+ destinationTable = getDestinationTable(jobId);
}
- long startTime = System.nanoTime();
- currentResults =
- this.bigQuery.listTableData(
- destinationTable,
- TableDataListOption.pageSize(querySettings.getMaxResultPerPage()),
- TableDataListOption.pageToken(currentPageToken));
-
- currentPageToken = currentResults.getNextPageToken();
- // this will be parsed asynchronously without blocking the current
- // thread
- Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(currentResults, true));
- LOG.fine(
- "Fetched %d results from the server in %d ms.",
- querySettings.getMaxResultPerPage(),
- (int) ((System.nanoTime() - startTime) / 1000000));
- }
- } catch (Exception ex) {
- Uninterruptibles.putUninterruptibly(
- bigQueryFieldValueListWrapperBlockingQueue,
- BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex)));
- } finally {
- // this will stop the parseDataTask as well when the pagination
- // completes
- Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(null, false));
- }
- // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
- // have finished processing the records and even that will be interrupted
- };
+ try {
+ while (currentPageToken != null) {
+ // do not process further pages and shutdown
+ if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
+ LOG.warning(
+ "%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName());
+ break;
+ }
+
+ Span paginationSpan =
+ tracer.spanBuilder("BigQueryStatement.pagination").startSpan();
+ try (Scope scope = paginationSpan.makeCurrent()) {
+ paginationSpan.setAttribute("db.pagination.page_token", currentPageToken);
+
+ long startTime = System.nanoTime();
+ currentResults =
+ this.bigQuery.listTableData(
+ destinationTable,
+ TableDataListOption.pageSize(querySettings.getMaxResultPerPage()),
+ TableDataListOption.pageToken(currentPageToken));
+
+ long duration = (System.nanoTime() - startTime) / 1000000;
+ paginationSpan.setAttribute("db.pagination.duration_ms", duration);
+ paginationSpan.setAttribute(
+ "db.pagination.rows_fetched", querySettings.getMaxResultPerPage());
+
+ currentPageToken = currentResults.getNextPageToken();
+ // this will be parsed asynchronously without blocking the current thread
+ Uninterruptibles.putUninterruptibly(
+ rpcResponseQueue, Tuple.of(currentResults, true));
+ LOG.fine(
+ "Fetched %d results from the server in %d ms.",
+ querySettings.getMaxResultPerPage(), (int) duration);
+ } catch (Exception e) {
+ paginationSpan.recordException(e);
+ paginationSpan.setStatus(StatusCode.ERROR, e.getMessage());
+ throw e;
+ } finally {
+ paginationSpan.end();
+ }
+ }
+ } catch (Exception ex) {
+ Uninterruptibles.putUninterruptibly(
+ bigQueryFieldValueListWrapperBlockingQueue,
+ BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex)));
+ } finally {
+ // this will stop the parseDataTask as well when the pagination completes
+ Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(null, false));
+ }
+ // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
+ // have finished processing the records and even that will be interrupted
+ });
Thread nextPageWorker = JDBC_THREAD_FACTORY.newThread(nextPageTask);
nextPageWorker.start();
@@ -1094,63 +1141,69 @@ Thread parseAndPopulateRpcDataAsync(
LOG.finest("++enter++");
Runnable populateBufferRunnable =
- () -> { // producer thread populating the buffer
- try {
- Iterable fieldValueLists;
- // as we have to process the first page
- boolean hasRows = true;
- while (hasRows) {
- try {
- Tuple nextPageTuple = rpcResponseQueue.take();
- if (nextPageTuple.x() != null) {
- fieldValueLists = nextPageTuple.x().getValues();
- } else {
- fieldValueLists = null;
- }
- hasRows = nextPageTuple.y();
-
- } catch (InterruptedException e) {
- LOG.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
- // Thread might get interrupted while calling the Cancel method, which is
- // expected, so logging this instead of throwing the exception back
- break;
- }
-
- if (Thread.currentThread().isInterrupted()
- || queryTaskExecutor.isShutdown()
- || fieldValueLists == null) {
- // do not process further pages and shutdown (outerloop)
- break;
- }
-
- long startTime = System.nanoTime();
- long results = 0;
- for (FieldValueList fieldValueList : fieldValueLists) {
-
- if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
- // do not process further pages and shutdown (inner loop)
- break;
- }
- Uninterruptibles.putUninterruptibly(
- bigQueryFieldValueListWrapperBlockingQueue,
- BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList));
- results += 1;
- }
- LOG.fine(
- "Processed %d results in %d ms.",
- results, (int) ((System.nanoTime() - startTime) / 1000000));
- }
-
- } catch (Exception ex) {
- LOG.log(
- Level.WARNING,
- "\n" + Thread.currentThread().getName() + " Error @ populateBufferAsync",
- ex);
- enqueueBufferError(bigQueryFieldValueListWrapperBlockingQueue, ex);
- } finally {
- enqueueBufferEndOfStream(bigQueryFieldValueListWrapperBlockingQueue);
- }
- };
+ Context.current()
+ .wrap(
+ () -> { // producer thread populating the buffer
+ try {
+ Iterable fieldValueLists;
+ // as we have to process the first page
+ boolean hasRows = true;
+ while (hasRows) {
+ try {
+ Tuple nextPageTuple = rpcResponseQueue.take();
+ if (nextPageTuple.x() != null) {
+ fieldValueLists = nextPageTuple.x().getValues();
+ } else {
+ fieldValueLists = null;
+ }
+ hasRows = nextPageTuple.y();
+
+ } catch (InterruptedException e) {
+ LOG.log(
+ Level.WARNING,
+ "\n" + Thread.currentThread().getName() + " Interrupted",
+ e);
+ // Thread might get interrupted while calling the Cancel method, which is
+ // expected, so logging this instead of throwing the exception back
+ break;
+ }
+
+ if (Thread.currentThread().isInterrupted()
+ || queryTaskExecutor.isShutdown()
+ || fieldValueLists == null) {
+ // do not process further pages and shutdown (outerloop)
+ break;
+ }
+
+ long startTime = System.nanoTime();
+ long results = 0;
+ for (FieldValueList fieldValueList : fieldValueLists) {
+
+ if (Thread.currentThread().isInterrupted()
+ || queryTaskExecutor.isShutdown()) {
+ // do not process further pages and shutdown (inner loop)
+ break;
+ }
+ Uninterruptibles.putUninterruptibly(
+ bigQueryFieldValueListWrapperBlockingQueue,
+ BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList));
+ results += 1;
+ }
+ LOG.fine(
+ "Processed %d results in %d ms.",
+ results, (int) ((System.nanoTime() - startTime) / 1000000));
+ }
+
+ } catch (Exception ex) {
+ LOG.log(
+ Level.WARNING,
+ "\n" + Thread.currentThread().getName() + " Error @ populateBufferAsync",
+ ex);
+ enqueueBufferError(bigQueryFieldValueListWrapperBlockingQueue, ex);
+ } finally {
+ enqueueBufferEndOfStream(bigQueryFieldValueListWrapperBlockingQueue);
+ }
+ });
Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(populateBufferRunnable);
populateBufferWorker.start();
@@ -1402,29 +1455,35 @@ public void clearBatch() {
@Override
public int[] executeBatch() throws SQLException {
LOG.finest("++enter++");
- int[] result = new int[this.batchQueries.size()];
- if (this.batchQueries.isEmpty()) {
- return result;
- }
+ return withTracing(
+ "BigQueryStatement.executeBatch",
+ (span) -> {
+ span.setAttribute("db.statement.count", this.batchQueries.size());
+
+ int[] result = new int[this.batchQueries.size()];
+ if (this.batchQueries.isEmpty()) {
+ return result;
+ }
- try {
- String combinedQueries = String.join("", this.batchQueries);
- QueryJobConfiguration.Builder jobConfiguration = getJobConfig(combinedQueries);
- jobConfiguration.setPriority(QueryJobConfiguration.Priority.BATCH);
- runQuery(combinedQueries, jobConfiguration.build());
- } catch (InterruptedException ex) {
- throw new BigQueryJdbcRuntimeException(ex);
- }
+ try {
+ String combinedQueries = String.join("", this.batchQueries);
+ QueryJobConfiguration.Builder jobConfiguration = getJobConfig(combinedQueries);
+ jobConfiguration.setPriority(QueryJobConfiguration.Priority.BATCH);
+ runQuery(combinedQueries, jobConfiguration.build());
+ } catch (InterruptedException ex) {
+ throw new BigQueryJdbcRuntimeException(ex);
+ }
- int i = 0;
- while (getUpdateCount() != -1 && i < this.batchQueries.size()) {
- result[i] = getUpdateCount();
- getMoreResults();
- i++;
- }
+ int i = 0;
+ while (getUpdateCount() != -1 && i < this.batchQueries.size()) {
+ result[i] = getUpdateCount();
+ getMoreResults();
+ i++;
+ }
- clearBatch();
- return result;
+ clearBatch();
+ return result;
+ });
}
@Override
@@ -1573,4 +1632,42 @@ private void enqueueBufferError(BlockingQueue que
private void enqueueBufferEndOfStream(BlockingQueue queue) {
Uninterruptibles.putUninterruptibly(queue, BigQueryFieldValueListWrapper.of(null, null, true));
}
+
+ @FunctionalInterface
+ private interface TracedOperation {
+ T run(Span span) throws SQLException;
+ }
+
+ private T withTracing(String spanName, TracedOperation operation) throws SQLException {
+ Tracer tracer = getSafeTracer();
+ Span span = tracer.spanBuilder(spanName).startSpan();
+ try (Scope scope = span.makeCurrent()) {
+ this.otelContext = Context.current();
+ return operation.run(span);
+ } catch (Exception ex) {
+ span.recordException(ex);
+ span.setStatus(StatusCode.ERROR, ex.getMessage());
+ throw ex;
+ } finally {
+ span.end();
+ }
+ }
+
+  /**
+   * Returns the Tracer from the parent connection when one is available; otherwise falls back to
+   * a tracer obtained from GlobalOpenTelemetry so callers never receive null.
+   */
+ private Tracer getSafeTracer() {
+ if (connection != null) {
+ Tracer tracer = connection.getTracer();
+ if (tracer != null) {
+ return tracer;
+ }
+ }
+ return GlobalOpenTelemetry.getTracer("google-cloud-bigquery-jdbc-noop");
+ }
+
+ public Context getOtelContext() {
+ return this.otelContext;
+ }
}
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java
index 681595f8b05c..e1bb06625a0e 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java
@@ -20,6 +20,7 @@
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import io.opentelemetry.api.OpenTelemetry;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -113,6 +114,8 @@ public class DataSource implements javax.sql.DataSource {
private String privateServiceConnect;
private Long connectionPoolSize;
private Long listenerPoolSize;
+ private Boolean enableOpenTelemetry;
+ private OpenTelemetry customOpenTelemetry;
// Make sure the JDBC driver class is loaded.
static {
@@ -324,6 +327,12 @@ public class DataSource implements javax.sql.DataSource {
.put(
BigQueryJdbcUrlUtility.LISTENER_POOL_SIZE_PROPERTY_NAME,
(ds, val) -> ds.setListenerPoolSize(Long.parseLong(val)))
+ .put(
+ BigQueryJdbcUrlUtility.ENABLE_OPENTELEMETRY_PROPERTY_NAME,
+ (ds, val) ->
+ ds.setEnableOpenTelemetry(
+ BigQueryJdbcUrlUtility.convertIntToBoolean(
+ val, BigQueryJdbcUrlUtility.ENABLE_OPENTELEMETRY_PROPERTY_NAME)))
.build();
public static DataSource fromUrl(String url) {
@@ -375,7 +384,11 @@ public Connection getConnection() throws SQLException {
throw new BigQueryJdbcException(
"The URL " + getURL() + " is invalid. Please specify a valid Connection URL. ");
}
- return DriverManager.getConnection(getURL(), createProperties());
+ Properties props = createProperties();
+ if (this.customOpenTelemetry != null) {
+ props.put("customOpenTelemetry", this.customOpenTelemetry);
+ }
+ return DriverManager.getConnection(getURL(), props);
}
private Properties createProperties() {
@@ -616,6 +629,11 @@ private Properties createProperties() {
BigQueryJdbcUrlUtility.LISTENER_POOL_SIZE_PROPERTY_NAME,
String.valueOf(this.listenerPoolSize));
}
+ if (this.enableOpenTelemetry != null) {
+ connectionProperties.setProperty(
+ BigQueryJdbcUrlUtility.ENABLE_OPENTELEMETRY_PROPERTY_NAME,
+ String.valueOf(this.enableOpenTelemetry));
+ }
return connectionProperties;
}
@@ -737,6 +755,24 @@ public void setListenerPoolSize(Long listenerPoolSize) {
this.listenerPoolSize = listenerPoolSize;
}
+ public Boolean getEnableOpenTelemetry() {
+ return enableOpenTelemetry != null
+ ? enableOpenTelemetry
+ : BigQueryJdbcUrlUtility.DEFAULT_ENABLE_OPENTELEMETRY_VALUE;
+ }
+
+ public void setEnableOpenTelemetry(Boolean enableOpenTelemetry) {
+ this.enableOpenTelemetry = enableOpenTelemetry;
+ }
+
+ public OpenTelemetry getCustomOpenTelemetry() {
+ return customOpenTelemetry;
+ }
+
+ public void setCustomOpenTelemetry(OpenTelemetry customOpenTelemetry) {
+ this.customOpenTelemetry = customOpenTelemetry;
+ }
+
public void setHighThroughputMinTableSize(Integer highThroughputMinTableSize) {
this.highThroughputMinTableSize = highThroughputMinTableSize;
}