Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep last known state when orchestrator is down #219

Merged
merged 1 commit into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/controller/orchestrator/orchestrator_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func (ou *orcUpdater) Sync(ctx context.Context) (syncer.SyncResult, error) {

if instances, err = ou.orcClient.Cluster(ou.cluster.GetClusterAlias()); err != nil {
log.V(-1).Info("can't get instances from orchestrator", "alias", ou.cluster.GetClusterAlias(), "error", err.Error())
if !orc.IsNotFound(err) {
log.Error(err, "orchestrator is not reachable", "cluster_alias", ou.cluster.GetClusterAlias())
return syncer.SyncResult{}, err
}
}

if len(instances) != 0 {
Expand Down
27 changes: 27 additions & 0 deletions pkg/controller/orchestrator/orchestrator_reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,33 @@ var _ = Describe("Orchestrator reconciler", func() {
})
})

When("orchestrator is not available", func() {
BeforeEach(func() {
// register nodes into orchestrator
cluster.Status.ReadyNodes = 1
_, err := orcSyncer.Sync(context.TODO())
Expect(err).To(Succeed())

// second reconcile event to update cluster status
_, err = orcSyncer.Sync(context.TODO())
Expect(err).To(Succeed())

// check that sync was successful
Expect(cluster.GetNodeStatusFor(cluster.GetPodHostname(0))).To(
haveNodeCondWithStatus(api.NodeConditionMaster, core.ConditionTrue))

// make orchestrator fake client unreachable
orcClient.MakeOrcUnreachable()
})

It("should not reconcile and keep the last known state", func() {
_, err := orcSyncer.Sync(context.TODO())
Expect(err).ToNot(Succeed())

Expect(cluster.GetNodeStatusFor(cluster.GetPodHostname(0))).To(haveNodeCondWithStatus(api.NodeConditionMaster, core.ConditionTrue))
})
})

