diff --git a/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityExtensions.cs b/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityExtensions.cs
index f279f815fb..1d5a0beb1e 100644
--- a/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityExtensions.cs
+++ b/csharp/src/Apache.Arrow.Adbc/Tracing/ActivityExtensions.cs
@@ -63,6 +63,25 @@ public static class ActivityExtensions
return activity?.AddTag(key, value());
}
+ ///
+ /// Update the Activity to have a tag with an additional 'key' and value 'value'.
+ /// This shows up in the enumeration. It is meant for information that
+ /// is useful to log but not needed for runtime control (for the latter, )
+ ///
+ /// for convenient chaining.
+ /// The tag key name as a function
+ /// The tag value mapped to the input key
+ /// /// The condition to check before adding the tag
+ public static Activity? AddConditionalTag(this Activity? activity, string key, string? value, bool condition)
+ {
+ if (condition)
+ {
+ return activity?.AddTag(key, value);
+ }
+
+ return activity;
+ }
+
///
/// Update the Activity to have a tag with an additional 'key' and value 'value'.
/// This shows up in the enumeration. It is meant for information that
diff --git a/csharp/src/Drivers/BigQuery/ActivityExtensions.cs b/csharp/src/Drivers/BigQuery/ActivityExtensions.cs
new file mode 100644
index 0000000000..11e1db79c6
--- /dev/null
+++ b/csharp/src/Drivers/BigQuery/ActivityExtensions.cs
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System.Diagnostics;
+using Apache.Arrow.Adbc.Tracing;
+
+namespace Apache.Arrow.Adbc.Drivers.BigQuery
+{
+ internal static class ActivityExtensions
+ {
+ private const string bigQueryKeyPrefix = "adbc.bigquery.tracing.";
+ private const string bigQueryParameterKeyValueSuffix = ".value";
+
+ public static Activity AddBigQueryTag(this Activity activity, string key, object? value)
+ {
+ string bigQueryKey = bigQueryKeyPrefix + key;
+ return activity.AddTag(bigQueryKey, value);
+ }
+
+ public static Activity AddConditionalBigQueryTag(this Activity activity, string key, string? value, bool condition)
+ {
+ string bigQueryKey = bigQueryKeyPrefix + key;
+ return activity.AddConditionalTag(key, value, condition)!;
+ }
+
+ public static Activity AddBigQueryParameterTag(this Activity activity, string parameterName, object? value)
+ {
+ if (BigQueryParameters.IsSafeToLog(parameterName))
+ {
+ string bigQueryParameterValueKey = parameterName + bigQueryParameterKeyValueSuffix;
+ return activity.AddTag(bigQueryParameterValueKey, value);
+ }
+
+ return activity;
+ }
+ }
+}
diff --git a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs
index f480c5bc54..9855e76be1 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs
@@ -17,6 +17,7 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Text;
@@ -24,6 +25,7 @@
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Extensions;
+using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Ipc;
using Apache.Arrow.Types;
using Google.Api.Gax;
@@ -36,16 +38,15 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
///
/// BigQuery-specific implementation of
///
- public class BigQueryConnection : AdbcConnection, ITokenProtectedResource
+ public class BigQueryConnection : TracingConnection, ITokenProtectedResource
{
readonly Dictionary properties;
readonly HttpClient httpClient;
bool includePublicProjectIds = false;
const string infoDriverName = "ADBC BigQuery Driver";
- const string infoDriverVersion = "1.0.1";
const string infoVendorName = "BigQuery";
- const string infoDriverArrowVersion = "19.0.0";
- const string publicProjectId = "bigquery-public-data";
+
+ private readonly string infoDriverArrowVersion = BigQueryUtils.GetAssemblyVersion(typeof(IArrowArray));
readonly AdbcInfoCode[] infoSupportedCodes = new[] {
AdbcInfoCode.DriverName,
@@ -54,7 +55,7 @@ public class BigQueryConnection : AdbcConnection, ITokenProtectedResource
AdbcInfoCode.VendorName
};
- public BigQueryConnection(IReadOnlyDictionary properties)
+ public BigQueryConnection(IReadOnlyDictionary properties) : base(properties)
{
if (properties == null)
{
@@ -89,6 +90,8 @@ public BigQueryConnection(IReadOnlyDictionary properties)
///
public Func? UpdateToken { get; set; }
+ internal string DriverName => infoDriverName;
+
internal BigQueryClient? Client { get; private set; }
internal GoogleCredential? Credential { get; private set; }
@@ -97,136 +100,176 @@ public BigQueryConnection(IReadOnlyDictionary properties)
internal int RetryDelayMs { get; private set; } = 200;
+ public override string AssemblyVersion => BigQueryUtils.BigQueryAssemblyVersion;
+
+ public override string AssemblyName => BigQueryUtils.BigQueryAssemblyName;
+
///
/// Initializes the internal BigQuery connection
///
+ /// A project ID that has been specified by the caller, not a user.
///
- internal BigQueryClient Open()
+ internal BigQueryClient Open(string? projectId = null)
{
- string? projectId = null;
- string? billingProjectId = null;
- TimeSpan? clientTimeout = null;
-
- // if the caller doesn't specify a projectId, use the default
- if (!this.properties.TryGetValue(BigQueryParameters.ProjectId, out projectId))
- projectId = BigQueryConstants.DetectProjectId;
-
- // in some situations, the publicProjectId gets passed and causes an error when we try to create a query job:
- // Google.GoogleApiException : The service bigquery has thrown an exception. HttpStatusCode is Forbidden.
- // Access Denied: Project bigquery-public-data: User does not have bigquery.jobs.create permission in
- // project bigquery-public-data.
- // so if that is the case, treat it as if we need to detect the projectId
- if (projectId.Equals(publicProjectId, StringComparison.OrdinalIgnoreCase))
- projectId = BigQueryConstants.DetectProjectId;
-
- // the billing project can be null if it's not specified
- this.properties.TryGetValue(BigQueryParameters.BillingProjectId, out billingProjectId);
-
- if (this.properties.TryGetValue(BigQueryParameters.IncludePublicProjectId, out string? result))
+ return this.TraceActivity(activity =>
{
- if (!string.IsNullOrEmpty(result))
- this.includePublicProjectIds = Convert.ToBoolean(result);
- }
+ string? billingProjectId = null;
+ TimeSpan? clientTimeout = null;
- if (this.properties.TryGetValue(BigQueryParameters.ClientTimeout, out string? timeoutSeconds) &&
- int.TryParse(timeoutSeconds, out int seconds))
- {
- clientTimeout = TimeSpan.FromSeconds(seconds);
- }
+ if (string.IsNullOrEmpty(projectId))
+ {
+ // if the caller doesn't specify a projectId, use the default
+ if (!this.properties.TryGetValue(BigQueryParameters.ProjectId, out projectId))
+ {
+ projectId = BigQueryConstants.DetectProjectId;
+ }
+ else
+ {
+ activity?.AddBigQueryParameterTag(BigQueryParameters.ProjectId, projectId);
+ }
- SetCredential();
+ // in some situations, the publicProjectId gets passed and causes an error when we try to create a query job:
+ // Google.GoogleApiException : The service bigquery has thrown an exception. HttpStatusCode is Forbidden.
+ // Access Denied: Project bigquery-public-data: User does not have bigquery.jobs.create permission in
+ // project bigquery-public-data.
+ // so if that is the case, treat it as if we need to detect the projectId
+ if (projectId.Equals(BigQueryConstants.PublicProjectId, StringComparison.OrdinalIgnoreCase))
+ {
+ projectId = BigQueryConstants.DetectProjectId;
+ activity?.AddBigQueryTag("change_public_projectId_to_detect_project_id", projectId);
+ }
+ }
- BigQueryClientBuilder bigQueryClientBuilder = new BigQueryClientBuilder()
- {
- ProjectId = projectId,
- QuotaProject = billingProjectId,
- GoogleCredential = Credential
- };
+ // the billing project can be null if it's not specified
+ if (this.properties.TryGetValue(BigQueryParameters.BillingProjectId, out billingProjectId))
+ {
+ activity?.AddBigQueryParameterTag((BigQueryParameters.BillingProjectId), billingProjectId);
+ }
- BigQueryClient client = bigQueryClientBuilder.Build();
+ if (this.properties.TryGetValue(BigQueryParameters.IncludePublicProjectId, out string? result))
+ {
+ if (!string.IsNullOrEmpty(result))
+ {
+ this.includePublicProjectIds = Convert.ToBoolean(result);
+ activity?.AddBigQueryParameterTag(BigQueryParameters.IncludePublicProjectId, this.includePublicProjectIds);
+ }
+ }
- if (clientTimeout.HasValue)
- {
- client.Service.HttpClient.Timeout = clientTimeout.Value;
- }
+ if (this.properties.TryGetValue(BigQueryParameters.ClientTimeout, out string? timeoutSeconds) &&
+ int.TryParse(timeoutSeconds, out int seconds))
+ {
+ clientTimeout = TimeSpan.FromSeconds(seconds);
+ activity?.AddBigQueryParameterTag(BigQueryParameters.ClientTimeout, seconds);
+ }
+
+ SetCredential();
+
+ BigQueryClientBuilder bigQueryClientBuilder = new BigQueryClientBuilder()
+ {
+ ProjectId = projectId,
+ QuotaProject = billingProjectId,
+ GoogleCredential = Credential
+ };
+
+ BigQueryClient client = bigQueryClientBuilder.Build();
- Client = client;
- return client;
+ if (clientTimeout.HasValue)
+ {
+ client.Service.HttpClient.Timeout = clientTimeout.Value;
+ }
+
+ Client = client;
+ return client;
+ });
}
internal void SetCredential()
{
- string? clientId = null;
- string? clientSecret = null;
- string? refreshToken = null;
- string? accessToken = null;
- string? audienceUri = null;
- string? authenticationType = null;
-
- string tokenEndpoint = BigQueryConstants.TokenEndpoint;
+ this.TraceActivity(activity =>
+ {
+ string? clientId = null;
+ string? clientSecret = null;
+ string? refreshToken = null;
+ string? accessToken = null;
+ string? audienceUri = null;
+ string? authenticationType = null;
- if (!this.properties.TryGetValue(BigQueryParameters.AuthenticationType, out authenticationType))
- throw new ArgumentException($"The {BigQueryParameters.AuthenticationType} parameter is not present");
+ string tokenEndpoint = BigQueryConstants.TokenEndpoint;
- if (this.properties.TryGetValue(BigQueryParameters.AuthenticationType, out string? newAuthenticationType))
- {
- if (!string.IsNullOrEmpty(newAuthenticationType))
- authenticationType = newAuthenticationType;
+ if (!this.properties.TryGetValue(BigQueryParameters.AuthenticationType, out authenticationType))
+ {
+ throw new ArgumentException($"The {BigQueryParameters.AuthenticationType} parameter is not present");
+ }
- if (!authenticationType.Equals(BigQueryConstants.UserAuthenticationType, StringComparison.OrdinalIgnoreCase) &&
- !authenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType, StringComparison.OrdinalIgnoreCase) &&
- !authenticationType.Equals(BigQueryConstants.EntraIdAuthenticationType, StringComparison.OrdinalIgnoreCase))
+ if (this.properties.TryGetValue(BigQueryParameters.AuthenticationType, out string? newAuthenticationType))
{
- throw new ArgumentException($"The {BigQueryParameters.AuthenticationType} parameter can only be `{BigQueryConstants.UserAuthenticationType}`, `{BigQueryConstants.ServiceAccountAuthenticationType}` or `{BigQueryConstants.EntraIdAuthenticationType}`");
+ if (!string.IsNullOrEmpty(newAuthenticationType))
+ authenticationType = newAuthenticationType;
+
+ if (!authenticationType.Equals(BigQueryConstants.UserAuthenticationType, StringComparison.OrdinalIgnoreCase) &&
+ !authenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType, StringComparison.OrdinalIgnoreCase) &&
+ !authenticationType.Equals(BigQueryConstants.EntraIdAuthenticationType, StringComparison.OrdinalIgnoreCase))
+ {
+ throw new ArgumentException($"The {BigQueryParameters.AuthenticationType} parameter can only be `{BigQueryConstants.UserAuthenticationType}`, `{BigQueryConstants.ServiceAccountAuthenticationType}` or `{BigQueryConstants.EntraIdAuthenticationType}`");
+ }
+ else
+ {
+ activity?.AddBigQueryParameterTag((BigQueryParameters.AuthenticationType), authenticationType);
+ }
}
- }
- if (!string.IsNullOrEmpty(authenticationType) && authenticationType.Equals(BigQueryConstants.UserAuthenticationType, StringComparison.OrdinalIgnoreCase))
- {
- if (!this.properties.TryGetValue(BigQueryParameters.ClientId, out clientId))
- throw new ArgumentException($"The {BigQueryParameters.ClientId} parameter is not present");
+ if (!string.IsNullOrEmpty(authenticationType) && authenticationType.Equals(BigQueryConstants.UserAuthenticationType, StringComparison.OrdinalIgnoreCase))
+ {
+ if (!this.properties.TryGetValue(BigQueryParameters.ClientId, out clientId))
+ throw new ArgumentException($"The {BigQueryParameters.ClientId} parameter is not present");
- if (!this.properties.TryGetValue(BigQueryParameters.ClientSecret, out clientSecret))
- throw new ArgumentException($"The {BigQueryParameters.ClientSecret} parameter is not present");
+ if (!this.properties.TryGetValue(BigQueryParameters.ClientSecret, out clientSecret))
+ throw new ArgumentException($"The {BigQueryParameters.ClientSecret} parameter is not present");
- if (!this.properties.TryGetValue(BigQueryParameters.RefreshToken, out refreshToken))
- throw new ArgumentException($"The {BigQueryParameters.RefreshToken} parameter is not present");
+ if (!this.properties.TryGetValue(BigQueryParameters.RefreshToken, out refreshToken))
+ throw new ArgumentException($"The {BigQueryParameters.RefreshToken} parameter is not present");
- Credential = ApplyScopes(GoogleCredential.FromAccessToken(GetAccessToken(clientId, clientSecret, refreshToken, tokenEndpoint)));
- }
- else if (!string.IsNullOrEmpty(authenticationType) && authenticationType.Equals(BigQueryConstants.EntraIdAuthenticationType, StringComparison.OrdinalIgnoreCase))
- {
- if (!this.properties.TryGetValue(BigQueryParameters.AccessToken, out accessToken))
- throw new ArgumentException($"The {BigQueryParameters.AccessToken} parameter is not present");
+ Credential = ApplyScopes(GoogleCredential.FromAccessToken(GetAccessToken(clientId, clientSecret, refreshToken, tokenEndpoint)));
+ }
+ else if (!string.IsNullOrEmpty(authenticationType) && authenticationType.Equals(BigQueryConstants.EntraIdAuthenticationType, StringComparison.OrdinalIgnoreCase))
+ {
+ if (!this.properties.TryGetValue(BigQueryParameters.AccessToken, out accessToken))
+ throw new ArgumentException($"The {BigQueryParameters.AccessToken} parameter is not present");
- if (!this.properties.TryGetValue(BigQueryParameters.AudienceUri, out audienceUri))
- throw new ArgumentException($"The {BigQueryParameters.AudienceUri} parameter is not present");
+ if (!this.properties.TryGetValue(BigQueryParameters.AudienceUri, out audienceUri))
+ throw new ArgumentException($"The {BigQueryParameters.AudienceUri} parameter is not present");
- Credential = ApplyScopes(GoogleCredential.FromAccessToken(TradeEntraIdTokenForBigQueryToken(audienceUri, accessToken)));
- }
- else if (!string.IsNullOrEmpty(authenticationType) && authenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType, StringComparison.OrdinalIgnoreCase))
- {
- string? json = string.Empty;
+ Credential = ApplyScopes(GoogleCredential.FromAccessToken(TradeEntraIdTokenForBigQueryToken(audienceUri, accessToken)));
+ }
+ else if (!string.IsNullOrEmpty(authenticationType) && authenticationType.Equals(BigQueryConstants.ServiceAccountAuthenticationType, StringComparison.OrdinalIgnoreCase))
+ {
+ string? json = string.Empty;
- if (!this.properties.TryGetValue(BigQueryParameters.JsonCredential, out json))
- throw new ArgumentException($"The {BigQueryParameters.JsonCredential} parameter is not present");
+ if (!this.properties.TryGetValue(BigQueryParameters.JsonCredential, out json))
+ throw new ArgumentException($"The {BigQueryParameters.JsonCredential} parameter is not present");
- Credential = ApplyScopes(GoogleCredential.FromJson(json));
- }
- else
- {
- throw new ArgumentException($"{authenticationType} is not a valid authenticationType");
- }
+ Credential = ApplyScopes(GoogleCredential.FromJson(json));
+ }
+ else
+ {
+ throw new ArgumentException($"{authenticationType} is not a valid authenticationType");
+ }
+ });
}
public override void SetOption(string key, string value)
{
- this.properties[key] = value;
-
- if (key.Equals(BigQueryParameters.AccessToken))
+ this.TraceActivity(activity =>
{
- UpdateClientToken();
- }
+ activity?.AddTag(key + ".set", value);
+
+ this.properties[key] = value;
+
+ if (key.Equals(BigQueryParameters.AccessToken))
+ {
+ UpdateClientToken();
+ }
+ });
}
///
@@ -253,11 +296,13 @@ private GoogleCredential ApplyScopes(GoogleCredential credential)
public override IArrowArrayStream GetInfo(IReadOnlyList codes)
{
- const int strValTypeID = 0;
+ return this.TraceActivity(activity =>
+ {
+ const int strValTypeID = 0;
- UnionType infoUnionType = new UnionType(
- new Field[]
- {
+ UnionType infoUnionType = new UnionType(
+ new Field[]
+ {
new Field("string_value", StringType.Default, true),
new Field("bool_value", BooleanType.Default, true),
new Field("int64_value", Int64Type.Default, true),
@@ -282,89 +327,97 @@ public override IArrowArrayStream GetInfo(IReadOnlyList codes)
),
true
)
- },
- new int[] { 0, 1, 2, 3, 4, 5 }.ToArray(),
- UnionMode.Dense);
+ },
+ new int[] { 0, 1, 2, 3, 4, 5 }.ToArray(),
+ UnionMode.Dense);
- if (codes.Count == 0)
- {
- codes = infoSupportedCodes;
- }
+ if (codes.Count == 0)
+ {
+ codes = infoSupportedCodes;
+ }
- UInt32Array.Builder infoNameBuilder = new UInt32Array.Builder();
- ArrowBuffer.Builder typeBuilder = new ArrowBuffer.Builder();
- ArrowBuffer.Builder offsetBuilder = new ArrowBuffer.Builder();
- StringArray.Builder stringInfoBuilder = new StringArray.Builder();
- int nullCount = 0;
- int arrayLength = codes.Count;
+ UInt32Array.Builder infoNameBuilder = new UInt32Array.Builder();
+ ArrowBuffer.Builder typeBuilder = new ArrowBuffer.Builder();
+ ArrowBuffer.Builder offsetBuilder = new ArrowBuffer.Builder();
+ StringArray.Builder stringInfoBuilder = new StringArray.Builder();
+ int nullCount = 0;
+ int arrayLength = codes.Count;
- foreach (AdbcInfoCode code in codes)
- {
- switch (code)
+ foreach (AdbcInfoCode code in codes)
{
- case AdbcInfoCode.DriverName:
- infoNameBuilder.Append((UInt32)code);
- typeBuilder.Append(strValTypeID);
- offsetBuilder.Append(stringInfoBuilder.Length);
- stringInfoBuilder.Append(infoDriverName);
- break;
- case AdbcInfoCode.DriverVersion:
- infoNameBuilder.Append((UInt32)code);
- typeBuilder.Append(strValTypeID);
- offsetBuilder.Append(stringInfoBuilder.Length);
- stringInfoBuilder.Append(infoDriverVersion);
- break;
- case AdbcInfoCode.DriverArrowVersion:
- infoNameBuilder.Append((UInt32)code);
- typeBuilder.Append(strValTypeID);
- offsetBuilder.Append(stringInfoBuilder.Length);
- stringInfoBuilder.Append(infoDriverArrowVersion);
- break;
- case AdbcInfoCode.VendorName:
- infoNameBuilder.Append((UInt32)code);
- typeBuilder.Append(strValTypeID);
- offsetBuilder.Append(stringInfoBuilder.Length);
- stringInfoBuilder.Append(infoVendorName);
- break;
- default:
- infoNameBuilder.Append((UInt32)code);
- typeBuilder.Append(strValTypeID);
- offsetBuilder.Append(stringInfoBuilder.Length);
- stringInfoBuilder.AppendNull();
- nullCount++;
- break;
+ string tagKey = SemanticConventions.Db.Operation.Parameter(code.ToString().ToLowerInvariant());
+ string? tagValue = null;
+ switch (code)
+ {
+ case AdbcInfoCode.DriverName:
+ infoNameBuilder.Append((UInt32)code);
+ typeBuilder.Append(strValTypeID);
+ offsetBuilder.Append(stringInfoBuilder.Length);
+ stringInfoBuilder.Append(infoDriverName);
+ tagValue = infoDriverName;
+ break;
+ case AdbcInfoCode.DriverVersion:
+ infoNameBuilder.Append((UInt32)code);
+ typeBuilder.Append(strValTypeID);
+ offsetBuilder.Append(stringInfoBuilder.Length);
+ stringInfoBuilder.Append(BigQueryUtils.BigQueryAssemblyVersion);
+ tagValue = BigQueryUtils.BigQueryAssemblyVersion;
+ break;
+ case AdbcInfoCode.DriverArrowVersion:
+ infoNameBuilder.Append((UInt32)code);
+ typeBuilder.Append(strValTypeID);
+ offsetBuilder.Append(stringInfoBuilder.Length);
+ stringInfoBuilder.Append(infoDriverArrowVersion);
+ tagValue = infoDriverArrowVersion;
+ break;
+ case AdbcInfoCode.VendorName:
+ infoNameBuilder.Append((UInt32)code);
+ typeBuilder.Append(strValTypeID);
+ offsetBuilder.Append(stringInfoBuilder.Length);
+ stringInfoBuilder.Append(infoVendorName);
+ tagValue = infoVendorName;
+ break;
+ default:
+ infoNameBuilder.Append((UInt32)code);
+ typeBuilder.Append(strValTypeID);
+ offsetBuilder.Append(stringInfoBuilder.Length);
+ stringInfoBuilder.AppendNull();
+ nullCount++;
+ break;
+ }
+ activity?.AddTag(tagKey, tagValue);
}
- }
- StructType entryType = new StructType(
- new Field[] {
+ StructType entryType = new StructType(
+ new Field[] {
new Field("key", Int32Type.Default, false),
new Field("value", Int32Type.Default, true)});
- StructArray entriesDataArray = new StructArray(entryType, 0,
- new[] { new Int32Array.Builder().Build(), new Int32Array.Builder().Build() },
- new ArrowBuffer.BitmapBuilder().Build());
+ StructArray entriesDataArray = new StructArray(entryType, 0,
+ new[] { new Int32Array.Builder().Build(), new Int32Array.Builder().Build() },
+ new ArrowBuffer.BitmapBuilder().Build());
- IArrowArray[] childrenArrays = new IArrowArray[]
- {
+ IArrowArray[] childrenArrays = new IArrowArray[]
+ {
stringInfoBuilder.Build(),
new BooleanArray.Builder().Build(),
new Int64Array.Builder().Build(),
new Int32Array.Builder().Build(),
new ListArray.Builder(StringType.Default).Build(),
new List(){ entriesDataArray }.BuildListArrayForType(entryType)
- };
+ };
- DenseUnionArray infoValue = new DenseUnionArray(infoUnionType, arrayLength, childrenArrays, typeBuilder.Build(), offsetBuilder.Build(), nullCount);
+ DenseUnionArray infoValue = new DenseUnionArray(infoUnionType, arrayLength, childrenArrays, typeBuilder.Build(), offsetBuilder.Build(), nullCount);
- IArrowArray[] dataArrays = new IArrowArray[]
- {
+ IArrowArray[] dataArrays = new IArrowArray[]
+ {
infoNameBuilder.Build(),
infoValue
- };
- StandardSchemas.GetInfoSchema.Validate(dataArrays);
+ };
+ StandardSchemas.GetInfoSchema.Validate(dataArrays);
- return new BigQueryInfoArrowStream(StandardSchemas.GetInfoSchema, dataArrays);
+ return new BigQueryInfoArrowStream(StandardSchemas.GetInfoSchema, dataArrays);
+ });
}
public override IArrowArrayStream GetObjects(
@@ -375,10 +428,13 @@ public override IArrowArrayStream GetObjects(
IReadOnlyList? tableTypes,
string? columnNamePattern)
{
- IArrowArray[] dataArrays = GetCatalogs(depth, catalogPattern, dbSchemaPattern,
- tableNamePattern, tableTypes, columnNamePattern);
+ return this.TraceActivity(activity =>
+ {
+ IArrowArray[] dataArrays = GetCatalogs(depth, catalogPattern, dbSchemaPattern,
+ tableNamePattern, tableTypes, columnNamePattern);
- return new BigQueryInfoArrowStream(StandardSchemas.GetObjectsSchema, dataArrays);
+ return new BigQueryInfoArrowStream(StandardSchemas.GetObjectsSchema, dataArrays);
+ });
}
///
@@ -395,7 +451,7 @@ internal void UpdateClientToken()
///
public bool TokenRequiresUpdate(Exception ex) => BigQueryUtils.TokenRequiresUpdate(ex);
- private async Task ExecuteWithRetriesAsync(Func> action) => await RetryManager.ExecuteWithRetriesAsync(this, action, MaxRetryAttempts, RetryDelayMs);
+ private async Task ExecuteWithRetriesAsync(Func> action, Activity? activity) => await RetryManager.ExecuteWithRetriesAsync(this, action, activity, MaxRetryAttempts, RetryDelayMs);
///
/// Executes the query using the BigQueryClient.
@@ -412,10 +468,15 @@ internal void UpdateClientToken()
{
if (Client == null) { Client = Open(); }
- Func> func = () => Client.ExecuteQueryAsync(sql, parameters ?? Enumerable.Empty(), queryOptions, resultsOptions);
- BigQueryResults? result = ExecuteWithRetriesAsync(func).GetAwaiter().GetResult();
+ return this.TraceActivity(activity =>
+ {
+ activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, sql, BigQueryUtils.IsSafeToTrace());
+
+ Func> func = () => Client.ExecuteQueryAsync(sql, parameters ?? Enumerable.Empty(), queryOptions, resultsOptions);
+ BigQueryResults? result = ExecuteWithRetriesAsync(func, activity).GetAwaiter().GetResult();
- return result;
+ return result;
+ });
}
private IArrowArray[] GetCatalogs(
@@ -426,58 +487,61 @@ private IArrowArray[] GetCatalogs(
IReadOnlyList? tableTypes,
string? columnNamePattern)
{
- StringArray.Builder catalogNameBuilder = new StringArray.Builder();
- List catalogDbSchemasValues = new List();
- string catalogRegexp = PatternToRegEx(catalogPattern);
- PagedEnumerable? catalogs;
- List projectIds = new List();
-
- Func?>> func = () => Task.Run(() =>
+ return this.TraceActivity(activity =>
{
- // stick with this call because PagedAsyncEnumerable has different behaviors for selecting items
- return Client?.ListProjects();
- });
+ StringArray.Builder catalogNameBuilder = new StringArray.Builder();
+ List catalogDbSchemasValues = new List();
+ string catalogRegexp = PatternToRegEx(catalogPattern);
+ PagedEnumerable? catalogs;
+ List projectIds = new List();
- catalogs = ExecuteWithRetriesAsync?>(func).GetAwaiter().GetResult();
+ Func?>> func = () => Task.Run(() =>
+ {
+ // stick with this call because PagedAsyncEnumerable has different behaviors for selecting items
+ return Client?.ListProjects();
+ });
- if (catalogs != null)
- {
- projectIds = catalogs.Select(x => x.ProjectId).ToList();
- }
+ catalogs = ExecuteWithRetriesAsync?>(func, activity).GetAwaiter().GetResult();
- if (this.includePublicProjectIds && !projectIds.Contains(publicProjectId))
- projectIds.Add(publicProjectId);
+ if (catalogs != null)
+ {
+ projectIds = catalogs.Select(x => x.ProjectId).ToList();
+ }
- projectIds.Sort();
+ if (this.includePublicProjectIds && !projectIds.Contains(BigQueryConstants.PublicProjectId))
+ projectIds.Add(BigQueryConstants.PublicProjectId);
- foreach (string projectId in projectIds)
- {
- if (Regex.IsMatch(projectId, catalogRegexp, RegexOptions.IgnoreCase))
- {
- catalogNameBuilder.Append(projectId);
+ projectIds.Sort();
- if (depth == GetObjectsDepth.Catalogs)
- {
- catalogDbSchemasValues.Add(null);
- }
- else
+ foreach (string projectId in projectIds)
+ {
+ if (Regex.IsMatch(projectId, catalogRegexp, RegexOptions.IgnoreCase))
{
- catalogDbSchemasValues.Add(GetDbSchemas(
- depth, projectId, dbSchemaPattern,
- tableNamePattern, tableTypes, columnNamePattern));
+ catalogNameBuilder.Append(projectId);
+
+ if (depth == GetObjectsDepth.Catalogs)
+ {
+ catalogDbSchemasValues.Add(null);
+ }
+ else
+ {
+ catalogDbSchemasValues.Add(GetDbSchemas(
+ depth, projectId, dbSchemaPattern,
+ tableNamePattern, tableTypes, columnNamePattern));
+ }
}
}
- }
- IArrowArray[] dataArrays = new IArrowArray[]
- {
+ IArrowArray[] dataArrays = new IArrowArray[]
+ {
catalogNameBuilder.Build(),
catalogDbSchemasValues.BuildListArrayForType(new StructType(StandardSchemas.DbSchemaSchema)),
- };
+ };
- StandardSchemas.GetObjectsSchema.Validate(dataArrays);
+ StandardSchemas.GetObjectsSchema.Validate(dataArrays);
- return dataArrays;
+ return dataArrays;
+ });
}
private StructArray GetDbSchemas(
@@ -488,57 +552,60 @@ private StructArray GetDbSchemas(
IReadOnlyList? tableTypes,
string? columnNamePattern)
{
- StringArray.Builder dbSchemaNameBuilder = new StringArray.Builder();
- List dbSchemaTablesValues = new List();
- ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder();
- int length = 0;
+ return this.TraceActivity(activity =>
+ {
+ StringArray.Builder dbSchemaNameBuilder = new StringArray.Builder();
+ List dbSchemaTablesValues = new List();
+ ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder();
+ int length = 0;
- string dbSchemaRegexp = PatternToRegEx(dbSchemaPattern);
+ string dbSchemaRegexp = PatternToRegEx(dbSchemaPattern);
- Func?>> func = () => Task.Run(() =>
- {
- // stick with this call because PagedAsyncEnumerable has different behaviors for selecting items
- return Client?.ListDatasets(catalog);
- });
+ Func?>> func = () => Task.Run(() =>
+ {
+ // stick with this call because PagedAsyncEnumerable has different behaviors for selecting items
+ return Client?.ListDatasets(catalog);
+ });
- PagedEnumerable? schemas = ExecuteWithRetriesAsync?>(func).GetAwaiter().GetResult();
+ PagedEnumerable? schemas = ExecuteWithRetriesAsync?>(func, activity).GetAwaiter().GetResult();
- if (schemas != null)
- {
- foreach (BigQueryDataset schema in schemas)
+ if (schemas != null)
{
- if (Regex.IsMatch(schema.Reference.DatasetId, dbSchemaRegexp, RegexOptions.IgnoreCase))
+ foreach (BigQueryDataset schema in schemas)
{
- dbSchemaNameBuilder.Append(schema.Reference.DatasetId);
- length++;
- nullBitmapBuffer.Append(true);
-
- if (depth == GetObjectsDepth.DbSchemas)
- {
- dbSchemaTablesValues.Add(null);
- }
- else
+ if (Regex.IsMatch(schema.Reference.DatasetId, dbSchemaRegexp, RegexOptions.IgnoreCase))
{
- dbSchemaTablesValues.Add(GetTableSchemas(
- depth, catalog, schema.Reference.DatasetId,
- tableNamePattern, tableTypes, columnNamePattern));
+ dbSchemaNameBuilder.Append(schema.Reference.DatasetId);
+ length++;
+ nullBitmapBuffer.Append(true);
+
+ if (depth == GetObjectsDepth.DbSchemas)
+ {
+ dbSchemaTablesValues.Add(null);
+ }
+ else
+ {
+ dbSchemaTablesValues.Add(GetTableSchemas(
+ depth, catalog, schema.Reference.DatasetId,
+ tableNamePattern, tableTypes, columnNamePattern));
+ }
}
}
}
- }
- IArrowArray[] dataArrays = new IArrowArray[]
- {
- dbSchemaNameBuilder.Build(),
- dbSchemaTablesValues.BuildListArrayForType(new StructType(StandardSchemas.TableSchema)),
- };
- StandardSchemas.DbSchemaSchema.Validate(dataArrays);
+ IArrowArray[] dataArrays = new IArrowArray[]
+ {
+ dbSchemaNameBuilder.Build(),
+ dbSchemaTablesValues.BuildListArrayForType(new StructType(StandardSchemas.TableSchema)),
+ };
+ StandardSchemas.DbSchemaSchema.Validate(dataArrays);
- return new StructArray(
- new StructType(StandardSchemas.DbSchemaSchema),
- length,
- dataArrays,
- nullBitmapBuffer.Build());
+ return new StructArray(
+ new StructType(StandardSchemas.DbSchemaSchema),
+ length,
+ dataArrays,
+ nullBitmapBuffer.Build());
+ });
}
private StructArray GetTableSchemas(
@@ -549,87 +616,90 @@ private StructArray GetTableSchemas(
IReadOnlyList? tableTypes,
string? columnNamePattern)
{
- StringArray.Builder tableNameBuilder = new StringArray.Builder();
- StringArray.Builder tableTypeBuilder = new StringArray.Builder();
- List tableColumnsValues = new List();
- List tableConstraintsValues = new List();
- ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder();
- int length = 0;
+ return this.TraceActivity(activity =>
+ {
+ StringArray.Builder tableNameBuilder = new StringArray.Builder();
+ StringArray.Builder tableTypeBuilder = new StringArray.Builder();
+ List tableColumnsValues = new List();
+ List tableConstraintsValues = new List();
+ ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder();
+ int length = 0;
- string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.TABLES",
- Sanitize(catalog), Sanitize(dbSchema));
+ string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.TABLES",
+ Sanitize(catalog), Sanitize(dbSchema));
- if (tableNamePattern != null)
- {
- query = string.Concat(query, string.Format(" WHERE table_name LIKE '{0}'", Sanitize(tableNamePattern)));
- if (tableTypes?.Count > 0)
+ if (tableNamePattern != null)
{
- IEnumerable sanitizedTypes = tableTypes.Select(x => Sanitize(x));
- query = string.Concat(query, string.Format(" AND table_type IN ('{0}')", string.Join("', '", sanitizedTypes).ToUpper()));
+ query = string.Concat(query, string.Format(" WHERE table_name LIKE '{0}'", Sanitize(tableNamePattern)));
+ if (tableTypes?.Count > 0)
+ {
+ IEnumerable sanitizedTypes = tableTypes.Select(x => Sanitize(x));
+ query = string.Concat(query, string.Format(" AND table_type IN ('{0}')", string.Join("', '", sanitizedTypes).ToUpper()));
+ }
}
- }
- else
- {
- if (tableTypes?.Count > 0)
+ else
{
- IEnumerable sanitizedTypes = tableTypes.Select(x => Sanitize(x));
- query = string.Concat(query, string.Format(" WHERE table_type IN ('{0}')", string.Join("', '", sanitizedTypes).ToUpper()));
+ if (tableTypes?.Count > 0)
+ {
+ IEnumerable sanitizedTypes = tableTypes.Select(x => Sanitize(x));
+ query = string.Concat(query, string.Format(" WHERE table_type IN ('{0}')", string.Join("', '", sanitizedTypes).ToUpper()));
+ }
}
- }
- BigQueryResults? result = ExecuteQuery(query, parameters: null);
-
- if (result != null)
- {
- bool includeConstraints = true;
-
- if (this.properties.TryGetValue(BigQueryParameters.IncludeConstraintsWithGetObjects, out string? includeConstraintsValue))
- {
- bool.TryParse(includeConstraintsValue, out includeConstraints);
- }
+ BigQueryResults? result = ExecuteQuery(query, parameters: null);
- foreach (BigQueryRow row in result)
+ if (result != null)
{
- tableNameBuilder.Append(GetValue(row["table_name"]));
- tableTypeBuilder.Append(GetValue(row["table_type"]));
- nullBitmapBuffer.Append(true);
- length++;
+ bool includeConstraints = true;
- if (depth == GetObjectsDepth.All && includeConstraints)
- {
- tableConstraintsValues.Add(GetConstraintSchema(
- depth, catalog, dbSchema, GetValue(row["table_name"]), columnNamePattern));
- }
- else
+ if (this.properties.TryGetValue(BigQueryParameters.IncludeConstraintsWithGetObjects, out string? includeConstraintsValue))
{
- tableConstraintsValues.Add(null);
+ bool.TryParse(includeConstraintsValue, out includeConstraints);
}
- if (depth == GetObjectsDepth.Tables)
+ foreach (BigQueryRow row in result)
{
- tableColumnsValues.Add(null);
- }
- else
- {
- tableColumnsValues.Add(GetColumnSchema(catalog, dbSchema, GetValue(row["table_name"]), columnNamePattern));
+ tableNameBuilder.Append(GetValue(row["table_name"]));
+ tableTypeBuilder.Append(GetValue(row["table_type"]));
+ nullBitmapBuffer.Append(true);
+ length++;
+
+ if (depth == GetObjectsDepth.All && includeConstraints)
+ {
+ tableConstraintsValues.Add(GetConstraintSchema(
+ depth, catalog, dbSchema, GetValue(row["table_name"]), columnNamePattern));
+ }
+ else
+ {
+ tableConstraintsValues.Add(null);
+ }
+
+ if (depth == GetObjectsDepth.Tables)
+ {
+ tableColumnsValues.Add(null);
+ }
+ else
+ {
+ tableColumnsValues.Add(GetColumnSchema(catalog, dbSchema, GetValue(row["table_name"]), columnNamePattern));
+ }
}
}
- }
- IArrowArray[] dataArrays = new IArrowArray[]
- {
- tableNameBuilder.Build(),
- tableTypeBuilder.Build(),
- tableColumnsValues.BuildListArrayForType(new StructType(StandardSchemas.ColumnSchema)),
- tableConstraintsValues.BuildListArrayForType(new StructType(StandardSchemas.ConstraintSchema))
- };
- StandardSchemas.TableSchema.Validate(dataArrays);
+ IArrowArray[] dataArrays = new IArrowArray[]
+ {
+ tableNameBuilder.Build(),
+ tableTypeBuilder.Build(),
+ tableColumnsValues.BuildListArrayForType(new StructType(StandardSchemas.ColumnSchema)),
+ tableConstraintsValues.BuildListArrayForType(new StructType(StandardSchemas.ConstraintSchema))
+ };
+ StandardSchemas.TableSchema.Validate(dataArrays);
- return new StructArray(
- new StructType(StandardSchemas.TableSchema),
- length,
- dataArrays,
- nullBitmapBuffer.Build());
+ return new StructArray(
+ new StructType(StandardSchemas.TableSchema),
+ length,
+ dataArrays,
+ nullBitmapBuffer.Build());
+ });
}
private StructArray GetColumnSchema(
@@ -638,111 +708,114 @@ private StructArray GetColumnSchema(
string table,
string? columnNamePattern)
{
- StringArray.Builder columnNameBuilder = new StringArray.Builder();
- Int32Array.Builder ordinalPositionBuilder = new Int32Array.Builder();
- StringArray.Builder remarksBuilder = new StringArray.Builder();
- Int16Array.Builder xdbcDataTypeBuilder = new Int16Array.Builder();
- StringArray.Builder xdbcTypeNameBuilder = new StringArray.Builder();
- Int32Array.Builder xdbcColumnSizeBuilder = new Int32Array.Builder();
- Int16Array.Builder xdbcDecimalDigitsBuilder = new Int16Array.Builder();
- Int16Array.Builder xdbcNumPrecRadixBuilder = new Int16Array.Builder();
- Int16Array.Builder xdbcNullableBuilder = new Int16Array.Builder();
- StringArray.Builder xdbcColumnDefBuilder = new StringArray.Builder();
- Int16Array.Builder xdbcSqlDataTypeBuilder = new Int16Array.Builder();
- Int16Array.Builder xdbcDatetimeSubBuilder = new Int16Array.Builder();
- Int32Array.Builder xdbcCharOctetLengthBuilder = new Int32Array.Builder();
- StringArray.Builder xdbcIsNullableBuilder = new StringArray.Builder();
- StringArray.Builder xdbcScopeCatalogBuilder = new StringArray.Builder();
- StringArray.Builder xdbcScopeSchemaBuilder = new StringArray.Builder();
- StringArray.Builder xdbcScopeTableBuilder = new StringArray.Builder();
- BooleanArray.Builder xdbcIsAutoincrementBuilder = new BooleanArray.Builder();
- BooleanArray.Builder xdbcIsGeneratedcolumnBuilder = new BooleanArray.Builder();
- ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder();
- int length = 0;
-
- string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{2}'",
- Sanitize(catalog), Sanitize(dbSchema), Sanitize(table));
-
- if (columnNamePattern != null)
+ return this.TraceActivity(activity =>
{
- query = string.Concat(query, string.Format("AND column_name LIKE '{0}'", Sanitize(columnNamePattern)));
- }
+ StringArray.Builder columnNameBuilder = new StringArray.Builder();
+ Int32Array.Builder ordinalPositionBuilder = new Int32Array.Builder();
+ StringArray.Builder remarksBuilder = new StringArray.Builder();
+ Int16Array.Builder xdbcDataTypeBuilder = new Int16Array.Builder();
+ StringArray.Builder xdbcTypeNameBuilder = new StringArray.Builder();
+ Int32Array.Builder xdbcColumnSizeBuilder = new Int32Array.Builder();
+ Int16Array.Builder xdbcDecimalDigitsBuilder = new Int16Array.Builder();
+ Int16Array.Builder xdbcNumPrecRadixBuilder = new Int16Array.Builder();
+ Int16Array.Builder xdbcNullableBuilder = new Int16Array.Builder();
+ StringArray.Builder xdbcColumnDefBuilder = new StringArray.Builder();
+ Int16Array.Builder xdbcSqlDataTypeBuilder = new Int16Array.Builder();
+ Int16Array.Builder xdbcDatetimeSubBuilder = new Int16Array.Builder();
+ Int32Array.Builder xdbcCharOctetLengthBuilder = new Int32Array.Builder();
+ StringArray.Builder xdbcIsNullableBuilder = new StringArray.Builder();
+ StringArray.Builder xdbcScopeCatalogBuilder = new StringArray.Builder();
+ StringArray.Builder xdbcScopeSchemaBuilder = new StringArray.Builder();
+ StringArray.Builder xdbcScopeTableBuilder = new StringArray.Builder();
+ BooleanArray.Builder xdbcIsAutoincrementBuilder = new BooleanArray.Builder();
+ BooleanArray.Builder xdbcIsGeneratedcolumnBuilder = new BooleanArray.Builder();
+ ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder();
+ int length = 0;
+
+ string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{2}'",
+ Sanitize(catalog), Sanitize(dbSchema), Sanitize(table));
+
+ if (columnNamePattern != null)
+ {
+ query = string.Concat(query, string.Format("AND column_name LIKE '{0}'", Sanitize(columnNamePattern)));
+ }
- BigQueryResults? result = ExecuteQuery(query, parameters: null);
+ BigQueryResults? result = ExecuteQuery(query, parameters: null);
- if (result != null)
- {
- foreach (BigQueryRow row in result)
+ if (result != null)
{
- columnNameBuilder.Append(GetValue(row["column_name"]));
- ordinalPositionBuilder.Append((int)(long)row["ordinal_position"]);
- remarksBuilder.Append("");
+ foreach (BigQueryRow row in result)
+ {
+ columnNameBuilder.Append(GetValue(row["column_name"]));
+ ordinalPositionBuilder.Append((int)(long)row["ordinal_position"]);
+ remarksBuilder.Append("");
- string dataType = ToTypeName(GetValue(row["data_type"]), out string suffix);
+ string dataType = ToTypeName(GetValue(row["data_type"]), out string suffix);
- if ((dataType.StartsWith("NUMERIC") ||
- dataType.StartsWith("DECIMAL") ||
- dataType.StartsWith("BIGNUMERIC") ||
- dataType.StartsWith("BIGDECIMAL"))
- && !string.IsNullOrEmpty(suffix))
- {
- ParsedDecimalValues values = ParsePrecisionAndScale(suffix);
- xdbcColumnSizeBuilder.Append(values.Precision);
- xdbcDecimalDigitsBuilder.Append(Convert.ToInt16(values.Scale));
- }
- else
- {
- xdbcColumnSizeBuilder.AppendNull();
- xdbcDecimalDigitsBuilder.AppendNull();
- }
+ if ((dataType.StartsWith("NUMERIC") ||
+ dataType.StartsWith("DECIMAL") ||
+ dataType.StartsWith("BIGNUMERIC") ||
+ dataType.StartsWith("BIGDECIMAL"))
+ && !string.IsNullOrEmpty(suffix))
+ {
+ ParsedDecimalValues values = ParsePrecisionAndScale(suffix);
+ xdbcColumnSizeBuilder.Append(values.Precision);
+ xdbcDecimalDigitsBuilder.Append(Convert.ToInt16(values.Scale));
+ }
+ else
+ {
+ xdbcColumnSizeBuilder.AppendNull();
+ xdbcDecimalDigitsBuilder.AppendNull();
+ }
- xdbcDataTypeBuilder.AppendNull();
- xdbcTypeNameBuilder.Append(dataType);
- xdbcNumPrecRadixBuilder.AppendNull();
- xdbcNullableBuilder.AppendNull();
- xdbcColumnDefBuilder.AppendNull();
- xdbcSqlDataTypeBuilder.Append((short)ToXdbcDataType(dataType));
- xdbcDatetimeSubBuilder.AppendNull();
- xdbcCharOctetLengthBuilder.AppendNull();
- xdbcIsNullableBuilder.Append(row["is_nullable"].ToString());
- xdbcScopeCatalogBuilder.AppendNull();
- xdbcScopeSchemaBuilder.AppendNull();
- xdbcScopeTableBuilder.AppendNull();
- xdbcIsAutoincrementBuilder.AppendNull();
- xdbcIsGeneratedcolumnBuilder.Append(GetValue(row["is_generated"]).ToUpper() == "YES");
- nullBitmapBuffer.Append(true);
- length++;
+ xdbcDataTypeBuilder.AppendNull();
+ xdbcTypeNameBuilder.Append(dataType);
+ xdbcNumPrecRadixBuilder.AppendNull();
+ xdbcNullableBuilder.AppendNull();
+ xdbcColumnDefBuilder.AppendNull();
+ xdbcSqlDataTypeBuilder.Append((short)ToXdbcDataType(dataType));
+ xdbcDatetimeSubBuilder.AppendNull();
+ xdbcCharOctetLengthBuilder.AppendNull();
+ xdbcIsNullableBuilder.Append(row["is_nullable"].ToString());
+ xdbcScopeCatalogBuilder.AppendNull();
+ xdbcScopeSchemaBuilder.AppendNull();
+ xdbcScopeTableBuilder.AppendNull();
+ xdbcIsAutoincrementBuilder.AppendNull();
+ xdbcIsGeneratedcolumnBuilder.Append(GetValue(row["is_generated"]).ToUpper() == "YES");
+ nullBitmapBuffer.Append(true);
+ length++;
+ }
}
- }
- IArrowArray[] dataArrays = new IArrowArray[]
- {
- columnNameBuilder.Build(),
- ordinalPositionBuilder.Build(),
- remarksBuilder.Build(),
- xdbcDataTypeBuilder.Build(),
- xdbcTypeNameBuilder.Build(),
- xdbcColumnSizeBuilder.Build(),
- xdbcDecimalDigitsBuilder.Build(),
- xdbcNumPrecRadixBuilder.Build(),
- xdbcNullableBuilder.Build(),
- xdbcColumnDefBuilder.Build(),
- xdbcSqlDataTypeBuilder.Build(),
- xdbcDatetimeSubBuilder.Build(),
- xdbcCharOctetLengthBuilder.Build(),
- xdbcIsNullableBuilder.Build(),
- xdbcScopeCatalogBuilder.Build(),
- xdbcScopeSchemaBuilder.Build(),
- xdbcScopeTableBuilder.Build(),
- xdbcIsAutoincrementBuilder.Build(),
- xdbcIsGeneratedcolumnBuilder.Build()
- };
- StandardSchemas.ColumnSchema.Validate(dataArrays);
+ IArrowArray[] dataArrays = new IArrowArray[]
+ {
+ columnNameBuilder.Build(),
+ ordinalPositionBuilder.Build(),
+ remarksBuilder.Build(),
+ xdbcDataTypeBuilder.Build(),
+ xdbcTypeNameBuilder.Build(),
+ xdbcColumnSizeBuilder.Build(),
+ xdbcDecimalDigitsBuilder.Build(),
+ xdbcNumPrecRadixBuilder.Build(),
+ xdbcNullableBuilder.Build(),
+ xdbcColumnDefBuilder.Build(),
+ xdbcSqlDataTypeBuilder.Build(),
+ xdbcDatetimeSubBuilder.Build(),
+ xdbcCharOctetLengthBuilder.Build(),
+ xdbcIsNullableBuilder.Build(),
+ xdbcScopeCatalogBuilder.Build(),
+ xdbcScopeSchemaBuilder.Build(),
+ xdbcScopeTableBuilder.Build(),
+ xdbcIsAutoincrementBuilder.Build(),
+ xdbcIsGeneratedcolumnBuilder.Build()
+ };
+ StandardSchemas.ColumnSchema.Validate(dataArrays);
- return new StructArray(
- new StructType(StandardSchemas.ColumnSchema),
- length,
- dataArrays,
- nullBitmapBuffer.Build());
+ return new StructArray(
+ new StructType(StandardSchemas.ColumnSchema),
+ length,
+ dataArrays,
+ nullBitmapBuffer.Build());
+ });
}
private StructArray GetConstraintSchema(
@@ -752,66 +825,69 @@ private StructArray GetConstraintSchema(
string table,
string? columnNamePattern)
{
- StringArray.Builder constraintNameBuilder = new StringArray.Builder();
- StringArray.Builder constraintTypeBuilder = new StringArray.Builder();
- List constraintColumnNamesValues = new List();
- List constraintColumnUsageValues = new List();
- ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder();
- int length = 0;
+ return this.TraceActivity(activity =>
+ {
+ StringArray.Builder constraintNameBuilder = new StringArray.Builder();
+ StringArray.Builder constraintTypeBuilder = new StringArray.Builder();
+ List constraintColumnNamesValues = new List();
+ List constraintColumnUsageValues = new List();
+ ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder();
+ int length = 0;
- string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.TABLE_CONSTRAINTS WHERE table_name = '{2}'",
- Sanitize(catalog), Sanitize(dbSchema), Sanitize(table));
+ string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.TABLE_CONSTRAINTS WHERE table_name = '{2}'",
+ Sanitize(catalog), Sanitize(dbSchema), Sanitize(table));
- BigQueryResults? result = ExecuteQuery(query, parameters: null);
+ BigQueryResults? result = ExecuteQuery(query, parameters: null);
- if (result != null)
- {
- foreach (BigQueryRow row in result)
+ if (result != null)
{
- string constraintName = GetValue(row["constraint_name"]);
- constraintNameBuilder.Append(constraintName);
- string constraintType = GetValue(row["constraint_type"]);
- constraintTypeBuilder.Append(constraintType);
- nullBitmapBuffer.Append(true);
- length++;
-
- if (depth == GetObjectsDepth.All || depth == GetObjectsDepth.Tables)
+ foreach (BigQueryRow row in result)
{
- constraintColumnNamesValues.Add(GetConstraintColumnNames(
- catalog, dbSchema, table, constraintName));
- if (constraintType.ToUpper() == "FOREIGN KEY")
+ string constraintName = GetValue(row["constraint_name"]);
+ constraintNameBuilder.Append(constraintName);
+ string constraintType = GetValue(row["constraint_type"]);
+ constraintTypeBuilder.Append(constraintType);
+ nullBitmapBuffer.Append(true);
+ length++;
+
+ if (depth == GetObjectsDepth.All || depth == GetObjectsDepth.Tables)
{
- constraintColumnUsageValues.Add(GetConstraintsUsage(
+ constraintColumnNamesValues.Add(GetConstraintColumnNames(
catalog, dbSchema, table, constraintName));
+ if (constraintType.ToUpper() == "FOREIGN KEY")
+ {
+ constraintColumnUsageValues.Add(GetConstraintsUsage(
+ catalog, dbSchema, table, constraintName));
+ }
+ else
+ {
+ constraintColumnUsageValues.Add(null);
+ }
}
else
{
+ constraintColumnNamesValues.Add(null);
constraintColumnUsageValues.Add(null);
}
}
- else
- {
- constraintColumnNamesValues.Add(null);
- constraintColumnUsageValues.Add(null);
- }
}
- }
- IArrowArray[] dataArrays = new IArrowArray[]
- {
- constraintNameBuilder.Build(),
- constraintTypeBuilder.Build(),
- constraintColumnNamesValues.BuildListArrayForType(StringType.Default),
- constraintColumnUsageValues.BuildListArrayForType(new StructType(StandardSchemas.UsageSchema))
- };
+ IArrowArray[] dataArrays = new IArrowArray[]
+ {
+ constraintNameBuilder.Build(),
+ constraintTypeBuilder.Build(),
+ constraintColumnNamesValues.BuildListArrayForType(StringType.Default),
+ constraintColumnUsageValues.BuildListArrayForType(new StructType(StandardSchemas.UsageSchema))
+ };
- StandardSchemas.ConstraintSchema.Validate(dataArrays);
+ StandardSchemas.ConstraintSchema.Validate(dataArrays);
- return new StructArray(
- new StructType(StandardSchemas.ConstraintSchema),
- length,
- dataArrays,
- nullBitmapBuffer.Build());
+ return new StructArray(
+ new StructType(StandardSchemas.ConstraintSchema),
+ length,
+ dataArrays,
+ nullBitmapBuffer.Build());
+ });
}
private StringArray GetConstraintColumnNames(
@@ -820,23 +896,26 @@ private StringArray GetConstraintColumnNames(
string table,
string constraintName)
{
- string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE table_name = '{2}' AND constraint_name = '{3}' ORDER BY ordinal_position",
+ return this.TraceActivity(activity =>
+ {
+ string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE table_name = '{2}' AND constraint_name = '{3}' ORDER BY ordinal_position",
Sanitize(catalog), Sanitize(dbSchema), Sanitize(table), Sanitize(constraintName));
- StringArray.Builder constraintColumnNamesBuilder = new StringArray.Builder();
+ StringArray.Builder constraintColumnNamesBuilder = new StringArray.Builder();
- BigQueryResults? result = ExecuteQuery(query, parameters: null);
+ BigQueryResults? result = ExecuteQuery(query, parameters: null);
- if (result != null)
- {
- foreach (BigQueryRow row in result)
+ if (result != null)
{
- string column = GetValue(row["column_name"]);
- constraintColumnNamesBuilder.Append(column);
+ foreach (BigQueryRow row in result)
+ {
+ string column = GetValue(row["column_name"]);
+ constraintColumnNamesBuilder.Append(column);
+ }
}
- }
- return constraintColumnNamesBuilder.Build();
+ return constraintColumnNamesBuilder.Build();
+ });
}
private StructArray GetConstraintsUsage(
@@ -845,51 +924,54 @@ private StructArray GetConstraintsUsage(
string table,
string constraintName)
{
- StringArray.Builder constraintFkCatalogBuilder = new StringArray.Builder();
- StringArray.Builder constraintFkDbSchemaBuilder = new StringArray.Builder();
- StringArray.Builder constraintFkTableBuilder = new StringArray.Builder();
- StringArray.Builder constraintFkColumnNameBuilder = new StringArray.Builder();
- ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder();
- int length = 0;
+ return this.TraceActivity(activity =>
+ {
+ StringArray.Builder constraintFkCatalogBuilder = new StringArray.Builder();
+ StringArray.Builder constraintFkDbSchemaBuilder = new StringArray.Builder();
+ StringArray.Builder constraintFkTableBuilder = new StringArray.Builder();
+ StringArray.Builder constraintFkColumnNameBuilder = new StringArray.Builder();
+ ArrowBuffer.BitmapBuilder nullBitmapBuffer = new ArrowBuffer.BitmapBuilder();
+ int length = 0;
- string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE WHERE constraint_name = '{2}'",
- Sanitize(catalog), Sanitize(dbSchema), Sanitize(constraintName));
+ string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE WHERE constraint_name = '{2}'",
+ Sanitize(catalog), Sanitize(dbSchema), Sanitize(constraintName));
- BigQueryResults? result = ExecuteQuery(query, parameters: null);
+ BigQueryResults? result = ExecuteQuery(query, parameters: null);
- if (result != null)
- {
- foreach (BigQueryRow row in result)
+ if (result != null)
{
- string constraint_catalog = GetValue(row["constraint_catalog"]);
- string constraint_schema = GetValue(row["constraint_schema"]);
- string table_name = GetValue(row["table_name"]);
- string column_name = GetValue(row["column_name"]);
-
- constraintFkCatalogBuilder.Append(constraint_catalog);
- constraintFkDbSchemaBuilder.Append(constraint_schema);
- constraintFkTableBuilder.Append(table_name);
- constraintFkColumnNameBuilder.Append(column_name);
-
- nullBitmapBuffer.Append(true);
- length++;
+ foreach (BigQueryRow row in result)
+ {
+ string constraint_catalog = GetValue(row["constraint_catalog"]);
+ string constraint_schema = GetValue(row["constraint_schema"]);
+ string table_name = GetValue(row["table_name"]);
+ string column_name = GetValue(row["column_name"]);
+
+ constraintFkCatalogBuilder.Append(constraint_catalog);
+ constraintFkDbSchemaBuilder.Append(constraint_schema);
+ constraintFkTableBuilder.Append(table_name);
+ constraintFkColumnNameBuilder.Append(column_name);
+
+ nullBitmapBuffer.Append(true);
+ length++;
+ }
}
- }
- IArrowArray[] dataArrays = new IArrowArray[]
- {
- constraintFkCatalogBuilder.Build(),
- constraintFkDbSchemaBuilder.Build(),
- constraintFkTableBuilder.Build(),
- constraintFkColumnNameBuilder.Build()
- };
- StandardSchemas.UsageSchema.Validate(dataArrays);
+ IArrowArray[] dataArrays = new IArrowArray[]
+ {
+ constraintFkCatalogBuilder.Build(),
+ constraintFkDbSchemaBuilder.Build(),
+ constraintFkTableBuilder.Build(),
+ constraintFkColumnNameBuilder.Build()
+ };
+ StandardSchemas.UsageSchema.Validate(dataArrays);
- return new StructArray(
- new StructType(StandardSchemas.UsageSchema),
- length,
- dataArrays,
- nullBitmapBuffer.Build());
+ return new StructArray(
+ new StructType(StandardSchemas.UsageSchema),
+ length,
+ dataArrays,
+ nullBitmapBuffer.Build());
+ });
}
private string PatternToRegEx(string? pattern)
@@ -989,22 +1071,25 @@ private XdbcDataType ToXdbcDataType(string type)
public override Schema GetTableSchema(string? catalog, string? dbSchema, string tableName)
{
- string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{2}'",
+ return this.TraceActivity(activity =>
+ {
+ string query = string.Format("SELECT * FROM `{0}`.`{1}`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{2}'",
Sanitize(catalog), Sanitize(dbSchema), Sanitize(tableName));
- BigQueryResults? result = ExecuteQuery(query, parameters: null);
+ BigQueryResults? result = ExecuteQuery(query, parameters: null);
- List fields = new List();
+ List fields = new List();
- if (result != null)
- {
- foreach (BigQueryRow row in result)
+ if (result != null)
{
- fields.Add(DescToField(row));
+ foreach (BigQueryRow row in result)
+ {
+ fields.Add(DescToField(row));
+ }
}
- }
- return new Schema(fields, null);
+ return new Schema(fields, null);
+ });
}
private Field DescToField(BigQueryRow row)
@@ -1014,7 +1099,7 @@ private Field DescToField(BigQueryRow row)
metaData.Add("ORDINAL_POSITION", GetValue(row["ordinal_position"]));
metaData.Add("DATA_TYPE", GetValue(row["data_type"]));
- Field.Builder fieldBuilder = SchemaFieldGenerator(GetValue(row["column_name"]).ToLower(), GetValue(row["data_type"]));
+ Field.Builder fieldBuilder = SchemaFieldGenerator(GetValue(row["column_name"]), GetValue(row["data_type"]));
fieldBuilder.Metadata(metaData);
if (!GetValue(row["is_nullable"]).Equals("YES", StringComparison.OrdinalIgnoreCase))
@@ -1022,7 +1107,7 @@ private Field DescToField(BigQueryRow row)
fieldBuilder.Nullable(false);
}
- fieldBuilder.Name(GetValue(row["column_name"]).ToLower());
+ fieldBuilder.Name(GetValue(row["column_name"]));
return fieldBuilder.Build();
}
@@ -1163,6 +1248,7 @@ private Dictionary ParseOptions()
BigQueryParameters.AllowLargeResults,
BigQueryParameters.UseLegacySQL,
BigQueryParameters.LargeDecimalsAsString,
+ BigQueryParameters.LargeResultsDataset,
BigQueryParameters.LargeResultsDestinationTable,
BigQueryParameters.GetQueryResultsOptionsTimeout,
BigQueryParameters.MaxFetchConcurrency,
diff --git a/csharp/src/Drivers/BigQuery/BigQueryParameters.cs b/csharp/src/Drivers/BigQuery/BigQueryParameters.cs
index 45125856b7..f0002c609e 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryParameters.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryParameters.cs
@@ -15,6 +15,9 @@
* limitations under the License.
*/
+using System;
+using System.Collections.Generic;
+
namespace Apache.Arrow.Adbc.Drivers.BigQuery
{
///
@@ -23,29 +26,48 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
internal class BigQueryParameters
{
public const string AccessToken = "adbc.bigquery.access_token";
+ public const string AllowLargeResults = "adbc.bigquery.allow_large_results";
public const string AudienceUri = "adbc.bigquery.audience_uri";
- public const string ProjectId = "adbc.bigquery.project_id";
+ public const string AuthenticationType = "adbc.bigquery.auth_type";
public const string BillingProjectId = "adbc.bigquery.billing_project_id";
public const string ClientId = "adbc.bigquery.client_id";
public const string ClientSecret = "adbc.bigquery.client_secret";
- public const string RefreshToken = "adbc.bigquery.refresh_token";
- public const string AuthenticationType = "adbc.bigquery.auth_type";
+ public const string ClientTimeout = "adbc.bigquery.client.timeout";
+ public const string EvaluationKind = "adbc.bigquery.multiple_statement.evaluation_kind";
+ public const string GetQueryResultsOptionsTimeout = "adbc.bigquery.get_query_results_options.timeout";
+ public const string IncludeConstraintsWithGetObjects = "adbc.bigquery.include_constraints_getobjects";
+ public const string IncludePublicProjectId = "adbc.bigquery.include_public_project_id";
public const string JsonCredential = "adbc.bigquery.auth_json_credential";
- public const string AllowLargeResults = "adbc.bigquery.allow_large_results";
- public const string LargeResultsDestinationTable = "adbc.bigquery.large_results_destination_table";
- public const string UseLegacySQL = "adbc.bigquery.use_legacy_sql";
public const string LargeDecimalsAsString = "adbc.bigquery.large_decimals_as_string";
- public const string Scopes = "adbc.bigquery.scopes";
- public const string IncludeConstraintsWithGetObjects = "adbc.bigquery.include_constraints_getobjects";
- public const string ClientTimeout = "adbc.bigquery.client.timeout";
+ public const string LargeResultsDataset = "adbc.bigquery.large_results_dataset";
+ public const string LargeResultsDestinationTable = "adbc.bigquery.large_results_destination_table";
+ public const string MaxFetchConcurrency = "adbc.bigquery.max_fetch_concurrency";
public const string MaximumRetryAttempts = "adbc.bigquery.maximum_retries";
+ public const string ProjectId = "adbc.bigquery.project_id";
+ public const string RefreshToken = "adbc.bigquery.refresh_token";
public const string RetryDelayMs = "adbc.bigquery.retry_delay_ms";
- public const string GetQueryResultsOptionsTimeout = "adbc.bigquery.get_query_results_options.timeout";
- public const string MaxFetchConcurrency = "adbc.bigquery.max_fetch_concurrency";
- public const string IncludePublicProjectId = "adbc.bigquery.include_public_project_id";
- public const string StatementType = "adbc.bigquery.multiple_statement.statement_type";
+ public const string Scopes = "adbc.bigquery.scopes";
public const string StatementIndex = "adbc.bigquery.multiple_statement.statement_index";
- public const string EvaluationKind = "adbc.bigquery.multiple_statement.evaluation_kind";
+ public const string StatementType = "adbc.bigquery.multiple_statement.statement_type";
+ public const string UseLegacySQL = "adbc.bigquery.use_legacy_sql";
+
+ // these values are safe to log any time
+ private static HashSet safeToLog = new HashSet(StringComparer.OrdinalIgnoreCase)
+ {
+ AllowLargeResults, AuthenticationType, BillingProjectId, ClientId, ClientTimeout,
+ EvaluationKind, GetQueryResultsOptionsTimeout, IncludeConstraintsWithGetObjects,
+ IncludePublicProjectId, LargeDecimalsAsString, LargeResultsDataset, LargeResultsDestinationTable,
+ MaxFetchConcurrency, MaximumRetryAttempts, ProjectId, RetryDelayMs, StatementIndex,
+ StatementType, UseLegacySQL
+ };
+
+ public static bool IsSafeToLog(string name)
+ {
+ if (safeToLog.Contains(name))
+ return true;
+
+ return false;
+ }
}
///
@@ -68,5 +90,11 @@ internal class BigQueryConstants
// default value per https://pkg.go.dev/cloud.google.com/go/bigquery#section-readme
public const string DetectProjectId = "*detect-project-id*";
+
+ // Reuse what the ODBC driver already has in place, in case a caller
+ // has permission issues trying to create a new dataset
+ public const string DefaultLargeDatasetId = "_bqodbc_temp_tables";
+
+ public const string PublicProjectId = "bigquery-public-data";
}
}
diff --git a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
index 356260c97f..a148e0f22c 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs
@@ -17,13 +17,16 @@
using System;
using System.Collections.Generic;
+using System.Data;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
+using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Ipc;
using Apache.Arrow.Types;
+using Google;
using Google.Api.Gax;
using Google.Apis.Auth.OAuth2;
using Google.Apis.Bigquery.v2.Data;
@@ -37,11 +40,11 @@ namespace Apache.Arrow.Adbc.Drivers.BigQuery
///
/// BigQuery-specific implementation of
///
- class BigQueryStatement : AdbcStatement, ITokenProtectedResource, IDisposable
+ class BigQueryStatement : TracingStatement, ITokenProtectedResource, IDisposable
{
readonly BigQueryConnection bigQueryConnection;
- public BigQueryStatement(BigQueryConnection bigQueryConnection)
+ public BigQueryStatement(BigQueryConnection bigQueryConnection) : base(bigQueryConnection)
{
if (bigQueryConnection == null) { throw new AdbcException($"{nameof(bigQueryConnection)} cannot be null", AdbcStatusCode.InvalidArgument); }
@@ -63,6 +66,10 @@ public BigQueryStatement(BigQueryConnection bigQueryConnection)
private int RetryDelayMs => this.bigQueryConnection.RetryDelayMs;
+ public override string AssemblyVersion => BigQueryUtils.BigQueryAssemblyVersion;
+
+ public override string AssemblyName => BigQueryUtils.BigQueryAssemblyName;
+
public override void SetOption(string key, string value)
{
if (Options == null)
@@ -80,122 +87,129 @@ public override QueryResult ExecuteQuery()
private async Task ExecuteQueryInternalAsync()
{
- QueryOptions queryOptions = ValidateOptions();
- BigQueryJob job = await Client.CreateQueryJobAsync(SqlQuery, null, queryOptions);
-
- JobReference jobReference = job.Reference;
- GetQueryResultsOptions getQueryResultsOptions = new GetQueryResultsOptions();
-
- if (Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout, out string? timeoutSeconds) == true &&
- int.TryParse(timeoutSeconds, out int seconds) &&
- seconds >= 0)
+ return await this.TraceActivity(async activity =>
{
- getQueryResultsOptions.Timeout = TimeSpan.FromSeconds(seconds);
- }
+ QueryOptions queryOptions = ValidateOptions(activity);
- // We can't checkJobStatus, Otherwise, the timeout in QueryResultsOptions is meaningless.
- // When encountering a long-running job, it should be controlled by the timeout in the Google SDK instead of blocking in a while loop.
- Func> getJobResults = async () =>
- {
- // if the authentication token was reset, then we need a new job with the latest token
- BigQueryJob completedJob = await Client.GetJobAsync(jobReference);
- return await completedJob.GetQueryResultsAsync(getQueryResultsOptions);
- };
+ activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, SqlQuery, BigQueryUtils.IsSafeToTrace());
- BigQueryResults results = await ExecuteWithRetriesAsync(getJobResults);
+ BigQueryJob job = await Client.CreateQueryJobAsync(SqlQuery, null, queryOptions);
- TokenProtectedReadClientManger clientMgr = new TokenProtectedReadClientManger(Credential);
- clientMgr.UpdateToken = () => Task.Run(() =>
- {
- this.bigQueryConnection.SetCredential();
- clientMgr.UpdateCredential(Credential);
- });
+ JobReference jobReference = job.Reference;
+ GetQueryResultsOptions getQueryResultsOptions = new GetQueryResultsOptions();
- // For multi-statement queries, StatementType == "SCRIPT"
- if (results.TableReference == null || job.Statistics.Query.StatementType.Equals("SCRIPT", StringComparison.OrdinalIgnoreCase))
- {
- string statementType = string.Empty;
- if (Options?.TryGetValue(BigQueryParameters.StatementType, out string? statementTypeString) == true)
+ if (Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout, out string? timeoutSeconds) == true &&
+ int.TryParse(timeoutSeconds, out int seconds) &&
+ seconds >= 0)
{
- statementType = statementTypeString;
+ getQueryResultsOptions.Timeout = TimeSpan.FromSeconds(seconds);
+ activity?.AddBigQueryParameterTag(BigQueryParameters.GetQueryResultsOptionsTimeout, seconds);
}
- int statementIndex = 1;
- if (Options?.TryGetValue(BigQueryParameters.StatementIndex, out string? statementIndexString) == true &&
- int.TryParse(statementIndexString, out int statementIndexInt) &&
- statementIndexInt > 0)
+
+ // We can't checkJobStatus, Otherwise, the timeout in QueryResultsOptions is meaningless.
+ // When encountering a long-running job, it should be controlled by the timeout in the Google SDK instead of blocking in a while loop.
+ Func> getJobResults = async () =>
{
- statementIndex = statementIndexInt;
- }
- string evaluationKind = string.Empty;
- if (Options?.TryGetValue(BigQueryParameters.EvaluationKind, out string? evaluationKindString) == true)
+ // if the authentication token was reset, then we need a new job with the latest token
+ BigQueryJob completedJob = await Client.GetJobAsync(jobReference);
+ return await completedJob.GetQueryResultsAsync(getQueryResultsOptions);
+ };
+
+ BigQueryResults results = await ExecuteWithRetriesAsync(getJobResults, activity);
+
+ TokenProtectedReadClientManger clientMgr = new TokenProtectedReadClientManger(Credential);
+ clientMgr.UpdateToken = () => Task.Run(() =>
{
- evaluationKind = evaluationKindString;
- }
+ this.bigQueryConnection.SetCredential();
+ clientMgr.UpdateCredential(Credential);
+ });
- Func> getMultiJobResults = async () =>
+ // For multi-statement queries, StatementType == "SCRIPT"
+ if (results.TableReference == null || job.Statistics.Query.StatementType.Equals("SCRIPT", StringComparison.OrdinalIgnoreCase))
{
- // To get the results of all statements in a multi-statement query, enumerate the child jobs. Related public docs: https://cloud.google.com/bigquery/docs/multi-statement-queries#get_all_executed_statements.
- // Can filter by StatementType and EvaluationKind. Related public docs: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobstatistics2, https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#evaluationkind
- ListJobsOptions listJobsOptions = new ListJobsOptions();
- listJobsOptions.ParentJobId = results.JobReference.JobId;
- var joblist = Client.ListJobs(listJobsOptions)
- .Select(job => Client.GetJob(job.Reference))
- .Where(job => string.IsNullOrEmpty(evaluationKind) || job.Statistics.ScriptStatistics.EvaluationKind.Equals(evaluationKind, StringComparison.OrdinalIgnoreCase))
- .Where(job => string.IsNullOrEmpty(statementType) || job.Statistics.Query.StatementType.Equals(statementType, StringComparison.OrdinalIgnoreCase))
- .OrderBy(job => job.Resource.Statistics.CreationTime)
- .ToList();
-
- if (joblist.Count > 0)
+ string statementType = string.Empty;
+ if (Options?.TryGetValue(BigQueryParameters.StatementType, out string? statementTypeString) == true)
+ {
+ statementType = statementTypeString;
+ }
+ int statementIndex = 1;
+ if (Options?.TryGetValue(BigQueryParameters.StatementIndex, out string? statementIndexString) == true &&
+ int.TryParse(statementIndexString, out int statementIndexInt) &&
+ statementIndexInt > 0)
+ {
+ statementIndex = statementIndexInt;
+ }
+ string evaluationKind = string.Empty;
+ if (Options?.TryGetValue(BigQueryParameters.EvaluationKind, out string? evaluationKindString) == true)
{
- if (statementIndex < 1 || statementIndex > joblist.Count)
+ evaluationKind = evaluationKindString;
+ }
+
+ Func> getMultiJobResults = async () =>
+ {
+ // To get the results of all statements in a multi-statement query, enumerate the child jobs. Related public docs: https://cloud.google.com/bigquery/docs/multi-statement-queries#get_all_executed_statements.
+ // Can filter by StatementType and EvaluationKind. Related public docs: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobstatistics2, https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#evaluationkind
+ ListJobsOptions listJobsOptions = new ListJobsOptions();
+ listJobsOptions.ParentJobId = results.JobReference.JobId;
+ var joblist = Client.ListJobs(listJobsOptions)
+ .Select(job => Client.GetJob(job.Reference))
+ .Where(job => string.IsNullOrEmpty(evaluationKind) || job.Statistics.ScriptStatistics.EvaluationKind.Equals(evaluationKind, StringComparison.OrdinalIgnoreCase))
+ .Where(job => string.IsNullOrEmpty(statementType) || job.Statistics.Query.StatementType.Equals(statementType, StringComparison.OrdinalIgnoreCase))
+ .OrderBy(job => job.Resource.Statistics.CreationTime)
+ .ToList();
+
+ if (joblist.Count > 0)
{
- throw new ArgumentOutOfRangeException($"The specified index {statementIndex} is out of range. There are {joblist.Count} jobs available.");
+ if (statementIndex < 1 || statementIndex > joblist.Count)
+ {
+ throw new ArgumentOutOfRangeException($"The specified index {statementIndex} is out of range. There are {joblist.Count} jobs available.");
+ }
+ return await joblist[statementIndex - 1].GetQueryResultsAsync(getQueryResultsOptions);
}
- return await joblist[statementIndex - 1].GetQueryResultsAsync(getQueryResultsOptions);
- }
- throw new AdbcException($"Unable to obtain result from statement [{statementIndex}]", AdbcStatusCode.InvalidData);
- };
+ throw new AdbcException($"Unable to obtain result from statement [{statementIndex}]", AdbcStatusCode.InvalidData);
+ };
- results = await ExecuteWithRetriesAsync(getMultiJobResults);
- }
+ results = await ExecuteWithRetriesAsync(getMultiJobResults, activity);
+ }
- if (results?.TableReference == null)
- {
- throw new AdbcException("There is no query statement");
- }
+ if (results?.TableReference == null)
+ {
+ throw new AdbcException("There is no query statement");
+ }
- string table = $"projects/{results.TableReference.ProjectId}/datasets/{results.TableReference.DatasetId}/tables/{results.TableReference.TableId}";
+ string table = $"projects/{results.TableReference.ProjectId}/datasets/{results.TableReference.DatasetId}/tables/{results.TableReference.TableId}";
- int maxStreamCount = 1;
+ int maxStreamCount = 1;
- if (Options?.TryGetValue(BigQueryParameters.MaxFetchConcurrency, out string? maxStreamCountString) == true)
- {
- if (int.TryParse(maxStreamCountString, out int count))
+ if (Options?.TryGetValue(BigQueryParameters.MaxFetchConcurrency, out string? maxStreamCountString) == true)
{
- if (count >= 0)
+ if (int.TryParse(maxStreamCountString, out int count))
{
- maxStreamCount = count;
+ if (count >= 0)
+ {
+ maxStreamCount = count;
+ }
}
}
- }
-
- ReadSession rs = new ReadSession { Table = table, DataFormat = DataFormat.Arrow };
- Func> createReadSession = () => clientMgr.ReadClient.CreateReadSessionAsync("projects/" + results.TableReference.ProjectId, rs, maxStreamCount);
+ ReadSession rs = new ReadSession { Table = table, DataFormat = DataFormat.Arrow };
- ReadSession rrs = await ExecuteWithRetriesAsync(createReadSession);
+ Func> createReadSession = () => clientMgr.ReadClient.CreateReadSessionAsync("projects/" + results.TableReference.ProjectId, rs, maxStreamCount);
- long totalRows = results.TotalRows == null ? -1L : (long)results.TotalRows.Value;
+ ReadSession rrs = await ExecuteWithRetriesAsync(createReadSession, activity);
- var readers = rrs.Streams
- .Select(s => ReadChunkWithRetries(clientMgr, s.Name))
- .Where(chunk => chunk != null)
- .Cast();
+ long totalRows = results.TotalRows == null ? -1L : (long)results.TotalRows.Value;
- IArrowArrayStream stream = new MultiArrowReader(TranslateSchema(results.Schema), readers);
+ var readers = rrs.Streams
+ .Select(s => ReadChunkWithRetries(clientMgr, s.Name, activity))
+ .Where(chunk => chunk != null)
+ .Cast();
- return new QueryResult(totalRows, stream);
+ IArrowArrayStream stream = new MultiArrowReader(this, TranslateSchema(results.Schema), readers);
+ activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, totalRows);
+ return new QueryResult(totalRows, stream);
+ });
}
public override UpdateResult ExecuteUpdate()
@@ -205,21 +219,28 @@ public override UpdateResult ExecuteUpdate()
private async Task ExecuteUpdateInternalAsync()
{
- GetQueryResultsOptions getQueryResultsOptions = new GetQueryResultsOptions();
-
- if (Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout, out string? timeoutSeconds) == true &&
- int.TryParse(timeoutSeconds, out int seconds) &&
- seconds >= 0)
+ return await this.TraceActivity(async activity =>
{
- getQueryResultsOptions.Timeout = TimeSpan.FromSeconds(seconds);
- }
+ GetQueryResultsOptions getQueryResultsOptions = new GetQueryResultsOptions();
- // Cannot set destination table in jobs with DDL statements, otherwise an error will be prompted
- Func> func = () => Client.ExecuteQueryAsync(SqlQuery, null, null, getQueryResultsOptions);
- BigQueryResults? result = await ExecuteWithRetriesAsync(func);
- long updatedRows = result?.NumDmlAffectedRows.HasValue == true ? result.NumDmlAffectedRows.Value : -1L;
+ if (Options?.TryGetValue(BigQueryParameters.GetQueryResultsOptionsTimeout, out string? timeoutSeconds) == true &&
+ int.TryParse(timeoutSeconds, out int seconds) &&
+ seconds >= 0)
+ {
+ getQueryResultsOptions.Timeout = TimeSpan.FromSeconds(seconds);
+ activity?.AddBigQueryParameterTag(BigQueryParameters.GetQueryResultsOptionsTimeout, seconds);
+ }
+
+ activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, SqlQuery, BigQueryUtils.IsSafeToTrace());
+
+ // Cannot set destination table in jobs with DDL statements, otherwise an error will be prompted
+ Func> func = () => Client.ExecuteQueryAsync(SqlQuery, null, null, getQueryResultsOptions);
+ BigQueryResults? result = await ExecuteWithRetriesAsync(func, activity);
+ long updatedRows = result?.NumDmlAffectedRows.HasValue == true ? result.NumDmlAffectedRows.Value : -1L;
- return new UpdateResult(updatedRows);
+ activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, updatedRows);
+ return new UpdateResult(updatedRows);
+ });
}
private Schema TranslateSchema(TableSchema schema)
@@ -229,7 +250,13 @@ private Schema TranslateSchema(TableSchema schema)
private Field TranslateField(TableFieldSchema field)
{
- return new Field(field.Name, TranslateType(field), field.Mode == "NULLABLE");
+ List> metadata = new List>()
+ {
+ new KeyValuePair("BIGQUERY_TYPE", field.Type),
+ new KeyValuePair("BIGQUERY_MODE", field.Mode)
+ };
+
+ return new Field(field.Name, TranslateType(field), field.Mode == "NULLABLE", metadata);
}
private IArrowType TranslateType(TableFieldSchema field)
@@ -302,25 +329,27 @@ private IArrowType GetType(TableFieldSchema field, IArrowType type)
return type;
}
- private IArrowReader? ReadChunkWithRetries(TokenProtectedReadClientManger clientMgr, string streamName)
+ private IArrowReader? ReadChunkWithRetries(TokenProtectedReadClientManger clientMgr, string streamName, Activity? activity)
{
- Func> func = () => Task.FromResult(ReadChunk(clientMgr, streamName));
- return RetryManager.ExecuteWithRetriesAsync(clientMgr, func, MaxRetryAttempts, RetryDelayMs).GetAwaiter().GetResult();
+ Func> func = () => Task.FromResult(ReadChunk(clientMgr, streamName, activity));
+ return RetryManager.ExecuteWithRetriesAsync(clientMgr, func, activity, MaxRetryAttempts, RetryDelayMs).GetAwaiter().GetResult();
}
- private static IArrowReader? ReadChunk(TokenProtectedReadClientManger clientMgr, string streamName)
+ private static IArrowReader? ReadChunk(TokenProtectedReadClientManger clientMgr, string streamName, Activity? activity)
{
- return ReadChunk(clientMgr.ReadClient, streamName);
+ return ReadChunk(clientMgr.ReadClient, streamName, activity);
}
- private static IArrowReader? ReadChunk(BigQueryReadClient client, string streamName)
+ private static IArrowReader? ReadChunk(BigQueryReadClient client, string streamName, Activity? activity)
{
// Ideally we wouldn't need to indirect through a stream, but the necessary APIs in Arrow
// are internal. (TODO: consider changing Arrow).
+ activity?.AddConditionalBigQueryTag("read_stream", streamName, BigQueryUtils.IsSafeToTrace());
BigQueryReadClient.ReadRowsStream readRowsStream = client.ReadRows(new ReadRowsRequest { ReadStream = streamName });
IAsyncEnumerator enumerator = readRowsStream.GetResponseStream().GetAsyncEnumerator();
ReadRowsStream stream = new ReadRowsStream(enumerator);
+ activity?.AddBigQueryTag("read_stream.has_rows", stream.HasRows);
if (stream.HasRows)
{
@@ -332,12 +361,14 @@ private IArrowType GetType(TableFieldSchema field, IArrowType type)
}
}
- private QueryOptions ValidateOptions()
+ private QueryOptions ValidateOptions(Activity? activity)
{
QueryOptions options = new QueryOptions();
if (Client.ProjectId == BigQueryConstants.DetectProjectId)
{
+ activity?.AddBigQueryTag("client_project_id", BigQueryConstants.DetectProjectId);
+
// An error occurs when calling CreateQueryJob without the ID set,
// so use the first one that is found. This does not prevent from calling
// to other 'project IDs' (catalogs) with a query.
@@ -346,7 +377,7 @@ private QueryOptions ValidateOptions()
return Client?.ListProjects();
});
- PagedEnumerable? projects = ExecuteWithRetriesAsync?>(func).GetAwaiter().GetResult();
+ PagedEnumerable? projects = ExecuteWithRetriesAsync?>(func, activity).GetAwaiter().GetResult();
if (projects != null)
{
@@ -355,6 +386,9 @@ private QueryOptions ValidateOptions()
if (firstProjectId != null)
{
options.ProjectId = firstProjectId;
+ activity?.AddBigQueryTag("detected_client_project_id", firstProjectId);
+ // need to reopen the Client with the projectId specified
+ this.bigQueryConnection.Open(firstProjectId);
}
}
}
@@ -362,105 +396,205 @@ private QueryOptions ValidateOptions()
if (Options == null || Options.Count == 0)
return options;
+ string largeResultDatasetId = BigQueryConstants.DefaultLargeDatasetId;
+
foreach (KeyValuePair keyValuePair in Options)
{
- if (keyValuePair.Key == BigQueryParameters.AllowLargeResults)
+ switch (keyValuePair.Key)
{
- options.AllowLargeResults = true ? keyValuePair.Value.ToLower().Equals("true") : false;
- }
- if (keyValuePair.Key == BigQueryParameters.LargeResultsDestinationTable)
- {
- string destinationTable = keyValuePair.Value;
+ case BigQueryParameters.AllowLargeResults:
+ options.AllowLargeResults = true ? keyValuePair.Value.Equals("true", StringComparison.OrdinalIgnoreCase) : false;
+ activity?.AddBigQueryParameterTag(BigQueryParameters.AllowLargeResults, options.AllowLargeResults);
+ break;
+ case BigQueryParameters.LargeResultsDataset:
+ largeResultDatasetId = keyValuePair.Value;
+ activity?.AddBigQueryParameterTag(BigQueryParameters.LargeResultsDataset, largeResultDatasetId);
+ break;
+ case BigQueryParameters.LargeResultsDestinationTable:
+ string destinationTable = keyValuePair.Value;
- if (!destinationTable.Contains("."))
- throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} is invalid");
+ if (!destinationTable.Contains("."))
+ throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} is invalid");
- string projectId = string.Empty;
- string datasetId = string.Empty;
- string tableId = string.Empty;
+ string projectId = string.Empty;
+ string datasetId = string.Empty;
+ string tableId = string.Empty;
- string[] segments = destinationTable.Split('.');
+ string[] segments = destinationTable.Split('.');
- if (segments.Length != 3)
- throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} cannot be parsed");
+ if (segments.Length != 3)
+ throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} cannot be parsed");
- projectId = segments[0];
- datasetId = segments[1];
- tableId = segments[2];
+ projectId = segments[0];
+ datasetId = segments[1];
+ tableId = segments[2];
- if (string.IsNullOrEmpty(projectId.Trim()) || string.IsNullOrEmpty(datasetId.Trim()) || string.IsNullOrEmpty(tableId.Trim()))
- throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} contains invalid values");
+ if (string.IsNullOrEmpty(projectId.Trim()) || string.IsNullOrEmpty(datasetId.Trim()) || string.IsNullOrEmpty(tableId.Trim()))
+ throw new InvalidOperationException($"{BigQueryParameters.LargeResultsDestinationTable} contains invalid values");
- options.DestinationTable = new TableReference()
+ options.DestinationTable = new TableReference()
+ {
+ ProjectId = projectId,
+ DatasetId = datasetId,
+ TableId = tableId
+ };
+ activity?.AddBigQueryParameterTag(BigQueryParameters.LargeResultsDestinationTable, destinationTable);
+ break;
+ case BigQueryParameters.UseLegacySQL:
+ options.UseLegacySql = true ? keyValuePair.Value.Equals("true", StringComparison.OrdinalIgnoreCase) : false;
+ activity?.AddBigQueryParameterTag(BigQueryParameters.UseLegacySQL, options.UseLegacySql);
+ break;
+ }
+ }
+
+ if (options.AllowLargeResults == true && options.DestinationTable == null)
+ {
+ options.DestinationTable = TryGetLargeDestinationTableReference(largeResultDatasetId, activity);
+ }
+
+ return options;
+ }
+
+ ///
+ /// Attempts to retrieve or create the specified dataset.
+ ///
+ /// The name of the dataset.
+ /// A to a randomly generated table name in the specified dataset.
+ private TableReference TryGetLargeDestinationTableReference(string datasetId, Activity? activity)
+ {
+ BigQueryDataset? dataset = null;
+
+ try
+ {
+ activity?.AddBigQueryTag("large_results.dataset.try_find", datasetId);
+ dataset = this.Client.GetDataset(datasetId);
+ activity?.AddBigQueryTag("large_results.dataset.found", datasetId);
+ }
+ catch (GoogleApiException gaEx)
+ {
+ if (gaEx.HttpStatusCode != System.Net.HttpStatusCode.NotFound)
+ {
+ activity?.AddException(gaEx);
+ throw new AdbcException($"Failure trying to retrieve dataset {datasetId}", gaEx);
+ }
+ }
+
+ if (dataset == null)
+ {
+ try
+ {
+ activity?.AddBigQueryTag("large_results.dataset.try_create", datasetId);
+ DatasetReference reference = this.Client.GetDatasetReference(datasetId);
+ BigQueryDataset bigQueryDataset = new BigQueryDataset(this.Client, new Dataset()
{
- ProjectId = projectId,
- DatasetId = datasetId,
- TableId = tableId
- };
+ DatasetReference = reference,
+ DefaultTableExpirationMs = (long)TimeSpan.FromDays(1).TotalMilliseconds,
+ Labels = new Dictionary()
+ {
+ // lower case, no spaces or periods per https://cloud.google.com/bigquery/docs/labels-intro
+ { "created_by", this.bigQueryConnection.DriverName.ToLowerInvariant().Replace(" ","_") + "_v_" + AssemblyVersion.Replace(".","_") }
+ }
+ });
+
+ dataset = this.Client.CreateDataset(datasetId, bigQueryDataset.Resource);
+ activity?.AddBigQueryTag("large_results.dataset.created", datasetId);
}
- if (keyValuePair.Key == BigQueryParameters.UseLegacySQL)
+ catch (Exception ex)
{
- options.UseLegacySql = true ? keyValuePair.Value.ToLower().Equals("true") : false;
+ activity?.AddException(ex);
+ throw new AdbcException($"Could not create dataset {datasetId}", ex);
}
}
- return options;
+
+ if (dataset == null)
+ {
+ throw new AdbcException($"Could not find dataset {datasetId}", AdbcStatusCode.NotFound);
+ }
+ else
+ {
+ TableReference reference = new TableReference()
+ {
+ ProjectId = this.Client.ProjectId,
+ DatasetId = datasetId,
+ TableId = "lg_" + Guid.NewGuid().ToString().Replace("-", "")
+ };
+
+ activity?.AddBigQueryTag("large_results.table_reference", reference.ToString());
+
+ return reference;
+ }
}
public bool TokenRequiresUpdate(Exception ex) => BigQueryUtils.TokenRequiresUpdate(ex);
- private async Task ExecuteWithRetriesAsync(Func> action) => await RetryManager.ExecuteWithRetriesAsync(this, action, MaxRetryAttempts, RetryDelayMs);
+ private async Task ExecuteWithRetriesAsync(Func> action, Activity? activity) => await RetryManager.ExecuteWithRetriesAsync(this, action, activity, MaxRetryAttempts, RetryDelayMs);
- private class MultiArrowReader : IArrowArrayStream
+ private class MultiArrowReader : TracingReader
{
+ private static readonly string s_assemblyName = BigQueryUtils.GetAssemblyName(typeof(BigQueryStatement));
+ private static readonly string s_assemblyVersion = BigQueryUtils.GetAssemblyVersion(typeof(BigQueryStatement));
+
readonly Schema schema;
IEnumerator? readers;
IArrowReader? reader;
- public MultiArrowReader(Schema schema, IEnumerable readers)
+ public MultiArrowReader(BigQueryStatement statement, Schema schema, IEnumerable readers) : base(statement)
{
this.schema = schema;
this.readers = readers.GetEnumerator();
}
- public Schema Schema { get { return this.schema; } }
+ public override Schema Schema { get { return this.schema; } }
+
+ public override string AssemblyVersion => s_assemblyVersion;
- public async ValueTask ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+ public override string AssemblyName => s_assemblyName;
+
+ public override async ValueTask ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
{
- if (this.readers == null)
+ return await this.TraceActivityAsync(async activity =>
{
- return null;
- }
+ if (this.readers == null)
+ {
+ return null;
+ }
- while (true)
- {
- if (this.reader == null)
+ while (true)
{
- if (!this.readers.MoveNext())
+ if (this.reader == null)
{
- Dispose(); // TODO: Remove this line
- return null;
+ if (!this.readers.MoveNext())
+ {
+ Dispose(); // TODO: Remove this line
+ return null;
+ }
+ this.reader = this.readers.Current;
}
- this.reader = this.readers.Current;
- }
- RecordBatch result = await this.reader.ReadNextRecordBatchAsync(cancellationToken);
+ RecordBatch result = await this.reader.ReadNextRecordBatchAsync(cancellationToken);
- if (result != null)
- {
- return result;
- }
+ if (result != null)
+ {
+ return result;
+ }
- this.reader = null;
- }
+ this.reader = null;
+ }
+ });
}
- public void Dispose()
+ protected override void Dispose(bool disposing)
{
- if (this.readers != null)
+ if (disposing)
{
- this.readers.Dispose();
- this.readers = null;
+ if (this.readers != null)
+ {
+ this.readers.Dispose();
+ this.readers = null;
+ }
}
+
+ base.Dispose(disposing);
}
}
diff --git a/csharp/src/Drivers/BigQuery/BigQueryUtils.cs b/csharp/src/Drivers/BigQuery/BigQueryUtils.cs
index 3b3167b4e3..b8fca804f0 100644
--- a/csharp/src/Drivers/BigQuery/BigQueryUtils.cs
+++ b/csharp/src/Drivers/BigQuery/BigQueryUtils.cs
@@ -16,6 +16,7 @@
*/
using System;
+using System.Diagnostics;
using Google;
namespace Apache.Arrow.Adbc.Drivers.BigQuery
@@ -33,5 +34,26 @@ public static bool TokenRequiresUpdate(Exception ex)
return result;
}
+
+ internal static string BigQueryAssemblyName = GetAssemblyName(typeof(BigQueryConnection));
+
+ internal static string BigQueryAssemblyVersion = GetAssemblyVersion(typeof(BigQueryConnection));
+
+ internal static string GetAssemblyName(Type type) => type.Assembly.GetName().Name!;
+
+ internal static string GetAssemblyVersion(Type type) => FileVersionInfo.GetVersionInfo(type.Assembly.Location).ProductVersion ?? string.Empty;
+
+ ///
+ /// Conditional used to determines if it is safe to trace
+ ///
+ ///
+ /// It is safe to write to some output types (ie, files) but not others (ie, a shared resource).
+ ///
+ ///
+ internal static bool IsSafeToTrace()
+ {
+ // TODO: Add logic to determine if a file writer is listening
+ return false;
+ }
}
}
diff --git a/csharp/src/Drivers/BigQuery/RetryManager.cs b/csharp/src/Drivers/BigQuery/RetryManager.cs
index a205e62ef1..6848fbcc1e 100644
--- a/csharp/src/Drivers/BigQuery/RetryManager.cs
+++ b/csharp/src/Drivers/BigQuery/RetryManager.cs
@@ -17,6 +17,7 @@
*/
using System;
+using System.Diagnostics;
using System.Threading.Tasks;
namespace Apache.Arrow.Adbc.Drivers.BigQuery
@@ -29,6 +30,7 @@ internal class RetryManager
public static async Task ExecuteWithRetriesAsync(
ITokenProtectedResource tokenProtectedResource,
Func> action,
+ Activity? activity,
int maxRetries = 5,
int initialDelayMilliseconds = 200)
{
@@ -49,6 +51,9 @@ public static async Task ExecuteWithRetriesAsync(
}
catch (Exception ex)
{
+ activity?.AddBigQueryTag("retry_attempt", retryCount);
+ activity?.AddException(ex);
+
retryCount++;
if (retryCount >= maxRetries)
{
@@ -56,6 +61,7 @@ public static async Task ExecuteWithRetriesAsync(
{
if (tokenProtectedResource?.TokenRequiresUpdate(ex) == true)
{
+ activity?.AddBigQueryTag("update_token.status", "Expired");
throw new AdbcException($"Cannot update access token after {maxRetries} tries", AdbcStatusCode.Unauthenticated, ex);
}
}
@@ -67,7 +73,9 @@ public static async Task ExecuteWithRetriesAsync(
{
if (tokenProtectedResource.TokenRequiresUpdate(ex) == true)
{
+ activity?.AddBigQueryTag("update_token.status", "Required");
await tokenProtectedResource.UpdateToken();
+ activity?.AddBigQueryTag("update_token.status", "Completed");
}
}
diff --git a/csharp/src/Drivers/BigQuery/readme.md b/csharp/src/Drivers/BigQuery/readme.md
index 1ff14d8164..d49018afaf 100644
--- a/csharp/src/Drivers/BigQuery/readme.md
+++ b/csharp/src/Drivers/BigQuery/readme.md
@@ -85,6 +85,9 @@ The following parameters can be used to configure the driver behavior. The param
**adbc.bigquery.include_public_project_id**
Include the `bigquery-public-data` project ID with the list of project IDs.
+**adbc.bigquery.large_results_dataset**
+ Optional. Sets the dataset ID to use for large results. The dataset needs to be in the same region as the data being queried. If no value is specified, the driver will attempt to use or create `_bqodbc_temp_tables`. A randomly generated table name will be used for the DestinationTable.
+
**adbc.bigquery.large_results_destination_table**
Optional. Sets the [DestinationTable](https://cloud.google.com/dotnet/docs/reference/Google.Cloud.BigQuery.V2/latest/Google.Cloud.BigQuery.V2.QueryOptions#Google_Cloud_BigQuery_V2_QueryOptions_DestinationTable) value of the QueryOptions if configured. Expects the format to be `{projectId}.{datasetId}.{tableId}` to set the corresponding values in the [TableReference](https://github.com/googleapis/google-api-dotnet-client/blob/6c415c73788b848711e47c6dd33c2f93c76faf97/Src/Generated/Google.Apis.Bigquery.v2/Google.Apis.Bigquery.v2.cs#L9348) class.
@@ -145,3 +148,52 @@ connection.UpdateToken = () => Task.Run(() =>
```
In the sample above, when a new token is needed, the delegate is invoked and updates the `adbc.bigquery.access_token` parameter on the connection object.
+
+## Default Project ID
+
+If a `adbc.bigquery.project_id` is not specified, or if it equals `bigquery-public-data`, the driver will query for the first project ID that is associated with the credentials provided. This will be the project ID that is used to perform queries.
+
+## Large Results
+
+If a result set will contain large results, the `adbc.bigquery.allow_large_results` parameter should be set to `"true"`. If this value is set, a destination must be specified.
+The caller can either explicitly specify the fully qualified name of the destination table using the `adbc.bigquery.large_results_destination_table` value, or they can specify
+a dataset using the `adbc.bigquery.large_results_dataset` parameter.
+
+Behavior:
+- If a destination table is explicitly set, the driver will use that value.
+- If only a dataset value is set, the driver will attempt to retrieve the dataset. If the dataset does not exist, the driver will attempt to
+ create it. The default table expiration will be set to 1 day and a `created_by` label will be included with the driver name and version that created the dataset. For example `created_by : adbc_bigquery_driver_v_0_19_0_0`. A randomly generated name will be used for the table name.
+- If a destination table and a dataset are not specified, the driver will attempt to use or create the `_bqodbc_temp_tables` dataset using the same defaults and label specified above. A randomly generated name will be used for the table name.
+
+## Permissions
+
+The ADBC driver uses the BigQuery Client APIs to communicate with BigQuery. The following actions are performed in the driver and require the calling user to have the specified permissions. For more details on the permissions, or what roles may already have the permissions required, please see the additional references section below.
+
+|Action|Permissions Required
+|:----------|:-------------|
+|Create Dataset*+|bigquery.datasets.create|
+|Create Query Job|bigquery.jobs.create|
+|Create Read Session|bigquery.readsessions.create
bigquery.tables.getData|
+|Execute Query|bigquery.jobs.create
bigquery.jobs.get
bigquery.jobs.list|
+|Get Dataset*|bigquery.datasets.get|
+|Get Job|bigquery.jobs.get|
+|Get Query Results|bigquery.jobs.get|
+|List Jobs|bigquery.jobs.list|
+|Read Rows|bigquery.readsessions.getData|
+
+
+*Only for large result sets
++If a specified dataset does not already exist.
+
+
+
+
+Some environments may also require:
+- [VPC Service Controls](https://cloud.google.com/vpc-service-controls/docs/troubleshooting)
+- [Service Usage Consumer](https://cloud.google.com/service-usage/docs/access-control#serviceusage.serviceUsageConsumer) permissions
+
+**Additional References**:
+- [BigQuery IAM roles and permissions | Google Cloud](https://cloud.google.com/bigquery/docs/access-control)
+- [Running jobs programmatically | BigQuery | Google Cloud](https://cloud.google.com/bigquery/docs/running-jobs)
+- [Create datasets | BigQuery | Google Cloud](https://cloud.google.com/bigquery/docs/datasets#required_permissions)
+- [Use the BigQuery Storage Read API to read table data | Google Cloud](https://cloud.google.com/bigquery/docs/reference/storage/#permissions)
diff --git a/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs b/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs
index 66a8dcd69e..17071465b2 100644
--- a/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs
+++ b/csharp/test/Drivers/BigQuery/BigQueryTestConfiguration.cs
@@ -71,6 +71,9 @@ public BigQueryTestEnvironment()
[JsonPropertyName("allowLargeResults")]
public bool AllowLargeResults { get; set; }
+ [JsonPropertyName("largeResultsDataset")]
+ public string LargeResultsDataset { get; set; } = string.Empty;
+
[JsonPropertyName("largeResultsDestinationTable")]
public string LargeResultsDestinationTable { get; set; } = string.Empty;
diff --git a/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs b/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs
index bc0afdcea6..e3590212ee 100644
--- a/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs
+++ b/csharp/test/Drivers/BigQuery/BigQueryTestingUtils.cs
@@ -145,6 +145,11 @@ internal static Dictionary GetBigQueryParameters(BigQueryTestEnv
parameters.Add(BigQueryParameters.IncludePublicProjectId, testEnvironment.IncludePublicProjectId.ToString());
+ if (!string.IsNullOrEmpty(testEnvironment.LargeResultsDataset))
+ {
+ parameters.Add(BigQueryParameters.LargeResultsDataset, testEnvironment.LargeResultsDataset);
+ }
+
if (!string.IsNullOrEmpty(testEnvironment.LargeResultsDestinationTable))
{
parameters.Add(BigQueryParameters.LargeResultsDestinationTable, testEnvironment.LargeResultsDestinationTable);