Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add log streamer #46

Merged
merged 4 commits into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 48 additions & 3 deletions apiserver/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,29 @@ import (
gErrors "garm/errors"
runnerParams "garm/params"
"garm/runner"
wsWriter "garm/websocket"

"github.com/gorilla/websocket"
"github.com/pkg/errors"
)

func NewAPIController(r *runner.Runner, auth *auth.Authenticator) (*APIController, error) {
func NewAPIController(r *runner.Runner, auth *auth.Authenticator, hub *wsWriter.Hub) (*APIController, error) {
return &APIController{
r: r,
auth: auth,
hub: hub,
upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 16384,
},
}, nil
}

type APIController struct {
r *runner.Runner
auth *auth.Authenticator
r *runner.Runner
auth *auth.Authenticator
hub *wsWriter.Hub
upgrader websocket.Upgrader
}

func handleError(w http.ResponseWriter, err error) {
Expand Down Expand Up @@ -124,6 +133,42 @@ func (a *APIController) CatchAll(w http.ResponseWriter, r *http.Request) {
}
}

func (a *APIController) WSHandler(writer http.ResponseWriter, req *http.Request) {
ctx := req.Context()
if !auth.IsAdmin(ctx) {
writer.WriteHeader(http.StatusForbidden)
writer.Write([]byte("you need admin level access to view logs"))
return
}

if a.hub == nil {
handleError(writer, gErrors.NewBadRequestError("log streamer is disabled"))
return
}

conn, err := a.upgrader.Upgrade(writer, req, nil)
if err != nil {
log.Printf("error upgrading to websockets: %v", err)
return
}

// TODO (gsamfira): Handle ExpiresAt. Right now, if a client uses
// a valid token to authenticate, and keeps the websocket connection
// open, it will allow that client to stream logs via websockets
// until the connection is broken. We need to forcefully disconnect
// the client once the token expires.
client, err := wsWriter.NewClient(conn, a.hub)
if err != nil {
log.Printf("failed to create new client: %v", err)
return
}
if err := a.hub.Register(client); err != nil {
log.Printf("failed to register new client: %v", err)
return
}
client.Go()
}

// NotFoundHandler is returned when an invalid URL is acccessed
func (a *APIController) NotFoundHandler(w http.ResponseWriter, r *http.Request) {
apiErr := params.APIErrorResponse{
Expand Down
139 changes: 70 additions & 69 deletions apiserver/routers/routers.go

Large diffs are not rendered by default.

99 changes: 99 additions & 0 deletions cmd/garm-cli/cmd/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package cmd

import (
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"os"
"os/signal"
"time"

apiParams "garm/apiserver/params"

"github.com/gorilla/websocket"
"github.com/spf13/cobra"
)

var logCmd = &cobra.Command{
Use: "debug-log",
SilenceUsage: true,
Short: "Stream garm log",
Long: `Stream all garm logging to the terminal.`,
RunE: func(cmd *cobra.Command, args []string) error {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)

parsedURL, err := url.Parse(mgr.BaseURL)
if err != nil {
return err
}

wsScheme := "ws"
if parsedURL.Scheme == "https" {
wsScheme = "wss"
}
u := url.URL{Scheme: wsScheme, Host: parsedURL.Host, Path: "/api/v1/ws"}
log.Printf("connecting to %s", u.String())

header := http.Header{}
header.Add("Authorization", fmt.Sprintf("Bearer %s", mgr.Token))

c, response, err := websocket.DefaultDialer.Dial(u.String(), header)
if err != nil {
var resp apiParams.APIErrorResponse
var msg string
if err := json.NewDecoder(response.Body).Decode(&resp); err == nil {
msg = resp.Details
}
log.Fatalf("failed to stream logs: %s (%s)", msg, response.Status)
}
defer c.Close()

done := make(chan struct{})

go func() {
defer close(done)
for {
_, message, err := c.ReadMessage()
if err != nil {
log.Printf("read: %q", err)
return
}
log.Print(string(message))
}
}()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-done:
return nil
case t := <-ticker.C:
err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
if err != nil {
return err
}
case <-interrupt:
// Cleanly close the connection by sending a close message and then
// waiting (with timeout) for the server to close the connection.
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
return err
}
select {
case <-done:
case <-time.After(time.Second):
}
return nil
}
}
},
}

