Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRILL-7191 / DRILL-7026]: RM state blob persistence in Zookeeper and Integration of Distributed queue configuration with Planner #1762

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

HanumathRao
Copy link
Contributor

@HanumathRao HanumathRao commented Apr 22, 2019

This PR contains changes for the support of RM Framework both on execution and planning side, tracked by JIRA's DRILL-7191 and DRILL-7026.

  1. Refactoring existing ZK based queue to accommodate new Distributed queue for RM. Moved QueryResourceAllocators memory allocation code to utility classes like ZKQueueMemoryAllocationUtilities and DefaultMemoryAllocationUtilities. Refactored the Parallelizer code to accommodate the memory adjustment for the operators during parallelization phase. There are 3 different implementation of SimpleParallelizer such as ZKQueueParallelizer, DistributedQueueParallelizer and DefaultParallelizer which will be used by ZK based RM, Distributed RM and Non RM configuration.

  2. Planner integration with RM to select queue and reduce query level memory to be within queue limits. Changes to handle scenarios where buffered operator are at least getting minimum required memory allocation. Based on the calculated memory for each operator within each fragment it’s initial and maximum memory allocation is set which is later consumed by execution layer to enforce memory limits.

  3. Introduced new DrillNode class to deal with issues when DrillbitEndpoint is searched in a map using some of it’s field.

  4. Changes to support storing UUID for each Drillbit Service Instance locally to be used by planner and execution layer. This UUID is used to uniquely identify a Drillbit and register Drillbit information in the RM StateBlobs. Introduced a PersistentStore named ZookeeperTransactionalPersistenceStore with Transactional capabilities using Zookeeper Transactional API’s. This is used for updating RM State blobs as all the updates need to happen in transactional manner. Added RMStateBlobs definition and support for serde to Zookeeper. Implementation for DistributedRM and its corresponding QueryRM apis.

  5. Updated the state management of Query in Foreman so that same Foreman object can be submitted multiple times. Also introduced concept of 2 maps keeping track of waiting and running queries. These were done to support for async admit protocol which will be needed with Distributed RM.

  6. Support for serde of optimalMemoryAllocation for each operator in each minor fragment in QueryProfile. This is needed to verify the optimalMemory calculated by planner is correct.

HanumathRao and others added 3 commits April 22, 2019 13:42
…tion with Simple Parallelizer.

Refactor existing ZK based queue to accommodate new Distributed queue for RM.
Refactor and rename the existing memory allocation utilities to ZKQueueMemoryAllocationUtilities and DefaultMemoryAllocationUtilities.
Parallelizer code is changed to accommodate the memory adjustment for the operators during parallelization phase.
With this change, there are 3 different implementation of SimpleParallelizer; they are ZKQueueParallelizer, DistributedQueueParallelizer and DefaultParallelizer which will be used by ZK based RM, Distributed RM and Non RM configuration.
UUID support for DrillbitEndpoint
RMState Blobs definition, serialization and deserialization, Zookeeper client support for transactions
ZookeeperPersistentTransactional Store and RMStateBlobManager to do updates under lock
Protect running and waiting queries map in WorkerBee
…tion with Simple Parallelizer.

Integration changes with new DistributedRM queue configuration.
a) Remove the redundant NodeResource and merge the additional member functions with the NodeResources class.
b) Added new UUID logic and selection of a queue based on the memory requirement during parallelization phase.
c) Changed proto definitions to set the UUID of a drillbit.
d) Implementation of new DrillNode Wrapper over DrillbitEndpoint to fix the equality comparisions between DrillbitEndpoints.
@sohami sohami force-pushed the DRILL-7026-Final-PR branch 3 times, most recently from de9e5f7 to e9b4fa5 Compare April 23, 2019 00:21
sohami and others added 2 commits April 22, 2019 21:02
Added stubs for QueryResourceManager exit and wait/cleanup thread
Update MemoryCalculator to use DrillNode instead of DrillbitEndpoint
Changes to support localbit resource registration to cluster state blob using DrillbitStatusListener
Support ThrottledResourceManager via ResourceManagerBuilder
Add some E2E tests and RMStateBlobs tests along with some bug fixes
Fix TestRMConfigLoad tests to handle case where ZKQueues are explicitly enabled
…tion with Simple Parallelizer.

Changes to set the memory allocation per operator in query profile.
Addressing an memory minimization logic was not considering non-buffered operators.
Handling error cases when memory requirements for buffered or non-buffered cannot be reduced.
@sohami sohami changed the title DRILL-7191 Distributed state persistence and Integration of Distributed queue configuration with Planner [DRILL-7191 / DRILL-7026]: RM state blob persistence in Zookeeper and Integration of Distributed queue configuration with Planner Apr 23, 2019

