Skip to content

Commit d661507

Browse files
author
Todd Meng
committed
Parse decimal with UseArrowNativeType
1 parent bcd2d75 commit d661507

File tree

5 files changed

+71
-20
lines changed

5 files changed

+71
-20
lines changed

csharp/src/Drivers/Apache/Spark/SparkStatement.cs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,28 +32,9 @@ internal SparkStatement(SparkConnection connection)
3232

3333
protected override void SetStatementProperties(TExecuteStatementReq statement)
3434
{
35-
// TODO: Ensure this is set dynamically depending on server capabilities.
36-
statement.EnforceResultPersistenceMode = false;
37-
statement.ResultPersistenceMode = TResultPersistenceMode.ALL_RESULTS;
3835
// This seems like a good idea to have the server timeout so it doesn't keep processing unnecessarily.
3936
// Set in combination with a CancellationToken.
4037
statement.QueryTimeout = QueryTimeoutSeconds;
41-
statement.CanReadArrowResult = true;
42-
43-
#pragma warning disable CS0618 // Type or member is obsolete
44-
statement.ConfOverlay = SparkConnection.timestampConfig;
45-
#pragma warning restore CS0618 // Type or member is obsolete
46-
statement.UseArrowNativeTypes = new TSparkArrowTypes
47-
{
48-
TimestampAsArrow = true,
49-
DecimalAsArrow = true,
50-
51-
// set to false so they return as string
52-
// otherwise, they return as ARRAY_TYPE but you can't determine
53-
// the object type of the items in the array
54-
ComplexTypesAsArrow = false,
55-
IntervalTypesAsArrow = false,
56-
};
5738
}
5839

5940
public override string AssemblyName => s_assemblyName;

csharp/src/Drivers/Databricks/DatabricksConnection.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ internal class DatabricksConnection : SparkHttpConnection
5555
private bool _useCloudFetch = true;
5656
private bool _canDecompressLz4 = true;
5757
private long _maxBytesPerFile = DefaultMaxBytesPerFile;
58+
private bool _useArrowNativeTypes = true;
5859
private const bool DefaultRetryOnUnavailable = true;
5960
private const int DefaultTemporarilyUnavailableRetryTimeout = 900;
6061
private bool _useDescTableExtended = true;
@@ -264,6 +265,11 @@ private void ValidateProperties()
264265
/// </summary>
265266
internal bool CanDecompressLz4 => _canDecompressLz4;
266267

268+
/// <summary>
269+
/// Gets whether Arrow native types are used.
270+
/// </summary>
271+
internal bool UseArrowNativeTypes => _useArrowNativeTypes;
272+
267273
/// <summary>
268274
/// Gets the maximum bytes per file for CloudFetch.
269275
/// </summary>
@@ -392,10 +398,17 @@ internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, TGe
392398
isLz4Compressed = metadataResp.Lz4Compressed;
393399
}
394400

401+
// If we have a schema from the metadata response, reparse it with the statement's useArrowNativeTypes flag
402+
if (metadataResp?.Schema != null && schema != null)
403+
{
404+
var schemaParser = new DatabricksSchemaParser(databricksStatement.UseArrowNativeTypes);
405+
schema = schemaParser.GetArrowSchema(metadataResp.Schema, DataTypeConversion);
406+
}
407+
395408
return new DatabricksCompositeReader(databricksStatement, schema, isLz4Compressed, TlsOptions, _proxyConfigurator);
396409
}
397410

398-
internal override SchemaParser SchemaParser => new DatabricksSchemaParser();
411+
internal override SchemaParser SchemaParser => new DatabricksSchemaParser(_useArrowNativeTypes);
399412

