diff --git a/docs/docs/rest-api/public/api/v2/types/unreachableStrategy.raml b/docs/docs/rest-api/public/api/v2/types/unreachableStrategy.raml
index ec71ad9b918..120ca1f2aea 100644
--- a/docs/docs/rest-api/public/api/v2/types/unreachableStrategy.raml
+++ b/docs/docs/rest-api/public/api/v2/types/unreachableStrategy.raml
@@ -1,6 +1,10 @@
#%RAML 1.0 Library
types:
- UnreachableStrategy:
+ UnreachableStrategy: (UnreachableDisabled | UnreachableEnabled)
+ UnreachableDisabled:
+ type: string
+ enum: [ disabled ]
+ UnreachableEnabled:
type: object
properties:
inactiveAfterSeconds?:
@@ -13,8 +17,7 @@ types:
as inactive. This will trigger a new instance launch. The original task is not
expunged yet. Must be less than expungeAfterSeconds.
- The default value is set to 5 minutes for ephemeral tasks (300 seconds).
- The default value is set to 1 hour for resident tasks (3600 seconds).
+ The default value is set to 5 minutes (300 seconds).
expungeAfterSeconds?:
type: integer
@@ -26,8 +29,4 @@ types:
it will be killed if it ever comes back. Instances are usually marked as unreachable before they are expunged
but they don't have to. This value is required to be greater than inactiveAfterSeconds.
- The default value is set to 10 minutes for ephemeral tasks (600 seconds).
- The default value is set to 7 days for resident tasks (604800 seconds).
-
- If the instance has any persistent volumes associated with it, then they will be destroyed and associated data
- will be deleted.
+ The default value is set to 10 minutes (600 seconds).
diff --git a/src/main/java/mesosphere/marathon/Protos.java b/src/main/java/mesosphere/marathon/Protos.java
index 89dc2470a04..ce638434e9b 100644
--- a/src/main/java/mesosphere/marathon/Protos.java
+++ b/src/main/java/mesosphere/marathon/Protos.java
@@ -7949,12 +7949,6 @@ public final boolean isInitialized() {
return false;
}
}
- if (hasUnreachableStrategy()) {
- if (!getUnreachableStrategy().isInitialized()) {
- memoizedIsInitialized = 0;
- return false;
- }
- }
memoizedIsInitialized = 1;
return true;
}
@@ -9103,12 +9097,6 @@ public final boolean isInitialized() {
return false;
}
}
- if (hasUnreachableStrategy()) {
- if (!getUnreachableStrategy().isInitialized()) {
-
- return false;
- }
- }
return true;
}
@@ -12921,9 +12909,9 @@ public mesosphere.marathon.Protos.UnreachableStrategyOrBuilder getUnreachableStr
public interface UnreachableStrategyOrBuilder
extends com.google.protobuf.MessageOrBuilder {
- // required uint64 inactiveAfterSeconds = 1 [default = 900];
+ // optional uint64 inactiveAfterSeconds = 1 [default = 900];
/**
- * required uint64 inactiveAfterSeconds = 1 [default = 900];
+ * optional uint64 inactiveAfterSeconds = 1 [default = 900];
*
*
* 15 minutes @@ -12931,7 +12919,7 @@ public interface UnreachableStrategyOrBuilder */ boolean hasInactiveAfterSeconds(); /** - *required uint64 inactiveAfterSeconds = 1 [default = 900];
+ *optional uint64 inactiveAfterSeconds = 1 [default = 900];
* ** 15 minutes @@ -12939,9 +12927,9 @@ public interface UnreachableStrategyOrBuilder */ long getInactiveAfterSeconds(); - // required uint64 expungeAfterSeconds = 2 [default = 604800]; + // optional uint64 expungeAfterSeconds = 2 [default = 604800]; /** - *required uint64 expungeAfterSeconds = 2 [default = 604800];
+ *optional uint64 expungeAfterSeconds = 2 [default = 604800];
* ** 7 days @@ -12949,7 +12937,7 @@ public interface UnreachableStrategyOrBuilder */ boolean hasExpungeAfterSeconds(); /** - *required uint64 expungeAfterSeconds = 2 [default = 604800];
+ *optional uint64 expungeAfterSeconds = 2 [default = 604800];
* ** 7 days @@ -13058,11 +13046,11 @@ public com.google.protobuf.ParsergetParserForType() { } private int bitField0_; - // required uint64 inactiveAfterSeconds = 1 [default = 900]; + // optional uint64 inactiveAfterSeconds = 1 [default = 900]; public static final int INACTIVEAFTERSECONDS_FIELD_NUMBER = 1; private long inactiveAfterSeconds_; /** - * required uint64 inactiveAfterSeconds = 1 [default = 900];
+ *optional uint64 inactiveAfterSeconds = 1 [default = 900];
* ** 15 minutes @@ -13072,7 +13060,7 @@ public boolean hasInactiveAfterSeconds() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - *required uint64 inactiveAfterSeconds = 1 [default = 900];
+ *optional uint64 inactiveAfterSeconds = 1 [default = 900];
* ** 15 minutes @@ -13082,11 +13070,11 @@ public long getInactiveAfterSeconds() { return inactiveAfterSeconds_; } - // required uint64 expungeAfterSeconds = 2 [default = 604800]; + // optional uint64 expungeAfterSeconds = 2 [default = 604800]; public static final int EXPUNGEAFTERSECONDS_FIELD_NUMBER = 2; private long expungeAfterSeconds_; /** - *required uint64 expungeAfterSeconds = 2 [default = 604800];
+ *optional uint64 expungeAfterSeconds = 2 [default = 604800];
* ** 7 days @@ -13096,7 +13084,7 @@ public boolean hasExpungeAfterSeconds() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** - *required uint64 expungeAfterSeconds = 2 [default = 604800];
+ *optional uint64 expungeAfterSeconds = 2 [default = 604800];
* ** 7 days @@ -13115,14 +13103,6 @@ public final boolean isInitialized() { byte isInitialized = memoizedIsInitialized; if (isInitialized != -1) return isInitialized == 1; - if (!hasInactiveAfterSeconds()) { - memoizedIsInitialized = 0; - return false; - } - if (!hasExpungeAfterSeconds()) { - memoizedIsInitialized = 0; - return false; - } memoizedIsInitialized = 1; return true; } @@ -13336,14 +13316,6 @@ public Builder mergeFrom(mesosphere.marathon.Protos.UnreachableStrategy other) { } public final boolean isInitialized() { - if (!hasInactiveAfterSeconds()) { - - return false; - } - if (!hasExpungeAfterSeconds()) { - - return false; - } return true; } @@ -13366,10 +13338,10 @@ public Builder mergeFrom( } private int bitField0_; - // required uint64 inactiveAfterSeconds = 1 [default = 900]; + // optional uint64 inactiveAfterSeconds = 1 [default = 900]; private long inactiveAfterSeconds_ = 900L; /** - *required uint64 inactiveAfterSeconds = 1 [default = 900];
+ *optional uint64 inactiveAfterSeconds = 1 [default = 900];
* ** 15 minutes @@ -13379,7 +13351,7 @@ public boolean hasInactiveAfterSeconds() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - *required uint64 inactiveAfterSeconds = 1 [default = 900];
+ *optional uint64 inactiveAfterSeconds = 1 [default = 900];
* ** 15 minutes @@ -13389,7 +13361,7 @@ public long getInactiveAfterSeconds() { return inactiveAfterSeconds_; } /** - *required uint64 inactiveAfterSeconds = 1 [default = 900];
+ *optional uint64 inactiveAfterSeconds = 1 [default = 900];
* ** 15 minutes @@ -13402,7 +13374,7 @@ public Builder setInactiveAfterSeconds(long value) { return this; } /** - *required uint64 inactiveAfterSeconds = 1 [default = 900];
+ *optional uint64 inactiveAfterSeconds = 1 [default = 900];
* ** 15 minutes @@ -13415,10 +13387,10 @@ public Builder clearInactiveAfterSeconds() { return this; } - // required uint64 expungeAfterSeconds = 2 [default = 604800]; + // optional uint64 expungeAfterSeconds = 2 [default = 604800]; private long expungeAfterSeconds_ = 604800L; /** - *required uint64 expungeAfterSeconds = 2 [default = 604800];
+ *optional uint64 expungeAfterSeconds = 2 [default = 604800];
* ** 7 days @@ -13428,7 +13400,7 @@ public boolean hasExpungeAfterSeconds() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** - *required uint64 expungeAfterSeconds = 2 [default = 604800];
+ *optional uint64 expungeAfterSeconds = 2 [default = 604800];
* ** 7 days @@ -13438,7 +13410,7 @@ public long getExpungeAfterSeconds() { return expungeAfterSeconds_; } /** - *required uint64 expungeAfterSeconds = 2 [default = 604800];
+ *optional uint64 expungeAfterSeconds = 2 [default = 604800];
* ** 7 days @@ -13451,7 +13423,7 @@ public Builder setExpungeAfterSeconds(long value) { return this; } /** - *required uint64 expungeAfterSeconds = 2 [default = 604800];
+ *optional uint64 expungeAfterSeconds = 2 [default = 604800];
* ** 7 days @@ -43799,8 +43771,8 @@ public Builder setSecretIdBytes( "nce\022\033\n\023taskKillGracePeriod\030\037 \001(\003\022E\n\023unre" + "achableStrategy\030 \001(\0132(.mesosphere.marat" + "hon.UnreachableStrategy\"]\n\023UnreachableSt" + - "rategy\022!\n\024inactiveAfterSeconds\030\001 \002(\004:\00390" + - "0\022#\n\023expungeAfterSeconds\030\002 \002(\004:\006604800\"\024" + + "rategy\022!\n\024inactiveAfterSeconds\030\001 \001(\004:\00390" + + "0\022#\n\023expungeAfterSeconds\030\002 \001(\004:\006604800\"\024" + "\n\004Json\022\014\n\004json\030\001 \002(\t\"\035\n\rResourceRoles\022\014\n" + "\004role\030\001 \003(\t\"\346\t\n\014MarathonTask\022\n\n\002id\030\001 \002(\t" + "\022\025\n\rOBSOLETE_host\030\002 \001(\t\022\r\n\005ports\030\003 \003(\r\022-" + diff --git a/src/main/proto/marathon.proto b/src/main/proto/marathon.proto index f19107c8e8a..29bda368e6d 100644 --- a/src/main/proto/marathon.proto +++ b/src/main/proto/marathon.proto @@ -118,8 +118,8 @@ message ServiceDefinition { } message UnreachableStrategy { - required uint64 inactiveAfterSeconds = 1 [default = 900 ]; // 15 minutes - required uint64 expungeAfterSeconds = 2 [default = 604800 ]; // 7 days + optional uint64 inactiveAfterSeconds = 1 [default = 900 ]; // 15 minutes + optional uint64 expungeAfterSeconds = 2 [default = 604800 ]; // 7 days } // we serialize PodDefinition and Instances as json, only required for legacy content diff --git a/src/main/scala/mesosphere/marathon/api/v2/json/Formats.scala b/src/main/scala/mesosphere/marathon/api/v2/json/Formats.scala index fc9f4f53a72..7da29852ffa 100644 --- a/src/main/scala/mesosphere/marathon/api/v2/json/Formats.scala +++ b/src/main/scala/mesosphere/marathon/api/v2/json/Formats.scala @@ -15,7 +15,7 @@ import mesosphere.marathon.core.pod.PodDefinition import mesosphere.marathon.core.readiness.ReadinessCheck import mesosphere.marathon.core.task.Task import mesosphere.marathon.core.task.state.NetworkInfo -import mesosphere.marathon.raml.{ Pod, Raml, Resources, UnreachableStrategy, KillSelection } +import mesosphere.marathon.raml.{ Pod, Raml, Resources, KillSelection } import mesosphere.marathon.state import mesosphere.marathon.state._ import mesosphere.marathon.upgrade.DeploymentManager.DeploymentStepInfo @@ -1096,7 +1096,7 @@ trait AppAndGroupFormats { readinessChecks = extra.readinessChecks, secrets = extra.secrets, taskKillGracePeriod = extra.maybeTaskKillGracePeriod, - unreachableStrategy = extra.unreachableStrategy.fold(defaultUnreachableStrategy)(Raml.fromRaml(_)), + unreachableStrategy = extra.unreachableStrategy.getOrElse(defaultUnreachableStrategy), killSelection = extra.killSelection.fold(state.KillSelection.DefaultKillSelection)(Raml.fromRaml(_)) ) } @@ -1163,6 +1163,17 @@ trait AppAndGroupFormats { } } + implicit val UnreachableStrategyFormat: Format[UnreachableStrategy] = new Format[UnreachableStrategy] { + // override def + override def reads(json: JsValue): JsResult[UnreachableStrategy] = { + json.validate[raml.UnreachableStrategy].map(Raml.fromRaml(_)) + } + + override def writes(unreachableStrategy: UnreachableStrategy): JsValue = { + Json.toJson(Raml.toRaml(unreachableStrategy)) + } + } + implicit lazy val ResidencyFormat: Format[Residency] = ( (__ \ "relaunchEscalationTimeoutSeconds").formatNullable[Long] .withDefault(Residency.defaultRelaunchEscalationTimeoutSeconds) ~ @@ -1379,7 +1390,7 @@ trait AppAndGroupFormats { storeUrls = storeUrls, requirePorts = requirePorts, backoff = backoffSeconds, backoffFactor = backoffFactor, maxLaunchDelay = maxLaunchDelaySeconds, container = container, healthChecks = healthChecks, dependencies = dependencies, - unreachableStrategy = unreachableStrategy.map(Raml.fromRaml(_)), + unreachableStrategy = unreachableStrategy, killSelection = killSelection.map(Raml.fromRaml(_)) ) ).flatMap { update => diff --git a/src/main/scala/mesosphere/marathon/core/instance/Instance.scala b/src/main/scala/mesosphere/marathon/core/instance/Instance.scala index c9f08f5917e..af9dada0439 100644 --- a/src/main/scala/mesosphere/marathon/core/instance/Instance.scala +++ b/src/main/scala/mesosphere/marathon/core/instance/Instance.scala @@ -7,7 +7,7 @@ import com.fasterxml.uuid.{ EthernetAddress, Generators } import mesosphere.marathon.core.condition.Condition import mesosphere.marathon.core.instance.Instance.{ AgentInfo, InstanceState } import mesosphere.marathon.core.task.Task -import mesosphere.marathon.state.{ MarathonState, PathId, Timestamp, UnreachableStrategy } +import mesosphere.marathon.state.{ MarathonState, PathId, Timestamp, UnreachableStrategy, UnreachableDisabled, UnreachableEnabled } import mesosphere.marathon.stream.Implicits._ import mesosphere.mesos.Placed import org.apache._ @@ -26,7 +26,7 @@ case class Instance( state: InstanceState, tasksMap: Map[Task.Id, Task], runSpecVersion: Timestamp, - unreachableStrategy: UnreachableStrategy = UnreachableStrategy.defaultEphemeral) extends MarathonState[Protos.Json, Instance] with Placed { + unreachableStrategy: UnreachableStrategy) extends MarathonState[Protos.Json, Instance] with Placed { val runSpecId: PathId = instanceId.runSpecId val isLaunched: Boolean = state.condition.isActive @@ -79,7 +79,9 @@ object Instance { AgentInfo("", None, Nil), InstanceState(Condition.Unknown, Timestamp.zero, activeSince = None, healthy = None), Map.empty[Task.Id, Task], - Timestamp.zero) + Timestamp.zero, + UnreachableStrategy.default() + ) } def instancesById(tasks: Seq[Instance]): Map[Instance.Id, Instance] = @@ -133,12 +135,12 @@ object Instance { maybeOldState: Option[InstanceState], newTaskMap: Map[Task.Id, Task], now: Timestamp, - unreachableInactiveAfter: FiniteDuration = 5.minutes): InstanceState = { + unreachableStrategy: UnreachableStrategy = UnreachableStrategy.default()): InstanceState = { val tasks = newTaskMap.values // compute the new instance condition - val condition = conditionFromTasks(tasks, now, unreachableInactiveAfter) + val condition = conditionFromTasks(tasks, now, unreachableStrategy) val active: Option[Timestamp] = activeSince(tasks) @@ -152,14 +154,16 @@ object Instance { /** * @return condition for instance with tasks. */ - def conditionFromTasks(tasks: Iterable[Task], now: Timestamp, unreachableInactiveAfter: FiniteDuration): Condition = { + def conditionFromTasks(tasks: Iterable[Task], now: Timestamp, unreachableStrategy: UnreachableStrategy): Condition = { if (tasks.isEmpty) { Condition.Unknown } else { // The smallest Condition according to conditionOrdering is the condition for the whole instance. tasks.view.map(_.status.condition).minBy(conditionHierarchy) match { - case Condition.Unreachable if shouldBecomeInactive(tasks, now, unreachableInactiveAfter) => Condition.UnreachableInactive - case condition => condition + case Condition.Unreachable if shouldBecomeInactive(tasks, now, unreachableStrategy) => + Condition.UnreachableInactive + case condition => + condition } } } @@ -177,9 +181,13 @@ object Instance { /** * @return if one of tasks has been UnreachableInactive for more than unreachableInactiveAfter. */ - def shouldBecomeInactive(tasks: Iterable[Task], now: Timestamp, unreachableInactiveAfter: FiniteDuration): Boolean = { - tasks.exists(_.isUnreachableExpired(now, unreachableInactiveAfter)) - } + def shouldBecomeInactive(tasks: Iterable[Task], now: Timestamp, unreachableStrategy: UnreachableStrategy): Boolean = + unreachableStrategy match { + case _: UnreachableDisabled => false + case unreachableEnabled: UnreachableEnabled => + val inactiveAfter = unreachableEnabled.inactiveAfter + tasks.exists(_.isUnreachableExpired(now, inactiveAfter)) + } } private[this] def isRunningUnhealthy(task: Task): Boolean = { @@ -313,12 +321,11 @@ object Instance { } } - implicit val unreachableStrategyFormat: Format[UnreachableStrategy] = Json.format[UnreachableStrategy] - implicit val agentFormat: Format[AgentInfo] = Json.format[AgentInfo] implicit val idFormat: Format[Instance.Id] = Json.format[Instance.Id] implicit val instanceConditionFormat: Format[Condition] = Json.format[Condition] implicit val instanceStateFormat: Format[InstanceState] = Json.format[InstanceState] + import api.v2.json.Formats.UnreachableStrategyFormat implicit val instanceJsonWrites: Writes[Instance] = Json.writes[Instance] implicit val unreachableStrategyReads: Reads[Instance] = { @@ -330,7 +337,7 @@ object Instance { (__ \ "state").read[InstanceState] ~ (__ \ "unreachableStrategy").readNullable[UnreachableStrategy] ) { (instanceId, agentInfo, tasksMap, runSpecVersion, state, maybeUnreachableStrategy) => - val unreachableStrategy = maybeUnreachableStrategy.getOrElse(UnreachableStrategy.defaultEphemeral) + val unreachableStrategy = maybeUnreachableStrategy.getOrElse(UnreachableStrategy.default()) new Instance(instanceId, agentInfo, state, tasksMap, runSpecVersion, unreachableStrategy) } } @@ -362,18 +369,11 @@ object Instance { * @param tasksMap a map of one key/value pair consisting of the actual task * @param runSpecVersion the version of the task related runSpec */ -class LegacyAppInstance( - instanceId: Instance.Id, - agentInfo: Instance.AgentInfo, - state: InstanceState, - tasksMap: Map[Task.Id, Task], - runSpecVersion: Timestamp) extends Instance(instanceId, agentInfo, state, tasksMap, runSpecVersion) - object LegacyAppInstance { - def apply(task: Task, agentInfo: AgentInfo, unreachableStrategy: UnreachableStrategy = UnreachableStrategy.defaultEphemeral): Instance = { + def apply(task: Task, agentInfo: AgentInfo, unreachableStrategy: UnreachableStrategy): Instance = { val since = task.status.startedAt.getOrElse(task.status.stagedAt) val tasksMap = Map(task.taskId -> task) - val state = Instance.InstanceState(None, tasksMap, since) + val state = Instance.InstanceState(None, tasksMap, since, unreachableStrategy) new Instance(task.taskId.instanceId, agentInfo, state, tasksMap, task.runSpecVersion, unreachableStrategy) } diff --git a/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdater.scala b/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdater.scala index b81e16efe87..2aa02c7ae4a 100644 --- a/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdater.scala +++ b/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdater.scala @@ -6,7 +6,7 @@ import mesosphere.marathon.core.instance.Instance import mesosphere.marathon.core.instance.update.InstanceUpdateOperation.{ LaunchEphemeral, LaunchOnReservation, MesosUpdate, Reserve } import mesosphere.marathon.core.task.Task import mesosphere.marathon.core.task.update.{ TaskUpdateEffect, TaskUpdateOperation } -import mesosphere.marathon.state.Timestamp +import mesosphere.marathon.state.{ Timestamp, UnreachableEnabled } /** * Provides methods that apply a given [[InstanceUpdateOperation]] @@ -18,7 +18,7 @@ object InstanceUpdater extends StrictLogging { val updatedTasks = instance.tasksMap.updated(updatedTask.taskId, updatedTask) instance.copy( tasksMap = updatedTasks, - state = Instance.InstanceState(Some(instance.state), updatedTasks, now, instance.unreachableStrategy.inactiveAfter)) + state = Instance.InstanceState(Some(instance.state), updatedTasks, now, instance.unreachableStrategy)) } private[marathon] def launchEphemeral(op: LaunchEphemeral, now: Timestamp): InstanceUpdateEffect = { @@ -48,11 +48,22 @@ object InstanceUpdater extends StrictLogging { } // We might still become UnreachableInactive. - case TaskUpdateEffect.Noop if op.condition == Condition.Unreachable && instance.state.condition != Condition.UnreachableInactive => + case TaskUpdateEffect.Noop if op.condition == Condition.Unreachable && + instance.state.condition != Condition.UnreachableInactive => val updated: Instance = updatedInstance(instance, task, now) if (updated.state.condition == Condition.UnreachableInactive) { - logger.info(s"${updated.instanceId} is updated to UnreachableInactive after being Unreachable for more than ${updated.unreachableStrategy.inactiveAfter.toSeconds} seconds.") - val events = eventsGenerator.events(updated, Some(task), now, previousCondition = Some(instance.state.condition)) + updated.unreachableStrategy match { + case u: UnreachableEnabled => + logger.info( + s"${updated.instanceId} is updated to UnreachableInactive after being Unreachable for more than ${u.inactiveAfter.toSeconds} seconds.") + case _ => + // We shouldn't get here + logger.info( + s"${updated.instanceId} is updated to UnreachableInactive in spite of there being no UnreachableStrategy") + + } + val events = eventsGenerator.events( + updated, Some(task), now, previousCondition = Some(instance.state.condition)) InstanceUpdateEffect.Update(updated, oldState = Some(instance), events) } else { InstanceUpdateEffect.Noop(instance.instanceId) diff --git a/src/main/scala/mesosphere/marathon/core/launcher/impl/InstanceOpFactoryImpl.scala b/src/main/scala/mesosphere/marathon/core/launcher/impl/InstanceOpFactoryImpl.scala index da9bcbc8b03..d6bf484fbd6 100644 --- a/src/main/scala/mesosphere/marathon/core/launcher/impl/InstanceOpFactoryImpl.scala +++ b/src/main/scala/mesosphere/marathon/core/launcher/impl/InstanceOpFactoryImpl.scala @@ -261,7 +261,8 @@ class InstanceOpFactoryImpl( healthy = None ), tasksMap = Map(task.taskId -> task), - runSpecVersion = runSpec.version + runSpecVersion = runSpec.version, + unreachableStrategy = runSpec.unreachableStrategy ) val stateOp = InstanceUpdateOperation.Reserve(instance) taskOperationFactory.reserveAndCreateVolumes(frameworkId, stateOp, resourceMatch.resources, localVolumes) diff --git a/src/main/scala/mesosphere/marathon/core/pod/PodDefinition.scala b/src/main/scala/mesosphere/marathon/core/pod/PodDefinition.scala index febf3b3e57b..d77abd4f818 100644 --- a/src/main/scala/mesosphere/marathon/core/pod/PodDefinition.scala +++ b/src/main/scala/mesosphere/marathon/core/pod/PodDefinition.scala @@ -117,6 +117,6 @@ object PodDefinition { val DefaultNetworks = Seq.empty[Network] val DefaultBackoffStrategy = BackoffStrategy() val DefaultUpgradeStrategy = AppDefinition.DefaultUpgradeStrategy - val DefaultUnreachableStrategy = UnreachableStrategy.defaultEphemeral + val DefaultUnreachableStrategy = UnreachableStrategy.default(false) } diff --git a/src/main/scala/mesosphere/marathon/core/task/jobs/impl/ExpungeOverdueLostTasksActor.scala b/src/main/scala/mesosphere/marathon/core/task/jobs/impl/ExpungeOverdueLostTasksActor.scala index 0f44312b638..e18bdc49720 100644 --- a/src/main/scala/mesosphere/marathon/core/task/jobs/impl/ExpungeOverdueLostTasksActor.scala +++ b/src/main/scala/mesosphere/marathon/core/task/jobs/impl/ExpungeOverdueLostTasksActor.scala @@ -11,7 +11,7 @@ import mesosphere.marathon.core.instance.update.InstanceUpdateOperation import mesosphere.marathon.core.task.jobs.TaskJobsConfig import mesosphere.marathon.core.task.tracker.{ InstanceTracker, TaskStateOpProcessor } import mesosphere.marathon.core.task.tracker.InstanceTracker.SpecInstances -import mesosphere.marathon.state.{ PathId, Timestamp } +import mesosphere.marathon.state.{ PathId, Timestamp, UnreachableEnabled, UnreachableDisabled } /** * Business logic of overdue tasks actor. @@ -33,15 +33,20 @@ trait ExpungeOverdueLostTasksActorLogic { } /** - * @return instances that have been UnreachableInactive according to the RunSpec definition. + * @return instances that should be expunged according to the RunSpec definition. */ - def filterOverdueUnreachableInactive(instances: Map[PathId, SpecInstances], now: Timestamp) = - instances.values.flatMap(_.instances) - .withFilter(_.isUnreachableInactive) - .withFilter { instance => - val unreachableExpungeAfter = instance.unreachableStrategy.expungeAfter - instance.tasksMap.valuesIterator.exists(_.isUnreachableExpired(now, unreachableExpungeAfter)) - } + def filterUnreachableForExpunge(instances: Map[PathId, SpecInstances], now: Timestamp) = + instances.values. + flatMap(_.instances). + withFilter { i => shouldExpunge(i, now) } + + private[impl] def shouldExpunge(instance: Instance, now: Timestamp): Boolean = instance.unreachableStrategy match { + case _: UnreachableDisabled => + false + case unreachableEnabled: UnreachableEnabled => + instance.isUnreachableInactive && + instance.tasksMap.valuesIterator.exists(_.isUnreachableExpired(now, unreachableEnabled.expungeAfter)) + } } class ExpungeOverdueLostTasksActor( @@ -70,7 +75,7 @@ class ExpungeOverdueLostTasksActor( override def receive: Receive = { case Tick => instanceTracker.instancesBySpec() pipeTo self case InstanceTracker.InstancesBySpec(instances) => - filterOverdueUnreachableInactive(instances, clock.now()).foreach(triggerExpunge) + filterUnreachableForExpunge(instances, clock.now()).foreach(triggerExpunge) } } diff --git a/src/main/scala/mesosphere/marathon/raml/UnreachableStrategyConversion.scala b/src/main/scala/mesosphere/marathon/raml/UnreachableStrategyConversion.scala index 031f050adaf..ebfc05493ad 100644 --- a/src/main/scala/mesosphere/marathon/raml/UnreachableStrategyConversion.scala +++ b/src/main/scala/mesosphere/marathon/raml/UnreachableStrategyConversion.scala @@ -8,16 +8,22 @@ import scala.concurrent.duration._ */ trait UnreachableStrategyConversion { - implicit val ramlUnreachableStrategyRead = Reads[UnreachableStrategy, state.UnreachableStrategy] { strategy => - state.UnreachableStrategy( - inactiveAfter = strategy.inactiveAfterSeconds.seconds, - expungeAfter = strategy.expungeAfterSeconds.seconds) + implicit val ramlUnreachableStrategyRead = Reads[UnreachableStrategy, state.UnreachableStrategy] { + case strategy: UnreachableEnabled => + state.UnreachableEnabled( + inactiveAfter = strategy.inactiveAfterSeconds.seconds, + expungeAfter = strategy.expungeAfterSeconds.seconds) + case _: UnreachableDisabled => + state.UnreachableDisabled } - implicit val ramlUnreachableStrategyWrite = Writes[state.UnreachableStrategy, UnreachableStrategy]{ strategy => - UnreachableStrategy( - inactiveAfterSeconds = strategy.inactiveAfter.toSeconds, - expungeAfterSeconds = strategy.expungeAfter.toSeconds) + implicit val ramlUnreachableStrategyWrite = Writes[state.UnreachableStrategy, UnreachableStrategy]{ + case strategy: state.UnreachableEnabled => + UnreachableEnabled( + inactiveAfterSeconds = strategy.inactiveAfter.toSeconds, + expungeAfterSeconds = strategy.expungeAfter.toSeconds) + case state.UnreachableDisabled => + UnreachableDisabled("disabled") } } diff --git a/src/main/scala/mesosphere/marathon/state/AppDefinition.scala b/src/main/scala/mesosphere/marathon/state/AppDefinition.scala index 626902f2bc3..d9c3eafb1d8 100644 --- a/src/main/scala/mesosphere/marathon/state/AppDefinition.scala +++ b/src/main/scala/mesosphere/marathon/state/AppDefinition.scala @@ -220,7 +220,11 @@ case class AppDefinition( if (proto.getPortsCount > 0) PortDefinitions(proto.getPortsList.map(_.intValue)(collection.breakOut): _*) else proto.getPortDefinitionsList.map(PortDefinitionSerializer.fromProto).to[Seq] - val unreachableStrategy = if (proto.hasUnreachableStrategy) UnreachableStrategy.fromProto(proto.getUnreachableStrategy) else UnreachableStrategy.defaultEphemeral + val unreachableStrategy = + if (proto.hasUnreachableStrategy) + UnreachableStrategy.fromProto(proto.getUnreachableStrategy) + else + UnreachableStrategy.default(residencyOption.isDefined) AppDefinition( id = PathId(proto.getId), @@ -425,7 +429,7 @@ object AppDefinition extends GeneralPurposeCombinators { val DefaultSecrets = Map.empty[String, Secret] - val DefaultUnreachableStrategy = UnreachableStrategy.defaultEphemeral + val DefaultUnreachableStrategy = UnreachableStrategy.default(false) object Labels { val Default = Map.empty[String, String] @@ -614,6 +618,14 @@ object AppDefinition extends GeneralPurposeCombinators { appDef.healthChecks.count(_.isInstanceOf[MesosCommandHealthCheck])) <= 1 } + private[state] val requireUnreachableDisabledForResidentTasks = + isTrue[AppDefinition]("unreachableStrategy must be disabled for resident tasks") { app => + if (app.isResident) + app.unreachableStrategy.isInstanceOf[UnreachableDisabled] + else + true + } + private def validBasicAppDefinition(enabledFeatures: Set[String]) = validator[AppDefinition] { appDef => appDef.upgradeStrategy is valid appDef.container.each is valid(Container.validContainer(enabledFeatures)) @@ -640,6 +652,7 @@ object AppDefinition extends GeneralPurposeCombinators { appDef must complyWithResidencyRules appDef must complyWithSingleInstanceLabelRules appDef must complyWithUpgradeStrategyRules + appDef should requireUnreachableDisabledForResidentTasks appDef.constraints.each must complyWithConstraintRules appDef.ipAddress must optional(complyWithIpAddressRules(appDef)) appDef.unreachableStrategy is valid diff --git a/src/main/scala/mesosphere/marathon/state/UnreachableStrategy.scala b/src/main/scala/mesosphere/marathon/state/UnreachableStrategy.scala index b053dadd027..d68e4a0b36b 100644 --- a/src/main/scala/mesosphere/marathon/state/UnreachableStrategy.scala +++ b/src/main/scala/mesosphere/marathon/state/UnreachableStrategy.scala @@ -1,17 +1,28 @@ package mesosphere.marathon package state +import com.wix.accord._ import com.wix.accord.dsl._ import scala.concurrent.duration._ import mesosphere.marathon.Protos +sealed trait UnreachableStrategy { + def toProto: Protos.UnreachableStrategy +} + +sealed trait UnreachableDisabled extends UnreachableStrategy +case object UnreachableDisabled extends UnreachableDisabled { + val toProto: Protos.UnreachableStrategy = + Protos.UnreachableStrategy.getDefaultInstance +} + /** * Defines the time outs for unreachable tasks. */ -case class UnreachableStrategy( - inactiveAfter: FiniteDuration = UnreachableStrategy.DefaultEphemeralInactiveAfter, - expungeAfter: FiniteDuration = UnreachableStrategy.DefaultEphemeralExpungeAfter) { +case class UnreachableEnabled( + inactiveAfter: FiniteDuration = UnreachableEnabled.DefaultInactiveAfter, + expungeAfter: FiniteDuration = UnreachableEnabled.DefaultExpungeAfter) extends UnreachableStrategy { def toProto: Protos.UnreachableStrategy = Protos.UnreachableStrategy.newBuilder. @@ -19,28 +30,39 @@ case class UnreachableStrategy( setInactiveAfterSeconds(inactiveAfter.toSeconds). build } +object UnreachableEnabled { + val DefaultInactiveAfter: FiniteDuration = 5.minutes + val DefaultExpungeAfter: FiniteDuration = 10.minutes + val default = UnreachableEnabled() -object UnreachableStrategy { - val DefaultEphemeralInactiveAfter: FiniteDuration = 5.minutes - val DefaultEphemeralExpungeAfter: FiniteDuration = 10.minutes - val DefaultResidentInactiveAfter: FiniteDuration = 1.hour - val DefaultResidentExpungeAfter: FiniteDuration = 7.days - - val defaultEphemeral = UnreachableStrategy(DefaultEphemeralInactiveAfter, DefaultEphemeralExpungeAfter) - val defaultResident = UnreachableStrategy(DefaultResidentInactiveAfter, DefaultResidentExpungeAfter) - - implicit val unreachableStrategyValidator = validator[UnreachableStrategy] { strategy => + implicit val unreachableEnabledValidator = validator[UnreachableEnabled] { strategy => strategy.inactiveAfter should be >= 1.second strategy.inactiveAfter should be < strategy.expungeAfter } +} + +object UnreachableStrategy { - def default(resident: Boolean): UnreachableStrategy = { - if (resident) defaultResident else defaultEphemeral + // val default = UnreachableStrategy(DefaultInactiveAfter, DefaultExpungeAfter) + def default(resident: Boolean = false): UnreachableStrategy = { + if (resident) UnreachableDisabled else UnreachableEnabled.default } def fromProto(unreachableStrategyProto: Protos.UnreachableStrategy): UnreachableStrategy = { - UnreachableStrategy( - inactiveAfter = unreachableStrategyProto.getInactiveAfterSeconds.seconds, - expungeAfter = unreachableStrategyProto.getExpungeAfterSeconds.seconds) + if (unreachableStrategyProto.hasInactiveAfterSeconds && unreachableStrategyProto.hasExpungeAfterSeconds) + UnreachableEnabled( + inactiveAfter = unreachableStrategyProto.getInactiveAfterSeconds.seconds, + expungeAfter = unreachableStrategyProto.getExpungeAfterSeconds.seconds) + else + UnreachableDisabled + } + + implicit val unreachableStrategyValidator = new Validator[UnreachableStrategy] { + def apply(strategy: UnreachableStrategy): Result = strategy match { + case _: UnreachableDisabled => + Success + case unreachableEnabled: UnreachableEnabled => + validate(unreachableEnabled) + } } } diff --git a/src/main/scala/mesosphere/marathon/state/Volume.scala b/src/main/scala/mesosphere/marathon/state/Volume.scala index 457e0e5adb2..bd5cd6c6575 100644 --- a/src/main/scala/mesosphere/marathon/state/Volume.scala +++ b/src/main/scala/mesosphere/marathon/state/Volume.scala @@ -6,7 +6,7 @@ import java.util.regex.Pattern import com.wix.accord._ import com.wix.accord.dsl._ import mesosphere.marathon.Protos.Constraint -import mesosphere.marathon.api.v2.Validation.{ oneOf, _ } +import mesosphere.marathon.api.v2.Validation._ import mesosphere.marathon.core.externalvolume.ExternalVolumes import mesosphere.marathon.stream.Implicits._ import org.apache.mesos.Protos.Resource.DiskInfo.Source diff --git a/src/test/scala/mesosphere/marathon/api/v2/PodsResourceTest.scala b/src/test/scala/mesosphere/marathon/api/v2/PodsResourceTest.scala index 96821cb0f22..0f7a14987db 100644 --- a/src/test/scala/mesosphere/marathon/api/v2/PodsResourceTest.scala +++ b/src/test/scala/mesosphere/marathon/api/v2/PodsResourceTest.scala @@ -22,7 +22,7 @@ import mesosphere.marathon.metrics.Metrics import mesosphere.marathon.plugin.auth.{ Authenticator, Authorizer } import mesosphere.marathon.raml.{ ExecutorResources, FixedPodScalingPolicy, NetworkMode, Pod, Raml, Resources } import mesosphere.marathon.state.PathId._ -import mesosphere.marathon.state.Timestamp +import mesosphere.marathon.state.{ Timestamp, UnreachableStrategy } import mesosphere.marathon.test.Mockito import mesosphere.marathon.upgrade.DeploymentPlan import mesosphere.marathon.util.SemanticVersion @@ -309,8 +309,13 @@ class PodsResourceTest extends AkkaUnitTest with Mockito { "attempting to kill a single instance" in { implicit val killer = mock[TaskKiller] val f = Fixture() - val instance = Instance(Instance.Id.forRunSpec("/id1".toRootPath), Instance.AgentInfo("", None, Nil), - InstanceState(Condition.Running, Timestamp.now(), Some(Timestamp.now()), None), Map.empty, runSpecVersion = Timestamp.now()) + val instance = Instance( + Instance.Id.forRunSpec("/id1".toRootPath), Instance.AgentInfo("", None, Nil), + InstanceState(Condition.Running, Timestamp.now(), Some(Timestamp.now()), None), + Map.empty, + runSpecVersion = Timestamp.now(), + unreachableStrategy = UnreachableStrategy.default() + ) killer.kill(any, any, any)(any) returns Future.successful(Seq(instance)) val response = f.podsResource.killInstance("/id", instance.instanceId.toString, f.auth.request) withClue(s"response body: ${response.getEntity}") { @@ -323,9 +328,14 @@ class PodsResourceTest extends AkkaUnitTest with Mockito { implicit val killer = mock[TaskKiller] val instances = Seq( Instance(Instance.Id.forRunSpec("/id1".toRootPath), Instance.AgentInfo("", None, Nil), - InstanceState(Condition.Running, Timestamp.now(), Some(Timestamp.now()), None), Map.empty, runSpecVersion = Timestamp.now()), + InstanceState(Condition.Running, Timestamp.now(), Some(Timestamp.now()), None), Map.empty, + runSpecVersion = Timestamp.now(), + unreachableStrategy = UnreachableStrategy.default() + ), Instance(Instance.Id.forRunSpec("/id1".toRootPath), Instance.AgentInfo("", None, Nil), - InstanceState(Condition.Running, Timestamp.now(), Some(Timestamp.now()), None), Map.empty, runSpecVersion = Timestamp.now())) + InstanceState(Condition.Running, Timestamp.now(), Some(Timestamp.now()), None), Map.empty, + runSpecVersion = Timestamp.now(), + unreachableStrategy = UnreachableStrategy.default())) val f = Fixture() diff --git a/src/test/scala/mesosphere/marathon/api/v2/json/AppDefinitionFormatsTest.scala b/src/test/scala/mesosphere/marathon/api/v2/json/AppDefinitionFormatsTest.scala index c8348f1562f..d8162fefc5d 100644 --- a/src/test/scala/mesosphere/marathon/api/v2/json/AppDefinitionFormatsTest.scala +++ b/src/test/scala/mesosphere/marathon/api/v2/json/AppDefinitionFormatsTest.scala @@ -477,7 +477,17 @@ class AppDefinitionFormatsTest extends UnitTest (json \ "secrets" \ "secret3" \ "source").as[String] should equal("/foo2") } - "FromJSON should parse unreachable instance strategy" in { + "FromJSON should parse unreachable disabled instance strategy" in { + val appDef = Json.parse( + """{ + | "id": "test", + | "unreachableStrategy": "disabled" + |}""".stripMargin).as[AppDefinition] + + appDef.unreachableStrategy should be(UnreachableDisabled) + } + + "FromJSON should parse unreachable enabled instance strategy" in { val appDef = Json.parse( """{ | "id": "test", @@ -487,12 +497,11 @@ class AppDefinitionFormatsTest extends UnitTest | } |}""".stripMargin).as[AppDefinition] - appDef.unreachableStrategy.inactiveAfter should be(10.minutes) - appDef.unreachableStrategy.expungeAfter should be(20.minutes) + appDef.unreachableStrategy should be(UnreachableEnabled(inactiveAfter = 10.minutes, expungeAfter = 20.minutes)) } "ToJSON should serialize unreachable instance strategy" in { - val strategy = UnreachableStrategy(6.minutes, 12.minutes) + val strategy = UnreachableEnabled(6.minutes, 12.minutes) val appDef = AppDefinition(id = PathId("test"), unreachableStrategy = strategy) val json = Json.toJson(appDef) @@ -542,4 +551,4 @@ class AppDefinitionFormatsTest extends UnitTest } should have message ("JsResultException(errors:List((/container/docker,List(ValidationError(List(error.path.missing),WrappedArray())))))") } } -} \ No newline at end of file +} diff --git a/src/test/scala/mesosphere/marathon/api/v2/json/AppUpdateTest.scala b/src/test/scala/mesosphere/marathon/api/v2/json/AppUpdateTest.scala index f0b90241cb7..6ee94a13665 100644 --- a/src/test/scala/mesosphere/marathon/api/v2/json/AppUpdateTest.scala +++ b/src/test/scala/mesosphere/marathon/api/v2/json/AppUpdateTest.scala @@ -145,7 +145,7 @@ class AppUpdateTest extends UnitTest { ports = Seq(Port(name = "http", number = 80, protocol = "tcp")) ) )), - unreachableStrategy = Some(UnreachableStrategy(998.seconds, 999.seconds)) + unreachableStrategy = Some(UnreachableEnabled(998.seconds, 999.seconds)) ) JsonTestHelper.assertSerializationRoundtripWorks(update1) } diff --git a/src/test/scala/mesosphere/marathon/api/validation/AppDefinitionValidationTest.scala b/src/test/scala/mesosphere/marathon/api/validation/AppDefinitionValidationTest.scala index 4f3c4a1b3e3..5204991b15f 100644 --- a/src/test/scala/mesosphere/marathon/api/validation/AppDefinitionValidationTest.scala +++ b/src/test/scala/mesosphere/marathon/api/validation/AppDefinitionValidationTest.scala @@ -3,7 +3,7 @@ package api.validation import mesosphere.UnitTest import mesosphere.marathon.core.plugin.PluginManager -import mesosphere.marathon.state.{ AppDefinition, PathId, UnreachableStrategy } +import mesosphere.marathon.state._ import com.wix.accord.scalatest.ResultMatchers import scala.concurrent.duration._ @@ -26,7 +26,7 @@ class AppDefinitionValidationTest extends UnitTest with ResultMatchers { val app = AppDefinition( id = PathId("/test"), cmd = Some("sleep 1000"), - unreachableStrategy = UnreachableStrategy(0.second)) + unreachableStrategy = UnreachableEnabled(0.seconds)) val expectedViolation = GroupViolationMatcher(description = "unreachableStrategy", constraint = "is invalid") validator(app) should failWith(expectedViolation) diff --git a/src/test/scala/mesosphere/marathon/api/validation/RunSpecValidatorTest.scala b/src/test/scala/mesosphere/marathon/api/validation/RunSpecValidatorTest.scala index 97f12c189ba..91be5aa2ee5 100644 --- a/src/test/scala/mesosphere/marathon/api/validation/RunSpecValidatorTest.scala +++ b/src/test/scala/mesosphere/marathon/api/validation/RunSpecValidatorTest.scala @@ -766,7 +766,8 @@ class RunSpecValidatorTest extends UnitTest { id = PathId(id), cmd = Some("test"), container = Some(Container.Mesos(volumes)), - residency = Some(Residency(123, Protos.ResidencyDefinition.TaskLostBehavior.RELAUNCH_AFTER_TIMEOUT)) + residency = Some(Residency(123, Protos.ResidencyDefinition.TaskLostBehavior.RELAUNCH_AFTER_TIMEOUT)), + unreachableStrategy = UnreachableStrategy.default(resident = true) ) } val vol1 = persistentVolume("foo") diff --git a/src/test/scala/mesosphere/marathon/core/appinfo/TaskCountsTest.scala b/src/test/scala/mesosphere/marathon/core/appinfo/TaskCountsTest.scala index 27cc873a546..fc3c85b703c 100644 --- a/src/test/scala/mesosphere/marathon/core/appinfo/TaskCountsTest.scala +++ b/src/test/scala/mesosphere/marathon/core/appinfo/TaskCountsTest.scala @@ -8,7 +8,7 @@ import mesosphere.marathon.core.instance.Instance.AgentInfo import mesosphere.marathon.core.instance.{ Instance, LegacyAppInstance, TestTaskBuilder } import mesosphere.marathon.core.task.Task import mesosphere.marathon.core.task.state.NetworkInfoPlaceholder -import mesosphere.marathon.state.{ PathId, Timestamp } +import mesosphere.marathon.state.{ PathId, Timestamp, UnreachableStrategy } import scala.collection.immutable.Seq @@ -188,7 +188,10 @@ class TaskCountsTest extends UnitTest { object Fixture { implicit class TaskImplicits(val task: Task) extends AnyVal { - def toInstance: Instance = LegacyAppInstance(task, AgentInfo(host = "host", agentId = Some("agent"), attributes = Nil)) + def toInstance: Instance = LegacyAppInstance( + task, AgentInfo(host = "host", agentId = Some("agent"), attributes = Nil), + unreachableStrategy = UnreachableStrategy.default(resident = task.reservationWithVolumes.nonEmpty) + ) } } diff --git a/src/test/scala/mesosphere/marathon/core/appinfo/TaskLifeTimeTest.scala b/src/test/scala/mesosphere/marathon/core/appinfo/TaskLifeTimeTest.scala index c16f05df570..d2326d6b21b 100644 --- a/src/test/scala/mesosphere/marathon/core/appinfo/TaskLifeTimeTest.scala +++ b/src/test/scala/mesosphere/marathon/core/appinfo/TaskLifeTimeTest.scala @@ -6,7 +6,7 @@ import mesosphere.marathon.core.base.ConstantClock import mesosphere.marathon.core.instance.Instance.AgentInfo import mesosphere.marathon.core.instance.{ Instance, LegacyAppInstance, TestTaskBuilder } import mesosphere.marathon.core.task.Task -import mesosphere.marathon.state.{ PathId, Timestamp } +import mesosphere.marathon.state.{ PathId, Timestamp, UnreachableStrategy } class TaskLifeTimeTest extends UnitTest { private[this] val now: Timestamp = ConstantClock().now() @@ -17,13 +17,14 @@ class TaskLifeTimeTest extends UnitTest { } private[this] def stagedInstance(): Instance = { - LegacyAppInstance(TestTaskBuilder.Helper.stagedTask(newTaskId()), agentInfo) + LegacyAppInstance(TestTaskBuilder.Helper.stagedTask(newTaskId()), agentInfo, UnreachableStrategy.default()) } private[this] def runningInstanceWithLifeTime(lifeTimeSeconds: Double): Instance = { LegacyAppInstance( TestTaskBuilder.Helper.runningTask(newTaskId(), startedAt = (now.millis - lifeTimeSeconds * 1000.0).round), - agentInfo + agentInfo, + UnreachableStrategy.default() ) } diff --git a/src/test/scala/mesosphere/marathon/core/appinfo/TaskStatsByVersionTest.scala b/src/test/scala/mesosphere/marathon/core/appinfo/TaskStatsByVersionTest.scala index 0fa5e720e62..eea6c57aba8 100644 --- a/src/test/scala/mesosphere/marathon/core/appinfo/TaskStatsByVersionTest.scala +++ b/src/test/scala/mesosphere/marathon/core/appinfo/TaskStatsByVersionTest.scala @@ -7,7 +7,7 @@ import mesosphere.marathon.core.health.Health import mesosphere.marathon.core.instance.Instance.AgentInfo import mesosphere.marathon.core.instance.{ Instance, LegacyAppInstance, TestTaskBuilder } import mesosphere.marathon.core.task.Task -import mesosphere.marathon.state.{ PathId, Timestamp, VersionInfo } +import mesosphere.marathon.state.{ PathId, Timestamp, UnreachableStrategy, VersionInfo } import play.api.libs.json.Json import scala.concurrent.duration._ @@ -103,6 +103,10 @@ class TaskStatsByVersionTest extends UnitTest { private[this] def runningInstanceStartedAt(version: Timestamp, startingDelay: FiniteDuration): Instance = { val startedAt = (version + startingDelay).millis val agentInfo = AgentInfo(host = "host", agentId = Some("agent"), attributes = Nil) - LegacyAppInstance(TestTaskBuilder.Helper.runningTask(newTaskId(), appVersion = version, startedAt = startedAt), agentInfo) + LegacyAppInstance( + TestTaskBuilder.Helper.runningTask(newTaskId(), appVersion = version, startedAt = startedAt), + agentInfo, + unreachableStrategy = UnreachableStrategy.default() + ) } } diff --git a/src/test/scala/mesosphere/marathon/core/appinfo/impl/AppInfoBaseDataTest.scala b/src/test/scala/mesosphere/marathon/core/appinfo/impl/AppInfoBaseDataTest.scala index 11e540f2925..3c8c2944e68 100644 --- a/src/test/scala/mesosphere/marathon/core/appinfo/impl/AppInfoBaseDataTest.scala +++ b/src/test/scala/mesosphere/marathon/core/appinfo/impl/AppInfoBaseDataTest.scala @@ -417,7 +417,9 @@ class AppInfoBaseDataTest extends UnitTest with GroupCreation { agentInfo = Instance.AgentInfo("", None, Nil), state = InstanceState(None, tasks, f.clock.now()), tasksMap = tasks, - runSpecVersion = pod.version) + runSpecVersion = pod.version, + unreachableStrategy = UnreachableStrategy.default() + ) } "pod statuses xref the correct spec versions" in { @@ -469,4 +471,4 @@ class AppInfoBaseDataTest extends UnitTest with GroupCreation { maybeStatus3 should be ('empty) } } -} \ No newline at end of file +} diff --git a/src/test/scala/mesosphere/marathon/core/health/HealthCheckTest.scala b/src/test/scala/mesosphere/marathon/core/health/HealthCheckTest.scala index 69175f1798e..984aa85ed03 100644 --- a/src/test/scala/mesosphere/marathon/core/health/HealthCheckTest.scala +++ b/src/test/scala/mesosphere/marathon/core/health/HealthCheckTest.scala @@ -299,7 +299,7 @@ class HealthCheckTest extends UnitTest { val hostPorts = Seq(4321) t.copy(status = t.status.copy(networkInfo = NetworkInfo(hostName, hostPorts, ipAddresses = Nil))) } - val instance = LegacyAppInstance(task, agentInfo) + val instance = LegacyAppInstance(task, agentInfo, unreachableStrategy = UnreachableStrategy.default()) assert(check.effectivePort(app, instance) == 4321) } diff --git a/src/test/scala/mesosphere/marathon/core/health/impl/HealthCheckWorkerActorTest.scala b/src/test/scala/mesosphere/marathon/core/health/impl/HealthCheckWorkerActorTest.scala index 1104c7cce80..e7dbb8a7277 100644 --- a/src/test/scala/mesosphere/marathon/core/health/impl/HealthCheckWorkerActorTest.scala +++ b/src/test/scala/mesosphere/marathon/core/health/impl/HealthCheckWorkerActorTest.scala @@ -11,7 +11,7 @@ import mesosphere.marathon.core.instance.Instance.AgentInfo import mesosphere.marathon.core.instance.{ LegacyAppInstance, TestTaskBuilder } import mesosphere.marathon.core.task.Task import mesosphere.marathon.core.task.state.NetworkInfo -import mesosphere.marathon.state.{ AppDefinition, PathId } +import mesosphere.marathon.state.{ AppDefinition, PathId, UnreachableStrategy } import scala.collection.immutable.Seq import scala.concurrent.Future @@ -38,7 +38,7 @@ class HealthCheckWorkerActorTest extends AkkaUnitTest with ImplicitSender { val hostPorts = Seq(socketPort) t.copy(status = t.status.copy(networkInfo = NetworkInfo(hostName, hostPorts, ipAddresses = Nil))) } - val instance = LegacyAppInstance(task, agentInfo) + val instance = LegacyAppInstance(task, agentInfo, UnreachableStrategy.default()) val ref = TestActorRef[HealthCheckWorkerActor](Props(classOf[HealthCheckWorkerActor])) ref ! HealthCheckJob(app, instance, MarathonTcpHealthCheck(portIndex = Some(PortReference(0)))) @@ -68,7 +68,7 @@ class HealthCheckWorkerActorTest extends AkkaUnitTest with ImplicitSender { val hostPorts = Seq(socketPort) t.copy(status = t.status.copy(networkInfo = NetworkInfo(hostName, hostPorts, ipAddresses = Nil))) } - val instance = LegacyAppInstance(task, agentInfo) + val instance = LegacyAppInstance(task, agentInfo, UnreachableStrategy.default()) val ref = TestActorRef[HealthCheckWorkerActor](Props(classOf[HealthCheckWorkerActor])) ref ! HealthCheckJob(app, instance, MarathonTcpHealthCheck(portIndex = Some(PortReference(0)))) diff --git a/src/test/scala/mesosphere/marathon/core/instance/InstanceFormatTest.scala b/src/test/scala/mesosphere/marathon/core/instance/InstanceFormatTest.scala index 49ded13d542..f394eb5d506 100644 --- a/src/test/scala/mesosphere/marathon/core/instance/InstanceFormatTest.scala +++ b/src/test/scala/mesosphere/marathon/core/instance/InstanceFormatTest.scala @@ -2,42 +2,47 @@ package mesosphere.marathon package core.instance import mesosphere.UnitTest -import mesosphere.marathon.state.UnreachableStrategy +import mesosphere.marathon.state.{ UnreachableStrategy, UnreachableDisabled, UnreachableEnabled } import play.api.libs.json._ import scala.concurrent.duration._ class InstanceFormatTest extends UnitTest { - import Instance._ - "Instance.unreachableStrategyFormat" should { - "parse a proper JSON" in { - val json = Json.parse("""{ "inactiveAfter": 1, "expungeAfter": 2 }""") - json.as[UnreachableStrategy].inactiveAfter should be(1.second) - json.as[UnreachableStrategy].expungeAfter should be(2.seconds) - } + val template = Json.obj( + "instanceId" -> Json.obj( + "idString" -> "app.instance-1337"), + "tasksMap" -> Json.obj(), + "runSpecVersion" -> "2015-01-01", + "agentInfo" -> Json.obj( + "host" -> "localhost", + "attributes" -> Json.arr()), + "state" -> Json.obj( + "since" -> "2015-01-01", + "condition" -> Json.obj ("str" -> "Running"))) + + "Instance.instanceFormat" should { + "parse a valid unreachable strategy" in { + val json = template ++ Json.obj( + "unreachableStrategy" -> Json.obj( + "inactiveAfterSeconds" -> 1, "expungeAfterSeconds" -> 2)) + val instance = json.as[Instance] - "not parse a JSON with empty fields" in { - val json = Json.parse("""{ "unreachableExpungeAfter": 2 }""") - a[JsResultException] should be thrownBy { json.as[UnreachableStrategy] } + instance.unreachableStrategy shouldBe (UnreachableEnabled(inactiveAfter = 1.second, expungeAfter = 2.seconds)) } - } + "parse a disabled unreachable strategy" in { + val json = template ++ Json.obj("unreachableStrategy" -> "disabled") + val instance = json.as[Instance] + + instance.unreachableStrategy shouldBe (UnreachableDisabled) + } - "Instance.instanceFormat" should { "fill UnreachableStrategy with defaults if empty" in { - val json = Json.parse( - """{ "instanceId": { "idString": "app.instance-1337" }, - | "tasksMap": {}, - | "runSpecVersion": "2015-01-01", - | "agentInfo": { "host": "localhost", "attributes": [] }, - | "state": { "since": "2015-01-01", "condition": { "str": "Running" } } - |}""".stripMargin) - val instance = json.as[Instance] + val instance = template.as[Instance] - instance.unreachableStrategy.inactiveAfter should be(UnreachableStrategy.DefaultEphemeralInactiveAfter) - instance.unreachableStrategy.expungeAfter should be(UnreachableStrategy.DefaultEphemeralExpungeAfter) + instance.unreachableStrategy shouldBe (UnreachableStrategy.default(resident = false)) } } } diff --git a/src/test/scala/mesosphere/marathon/core/instance/InstanceStateTest.scala b/src/test/scala/mesosphere/marathon/core/instance/InstanceStateTest.scala index f335b685d3a..c41b7162545 100644 --- a/src/test/scala/mesosphere/marathon/core/instance/InstanceStateTest.scala +++ b/src/test/scala/mesosphere/marathon/core/instance/InstanceStateTest.scala @@ -6,7 +6,7 @@ import mesosphere.marathon.core.base.ConstantClock import mesosphere.marathon.core.condition.Condition import mesosphere.marathon.core.task.Task import mesosphere.marathon.core.task.bus.MesosTaskStatusTestHelper -import mesosphere.marathon.state.Timestamp +import mesosphere.marathon.state.{ Timestamp, UnreachableEnabled } import mesosphere.marathon.state.PathId._ import org.scalatest.prop.TableDrivenPropertyChecks @@ -121,7 +121,8 @@ class InstanceStateTest extends UnitTest with TableDrivenPropertyChecks { val tasks = f.tasks(conditions).values - val actualCondition = Instance.InstanceState.conditionFromTasks(tasks, f.clock.now, 5.minutes) + val actualCondition = Instance.InstanceState.conditionFromTasks( + tasks, f.clock.now, UnreachableEnabled(5.minutes)) s"return condition $expected" in { actualCondition should be(expected) } } @@ -132,7 +133,8 @@ class InstanceStateTest extends UnitTest with TableDrivenPropertyChecks { it should { "return Unknown for an empty task list" in { val f = new Fixture() - val result = Instance.InstanceState.conditionFromTasks(Iterable.empty, f.clock.now(), 5.minutes) + val result = Instance.InstanceState.conditionFromTasks( + Iterable.empty, f.clock.now(), UnreachableEnabled(5.minutes)) result should be(Condition.Unknown) } diff --git a/src/test/scala/mesosphere/marathon/core/instance/InstanceTest.scala b/src/test/scala/mesosphere/marathon/core/instance/InstanceTest.scala index 5b21fe083d3..51ebe1bbacf 100644 --- a/src/test/scala/mesosphere/marathon/core/instance/InstanceTest.scala +++ b/src/test/scala/mesosphere/marathon/core/instance/InstanceTest.scala @@ -8,7 +8,7 @@ import mesosphere.marathon.core.condition.Condition._ import mesosphere.marathon.core.task.Task import mesosphere.marathon.core.task.bus.MesosTaskStatusTestHelper import mesosphere.marathon.state.PathId._ -import mesosphere.marathon.state.Timestamp +import mesosphere.marathon.state.{ Timestamp, UnreachableStrategy } import org.scalatest.prop.TableDrivenPropertyChecks class InstanceTest extends UnitTest with TableDrivenPropertyChecks { @@ -116,7 +116,8 @@ class InstanceTest extends UnitTest with TableDrivenPropertyChecks { val currentTasks = tasks(conditions.map(_ => condition)) val newTasks = tasks(conditions) val state = Instance.InstanceState(None, currentTasks, Timestamp.now()) - val instance = Instance(Instance.Id.forRunSpec(id), agentInfo, state, currentTasks, runSpecVersion = Timestamp.now()) + val instance = Instance(Instance.Id.forRunSpec(id), agentInfo, state, currentTasks, + runSpecVersion = Timestamp.now(), UnreachableStrategy.default()) (instance, newTasks) } } diff --git a/src/test/scala/mesosphere/marathon/core/instance/TestInstanceBuilder.scala b/src/test/scala/mesosphere/marathon/core/instance/TestInstanceBuilder.scala index f4cdc0bbd98..babe6c1dbf4 100644 --- a/src/test/scala/mesosphere/marathon/core/instance/TestInstanceBuilder.scala +++ b/src/test/scala/mesosphere/marathon/core/instance/TestInstanceBuilder.scala @@ -7,7 +7,7 @@ import mesosphere.marathon.core.instance.update.{ InstanceUpdateOperation, Insta import mesosphere.marathon.core.pod.MesosContainer import mesosphere.marathon.core.task.Task import mesosphere.marathon.core.task.state.NetworkInfoPlaceholder -import mesosphere.marathon.state.{ PathId, Timestamp } +import mesosphere.marathon.state.{ PathId, Timestamp, UnreachableStrategy } import org.apache.mesos import scala.collection.immutable.Seq @@ -119,7 +119,8 @@ object TestInstanceBuilder { agentInfo = TestInstanceBuilder.defaultAgentInfo, state = InstanceState(Condition.Created, now, None, healthy = None), tasksMap = Map.empty, - runSpecVersion = version + runSpecVersion = version, + UnreachableStrategy.default() ) private val defaultAgentInfo = Instance.AgentInfo(host = "host.some", agentId = None, attributes = Seq.empty) diff --git a/src/test/scala/mesosphere/marathon/core/instance/update/InstanceUpdaterTest.scala b/src/test/scala/mesosphere/marathon/core/instance/update/InstanceUpdaterTest.scala index 7da08b5534d..92ea1191c53 100644 --- a/src/test/scala/mesosphere/marathon/core/instance/update/InstanceUpdaterTest.scala +++ b/src/test/scala/mesosphere/marathon/core/instance/update/InstanceUpdaterTest.scala @@ -12,7 +12,7 @@ import mesosphere.marathon.core.task.Task import mesosphere.marathon.core.task.bus.{ MesosTaskStatusTestHelper, TaskStatusUpdateTestHelper } import mesosphere.marathon.core.task.state.NetworkInfoPlaceholder import mesosphere.marathon.raml.Resources -import mesosphere.marathon.state.{ PathId, UnreachableStrategy } +import mesosphere.marathon.state.{ PathId, UnreachableEnabled, UnreachableStrategy } import org.apache.mesos.Protos.TaskState.TASK_UNREACHABLE import scala.concurrent.duration._ @@ -63,7 +63,7 @@ class InstanceUpdaterTest extends UnitTest { "processing an expired unreachable" should { val f = new Fixture - val unreachableInactiveAfter = f.instance.unreachableStrategy.inactiveAfter + val unreachableInactiveAfter = f.instance.unreachableStrategy.asInstanceOf[UnreachableEnabled].inactiveAfter val newMesosStatus = MesosTaskStatusTestHelper.unreachable(f.taskId, since = f.clock.now()) // Forward time to expire unreachable status @@ -192,7 +192,7 @@ class InstanceUpdaterTest extends UnitTest { val unreachableStatus = f.taskStatus.copy(startedAt = None, condition = Condition.Unreachable, mesosStatus = Some(mesosTaskStatus)) val unreachableTask = f.task.copy(status = unreachableStatus) val unreachableState = f.instanceState.copy(condition = Condition.Unreachable) - val unreachableStrategy = UnreachableStrategy(inactiveAfter = 30.minutes, expungeAfter = 1.hour) + val unreachableStrategy = UnreachableEnabled(inactiveAfter = 30.minutes, expungeAfter = 1.hour) val unreachableInstance = f.instance.copy( tasksMap = Map(f.taskId -> unreachableTask), state = unreachableState, @@ -218,7 +218,7 @@ class InstanceUpdaterTest extends UnitTest { val unreachableStatus = f.taskStatus.copy(startedAt = None, condition = Condition.Unreachable, mesosStatus = Some(mesosTaskStatus)) val unreachableTask = f.task.copy(status = unreachableStatus) val unreachableInactiveState = f.instanceState.copy(condition = Condition.UnreachableInactive) - val unreachableStrategy = UnreachableStrategy(inactiveAfter = 1.minute, expungeAfter = 1.hour) + val unreachableStrategy = UnreachableEnabled(inactiveAfter = 1.minute, expungeAfter = 1.hour) val unreachableInactiveInstance = f.instance.copy( tasksMap = Map(f.taskId -> unreachableTask), state = unreachableInactiveState, @@ -305,6 +305,8 @@ class InstanceUpdaterTest extends UnitTest { networkInfo = NetworkInfoPlaceholder() ) val task = Task.LaunchedEphemeral(taskId, runSpecVersion = clock.now(), status = taskStatus) - val instance = Instance(Instance.Id("foobar.instance-baz"), agentInfo, instanceState, Map(taskId -> task), clock.now()) + val instance = Instance( + Instance.Id("foobar.instance-baz"), agentInfo, instanceState, Map(taskId -> task), clock.now(), + UnreachableStrategy.default()) } } diff --git a/src/test/scala/mesosphere/marathon/core/task/jobs/ExpungeOverdueLostTasksActorTest.scala b/src/test/scala/mesosphere/marathon/core/task/jobs/ExpungeOverdueLostTasksActorTest.scala index e8cc9934f4b..73b0c8ad901 100644 --- a/src/test/scala/mesosphere/marathon/core/task/jobs/ExpungeOverdueLostTasksActorTest.scala +++ b/src/test/scala/mesosphere/marathon/core/task/jobs/ExpungeOverdueLostTasksActorTest.scala @@ -13,7 +13,7 @@ import mesosphere.marathon.core.task.jobs.impl.{ ExpungeOverdueLostTasksActor, E import mesosphere.marathon.core.task.tracker.InstanceTracker.InstancesBySpec import mesosphere.marathon.core.task.tracker.{ InstanceTracker, TaskStateOpProcessor } import mesosphere.marathon.state.PathId._ -import mesosphere.marathon.state.{ Timestamp, UnreachableStrategy } +import mesosphere.marathon.state.{ Timestamp, UnreachableEnabled, UnreachableDisabled, UnreachableStrategy } import mesosphere.marathon.test.MarathonTestHelper import org.scalatest.prop.TableDrivenPropertyChecks @@ -27,7 +27,7 @@ class ExpungeOverdueLostTasksActorTest extends AkkaUnitTest with TableDrivenProp val config = MarathonTestHelper.defaultConfig(maxTasksPerOffer = 10) val stateOpProcessor: TaskStateOpProcessor = mock[TaskStateOpProcessor] val taskTracker: InstanceTracker = mock[InstanceTracker] - val strategy = UnreachableStrategy(5.minutes, 10.minutes) + val strategy = UnreachableEnabled(5.minutes, 10.minutes) } def withActor(testCode: (Fixture, ActorRef) => Any): Unit = { @@ -60,26 +60,32 @@ class ExpungeOverdueLostTasksActorTest extends AkkaUnitTest with TableDrivenProp // format: OFF // Different task configuration with startedAt, status since and condition values. Expunge indicates whether an // expunge is expected or not. + val fiveTen = UnreachableEnabled(5.minutes, 10.minutes) + val disabled = UnreachableDisabled val taskCases = Table( - ("name", "startedAt", "since", "condition", "expunge"), - ("running", Timestamp.zero, Timestamp.zero, Condition.Running, false ), - ("expired inactive", Timestamp.zero, f.clock.now - f.strategy.expungeAfter - 1.minute, Condition.UnreachableInactive, true ), - ("unreachable", Timestamp.zero, f.clock.now - f.strategy.inactiveAfter, Condition.Unreachable, false ) + ("name", "startedAt", "since", "unreachableStrategy", "condition", "expunge"), + ("running", Timestamp.zero, Timestamp.zero, fiveTen, Condition.Running, false ), + ("expired inactive", Timestamp.zero, f.clock.now - fiveTen.expungeAfter - 1.minute, fiveTen, Condition.UnreachableInactive, true ), + ("unreachable", Timestamp.zero, f.clock.now - 5.minutes, fiveTen, Condition.Unreachable, false ), + ("expired disabled", Timestamp.zero, f.clock.now - 365.days, disabled, Condition.Unreachable, false ) ) // format: ON - forAll(taskCases) { (name: String, startedAt: Timestamp, since: Timestamp, condition: Condition, expunge: Boolean) => + forAll(taskCases) { (name: String, startedAt: Timestamp, since: Timestamp, unreachableStrategy: UnreachableStrategy, condition: Condition, expunge: Boolean) => When(s"filtering $name task since $since") val instance: Instance = (condition match { - case Condition.Unreachable => TestInstanceBuilder.newBuilder("/unreachable".toPath).addTaskUnreachable(since = since).getInstance() - case Condition.UnreachableInactive => TestInstanceBuilder.newBuilder("/unreachable".toPath).addTaskUnreachableInactive(since = since).getInstance() - case _ => TestInstanceBuilder.newBuilder("/running".toPath).addTaskRunning(startedAt = startedAt).getInstance() - }).copy(unreachableStrategy = f.strategy) + case Condition.Unreachable => + TestInstanceBuilder.newBuilder("/unreachable".toPath).addTaskUnreachable(since = since).getInstance() + case Condition.UnreachableInactive => + TestInstanceBuilder.newBuilder("/unreachable".toPath).addTaskUnreachableInactive(since = since).getInstance() + case _ => + TestInstanceBuilder.newBuilder("/running".toPath).addTaskRunning(startedAt = startedAt).getInstance() + }).copy(unreachableStrategy = unreachableStrategy) val instances = InstancesBySpec.forInstances(instance).instancesMap - val filterForExpunge = businessLogic.filterOverdueUnreachableInactive(instances, f.clock.now()).map(identity) + val filterForExpunge = businessLogic.filterUnreachableForExpunge(instances, f.clock.now()).map(identity) - Then(s"${if (!expunge) "not" else ""} select it for expunge") + Then(s"${if (!expunge) "not " else ""}select it for expunge") filterForExpunge.nonEmpty should be(expunge) } @@ -92,7 +98,7 @@ class ExpungeOverdueLostTasksActorTest extends AkkaUnitTest with TableDrivenProp .copy(unreachableStrategy = f.strategy) val instances = InstancesBySpec.forInstances(running1, running2).instancesMap - val filtered = businessLogic.filterOverdueUnreachableInactive(instances, f.clock.now()).map(identity) + val filtered = businessLogic.filterUnreachableForExpunge(instances, f.clock.now()).map(identity) Then("return an empty collection") filtered.isEmpty should be(true) @@ -107,7 +113,7 @@ class ExpungeOverdueLostTasksActorTest extends AkkaUnitTest with TableDrivenProp val instances2 = InstancesBySpec.forInstances(inactive1, inactive2).instancesMap - val filtered2 = businessLogic.filterOverdueUnreachableInactive(instances2, f.clock.now()).map(identity) + val filtered2 = businessLogic.filterUnreachableForExpunge(instances2, f.clock.now()).map(identity) Then("return the expired Unreachable tasks") filtered2 should be(Iterable(inactive1, inactive2)) diff --git a/src/test/scala/mesosphere/marathon/core/task/update/impl/steps/PostToEventStreamStepImplTest.scala b/src/test/scala/mesosphere/marathon/core/task/update/impl/steps/PostToEventStreamStepImplTest.scala index e8bd68b1469..af271c6caa4 100644 --- a/src/test/scala/mesosphere/marathon/core/task/update/impl/steps/PostToEventStreamStepImplTest.scala +++ b/src/test/scala/mesosphere/marathon/core/task/update/impl/steps/PostToEventStreamStepImplTest.scala @@ -9,6 +9,7 @@ import mesosphere.marathon.core.event.{ InstanceHealthChanged, MarathonEvent } import mesosphere.marathon.core.instance.Instance.InstanceState import mesosphere.marathon.core.instance.update._ import mesosphere.marathon.core.instance.Instance +import mesosphere.marathon.state.UnreachableStrategy import scala.collection.immutable.Seq @@ -107,7 +108,10 @@ class PostToEventStreamStepImplTest extends UnitTest { val agentInfo = Instance.AgentInfo("localhost", None, Seq.empty) val instanceState = InstanceState(Condition.Running, clock.now(), Some(clock.now()), healthy = None) - val instance = Instance(Instance.Id("foobar.instance-baz"), agentInfo, instanceState, Map.empty, clock.now()) + val instance = Instance( + Instance.Id("foobar.instance-baz"), agentInfo, instanceState, Map.empty, clock.now(), + UnreachableStrategy.default() + ) val eventStream = mock[EventStream] val step = new PostToEventStreamStepImpl(eventStream) diff --git a/src/test/scala/mesosphere/marathon/integration/TaskUnreachableIntegrationTest.scala b/src/test/scala/mesosphere/marathon/integration/TaskUnreachableIntegrationTest.scala index 120111a5221..0e2669ce63a 100644 --- a/src/test/scala/mesosphere/marathon/integration/TaskUnreachableIntegrationTest.scala +++ b/src/test/scala/mesosphere/marathon/integration/TaskUnreachableIntegrationTest.scala @@ -1,13 +1,13 @@ package mesosphere.marathon package integration +import mesosphere.marathon.state.UnreachableEnabled import scala.concurrent.duration._ import mesosphere.AkkaIntegrationTest import mesosphere.marathon.Protos.Constraint.Operator import mesosphere.marathon.api.v2.json.AppUpdate import mesosphere.marathon.integration.facades.ITEnrichedTask import mesosphere.marathon.integration.setup._ -import mesosphere.marathon.state.UnreachableStrategy import org.scalatest.Inside @SerialIntegrationTest @@ -41,7 +41,7 @@ class TaskUnreachableIntegrationTest extends AkkaIntegrationTest with EmbeddedMa "TaskUnreachable" should { "A task unreachable update will trigger a replacement task" in { Given("a new app with proper timeouts") - val strategy = UnreachableStrategy(10.seconds, 5.minutes) + val strategy = UnreachableEnabled(10.seconds, 5.minutes) val app = appProxy(testBasePath / "unreachable", "v1", instances = 1, healthCheck = None).copy(unreachableStrategy = strategy) waitForDeployment(marathon.createAppV2(app)) val task = waitForTasks(app.id, 1).head @@ -96,7 +96,7 @@ class TaskUnreachableIntegrationTest extends AkkaIntegrationTest with EmbeddedMa // start both slaves mesosCluster.agents.foreach(_.start()) - val strategy = UnreachableStrategy(5.minutes, 10.minutes) + val strategy = UnreachableEnabled(5.minutes, 10.minutes) val app = appProxy(testBasePath / "regression", "v1", instances = 2, healthCheck = None) .copy(constraints = Set(constraint), unreachableStrategy = strategy) diff --git a/src/test/scala/mesosphere/marathon/raml/PodStatusConversionTest.scala b/src/test/scala/mesosphere/marathon/raml/PodStatusConversionTest.scala index 10e53688186..53c90ff83fd 100644 --- a/src/test/scala/mesosphere/marathon/raml/PodStatusConversionTest.scala +++ b/src/test/scala/mesosphere/marathon/raml/PodStatusConversionTest.scala @@ -483,7 +483,8 @@ object PodStatusConversionTest { ) ) ).map(t => t.taskId -> t)(collection.breakOut), - runSpecVersion = pod.version + runSpecVersion = pod.version, + unreachableStrategy = state.UnreachableStrategy.default() ) InstanceFixture(since, agentInfo, taskIds, instance) diff --git a/src/test/scala/mesosphere/marathon/raml/UnreachableStrategyConversionTest.scala b/src/test/scala/mesosphere/marathon/raml/UnreachableStrategyConversionTest.scala index 0a852d19767..313b36a5b4a 100644 --- a/src/test/scala/mesosphere/marathon/raml/UnreachableStrategyConversionTest.scala +++ b/src/test/scala/mesosphere/marathon/raml/UnreachableStrategyConversionTest.scala @@ -10,20 +10,20 @@ class UnreachableStrategyConversionTest extends UnitTest { "UnreachableStrategyConversion" should { "read from RAML" in { - val raml = UnreachableStrategy() + val raml = UnreachableEnabled() val result: state.UnreachableStrategy = UnreachableStrategyConversion.ramlUnreachableStrategyRead(raml) - result.inactiveAfter should be(state.UnreachableStrategy.DefaultEphemeralInactiveAfter) - result.expungeAfter should be(state.UnreachableStrategy.DefaultEphemeralExpungeAfter) + result shouldBe (state.UnreachableStrategy.default(resident = false)) } } it should { "write to RAML" in { - val strategy = state.UnreachableStrategy(10.minutes, 20.minutes) + val strategy = state.UnreachableEnabled(10.minutes, 20.minutes) - val raml: UnreachableStrategy = UnreachableStrategyConversion.ramlUnreachableStrategyWrite(strategy) + val raml = UnreachableStrategyConversion.ramlUnreachableStrategyWrite(strategy). + asInstanceOf[UnreachableEnabled] raml.inactiveAfterSeconds should be(600) raml.expungeAfterSeconds should be(1200) diff --git a/src/test/scala/mesosphere/marathon/state/AppDefinitionTest.scala b/src/test/scala/mesosphere/marathon/state/AppDefinitionTest.scala index 9b6f9c6f473..d8ceed5f411 100644 --- a/src/test/scala/mesosphere/marathon/state/AppDefinitionTest.scala +++ b/src/test/scala/mesosphere/marathon/state/AppDefinitionTest.scala @@ -269,7 +269,7 @@ class AppDefinitionTest extends UnitTest { "three" -> "ccc" ), versionInfo = fullVersion, - unreachableStrategy = UnreachableStrategy(998.seconds, 999.seconds) + unreachableStrategy = UnreachableEnabled(998.seconds, 999.seconds) ) val result1 = AppDefinition(id = runSpecId).mergeFromProto(app1.toProto) assert(result1 == app1) diff --git a/src/test/scala/mesosphere/marathon/state/UnreachableStrategyTest.scala b/src/test/scala/mesosphere/marathon/state/UnreachableStrategyTest.scala index 1935dd9fcf8..52919f4bf6a 100644 --- a/src/test/scala/mesosphere/marathon/state/UnreachableStrategyTest.scala +++ b/src/test/scala/mesosphere/marathon/state/UnreachableStrategyTest.scala @@ -10,28 +10,41 @@ class UnreachableStrategyTest extends UnitTest with ResultMatchers { "UnreachableStrategy.unreachableStrategyValidator" should { "validate default strategy" in { - val strategy = UnreachableStrategy() + val strategy = UnreachableEnabled() UnreachableStrategy.unreachableStrategyValidator(strategy) shouldBe aSuccess } "validate with other parameters successfully" in { - val strategy = UnreachableStrategy(13.minutes, 37.minutes) + val strategy = UnreachableEnabled(13.minutes, 37.minutes) UnreachableStrategy.unreachableStrategyValidator(strategy) shouldBe aSuccess } - "fail with invalid time until inactive" in { - val strategy = UnreachableStrategy(inactiveAfter = 0.second) - UnreachableStrategy.unreachableStrategyValidator(strategy) should failWith("inactiveAfter" -> "got 0 seconds, expected 1 second or more") + "sees disabled as valid" in { + val strategy = UnreachableDisabled + UnreachableStrategy.unreachableStrategyValidator(strategy) shouldBe aSuccess } "fail when time until expunge is smaller" in { - val strategy = UnreachableStrategy(inactiveAfter = 2.seconds, expungeAfter = 1.second) - UnreachableStrategy.unreachableStrategyValidator(strategy) should failWith("inactiveAfter" -> "got 2 seconds, expected less than 1 second") + val strategy = UnreachableEnabled(inactiveAfter = 2.seconds, expungeAfter = 1.second) + UnreachableStrategy.unreachableStrategyValidator(strategy) should failWith( + "inactiveAfter" -> "got 2 seconds, expected less than 1 second") } "fail when time until expunge is equal to time until inactive" in { - val strategy = UnreachableStrategy(inactiveAfter = 2.seconds, expungeAfter = 2.seconds) - UnreachableStrategy.unreachableStrategyValidator(strategy) should failWith("inactiveAfter" -> "got 2 seconds, expected less than 2 seconds") + val strategy = UnreachableEnabled(inactiveAfter = 2.seconds, expungeAfter = 2.seconds) + UnreachableStrategy.unreachableStrategyValidator(strategy) should failWith( + "inactiveAfter" -> "got 2 seconds, expected less than 2 seconds") } } + + "toProto" should { + Seq( + UnreachableDisabled, + UnreachableEnabled(10.seconds, 20.seconds)).foreach { unreachableStrategy => + + s"round trip serializes ${unreachableStrategy}" in { + UnreachableStrategy.fromProto(unreachableStrategy.toProto) shouldBe (unreachableStrategy) + } + } + } } diff --git a/src/test/scala/mesosphere/marathon/tasks/InstanceOpFactoryImplTest.scala b/src/test/scala/mesosphere/marathon/tasks/InstanceOpFactoryImplTest.scala index 9ccb9d4cbb2..71677b3b4d1 100644 --- a/src/test/scala/mesosphere/marathon/tasks/InstanceOpFactoryImplTest.scala +++ b/src/test/scala/mesosphere/marathon/tasks/InstanceOpFactoryImplTest.scala @@ -63,7 +63,9 @@ class InstanceOpFactoryImplTest extends UnitTest { attributes = Vector.empty ) - val expectedInstance = Instance(expectedTaskId.instanceId, expectedAgentInfo, instance.state, Map(expectedTaskId -> expectedTask), runSpecVersion = app.version) + val expectedInstance = Instance( + expectedTaskId.instanceId, expectedAgentInfo, instance.state, Map(expectedTaskId -> expectedTask), + runSpecVersion = app.version, app.unreachableStrategy) assert(matched.instanceOp.stateOp == InstanceUpdateOperation.LaunchEphemeral(expectedInstance)) } diff --git a/src/test/scala/mesosphere/marathon/test/MarathonTestHelper.scala b/src/test/scala/mesosphere/marathon/test/MarathonTestHelper.scala index d9ec6d788c4..54b733c7de6 100644 --- a/src/test/scala/mesosphere/marathon/test/MarathonTestHelper.scala +++ b/src/test/scala/mesosphere/marathon/test/MarathonTestHelper.scala @@ -333,7 +333,8 @@ object MarathonTestHelper { agentInfo = Instance.AgentInfo("", None, Nil), state = InstanceState(Condition.Created, since = clock.now(), None, healthy = None), tasksMap = Map.empty[Task.Id, Task], - runSpecVersion = clock.now() + runSpecVersion = clock.now(), + UnreachableStrategy.default() ) def createTaskTracker( diff --git a/src/test/scala/mesosphere/marathon/upgrade/ReadinessBehaviorTest.scala b/src/test/scala/mesosphere/marathon/upgrade/ReadinessBehaviorTest.scala index 68a04504cb1..fe38ce29982 100644 --- a/src/test/scala/mesosphere/marathon/upgrade/ReadinessBehaviorTest.scala +++ b/src/test/scala/mesosphere/marathon/upgrade/ReadinessBehaviorTest.scala @@ -219,7 +219,9 @@ class ReadinessBehaviorTest extends AkkaUnitTest with Eventually with GroupCreat def instance = { val task = mockTask - val instance = Instance(instanceId, agentInfo, InstanceState(Running, version, Some(version), healthy = Some(true)), Map(task.taskId -> task), runSpecVersion = version) + val instance = Instance( + instanceId, agentInfo, InstanceState(Running, version, Some(version), healthy = Some(true)), + Map(task.taskId -> task), runSpecVersion = version, UnreachableStrategy.default()) tracker.instance(any) returns Future.successful(Some(instance)) instance }