Skip to content

Commit

Permalink
Add KuduPartitioner (#133)
Browse files Browse the repository at this point in the history
The API allows a client to determine which partition a row falls into
without actually writing that row.
  • Loading branch information
xqrzd authored Sep 15, 2020
1 parent 1c4a991 commit d07c2d1
Show file tree
Hide file tree
Showing 5 changed files with 388 additions and 19 deletions.
49 changes: 39 additions & 10 deletions src/Knet.Kudu.Client/KuduClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,19 @@ public IKuduSession NewSession(KuduSessionOptions options)
return new KuduSession(this, options, _loggerFactory);
}

public async ValueTask<KuduPartitioner> CreatePartitionerAsync(
KuduTable table, CancellationToken cancellationToken = default)
{
var tablets = await LoopLocateTableAsync(
table.TableId,
null,
null,
1000,
cancellationToken).ConfigureAwait(false);

return new KuduPartitioner(table, tablets);
}

private async Task<KuduTable> OpenTableAsync(
TableIdentifierPB tableIdentifier, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -723,8 +736,28 @@ private TableLocationsCache GetTableLocationsCache(string tableId)
#endif
}

private async Task LoopLocateTableAsync(
KuduTable table,
public async ValueTask<List<RemoteTablet>> LoopLocateTableAsync(
string tableId,
byte[] startPartitionKey,
byte[] endPartitionKey,
int fetchBatchSize,
CancellationToken cancellationToken = default)
{
var tablets = new List<RemoteTablet>();

await LoopLocateTableAsync(
tableId,
startPartitionKey,
endPartitionKey,
fetchBatchSize,
tablets,
cancellationToken).ConfigureAwait(false);

return tablets;
}

private async ValueTask LoopLocateTableAsync(
string tableId,
byte[] startPartitionKey,
byte[] endPartitionKey,
int fetchBatchSize,
Expand All @@ -741,7 +774,6 @@ private async Task LoopLocateTableAsync(
// The next partition key to look up. If null, then it represents
// the minimum partition key, If empty, it represents the maximum key.
byte[] partitionKey = startPartitionKey;
string tableId = table.TableId;

// Continue while the partition key is the minimum, or it is not the maximum
// and it is less than the end partition key.
Expand Down Expand Up @@ -771,7 +803,7 @@ await GetTableLocationsAsync(tableId, key, fetchBatchSize, cancellationToken)
.ConfigureAwait(false);

await LoopLocateTableAsync(
table,
tableId,
lookupKey,
endPartitionKey,
fetchBatchSize,
Expand Down Expand Up @@ -820,7 +852,7 @@ private Task<List<KeyRangePB>> GetTabletKeyRangesAsync(
}

internal async ValueTask<List<KeyRange>> GetTableKeyRangesAsync(
KuduTable table,
string tableId,
byte[] startPrimaryKey,
byte[] endPrimaryKey,
byte[] startPartitionKey,
Expand All @@ -829,14 +861,11 @@ internal async ValueTask<List<KeyRange>> GetTableKeyRangesAsync(
long splitSizeBytes,
CancellationToken cancellationToken = default)
{
var tablets = new List<RemoteTablet>();

await LoopLocateTableAsync(
table,
var tablets = await LoopLocateTableAsync(
tableId,
startPartitionKey,
endPartitionKey,
fetchBatchSize,
tablets,
cancellationToken).ConfigureAwait(false);

var keyRanges = new List<KeyRange>(tablets.Count);
Expand Down
205 changes: 205 additions & 0 deletions src/Knet.Kudu.Client/KuduPartitioner.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
using System;
using System.Collections.Generic;
using Knet.Kudu.Client.Exceptions;
using Knet.Kudu.Client.Internal;
using Knet.Kudu.Client.Tablet;
using Knet.Kudu.Client.Util;

namespace Knet.Kudu.Client
{
/// <summary>
/// <para>
/// A <see cref="KuduPartitioner"/> allows clients to determine the target
/// partition of a row without actually performing a write. The set of
/// partitions is eagerly fetched when the KuduPartitioner is constructed
/// so that the actual partitioning step can be performed synchronously
/// without any network trips.
/// </para>
///
/// <para>
/// NOTE: Because this operates on a metadata snapshot retrieved at
/// construction time, it will not reflect any metadata changes to the
/// table that have occurred since its creation.
/// </para>
/// </summary>
public class KuduPartitioner
{
private readonly PartitionSchema _partitionSchema;
private readonly AvlTree _cache;

public int NumPartitions { get; }

public KuduPartitioner(KuduTable table, List<RemoteTablet> tablets)
{
_partitionSchema = table.PartitionSchema;
_cache = new AvlTree();

NumPartitions = tablets.Count;
InitializeCache(tablets);
}

/// <summary>
/// Determine if the given row falls into a valid partition.
/// </summary>
/// <param name="row">The row to check.</param>
public bool IsCovered(PartialRow row)
{
var entry = GetCacheEntry(row);
return entry.IsCoveredRange;
}

public PartitionResult GetRowTablet(PartialRow row)
{
var entry = GetCacheEntry(row);

if (entry.IsNonCoveredRange)
{
throw new NonCoveredRangeException(
entry.LowerBoundPartitionKey,
entry.UpperBoundPartitionKey);
}

return new PartitionResult(entry.Tablet, entry.PartitionIndex);
}

private PartitionerLocationEntry GetCacheEntry(PartialRow row)
{
var partitionSchema = _partitionSchema;
int maxSize = KeyEncoder.CalculateMaxPartitionKeySize(row, partitionSchema);
Span<byte> buffer = stackalloc byte[maxSize];

KeyEncoder.EncodePartitionKey(
row,
partitionSchema,
buffer,
out int bytesWritten);

var partitionKey = buffer.Slice(0, bytesWritten);

return (PartitionerLocationEntry)_cache.FloorEntry(partitionKey);
}

private void InitializeCache(List<RemoteTablet> tablets)
{
var newEntries = new List<PartitionerLocationEntry>();
int partitionIndex = 0;

if (tablets.Count == 0)
{
// If there are no tablets in the response, then the table is empty. If
// there were any tablets in the table they would have been returned, since
// the master guarantees that if the partition key falls in a non-covered
// range, the previous tablet will be returned, and we did not set an upper
// bound partition key on the request.

newEntries.Add(PartitionerLocationEntry.NewNonCoveredRange2(
Array.Empty<byte>(),
Array.Empty<byte>()));
}
else
{
// The comments below will reference the following diagram:
//
// +---+ +---+---+
// | | | | |
// A | B | C | D | E | F
// | | | | |
// +---+ +---+---+
//
// It depicts a tablet locations response from the master containing three
// tablets: B, D and E. Three non-covered ranges are present: A, C, and F.
// An RPC response containing B, D and E could occur if the lookup partition
// key falls in A, B, or C, although the existence of A as an initial
// non-covered range can only be inferred if the lookup partition key falls
// in A.

byte[] firstLowerBound = tablets[0].Partition.PartitionKeyStart;

if (firstLowerBound.Length > 0)
{
// There is an initial non-covered range, such as A.
newEntries.Add(PartitionerLocationEntry.NewNonCoveredRange2(
Array.Empty<byte>(), firstLowerBound));
}

// lastUpperBound tracks the upper bound of the previously processed
// entry, so that we can determine when we have found a non-covered range.
byte[] lastUpperBound = firstLowerBound;

foreach (var tablet in tablets)
{
byte[] tabletLowerBound = tablet.Partition.PartitionKeyStart;
byte[] tabletUpperBound = tablet.Partition.PartitionKeyEnd;

if (lastUpperBound.SequenceCompareTo(tabletLowerBound) < 0)
{
// There is a non-covered range between the previous tablet and this tablet.
// This will discover C while processing the tablet location for D.
newEntries.Add(PartitionerLocationEntry.NewNonCoveredRange2(
lastUpperBound, tabletLowerBound));
}

lastUpperBound = tabletUpperBound;

// Now add the tablet itself (such as B, D, or E).
newEntries.Add(PartitionerLocationEntry.NewTablet2(tablet, partitionIndex++));
}

if (lastUpperBound.Length > 0)
{
// There is a non-covered range between the last tablet and the end
// of the partition key space, such as F.
newEntries.Add(PartitionerLocationEntry.NewNonCoveredRange2(
lastUpperBound, Array.Empty<byte>()));
}
}

foreach (var entry in newEntries)
_cache.Insert(entry);
}

private class PartitionerLocationEntry : TableLocationEntry
{
public int PartitionIndex { get; }

public PartitionerLocationEntry(
RemoteTablet tablet,
byte[] lowerBoundPartitionKey,
byte[] upperBoundPartitionKey,
int partitionIndex) : base(tablet, lowerBoundPartitionKey, upperBoundPartitionKey, -1)
{
PartitionIndex = partitionIndex;
}

public static PartitionerLocationEntry NewNonCoveredRange2(
byte[] lowerBoundPartitionKey,
byte[] upperBoundPartitionKey)
{
return new PartitionerLocationEntry(
null,
lowerBoundPartitionKey,
upperBoundPartitionKey,
-1);
}

public static PartitionerLocationEntry NewTablet2(
RemoteTablet tablet, int partitionIndex)
{
return new PartitionerLocationEntry(tablet, null, null, partitionIndex);
}
}
}

public readonly struct PartitionResult
{
public RemoteTablet Tablet { get; }

public int PartitionIndex { get; }

public PartitionResult(RemoteTablet tablet, int partitionIndex)
{
Tablet = tablet;
PartitionIndex = partitionIndex;
}
}
}
2 changes: 1 addition & 1 deletion src/Knet.Kudu.Client/KuduScanTokenBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public async ValueTask<List<KuduScanToken>> BuildAsync(
var partitionRange = pruner.NextPartitionKeyRange;

var newKeyRanges = await Client.GetTableKeyRangesAsync(
Table,
Table.TableId,
LowerBoundPrimaryKey,
UpperBoundPrimaryKey,
partitionRange.Lower.Length == 0 ? null : partitionRange.Lower,
Expand Down
22 changes: 14 additions & 8 deletions src/Knet.Kudu.Client/Tablet/RemoteTablet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,49 @@
namespace Knet.Kudu.Client.Tablet
{
/// <summary>
/// <para>
/// This class encapsulates the information regarding a tablet and its locations.
///
/// </para>
///
/// <para>
/// RemoteTablet's main function is to keep track of where the leader for this
/// tablet is. For example, an RPC might call GetServerInfo, contact that TS, find
/// it's not the leader anymore, and then re-fetch the tablet locations.
///
/// it's not the leader anymore, and then re-fetch the tablet locations. This
/// class is immutable.
/// </para>
///
/// <para>
/// A RemoteTablet's life is expected to be long in a cluster where roles aren't
/// changing often, and short when they do since the Kudu client will replace the
/// RemoteTablet it caches with new ones after getting tablet locations from the master.
/// </para>
/// </summary>
public class RemoteTablet : IEquatable<RemoteTablet>
{
private readonly ServerInfoCache _cache;
private readonly List<Replica> _replicas;

public string TableId { get; }

public string TabletId { get; }

public Partition Partition { get; }

public IReadOnlyList<Replica> Replicas { get; }

public RemoteTablet(
string tableId,
string tabletId,
Partition partition,
ServerInfoCache cache,
List<Replica> replicas)
IReadOnlyList<Replica> replicas)
{
TableId = tableId;
TabletId = tabletId;
Partition = partition;
Replicas = replicas;
_cache = cache;
_replicas = replicas;
}

public IReadOnlyList<Replica> Replicas => _replicas;

public ServerInfo GetServerInfo(
ReplicaSelection replicaSelection, string location = null)
{
Expand Down
Loading

0 comments on commit d07c2d1

Please sign in to comment.