-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[New Scheduler] Add ActivationService #5070
Conversation
public class Empty { | ||
// Workaround for this issue https://github.com/akka/akka-grpc/issues/289 | ||
// Gradle complains about no java sources. | ||
// Note. Openwhisk is using a lower gradle version, so the latest akka-grpc version cannot be used. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we fix this? The issue has been fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've already tried it, but It requires a higher Gradle version to use the latest akka-grpc.
|
||
//#services | ||
service ActivationService { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove extra lines
import scala.util.Try | ||
|
||
class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Logging) extends ActivationService { | ||
implicit val requestTimeout: Timeout = Timeout(50.seconds) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this not be hardcoded?
import scala.util.Try | ||
|
||
class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Logging) extends ActivationService { | ||
implicit val requestTimeout: Timeout = Timeout(50.seconds) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this not be hardcoded?
QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match { | ||
case Some(queueValue) => | ||
// enqueue activation message to reschedule | ||
logging.info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be debug
implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher | ||
|
||
override def rescheduleActivation(request: RescheduleRequest): Future[RescheduleResponse] = { | ||
logging.info(this, s"Try to reschedule activation ${request.invocationNamespace} ${request.fqn} ${request.rev}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be debug
statement
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rescheduling is a special case that happens very occasionally. I think we can keep this log with the info
level.
implicit val serdes = jsonFormat(ActionMismatch.apply _, "actionMisMatch") | ||
} | ||
|
||
object QueuePool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this safe as a global static object being accessed from futures?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only field _queuePool(TrieMap[MemoryQueueKey, MemoryQueueValue]) which is included in this object is a thread-safe.
|
||
rpc FetchActivation (FetchRequest) returns (FetchResponse) {} | ||
|
||
rpc RescheduleActivation (RescheduleRequest) returns (RescheduleResponse) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the design document, RescheduleActivation rpc is for rescheduling when the container proxy cannot process messages
, can give some example here? i think just describe it is enough.
Codecov Report
@@ Coverage Diff @@
## master #5070 +/- ##
===========================================
- Coverage 81.95% 38.42% -43.53%
===========================================
Files 210 218 +8
Lines 10167 10700 +533
Branches 440 450 +10
===========================================
- Hits 8332 4112 -4220
- Misses 1835 6588 +4753
Continue to review full report at Codecov.
|
Seems there are some formatting errors.
|
Description
ActivationService is a GRPC service implementation for the scheduler that communicates with the container proxy.
Please refer to this document:
https://cwiki.apache.org/confluence/display/OPENWHISK/ActivationServiceImpl
Related issue and scope
My changes affect the following components
Types of changes
Checklist: