Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming Logs via Websocket Server #7240

Merged
merged 5 commits into from
Sep 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion beacon-chain/core/feed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ go_library(
name = "go_default_library",
srcs = ["event.go"],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/core/feed",
visibility = ["//beacon-chain:__subpackages__"],
visibility = [
"//beacon-chain:__subpackages__",
"//shared:__subpackages__",
],
)
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/golang/snappy v0.0.1
github.com/google/gofuzz v1.1.0
github.com/google/uuid v1.1.1
github.com/gorilla/websocket v1.4.2
github.com/graph-gophers/graphql-go v0.0.0-20200309224638-dae41bde9ef9 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
Expand Down
7 changes: 6 additions & 1 deletion shared/logutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ load("@prysm//tools/go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["logutil.go"],
srcs = [
"logutil.go",
"stream.go",
],
importpath = "github.com/prysmaticlabs/prysm/shared/logutil",
visibility = ["//visibility:public"],
deps = [
"//shared/event:go_default_library",
"//shared/params:go_default_library",
"@com_github_gorilla_websocket//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
8 changes: 6 additions & 2 deletions shared/logutil/logutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import (
"github.com/sirupsen/logrus"
)

func addLogWriter(w io.Writer) {
mw := io.MultiWriter(logrus.StandardLogger().Out, w)
logrus.SetOutput(mw)
}

// ConfigurePersistentLogging adds a log-to-file writer. File content is identical to stdout.
func ConfigurePersistentLogging(logFileName string) error {
logrus.WithField("logFileName", logFileName).Info("Logs will be made persistent")
Expand All @@ -18,8 +23,7 @@ func ConfigurePersistentLogging(logFileName string) error {
return err
}

mw := io.MultiWriter(os.Stdout, f)
logrus.SetOutput(mw)
addLogWriter(f)

logrus.Info("File logging initialized")
return nil
Expand Down
72 changes: 72 additions & 0 deletions shared/logutil/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package logutil

import (
"io"
"net/http"

"github.com/gorilla/websocket"
"github.com/prysmaticlabs/prysm/shared/event"
log "github.com/sirupsen/logrus"
)

// Compile time interface check.
var _ = io.Writer(&StreamServer{})

// StreamServer defines a a websocket server which can receive events from
// a feed and write them to open websocket connections.
type StreamServer struct {
feed *event.Feed
}

// NewLogStreamServer initializes a new stream server capable of
// streaming log events via a websocket connection.
func NewLogStreamServer() *StreamServer {
ss := &StreamServer{
feed: new(event.Feed),
}
addLogWriter(ss)
return ss
}

var streamUpgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
}

// Handler for new websocket connections to stream new events received
// via an event feed as they occur.
func (ss *StreamServer) Handler(w http.ResponseWriter, r *http.Request) {
conn, err := streamUpgrader.Upgrade(w, r, nil)
if err != nil {
log.Errorf("Could not write websocket message: %v", err)
return
}

ch := make(chan []byte)
sub := ss.feed.Subscribe(ch)
defer sub.Unsubscribe()

for {
select {
case evt := <-ch:
if err := conn.WriteMessage(websocket.TextMessage, evt); err != nil {
log.Errorf("Could not write websocket message: %v", err)
}
case <-r.Context().Done():
if err := conn.WriteMessage(websocket.CloseNormalClosure, []byte("context canceled")); err != nil {
log.Error(err)
}
case err := <-sub.Err():
if err := conn.WriteMessage(websocket.CloseInternalServerErr, []byte(err.Error())); err != nil {
log.Error(err)
}
}
}
}

// Write a binary message and send over the event feed.
func (ss *StreamServer) Write(p []byte) (n int, err error) {
ss.feed.Send(p)
return len(p), nil
}
1 change: 1 addition & 0 deletions shared/prometheus/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//shared:go_default_library",
"//shared/logutil:go_default_library",
"@com_github_golang_gddo//httputil:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
Expand Down
2 changes: 2 additions & 0 deletions shared/prometheus/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/logutil"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -42,6 +43,7 @@ func NewPrometheusService(addr string, svcRegistry *shared.ServiceRegistry, addi
mux.Handle("/metrics", promhttp.Handler())
mux.HandleFunc("/healthz", s.healthzHandler)
mux.HandleFunc("/goroutinez", s.goroutinezHandler)
mux.HandleFunc("/logs", logutil.NewLogStreamServer().Handler)

// Register additional handlers.
for _, h := range additionalHandlers {
Expand Down