Skip to content

Commit

Permalink
HLRC: migration api - upgrade (elastic#34898)
Browse files Browse the repository at this point in the history
Implement high level client for migration upgrade API. It should wrap
RestHighLevelClient and expose high level IndexUpgradeRequest (new),
IndexTaskResponse for submissions with wait_for_completion=false and
BulkByScrollResponse (already used) objects.

refers: elastic#29827
  • Loading branch information
pgomulka committed Nov 14, 2018
1 parent 1209d25 commit 88b7250
Show file tree
Hide file tree
Showing 10 changed files with 324 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@

import org.elasticsearch.client.migration.IndexUpgradeInfoRequest;
import org.elasticsearch.client.migration.IndexUpgradeInfoResponse;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.client.migration.IndexUpgradeRequest;


import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -52,4 +57,19 @@ public IndexUpgradeInfoResponse getAssistance(IndexUpgradeInfoRequest request, R
return restHighLevelClient.performRequestAndParseEntity(request, MigrationRequestConverters::getMigrationAssistance, options,
IndexUpgradeInfoResponse::fromXContent, Collections.emptySet());
}

public BulkByScrollResponse upgrade(IndexUpgradeRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, MigrationRequestConverters::migrate, options,
BulkByScrollResponse::fromXContent, Collections.emptySet());
}

public TaskSubmissionResponse submitUpgradeTask(IndexUpgradeRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, MigrationRequestConverters::submitMigrateTask, options,
TaskSubmissionResponse::fromXContent, Collections.emptySet());
}

