diff --git a/CHANGELOG.md b/CHANGELOG.md index 1553d38be7..5044681fa8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ Current ------- ### Added: +- [Implement DruidPartialDataResponseProcessor](https://github.com/yahoo/fili/pull/275) + * Add `FullResponseProcessor` interface that extends `ResponseProcessor` + * Add response status code to JSON response + * Add `DruidPartialDataResponseProcessor` that checks for any missing data that's not being found + - [Add `DataSourceName` concept, removing responsibility from `TableName`](https://github.com/yahoo/fili/pull/263) * `TableName` was serving double-duty, and it was causing problems and confusion. Splitting the concepts fixes it. diff --git a/fili-core/src/main/java/com/yahoo/bard/webservice/druid/client/HttpErrorCallback.java b/fili-core/src/main/java/com/yahoo/bard/webservice/druid/client/HttpErrorCallback.java index 82de7b7b9b..8a145452fa 100644 --- a/fili-core/src/main/java/com/yahoo/bard/webservice/druid/client/HttpErrorCallback.java +++ b/fili-core/src/main/java/com/yahoo/bard/webservice/druid/client/HttpErrorCallback.java @@ -14,18 +14,18 @@ public interface HttpErrorCallback { /** * Invoke the error callback code. * + * @param statusCode Http status code of the error response * @param reasonPhrase The reason for the error. Often the status code description. * @param responseBody The body of the error response - * @param statusCode Http status code of the error response */ void invoke(int statusCode, String reasonPhrase, String responseBody); /** * Stop the request timer, start the response timer, and then invoke the error callback code. * + * @param statusCode Http status code of the error response * @param reasonPhrase The reason for the error. Often the status code description. * @param responseBody The body of the error response - * @param statusCode Http status code of the error response */ default void dispatch(int statusCode, String reasonPhrase, String responseBody) { RequestLog.stopTiming(REQUEST_WORKFLOW_TIMER); diff --git a/fili-core/src/main/java/com/yahoo/bard/webservice/druid/client/impl/HeaderNestingJsonBuilderStrategy.java b/fili-core/src/main/java/com/yahoo/bard/webservice/druid/client/impl/HeaderNestingJsonBuilderStrategy.java index 651fe58016..3411a5dd3a 100644 --- a/fili-core/src/main/java/com/yahoo/bard/webservice/druid/client/impl/HeaderNestingJsonBuilderStrategy.java +++ b/fili-core/src/main/java/com/yahoo/bard/webservice/druid/client/impl/HeaderNestingJsonBuilderStrategy.java @@ -2,6 +2,8 @@ // Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms. 
package com.yahoo.bard.webservice.druid.client.impl; +import com.yahoo.bard.webservice.web.responseprocessors.DruidJsonResponseContentKeys; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.MappingJsonFactory; import com.fasterxml.jackson.databind.node.JsonNodeFactory; @@ -30,15 +32,22 @@ public HeaderNestingJsonBuilderStrategy(Function baseStrateg @Override public JsonNode apply(Response response) { + MappingJsonFactory mappingJsonFactory = new MappingJsonFactory(); ObjectNode objectNode = JsonNodeFactory.instance.objectNode(); - objectNode.set("response", baseStrategy.apply(response)); + objectNode.set(DruidJsonResponseContentKeys.RESPONSE.getName(), baseStrategy.apply(response)); try { objectNode.set( - "X-Druid-Response-Context", - new MappingJsonFactory() - .createParser(response.getHeader("X-Druid-Response-Context")) + DruidJsonResponseContentKeys.DRUID_RESPONSE_CONTEXT.getName(), + mappingJsonFactory + .createParser( + response.getHeader(DruidJsonResponseContentKeys.DRUID_RESPONSE_CONTEXT.getName()) + ) .readValueAsTree() ); + objectNode.set( + DruidJsonResponseContentKeys.STATUS_CODE.getName(), + mappingJsonFactory.createParser(String.valueOf(response.getStatusCode())).readValueAsTree() + ); } catch (IOException ioe) { throw new IllegalStateException(ioe); } diff --git a/fili-core/src/main/java/com/yahoo/bard/webservice/web/ErrorMessageFormat.java b/fili-core/src/main/java/com/yahoo/bard/webservice/web/ErrorMessageFormat.java index e791bfc1d9..f227ca2e77 100644 --- a/fili-core/src/main/java/com/yahoo/bard/webservice/web/ErrorMessageFormat.java +++ b/fili-core/src/main/java/com/yahoo/bard/webservice/web/ErrorMessageFormat.java @@ -216,22 +216,35 @@ public enum ErrorMessageFormat implements MessageFormatter { RESULT_MAPPING_FAILURE( "Error occurred while processing response data: %s" ), + INVALID_DATASOURCE_UNION( "Union Data Source had conflicting name mappings for logical dimension '%s' with mappings of '%s' and '%s'" ), - DATA_AVAILABILITY_MISMATCH( "Data availability expectation does not match with actual query result obtained from druid for the " + "following intervals %s where druid does not have data" ), - TOO_MUCH_INTERVAL_MISSING( - "More than %s interval missing information received from druid, inspect if query " + - "expects more than %s missing intervals or increase " + - "uncoveredIntervalsLimit configuration value" + TOO_MANY_INTERVALS_MISSING( + "Query is returning more than the configured limit of '%s' missing intervals. " + + "There may be a problem with your data." 
), + CONTEXT_AND_STATUS_MISSING_FROM_RESPONSE("JSON response is missing X-Druid-Response-Context and status code"), + + DRUID_RESPONSE_CONTEXT_MISSING_FROM_RESPONSE("JSON response is missing X-Druid-Response-Context"), + + UNCOVERED_INTERVALS_MISSING_FROM_RESPONSE( + "JSON response is missing 'uncoveredIntervals' from X-Druid-Response-Context header" + ), + + UNCOVERED_INTERVALS_OVERFLOWED_MISSING_FROM_RESPONSE( + "JSON response is missing 'uncoveredIntervalsOverflowed' from X-Druid-Response-Context header" + ), + + STATUS_CODE_MISSING_FROM_RESPONSE("JSON response is missing response status code"), + TOO_MANY_BACKING_DATA_SOURCES("TableDataSource built with too many backing data sources: %s"), TOO_FEW_BACKING_DATA_SOURCES("TableDataSource built with insufficient backing data sources: %s") ; diff --git a/fili-core/src/main/java/com/yahoo/bard/webservice/web/responseprocessors/DruidJsonResponseContentKeys.java b/fili-core/src/main/java/com/yahoo/bard/webservice/web/responseprocessors/DruidJsonResponseContentKeys.java new file mode 100644 index 0000000000..b51cd6f7ce --- /dev/null +++ b/fili-core/src/main/java/com/yahoo/bard/webservice/web/responseprocessors/DruidJsonResponseContentKeys.java @@ -0,0 +1,30 @@ +// Copyright 2017 Yahoo Inc. +// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms. +package com.yahoo.bard.webservice.web.responseprocessors; + +/** + * Enumerates the list of keys expected to be found in the FullResponseProcessor. + */ +public enum DruidJsonResponseContentKeys { + DRUID_RESPONSE_CONTEXT("X-Druid-Response-Context"), + UNCOVERED_INTERVALS("uncoveredIntervals"), + UNCOVERED_INTERVALS_OVERFLOWED("uncoveredIntervalsOverflowed"), + STATUS_CODE("status-code"), + RESPONSE("response") + ; + + private final String name; + + /** + * Constructor. + * + * @param name Name of the context key + */ + DruidJsonResponseContentKeys(String name) { + this.name = name; + } + + public String getName() { + return name; + } +} diff --git a/fili-core/src/main/java/com/yahoo/bard/webservice/web/responseprocessors/DruidPartialDataResponseProcessor.java b/fili-core/src/main/java/com/yahoo/bard/webservice/web/responseprocessors/DruidPartialDataResponseProcessor.java new file mode 100644 index 0000000000..8e07a535c0 --- /dev/null +++ b/fili-core/src/main/java/com/yahoo/bard/webservice/web/responseprocessors/DruidPartialDataResponseProcessor.java @@ -0,0 +1,261 @@ +// Copyright 2017 Yahoo Inc. +// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms. +package com.yahoo.bard.webservice.web.responseprocessors; + +import com.yahoo.bard.webservice.druid.client.FailureCallback; +import com.yahoo.bard.webservice.druid.client.HttpErrorCallback; +import com.yahoo.bard.webservice.druid.model.query.DruidAggregationQuery; +import com.yahoo.bard.webservice.util.SimplifiedIntervalList; +import com.yahoo.bard.webservice.web.ErrorMessageFormat; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeType; + +import org.joda.time.Interval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +import javax.ws.rs.core.Response.Status; + +/** + * Response processor for finding missing partial data in Druid + *

+ * In Druid version 0.9.0 and later, Druid can report the intervals it is missing for a given query. + * For example: + * + *

+ * {@code
+ * Content-Type: application/json
+ * 200 OK
+ * Date:  Mon, 10 Apr 2017 16:24:24 GMT
+ * Content-Type:  application/json
+ * X-Druid-Query-Id:  92c81bed-d9e6-4242-836b-0fcd1efdee9e
+ * X-Druid-Response-Context: {
+ *     "uncoveredIntervals": [
+ *         "2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z","2016-12-25T00:00:00.000Z/2017-
+ *         01-03T00:00:00.000Z","2017-01-31T00:00:00.000Z/2017-02-01T00:00:00.000Z","2017-02-
+ *         08T00:00:00.000Z/2017-02-09T00:00:00.000Z","2017-02-10T00:00:00.000Z/2017-02-
+ *         13T00:00:00.000Z","2017-02-16T00:00:00.000Z/2017-02-20T00:00:00.000Z","2017-02-
+ *         22T00:00:00.000Z/2017-02-25T00:00:00.000Z","2017-02-26T00:00:00.000Z/2017-03-
+ *         01T00:00:00.000Z","2017-03-04T00:00:00.000Z/2017-03-05T00:00:00.000Z","2017-03-
+ *         08T00:00:00.000Z/2017-03-09T00:00:00.000Z"
+ *     ],
+ *     "uncoveredIntervalsOverflowed": true
+ * }
+ * Content-Encoding:  gzip
+ * Vary:  Accept-Encoding, User-Agent
+ * Transfer-Encoding:  chunked
+ * Server:  Jetty(9.2.5.v20141112)
+ * }
+ * 
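Editorial aside (not part of this patch): a minimal, self-contained sketch of reading the X-Druid-Response-Context header shown above with Jackson and Joda-Time. The class name, variable names, and the hard-coded header value are illustrative only; the field names come from the example response.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.List;

public class UncoveredIntervalsHeaderSketch {
    public static void main(String[] args) throws Exception {
        // A trimmed-down value of the X-Druid-Response-Context header from the example above.
        String headerJson = "{\"uncoveredIntervals\":[\"2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z\"],"
                + "\"uncoveredIntervalsOverflowed\":true}";

        JsonNode responseContext = new ObjectMapper().readTree(headerJson);

        // Each uncovered interval is an ISO-8601 interval string that Joda-Time parses directly.
        List<Interval> uncovered = new ArrayList<>();
        for (JsonNode interval : responseContext.get("uncoveredIntervals")) {
            uncovered.add(new Interval(interval.asText()));
        }
        boolean overflowed = responseContext.get("uncoveredIntervalsOverflowed").asBoolean();

        System.out.println(uncovered + ", overflowed=" + overflowed);
    }
}
```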
+ * + * The missing intervals are indicated in "uncoveredIntervals". We compare it to the missing intervals that we expects + * from Partial Data V1. If "uncoveredIntervals" contains any interval that is not present in our expected + * missing interval list, we can send back an error response indicating the mismatch in data availability before the + * response is cached. + */ +public class DruidPartialDataResponseProcessor implements FullResponseProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(DruidPartialDataResponseProcessor.class); + + private final ResponseProcessor next; + + /** + * Constructor. + * + * @param next Next ResponseProcessor in the chain + */ + public DruidPartialDataResponseProcessor(ResponseProcessor next) { + this.next = next; + } + + @Override + public ResponseContext getResponseContext() { + return next.getResponseContext(); + } + + @Override + public FailureCallback getFailureCallback(DruidAggregationQuery druidQuery) { + return next.getFailureCallback(druidQuery); + } + + @Override + public HttpErrorCallback getErrorCallback(DruidAggregationQuery druidQuery) { + return next.getErrorCallback(druidQuery); + } + + /** + * If status code is 200, do the following + * + *
+ *
+ * 1. Extract uncoveredIntervalsOverflowed from X-Druid-Response-Context inside the JsonNode passed into + *    DruidPartialDataResponseProcessor::processResponse; if it is true, invoke the error callback indicating that + *    the configured limit on missing intervals has been exceeded.
+ * 2. Extract uncoveredIntervals from X-Druid-Response-Context inside the JsonNode passed into + *    DruidPartialDataResponseProcessor::processResponse.
+ * 3. Parse both the uncoveredIntervals extracted above and allAvailableIntervals, the union of all of the + *    query's data sources' availabilities from DataSourceMetadataService, into SimplifiedIntervalLists.
+ * 4. Compare the two SimplifiedIntervalLists; if allAvailableIntervals has any overlap with uncoveredIntervals, + *    invoke the error callback indicating that Druid is missing data that Fili expects to exist.
+ *
+ * + * @param json The json representing a druid data response + * @param query The query with the schema for processing this response + * @param metadata The LoggingContext to use + */ + @Override + public void processResponse(JsonNode json, DruidAggregationQuery query, LoggingContext metadata) { + validateJsonResponse(json, query); + + int statusCode = json.get(DruidJsonResponseContentKeys.STATUS_CODE.getName()).asInt(); + if (statusCode == Status.OK.getStatusCode()) { + checkOverflow(json, query); + + SimplifiedIntervalList overlap = getOverlap(json, query); + if (!overlap.isEmpty()) { + logAndGetErrorCallback(ErrorMessageFormat.DATA_AVAILABILITY_MISMATCH.format(overlap), query); + } + if (next instanceof FullResponseProcessor) { + next.processResponse(json, query, metadata); + } else { + next.processResponse(json.get(DruidJsonResponseContentKeys.RESPONSE.getName()), query, metadata); + } + } else if (statusCode == Status.NOT_MODIFIED.getStatusCode() && !(next instanceof FullResponseProcessor)) { + logAndGetErrorCallback( + "Content Not Modified(304), but no etag cache response processor is available to process " + + "the 304 response", + query); + } else { + next.processResponse(json, query, metadata); + } + } + + /** + * Validates JSON response object to make sure it contains all of the following information. + * + * + * @param json The JSON response that is to be validated + * @param query The query with the schema for processing this response + */ + private void validateJsonResponse(JsonNode json, DruidAggregationQuery query) { + if (json.getNodeType() == JsonNodeType.ARRAY) { + logAndGetErrorCallback(ErrorMessageFormat.CONTEXT_AND_STATUS_MISSING_FROM_RESPONSE.format(), query); + } + + if (!json.has(DruidJsonResponseContentKeys.DRUID_RESPONSE_CONTEXT.getName())) { + logAndGetErrorCallback(ErrorMessageFormat.DRUID_RESPONSE_CONTEXT_MISSING_FROM_RESPONSE.format(), query); + return; + } + JsonNode druidResponseContext = json.get(DruidJsonResponseContentKeys.DRUID_RESPONSE_CONTEXT.getName()); + if (!druidResponseContext.has(DruidJsonResponseContentKeys.UNCOVERED_INTERVALS.getName())) { + logAndGetErrorCallback( + ErrorMessageFormat.UNCOVERED_INTERVALS_MISSING_FROM_RESPONSE.format(), + query + ); + return; + } + if (!druidResponseContext.has(DruidJsonResponseContentKeys.UNCOVERED_INTERVALS_OVERFLOWED.getName())) { + logAndGetErrorCallback( + ErrorMessageFormat.UNCOVERED_INTERVALS_OVERFLOWED_MISSING_FROM_RESPONSE.format(), + query + ); + return; + } + if (!json.has(DruidJsonResponseContentKeys.STATUS_CODE.getName())) { + logAndGetErrorCallback(ErrorMessageFormat.STATUS_CODE_MISSING_FROM_RESPONSE.format(), query); + } + } + + /** + * Checks and invokes error if the number of missing intervals are overflowed, i.e. more than the configured limit. + * + * @param json The json object containing the overflow flag + * @param query The query with the schema for processing this response + */ + private void checkOverflow(JsonNode json, DruidAggregationQuery query) { + if (json.get(DruidJsonResponseContentKeys.DRUID_RESPONSE_CONTEXT.getName()) + .get(DruidJsonResponseContentKeys.UNCOVERED_INTERVALS_OVERFLOWED.getName()) + .asBoolean() + ) { + logAndGetErrorCallback( + ErrorMessageFormat.TOO_MANY_INTERVALS_MISSING.format( + query.getContext().getUncoveredIntervalsLimit() + ), + query + ); + } + } + + /** + * Logs and gets error call back on the response with the provided error message. 
+ * + * @param message The error message passed to the logger and the exception + * @param query The query with the schema for processing this response + */ + private void logAndGetErrorCallback(String message, DruidAggregationQuery query) { + LOG.error(message); + getErrorCallback(query).dispatch( + Status.INTERNAL_SERVER_ERROR.getStatusCode(), + "The server encountered an unexpected condition which prevented it from fulfilling the request.", + message); + } + + /** + * Returns the overlap between uncoveredIntervals from Druid and missing intervals that Fili expects. + * + * @param json The JSON node that contains the uncoveredIntervals from Druid, for example + *
+     * {@code
+     * X-Druid-Response-Context: {
+     *     "uncoveredIntervals": [
+     *         "2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z","2016-12-25T00:00:00.000Z/2017-
+     *         01-03T00:00:00.000Z","2017-01-31T00:00:00.000Z/2017-02-01T00:00:00.000Z","2017-02-
+     *         08T00:00:00.000Z/2017-02-09T00:00:00.000Z","2017-02-10T00:00:00.000Z/2017-02-
+     *         13T00:00:00.000Z","2017-02-16T00:00:00.000Z/2017-02-20T00:00:00.000Z","2017-02-
+     *         22T00:00:00.000Z/2017-02-25T00:00:00.000Z","2017-02-26T00:00:00.000Z/2017-03-
+     *         01T00:00:00.000Z","2017-03-04T00:00:00.000Z/2017-03-05T00:00:00.000Z","2017-03-
+     *         08T00:00:00.000Z/2017-03-09T00:00:00.000Z"
+     *     ],
+     *     "uncoveredIntervalsOverflowed": true
+     * }
+     * }
+     * 
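Editorial aside (not part of this patch): a minimal sketch of the interval intersection this method performs, using SimplifiedIntervalList and Joda-Time exactly as the surrounding code does; the concrete intervals are illustrative.

```java
import com.yahoo.bard.webservice.util.SimplifiedIntervalList;

import org.joda.time.Interval;

import java.util.Arrays;

public class OverlapSketch {
    public static void main(String[] args) {
        // Intervals Druid reported as uncovered (taken from the example header).
        SimplifiedIntervalList uncoveredByDruid = new SimplifiedIntervalList(Arrays.asList(
                new Interval("2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z"),
                new Interval("2016-12-25T00:00:00.000Z/2017-01-03T00:00:00.000Z")
        ));

        // Intervals Fili expects to be available for the query's data source.
        SimplifiedIntervalList expectedAvailable = new SimplifiedIntervalList(Arrays.asList(
                new Interval("2016-12-01T00:00:00.000Z/2016-12-10T00:00:00.000Z")
        ));

        // A non-empty intersection means Druid is missing data that Fili expected to exist.
        SimplifiedIntervalList overlap = uncoveredByDruid.intersect(expectedAvailable);
        System.out.println(overlap.isEmpty() ? "availability matches" : "mismatch on " + overlap);
    }
}
```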
+ * @param query The Druid query that contains the missing intervals that Fili expects + * + * @return the overlap between uncoveredIntervals from Druid and missing intervals that Fili expects. + */ + private SimplifiedIntervalList getOverlap(JsonNode json, DruidAggregationQuery query) { + List intervals = new ArrayList<>(); + for (JsonNode jsonNode : + json.get(DruidJsonResponseContentKeys.DRUID_RESPONSE_CONTEXT.getName()) + .get(DruidJsonResponseContentKeys.UNCOVERED_INTERVALS.getName()) + ) { + intervals.add(new Interval(jsonNode.asText())); + } + SimplifiedIntervalList druidIntervals = new SimplifiedIntervalList(intervals); + + return druidIntervals.intersect( + query.getDataSource().getPhysicalTable().getAvailableIntervals() + ); + } +} diff --git a/fili-core/src/main/java/com/yahoo/bard/webservice/web/responseprocessors/FullResponseProcessor.java b/fili-core/src/main/java/com/yahoo/bard/webservice/web/responseprocessors/FullResponseProcessor.java new file mode 100644 index 0000000000..c0ce1ed726 --- /dev/null +++ b/fili-core/src/main/java/com/yahoo/bard/webservice/web/responseprocessors/FullResponseProcessor.java @@ -0,0 +1,42 @@ +// Copyright 2017 Yahoo Inc. +// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms. +package com.yahoo.bard.webservice.web.responseprocessors; + +/** + * Response processor for that extracts header information from Druid response and put the information in our own + * response. + *

+ * This is a "full" response processor in the sense that it extracts and incorporates header information. For example: + * + *

+ * {@code
+ * Content-Type: application/json
+ * 200 OK
+ * Date:  Mon, 10 Apr 2017 16:24:24 GMT
+ * Content-Type:  application/json
+ * X-Druid-Query-Id:  92c81bed-d9e6-4242-836b-0fcd1efdee9e
+ * X-Druid-Response-Context: {
+ *     "uncoveredIntervals": [
+ *         "2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z","2016-12-25T00:00:00.000Z/2017-
+ *         01-03T00:00:00.000Z","2017-01-31T00:00:00.000Z/2017-02-01T00:00:00.000Z","2017-02-
+ *         08T00:00:00.000Z/2017-02-09T00:00:00.000Z","2017-02-10T00:00:00.000Z/2017-02-
+ *         13T00:00:00.000Z","2017-02-16T00:00:00.000Z/2017-02-20T00:00:00.000Z","2017-02-
+ *         22T00:00:00.000Z/2017-02-25T00:00:00.000Z","2017-02-26T00:00:00.000Z/2017-03-
+ *         01T00:00:00.000Z","2017-03-04T00:00:00.000Z/2017-03-05T00:00:00.000Z","2017-03-
+ *         08T00:00:00.000Z/2017-03-09T00:00:00.000Z"
+ *     ],
+ *     "uncoveredIntervalsOverflowed": true
+ * }
+ * Content-Encoding:  gzip
+ * Vary:  Accept-Encoding, User-Agent
+ * Transfer-Encoding:  chunked
+ * Server:  Jetty(9.2.5.v20141112)
+ * }
+ * 
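Editorial aside (not part of this patch): a small Jackson sketch of the merged node shape produced by the nesting strategy described below, with the data response under "response", the parsed header under "X-Druid-Response-Context", and the HTTP status under "status-code"; the literal values are illustrative.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class NestedResponseSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode nested = JsonNodeFactory.instance.objectNode();

        // The original Druid data response body.
        nested.set("response", mapper.readTree("[{\"k1\":\"v1\"}]"));
        // The parsed X-Druid-Response-Context header.
        nested.set("X-Druid-Response-Context", mapper.readTree(
                "{\"uncoveredIntervals\":[\"2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z\"],"
                + "\"uncoveredIntervalsOverflowed\":true}"));
        // The HTTP status code of the Druid response.
        nested.put("status-code", 200);

        System.out.println(nested);
    }
}
```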
+ * + * The JSON nesting strategy is to extract status code ("200" in this case) and "X-Druid-Response-Context" and put them + * along side with JSON response + * + */ +public interface FullResponseProcessor extends ResponseProcessor { +} diff --git a/fili-core/src/main/java/com/yahoo/bard/webservice/web/responseprocessors/ResponseProcessor.java b/fili-core/src/main/java/com/yahoo/bard/webservice/web/responseprocessors/ResponseProcessor.java index 345cc660d8..50e0f04c5f 100644 --- a/fili-core/src/main/java/com/yahoo/bard/webservice/web/responseprocessors/ResponseProcessor.java +++ b/fili-core/src/main/java/com/yahoo/bard/webservice/web/responseprocessors/ResponseProcessor.java @@ -42,8 +42,8 @@ public interface ResponseProcessor { /** * Process the response json and respond to the original web request. * - * @param query The query with the schema for processing this response * @param json The json representing a druid data response + * @param query The query with the schema for processing this response * @param metadata The LoggingContext to use */ void processResponse(JsonNode json, DruidAggregationQuery query, LoggingContext metadata); diff --git a/fili-core/src/test/groovy/com/yahoo/bard/webservice/druid/client/impl/AsyncDruidWebServiceImplWithHeaderNestingSpec.groovy b/fili-core/src/test/groovy/com/yahoo/bard/webservice/druid/client/impl/HeaderNestingJsonBuilderStrategySpec.groovy similarity index 93% rename from fili-core/src/test/groovy/com/yahoo/bard/webservice/druid/client/impl/AsyncDruidWebServiceImplWithHeaderNestingSpec.groovy rename to fili-core/src/test/groovy/com/yahoo/bard/webservice/druid/client/impl/HeaderNestingJsonBuilderStrategySpec.groovy index a4e82d39db..576c23f853 100644 --- a/fili-core/src/test/groovy/com/yahoo/bard/webservice/druid/client/impl/AsyncDruidWebServiceImplWithHeaderNestingSpec.groovy +++ b/fili-core/src/test/groovy/com/yahoo/bard/webservice/druid/client/impl/HeaderNestingJsonBuilderStrategySpec.groovy @@ -12,7 +12,7 @@ import org.asynchttpclient.Response import java.nio.charset.StandardCharsets import java.util.function.Supplier -class AsyncDruidWebServiceImplWithHeaderNestingSpec extends Specification { +class HeaderNestingJsonBuilderStrategySpec extends Specification { def "Make sure X-Druid-Response-Context and status-code are merged into existing JsonNode"() { given: @@ -28,6 +28,7 @@ class AsyncDruidWebServiceImplWithHeaderNestingSpec extends Specification { "uncoveredIntervalsOverflowed": true } '''.replace(" ", "").replace("\n", "") + response.getStatusCode() >> 200 DruidServiceConfig druidServiceConfig = Mock(DruidServiceConfig) druidServiceConfig.getTimeout() >> 100 @@ -50,7 +51,8 @@ class AsyncDruidWebServiceImplWithHeaderNestingSpec extends Specification { "2016-12-25T00:00:00.000Z/2017-01-03T00:00:00.000Z" ], "uncoveredIntervalsOverflowed": true - } + }, + "status-code": 200 } '''.replace(" ", "").replace("\n", "") } diff --git a/fili-core/src/test/groovy/com/yahoo/bard/webservice/web/responseprocessors/DruidPartialDataResponseProcessorSpec.groovy b/fili-core/src/test/groovy/com/yahoo/bard/webservice/web/responseprocessors/DruidPartialDataResponseProcessorSpec.groovy new file mode 100644 index 0000000000..b728e69a81 --- /dev/null +++ b/fili-core/src/test/groovy/com/yahoo/bard/webservice/web/responseprocessors/DruidPartialDataResponseProcessorSpec.groovy @@ -0,0 +1,240 @@ +// Copyright 2016 Yahoo Inc. +// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms. 
+package com.yahoo.bard.webservice.web.responseprocessors + +import com.yahoo.bard.webservice.druid.client.HttpErrorCallback +import com.yahoo.bard.webservice.druid.model.datasource.DataSource +import com.yahoo.bard.webservice.druid.model.query.DruidAggregationQuery +import com.yahoo.bard.webservice.druid.model.query.QueryContext +import com.yahoo.bard.webservice.table.ConstrainedTable +import com.yahoo.bard.webservice.util.SimplifiedIntervalList +import com.yahoo.bard.webservice.web.ErrorMessageFormat + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.node.ArrayNode +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module + +import org.joda.time.Interval + +import spock.lang.Specification + +import java.util.stream.Collectors + +class DruidPartialDataResponseProcessorSpec extends Specification { + private static final ObjectMapper MAPPER = new ObjectMapper() + .registerModule(new Jdk8Module().configureAbsentsAsNulls(false)) + private static final int ERROR_STATUS_CODE = 500 + private static final String REASON_PHRASE = 'The server encountered an unexpected condition which ' + + 'prevented it from fulfilling the request.' + + ResponseProcessor next + HttpErrorCallback httpErrorCallback + DruidAggregationQuery druidAggregationQuery + DruidPartialDataResponseProcessor druidPartialDataResponseProcessor + + def setup() { + next = Mock(ResponseProcessor) + httpErrorCallback = Mock(HttpErrorCallback) + druidAggregationQuery = Mock(DruidAggregationQuery) + next.getErrorCallback(druidAggregationQuery) >> httpErrorCallback + druidPartialDataResponseProcessor = new DruidPartialDataResponseProcessor(next) + } + + def "getOverlap returns intersection between Druid intervals and Fili intervals in case of #caseDescription"() { + given: + JsonNode json = MAPPER.readTree(constructJSON(missingIntervals)) + + DataSource dataSource = Mock(DataSource) + ConstrainedTable constrainedTable = Mock(ConstrainedTable) + + constrainedTable.getAvailableIntervals() >> new SimplifiedIntervalList( + availableIntervals.collect{it -> new Interval(it)} + ) + dataSource.getPhysicalTable() >> constrainedTable + druidAggregationQuery.getDataSource() >> dataSource + + expect: + druidPartialDataResponseProcessor.getOverlap(json, druidAggregationQuery) == new SimplifiedIntervalList( + expected.collect{it -> new Interval(it)} + ) + + where: + missingIntervals | availableIntervals | expected | caseDescription + ["2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z", "2016-12-25T00:00:00.000Z/2017-01-03T00:00:00.000Z"] | + ["2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z", "2016-12-25T00:00:00.000Z/2017-01-03T00:00:00.000Z"] | + ["2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z", "2016-12-25T00:00:00.000Z/2017-01-03T00:00:00.000Z"] | + "completely overlapped" + ["2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z", "2016-12-25T00:00:00.000Z/2017-01-03T00:00:00.000Z"] | + ["2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z"] | + ["2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z"] | "partially overlapped (Fili's intervals contained inside Druid's)" + ["2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z"] | + ["2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z", "2016-12-25T00:00:00.000Z/2017-01-03T00:00:00.000Z"] | + ["2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z"] | "partially overlapped (Druid's intervals contained inside Fili's)" + ["2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z","2016-12-25T00:00:00.000Z/2017-01-03T00:00:00.000Z"] | 
+ ["2019-11-22T00:00:00.000Z/2019-12-18T00:00:00.000Z"] | + [] | "no overlapping" + ["2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z","2016-12-25T00:00:00.000Z/2017-01-03T00:00:00.000Z"] | + [] | + [] | "no overlapping (Fili has no emtpy intervals)" + } + + def "checkOverflow recognizes interval overflow correctly"() { + given: + QueryContext queryContext = Mock(QueryContext) + queryContext.getUncoveredIntervalsLimit() >> 10 + druidAggregationQuery.getContext() >> queryContext + JsonNode json = MAPPER.readTree( + ''' + { + "response": [{"k1":"v1"}], + "X-Druid-Response-Context": { + "uncoveredIntervals": [ + "2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z", + "2016-12-25T00:00:00.000Z/2017-01-03T00:00:00.000Z" + ], + "uncoveredIntervalsOverflowed": true + }, + "status-code": 200 + } + ''' + ) + + when: + druidPartialDataResponseProcessor.checkOverflow(json, druidAggregationQuery) + + then: + 1 * httpErrorCallback.dispatch( + ERROR_STATUS_CODE, + REASON_PHRASE, + ErrorMessageFormat.TOO_MANY_INTERVALS_MISSING.format("10") + ) + } + + def "processResponse logs and invokes error callback on data availability mismatch"() { + given: + JsonNode json = MAPPER.readTree( + constructJSON( + [ + "2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z", + "2016-12-25T00:00:00.000Z/2017-01-03T00:00:00.000Z" + ] + ) + ) + + DataSource dataSource = Mock(DataSource) + ConstrainedTable constrainedTable = Mock(ConstrainedTable) + + constrainedTable.getAvailableIntervals() >> new SimplifiedIntervalList( + [new Interval("2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z")] + ) + dataSource.getPhysicalTable() >> constrainedTable + druidAggregationQuery.getDataSource() >> dataSource + + when: + druidPartialDataResponseProcessor.processResponse(json, druidAggregationQuery, Mock(LoggingContext)) + + then: + 1 * httpErrorCallback.dispatch( + ERROR_STATUS_CODE, + REASON_PHRASE, + ErrorMessageFormat.DATA_AVAILABILITY_MISMATCH.format("[2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z]") + ) + } + + def "validateJsonResponse recognizes missing component"() { + given: + ArrayNode arrayNode = MAPPER.createArrayNode() + JsonNode druidResponseContext = Mock(JsonNode) + JsonNode json = Mock(JsonNode) + json.get(DruidJsonResponseContentKeys.DRUID_RESPONSE_CONTEXT.getName()) >> druidResponseContext + + when: + // Druid returns response in an ArrayNode, which doesn't contain X-Druid-Response-Context or status code + druidPartialDataResponseProcessor.validateJsonResponse(arrayNode, druidAggregationQuery) + then: + 1 * httpErrorCallback.dispatch( + ERROR_STATUS_CODE, + REASON_PHRASE, + ErrorMessageFormat.CONTEXT_AND_STATUS_MISSING_FROM_RESPONSE.format() + ) + + when: + // Druid response is missing X-Druid-Response-Context + json.has(DruidJsonResponseContentKeys.DRUID_RESPONSE_CONTEXT.getName()) >> false + druidPartialDataResponseProcessor.validateJsonResponse(json, druidAggregationQuery) + then: + 1 * httpErrorCallback.dispatch( + ERROR_STATUS_CODE, + REASON_PHRASE, + ErrorMessageFormat.DRUID_RESPONSE_CONTEXT_MISSING_FROM_RESPONSE.format() + ) + + when: + // Druid response has X-Druid-Response-Context, + // but the X-Druid-Response-Context is missing uncoveredIntervals + json.has(DruidJsonResponseContentKeys.DRUID_RESPONSE_CONTEXT.getName()) >> true + druidResponseContext.has(DruidJsonResponseContentKeys.UNCOVERED_INTERVALS.getName()) >> false + druidPartialDataResponseProcessor.validateJsonResponse(json, druidAggregationQuery) + then: + 1 * httpErrorCallback.dispatch( + ERROR_STATUS_CODE, + REASON_PHRASE, + 
ErrorMessageFormat.UNCOVERED_INTERVALS_MISSING_FROM_RESPONSE.format() + ) + + when: + // Druid response has X-Druid-Response-Context, + // but the X-Druid-Response-Context is missing uncoveredIntervalsOverflowed + json.has(DruidJsonResponseContentKeys.DRUID_RESPONSE_CONTEXT.getName()) >> true + druidResponseContext.has(DruidJsonResponseContentKeys.UNCOVERED_INTERVALS.getName()) >> true + druidResponseContext.has(DruidJsonResponseContentKeys.UNCOVERED_INTERVALS_OVERFLOWED.getName()) >> false + druidPartialDataResponseProcessor.validateJsonResponse(json, druidAggregationQuery) + then: + 1 * httpErrorCallback.dispatch( + ERROR_STATUS_CODE, + REASON_PHRASE, + ErrorMessageFormat.UNCOVERED_INTERVALS_OVERFLOWED_MISSING_FROM_RESPONSE.format() + ) + + when: + // Druid response has X-Druid-Response-Context, + // but the response is missing status code + json.has(DruidJsonResponseContentKeys.DRUID_RESPONSE_CONTEXT.getName()) >> true + druidResponseContext.has(DruidJsonResponseContentKeys.UNCOVERED_INTERVALS.getName()) >> true + druidResponseContext.has(DruidJsonResponseContentKeys.UNCOVERED_INTERVALS_OVERFLOWED.getName()) >> true + json.has(DruidJsonResponseContentKeys.STATUS_CODE.getName()) >> false + druidPartialDataResponseProcessor.validateJsonResponse(json, druidAggregationQuery) + then: + 1 * httpErrorCallback.dispatch( + ERROR_STATUS_CODE, + REASON_PHRASE, + ErrorMessageFormat.STATUS_CODE_MISSING_FROM_RESPONSE.format() + ) + } + + /** + * Constructs a JSON response using a template and a list of provided intervals in String representation. + * + * @param intervals the list of intervals to be used by the template + */ + static String constructJSON(List intervals) { + return String.format( + ''' + { + "response": [{"k1":"v1"}], + "X-Druid-Response-Context": { + "uncoveredIntervals": [ + %s + ], + "uncoveredIntervalsOverflowed": false + }, + "status-code": 200 + } + ''', + intervals.stream() + .map{it -> "\"" + it + "\""} + .collect(Collectors.joining(",")) + ) + } +}
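Editorial note (not part of this patch): a minimal wiring sketch showing how the new processor decorates an existing ResponseProcessor. It assumes the web service builds its JsonNode with the header-nesting strategy, so that "X-Druid-Response-Context" and "status-code" are present; how the surrounding workflow constructs its processor chain is out of scope here.

```java
import com.yahoo.bard.webservice.web.responseprocessors.DruidPartialDataResponseProcessor;
import com.yahoo.bard.webservice.web.responseprocessors.ResponseProcessor;

public final class PartialDataWiringSketch {

    private PartialDataWiringSketch() { }

    /**
     * Wraps an existing processor so that, on a 200 Druid response, uncoveredIntervals are checked
     * against Fili's expected availability before the wrapped processor sees the data response.
     */
    public static ResponseProcessor withPartialDataCheck(ResponseProcessor baseProcessor) {
        return new DruidPartialDataResponseProcessor(baseProcessor);
    }
}
```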