Skip to content

Commit

Permalink
Implement DruidPartialDataRequestHandler (#287)
Browse files Browse the repository at this point in the history
  • Loading branch information
QubitPi authored May 31, 2017
1 parent f9db95a commit 7d0435c
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 0 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ pull request if there was one.
Current
-------
### Added:
- [Implement DruidPartialDataRequestHandler](https://github.com/yahoo/fili/pull/287)
* Implement `DruidPartialDataRequestHandler` that injects `druid_uncovered_interval_limit` into Druid query context
* Append `DruidPartialDataResponseProcessor` to the current next `ResponseProcessor` chain
* Add `DruidPartialDataRequestHandler` to `DruidWorkflow` between `AsyncDruidRequestHandler` and
`CacheV2RequestHandler` and invoke the `DruidPartialDataRequestHandler` if `druid_uncovered_interval_limit` is
greater than 0

- [Implement DruidPartialDataResponseProcessor](https://github.com/yahoo/fili/pull/275)
* Add `FullResponseProcessor` interface that extends `ResponseProcessor`
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2017 Yahoo Inc.
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.web.handlers;

import com.yahoo.bard.webservice.config.SystemConfig;
import com.yahoo.bard.webservice.config.SystemConfigProvider;
import com.yahoo.bard.webservice.druid.model.query.DruidAggregationQuery;
import com.yahoo.bard.webservice.web.DataApiRequest;
import com.yahoo.bard.webservice.web.responseprocessors.DruidPartialDataResponseProcessor;
import com.yahoo.bard.webservice.web.responseprocessors.ResponseProcessor;

import javax.validation.constraints.NotNull;

/**
* A request handler that builds responses for Druid partial data
* <p>
* The handler inject "uncoveredIntervalsLimit: $druid_uncovered_interval_limit" context to Druid query.
*/
public class DruidPartialDataRequestHandler implements DataRequestHandler {

private static final SystemConfig SYSTEM_CONFIG = SystemConfigProvider.getInstance();

private final DataRequestHandler next;
private final int druidUncoveredIntervalLimit = SYSTEM_CONFIG.getIntProperty(
SYSTEM_CONFIG.getPackageVariableName("druid_uncovered_interval_limit"), 0
);

/**
* Constructor.
*
* @param next Next Handler in the chain
*/
public DruidPartialDataRequestHandler(@NotNull DataRequestHandler next) {
this.next = next;
}

@Override
public boolean handleRequest(
RequestContext context,
DataApiRequest request,
DruidAggregationQuery<?> druidQuery,
ResponseProcessor response
) {
return next.handleRequest(
context,
request,
druidQuery.withContext(
druidQuery.getContext().withUncoveredIntervalsLimit(druidUncoveredIntervalLimit)
),
new DruidPartialDataResponseProcessor(response)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
package com.yahoo.bard.webservice.web.handlers.workflow;

import com.yahoo.bard.webservice.config.BardFeatureFlag;
import com.yahoo.bard.webservice.config.SystemConfig;
import com.yahoo.bard.webservice.config.SystemConfigProvider;
import com.yahoo.bard.webservice.data.PartialDataHandler;
import com.yahoo.bard.webservice.data.cache.DataCache;
import com.yahoo.bard.webservice.data.volatility.VolatileIntervalsService;
Expand All @@ -14,6 +16,7 @@
import com.yahoo.bard.webservice.web.handlers.CacheV2RequestHandler;
import com.yahoo.bard.webservice.web.handlers.DataRequestHandler;
import com.yahoo.bard.webservice.web.handlers.DebugRequestHandler;
import com.yahoo.bard.webservice.web.handlers.DruidPartialDataRequestHandler;
import com.yahoo.bard.webservice.web.handlers.PaginationRequestHandler;
import com.yahoo.bard.webservice.web.handlers.PartialDataRequestHandler;
import com.yahoo.bard.webservice.web.handlers.SplitQueryRequestHandler;
Expand Down Expand Up @@ -44,8 +47,14 @@
@Singleton
public class DruidWorkflow implements RequestWorkflowProvider {

private static final SystemConfig SYSTEM_CONFIG = SystemConfigProvider.getInstance();

public static final String RESPONSE_WORKFLOW_TIMER = "ResponseWorkflow";
public static final String REQUEST_WORKFLOW_TIMER = "RequestWorkflow";
private final int druidUncoveredIntervalLimit = SYSTEM_CONFIG.getIntProperty(
SYSTEM_CONFIG.getPackageVariableName("druid_uncovered_interval_limit"),
0
);

protected final @NotNull DataCache<?> dataCache;
protected final @NotNull DruidWebService uiWebService;
Expand Down Expand Up @@ -99,6 +108,12 @@ public DataRequestHandler buildWorkflow() {
DataRequestHandler uiHandler = new AsyncWebServiceRequestHandler(uiWebService, mapper);
DataRequestHandler nonUiHandler = new AsyncWebServiceRequestHandler(nonUiWebService, mapper);

// If Druid sends uncoveredIntervals, missing intervals are checked before sending the request
if (druidUncoveredIntervalLimit > 0) {
uiHandler = new DruidPartialDataRequestHandler(uiHandler);
nonUiHandler = new DruidPartialDataRequestHandler(nonUiHandler);
}

// If query caching is enabled, the cache is checked before sending the request
if (BardFeatureFlag.DRUID_CACHE.isOn()) {
if (BardFeatureFlag.DRUID_CACHE_V2.isOn()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2017 Yahoo Inc.
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.web.handlers

import com.yahoo.bard.webservice.config.SystemConfig
import com.yahoo.bard.webservice.config.SystemConfigProvider
import com.yahoo.bard.webservice.druid.model.datasource.DataSource
import com.yahoo.bard.webservice.druid.model.filter.Filter
import com.yahoo.bard.webservice.druid.model.having.Having
import com.yahoo.bard.webservice.druid.model.orderby.LimitSpec
import com.yahoo.bard.webservice.druid.model.query.DruidAggregationQuery
import com.yahoo.bard.webservice.druid.model.query.Granularity
import com.yahoo.bard.webservice.druid.model.query.GroupByQuery
import com.yahoo.bard.webservice.druid.model.query.QueryContext
import com.yahoo.bard.webservice.web.DataApiRequest
import com.yahoo.bard.webservice.web.responseprocessors.DruidPartialDataResponseProcessor
import com.yahoo.bard.webservice.web.responseprocessors.ResponseProcessor

import spock.lang.Specification

class DruidPartialDataRequestHandlerSpec extends Specification {
private static final SystemConfig SYSTEM_CONFIG = SystemConfigProvider.getInstance();

def "New query context is passed to next handler"() {
given:
SystemConfig systemConfig = SystemConfigProvider.getInstance()
String uncoveredKey = SYSTEM_CONFIG.getPackageVariableName("druid_uncovered_interval_limit")
systemConfig.setProperty(uncoveredKey, '10')

DataRequestHandler nextHandler = Mock(DataRequestHandler)
ResponseProcessor responseProcessor = Mock(ResponseProcessor)
RequestContext requestContext = Mock(RequestContext)
DruidAggregationQuery druidQuery = Mock(DruidAggregationQuery)
QueryContext queryContext = Mock(QueryContext)
DataApiRequest apiRequest = Mock(DataApiRequest)

druidQuery.getContext() >> queryContext

DruidPartialDataRequestHandler druidPartialDataRequestHandler = new DruidPartialDataRequestHandler(
nextHandler
)

when:
druidPartialDataRequestHandler.handleRequest(requestContext, apiRequest, druidQuery, responseProcessor)

then:
1 * druidQuery.withContext(queryContext) >> druidQuery
1 * queryContext.withUncoveredIntervalsLimit(10) >> queryContext
1 * nextHandler.handleRequest(requestContext, apiRequest, druidQuery, _ as DruidPartialDataResponseProcessor)

cleanup:
systemConfig.clearProperty(uncoveredKey)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import static com.yahoo.bard.webservice.config.BardFeatureFlag.DRUID_CACHE
import static com.yahoo.bard.webservice.config.BardFeatureFlag.DRUID_CACHE_V2
import static com.yahoo.bard.webservice.config.BardFeatureFlag.QUERY_SPLIT

import com.yahoo.bard.webservice.config.SystemConfig
import com.yahoo.bard.webservice.config.SystemConfigProvider
import com.yahoo.bard.webservice.data.PartialDataHandler
import com.yahoo.bard.webservice.data.cache.DataCache
import com.yahoo.bard.webservice.data.cache.TupleDataCache
Expand All @@ -20,6 +22,7 @@ import com.yahoo.bard.webservice.web.handlers.CacheV2RequestHandler
import com.yahoo.bard.webservice.web.handlers.DataRequestHandler
import com.yahoo.bard.webservice.web.handlers.DebugRequestHandler
import com.yahoo.bard.webservice.web.handlers.DefaultWebServiceHandlerSelector
import com.yahoo.bard.webservice.web.handlers.DruidPartialDataRequestHandler
import com.yahoo.bard.webservice.web.handlers.SplitQueryRequestHandler
import com.yahoo.bard.webservice.web.handlers.WebServiceSelectorRequestHandler
import com.yahoo.bard.webservice.web.handlers.WeightCheckRequestHandler
Expand All @@ -33,6 +36,7 @@ import spock.lang.Specification
class DruidWorkflowSpec extends Specification {
private static final ObjectMapper MAPPER = new ObjectMapper()
.registerModule(new Jdk8Module().configureAbsentsAsNulls(false))
private static final SystemConfig SYSTEM_CONFIG = SystemConfigProvider.getInstance();

boolean cacheStatus
boolean cacheV2Status
Expand All @@ -48,10 +52,16 @@ class DruidWorkflowSpec extends Specification {
QuerySigningService<Long> querySigningService = Mock(SegmentIntervalsHashIdGenerator)
VolatileIntervalsService volatileIntervalsService = Mock(VolatileIntervalsService)

SystemConfig systemConfig
String uncoveredKey

def setup() {
cacheStatus = DRUID_CACHE.isOn()
cacheV2Status = DRUID_CACHE_V2.isOn()
splittingStatus = QUERY_SPLIT.isOn()

systemConfig = SystemConfigProvider.getInstance()
uncoveredKey = SYSTEM_CONFIG.getPackageVariableName("druid_uncovered_interval_limit")
}

def cleanup() {
Expand Down Expand Up @@ -193,4 +203,68 @@ class DruidWorkflowSpec extends Specification {
def byClass(Class c) {
{ it->it.class == c}
}

def "Test workflow contains DruidPartialDataRequestHandler when druidUncoveredIntervalLimit > 0"() {
setup:
systemConfig.setProperty(uncoveredKey, '10')
dw = new DruidWorkflow(
dataCache,
uiWebService,
nonUiWebService,
weightUtil,
physicalTableDictionary,
partialDataHandler,
querySigningService,
volatileIntervalsService,
MAPPER
)
DataRequestHandler workflow = dw.buildWorkflow()
List<DataRequestHandler> handlers = getHandlerChain(workflow)
WebServiceSelectorRequestHandler select = handlers.find(byClass(WebServiceSelectorRequestHandler))
def defaultHandler = select.handlerSelector as DefaultWebServiceHandlerSelector

when:
def handlers1 = getHandlerChain(defaultHandler.uiWebServiceHandler.next)
def handlers2 = getHandlerChain(defaultHandler.nonUiWebServiceHandler.next)

then:
handlers1.find(byClass(DruidPartialDataRequestHandler)) != null
handlers2.find(byClass(DruidPartialDataRequestHandler)) != null

cleanup:
systemConfig.clearProperty(uncoveredKey)
}

def "Test workflow doesn't contain DruidPartialDataRequestHandler when druidUncoveredIntervalLimit <= 0"() {
setup:
SystemConfig systemConfig = SystemConfigProvider.getInstance()
String uncoveredKey = SYSTEM_CONFIG.getPackageVariableName("druid_uncovered_interval_limit")
systemConfig.setProperty(uncoveredKey, '0')
dw = new DruidWorkflow(
dataCache,
uiWebService,
nonUiWebService,
weightUtil,
physicalTableDictionary,
partialDataHandler,
querySigningService,
volatileIntervalsService,
MAPPER
)
DataRequestHandler workflow = dw.buildWorkflow()
List<DataRequestHandler> handlers = getHandlerChain(workflow)
WebServiceSelectorRequestHandler select = handlers.find(byClass(WebServiceSelectorRequestHandler))
def defaultHandler = select.handlerSelector as DefaultWebServiceHandlerSelector

when:
def handlers1 = getHandlerChain(defaultHandler.uiWebServiceHandler.next)
def handlers2 = getHandlerChain(defaultHandler.nonUiWebServiceHandler.next)

then:
handlers1.find(byClass(DruidPartialDataRequestHandler)) == null
handlers2.find(byClass(DruidPartialDataRequestHandler)) == null

cleanup:
systemConfig.clearProperty(uncoveredKey)
}
}

0 comments on commit 7d0435c

Please sign in to comment.