func init() {
rootCmd.AddCommand(logCmd)
}
18 changes: 16 additions & 2 deletions cmd/garm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"flag"
"fmt"
"io"
"log"
"net"
"net/http"
Expand All @@ -31,6 +32,7 @@ import (
"garm/database/common"
"garm/runner"
"garm/util"
"garm/websocket"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
Expand Down Expand Up @@ -75,7 +77,19 @@ func main() {
if err != nil {
log.Fatalf("fetching log writer: %+v", err)
}
log.SetOutput(logWriter)

var writers []io.Writer = []io.Writer{
logWriter,
}
var hub *websocket.Hub
if cfg.Default.EnableLogStreamer {
hub = websocket.NewHub(ctx)
hub.Start()
defer hub.Stop()
writers = append(writers, hub)
}

log.SetOutput(io.MultiWriter(writers...))

db, err := database.NewDatabase(ctx, cfg.Database)
if err != nil {
Expand All @@ -98,7 +112,7 @@ func main() {
}

authenticator := auth.NewAuthenticator(cfg.JWTAuth, db)
controller, err := controllers.NewAPIController(runner, authenticator)
controller, err := controllers.NewAPIController(runner, authenticator, hub)
if err != nil {
log.Fatalf("failed to create controller: %+v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var (
// DefaultConfigDir is the default path on disk to the config dir. The config
// file will probably be in the same folder, but it is not mandatory.
DefaultConfigDir = "/etc/garm"

// DefaultUserGroups are the groups the default user will be part of.
DefaultUserGroups = []string{
"sudo", "adm", "cdrom", "dialout",
Expand Down Expand Up @@ -167,7 +167,8 @@ type Default struct {
ConfigDir string `toml:"config_dir,omitempty" json:"config-dir,omitempty"`
CallbackURL string `toml:"callback_url" json:"callback-url"`
// LogFile is the location of the log file.
LogFile string `toml:"log_file,omitempty" json:"log-file"`
LogFile string `toml:"log_file,omitempty" json:"log-file"`
EnableLogStreamer bool `toml:"enable_log_streamer"`
}

func (d *Default) Validate() error {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/google/uuid v1.3.0
github.com/gorilla/handlers v1.5.1
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/jedib0t/go-pretty/v6 v6.3.1
github.com/juju/clock v0.0.0-20220704231616-a2b96c8eeb27
github.com/juju/retry v0.0.0-20220204093819-62423bf33287
Expand All @@ -23,12 +24,12 @@ require (
golang.org/x/crypto v0.0.0-20220321153916-2c7772ba3064
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/mysql v1.3.3
gorm.io/driver/sqlite v1.3.2
gorm.io/gorm v1.23.4
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0
)

require (
Expand All @@ -40,7 +41,6 @@ require (
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
Expand Down
6 changes: 6 additions & 0 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ type Runner struct {
}

func (r *Runner) ListCredentials(ctx context.Context) ([]params.GithubCredentials, error) {
if !auth.IsAdmin(ctx) {
return nil, runnerErrors.ErrUnauthorized
}
ret := []params.GithubCredentials{}

for _, val := range r.config.Github {
Expand All @@ -224,6 +227,9 @@ func (r *Runner) ListCredentials(ctx context.Context) ([]params.GithubCredential
}

func (r *Runner) ListProviders(ctx context.Context) ([]params.Provider, error) {
if !auth.IsAdmin(ctx) {
return nil, runnerErrors.ErrUnauthorized
}
ret := []params.Provider{}

for _, val := range r.providers {
Expand Down
3 changes: 3 additions & 0 deletions testdata/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ config_dir = "/etc/garm"
# Uncomment this line if you'd like to log to a file instead of standard output.
# log_file = "/tmp/runner-manager.log"

# Enable streaming logs via web sockets. Use garm-cli debug-log.
enable_log_streamer = false

[jwt_auth]
# A JWT token secret used to sign tokens.
# Obviously, this needs to be changed :).
Expand Down
Loading