Skip to content

Commit

Permalink
Merge pull request #46 from gabriel-samfira/add-log-stream
Browse files Browse the repository at this point in the history
Add log streamer
  • Loading branch information
gabriel-samfira authored Oct 21, 2022
2 parents 79087d1 + d277c0d commit 3247065
Show file tree
Hide file tree
Showing 10 changed files with 456 additions and 78 deletions.
51 changes: 48 additions & 3 deletions apiserver/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,29 @@ import (
gErrors "garm/errors"
runnerParams "garm/params"
"garm/runner"
wsWriter "garm/websocket"

"github.com/gorilla/websocket"
"github.com/pkg/errors"
)

func NewAPIController(r *runner.Runner, auth *auth.Authenticator) (*APIController, error) {
func NewAPIController(r *runner.Runner, auth *auth.Authenticator, hub *wsWriter.Hub) (*APIController, error) {
return &APIController{
r: r,
auth: auth,
hub: hub,
upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 16384,
},
}, nil
}

type APIController struct {
r *runner.Runner
auth *auth.Authenticator
r *runner.Runner
auth *auth.Authenticator
hub *wsWriter.Hub
upgrader websocket.Upgrader
}

func handleError(w http.ResponseWriter, err error) {
Expand Down Expand Up @@ -124,6 +133,42 @@ func (a *APIController) CatchAll(w http.ResponseWriter, r *http.Request) {
}
}

func (a *APIController) WSHandler(writer http.ResponseWriter, req *http.Request) {
ctx := req.Context()
if !auth.IsAdmin(ctx) {
writer.WriteHeader(http.StatusForbidden)
writer.Write([]byte("you need admin level access to view logs"))
return
}

if a.hub == nil {
handleError(writer, gErrors.NewBadRequestError("log streamer is disabled"))
return
}

conn, err := a.upgrader.Upgrade(writer, req, nil)
if err != nil {
log.Printf("error upgrading to websockets: %v", err)
return
}

// TODO (gsamfira): Handle ExpiresAt. Right now, if a client uses
// a valid token to authenticate, and keeps the websocket connection
// open, it will allow that client to stream logs via websockets
// until the connection is broken. We need to forcefully disconnect
// the client once the token expires.
client, err := wsWriter.NewClient(conn, a.hub)
if err != nil {
log.Printf("failed to create new client: %v", err)
return
}
if err := a.hub.Register(client); err != nil {
log.Printf("failed to register new client: %v", err)
return
}
client.Go()
}

// NotFoundHandler is returned when an invalid URL is acccessed
func (a *APIController) NotFoundHandler(w http.ResponseWriter, r *http.Request) {
apiErr := params.APIErrorResponse{
Expand Down
139 changes: 70 additions & 69 deletions apiserver/routers/routers.go

Large diffs are not rendered by default.

99 changes: 99 additions & 0 deletions cmd/garm-cli/cmd/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package cmd

import (
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"os"
"os/signal"
"time"

apiParams "garm/apiserver/params"

"github.com/gorilla/websocket"
"github.com/spf13/cobra"
)

var logCmd = &cobra.Command{
Use: "debug-log",
SilenceUsage: true,
Short: "Stream garm log",
Long: `Stream all garm logging to the terminal.`,
RunE: func(cmd *cobra.Command, args []string) error {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)

parsedURL, err := url.Parse(mgr.BaseURL)
if err != nil {
return err
}

wsScheme := "ws"
if parsedURL.Scheme == "https" {
wsScheme = "wss"
}
u := url.URL{Scheme: wsScheme, Host: parsedURL.Host, Path: "/api/v1/ws"}
log.Printf("connecting to %s", u.String())

header := http.Header{}
header.Add("Authorization", fmt.Sprintf("Bearer %s", mgr.Token))

c, response, err := websocket.DefaultDialer.Dial(u.String(), header)
if err != nil {
var resp apiParams.APIErrorResponse
var msg string
if err := json.NewDecoder(response.Body).Decode(&resp); err == nil {
msg = resp.Details
}
log.Fatalf("failed to stream logs: %s (%s)", msg, response.Status)
}
defer c.Close()

done := make(chan struct{})

go func() {
defer close(done)
for {
_, message, err := c.ReadMessage()
if err != nil {
log.Printf("read: %q", err)
return
}
log.Print(string(message))
}
}()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-done:
return nil
case t := <-ticker.C:
err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
if err != nil {
return err
}
case <-interrupt:
// Cleanly close the connection by sending a close message and then
// waiting (with timeout) for the server to close the connection.
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
return err
}
select {
case <-done:
case <-time.After(time.Second):
}
return nil
}
}
},
}

