Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/agoda-com/samsahai into e…
Browse files Browse the repository at this point in the history
…xporter
  • Loading branch information
npetchapan committed Apr 13, 2020
2 parents 53b39b0 + 93ceda3 commit 9dc37d5
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 18 deletions.
15 changes: 12 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,11 @@ Find more configuration information in [samsahai-example](https://www.github.com
http://<minikube_ip>:<node_port>/version (e.g. http://192.168.64.14:32501/version)
http://<minikube_ip>:<node_port>/swagger/index.html# (e.g. http://192.168.64.14:32501/swagger/index.html#)
```
10. Apply team
10. Apply configuration
```
kubectl apply -f https://raw.githubusercontent.com/agoda-com/samsahai-example/master/configs/crds/config-example.yaml
```
11. Apply team
```
kubectl apply -f https://raw.githubusercontent.com/agoda-com/samsahai-example/master/configs/crds/team-example.yaml
```
Expand Down Expand Up @@ -198,9 +202,10 @@ To save the cluster resources once every upgrade component verification has fini
2. Clone project in the directory above
3. Prepare environment and export KUBECONFIG
```
make install-init
make init
make prepare-env-e2e-k3d
export KUBECONFIG=/tmp/s2h/k3s-kubeconfig
make install-crds
```
4. Run `samsahai controller` by using go build with following configurations:
```
Expand All @@ -212,7 +217,11 @@ To save the cluster resources once every upgrade component verification has fini
```
http://localhost:8080/swagger/index.html#
```
6. Apply team
6. Apply configuration
```
kubectl apply -f https://raw.githubusercontent.com/agoda-com/samsahai-example/master/configs/crds/config-example.yaml
```
7. Apply team
```
kubectl apply -f https://raw.githubusercontent.com/agoda-com/samsahai-example/master/configs/crds/team-example-local.yaml
```
Expand Down
4 changes: 3 additions & 1 deletion cmd/samsahai/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,12 @@ func startCtrlCmd() *cobra.Command {
}
},
}

defaultImage := "quay.io/samsahai/samsahai:latest"
if s2h.Version != "" {
defaultImage = "quay.io/samsahai/samsahai:" + s2h.Version
defaultImage = "quay.io/samsahai/samsahai:v" + s2h.Version
}

