feat: API for dynamic scaling of Sharded daemon process instances #31844
Conversation
looks promising
Resolved review thread (outdated): ...sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ShardedDaemonProcess.scala
// FIXME stash or deny new requests if rescale in progress?
def start(): Behavior[ShardedDaemonProcessCommand] = {
  replicatorAdapter.askGet(
    replyTo => Replicator.Get(key, Replicator.ReadLocal, replyTo),
should use some kind of read and write majority to be safe
I did a write-all so I know it reaches all pingers, so a local read would be ok, but I'm not convinced that's a good solution for pausing the pingers anyway.
Went with read/write majority and explicit ping protocol in d608bee
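For illustration, a minimal sketch of what a majority read could look like with the ReplicatorMessageAdapter used in the snippet above; the timeout value and the InternalGetResponse wrapper are assumed names for this sketch, not necessarily what d608bee uses:

```scala
import scala.concurrent.duration._

// Sketch only: replace ReadLocal with a majority read so a fresh coordinator sees the
// latest replicated state. `readTimeout` and `InternalGetResponse` are illustrative names.
val readTimeout = 5.seconds

replicatorAdapter.askGet(
  replyTo => Replicator.Get(key, Replicator.ReadMajority(readTimeout), replyTo),
  response => InternalGetResponse(response))
```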
Force-pushed: a38762f to ab032a0, 9e2caad to 76d7af1, cffcb78 to 734493c
Two more things we may want here: a query message to ask the coordinator about the current number of workers, and an additional check against the local ddata state with the revision from the pid before starting an entity (in case of delayed delivery of a ping).
Checking revision/number of workers in pingers and on worker start is implemented. I'll push querying the coordinator about current state to a separate PR, I think, as this is large enough as it is and that is not a strictly required feature (although it would be nice, as it could allow for example looking at the current scale/state of a sharded daemon process in management or something like that).
Haven't reviewed the coordinator yet, but here is a first round of comments.
Resolved review thread: ...ed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessCoordinator.scala
Resolved review thread (outdated): ...sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSettings.scala
// use process n for shard id
def shardId(entityId: String): String = {
  if (supportsRescale) entityId.split(Separator)(2)
  else entityId
Will this MessageExtractor work in the same way as the old one for a rolling update? Also for the case of switching from init to initWithContext (changing supportsRescale), so that we can support a rolling update that enables the new feature.
Hmmm, the pingers live on the old nodes, so they may send the single-id ping until the roll is complete, but the extractor on the new nodes now supports rescale and can't get the revision and total count from anywhere.
Not sure what sharding does for a failing extractor, but we'd sort of want to drop those Start messages somehow when supportsRescale is enabled but we get a non-rescale message. Once the roll is completed the pingers will live on new nodes and send the right Start messages.
Maybe we can parse such messages into revision -1, and the startup check will cancel starting the worker and we'll be fine.
Added in 7e8904e but I need to test it out, and add a mention in docs.
Something more around this: the cluster singleton will not start pinging nodes until all nodes are rolled, since it runs on the oldest node. Unless you limit it with a role: in that scenario there could still be old nodes left when all nodes of that role have rolled, so there is probably a caveat to document for rolling upgrades there.
Could we handle revision 0 to use the same entity ids as before, same as for the init case? I think there is a high risk that users will switch from init to initWithContext later, not necessarily at the same time as bumping the Akka version.
Went with just the id in the string for revision 0 in b1998c8. It should work, but I didn't figure out how to add a multi-node test covering it to be sure.
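To make the rolling-update discussion above concrete, here is a rough sketch of how such an entity id encoding and its backwards-compatible fallback could look. The separator character, field layout, and helper names are assumptions for illustration; the actual encoding lives in ShardedDaemonProcessId.scala and may differ:

```scala
// Illustrative sketch, not the actual implementation.
// Rescale-aware ids are encoded as "revision|totalCount|processNumber";
// a plain id (from an old node, or revision 0) is just the process number.
object DaemonProcessIdSketch {
  val Separator = '|'

  // use process n for shard id, falling back to the plain id for old-style messages
  def shardId(entityId: String, supportsRescale: Boolean): String =
    if (supportsRescale && entityId.indexOf(Separator) >= 0) entityId.split(Separator)(2)
    else entityId

  // decode revision and process number; old-style ids are treated as revision 0
  def decode(entityId: String): (Long, Int) =
    if (entityId.indexOf(Separator) >= 0) {
      val parts = entityId.split(Separator)
      (parts(0).toLong, parts(2).toInt)
    } else (0L, entityId.toInt)
}
```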
Resolved review thread (outdated): ...ing-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala
Resolved review threads (outdated, x4): ...rc/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessKeepAlivePinger.scala
Resolved review thread (outdated): ...ed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcessReScaleSpec.scala
initialNumberOfProcesses: Int,
daemonProcessName: String,
shardingRef: ActorRef[ShardingEnvelope[T]]): Behavior[ShardedDaemonProcessCommand] =
  Behaviors.setup { context =>
Any risk of failure and need for restart supervision?
Added in 662adcf
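For illustration, a minimal sketch of what wrapping the coordinator in restart supervision could look like. The actual change is in 662adcf; the behavior stand-in and the assumption that the command trait lives at akka.cluster.sharding.typed.ShardedDaemonProcessCommand are mine:

```scala
import scala.concurrent.duration._
import akka.actor.typed.{ Behavior, SupervisorStrategy }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.ShardedDaemonProcessCommand

object CoordinatorSupervisionSketch {
  // stand-in for the real coordinator behavior created by the setup shown above
  def coordinatorBehavior: Behavior[ShardedDaemonProcessCommand] = Behaviors.ignore

  // restart with backoff so a transient failure (e.g. a ddata timeout) does not leave
  // the coordinator permanently stopped
  val supervised: Behavior[ShardedDaemonProcessCommand] =
    Behaviors
      .supervise(coordinatorBehavior)
      .onFailure[Exception](
        SupervisorStrategy.restartWithBackoff(minBackoff = 200.millis, maxBackoff = 5.seconds, randomFactor = 0.1))
}
```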
Resolved review thread (outdated): ...rc/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessKeepAlivePinger.scala
Resolved review threads (outdated, x2): ...ed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessCoordinator.scala
Resolved review thread (outdated): ...ng-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessState.scala
// first step of rescale, pause all pinger actors so they do not wake the entities up again
// when we stop them
private def pauseAllPingers(
One option we have is to move the pinging to the coordinator. Might simplify the process. Drawback is less ping redundancy in case of network partitions.
Mmm, that would certainly simplify things quite a bit, I think it is an attractive idea.
Worst case scenario is that a worker crashes in the face of a partition and isn't restarted until the partition heals, but you can work around that by making sure the worker doesn't crash, supervising it with a restart strategy.
Done in 034116a
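A short sketch of the workaround mentioned above: wrapping the worker behavior in restart supervision when registering it, so a crashed worker is not left stopped while a partition prevents the coordinator's pings from reaching it. The worker and names here are purely illustrative:

```scala
import akka.actor.typed.{ ActorSystem, Behavior, SupervisorStrategy }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess

object ResilientWorkerSketch {
  sealed trait Command

  // hypothetical worker factory
  def worker(processNumber: Int): Behavior[Command] = Behaviors.empty

  def initAll(system: ActorSystem[_]): Unit =
    ShardedDaemonProcess(system).init[Command](
      "resilient-worker",
      8,
      // restart on failure instead of relying on the next keep-alive ping
      processNumber =>
        Behaviors.supervise(worker(processNumber)).onFailure[Exception](SupervisorStrategy.restart))
}
```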
revision: Long,
numberOfProcesses: Int,
completed: Boolean,
started: Instant)
What do you think about making this a proper ReplicatedData? Merge should be dead simple since the revision is increasing and completed only changes from false to true.
I don't have a great motivation, but I don't think I've ever done a custom ReplicatedData structure, so I'll do it.
Changed in d530fe8
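A minimal sketch of what such a merge could look like, based on the fields shown in the diff above; the actual implementation landed in d530fe8 and may well differ:

```scala
import java.time.Instant
import akka.cluster.ddata.ReplicatedData

// Illustrative only: the winning entry is the one with the higher revision; for the same
// revision, completed can only move from false to true.
final case class RescaleStateSketch(
    revision: Long,
    numberOfProcesses: Int,
    completed: Boolean,
    started: Instant)
    extends ReplicatedData {
  type T = RescaleStateSketch

  override def merge(that: RescaleStateSketch): RescaleStateSketch =
    if (that.revision > this.revision) that
    else if (that.revision < this.revision) this
    else if (that.completed) that
    else this
}
```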
 * INTERNAL API
 */
@InternalApi
trait ClusterShardingTypedSerializable
private[akka]?
Fixed in a56a07b
Resolved review threads (outdated, x2): ...ed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessCoordinator.scala
replicatorAdapter.askUpdate(
  replyTo =>
    Replicator.Update(key, initialState, Replicator.WriteMajority(settings.rescaleWriteStateTimeout), replyTo)(
      (existingState: ShardedDaemonProcessState) => existingState.merge(newState)),
It would be more correct usage to not merge here, because that is what the Replicator will do. This can just return newState. Now we know that we are single writer, but otherwise it would be existingState.startScalingTo(request.newNumberOfProcesses) here instead of changing that in advance.
Yeah, I was scratching my head about what the right thing was here. I added that startScaling method for that reason, but then I need the resulting state later. I'll change it to just replace.
Done in a56a07b
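For clarity, a sketch of the adjusted update along the lines discussed: the modify function simply returns the new state and leaves merging to the Replicator. The InternalUpdateResponse wrapper is an illustrative name, not necessarily what a56a07b uses:

```scala
replicatorAdapter.askUpdate(
  replyTo =>
    Replicator.Update(key, initialState, Replicator.WriteMajority(settings.rescaleWriteStateTimeout), replyTo)(
      // no explicit merge here; the Replicator merges concurrent writes itself
      _ => newState),
  response => InternalUpdateResponse(response))
```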
daemonProcessName,
request.newNumberOfProcesses,
newState.revision)
prepareRescale(newState, request, currentState.numberOfProcesses)
Should it also cancel the Tick timer? I know it's ignored, but anyway.
Cancelled in a56a07b
Resolved review thread: ...ed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessCoordinator.scala
Resolved review thread (outdated): ...rding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessId.scala
just one thing, then ready to merge
totalProcesses: Int,
name: String,
revision: Long)
  extends ShardedDaemonProcessContext

def init[T](name: String, numberOfInstances: Int, behaviorFactory: Int => Behavior[T])(
We are missing a corresponding initWithContext for this one. Good to have all 3 to make the "migration" seamless.
I tried that but got compiler errors because of ambiguous overloads that I didn't figure out a way around, so I backed out of it completely. I can try giving it another shot.
LGTM
Great work @johanandren 🎉
This allows a user to rescale the number of sharded daemon process workers dynamically after the cluster has started.
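As a hedged usage sketch of what this enables: the method, context fields, package of ShardedDaemonProcessCommand, and the ChangeNumberOfProcesses-style rescale request mentioned in the comments below follow this PR's discussion and may differ in the released API.

```scala
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.ShardedDaemonProcessCommand
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess

object TagProcessorSketch {
  sealed trait Command

  // the worker gets its slot and the current total from the context so it can be rescaled
  def apply(processNumber: Int, totalProcesses: Int): Behavior[Command] =
    Behaviors.setup { context =>
      context.log.info(s"Started worker $processNumber of $totalProcesses")
      Behaviors.empty
    }

  def start(system: ActorSystem[_]): ActorRef[ShardedDaemonProcessCommand] =
    // initWithContext hands back a ref to the coordinator, which accepts rescale requests
    // (a ChangeNumberOfProcesses-style message, as discussed in the review thread)
    ShardedDaemonProcess(system).initWithContext[Command](
      "tag-processor",
      4,
      ctx => TagProcessorSketch(ctx.processNumber, ctx.totalProcesses))
}
```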