-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathrun.go
129 lines (108 loc) · 3.94 KB
/
run.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package dispatcher
import (
"context"
"pack.ag/amqp"
"sync"
"time"
"github.com/lawrencegripper/ion/internal/app/dispatcher/providers" //TODO couldn't it be moved into internal/pkg ?
"github.com/lawrencegripper/ion/internal/pkg/messaging" //TODO couldn't it be moved into internal/pkg ?
"github.com/lawrencegripper/ion/internal/pkg/servicebus"
"github.com/lawrencegripper/ion/internal/pkg/types"
log "github.com/sirupsen/logrus"
)
// Run starts the dispatcher and blocks while it processes AMQP messages.
//
// It opens an AMQP connection from cfg and selects a provider: the Azure Batch
// provider when cfg.AzureBatch is set, the Kubernetes provider otherwise. It
// then starts three goroutines and waits on them:
//
//   - a lock renewer that, every 45 seconds, renews the locks of the
//     provider's active messages;
//   - a receiver that dequeues messages and dispatches them to the provider;
//   - a reconciler that calls provider.Reconcile every 15 seconds.
//
// Failure to create the provider, to receive a message, or to reconcile is
// logged with Panic. None of the loops has an exit condition, so Run does not
// return normally.
func Run(cfg *types.Configuration) {
	const (
		// lockRenewInterval is the pause before each round of lock renewals.
		lockRenewInterval = 45 * time.Second
		// reconcileInterval is the pause between provider reconciliations.
		reconcileInterval = 15 * time.Second
	)

	ctx := context.Background()

	amqpConnection := servicebus.NewAmqpConnection(ctx, cfg)
	handlerArgs := providers.GetSharedHandlerArgs(cfg, amqpConnection.AccessKeys)

	var provider providers.Provider
	if cfg.AzureBatch != nil {
		log.Info("Using Azure batch provider...")
		batchProvider, err := providers.NewAzureBatchProvider(cfg, handlerArgs)
		if err != nil {
			log.WithError(err).Panic("Couldn't create azure batch provider")
		}
		provider = batchProvider
	} else {
		log.Info("Defaulting to using Kubernetes provider...")
		k8sProvider, err := providers.NewKubernetesProvider(cfg, handlerArgs)
		if err != nil {
			log.WithError(err).Panic("Couldn't create kubernetes provider")
		}
		provider = k8sProvider
	}

	var wg sync.WaitGroup
	wg.Add(3)

	// Lock renewer.
	go func() {
		defer wg.Done()
		for {
			// Renew message locks with ServiceBus
			// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response#message-renew-lock
			time.Sleep(lockRenewInterval)

			activeMessages := provider.GetActiveMessages()
			messagesAMQP := make([]*amqp.Message, 0, len(activeMessages))
			for _, m := range activeMessages {
				messagesAMQP = append(messagesAMQP, m.GetAMQPMessage())
			}

			if err := amqpConnection.RenewLocks(ctx, messagesAMQP); err != nil {
				log.WithError(err).Error("failed to renew locks")
			}
		}
	}()

	// Receiver / dispatcher.
	go func() {
		defer wg.Done()
		for {
			message, err := amqpConnection.Receiver.Receive(ctx)
			if err != nil {
				// TODO: Investigate the type of error here. If this could be
				// triggered by a poisoned message the app shouldn't panic.
				log.WithError(err).Panic("Error received dequeuing message")
			}
			if message == nil {
				log.WithError(err).Panic("Error received dequeuing message - nil message")
			}

			wrapper := messaging.NewAmqpMessageWrapper(message)
			contextualLogger := providers.GetLoggerForMessage(wrapper, log.NewEntry(log.StandardLogger()))
			contextualLogger.Debug("message received")

			if wrapper.DeliveryCount() > cfg.Job.RetryCount+1 {
				contextualLogger.Error("message re-received when above retryCount. AMQP provider wrongly redelivered message.")
				if err := wrapper.Reject(); err != nil {
					contextualLogger.WithError(err).Error("error rejecting message")
				}
				// The message has been rejected (or rejection was attempted);
				// it must not be handed to the provider as well.
				continue
			}

			if err := provider.Dispatch(wrapper); err != nil {
				// The active provider may be Azure Batch or Kubernetes, so the
				// message does not name one.
				contextualLogger.WithError(err).Error("Couldn't dispatch message to provider")
			} else {
				contextualLogger.Debug("message dispatched")
			}

			queueStats, err := amqpConnection.GetQueueDepth()
			if err != nil {
				contextualLogger.WithError(err).Error("failed getting queue depth from listener")
				// Without a successful result there are no stats to report.
				continue
			}
			contextualLogger.
				WithField("activeMessageCount", queueStats.ActiveMessageCount).
				WithField("deadLetteredMessageCount", queueStats.DeadLetterMessageCount).
				Info("listenerStats")
		}
	}()

	// Reconciler.
	go func() {
		defer wg.Done()
		for {
			log.Debug("reconciling...")
			if err := provider.Reconcile(); err != nil {
				// TODO: Should this panic here? Should we tolerate a few
				// failures (k8s upgrade causing masters not to be available,
				// for example)?
				log.WithError(err).Panic("Failed to reconcile ....")
			}
			log.WithField("inProgress", provider.InProgressCount()).Info("providerStats")
			time.Sleep(reconcileInterval)
		}
	}()

	wg.Wait()

	//init flaeg
	//flaeg := flaeg.New(rootCmd, os.Args[1:])

	//run test
	//if err := flaeg.Run(); err != nil {
	//	fmt.Printf("Error %s \n", err.Error())
	//}
}