Skip to content

Commit aae84d2

Browse files
feat(csharp/src/Drivers/Databricks): Primary Key and Foreign Key Metadata Optimization (#2886)
## Arrow ADBC: Primary Key and Foreign Key Metadata Optimization

### Description
This PR adds support for optimizing Primary Key and Foreign Key metadata queries in the C# Databricks ADBC driver. It introduces a new connection parameter `adbc.databricks.enable_pk_fk` that allows users to control whether the driver should make PK/FK metadata calls to the server or return empty results for improved performance.

### Background
Primary Key and Foreign Key metadata queries can be expensive operations, particularly in Databricks environments where they may not be fully supported in certain catalogs. This implementation provides a way to optimize these operations by:
1. Allowing users to disable PK/FK metadata calls entirely via configuration
2. Automatically returning empty results for legacy catalogs (SPARK, hive_metastore) where PK/FK metadata is not supported
3. Ensuring that empty results maintain schema compatibility with real metadata responses

### Proposed Changes
- Add new connection parameter `adbc.databricks.enable_pk_fk` to control PK/FK metadata behavior (default: true)
- Implement special handling for legacy catalogs (SPARK, hive_metastore) to return empty results without server calls
- Modify method visibility in base classes to allow proper overriding in derived classes
- Add comprehensive test coverage for the new functionality

### How is this tested?
Added unit tests that verify:
1. The correct behavior of the `ShouldReturnEmptyPkFkResult` method with various combinations of settings
2. Schema compatibility between empty results and real metadata responses
3. Proper handling of different catalog scenarios

These tests ensure that the optimization works correctly while maintaining compatibility with client applications that expect consistent schema structures.
1 parent 4556c16 commit aae84d2

File tree

5 files changed

+240
-3
lines changed

5 files changed

+240
-3
lines changed

csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ private async Task<QueryResult> ExecuteMetadataCommandQuery(CancellationToken ca
368368
};
369369
}
370370

