diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala index 732b970f03e..a705067bb98 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala @@ -19,6 +19,7 @@ package edu.uci.ics.amber.engine.architecture.controller +import edu.uci.ics.amber.core.executor.OpExecWithClassName import edu.uci.ics.amber.core.virtualidentity.ActorVirtualIdentity import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext} import edu.uci.ics.amber.engine.architecture.scheduling.{ @@ -33,6 +34,7 @@ class WorkflowScheduler( ) extends java.io.Serializable { var physicalPlan: PhysicalPlan = _ private var schedule: Schedule = _ + var lastRegion: Set[Region] = _ /** * Update the schedule to be executed, based on the given physicalPlan. @@ -50,6 +52,12 @@ class WorkflowScheduler( this.physicalPlan = updatedPhysicalPlan } - def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next() + def getNextRegions: Set[Region] = { + val region: Set[Region] = if (!schedule.hasNext) Set() else schedule.next() + val isAgg = region.head.physicalOps.exists { op =>op.opExecInitInfo.asInstanceOf[OpExecWithClassName].className.contains("Aggregate")} + if (isAgg) lastRegion = region + println("ergergerg", lastRegion) + if (lastRegion != null) lastRegion else region + } } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/RegionExecution.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/RegionExecution.scala index e78bac29567..a00532df27d 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/RegionExecution.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/RegionExecution.scala @@ -59,7 +59,7 @@ case class RegionExecution(region: Region) { physicalOpId: PhysicalOpIdentity, inheritOperatorExecution: Option[OperatorExecution] = None ): OperatorExecution = { - assert(!operatorExecutions.contains(physicalOpId), "OperatorExecution already exists.") + //assert(!operatorExecutions.contains(physicalOpId), "OperatorExecution already exists.") operatorExecutions.getOrElseUpdate( physicalOpId, diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/WorkflowExecution.scala index b36e8b27b4b..4120090e6ef 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/WorkflowExecution.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/execution/WorkflowExecution.scala @@ -45,10 +45,7 @@ case class WorkflowExecution() { */ def initRegionExecution(region: Region): RegionExecution = { // ensure the region execution hasn't been initialized already. - assert( - !regionExecutions.contains(region.id), - s"RegionExecution of ${region.id} already initialized." - ) + //assert(!regionExecutions.contains(region.id), s"RegionExecution of ${region.id} already initialized.") regionExecutions.getOrElseUpdate(region.id, RegionExecution(region)) } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala index f28735a2954..13eb8361c98 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala @@ -28,7 +28,7 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlcommands.{ } import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.EmptyReturn import edu.uci.ics.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer -import edu.uci.ics.amber.engine.architecture.worker.statistics.WorkerState.{PAUSED, READY, RUNNING} +import edu.uci.ics.amber.engine.architecture.worker.statistics.WorkerState.{PAUSED, READY, RUNNING, COMPLETED} import edu.uci.ics.amber.util.VirtualIdentityUtils.getFromActorIdForInputPortStorage import java.net.URI @@ -51,7 +51,7 @@ trait AssignPortHandler { // Same as AddInputChannelHandler dp.inputGateway.getChannel(channelId).setPortId(msg.portId) dp.inputManager.getPort(msg.portId).channels.add(channelId) - dp.stateManager.assertState(READY, RUNNING, PAUSED) + dp.stateManager.assertState(READY, RUNNING, PAUSED, COMPLETED) } } else { val storageURIOption: Option[URI] = msg.storageUris.head match { diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index c3b63a2df81..619c2a61824 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -116,6 +116,11 @@ object WorkflowExecutionsResource { OPERATOR_PORT_EXECUTIONS.RESULT_URI ) .values(eid.id.toInt, globalPortId.serializeAsString, uri.toString) + .onConflict( + OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID, + OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID + ) + .doNothing() .execute() } else { ExecutionResourcesMapping.addResourceUri(eid, uri)