Skip to content

Commit

Permalink
DRILL-2362: Profile Mgmt
Browse files Browse the repository at this point in the history
1. Write to Index
2. Read chronologically from Partitioned Dirs
3. Leverage Guava Cache
4. Infer which partitioned dir has a profile based on queryId alone
5. Trace Exception [qId: 259432dc-7f8e-8fc5-af69-16a1ca817689 ] 

TODO: 
1. Auto Index for 1st time (In batches of 10000) from root dir (sync if Distributed)
2. Figure out if need to maintain profileSet cache and how!
  a. Flag to indicate change i.e. writeIncrementor
redefine ranges


Short circuit path exploration


Init for Indexer


Clean up


Stats Range


added s3 support
  • Loading branch information
Kunal Khatua committed Apr 15, 2019
1 parent 494c206 commit 52855ae
Show file tree
Hide file tree
Showing 10 changed files with 820 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ private ExecConstants() {
public static final String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write";
public static final String PROFILES_STORE_INMEMORY = "drill.exec.profiles.store.inmemory";
public static final String PROFILES_STORE_CAPACITY = "drill.exec.profiles.store.capacity";
public static final String PROFILES_STORE_CACHE_SIZE = "drill.exec.profiles.store.cache.size";
public static final String PROFILES_STORE_INDEX_ENABLED = "drill.exec.profiles.store.index.enabled";
public static final String PROFILES_STORE_INDEX_FORMAT = "drill.exec.profiles.store.index.format";
public static final String PROFILES_STORE_INDEX_MAX = "drill.exec.profiles.store.index.max";
public static final String PROFILES_STORE_INDEX_SUPPORTED_FS = "drill.exec.profiles.store.index.supported.fs";
public static final String IMPERSONATION_ENABLED = "drill.exec.impersonation.enabled";
public static final String IMPERSONATION_MAX_CHAINED_USER_HOPS = "drill.exec.impersonation.max_chained_user_hops";
public static final String AUTHENTICATION_MECHANISMS = "drill.exec.security.auth.mechanisms";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,20 @@
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.server.DrillbitStateManager.DrillbitState;
import org.apache.drill.exec.server.options.OptionDefinition;
import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.server.options.OptionValue.OptionScope;
import org.apache.drill.exec.server.profile.ProfileIndexer;
import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.server.rest.WebServer;
import org.apache.drill.exec.service.ServiceEngine;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
import org.apache.drill.exec.store.sys.store.LocalPersistentStore;
import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider;
import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider;
import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
Expand Down Expand Up @@ -235,6 +239,15 @@ public void run() throws Exception {
shutdownHook = new ShutdownThread(this, new StackTrace());
Runtime.getRuntime().addShutdownHook(shutdownHook);
gracefulShutdownThread.start();

// Launch an archiving job that is # files and time bound
PersistentStore<QueryProfile> queryProfileStore = drillbitContext.getProfileStoreContext().getCompletedProfileStore();
if (queryProfileStore instanceof LocalPersistentStore
&& context.getConfig().getBoolean(ExecConstants.PROFILES_STORE_INDEX_ENABLED)) {
ProfileIndexer profileIndexer = new ProfileIndexer(coord, drillbitContext);
profileIndexer.indexProfiles();
}

logger.info("Startup completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.server.profile;

import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX;

import java.io.IOException;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.DistributedSemaphore;
import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
import org.apache.drill.exec.coord.zk.ZkDistributedSemaphore;
import org.apache.drill.exec.exception.StoreException;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.QueryProfileStoreContext;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.store.DrillSysFilePathFilter;
import org.apache.drill.exec.store.sys.store.LocalPersistentStore;
import org.apache.drill.exec.store.sys.store.ProfileSet;
import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Manage profiles by archiving
*/
public class ProfileIndexer {
private static final Logger logger = LoggerFactory.getLogger(ProfileIndexer.class);
private static final String lockPathString = "/profileIndexer";
private static final int DRILL_SYS_FILE_EXT_SIZE = DRILL_SYS_FILE_SUFFIX.length();

private final ZKClusterCoordinator zkCoord;
private final DrillFileSystem fs;
private final Path basePath;
private final ProfileSet profiles;
private final int indexingRate;
private final PathFilter sysFileSuffixFilter;
private SimpleDateFormat indexedPathFormat;
private final boolean useZkCoordinatedManagement;
private DrillConfig drillConfig;

private PersistentStoreConfig<QueryProfile> pStoreConfig;
private LocalPersistentStore<QueryProfile> completedProfileStore;
private Stopwatch indexWatch;
private int indexedCount;
private int currentProfileCount;


/**
* ProfileIndexer
*/
public ProfileIndexer(ClusterCoordinator coord, DrillbitContext context) throws StoreException, IOException {
drillConfig = context.getConfig();

// FileSystem
try {
this.fs = inferFileSystem(drillConfig);
} catch (IOException ex) {
throw new StoreException("Unable to get filesystem", ex);
}

//Use Zookeeper for coordinated management
final List<String> supportedFS = drillConfig.getStringList(ExecConstants.PROFILES_STORE_INDEX_SUPPORTED_FS);
if (this.useZkCoordinatedManagement = supportedFS.contains(fs.getScheme())) {
this.zkCoord = (ZKClusterCoordinator) coord;
} else {
this.zkCoord = null;
}

// Query Profile Store
QueryProfileStoreContext pStoreContext = context.getProfileStoreContext();
this.completedProfileStore = (LocalPersistentStore<QueryProfile>) pStoreContext.getCompletedProfileStore();
this.pStoreConfig = pStoreContext.getProfileStoreConfig();
this.basePath = completedProfileStore.getBasePath();

this.indexingRate = drillConfig.getInt(ExecConstants.PROFILES_STORE_INDEX_MAX);
this.profiles = new ProfileSet(indexingRate);
this.indexWatch = Stopwatch.createUnstarted();
this.sysFileSuffixFilter = new DrillSysFilePathFilter();
String indexPathPattern = drillConfig.getString(ExecConstants.PROFILES_STORE_INDEX_FORMAT);
this.indexedPathFormat = new SimpleDateFormat(indexPathPattern);
logger.info("Organizing any existing unindexed profiles");
}


/**
* Index profiles
*/
public void indexProfiles() {
this.indexWatch.start();

// Acquire lock IFF required
if (useZkCoordinatedManagement) {
DistributedSemaphore indexerMutex = new ZkDistributedSemaphore(zkCoord.getCurator(), lockPathString, 1);
try (DistributedLease lease = indexerMutex.acquire(0, TimeUnit.SECONDS)) {
if (lease != null) {
listAndIndex();
} else {
logger.debug("Couldn't get a lease acquisition");
}
} catch (Exception e) {
//DoNothing since lease acquisition failed
logger.error("Exception during lease-acquisition:: {}", e);
}
} else {
try {
listAndIndex();
} catch (IOException e) {
logger.error("Failed to index: {}", e);
}
}
logger.info("Successfully indexed {} of {} profiles during startup in {} seconds", indexedCount, currentProfileCount, this.indexWatch.stop().elapsed(TimeUnit.SECONDS));
}


//Lists and Indexes the latest profiles
private void listAndIndex() throws IOException {
currentProfileCount = listForArchiving();
indexedCount = 0;
logger.info("Found {} profiles that need to be indexed. Will attempt to index {} profiles", currentProfileCount,
(currentProfileCount > this.indexingRate) ? this.indexingRate : currentProfileCount);

// Track MRU index paths
Map<String, Path> mruIndexPath = new HashMap<>();
if (currentProfileCount > 0) {
while (!this.profiles.isEmpty()) {
String profileToIndex = profiles.removeYoungest() + DRILL_SYS_FILE_SUFFIX;
Path srcPath = new Path(basePath, profileToIndex);
long profileStartTime = getProfileStart(srcPath);
if (profileStartTime < 0) {
logger.debug("Will skip indexing {}", srcPath);
continue;
}
String indexPath = indexedPathFormat.format(new Date(profileStartTime));
//Check if dest dir exists
Path indexDestPath = null;
if (!mruIndexPath.containsKey(indexPath)) {
indexDestPath = new Path(basePath, indexPath);
if (!fs.isDirectory(indexDestPath)) {
// Build dir
if (fs.mkdirs(indexDestPath)) {
mruIndexPath.put(indexPath, indexDestPath);
} else {
//Creation failed. Did someone else create?
if (fs.isDirectory(indexDestPath)) {
mruIndexPath.put(indexPath, indexDestPath);
}
}
} else {
mruIndexPath.put(indexPath, indexDestPath);
}
} else {
indexDestPath = mruIndexPath.get(indexPath);
}

//Attempt Move
boolean renameStatus = false;
if (indexDestPath != null) {
Path destPath = new Path(indexDestPath, profileToIndex);
renameStatus = DrillFileSystemUtil.rename(fs, srcPath, destPath);
if (renameStatus) {
indexedCount++;
}
}
if (indexDestPath == null || !renameStatus) {
// Stop attempting any more archiving since other StoreProviders might be archiving
logger.error("Move failed for {} [{} | {}]", srcPath, indexDestPath == null, renameStatus);
continue;
}
}
}
}

// Deserialized and extract the profile's start time
private long getProfileStart(Path srcPath) {
try (InputStream is = fs.open(srcPath)) {
QueryProfile profile = pStoreConfig.getSerializer().deserialize(IOUtils.toByteArray(is));
return profile.getStart();
} catch (IOException e) {
logger.info("Unable to deserialize {}\n---{}====", srcPath, e.getMessage()); //Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: [B@f76ca5b; line: 1, column: 65538]
logger.info("deserialization RCA==> \n {}", ExceptionUtils.getRootCause(e));
}
return Long.MIN_VALUE;
}

// List all profiles in store's root and identify potential candidates for archiving
private int listForArchiving() throws IOException {
// Not performing recursive search of profiles
List<FileStatus> fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, sysFileSuffixFilter );

int numProfilesInStore = 0;
for (FileStatus stat : fileStatuses) {
String profileName = stat.getPath().getName();
//Strip extension and store only query ID
profiles.add(profileName.substring(0, profileName.length() - DRILL_SYS_FILE_EXT_SIZE), false);
numProfilesInStore++;
}

return numProfilesInStore;
}

// Infers File System of Local Store
private DrillFileSystem inferFileSystem(DrillConfig drillConfig) throws IOException {
boolean hasZkBlobRoot = drillConfig.hasPath(ZookeeperPersistentStoreProvider.DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT);
final Path blobRoot = hasZkBlobRoot ?
new org.apache.hadoop.fs.Path(drillConfig.getString(ZookeeperPersistentStoreProvider.DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)) :
LocalPersistentStore.getLogDir();

return LocalPersistentStore.getFileSystem(drillConfig, blobRoot);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import javax.ws.rs.core.UriInfo;
import javax.xml.bind.annotation.XmlRootElement;


import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
Expand Down Expand Up @@ -90,7 +91,7 @@ public static class ProfileInfo implements Comparable<ProfileInfo> {

public ProfileInfo(DrillConfig drillConfig, String queryId, long startTime, long endTime, String foreman, String query,
String state, String user, double totalCost, String queueName) {
this.queryId = queryId;
this.queryId = queryId.substring(queryId.lastIndexOf('/') + 1);
this.startTime = startTime;
this.endTime = endTime;
this.time = new Date(startTime);
Expand Down Expand Up @@ -378,7 +379,7 @@ public Viewable getProfile(@PathParam("queryid") String queryId){
ProfileWrapper wrapper = new ProfileWrapper(getQueryProfile(queryId), work.getContext().getConfig());
return ViewableWithPermissions.create(authEnabled.get(), "/rest/profile/profile.ftl", sc, wrapper);
} catch (Exception | Error e) {
logger.error("Exception was thrown when fetching profile {} :\n{}", queryId, e);
logger.error("Exception was thrown when fetching profile {} :\n{}\n====\n", queryId, e);
return ViewableWithPermissions.create(authEnabled.get(), "/rest/errorMessage.ftl", sc, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.sys.store;

import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

/**
* Filter for Drill System Files
*/
public class DrillSysFilePathFilter implements PathFilter {

//NOTE: The filename is a combination of query ID (which is monotonically
//decreasing value derived off epoch timestamp) and a random value. This
//filter helps eliminate that list
String cutoffFileName = null;
public DrillSysFilePathFilter() {}

public DrillSysFilePathFilter(String cutoffSysFileName) {
if (cutoffSysFileName != null) {
this.cutoffFileName = cutoffSysFileName + DRILL_SYS_FILE_SUFFIX;
}
}

/* (non-Javadoc)
* @see org.apache.hadoop.fs.PathFilter#accept(org.apache.hadoop.fs.Path)
*/
@Override
public boolean accept(Path file){
if (file.getName().endsWith(DRILL_SYS_FILE_SUFFIX)) {
if (cutoffFileName != null) {
return (file.getName().compareTo(cutoffFileName) <= 0);
} else {
return true;
}
}
return false;
}
}
Loading

0 comments on commit 52855ae

Please sign in to comment.