66 "fmt"
77 "net/url"
88 "os"
9+ "sync"
910 "time"
1011
1112 "cdr.dev/slog"
@@ -27,13 +28,14 @@ var (
2728 minAgentAPIV2 = "v2.9"
2829)
2930
30- // Coder establishes a connection to the Coder instance located at
31- // coderURL and authenticates using token. It then establishes a
32- // dRPC connection to the Agent API and begins sending logs.
33- // If the version of Coder does not support the Agent API, it will
34- // fall back to using the PatchLogs endpoint.
35- // The returned function is used to block until all logs are sent.
36- func Coder (ctx context.Context , coderURL * url.URL , token string ) (Func , func (), error ) {
31+ // Coder establishes a connection to the Coder instance located at coderURL and
32+ // authenticates using token. It then establishes a dRPC connection to the Agent
33+ // API and begins sending logs. If the version of Coder does not support the
34+ // Agent API, it will fall back to using the PatchLogs endpoint. The closer is
35+ // used to close the logger and to wait at most logSendGracePeriod for logs to
36+ // be sent. Cancelling the context will close the logs immediately without
37+ // waiting for logs to be sent.
38+ func Coder (ctx context.Context , coderURL * url.URL , token string ) (logger Func , closer func (), err error ) {
3739 // To troubleshoot issues, we need some way of logging.
3840 metaLogger := slog .Make (sloghuman .Sink (os .Stderr ))
3941 defer metaLogger .Sync ()
@@ -44,18 +46,20 @@ func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(),
4446 }
4547 if semver .Compare (semver .MajorMinor (bi .Version ), minAgentAPIV2 ) < 0 {
4648 metaLogger .Warn (ctx , "Detected Coder version incompatible with AgentAPI v2, falling back to deprecated API" , slog .F ("coder_version" , bi .Version ))
47- sendLogs , flushLogs : = sendLogsV1 (ctx , client , metaLogger .Named ("send_logs_v1" ))
48- return sendLogs , flushLogs , nil
49+ logger , closer = sendLogsV1 (ctx , client , metaLogger .Named ("send_logs_v1" ))
50+ return logger , closer , nil
4951 }
52+ // Note that ctx passed to initRPC will be inherited by the
53+ // underlying connection, nothing we can do about that here.
5054 dac , err := initRPC (ctx , client , metaLogger .Named ("init_rpc" ))
5155 if err != nil {
5256 // Logged externally
5357 return nil , nil , fmt .Errorf ("init coder rpc client: %w" , err )
5458 }
5559 ls := agentsdk .NewLogSender (metaLogger .Named ("coder_log_sender" ))
5660 metaLogger .Warn (ctx , "Sending logs via AgentAPI v2" , slog .F ("coder_version" , bi .Version ))
57- sendLogs , doneFunc : = sendLogsV2 (ctx , dac , ls , metaLogger .Named ("send_logs_v2" ))
58- return sendLogs , doneFunc , nil
61+ logger , closer = sendLogsV2 (ctx , dac , ls , metaLogger .Named ("send_logs_v2" ))
62+ return logger , closer , nil
5963}
6064
6165type coderLogSender interface {
@@ -74,13 +78,14 @@ func initClient(coderURL *url.URL, token string) *agentsdk.Client {
7478func initRPC (ctx context.Context , client * agentsdk.Client , l slog.Logger ) (proto.DRPCAgentClient20 , error ) {
7579 var c proto.DRPCAgentClient20
7680 var err error
77- retryCtx , retryCancel := context .WithTimeout (context . Background () , rpcConnectTimeout )
81+ retryCtx , retryCancel := context .WithTimeout (ctx , rpcConnectTimeout )
7882 defer retryCancel ()
7983 attempts := 0
8084 for r := retry .New (100 * time .Millisecond , time .Second ); r .Wait (retryCtx ); {
8185 attempts ++
8286 // Maximize compatibility.
8387 c , err = client .ConnectRPC20 (ctx )
88+ l .Info (ctx , "Connecting to Coder" , slog .F ("attempt" , attempts ), slog .F ("error" , err ))
8489 if err != nil {
8590 l .Debug (ctx , "Failed to connect to Coder" , slog .F ("error" , err ), slog .F ("attempt" , attempts ))
8691 continue
@@ -95,65 +100,67 @@ func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto
95100
96101// sendLogsV1 uses the PatchLogs endpoint to send logs.
97102// This is deprecated, but required for backward compatibility with older versions of Coder.
98- func sendLogsV1 (ctx context.Context , client * agentsdk.Client , l slog.Logger ) (Func , func ()) {
103+ func sendLogsV1 (ctx context.Context , client * agentsdk.Client , l slog.Logger ) (logger Func , closer func ()) {
99104 // nolint: staticcheck // required for backwards compatibility
100- sendLogs , flushLogs := agentsdk .LogsSender (agentsdk .ExternalLogSourceID , client .PatchLogs , slog.Logger {})
105+ sendLog , flushAndClose := agentsdk .LogsSender (agentsdk .ExternalLogSourceID , client .PatchLogs , slog.Logger {})
106+ var mu sync.Mutex
101107 return func (lvl Level , msg string , args ... any ) {
102108 log := agentsdk.Log {
103109 CreatedAt : time .Now (),
104110 Output : fmt .Sprintf (msg , args ... ),
105111 Level : codersdk .LogLevel (lvl ),
106112 }
107- if err := sendLogs (ctx , log ); err != nil {
113+ mu .Lock ()
114+ defer mu .Unlock ()
115+ if err := sendLog (ctx , log ); err != nil {
108116 l .Warn (ctx , "failed to send logs to Coder" , slog .Error (err ))
109117 }
110118 }, func () {
111- if err := flushLogs (ctx ); err != nil {
119+ ctx , cancel := context .WithTimeout (ctx , logSendGracePeriod )
120+ defer cancel ()
121+ if err := flushAndClose (ctx ); err != nil {
112122 l .Warn (ctx , "failed to flush logs" , slog .Error (err ))
113123 }
114124 }
115125}
116126
117127// sendLogsV2 uses the v2 agent API to send logs. Only compatibile with coder versions >= 2.9.
118- func sendLogsV2 (ctx context.Context , dest agentsdk.LogDest , ls coderLogSender , l slog.Logger ) (Func , func ()) {
128+ func sendLogsV2 (ctx context.Context , dest agentsdk.LogDest , ls coderLogSender , l slog.Logger ) (logger Func , closer func ()) {
129+ sendCtx , sendCancel := context .WithCancel (ctx )
119130 done := make (chan struct {})
120131 uid := uuid .New ()
121132 go func () {
122133 defer close (done )
123- if err := ls .SendLoop (ctx , dest ); err != nil {
134+ if err := ls .SendLoop (sendCtx , dest ); err != nil {
124135 if ! errors .Is (err , context .Canceled ) {
125136 l .Warn (ctx , "failed to send logs to Coder" , slog .Error (err ))
126137 }
127138 }
128-
129- // Wait for up to 10 seconds for logs to finish sending.
130- sendCtx , sendCancel := context .WithTimeout (context .Background (), logSendGracePeriod )
131- defer sendCancel ()
132- // Try once more to send any pending logs
133- if err := ls .SendLoop (sendCtx , dest ); err != nil {
134- if ! errors .Is (err , context .DeadlineExceeded ) {
135- l .Warn (ctx , "failed to send remaining logs to Coder" , slog .Error (err ))
136- }
137- }
138- ls .Flush (uid )
139- if err := ls .WaitUntilEmpty (sendCtx ); err != nil {
140- if ! errors .Is (err , context .DeadlineExceeded ) {
141- l .Warn (ctx , "log sender did not empty" , slog .Error (err ))
142- }
143- }
144139 }()
145140
146- logFunc := func (l Level , msg string , args ... any ) {
147- ls .Enqueue (uid , agentsdk.Log {
148- CreatedAt : time .Now (),
149- Output : fmt .Sprintf (msg , args ... ),
150- Level : codersdk .LogLevel (l ),
151- })
152- }
141+ var closeOnce sync.Once
142+ return func (l Level , msg string , args ... any ) {
143+ ls .Enqueue (uid , agentsdk.Log {
144+ CreatedAt : time .Now (),
145+ Output : fmt .Sprintf (msg , args ... ),
146+ Level : codersdk .LogLevel (l ),
147+ })
148+ }, func () {
149+ closeOnce .Do (func () {
150+ // Trigger a flush and wait for logs to be sent.
151+ ls .Flush (uid )
152+ ctx , cancel := context .WithTimeout (ctx , logSendGracePeriod )
153+ defer cancel ()
154+ err := ls .WaitUntilEmpty (ctx )
155+ if err != nil {
156+ l .Warn (ctx , "log sender did not empty" , slog .Error (err ))
157+ }
153158
154- doneFunc := func () {
155- <- done
156- }
159+ // Stop the send loop.
160+ sendCancel ()
161+ })
157162
158- return logFunc , doneFunc
163+ // Wait for the send loop to finish.
164+ <- done
165+ }
159166}
0 commit comments