Skip to content

Commit

Permalink
fix: starttime and endtime for kusto queries
Browse files Browse the repository at this point in the history
  • Loading branch information
itsnotapt committed Mar 4, 2023
1 parent e037f30 commit c9ce5f3
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 141 deletions.
4 changes: 2 additions & 2 deletions backend/Tim.Backend/Models/KustoQuery/KustoQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ public sealed class KustoQuery : KustoClusterDatabase, IValidatableObject
/// Gets or sets start time for query to be executed.
/// </summary>
[JsonProperty("startTime")]
public DateTime StartTime { get; set; }
public DateTime? StartTime { get; set; }

/// <summary>
/// Gets or sets the end time for query to be executed.
/// </summary>
[JsonProperty("endTime")]
public DateTime EndTime { get; set; }
public DateTime? EndTime { get; set; }

/// <summary>
/// Gets or sets the query.
Expand Down
157 changes: 18 additions & 139 deletions backend/Tim.Backend/Providers/Kusto/KustoQueryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

namespace Tim.Backend.Providers.Kusto
{
using System;
using System.Collections.Generic;
using System.Data;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -70,62 +70,26 @@ public async Task<object> ShowClusterSchemaAsync()
var result = await m_commandClient.ExecuteControlCommandAsync(
m_commandClient.DefaultDatabaseName,
".show schema as json");
return CreateUntypedResultsFromTable(result);
return CreateResultsFromTable(result);
}

/// <summary>
/// Execute query and get results.
/// </summary>
/// <typeparam name="T">Object that results will be parsed into.</typeparam>
/// <param name="query">Query to be executes.</param>
/// <param name="properties">Properties to run the query with, if not provided, use defaults.</param>
/// <returns>Query Results.</returns>
/// <exception cref="UnexpectedFrameException">Formatting error.</exception>
public async Task<KustoQueryResults<T>> ReadAsync<T>(KustoQuery query, ClientRequestProperties properties = null)
where T : class, new()
/// <inheritdoc/>
public async Task<KustoQueryResults<IDictionary<string, object>>> RunQuery(KustoQuery query, CancellationToken cancellationToken)
{
var queryResults = new List<T>();
KustoQueryStats queryStats = null;

using var frames = await ExecuteQueryAsync(query, properties);
m_logger.Information($"Executing kusto query on cluster {query.Cluster}.", "KustoQueryClient-RunQuery");
var properties = new ClientRequestProperties();

while (frames.MoveNext())
if (query.StartTime != null)
{
var frame = frames.Current;

if (frame.FrameType == FrameType.DataSetHeader)
{
continue;
}
else if (frame.FrameType == FrameType.DataTable)
{
var frameResults = HandleDataTable<T>(frame as ProgressiveDataSetDataTableFrame);
queryResults.AddRange(frameResults.QueryResults);
if (frameResults.QueryStats != null)
{
queryStats = frameResults.QueryStats;
}
}
else if (frame.FrameType == FrameType.DataSetCompletion)
{
continue;
}
else
{
var ex = new UnexpectedFrameException($"Received unexpected frame type `{frame.FrameType}`.");
m_logger.Error(ex, "Failed to perform operation {operation} with exception: {exception}", "KustoQueryClient-ReadAsync", ex);
throw ex;
}
properties.SetParameter("StartTime", query.StartTime.Value.ToString("o", CultureInfo.InvariantCulture));
}

return new KustoQueryResults<T>(queryResults, queryStats);
}
if (query.EndTime != null)
{
properties.SetParameter("EndTime", query.EndTime.Value.ToString("o", CultureInfo.InvariantCulture));
}

