forked from taskcluster/taskcluster
-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
253 lines (217 loc) · 7.77 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
package main
import (
"fmt"
"log"
"math"
"net/http"
"net/http/pprof"
"os"
"strconv"
"sync"
stream "github.com/taskcluster/taskcluster/v39/tools/livelog/writer"
)
const (
DEFAULT_PUT_PORT = 60022
DEFAULT_GET_PORT = 60023
)
// Run an http.Server. In production this is just `ListenAndServe`, but
// is overridden in testing to use ephemeral ports and ensure servers are
// shut down correctly.
var runServer func(server *http.Server, addr, crtFile, keyFile string) error
func abort(writer http.ResponseWriter) {
// We need to hijack and abort the request...
conn, _, err := writer.(http.Hijacker).Hijack()
if err != nil {
return
}
// Force the connection closed to signal that the response was not
// completed...
conn.Close()
}
func startLogServe(stream *stream.Stream, getAddr string) {
// Get access token from environment variable
accessToken := os.Getenv("ACCESS_TOKEN")
routes := http.NewServeMux()
routes.HandleFunc("/log/", func(w http.ResponseWriter, r *http.Request) {
log.Printf("output %s %s", r.Method, r.URL.String())
// Authenticate the request with accessToken, this is good enough because
// live logs are short-lived, we do this by slicing away '/log/' from the
// URL and comparing the reminder to the accessToken, ensuring a URL pattern
// /log/<accessToken>
if r.URL.String()[5:] != accessToken {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.WriteHeader(401)
fmt.Fprint(w, "Access denied")
} else {
getLog(stream, w, r)
}
})
server := http.Server{
Handler: routes,
}
crtFile := os.Getenv("SERVER_CRT_FILE")
keyFile := os.Getenv("SERVER_KEY_FILE")
var err error
if crtFile != "" && keyFile != "" {
log.Printf("Output server listening... %s (with TLS)", server.Addr)
log.Printf("key %s ", keyFile)
log.Printf("crt %s ", crtFile)
err = runServer(&server, getAddr, crtFile, keyFile)
} else {
log.Printf("Output server listening... %s (without TLS)", server.Addr)
err = runServer(&server, getAddr, "", "")
}
if err != nil && err != http.ErrServerClosed {
log.Fatalf("%s", err)
}
}
// HTTP logic for serving the contents of a stream...
func getLog(
stream *stream.Stream,
writer http.ResponseWriter,
req *http.Request,
) {
// NOTE: this once attempted to support Range requests, but did so incorrectly:
//
// (a) returned bytes beginning at offset zero, even if the range did not
// begin there, when and only when those bytes had already been written to
// the backing store
// (b) did not respond with 206 Partial Content
// (c) did not respond with a Content-Range header
// (d) was tested in such a way to to not trigger bug (a) and not check for
// (b) or (c)
//
// On the concluaion that such requests are not used, support has been
// removed.
handle := stream.Observe(0, math.MaxInt64)
defer func() {
// Ensure we close our file handle...
// Ensure the stream is cleaned up after errors, etc...
stream.Unobserve(handle)
log.Print("send connection close...")
}()
// TODO: Allow the input stream to configure headers rather then assume
// intentions...
writer.Header().Set("Content-Type", "text/plain; charset=utf-8")
writer.Header().Set("Access-Control-Allow-Origin", "*")
writer.Header().Set("Access-Control-Expose-Headers", "Transfer-Encoding")
// Send headers so its clear what we are trying to do...
writer.WriteHeader(200)
log.Print("wrote headers...")
// Begin streaming any pending results...
_, writeToErr := handle.WriteTo(writer)
if writeToErr != nil {
log.Println("Error during write...", writeToErr)
abort(writer)
}
}
// Logic here mostly inspired by what docker does...
func attachProfiler(router *http.ServeMux) {
router.HandleFunc("/debug/pprof/", pprof.Index)
router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
router.HandleFunc("/debug/pprof/profile", pprof.Profile)
router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
router.HandleFunc("/debug/pprof/heap", pprof.Handler("heap").ServeHTTP)
router.HandleFunc("/debug/pprof/goroutine", pprof.Handler("goroutine").ServeHTTP)
router.HandleFunc("/debug/pprof/threadcreate", pprof.Handler("threadcreate").ServeHTTP)
}
func main() {
// TODO: Right now this is a collection of hacks until we build out something
// nice which can handle multiple log connections. Right now the intent is to
// use this as a process per task (which has overhead) but should be fairly
// clean (memory wise) in the long run as we will terminate the process
// frequently per task run.
// portAddressOrExit is a helper function to translate a port number in an
// envronment variable into a valid address string which can be used when
// starting web service. This helper function will cause the go program to
// exit if an invalid value is specified in the environment variable.
portAddressOrExit := func(envVar string, defaultValue uint16, notANumberExitCode, outOfRangeExitCode int) (addr string) {
addr = fmt.Sprintf(":%v", defaultValue)
if port := os.Getenv(envVar); port != "" {
p, err := strconv.Atoi(port)
if err != nil {
log.Printf("env var %v is not a number (%v)", envVar, port)
os.Exit(notANumberExitCode)
}
if p < 0 || p > 65535 {
log.Printf("env var %v is not between [0, 65535] (%v)", envVar, p)
os.Exit(outOfRangeExitCode)
}
addr = ":" + port
}
return
}
putAddr := portAddressOrExit("LIVELOG_PUT_PORT", DEFAULT_PUT_PORT, 64, 65)
getAddr := portAddressOrExit("LIVELOG_GET_PORT", DEFAULT_GET_PORT, 66, 67)
runServer = func(server *http.Server, addr, crtFile, keyFile string) error {
server.Addr = addr
if crtFile != "" && keyFile != "" {
return server.ListenAndServeTLS(crtFile, keyFile)
} else {
return server.ListenAndServe()
}
}
serve(putAddr, getAddr)
}
func serve(putAddr, getAddr string) {
handlingPut := false
mutex := sync.Mutex{}
routes := http.NewServeMux()
if os.Getenv("DEBUG") != "" {
attachProfiler(routes)
}
server := http.Server{
Handler: routes,
}
// The "main" http server is for the PUT side which should not be exposed
// publicly but via links in the docker container... In the future we can
// handle something fancier.
routes.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {
log.Printf("input %s %s", r.Method, r.URL.String())
if r.Method != "PUT" {
log.Print("input not put")
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("This endpoint can only handle PUT requests"))
return
}
// Threadsafe checking of the `handlingPut` flag
mutex.Lock()
if handlingPut {
log.Print("Attempt to put when in progress")
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("This endpoint can only process one http PUT at a time"))
mutex.Unlock() // used instead of defer so we don't block other rejections
return
}
mutex.Unlock() // So we don't block other rejections...
stream, streamErr := stream.NewStream(r.Body)
if streamErr != nil {
log.Printf("input stream open err %v", streamErr)
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("Could not open stream for body"))
// Allow for retries of the initial put if something goes wrong...
mutex.Lock()
handlingPut = false
mutex.Unlock()
}
// Signal initial success...
w.WriteHeader(http.StatusCreated)
// Initialize the sub server in another go routine...
log.Print("Begin consuming...")
go startLogServe(stream, getAddr)
consumeErr := stream.Consume()
if consumeErr != nil {
log.Println("Error finalizing consume of stream", consumeErr)
abort(w)
return
}
})
// Listen forever on the PUT side...
log.Printf("input server listening... %s", server.Addr)
// Main put server listens on the public root for the worker.
err := runServer(&server, putAddr, "", "")
if err != nil && err != http.ErrServerClosed {
log.Fatalf("%s", err)
}
}