Skip to content

Commit

Permalink
Implement LookupMetadataLoadTask
Browse files Browse the repository at this point in the history
  • Loading branch information
QubitPi authored Feb 12, 2018
1 parent b436653 commit b84f494
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 1 deletion.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ Current

### Added:

- [Implement LookupLoadTask](https://github.com/yahoo/fili/pull/620)
* Add capability for Fili to check load statuses of Druid lookups.

- [Extraction Function on selector filter](https://github.com/yahoo/fili/pull/617)
* Added extraction function on dimensional filter, defaults to extraction function on dimension if it exists.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import static com.yahoo.bard.webservice.config.BardFeatureFlag.DRUID_COORDINATOR_METADATA;
import static com.yahoo.bard.webservice.config.BardFeatureFlag.DRUID_DIMENSIONS_LOADER;
import static com.yahoo.bard.webservice.config.BardFeatureFlag.DRUID_LOOKUP_METADATA;
import static com.yahoo.bard.webservice.web.handlers.CacheRequestHandler.CACHE_HITS;
import static com.yahoo.bard.webservice.web.handlers.CacheRequestHandler.CACHE_REQUESTS;
import static com.yahoo.bard.webservice.web.handlers.DefaultWebServiceHandlerSelector.QUERY_REQUEST_TOTAL;
Expand All @@ -13,6 +14,7 @@
import com.yahoo.bard.webservice.application.healthchecks.AllDimensionsLoadedHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.DataSourceMetadataLoaderHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.DruidDimensionsLoaderHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.LookupHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.VersionHealthCheck;
import com.yahoo.bard.webservice.async.broadcastchannels.BroadcastChannel;
import com.yahoo.bard.webservice.async.broadcastchannels.SimpleBroadcastChannel;
Expand Down Expand Up @@ -71,6 +73,7 @@
import com.yahoo.bard.webservice.druid.util.SketchFieldConverter;
import com.yahoo.bard.webservice.metadata.DataSourceMetadataLoadTask;
import com.yahoo.bard.webservice.metadata.DataSourceMetadataService;
import com.yahoo.bard.webservice.metadata.LookupMetadataLoadTask;
import com.yahoo.bard.webservice.metadata.QuerySigningService;
import com.yahoo.bard.webservice.metadata.RequestedIntervalsFunction;
import com.yahoo.bard.webservice.metadata.SegmentIntervalsHashIdGenerator;
Expand Down Expand Up @@ -157,6 +160,7 @@ public abstract class AbstractBinderFactory implements BinderFactory {
private static final SystemConfig SYSTEM_CONFIG = SystemConfigProvider.getInstance();

public static final String HEALTH_CHECK_NAME_DATASOURCE_METADATA = "datasource metadata loader";
public static final String HEALTH_CHECK_NAME_LOOKUP_METADATA = "lookup metadata loader";
public static final String HEALTH_CHECK_NAME_DRUID_DIM_LOADER = "druid dimensions loader";
public static final String HEALTH_CHECK_VERSION = "version";
public static final String HEALTH_CHECK_NAME_DIMENSION = "dimension check";
Expand Down Expand Up @@ -306,6 +310,13 @@ protected void configure() {
setupDataSourceMetaData(healthCheckRegistry, dataSourceMetadataLoader);
}

if (DRUID_LOOKUP_METADATA.isOn()) {
setupLookUpMetadataLoader(
healthCheckRegistry,
buildLookupMetaDataLoader(metadataDruidWebService, loader.getDimensionDictionary())
);
}

bind(querySigningService).to(QuerySigningService.class);

bind(buildJobRowBuilder()).to(JobRowBuilder.class);
Expand Down Expand Up @@ -726,6 +737,22 @@ protected DataSourceMetadataLoadTask buildDataSourceMetadataLoader(
);
}

/**
* Builds a lookup metadata loader.
*
* @param webService The web service used by the loader to query druid for lookup statuses.
* @param dimensionDictionary A {@link com.yahoo.bard.webservice.data.dimension.DimensionDictionary} that is used
* to obtain a list of lookups in Fili.
*
* @return a lookup metadata loader
*/
protected LookupMetadataLoadTask buildLookupMetaDataLoader(
DruidWebService webService,
DimensionDictionary dimensionDictionary
) {
return new LookupMetadataLoadTask(webService, dimensionDictionary);
}

/**
* Build a DimensionValueLoadTask.
*
Expand Down Expand Up @@ -768,6 +795,20 @@ protected final void setupDataSourceMetaData(
healthCheckRegistry.register(HEALTH_CHECK_NAME_DATASOURCE_METADATA, dataSourceMetadataLoaderHealthCheck);
}

/**
* Schedule a lookup metadata loader and register its health check.
*
* @param healthCheckRegistry The health check registry to register lookup health checks.
* @param lookupMetadataLoadTask The {@link LookupMetadataLoadTask} to use.
*/
protected final void setupLookUpMetadataLoader(
HealthCheckRegistry healthCheckRegistry,
LookupMetadataLoadTask lookupMetadataLoadTask
) {
scheduleLoader(lookupMetadataLoadTask);
healthCheckRegistry.register(HEALTH_CHECK_NAME_LOOKUP_METADATA, new LookupHealthCheck(lookupMetadataLoadTask));
}

/**
* Schedule DimensionValueLoadTask and register its health check.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2018 Yahoo Inc.
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.application.healthchecks;

import com.yahoo.bard.webservice.metadata.LookupMetadataLoadTask;

import com.codahale.metrics.health.HealthCheck;

import java.util.Set;

import javax.inject.Singleton;

/**
* Check load statuses of all Druid lookups.
*/
@Singleton
public class LookupHealthCheck extends HealthCheck {
private volatile LookupMetadataLoadTask lookupMetadataLoadTask;

/**
* Constructor.
*
* @param lookupMetadataLoadTask A {@link LookupMetadataLoadTask} that keeps load statuses of
* all Druid lookups
*/
public LookupHealthCheck(LookupMetadataLoadTask lookupMetadataLoadTask) {
this.lookupMetadataLoadTask = lookupMetadataLoadTask;
}

@Override
public Result check() {
Set<String> unloadedLookups = lookupMetadataLoadTask.getPendingLookups();
return unloadedLookups.isEmpty()
? Result.healthy("All Druid lookups have been loaded.")
: Result.unhealthy("Lookups %s are not loaded.", unloadedLookups);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public enum BardFeatureFlag implements FeatureFlag {
INTERSECTION_REPORTING("intersection_reporting_enabled"),
UPDATED_METADATA_COLLECTION_NAMES("updated_metadata_collection_names_enabled"),
DRUID_COORDINATOR_METADATA("druid_coordinator_metadata_enabled"),
DRUID_LOOKUP_METADATA("druid_lookup_metadata_enabled"),
DRUID_DIMENSIONS_LOADER("druid_dimensions_loader_enabled"),
CASE_SENSITIVE_KEYS("case_sensitive_keys_enabled");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2018 Yahoo Inc.
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.metadata;

import com.yahoo.bard.webservice.application.LoadTask;
import com.yahoo.bard.webservice.config.SystemConfig;
import com.yahoo.bard.webservice.config.SystemConfigProvider;
import com.yahoo.bard.webservice.data.dimension.DimensionDictionary;
import com.yahoo.bard.webservice.data.dimension.impl.LookupDimension;
import com.yahoo.bard.webservice.druid.client.DruidWebService;
import com.yahoo.bard.webservice.druid.client.FailureCallback;
import com.yahoo.bard.webservice.druid.client.HttpErrorCallback;
import com.yahoo.bard.webservice.druid.client.SuccessCallback;

import com.fasterxml.jackson.databind.JsonNode;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Lookup Load task sends requests to Druid coordinator and returns load statuses of lookup metadata in Druid.
*/
public class LookupMetadataLoadTask extends LoadTask<Boolean> {
private static final SystemConfig SYSTEM_CONFIG = SystemConfigProvider.getInstance();

/**
* A comma separated string of lookup tiers.
* <p>
* See http://druid.io/docs/latest/querying/lookups.html for details on "tier".
*/
private static final String TIERS_KEY = SYSTEM_CONFIG.getPackageVariableName("druid_registered_lookup_tiers");
/**
* Location of lookup statuses on Druid coordinator.
*/
private static final String LOOKUP_QUERY_FORMAT = "/lookups/status/%s";
/**
* Time between 2 consecutive lookup loading call in milliseconds.
*/
private static final String LOOKUP_NORMAL_CHECKING_PERIOD_KEY = SYSTEM_CONFIG.getPackageVariableName(
"lookup_normal_checking_period"
);
/**
* Wait on https://github.com/yahoo/fili/issues/619.
*/
private static final String LOOKUP_ERROR_CHECKING_PERIOD_KEY = SYSTEM_CONFIG.getPackageVariableName(
"lookup_error_checking_period"
);
/**
* Parameter specifying the delay before the first run of {@link LookupMetadataLoadTask}, in milliseconds.
*/
private static final String INITIAL_LOOKUP_CHECKING_DELAY = SYSTEM_CONFIG.getPackageVariableName(
"initial_lookup_checking_delay"
);

private final DruidWebService druidClient;
private final DimensionDictionary dimensionDictionary;
private final SuccessCallback successCallback;
private final FailureCallback failureCallback;
private final HttpErrorCallback errorCallback;
private final Set<String> pendingLookups;

private Set<String> lookupTiers;


/**
* Constructor.
*
* @param druidClient The client to query Druid coordinator
* @param dimensionDictionary A {@link com.yahoo.bard.webservice.data.dimension.DimensionDictionary} that is used
* to obtain a list of lookups in Fili.
*/
public LookupMetadataLoadTask(DruidWebService druidClient, DimensionDictionary dimensionDictionary) {
super(
LookupMetadataLoadTask.class.getSimpleName(),
SYSTEM_CONFIG.getLongProperty(INITIAL_LOOKUP_CHECKING_DELAY, 0),
SYSTEM_CONFIG.getLongProperty(LOOKUP_NORMAL_CHECKING_PERIOD_KEY, TimeUnit.MINUTES.toMillis(1))
);
this.druidClient = druidClient;
this.dimensionDictionary = dimensionDictionary;
this.successCallback = buildLookupSuccessCallback();
this.failureCallback = getFailureCallback();
this.errorCallback = getErrorCallback();
this.pendingLookups = new HashSet<>();
this.lookupTiers = getTiers(SYSTEM_CONFIG.getStringProperty(TIERS_KEY, "__default"));
}

@Override
public void run() {
getPendingLookups().clear();
// download load statuses of all lookups of each lookup tier
lookupTiers.forEach(lookupTier ->
druidClient.getJsonObject(
successCallback,
errorCallback,
failureCallback,
String.format(LOOKUP_QUERY_FORMAT, lookupTier)
)
);
}

/**
* Returns a list of configured lookup tiers.
* <p>
* See http://druid.io/docs/latest/querying/lookups.html for details on "tier".
*
* @return the list of configured lookup tiers
*/
public Set<String> getLookupTiers() {
return lookupTiers;
}

/**
* Updates list of configured lookup tiers.
* <p>
* This method becomes useful when a new look up tier is added at runtime.
*
* @param lookupTiers A new list of configured lookup tiers
*/
public void setLookupTiers(Set<String> lookupTiers) {
this.lookupTiers = lookupTiers;
}

/**
* Returns a set of lookup namespaces that have not been loaded to Druid yet.
*
* @return the set of lookup namespaces that have not been loaded to Druid yet
*/
public Set<String> getPendingLookups() {
return pendingLookups;
}

/**
* Returns a callback that has actions on lookup metadata from a successful Druid response.
* <p>
* The callback obtains a complete list of configured lookups from Druid coordinator, compares this list against
* the list of lookups configured in Fili, and finds all lookup namespace names that are either not loaded yet in
* Druid or does not exist in Druid at all. These namespaces can be retrieved later by calling
* {@link #getPendingLookups()}.
*
* @return the callback that has actions on lookups from a successful Druid response
*/
protected SuccessCallback buildLookupSuccessCallback() {
return rootNode -> {
Map<String, Boolean> lookupStatuses = new HashMap<>();
Iterator<Map.Entry<String, JsonNode>> entries = rootNode.fields();
while (entries.hasNext()) {
Map.Entry<String, JsonNode> entry = entries.next();
lookupStatuses.put(entry.getKey(), entry.getValue().get("loaded").asBoolean());
}

pendingLookups.addAll(
dimensionDictionary.findAll().stream()
.filter(dimension -> dimension instanceof LookupDimension)
.map(dimension -> (LookupDimension) dimension)
.map(LookupDimension::getNamespaces)
.flatMap(List::stream)
.filter(namespace ->
!lookupStatuses.containsKey(namespace) || !lookupStatuses.get(namespace)
)
.collect(Collectors.toSet())
);
};
}

/**
* Returns a list of lookup tiers from a comma separated string.
* <p>
* For example, {@code "tier1,tier2,tier3"} becomes {@code ["tier1", "tier2", "tier3"]}.
*
* @param string The comma separated string of lookup tiers.
*
* @return the list of lookup tiers
*/
private static Set<String> getTiers(String string) {
return Arrays.stream(string.split(",")).collect(Collectors.toSet());
}
}
6 changes: 6 additions & 0 deletions fili-core/src/main/resources/moduleConfig.properties
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,9 @@ bard__druid_uncovered_interval_limit = 0
# The implementation of the com.yahoo.bard.webservice.logging.LogFormatter to use to format the RequestLog logging
# blocks. By default, the RequestLog is formatted as JSON.
bard__log_formatter_implementation=com.yahoo.bard.webservice.logging.JsonLogFormatter

# Druid
# A comma separated list of configured lookup tiers. See http://druid.io/docs/latest/querying/lookups.html
# The default is a list of 1 tier called "__default"
# If you have multiple tiers, write them like "tier1,tier2,tier3"
bard__druid_registered_lookup_tiers=__default
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class FeatureFlagRegistrySpec extends Specification {
values == ["partial_data_enabled", "druid_cache_enabled", "druid_cache_v2_enabled", "query_split_enabled",
"top_n_enabled", "data_filter_substring_operations_enabled", "intersection_reporting_enabled",
"updated_metadata_collection_names_enabled", "druid_coordinator_metadata_enabled",
"druid_dimensions_loader_enabled", "case_sensitive_keys_enabled"] as Set
"druid_lookup_metadata_enabled", "druid_dimensions_loader_enabled", "case_sensitive_keys_enabled"] as Set
}

@Unroll
Expand Down
Loading

0 comments on commit b84f494

Please sign in to comment.