Skip to content

Commit

Permalink
Implement LookupLoadTask
Browse files Browse the repository at this point in the history
  • Loading branch information
QubitPi committed Feb 8, 2018
1 parent b436653 commit 7500070
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 1 deletion.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ Current

### Added:

- [Implement LookupLoadTask](https://github.com/yahoo/fili/pull/620)
* Add capability for Fili to check load statuses of Druid lookups.

- [Extraction Function on selector filter](https://github.com/yahoo/fili/pull/617)
* Added extraction function on dimensional filter, defaults to extraction function on dimension if it exists.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import static com.yahoo.bard.webservice.config.BardFeatureFlag.DRUID_COORDINATOR_METADATA;
import static com.yahoo.bard.webservice.config.BardFeatureFlag.DRUID_DIMENSIONS_LOADER;
import static com.yahoo.bard.webservice.config.BardFeatureFlag.DRUID_LOOKUP_METADATA;
import static com.yahoo.bard.webservice.web.handlers.CacheRequestHandler.CACHE_HITS;
import static com.yahoo.bard.webservice.web.handlers.CacheRequestHandler.CACHE_REQUESTS;
import static com.yahoo.bard.webservice.web.handlers.DefaultWebServiceHandlerSelector.QUERY_REQUEST_TOTAL;
Expand All @@ -13,6 +14,7 @@
import com.yahoo.bard.webservice.application.healthchecks.AllDimensionsLoadedHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.DataSourceMetadataLoaderHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.DruidDimensionsLoaderHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.LookupHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.VersionHealthCheck;
import com.yahoo.bard.webservice.async.broadcastchannels.BroadcastChannel;
import com.yahoo.bard.webservice.async.broadcastchannels.SimpleBroadcastChannel;
Expand Down Expand Up @@ -71,6 +73,7 @@
import com.yahoo.bard.webservice.druid.util.SketchFieldConverter;
import com.yahoo.bard.webservice.metadata.DataSourceMetadataLoadTask;
import com.yahoo.bard.webservice.metadata.DataSourceMetadataService;
import com.yahoo.bard.webservice.metadata.LookupMetadataLoadTask;
import com.yahoo.bard.webservice.metadata.QuerySigningService;
import com.yahoo.bard.webservice.metadata.RequestedIntervalsFunction;
import com.yahoo.bard.webservice.metadata.SegmentIntervalsHashIdGenerator;
Expand Down Expand Up @@ -157,6 +160,7 @@ public abstract class AbstractBinderFactory implements BinderFactory {
private static final SystemConfig SYSTEM_CONFIG = SystemConfigProvider.getInstance();

public static final String HEALTH_CHECK_NAME_DATASOURCE_METADATA = "datasource metadata loader";
public static final String HEALTH_CHECK_NAME_LOOKUP = "lookup loader";
public static final String HEALTH_CHECK_NAME_DRUID_DIM_LOADER = "druid dimensions loader";
public static final String HEALTH_CHECK_VERSION = "version";
public static final String HEALTH_CHECK_NAME_DIMENSION = "dimension check";
Expand Down Expand Up @@ -306,6 +310,13 @@ protected void configure() {
setupDataSourceMetaData(healthCheckRegistry, dataSourceMetadataLoader);
}

if (DRUID_LOOKUP_METADATA.isOn()) {
setupLookUpMetadataLoader(
healthCheckRegistry,
buildLookupMetaDataLoader(metadataDruidWebService, loader.getDimensionDictionary())
);
}

bind(querySigningService).to(QuerySigningService.class);

bind(buildJobRowBuilder()).to(JobRowBuilder.class);
Expand Down Expand Up @@ -726,6 +737,22 @@ protected DataSourceMetadataLoadTask buildDataSourceMetadataLoader(
);
}

/**
* Builds a lookup metadata loader.
*
* @param webService The web service used by the loader to query druid for lookup statuses.
* @param dimensionDictionary A {@link com.yahoo.bard.webservice.data.dimension.DimensionDictionary} that is used
* to obtain a list of lookups in Fili.
*
* @return a lookup loader
*/
protected LookupMetadataLoadTask buildLookupMetaDataLoader(
DruidWebService webService,
DimensionDictionary dimensionDictionary
) {
return new LookupMetadataLoadTask(webService, dimensionDictionary);
}

/**
* Build a DimensionValueLoadTask.
*
Expand Down Expand Up @@ -768,6 +795,20 @@ protected final void setupDataSourceMetaData(
healthCheckRegistry.register(HEALTH_CHECK_NAME_DATASOURCE_METADATA, dataSourceMetadataLoaderHealthCheck);
}

/**
* Schedule a lookup loader and register its health check.
*
* @param healthCheckRegistry The health check registry to register lookup health checks.
* @param lookupMetadataLoadTask The {@link LookupMetadataLoadTask} to use.
*/
protected final void setupLookUpMetadataLoader(
HealthCheckRegistry healthCheckRegistry,
LookupMetadataLoadTask lookupMetadataLoadTask
) {
scheduleLoader(lookupMetadataLoadTask);
healthCheckRegistry.register(HEALTH_CHECK_NAME_LOOKUP, new LookupHealthCheck(lookupMetadataLoadTask));
}

/**
* Schedule DimensionValueLoadTask and register its health check.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2018 Yahoo Inc.
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.application.healthchecks;

import com.yahoo.bard.webservice.metadata.LookupMetadataLoadTask;

import com.codahale.metrics.health.HealthCheck;

import java.util.Set;

import javax.inject.Singleton;

/**
* Check load statuses of all Druid lookups.
*/
@Singleton
public class LookupHealthCheck extends HealthCheck {
private final LookupMetadataLoadTask lookupMetadataLoadTask;

/**
* Constructor.
*
* @param lookupMetadataLoadTask A {@link LookupMetadataLoadTask} that keeps load statuses of
* all Druid lookups
*/
public LookupHealthCheck(LookupMetadataLoadTask lookupMetadataLoadTask) {
this.lookupMetadataLoadTask = lookupMetadataLoadTask;
}

@Override
public Result check() {
Set<String> unloadedLookups = lookupMetadataLoadTask.getPendingLookups();
return unloadedLookups.isEmpty()
? Result.healthy("All Druid lookups have been loaded.")
: Result.unhealthy("Lookups %s are not loaded.", unloadedLookups);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public enum BardFeatureFlag implements FeatureFlag {
INTERSECTION_REPORTING("intersection_reporting_enabled"),
UPDATED_METADATA_COLLECTION_NAMES("updated_metadata_collection_names_enabled"),
DRUID_COORDINATOR_METADATA("druid_coordinator_metadata_enabled"),
DRUID_LOOKUP_METADATA("druid_lookup_metadata_enabled"),
DRUID_DIMENSIONS_LOADER("druid_dimensions_loader_enabled"),
CASE_SENSITIVE_KEYS("case_sensitive_keys_enabled");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2018 Yahoo Inc.
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.metadata;

import com.yahoo.bard.webservice.application.LoadTask;
import com.yahoo.bard.webservice.config.SystemConfig;
import com.yahoo.bard.webservice.config.SystemConfigProvider;
import com.yahoo.bard.webservice.data.dimension.DimensionDictionary;
import com.yahoo.bard.webservice.data.dimension.impl.LookupDimension;
import com.yahoo.bard.webservice.druid.client.DruidWebService;
import com.yahoo.bard.webservice.druid.client.FailureCallback;
import com.yahoo.bard.webservice.druid.client.HttpErrorCallback;
import com.yahoo.bard.webservice.druid.client.SuccessCallback;

import com.fasterxml.jackson.databind.JsonNode;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Lookup Load task sends requests to Druid coordinator and returns list of configured lookup statuses in Druid.
*/
public class LookupMetadataLoadTask extends LoadTask<Boolean> {
private static final SystemConfig SYSTEM_CONFIG = SystemConfigProvider.getInstance();

/**
* Location of lookup statuses on Druid coordinator.
*/
public static final String LOOKUP_QUERY = "/lookups/status/__default";
/**
* Time between 2 consecutive lookup loading call in milliseconds.
*/
public static final String LOOKUP_NORMAL_CHECKING_PERIOD_KEY = SYSTEM_CONFIG.getPackageVariableName(
"lookup_normal_checking_period"
);
/**
* Wait on https://github.com/yahoo/fili/issues/619.
*/
public static final String LOOKUP_ERROR_CHECKING_PERIOD_KEY = SYSTEM_CONFIG.getPackageVariableName(
"lookup_error_checking_period"
);
/**
* Parameter specifying the delay before the first run of {@link LookupMetadataLoadTask}, in milliseconds.
*/
public static final String INITIAL_LOOKUP_CHECKING_DELAY = SYSTEM_CONFIG.getPackageVariableName(
"initial_lookup_checking_delay"
);

private final DruidWebService druidClient;
private final DimensionDictionary dimensionDictionary;
private final SuccessCallback successCallback;
private final FailureCallback failureCallback;
private final HttpErrorCallback errorCallback;
private Set<String> pendingLookups;

/**
* Constructor.
*
* @param druidClient The client to query Druid coordinator
* @param dimensionDictionary A {@link com.yahoo.bard.webservice.data.dimension.DimensionDictionary} that is used
* to obtain a list of lookups in Fili.
*/
public LookupMetadataLoadTask(DruidWebService druidClient, DimensionDictionary dimensionDictionary) {
super(
LookupMetadataLoadTask.class.getSimpleName(),
SYSTEM_CONFIG.getLongProperty(INITIAL_LOOKUP_CHECKING_DELAY, 0),
SYSTEM_CONFIG.getLongProperty(LOOKUP_NORMAL_CHECKING_PERIOD_KEY, TimeUnit.MINUTES.toMillis(1))
);
this.druidClient = druidClient;
this.dimensionDictionary = dimensionDictionary;
this.successCallback = buildLookupSuccessCallback();
this.failureCallback = getFailureCallback();
this.errorCallback = getErrorCallback();
}

@Override
public void run() {
// download load statuses of all lookups
druidClient.getJsonObject(successCallback, errorCallback, failureCallback, LOOKUP_QUERY);
}

/**
* Returns a set of lookup namespaces that have not been loaded to Druid yet.
*
* @return the set of lookup namespaces that have not been loaded to Druid yet
*/
public Set<String> getPendingLookups() {
return pendingLookups;
}

/**
* Returns a callback that has actions on lookup metadata from a successful Druid response.
* <p>
* The callback obtains a complete list of configured lookups from Druid coordinator, compares this list against
* the list of lookups configured in Fili, and finds all lookup namespace names that are either not loaded yet in
* Druid or does not exist in Druid at all. These namespaces can be retrieved later by calling
* {@link #getPendingLookups()}.
*
* @return the callback that has actions on lookups from a successful Druid response
*/
protected SuccessCallback buildLookupSuccessCallback() {
return rootNode -> {
Map<String, Boolean> lookupStatuses = new HashMap<>();
Iterator<Map.Entry<String, JsonNode>> entries = rootNode.fields();
while (entries.hasNext()) {
Map.Entry<String, JsonNode> entry = entries.next();
lookupStatuses.put(entry.getKey(), entry.getValue().get("loaded").asBoolean());
}

pendingLookups = dimensionDictionary.findAll().stream()
.filter(dimension -> dimension instanceof LookupDimension)
.map(dimension -> (LookupDimension) dimension)
.map(LookupDimension::getNamespaces)
.flatMap(List::stream)
.filter(namespace -> !lookupStatuses.containsKey(namespace) || !lookupStatuses.get(namespace))
.collect(Collectors.toSet());
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class FeatureFlagRegistrySpec extends Specification {
values == ["partial_data_enabled", "druid_cache_enabled", "druid_cache_v2_enabled", "query_split_enabled",
"top_n_enabled", "data_filter_substring_operations_enabled", "intersection_reporting_enabled",
"updated_metadata_collection_names_enabled", "druid_coordinator_metadata_enabled",
"druid_dimensions_loader_enabled", "case_sensitive_keys_enabled"] as Set
"druid_lookup_metadata_enabled", "druid_dimensions_loader_enabled", "case_sensitive_keys_enabled"] as Set
}

@Unroll
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2018 Yahoo Inc.
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.metadata

import com.yahoo.bard.webservice.data.dimension.DimensionDictionary
import com.yahoo.bard.webservice.data.dimension.impl.LookupDimension
import com.yahoo.bard.webservice.druid.client.DruidWebService
import com.yahoo.bard.webservice.models.druid.client.impl.TestDruidWebService

import spock.lang.Specification

class LookupMetadataLoadTaskSpec extends Specification {
DruidWebService druidClient

LookupDimension lookupDimension
DimensionDictionary dimensionDictionary

LookupMetadataLoadTask lookupLoadTask

def setup() {
druidClient = new TestDruidWebService()
druidClient.jsonResponse = {
"""
{
"loadedLookup": {
"loaded": true
},
"pendingLookup": {
"loaded": false
}
}
"""
}

lookupDimension = Mock(LookupDimension)
lookupDimension.getNamespaces() >> ["loadedLookup", "pendingLookup", "LookupNotInDruid"]

lookupLoadTask = new LookupMetadataLoadTask(druidClient, new DimensionDictionary([lookupDimension] as Set))
}

def "LookupLoadTask, when runs, finds pending lookups"() {
when:
lookupLoadTask.run()

then:
lookupLoadTask.getPendingLookups() == ["pendingLookup", "LookupNotInDruid"] as Set
}
}

0 comments on commit 7500070

Please sign in to comment.