Skip to content

Commit

Permalink
[7.x] [Transform] fix listener for search context missing exception (e…
Browse files Browse the repository at this point in the history
…lastic#75615) (elastic#75619)

Fix a unreleased regression introduced in elastic#74984. In case a pit search context disappeared the listener was called twice and the transform fails.
  • Loading branch information
elasticsearchmachine authored Jul 22, 2021
1 parent da4411e commit 598e497
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ private void doSearch(SearchRequest searchRequest, ActionListener<SearchResponse
searchRequest,
listener
);
return;
}
listener.onFailure(e);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -196,6 +198,29 @@ public void testPitInjection() throws InterruptedException {

indexer.onStop();
assertEquals(0L, client.getPitContextCounter());

this.<SearchResponse>assertAsync(
listener -> indexer.doNextSearch(0, listener),
response -> { assertEquals("the_pit_id+", response.pointInTimeId()); }
);

this.<SearchResponse>assertAsync(
listener -> indexer.doNextSearch(0, listener),
response -> { assertEquals("the_pit_id++", response.pointInTimeId()); }
);

this.<SearchResponse>assertAsync(
listener -> indexer.doNextSearch(0, listener),
response -> { assertEquals("the_pit_id+++", response.pointInTimeId()); }
);

assertEquals(1L, client.getPitContextCounter());

// throws search context missing:
this.<SearchResponse>assertAsync(
listener -> indexer.doNextSearch(0, listener),
response -> { assertNull(response.pointInTimeId()); }
);
}
}

Expand Down Expand Up @@ -356,28 +381,35 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
} else if (request instanceof SearchRequest) {
SearchRequest searchRequest = (SearchRequest) request;

SearchResponse response = new SearchResponse(
new InternalSearchResponse(
new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
// Simulate completely null aggs
// throw search context missing for the 4th run
if (searchRequest.pointInTimeBuilder() != null
&& "the_pit_id+++".equals(searchRequest.pointInTimeBuilder().getEncodedId())) {
listener.onFailure(new SearchContextMissingException(new ShardSearchContextId("sc_missing", 42)));
} else {
SearchResponse response = new SearchResponse(
new InternalSearchResponse(
new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
// Simulate completely null aggs
null,
new Suggest(Collections.emptyList()),
new SearchProfileShardResults(Collections.emptyMap()),
false,
false,
1
),
null,
new Suggest(Collections.emptyList()),
new SearchProfileShardResults(Collections.emptyMap()),
false,
false,
1
),
null,
1,
1,
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY,
// copy the pit from the request
searchRequest.pointInTimeBuilder() != null ? searchRequest.pointInTimeBuilder().getEncodedId() + "+" : null
);
listener.onResponse((Response) response);
1,
1,
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY,
// copy the pit from the request
searchRequest.pointInTimeBuilder() != null ? searchRequest.pointInTimeBuilder().getEncodedId() + "+" : null
);
listener.onResponse((Response) response);

}
return;
}

Expand All @@ -392,7 +424,10 @@ private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> f
LatchedActionListener<T> listener = new LatchedActionListener<>(ActionListener.wrap(r -> {
assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true));
furtherTests.accept(r);
}, e -> { fail("got unexpected exception: " + e); }), latch);
}, e -> {
assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true));
fail("got unexpected exception: " + e);
}), latch);

function.accept(listener);
assertTrue("timed out after 5s", latch.await(5, TimeUnit.SECONDS));
Expand Down

0 comments on commit 598e497

Please sign in to comment.