When("cluster is registered in orchestrator", func() {
BeforeEach(func() {
// AddRecoveries signature: cluster, acked
Expand Down
87 changes: 87 additions & 0 deletions pkg/orchestrator/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright 2018 Pressinfra SRL

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package orchestrator

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
)

// Error contains orchestrator error details.
//
// NewError decodes the JSON body of an error response directly into this
// struct, so the exported field names are also the keys that encoding/json
// matches against the response body.
type Error struct {
// HTTPStatus is the HTTP status code of the response the error was built
// from; NewErrorMsg sets it to 0 because no response is involved there.
HTTPStatus int
// Path is the request path this error is associated with.
Path string
// Message is the human-readable description of the failure; IsNotFound
// inspects it to recognise "cluster not found" style errors.
Message string
// Details holds optional extra context passed to NewError.
// NOTE(review): a "Details" key in the decoded response body would replace
// the caller-supplied value — confirm this is intended.
Details interface{}
}

// Error implements the error interface. It renders the status, path, message
// and details of the orchestrator error on a single line.
func (e Error) Error() string {
	const layout = "[orc]: status: %d path: %s msg: %s, details: %v"

	return fmt.Sprintf(layout, e.HTTPStatus, e.Path, e.Message, e.Details)
}

// NewError returns a specific orchestrator error with extra details, built
// from an HTTP response.
//
// The error starts out with the response status code, the given path and the
// given details. The response body is then read and decoded as JSON into the
// error itself, so keys in the body that match the Error field names
// (typically Message) fill in or replace the corresponding fields. When the
// body cannot be read or is not valid JSON, Message is set to a placeholder
// describing the problem instead.
//
// The response body is fully consumed and closed by this function. A nil
// response or nil body is tolerated and reported as an unreadable body.
func NewError(resp *http.Response, path string, details interface{}) *Error {
	rsp := &Error{
		Path:    path,
		Details: details,
	}

	if resp == nil {
		rsp.Message = "<<Can't read body>>"
		return rsp
	}
	rsp.HTTPStatus = resp.StatusCode

	if resp.Body == nil {
		rsp.Message = "<<Can't read body>>"
		return rsp
	}
	// The body is read to the end below and is of no further use to the
	// caller; close it so the underlying connection is released. Closing an
	// already closed body is harmless, so the returned error is ignored.
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		rsp.Message = "<<Can't read body>>"
		return rsp
	}

	if err = json.Unmarshal(body, rsp); err != nil {
		log.V(-1).Info("error when unmarhal error data", "body", string(body))
		rsp.Message = fmt.Sprintf("<<can't get more details, in error: error: %s, body: %s>>", err, body)
		return rsp
	}

	return rsp
}

// NewErrorMsg returns an orchestrator error that carries only a message and
// the request path it relates to. No HTTP response is involved, so HTTPStatus
// is left at 0 and Details stays nil.
func NewErrorMsg(msg string, path string) *Error {
	orcErr := new(Error)
	orcErr.Path = path
	orcErr.Message = msg

	return orcErr
}

// IsNotFound checks if the given error is orchestrator error and it's cluster not found.
func IsNotFound(err error) bool {
if orcErr, ok := err.(*Error); ok {
if strings.Contains(orcErr.Message, "Unable to determine cluster name") {
AMecea marked this conversation as resolved.
Show resolved Hide resolved
return true
}

// When querying for instances orchestrator returns the following error message when the
// replica cannot be reached.
// https://github.com/github/orchestrator/blob/151029a103429fe16123b9842d1a5b4b175bd5d5/go/http/api.go#L184
if strings.Contains(orcErr.Message, "Cannot read instance") {
return true
}
}
return false
}
50 changes: 44 additions & 6 deletions pkg/orchestrator/fake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type OrcFakeClient struct {

Discovered []InstanceKey

lock *sync.Mutex
lock *sync.Mutex
reachable bool
}

var nextID int64
Expand All @@ -53,7 +54,8 @@ const (
// New fake orchestrator client
func New() *OrcFakeClient {
return &OrcFakeClient{
lock: &sync.Mutex{},
lock: &sync.Mutex{},
reachable: true,
}
}

Expand All @@ -65,6 +67,11 @@ func (o *OrcFakeClient) Reset() {
o.Discovered = []InstanceKey{}
}

// MakeOrcUnreachable simulates an orchestrator outage: after this call the
// client methods that check the reachable flag return a "can't connect to orc"
// error instead of doing their work.
//
// The flag is written while holding the client's lock, matching the methods
// that read it under that same lock.
func (o *OrcFakeClient) MakeOrcUnreachable() {
	o.lock.Lock()
	defer o.lock.Unlock()

	o.reachable = false
}

// AddInstance adds an instance to the orchestrator client
func (o *OrcFakeClient) AddInstance(instance Instance) *Instance {
o.lock.Lock()
Expand Down Expand Up @@ -176,6 +183,10 @@ func (o *OrcFakeClient) getHostClusterAlias(host string) string {

// Discover registers a host into orchestrator
func (o *OrcFakeClient) Discover(host string, port int) error {
if !o.reachable {
return NewErrorMsg("can't connect to orc", "/")
}

o.Discovered = append(o.Discovered, InstanceKey{
Hostname: host,
Port: port,
Expand All @@ -199,6 +210,9 @@ func (o *OrcFakeClient) Discover(host string, port int) error {

// Forget removes a host from orchestrator
func (o *OrcFakeClient) Forget(host string, port int) error {
if !o.reachable {
return NewErrorMsg("can't connect to orc", "/")
}
// determine cluster name
cluster := o.getHostClusterAlias(host)
o.RemoveInstance(cluster, host)
Expand All @@ -210,26 +224,34 @@ func (o *OrcFakeClient) Master(clusterHint string) (*Instance, error) {
o.lock.Lock()
defer o.lock.Unlock()

if !o.reachable {
return nil, NewErrorMsg("can't connect to orc", "/")
}

insts, ok := o.Clusters[clusterHint]
if !ok {
return nil, fmt.Errorf("not found")
return nil, NewErrorMsg("Unable to determine cluster name", "/master")
}
for _, inst := range insts {
if !inst.ReadOnly {
return inst, nil
}
}
return nil, fmt.Errorf("[faker] master not found")
return nil, NewErrorMsg("Unable to determine master", "/master")
}

// Cluster returns the list of instances from a cluster
func (o *OrcFakeClient) Cluster(cluster string) ([]Instance, error) {
o.lock.Lock()
defer o.lock.Unlock()

if !o.reachable {
return nil, NewErrorMsg("can't connect to orc", "/")
}

instsPointers, ok := o.Clusters[cluster]
if !ok {
return nil, fmt.Errorf("not found")
return nil, NewErrorMsg("Unable to determine cluster name", "/cluster")
}

insts := []Instance{}
Expand All @@ -245,9 +267,13 @@ func (o *OrcFakeClient) AuditRecovery(cluster string) ([]TopologyRecovery, error
o.lock.Lock()
defer o.lock.Unlock()

if !o.reachable {
return nil, NewErrorMsg("can't connect to orc", "/")
}

recoveries, ok := o.Recoveries[cluster]
if !ok {
return nil, fmt.Errorf("not found")
return nil, NewErrorMsg("Unable to determine cluster name", "/audit-recovery")
}

return recoveries, nil
Expand All @@ -258,6 +284,10 @@ func (o *OrcFakeClient) AckRecovery(id int64, comment string) error {
o.lock.Lock()
defer o.lock.Unlock()

if !o.reachable {
return NewErrorMsg("can't connect to orc", "/")
}

o.AckRec = append(o.AckRec, id)
return nil
}
Expand All @@ -267,6 +297,10 @@ func (o *OrcFakeClient) SetHostWritable(key InstanceKey) error {
o.lock.Lock()
defer o.lock.Unlock()

if !o.reachable {
return NewErrorMsg("can't connect to orc", "/")
}

for _, instances := range o.Clusters {
for _, instance := range instances {
if instance.Key.Hostname == key.Hostname {
Expand All @@ -283,6 +317,10 @@ func (o *OrcFakeClient) SetHostReadOnly(key InstanceKey) error {
o.lock.Lock()
defer o.lock.Unlock()

if !o.reachable {
return NewErrorMsg("can't connect to orc", "/")
}

for _, instances := range o.Clusters {
for _, instance := range instances {
if instance.Key.Hostname == key.Hostname {
Expand Down
54 changes: 5 additions & 49 deletions pkg/orchestrator/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,71 +29,27 @@ import (

var log = logf.Log.WithName("orchestrator.client")

type orcError struct {
HTTPStatus int
Path string
Message string
Details interface{}
}

func (e orcError) Error() string {
return fmt.Sprintf("[orc]: status: %d path: %s msg: %s, details: %v",
e.HTTPStatus, e.Path, e.Message, e.Details)
}

// NewOrcError returns a specific orchestrator error with extra details
func NewOrcError(resp *http.Response, path string, details interface{}) error {
rsp := orcError{
HTTPStatus: resp.StatusCode,
Path: path,
Details: details,
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
rsp.Message = "Can't read body"
return rsp
}

if err = json.Unmarshal(body, &rsp); err != nil {
log.V(1).Info("error when unmarhal error data", "body", string(body))
rsp.Message = fmt.Sprintf("can't get more details, in error: error: %s, body: %s", err, body)
return rsp
}

return rsp
}

// NewOrcErrorMsg returns an orchestrator error with extra msg
func NewOrcErrorMsg(msg string, path string) error {
return orcError{
HTTPStatus: 0,
Message: msg,
Path: path,
}
}

func (o *orchestrator) makeGetRequest(path string, out interface{}) error {
func (o *orchestrator) makeGetRequest(path string, out interface{}) *Error {
uri := fmt.Sprintf("%s/%s", o.connectURI, path)
log.V(2).Info("orchestrator request info", "uri", uri, "outobj", out)

req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return NewOrcErrorMsg(fmt.Sprintf("can't create request: %s", err.Error()), path)
return NewErrorMsg(fmt.Sprintf("can't create request: %s", err.Error()), path)
}

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return NewOrcErrorMsg(err.Error(), path)
return NewErrorMsg(err.Error(), path)
}

if resp.StatusCode >= 500 {
return NewOrcError(resp, path, nil)
return NewError(resp, path, nil)
}

if err := unmarshalJSON(resp.Body, out); err != nil {
return NewOrcError(resp, path, err)
return NewError(resp, path, err)
}

return nil
Expand Down