Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 32 additions & 23 deletions src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,18 @@ private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int b
{
_compositeCancelToken.ThrowIfCancellationRequested();

var r = _partitionedBulkRequest;
var request = _partitionedBulkRequest;
var response = await _client.BulkAsync(s =>
{
s.Index(r.Index).Type(r.Type);
if (r.BufferToBulk != null) r.BufferToBulk(s, buffer);
s.Index(request.Index).Type(request.Type);
if (request.BufferToBulk != null) request.BufferToBulk(s, buffer);
else s.IndexMany(buffer);
if (!string.IsNullOrEmpty(r.Pipeline)) s.Pipeline(r.Pipeline);
if (!string.IsNullOrEmpty(request.Pipeline)) s.Pipeline(request.Pipeline);
#pragma warning disable 618
if (r.Refresh.HasValue) s.Refresh(r.Refresh.Value);
if (request.Refresh.HasValue) s.Refresh(request.Refresh.Value);
#pragma warning restore 618
if (r.Routing != null) s.Routing(r.Routing);
if (r.WaitForActiveShards.HasValue) s.WaitForActiveShards(r.WaitForActiveShards.ToString());
if (request.Routing != null) s.Routing(request.Routing);
if (request.WaitForActiveShards.HasValue) s.WaitForActiveShards(request.WaitForActiveShards.ToString());

return s;
}, _compositeCancelToken)
Expand All @@ -134,29 +134,38 @@ private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int b
if (!response.ApiCall.Success)
return await HandleBulkRequest(buffer, page, backOffRetries, response);

var documentsWithResponse = response.Items.Zip(buffer, Tuple.Create).ToList();
var successfulDocuments = new List<Tuple<IBulkResponseItem, T>>();
var retryableDocuments = new List<T>();
var droppedDocuments = new List<Tuple<IBulkResponseItem, T>>();

HandleDroppedDocuments(documentsWithResponse, response);
foreach (var documentWithResponse in response.Items.Zip(buffer, Tuple.Create))
{
if (documentWithResponse.Item1.IsValid)
successfulDocuments.Add(documentWithResponse);
else
{
if (_retryPredicate(documentWithResponse.Item1, documentWithResponse.Item2))
retryableDocuments.Add(documentWithResponse.Item2);
else
droppedDocuments.Add(documentWithResponse);
}
}

var retryDocuments = documentsWithResponse
.Where(x => !x.Item1.IsValid && _retryPredicate(x.Item1, x.Item2))
.Select(x => x.Item2)
.ToList();
HandleDroppedDocuments(droppedDocuments, response);

if (retryDocuments.Count > 0 && backOffRetries < _backOffRetries)
return await RetryDocuments(page, ++backOffRetries, retryDocuments);
else if (retryDocuments.Count > 0)
if (retryableDocuments.Count > 0 && backOffRetries < _backOffRetries)
return await RetryDocuments(page, ++backOffRetries, retryableDocuments).ConfigureAwait(false);

if (retryableDocuments.Count > 0)
throw ThrowOnBadBulk(response, $"Bulk indexing failed and after retrying {backOffRetries} times");

_partitionedBulkRequest.BackPressure?.Release();
return new BulkAllResponse { Retries = backOffRetries, Page = page };
request.BackPressure?.Release();

return new BulkAllResponse { Retries = backOffRetries, Page = page, Items = response.Items };
}

private void HandleDroppedDocuments(List<Tuple<IBulkResponseItem, T>> documentsWithResponse, IBulkResponse response)
private void HandleDroppedDocuments(List<Tuple<IBulkResponseItem, T>> droppedDocuments, IBulkResponse response)
{
var droppedDocuments = documentsWithResponse
.Where(x => !x.Item1.IsValid && !_retryPredicate(x.Item1, x.Item2))
.ToList();
if (droppedDocuments.Count <= 0) return;

foreach (var dropped in droppedDocuments) _droppedDocumentCallBack(dropped.Item1, dropped.Item2);
Expand Down Expand Up @@ -185,7 +194,7 @@ private async Task<IBulkAllResponse> HandleBulkRequest(IList<T> buffer, long pag
throw ThrowOnBadBulk(response,
$"BulkAll halted after {nameof(PipelineFailure)}{failureReason.GetStringValue()} from _bulk");
default:
return await RetryDocuments(page, ++backOffRetries, buffer);
return await RetryDocuments(page, ++backOffRetries, buffer).ConfigureAwait(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
}

Expand Down
10 changes: 9 additions & 1 deletion src/Nest/Document/Multiple/BulkAll/BulkAllResponse.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using Newtonsoft.Json;

namespace Nest
{
Expand All @@ -12,6 +14,9 @@ public interface IBulkAllResponse

/// <summary>The number of back off retries were needed to store this document.</summary>
int Retries { get; }

/// <summary>The items returned from the bulk response</summary>
IReadOnlyCollection<IBulkResponseItem> Items { get; }
}

/// <inheritdoc />
Expand All @@ -26,5 +31,8 @@ public class BulkAllResponse : IBulkAllResponse

/// <inheritdoc />
public int Retries { get; internal set; }

/// <inheritdoc />
public IReadOnlyCollection<IBulkResponseItem> Items { get; internal set; }
}
}
12 changes: 10 additions & 2 deletions src/Tests/Tests/Document/Multiple/BulkAll/BulkAndScrollApiTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private void ScrollAll(string index, int size, int numberOfShards, int numberOfD

seenDocuments.Should().Be(numberOfDocuments);
var groups = seenSlices.GroupBy(s => s).ToList();
groups.Count().Should().Be(numberOfShards);
groups.Count.Should().Be(numberOfShards);
groups.Should().OnlyContain(g => g.Count() > 1);
}

Expand All @@ -70,7 +70,15 @@ private void BulkAll(string index, IEnumerable<SmallObject> documents, int size,
.Index(index)
);
//we set up an observer
var bulkObserver = observableBulk.Wait(TimeSpan.FromMinutes(5), b => Interlocked.Increment(ref seenPages));
var bulkObserver = observableBulk.Wait(TimeSpan.FromMinutes(5), b =>
{
Interlocked.Increment(ref seenPages);
foreach (var item in b.Items)
{
item.IsValid.Should().BeTrue();
item.Id.Should().NotBeNullOrEmpty();
}
});

droppedDocuments.Take(10).Should().BeEmpty();
bulkObserver.TotalNumberOfFailedBuffers.Should().Be(0, "All buffers are expected to be indexed");
Expand Down