Skip to content

Commit

Permalink
Merge pull request #671 from DamnWidget/dev
Browse files Browse the repository at this point in the history
fix: fixes #654, #653 and #652
  • Loading branch information
rogeralsing authored May 26, 2022
2 parents 685ce42 + a5fc4a9 commit d33ab0e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 13 deletions.
13 changes: 12 additions & 1 deletion cluster/clusterproviders/k8s/k8s_cluster_monitor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package k8s

import (
"time"

"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/log"
"github.com/asynkron/protoactor-go/scheduler"
Expand All @@ -18,7 +20,16 @@ func (kcm *k8sClusterMonitorActor) Receive(ctx actor.Context) { kcm.Behavior.Rec
func (kcm *k8sClusterMonitorActor) init(ctx actor.Context) {
switch r := ctx.Message().(type) {
case *RegisterMember:
if err := kcm.registerMember(ctx.ReceiveTimeout()); err != nil {
// make sure timeout is set to some meaningful value
timeout := ctx.ReceiveTimeout()
if timeout.Microseconds() == 0 {
timeout = kcm.Provider.cluster.Config.RequestTimeoutTime
if timeout.Microseconds() == 0 {
timeout = time.Second * 5 // default to 5 seconds
}
}

if err := kcm.registerMember(timeout); err != nil {
plog.Error("Failed to register service to k8s, will retry", log.Error(err))
ctx.Send(ctx.Self(), r)
return
Expand Down
30 changes: 18 additions & 12 deletions cluster/clusterproviders/k8s/k8s_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,20 @@ func (p *Provider) startWatchingCluster(timeout time.Duration) error {

plog.Debug(fmt.Sprintf("Starting to watch pods with %s", selector), log.String("selector", selector))

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

watcher, err := p.client.CoreV1().Pods(p.retrieveNamespace()).Watch(ctx, metav1.ListOptions{LabelSelector: selector, Watch: true, TimeoutSeconds: &watchTimeoutSeconds})
if err != nil {
return fmt.Errorf("unable to watch the cluster status: %w", err)
}
// error placeholder
var watcherr error

// start a new goroutine to monitor the cluster events
go func() {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

watcher, err := p.client.CoreV1().Pods(p.retrieveNamespace()).Watch(ctx, metav1.ListOptions{LabelSelector: selector, Watch: true, TimeoutSeconds: &watchTimeoutSeconds})
if err != nil {
watcherr = fmt.Errorf("unable to watch the cluster status: %w", err)
return
}

for !p.shutdown {

event, ok := <-watcher.ResultChan()
Expand Down Expand Up @@ -306,7 +310,7 @@ func (p *Provider) startWatchingCluster(timeout time.Duration) error {
}
}()

return nil
return watcherr
}

// deregister itself as a member from a k8s cluster
Expand Down Expand Up @@ -334,14 +338,16 @@ func (p *Provider) deregisterMember(timeout time.Duration) error {

// prepares a patching payload and sends it to kubernetes to replace labels
func (p *Provider) replacePodLabels(ctx context.Context, pod *v1.Pod) error {
payload := struct {
payload := []struct {
Op string `json:"op"`
Path string `json:"path"`
Value Labels `json:"value"`
}{
Op: "replace",
Path: "/metadata/labels",
Value: pod.GetLabels(),
{
Op: "replace",
Path: "/metadata/labels",
Value: pod.GetLabels(),
},
}

payloadData, err := json.Marshal(payload)
Expand Down

0 comments on commit d33ab0e

Please sign in to comment.