diff --git a/test/Knet.Kudu.Client.FunctionalTests/FlexiblePartitioningTests.cs b/test/Knet.Kudu.Client.FunctionalTests/FlexiblePartitioningTests.cs index 58163ef7..7ce548a5 100644 --- a/test/Knet.Kudu.Client.FunctionalTests/FlexiblePartitioningTests.cs +++ b/test/Knet.Kudu.Client.FunctionalTests/FlexiblePartitioningTests.cs @@ -118,9 +118,12 @@ private async Task TestPartitionSchemaAsync(TableBuilder tableBuilder) var rows = CreateRows(); await InsertRowsAsync(table, rows); + var tableScanner = _client.NewScanBuilder(table) + .SetReadMode(ReadMode.ReadYourWrites) + .Build(); + // Full table scan - Assert.Equal(rows, await CollectRowsAsync( - _client.NewScanBuilder(table).Build())); + Assert.Equal(rows, await CollectRowsAsync(tableScanner)); { // Lower bound var minRow = new Row("1", "3", "5"); @@ -132,11 +135,14 @@ private async Task TestPartitionSchemaAsync(TableBuilder tableBuilder) .ToHashSet(); var scanner = _client.NewScanBuilder(table) - .LowerBound(lowerBound).Build(); + .SetReadMode(ReadMode.ReadYourWrites) + .LowerBound(lowerBound) + .Build(); var results = await CollectRowsAsync(scanner); Assert.Equal(expected, results); var scanTokenBuilder = _client.NewScanTokenBuilder(table) + .SetReadMode(ReadMode.ReadYourWrites) .LowerBound(lowerBound); var tokenResults = await CollectRowsAsync(scanTokenBuilder); Assert.Equal(expected, tokenResults); @@ -152,11 +158,14 @@ private async Task TestPartitionSchemaAsync(TableBuilder tableBuilder) .ToHashSet(); var scanner = _client.NewScanBuilder(table) - .ExclusiveUpperBound(upperBound).Build(); + .SetReadMode(ReadMode.ReadYourWrites) + .ExclusiveUpperBound(upperBound) + .Build(); var results = await CollectRowsAsync(scanner); Assert.Equal(expected, results); var scanTokenBuilder = _client.NewScanTokenBuilder(table) + .SetReadMode(ReadMode.ReadYourWrites) .ExclusiveUpperBound(upperBound); var tokenResults = await CollectRowsAsync(scanTokenBuilder); Assert.Equal(expected, tokenResults); @@ -175,6 +184,7 @@ private async Task TestPartitionSchemaAsync(TableBuilder tableBuilder) .ToHashSet(); var scanner = _client.NewScanBuilder(table) + .SetReadMode(ReadMode.ReadYourWrites) .LowerBound(lowerBound) .ExclusiveUpperBound(upperBound) .Build(); @@ -183,6 +193,7 @@ private async Task TestPartitionSchemaAsync(TableBuilder tableBuilder) Assert.Equal(expected, results); var scanTokenBuilder = _client.NewScanTokenBuilder(table) + .SetReadMode(ReadMode.ReadYourWrites) .LowerBound(lowerBound) .ExclusiveUpperBound(upperBound); var tokenResults = await CollectRowsAsync(scanTokenBuilder); @@ -198,6 +209,7 @@ private async Task TestPartitionSchemaAsync(TableBuilder tableBuilder) foreach (var tablet in tablets) { var scanner = _client.NewScanBuilder(table) + .SetReadMode(ReadMode.ReadYourWrites) .LowerBoundPartitionKeyRaw(tablet.Partition.PartitionKeyStart) .ExclusiveUpperBoundPartitionKeyRaw(tablet.Partition.PartitionKeyEnd) .Build(); @@ -227,6 +239,7 @@ private async Task TestPartitionSchemaAsync(TableBuilder tableBuilder) foreach (var tablet in tablets) { var scanner = _client.NewScanBuilder(table) + .SetReadMode(ReadMode.ReadYourWrites) .LowerBound(lowerBound) .ExclusiveUpperBound(upperBound) .LowerBoundPartitionKeyRaw(tablet.Partition.PartitionKeyStart) diff --git a/test/Knet.Kudu.Client.FunctionalTests/MultipleLeaderFailoverTests.cs b/test/Knet.Kudu.Client.FunctionalTests/MultipleLeaderFailoverTests.cs index 91380df6..5a62701e 100644 --- a/test/Knet.Kudu.Client.FunctionalTests/MultipleLeaderFailoverTests.cs +++ b/test/Knet.Kudu.Client.FunctionalTests/MultipleLeaderFailoverTests.cs @@ -45,7 +45,9 @@ public async Task TestMultipleFailover(bool restart) } await session.FlushAsync(); - await ClientTestUtil.WaitUntilRowCountAsync(client, table, rowsPerIteration); + + var rowCount = await ClientTestUtil.CountRowsAsync(client, table); + Assert.Equal(rowsPerIteration, rowCount); int currentRows = rowsPerIteration; for (int i = 0; i < numIterations; i++) @@ -71,9 +73,11 @@ public async Task TestMultipleFailover(bool restart) if (!restart) await harness.StartAllTabletServersAsync(); - await ClientTestUtil.WaitUntilRowCountAsync(client, table, currentRows); + rowCount = await ClientTestUtil.CountRowsAsync(client, table); + Assert.Equal(currentRows, rowCount); } - await ClientTestUtil.WaitUntilRowCountAsync(client, table, totalRowsToInsert); + rowCount = await ClientTestUtil.CountRowsAsync(client, table); + Assert.Equal(totalRowsToInsert, rowCount); } } diff --git a/test/Knet.Kudu.Client.FunctionalTests/ScanMultiTabletTests.cs b/test/Knet.Kudu.Client.FunctionalTests/ScanMultiTabletTests.cs index 3efe5517..ba6188ed 100644 --- a/test/Knet.Kudu.Client.FunctionalTests/ScanMultiTabletTests.cs +++ b/test/Knet.Kudu.Client.FunctionalTests/ScanMultiTabletTests.cs @@ -14,7 +14,7 @@ public class ScanMultiTabletTests : IAsyncLifetime private readonly string _tableName = "ScanMultiTabletTests"; private KuduTestHarness _harness; private KuduClient _client; - private IKuduSession _session; + private KuduClient _newClient; private KuduTable _table; private KuduSchema _schema; @@ -27,9 +27,8 @@ public class ScanMultiTabletTests : IAsyncLifetime public async Task InitializeAsync() { _harness = await new MiniKuduClusterBuilder().BuildHarnessAsync(); - - await using var client = _harness.CreateClient(); - await using var session = client.NewSession(); + _client = _harness.CreateClient(); + await using var session = _client.NewSession(); // Create a 4-tablets table for scanning. var builder = new TableBuilder(_tableName) @@ -47,7 +46,7 @@ public async Task InitializeAsync() }); } - var table = await client.CreateTableAsync(builder); + var table = await _client.CreateTableAsync(builder); // The data layout ends up like this: // tablet '', '1': no rows @@ -68,25 +67,24 @@ public async Task InitializeAsync() } } - _beforeWriteTimestamp = client.LastPropagatedTimestamp; + _beforeWriteTimestamp = _client.LastPropagatedTimestamp; // Reset the client in order to clear the propagated timestamp. - _client = _harness.CreateClient(); - _session = _client.NewSession(); + _newClient = _harness.CreateClient(); // Reopen the table using the new client. - _table = await _client.OpenTableAsync(_tableName); + _table = await _newClient.OpenTableAsync(_tableName); _schema = _table.Schema; } public async Task DisposeAsync() { - if (_session != null) - await _session.DisposeAsync(); - - if (_client != null) + if (_client is not null) await _client.DisposeAsync(); + if (_newClient is not null) + await _newClient.DisposeAsync(); + await _harness.DisposeAsync(); } @@ -149,7 +147,7 @@ public async Task TestKeyStartEnd() GetScanner(null, null, null, null))); // Full table scan with empty bounds // Test that we can close a scanner while in between two tablets. We start on - // the second tablet and our first nextRows() will get 3 rows. At that moment + // the second tablet and our first ResultSet will get 3 rows. At that moment // we want to close the scanner before getting on the 3rd tablet. var scanner = GetScanner("1", "", null, null); var scanEnumerator = scanner.GetAsyncEnumerator(); @@ -212,33 +210,33 @@ public async Task TestKeysAndPredicates() } [SkippableFact] - public async Task TestProjections() + public void TestProjections() { // Test with column names. - var builder = _client.NewScanBuilder(_table) + var builder = _newClient.NewScanBuilder(_table) .SetProjectedColumns("key1", "key2"); - await BuildScannerAndCheckColumnsCountAsync(builder, "key1", "key2"); + BuildScannerAndCheckColumnsCount(builder, "key1", "key2"); // Test with column indexes. - builder = _client.NewScanBuilder(_table) + builder = _newClient.NewScanBuilder(_table) .SetProjectedColumns(0, 1); - await BuildScannerAndCheckColumnsCountAsync(builder, "key1", "key2"); + BuildScannerAndCheckColumnsCount(builder, "key1", "key2"); // Test with column names overriding indexes. - builder = _client.NewScanBuilder(_table) + builder = _newClient.NewScanBuilder(_table) .SetProjectedColumns(0, 1) .SetProjectedColumns("key1"); - await BuildScannerAndCheckColumnsCountAsync(builder, "key1"); + BuildScannerAndCheckColumnsCount(builder, "key1"); // Test with keys last with indexes. - builder = _client.NewScanBuilder(_table) + builder = _newClient.NewScanBuilder(_table) .SetProjectedColumns(2, 1, 0); - await BuildScannerAndCheckColumnsCountAsync(builder, "val", "key2", "key1"); + BuildScannerAndCheckColumnsCount(builder, "val", "key2", "key1"); // Test with keys last with column names. - builder = _client.NewScanBuilder(_table) + builder = _newClient.NewScanBuilder(_table) .SetProjectedColumns("val", "key1"); - await BuildScannerAndCheckColumnsCountAsync(builder, "val", "key1"); + BuildScannerAndCheckColumnsCount(builder, "val", "key1"); } [SkippableTheory] @@ -246,34 +244,39 @@ public async Task TestProjections() [InlineData(ReplicaSelection.ClosestReplica)] public async Task TestReplicaSelections(ReplicaSelection replicaSelection) { - var scanner = _client.NewScanBuilder(_table) + var scanner = _newClient.NewScanBuilder(_table) .SetReplicaSelection(replicaSelection) .Build(); - Assert.Equal(9, await ClientTestUtil.CountRowsInScanAsync(scanner)); + await ClientTestUtil.WaitUntilRowCountAsync(scanner, 9); } [SkippableFact] public async Task TestScanTokenReplicaSelections() { - var tokens = await _client.NewScanTokenBuilder(_table) + var tokens = await _newClient.NewScanTokenBuilder(_table) .SetReplicaSelection(ReplicaSelection.ClosestReplica) .BuildAsync(); - long totalRows = 0; - - foreach (var token in tokens) + var totalRows = await ClientTestUtil.WaitUntilAsync(rows => rows == 9, async () => { - var serializedToken = token.Serialize(); + long rows = 0; - // Deserialize the scan token into a scanner, and make sure it is using - // 'CLOSEST_REPLICA' selection policy. - var scanBuilder = await _client.NewScanBuilderFromTokenAsync(serializedToken); - var scanner = scanBuilder.Build(); + foreach (var token in tokens) + { + var serializedToken = token.Serialize(); - Assert.Equal(ReplicaSelection.ClosestReplica, scanner.ReplicaSelection); - totalRows += await ClientTestUtil.CountRowsInScanAsync(scanner); - } + // Deserialize the scan token into a scanner, and make sure it is using + // 'CLOSEST_REPLICA' selection policy. + var scanBuilder = await _newClient.NewScanBuilderFromTokenAsync(serializedToken); + var scanner = scanBuilder.Build(); + + Assert.Equal(ReplicaSelection.ClosestReplica, scanner.ReplicaSelection); + rows += await ClientTestUtil.CountRowsInScanAsync(scanner); + } + + return rows; + }); Assert.Equal(9, totalRows); } @@ -285,7 +288,7 @@ public async Task TestReadAtSnapshotNoTimestamp() // specified. Verify that the scanner timestamp is set from the tablet // server response. - var scanner = _client.NewScanBuilder(_table) + var scanner = _newClient.NewScanBuilder(_table) .SetReadMode(ReadMode.ReadAtSnapshot) .Build(); @@ -320,14 +323,14 @@ public async Task TestReadYourWritesFreshClientFreshTable() { // Perform scan in READ_YOUR_WRITES mode. Before the scan, verify that the // propagated timestamp is unset, since this is a fresh client. - var scanner = _client.NewScanBuilder(_table) + var scanner = _newClient.NewScanBuilder(_table) .SetReadMode(ReadMode.ReadYourWrites) .Build(); await using var scanEnumerator = scanner.GetAsyncEnumerator(); Assert.Equal(ReadMode.ReadYourWrites, scanner.ReadMode); - Assert.Equal(KuduClient.NoTimestamp, _client.LastPropagatedTimestamp); + Assert.Equal(KuduClient.NoTimestamp, _newClient.LastPropagatedTimestamp); Assert.Equal(KuduClient.NoTimestamp, scanEnumerator.SnapshotTimestamp); // Since there isn't any write performed from the client, the count @@ -341,7 +344,7 @@ public async Task TestReadYourWritesFreshClientFreshTable() Assert.True(count >= 0); Assert.True(count <= 9); - Assert.NotEqual(KuduClient.NoTimestamp, _client.LastPropagatedTimestamp); + Assert.NotEqual(KuduClient.NoTimestamp, _newClient.LastPropagatedTimestamp); Assert.NotEqual(KuduClient.NoTimestamp, scanEnumerator.SnapshotTimestamp); } @@ -352,12 +355,12 @@ public async Task TestReadYourWrites() // Update the propagated timestamp to ensure we see the rows written // in the constructor. - _client.LastPropagatedTimestamp = preTs; + _newClient.LastPropagatedTimestamp = preTs; // Perform scan in READ_YOUR_WRITES mode. Before the scan, verify that the // scanner timestamp is not yet set. It will get set only once the scan // is opened. - var scanner = _client.NewScanBuilder(_table) + var scanner = _newClient.NewScanBuilder(_table) .SetReadMode(ReadMode.ReadYourWrites) .Build(); @@ -390,12 +393,12 @@ public async Task TestReadYourWrites() return insert; }); - await _client.WriteAsync(rows); + await _newClient.WriteAsync(rows); scanEnumerator = scanner.GetAsyncEnumerator(); - Assert.True(preTs < _client.LastPropagatedTimestamp); - preTs = _client.LastPropagatedTimestamp; + Assert.True(preTs < _newClient.LastPropagatedTimestamp); + preTs = _newClient.LastPropagatedTimestamp; count = 0; while (await scanEnumerator.MoveNextAsync()) @@ -416,9 +419,9 @@ public async Task TestReadYourWrites() public async Task TestScanPropagatesLatestTimestamp() { // Initially, the client does not have the timestamp set. - Assert.Equal(KuduClient.NoTimestamp, _client.LastPropagatedTimestamp); + Assert.Equal(KuduClient.NoTimestamp, _newClient.LastPropagatedTimestamp); - var scanner = _client.NewScanBuilder(_table).Build(); + var scanner = _newClient.NewScanBuilder(_table).Build(); var scanEnumerator = scanner.GetAsyncEnumerator(); Assert.True(await scanEnumerator.MoveNextAsync()); @@ -427,13 +430,13 @@ public async Task TestScanPropagatesLatestTimestamp() // At this point, the call to the first tablet server should have been // done already, so the client should have received the propagated timestamp // in the scanner response. - long tsRef = _client.LastPropagatedTimestamp; + long tsRef = _newClient.LastPropagatedTimestamp; Assert.NotEqual(KuduClient.NoTimestamp, tsRef); while (await scanEnumerator.MoveNextAsync()) { rowCount += scanEnumerator.Current.Count; - var ts = _client.LastPropagatedTimestamp; + var ts = _newClient.LastPropagatedTimestamp; // Next scan responses from tablet servers should move the propagated // timestamp further. @@ -448,14 +451,14 @@ public async Task TestScanPropagatesLatestTimestamp() public async Task TestScanTokenPropagatesTimestamp() { // Initially, the client does not have the timestamp set. - Assert.Equal(KuduClient.NoTimestamp, _client.LastPropagatedTimestamp); + Assert.Equal(KuduClient.NoTimestamp, _newClient.LastPropagatedTimestamp); - var scanner = _client.NewScanBuilder(_table).Build(); + var scanner = _newClient.NewScanBuilder(_table).Build(); await using var scanEnumerator = scanner.GetAsyncEnumerator(); // Let the client receive the propagated timestamp in the scanner response. Assert.True(await scanEnumerator.MoveNextAsync()); - var tsPrev = _client.LastPropagatedTimestamp; + var tsPrev = _newClient.LastPropagatedTimestamp; var tsPropagated = tsPrev + 1000000; var tokenPb = new ScanTokenPB @@ -469,18 +472,18 @@ public async Task TestScanTokenPropagatesTimestamp() // Deserialize scan tokens and make sure the client's last propagated // timestamp is updated accordingly. - Assert.Equal(tsPrev, _client.LastPropagatedTimestamp); + Assert.Equal(tsPrev, _newClient.LastPropagatedTimestamp); - var scanBuilder = await _client.NewScanBuilderFromTokenAsync(serializedToken); + var scanBuilder = await _newClient.NewScanBuilderFromTokenAsync(serializedToken); var tokenScanner = scanBuilder.Build(); - Assert.Equal(tsPropagated, _client.LastPropagatedTimestamp); + Assert.Equal(tsPropagated, _newClient.LastPropagatedTimestamp); } [SkippableFact] public async Task TestScanTokenReadMode() { - var tokens = await _client.NewScanTokenBuilder(_table) + var tokens = await _newClient.NewScanTokenBuilder(_table) .SetReadMode(ReadMode.ReadYourWrites) .BuildAsync(); @@ -489,7 +492,7 @@ public async Task TestScanTokenReadMode() // Deserialize scan tokens and make sure the read mode is updated accordingly. foreach (var token in tokens) { - var scanBuilder = await _client.NewScanBuilderFromTokenAsync(token); + var scanBuilder = await _newClient.NewScanBuilderFromTokenAsync(token); var scanner = scanBuilder.Build(); Assert.Equal(ReadMode.ReadYourWrites, scanner.ReadMode); @@ -503,7 +506,8 @@ private KuduScanner GetScanner( string exclusiveUpperBoundKeyTwo, params KuduPredicate[] predicates) { - var builder = _client.NewScanBuilder(_table); + var builder = _client.NewScanBuilder(_table) + .SetReadMode(ReadMode.ReadYourWrites); if (lowerBoundKeyOne != null) { @@ -527,16 +531,11 @@ private KuduScanner GetScanner( return builder.Build(); } - private static async Task BuildScannerAndCheckColumnsCountAsync( + private static void BuildScannerAndCheckColumnsCount( KuduScannerBuilder builder, params string[] expectedColumnNames) { var scanner = builder.Build(); - KuduSchema schema = null; - - await foreach (var resultSet in scanner) - { - schema = resultSet.Schema; - } + var schema = scanner.ProjectionSchema; Assert.Equal( expectedColumnNames, diff --git a/test/Knet.Kudu.Client.FunctionalTests/ScannerTests.cs b/test/Knet.Kudu.Client.FunctionalTests/ScannerTests.cs index c3cf958e..e65dc2e4 100644 --- a/test/Knet.Kudu.Client.FunctionalTests/ScannerTests.cs +++ b/test/Knet.Kudu.Client.FunctionalTests/ScannerTests.cs @@ -124,7 +124,10 @@ public async Task TestIterable() await session.FlushAsync(); - var scanner = client.NewScanBuilder(table).Build(); + var scanner = client.NewScanBuilder(table) + .SetReadMode(ReadMode.ReadYourWrites) + .SetReplicaSelection(ReplicaSelection.LeaderOnly) + .Build(); await foreach (var resultSet in scanner) { @@ -174,9 +177,9 @@ public async Task TestScannerExpiration() .Select(i => ClientTestUtil.CreateBasicSchemaInsert(table, i)); await client.WriteAsync(rows); - await ClientTestUtil.WaitUntilRowCountAsync(client, table, numRows); var scanner = client.NewScanBuilder(table) + .SetReadMode(ReadMode.ReadYourWrites) .SetReplicaSelection(ReplicaSelection.ClosestReplica) .SetBatchSizeBytes(100) // Use a small batch size so we get many batches. .Build(); @@ -311,6 +314,7 @@ public async Task TestOpenScanWithDroppedPartition() var scanner = client.NewScanBuilder(table) // Set a small batch size so the first scan doesn't read all the rows. .SetBatchSizeBytes(100) + .SetReadMode(ReadMode.ReadYourWrites) .Build(); long rowsScanned = 0; diff --git a/test/Knet.Kudu.Client.FunctionalTests/Util/ClientTestUtil.cs b/test/Knet.Kudu.Client.FunctionalTests/Util/ClientTestUtil.cs index 9535c32b..382f2d2b 100644 --- a/test/Knet.Kudu.Client.FunctionalTests/Util/ClientTestUtil.cs +++ b/test/Knet.Kudu.Client.FunctionalTests/Util/ClientTestUtil.cs @@ -188,7 +188,10 @@ public static KuduOperation CreateAllNullsInsert(KuduTable table, int key) public static Task CountRowsAsync(KuduClient client, KuduTable table) { - var scanner = client.NewScanBuilder(table).Build(); + var scanner = client.NewScanBuilder(table) + .SetReadMode(ReadMode.ReadYourWrites) + .SetReplicaSelection(ReplicaSelection.LeaderOnly) + .Build(); return CountRowsInScanAsync(scanner); } @@ -216,7 +219,9 @@ public static async Task> ScanTableToStringsAsync( KuduClient client, KuduTable table, params KuduPredicate[] predicates) { var rowStrings = new List(); - var scanBuilder = client.NewScanBuilder(table); + var scanBuilder = client.NewScanBuilder(table) + .SetReadMode(ReadMode.ReadYourWrites) + .SetReplicaSelection(ReplicaSelection.LeaderOnly); foreach (var predicate in predicates) scanBuilder.AddPredicate(predicate); @@ -241,14 +246,12 @@ void ParseResults(ResultSet resultSet) return rowStrings; } - public static async Task WaitUntilRowCountAsync( - KuduClient client, KuduTable table, int rowCount) + public static async Task WaitUntilRowCountAsync(KuduScanner scanner, int rowCount) { long readCount = 0; for (int i = 0; i < 10; i++) { - var scanner = client.NewScanBuilder(table).Build(); readCount = await CountRowsInScanAsync(scanner); if (readCount == rowCount) @@ -259,4 +262,21 @@ public static async Task WaitUntilRowCountAsync( Assert.Equal(rowCount, readCount); } + + public static async Task WaitUntilAsync(Func predicate, Func> action) + { + T result = default; + + for (int i = 0; i < 10; i++) + { + result = await action(); + + if (predicate(result)) + return result; + + await Task.Delay(TimeSpan.FromSeconds(1)); + } + + return result; + } }