diff --git a/docs/dev/libs/ml/contribution_guide.md b/docs/dev/libs/ml/contribution_guide.md index b30c53ecac388..c28fdaa209154 100644 --- a/docs/dev/libs/ml/contribution_guide.md +++ b/docs/dev/libs/ml/contribution_guide.md @@ -55,7 +55,7 @@ The rest are considered unit tests and should only test behavior which is local An integration test is a test which requires the full Flink system to be started. In order to do that properly, all integration test cases have to mix in the trait `FlinkTestBase`. -This trait will set the right `ExecutionEnvironment` so that the test will be executed on a special `FlinkMiniCluster` designated for testing purposes. +This trait will set the right `ExecutionEnvironment` so that the test will be executed on Flink's `MiniCluster`. Thus, an integration test could look the following: {% highlight scala %} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index 14cf647cd24b9..d36b6c1adc7d4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -98,7 +98,7 @@ public abstract class FileInputFormat extends RichInputFormat - AkkaUtils.getAddress(jmActorSystems(index)).port match { - case Some(p) => p - case None => -1 - } - - case None => throw new Exception("The JobManager of the LocalFlinkMiniCluster has not been " + - "started properly.") - } - } - - def getResourceManagerAkkaConfig(index: Int): Config = { - if (useSingleActorSystem) { - AkkaUtils.getAkkaConfig(originalConfiguration, None) - } else { - val port = originalConfiguration.getInteger( - ResourceManagerOptions.IPC_PORT) - - val resolvedPort = if(port != 0) port + index else port - - AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort))) - } - } - - def getJobManagerAkkaConfig(index: Int): Config = { - if (useSingleActorSystem) { - AkkaUtils.getAkkaConfig(originalConfiguration, None) - } - else { - val port = originalConfiguration.getInteger(JobManagerOptions.PORT) - - val resolvedPort = if(port != 0) port + index else port - - AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort))) - } - } - - def getTaskManagerAkkaConfig(index: Int): Config = { - val portRange = originalConfiguration.getString(TaskManagerOptions.RPC_PORT) - - val portRangeIterator = NetUtils.getPortRangeFromString(portRange) - - val resolvedPort = if (portRangeIterator.hasNext) { - val port = portRangeIterator.next() - if (port > 0) port + index else 0 - } else 0 - - AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort))) - } - - /** - * Sets CI environment (Travis) specific config defaults. - */ - def setDefaultCiConfig(config: Configuration) : Unit = { - // https://docs.travis-ci.com/user/environment-variables#Default-Environment-Variables - if (sys.env.contains("CI")) { - // Only set if nothing specified in config - if (!config.contains(AkkaOptions.ASK_TIMEOUT)) { - val duration = Duration(AkkaOptions.ASK_TIMEOUT.defaultValue()) * 10 - config.setString(AkkaOptions.ASK_TIMEOUT, s"${duration.toSeconds}s") - - LOG.info(s"Akka ask timeout set to ${duration.toSeconds}s") - } - } - } - - // -------------------------------------------------------------------------- - // Start/Stop Methods - // -------------------------------------------------------------------------- - - def startResourceManagerActorSystem(index: Int): ActorSystem = { - val config = getResourceManagerAkkaConfig(index) - val testConfig = AkkaUtils.testDispatcherConfig.withFallback(config) - AkkaUtils.createActorSystem(testConfig) - } - - def startJobManagerActorSystem(index: Int): ActorSystem = { - val config = getJobManagerAkkaConfig(index) - val testConfig = AkkaUtils.testDispatcherConfig.withFallback(config) - AkkaUtils.createActorSystem(testConfig) - } - - def startTaskManagerActorSystem(index: Int): ActorSystem = { - val config = getTaskManagerAkkaConfig(index) - val testConfig = AkkaUtils.testDispatcherConfig.withFallback(config) - AkkaUtils.createActorSystem(testConfig) - } - - def startJobClientActorSystem(jobID: JobID): ActorSystem = { - if (useSingleActorSystem) { - jobManagerActorSystems match { - case Some(jmActorSystems) => jmActorSystems(0) - case None => throw new JobExecutionException( - jobID, - "The FlinkMiniCluster has not been started yet.") - } - } else { - JobClient.startJobClientActorSystem(originalConfiguration, hostname) - } - } - - def start(): Unit = { - start(true) - } - - def start(waitForTaskManagerRegistration: Boolean): Unit = { - LOG.info("Starting FlinkMiniCluster.") - - lazy val singleActorSystem = startJobManagerActorSystem(0) - - val metricRegistry = new MetricRegistryImpl( - MetricRegistryConfiguration.fromConfiguration(originalConfiguration)) - - metricRegistryOpt = Some(metricRegistry) - - if (originalConfiguration.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) { - metricRegistry.startQueryService(singleActorSystem, null) - } - - val (jmActorSystems, jmActors) = - (for(i <- 0 until numJobManagers) yield { - val actorSystem = if(useSingleActorSystem) { - singleActorSystem - } else { - startJobManagerActorSystem(i) - } - - if (i == 0) { - webMonitor = startWebServer(originalConfiguration, actorSystem) - } - - (actorSystem, startJobManager(i, actorSystem, webMonitor.map(_.getRestAddress))) - }).unzip - - jobManagerActorSystems = Some(jmActorSystems) - jobManagerActors = Some(jmActors) - - // find out which job manager the leader is - jobManagerLeaderRetrievalService = Option(highAvailabilityServices.getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID)) - - jobManagerLeaderRetrievalService.foreach(_.start(this)) - - // start as many resource managers as job managers - val (rmActorSystems, rmActors) = - (for(i <- 0 until getNumberOfResourceManagers) yield { - val actorSystem = if(useSingleActorSystem) { - jmActorSystems(0) - } else { - startResourceManagerActorSystem(i) - } - - (actorSystem, startResourceManager(i, actorSystem)) - }).unzip - - resourceManagerActorSystems = Some(rmActorSystems) - resourceManagerActors = Some(rmActors) - - // start task managers - val (tmActorSystems, tmActors) = - (for(i <- 0 until numTaskManagers) yield { - val actorSystem = if(useSingleActorSystem) { - jmActorSystems(0) - } else { - startTaskManagerActorSystem(i) - } - - (actorSystem, startTaskManager(i, actorSystem)) - }).unzip - - taskManagerActorSystems = Some(tmActorSystems) - taskManagerActors = Some(tmActors) - - if(waitForTaskManagerRegistration) { - waitForTaskManagersToBeRegistered() - } - - isRunning = true - } - - def startWebServer( - config: Configuration, - actorSystem: ActorSystem) - : Option[WebMonitor] = { - if( - config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) && - config.getInteger(WebOptions.PORT, 0) >= 0) { - - val flinkTimeout = FutureUtils.toTime(timeout) - - LOG.info("Starting JobManger web frontend") - // start the new web frontend. we need to load this dynamically - // because it is not in the same project/dependencies - val webServer = None: Option[WebMonitor] - - webServer.foreach(_.start()) - - webServer - } else { - None - } - } - - def stop(): Unit = { - LOG.info("Stopping FlinkMiniCluster.") - startInternalShutdown() - awaitTermination() - - jobManagerLeaderRetrievalService.foreach(_.stop()) - - highAvailabilityServices.closeAndCleanupAllData() - - isRunning = false - - ExecutorUtils.gracefulShutdown( - timeout.toMillis, - TimeUnit.MILLISECONDS, - futureExecutor, - ioExecutor) - } - - def firstActorSystem(): Option[ActorSystem] = { - jobManagerActorSystems match { - case Some(jmActorSystems) => Some(jmActorSystems.head) - case None => None - } - } - - protected def startInternalShutdown(): Unit = { - webMonitor foreach { - _.stop() - } - - val tmFutures = taskManagerActors map { - _.map(gracefulStop(_, timeout)) - } getOrElse(Seq()) - - - val jmFutures = jobManagerActors map { - _.map(gracefulStop(_, timeout)) - } getOrElse(Seq()) - - val rmFutures = resourceManagerActors map { - _.map(gracefulStop(_, timeout)) - } getOrElse(Seq()) - - Await.ready(Future.sequence(jmFutures ++ tmFutures ++ rmFutures), timeout) - - metricRegistryOpt.foreach(_.shutdown().get()) - - if (!useSingleActorSystem) { - taskManagerActorSystems foreach { - _ foreach(_.terminate()) - } - - resourceManagerActorSystems foreach { - _ foreach(_.terminate()) - } - } - - jobManagerActorSystems foreach { - _ foreach(_.terminate()) - } - } - - def awaitTermination(): Unit = { - jobManagerActorSystems foreach { - _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf)) - } - - resourceManagerActorSystems foreach { - _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf)) - } - - taskManagerActorSystems foreach { - _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf)) - } - } - - def running = isRunning - - // -------------------------------------------------------------------------- - // Utility Methods - // -------------------------------------------------------------------------- - - /** Waits with the default timeout until all task managers have registered at the job manager - * - * @throws java.util.concurrent.TimeoutException - * @throws java.lang.InterruptedException - */ - @throws(classOf[TimeoutException]) - @throws(classOf[InterruptedException]) - def waitForTaskManagersToBeRegistered(): Unit = { - waitForTaskManagersToBeRegistered(timeout) - } - - /** Waits until all task managers have registered at the job manager until the timeout is reached. - * - * @param timeout - * @throws java.util.concurrent.TimeoutException - * @throws java.lang.InterruptedException - */ - @throws(classOf[TimeoutException]) - @throws(classOf[InterruptedException]) - def waitForTaskManagersToBeRegistered(timeout: FiniteDuration): Unit = { - val futures = taskManagerActors map { - _ map(taskManager => (taskManager ? NotifyWhenRegisteredAtJobManager)(timeout)) - } getOrElse(Seq()) - - Await.ready(Future.sequence(futures), timeout) - } - - @throws(classOf[JobExecutionException]) - def submitJobAndWait( - jobGraph: JobGraph, - printUpdates: Boolean) - : JobExecutionResult = { - submitJobAndWait(jobGraph, printUpdates, timeout) - } - - @throws(classOf[JobExecutionException]) - def submitJobAndWait( - jobGraph: JobGraph, - printUpdates: Boolean, - timeout: FiniteDuration) - : JobExecutionResult = { - - val clientActorSystem = startJobClientActorSystem(jobGraph.getJobID) - - val userCodeClassLoader = - try { - createUserCodeClassLoader( - jobGraph.getUserJars, - jobGraph.getClasspaths, - Thread.currentThread().getContextClassLoader) - } catch { - case e: Exception => throw new JobExecutionException( - jobGraph.getJobID, - "Could not create the user code class loader.", - e) - } - - try { - JobClient.submitJobAndWait( - clientActorSystem, - configuration, - highAvailabilityServices, - jobGraph, - timeout, - printUpdates, - userCodeClassLoader) - } finally { - if(!useSingleActorSystem) { - // we have to shutdown the just created actor system - shutdownJobClientActorSystem(clientActorSystem) - } - } - } - - @throws(classOf[JobExecutionException]) - def submitJobDetached(jobGraph: JobGraph) : JobSubmissionResult = { - - val jobManagerGateway = try { - getLeaderGateway(timeout) - } catch { - case t: Throwable => - throw new JobExecutionException( - jobGraph.getJobID, - "Could not retrieve JobManager ActorRef.", - t - ) - } - - val userCodeClassLoader = - try { - createUserCodeClassLoader( - jobGraph.getUserJars, - jobGraph.getClasspaths, - Thread.currentThread().getContextClassLoader) - } catch { - case e: Exception => throw new JobExecutionException( - jobGraph.getJobID, - "Could not create the user code class loader.", - e) - } - - JobClient.submitJobDetached( - new AkkaJobManagerGateway(jobManagerGateway), - configuration, - jobGraph, - Time.milliseconds(timeout.toMillis), - userCodeClassLoader) - - new JobSubmissionResult(jobGraph.getJobID) - } - - def shutdownJobClientActorSystem(actorSystem: ActorSystem): Unit = { - if(!useSingleActorSystem) { - actorSystem.terminate().onComplete { - case Success(_) => - case Failure(t) => LOG.warn("Could not cleanly shut down the job client actor system.", t) - } - } - } - - protected def clearLeader(): Unit = { - futureLock.synchronized{ - leaderGateway = Promise() - leaderIndex = Promise() - } - } - - override def notifyLeaderAddress(address: String, leaderSessionID: UUID): Unit = { - if (address != null && !address.equals("")) { - // only accept leader addresses which are not null and non-empty - - val selectedLeader = (jobManagerActorSystems, jobManagerActors) match { - case (Some(systems), Some(actors)) => - val actorPaths = systems.zip(actors).zipWithIndex.map { - case ((system, actor), index) => (AkkaUtils.getAkkaURL(system, actor), actor, index) - } - - actorPaths.find { - case (path, actor, index) => path.equals(address) - }.map(x => (x._2, x._3)) - case _ => None - } - - futureLock.synchronized { - if (leaderGateway.isCompleted) { - // assignments happen atomically and only here - leaderGateway = Promise() - leaderIndex = Promise() - } - - selectedLeader match { - case Some((leader, index)) => - leaderGateway.success(new AkkaActorGateway(leader, leaderSessionID)) - leaderIndex.success(index) - case None => - leaderGateway.failure( - new Exception(s"Could not find job manager with address ${address}.")) - leaderIndex.failure( - new Exception(s"Could not find job manager index with address ${address}.") - ) - } - } - } - } - - override def handleError(exception: Exception): Unit = { - futureLock.synchronized { - if(leaderGateway.isCompleted) { - leaderGateway = Promise.failed(exception) - leaderIndex = Promise.failed(exception) - } else{ - leaderGateway.failure(exception) - leaderIndex.failure(exception) - } - } - } - - private def createUserCodeClassLoader( - jars: java.util.List[Path], - classPaths: java.util.List[URL], - parentClassLoader: ClassLoader): URLClassLoader = { - - val urls = new Array[URL](jars.size() + classPaths.size()) - - import scala.collection.JavaConverters._ - - var counter = 0 - - for (path <- jars.asScala) { - urls(counter) = path.makeQualified(path.getFileSystem).toUri.toURL - counter += 1 - } - - for (classPath <- classPaths.asScala) { - urls(counter) = classPath - counter += 1 - } - - FlinkUserCodeClassLoaders.parentFirst(urls, parentClassLoader) - } - - /** - * Run the given job and block until its execution result can be returned. - * - * @param jobGraph to execute - * @return Execution result of the executed job - * @throws JobExecutionException if the job failed to execute - */ - override def executeJobBlocking(jobGraph: JobGraph) = { - submitJobAndWait(jobGraph, false) - } - - override def closeAsync() = { - try { - stop() - CompletableFuture.completedFuture(null) - } catch { - case e: Exception => - FutureUtils.completedExceptionally(e) - } - } -} diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala deleted file mode 100644 index 6b9e4a8d1c4b2..0000000000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ /dev/null @@ -1,464 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.minicluster - -import java.net.InetAddress -import java.util.concurrent.{Executor, ScheduledExecutorService} - -import akka.actor.{ActorRef, ActorSystem, Props} -import org.apache.flink.api.common.JobID -import org.apache.flink.api.common.io.FileOutputFormat -import org.apache.flink.configuration._ -import org.apache.flink.core.fs.Path -import org.apache.flink.runtime.blob.BlobServer -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory -import org.apache.flink.runtime.clusterframework.FlinkResourceManager -import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager -import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable} -import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager -import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory -import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils} -import org.apache.flink.runtime.instance.InstanceManager -import org.apache.flink.runtime.io.disk.iomanager.IOManager -import org.apache.flink.runtime.io.network.NetworkEnvironment -import org.apache.flink.runtime.io.network.netty.NettyConfig -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler -import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore} -import org.apache.flink.runtime.jobmaster.JobMaster -import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService -import org.apache.flink.runtime.memory.MemoryManager -import org.apache.flink.runtime.messages.JobManagerMessages -import org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, StoppingFailure, StoppingResponse} -import org.apache.flink.runtime.metrics.groups.{JobManagerMetricGroup, TaskManagerMetricGroup} -import org.apache.flink.runtime.metrics.util.{MetricUtils, SystemResourcesMetricsInitializer} -import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager -import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration} -import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} -import org.apache.flink.runtime.util.EnvironmentInformation -import org.apache.flink.util.NetUtils - -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{Await, ExecutionContext} - -/** - * Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same - * JVM. It extends the [[FlinkMiniCluster]] by having convenience functions to setup Flink's - * configuration and implementations to create [[JobManager]] and [[TaskManager]]. - * - * @param userConfiguration Configuration object with the user provided configuration values - * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same - * [[ActorSystem]], otherwise false - */ -class LocalFlinkMiniCluster( - userConfiguration: Configuration, - highAvailabilityServices: HighAvailabilityServices, - singleActorSystem: Boolean) extends FlinkMiniCluster( - userConfiguration, - highAvailabilityServices, - singleActorSystem) { - - def this(userConfiguration: Configuration, useSingleActorSystem: Boolean) = { - this( - userConfiguration, - HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( - userConfiguration, - ExecutionContext.global), - useSingleActorSystem) - } - - def this(userConfiguration: Configuration) = this(userConfiguration, true) - - // -------------------------------------------------------------------------- - - override def generateConfiguration(userConfiguration: Configuration): Configuration = { - val config = getDefaultConfig - - setDefaultCiConfig(config) - - config.addAll(userConfiguration) - setMemory(config) - initializeIOFormatClasses(config) - config - } - - //------------------------------------------------------------------------------------------------ - // Actor classes - //------------------------------------------------------------------------------------------------ - - val jobManagerClass: Class[_ <: JobManager] = classOf[JobManager] - - val taskManagerClass: Class[_ <: TaskManager] = classOf[TaskManager] - - val memoryArchivistClass: Class[_ <: MemoryArchivist] = classOf[MemoryArchivist] - - val resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]] = - classOf[StandaloneResourceManager] - - //------------------------------------------------------------------------------------------------ - // Start methods for the distributed components - //------------------------------------------------------------------------------------------------ - - override def startJobManager( - index: Int, - system: ActorSystem, - optRestAddress: Option[String]) - : ActorRef = { - val config = originalConfiguration.clone() - - val jobManagerName = getJobManagerName(index) - val archiveName = getArchiveName(index) - - val jobManagerPort = config.getInteger(JobManagerOptions.PORT) - - if(jobManagerPort > 0) { - config.setInteger(JobManagerOptions.PORT, jobManagerPort + index) - } - - val (instanceManager, - scheduler, - blobServer, - libraryCacheManager, - restartStrategyFactory, - timeout, - archiveCount, - archivePath, - jobRecoveryTimeout, - jobManagerMetricGroup) = JobManager.createJobManagerComponents( - config, - futureExecutor, - ioExecutor, - highAvailabilityServices.createBlobStore(), - metricRegistryOpt.get) - - val archive = system.actorOf( - getArchiveProps( - memoryArchivistClass, - archiveCount, - archivePath), - archiveName) - - system.actorOf( - getJobManagerProps( - jobManagerClass, - config, - futureExecutor, - ioExecutor, - instanceManager, - scheduler, - blobServer, - libraryCacheManager, - archive, - restartStrategyFactory, - timeout, - highAvailabilityServices.getJobManagerLeaderElectionService( - HighAvailabilityServices.DEFAULT_JOB_ID), - highAvailabilityServices.getSubmittedJobGraphStore(), - highAvailabilityServices.getCheckpointRecoveryFactory(), - jobRecoveryTimeout, - jobManagerMetricGroup, - optRestAddress), - jobManagerName) - } - - override def startResourceManager(index: Int, system: ActorSystem): ActorRef = { - val config = originalConfiguration.clone() - - val resourceManagerName = getResourceManagerName(index) - - val resourceManagerPort = config.getInteger( - ResourceManagerOptions.IPC_PORT) - - if(resourceManagerPort > 0) { - config.setInteger(ResourceManagerOptions.IPC_PORT, - resourceManagerPort + index) - } - - val resourceManagerProps = getResourceManagerProps( - resourceManagerClass, - config, - highAvailabilityServices.getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID)) - - system.actorOf(resourceManagerProps, resourceManagerName) - } - - override def startTaskManager(index: Int, system: ActorSystem): ActorRef = { - val config = originalConfiguration.clone() - - val rpcPortRange = config.getString(TaskManagerOptions.RPC_PORT) - - val rpcPortIterator = NetUtils.getPortRangeFromString(rpcPortRange) - - val dataPort = config.getInteger(TaskManagerOptions.DATA_PORT) - - if (rpcPortIterator.hasNext) { - val rpcPort = rpcPortIterator.next() - if (rpcPort > 0) { - config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index) - } - } - if (dataPort > 0) { - config.setInteger(TaskManagerOptions.DATA_PORT, dataPort + index) - } - - val localExecution = numTaskManagers == 1 - - val taskManagerActorName = if (singleActorSystem) { - TaskExecutor.TASK_MANAGER_NAME + "_" + (index + 1) - } else { - TaskExecutor.TASK_MANAGER_NAME - } - - val resourceID = ResourceID.generate() // generate random resource id - - val taskManagerAddress = InetAddress.getByName(hostname) - - val taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(config) - val taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration( - config, - taskManagerAddress, - localExecution) - - val taskManagerServices = TaskManagerServices.fromConfiguration( - taskManagerServicesConfiguration, - resourceID, - ioExecutor, - EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag, - EnvironmentInformation.getMaxJvmHeapMemory) - - val taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup( - metricRegistryOpt.get, - taskManagerServices.getTaskManagerLocation(), - taskManagerServices.getNetworkEnvironment(), - taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval) - - val props = getTaskManagerProps( - taskManagerClass, - taskManagerConfiguration, - resourceID, - taskManagerServices.getTaskManagerLocation(), - taskManagerServices.getMemoryManager(), - taskManagerServices.getIOManager(), - taskManagerServices.getNetworkEnvironment, - taskManagerServices.getTaskManagerStateStore, - taskManagerMetricGroup) - - system.actorOf(props, taskManagerActorName) - } - - //------------------------------------------------------------------------------------------------ - // Props for the distributed components - //------------------------------------------------------------------------------------------------ - - def getArchiveProps( - archiveClass: Class[_ <: MemoryArchivist], - archiveCount: Int, - arhivePath: Option[Path]): Props = { - JobManager.getArchiveProps(archiveClass, archiveCount, Option.empty) - } - - def getJobManagerProps( - jobManagerClass: Class[_ <: JobManager], - configuration: Configuration, - futureExecutor: ScheduledExecutorService, - ioExecutor: Executor, - instanceManager: InstanceManager, - scheduler: Scheduler, - blobServer: BlobServer, - libraryCacheManager: BlobLibraryCacheManager, - archive: ActorRef, - restartStrategyFactory: RestartStrategyFactory, - timeout: FiniteDuration, - leaderElectionService: LeaderElectionService, - submittedJobGraphStore: SubmittedJobGraphStore, - checkpointRecoveryFactory: CheckpointRecoveryFactory, - jobRecoveryTimeout: FiniteDuration, - jobManagerMetricGroup: JobManagerMetricGroup, - optRestAddress: Option[String]) - : Props = { - - JobManager.getJobManagerProps( - jobManagerClass, - configuration, - futureExecutor, - ioExecutor, - instanceManager, - scheduler, - blobServer, - libraryCacheManager, - archive, - restartStrategyFactory, - timeout, - leaderElectionService, - submittedJobGraphStore, - checkpointRecoveryFactory, - jobRecoveryTimeout, - jobManagerMetricGroup, - optRestAddress) - } - - def getTaskManagerProps( - taskManagerClass: Class[_ <: TaskManager], - taskManagerConfig: TaskManagerConfiguration, - resourceID: ResourceID, - taskManagerLocation: TaskManagerLocation, - memoryManager: MemoryManager, - ioManager: IOManager, - networkEnvironment: NetworkEnvironment, - taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager, - taskManagerMetricGroup: TaskManagerMetricGroup): Props = { - - TaskManager.getTaskManagerProps( - taskManagerClass, - taskManagerConfig, - resourceID, - taskManagerLocation, - memoryManager, - ioManager, - networkEnvironment, - taskManagerLocalStateStoresManager, - highAvailabilityServices, - taskManagerMetricGroup) - } - - def getResourceManagerProps( - resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]], - configuration: Configuration, - leaderRetrievalService: LeaderRetrievalService): Props = { - - FlinkResourceManager.getResourceManagerProps( - resourceManagerClass, - configuration, - leaderRetrievalService) - } - - //------------------------------------------------------------------------------------------------ - // Helper methods - //------------------------------------------------------------------------------------------------ - - def initializeIOFormatClasses(configuration: Configuration): Unit = { - try { - val om = classOf[FileOutputFormat[_]].getDeclaredMethod("initDefaultsFromConfiguration", - classOf[Configuration]) - om.setAccessible(true) - om.invoke(null, configuration) - } catch { - case e: Exception => - LOG.error("Cannot (re) initialize the globally loaded defaults. Some classes might not " + - "follow the specified default behaviour.") - } - } - - def setMemory(config: Configuration): Unit = { - // set this only if no memory was pre-configured - if (config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals( - TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue())) { - - val numTaskManager = config.getInteger( - ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, - ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER) - - val memoryFraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION) - - // full memory size - var memorySize: Long = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag - - // compute the memory size per task manager. we assume equally much memory for - // each TaskManagers and each JobManager - memorySize /= numTaskManager + 1 // the +1 is the job manager - - // for each TaskManager, subtract the memory needed for network memory buffers - memorySize -= TaskManagerServices.calculateNetworkBufferMemory(memorySize, config) - memorySize = (memorySize * memoryFraction).toLong - memorySize >>= 20 // bytes to megabytes - config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, memorySize + "m") - } - } - - def getDefaultConfig: Configuration = { - val config: Configuration = new Configuration() - - config.setString(JobManagerOptions.ADDRESS, hostname) - config.setInteger(JobManagerOptions.PORT, 0) - - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, - ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER) - - // Reduce number of threads for local execution - config.setInteger(NettyConfig.NUM_THREADS_CLIENT, 1) - config.setInteger(NettyConfig.NUM_THREADS_SERVER, 2) - - config - } - - protected def getJobManagerName(index: Int): String = { - if(singleActorSystem) { - JobMaster.JOB_MANAGER_NAME + "_" + (index + 1) - } else { - JobMaster.JOB_MANAGER_NAME - } - } - - protected def getResourceManagerName(index: Int): String = { - if(singleActorSystem) { - FlinkResourceManager.RESOURCE_MANAGER_NAME + "_" + (index + 1) - } else { - FlinkResourceManager.RESOURCE_MANAGER_NAME - } - } - - protected def getArchiveName(index: Int): String = { - if(singleActorSystem) { - JobMaster.ARCHIVE_NAME + "_" + (index + 1) - } else { - JobMaster.ARCHIVE_NAME - } - } - - // -------------------------------------------------------------------------- - // Actions on running jobs - // -------------------------------------------------------------------------- - - def currentlyRunningJobs: Iterable[JobID] = { - val leader = getLeaderGateway(timeout) - val future = leader.ask(JobManagerMessages.RequestRunningJobsStatus, timeout) - .mapTo[RunningJobsStatus] - Await.result(future, timeout).runningJobs.map(_.getJobId) - } - - def getCurrentlyRunningJobsJava(): java.util.List[JobID] = { - val list = new java.util.ArrayList[JobID]() - currentlyRunningJobs.foreach(list.add) - list - } - - def stopJob(id: JobID) : Unit = { - val leader = getLeaderGateway(timeout) - val response = leader.ask(new JobManagerMessages.StopJob(id), timeout) - .mapTo[StoppingResponse] - val rc = Await.result(response, timeout) - - rc match { - case failure: StoppingFailure => - throw new Exception(s"Stopping the job with ID $id failed.", failure.cause) - case _ => - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java index f6082f4e081fd..e19fe2020ddaa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java @@ -18,18 +18,13 @@ package org.apache.flink.runtime.jobmaster; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.io.network.api.reader.RecordReader; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.minicluster.TestingMiniCluster; import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; -import org.apache.flink.types.IntValue; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -76,11 +71,11 @@ public void testCoLocationConstraintJobExecution() throws Exception { private JobGraph createJobGraph(int parallelism) { final JobVertex sender = new JobVertex("Sender"); sender.setParallelism(parallelism); - sender.setInvokableClass(Sender.class); + sender.setInvokableClass(TestingAbstractInvokables.Sender.class); final JobVertex receiver = new JobVertex("Receiver"); receiver.setParallelism(parallelism); - receiver.setInvokableClass(Receiver.class); + receiver.setInvokableClass(TestingAbstractInvokables.Receiver.class); // In order to make testCoLocationConstraintJobExecution fail, one needs to // remove the co-location constraint and the slot sharing groups, because then @@ -98,54 +93,4 @@ private JobGraph createJobGraph(int parallelism) { return jobGraph; } - - /** - * Basic sender {@link AbstractInvokable} which sends 42 and 1337 down stream. - */ - public static class Sender extends AbstractInvokable { - - public Sender(Environment environment) { - super(environment); - } - - @Override - public void invoke() throws Exception { - final RecordWriter writer = new RecordWriter<>(getEnvironment().getWriter(0)); - - try { - writer.emit(new IntValue(42)); - writer.emit(new IntValue(1337)); - writer.flushAll(); - } finally { - writer.clearBuffers(); - } - } - } - - /** - * Basic receiver {@link AbstractInvokable} which verifies the sent elements - * from the {@link Sender}. - */ - public static class Receiver extends AbstractInvokable { - - public Receiver(Environment environment) { - super(environment); - } - - @Override - public void invoke() throws Exception { - final RecordReader reader = new RecordReader<>( - getEnvironment().getInputGate(0), - IntValue.class, - getEnvironment().getTaskManagerInfo().getTmpDirectories()); - - final IntValue i1 = reader.next(); - final IntValue i2 = reader.next(); - final IntValue i3 = reader.next(); - - if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) { - throw new Exception("Wrong data received."); - } - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java index 9a16e3229df18..d003912eae353 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobmanager.Tasks; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.testutils.MiniClusterResource; @@ -81,7 +80,7 @@ private void runTaskFailureRecoveryTest(final JobGraph jobGraph) throws Exceptio private JobGraph createjobGraph(boolean slotSharingEnabled) throws IOException { final JobVertex sender = new JobVertex("Sender"); sender.setParallelism(PARALLELISM); - sender.setInvokableClass(Tasks.Sender.class); + sender.setInvokableClass(TestingAbstractInvokables.Sender.class); final JobVertex receiver = new JobVertex("Receiver"); receiver.setParallelism(PARALLELISM); @@ -108,7 +107,7 @@ private JobGraph createjobGraph(boolean slotSharingEnabled) throws IOException { /** * Receiver which fails once before successfully completing. */ - public static final class FailingOnceReceiver extends JobExecutionITCase.Receiver { + public static final class FailingOnceReceiver extends TestingAbstractInvokables.Receiver { private static volatile boolean failed = false; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java new file mode 100644 index 0000000000000..d227918b82391 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.network.api.reader.RecordReader; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.types.IntValue; + +/** + * {@link AbstractInvokable} for testing purposes. + */ +public class TestingAbstractInvokables { + + private TestingAbstractInvokables() { + throw new UnsupportedOperationException(getClass().getSimpleName() + " should not be instantiated."); + } + + /** + * Basic sender {@link AbstractInvokable} which sends 42 and 1337 down stream. + */ + public static class Sender extends AbstractInvokable { + + public Sender(Environment environment) { + super(environment); + } + + @Override + public void invoke() throws Exception { + final RecordWriter writer = new RecordWriter<>(getEnvironment().getWriter(0)); + + try { + writer.emit(new IntValue(42)); + writer.emit(new IntValue(1337)); + writer.flushAll(); + } finally { + writer.clearBuffers(); + } + } + } + + /** + * Basic receiver {@link AbstractInvokable} which verifies the sent elements + * from the {@link Sender}. + */ + public static class Receiver extends AbstractInvokable { + + public Receiver(Environment environment) { + super(environment); + } + + @Override + public void invoke() throws Exception { + final RecordReader reader = new RecordReader<>( + getEnvironment().getInputGate(0), + IntValue.class, + getEnvironment().getTaskManagerInfo().getTmpDirectories()); + + final IntValue i1 = reader.next(); + final IntValue i2 = reader.next(); + final IntValue i3 = reader.next(); + + if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) { + throw new Exception("Wrong data received."); + } + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java index cd541053e8e05..3744f11618415 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java @@ -38,11 +38,11 @@ import org.apache.flink.runtime.jobmanager.Tasks.ExceptionSender; import org.apache.flink.runtime.jobmanager.Tasks.Forwarder; import org.apache.flink.runtime.jobmanager.Tasks.InstantiationErrorSender; -import org.apache.flink.runtime.jobmanager.Tasks.Receiver; -import org.apache.flink.runtime.jobmanager.Tasks.Sender; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables.Receiver; +import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables.Sender; import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 8ab55fefefa58..71c2775374e10 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -23,12 +23,16 @@ import org.apache.flink.api.common.time.Deadline; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables; import org.apache.flink.runtime.minicluster.TestingMiniCluster; import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; import org.apache.flink.runtime.testutils.CommonTestUtils; @@ -141,7 +145,8 @@ private CompletableFuture submitJobAndWaitUntilRunning(JobGraph jobGr } private SupplierWithException jobIsRunning(Supplier> executionGraphFutureSupplier) { - final Predicate allExecutionsRunning = ExecutionGraphTestUtils.allExecutionsPredicate(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.RUNNING)); + final Predicate runningOrFinished = ExecutionGraphTestUtils.isInExecutionState(ExecutionState.RUNNING).or(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.FINISHED)); + final Predicate allExecutionsRunning = ExecutionGraphTestUtils.allExecutionsPredicate(runningOrFinished); return () -> { final AccessExecutionGraph executionGraph = executionGraphFutureSupplier.get().join(); @@ -159,19 +164,28 @@ private JobGraph createJobGraphWithRestartStrategy(int parallelism) throws IOExc } private JobGraph createJobGraph(int parallelism) { - final JobVertex vertex = new JobVertex("blocking operator"); - vertex.setParallelism(parallelism); - vertex.setInvokableClass(BlockingOperator.class); + final JobVertex sender = new JobVertex("Sender"); + sender.setParallelism(parallelism); + sender.setInvokableClass(TestingAbstractInvokables.Sender.class); + final JobVertex receiver = new JobVertex("Blocking receiver"); + receiver.setParallelism(parallelism); + receiver.setInvokableClass(BlockingOperator.class); BlockingOperator.reset(); - return new JobGraph("Blocking test job", vertex); + receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + + final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); + sender.setSlotSharingGroup(slotSharingGroup); + receiver.setSlotSharingGroup(slotSharingGroup); + + return new JobGraph("Blocking test job with slot sharing", sender, receiver); } /** * Blocking invokable which is controlled by a static field. */ - public static class BlockingOperator extends AbstractInvokable { + public static class BlockingOperator extends TestingAbstractInvokables.Receiver { private static CountDownLatch countDownLatch = new CountDownLatch(1); public BlockingOperator(Environment environment) { @@ -181,6 +195,7 @@ public BlockingOperator(Environment environment) { @Override public void invoke() throws Exception { countDownLatch.await(); + super.invoke(); } public static void unblock() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index a397b6ca915a2..79809e32c79c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -53,6 +53,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.Tasks; +import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; @@ -585,7 +586,7 @@ public void testGateChannelEdgeMismatch() { jid, "TestJob", vid1, eid1, new SerializedValue<>(new ExecutionConfig()), "Sender", 1, 0, 1, 0, - new Configuration(), new Configuration(), Tasks.Sender.class.getName(), + new Configuration(), new Configuration(), TestingAbstractInvokables.Sender.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList<>(), Collections.emptyList(), 0); @@ -594,7 +595,7 @@ public void testGateChannelEdgeMismatch() { jid, "TestJob", vid2, eid2, new SerializedValue<>(new ExecutionConfig()), "Receiver", 7, 2, 7, 0, - new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), + new Configuration(), new Configuration(), TestingAbstractInvokables.Receiver.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList<>(), Collections.emptyList(), 0); @@ -694,7 +695,7 @@ public void testRunJobWithForwardChannel() { jid, "TestJob", vid1, eid1, new SerializedValue<>(new ExecutionConfig()), "Sender", 1, 0, 1, 0, - new Configuration(), new Configuration(), Tasks.Sender.class.getName(), + new Configuration(), new Configuration(), TestingAbstractInvokables.Sender.class.getName(), irpdd, Collections.emptyList(), new ArrayList<>(), Collections.emptyList(), 0); @@ -702,7 +703,7 @@ public void testRunJobWithForwardChannel() { jid, "TestJob", vid2, eid2, new SerializedValue<>(new ExecutionConfig()), "Receiver", 7, 2, 7, 0, - new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), + new Configuration(), new Configuration(), TestingAbstractInvokables.Receiver.class.getName(), Collections.emptyList(), Collections.singletonList(ircdd), new ArrayList<>(), Collections.emptyList(), 0); @@ -843,7 +844,7 @@ public void testCancellingDependentAndStateUpdateFails() { jid, "TestJob", vid1, eid1, new SerializedValue<>(new ExecutionConfig()), "Sender", 1, 0, 1, 0, - new Configuration(), new Configuration(), Tasks.Sender.class.getName(), + new Configuration(), new Configuration(), TestingAbstractInvokables.Sender.class.getName(), irpdd, Collections.emptyList(), new ArrayList<>(), Collections.emptyList(), 0); diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala deleted file mode 100644 index e2702c7d2cf45..0000000000000 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmanager - -import akka.actor.{ActorSystem, Kill, PoisonPill} -import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.runtime.akka.ListeningBehaviour -import org.apache.flink.runtime.client.JobExecutionException -import org.apache.flink.runtime.io.network.partition.ResultPartitionType -import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex} -import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender} -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup -import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailure, JobSubmitSuccess, SubmitJob} -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ -import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils} -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} - -import scala.concurrent.duration._ - -@RunWith(classOf[JUnitRunner]) -class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem) - extends TestKit(_system) - with ImplicitSender - with WordSpecLike - with Matchers - with BeforeAndAfterAll - with ScalaTestingUtils { - - def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig)) - - override def afterAll(): Unit = { - TestKit.shutdownActorSystem(system) - } - - "The JobManager" should { - "handle gracefully failing task manager with slot sharing" in { - val num_tasks = 100 - - val sender = new JobVertex("Sender") - val receiver = new JobVertex("Receiver") - - sender.setInvokableClass(classOf[Sender]) - receiver.setInvokableClass(classOf[BlockingReceiver]) - - sender.setParallelism(num_tasks) - receiver.setParallelism(num_tasks) - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED) - - val sharingGroup = new SlotSharingGroup() - sender.setSlotSharingGroup(sharingGroup) - receiver.setSlotSharingGroup(sharingGroup) - - val jobGraph = new JobGraph("Pointwise Job", sender, receiver) - val jobID = jobGraph.getJobID - - val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2) - val jmGateway = cluster.getLeaderGateway(1 seconds) - val taskManagers = cluster.getTaskManagers - - try{ - within(TestingUtils.TESTING_DURATION) { - jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self) - expectMsg(JobSubmitSuccess(jobGraph.getJobID)) - - jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self) - - expectMsg(AllVerticesRunning(jobID)) - - //kill task manager - taskManagers(0) ! PoisonPill - - val failure = expectMsgType[JobResultFailure] - val exception = failure.cause.deserializeError(getClass.getClassLoader()) - exception match { - case e: JobExecutionException => - jobGraph.getJobID should equal(e.getJobID) - case e => fail(s"Received wrong exception $e.") - } - } - } finally { - cluster.stop() - } - } - - "handle hard failing task manager with slot sharing" in { - val num_tasks = 20 - - val sender = new JobVertex("Sender") - val receiver = new JobVertex("Receiver") - - sender.setInvokableClass(classOf[Sender]) - receiver.setInvokableClass(classOf[BlockingReceiver]) - - sender.setParallelism(num_tasks) - receiver.setParallelism(num_tasks) - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED) - - val sharingGroup = new SlotSharingGroup() - sender.setSlotSharingGroup(sharingGroup) - receiver.setSlotSharingGroup(sharingGroup) - - val jobGraph = new JobGraph("Pointwise Job", sender, receiver) - val jobID = jobGraph.getJobID - - val cluster = TestingUtils.startTestingCluster(num_tasks/2, 2) - val jmGateway = cluster.getLeaderGateway(1 seconds) - val taskManagers = cluster.getTaskManagers - - try{ - within(TestingUtils.TESTING_DURATION) { - jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self) - expectMsg(JobSubmitSuccess(jobGraph.getJobID)) - - jmGateway.tell(WaitForAllVerticesToBeRunningOrFinished(jobID), self) - expectMsg(AllVerticesRunning(jobID)) - - //kill task manager - taskManagers(0) ! Kill - - val failure = expectMsgType[JobResultFailure] - val exception = failure.cause.deserializeError(getClass.getClassLoader()) - exception match { - case e: JobExecutionException => - jobGraph.getJobID should equal(e.getJobID) - - case e => fail(s"Received wrong exception $e.") - } - } - }finally{ - cluster.stop() - } - } - } - -} diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala index 029f5ef5112bc..472d15c8ae0e9 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala @@ -26,22 +26,6 @@ import org.apache.flink.types.IntValue object Tasks { - class Sender(environment: Environment) - extends AbstractInvokable(environment) { - - override def invoke(): Unit = { - val writer = new RecordWriter[IntValue](getEnvironment.getWriter(0)) - - try{ - writer.emit(new IntValue(42)) - writer.emit(new IntValue(1337)) - writer.flushAll() - }finally{ - writer.clearBuffers() - } - } - } - class Forwarder(environment: Environment) extends AbstractInvokable(environment) { @@ -71,46 +55,6 @@ object Tasks { } } - class Receiver(environment: Environment) - extends AbstractInvokable(environment) { - - override def invoke(): Unit = { - val reader = new RecordReader[IntValue]( - getEnvironment.getInputGate(0), - classOf[IntValue], - getEnvironment.getTaskManagerInfo.getTmpDirectories) - - val i1 = reader.next() - val i2 = reader.next() - val i3 = reader.next() - - if(i1.getValue != 42 || i2.getValue != 1337 || i3 != null){ - throw new Exception("Wrong data received.") - } - } - } - - class BlockingOnceReceiver(environment: Environment) - extends Receiver(environment) { - import BlockingOnceReceiver.blocking - - override def invoke(): Unit = { - if(blocking) { - val o = new Object - o.synchronized{ - o.wait() - } - } else { - super.invoke() - } - } - - } - - object BlockingOnceReceiver{ - var blocking = true - } - class AgnosticReceiver(environment: Environment) extends AbstractInvokable(environment) { diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala deleted file mode 100644 index f722935f90d2e..0000000000000 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ /dev/null @@ -1,440 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.testingUtils - -import java.io.IOException -import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit, TimeoutException} - -import akka.actor.{ActorRef, ActorSystem, Props} -import akka.pattern.Patterns._ -import akka.pattern.ask -import akka.testkit.CallingThreadDispatcher -import org.apache.flink.api.common.JobID -import org.apache.flink.configuration.{Configuration, JobManagerOptions} -import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.blob.BlobServer -import org.apache.flink.runtime.checkpoint.savepoint.Savepoint -import org.apache.flink.runtime.checkpoint.{CheckpointOptions, CheckpointRecoveryFactory, CheckpointRetentionPolicy} -import org.apache.flink.runtime.clusterframework.FlinkResourceManager -import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable -import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager -import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory -import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils} -import org.apache.flink.runtime.instance.{ActorGateway, InstanceManager} -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler -import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore} -import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.messages.JobManagerMessages -import org.apache.flink.runtime.messages.JobManagerMessages._ -import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster -import org.apache.flink.runtime.taskmanager.TaskManager -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ -import org.apache.flink.runtime.testingUtils.TestingMessages.Alive -import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager -import org.apache.flink.runtime.testutils.TestingResourceManager - -import scala.concurrent.duration.Duration -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{Await, ExecutionContext, Future} - -/** - * Testing cluster which starts the [[JobManager]] and [[TaskManager]] actors with testing support - * in the same or separate [[ActorSystem]]s. - * - * @param userConfiguration Configuration object with the user provided configuration values - * @param singleActorSystem true if all actors shall be running in the same [[ActorSystem]], - * otherwise false - */ -class TestingCluster( - userConfiguration: Configuration, - highAvailabilityServices: HighAvailabilityServices, - singleActorSystem: Boolean, - synchronousDispatcher: Boolean) - extends LocalFlinkMiniCluster( - userConfiguration, - highAvailabilityServices, - singleActorSystem) { - - def this( - userConfiguration: Configuration, - singleActorSystem: Boolean, - synchronousDispatcher: Boolean) = { - this( - userConfiguration, - HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( - userConfiguration, - ExecutionContext.global), - singleActorSystem, - synchronousDispatcher) - } - - def this(userConfiguration: Configuration, singleActorSystem: Boolean) = { - this( - userConfiguration, - singleActorSystem, - false) - } - - def this(userConfiguration: Configuration) = this(userConfiguration, true) - - // -------------------------------------------------------------------------- - - override val jobManagerClass: Class[_ <: JobManager] = classOf[TestingJobManager] - - override val resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]] = - classOf[TestingResourceManager] - - override val taskManagerClass: Class[_ <: TaskManager] = classOf[TestingTaskManager] - - override val memoryArchivistClass: Class[_ <: MemoryArchivist] = classOf[TestingMemoryArchivist] - - override def getJobManagerProps( - jobManagerClass: Class[_ <: JobManager], - configuration: Configuration, - futureExecutor: ScheduledExecutorService, - ioExecutor: Executor, - instanceManager: InstanceManager, - scheduler: Scheduler, - blobServer: BlobServer, - libraryCacheManager: BlobLibraryCacheManager, - archive: ActorRef, - restartStrategyFactory: RestartStrategyFactory, - timeout: FiniteDuration, - leaderElectionService: LeaderElectionService, - submittedJobGraphStore: SubmittedJobGraphStore, - checkpointRecoveryFactory: CheckpointRecoveryFactory, - jobRecoveryTimeout: FiniteDuration, - jobManagerMetricGroup: JobManagerMetricGroup, - optRestAddress: Option[String]): Props = { - - val props = super.getJobManagerProps( - jobManagerClass, - configuration, - futureExecutor, - ioExecutor, - instanceManager, - scheduler, - blobServer, - libraryCacheManager, - archive, - restartStrategyFactory, - timeout, - leaderElectionService, - submittedJobGraphStore, - checkpointRecoveryFactory, - jobRecoveryTimeout, - jobManagerMetricGroup, - optRestAddress) - - if (synchronousDispatcher) { - props.withDispatcher(CallingThreadDispatcher.Id) - } else { - props - } - } - - @throws(classOf[TimeoutException]) - @throws(classOf[InterruptedException]) - def waitForTaskManagersToBeAlive(): Unit = { - val aliveFutures = taskManagerActors map { - _ map { - tm => (tm ? Alive)(timeout) - } - } getOrElse(Seq()) - - val combinedFuture = Future.sequence(aliveFutures) - - Await.ready(combinedFuture, timeout) - } - - @throws(classOf[TimeoutException]) - @throws(classOf[InterruptedException]) - def waitForActorsToBeAlive(): Unit = { - val tmsAliveFutures = taskManagerActors map { - _ map { - tm => (tm ? Alive)(timeout) - } - } getOrElse(Seq()) - - val jmsAliveFutures = jobManagerActors map { - _ map { - jm => (jm ? Alive)(timeout) - } - } getOrElse(Seq()) - - val resourceManagersAliveFutures = resourceManagerActors map { - _ map { - rm => (rm ? Alive)(timeout) - } - } getOrElse(Seq()) - - val combinedFuture = Future.sequence(tmsAliveFutures ++ jmsAliveFutures ++ - resourceManagersAliveFutures) - - Await.ready(combinedFuture, timeout) - } - - def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): Unit = { - val futures = taskManagerActors.map { - _.map { - tm => (tm ? NotifyWhenRegisteredAtJobManager(jobManager))(timeout) - } - }.getOrElse(Seq()) - - try { - Await.ready(Future.sequence(futures), timeout) - } catch { - case t: TimeoutException => - throw new Exception("Timeout while waiting for TaskManagers to register at " + - s"${jobManager.path}") - } - - } - - def restartLeadingJobManager(): Unit = { - this.synchronized { - (jobManagerActorSystems, jobManagerActors) match { - case (Some(jmActorSystems), Some(jmActors)) => - val leader = getLeaderGateway(AkkaUtils.getTimeout(originalConfiguration)) - val index = getLeaderIndex(AkkaUtils.getTimeout(originalConfiguration)) - - // restart the leading job manager with the same port - val port = getLeaderRPCPort - val oldPort = originalConfiguration.getInteger( - JobManagerOptions.PORT, - 0) - - // we have to set the old port in the configuration file because this is used for startup - originalConfiguration.setInteger(JobManagerOptions.PORT, port) - - clearLeader() - - val stopped = gracefulStop(leader.actor(), TestingCluster.MAX_RESTART_DURATION) - Await.result(stopped, TestingCluster.MAX_RESTART_DURATION) - - if(!singleActorSystem) { - jmActorSystems(index).terminate() - Await.ready(jmActorSystems(index).whenTerminated, Duration.Inf) - } - - val newJobManagerActorSystem = if(!singleActorSystem) { - startJobManagerActorSystem(index) - } else { - jmActorSystems.head - } - - // reset the original configuration - originalConfiguration.setInteger(JobManagerOptions.PORT, oldPort) - - val newJobManagerActor = startJobManager( - index, - newJobManagerActorSystem, - webMonitor.map(_.getRestAddress)) - - jobManagerActors = Some(jmActors.patch(index, Seq(newJobManagerActor), 1)) - jobManagerActorSystems = Some(jmActorSystems.patch( - index, - Seq(newJobManagerActorSystem), - 1)) - - jobManagerLeaderRetrievalService.foreach(_.stop()) - - jobManagerLeaderRetrievalService = Option( - highAvailabilityServices.getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID)) - - jobManagerLeaderRetrievalService.foreach(_.start(this)) - - case _ => throw new Exception("The JobManager of the TestingCluster have not " + - "been started properly.") - } - } - } - - def restartTaskManager(index: Int): Unit = { - (taskManagerActorSystems, taskManagerActors) match { - case (Some(tmActorSystems), Some(tmActors)) => - val stopped = gracefulStop(tmActors(index), TestingCluster.MAX_RESTART_DURATION) - Await.result(stopped, TestingCluster.MAX_RESTART_DURATION) - - if(!singleActorSystem) { - tmActorSystems(index).terminate() - Await.ready(tmActorSystems(index).whenTerminated, Duration.Inf) - } - - val taskManagerActorSystem = if(!singleActorSystem) { - startTaskManagerActorSystem(index) - } else { - tmActorSystems.head - } - - val taskManagerActor = startTaskManager(index, taskManagerActorSystem) - - taskManagerActors = Some(tmActors.patch(index, Seq(taskManagerActor), 1)) - taskManagerActorSystems = Some(tmActorSystems.patch(index, Seq(taskManagerActorSystem), 1)) - - case _ => throw new Exception("The TaskManager of the TestingCluster have not " + - "been started properly.") - } - } - - def addTaskManager(): Unit = { - if (useSingleActorSystem) { - (jobManagerActorSystems, taskManagerActors) match { - case (Some(jmSystems), Some(tmActors)) => - val index = numTaskManagers - taskManagerActors = Some(tmActors :+ startTaskManager(index, jmSystems(0))) - numTaskManagers += 1 - case _ => throw new IllegalStateException("Cluster has not been started properly.") - } - } else { - (taskManagerActorSystems, taskManagerActors) match { - case (Some(tmSystems), Some(tmActors)) => - val index = numTaskManagers - val newTmSystem = startTaskManagerActorSystem(index) - val newTmActor = startTaskManager(index, newTmSystem) - - taskManagerActorSystems = Some(tmSystems :+ newTmSystem) - taskManagerActors = Some(tmActors :+ newTmActor) - - numTaskManagers += 1 - case _ => throw new IllegalStateException("Cluster has not been started properly.") - } - } - } - - @throws(classOf[IOException]) - def triggerSavepoint(jobId: JobID): String = { - val timeout = AkkaUtils.getTimeout(configuration) - triggerSavepoint(jobId, getLeaderGateway(timeout), timeout) - } - - @throws(classOf[IOException]) - def requestSavepoint(savepointPath: String): Savepoint = { - val timeout = AkkaUtils.getTimeout(configuration) - requestSavepoint(savepointPath, getLeaderGateway(timeout), timeout) - } - - @throws(classOf[IOException]) - def disposeSavepoint(savepointPath: String): Unit = { - val timeout = AkkaUtils.getTimeout(configuration) - disposeSavepoint(savepointPath, getLeaderGateway(timeout), timeout) - } - - @throws(classOf[IOException]) - def triggerSavepoint( - jobId: JobID, - jobManager: ActorGateway, - timeout: FiniteDuration): String = { - val result = Await.result( - jobManager.ask( - TriggerSavepoint(jobId), timeout), timeout) - - result match { - case success: TriggerSavepointSuccess => success.savepointPath - case fail: TriggerSavepointFailure => throw new IOException(fail.cause) - case _ => throw new IllegalStateException("Trigger savepoint failed") - } - } - - @throws(classOf[IOException]) - def requestSavepoint( - savepointPath: String, - jobManager: ActorGateway, - timeout: FiniteDuration): Savepoint = { - val result = Await.result( - jobManager.ask( - TestingJobManagerMessages.RequestSavepoint(savepointPath), timeout), timeout) - - result match { - case success: ResponseSavepoint => success.savepoint - case _ => throw new IOException("Request savepoint failed") - } - } - - @throws(classOf[IOException]) - def disposeSavepoint( - savepointPath: String, - jobManager: ActorGateway, - timeout: FiniteDuration): Unit = { - val timeout = AkkaUtils.getTimeout(originalConfiguration) - val jobManager = getLeaderGateway(timeout) - val result = Await.result(jobManager.ask(DisposeSavepoint(savepointPath), timeout), timeout) - result match { - case DisposeSavepointSuccess => - case _ => throw new IOException("Dispose savepoint failed") - } - } - - @throws(classOf[IOException]) - def requestCheckpoint( - jobId: JobID, - checkpointRetentionPolicy: CheckpointRetentionPolicy): String = { - - val jobManagerGateway = getLeaderGateway(timeout) - - // wait until the cluster is ready to take a checkpoint. - val allRunning = jobManagerGateway.ask( - TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobId), timeout) - - Await.ready(allRunning, timeout) - - // trigger checkpoint - val result = Await.result( - jobManagerGateway.ask(CheckpointRequest(jobId, checkpointRetentionPolicy), timeout), timeout) - - result match { - case success: CheckpointRequestSuccess => success.path - case fail: CheckpointRequestFailure => throw fail.cause - case _ => throw new IllegalStateException("Trigger checkpoint failed") - } - } - - /** - * This cancels the given job and waits until it has been completely removed from - * the cluster. - * - * @param jobId identifying the job to cancel - * @throws Exception if something goes wrong - */ - @throws[Exception] - def cancelJob(jobId: JobID): Unit = { - if (getCurrentlyRunningJobsJava.contains(jobId)) { - val jobManagerGateway = getLeaderGateway(timeout) - val jobRemoved = jobManagerGateway.ask(NotifyWhenJobRemoved(jobId), timeout) - val cancelFuture = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout) - val result = Await.result(cancelFuture, timeout) - - result match { - case CancellationFailure(_, cause) => - throw new Exception("Cancellation failed", cause) - case _ => // noop - } - - // wait until the job has been removed - Await.result(jobRemoved, timeout) - } - else throw new IllegalStateException("Job is not running") - } - } - -object TestingCluster { - val MAX_RESTART_DURATION = new FiniteDuration(2, TimeUnit.MINUTES) -} diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 9d791a90f9d17..2c021e9eec706 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -83,21 +83,6 @@ object TestingUtils { def infiniteTime: Time = { Time.milliseconds(Integer.MAX_VALUE); } - - - def startTestingCluster(numSlots: Int, numTMs: Int = 1, - timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): TestingCluster = { - val config = new Configuration() - config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots) - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs) - config.setString(AkkaOptions.ASK_TIMEOUT, timeout) - - val cluster = new TestingCluster(config) - - cluster.start() - - cluster - } /** * Gets the shared global testing execution context diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 9140570a89009..df4e6f37dbab4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -446,7 +446,7 @@ public void testCanRestoreWithModifiedStatelessOperators() throws Exception { cluster.after(); } - // create a new TestingCluster to make sure we start with completely + // create a new MiniCluster to make sure we start with completely // new resources cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java index 43de1f4645b30..ba9c373da50c1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java @@ -37,7 +37,7 @@ import static org.junit.Assert.fail; /** - * A series of tests (reusing one FlinkMiniCluster) where tasks fail (one or more time) + * A series of tests (reusing one MiniCluster) where tasks fail (one or more time) * and the recovery should restart them to verify job completion. */ @SuppressWarnings("serial")