Skip to content

Commit

Permalink
Implement LookupLoadTask
Browse files Browse the repository at this point in the history
  • Loading branch information
QubitPi committed Jan 31, 2018
1 parent 80ecc07 commit 72be3ea
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import static com.yahoo.bard.webservice.config.BardFeatureFlag.DRUID_COORDINATOR_METADATA;
import static com.yahoo.bard.webservice.config.BardFeatureFlag.DRUID_DIMENSIONS_LOADER;
import static com.yahoo.bard.webservice.config.BardFeatureFlag.DRUID_LOOKUP;
import static com.yahoo.bard.webservice.web.handlers.CacheRequestHandler.CACHE_HITS;
import static com.yahoo.bard.webservice.web.handlers.CacheRequestHandler.CACHE_REQUESTS;
import static com.yahoo.bard.webservice.web.handlers.DefaultWebServiceHandlerSelector.QUERY_REQUEST_TOTAL;
Expand All @@ -13,6 +14,7 @@
import com.yahoo.bard.webservice.application.healthchecks.AllDimensionsLoadedHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.DataSourceMetadataLoaderHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.DruidDimensionsLoaderHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.LookupHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.VersionHealthCheck;
import com.yahoo.bard.webservice.async.broadcastchannels.BroadcastChannel;
import com.yahoo.bard.webservice.async.broadcastchannels.SimpleBroadcastChannel;
Expand Down Expand Up @@ -71,6 +73,7 @@
import com.yahoo.bard.webservice.druid.util.SketchFieldConverter;
import com.yahoo.bard.webservice.metadata.DataSourceMetadataLoadTask;
import com.yahoo.bard.webservice.metadata.DataSourceMetadataService;
import com.yahoo.bard.webservice.metadata.LookupLoadTask;
import com.yahoo.bard.webservice.metadata.QuerySigningService;
import com.yahoo.bard.webservice.metadata.RequestedIntervalsFunction;
import com.yahoo.bard.webservice.metadata.SegmentIntervalsHashIdGenerator;
Expand Down Expand Up @@ -157,6 +160,7 @@ public abstract class AbstractBinderFactory implements BinderFactory {
private static final SystemConfig SYSTEM_CONFIG = SystemConfigProvider.getInstance();

public static final String HEALTH_CHECK_NAME_DATASOURCE_METADATA = "datasource metadata loader";
public static final String HEALTH_CHECK_NAME_LOOKUP = "lookup loader";
public static final String HEALTH_CHECK_NAME_DRUID_DIM_LOADER = "druid dimensions loader";
public static final String HEALTH_CHECK_VERSION = "version";
public static final String HEALTH_CHECK_NAME_DIMENSION = "dimension check";
Expand Down Expand Up @@ -306,6 +310,13 @@ protected void configure() {
setupDataSourceMetaData(healthCheckRegistry, dataSourceMetadataLoader);
}

if (DRUID_LOOKUP.isOn()) {
setupLookUp(
healthCheckRegistry,
buildLookupLoader(metadataDruidWebService, loader.getDimensionDictionary())
);
}

bind(querySigningService).to(QuerySigningService.class);

bind(buildJobRowBuilder()).to(JobRowBuilder.class);
Expand Down Expand Up @@ -726,6 +737,19 @@ protected DataSourceMetadataLoadTask buildDataSourceMetadataLoader(
);
}

/**
* Builds a lookup loader.
*
* @param webService The web service used by the loader to query druid for lookup statuses.
* @param dimensionDictionary A {@link com.yahoo.bard.webservice.data.dimension.DimensionDictionary} that is used
* to obtain a list of lookups in Fili.
*
* @return a lookup loader
*/
protected LookupLoadTask buildLookupLoader(DruidWebService webService, DimensionDictionary dimensionDictionary) {
return new LookupLoadTask(webService, dimensionDictionary);
}

/**
* Build a DimensionValueLoadTask.
*
Expand Down Expand Up @@ -768,6 +792,17 @@ protected final void setupDataSourceMetaData(
healthCheckRegistry.register(HEALTH_CHECK_NAME_DATASOURCE_METADATA, dataSourceMetadataLoaderHealthCheck);
}

/**
* Schedule a lookup loader and register its health check.
*
* @param healthCheckRegistry The health check registry to register lookup health checks.
* @param lookupLoadTask The {@link com.yahoo.bard.webservice.metadata.LookupLoadTask} to use.
*/
protected final void setupLookUp(HealthCheckRegistry healthCheckRegistry, LookupLoadTask lookupLoadTask) {
scheduleLoader(lookupLoadTask);
healthCheckRegistry.register(HEALTH_CHECK_NAME_LOOKUP, new LookupHealthCheck(lookupLoadTask));
}

