Skip to content

Commit

Permalink
[ML-DataFrame] fix wire serialization issues in data frame response o…
Browse files Browse the repository at this point in the history
…bjects (#39790)

fix wire serialization issues in data frame response objects
  • Loading branch information
Hendrik Muhs authored Mar 7, 2019
1 parent 78c27b3 commit 4cab8ec
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Request(String id) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
}

public Request() {
private Request() {
}

public Request(StreamInput in) throws IOException {
Expand Down Expand Up @@ -113,7 +113,7 @@ protected RequestBuilder(ElasticsearchClient client, DeleteDataFrameTransformAct
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private boolean acknowledged;
public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
super(in);
readFrom(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public Request(String id) {
}
}

public Request() {}
private Request() {}

public Request(StreamInput in) throws IOException {
super(in);
Expand Down Expand Up @@ -149,7 +149,7 @@ public Response() {
}

public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
super(in);
readFrom(in);
}

Expand All @@ -173,6 +173,7 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
List<String> invalidTransforms = new ArrayList<>();
builder.startObject();
toXContentCommon(builder, params);
builder.field(DataFrameField.COUNT.getPreferredName(), transformConfigurations.size());
// XContentBuilder does not support passing the params object for Iterables
builder.field(DataFrameField.TRANSFORMS.getPreferredName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Request(String id) {
}
}

public Request() {}
private Request() {}

public Request(StreamInput in) throws IOException {
super(in);
Expand Down Expand Up @@ -138,7 +138,7 @@ public Response() {
}

public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
super(in);
readFrom(in);
}

Expand All @@ -161,6 +161,7 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
toXContentCommon(builder, params);
builder.field(DataFrameField.COUNT.getPreferredName(), transformsStateAndStats.size());
builder.field(DataFrameField.TRANSFORMS.getPreferredName(), transformsStateAndStats);
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public Request(String id) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
}

public Request() {
private Request() {
}

public Request(StreamInput in) throws IOException {
Expand Down Expand Up @@ -108,7 +108,7 @@ public Response() {
}

public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
super(in);
readFrom(in);
}

Expand Down Expand Up @@ -136,6 +136,7 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
toXContentCommon(builder, params);
builder.field("started", started);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Request(String id, boolean waitForCompletion, @Nullable TimeValue timeout
this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout);
}

public Request() {
private Request() {
this(null, false, null);
}

Expand Down Expand Up @@ -149,7 +149,7 @@ public Response() {
}

public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
super(in);
readFrom(in);
}

Expand Down Expand Up @@ -177,6 +177,7 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
toXContentCommon(builder, params);
builder.field("stopped", stopped);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe.action;

import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.junit.Before;

import static java.util.Collections.emptyList;

public abstract class AbstractWireSerializingDataFrameTestCase<T extends Writeable> extends AbstractWireSerializingTestCase<T> {
/**
* Test case that ensures aggregation named objects are registered
*/
private NamedWriteableRegistry namedWriteableRegistry;
private NamedXContentRegistry namedXContentRegistry;

@Before
public void registerAggregationNamedObjects() throws Exception {
// register aggregations as NamedWriteable
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());

namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables());
namedXContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents());
}

@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return namedWriteableRegistry;
}

@Override
protected NamedXContentRegistry xContentRegistry() {
return namedXContentRegistry;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe.action;

import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Response;

public class DeleteDataFrameTransformActionResponseTests extends AbstractWireSerializingDataFrameTestCase<Response> {
@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}

@Override
protected Reader<Response> instanceReader() {
return Response::new;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,23 @@

package org.elasticsearch.xpack.core.dataframe.action;

import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.watcher.watch.Payload.XContent;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction.Response;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.watcher.watch.Payload.XContent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class GetDataFrameTransformsActionResponseTests extends ESTestCase {
public class GetDataFrameTransformsActionResponseTests extends AbstractWireSerializingDataFrameTestCase<Response> {

public void testInvalidTransforms() throws IOException {
List<DataFrameTransformConfig> transforms = new ArrayList<>();
Expand Down Expand Up @@ -66,4 +66,19 @@ public void testNoHeaderInResponse() throws IOException {
assertEquals(null, XContentMapValues.extractValue("headers", transformsResponse.get(i)));
}
}

@Override
protected Response createTestInstance() {
List<DataFrameTransformConfig> configs = new ArrayList<>();
for (int i = 0; i < randomInt(10); ++i) {
configs.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig());
}

return new Response(configs);
}

@Override
protected Reader<Response> instanceReader() {
return Response::new;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe.action;

import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStatsTests;

import java.util.ArrayList;
import java.util.List;

public class GetDataFrameTransformsStatsActionResponseTests extends AbstractWireSerializingDataFrameTestCase<Response> {
@Override
protected Response createTestInstance() {
List<DataFrameTransformStateAndStats> stats = new ArrayList<>();
for (int i = 0; i < randomInt(10); ++i) {
stats.add(DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats());
}

return new Response(stats);
}

@Override
protected Reader<Response> instanceReader() {
return Response::new;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe.action;

import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction.Response;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PreviewDataFrameTransformsActionResponseWireTests extends AbstractWireSerializingDataFrameTestCase<Response> {

@Override
protected Response createTestInstance() {
int size = randomIntBetween(0, 10);
List<Map<String, Object>> data = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Map<String, Object> datum = new HashMap<>();
Map<String, Object> entry = new HashMap<>();
entry.put("value1", randomIntBetween(1, 100));
datum.put(randomAlphaOfLength(10), entry);
data.add(datum);
}
return new Response(data);
}

@Override
protected Reader<Response> instanceReader() {
return Response::new;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe.action;

import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Response;

public class PutDataFrameTransformActionResponseTests extends AbstractStreamableTestCase<Response> {

@Override
protected Response createBlankInstance() {
return new Response();
}

@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction.Request;

public class StartDataFrameTransformActionTests extends AbstractWireSerializingTestCase<Request> {
public class StartDataFrameTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLengthBetween(1, 20));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe.action;

import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction.Response;

public class StartDataFrameTransformActionResponseTests extends AbstractWireSerializingDataFrameTestCase<Response> {

@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}

@Override
protected Reader<Response> instanceReader() {
return Response::new;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe.action;

import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction.Response;

public class StopDataFrameTransformActionResponseTests extends AbstractWireSerializingDataFrameTestCase<Response> {

@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}

@Override
protected Reader<Response> instanceReader() {
return Response::new;
}

}

0 comments on commit 4cab8ec

Please sign in to comment.