BulkIngester close() does not wait for listener to finish #559

Open
frank-montyne opened this issue Apr 21, 2023 · 3 comments


@frank-montyne

Java API client version

7.17.9 and 7.17.10-SNAPSHOT

Java version

Java 19

Elasticsearch Version

7.17.9

Problem description

When using the BulkIngester with a BulkListener, calling close() on the BulkIngester returns before all afterBulk() BulkListener callbacks have finished. Below is a snippet of code that uses the BulkIngester. If you simply add a few logging statements after the close() call and in afterBulk(), you will see that the afterBulk() callbacks are still running after close() returns.
That should not be the case: when close() returns, the bulk should have been handled completely.

Thanks for looking into the problem.

```java
private void handleBulk(Collection<E> events) {
	long start = DateUtils.nowMillis();

	// Create bulk listener.
	BulkListener<String> bulkListener = new BulkListener<String>() {
		@Override
		public void beforeBulk(long executionId, BulkRequest bulkRequest, List<String> contexts) {
		}

		@Override
		public void afterBulk(long executionId, BulkRequest bulkRequest, List<String> contexts, BulkResponse bulkResponse) {
			for (BulkResponseItem bulkResponseItem : bulkResponse.items()) {

				if (bulkResponseItem.error() != null) {
					logger.error("Event with id '%s' has error %s".formatted(bulkResponseItem.id(), bulkResponseItem.error()));
				}
			}
			// Check for errors.
			if (bulkResponse.errors()) {
				logger.error("Bulk processing of %d events has some failures".formatted(events.size()));
			}
			else {
				logger.info("Bulk processed %d events in %d ms".formatted(events.size(), DateUtils.diffMillis(start)));
			}
		}

		@Override
		public void afterBulk(long executionId, BulkRequest bulkRequest, List<String> contexts, Throwable failure) {
			// Since all event processing failed we can skip adding the specific event indexes to the set of indexes to refresh after the bulk request is completed if necessary.

			logger.error("Bulk processing %d events failed completely. %s".formatted(events.size(), ExceptionUtils.getMessage(failure)));
		}
	};

	// Create bulk ingester.
	BulkIngester<String> bulkIngester = BulkIngester.of(ingesterBuilder ->
		ingesterBuilder
			.client(elasticSearch.getESClient())
			// Accumulate one additional batch while processing the current one. Do not set to 0: bulkIngester.flush() will hang (bug filed, not fixed yet in 7.17.10-SNAPSHOT).
			.maxConcurrentRequests(1)
			// Process the current batch after every 10,000 operations added.
			.maxOperations(10000)
			// Or process the current batch after 5 MB of data has been added.
			.maxSize(5 * 1024 * 1024)
			// Or process the current batch when 1 second has elapsed.
			.flushInterval(1, TimeUnit.SECONDS)
			.globalSettings(gsBuilder ->
				gsBuilder
					.waitForActiveShards(asBuilder -> asBuilder.count(1))
					.refresh(Refresh.False)
			)
			.listener(bulkListener)
	);

	try {
		// Add events to bulk ingester.
		for (E event : events) {
			switch (event.action()) {
				case create:
					bulkIngester.add(new CreateOperation.Builder<BinaryData>()
							.index(event.esIndex())
							.id(event.id())
							.document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8), ContentType.APPLICATION_JSON))
							.build()
							._toBulkOperation());
					break;

				case update:
					// Full update of document.
					bulkIngester.add(new IndexOperation.Builder<BinaryData>()
							.index(event.esIndex())
							.id(event.id())
							.document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8), ContentType.APPLICATION_JSON))
							.build()
							._toBulkOperation());
					break;

				case purge:
					// Real physical delete of document.
					bulkIngester.add(new DeleteOperation.Builder()
							.index(event.esIndex())
							.id(event.id())
							.build()
							._toBulkOperation());
					break;

				default:
					// Should not get here. Log anyway.
					logger.error(String.format("Skipped event with unsupported action '%s' -> %s", event.action().name(), event.toJson()));
					break;
			}
		}
		// Elasticsearch client bug: the call to close() does not wait until the listener has finished the afterBulk() callbacks.
		bulkIngester.close();
	}
	catch (Exception e) {
		logger.error("Failed to process %d events. %s".formatted(events.size(), e.getMessage()));
	}
}
```
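
Until this is fixed, a possible stop-gap is to track listener completion yourself. Below is a minimal, untested sketch of that idea: it pairs each beforeBulk() with a Phaser registration so the caller can block after close() until every afterBulk() has actually finished. It assumes close() only returns once every beforeBulk() has fired (close() does wait for the in-flight HTTP requests); the Phaser and the surrounding names are illustrative, not part of the client API.

```java
import java.util.List;
import java.util.concurrent.Phaser;

import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;

// One party for the caller; beforeBulk() registers one extra party per bulk request.
Phaser pendingCallbacks = new Phaser(1);

BulkListener<String> bulkListener = new BulkListener<String>() {
	@Override
	public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
		pendingCallbacks.register();
	}

	@Override
	public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
		try {
			// ... the response handling from the snippet above ...
		} finally {
			pendingCallbacks.arriveAndDeregister();
		}
	}

	@Override
	public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
		pendingCallbacks.arriveAndDeregister();
	}
};

// ... build the ingester with this listener and add the events as above ...

bulkIngester.close();
// close() waited for the HTTP requests, so every beforeBulk() has registered;
// block here until the matching afterBulk() callbacks have all completed.
pendingCallbacks.arriveAndAwaitAdvance();
```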
@frank-montyne
Author

@swallez any ideas on this one?

@itajun commented Mar 28, 2024

+1
flush() should wait for the listeners as well, as was the case with the previous implementation of the BulkProcessor.

I have a scenario where I need to use the BulkIngester to handle retries, metrics, etc., but I want control over when the request is sent so that I can roll back the distributed operation in case of failure. My idea was to set the maximum size/number of requests to Long.MAX_VALUE and call flush() when certain conditions relevant to my use case were met.

The problem is that there is no way to know whether the operation succeeded, since the listener's afterBulk calls happen after things are considered "complete", unlike the way the BulkProcessor worked in previous versions.

```java
...
        if (exec != null) {
            // A request was actually sent
            exec.futureResponse.handle((resp, thr) -> {

                sendRequestCondition.signalIfReadyAfter(() -> {
                    requestsInFlightCount--;
                    closeCondition.signalAllIfReady();
                });

// Problem between the above ^ and below \/

                if (resp != null) {
                    // Success
                    if (listener != null) {
                        listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
                    }
...
```

I have no way to synchronise the calls in my scenario, where I want to throw an exception synchronously when flush() has any errors in the response. If I try to check the number of requestsInFlight/Pending, there may be a race condition where it reaches zero before the callbacks are called.

If flush() returned the exec.id, we could wait until it was passed to afterBulk and sync things up, but that would still be an ugly workaround that wasn't required with the BulkProcessor.
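
To make the idea concrete, here is an untested sketch of that synchronisation under the setup described above, where the size/operation limits are effectively disabled so each flush() yields at most one bulk request. The wrapper class, flushAndCheck() and the failure field are made-up names, not client API; the per-flush CountDownLatch bridges the window between flush() returning and afterBulk() running:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;

public class SynchronousFlush {
    private final AtomicReference<CountDownLatch> latch = new AtomicReference<>();
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    public BulkListener<Void> listener() {
        return new BulkListener<Void>() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request, List<Void> contexts) {
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, List<Void> contexts, BulkResponse response) {
                if (response.errors()) {
                    failure.set(new IllegalStateException("bulk response contains item failures"));
                }
                latch.get().countDown();
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, List<Void> contexts, Throwable thrown) {
                failure.set(thrown);
                latch.get().countDown();
            }
        };
    }

    // Flush, block until the listener has seen the response, then rethrow any failure.
    // Assumes operations are pending; an empty flush sends no request and would block forever.
    public void flushAndCheck(BulkIngester<Void> ingester) throws Throwable {
        latch.set(new CountDownLatch(1));
        failure.set(null);
        ingester.flush();
        latch.get().await(); // flush() itself does not wait for afterBulk()
        if (failure.get() != null) {
            throw failure.get();
        }
    }
}
```

This still only works because at most one request can be in flight at a time, which is exactly the kind of constraint the old BulkProcessor never needed.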

@fabriziofortino
Contributor

This might be fixed with #867
