Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset / Overwrite invokerId for unique name in zookeeper manually #5024

Merged
merged 5 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,50 +22,76 @@ import org.apache.curator.framework.recipes.shared.SharedCount
import org.apache.curator.retry.RetryUntilElapsed
import org.apache.openwhisk.common.Logging

import scala.collection.JavaConverters._

/**
* Computes the instanceId for invoker
*
* @param connectionString zooKeeper connection string
*/
private[invoker] class InstanceIdAssigner(connectionString: String)(implicit logger: Logging) {

def getId(name: String): Int = {
def setAndGetId(name: String, overwriteId: Option[Int] = None): Int = {
logger.info(this, s"invokerReg: creating zkClient to $connectionString")
val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed
val zkClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy)
zkClient.start()
zkClient.blockUntilConnected()
logger.info(this, "invokerReg: connected to zookeeper")

val myIdPath = "/invokers/idAssignment/mapping/" + name
val assignedId = Option(zkClient.checkExists().forPath(myIdPath)) match {
case None =>
// path doesn't exist -> no previous mapping for this invoker
logger.info(this, s"invokerReg: no prior assignment of id for invoker $name")
val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
idCounter.start()
val rootPath = "/invokers/idAssignment/mapping"
val myIdPath = rootPath + s"/$name"
bdoyle0182 marked this conversation as resolved.
Show resolved Hide resolved
val assignedId = if (overwriteId.isEmpty) {
bdoyle0182 marked this conversation as resolved.
Show resolved Hide resolved
Option(zkClient.checkExists().forPath(myIdPath)) match {
case None =>
// path doesn't exist -> no previous mapping for this invoker
logger.info(this, s"invokerReg: no prior assignment of id for invoker $name")
val idCounter = new SharedCount(zkClient, "/invokers/idAssignment/counter", 0)
idCounter.start()

def assignId(): Int = {
val current = idCounter.getVersionedValue()
if (idCounter.trySetCount(current, current.getValue() + 1)) {
current.getValue()
} else {
assignId()
def assignId(): Int = {
val current = idCounter.getVersionedValue()
if (idCounter.trySetCount(current, current.getValue() + 1)) {
current.getValue()
} else {
assignId()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's "possible" this can recurse forever, should this give up after N trials?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that way yea, but I'd rather deal with that in a separate PR since it's already existing.

}
}
}

val newId = assignId()
idCounter.close()
zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
newId
val newId = assignId()
idCounter.close()
zkClient.create().creatingParentContainersIfNeeded().forPath(myIdPath, BigInt(newId).toByteArray)
logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
newId

case Some(_) =>
// path already exists -> there is a previous mapping for this invoker we should use
val rawOldId = zkClient.getData().forPath(myIdPath)
val oldId = BigInt(rawOldId).intValue
logger.info(this, s"invokerReg: invoker $name was assigned its previous invokerId $oldId")
oldId
}
} else {
val newId = overwriteId.get
bdoyle0182 marked this conversation as resolved.
Show resolved Hide resolved

//check if the invokerId already exists for another unique name
val instanceIdExists = zkClient
.getChildren()
.forPath(rootPath)
.asScala
.map(uniqueName => {
val idPath = rootPath + s"/$uniqueName"
bdoyle0182 marked this conversation as resolved.
Show resolved Hide resolved
BigInt(zkClient.getData().forPath(idPath)).intValue
})
.find(_ == newId)

if (instanceIdExists.nonEmpty) {
throw new IllegalArgumentException(s"invokerReg: an invoker with id $newId already exists in zookeeper")
}

case Some(_) =>
// path already exists -> there is a previous mapping for this invoker we should use
val rawOldId = zkClient.getData().forPath(myIdPath)
val oldId = BigInt(rawOldId).intValue
logger.info(this, s"invokerReg: invoker $name was assigned its previous invokerId $oldId")
oldId
zkClient.create().orSetData().forPath(myIdPath, BigInt(newId).toByteArray)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I got this correctly, I think there would be still data for obsolete unique names.
Shouldn't we remove them?

For example, let's assume data looks like this:

/invokers/idAssignment/mapping/host1 0   // obsolete
/invokers/idAssignment/mapping/host2 1   // obsolete
/invokers/idAssignment/mapping/host3 2
/invokers/idAssignment/mapping/host4 3

You can update this like,

/invokers/idAssignment/mapping/host1 0   // obsolete
/invokers/idAssignment/mapping/host2 1   // obsolete
/invokers/idAssignment/mapping/host3 0
/invokers/idAssignment/mapping/host4 1

But there will still be data for host1 and host2.
If such names are used again by any chance, isn't it possible that id conflicts can happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's correct. I added a comment that you should not use this overwrite unless you are sure that there is not another invoker that shares the id.

I would like to make it cleaner to update invoker host mapping, but currently it's tracked by a single atomic counter in zookeeper which makes this complicated since it only goes up. And since the invokers are hostname -> id mappings, you would need to get all invokers and look at all of the ids to determine what is the lowest id available to add it as (which I'm not sure if you can do atomically like with the counter I'm not super familiar with zookeeper). This is just meant to be a manual operation used for corrections when your fleet is out of sync. Do you have any better ideas because I do think it's a problem you can never really go back and remap invoker hosts to a different instance id

Copy link
Member

@style95 style95 Nov 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like it's a bit picky.
How about removing the obsolete data from the zookeeper when overwriting the ID?
Maybe we can iterate the mappings and figure out the invoker with the overwritten ID.

I am also fine with the proper comment as this feature would not be used without any manual intervention.

Copy link
Contributor Author

@bdoyle0182 bdoyle0182 Nov 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just pushed a change which checks all uniqueNames to see if the instance id already exists before overwriting it to this invoker. This isn't atomic like the counter is for assigning id's so it's not perfect, but I think it's more than good enough for something that should be used for corrective actions to an out of sync fleet.

Next up, I would like to figure out a way to improve the dynamic id assigner so that it can handle these gaps and backfill if things are missing in an atomic way to this check of what ids are there. But that should be saved for another pr. This is good enough for now for us to correct things.

exception case should be covered in a unit test

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite sure this would fix your problem.
It seems the path is not transient so that even if an obsolete invoker no longer exists, there will be the path and data in zookeeper.
So you would be unable to overwrite the existing ID all the time.

Is that correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea that's right the data would have to be gone in zookeeper before the overwrite attempt happens. I guess I could try to delete the existing mapping once I find it and then overwrite the id

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, it should delete the mapping with the instance id before it overwrites

logger.info(this, s"invokerReg: invoker $name was assigned invokerId $newId")
newId
}

zkClient.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Try

case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
case class CmdLineArgs(uniqueName: Option[String] = None,
id: Option[Int] = None,
displayedName: Option[String] = None,
overwriteId: Option[Int] = None)

object Invoker {

Expand Down Expand Up @@ -133,6 +136,8 @@ object Invoker {
// --uniqueName <value> a unique name to dynamically assign Kafka topics from Zookeeper
// --displayedName <value> a name to identify this invoker via invoker health protocol
// --id <value> proposed invokerId
// --overwriteId <value> proposed invokerId to re-write with uniqueName in Zookeeper,
// DO NOT USE overwriteId unless sure invokerId does not exist for other uniqueName
def parse(ls: List[String], c: CmdLineArgs): CmdLineArgs = {
ls match {
case "--uniqueName" :: uniqueName :: tail =>
Expand All @@ -141,6 +146,8 @@ object Invoker {
parse(tail, c.copy(displayedName = nonEmptyString(displayedName)))
case "--id" :: id :: tail if Try(id.toInt).isSuccess =>
parse(tail, c.copy(id = Some(id.toInt)))
case "--overwriteId" :: overwriteId :: tail if Try(overwriteId.toInt).isSuccess =>
parse(tail, c.copy(overwriteId = Some(overwriteId.toInt)))
case Nil => c
case _ => abort(s"Error processing command line arguments $ls")
}
Expand All @@ -150,16 +157,16 @@ object Invoker {

val assignedInvokerId = cmdLineArgs match {
// --id is defined with a valid value, use this id directly.
case CmdLineArgs(_, Some(id), _) =>
case CmdLineArgs(_, Some(id), _, _) =>
logger.info(this, s"invokerReg: using proposedInvokerId $id")
id

// --uniqueName is defined with a valid value, id is empty, assign an id via zookeeper
case CmdLineArgs(Some(unique), None, _) =>
case CmdLineArgs(Some(unique), None, _, overwriteId) =>
if (config.zookeeperHosts.startsWith(":") || config.zookeeperHosts.endsWith(":")) {
abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHosts})")
}
new InstanceIdAssigner(config.zookeeperHosts).getId(unique)
new InstanceIdAssigner(config.zookeeperHosts).setAndGetId(unique, overwriteId)

case _ => abort(s"Either --id or --uniqueName must be configured with correct values")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,26 @@ class InstanceIdAssignerTests extends FlatSpec with Matchers with StreamLogging

it should "assign fresh id" in {
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
assigner.getId("foo") shouldBe 0
assigner.setAndGetId("foo") shouldBe 0
}

it should "reuse id if exists" in {
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
assigner.getId("foo") shouldBe 0
assigner.getId("bar") shouldBe 1
assigner.getId("bar") shouldBe 1
assigner.setAndGetId("foo") shouldBe 0
assigner.setAndGetId("bar") shouldBe 1
assigner.setAndGetId("bar") shouldBe 1
}

it should "attempt to overwrite id for unique name if overwrite set" in {
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
assigner.setAndGetId("foo") shouldBe 0
assigner.setAndGetId("foo", Some(2)) shouldBe 2
}

it should "fail to overwrite an id for unique name that already exists" in {
val assigner = new InstanceIdAssigner(zkServer.getConnectString)
assigner.setAndGetId("foo") shouldBe 0
assigner.setAndGetId("bar") shouldBe 1
assertThrows[IllegalArgumentException](assigner.setAndGetId("bar", Some(1)))
}
}