Skip to content

Commit

Permalink
Handle context cancellation to shutdown the exporter (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
ThisIsAreku authored Aug 21, 2024
1 parent aef192f commit 0fe3f9c
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 15 deletions.
55 changes: 48 additions & 7 deletions cmd/prometheus-plex-exporter/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package main

import (
"context"
"errors"
"net/http"
"os"
"os/signal"
"syscall"
"time"

kitlog "github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -12,33 +17,69 @@ import (
"github.com/grafana/plexporter/pkg/plex"
)

const (
MetricsServerAddr = ":9000"
)

var (
log = kitlog.NewLogfmtLogger(kitlog.NewSyncWriter(os.Stderr))
)

func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

serverAddress := os.Getenv("PLEX_SERVER")
if serverAddress == "" {
level.Error(log).Log("msg", "PLEX_SERVER environment variable must be specified")
return
os.Exit(1)
}

plexToken := os.Getenv("PLEX_TOKEN")
if plexToken == "" {
level.Error(log).Log("msg", "PLEX_TOKEN environment variable must be specified")
return
os.Exit(1)
}

server, err := plex.NewServer(serverAddress, plexToken)
if err != nil {
level.Error(log).Log("msg", err)
return
level.Error(log).Log("msg", "cannot initialize connection to plex server", "error", err)
os.Exit(1)
}
server.Listen(log)

metrics.Register(server)

http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(":9000", nil)
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
metricsServer := http.Server{
Addr: MetricsServerAddr,
Handler: mux,
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 120 * time.Second,
}

go func() {
level.Info(log).Log("msg", "starting metrics server on "+MetricsServerAddr)
err = metricsServer.ListenAndServe()
if !errors.Is(err, http.ErrServerClosed) {
level.Error(log).Log("msg", "cannot start metrics server", "error", err)
}
}()

exitCode := 0
err = server.Listen(ctx, log)
if err != nil {
level.Error(log).Log("msg", "cannot listen to plex server events", "error", err)
exitCode = 1
}

level.Debug(log).Log("msg", "shutting down metrics server")
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
defer shutdownCancel()
if err := metricsServer.Shutdown(shutdownCtx); err != nil {
level.Error(log).Log("msg", "cannot gracefully shutdown metrics server", "error", err)
}

os.Exit(exitCode)
}
42 changes: 37 additions & 5 deletions pkg/plex/listener.go
Original file line number Diff line number Diff line change
@@ -1,54 +1,86 @@
package plex

import (
"context"
"errors"
"fmt"
"os"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/websocket"
"github.com/jrudio/go-plex-client"
)

var (
ErrAlreadyListening = errors.New("already listening")
)

type plexListener struct {
server *Server
conn *plex.Plex
activeSessions *sessions
log log.Logger
}

func (s *Server) Listen(log log.Logger) error {
func (s *Server) Listen(ctx context.Context, log log.Logger) error {
s.mtx.Lock()
defer s.mtx.Unlock()
if s.listener != nil {
s.mtx.Unlock()
return ErrAlreadyListening
}

conn, err := plex.New(s.URL.String(), s.Token)
if err != nil {
s.mtx.Unlock()
return fmt.Errorf("failed to connect to %s: %w", s.URL.String(), err)
}

// TODO: Gracefully close any previous listener
s.listener = &plexListener{
server: s,
conn: conn,
activeSessions: NewSessions(s),
activeSessions: NewSessions(ctx, s),
log: log,
}

s.mtx.Unlock()

// forward context completion to jrudio/go-plex-client
ctrlC := make(chan os.Signal, 1)
go func() {
<-ctx.Done()
close(ctrlC)
}()

doneChan := make(chan error, 1)
onError := func(err error) {
defer close(doneChan)
var closeErr *websocket.CloseError
if errors.As(err, &closeErr) {
if closeErr.Code == websocket.CloseNormalClosure {
return
}
}
level.Error(log).Log("msg", "error in websocket processing", "err", err)
doneChan <- err
}

events := plex.NewNotificationEvents()
events.OnPlaying(s.listener.onPlayingHandler)

// TODO - Does this automatically reconnect on websocket failure?
conn.SubscribeToNotifications(events, ctrlC, onError)
select { // SubscribeToNotifications doesn't return error directly, so we read one from channel without blocking.
case err = <-doneChan:
return err
default:
// noop
}

level.Info(log).Log("msg", "Successfully connected", "machineID", s.ID, "server", s.Name)

return nil
return <-doneChan
}

func getSessionByID(sessions plex.CurrentSessions, sessionID string) *plex.Metadata {
Expand Down
13 changes: 10 additions & 3 deletions pkg/plex/sessions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package plex

import (
"context"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -43,16 +44,22 @@ type sessions struct {
totalEstimatedTransmittedKBits float64
}

func NewSessions(server *Server) *sessions {
func NewSessions(ctx context.Context, server *Server) *sessions {
s := &sessions{
sessions: map[string]session{},
server: server,
}

ticker := time.NewTicker(time.Minute)
go func() {
for range ticker.C {
s.pruneOldSessions()
for {
select {
case <-ticker.C:
s.pruneOldSessions()
case <-ctx.Done():
ticker.Stop()
return
}
}
}()

Expand Down

0 comments on commit 0fe3f9c

Please sign in to comment.