Skip to content

Commit

Permalink
Remove type field from DocWriteRequest and associated Response objects (
Browse files Browse the repository at this point in the history
elastic#47671)

This commit removes the type field from index, update and delete requests, and their
associated responses.

Relates to elastic#41059
  • Loading branch information
romseygeek authored and howardhuanghua committed Oct 14, 2019
1 parent 5679b7c commit 997c21c
Show file tree
Hide file tree
Showing 251 changed files with 704 additions and 3,105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC

private static class BulkRestBuilderListener extends RestBuilderListener<BulkRequest> {
private final BulkItemResponse ITEM_RESPONSE = new BulkItemResponse(1, DocWriteRequest.OpType.UPDATE,
new UpdateResponse(new ShardId("mock", "", 1), "mock_type", "1", 0L, 1L, 1L, DocWriteResponse.Result.CREATED));
new UpdateResponse(new ShardId("mock", "", 1), "1", 0L, 1L, 1L, DocWriteResponse.Result.CREATED));

private final RestRequest request;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

public class TransportNoopBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
private static final BulkItemResponse ITEM_RESPONSE = new BulkItemResponse(1, DocWriteRequest.OpType.UPDATE,
new UpdateResponse(new ShardId("mock", "", 1), "mock_type", "1", 0L, 1L, 1L, DocWriteResponse.Result.CREATED));
new UpdateResponse(new ShardId("mock", "", 1), "1", 0L, 1L, 1L, DocWriteResponse.Result.CREATED));

@Inject
public TransportNoopBulkAction(TransportService transportService, ActionFilters actionFilters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.rankeval.RankEvalRequest;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
Expand Down Expand Up @@ -102,7 +101,7 @@ private RequestConverters() {
}

static Request delete(DeleteRequest deleteRequest) {
String endpoint = endpoint(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
String endpoint = endpoint(deleteRequest.index(), deleteRequest.id());
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);

Params parameters = new Params();
Expand Down Expand Up @@ -170,11 +169,6 @@ static Request bulk(BulkRequest bulkRequest) throws IOException {
if (Strings.hasLength(action.index())) {
metadata.field("_index", action.index());
}
if (Strings.hasLength(action.type())) {
if (MapperService.SINGLE_MAPPING_NAME.equals(action.type()) == false) {
metadata.field("_type", action.type());
}
}
if (Strings.hasLength(action.id())) {
metadata.field("_id", action.id());
}
Expand Down Expand Up @@ -309,11 +303,9 @@ static Request index(IndexRequest indexRequest) {

String endpoint;
if (indexRequest.opType() == DocWriteRequest.OpType.CREATE) {
endpoint = indexRequest.type().equals(MapperService.SINGLE_MAPPING_NAME)
? endpoint(indexRequest.index(), "_create", indexRequest.id())
: endpoint(indexRequest.index(), indexRequest.type(), indexRequest.id(), "_create");
endpoint = endpoint(indexRequest.index(), "_create", indexRequest.id());
} else {
endpoint = endpoint(indexRequest.index(), indexRequest.type(), indexRequest.id());
endpoint = endpoint(indexRequest.index(), indexRequest.id());
}

Request request = new Request(method, endpoint);
Expand Down Expand Up @@ -341,9 +333,7 @@ static Request ping() {
}

static Request update(UpdateRequest updateRequest) throws IOException {
String endpoint = updateRequest.type().equals(MapperService.SINGLE_MAPPING_NAME)
? endpoint(updateRequest.index(), "_update", updateRequest.id())
: endpoint(updateRequest.index(), updateRequest.type(), updateRequest.id(), "_update");
String endpoint = endpoint(updateRequest.index(), "_update", updateRequest.id());
Request request = new Request(HttpPost.METHOD_NAME, endpoint);

Params parameters = new Params();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,6 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
{
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
//Check that untyped document additions inherit the global type
String localType = null;
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
Expand All @@ -331,7 +329,7 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
.setGlobalPipeline("pipeline_id")
.build()) {

indexDocs(processor, numDocs, null, localType, "test", "pipeline_id");
indexDocs(processor, numDocs, null, "test", "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
Expand All @@ -356,33 +354,29 @@ private Matcher<SearchHit>[] expectedIds(int numDocs) {
.<Matcher<SearchHit>>toArray(Matcher[]::new);
}

private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex,
String globalIndex, String globalPipeline) throws Exception {
MultiGetRequest multiGetRequest = new MultiGetRequest();
for (int i = 1; i <= numDocs; i++) {
if (randomBoolean()) {
processor.add(new IndexRequest(localIndex, localType, Integer.toString(i))
processor.add(new IndexRequest(localIndex).id(Integer.toString(i))
.source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
} else {
BytesArray data = bytesBulkRequest(localIndex, localType, i);
BytesArray data = bytesBulkRequest(localIndex, i);
processor.add(data, globalIndex, globalPipeline, XContentType.JSON);
}
multiGetRequest.add(localIndex, Integer.toString(i));
}
return multiGetRequest;
}

private static BytesArray bytesBulkRequest(String localIndex, String localType, int id) throws IOException {
private static BytesArray bytesBulkRequest(String localIndex, int id) throws IOException {
XContentBuilder action = jsonBuilder().startObject().startObject("index");

if (localIndex != null) {
action.field("_index", localIndex);
}

if (localType != null) {
action.field("_type", localType);
}

action.field("_id", Integer.toString(id));
action.endObject().endObject();

Expand All @@ -396,7 +390,7 @@ private static BytesArray bytesBulkRequest(String localIndex, String localType,
}

private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
return indexDocs(processor, numDocs, "test", null, null, null);
return indexDocs(processor, numDocs, "test", null, null);
}

private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void testIndexFollowing() throws Exception {
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
assertThat(putFollowResponse.isIndexFollowingStarted(), is(true));

IndexRequest indexRequest = new IndexRequest("leader", "_doc")
IndexRequest indexRequest = new IndexRequest("leader")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source("{}", XContentType.JSON);
highLevelClient().index(indexRequest, RequestOptions.DEFAULT);
Expand Down
Loading

0 comments on commit 997c21c

Please sign in to comment.