Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk Update #3

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.4.1-rest-1.0.37</version>
<version>1.4.1-rest-1.0.41</version>
<packaging>jar</packaging>
<description>Elasticsearch - Open Source, Distributed, RESTful Search Engine</description>
<inceptionYear>2009</inceptionYear>
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.http.HttpEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.action.*;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -538,6 +539,16 @@ public RestRequest.Method getMethod() {
return RestRequest.Method.POST;
}

@Override
public ActionRestRequest getActionRestRequest(Version version) {
ActionRestRequest actionRestRequest = super.getActionRestRequest(version);
if (version.id >= Version.V_5_0_0_ID) {
return new BulkRequestV5();
} else {
return actionRestRequest;
}
}

@Override
public HttpEntity getEntity() throws IOException {
//todo add support for streaming version of getRestEntity()
Expand All @@ -549,4 +560,39 @@ public HttpEntity getEntity() throws IOException {
return new NStringEntity(builder.toString(), "UTF-8");
}

private class BulkRequestV5 implements ActionRestRequest {

@Override
public RestRequest.Method getMethod() {
return BulkRequest.this.getMethod();
}

@Override
public String getEndPoint() {
return BulkRequest.this.getEndPoint();
}

@Override
public HttpEntity getEntity() throws IOException {
//todo add support for streaming version of getRestEntity()
StringBuilder builder = new StringBuilder();
for (ActionRequest request : requests) {
ActionRestRequest restRequest = request.getActionRestRequest(Version.V_5_0_0);
String payload = HttpUtils.readUtf8(restRequest.getBulkEntity());
builder.append(payload);
}
return new NStringEntity(builder.toString(), "UTF-8");
}

@Override
public Map<String, String> getParams() {
return BulkRequest.this.getParams();
}

@Override
public HttpEntity getBulkEntity() throws IOException {
return BulkRequest.this.getBulkEntity();
}
}

}
97 changes: 89 additions & 8 deletions src/main/java/org/elasticsearch/action/update/UpdateRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionRestRequest;
import org.elasticsearch.action.DocumentRequest;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
import org.elasticsearch.common.Nullable;
Expand All @@ -41,12 +43,14 @@
import org.elasticsearch.common.util.UriBuilder;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.ScriptFilterParser;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.script.ScriptService;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

Expand Down Expand Up @@ -206,7 +210,9 @@ public String script() {
return this.script;
}

public ScriptService.ScriptType scriptType() { return this.scriptType; }
public ScriptService.ScriptType scriptType() {
return this.scriptType;
}

