diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala index 884c0e7f93a..981522e7fa5 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InstanceIdAssigner.scala @@ -22,6 +22,8 @@ import org.apache.curator.framework.recipes.shared.SharedCount import org.apache.curator.retry.RetryUntilElapsed import org.apache.openwhisk.common.Logging +import scala.collection.JavaConverters._ + /** * Computes the instanceId for invoker * @@ -29,7 +31,7 @@ import org.apache.openwhisk.common.Logging */ private[invoker] class InstanceIdAssigner(connectionString: String)(implicit logger: Logging) { - def getId(name: String): Int = { + def setAndGetId(name: String, overwriteId: Option[Int] = None): Int = { logger.info(this, s"invokerReg: creating zkClient to $connectionString") val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed val zkClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy) @@ -37,36 +39,63 @@ private[invoker] class InstanceIdAssigner(connectionString: String)(implicit log zkClient.blockUntilConnected() logger.info(this, "invokerReg: connected to zookeeper") - val myIdPath = "/invokers/idAssignment/mapping/" + name - val assignedId = Option(zkClient.checkExists().forPath(myIdPath)) match { - case None => - // path doesn't exist -> no previous mapping for this invoker - logger.info(this, s"invokerReg: no prior assignment of id for invoker $name") - val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0) - idCounter.start() + val rootPath = "/invokers/idAssignment/mapping" + val myIdPath = s"$rootPath/$name" + val assignedId = overwriteId + .map(newId => { + val invokers = zkClient.getChildren.forPath(rootPath).asScala - def assignId(): Int = { - val current = idCounter.getVersionedValue() - if (idCounter.trySetCount(current, current.getValue() + 1)) { - current.getValue() - } else { - assignId() - } - } + if (invokers.size < newId) + throw new IllegalArgumentException(s"invokerReg: cannot assign $newId to $name: not enough invokers") + + //check if the invokerId already exists for another unique name and delete if it does + invokers + .map(uniqueName => { + val idPath = s"$rootPath/$uniqueName" + (idPath, BigInt(zkClient.getData.forPath(idPath)).intValue) + }) + .find(_._2 == newId) + .map(id => zkClient.delete().forPath(id._1)) + + zkClient.create().orSetData().forPath(myIdPath, BigInt(newId).toByteArray) - val newId = assignId() - idCounter.close() - zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray) logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId") newId + }) + .getOrElse({ + Option(zkClient.checkExists().forPath(myIdPath)) match { + case None => + // path doesn't exist -> no previous mapping for this invoker + logger.info(this, s"invokerReg: no prior assignment of id for invoker $name") + val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0) + idCounter.start() - case Some(_) => - // path already exists -> there is a previous mapping for this invoker we should use - val rawOldId = zkClient.getData().forPath(myIdPath) - val oldId = BigInt(rawOldId).intValue - logger.info(this, s"invokerReg: invoker $name was assigned its previous invokerId $oldId") - oldId - } + def assignId(): Int = { + val current = idCounter.getVersionedValue() + val numInvokers = Option(zkClient.checkExists().forPath(rootPath)) + .map(_ => zkClient.getChildren.forPath(rootPath).size()) + .getOrElse(0) + if (idCounter.trySetCount(current, numInvokers + 1)) { + numInvokers + } else { + assignId() + } + } + + val newId = assignId() + idCounter.close() + zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray) + logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId") + newId + + case Some(_) => + // path already exists -> there is a previous mapping for this invoker we should use + val rawOldId = zkClient.getData.forPath(myIdPath) + val oldId = BigInt(rawOldId).intValue + logger.info(this, s"invokerReg: invoker $name was assigned its previous invokerId $oldId") + oldId + } + }) zkClient.close() assignedId diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala index 9901f951a37..1b0c8bf797e 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala @@ -40,7 +40,10 @@ import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.Try -case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None) +case class CmdLineArgs(uniqueName: Option[String] = None, + id: Option[Int] = None, + displayedName: Option[String] = None, + overwriteId: Option[Int] = None) object Invoker { @@ -133,6 +136,8 @@ object Invoker { // --uniqueName a unique name to dynamically assign Kafka topics from Zookeeper // --displayedName a name to identify this invoker via invoker health protocol // --id proposed invokerId + // --overwriteId proposed invokerId to re-write with uniqueName in Zookeeper, + // DO NOT USE overwriteId unless sure invokerId does not exist for other uniqueName def parse(ls: List[String], c: CmdLineArgs): CmdLineArgs = { ls match { case "--uniqueName" :: uniqueName :: tail => @@ -141,6 +146,8 @@ object Invoker { parse(tail, c.copy(displayedName = nonEmptyString(displayedName))) case "--id" :: id :: tail if Try(id.toInt).isSuccess => parse(tail, c.copy(id = Some(id.toInt))) + case "--overwriteId" :: overwriteId :: tail if Try(overwriteId.toInt).isSuccess => + parse(tail, c.copy(overwriteId = Some(overwriteId.toInt))) case Nil => c case _ => abort(s"Error processing command line arguments $ls") } @@ -150,16 +157,16 @@ object Invoker { val assignedInvokerId = cmdLineArgs match { // --id is defined with a valid value, use this id directly. - case CmdLineArgs(_, Some(id), _) => + case CmdLineArgs(_, Some(id), _, _) => logger.info(this, s"invokerReg: using proposedInvokerId $id") id // --uniqueName is defined with a valid value, id is empty, assign an id via zookeeper - case CmdLineArgs(Some(unique), None, _) => + case CmdLineArgs(Some(unique), None, _, overwriteId) => if (config.zookeeperHosts.startsWith(":") || config.zookeeperHosts.endsWith(":")) { abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHosts})") } - new InstanceIdAssigner(config.zookeeperHosts).getId(unique) + new InstanceIdAssigner(config.zookeeperHosts).setAndGetId(unique, overwriteId) case _ => abort(s"Either --id or --uniqueName must be configured with correct values") } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/InstanceIdAssignerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/InstanceIdAssignerTests.scala index 038f373a1de..112d1190714 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/InstanceIdAssignerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/invoker/test/InstanceIdAssignerTests.scala @@ -40,14 +40,33 @@ class InstanceIdAssignerTests extends FlatSpec with Matchers with StreamLogging it should "assign fresh id" in { val assigner = new InstanceIdAssigner(zkServer.getConnectString) - assigner.getId("foo") shouldBe 0 + assigner.setAndGetId("foo") shouldBe 0 } it should "reuse id if exists" in { val assigner = new InstanceIdAssigner(zkServer.getConnectString) - assigner.getId("foo") shouldBe 0 - assigner.getId("bar") shouldBe 1 - assigner.getId("bar") shouldBe 1 + assigner.setAndGetId("foo") shouldBe 0 + assigner.setAndGetId("bar") shouldBe 1 + assigner.setAndGetId("bar") shouldBe 1 } + it should "attempt to overwrite id for unique name if overwrite set" in { + val assigner = new InstanceIdAssigner(zkServer.getConnectString) + assigner.setAndGetId("foo") shouldBe 0 + assigner.setAndGetId("bar", Some(0)) shouldBe 0 + } + + it should "overwrite an id for unique name that already exists and reset overwritten id" in { + val assigner = new InstanceIdAssigner(zkServer.getConnectString) + assigner.setAndGetId("foo") shouldBe 0 + assigner.setAndGetId("bar", Some(0)) shouldBe 0 + assigner.setAndGetId("foo") shouldBe 1 + assigner.setAndGetId("cat") shouldBe 2 + } + + it should "fail to overwrite an id too large for the invoker pool size" in { + val assigner = new InstanceIdAssigner(zkServer.getConnectString) + assigner.setAndGetId("foo") shouldBe 0 + assertThrows[IllegalArgumentException](assigner.setAndGetId("bar", Some(2))) + } }