func init() {
rootCmd.AddCommand(logCmd)
}
18 changes: 16 additions & 2 deletions cmd/garm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"flag"
"fmt"
"io"
"log"
"net"
"net/http"
Expand All @@ -31,6 +32,7 @@ import (
"garm/database/common"
"garm/runner"
"garm/util"
"garm/websocket"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
Expand Down Expand Up @@ -75,7 +77,19 @@ func main() {
if err != nil {
log.Fatalf("fetching log writer: %+v", err)
}
log.SetOutput(logWriter)

var writers []io.Writer = []io.Writer{
logWriter,
}
var hub *websocket.Hub
if cfg.Default.EnableLogStreamer {
hub = websocket.NewHub(ctx)
hub.Start()
defer hub.Stop()
writers = append(writers, hub)
}

log.SetOutput(io.MultiWriter(writers...))

db, err := database.NewDatabase(ctx, cfg.Database)
if err != nil {
Expand All @@ -98,7 +112,7 @@ func main() {
}

authenticator := auth.NewAuthenticator(cfg.JWTAuth, db)
controller, err := controllers.NewAPIController(runner, authenticator)
controller, err := controllers.NewAPIController(runner, authenticator, hub)
if err != nil {
log.Fatalf("failed to create controller: %+v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var (
// DefaultConfigDir is the default path on disk to the config dir. The config
// file will probably be in the same folder, but it is not mandatory.
DefaultConfigDir = "/etc/garm"

// DefaultUserGroups are the groups the default user will be part of.
DefaultUserGroups = []string{
"sudo", "adm", "cdrom", "dialout",
Expand Down Expand Up @@ -167,7 +167,8 @@ type Default struct {
ConfigDir string `toml:"config_dir,omitempty" json:"config-dir,omitempty"`
CallbackURL string `toml:"callback_url" json:"callback-url"`
// LogFile is the location of the log file.
LogFile string `toml:"log_file,omitempty" json:"log-file"`
LogFile string `toml:"log_file,omitempty" json:"log-file"`
EnableLogStreamer bool `toml:"enable_log_streamer"`
}

func (d *Default) Validate() error {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/google/uuid v1.3.0
github.com/gorilla/handlers v1.5.1
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/jedib0t/go-pretty/v6 v6.3.1
github.com/juju/clock v0.0.0-20220704231616-a2b96c8eeb27
github.com/juju/retry v0.0.0-20220204093819-62423bf33287
Expand All @@ -23,12 +24,12 @@ require (
golang.org/x/crypto v0.0.0-20220321153916-2c7772ba3064
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/mysql v1.3.3
gorm.io/driver/sqlite v1.3.2
gorm.io/gorm v1.23.4
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0
)

require (
Expand All @@ -40,7 +41,6 @@ require (
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
Expand Down
6 changes: 6 additions & 0 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ type Runner struct {
}

func (r *Runner) ListCredentials(ctx context.Context) ([]params.GithubCredentials, error) {
if !auth.IsAdmin(ctx) {
return nil, runnerErrors.ErrUnauthorized
}
ret := []params.GithubCredentials{}

for _, val := range r.config.Github {
Expand All @@ -224,6 +227,9 @@ func (r *Runner) ListCredentials(ctx context.Context) ([]params.GithubCredential
}

func (r *Runner) ListProviders(ctx context.Context) ([]params.Provider, error) {
if !auth.IsAdmin(ctx) {
return nil, runnerErrors.ErrUnauthorized
}
ret := []params.Provider{}

for _, val := range r.providers {
Expand Down
3 changes: 3 additions & 0 deletions testdata/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ config_dir = "/etc/garm"
# Uncomment this line if you'd like to log to a file instead of standard output.
# log_file = "/tmp/runner-manager.log"

# Enable streaming logs via web sockets. Use garm-cli debug-log.
enable_log_streamer = false

[jwt_auth]
# A JWT token secret used to sign tokens.
# Obviously, this needs to be changed :).
Expand Down
Loading

0 comments on commit 3247065

Please sign in to comment.