/// <inheritdoc/>
public async Task<KustoQueryResults<IDictionary<string, object>>> RunQuery(KustoQuery query, CancellationToken cancellationToken)
{
m_logger.Information($"Executing kusto query on cluster {query.Cluster}.", "KustoQueryClient-RunQuery");
var dataSet = await m_client.ExecuteQueryV2Async(query.Database, query.QueryText, new ClientRequestProperties());
var dataSet = await m_client.ExecuteQueryV2Async(query.Database, query.QueryText, properties);
var queryResults = new List<IDictionary<string, object>>();
KustoQueryStats queryStats = null;
var frames = dataSet.GetFrames();
Expand All @@ -140,7 +104,7 @@ public async Task<KustoQueryResults<IDictionary<string, object>>> RunQuery(Kusto
}
else if (frame.FrameType == FrameType.DataTable)
{
var frameResults = HandleDataTableUntyped(frame as ProgressiveDataSetDataTableFrame);
var frameResults = HandleDataTable(frame as ProgressiveDataSetDataTableFrame);
queryResults.AddRange(frameResults.QueryResults);
if (frameResults.QueryStats != null)
{
Expand All @@ -162,67 +126,7 @@ public async Task<KustoQueryResults<IDictionary<string, object>>> RunQuery(Kusto
return new KustoQueryResults<IDictionary<string, object>>(queryResults, queryStats);
}

private async Task<IEnumerator<ProgressiveDataSetFrame>> ExecuteQueryAsync(KustoQuery query, ClientRequestProperties properties = null)
{
ClientRequestProperties clientRequestProperties;
if (properties == null)
{
clientRequestProperties = new ClientRequestProperties()
{
// Provides breadcrumb in Kusto performance logs
ClientRequestId = $"TimBackendQuery;{Guid.NewGuid()}",
};
clientRequestProperties.SafeSetQueryResultsProgressiveEnabled(false);
}
else
{
clientRequestProperties = properties;
}

m_logger.Information($"Executing query using Kusto's ExecuteQueryV2Async. Client Request Id: {clientRequestProperties.ClientRequestId} ", "KustoQueryClient-ExecuteQueryAsync");
var dataSet = await m_client.ExecuteQueryV2Async(query.Database, query.QueryText, clientRequestProperties);
m_logger.Information($"Query completed. Client Request Id: {clientRequestProperties.ClientRequestId} ", "KustoQueryClient-ExecuteQueryAsync");
return dataSet.GetFrames();
}

/// <summary>
/// Parse query results.
/// </summary>
/// <typeparam name="T">object type that results will be parsed into.</typeparam>
/// <param name="frame">Data table frame returned by kusto.</param>
/// <returns>Query Results.</returns>
private KustoQueryResults<T> HandleDataTable<T>(ProgressiveDataSetDataTableFrame frame)
where T : class, new()
{
var queryResults = Enumerable.Empty<T>();
KustoQueryStats queryStats = null;

using var table = frame.TableData;
do
{
if (frame.TableKind == WellKnownDataSet.PrimaryResult)
{
queryResults = CreateObjectsFromTable<T>(table);
}
else if (frame.TableKind == WellKnownDataSet.QueryCompletionInformation)
{
queryStats = GetQueryStats(table);
}
else
{
while (table.Read())
{
// We need to materialize the results to continue to the next frame, but we don't care about the data.
// This is enforced by the Kusto.Data client, which throws an exception if we don't, though to be honest I'm not quite sure why.
}
}
}
while (table.NextResult());

return new KustoQueryResults<T>(queryResults, queryStats);
}

private KustoQueryResults<IDictionary<string, object>> HandleDataTableUntyped(ProgressiveDataSetDataTableFrame frame)
private static KustoQueryResults<IDictionary<string, object>> HandleDataTable(ProgressiveDataSetDataTableFrame frame)
{
var queryResults = Enumerable.Empty<IDictionary<string, object>>();
KustoQueryStats queryStats = null;
Expand All @@ -232,7 +136,7 @@ private KustoQueryResults<IDictionary<string, object>> HandleDataTableUntyped(Pr
{
if (frame.TableKind == WellKnownDataSet.PrimaryResult)
{
queryResults = CreateUntypedResultsFromTable(table);
queryResults = CreateResultsFromTable(table);
}
else if (frame.TableKind == WellKnownDataSet.QueryCompletionInformation)
{
Expand All @@ -252,32 +156,7 @@ private KustoQueryResults<IDictionary<string, object>> HandleDataTableUntyped(Pr
return new KustoQueryResults<IDictionary<string, object>>(queryResults, queryStats);
}

private IEnumerable<T> CreateObjectsFromTable<T>(IDataReader table)
where T : class, new()
{
var results = new List<T>();

while (table.Read())
{
var result = new T();

for (var i = 0; i < table.FieldCount; i++)
{
var propertyName = table.GetName(i);
var property = result
.GetType()
.GetProperty(propertyName);

property?.SetValue(result, table.GetValue(i));
}

results.Add(result);
}

return results;
}

private IEnumerable<IDictionary<string, object>> CreateUntypedResultsFromTable(IDataReader table)
private static IEnumerable<IDictionary<string, object>> CreateResultsFromTable(IDataReader table)
{
var results = new List<IDictionary<string, object>>();

Expand All @@ -296,7 +175,7 @@ private IEnumerable<IDictionary<string, object>> CreateUntypedResultsFromTable(I
return results;
}

private KustoQueryStats GetQueryStats(IDataReader table)
private static KustoQueryStats GetQueryStats(IDataReader table)
{
var levelNameColumnIndex = -1;
var payloadColumnIndex = -1;
Expand Down

0 comments on commit c9ce5f3

Please sign in to comment.