public OpProfileDef(int operatorId, int operatorType, int incomingCount) {
public OpProfileDef(int operatorId, int operatorType, int incomingCount, long optimalMemoryAllocation) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will all the creator of OpProfileDef always pass MaxAllocation for optimalMemoryAllocation ?

@@ -88,7 +88,7 @@ public OperatorStats(OperatorStats original, boolean isClean) {
}

@VisibleForTesting
public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator) {
public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator, long initialAllocation) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest to rename initialAllocation to optimalMemoryAllocation

private final QueryContext queryContext;
private final long MINIMUM_MEMORY_FOR_BUFFER_OPERS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

upper case variable name should be used only for constants, please change it to lower case.

endpoint.getUserPort() == otherEndpoint.getUserPort() &&
endpoint.getControlPort() == otherEndpoint.getControlPort() &&
endpoint.getDataPort() == otherEndpoint.getDataPort() &&
endpoint.getVersion().equals(otherEndpoint.getVersion());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like all the fields in DrillbitEndpoint are optional, so we should check if the field is present or not before calling equals on it. Just like done for hashCode() below or refer equals in generated file for DrillbitEndpoint.

.append(endpoint.getAddress())
.append("endpoint user port: ")
.append(endpoint.getUserPort()).toString();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check if field is present or not before accessing it.

queryContext.getSession(), queryContext.getQueryContextInfo());
planner.visitPhysicalPlan(queryWorkUnit);
// planner.visitPhysicalPlan(queryWorkUnit);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this commented line

@@ -79,6 +79,7 @@ message PlanFragment {
optional string options_json = 15;
optional QueryContextInformation context = 16;
repeated Collector collector = 17;
optional string endpointUUID = 18;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest to rename this to assignedEndpointUUID since there are 2 endpoints as part of PlanFragment: assignedEndpoint and foremanEndpoint

* fragment is based on the cluster state and provided queue configuration.
*/
public class DistributedQueueParallelizer extends SimpleParallelizer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueueParallelizer.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private

else {
return operator.getMaxAllocation();
}
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about below ? Also would be good to add a comment why memory for buffered operator is not retrieved from the PreCostEstimates.

return (endpoint, operator) -> {
  long operatorMemory = operator.getMaxAllocation();
  if (!planHasMemory) {
        final DrillNode drillEndpointNode = DrillNode.create(endpoint);
        if (operator.isBufferedOperator(queryContext)) {
          operatorMemory = operators.get(drillEndpointNode).get(operator);
        } else {
          operatorMemory = (long)operator.getCost().getMemoryCost();
        }
   }
   logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, operatorMemory);
   return operatorMemory;
};


return nodeMap;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple of things:

  1. Why not construct Map<String, Collection<PhysicalOperator>> to start with instead of Multimap<DrillbitEndpoint, PhysicalOperator> and then converting it.
  2. bufferedOperators.asMap() is not guaranteed to work in the same way as intended since DrillbitEndpoint equals and hashCode methods are not reliable which is why we created DrillNode.

}

@Override
public Map<DrillbitEndpoint, String> getOnlineEndpointsUUID() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this is duplicate code similar to that of the LocalClusterCoordinator. Can you move this function into a common place and use it in both the places?

getCache().rebuildNode(target);
} catch (final Exception e) {
throw new DrillRuntimeException("unable to put ", e);
}
}

public void createAsTransaction(List<String> paths) {
Preconditions.checkNotNull(paths, "no paths provided to create");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it also better to check for empty list of paths? Also it might be good to add a comment for this function.

* @param pathsWithData - map of blob paths to update and the final data
* @param version - version holder
*/
public void putAsTransaction(Map<String, byte[]> pathsWithData, DataChangeVersion version) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we currently use non null version. If not then can you please mention it in the comment that this is needed for future use.

String queryId, String foremanNode) throws Exception {
// Looks like leader hasn't changed yet so let's try to reserve the resources
// See if the call is to reserve or free up resources
Map<String, NodeResources> resourcesMap = queryResourceAssignment;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be changed to the following code.
Map<String, NodeResources> resourcesMap = queryResourceAssignment.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey,
(x) -> new NodeResources(x.getValue().getVersion(),
-x.getValue().getMemoryInBytes(),
-x.getValue().getNumVirtualCpu())));

public class RMConsistentBlobStoreManager implements RMBlobStoreManager {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RMConsistentBlobStoreManager.class);

private static final String RM_BLOBS_ROOT = "rm/blobs";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to start this path with "/" ?

totalDataBytes += entry.getValue().length;
}

// If total set operator payload is greater than 1MB then curator set operation will fail
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

greater than or equal to?

throw ex;
} finally {
// Check if the caller has acquired the mutex
if (globalBlobMutex.isAcquiredInThisProcess()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this check required. Shouldn't be the case that we should acquire the lock if we are here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants