-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathagent.go
149 lines (130 loc) · 3.55 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
)
const listenPort = 5000
type AgentHandler func(json.RawMessage) (interface{}, error)
type ReportMessage struct {
Total int
}
type Memory struct {
Checks int
Receives int
}
// register is called by ActiveWorkflow to determine metadata and options for
// the agent.
func register(json.RawMessage) (interface{}, error) {
log.Print("Register")
return map[string]interface{}{
"name": "GoCounterAgent",
"display_name": "Go Test Counter Agent",
"description": "Description goes here",
"default_options": map[string]interface{}{},
}, nil
}
// check is called by ActiveWorkflow on a regular basis if the user has
// configured a schedule for the agent. It increments the counter for the
// number of checks seen in the agent's memory and returns a message containing
// the new total count.
func check(rawParams json.RawMessage) (interface{}, error) {
log.Print("check", string(rawParams))
var params struct {
Memory Memory
}
err := json.Unmarshal(rawParams, ¶ms)
if err != nil {
return nil, err
}
memory := params.Memory
memory.Checks += 1
return map[string]interface{}{
"logs": []string{"Check done"},
"errors": []string{},
"memory": memory,
"messages": []ReportMessage{{Total: memory.Checks + memory.Receives}},
}, nil
}
// receive is called whenever the agent receives a message from another agent.
// The counter for the number of messages seen is updated in the agent state
// and a new message containing the new total count is returned.
func receive(rawParams json.RawMessage) (interface{}, error) {
fmt.Println("Receive", string(rawParams))
var params struct {
Memory Memory
Message struct {
Id int
// This agent doesn't care about anything but the message id so no
// other fields are included for unmarshalling.
}
}
err := json.Unmarshal(rawParams, ¶ms)
if err != nil {
return nil, err
}
memory := params.Memory
memory.Receives += 1
return map[string]interface{}{
"logs": []string{fmt.Sprintf("Received message %d", params.Message.Id)},
"errors": []string{},
"memory": memory,
"messages": []ReportMessage{{Total: memory.Checks + memory.Receives}},
}, nil
}
func handle(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Unmarshal the top layer of JSON to determine which remote API method is
// being called.
var input struct {
Method string
Params json.RawMessage
}
err := json.NewDecoder(req.Body).Decode(&input)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "could not parse input: %v", err)
return
}
// Find a handler for the type of request.
handler := lookupHandler(input.Method)
if handler == nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "unknown method: %q", input.Method)
return
}
// Call the handler.
out, err := handler(input.Params)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, err)
return
}
// Put the result in a "result" field and convert it to JSON.
err = json.NewEncoder(w).Encode(map[string]interface{}{"result": out})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, err)
return
}
}
func lookupHandler(method string) AgentHandler {
switch method {
case "register":
return register
case "check":
return check
case "receive":
return receive
default:
return nil
}
}
func main() {
http.HandleFunc("/", handle)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", listenPort), nil))
}