public void upgradeAsync(IndexUpgradeRequest request, RequestOptions options, ActionListener<BulkByScrollResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request, MigrationRequestConverters::migrate, options,
BulkByScrollResponse::fromXContent, listener, Collections.emptySet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
package org.elasticsearch.client;

import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.migration.IndexUpgradeInfoRequest;
import org.elasticsearch.client.migration.IndexUpgradeRequest;

final class MigrationRequestConverters {

private MigrationRequestConverters() {}
private MigrationRequestConverters() {
}

static Request getMigrationAssistance(IndexUpgradeInfoRequest indexUpgradeInfoRequest) {
RequestConverters.EndpointBuilder endpointBuilder = new RequestConverters.EndpointBuilder()
Expand All @@ -36,4 +39,26 @@ static Request getMigrationAssistance(IndexUpgradeInfoRequest indexUpgradeInfoRe
parameters.withIndicesOptions(indexUpgradeInfoRequest.indicesOptions());
return request;
}

static Request migrate(IndexUpgradeRequest indexUpgradeRequest) {
return prepareMigrateRequest(indexUpgradeRequest, true);
}

static Request submitMigrateTask(IndexUpgradeRequest indexUpgradeRequest) {
return prepareMigrateRequest(indexUpgradeRequest, false);
}

private static Request prepareMigrateRequest(IndexUpgradeRequest indexUpgradeRequest, boolean waitForCompletion) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack", "migration", "upgrade")
.addPathPart(indexUpgradeRequest.index())
.build();

Request request = new Request(HttpPost.METHOD_NAME, endpoint);

RequestConverters.Params params = new RequestConverters.Params(request)
.withWaitForCompletion(waitForCompletion);

return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import java.util.Arrays;
import java.util.Objects;

/**
* A request for retrieving upgrade information
* Part of Migration API
*/
public class IndexUpgradeInfoRequest extends TimedRequest implements IndicesRequest.Replaceable {

private String[] indices = Strings.EMPTY_ARRAY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;

/**
* Response object that contains information about indices to be upgraded
*/
public class IndexUpgradeInfoResponse {

private static final ParseField INDICES = new ParseField("indices");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.migration;

import org.elasticsearch.client.Validatable;

import java.util.Objects;

/**
* A request for performing Upgrade on Index
* Part of Migration API
*/
public class IndexUpgradeRequest implements Validatable {

private String index;

public IndexUpgradeRequest(String index) {
this.index = index;
}

public String index() {
return index;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
IndexUpgradeRequest request = (IndexUpgradeRequest) o;
return Objects.equals(index, request.index);
}

@Override
public int hashCode() {
return Objects.hash(index);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,73 @@

package org.elasticsearch.client;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.migration.IndexUpgradeInfoRequest;
import org.elasticsearch.client.migration.IndexUpgradeInfoResponse;
import org.elasticsearch.client.migration.IndexUpgradeRequest;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.settings.Settings;

import java.io.IOException;
import java.util.function.BooleanSupplier;

import static org.hamcrest.Matchers.containsString;

public class MigrationIT extends ESRestHighLevelClientTestCase {

public void testGetAssistance() throws IOException {
RestHighLevelClient client = highLevelClient();
{
IndexUpgradeInfoResponse response = client.migration().getAssistance(new IndexUpgradeInfoRequest(), RequestOptions.DEFAULT);
IndexUpgradeInfoResponse response = highLevelClient().migration()
.getAssistance(new IndexUpgradeInfoRequest(), RequestOptions.DEFAULT);
assertEquals(0, response.getActions().size());
}
{
client.indices().create(new CreateIndexRequest("test"), RequestOptions.DEFAULT);
IndexUpgradeInfoResponse response = client.migration().getAssistance(
createIndex("test", Settings.EMPTY);
IndexUpgradeInfoResponse response = highLevelClient().migration().getAssistance(
new IndexUpgradeInfoRequest("test"), RequestOptions.DEFAULT);
assertEquals(0, response.getActions().size());
}
}

public void testUpgradeWhenIndexCannotBeUpgraded() throws IOException {
createIndex("test", Settings.EMPTY);

ThrowingRunnable execute = () -> execute(new IndexUpgradeRequest("test"),
highLevelClient().migration()::upgrade,
highLevelClient().migration()::upgradeAsync);

ElasticsearchStatusException responseException = expectThrows(ElasticsearchStatusException.class, execute);

assertThat(responseException.getDetailedMessage(), containsString("cannot be upgraded"));
}

public void testUpgradeWithTaskApi() throws IOException, InterruptedException {
createIndex("test", Settings.EMPTY);

IndexUpgradeRequest request = new IndexUpgradeRequest("test");

TaskSubmissionResponse upgrade = highLevelClient().migration()
.submitUpgradeTask(request, RequestOptions.DEFAULT);

assertNotNull(upgrade.getTask());

BooleanSupplier hasUpgradeCompleted = checkCompletionStatus(upgrade);
awaitBusy(hasUpgradeCompleted);
}

/**
* Using low-level api as high-level-rest-client's getTaskById work is in progress.
* TODO revisit once that work is finished
*/
private BooleanSupplier checkCompletionStatus(TaskSubmissionResponse upgrade) {
return () -> {
try {
Response response = client().performRequest(new Request("GET", "/_tasks/" + upgrade.getTask()));
return (boolean) entityAsMap(response).get("completed");
} catch (IOException e) {
fail(e.getMessage());
return false;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.elasticsearch.client;

import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.migration.IndexUpgradeInfoRequest;
import org.elasticsearch.client.migration.IndexUpgradeRequest;
import org.elasticsearch.test.ESTestCase;

import java.util.HashMap;
Expand All @@ -45,4 +47,20 @@ public void testGetMigrationAssistance() {
assertNull(request.getEntity());
assertEquals(expectedParams, request.getParameters());
}

public void testUpgradeRequest() {
String[] indices = RequestConvertersTests.randomIndicesNames(1, 1);
IndexUpgradeRequest upgradeInfoRequest = new IndexUpgradeRequest(indices[0]);

String expectedEndpoint = "/_xpack/migration/upgrade/" + indices[0];
Map<String, String> expectedParams = new HashMap<>();
expectedParams.put("wait_for_completion", Boolean.TRUE.toString());

Request request = MigrationRequestConverters.migrate(upgradeInfoRequest);

assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(expectedEndpoint, request.getEndpoint());
assertNull(request.getEntity());
assertEquals(expectedParams, request.getParameters());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,30 @@

package org.elasticsearch.client.documentation;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.client.migration.IndexUpgradeInfoRequest;
import org.elasticsearch.client.migration.IndexUpgradeInfoResponse;
import org.elasticsearch.client.migration.IndexUpgradeRequest;
import org.elasticsearch.client.migration.UpgradeActionRequired;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.BulkByScrollResponse;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.isEmptyOrNullString;
import static org.hamcrest.Matchers.not;

/**
* This class is used to generate the Java Migration API documentation.
Expand Down Expand Up @@ -80,4 +93,66 @@ public void testGetAssistance() throws IOException {
}
// end::get-assistance-response
}

public void testUpgrade() throws IOException {

RestHighLevelClient client = highLevelClient();
createIndex("test", Settings.EMPTY);


// tag::upgrade-request
IndexUpgradeRequest request = new IndexUpgradeRequest("test"); // <1>
// end::upgrade-request

try {

// tag::upgrade-execute
BulkByScrollResponse response = client.migration().upgrade(request, RequestOptions.DEFAULT);
// end::upgrade-execute

} catch (ElasticsearchStatusException e) {
assertThat(e.getMessage(), containsString("cannot be upgraded"));
}
}

public void testUpgradeAsync() throws IOException, InterruptedException {
RestHighLevelClient client = highLevelClient();
createIndex("test", Settings.EMPTY);
final CountDownLatch latch = new CountDownLatch(1);

// tag::upgrade-async-listener
ActionListener<BulkByScrollResponse> listener = new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkResponse) {
// <1>
}

@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::upgrade-async-listener

listener = new LatchedActionListener<>(listener, latch);

// tag::upgrade-async-execute
client.migration().upgradeAsync(new IndexUpgradeRequest("test"), RequestOptions.DEFAULT, listener); // <1>
// end::upgrade-async-execute

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}

public void testUpgradeWithTaskApi() throws IOException {
createIndex("test", Settings.EMPTY);
RestHighLevelClient client = highLevelClient();
// tag::upgrade-task-api
IndexUpgradeRequest request = new IndexUpgradeRequest("test");

TaskSubmissionResponse response = client.migration()
.submitUpgradeTask(request, RequestOptions.DEFAULT);
String taskId = response.getTask();
// end::upgrade-task-api
assertThat(taskId, not(isEmptyOrNullString()));
}
}
Loading

0 comments on commit 88b7250

Please sign in to comment.