66	"fmt" 
77	"net/url" 
88	"os" 
9- 	"sync" 
109	"time" 
1110
1211	"cdr.dev/slog" 
@@ -28,14 +27,13 @@ var (
2827	minAgentAPIV2       =  "v2.9" 
2928)
3029
31- // Coder establishes a connection to the Coder instance located at coderURL and 
32- // authenticates using token. It then establishes a dRPC connection to the Agent 
33- // API and begins sending logs. If the version of Coder does not support the 
34- // Agent API, it will fall back to using the PatchLogs endpoint. The closer is 
35- // used to close the logger and to wait at most logSendGracePeriod for logs to 
36- // be sent. Cancelling the context will close the logs immediately without 
37- // waiting for logs to be sent. 
38- func  Coder (ctx  context.Context , coderURL  * url.URL , token  string ) (logger  Func , closer  func (), err  error ) {
30+ // Coder establishes a connection to the Coder instance located at 
31+ // coderURL and authenticates using token. It then establishes a 
32+ // dRPC connection to the Agent API and begins sending logs. 
33+ // If the version of Coder does not support the Agent API, it will 
34+ // fall back to using the PatchLogs endpoint. 
35+ // The returned function is used to block until all logs are sent. 
36+ func  Coder (ctx  context.Context , coderURL  * url.URL , token  string ) (Func , func (), error ) {
3937	// To troubleshoot issues, we need some way of logging. 
4038	metaLogger  :=  slog .Make (sloghuman .Sink (os .Stderr ))
4139	defer  metaLogger .Sync ()
@@ -46,26 +44,18 @@ func Coder(ctx context.Context, coderURL *url.URL, token string) (logger Func, c
4644	}
4745	if  semver .Compare (semver .MajorMinor (bi .Version ), minAgentAPIV2 ) <  0  {
4846		metaLogger .Warn (ctx , "Detected Coder version incompatible with AgentAPI v2, falling back to deprecated API" , slog .F ("coder_version" , bi .Version ))
49- 		logger ,  closer   =  sendLogsV1 (ctx , client , metaLogger .Named ("send_logs_v1" ))
50- 		return  logger ,  closer , nil 
47+ 		sendLogs ,  flushLogs   : =  sendLogsV1 (ctx , client , metaLogger .Named ("send_logs_v1" ))
48+ 		return  sendLogs ,  flushLogs , nil 
5149	}
52- 	// Note that ctx passed to initRPC will be inherited by the 
53- 	// underlying connection, nothing we can do about that here. 
5450	dac , err  :=  initRPC (ctx , client , metaLogger .Named ("init_rpc" ))
5551	if  err  !=  nil  {
5652		// Logged externally 
5753		return  nil , nil , fmt .Errorf ("init coder rpc client: %w" , err )
5854	}
5955	ls  :=  agentsdk .NewLogSender (metaLogger .Named ("coder_log_sender" ))
6056	metaLogger .Warn (ctx , "Sending logs via AgentAPI v2" , slog .F ("coder_version" , bi .Version ))
61- 	logger , closer  =  sendLogsV2 (ctx , dac , ls , metaLogger .Named ("send_logs_v2" ))
62- 	var  closeOnce  sync.Once 
63- 	return  logger , func () {
64- 		closer ()
65- 		closeOnce .Do (func () {
66- 			_  =  dac .DRPCConn ().Close ()
67- 		})
68- 	}, nil 
57+ 	sendLogs , doneFunc  :=  sendLogsV2 (ctx , dac , ls , metaLogger .Named ("send_logs_v2" ))
58+ 	return  sendLogs , doneFunc , nil 
6959}
7060
7161type  coderLogSender  interface  {
@@ -84,7 +74,7 @@ func initClient(coderURL *url.URL, token string) *agentsdk.Client {
8474func  initRPC (ctx  context.Context , client  * agentsdk.Client , l  slog.Logger ) (proto.DRPCAgentClient20 , error ) {
8575	var  c  proto.DRPCAgentClient20 
8676	var  err  error 
87- 	retryCtx , retryCancel  :=  context .WithTimeout (ctx , rpcConnectTimeout )
77+ 	retryCtx , retryCancel  :=  context .WithTimeout (context . Background () , rpcConnectTimeout )
8878	defer  retryCancel ()
8979	attempts  :=  0 
9080	for  r  :=  retry .New (100 * time .Millisecond , time .Second ); r .Wait (retryCtx ); {
@@ -105,67 +95,65 @@ func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto
10595
10696// sendLogsV1 uses the PatchLogs endpoint to send logs. 
10797// This is deprecated, but required for backward compatibility with older versions of Coder. 
108- func  sendLogsV1 (ctx  context.Context , client  * agentsdk.Client , l  slog.Logger ) (logger   Func ,  closer  func ()) {
98+ func  sendLogsV1 (ctx  context.Context , client  * agentsdk.Client , l  slog.Logger ) (Func , func ()) {
10999	// nolint: staticcheck // required for backwards compatibility 
110- 	sendLog , flushAndClose  :=  agentsdk .LogsSender (agentsdk .ExternalLogSourceID , client .PatchLogs , slog.Logger {})
111- 	var  mu  sync.Mutex 
100+ 	sendLogs , flushLogs  :=  agentsdk .LogsSender (agentsdk .ExternalLogSourceID , client .PatchLogs , slog.Logger {})
112101	return  func (lvl  Level , msg  string , args  ... any ) {
113102			log  :=  agentsdk.Log {
114103				CreatedAt : time .Now (),
115104				Output :    fmt .Sprintf (msg , args ... ),
116105				Level :     codersdk .LogLevel (lvl ),
117106			}
118- 			mu .Lock ()
119- 			defer  mu .Unlock ()
120- 			if  err  :=  sendLog (ctx , log ); err  !=  nil  {
107+ 			if  err  :=  sendLogs (ctx , log ); err  !=  nil  {
121108				l .Warn (ctx , "failed to send logs to Coder" , slog .Error (err ))
122109			}
123110		}, func () {
124- 			ctx , cancel  :=  context .WithTimeout (ctx , logSendGracePeriod )
125- 			defer  cancel ()
126- 			if  err  :=  flushAndClose (ctx ); err  !=  nil  {
111+ 			if  err  :=  flushLogs (ctx ); err  !=  nil  {
127112				l .Warn (ctx , "failed to flush logs" , slog .Error (err ))
128113			}
129114		}
130115}
131116
132117// sendLogsV2 uses the v2 agent API to send logs. Only compatibile with coder versions >= 2.9. 
133- func  sendLogsV2 (ctx  context.Context , dest  agentsdk.LogDest , ls  coderLogSender , l  slog.Logger ) (logger  Func , closer  func ()) {
134- 	sendCtx , sendCancel  :=  context .WithCancel (ctx )
118+ func  sendLogsV2 (ctx  context.Context , dest  agentsdk.LogDest , ls  coderLogSender , l  slog.Logger ) (Func , func ()) {
135119	done  :=  make (chan  struct {})
136120	uid  :=  uuid .New ()
137121	go  func () {
138122		defer  close (done )
139- 		if  err  :=  ls .SendLoop (sendCtx , dest ); err  !=  nil  {
123+ 		if  err  :=  ls .SendLoop (ctx , dest ); err  !=  nil  {
140124			if  ! errors .Is (err , context .Canceled ) {
141125				l .Warn (ctx , "failed to send logs to Coder" , slog .Error (err ))
142126			}
143127		}
128+ 
129+ 		// Wait for up to 10 seconds for logs to finish sending. 
130+ 		sendCtx , sendCancel  :=  context .WithTimeout (context .Background (), logSendGracePeriod )
131+ 		defer  sendCancel ()
132+ 		// Try once more to send any pending logs 
133+ 		if  err  :=  ls .SendLoop (sendCtx , dest ); err  !=  nil  {
134+ 			if  ! errors .Is (err , context .DeadlineExceeded ) {
135+ 				l .Warn (ctx , "failed to send remaining logs to Coder" , slog .Error (err ))
136+ 			}
137+ 		}
138+ 		ls .Flush (uid )
139+ 		if  err  :=  ls .WaitUntilEmpty (sendCtx ); err  !=  nil  {
140+ 			if  ! errors .Is (err , context .DeadlineExceeded ) {
141+ 				l .Warn (ctx , "log sender did not empty" , slog .Error (err ))
142+ 			}
143+ 		}
144144	}()
145145
146- 	var  closeOnce  sync.Once 
147- 	return  func (l  Level , msg  string , args  ... any ) {
148- 			ls .Enqueue (uid , agentsdk.Log {
149- 				CreatedAt : time .Now (),
150- 				Output :    fmt .Sprintf (msg , args ... ),
151- 				Level :     codersdk .LogLevel (l ),
152- 			})
153- 		}, func () {
154- 			closeOnce .Do (func () {
155- 				// Trigger a flush and wait for logs to be sent. 
156- 				ls .Flush (uid )
157- 				ctx , cancel  :=  context .WithTimeout (ctx , logSendGracePeriod )
158- 				defer  cancel ()
159- 				err  :=  ls .WaitUntilEmpty (ctx )
160- 				if  err  !=  nil  {
161- 					l .Warn (ctx , "log sender did not empty" , slog .Error (err ))
162- 				}
146+ 	logFunc  :=  func (l  Level , msg  string , args  ... any ) {
147+ 		ls .Enqueue (uid , agentsdk.Log {
148+ 			CreatedAt : time .Now (),
149+ 			Output :    fmt .Sprintf (msg , args ... ),
150+ 			Level :     codersdk .LogLevel (l ),
151+ 		})
152+ 	}
163153
164- 				 // Stop the send loop. 
165- 				 sendCancel () 
166- 			}) 
154+ 	doneFunc   :=   func () { 
155+ 		<- done 
156+ 	} 
167157
168- 			// Wait for the send loop to finish. 
169- 			<- done 
170- 		}
158+ 	return  logFunc , doneFunc 
171159}
0 commit comments