From 7d0435c480c2eabc982507a25eb81413005c5fa9 Mon Sep 17 00:00:00 2001 From: Jiaqi Liu Date: Wed, 31 May 2017 09:02:47 -0700 Subject: [PATCH] Implement DruidPartialDataRequestHandler (#287) --- CHANGELOG.md | 6 ++ .../DruidPartialDataRequestHandler.java | 53 +++++++++++++ .../web/handlers/workflow/DruidWorkflow.java | 15 ++++ .../DruidPartialDataRequestHandlerSpec.groovy | 54 ++++++++++++++ .../workflow/DruidWorkflowSpec.groovy | 74 +++++++++++++++++++ 5 files changed, 202 insertions(+) create mode 100644 fili-core/src/main/java/com/yahoo/bard/webservice/web/handlers/DruidPartialDataRequestHandler.java create mode 100644 fili-core/src/test/groovy/com/yahoo/bard/webservice/web/handlers/DruidPartialDataRequestHandlerSpec.groovy diff --git a/CHANGELOG.md b/CHANGELOG.md index 5044681fa8..1ba64dee19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ pull request if there was one. Current ------- ### Added: +- [Implement DruidPartialDataRequestHandler](https://github.com/yahoo/fili/pull/287) + * Implement `DruidPartialDataRequestHandler` that injects `druid_uncovered_interval_limit` into Druid query context + * Append `DruidPartialDataResponseProcessor` to the current next `ResponseProcessor` chain + * Add `DruidPartialDataRequestHandler` to `DruidWorkflow` between `AsyncDruidRequestHandler` and + `CacheV2RequestHandler` and invoke the `DruidPartialDataRequestHandler` if `druid_uncovered_interval_limit` is + greater than 0 - [Implement DruidPartialDataResponseProcessor](https://github.com/yahoo/fili/pull/275) * Add `FullResponseProcessor` interface that extends `ResponseProcessor` diff --git a/fili-core/src/main/java/com/yahoo/bard/webservice/web/handlers/DruidPartialDataRequestHandler.java b/fili-core/src/main/java/com/yahoo/bard/webservice/web/handlers/DruidPartialDataRequestHandler.java new file mode 100644 index 0000000000..457e48d0ef --- /dev/null +++ b/fili-core/src/main/java/com/yahoo/bard/webservice/web/handlers/DruidPartialDataRequestHandler.java @@ -0,0 +1,53 @@ +// Copyright 2017 Yahoo Inc. +// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms. +package com.yahoo.bard.webservice.web.handlers; + +import com.yahoo.bard.webservice.config.SystemConfig; +import com.yahoo.bard.webservice.config.SystemConfigProvider; +import com.yahoo.bard.webservice.druid.model.query.DruidAggregationQuery; +import com.yahoo.bard.webservice.web.DataApiRequest; +import com.yahoo.bard.webservice.web.responseprocessors.DruidPartialDataResponseProcessor; +import com.yahoo.bard.webservice.web.responseprocessors.ResponseProcessor; + +import javax.validation.constraints.NotNull; + +/** + * A request handler that builds responses for Druid partial data + *

+ * The handler inject "uncoveredIntervalsLimit: $druid_uncovered_interval_limit" context to Druid query. + */ +public class DruidPartialDataRequestHandler implements DataRequestHandler { + + private static final SystemConfig SYSTEM_CONFIG = SystemConfigProvider.getInstance(); + + private final DataRequestHandler next; + private final int druidUncoveredIntervalLimit = SYSTEM_CONFIG.getIntProperty( + SYSTEM_CONFIG.getPackageVariableName("druid_uncovered_interval_limit"), 0 + ); + + /** + * Constructor. + * + * @param next Next Handler in the chain + */ + public DruidPartialDataRequestHandler(@NotNull DataRequestHandler next) { + this.next = next; + } + + @Override + public boolean handleRequest( + RequestContext context, + DataApiRequest request, + DruidAggregationQuery druidQuery, + ResponseProcessor response + ) { + return next.handleRequest( + context, + request, + druidQuery.withContext( + druidQuery.getContext().withUncoveredIntervalsLimit(druidUncoveredIntervalLimit) + ), + new DruidPartialDataResponseProcessor(response) + ); + } +} diff --git a/fili-core/src/main/java/com/yahoo/bard/webservice/web/handlers/workflow/DruidWorkflow.java b/fili-core/src/main/java/com/yahoo/bard/webservice/web/handlers/workflow/DruidWorkflow.java index a6ebf7b0e3..da6b384099 100644 --- a/fili-core/src/main/java/com/yahoo/bard/webservice/web/handlers/workflow/DruidWorkflow.java +++ b/fili-core/src/main/java/com/yahoo/bard/webservice/web/handlers/workflow/DruidWorkflow.java @@ -3,6 +3,8 @@ package com.yahoo.bard.webservice.web.handlers.workflow; import com.yahoo.bard.webservice.config.BardFeatureFlag; +import com.yahoo.bard.webservice.config.SystemConfig; +import com.yahoo.bard.webservice.config.SystemConfigProvider; import com.yahoo.bard.webservice.data.PartialDataHandler; import com.yahoo.bard.webservice.data.cache.DataCache; import com.yahoo.bard.webservice.data.volatility.VolatileIntervalsService; @@ -14,6 +16,7 @@ import com.yahoo.bard.webservice.web.handlers.CacheV2RequestHandler; import com.yahoo.bard.webservice.web.handlers.DataRequestHandler; import com.yahoo.bard.webservice.web.handlers.DebugRequestHandler; +import com.yahoo.bard.webservice.web.handlers.DruidPartialDataRequestHandler; import com.yahoo.bard.webservice.web.handlers.PaginationRequestHandler; import com.yahoo.bard.webservice.web.handlers.PartialDataRequestHandler; import com.yahoo.bard.webservice.web.handlers.SplitQueryRequestHandler; @@ -44,8 +47,14 @@ @Singleton public class DruidWorkflow implements RequestWorkflowProvider { + private static final SystemConfig SYSTEM_CONFIG = SystemConfigProvider.getInstance(); + public static final String RESPONSE_WORKFLOW_TIMER = "ResponseWorkflow"; public static final String REQUEST_WORKFLOW_TIMER = "RequestWorkflow"; + private final int druidUncoveredIntervalLimit = SYSTEM_CONFIG.getIntProperty( + SYSTEM_CONFIG.getPackageVariableName("druid_uncovered_interval_limit"), + 0 + ); protected final @NotNull DataCache dataCache; protected final @NotNull DruidWebService uiWebService; @@ -99,6 +108,12 @@ public DataRequestHandler buildWorkflow() { DataRequestHandler uiHandler = new AsyncWebServiceRequestHandler(uiWebService, mapper); DataRequestHandler nonUiHandler = new AsyncWebServiceRequestHandler(nonUiWebService, mapper); + // If Druid sends uncoveredIntervals, missing intervals are checked before sending the request + if (druidUncoveredIntervalLimit > 0) { + uiHandler = new DruidPartialDataRequestHandler(uiHandler); + nonUiHandler = new DruidPartialDataRequestHandler(nonUiHandler); + } + // If query caching is enabled, the cache is checked before sending the request if (BardFeatureFlag.DRUID_CACHE.isOn()) { if (BardFeatureFlag.DRUID_CACHE_V2.isOn()) { diff --git a/fili-core/src/test/groovy/com/yahoo/bard/webservice/web/handlers/DruidPartialDataRequestHandlerSpec.groovy b/fili-core/src/test/groovy/com/yahoo/bard/webservice/web/handlers/DruidPartialDataRequestHandlerSpec.groovy new file mode 100644 index 0000000000..49be3d4bcd --- /dev/null +++ b/fili-core/src/test/groovy/com/yahoo/bard/webservice/web/handlers/DruidPartialDataRequestHandlerSpec.groovy @@ -0,0 +1,54 @@ +// Copyright 2017 Yahoo Inc. +// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms. +package com.yahoo.bard.webservice.web.handlers + +import com.yahoo.bard.webservice.config.SystemConfig +import com.yahoo.bard.webservice.config.SystemConfigProvider +import com.yahoo.bard.webservice.druid.model.datasource.DataSource +import com.yahoo.bard.webservice.druid.model.filter.Filter +import com.yahoo.bard.webservice.druid.model.having.Having +import com.yahoo.bard.webservice.druid.model.orderby.LimitSpec +import com.yahoo.bard.webservice.druid.model.query.DruidAggregationQuery +import com.yahoo.bard.webservice.druid.model.query.Granularity +import com.yahoo.bard.webservice.druid.model.query.GroupByQuery +import com.yahoo.bard.webservice.druid.model.query.QueryContext +import com.yahoo.bard.webservice.web.DataApiRequest +import com.yahoo.bard.webservice.web.responseprocessors.DruidPartialDataResponseProcessor +import com.yahoo.bard.webservice.web.responseprocessors.ResponseProcessor + +import spock.lang.Specification + +class DruidPartialDataRequestHandlerSpec extends Specification { + private static final SystemConfig SYSTEM_CONFIG = SystemConfigProvider.getInstance(); + + def "New query context is passed to next handler"() { + given: + SystemConfig systemConfig = SystemConfigProvider.getInstance() + String uncoveredKey = SYSTEM_CONFIG.getPackageVariableName("druid_uncovered_interval_limit") + systemConfig.setProperty(uncoveredKey, '10') + + DataRequestHandler nextHandler = Mock(DataRequestHandler) + ResponseProcessor responseProcessor = Mock(ResponseProcessor) + RequestContext requestContext = Mock(RequestContext) + DruidAggregationQuery druidQuery = Mock(DruidAggregationQuery) + QueryContext queryContext = Mock(QueryContext) + DataApiRequest apiRequest = Mock(DataApiRequest) + + druidQuery.getContext() >> queryContext + + DruidPartialDataRequestHandler druidPartialDataRequestHandler = new DruidPartialDataRequestHandler( + nextHandler + ) + + when: + druidPartialDataRequestHandler.handleRequest(requestContext, apiRequest, druidQuery, responseProcessor) + + then: + 1 * druidQuery.withContext(queryContext) >> druidQuery + 1 * queryContext.withUncoveredIntervalsLimit(10) >> queryContext + 1 * nextHandler.handleRequest(requestContext, apiRequest, druidQuery, _ as DruidPartialDataResponseProcessor) + + cleanup: + systemConfig.clearProperty(uncoveredKey) + } +} diff --git a/fili-core/src/test/groovy/com/yahoo/bard/webservice/web/handlers/workflow/DruidWorkflowSpec.groovy b/fili-core/src/test/groovy/com/yahoo/bard/webservice/web/handlers/workflow/DruidWorkflowSpec.groovy index aa9f35ac0d..359a6f4ac8 100644 --- a/fili-core/src/test/groovy/com/yahoo/bard/webservice/web/handlers/workflow/DruidWorkflowSpec.groovy +++ b/fili-core/src/test/groovy/com/yahoo/bard/webservice/web/handlers/workflow/DruidWorkflowSpec.groovy @@ -6,6 +6,8 @@ import static com.yahoo.bard.webservice.config.BardFeatureFlag.DRUID_CACHE import static com.yahoo.bard.webservice.config.BardFeatureFlag.DRUID_CACHE_V2 import static com.yahoo.bard.webservice.config.BardFeatureFlag.QUERY_SPLIT +import com.yahoo.bard.webservice.config.SystemConfig +import com.yahoo.bard.webservice.config.SystemConfigProvider import com.yahoo.bard.webservice.data.PartialDataHandler import com.yahoo.bard.webservice.data.cache.DataCache import com.yahoo.bard.webservice.data.cache.TupleDataCache @@ -20,6 +22,7 @@ import com.yahoo.bard.webservice.web.handlers.CacheV2RequestHandler import com.yahoo.bard.webservice.web.handlers.DataRequestHandler import com.yahoo.bard.webservice.web.handlers.DebugRequestHandler import com.yahoo.bard.webservice.web.handlers.DefaultWebServiceHandlerSelector +import com.yahoo.bard.webservice.web.handlers.DruidPartialDataRequestHandler import com.yahoo.bard.webservice.web.handlers.SplitQueryRequestHandler import com.yahoo.bard.webservice.web.handlers.WebServiceSelectorRequestHandler import com.yahoo.bard.webservice.web.handlers.WeightCheckRequestHandler @@ -33,6 +36,7 @@ import spock.lang.Specification class DruidWorkflowSpec extends Specification { private static final ObjectMapper MAPPER = new ObjectMapper() .registerModule(new Jdk8Module().configureAbsentsAsNulls(false)) + private static final SystemConfig SYSTEM_CONFIG = SystemConfigProvider.getInstance(); boolean cacheStatus boolean cacheV2Status @@ -48,10 +52,16 @@ class DruidWorkflowSpec extends Specification { QuerySigningService querySigningService = Mock(SegmentIntervalsHashIdGenerator) VolatileIntervalsService volatileIntervalsService = Mock(VolatileIntervalsService) + SystemConfig systemConfig + String uncoveredKey + def setup() { cacheStatus = DRUID_CACHE.isOn() cacheV2Status = DRUID_CACHE_V2.isOn() splittingStatus = QUERY_SPLIT.isOn() + + systemConfig = SystemConfigProvider.getInstance() + uncoveredKey = SYSTEM_CONFIG.getPackageVariableName("druid_uncovered_interval_limit") } def cleanup() { @@ -193,4 +203,68 @@ class DruidWorkflowSpec extends Specification { def byClass(Class c) { { it->it.class == c} } + + def "Test workflow contains DruidPartialDataRequestHandler when druidUncoveredIntervalLimit > 0"() { + setup: + systemConfig.setProperty(uncoveredKey, '10') + dw = new DruidWorkflow( + dataCache, + uiWebService, + nonUiWebService, + weightUtil, + physicalTableDictionary, + partialDataHandler, + querySigningService, + volatileIntervalsService, + MAPPER + ) + DataRequestHandler workflow = dw.buildWorkflow() + List handlers = getHandlerChain(workflow) + WebServiceSelectorRequestHandler select = handlers.find(byClass(WebServiceSelectorRequestHandler)) + def defaultHandler = select.handlerSelector as DefaultWebServiceHandlerSelector + + when: + def handlers1 = getHandlerChain(defaultHandler.uiWebServiceHandler.next) + def handlers2 = getHandlerChain(defaultHandler.nonUiWebServiceHandler.next) + + then: + handlers1.find(byClass(DruidPartialDataRequestHandler)) != null + handlers2.find(byClass(DruidPartialDataRequestHandler)) != null + + cleanup: + systemConfig.clearProperty(uncoveredKey) + } + + def "Test workflow doesn't contain DruidPartialDataRequestHandler when druidUncoveredIntervalLimit <= 0"() { + setup: + SystemConfig systemConfig = SystemConfigProvider.getInstance() + String uncoveredKey = SYSTEM_CONFIG.getPackageVariableName("druid_uncovered_interval_limit") + systemConfig.setProperty(uncoveredKey, '0') + dw = new DruidWorkflow( + dataCache, + uiWebService, + nonUiWebService, + weightUtil, + physicalTableDictionary, + partialDataHandler, + querySigningService, + volatileIntervalsService, + MAPPER + ) + DataRequestHandler workflow = dw.buildWorkflow() + List handlers = getHandlerChain(workflow) + WebServiceSelectorRequestHandler select = handlers.find(byClass(WebServiceSelectorRequestHandler)) + def defaultHandler = select.handlerSelector as DefaultWebServiceHandlerSelector + + when: + def handlers1 = getHandlerChain(defaultHandler.uiWebServiceHandler.next) + def handlers2 = getHandlerChain(defaultHandler.nonUiWebServiceHandler.next) + + then: + handlers1.find(byClass(DruidPartialDataRequestHandler)) == null + handlers2.find(byClass(DruidPartialDataRequestHandler)) == null + + cleanup: + systemConfig.clearProperty(uncoveredKey) + } }