feat(csharp/src/Drivers/Apache): tracing - work-in-progress #5

Draft
wants to merge 68 commits into
base: main
Changes from 17 commits
Commits
27c808c
optel poc - wip #1
birschick-bq Oct 31, 2024
2384ed4
Initial POC for using System.Diagnostics
birschick-bq Nov 7, 2024
3824499
Merge branch 'main' into dev/birschick-bq/open-telemetry-poc
birschick-bq Nov 7, 2024
351ced6
remove go code
birschick-bq Nov 8, 2024
49f9a72
moved tracing to Connection. Removed from Database/Driver
birschick-bq Nov 8, 2024
4af814a
moved tracing to Connection. Removed from Database/Driver
birschick-bq Nov 8, 2024
9bf476c
add comments about listener lifetimes
birschick-bq Nov 8, 2024
2be0142
remove go code
birschick-bq Nov 8, 2024
4b7997e
remove updates to tests
birschick-bq Nov 8, 2024
1b87b4f
update exception handling
birschick-bq Nov 8, 2024
1264d78
remove go
birschick-bq Nov 8, 2024
753038e
more instrumentation and exception handling
birschick-bq Nov 8, 2024
5749d7a
improved modularization
birschick-bq Nov 8, 2024
b30524f
remove unnecessary updates
birschick-bq Nov 8, 2024
cfbebb3
Merge branch 'main' into dev/birschick-bq/open-telemetry-poc
birschick-bq Nov 8, 2024
b77490b
updates from review comments to move the tracing code to make easier …
birschick-bq Nov 14, 2024
c4837c4
some clean-up and documentation
birschick-bq Nov 14, 2024
ea7b873
fix trailing space
birschick-bq Nov 14, 2024
a379b23
small update and tracing instrumentation for SparkDatabricksReader
birschick-bq Nov 14, 2024
c018d39
refactored TracingConnection to use common implementation. Added supp…
birschick-bq Nov 14, 2024
d935264
refactored TraceException
birschick-bq Nov 14, 2024
4d874bc
corrected implementation
birschick-bq Nov 14, 2024
40a067d
refactor tracing base
birschick-bq Nov 16, 2024
acc0a1b
remove unnecessary changes
birschick-bq Nov 16, 2024
b2d9c90
remove unnecessary changes
birschick-bq Nov 17, 2024
95658bf
improve listener behaviour. fix missing members
birschick-bq Nov 17, 2024
2625e14
complete comment
birschick-bq Nov 17, 2024
fb74cf1
update version 1.1 connection/statement
birschick-bq Nov 17, 2024
ee11da1
started activity in SparkDatabricksReader
birschick-bq Nov 17, 2024
6f0a759
fixed null handling in TracingBase
birschick-bq Nov 17, 2024
6a6fc3d
add better support for cancellation and retry
birschick-bq Nov 17, 2024
dd78038
refactor to use OpenTelemetry Exporter
birschick-bq Nov 19, 2024
381dce5
clean-up code, documentation, simplify locking
birschick-bq Nov 20, 2024
30ad831
modify the default logging folder
birschick-bq Nov 20, 2024
dc19887
add file exporter tests. Updated driver/database/connection/statement…
birschick-bq Nov 21, 2024
f2d7499
correct validation and tests
birschick-bq Nov 21, 2024
e7bea68
test information
birschick-bq Nov 21, 2024
9941b04
correct file size check and tests
birschick-bq Nov 22, 2024
7d7c7ce
correct tests
birschick-bq Nov 22, 2024
60646ff
correct tests
birschick-bq Nov 22, 2024
879b41d
correct tests
birschick-bq Nov 22, 2024
6c1d330
correct tests - larger size and range
birschick-bq Nov 22, 2024
65cbc95
correct tests - invalid file characters
birschick-bq Nov 22, 2024
d072479
correct tests - max trace files
birschick-bq Nov 22, 2024
1d68327
correct tests - single trace file
birschick-bq Nov 22, 2024
a3b9584
refactor where tracing options are handled. handle options for file s…
birschick-bq Nov 22, 2024
40dad00
remove tracing of all properties/options - might include tokens and p…
birschick-bq Nov 22, 2024
a3d84c9
validate trace folder is writable
birschick-bq Nov 22, 2024
be09ea8
ensure folder before folder validation
birschick-bq Nov 22, 2024
94ae794
update Spark driver documentation for file tracing options
birschick-bq Nov 22, 2024
fa2dca6
Merge branch 'main' into dev/birschick-bq/open-telemetry-poc
birschick-bq Nov 27, 2024
04e19c1
Merge branch 'main' into dev/birschick-bq/open-telemetry-poc
birschick-bq Nov 29, 2024
9c6a420
initial commit for setting the traceParent
birschick-bq Nov 29, 2024
69d4345
small fixes for setting the traceParent
birschick-bq Nov 29, 2024
afd7ea6
small doc updates
birschick-bq Nov 29, 2024
8bd877a
refactored how boiler place code is handled
birschick-bq Nov 30, 2024
7b58323
some renameing and coumentation
birschick-bq Nov 30, 2024
015bc90
fixed formatting
birschick-bq Nov 30, 2024
cb79fa7
add improved test for traceParent.
birschick-bq Dec 3, 2024
4ac1789
Merge branch 'main' into dev/birschick-bq/open-telemetry-poc
birschick-bq Dec 3, 2024
cb3b6f2
update tests
birschick-bq Dec 3, 2024
50e16e4
Improvement for output format and ingestion. Improvements for tracePa…
birschick-bq Dec 5, 2024
1cd083f
Add extension methods to ease of creating Events and Links
birschick-bq Dec 5, 2024
9f182b4
Documentation update
birschick-bq Dec 5, 2024
12882ab
Add public property for ActivitySourceName
birschick-bq Dec 5, 2024
600735d
Merge branch 'main' into dev/birschick-bq/open-telemetry-poc
birschick-bq Dec 10, 2024
fe60b80
Merge branch 'main' into dev/birschick-bq/open-telemetry-poc
birschick-bq Dec 10, 2024
1ccc858
Merge branch 'main' into dev/birschick-bq/open-telemetry-poc
birschick-bq Dec 20, 2024
2 changes: 2 additions & 0 deletions csharp/src/Apache.Arrow.Adbc/Apache.Arrow.Adbc.csproj
@@ -7,6 +7,8 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Apache.Arrow" Version="18.0.0" />
+ <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
+ <PackageReference Include="System.Text.Json" Version="8.0.5" />
</ItemGroup>
<ItemGroup Condition="$([MSBuild]::IsTargetFrameworkCompatible($(TargetFramework), 'net6.0'))">
<Compile Remove="C\NativeLibrary.cs" />
23 changes: 16 additions & 7 deletions csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs
@@ -23,6 +23,7 @@
using System.Text;
using System.Threading;
using Apache.Arrow.Adbc.Extensions;
+ using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.C;
using Apache.Arrow.Ipc;

@@ -399,7 +400,7 @@ public unsafe override AdbcConnection Connect(IReadOnlyDictionary<string, string

caller.Call(Driver.ConnectionInit, ref nativeConnection, ref _nativeDatabase);

- result = new ImportedAdbcConnection(_driver, nativeConnection);
+ result = new ImportedAdbcConnection(_driver, nativeConnection, options);
}
}
finally
@@ -462,16 +463,18 @@ private unsafe void Dispose(bool disposing)
/// <summary>
/// A native implementation of <see cref="AdbcConnection"/>
/// </summary>
- internal sealed class ImportedAdbcConnection : AdbcConnection
+ internal sealed class ImportedAdbcConnection : TracingConnection
{
+ private static readonly string s_tracingBaseName = typeof(ImportedAdbcConnection).FullName!;
private readonly ImportedAdbcDriver _driver;
private CAdbcConnection _nativeConnection;
private bool _disposed;
private bool? _autoCommit;
private IsolationLevel? _isolationLevel;
private bool? _readOnly;

- internal ImportedAdbcConnection(ImportedAdbcDriver driver, CAdbcConnection nativeConnection)
+ internal ImportedAdbcConnection(ImportedAdbcDriver driver, CAdbcConnection nativeConnection, IReadOnlyDictionary<string, string>? properties)
+ : base(properties)
{
_driver = driver.AddReference();
_nativeConnection = nativeConnection;
@@ -521,6 +524,8 @@ public override bool ReadOnly
}
}

+ public override string TracingBaseName => s_tracingBaseName;
+
public unsafe override AdbcStatement CreateStatement()
{
CAdbcStatement nativeStatement = new CAdbcStatement();
@@ -540,7 +545,7 @@ public unsafe override AdbcStatement CreateStatement()
(connection, &nativeStatement, &caller._error));
}

- result = new ImportedAdbcStatement(_driver, nativeStatement);
+ result = new ImportedAdbcStatement(_driver, nativeStatement, ActivitySource);
}
}
finally
@@ -713,7 +718,7 @@ public override void Dispose()
GC.SuppressFinalize(this);
}

- private unsafe void Dispose(bool disposing)
+ protected override unsafe void Dispose(bool disposing)
{
if (!_disposed)
{
@@ -745,14 +750,16 @@ private unsafe void Dispose(bool disposing)
/// <summary>
/// A native implementation of <see cref="AdbcStatement"/>
/// </summary>
- internal sealed class ImportedAdbcStatement : AdbcStatement
+ internal sealed class ImportedAdbcStatement : TracingStatement
{
+ private static readonly string s_tracingBaseName = typeof(ImportedAdbcStatement).FullName!;
private ImportedAdbcDriver _driver;
private CAdbcStatement _nativeStatement;
private byte[]? _substraitPlan;
private bool _disposed;

- internal ImportedAdbcStatement(ImportedAdbcDriver driver, CAdbcStatement nativeStatement)
+ internal ImportedAdbcStatement(ImportedAdbcDriver driver, CAdbcStatement nativeStatement, ActivitySource? activitySource)
+ : base(activitySource)
{
_driver = driver.AddReference();
_nativeStatement = nativeStatement;
@@ -804,6 +811,8 @@ public unsafe override byte[]? SubstraitPlan
}
}

+ public override string TracingBaseName => s_tracingBaseName;
+
public unsafe override void Bind(RecordBatch batch, Schema schema)
{
using (CallHelper caller = new CallHelper())
46 changes: 46 additions & 0 deletions csharp/src/Apache.Arrow.Adbc/Tracing/ITracingObject.cs
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Diagnostics;

namespace Apache.Arrow.Adbc.Tracing
{
/// <summary>
/// Provides an interface for a tracing object.
/// </summary>
public interface ITracingObject
{
/// <summary>
/// Gets the <see cref="System.Diagnostics.ActivitySource"/> for the current object. The value may be null if the containing object
/// did not set this property - typically because tracing was not enabled.
/// </summary>
ActivitySource? ActivitySource { get; }

/// <summary>
/// Starts a new activity with a given name.
/// </summary>
/// <param name="methodName">The name of the method being traced. It is appended to <see cref="TracingBaseName"/> to form the activity name.</param>
/// <returns>A new <see cref="System.Diagnostics.Activity"/> instance if there is an active listener for the <see cref="ActivitySource"/>;
/// otherwise, <c>null</c>.</returns>
Activity? StartActivity(string methodName);

/// <summary>
/// Gets the base name of the tracing object. Typically this is the full name of the class that is tracing.
/// </summary>
string TracingBaseName { get; }
}
}
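The `StartActivity` contract documented above (an `Activity` when a listener is active, `null` otherwise) comes from `System.Diagnostics.ActivitySource` itself. The following is a minimal sketch of that behaviour, not code from this PR: the class name `ActivityListenerSketch` and the source name `Example.Tracing.Sketch` are hypothetical, and it assumes no other listener in the process is subscribed to that source.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical demo type for illustration only; it is not part of the PR.
public static class ActivityListenerSketch
{
    // Illustrative source name (assumption). The PR derives its name from the executing assembly.
    public const string SourceName = "Example.Tracing.Sketch";

    // Starts one activity before a listener is attached and one after,
    // and reports what happened in each case.
    public static (bool StartedWithoutListener, bool StartedWithListener, List<string> StoppedNames) Run()
    {
        using var source = new ActivitySource(SourceName, "1.0.0");

        // Assuming nothing else listens to this source, StartActivity returns null here.
        using Activity? before = source.StartActivity("Type.Method");
        bool startedWithoutListener = before != null;

        var stoppedNames = new List<string>();
        using var listener = new ActivityListener
        {
            ShouldListenTo = s => s.Name == SourceName,
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllDataAndRecorded,
            ActivityStopped = a => stoppedNames.Add(a.OperationName),
        };
        ActivitySource.AddActivityListener(listener);

        bool startedWithListener;
        using (Activity? after = source.StartActivity("Type.Method"))
        {
            startedWithListener = after != null;
        } // Disposing the activity stops it, which invokes ActivityStopped.

        return (startedWithoutListener, startedWithListener, stoppedNames);
    }
}
```

Calling `ActivityListenerSketch.Run()` from any entry point shows why callers of `StartActivity` in this PR need to handle a `null` result: without a listener there is simply no `Activity` to work with.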
37 changes: 37 additions & 0 deletions csharp/src/Apache.Arrow.Adbc/Tracing/TracingArrowArrayStream.cs
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Ipc;

namespace Apache.Arrow.Adbc.Tracing
{
/// <summary>
/// Provides tracing capability for objects that need to implement the <see cref="IArrowArrayStream"/> interface.
/// </summary>
/// <param name="activitySource">The <see cref="System.Diagnostics.ActivitySource"/> to trace on.</param>
public abstract class TracingArrowArrayStream(ActivitySource? activitySource) : TracingObject(activitySource), IArrowArrayStream
{
public abstract ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default);

public abstract void Dispose();

public abstract Schema Schema { get; }
}
}
171 changes: 171 additions & 0 deletions csharp/src/Apache.Arrow.Adbc/Tracing/TracingConnection.cs
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Reflection;
using System.Text.Json;
using System.Threading.Tasks;

namespace Apache.Arrow.Adbc.Tracing
{
/// <summary>
/// Provides an <see cref="AdbcConnection"/> which can instrument tracing activity.
/// </summary>
public abstract class TracingConnection : AdbcConnection, ITracingObject
{
private bool _disposed = false;
internal const string ProductVersionDefault = "1.0.0";
private static readonly string s_activitySourceName = Assembly.GetExecutingAssembly().GetName().Name!;
private static readonly string s_assemblyVersion = GetProductVersion();
private static readonly ConcurrentDictionary<string, ActivityListener> s_listeners = new();
private readonly ConcurrentQueue<Activity> _activityQueue = new();
private readonly IReadOnlyDictionary<string, string>? _options;
private DirectoryInfo? _traceDirectory;

protected TracingConnection(IReadOnlyDictionary<string, string>? options = default)
{
_options = options ?? new Dictionary<string, string>();
EnsureTracing();
}

protected TracingConnection(bool isTracingEnabled, string traceLocation, int traceMaxFileSizeKb, int traceMaxFiles)
{
var options = new Dictionary<string, string>();
options[TracingOptions.Connection.Trace] = isTracingEnabled.ToString();
options[TracingOptions.Connection.TraceLocation] = traceLocation;
options[TracingOptions.Connection.TraceFileMaxSizeKb] = traceMaxFileSizeKb.ToString();
options[TracingOptions.Connection.TraceFileMaxFiles] = traceMaxFiles.ToString();
_options = options;
EnsureTracing();
}

public ActivitySource? ActivitySource { get; private set; }

protected static string GetProductVersion()
{
FileVersionInfo fileVersionInfo = FileVersionInfo.GetVersionInfo(Assembly.GetExecutingAssembly().Location);
return fileVersionInfo.ProductVersion ?? ProductVersionDefault;
}

private void EnsureTracing()
{
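// Enable tracing only when the Trace option is present and parses to true.
if (_options != null && _options.TryGetValue(TracingOptions.Connection.Trace, out string? traceOption) && bool.TryParse(traceOption, out bool traceEnabled) && traceEnabled)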
{

// TODO: Handle exceptions
if (_options?.TryGetValue(TracingOptions.Connection.TraceLocation, out string? traceLocation) != true || !Directory.Exists(traceLocation))
{
string? traceLocationDefault = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile);
_traceDirectory = new DirectoryInfo(traceLocationDefault);
}
else
{
// TODO: If not exist, try to create
_traceDirectory = new DirectoryInfo(traceLocation);
}
// TODO: Check if folder is writable


// TODO: Determine the best handling of listener lifetimes.
// Key of listeners collection should be output file location
ActivityListener listener = s_listeners.GetOrAdd(s_activitySourceName + "." + _traceDirectory.FullName, (_) => new()
{
ShouldListenTo = (source) => source.Name == s_activitySourceName,
Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
ActivityStarted = OnActivityStarted,
ActivityStopped = OnActivityStopped,
SampleUsingParentId = (ref ActivityCreationOptions<string> options) => ActivitySamplingResult.AllDataAndRecorded,
});
// This is a singleton add if the listener is the same.
ActivitySource.AddActivityListener(listener);
// This is a new instance and needs to be disposed later.
ActivitySource = new(s_activitySourceName, s_assemblyVersion);
}
}

private void OnActivityStarted(Activity activity)
{
_activityQueue.Enqueue(activity);
// Intentionally avoid await.
DequeueAndWrite("started")
.ContinueWith(t => Console.WriteLine(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
}

private void OnActivityStopped(Activity activity)
{
_activityQueue.Enqueue(activity);
// Intentionally avoid await.
DequeueAndWrite("stopped")
.ContinueWith(t => Console.WriteLine(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
}

// TODO: Encapsulate this separately
private Task DequeueAndWrite(string state)
{
if (_activityQueue.TryDequeue(out Activity? activity))
{
if (activity != null)
{
try
{
string json = JsonSerializer.Serialize(new { State = state, Activity = activity });
Console.WriteLine(json);
}
catch (NotSupportedException ex)
{
Console.WriteLine(ex.Message);
}
}
}

return Task.CompletedTask;
}

public Activity? StartActivity(string methodName)
{
return StartActivity(ActivitySource, TracingBaseName, methodName);
}

public abstract string TracingBaseName { get; }

protected internal static Activity? StartActivity(ActivitySource? activitySource, string typeName, string methodName) =>
activitySource?.StartActivity(typeName + "." + methodName);

protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
ActivitySource?.Dispose();
}
_disposed = true;
}
}

public override void Dispose()
{
Dispose(true);
base.Dispose();
GC.SuppressFinalize(this);
}
}
}
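`EnsureTracing` reads the trace flag from the connection options with `TryGetValue` and `bool.TryParse`. As a rough sketch of that check in isolation, the helper below treats tracing as enabled only when the option is present and parses to `true`. The class name `TraceOptionSketch` and the key `example.trace` are hypothetical; the actual value of `TracingOptions.Connection.Trace` is not shown in this diff.

```csharp
using System.Collections.Generic;

// Hypothetical helper for illustration only; it is not part of the PR.
public static class TraceOptionSketch
{
    // Assumed option key. The real key lives in TracingOptions.Connection.Trace.
    public const string TraceKey = "example.trace";

    // Returns true only if the option exists and its value parses to boolean true.
    // A missing dictionary, a missing key, or an unparsable value all disable tracing.
    public static bool IsTracingEnabled(IReadOnlyDictionary<string, string>? options) =>
        options != null
        && options.TryGetValue(TraceKey, out string? value)
        && bool.TryParse(value, out bool enabled)
        && enabled;
}
```

Because `bool.TryParse` is case-insensitive, the `"True"` produced by `isTracingEnabled.ToString()` in the second constructor would be accepted by a check of this shape, while values such as `"yes"` or `"1"` would not.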
37 changes: 37 additions & 0 deletions csharp/src/Apache.Arrow.Adbc/Tracing/TracingObject.cs
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Diagnostics;

namespace Apache.Arrow.Adbc.Tracing
{
/// <summary>
/// Provides an implementation of the <see cref="ITracingObject"/> interface.
/// </summary>
/// <param name="activitySource">The <see cref="System.Diagnostics.ActivitySource"/> to trace on.</param>
public abstract class TracingObject(ActivitySource? activitySource) : ITracingObject
{
public ActivitySource? ActivitySource { get; private set; } = activitySource;

public Activity? StartActivity(string methodName)
{
return TracingConnection.StartActivity(ActivitySource, TracingBaseName, methodName);
}

public abstract string TracingBaseName { get; }
}
}
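A class built on `TracingObject` would typically wrap each traced method in an activity named `TracingBaseName + "." + methodName`. The sketch below shows one possible shape of such a method under stated assumptions: `TracedWorkerSketch` is a hypothetical stand-in that reimplements the naming rule locally so the example is self-contained, and marking the activity with `ActivityStatusCode.Error` on failure is an illustrative choice, not something this diff prescribes.

```csharp
using System;
using System.Diagnostics;

// Hypothetical stand-in for a class deriving from TracingObject; it is not part of the PR.
public sealed class TracedWorkerSketch
{
    private static readonly string s_tracingBaseName = typeof(TracedWorkerSketch).FullName!;
    private readonly ActivitySource? _activitySource;

    public TracedWorkerSketch(ActivitySource? activitySource) => _activitySource = activitySource;

    // Mirrors the naming rule in the diff: type name + "." + method name.
    // Returns null when tracing is disabled or no listener is attached.
    private Activity? StartActivity(string methodName) =>
        _activitySource?.StartActivity(s_tracingBaseName + "." + methodName);

    public int Divide(int numerator, int denominator)
    {
        // The using declaration stops the activity when the method exits, even on exceptions.
        using Activity? activity = StartActivity(nameof(Divide));
        try
        {
            return numerator / denominator;
        }
        catch (Exception ex)
        {
            // Assumption: flag the activity as failed before rethrowing.
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            throw;
        }
    }
}
```

The null-conditional calls keep the traced method working unchanged when the `ActivitySource` is `null`, which is the case the interface documentation describes for connections created without tracing.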