/**
* Schedule DimensionValueLoadTask and register its health check.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2018 Yahoo Inc.
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.application.healthchecks;

import com.yahoo.bard.webservice.metadata.LookupLoadTask;

import com.codahale.metrics.health.HealthCheck;

import java.util.Set;

import javax.inject.Singleton;

/**
* Check load statuses of all Druid lookups.
*/
@Singleton
public class LookupHealthCheck extends HealthCheck {
private final LookupLoadTask lookupLoadTask;

/**
* Constructor.
*
* @param lookupLoadTask A {@link com.yahoo.bard.webservice.metadata.LookupLoadTask} that keeps load statuses of
* all Druid lookups
*/
public LookupHealthCheck(LookupLoadTask lookupLoadTask) {
this.lookupLoadTask = lookupLoadTask;
}

@Override
public Result check() {
Set<String> unloadedLookups = lookupLoadTask.getUnloadedLookups();
return unloadedLookups.isEmpty()
? Result.healthy("All Druid lookups have been loaded.")
: Result.unhealthy("Lookups %s are not loaded.", unloadedLookups);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public enum BardFeatureFlag implements FeatureFlag {
INTERSECTION_REPORTING("intersection_reporting_enabled"),
UPDATED_METADATA_COLLECTION_NAMES("updated_metadata_collection_names_enabled"),
DRUID_COORDINATOR_METADATA("druid_coordinator_metadata_enabled"),
DRUID_LOOKUP("druid_lookup_enabled"),
DRUID_DIMENSIONS_LOADER("druid_dimensions_loader_enabled"),
CASE_SENSITIVE_KEYS("case_sensitive_keys_enabled");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2018 Yahoo Inc.
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.metadata;

import com.yahoo.bard.webservice.application.LoadTask;
import com.yahoo.bard.webservice.config.SystemConfig;
import com.yahoo.bard.webservice.config.SystemConfigProvider;
import com.yahoo.bard.webservice.data.dimension.DimensionDictionary;
import com.yahoo.bard.webservice.data.dimension.impl.LookupDimension;
import com.yahoo.bard.webservice.druid.client.DruidWebService;
import com.yahoo.bard.webservice.druid.client.FailureCallback;
import com.yahoo.bard.webservice.druid.client.HttpErrorCallback;
import com.yahoo.bard.webservice.druid.client.SuccessCallback;

import com.fasterxml.jackson.databind.JsonNode;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Lookup Load task sends requests to Druid coordinator and returns list of configured lookup statuses in Druid.
*/
public class LookupLoadTask extends LoadTask<Boolean> {
private static final SystemConfig SYSTEM_CONFIG = SystemConfigProvider.getInstance();

/**
* Location of lookup statuses on Druid coordinator.
*/
public static final String LOOKUP_QUERY = "/lookups/status/__default";
/**
* Time between 2 consecutive lookup loading call in milliseconds.
*/
public static final String LOOKUP_NORMAL_CHECKING_PERIOD_KEY = SYSTEM_CONFIG.getPackageVariableName(
"lookup_normal_checking_period"
);
/**
* Wait on https://github.com/yahoo/fili/issues/619.
*/
public static final String LOOKUP_ERROR_CHECKING_PERIOD_KEY = SYSTEM_CONFIG.getPackageVariableName(
"lookup_error_checking_period"
);
/**
* Parameter specifying the delay before the first run of {@link LookupLoadTask}, in milliseconds.
*/
public static final String INITIAL_LOOKUP_CHECKING_DELAY = SYSTEM_CONFIG.getPackageVariableName(
"initial_lookup_checking_delay"
);

private final DruidWebService druidClient;
private final DimensionDictionary dimensionDictionary;
private final SuccessCallback successCallback;
private final FailureCallback failureCallback;
private final HttpErrorCallback errorCallback;
private Set<String> unloadedLookups;

/**
* Constructor.
*
* @param druidClient The client to query Druid coordinator
* @param dimensionDictionary A {@link com.yahoo.bard.webservice.data.dimension.DimensionDictionary} that is used
* to obtain a list of lookups in Fili.
*/
public LookupLoadTask(DruidWebService druidClient, DimensionDictionary dimensionDictionary) {
super(
LookupLoadTask.class.getSimpleName(),
SYSTEM_CONFIG.getLongProperty(INITIAL_LOOKUP_CHECKING_DELAY, 0),
SYSTEM_CONFIG.getLongProperty(LOOKUP_NORMAL_CHECKING_PERIOD_KEY, TimeUnit.MINUTES.toMillis(1))
);
this.druidClient = druidClient;
this.dimensionDictionary = dimensionDictionary;
this.successCallback = buildLookupSuccessCallback();
this.failureCallback = getFailureCallback();
this.errorCallback = getErrorCallback();
}

@Override
public void run() {
// download load statuses of all lookups
druidClient.getJsonObject(successCallback, errorCallback, failureCallback, LOOKUP_QUERY);
}

/**
* Returns a set of lookup namespaces that have not been loaded to Druid yet.
*
* @return the set of lookup namespaces that have not been loaded to Druid yet
*/
public Set<String> getUnloadedLookups() {
return unloadedLookups;
}

/**
* Returns a callback that has actions on lookups from a successful Druid response.
* <p>
* The callback obtains a complete list of configured lookups from Druid coordinator, compares this list against
* the list of lookups configured in Fili, and finds all lookup namespace names that are either not loaded yet in
* Druid or does not exist in Druid at all. These namespaces can be retrieved later by calling
* {@link #getUnloadedLookups()}.
*
* @return the callback that has actions on lookups from a successful Druid response
*/
protected SuccessCallback buildLookupSuccessCallback() {
return rootNode -> {
Map<String, Boolean> lookupStatuses = new HashMap<>();
Iterator<Map.Entry<String, JsonNode>> entries = rootNode.fields();
while (entries.hasNext()) {
Map.Entry<String, JsonNode> entry = entries.next();
lookupStatuses.put(entry.getKey(), entry.getValue().get("loaded").asBoolean());
}

unloadedLookups = dimensionDictionary.findAll().stream()
.filter(dimension -> dimension instanceof LookupDimension)
.map(dimension -> (LookupDimension) dimension)
.map(LookupDimension::getNamespaces)
.flatMap(List::stream)
.filter(namespace -> !lookupStatuses.containsKey(namespace) || !lookupStatuses.get(namespace))
.collect(Collectors.toSet());
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class FeatureFlagRegistrySpec extends Specification {
values == ["partial_data_enabled", "druid_cache_enabled", "druid_cache_v2_enabled", "query_split_enabled",
"top_n_enabled", "data_filter_substring_operations_enabled", "intersection_reporting_enabled",
"updated_metadata_collection_names_enabled", "druid_coordinator_metadata_enabled",
"druid_dimensions_loader_enabled", "case_sensitive_keys_enabled"] as Set
"druid_lookup_enabled", "druid_dimensions_loader_enabled", "case_sensitive_keys_enabled"] as Set
}

@Unroll
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2018 Yahoo Inc.
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.metadata

import com.yahoo.bard.webservice.data.dimension.DimensionDictionary
import com.yahoo.bard.webservice.data.dimension.impl.LookupDimension
import com.yahoo.bard.webservice.druid.client.DruidWebService
import com.yahoo.bard.webservice.models.druid.client.impl.TestDruidWebService

import spock.lang.Specification

class LookupLoadTaskSpec extends Specification {
DruidWebService druidClient

LookupDimension lookupDimension
DimensionDictionary dimensionDictionary

LookupLoadTask lookupLoadTask

def setup() {
druidClient = new TestDruidWebService()
druidClient.jsonResponse = {
"""
{
"loadedLookup": {
"loaded": true
},
"unLoadedLookup": {
"loaded": false
}
}
"""
}

lookupDimension = Mock(LookupDimension)
lookupDimension.getNamespaces() >> ["loadedLookup", "unLoadedLookup", "LookupNotInDruid"]

lookupLoadTask = new LookupLoadTask(druidClient, new DimensionDictionary([lookupDimension] as Set))
}

def "LookupLoadTask, when runs, finds un-loaded lookups"() {
when:
lookupLoadTask.run()

then:
lookupLoadTask.getUnloadedLookups() == ["unLoadedLookup", "LookupNotInDruid"] as Set
}
}

0 comments on commit 72be3ea

Please sign in to comment.