-
Notifications
You must be signed in to change notification settings - Fork 79
/
Copy pathopampsrv.go
114 lines (97 loc) · 2.91 KB
/
opampsrv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package opampsrv
import (
"context"
"log"
"net/http"
"os"
"github.com/oklog/ulid/v2"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"github.com/open-telemetry/opamp-go/internal"
"github.com/open-telemetry/opamp-go/internal/examples/server/data"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server"
"github.com/open-telemetry/opamp-go/server/types"
)
type Server struct {
opampSrv server.OpAMPServer
agents *data.Agents
logger *Logger
}
func NewServer(agents *data.Agents) *Server {
logger := &Logger{
log.New(
log.Default().Writer(),
"[OPAMP] ",
log.Default().Flags()|log.Lmsgprefix|log.Lmicroseconds,
),
}
srv := &Server{
agents: agents,
logger: logger,
}
srv.opampSrv = server.New(logger)
return srv
}
func (srv *Server) Start() {
settings := server.StartSettings{
Settings: server.Settings{
Callbacks: server.CallbacksStruct{
OnConnectingFunc: func(request *http.Request) types.ConnectionResponse {
return types.ConnectionResponse{
Accept: true,
ConnectionCallbacks: server.ConnectionCallbacksStruct{
OnMessageFunc: srv.onMessage,
OnConnectionCloseFunc: srv.onDisconnect,
},
}
},
},
},
ListenEndpoint: "127.0.0.1:4320",
HTTPMiddleware: otelhttp.NewMiddleware("/v1/opamp"),
}
tlsConfig, err := internal.CreateServerTLSConfig(
"../../certs/certs/ca.cert.pem",
"../../certs/server_certs/server.cert.pem",
"../../certs/server_certs/server.key.pem",
)
if err != nil {
srv.logger.Debugf(context.Background(), "Could not load TLS config, working without TLS: %v", err.Error())
}
settings.TLSConfig = tlsConfig
if err := srv.opampSrv.Start(settings); err != nil {
srv.logger.Errorf(context.Background(), "OpAMP server start fail: %v", err.Error())
os.Exit(1)
}
}
func (srv *Server) Stop() {
srv.opampSrv.Stop(context.Background())
}
func (srv *Server) onDisconnect(conn types.Connection) {
srv.agents.RemoveConnection(conn)
}
func (srv *Server) onMessage(ctx context.Context, conn types.Connection, msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// Start building the response.
response := &protobufs.ServerToAgent{}
var instanceId data.InstanceId
if len(msg.InstanceUid) == 26 {
// This is an old-style ULID.
u, err := ulid.Parse(string(msg.InstanceUid))
if err != nil {
srv.logger.Errorf(ctx, "Cannot parse ULID %s: %v", string(msg.InstanceUid), err)
return response
}
instanceId = data.InstanceId(u.Bytes())
} else if len(msg.InstanceUid) == 16 {
// This is a 16 byte, new style UID.
instanceId = data.InstanceId(msg.InstanceUid)
} else {
srv.logger.Errorf(ctx, "Invalid length of msg.InstanceUid")
return response
}
agent := srv.agents.FindOrCreateAgent(instanceId, conn)
// Process the status report and continue building the response.
agent.UpdateStatus(msg, response)
// Send the response back to the Agent.
return response
}