Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose scan projection schema #161

Merged
merged 3 commits into from
Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 12 additions & 129 deletions src/Knet.Kudu.Client/KuduScanEnumerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,12 @@
using Knet.Kudu.Client.Requests;
using Knet.Kudu.Client.Scanner;
using Knet.Kudu.Client.Tablet;
using Knet.Kudu.Client.Util;
using Microsoft.Extensions.Logging;

namespace Knet.Kudu.Client
{
public class KuduScanEnumerator<T> : IAsyncEnumerator<T>
{
private static readonly ByteString _defaultDeletedColumnValue =
UnsafeByteOperations.UnsafeWrap(new byte[] { 0 });

private readonly ILogger _logger;
private readonly KuduClient _client;
private readonly KuduTable _table;
Expand Down Expand Up @@ -56,7 +52,7 @@ public class KuduScanEnumerator<T> : IAsyncEnumerator<T>
/// If == DONE, then we're done scanning.
/// Otherwise it contains a proper tabletSlice name, and we're currently scanning.
/// </summary>
internal RemoteTablet Tablet { get; set; }
internal RemoteTablet Tablet { get; private set; }

internal long SnapshotTimestamp { get; private set; }

Expand All @@ -69,9 +65,11 @@ public KuduScanEnumerator(
KuduClient client,
KuduTable table,
IKuduScanParser<T> parser,
List<string> projectedColumnNames,
List<int> projectedColumnIndexes,
List<ColumnSchemaPB> projectedColumnsPb,
KuduSchema projectionSchema,
OrderModePB orderMode,
ReadMode readMode,
ReplicaSelection replicaSelection,
RowDataFormat rowDataFormat,
bool isFaultTolerant,
Dictionary<string, KuduPredicate> predicates,
Expand All @@ -81,117 +79,41 @@ public KuduScanEnumerator(
byte[] endPrimaryKey,
long startTimestamp,
long htTimestamp,
int? batchSizeBytes,
int batchSizeBytes,
PartitionPruner partitionPruner,
ReplicaSelection replicaSelection,
CancellationToken cancellationToken)
{
if (htTimestamp != KuduClient.NoTimestamp)
{
if (htTimestamp < 0)
throw new ArgumentException(
$"Need non-negative number for the scan, timestamp got {htTimestamp}");

if (readMode != ReadMode.ReadAtSnapshot)
throw new ArgumentException(
"When specifying a HybridClock timestamp, the read mode needs to be set to READ_AT_SNAPSHOT");
}
if (startTimestamp != KuduClient.NoTimestamp)
{
if (htTimestamp < 0)
throw new ArgumentException(
"Must have both start and end timestamps for a diff scan");

if (startTimestamp > htTimestamp)
throw new ArgumentException(
"Start timestamp must be less than or equal to end timestamp");
}

_isFaultTolerant = isFaultTolerant;
if (isFaultTolerant)
{
if (readMode != ReadMode.ReadAtSnapshot)
throw new ArgumentException("Use of fault tolerance scanner " +
"requires the read mode to be set to READ_AT_SNAPSHOT");
_orderMode = OrderModePB.Ordered;
}
else
_orderMode = OrderModePB.Unordered;

_logger = logger;
_client = client;
_table = table;
_parser = parser;
_partitionPruner = partitionPruner;
_orderMode = orderMode;
_readMode = readMode;
_columns = projectedColumnsPb;
_schema = projectionSchema;
_predicates = predicates;
_replicaSelection = replicaSelection;
_rowDataFormat = rowDataFormat;
_isFaultTolerant = isFaultTolerant;
_limit = limit;
_cacheBlocks = cacheBlocks;
_startPrimaryKey = startPrimaryKey ?? Array.Empty<byte>();
_endPrimaryKey = endPrimaryKey ?? Array.Empty<byte>();
_startTimestamp = startTimestamp;
SnapshotTimestamp = htTimestamp;
_batchSizeBytes = batchSizeBytes;
_lastPrimaryKey = Array.Empty<byte>();
_cancellationToken = cancellationToken;
ResourceMetrics = new ResourceMetrics();

// Map the column names to actual columns in the table schema.
// If the user set this to 'null', we scan all columns.
_columns = new List<ColumnSchemaPB>();
var columns = new List<ColumnSchema>();
if (projectedColumnNames != null)
{
foreach (string columnName in projectedColumnNames)
{
ColumnSchema originalColumn = table.Schema.GetColumn(columnName);
_columns.Add(ToColumnSchemaPb(originalColumn));
columns.Add(originalColumn);
}
}
else if (projectedColumnIndexes != null)
{
foreach (int columnIndex in projectedColumnIndexes)
{
ColumnSchema originalColumn = table.Schema.GetColumn(columnIndex);
_columns.Add(ToColumnSchemaPb(originalColumn));
columns.Add(originalColumn);
}
}
else
{
foreach (ColumnSchema columnSchema in table.Schema.Columns)
{
_columns.Add(ToColumnSchemaPb(columnSchema));
columns.Add(columnSchema);
}
}

int isDeletedIndex = -1;

// This is a diff scan so add the IS_DELETED column.
if (startTimestamp != KuduClient.NoTimestamp)
{
var deletedColumn = GenerateIsDeletedColumn(table.Schema);
_columns.Add(deletedColumn);

var delColumn = new ColumnSchema(deletedColumn.Name, KuduType.Bool);
columns.Add(delColumn);
isDeletedIndex = columns.Count - 1;
}

_schema = new KuduSchema(columns, isDeletedIndex);
_batchSizeBytes = batchSizeBytes ?? _schema.GetScannerBatchSizeEstimate();

// If the partition pruner has pruned all partitions, then the scan can be
// short circuited without contacting any tablet servers.
if (!_partitionPruner.HasMorePartitionKeyRanges)
{
_closed = true;
}

_replicaSelection = replicaSelection;

// For READ_YOUR_WRITES scan mode, get the latest observed timestamp
// and store it. Always use this one as the propagated timestamp for
// the duration of the scan to avoid unnecessary wait.
Expand Down Expand Up @@ -589,44 +511,5 @@ private void ClearCurrent()
(current as IDisposable)?.Dispose();
}
}

/// <summary>
/// Generates and returns a ColumnSchema for the virtual IS_DELETED column.
/// The column name is generated to ensure there is never a collision.
/// </summary>
/// <param name="schema">The table schema.</param>
private static ColumnSchemaPB GenerateIsDeletedColumn(KuduSchema schema)
{
var columnName = "is_deleted";

// If the column already exists and we need to pick an alternate column name.
while (schema.HasColumn(columnName))
{
columnName += "_";
}

return new ColumnSchemaPB
{
Name = columnName,
Type = DataTypePB.IsDeleted,
ReadDefaultValue = _defaultDeletedColumnValue,
IsNullable = false,
IsKey = false
};
}

private static ColumnSchemaPB ToColumnSchemaPb(ColumnSchema columnSchema)
{
return new ColumnSchemaPB
{
Name = columnSchema.Name,
Type = (DataTypePB)columnSchema.Type,
IsNullable = columnSchema.IsNullable,
// Set isKey to false on the passed ColumnSchema.
// This allows out of order key columns in projections.
IsKey = false,
TypeAttributes = columnSchema.TypeAttributes.ToTypeAttributesPb()
};
}
}
}
Loading