Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion java-bigquery/google-cloud-bigquery-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-logging</artifactId>
<version>3.33.0-SNAPSHOT</version><!-- {x-version-update:google-cloud-logging:current} -->
<version>3.32.0</version><!-- {x-version-update:google-cloud-logging:current} -->
</dependency>
<dependency>
<groupId>com.google.http-client</groupId>
Expand Down Expand Up @@ -301,6 +301,19 @@
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
</dependency>
<!-- OpenTelemetry SDK and Exporters (will be shaded via existing 'io' relocation) -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
</dependency>

<!-- Test Dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
int transactionIsolation;
List<SQLWarning> sqlWarnings;
String catalog;
String gcpTelemetryCredentials;
String gcpTelemetryProjectId;
int holdability;
long retryTimeoutInSeconds;
Duration retryTimeoutDuration;
Expand Down Expand Up @@ -169,6 +171,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {

this.labels = ds.getLabels() != null ? ds.getLabels() : new java.util.HashMap<>();
this.maxBytesBilled = ds.getMaximumBytesBilled();
this.gcpTelemetryCredentials = ds.getGcpTelemetryCredentials();
this.gcpTelemetryProjectId = ds.getGcpTelemetryProjectId();
this.retryTimeoutInSeconds = ds.getTimeout();
this.retryTimeoutDuration = Duration.ofMillis(retryTimeoutInSeconds * 1000L);
this.retryInitialDelayInSeconds = ds.getRetryInitialDelay();
Expand Down Expand Up @@ -961,6 +965,32 @@ void removeStatement(Statement statement) {
this.openStatements.remove(statement);
}

private OpenTelemetry getOpenTelemetryInstance() {
String effectiveProjectId =
(this.gcpTelemetryProjectId != null) ? this.gcpTelemetryProjectId : this.catalog;

String effectiveCredentials = this.gcpTelemetryCredentials;
String authTypeStr = this.overrideProperties.get("OAuthType");

if (effectiveCredentials == null && "0".equals(authTypeStr)) {
effectiveCredentials = this.overrideProperties.get("OAuthPvtKey");
}

if (Boolean.TRUE.equals(this.enableGcpTraceExporter) && effectiveCredentials == null) {
if (!"0".equals(authTypeStr) && !"3".equals(authTypeStr)) {
throw new BigQueryJdbcRuntimeException(
"Exporting traces to Google Cloud is only supported when using Application Default Credentials (ADC) or Service Account authentication.");
}
}

return BigQueryJdbcOpenTelemetry.getOpenTelemetry(
Boolean.TRUE.equals(this.enableGcpTraceExporter),
Boolean.TRUE.equals(this.enableGcpLogExporter),
this.customOpenTelemetry,
effectiveCredentials,
effectiveProjectId);
}

private BigQuery getBigQueryConnection() {
BigQueryOptions.Builder bigQueryOptions = BigQueryOptions.newBuilder();
if (this.retryTimeoutInSeconds > 0L
Expand Down Expand Up @@ -997,17 +1027,14 @@ private BigQuery getBigQueryConnection() {
if (this.httpTransportOptions != null) {
bigQueryOptions.setTransportOptions(this.httpTransportOptions);
}
OpenTelemetry openTelemetry = getOpenTelemetryInstance();

OpenTelemetry openTelemetry =
BigQueryJdbcOpenTelemetry.getOpenTelemetry(
this.enableGcpTraceExporter, this.enableGcpLogExporter, this.customOpenTelemetry);

if (this.enableGcpLogExporter || this.customOpenTelemetry != null) {
if (Boolean.TRUE.equals(this.enableGcpLogExporter) || this.customOpenTelemetry != null) {
BigQueryJdbcOpenTelemetry.registerConnection(
this.connectionId, openTelemetry, null, this.enableGcpLogExporter);
}

if (this.enableGcpTraceExporter || this.customOpenTelemetry != null) {
if (Boolean.TRUE.equals(this.enableGcpTraceExporter) || this.customOpenTelemetry != null) {
this.tracer = BigQueryJdbcOpenTelemetry.getTracer(openTelemetry);
bigQueryOptions.setOpenTelemetryTracer(this.tracer);
}
Expand Down Expand Up @@ -1060,10 +1087,8 @@ private BigQueryReadClient getBigQueryReadClientConnection() throws IOException

bigQueryReadSettings.setTransportChannelProvider(activeProvider);

OpenTelemetry openTelemetry =
BigQueryJdbcOpenTelemetry.getOpenTelemetry(
this.enableGcpTraceExporter, this.enableGcpLogExporter, this.customOpenTelemetry);
if (this.enableGcpTraceExporter || this.customOpenTelemetry != null) {
OpenTelemetry openTelemetry = getOpenTelemetryInstance();
if (Boolean.TRUE.equals(this.enableGcpTraceExporter) || this.customOpenTelemetry != null) {
bigQueryReadSettings.setOpenTelemetryTracerProvider(openTelemetry.getTracerProvider());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ private static boolean isFileExists(String filename) {
}
}

private static boolean isJson(byte[] value) {
static boolean isJson(byte[] value) {
try {
// This is done this way to ensure strict Json parsing
// https://github.com/google/gson/issues/1208#issuecomment-2120764686
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@
package com.google.cloud.bigquery.jdbc;

import com.google.cloud.logging.Logging;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Handler;
import java.util.logging.Logger;
Expand All @@ -29,17 +36,59 @@ public class BigQueryJdbcOpenTelemetry {
static final String INSTRUMENTATION_SCOPE_NAME = "com.google.cloud.bigquery.jdbc";
static final String BIGQUERY_NAMESPACE = "com.google.cloud.bigquery";
public static final String CONNECTION_ID_BAGGAGE_KEY = "jdbc.connection_id";
private static final String OTEL_TRACES_EXPORTER = "otel.traces.exporter";
private static final String OTEL_EXPORTER_OTLP_ENDPOINT = "otel.exporter.otlp.endpoint";
private static final String OTEL_LOGS_EXPORTER = "otel.logs.exporter";
private static final String OTEL_METRICS_EXPORTER = "otel.metrics.exporter";
private static final String GOOGLE_CLOUD_PROJECT = "google.cloud.project";
private static final String CREDENTIALS_JSON = "google.cloud.credentials.json";
private static final String CREDENTIALS_PATH = "google.cloud.credentials.path";
private static final String OTLP_ENDPOINT_VALUE = "https://telemetry.googleapis.com:443";
private static final String EXPORTER_NONE = "none";
private static final String EXPORTER_OTLP = "otlp";
private static final BigQueryJdbcCustomLogger LOG =
new BigQueryJdbcCustomLogger("BigQueryJdbcOpenTelemetry");

private static final class SdkCacheKey {
private final String projectId;
private final String credentialsHashOrPath;
private final boolean enableTrace;

SdkCacheKey(String projectId, String credentialsHashOrPath, boolean enableTrace) {
this.projectId = projectId;
this.credentialsHashOrPath = credentialsHashOrPath;
this.enableTrace = enableTrace;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SdkCacheKey that = (SdkCacheKey) o;
return enableTrace == that.enableTrace
&& Objects.equals(projectId, that.projectId)
&& Objects.equals(credentialsHashOrPath, that.credentialsHashOrPath);
}

@Override
public int hashCode() {
return Objects.hash(projectId, credentialsHashOrPath, enableTrace);
}
}

private static final ConcurrentHashMap<SdkCacheKey, OpenTelemetrySdk> sdkCache =
new ConcurrentHashMap<>();

static class TelemetryConfig {
final OpenTelemetry openTelemetry;
final Logging loggingClient;
final boolean useDirectGcpLogging;

TelemetryConfig(
OpenTelemetry openTelemetry, Logging loggingClient, boolean useDirectGcpLogging) {
OpenTelemetry openTelemetry, Logging loggingClient, Boolean useDirectGcpLogging) {
this.openTelemetry = openTelemetry;
this.loggingClient = loggingClient;
this.useDirectGcpLogging = useDirectGcpLogging;
this.useDirectGcpLogging = useDirectGcpLogging != null ? useDirectGcpLogging : false;
}
}

Expand All @@ -50,6 +99,20 @@ private BigQueryJdbcOpenTelemetry() {}

static {
ensureGlobalHandlerAttached();
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
for (OpenTelemetrySdk sdk : sdkCache.values()) {
try {
sdk.close();
} catch (Exception e) {
// Ignore failures during shutdown to ensure all SDKs are attempted to be
// closed. Logging is avoided here because the logging system might have
// already been shut down by the JVM.
}
}
Comment thread
keshavdandeva marked this conversation as resolved.
}));
}

public static void ensureGlobalHandlerAttached() {
Expand All @@ -70,7 +133,7 @@ public static void registerConnection(
String connectionId,
OpenTelemetry openTelemetry,
Logging loggingClient,
boolean useDirectGcpLogging) {
Boolean useDirectGcpLogging) {
connectionConfigs.put(
connectionId, new TelemetryConfig(openTelemetry, loggingClient, useDirectGcpLogging));
Comment thread
keshavdandeva marked this conversation as resolved.
}
Expand All @@ -87,21 +150,76 @@ public static Collection<TelemetryConfig> getRegisteredConfigs() {
return connectionConfigs.values();
}

private static String getCredentialsIdentifier(String credentials) {
if (credentials == null) {
return "";
}
byte[] credsBytes = credentials.getBytes(StandardCharsets.UTF_8);
if (BigQueryJdbcOAuthUtility.isJson(credsBytes)) {
return Hashing.sha256().hashString(credentials, StandardCharsets.UTF_8).toString();
}
return credentials;
}

/**
* Initializes or returns the OpenTelemetry instance based on hybrid logic. Prefer
* customOpenTelemetry if provided; fallback to an auto-configured GCP exporter if requested.
*/
public static OpenTelemetry getOpenTelemetry(
boolean enableGcpTraceExporter,
boolean enableGcpLogExporter,
OpenTelemetry customOpenTelemetry) {
OpenTelemetry customOpenTelemetry,
String gcpTelemetryCredentials,
String gcpTelemetryProjectId) {
if (customOpenTelemetry != null) {
return customOpenTelemetry;
}

// NOTE: Currently, tracing only fully supports Application Default Credentials (ADC).
// Once b/503721589 is completed, Service Account (SA) will work as well.

if (enableGcpTraceExporter || enableGcpLogExporter) {
Comment thread
keshavdandeva marked this conversation as resolved.
// TODO(b/491238299): Initialize and return GCP-specific auto-configured SDK
return OpenTelemetry.noop();
SdkCacheKey key =
new SdkCacheKey(
gcpTelemetryProjectId,
getCredentialsIdentifier(gcpTelemetryCredentials),
enableGcpTraceExporter);
return sdkCache.computeIfAbsent(
key,
k -> {
Map<String, String> props = new HashMap<>();
if (gcpTelemetryCredentials != null) {
byte[] credsBytes = gcpTelemetryCredentials.getBytes(StandardCharsets.UTF_8);
if (BigQueryJdbcOAuthUtility.isJson(credsBytes)) {
props.put(CREDENTIALS_JSON, gcpTelemetryCredentials);
} else {
props.put(CREDENTIALS_PATH, gcpTelemetryCredentials);
}
}

if (enableGcpTraceExporter) {
props.put(OTEL_TRACES_EXPORTER, EXPORTER_OTLP);
Comment thread
keshavdandeva marked this conversation as resolved.
props.put(OTEL_EXPORTER_OTLP_ENDPOINT, OTLP_ENDPOINT_VALUE);
} else {
props.put(OTEL_TRACES_EXPORTER, EXPORTER_NONE);
}

// Logs are handled directly via GCP logging
props.put(OTEL_LOGS_EXPORTER, EXPORTER_NONE);
// Metrics are deferred to a future phase
props.put(OTEL_METRICS_EXPORTER, EXPORTER_NONE);

if (gcpTelemetryProjectId != null) {
props.put(GOOGLE_CLOUD_PROJECT, gcpTelemetryProjectId);
}

AutoConfiguredOpenTelemetrySdk autoConfigured =
AutoConfiguredOpenTelemetrySdk.builder().addPropertiesSupplier(() -> props).build();

OpenTelemetrySdk sdk = autoConfigured.getOpenTelemetrySdk();

return sdk;
});
}

return OpenTelemetry.noop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
static final String BIGQUERY_ENDPOINT_OVERRIDE_PROPERTY_NAME = "BIGQUERY";
static final String STS_ENDPOINT_OVERRIDE_PROPERTY_NAME = "STS";
static final String OAUTH_ACCESS_TOKEN_PROPERTY_NAME = "OAuthAccessToken";
static final String GCP_TELEMETRY_PROJECT_ID_PROPERTY_NAME = "gcpTelemetryProjectId";
static final String GCP_TELEMETRY_CREDENTIALS_PROPERTY_NAME = "gcpTelemetryCredentials";
static final String OAUTH_ACCESS_TOKEN_READONLY_PROPERTY_NAME = "OAuthAccessTokenReadonly";
static final String OAUTH_REFRESH_TOKEN_PROPERTY_NAME = "OAuthRefreshToken";
static final String OAUTH_CLIENT_ID_PROPERTY_NAME = "OAuthClientId";
Expand Down Expand Up @@ -628,6 +630,14 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
.setDescription(
"Enables or disables GCP OpenTelemetry Log exporter. Disabled by default.")
.setDefaultValue(String.valueOf(DEFAULT_ENABLE_GCP_LOG_EXPORTER_VALUE))
.build(),
BigQueryConnectionProperty.newBuilder()
.setName(GCP_TELEMETRY_CREDENTIALS_PROPERTY_NAME)
.setDescription("Path or raw JSON of credentials for OTel exporter.")
.build(),
BigQueryConnectionProperty.newBuilder()
.setName(GCP_TELEMETRY_PROJECT_ID_PROPERTY_NAME)
.setDescription("GCP Project ID for OTel exporter.")
.build())));

private static final List<String> NETWORK_PROPERTIES =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class DataSource implements javax.sql.DataSource {
private String logLevel;
private Boolean enableSession;
private String logPath;
private String gcpTelemetryProjectId;
private String gcpTelemetryCredentials;
private Integer oAuthType;
private String oAuthServiceAcctEmail;
private String oAuthPvtKeyPath;
Expand Down Expand Up @@ -134,6 +136,12 @@ public class DataSource implements javax.sql.DataSource {
.put(BigQueryJdbcUrlUtility.PROJECT_ID_PROPERTY_NAME, DataSource::setProjectId)
.put(BigQueryJdbcUrlUtility.DEFAULT_DATASET_PROPERTY_NAME, DataSource::setDefaultDataset)
.put(BigQueryJdbcUrlUtility.LOCATION_PROPERTY_NAME, DataSource::setLocation)
.put(
BigQueryJdbcUrlUtility.GCP_TELEMETRY_PROJECT_ID_PROPERTY_NAME,
DataSource::setGcpTelemetryProjectId)
.put(
BigQueryJdbcUrlUtility.GCP_TELEMETRY_CREDENTIALS_PROPERTY_NAME,
DataSource::setGcpTelemetryCredentials)
.put(
BigQueryJdbcUrlUtility.ENABLE_HTAPI_PROPERTY_NAME,
(ds, val) ->
Expand Down Expand Up @@ -867,6 +875,22 @@ public void setLogPath(String logPath) {
this.logPath = logPath;
}

public String getGcpTelemetryProjectId() {
return gcpTelemetryProjectId;
}

public void setGcpTelemetryProjectId(String gcpTelemetryProjectId) {
this.gcpTelemetryProjectId = gcpTelemetryProjectId;
}

public String getGcpTelemetryCredentials() {
return gcpTelemetryCredentials;
}

public void setGcpTelemetryCredentials(String gcpTelemetryCredentials) {
this.gcpTelemetryCredentials = gcpTelemetryCredentials;
}

public String getUniverseDomain() {
return universeDomain != null
? universeDomain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.arrow.vector.util.JsonStringArrayList;
import org.apache.arrow.vector.util.JsonStringHashMap;
import org.apache.arrow.vector.util.Text;
Expand Down Expand Up @@ -138,8 +137,7 @@ BIGNUMERIC, new BigDecimal("11.2657"), new BigDecimal("33.4657")),
LocalDateTime.parse("2023-03-30T11:15:19.820227")),
arrowArraySchemaAndValue(
GEOGRAPHY, new Text("POINT(-122 47)"), new Text("POINT(-122 48)")),
arrowArraySchemaAndValue(
BYTES, Stream.of("one", "two").map(String::getBytes).toArray(byte[][]::new)));
arrowArraySchemaAndValue(BYTES, "one".getBytes(), "two".getBytes()));

List<Field> orderedSchemas =
schemaAndValues.stream().map(Tuple::x).collect(Collectors.toList());
Expand Down
Loading
Loading