cmd.Flags().String(s2h.VKPodNamespace, "default", "Namespace that the controller works on.")
cmd.Flags().String(s2h.VKS2HConfigPath, "samsahai.yaml", "Samsahai configuration file path.")
cmd.Flags().String(s2h.VKClusterDomain, "cluster.local", "Internal domain of the cluster.")
Expand Down
7 changes: 6 additions & 1 deletion internal/queue/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (c *controller) RemoveAllQueues() error {
func (c *controller) add(ctx context.Context, queue *v1beta1.Queue, atTop bool) error {
queueList, err := c.list(nil)
if err != nil {
logger.Error(err, "cannot list queue")
return err
}

Expand Down Expand Up @@ -213,14 +214,15 @@ func (c *controller) list(opts *client.ListOptions) (list *v1beta1.QueueList, er
opts = &client.ListOptions{Namespace: c.namespace}
}
if err = c.client.List(context.Background(), list, opts); err != nil {
return
return list, errors.Wrapf(err, "cannot list queue with options: %+v", opts)
}
return list, nil
}

func (c *controller) SetLastOrder(q *v1beta1.Queue) error {
queueList, err := c.list(nil)
if err != nil {
logger.Error(err, "cannot list queue")
return err
}

Expand All @@ -233,6 +235,7 @@ func (c *controller) SetLastOrder(q *v1beta1.Queue) error {
func (c *controller) SetReverifyQueueAtFirst(q *v1beta1.Queue) error {
list, err := c.list(nil)
if err != nil {
logger.Error(err, "cannot list queue")
return err
}

Expand All @@ -250,6 +253,7 @@ func (c *controller) SetReverifyQueueAtFirst(q *v1beta1.Queue) error {
func (c *controller) SetRetryQueue(q *v1beta1.Queue, noOfRetry int, nextAt time.Time) error {
list, err := c.list(nil)
if err != nil {
logger.Error(err, "cannot list queue")
return err
}

Expand All @@ -269,6 +273,7 @@ func (c *controller) SetRetryQueue(q *v1beta1.Queue, noOfRetry int, nextAt time.
func (c *controller) updateQueueList(ql *v1beta1.QueueList) error {
for i := range ql.Items {
if err := c.client.Update(context.TODO(), &ql.Items[i]); err != nil {
logger.Error(err, "cannot update queue list", "queue", ql.Items[i].Name)
return errors.Wrapf(err, "cannot update queue %s in %s", ql.Items[i].Name, ql.Items[i].Namespace)
}
}
Expand Down
21 changes: 11 additions & 10 deletions internal/staging/collect_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ func (c *controller) collectResult(queue *s2hv1beta1.Queue) error {
}

// Queue will finished if type are Active promotion related
switch queue.Spec.Type {
case s2hv1beta1.QueueTypePromoteToActive, s2hv1beta1.QueueTypeDemoteFromActive, s2hv1beta1.QueueTypePreActive:
if queue.IsActivePromotionQueue() {
return c.updateQueueWithState(queue, s2hv1beta1.Finished)
}

Expand All @@ -56,10 +55,8 @@ func (c *controller) collectResult(queue *s2hv1beta1.Queue) error {
return err
}

if !queue.IsActivePromotionQueue() {
if err := c.setStableAndSendReport(queue); err != nil {
return err
}
if err := c.setStableAndSendReport(queue); err != nil {
return err
}

queue.Status.SetCondition(s2hv1beta1.QueueCleaningAfterStarted, corev1.ConditionTrue,
Expand Down Expand Up @@ -124,14 +121,14 @@ func (c *controller) createQueueHistory(q *s2hv1beta1.Queue) error {
err := c.client.Get(ctx, types.NamespacedName{Name: history.Name, Namespace: history.Namespace}, fetched)
if err != nil {
if k8serrors.IsNotFound(err) {
if err := c.client.Create(context.TODO(), history); err != nil {
logger.Error(err, "cannot create history")
if err := c.client.Create(ctx, history); err != nil {
logger.Error(err, "cannot create queuehistory")
return err
}

return nil
}
logger.Error(err, "cannot get history")
logger.Error(err, "cannot get queuehistory")
return err
}

Expand All @@ -145,6 +142,7 @@ func (c *controller) deleteQueueHistoryOutOfRange(ctx context.Context, namespace
return nil
}

logger.Error(err, "cannot list queuehistories")
return errors.Wrapf(err, "cannot list queuehistories in %s", namespace)
}

Expand All @@ -153,6 +151,7 @@ func (c *controller) deleteQueueHistoryOutOfRange(ctx context.Context, namespace
// get configuration
cfg, err := c.getConfiguration()
if err != nil {
logger.Error(err, "cannot get configuration")
return err
}

Expand All @@ -176,6 +175,7 @@ func (c *controller) deleteQueueHistoryOutOfRange(ctx context.Context, namespace
continue
}

logger.Error(err, fmt.Sprintf("cannot delete queuehistories %s", queueHists.Items[i].Name))
return errors.Wrapf(err, "cannot delete queuehistories %s", queueHists.Items[i].Name)
}
continue
Expand Down Expand Up @@ -408,7 +408,8 @@ func (c *controller) sendComponentUpgradeReport(status rpc.ComponentUpgrade_Upgr
if c.s2hClient != nil {
_, err = c.s2hClient.RunPostComponentUpgrade(ctx, comp)
if err != nil {
return errors.Wrap(err, "cannot load send component upgrade report")
logger.Error(err, "cannot send component upgrade report", "queue", queue.Spec.Name)
return errors.Wrap(err, "cannot send component upgrade report")
}
}

Expand Down
7 changes: 5 additions & 2 deletions internal/staging/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func (c *controller) process() bool {
c.mtQueue.Lock()
// pick new queue
if c.currentQueue, err = c.queueCtrl.First(); err != nil {
logger.Error(err, "cannot pick the first component of queue")
c.mtQueue.Unlock()
return false
}
Expand Down Expand Up @@ -350,7 +351,8 @@ func (c *controller) cleanBefore(queue *s2hv1beta1.Queue) error {
if err != nil {
return err
} else if !isCleaned {
logger.Warn("waiting for component cleaned", "queue", queue.Name)
logger.Warn("waiting for component cleaned",
"queue", queue.Name, "state", s2hv1beta1.CleaningBefore)
time.Sleep(2 * time.Second)
return nil
}
Expand Down Expand Up @@ -399,7 +401,8 @@ func (c *controller) cleanAfter(queue *s2hv1beta1.Queue) error {
if err != nil {
return err
} else if !isCleaned {
logger.Warn("waiting for component cleaned", "queue", queue.Name)
logger.Warn("waiting for component cleaned",
"queue", queue.Name, "state", s2hv1beta1.CleaningAfter)
time.Sleep(2 * time.Second)
return nil
}
Expand Down
14 changes: 13 additions & 1 deletion internal/staging/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,19 @@ func (c *controller) deployEnvironment(queue *s2hv1beta1.Queue) error {
if err != nil {
return err
}
comp = comps[queue.Spec.Name]

var ok bool
comp, ok = comps[queue.Spec.Name]
if !ok {
// delete queue if component does not exist in config
if err := c.client.Delete(context.TODO(), queue); err != nil {
logger.Error(err, "deleting queue error")
return err
}
c.clearCurrentQueue()
return nil
}

parentComp = comp

if comp.Parent != "" {
Expand Down
39 changes: 39 additions & 0 deletions internal/staging/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/pkg/errors"
"github.com/twitchtv/twirp"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"

s2hv1beta1 "github.com/agoda-com/samsahai/api/v1beta1"
"github.com/agoda-com/samsahai/internal"
Expand Down Expand Up @@ -86,6 +88,11 @@ func (c *controller) updateQueue(queue *s2hv1beta1.Queue) error {
}

func (c *controller) deleteQueue(q *s2hv1beta1.Queue) error {
// update queue history before processing next queue
if err := c.updateQueueHistory(q); err != nil {
return errors.Wrap(err, "updating queuehistory error")
}

isDeploySuccess, isTestSuccess, isReverify := q.IsDeploySuccess(), q.IsTestSuccess(), q.IsReverify()

if isDeploySuccess && isTestSuccess && !isReverify {
Expand Down Expand Up @@ -165,3 +172,35 @@ func (c *controller) updateQueueWithState(q *s2hv1beta1.Queue, state s2hv1beta1.
func (c *controller) genReleaseName(comp *s2hv1beta1.Component) string {
return internal.GenReleaseName(c.namespace, comp.Name)
}

func (c *controller) updateQueueHistory(q *s2hv1beta1.Queue) error {
ctx := context.TODO()

qHistName := q.Status.QueueHistoryName
fetched := &s2hv1beta1.QueueHistory{}
err := c.client.Get(ctx, types.NamespacedName{Name: qHistName, Namespace: c.namespace}, fetched)
if err != nil {
if k8serrors.IsNotFound(err) {
logger.Warnf("queuehistory %s not found, creating", qHistName)
if err := c.createQueueHistory(q); err != nil {
return err
}
return nil
}

logger.Error(err, fmt.Sprintf("cannot get queuehistory: %s", qHistName))
return err
}

fetched.Spec.Queue = &s2hv1beta1.Queue{
Spec: q.Spec,
Status: q.Status,
}

if err := c.client.Update(ctx, fetched); err != nil {
logger.Error(err, fmt.Sprintf("cannot update queuehistory: %s", qHistName))
return err
}

return nil
}

0 comments on commit 9dc37d5

Please sign in to comment.