public Map<String, Object> scriptParams() {
return this.scriptParams;
Expand Down Expand Up @@ -573,6 +579,7 @@ public UpdateRequest source(byte[] source, int offset, int length) throws Except

/**
* Should this update attempt to detect if it is a noop?
*
* @return this for chaining
*/
public UpdateRequest detectNoop(boolean detectNoop) {
Expand Down Expand Up @@ -632,15 +639,15 @@ public boolean docAsUpsert() {
public void docAsUpsert(boolean shouldUpsertDoc) {
this.docAsUpsert = shouldUpsertDoc;
}
public boolean scriptedUpsert(){

public boolean scriptedUpsert() {
return this.scriptedUpsert;
}

public void scriptedUpsert(boolean scriptedUpsert) {
this.scriptedUpsert = scriptedUpsert;
}


@Override
public void readFrom(StreamInput in) throws IOException {
Expand All @@ -651,7 +658,7 @@ public void readFrom(StreamInput in) throws IOException {
id = in.readString();
routing = in.readOptionalString();
script = in.readOptionalString();
if(Strings.hasLength(script)) {
if (Strings.hasLength(script)) {
if (in.getVersion().onOrAfter(Version.V_1_3_0)) {
scriptType = ScriptService.ScriptType.readFrom(in);
} else {
Expand Down Expand Up @@ -745,6 +752,16 @@ public String getEndPoint() {
return UriBuilder.newBuilder().slash(index(), type(), id(), "_update").build();
}

@Override
public ActionRestRequest getActionRestRequest(Version version) {
ActionRestRequest actionRestRequest = super.getActionRestRequest(version);
if (version.id >= Version.V_5_0_0_ID) {
return new UpdateRequestV5();
} else {
return actionRestRequest;
}
}

@Override
public RestRequest.Method getMethod() {
return RestRequest.Method.POST;
Expand Down Expand Up @@ -783,8 +800,7 @@ private Map<String, Object> getPayload() {
if (this.detectNoop) {
payload.put("detect_noop", Boolean.TRUE);
}
}
else if (Strings.hasLength(script)) {
} else if (Strings.hasLength(script)) {
payload.putIfNotNull("lang", this.scriptLang);
payload.putIf("scripted_upsert", Boolean.TRUE, this.scriptedUpsert);
payload.put("script", this.script);
Expand All @@ -799,4 +815,69 @@ else if (Strings.hasLength(script)) {
}
return payload.map();
}

private class UpdateRequestV5 implements ActionRestRequest {

public RestRequest.Method getMethod() {
return UpdateRequest.this.getMethod();
}

public String getEndPoint() {
return UpdateRequest.this.getEndPoint();
}

@Override
public Map<String, String> getParams() {
return UpdateRequest.this.getParams();
}

@Override
public HttpEntity getEntity() throws IOException {
Map<String, Object> payload = getPayload();
String json = XContentHelper.convertToJson(payload, false);
return new NStringEntity(json, StandardCharsets.UTF_8);

}

private Map<String, Object> getPayload() {
MapBuilder<String, Object> payload = MapBuilder.newMapBuilder();
if (UpdateRequest.this.doc != null) {
payload.put("doc", UpdateRequest.this.doc.sourceAsMap());
if (UpdateRequest.this.docAsUpsert) {
payload.put("doc_as_upsert", Boolean.TRUE);
}
if (UpdateRequest.this.detectNoop) {
payload.put("detect_noop", Boolean.TRUE);
}
} else if (Strings.hasLength(script)) {
Map<String, Object> scriptObj = new LinkedHashMap<>();
scriptObj.put(UpdateRequest.this.scriptType.name().toLowerCase(Locale.ROOT), script);
scriptObj.put("lang", scriptLang);
scriptObj.put("params", scriptParams);
payload.put("script", scriptObj);
}
if (UpdateRequest.this.upsertRequest != null) {
payload.put("upsert", UpdateRequest.this.upsertRequest.sourceAsMap());
}

if (payload.isEmpty()) {
throw new IllegalStateException("Nothing to update. No doc, script or upsert provided");
}
return payload.map();
}

public HttpEntity getBulkEntity() throws IOException {
Map<String, Object> payload = Maps.newLinkedHashMap();
Map<String, Object> actionMetadata = Maps.newLinkedHashMap();
actionMetadata.put("_index", index);
actionMetadata.put("_type", type);
actionMetadata.put("_id", id);
payload.put(BULK_TYPE, actionMetadata);
String json = XContentHelper.convertToJson(payload, false);

String payloadJson = XContentHelper.convertToJson(getPayload(), false);
String fullPayload = Strings.join(json, "\n", payloadJson, "\n");
return new NStringEntity(fullPayload, StandardCharsets.UTF_8);
}
}
}
47 changes: 45 additions & 2 deletions src/main/java/org/elasticsearch/index/query/BoolFilterBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@

package org.elasticsearch.index.query;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.elasticsearch.Version;
import org.elasticsearch.common.xcontent.ToXContentUtils;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -130,15 +137,19 @@ public BoolFilterBuilder cacheKey(String cacheKey) {
@Override
protected void doXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("bool");
doXContentInternal(builder, params);
addCacheToQuery(cacheKey, cache, builder, params);
builder.endObject();
}

private void doXContentInternal(XContentBuilder builder, Params params) throws IOException {
doXArrayContent("must", mustClauses, builder, params);
doXArrayContent("must_not", mustNotClauses, builder, params);
doXArrayContent("should", shouldClauses, builder, params);

if (filterName != null) {
builder.field("_name", filterName);
}
addCacheToQuery(cacheKey, cache, builder, params);
builder.endObject();
}

private void doXArrayContent(String field, List<FilterBuilder> clauses, XContentBuilder builder, Params params) throws IOException {
Expand All @@ -156,4 +167,36 @@ private void doXArrayContent(String field, List<FilterBuilder> clauses, XContent
builder.endArray();
}
}

private String generateCacheKey() throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE);
builder.startObject("bool");
doXContentInternal(builder, EMPTY_PARAMS);
builder.endObject();
return DigestUtils.sha512Hex(builder.bytes().streamInput());
}

@Override
protected void addCacheToQuery(String cacheKey, Boolean cache, XContentBuilder builder, Params params) throws IOException {
if (ToXContentUtils.getVersionFromParams(params).onOrAfter(Version.V_5_0_0)) {
if (BooleanUtils.isTrue(cache)) {
if (cacheKey != null) {
builder.field("_cache_key", cacheKey);
builder.field("_cache_any", cacheKey);
} else {
builder.field("_cache_any", generateCacheKey());
}
}

builder.field("_cache_sha", generateCacheKey());
return;
}

if (cache != null) {
builder.field("_cache", cache);
}
if (cacheKey != null) {
builder.field("_cache_key", cacheKey);
}
}
}