diff --git a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml index d7a56c91f597..a4710371d965 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml +++ b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml @@ -131,6 +131,10 @@ io com.google.bqjdbc.shaded.io + + io.opentelemetry.api.* + io.opentelemetry.context.* + @@ -277,6 +281,16 @@ httpcore5 + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-context + + com.google.truth diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java index 17471e252205..7bb9bfa409c1 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java @@ -41,6 +41,8 @@ import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; import com.google.cloud.http.HttpTransportOptions; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Tracer; import java.io.IOException; import java.io.InputStream; import java.sql.CallableStatement; @@ -138,6 +140,9 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { Long connectionPoolSize; Long listenerPoolSize; String partnerToken; + boolean enableOpenTelemetry; + OpenTelemetry customOpenTelemetry; + Tracer tracer = OpenTelemetry.noop().getTracer(""); BigQueryConnection(String url) throws IOException { this(url, DataSource.fromUrl(url)); @@ -242,6 +247,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { this.connectionPoolSize = ds.getConnectionPoolSize(); this.listenerPoolSize = ds.getListenerPoolSize(); this.partnerToken = ds.getPartnerToken(); + this.enableOpenTelemetry = ds.getEnableOpenTelemetry(); + this.customOpenTelemetry = ds.getCustomOpenTelemetry(); this.headerProvider = createHeaderProvider(); this.bigQuery = getBigQueryConnection(); @@ -935,6 +942,14 @@ private BigQuery getBigQueryConnection() { bigQueryOptions.setTransportOptions(this.httpTransportOptions); } + OpenTelemetry openTelemetry = + BigQueryJdbcOpenTelemetry.getOpenTelemetry( + this.enableOpenTelemetry, this.customOpenTelemetry); + if (this.enableOpenTelemetry) { + this.tracer = BigQueryJdbcOpenTelemetry.getTracer(openTelemetry); + bigQueryOptions.setOpenTelemetryTracer(this.tracer); + } + BigQueryOptions options = bigQueryOptions.setHeaderProvider(this.headerProvider).build(); options.setDefaultJobCreationMode( this.useStatelessQueryMode @@ -1083,4 +1098,8 @@ public CallableStatement prepareCall( } return prepareCall(sql); } + + public Tracer getTracer() { + return this.tracer; + } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java index 930fc42af2bc..02bfd3cc164c 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java @@ -20,6 +20,7 @@ import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException; import io.grpc.LoadBalancerRegistry; import io.grpc.internal.PickFirstLoadBalancerProvider; +import io.opentelemetry.api.OpenTelemetry; import java.io.IOException; import java.sql.Connection; import java.sql.Driver; @@ -121,9 +122,14 @@ public Connection connect(String url, Properties info) throws SQLException { LOG.finest("++enter++"); try { if (acceptsURL(url)) { - // strip 'jdbc:' from the URL, add any extra properties + Properties connectInfo = info == null ? new Properties() : (Properties) info.clone(); + OpenTelemetry customOpenTelemetry = null; + if (connectInfo.containsKey("customOpenTelemetry")) { + customOpenTelemetry = (OpenTelemetry) connectInfo.remove("customOpenTelemetry"); + } String connectionUri = - BigQueryJdbcUrlUtility.appendPropertiesToURL(url.substring(5), this.toString(), info); + BigQueryJdbcUrlUtility.appendPropertiesToURL( + url.substring(5), this.toString(), connectInfo); try { BigQueryJdbcUrlUtility.parseUrl(connectionUri); } catch (BigQueryJdbcRuntimeException e) { @@ -131,6 +137,7 @@ public Connection connect(String url, Properties info) throws SQLException { } DataSource ds = DataSource.fromUrl(connectionUri); + ds.setCustomOpenTelemetry(customOpenTelemetry); // LogLevel String logLevelStr = ds.getLogLevel(); diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java new file mode 100644 index 000000000000..811158bb7dad --- /dev/null +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java @@ -0,0 +1,48 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.jdbc; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Tracer; + +public class BigQueryJdbcOpenTelemetry { + + private static final String INSTRUMENTATION_SCOPE_NAME = "com.google.cloud.bigquery.jdbc"; + + /** + * Initializes or returns the OpenTelemetry instance based on hybrid logic. Prefer + * GlobalOpenTelemetry; fallback to an auto-configured GCP exporter if requested. + */ + public static OpenTelemetry getOpenTelemetry( + boolean enableOpenTelemetry, OpenTelemetry customOpenTelemetry) { + if (!enableOpenTelemetry) { + return OpenTelemetry.noop(); + } + + if (customOpenTelemetry != null) { + return customOpenTelemetry; + } + + return GlobalOpenTelemetry.get(); + } + + /** Gets a Tracer for the JDBC driver instrumentation scope. */ + public static Tracer getTracer(OpenTelemetry openTelemetry) { + return openTelemetry.getTracer(INSTRUMENTATION_SCOPE_NAME); + } +} diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java index 5b89cf27eecf..23040f345dda 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java @@ -162,6 +162,8 @@ protected boolean removeEldestEntry(Map.Entry> eldes static final int DEFAULT_SWA_APPEND_ROW_COUNT_VALUE = 1000; static final String SWA_ACTIVATION_ROW_COUNT_PROPERTY_NAME = "SWA_ActivationRowCount"; static final int DEFAULT_SWA_ACTIVATION_ROW_COUNT_VALUE = 3; + static final String ENABLE_OPENTELEMETRY_PROPERTY_NAME = "EnableOpenTelemetry"; + static final boolean DEFAULT_ENABLE_OPENTELEMETRY_VALUE = false; private static final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(BigQueryJdbcUrlUtility.class.getName()); static final String FILTER_TABLES_ON_DEFAULT_DATASET_PROPERTY_NAME = @@ -607,6 +609,12 @@ protected boolean removeEldestEntry(Map.Entry> eldes .setDescription( "Reason for the request, which is passed as the x-goog-request-reason" + " header.") + .build(), + BigQueryConnectionProperty.newBuilder() + .setName(ENABLE_OPENTELEMETRY_PROPERTY_NAME) + .setDescription( + "Enables or disables OpenTelemetry features in the Driver. Disabled by default.") + .setDefaultValue(String.valueOf(DEFAULT_ENABLE_OPENTELEMETRY_VALUE)) .build()))); private static final List NETWORK_PROPERTIES = diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index ca579d1d0c1b..9dbb79ea2151 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -57,6 +57,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.util.concurrent.Uninterruptibles; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.lang.ref.ReferenceQueue; import java.sql.Connection; import java.sql.ResultSet; @@ -117,6 +123,7 @@ public class BigQueryStatement extends BigQueryNoOpsStatement { private int fetchSize; private String scriptQuery; private Map extraLabels = new HashMap<>(); + protected Context otelContext = null; private BigQueryReadClient bigQueryReadClient = null; private final BigQuery bigQuery; @@ -233,40 +240,49 @@ private BigQuerySettings generateBigQuerySettings() { */ @Override public ResultSet executeQuery(String sql) throws SQLException { - // TODO: write method to return state variables to original state. LOG.finest("++enter++"); - logQueryExecutionStart(sql); - try { - QueryJobConfiguration jobConfiguration = - setDestinationDatasetAndTableInJobConfig(getJobConfig(sql).build()); - runQuery(sql, jobConfiguration); - } catch (InterruptedException ex) { - throw new BigQueryJdbcException(ex); - } + return withTracing( + "BigQueryStatement.executeQuery", + (span) -> { + span.setAttribute("db.statement", sql); + logQueryExecutionStart(sql); + try { + QueryJobConfiguration jobConfiguration = + setDestinationDatasetAndTableInJobConfig(getJobConfig(sql).build()); + runQuery(sql, jobConfiguration); + } catch (InterruptedException ex) { + throw new BigQueryJdbcException(ex); + } - if (!isSingularResultSet()) { - throw new BigQueryJdbcException( - "Query returned more than one or didn't return any ResultSet."); - } - // This contains all the other assertions spec required on this method - return getCurrentResultSet(); + if (!isSingularResultSet()) { + throw new BigQueryJdbcException( + "Query returned more than one or didn't return any ResultSet."); + } + // This contains all the other assertions spec required on this method + return getCurrentResultSet(); + }); } @Override public long executeLargeUpdate(String sql) throws SQLException { LOG.finest("++enter++"); - logQueryExecutionStart(sql); - try { - QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql); - runQuery(sql, jobConfiguration.build()); - } catch (InterruptedException ex) { - throw new BigQueryJdbcRuntimeException(ex); - } - if (this.currentUpdateCount == -1) { - throw new BigQueryJdbcException( - "Update query expected to return affected row count. Double check query type."); - } - return this.currentUpdateCount; + return withTracing( + "BigQueryStatement.executeLargeUpdate", + (span) -> { + span.setAttribute("db.statement", sql); + logQueryExecutionStart(sql); + try { + QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql); + runQuery(sql, jobConfiguration.build()); + } catch (InterruptedException ex) { + throw new BigQueryJdbcRuntimeException(ex); + } + if (this.currentUpdateCount == -1) { + throw new BigQueryJdbcException( + "Update query expected to return affected row count. Double check query type."); + } + return this.currentUpdateCount; + }); } @Override @@ -288,18 +304,23 @@ int checkUpdateCount(long updateCount) { @Override public boolean execute(String sql) throws SQLException { LOG.finest("++enter++"); - logQueryExecutionStart(sql); - try { - QueryJobConfiguration jobConfiguration = getJobConfig(sql).build(); - // If Large Results are enabled, ensure query type is SELECT - if (isLargeResultsEnabled() && getQueryType(jobConfiguration, null) == SqlType.SELECT) { - jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration); - } - runQuery(sql, jobConfiguration); - } catch (InterruptedException ex) { - throw new BigQueryJdbcRuntimeException(ex); - } - return getCurrentResultSet() != null; + return withTracing( + "BigQueryStatement.execute", + (span) -> { + span.setAttribute("db.statement", sql); + logQueryExecutionStart(sql); + try { + QueryJobConfiguration jobConfiguration = getJobConfig(sql).build(); + // If Large Results are enabled, ensure query type is SELECT + if (isLargeResultsEnabled() && getQueryType(jobConfiguration, null) == SqlType.SELECT) { + jobConfiguration = setDestinationDatasetAndTableInJobConfig(jobConfiguration); + } + runQuery(sql, jobConfiguration); + } catch (InterruptedException ex) { + throw new BigQueryJdbcRuntimeException(ex); + } + return getCurrentResultSet() != null; + }); } StatementType getStatementType(QueryJobConfiguration queryJobConfiguration) throws SQLException { @@ -810,77 +831,84 @@ Thread populateArrowBufferedQueue( LOG.finest("++enter++"); Runnable arrowStreamProcessor = - () -> { - long rowsRead = 0; - int retryCount = 0; - try { - // Use the first stream to perform reading. - String streamName = readSession.getStreams(0).getName(); - - while (true) { - try { - ReadRowsRequest readRowsRequest = - ReadRowsRequest.newBuilder() - .setReadStream(streamName) - .setOffset(rowsRead) - .build(); - - // Process each block of rows as they arrive and decode using our simple row reader. - com.google.api.gax.rpc.ServerStream stream = - bqReadClient.readRowsCallable().call(readRowsRequest); - for (ReadRowsResponse response : stream) { - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { - break; + Context.current() + .wrap( + () -> { + long rowsRead = 0; + int retryCount = 0; + try { + // Use the first stream to perform reading. + String streamName = readSession.getStreams(0).getName(); + + while (true) { + try { + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder() + .setReadStream(streamName) + .setOffset(rowsRead) + .build(); + + // Process each block of rows as they arrive and decode using our simple row + // reader. + com.google.api.gax.rpc.ServerStream stream = + bqReadClient.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + if (Thread.currentThread().isInterrupted() + || queryTaskExecutor.isShutdown()) { + break; + } + + ArrowRecordBatch currentBatch = response.getArrowRecordBatch(); + Uninterruptibles.putUninterruptibly( + arrowBatchWrapperBlockingQueue, + BigQueryArrowBatchWrapper.of(currentBatch)); + rowsRead += response.getRowCount(); + } + break; + } catch (com.google.api.gax.rpc.ApiException e) { + if (e.getStatusCode().getCode() + == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) { + LOG.warning("Read session expired or not found: %s", e.getMessage()); + enqueueError(arrowBatchWrapperBlockingQueue, e); + break; + } + if (retryCount >= MAX_RETRY_COUNT) { + LOG.log( + Level.SEVERE, + "\n" + + Thread.currentThread().getName() + + " Interrupted @ arrowStreamProcessor, max retries exceeded", + e); + enqueueError(arrowBatchWrapperBlockingQueue, e); + break; + } + retryCount++; + LOG.warning( + "Connection interrupted during arrow stream read, retrying. attempt: %d", + retryCount); + Thread.sleep(RETRY_DELAY_MS); + } + } + + } catch (InterruptedException e) { + LOG.log( + Level.WARNING, + "\n" + + Thread.currentThread().getName() + + " Interrupted @ arrowStreamProcessor", + e); + enqueueError(arrowBatchWrapperBlockingQueue, e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Error @ arrowStreamProcessor", + e); + enqueueError(arrowBatchWrapperBlockingQueue, e); + } finally { // logic needed for graceful shutdown + enqueueEndOfStream(arrowBatchWrapperBlockingQueue); } - - ArrowRecordBatch currentBatch = response.getArrowRecordBatch(); - Uninterruptibles.putUninterruptibly( - arrowBatchWrapperBlockingQueue, BigQueryArrowBatchWrapper.of(currentBatch)); - rowsRead += response.getRowCount(); - } - break; - } catch (com.google.api.gax.rpc.ApiException e) { - if (e.getStatusCode().getCode() - == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) { - LOG.warning("Read session expired or not found: %s", e.getMessage()); - enqueueError(arrowBatchWrapperBlockingQueue, e); - break; - } - if (retryCount >= MAX_RETRY_COUNT) { - LOG.log( - Level.SEVERE, - "\n" - + Thread.currentThread().getName() - + " Interrupted @ arrowStreamProcessor, max retries exceeded", - e); - enqueueError(arrowBatchWrapperBlockingQueue, e); - break; - } - retryCount++; - LOG.warning( - "Connection interrupted during arrow stream read, retrying. attempt: %d", - retryCount); - Thread.sleep(RETRY_DELAY_MS); - } - } - - } catch (InterruptedException e) { - LOG.log( - Level.WARNING, - "\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor", - e); - enqueueError(arrowBatchWrapperBlockingQueue, e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - LOG.log( - Level.WARNING, - "\n" + Thread.currentThread().getName() + " Error @ arrowStreamProcessor", - e); - enqueueError(arrowBatchWrapperBlockingQueue, e); - } finally { // logic needed for graceful shutdown - enqueueEndOfStream(arrowBatchWrapperBlockingQueue); - } - }; + }); Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(arrowStreamProcessor); populateBufferWorker.start(); @@ -1029,53 +1057,72 @@ Thread runNextPageTaskAsync( // calls populateFirstPage(result, rpcResponseQueue); + Context asyncContext = (this.otelContext != null) ? this.otelContext : Context.current(); + // This thread makes the RPC calls and paginates Runnable nextPageTask = - () -> { - String currentPageToken = firstPageToken; - TableResult currentResults = result; - TableId destinationTable = null; - if (firstPageToken != null) { - destinationTable = getDestinationTable(jobId); - } - - try { - while (currentPageToken != null) { - // do not process further pages and shutdown - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { - LOG.warning( - "%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName()); - break; + asyncContext.wrap( + () -> { + Tracer tracer = getSafeTracer(); + String currentPageToken = firstPageToken; + TableResult currentResults = result; + TableId destinationTable = null; + if (firstPageToken != null) { + destinationTable = getDestinationTable(jobId); } - long startTime = System.nanoTime(); - currentResults = - this.bigQuery.listTableData( - destinationTable, - TableDataListOption.pageSize(querySettings.getMaxResultPerPage()), - TableDataListOption.pageToken(currentPageToken)); - - currentPageToken = currentResults.getNextPageToken(); - // this will be parsed asynchronously without blocking the current - // thread - Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(currentResults, true)); - LOG.fine( - "Fetched %d results from the server in %d ms.", - querySettings.getMaxResultPerPage(), - (int) ((System.nanoTime() - startTime) / 1000000)); - } - } catch (Exception ex) { - Uninterruptibles.putUninterruptibly( - bigQueryFieldValueListWrapperBlockingQueue, - BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex))); - } finally { - // this will stop the parseDataTask as well when the pagination - // completes - Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(null, false)); - } - // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not - // have finished processing the records and even that will be interrupted - }; + try { + while (currentPageToken != null) { + // do not process further pages and shutdown + if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + LOG.warning( + "%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName()); + break; + } + + Span paginationSpan = + tracer.spanBuilder("BigQueryStatement.pagination").startSpan(); + try (Scope scope = paginationSpan.makeCurrent()) { + paginationSpan.setAttribute("db.pagination.page_token", currentPageToken); + + long startTime = System.nanoTime(); + currentResults = + this.bigQuery.listTableData( + destinationTable, + TableDataListOption.pageSize(querySettings.getMaxResultPerPage()), + TableDataListOption.pageToken(currentPageToken)); + + long duration = (System.nanoTime() - startTime) / 1000000; + paginationSpan.setAttribute("db.pagination.duration_ms", duration); + paginationSpan.setAttribute( + "db.pagination.rows_fetched", querySettings.getMaxResultPerPage()); + + currentPageToken = currentResults.getNextPageToken(); + // this will be parsed asynchronously without blocking the current thread + Uninterruptibles.putUninterruptibly( + rpcResponseQueue, Tuple.of(currentResults, true)); + LOG.fine( + "Fetched %d results from the server in %d ms.", + querySettings.getMaxResultPerPage(), (int) duration); + } catch (Exception e) { + paginationSpan.recordException(e); + paginationSpan.setStatus(StatusCode.ERROR, e.getMessage()); + throw e; + } finally { + paginationSpan.end(); + } + } + } catch (Exception ex) { + Uninterruptibles.putUninterruptibly( + bigQueryFieldValueListWrapperBlockingQueue, + BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex))); + } finally { + // this will stop the parseDataTask as well when the pagination completes + Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(null, false)); + } + // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not + // have finished processing the records and even that will be interrupted + }); Thread nextPageWorker = JDBC_THREAD_FACTORY.newThread(nextPageTask); nextPageWorker.start(); @@ -1094,63 +1141,69 @@ Thread parseAndPopulateRpcDataAsync( LOG.finest("++enter++"); Runnable populateBufferRunnable = - () -> { // producer thread populating the buffer - try { - Iterable fieldValueLists; - // as we have to process the first page - boolean hasRows = true; - while (hasRows) { - try { - Tuple nextPageTuple = rpcResponseQueue.take(); - if (nextPageTuple.x() != null) { - fieldValueLists = nextPageTuple.x().getValues(); - } else { - fieldValueLists = null; - } - hasRows = nextPageTuple.y(); - - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); - // Thread might get interrupted while calling the Cancel method, which is - // expected, so logging this instead of throwing the exception back - break; - } - - if (Thread.currentThread().isInterrupted() - || queryTaskExecutor.isShutdown() - || fieldValueLists == null) { - // do not process further pages and shutdown (outerloop) - break; - } - - long startTime = System.nanoTime(); - long results = 0; - for (FieldValueList fieldValueList : fieldValueLists) { - - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { - // do not process further pages and shutdown (inner loop) - break; - } - Uninterruptibles.putUninterruptibly( - bigQueryFieldValueListWrapperBlockingQueue, - BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList)); - results += 1; - } - LOG.fine( - "Processed %d results in %d ms.", - results, (int) ((System.nanoTime() - startTime) / 1000000)); - } - - } catch (Exception ex) { - LOG.log( - Level.WARNING, - "\n" + Thread.currentThread().getName() + " Error @ populateBufferAsync", - ex); - enqueueBufferError(bigQueryFieldValueListWrapperBlockingQueue, ex); - } finally { - enqueueBufferEndOfStream(bigQueryFieldValueListWrapperBlockingQueue); - } - }; + Context.current() + .wrap( + () -> { // producer thread populating the buffer + try { + Iterable fieldValueLists; + // as we have to process the first page + boolean hasRows = true; + while (hasRows) { + try { + Tuple nextPageTuple = rpcResponseQueue.take(); + if (nextPageTuple.x() != null) { + fieldValueLists = nextPageTuple.x().getValues(); + } else { + fieldValueLists = null; + } + hasRows = nextPageTuple.y(); + + } catch (InterruptedException e) { + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Interrupted", + e); + // Thread might get interrupted while calling the Cancel method, which is + // expected, so logging this instead of throwing the exception back + break; + } + + if (Thread.currentThread().isInterrupted() + || queryTaskExecutor.isShutdown() + || fieldValueLists == null) { + // do not process further pages and shutdown (outerloop) + break; + } + + long startTime = System.nanoTime(); + long results = 0; + for (FieldValueList fieldValueList : fieldValueLists) { + + if (Thread.currentThread().isInterrupted() + || queryTaskExecutor.isShutdown()) { + // do not process further pages and shutdown (inner loop) + break; + } + Uninterruptibles.putUninterruptibly( + bigQueryFieldValueListWrapperBlockingQueue, + BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList)); + results += 1; + } + LOG.fine( + "Processed %d results in %d ms.", + results, (int) ((System.nanoTime() - startTime) / 1000000)); + } + + } catch (Exception ex) { + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Error @ populateBufferAsync", + ex); + enqueueBufferError(bigQueryFieldValueListWrapperBlockingQueue, ex); + } finally { + enqueueBufferEndOfStream(bigQueryFieldValueListWrapperBlockingQueue); + } + }); Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(populateBufferRunnable); populateBufferWorker.start(); @@ -1402,29 +1455,35 @@ public void clearBatch() { @Override public int[] executeBatch() throws SQLException { LOG.finest("++enter++"); - int[] result = new int[this.batchQueries.size()]; - if (this.batchQueries.isEmpty()) { - return result; - } + return withTracing( + "BigQueryStatement.executeBatch", + (span) -> { + span.setAttribute("db.statement.count", this.batchQueries.size()); + + int[] result = new int[this.batchQueries.size()]; + if (this.batchQueries.isEmpty()) { + return result; + } - try { - String combinedQueries = String.join("", this.batchQueries); - QueryJobConfiguration.Builder jobConfiguration = getJobConfig(combinedQueries); - jobConfiguration.setPriority(QueryJobConfiguration.Priority.BATCH); - runQuery(combinedQueries, jobConfiguration.build()); - } catch (InterruptedException ex) { - throw new BigQueryJdbcRuntimeException(ex); - } + try { + String combinedQueries = String.join("", this.batchQueries); + QueryJobConfiguration.Builder jobConfiguration = getJobConfig(combinedQueries); + jobConfiguration.setPriority(QueryJobConfiguration.Priority.BATCH); + runQuery(combinedQueries, jobConfiguration.build()); + } catch (InterruptedException ex) { + throw new BigQueryJdbcRuntimeException(ex); + } - int i = 0; - while (getUpdateCount() != -1 && i < this.batchQueries.size()) { - result[i] = getUpdateCount(); - getMoreResults(); - i++; - } + int i = 0; + while (getUpdateCount() != -1 && i < this.batchQueries.size()) { + result[i] = getUpdateCount(); + getMoreResults(); + i++; + } - clearBatch(); - return result; + clearBatch(); + return result; + }); } @Override @@ -1573,4 +1632,42 @@ private void enqueueBufferError(BlockingQueue que private void enqueueBufferEndOfStream(BlockingQueue queue) { Uninterruptibles.putUninterruptibly(queue, BigQueryFieldValueListWrapper.of(null, null, true)); } + + @FunctionalInterface + private interface TracedOperation { + T run(Span span) throws SQLException; + } + + private T withTracing(String spanName, TracedOperation operation) throws SQLException { + Tracer tracer = getSafeTracer(); + Span span = tracer.spanBuilder(spanName).startSpan(); + try (Scope scope = span.makeCurrent()) { + this.otelContext = Context.current(); + return operation.run(span); + } catch (Exception ex) { + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + /** + * Gets the OpenTelemetry Context from the statement execution. Used by ResultSet for pagination + * span context. + */ + private Tracer getSafeTracer() { + if (connection != null) { + Tracer tracer = connection.getTracer(); + if (tracer != null) { + return tracer; + } + } + return GlobalOpenTelemetry.getTracer("google-cloud-bigquery-jdbc-noop"); + } + + public Context getOtelContext() { + return this.otelContext; + } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java index 681595f8b05c..e1bb06625a0e 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java @@ -20,6 +20,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.opentelemetry.api.OpenTelemetry; import java.io.PrintWriter; import java.sql.Connection; import java.sql.DriverManager; @@ -113,6 +114,8 @@ public class DataSource implements javax.sql.DataSource { private String privateServiceConnect; private Long connectionPoolSize; private Long listenerPoolSize; + private Boolean enableOpenTelemetry; + private OpenTelemetry customOpenTelemetry; // Make sure the JDBC driver class is loaded. static { @@ -324,6 +327,12 @@ public class DataSource implements javax.sql.DataSource { .put( BigQueryJdbcUrlUtility.LISTENER_POOL_SIZE_PROPERTY_NAME, (ds, val) -> ds.setListenerPoolSize(Long.parseLong(val))) + .put( + BigQueryJdbcUrlUtility.ENABLE_OPENTELEMETRY_PROPERTY_NAME, + (ds, val) -> + ds.setEnableOpenTelemetry( + BigQueryJdbcUrlUtility.convertIntToBoolean( + val, BigQueryJdbcUrlUtility.ENABLE_OPENTELEMETRY_PROPERTY_NAME))) .build(); public static DataSource fromUrl(String url) { @@ -375,7 +384,11 @@ public Connection getConnection() throws SQLException { throw new BigQueryJdbcException( "The URL " + getURL() + " is invalid. Please specify a valid Connection URL. "); } - return DriverManager.getConnection(getURL(), createProperties()); + Properties props = createProperties(); + if (this.customOpenTelemetry != null) { + props.put("customOpenTelemetry", this.customOpenTelemetry); + } + return DriverManager.getConnection(getURL(), props); } private Properties createProperties() { @@ -616,6 +629,11 @@ private Properties createProperties() { BigQueryJdbcUrlUtility.LISTENER_POOL_SIZE_PROPERTY_NAME, String.valueOf(this.listenerPoolSize)); } + if (this.enableOpenTelemetry != null) { + connectionProperties.setProperty( + BigQueryJdbcUrlUtility.ENABLE_OPENTELEMETRY_PROPERTY_NAME, + String.valueOf(this.enableOpenTelemetry)); + } return connectionProperties; } @@ -737,6 +755,24 @@ public void setListenerPoolSize(Long listenerPoolSize) { this.listenerPoolSize = listenerPoolSize; } + public Boolean getEnableOpenTelemetry() { + return enableOpenTelemetry != null + ? enableOpenTelemetry + : BigQueryJdbcUrlUtility.DEFAULT_ENABLE_OPENTELEMETRY_VALUE; + } + + public void setEnableOpenTelemetry(Boolean enableOpenTelemetry) { + this.enableOpenTelemetry = enableOpenTelemetry; + } + + public OpenTelemetry getCustomOpenTelemetry() { + return customOpenTelemetry; + } + + public void setCustomOpenTelemetry(OpenTelemetry customOpenTelemetry) { + this.customOpenTelemetry = customOpenTelemetry; + } + public void setHighThroughputMinTableSize(Integer highThroughputMinTableSize) { this.highThroughputMinTableSize = highThroughputMinTableSize; }