371-
private async Task<QueryResult> GetCrossReferenceAsync(CancellationToken cancellationToken = default)
371+
protected virtual async Task<QueryResult> GetCrossReferenceAsync(CancellationToken cancellationToken = default)
372372
{
373373
TGetCrossReferenceResp resp = await Connection.GetCrossReferenceAsync(
374374
CatalogName,
@@ -383,7 +383,7 @@ private async Task<QueryResult> GetCrossReferenceAsync(CancellationToken cancell
383383
return await GetQueryResult(resp.DirectResults, cancellationToken);
384384
}
385385

386-
private async Task<QueryResult> GetPrimaryKeysAsync(CancellationToken cancellationToken = default)
386+
protected virtual async Task<QueryResult> GetPrimaryKeysAsync(CancellationToken cancellationToken = default)
387387
{
388388
TGetPrimaryKeysResp resp = await Connection.GetPrimaryKeysAsync(
389389
CatalogName,

csharp/src/Drivers/Databricks/DatabricksConnection.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ internal class DatabricksConnection : SparkHttpConnection
4040
private bool _applySSPWithQueries = false;
4141
private bool _enableDirectResults = true;
4242
private bool _enableMultipleCatalogSupport = true;
43+
private bool _enablePKFK = true;
4344

4445
internal static TSparkGetDirectResults defaultGetDirectResults = new()
4546
{
@@ -71,6 +72,18 @@ protected override TCLIService.IAsync CreateTCLIServiceClient(TProtocol protocol
7172

7273
private void ValidateProperties()
7374
{
75+
if (Properties.TryGetValue(DatabricksParameters.EnablePKFK, out string? enablePKFKStr))
76+
{
77+
if (bool.TryParse(enablePKFKStr, out bool enablePKFKValue))
78+
{
79+
_enablePKFK = enablePKFKValue;
80+
}
81+
else
82+
{
83+
throw new ArgumentException($"Parameter '{DatabricksParameters.EnablePKFK}' value '{enablePKFKStr}' could not be parsed. Valid values are 'true', 'false'.");
84+
}
85+
}
86+
7487
if (Properties.TryGetValue(DatabricksParameters.EnableMultipleCatalogSupport, out string? enableMultipleCatalogSupportStr))
7588
{
7689
if (bool.TryParse(enableMultipleCatalogSupportStr, out bool enableMultipleCatalogSupportValue))
@@ -204,6 +217,11 @@ private void ValidateProperties()
204217
/// </summary>
205218
internal bool EnableMultipleCatalogSupport => _enableMultipleCatalogSupport;
206219

220+
/// <summary>
/// Gets a value indicating whether PK/FK metadata calls are enabled for this connection.
/// The value is taken from the <see cref="DatabricksParameters.EnablePKFK"/> connection
/// property when it is supplied and defaults to true otherwise.
/// </summary>
internal bool EnablePKFK => _enablePKFK;
224+
207225
/// <summary>
208226
/// Gets a value indicating whether to retry requests that receive a 503 response with a Retry-After header.
209227
/// </summary>

csharp/src/Drivers/Databricks/DatabricksParameters.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,12 @@ public class DatabricksParameters : SparkParameters
155155
/// Default value is true if not specified.
156156
/// </summary>
157157
public const string EnableMultipleCatalogSupport = "adbc.databricks.enable_multiple_catalog_support";
158+
159+
/// <summary>
160+
/// Whether to enable primary key and foreign key metadata calls.
161+
/// Default value is true if not specified.
162+
/// </summary>
163+
public const string EnablePKFK = "adbc.databricks.enable_pk_fk";
158164
}
159165

160166
/// <summary>

csharp/src/Drivers/Databricks/DatabricksStatement.cs

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717

1818
using System;
19-
using System.Collections.Generic;
2019
using System.Threading;
2120
using System.Threading.Tasks;
2221
using Apache.Arrow.Adbc.Drivers.Apache;
@@ -37,6 +36,7 @@ internal class DatabricksStatement : SparkStatement, IHiveServer2Statement
3736
private bool canDecompressLz4;
3837
private long maxBytesPerFile;
3938
private bool enableMultipleCatalogSupport;
39+
private bool enablePKFK;
4040

4141
public DatabricksStatement(DatabricksConnection connection)
4242
: base(connection)
@@ -46,6 +46,7 @@ public DatabricksStatement(DatabricksConnection connection)
4646
canDecompressLz4 = connection.CanDecompressLz4;
4747
maxBytesPerFile = connection.MaxBytesPerFile;
4848
enableMultipleCatalogSupport = connection.EnableMultipleCatalogSupport;
49+
enablePKFK = connection.EnablePKFK;
4950
}
5051

5152
protected override void SetStatementProperties(TExecuteStatementReq statement)
@@ -386,5 +387,119 @@ protected override async Task<QueryResult> GetColumnsAsync(CancellationToken can
386387
// Call the base implementation with the potentially modified catalog name
387388
return await base.GetColumnsAsync(cancellationToken);
388389
}
390+
391+
/// <summary>
/// Decides whether a PK/FK metadata request (GetPrimaryKeys/GetCrossReference) is answered
/// locally with an empty result set instead of being sent to the server.
///
/// An empty result is used when either of the following holds:
/// - PK/FK metadata calls are switched off (the enablePKFK flag is false).
/// - The catalog name is null, empty, "SPARK" or "hive_metastore" (names compared case-insensitively).
///
/// With the flag switched on and any other catalog name, the request proceeds to the server.
/// </summary>
/// <returns>true to return an empty result without a server call; otherwise false.</returns>
internal bool ShouldReturnEmptyPkFkResult()
{
    // Short-circuit order matters: the flag is consulted before the catalog name is read.
    return !enablePKFK
        || string.IsNullOrEmpty(CatalogName)
        || string.Equals(CatalogName, "SPARK", StringComparison.OrdinalIgnoreCase)
        || string.Equals(CatalogName, "hive_metastore", StringComparison.OrdinalIgnoreCase);
}
420+
421+
/// <summary>
/// Returns primary key metadata. When <see cref="ShouldReturnEmptyPkFkResult"/> reports true,
/// a locally built empty result is returned; otherwise the base implementation is awaited.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the base implementation.</param>
protected override async Task<QueryResult> GetPrimaryKeysAsync(CancellationToken cancellationToken = default)
{
    return ShouldReturnEmptyPkFkResult()
        ? EmptyPrimaryKeysResult()
        : await base.GetPrimaryKeysAsync(cancellationToken);
}
428+
429+
/// <summary>
/// Builds a zero-row primary key result: six nullable columns, each backed by an empty array.
/// </summary>
/// <returns>A <see cref="QueryResult"/> with a row count of 0 and an empty stream carrying the schema.</returns>
private QueryResult EmptyPrimaryKeysResult()
{
    // NOTE(review): the column name "KEQ_SEQ" is kept exactly as written; presumably it mirrors
    // the spelling used by the server's response - confirm before renaming.
    var fields = new[]
    {
        new Field("TABLE_CAT", StringType.Default, true),
        new Field("TABLE_SCHEM", StringType.Default, true),
        new Field("TABLE_NAME", StringType.Default, true),
        new Field("COLUMN_NAME", StringType.Default, true),
        new Field("KEQ_SEQ", Int32Type.Default, true),
        new Field("PK_NAME", StringType.Default, true)
    };
    var schema = new Schema(fields, null);

    // Each array's type must match the type declared for the field at the same index.
    var arrays = new IArrowArray[]
    {
        new StringArray.Builder().Build(), // TABLE_CAT
        new StringArray.Builder().Build(), // TABLE_SCHEM
        new StringArray.Builder().Build(), // TABLE_NAME
        new StringArray.Builder().Build(), // COLUMN_NAME
        new Int32Array.Builder().Build(),  // KEQ_SEQ (declared as Int32Type above)
        new StringArray.Builder().Build()  // PK_NAME
    };

    return new QueryResult(0, new HiveServer2Connection.HiveInfoArrowStream(schema, arrays));
}
454+
455+
/// <summary>
/// Returns cross-reference (foreign key) metadata. When <see cref="ShouldReturnEmptyPkFkResult"/>
/// reports true, a locally built empty result is returned; otherwise the base implementation is awaited.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the base implementation.</param>
protected override async Task<QueryResult> GetCrossReferenceAsync(CancellationToken cancellationToken = default)
{
    return ShouldReturnEmptyPkFkResult()
        ? EmptyCrossReferenceResult()
        : await base.GetCrossReferenceAsync(cancellationToken);
}
462+
463+
/// <summary>
/// Builds a zero-row cross-reference result: fourteen nullable columns (string or 16-bit integer),
/// each backed by an empty array of the matching type.
/// </summary>
/// <returns>A <see cref="QueryResult"/> with a row count of 0 and an empty stream carrying the schema.</returns>
private QueryResult EmptyCrossReferenceResult()
{
    // Small factories keep the column list and the matching empty arrays easy to compare.
    Field Text(string name) => new Field(name, StringType.Default, true);
    Field Small(string name) => new Field(name, Int16Type.Default, true);
    IArrowArray NoText() => new StringArray.Builder().Build();
    IArrowArray NoSmall() => new Int16Array.Builder().Build();

    Field[] columns =
    {
        Text("PKTABLE_CAT"),
        Text("PKTABLE_SCHEM"),
        Text("PKTABLE_NAME"),
        Text("PKCOLUMN_NAME"),
        Text("FKTABLE_CAT"),
        Text("FKTABLE_SCHEM"),
        Text("FKTABLE_NAME"),
        Text("FKCOLUMN_NAME"),
        Small("KEY_SEQ"),
        Small("UPDATE_RULE"),
        Small("DELETE_RULE"),
        Text("FK_NAME"),
        Text("PK_NAME"),
        Small("DEFERRABILITY")
    };

    // Same order as the columns above: one empty array per field.
    IArrowArray[] emptyColumns =
    {
        NoText(),  // PKTABLE_CAT
        NoText(),  // PKTABLE_SCHEM
        NoText(),  // PKTABLE_NAME
        NoText(),  // PKCOLUMN_NAME
        NoText(),  // FKTABLE_CAT
        NoText(),  // FKTABLE_SCHEM
        NoText(),  // FKTABLE_NAME
        NoText(),  // FKCOLUMN_NAME
        NoSmall(), // KEY_SEQ
        NoSmall(), // UPDATE_RULE
        NoSmall(), // DELETE_RULE
        NoText(),  // FK_NAME
        NoText(),  // PK_NAME
        NoSmall()  // DEFERRABILITY
    };

    var stream = new HiveServer2Connection.HiveInfoArrowStream(new Schema(columns, null), emptyColumns);
    return new QueryResult(0, stream);
}
389504
}
390505
}

csharp/test/Drivers/Databricks/StatementTests.cs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,5 +652,103 @@ private void AssertField(Schema schema, int index, string expectedName, IArrowTy
652652
Assert.True(expectedType.Equals(field.DataType), $"Field {index} type mismatch");
653653
Assert.True(expectedNullable == field.IsNullable, $"Field {index} nullability mismatch");
654654
}
655+
656+
/// <summary>
/// Checks <c>ShouldReturnEmptyPkFkResult</c> for each combination of the EnablePKFK connection
/// parameter and catalog name. Marked skippable, like the other connection-backed test in this
/// file, because it opens a connection from the test configuration.
/// </summary>
[SkippableTheory]
[InlineData(false, "main", true)]
[InlineData(true, null, true)]
[InlineData(true, "", true)]
[InlineData(true, "SPARK", true)]
[InlineData(true, "hive_metastore", true)]
[InlineData(true, "main", false)]
public void ShouldReturnEmptyPkFkResult_WorksAsExpected(bool enablePKFK, string? catalogName, bool expected)
{
    // Arrange: create test configuration and connection
    var testConfig = (DatabricksTestConfiguration)TestConfiguration.Clone();
    var connectionParams = new Dictionary<string, string>
    {
        [DatabricksParameters.EnablePKFK] = enablePKFK.ToString().ToLowerInvariant()
    };
    using var connection = NewConnection(testConfig, connectionParams);
    using var statement = connection.CreateStatement();

    // A null catalog is exercised by leaving the option unset.
    if (catalogName != null)
    {
        statement.SetOption(ApacheParameters.CatalogName, catalogName);
    }

    // Act
    var result = ((DatabricksStatement)statement).ShouldReturnEmptyPkFkResult();

    // Assert
    Assert.Equal(expected, result);
}
686+
687+
/// <summary>
/// Verifies that the empty PK and FK results returned for the "SPARK" catalog have the same
/// schema (field count, names, types and nullability) as the results returned for the catalog
/// named in the test configuration.
/// </summary>
[SkippableFact]
public async Task PKFK_EmptyResult_SchemaMatches_RealMetadataResponse()
{
    // Compares two schemas field by field, in order.
    void AssertSchemasMatch(Schema realSchema, Schema emptySchema)
    {
        Assert.Equal(realSchema.FieldsList.Count, emptySchema.FieldsList.Count);
        for (int i = 0; i < realSchema.FieldsList.Count; i++)
        {
            var realField = realSchema.FieldsList[i];
            var emptyField = emptySchema.FieldsList[i];
            AssertField(emptyField, realField.Name, realField.DataType, realField.IsNullable);
        }
    }

    // Arrange: create test configuration and connection
    var testConfig = (DatabricksTestConfiguration)TestConfiguration.Clone();
    var connectionParams = new Dictionary<string, string>
    {
        [DatabricksParameters.EnablePKFK] = "true"
    };
    using var connection = NewConnection(testConfig, connectionParams);
    using var statement = connection.CreateStatement();

    // Get real PK metadata schema, using the configured catalog rather than a hard-coded one
    statement.SetOption(ApacheParameters.IsMetadataCommand, "true");
    statement.SetOption(ApacheParameters.CatalogName, TestConfiguration.Metadata.Catalog);
    statement.SetOption(ApacheParameters.SchemaName, TestConfiguration.Metadata.Schema);
    statement.SetOption(ApacheParameters.TableName, TestConfiguration.Metadata.Table);
    statement.SqlQuery = "GetPrimaryKeys";
    var realPkResult = await statement.ExecuteQueryAsync();
    Assert.NotNull(realPkResult.Stream);
    var realPkSchema = realPkResult.Stream.Schema;

    // Get empty PK result schema (using SPARK catalog which should return empty)
    statement.SetOption(ApacheParameters.CatalogName, "SPARK");
    var emptyPkResult = await statement.ExecuteQueryAsync();
    Assert.NotNull(emptyPkResult.Stream);
    var emptyPkSchema = emptyPkResult.Stream.Schema;

    // Verify PK schemas match
    AssertSchemasMatch(realPkSchema, emptyPkSchema);

    // Get real FK metadata schema
    statement.SetOption(ApacheParameters.CatalogName, TestConfiguration.Metadata.Catalog);
    statement.SqlQuery = "GetCrossReference";
    var realFkResult = await statement.ExecuteQueryAsync();
    Assert.NotNull(realFkResult.Stream);
    var realFkSchema = realFkResult.Stream.Schema;

    // Get empty FK result schema
    statement.SetOption(ApacheParameters.CatalogName, "SPARK");
    var emptyFkResult = await statement.ExecuteQueryAsync();
    Assert.NotNull(emptyFkResult.Stream);
    var emptyFkSchema = emptyFkResult.Stream.Schema;

    // Verify FK schemas match
    AssertSchemasMatch(realFkSchema, emptyFkSchema);
}
746+
747+
/// <summary>
/// Asserts that a field has the expected name, data type and nullability, reporting the
/// expected and actual values on failure.
/// </summary>
/// <param name="field">The field under test.</param>
/// <param name="expectedName">The name the field must have.</param>
/// <param name="expectedType">The Arrow type the field must have.</param>
/// <param name="expectedNullable">The nullability the field must have.</param>
private void AssertField(Field field, string expectedName, IArrowType expectedType, bool expectedNullable)
{
    bool nameMatches = expectedName.Equals(field.Name);
    Assert.True(nameMatches, $"Field name mismatch: expected {expectedName}, got {field.Name}");

    bool typeMatches = expectedType.Equals(field.DataType);
    Assert.True(typeMatches, $"Field type mismatch: expected {expectedType}, got {field.DataType}");

    bool nullabilityMatches = expectedNullable == field.IsNullable;
    Assert.True(nullabilityMatches, $"Field nullability mismatch: expected {expectedNullable}, got {field.IsNullable}");
}
655753
}
656754
}

0 commit comments

Comments
 (0)