400413
public override AdbcStatement CreateStatement()
401414
{
@@ -442,6 +455,7 @@ protected override async Task HandleOpenSessionResponse(TOpenSessionResp? sessio
442455
}
443456
_enablePKFK = _enablePKFK && FeatureVersionNegotiator.SupportsPKFK(version);
444457
_enableMultipleCatalogSupport = session.__isset.canUseMultipleCatalogs ? session.CanUseMultipleCatalogs : false;
458+
_useArrowNativeTypes = FeatureVersionNegotiator.SupportsArrowNativeTypes(version);
445459
if (session.__isset.initialNamespace)
446460
{
447461
_defaultNamespace = session.InitialNamespace;

csharp/src/Drivers/Databricks/DatabricksSchemaParser.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,25 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
2525
{
2626
internal class DatabricksSchemaParser : SchemaParser
2727
{
28+
private readonly bool _useArrowNativeTypes;
29+
30+
public DatabricksSchemaParser(bool useArrowNativeTypes = true)
31+
{
32+
_useArrowNativeTypes = useArrowNativeTypes;
33+
}
34+
2835
public override IArrowType GetArrowType(TPrimitiveTypeEntry thriftType, DataTypeConversion dataTypeConversion)
2936
{
37+
// For decimal types, check if we're using Arrow native types
38+
if (thriftType.Type == TTypeId.DECIMAL_TYPE && !_useArrowNativeTypes)
39+
{
40+
// When decimal is not returned as Arrow, it's returned as a string
41+
return StringType.Default;
42+
}
43+
44+
// For future expansion: handle other types that might be affected by useArrowNativeTypes
45+
// For example, timestamps, complex types, etc.
46+
3047
return thriftType.Type switch
3148
{
3249
TTypeId.BIGINT_TYPE => Int64Type.Default,

csharp/src/Drivers/Databricks/DatabricksStatement.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,32 @@ protected override void SetStatementProperties(TExecuteStatementReq statement)
6868
{
6969
base.SetStatementProperties(statement);
7070

71+
// Set Databricks-specific statement properties
72+
// TODO: Ensure this is set dynamically depending on server capabilities.
73+
statement.EnforceResultPersistenceMode = false;
74+
statement.ResultPersistenceMode = TResultPersistenceMode.ALL_RESULTS;
75+
statement.CanReadArrowResult = true;
76+
77+
#pragma warning disable CS0618 // Type or member is obsolete
78+
statement.ConfOverlay = SparkConnection.timestampConfig;
79+
#pragma warning restore CS0618 // Type or member is obsolete
80+
81+
// Set UseArrowNativeTypes based on protocol version
82+
if (UseArrowNativeTypes)
83+
{
84+
statement.UseArrowNativeTypes = new TSparkArrowTypes
85+
{
86+
TimestampAsArrow = true,
87+
DecimalAsArrow = true,
88+
89+
// set to false so they return as string
90+
// otherwise, they return as ARRAY_TYPE but you can't determine
91+
// the object type of the items in the array
92+
ComplexTypesAsArrow = false,
93+
IntervalTypesAsArrow = false,
94+
};
95+
}
96+
7197
// Set CloudFetch capabilities
7298
statement.CanDownloadResult = useCloudFetch;
7399
statement.CanDecompressLZ4Result = canDecompressLz4;
@@ -157,6 +183,12 @@ internal void SetUseCloudFetch(bool useCloudFetch)
157183
/// </summary>
158184
public bool CanDecompressLz4 => canDecompressLz4;
159185

186+
/// <summary>
187+
/// Gets whether to use Arrow native types.
188+
/// Returns true if the protocol version is greater than or equal to v5.
189+
/// </summary>
190+
public bool UseArrowNativeTypes => Connection.UseArrowNativeTypes;
191+
160192
/// <summary>
161193
/// Sets whether the client can decompress LZ4 compressed results.
162194
/// </summary>

csharp/src/Drivers/Databricks/FeatureVersionNegotiator.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ public static bool SupportsPKFK(TProtocolVersion protocolVersion) =>
8484
* - ArrowMetadata (field is not used)
8585
*/
8686

87+
/// <summary>
88+
/// Gets whether Arrow native types are supported.
89+
/// </summary>
90+
/// <param name="protocolVersion">The current protocol version.</param>
91+
/// <returns>True if Arrow native types are supported; otherwise, false.</returns>
92+
public static bool SupportsArrowNativeTypes(TProtocolVersion protocolVersion) =>
93+
SupportsProtocolVersion(protocolVersion, TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V5);
8794

8895
#endregion
8996

0 commit comments

Comments
 (0)