diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala index ad6187104da..8c569cd35e5 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala @@ -25,7 +25,8 @@ import edu.uci.ics.amber.core.workflow._ import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{ DefaultResourceAllocator, - ExecutionClusterInfo + ExecutionClusterInfo, + GreedyResourceAllocator } import org.jgrapht.graph.DirectedAcyclicGraph import org.jgrapht.traverse.TopologicalOrderIterator @@ -125,13 +126,26 @@ abstract class ScheduleGenerator( def allocateResource( regionDAG: DirectedAcyclicGraph[Region, RegionLink] ): Unit = { + val allocatorType = ApplicationConfig.allocatorType + + val resourceAllocator = allocatorType match { + case "default" => + new DefaultResourceAllocator( + physicalPlan, + executionClusterInfo, + workflowContext.workflowSettings + ) + case "greedy" => + new GreedyResourceAllocator( + physicalPlan, + executionClusterInfo, + workflowContext.workflowSettings, + workflowContext + ) + case other => + throw new IllegalArgumentException(s"Unknown allocator type: $other") + } - val resourceAllocator = - new DefaultResourceAllocator( - physicalPlan, - executionClusterInfo, - workflowContext.workflowSettings - ) // generate the resource configs new TopologicalOrderIterator(regionDAG).asScala .foreach(region => { diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/config/WorkerConfig.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/config/WorkerConfig.scala index 14c89197967..41c95ff2645 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/config/WorkerConfig.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/config/WorkerConfig.scala @@ -25,7 +25,7 @@ import edu.uci.ics.amber.util.VirtualIdentityUtils import edu.uci.ics.amber.core.virtualidentity.ActorVirtualIdentity case object WorkerConfig { - def generateWorkerConfigs(physicalOp: PhysicalOp): List[WorkerConfig] = { + def generateDefaultWorkerConfigs(physicalOp: PhysicalOp): List[WorkerConfig] = { val workerCount = if (physicalOp.parallelizable) { physicalOp.suggestedWorkerNum match { // Keep suggested number of workers diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ExecutionClusterInfo.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ExecutionClusterInfo.scala index aea819a0a81..25883b2c5de 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ExecutionClusterInfo.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ExecutionClusterInfo.scala @@ -19,4 +19,9 @@ package edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies -class ExecutionClusterInfo() {} +import edu.uci.ics.amber.config.ApplicationConfig + +class ExecutionClusterInfo { + val availableNumberOfCores: Int = ApplicationConfig.availableCores + val coreToWorkerRatio: Double = ApplicationConfig.coreToWorkerRatio +} diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/GreedyResourceAllocator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/GreedyResourceAllocator.scala new file mode 100644 index 00000000000..3e57663fd27 --- /dev/null +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/GreedyResourceAllocator.scala @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies + +import edu.uci.ics.amber.config.ApplicationConfig +import edu.uci.ics.amber.core.storage.DocumentFactory +import edu.uci.ics.amber.core.tuple.Tuple +import edu.uci.ics.amber.core.workflow._ +import edu.uci.ics.amber.engine.architecture.scheduling.Region +import edu.uci.ics.amber.engine.architecture.scheduling.config._ +import edu.uci.ics.amber.util.VirtualIdentityUtils +import edu.uci.ics.texera.dao.SqlServer +import edu.uci.ics.texera.dao.SqlServer.withTransaction +import edu.uci.ics.texera.dao.jooq.generated.Tables.{WORKFLOW_EXECUTIONS, WORKFLOW_VERSION} + +import java.net.URI +import scala.collection.mutable + +class GreedyResourceAllocator( + physicalPlan: PhysicalPlan, + executionClusterInfo: ExecutionClusterInfo, + workflowSettings: WorkflowSettings, + workflowContext: WorkflowContext + ) extends ResourceAllocator { + + // a map of a physical link to the partition info of the upstream/downstream of this link + private val linkPartitionInfos = new mutable.HashMap[PhysicalLink, PartitionInfo]() + + private val operatorConfigs = new mutable.HashMap[PhysicalOpIdentity, OperatorConfig]() + private val linkConfigs = new mutable.HashMap[PhysicalLink, LinkConfig]() + + /** + * Allocates resources for a given region and its operators. + * + * This method calculates and assigns worker configurations for each operator + * in the region. + * For the operators that are parallelizable, it respects the + * suggested worker number if provided. + * Non-parallelizable operators are assigned a single worker. + * For parallelizable operators without suggestions, the slowest operator + * is assigned with one more worker. + * Automatic adjustment will not result the total number of workers exceeds + * configured core to worker ratio * configured cpu cores. + * @param region The region for which to allocate resources. + * @return A tuple containing: + * 1) A new Region instance with new resource configuration. + * 2) An estimated cost of the workflow with the new resource configuration, + * represented as a Double value (currently set to 0, but will be + * updated in the future). + */ + def allocate( + region: Region + ): (Region, Double) = { + val statsOpt = getOperatorExecutionStats(this.workflowContext.workflowId.id) + val maxWorkerUpperBound = executionClusterInfo.availableNumberOfCores * executionClusterInfo.coreToWorkerRatio + val operatorList = region.getOperators + + val opToOperatorConfigMapping: Map[PhysicalOpIdentity, OperatorConfig] = statsOpt match { + + case Some(stats) => + val opToWorkerList = mutable.HashMap[PhysicalOpIdentity, List[WorkerConfig]]() + + val greedyCandidates = operatorList.filter(op => + op.parallelizable && op.suggestedWorkerNum.isEmpty && stats.contains(op.id.logicalOpId.id) + ) + + val slowestOpIdStrOpt = + greedyCandidates + .flatMap(op => stats.get(op.id.logicalOpId.id).map { case (rt, _) => op.id.logicalOpId.id -> rt }) + .maxByOption(_._2) + .map(_._1) + + val basicWorkerCounts: Map[PhysicalOpIdentity, Int] = operatorList.map { op => + val opIdStr = op.id.logicalOpId.id + val baseCount = + if (!op.parallelizable) 1 + else if (op.suggestedWorkerNum.isDefined) op.suggestedWorkerNum.get + else if (stats.contains(opIdStr)) stats(opIdStr)._2 + else ApplicationConfig.numWorkerPerOperatorByDefault + op.id -> baseCount + }.toMap + + val currentTotal = basicWorkerCounts.values.sum + + val addingAllowed = slowestOpIdStrOpt.isDefined && currentTotal + 1 <= maxWorkerUpperBound + + operatorList.foreach { op => + val opIdStr = op.id.logicalOpId.id + val basicWorkerCount = basicWorkerCounts(op.id) + val updatedWorkerCount = + if (addingAllowed && slowestOpIdStrOpt.contains(opIdStr)) basicWorkerCount + 1 else basicWorkerCount + val workers = (0 until updatedWorkerCount).map { idx => + WorkerConfig( + VirtualIdentityUtils.createWorkerIdentity(op.workflowId, op.id, idx) + ) + }.toList + opToWorkerList(op.id) = workers + } + + opToWorkerList.map { + case (opId, workerList) => opId -> OperatorConfig(workerList) + }.toMap + + case None => + region.getOperators + .map(op => op.id -> OperatorConfig(WorkerConfig.generateDefaultWorkerConfigs(op))) + .toMap + } + + operatorConfigs ++= opToOperatorConfigMapping + + val updatedLinkPartitionInfos = propagatePartitionRequirement(region, physicalPlan, operatorConfigs.toMap, linkPartitionInfos.toMap) + + linkPartitionInfos ++= updatedLinkPartitionInfos + + val linkToLinkConfigMapping = + getLinkConfigs(region, operatorConfigs.toMap, linkPartitionInfos.toMap, workflowSettings) + + linkConfigs ++= linkToLinkConfigMapping + + val portConfigs: Map[GlobalPortIdentity, PortConfig] = getPortConfigs(region, operatorConfigs.toMap, workflowSettings) + + val resourceConfig = ResourceConfig( + opToOperatorConfigMapping, + linkToLinkConfigMapping, + portConfigs + ) + + (region.copy(resourceConfig = Some(resourceConfig)), 0) + } + + private def getOperatorExecutionStats(wid: Long): Option[Map[String, (Double, Int)]] = { + val uriOpt: Option[String] = + withTransaction(SqlServer.getInstance().createDSLContext()) { context => + val row = context + .select(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI) + .from(WORKFLOW_EXECUTIONS) + .join(WORKFLOW_VERSION) + .on(WORKFLOW_EXECUTIONS.VID.eq(WORKFLOW_VERSION.VID)) + .where( + WORKFLOW_VERSION.WID.eq(wid.toInt) + .and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte)) // 成功状态 + ) + .orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc()) + .limit(1) + .fetchOne() + + Option(row).map(_.get(WORKFLOW_EXECUTIONS.RUNTIME_STATS_URI)) + } + + uriOpt.flatMap { uriStr => + if (uriStr == null || uriStr.trim.isEmpty) { + None + } else { + val stats = readStatsFromUri(uriStr) + if (stats.isEmpty) None else Some(stats) + } + } + } + + + private def readStatsFromUri(uriStr: String): Map[String, (Double, Int)] = { + val uri = new URI(uriStr) + val document = DocumentFactory.openDocument(uri) + + document._1.get().foldLeft(Map.empty[String, (Double, Int)]) { (acc, tuple) => + val record = tuple.asInstanceOf[Tuple] + val operatorId = record.getField(0).asInstanceOf[String] + val dataProcessingTime = record.getField(6).asInstanceOf[Long] + val controlProcessingTime = record.getField(7).asInstanceOf[Long] + val workerNum = record.getField(9).asInstanceOf[Int] + val execTime = (dataProcessingTime + controlProcessingTime) / 1e9 / workerNum + + acc.get(operatorId) match { + case Some((prevTime, _)) => acc + (operatorId -> (Math.max(prevTime, execTime), workerNum)) + case None => acc + (operatorId -> (execTime, workerNum)) + } + } + } +} diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala index 7a7f9add58b..ee685338eda 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala @@ -19,126 +19,29 @@ package edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies -import edu.uci.ics.amber.core.virtualidentity.PhysicalOpIdentity import edu.uci.ics.amber.core.workflow._ import edu.uci.ics.amber.engine.architecture.scheduling.Region import edu.uci.ics.amber.engine.architecture.scheduling.config.ChannelConfig.generateChannelConfigs import edu.uci.ics.amber.engine.architecture.scheduling.config.LinkConfig.toPartitioning -import edu.uci.ics.amber.engine.architecture.scheduling.config.WorkerConfig.generateWorkerConfigs +import edu.uci.ics.amber.engine.architecture.scheduling.config.WorkerConfig.generateDefaultWorkerConfigs import edu.uci.ics.amber.engine.architecture.scheduling.config._ -import edu.uci.ics.amber.engine.architecture.sendsemantics.partitionings.Partitioning import edu.uci.ics.amber.util.VirtualIdentityUtils.getFromActorIdForInputPortStorage import java.net.URI import scala.collection.mutable trait ResourceAllocator { - def allocate(region: Region): (Region, Double) -} - -class DefaultResourceAllocator( - physicalPlan: PhysicalPlan, - executionClusterInfo: ExecutionClusterInfo, - workflowSettings: WorkflowSettings -) extends ResourceAllocator { - - // a map of a physical link to the partition info of the upstream/downstream of this link - private val linkPartitionInfos = new mutable.HashMap[PhysicalLink, PartitionInfo]() - - private val operatorConfigs = new mutable.HashMap[PhysicalOpIdentity, OperatorConfig]() - private val linkConfigs = new mutable.HashMap[PhysicalLink, LinkConfig]() /** - * Allocates resources for a given region and its operators. + * Allocate resources for the given region (operator/link/port). + * Returns the region with a new ResourceConfig and an estimated cost. + * Different ResourceAllocator implementations may apply different methods; + * this one applies the default allocation method. * - * This method calculates and assigns worker configurations for each operator - * in the region. For the operators that are parallelizable, it respects the - * suggested worker number if provided. Otherwise, it falls back to a default - * value. Non-parallelizable operators are assigned a single worker. - * - * @param region The region for which to allocate resources. - * @return A tuple containing: - * 1) A new Region instance with new resource configuration. - * 2) An estimated cost of the workflow with the new resource configuration, - * represented as a Double value (currently set to 0, but will be - * updated in the future). + * @param region Region to allocate. + * @return (updated Region, estimated cost) */ - def allocate( - region: Region - ): (Region, Double) = { - - val opToOperatorConfigMapping = region.getOperators - .map(physicalOp => physicalOp.id -> OperatorConfig(generateWorkerConfigs(physicalOp))) - .toMap - - operatorConfigs ++= opToOperatorConfigMapping - - propagatePartitionRequirement(region) - - val linkToLinkConfigMapping = region.getLinks.map { physicalLink => - physicalLink -> LinkConfig( - generateChannelConfigs( - operatorConfigs(physicalLink.fromOpId).workerConfigs.map(_.workerId), - operatorConfigs(physicalLink.toOpId).workerConfigs.map(_.workerId), - toPortId = physicalLink.toPortId, - linkPartitionInfos(physicalLink) - ), - toPartitioning( - operatorConfigs(physicalLink.fromOpId).workerConfigs.map(_.workerId), - operatorConfigs(physicalLink.toOpId).workerConfigs.map(_.workerId), - linkPartitionInfos(physicalLink), - workflowSettings.dataTransferBatchSize - ) - ) - }.toMap - - linkConfigs ++= linkToLinkConfigMapping - - val portConfigs: Map[GlobalPortIdentity, PortConfig] = region.resourceConfig match { - case Some(existing) => - val upgradedInputPortConfigs: Map[GlobalPortIdentity, InputPortConfig] = - existing.portConfigs.collect { - case (globalPortId, rawInConfig: IntermediateInputPortConfig) if globalPortId.input => - val uris: List[URI] = rawInConfig.storageURIs - // derive partitionings for each upstream materialization - val portPartitionings: List[Partitioning] = uris.map { inputMatUri => - val toWorkerActorIds = - operatorConfigs(globalPortId.opId).workerConfigs.map(_.workerId) - val fromVirtualThreadActorIds = toWorkerActorIds.map(toWorkerActorId => - getFromActorIdForInputPortStorage(inputMatUri.toString, toWorkerActorId) - ) - // Extract the input port partitionInfo defined in the physicalOp, defaulting to UnknownPartition. - val inputPortPartitionInfo = region - .getOperator(globalPortId.opId) - .partitionRequirement - .applyOrElse(globalPortId.portId.id, (_: Int) => None) - .getOrElse(UnknownPartition()) - - toPartitioning( - fromVirtualThreadActorIds, - toWorkerActorIds, - inputPortPartitionInfo, - workflowSettings.dataTransferBatchSize - ) - } - // new InputPortConfig that carries both URIs and per-URI partitionings - globalPortId -> InputPortConfig(uris.zip(portPartitionings)) - } - - existing.portConfigs ++ upgradedInputPortConfigs - - case None => - Map.empty[GlobalPortIdentity, PortConfig] - } - - val resourceConfig = ResourceConfig( - opToOperatorConfigMapping, - linkToLinkConfigMapping, - portConfigs - ) - - (region.copy(resourceConfig = Some(resourceConfig)), 0) - } + def allocate(region: Region): (Region, Double) /** * This method propagates partitioning requirements in the PhysicalPlan DAG. @@ -152,7 +55,13 @@ class DefaultResourceAllocator( * The link A->HJ will be propagated in the first region. The link B->HJ will be propagated in the second region. * The output partition info of HJ will be derived after both links are propagated, which is in the second region. */ - private def propagatePartitionRequirement(region: Region): Unit = { + def propagatePartitionRequirement( + region: Region, + physicalPlan: PhysicalPlan, + operatorConfigs: Map[PhysicalOpIdentity, OperatorConfig], + seedLinkPartitions: Map[PhysicalLink, PartitionInfo] = Map.empty + ): Map[PhysicalLink, PartitionInfo] = { + val linkPartitionInfos = mutable.HashMap[PhysicalLink, PartitionInfo]() ++= seedLinkPartitions region .topologicalIterator() .foreach(physicalOpId => { @@ -204,5 +113,157 @@ class DefaultResourceAllocator( ) } }) + linkPartitionInfos.toMap + } + + /** + * Build port-level configs for the region’s intermediate input ports. + * + * For each input port with `storageURIs`, compute a per-URI `Partitioning` + * from the current worker assignment and the port’s partition requirement, + * then augment the existing port configs in place. + * + * @param region Region whose ports are configured. + * @param operatorConfigs Worker assignments per operator (for endpoint derivation). + * @param workflowSettings Settings used when deriving partitioning (e.g., batch size). + * @return Map from `GlobalPortIdentity` to `PortConfig`; empty if none. + */ + + def getPortConfigs( + region: Region, + operatorConfigs: Map[PhysicalOpIdentity, OperatorConfig], + workflowSettings: WorkflowSettings + ): Map[GlobalPortIdentity, PortConfig] = { + region.resourceConfig match { + case Some(existing) => + val upgradedInputPortConfigs: Map[GlobalPortIdentity, InputPortConfig] = + existing.portConfigs.collect { + case (globalPortId, rawInConfig: IntermediateInputPortConfig) if globalPortId.input => + val uris: List[URI] = rawInConfig.storageURIs + val portPartitionings: List[Partitioning] = uris.map { inputMatUri => + val toWorkerActorIds = + operatorConfigs(globalPortId.opId).workerConfigs.map(_.workerId) + val fromVirtualThreadActorIds = toWorkerActorIds.map(toWorkerActorId => + getFromActorIdForInputPortStorage(inputMatUri.toString, toWorkerActorId) + ) + val inputPortPartitionInfo = region + .getOperator(globalPortId.opId) + .partitionRequirement + .applyOrElse(globalPortId.portId.id, (_: Int) => None) + .getOrElse(UnknownPartition()) + + toPartitioning( + fromVirtualThreadActorIds, + toWorkerActorIds, + inputPortPartitionInfo, + workflowSettings.dataTransferBatchSize + ) + } + globalPortId -> InputPortConfig(uris.zip(portPartitionings)) + } + + existing.portConfigs ++ upgradedInputPortConfigs + + case None => + Map.empty[GlobalPortIdentity, PortConfig] + } + } + + /** + * Build `LinkConfig` for all links in the region. + * + * @param region Region providing the links. + * @param operatorConfigs Worker assignments per operator (for channel endpoints). + * @param linkPartitionInfos Partition info per link (for partitioning derivation). + * @param workflowSettings Settings used when deriving partitioning (e.g., batch size). + * @return Map from `PhysicalLink` to `LinkConfig`. + */ + def getLinkConfigs( + region: Region, + operatorConfigs: Map[PhysicalOpIdentity, OperatorConfig], + linkPartitionInfos: Map[PhysicalLink, PartitionInfo], + workflowSettings: WorkflowSettings + ): Map[PhysicalLink, LinkConfig] = { + region.getLinks.map { physicalLink => + physicalLink -> LinkConfig( + generateChannelConfigs( + operatorConfigs(physicalLink.fromOpId).workerConfigs.map(_.workerId), + operatorConfigs(physicalLink.toOpId).workerConfigs.map(_.workerId), + toPortId = physicalLink.toPortId, + linkPartitionInfos(physicalLink) + ), + toPartitioning( + operatorConfigs(physicalLink.fromOpId).workerConfigs.map(_.workerId), + operatorConfigs(physicalLink.toOpId).workerConfigs.map(_.workerId), + linkPartitionInfos(physicalLink), + workflowSettings.dataTransferBatchSize + ) + ) + }.toMap + } + +} + +class DefaultResourceAllocator( + physicalPlan: PhysicalPlan, + executionClusterInfo: ExecutionClusterInfo, + workflowSettings: WorkflowSettings +) extends ResourceAllocator { + + // a map of a physical link to the partition info of the upstream/downstream of this link + private val linkPartitionInfos = new mutable.HashMap[PhysicalLink, PartitionInfo]() + + private val operatorConfigs = new mutable.HashMap[PhysicalOpIdentity, OperatorConfig]() + private val linkConfigs = new mutable.HashMap[PhysicalLink, LinkConfig]() + + /** + * Allocates resources for a given region and its operators. + * + * This method calculates and assigns worker configurations for each operator + * in the region. For the operators that are parallelizable, it respects the + * suggested worker number if provided. Otherwise, it falls back to a default + * value. Non-parallelizable operators are assigned a single worker. + * + * @param region The region for which to allocate resources. + * @return A tuple containing: + * 1) A new Region instance with new resource configuration. + * 2) An estimated cost of the workflow with the new resource configuration, + * represented as a Double value (currently set to 0, but will be + * updated in the future). + */ + def allocate( + region: Region + ): (Region, Double) = { + + val opToOperatorConfigMapping = region.getOperators + .map(physicalOp => physicalOp.id -> OperatorConfig(generateDefaultWorkerConfigs(physicalOp))) + .toMap + + operatorConfigs ++= opToOperatorConfigMapping + + val updatedLinkPartitionInfos = propagatePartitionRequirement( + region, + physicalPlan, + operatorConfigs.toMap, + linkPartitionInfos.toMap + ) + + linkPartitionInfos ++= updatedLinkPartitionInfos + + val linkToLinkConfigMapping = + getLinkConfigs(region, operatorConfigs.toMap, linkPartitionInfos.toMap, workflowSettings) + + linkConfigs ++= linkToLinkConfigMapping + + val portConfigs: Map[GlobalPortIdentity, PortConfig] = + getPortConfigs(region, operatorConfigs.toMap, workflowSettings) + + val resourceConfig = ResourceConfig( + opToOperatorConfigMapping, + linkToLinkConfigMapping, + portConfigs + ) + + (region.copy(resourceConfig = Some(resourceConfig)), 0) } } diff --git a/core/config/src/main/resources/application.conf b/core/config/src/main/resources/application.conf index 4d372fa847f..2755f52abf7 100644 --- a/core/config/src/main/resources/application.conf +++ b/core/config/src/main/resources/application.conf @@ -120,7 +120,7 @@ fault-tolerance { } schedule-generator { - max-concurrent-regions = 1 + max-concurrent-regions = 2 max-concurrent-regions = ${?SCHEDULE_GENERATOR_MAX_CONCURRENT_REGIONS} use-global-search = false @@ -133,6 +133,15 @@ schedule-generator { search-timeout-milliseconds = ${?SCHEDULE_GENERATOR_SEARCH_TIMEOUT_MILLISECONDS} } +operator-parallelism { + allocator-type = "greedy" + allocator-type = ${?OPERATOR_PARALLELISM_ALLOCATOR_TYPE} + available-cores = 8 + available-cores = ${?OPERATOR_PARALLELISM_AVAILABLE_CORES} + core-to-worker-ratio = 1.5 + core-to-worker-ratio =${?OPERATOR_PARALLELISM_CORE_TO_WORKER_RATIO} +} + ai-assistant-server { assistant = "none" assistant = ${?AI_ASSISTANT_SERVER_ASSISTANT} diff --git a/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala b/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala index e9eb0f3968d..6543fee990c 100644 --- a/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala +++ b/core/config/src/main/scala/edu/uci/ics/amber/config/ApplicationConfig.scala @@ -106,6 +106,12 @@ object ApplicationConfig { val maxWorkflowWebsocketRequestPayloadSizeKb: Int = getConfSource.getInt("web-server.max-workflow-websocket-request-payload-size-kb") + // Operator parallelism + val allocatorType: String = getConfSource.getString("operator-parallelism.allocator-type") + val availableCores: Int = getConfSource.getInt("operator-parallelism.available-cores") + val coreToWorkerRatio: Double = + getConfSource.getDouble("operator-parallelism.core-to-worker-ratio") + // AI Assistant val aiAssistantConfig: Option[Config] = if (getConfSource.hasPath("ai-assistant-server")) diff --git a/core/config/src/main/scala/edu/uci/ics/amber/config/EnvironmentalVariable.scala b/core/config/src/main/scala/edu/uci/ics/amber/config/EnvironmentalVariable.scala index f186050c3b6..9bed744239b 100644 --- a/core/config/src/main/scala/edu/uci/ics/amber/config/EnvironmentalVariable.scala +++ b/core/config/src/main/scala/edu/uci/ics/amber/config/EnvironmentalVariable.scala @@ -161,6 +161,11 @@ object EnvironmentalVariable { val ENV_SCHEDULE_GENERATOR_SEARCH_TIMEOUT_MILLISECONDS = "SCHEDULE_GENERATOR_SEARCH_TIMEOUT_MILLISECONDS" + // Operator Parallelism + val ENV_OPERATOR_PARALLELISM_ALLOCATOR_TYPE = "OPERATOR_PARALLELISM_ALLOCATOR_TYPE" + val ENV_OPERATOR_PARALLELISM_AVAILABLE_CORES = "OPERATOR_PARALLELISM_AVAILABLE_CORES" + val ENV_OPERATOR_PARALLELISM_CORE_TO_WORKER_RATIO = "OPERATOR_PARALLELISM_CORE_TO_WORKER_RATIO" + // AI Assistant Server val ENV_AI_ASSISTANT_SERVER_ASSISTANT = "AI_ASSISTANT_SERVER_ASSISTANT" val ENV_AI_ASSISTANT_SERVER_AI_SERVICE_KEY = "AI_ASSISTANT_SERVER_AI_SERVICE_KEY" diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/java/JavaUDFOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/java/JavaUDFOpDesc.scala index a53f9424fbb..49f463fcf03 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/java/JavaUDFOpDesc.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/java/JavaUDFOpDesc.scala @@ -20,8 +20,9 @@ package edu.uci.ics.amber.operator.udf.java import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} +import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.google.common.base.Preconditions -import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import com.kjetland.jackson.jsonSchema.annotations.{JsonSchemaInject, JsonSchemaTitle} import edu.uci.ics.amber.core.executor.OpExecWithCode import edu.uci.ics.amber.core.tuple.{Attribute, Schema} import edu.uci.ics.amber.core.workflow.{ @@ -60,10 +61,11 @@ class JavaUDFOpDesc extends LogicalOp { @JsonPropertyDescription("Input your code here") var code: String = "" - @JsonProperty(required = true, defaultValue = "1") - @JsonSchemaTitle("Worker count") - @JsonPropertyDescription("Specify how many parallel workers to lunch") - var workers: Int = Int.box(1) + @JsonProperty(required = true, defaultValue = "true") + @JsonSchemaTitle("Parallelizable?") + @JsonPropertyDescription("Default: True") + @JsonSchemaInject(json = """{"toggleHidden" : ["advanced"]}""") + val parallelizable: Boolean = Boolean.box(true) @JsonProperty(required = true, defaultValue = "true") @JsonSchemaTitle("Retain input columns") @@ -77,6 +79,17 @@ class JavaUDFOpDesc extends LogicalOp { ) var outputColumns: List[Attribute] = List() + @JsonProperty(required = true, defaultValue = "false") + @JsonSchemaTitle("Advanced Setting") + @JsonDeserialize(contentAs = classOf[java.lang.Boolean]) + @JsonSchemaInject(json = """{"toggleHidden" : ["workers"]}""") + var advanced: Boolean = Boolean.box(false) + + @JsonProperty(required = true, defaultValue = "1") + @JsonSchemaTitle("Worker count") + @JsonPropertyDescription("Specify how many parallel workers to launch") + var workers: Int = Int.box(1) + override def getPhysicalOp( workflowId: WorkflowIdentity, executionId: ExecutionIdentity @@ -109,23 +122,40 @@ class JavaUDFOpDesc extends LogicalOp { Map(operatorInfo.outputPorts.head.id -> outputSchema) } - if (workers > 1) - PhysicalOp - .oneToOnePhysicalOp( - workflowId, - executionId, - operatorIdentifier, - OpExecWithCode(code, "java") - ) - .withDerivePartition(_ => UnknownPartition()) - .withInputPorts(operatorInfo.inputPorts) - .withOutputPorts(operatorInfo.outputPorts) - .withPartitionRequirement(partitionRequirement) - .withIsOneToManyOp(true) - .withParallelizable(true) - .withSuggestedWorkerNum(workers) - .withPropagateSchema(SchemaPropagationFunc(propagateSchema)) - else + if (parallelizable) { + if (advanced) { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithCode(code, "java") + ) + .withDerivePartition(_ => UnknownPartition()) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withPartitionRequirement(partitionRequirement) + .withIsOneToManyOp(true) + .withParallelizable(true) + .withSuggestedWorkerNum(workers) + .withPropagateSchema(SchemaPropagationFunc(propagateSchema)) + } else { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithCode(code, "java") + ) + .withDerivePartition(_ => UnknownPartition()) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withPartitionRequirement(partitionRequirement) + .withIsOneToManyOp(true) + .withParallelizable(true) + .withPropagateSchema(SchemaPropagationFunc(propagateSchema)) + } + } else PhysicalOp .manyToOnePhysicalOp( workflowId, diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/DualInputPortsPythonUDFOpDescV2.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/DualInputPortsPythonUDFOpDescV2.scala index f7b3a27317b..6c43b3c9ce4 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/DualInputPortsPythonUDFOpDescV2.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/DualInputPortsPythonUDFOpDescV2.scala @@ -20,8 +20,9 @@ package edu.uci.ics.amber.operator.udf.python import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} +import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.google.common.base.Preconditions -import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import com.kjetland.jackson.jsonSchema.annotations.{JsonSchemaInject, JsonSchemaTitle} import edu.uci.ics.amber.core.executor.OpExecWithCode import edu.uci.ics.amber.core.tuple.{Attribute, Schema} import edu.uci.ics.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc, UnknownPartition} @@ -61,10 +62,11 @@ class DualInputPortsPythonUDFOpDescV2 extends LogicalOp { @JsonPropertyDescription("Input your code here") var code: String = "" - @JsonProperty(required = true, defaultValue = "1") - @JsonSchemaTitle("Worker count") - @JsonPropertyDescription("Specify how many parallel workers to launch") - var workers: Int = Int.box(1) + @JsonProperty(required = true, defaultValue = "false") + @JsonSchemaTitle("Parallelizable?") + @JsonPropertyDescription("Default: True") + @JsonSchemaInject(json = """{"toggleHidden" : ["advanced"]}""") + val parallelizable: Boolean = Boolean.box(false) @JsonProperty(required = true, defaultValue = "true") @JsonSchemaTitle("Retain input columns") @@ -78,21 +80,41 @@ class DualInputPortsPythonUDFOpDescV2 extends LogicalOp { ) var outputColumns: List[Attribute] = List() + @JsonProperty(required = true, defaultValue = "false") + @JsonSchemaTitle("Advanced Setting") + @JsonDeserialize(contentAs = classOf[java.lang.Boolean]) + @JsonSchemaInject(json = """{"toggleHidden" : ["workers"]}""") + var advanced: Boolean = Boolean.box(false) + + @JsonProperty(required = true, defaultValue = "1") + @JsonSchemaTitle("Worker count") + @JsonPropertyDescription("Specify how many parallel workers to launch") + var workers: Int = Int.box(1) + override def getPhysicalOp( workflowId: WorkflowIdentity, executionId: ExecutionIdentity ): PhysicalOp = { Preconditions.checkArgument(workers >= 1, "Need at least 1 worker.", Array()) - val physicalOp = if (workers > 1) { - PhysicalOp - .oneToOnePhysicalOp( - workflowId, - executionId, - operatorIdentifier, - OpExecWithCode(code, "python") - ) - .withParallelizable(true) - .withSuggestedWorkerNum(workers) + val physicalOp = if (parallelizable) { + if (advanced) { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithCode(code, "python") + ) + .withSuggestedWorkerNum(workers) + } else { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithCode(code, "python") + ) + } } else { PhysicalOp .manyToOnePhysicalOp( @@ -101,9 +123,9 @@ class DualInputPortsPythonUDFOpDescV2 extends LogicalOp { operatorIdentifier, OpExecWithCode(code, "python") ) - .withParallelizable(false) } physicalOp + .withParallelizable(parallelizable) .withDerivePartition(_ => UnknownPartition()) .withInputPorts(operatorInfo.inputPorts) .withOutputPorts(operatorInfo.outputPorts) diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/PythonUDFOpDescV2.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/PythonUDFOpDescV2.scala index c14838bb784..4897e9fc5a0 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/PythonUDFOpDescV2.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/PythonUDFOpDescV2.scala @@ -20,8 +20,9 @@ package edu.uci.ics.amber.operator.udf.python import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} +import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.google.common.base.Preconditions -import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import com.kjetland.jackson.jsonSchema.annotations.{JsonSchemaInject, JsonSchemaTitle} import edu.uci.ics.amber.core.executor.OpExecWithCode import edu.uci.ics.amber.core.tuple.{Attribute, Schema} import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} @@ -62,10 +63,11 @@ class PythonUDFOpDescV2 extends LogicalOp { @JsonPropertyDescription("Input your code here") var code: String = "" - @JsonProperty(required = true, defaultValue = "1") - @JsonSchemaTitle("Worker count") - @JsonPropertyDescription("Specify how many parallel workers to launch") - var workers: Int = Int.box(1) + @JsonProperty(required = true, defaultValue = "true") + @JsonSchemaTitle("Parallelizable?") + @JsonPropertyDescription("Default: True") + @JsonSchemaInject(json = """{"toggleHidden" : ["advanced"]}""") + val parallelizable: Boolean = Boolean.box(true) @JsonProperty(required = true, defaultValue = "true") @JsonSchemaTitle("Retain input columns") @@ -79,6 +81,17 @@ class PythonUDFOpDescV2 extends LogicalOp { ) var outputColumns: List[Attribute] = List() + @JsonProperty(required = true, defaultValue = "false") + @JsonSchemaTitle("Advanced Setting") + @JsonDeserialize(contentAs = classOf[java.lang.Boolean]) + @JsonSchemaInject(json = """{"toggleHidden" : ["workers"]}""") + var advanced: Boolean = Boolean.box(false) + + @JsonProperty(required = true, defaultValue = "1") + @JsonSchemaTitle("Worker count") + @JsonPropertyDescription("Specify how many parallel workers to launch") + var workers: Int = Int.box(1) + override def getPhysicalOp( workflowId: WorkflowIdentity, executionId: ExecutionIdentity @@ -112,16 +125,25 @@ class PythonUDFOpDescV2 extends LogicalOp { Map(operatorInfo.outputPorts.head.id -> outputSchema) } - val physicalOp = if (workers > 1) { - PhysicalOp - .oneToOnePhysicalOp( - workflowId, - executionId, - operatorIdentifier, - OpExecWithCode(code, "python") - ) - .withParallelizable(true) - .withSuggestedWorkerNum(workers) + val physicalOp = if (parallelizable) { + if (advanced) { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithCode(code, "python") + ) + .withSuggestedWorkerNum(workers) + } else { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithCode(code, "python") + ) + } } else { PhysicalOp .manyToOnePhysicalOp( @@ -130,10 +152,10 @@ class PythonUDFOpDescV2 extends LogicalOp { operatorIdentifier, OpExecWithCode(code, "python") ) - .withParallelizable(false) } physicalOp + .withParallelizable(parallelizable) .withDerivePartition(_ => UnknownPartition()) .withInputPorts(operatorInfo.inputPorts) .withOutputPorts(operatorInfo.outputPorts) diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/source/PythonUDFSourceOpDescV2.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/source/PythonUDFSourceOpDescV2.scala index 0f5f31db6de..82beb682d62 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/source/PythonUDFSourceOpDescV2.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/python/source/PythonUDFSourceOpDescV2.scala @@ -20,7 +20,8 @@ package edu.uci.ics.amber.operator.udf.python.source import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} -import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import com.kjetland.jackson.jsonSchema.annotations.{JsonSchemaInject, JsonSchemaTitle} import edu.uci.ics.amber.core.executor.OpExecWithCode import edu.uci.ics.amber.core.tuple.{Attribute, Schema} import edu.uci.ics.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc} @@ -45,16 +46,28 @@ class PythonUDFSourceOpDescV2 extends SourceOperatorDescriptor { @JsonPropertyDescription("Input your code here") var code: String = _ - @JsonProperty(required = true, defaultValue = "1") - @JsonSchemaTitle("Worker count") - @JsonPropertyDescription("Specify how many parallel workers to launch") - var workers: Int = 1 + @JsonProperty(required = true, defaultValue = "false") + @JsonSchemaTitle("Parallelizable?") + @JsonPropertyDescription("Default: False") + @JsonSchemaInject(json = """{"toggleHidden" : ["advanced"]}""") + val parallelizable: Boolean = Boolean.box(false) @JsonProperty() @JsonSchemaTitle("Columns") @JsonPropertyDescription("The columns of the source") var columns: List[Attribute] = List.empty + @JsonProperty(required = true, defaultValue = "false") + @JsonSchemaTitle("Advanced Setting") + @JsonDeserialize(contentAs = classOf[java.lang.Boolean]) + @JsonSchemaInject(json = """{"toggleHidden" : ["workers"]}""") + var advanced: Boolean = Boolean.box(false) + + @JsonProperty(required = true, defaultValue = "1") + @JsonSchemaTitle("Worker count") + @JsonPropertyDescription("Specify how many parallel workers to launch") + var workers: Int = Int.box(1) + override def getPhysicalOp( workflowId: WorkflowIdentity, executionId: ExecutionIdentity @@ -69,13 +82,13 @@ class PythonUDFSourceOpDescV2 extends SourceOperatorDescriptor { SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id -> sourceSchema())) ) .withLocationPreference(Option.empty) + .withParallelizable(parallelizable) - if (workers > 1) { + if (advanced && parallelizable) { physicalOp - .withParallelizable(true) .withSuggestedWorkerNum(workers) } else { - physicalOp.withParallelizable(false) + physicalOp } } diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFOpDesc.scala index 5389634ce8c..937aab0a4ca 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFOpDesc.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFOpDesc.scala @@ -20,8 +20,9 @@ package edu.uci.ics.amber.operator.udf.r import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} +import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.google.common.base.Preconditions -import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import com.kjetland.jackson.jsonSchema.annotations.{JsonSchemaInject, JsonSchemaTitle} import edu.uci.ics.amber.core.executor.OpExecWithCode import edu.uci.ics.amber.core.tuple.{Attribute, Schema} import edu.uci.ics.amber.core.workflow.{ @@ -56,10 +57,11 @@ class RUDFOpDesc extends LogicalOp { @JsonPropertyDescription("Input your code here") var code: String = "" - @JsonProperty(required = true, defaultValue = "1") - @JsonSchemaTitle("Worker count") - @JsonPropertyDescription("Specify how many parallel workers to lunch") - var workers: Int = Int.box(1) + @JsonProperty(required = true, defaultValue = "true") + @JsonSchemaTitle("Parallelizable?") + @JsonPropertyDescription("Default: True") + @JsonSchemaInject(json = """{"toggleHidden" : ["advanced"]}""") + val parallelizable: Boolean = Boolean.box(true) @JsonProperty(required = true, defaultValue = "false") @JsonSchemaTitle("Use Tuple API?") @@ -78,6 +80,17 @@ class RUDFOpDesc extends LogicalOp { ) var outputColumns: List[Attribute] = List() + @JsonProperty(required = true, defaultValue = "false") + @JsonSchemaTitle("Advanced Setting") + @JsonDeserialize(contentAs = classOf[java.lang.Boolean]) + @JsonSchemaInject(json = """{"toggleHidden" : ["workers"]}""") + var advanced: Boolean = Boolean.box(false) + + @JsonProperty(required = true, defaultValue = "1") + @JsonSchemaTitle("Worker count") + @JsonPropertyDescription("Specify how many parallel workers to launch") + var workers: Int = Int.box(1) + override def getPhysicalOp( workflowId: WorkflowIdentity, executionId: ExecutionIdentity @@ -112,16 +125,25 @@ class RUDFOpDesc extends LogicalOp { } val r_operator_type = if (useTupleAPI) "r-tuple" else "r-table" - if (workers > 1) { - PhysicalOp - .oneToOnePhysicalOp( - workflowId, - executionId, - operatorIdentifier, - OpExecWithCode(code, r_operator_type) - ) - .withParallelizable(true) - .withSuggestedWorkerNum(workers) + if (parallelizable) { + if (advanced) { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithCode(code, r_operator_type) + ) + } else { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithCode(code, r_operator_type) + ) + .withSuggestedWorkerNum(workers) + } } else { PhysicalOp .manyToOnePhysicalOp( @@ -130,8 +152,8 @@ class RUDFOpDesc extends LogicalOp { operatorIdentifier, OpExecWithCode(code, r_operator_type) ) - .withParallelizable(false) - }.withDerivePartition(_ => UnknownPartition()) + }.withParallelizable(parallelizable) + .withDerivePartition(_ => UnknownPartition()) .withInputPorts(operatorInfo.inputPorts) .withOutputPorts(operatorInfo.outputPorts) .withPartitionRequirement(partitionRequirement) diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFSourceOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFSourceOpDesc.scala index 4b17ecc12d4..98867074547 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFSourceOpDesc.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/udf/r/RUDFSourceOpDesc.scala @@ -20,7 +20,8 @@ package edu.uci.ics.amber.operator.udf.r import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} -import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import com.kjetland.jackson.jsonSchema.annotations.{JsonSchemaInject, JsonSchemaTitle} import edu.uci.ics.amber.core.executor.OpExecWithCode import edu.uci.ics.amber.core.tuple.{Attribute, Schema} import edu.uci.ics.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc} @@ -48,10 +49,11 @@ class RUDFSourceOpDesc extends SourceOperatorDescriptor { @JsonPropertyDescription("Input your code here") var code: String = _ - @JsonProperty(required = true, defaultValue = "1") - @JsonSchemaTitle("Worker count") - @JsonPropertyDescription("Specify how many parallel workers to launch") - var workers: Int = 1 + @JsonProperty(required = true, defaultValue = "false") + @JsonSchemaTitle("Parallelizable?") + @JsonPropertyDescription("Default: False") + @JsonSchemaInject(json = """{"toggleHidden" : ["advanced"]}""") + val parallelizable: Boolean = Boolean.box(false) @JsonProperty(required = true, defaultValue = "false") @JsonSchemaTitle("Use Tuple API?") @@ -63,6 +65,17 @@ class RUDFSourceOpDesc extends SourceOperatorDescriptor { @JsonPropertyDescription("The columns of the source") var columns: List[Attribute] = List.empty + @JsonProperty(required = true, defaultValue = "false") + @JsonSchemaTitle("Advanced Setting") + @JsonDeserialize(contentAs = classOf[java.lang.Boolean]) + @JsonSchemaInject(json = """{"toggleHidden" : ["workers"]}""") + var advanced: Boolean = Boolean.box(false) + + @JsonProperty(required = true, defaultValue = "1") + @JsonSchemaTitle("Worker count") + @JsonPropertyDescription("Specify how many parallel workers to launch") + var workers: Int = Int.box(1) + override def getPhysicalOp( workflowId: WorkflowIdentity, executionId: ExecutionIdentity @@ -84,13 +97,13 @@ class RUDFSourceOpDesc extends SourceOperatorDescriptor { SchemaPropagationFunc(_ => Map(operatorInfo.outputPorts.head.id -> sourceSchema())) ) .withLocationPreference(None) + .withParallelizable(parallelizable) - if (workers > 1) { + if (parallelizable && advanced) { physicalOp - .withParallelizable(true) .withSuggestedWorkerNum(workers) } else { - physicalOp.withParallelizable(false) + physicalOp } }