Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
QubitPi committed May 22, 2017
1 parent d7805e7 commit 991983b
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@
import com.yahoo.bard.webservice.druid.client.FailureCallback;
import com.yahoo.bard.webservice.druid.client.HttpErrorCallback;
import com.yahoo.bard.webservice.druid.model.query.DruidAggregationQuery;
import com.yahoo.bard.webservice.util.SimplifiedIntervalList;
import com.yahoo.bard.webservice.web.ErrorMessageFormat;

import com.fasterxml.jackson.databind.JsonNode;

import org.joda.time.Interval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

import javax.ws.rs.core.Response.Status;

/**
Expand Down Expand Up @@ -48,7 +54,7 @@
* Date: Mon, 10 Apr 2017 16:24:24 GMT
* Content-Type: application/json
* X-Druid-Query-Id: 92c81bed-d9e6-4242-836b-0fcd1efdee9e
* X-Druid-Response-Context: {
* X-Druid-Response-Context: {
* "uncoveredIntervals": [
* "2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z","2016-12-25T00:00:00.000Z/2017-
* 01-03T00:00:00.000Z","2017-01-31T00:00:00.000Z/2017-02-01T00:00:00.000Z","2017-02-
Expand Down Expand Up @@ -110,27 +116,21 @@ public HttpErrorCallback getErrorCallback(DruidAggregationQuery<?> druidQuery) {
* <li>
* Extract uncoveredIntervalsOverflowed from X-Druid-Response-Context inside the JsonNode passed into
* PartialDataV2ResponseProcessor::processResponse, if it is true, invoke error response saying limit
* overflowed
* overflowed,
* </li>
* <li>
* Extract uncoveredIntervals from X-Druid-Response-Context inside the JsonNode passed into
* PartialDataV2ResponseProcessor::processResponse
* PartialDataV2ResponseProcessor::processResponse,
* </li>
* <li>
* Parse both the uncoveredIntervals extracted above and allAvailableIntervals extracted from the union of
* all the query's datasource's availabilities from DataSourceMetadataService into SimplifiedIntervalLists
* all the query's datasource's availabilities from DataSourceMetadataService into SimplifiedIntervalLists,
* </li>
* <li>
* Compare both SimplifiedIntervalLists above; if allAvailableIntervals has any overlap with
* uncoveredIntervals, invoke an error response indicating that Druid is missing some data that we expect
* to exist.
* </li>
* <li>
* Otherwise, check whether the next responseProcessor is a FullResponseProcessor. If it is, call the next
* responseProcessor with the same JsonNode that was passed in; otherwise call the next responseProcessor
* with the response body JsonNode instead of the ObjectNode containing the extra
* "X-Druid-Response-Context".
* </li>
* </ol>
*
* @param json The json representing a druid data response
Expand All @@ -141,9 +141,20 @@ public HttpErrorCallback getErrorCallback(DruidAggregationQuery<?> druidQuery) {
public void processResponse(JsonNode json, DruidAggregationQuery<?> query, LoggingContext metadata) {
validateJsonResponse(json, query);

int statusCode = json.get("status-code").asInt();
if (statusCode == Status.OK.getStatusCode()) {
// implementation is blocked by https://github.com/yahoo/fili/pull/262
if (json.get("status-code").asInt() == Status.OK.getStatusCode()) {
checkOverflow(json, query);

SimplifiedIntervalList overlap = getUncoveredIntervalsFromResponse(json).intersect(
query.getDataSource().getPhysicalTable().getAvailableIntervals()
);
if (!overlap.isEmpty()) {
logAndGetErrorCallback(ErrorMessageFormat.DATA_AVAILABILITY_MISMATCH.format(overlap), query);
}
if (next instanceof FullResponseProcessor) {
next.processResponse(json, query, metadata);
} else {
next.processResponse(json.get("response"), query, metadata);
}
}

next.processResponse(json, query, metadata);
Expand All @@ -162,37 +173,81 @@ public void processResponse(JsonNode json, DruidAggregationQuery<?> query, Loggi
* </ul>
*
* @param json The JSON response that is to be validated
* @param druidQuery The query with the schema for processing this response
* @param query The query with the schema for processing this response
*/
private void validateJsonResponse(JsonNode json, DruidAggregationQuery<?> druidQuery) {
private void validateJsonResponse(JsonNode json, DruidAggregationQuery<?> query) {
if (!json.has("X-Druid-Response-Context")) {
logAndGetErrorCallback("Response is missing X-Druid-Response-Context", druidQuery);
logAndGetErrorCallback("Response is missing X-Druid-Response-Context", query);
}
if (!json.get("X-Druid-Response-Context").has("uncoveredIntervals")) {
logAndGetErrorCallback("Response is missing 'uncoveredIntervals' X-Druid-Response-Context", druidQuery);
logAndGetErrorCallback("Response is missing 'uncoveredIntervals' X-Druid-Response-Context", query);
}
if (!json.get("X-Druid-Response-Context").has("uncoveredIntervalsOverflowed")) {
logAndGetErrorCallback(
"Response is missing 'uncoveredIntervalsOverflowed' X-Druid-Response-Context",
druidQuery
query
);
}
if (!json.has("status-code")) {
logAndGetErrorCallback("Response is missing response status code", druidQuery);
logAndGetErrorCallback("Response is missing response status code", query);
}
}

/**
* Checks whether the number of missing intervals has overflowed, i.e. exceeds the configured limit, and invokes an
* error response if so.
*
* @param json The json object containing the overflow flag
* @param query The query with the schema for processing this response
*/
private void checkOverflow(JsonNode json, DruidAggregationQuery<?> query) {
    // The overflow flag is read from the response-context node of the given JSON.
    JsonNode overflowFlag = json.get("X-Druid-Response-Context").get("uncoveredIntervalsOverflowed");
    if (!overflowFlag.asBoolean()) {
        return;
    }

    // The limit reported in the error message comes from the query's own context; it fills both
    // placeholders of the message format.
    int uncoveredIntervalsLimit = query.getContext().getUncoveredIntervalsLimit();
    String errorMessage = ErrorMessageFormat.TOO_MUCH_INTERVAL_MISSING.format(
            uncoveredIntervalsLimit,
            uncoveredIntervalsLimit
    );
    logAndGetErrorCallback(errorMessage, query);
}

/**
* Logs and gets error call back on the response with the provided error message.
*
* @param message The error message passed to the logger and the exception
* @param druidQuery The query with the schema for processing this response
* @param query The query with the schema for processing this response
*/
// NOTE(review): this span appears to be diff output with the marker column stripped - each adjacent pair of
// near-identical lines looks like the pre-change line followed by its replacement (druidQuery renamed to
// query). TODO confirm against the committed file.
private void logAndGetErrorCallback(String message, DruidAggregationQuery<?> druidQuery) {
private void logAndGetErrorCallback(String message, DruidAggregationQuery<?> query) {
// Record the message in the log at error level before dispatching the error callback.
LOG.error(message);
// Obtain the error callback for this query and dispatch it.
getErrorCallback(druidQuery).dispatch(
getErrorCallback(query).dispatch(
// Arguments: the HTTP 500 status code, a fixed generic reason string, then the caller-supplied message.
Status.INTERNAL_SERVER_ERROR.getStatusCode(),
"The server encountered an unexpected condition which prevented it from fulfilling the request.",
message);
}

/**
* Returns uncovered intervals from Druid response in a SimplifiedIntervalList.
*
* @param json the JSON object containing the list of uncovered intervals, for example
* <pre>
* {@code
* X-Druid-Response-Context: {
* "uncoveredIntervals": [
* "2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z","2016-12-25T00:00:00.000Z/2017-
* 01-03T00:00:00.000Z","2017-01-31T00:00:00.000Z/2017-02-01T00:00:00.000Z","2017-02-
* 08T00:00:00.000Z/2017-02-09T00:00:00.000Z","2017-02-10T00:00:00.000Z/2017-02-
* 13T00:00:00.000Z","2017-02-16T00:00:00.000Z/2017-02-20T00:00:00.000Z","2017-02-
* 22T00:00:00.000Z/2017-02-25T00:00:00.000Z","2017-02-26T00:00:00.000Z/2017-03-
* 01T00:00:00.000Z","2017-03-04T00:00:00.000Z/2017-03-05T00:00:00.000Z","2017-03-
* 08T00:00:00.000Z/2017-03-09T00:00:00.000Z"
* ],
* "uncoveredIntervalsOverflowed": true
* }
* }
* </pre>
*
* @return uncovered intervals in a SimplifiedIntervalList.
*/
private static SimplifiedIntervalList getUncoveredIntervalsFromResponse(JsonNode json) {
    JsonNode uncoveredIntervals = json.get("X-Druid-Response-Context").get("uncoveredIntervals");

    // Each array element is read as text and handed to the Interval constructor.
    List<Interval> parsedIntervals = new ArrayList<>();
    uncoveredIntervals.forEach(node -> parsedIntervals.add(new Interval(node.asText())));

    return new SimplifiedIntervalList(parsedIntervals);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2016 Yahoo Inc.
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.web.responseprocessors

import com.yahoo.bard.webservice.druid.client.FailureCallback
import com.yahoo.bard.webservice.druid.client.HttpErrorCallback
import com.yahoo.bard.webservice.druid.model.query.DruidAggregationQuery
import com.yahoo.bard.webservice.util.SimplifiedIntervalList

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module

import org.joda.time.Interval

import spock.lang.Specification

/**
 * Spock specification for PartialDataV2ResponseProcessor.
 * <p>
 * Covers three things: the constructor stores the wrapped processor, the response-context and callback
 * accessors are forwarded to that wrapped processor, and the "uncoveredIntervals" array of a response is
 * turned into a SimplifiedIntervalList.
 */
class PartialDataV2ResponseProcessorSpec extends Specification {
// JSON mapper used to parse the literal response text in the data-driven feature at the bottom.
private static final ObjectMapper MAPPER = new ObjectMapper()
.registerModule(new Jdk8Module().configureAbsentsAsNulls(false))

// Collaborators and the object under test; every one of them is (re)assigned in setup().
DruidAggregationQuery druidAggregationQuery
ResponseContext responseContext
FailureCallback failureCallback
HttpErrorCallback httpErrorCallback
ResponseProcessor next
PartialDataV2ResponseProcessor partialDataV2ResponseProcessor

// Creates the mocks, stubs the three accessors of the mocked next processor, and wraps that mock in the
// processor under test.
def setup() {
druidAggregationQuery = Mock(DruidAggregationQuery)
responseContext = Mock(ResponseContext)
failureCallback = Mock(FailureCallback)
httpErrorCallback = Mock(HttpErrorCallback)
next = Mock(ResponseProcessor)
// Default stubbing: the accessors of next hand back the mocks created above.
next.getResponseContext() >> responseContext
next.getFailureCallback(druidAggregationQuery) >> failureCallback
next.getErrorCallback(druidAggregationQuery) >> httpErrorCallback
partialDataV2ResponseProcessor = new PartialDataV2ResponseProcessor(next)
}

// The processor must expose, as its next, the same instance it was constructed with.
def "Test constructor"() {
expect:
partialDataV2ResponseProcessor.next == next;
}

// Each accessor called on the processor under test must result in exactly one call of the same accessor
// on the wrapped next processor.
def "Test proxy calls"() {
when:
partialDataV2ResponseProcessor.getResponseContext()
partialDataV2ResponseProcessor.getErrorCallback(druidAggregationQuery)
partialDataV2ResponseProcessor.getFailureCallback(druidAggregationQuery)

then:
1 * next.getResponseContext() >> responseContext
1 * next.getFailureCallback(druidAggregationQuery) >> failureCallback
1 * next.getErrorCallback(druidAggregationQuery) >> httpErrorCallback
}

// Data-driven check: the interval strings listed under "uncoveredIntervals" in the response JSON must come
// back as a SimplifiedIntervalList equal to one built from the expected interval strings.
// NOTE(review): getUncoveredIntervalsFromResponse is declared private static in the processor; this call
// presumably relies on Groovy not enforcing that modifier - confirm.
def "getUncoveredIntervalsFromResponse returns all uncovered intervals in SimplifiedList"() {
given:
// All spaces and newlines are removed from the literal before it is parsed.
JsonNode json = MAPPER.readTree(
jsonInString
.replace(" ", "")
.replace("\n", "")
)

expect:
partialDataV2ResponseProcessor.getUncoveredIntervalsFromResponse(json) == new SimplifiedIntervalList(
expected.collect{it -> new Interval(it)}
)

// Single data row: the raw response text on the left, the interval strings expected from it on the right.
where:
jsonInString | expected
'''
{
"response": [{"k1":"v1"}],
"X-Druid-Response-Context": {
"uncoveredIntervals": [
"2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z",
"2016-12-25T00:00:00.000Z/2017-01-03T00:00:00.000Z"
],
"uncoveredIntervalsOverflowed": true
},
"status-code": 200
}
''' | ["2016-11-22T00:00:00.000Z/2016-12-18T00:00:00.000Z", "2016-12-25T00:00:00.000Z/2017-01-03T00:00:00.000Z"]
}
}

0 comments on commit 991983b

Please sign in to comment.