Handle polling errors and update status appropriately (kubernetes-retired#2368)

* Handle polling errors and update status appropriately

* Added back the log message about the polling error
nilebox authored and k8s-ci-robot committed Oct 1, 2018
1 parent 11af9a0 commit b1a1c45
Showing 2 changed files with 116 additions and 20 deletions.
57 changes: 44 additions & 13 deletions pkg/controller/controller_instance.go
@@ -964,21 +964,27 @@ func (c *controller) pollServiceInstance(instance *v1beta1.ServiceInstance) erro
return c.finishPollingServiceInstance(instance)
}

// We got some kind of error and should continue polling.
//
// The instance's Ready condition should already be False, so
// we just need to record an event.
reason := errorPollingLastOperationReason
message := fmt.Sprintf("Error polling last operation: %v", err)
glog.V(4).Info(pcb.Message(message))
c.recorder.Event(instance, corev1.EventTypeWarning, reason, message)
readyCond := newServiceInstanceReadyCondition(v1beta1.ConditionFalse, reason, message)

if c.reconciliationRetryDurationExceeded(instance.Status.OperationStartTime) {
readyCond := newServiceInstanceReadyCondition(v1beta1.ConditionFalse, reason, message)
return c.processServiceInstancePollingFailureRetryTimeout(instance, readyCond)
}

return c.continuePollingServiceInstance(instance)
if httpErr, ok := osb.IsHTTPError(err); ok {
if isRetriableHTTPStatus(httpErr.StatusCode) {
return c.processServiceInstancePollingTemporaryFailure(instance, readyCond)
}
// A failure with a given HTTP response code is treated as a terminal
// failure.
failedCond := newServiceInstanceFailedCondition(v1beta1.ConditionTrue, reason, message)
return c.processServiceInstancePollingTerminalFailure(instance, readyCond, failedCond)
}

// Unknown error: update status and continue polling
return c.processServiceInstancePollingTemporaryFailure(instance, readyCond)
}

description := "(no description provided)"
@@ -1071,9 +1077,11 @@ func (c *controller) pollServiceInstance(instance *v1beta1.ServiceInstance) erro

return c.finishPollingServiceInstance(instance)
default:
glog.Warning(pcb.Messagef("Got invalid state in LastOperationResponse: %q", response.State))
message := pcb.Messagef("Got invalid state in LastOperationResponse: %q", response.State)
glog.Warning(message)
if c.reconciliationRetryDurationExceeded(instance.Status.OperationStartTime) {
return c.processServiceInstancePollingFailureRetryTimeout(instance, nil)
readyCond := newServiceInstanceReadyCondition(v1beta1.ConditionUnknown, errorPollingLastOperationReason, message)
return c.processServiceInstancePollingFailureRetryTimeout(instance, readyCond)
}

err := fmt.Errorf(`Got invalid state in LastOperationResponse: %q`, response.State)
@@ -1103,13 +1111,18 @@ func isServiceInstanceProcessedAlready(instance *v1beta1.ServiceInstance) bool {
// processServiceInstancePollingFailureRetryTimeout marks the instance as having
// failed polling due to its reconciliation retry duration expiring
func (c *controller) processServiceInstancePollingFailureRetryTimeout(instance *v1beta1.ServiceInstance, readyCond *v1beta1.ServiceInstanceCondition) error {
msg := "Stopping reconciliation retries because too much time has elapsed"
failedCond := newServiceInstanceFailedCondition(v1beta1.ConditionTrue, errorReconciliationRetryTimeoutReason, msg)
return c.processServiceInstancePollingTerminalFailure(instance, readyCond, failedCond)
}

// processServiceInstancePollingTerminalFailure marks the instance as having
// failed polling due to terminal error
func (c *controller) processServiceInstancePollingTerminalFailure(instance *v1beta1.ServiceInstance, readyCond, failedCond *v1beta1.ServiceInstanceCondition) error {
mitigatingOrphan := instance.Status.OrphanMitigationInProgress
provisioning := instance.Status.CurrentOperation == v1beta1.ServiceInstanceOperationProvision && !mitigatingOrphan
deleting := instance.Status.CurrentOperation == v1beta1.ServiceInstanceOperationDeprovision || mitigatingOrphan

msg := "Stopping reconciliation retries because too much time has elapsed"
failedCond := newServiceInstanceFailedCondition(v1beta1.ConditionTrue, errorReconciliationRetryTimeoutReason, msg)

var err error
switch {
case deleting:
@@ -1119,16 +1132,34 @@ func (c *controller) processServiceInstancePollingFailureRetryTimeout(instance *
c.finishPollingServiceInstance(instance)
return c.processTerminalProvisionFailure(instance, readyCond, failedCond, true)
default:
readyCond := newServiceInstanceReadyCondition(v1beta1.ConditionFalse, errorReconciliationRetryTimeoutReason, msg)
readyCond := newServiceInstanceReadyCondition(v1beta1.ConditionFalse, failedCond.Reason, failedCond.Message)
err = c.processTerminalUpdateServiceInstanceFailure(instance, readyCond, failedCond)
}
if err != nil {
c.recorder.Event(instance, corev1.EventTypeWarning, failedCond.Reason, failedCond.Message)
return c.handleServiceInstancePollingError(instance, err)
}

return c.finishPollingServiceInstance(instance)
}

// processServiceInstancePollingTemporaryFailure marks the instance as having
// failed polling with a temporary error
func (c *controller) processServiceInstancePollingTemporaryFailure(instance *v1beta1.ServiceInstance, readyCond *v1beta1.ServiceInstanceCondition) error {
c.recorder.Event(instance, corev1.EventTypeWarning, readyCond.Reason, readyCond.Message)
setServiceInstanceCondition(instance, v1beta1.ServiceInstanceConditionReady, readyCond.Status, readyCond.Reason, readyCond.Message)

if _, err := c.updateServiceInstanceStatus(instance); err != nil {
return c.handleServiceInstancePollingError(instance, err)
}

// The instance will be requeued in any case, since we updated the status
// a few lines above.
// But we still need to return a non-nil error for retriable errors and
// orphan mitigation to avoid resetting the rate limiter.
return fmt.Errorf(readyCond.Message)
}

// resolveReferences checks to see if (Cluster)ServiceClassRef and/or (Cluster)ServicePlanRef are
// nil and if so, will resolve the references and update the instance.
// If references needed to be resolved, and the instance status was successfully updated, the method returns true
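The retriable-versus-terminal split above hinges on isRetriableHTTPStatus, which is defined elsewhere in the controller package and does not appear in this diff. A minimal sketch of what such a classifier might look like, consistent with the tests below (a 403 poll error is retried, a 400 is terminal); the actual helper may differ:

package controller

import "net/http"

// isRetriableHTTPStatus reports whether a broker poll that failed with the
// given HTTP status code is worth retrying. Sketch only: the real helper is
// not shown in this commit. 400 Bad Request means the request itself is
// invalid and will not succeed on retry, so it is treated as terminal; any
// other status keeps polling.
func isRetriableHTTPStatus(statusCode int) bool {
	return statusCode != http.StatusBadRequest
}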
79 changes: 72 additions & 7 deletions pkg/controller/controller_instance_test.go
@@ -2935,10 +2935,10 @@ func TestPollServiceInstanceStatusGoneDeprovisioningWithOperationNoFinalizer(t *
}
}

// TestPollServiceInstanceClusterServiceBrokerError simulates polling a broker and getting a
// TestPollServiceInstanceClusterServiceBrokerTemporaryError simulates polling a broker and getting a
// Forbidden status on the poll. Test simulates that the ClusterServiceBroker was already
// in the process of being deleted prior to the Forbidden status.
func TestPollServiceInstanceClusterServiceBrokerError(t *testing.T) {
func TestPollServiceInstanceClusterServiceBrokerTemporaryError(t *testing.T) {
fakeKubeClient, fakeCatalogClient, fakeClusterServiceBrokerClient, testController, sharedInformers := newTestController(t, fakeosb.FakeClientConfiguration{
PollLastOperationReaction: &fakeosb.PollLastOperationReaction{
Error: osb.HTTPStatusCodeError{
@@ -2959,12 +2959,75 @@ func TestPollServiceInstanceClusterServiceBrokerError(t *testing.T) {
}

err := testController.pollServiceInstance(instance)

if err == nil {
t.Fatal("Expected pollServiceInstance to return error")
}
expectedErr := "Error polling last operation: Status: 403; ErrorMessage: <nil>; Description: <nil>; ResponseError: <nil>"
if e, a := expectedErr, err.Error(); e != a {
t.Fatalf("unexpected error returned: expected %q, got %q", e, a)
}

brokerActions := fakeClusterServiceBrokerClient.Actions()
assertNumberOfBrokerActions(t, brokerActions, 1)
operationKey := osb.OperationKey(testOperation)
assertPollLastOperation(t, brokerActions[0], &osb.LastOperationRequest{
InstanceID: testServiceInstanceGUID,
ServiceID: strPtr(testClusterServiceClassGUID),
PlanID: strPtr(testClusterServicePlanGUID),
OperationKey: &operationKey,
})

// verify no kube resources created.
// No actions
kubeActions := fakeKubeClient.Actions()
assertNumberOfActions(t, kubeActions, 0)

actions := fakeCatalogClient.Actions()
assertNumberOfActions(t, actions, 1)
assertUpdateStatus(t, actions[0], instance)

events := getRecordedEvents(testController)

expectedEvent := warningEventBuilder(errorPollingLastOperationReason).msg(
"Error polling last operation:",
).msg("Status: 403; ErrorMessage: <nil>; Description: <nil>; ResponseError: <nil>")
if err := checkEvents(events, expectedEvent.stringArr()); err != nil {
t.Fatal(err)
}
}

// TestPollServiceInstanceClusterServiceBrokerTerminalError simulates polling a broker and getting a
// BadRequest status on the poll. Test simulates that the ClusterServiceBroker was already
// in the process of being deleted prior to the BadRequest status.
func TestPollServiceInstanceClusterServiceBrokerTerminalError(t *testing.T) {
fakeKubeClient, fakeCatalogClient, fakeClusterServiceBrokerClient, testController, sharedInformers := newTestController(t, fakeosb.FakeClientConfiguration{
PollLastOperationReaction: &fakeosb.PollLastOperationReaction{
Error: osb.HTTPStatusCodeError{
StatusCode: http.StatusBadRequest,
},
},
})

sharedInformers.ClusterServiceBrokers().Informer().GetStore().Add(getTestClusterServiceBroker())
sharedInformers.ClusterServiceClasses().Informer().GetStore().Add(getTestClusterServiceClass())
sharedInformers.ClusterServicePlans().Informer().GetStore().Add(getTestClusterServicePlan())

instance := getTestServiceInstanceAsyncDeprovisioning(testOperation)
instanceKey := testNamespace + "/" + testServiceInstanceName

if testController.instancePollingQueue.NumRequeues(instanceKey) != 0 {
t.Fatalf("Expected polling queue to not have any record of test instance")
}

err := testController.pollServiceInstance(instance)

if err != nil {
t.Fatalf("pollServiceInstance failed: %v", err)
}

if testController.instancePollingQueue.NumRequeues(instanceKey) != 1 {
t.Fatalf("Expected polling queue to have record of seeing test instance once")
if testController.instancePollingQueue.NumRequeues(instanceKey) != 0 {
t.Fatalf("Expected polling queue to not have requeues")
}

brokerActions := fakeClusterServiceBrokerClient.Actions()
@@ -2983,14 +3046,16 @@ func TestPollServiceInstanceClusterServiceBrokerError(t *testing.T) {
assertNumberOfActions(t, kubeActions, 0)

actions := fakeCatalogClient.Actions()
assertNumberOfActions(t, actions, 0)
assertNumberOfActions(t, actions, 1)
assertUpdateStatus(t, actions[0], instance)

events := getRecordedEvents(testController)

expectedEvent := warningEventBuilder(errorPollingLastOperationReason).msg(
"Error polling last operation:",
).msg("Status: 403; ErrorMessage: <nil>; Description: <nil>; ResponseError: <nil>")
if err := checkEvents(events, expectedEvent.stringArr()); err != nil {
).msg("Status: 400; ErrorMessage: <nil>; Description: <nil>; ResponseError: <nil>")
// Event is sent twice: one for Ready condition and one for Failed
if err := checkEvents(events, []string{expectedEvent.String(), expectedEvent.String()}); err != nil {
t.Fatal(err)
}
}
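Why processServiceInstancePollingTemporaryFailure returns a non-nil error even though the status update alone would requeue the instance, and why the tests above assert on instancePollingQueue.NumRequeues: in the standard client-go worker pattern, a non-nil error from the sync function keeps the key on the rate limiter (exponential backoff), while a nil return calls Forget and resets it. A rough sketch of that pattern, with the function name and wiring assumed for illustration rather than taken from service-catalog:

package controller

import "k8s.io/client-go/util/workqueue"

// processNextPollItem illustrates the usual worker loop around a rate-limited
// workqueue. It is a sketch, not the service-catalog implementation.
func processNextPollItem(queue workqueue.RateLimitingInterface, sync func(key string) error) bool {
	item, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(item)

	key := item.(string)
	if err := sync(key); err != nil {
		// Temporary failure: requeue with backoff and keep the requeue count,
		// so repeated poll errors slow down over time.
		queue.AddRateLimited(key)
		return true
	}
	// Success, or a terminal failure already recorded in status: reset backoff.
	queue.Forget(key)
	return true
}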
