From 96292ba4b644b7ca9d53c33bad60d762e47fdc65 Mon Sep 17 00:00:00 2001 From: Florian Schade Date: Tue, 8 Aug 2023 16:53:56 +0200 Subject: [PATCH 1/4] enhancement: add sse service skeleton --- .gitignore | 1 + .../unreleased/enhancement-sse-messaging.md | 10 +++ docs/services/general-info/port-ranges.md | 2 +- docs/services/sse/_index.md | 16 ++++ docs/services/sse/configuration.md | 15 ++++ ocis-pkg/config/config.go | 2 + ocis-pkg/config/defaultconfig.go | 2 + ocis/pkg/command/services.go | 10 ++- ocis/pkg/runtime/service/service.go | 11 ++- services/sse/Makefile | 37 +++++++++ services/sse/README.md | 3 + services/sse/cmd/sse/main.go | 14 ++++ services/sse/pkg/command/health.go | 62 ++++++++++++++ services/sse/pkg/command/root.go | 30 +++++++ services/sse/pkg/command/server.go | 81 +++++++++++++++++++ services/sse/pkg/command/version.go | 27 +++++++ services/sse/pkg/config/config.go | 51 ++++++++++++ .../sse/pkg/config/defaults/defaultconfig.go | 42 ++++++++++ services/sse/pkg/config/parser/parse.go | 38 +++++++++ services/sse/pkg/service/service.go | 71 ++++++++++++++++ 20 files changed, 520 insertions(+), 5 deletions(-) create mode 100644 changelog/unreleased/enhancement-sse-messaging.md create mode 100644 docs/services/sse/_index.md create mode 100644 docs/services/sse/configuration.md create mode 100644 services/sse/Makefile create mode 100644 services/sse/README.md create mode 100644 services/sse/cmd/sse/main.go create mode 100644 services/sse/pkg/command/health.go create mode 100644 services/sse/pkg/command/root.go create mode 100644 services/sse/pkg/command/server.go create mode 100644 services/sse/pkg/command/version.go create mode 100644 services/sse/pkg/config/config.go create mode 100644 services/sse/pkg/config/defaults/defaultconfig.go create mode 100644 services/sse/pkg/config/parser/parse.go create mode 100644 services/sse/pkg/service/service.go diff --git a/.gitignore b/.gitignore index 0902237f4a7..c4c08a62aa2 100644 --- a/.gitignore +++ b/.gitignore @@ -49,6 +49,7 @@ protogen/buf.sha1.lock /third-party-licenses # misc +/tmp go.work go.work.sum .env diff --git a/changelog/unreleased/enhancement-sse-messaging.md b/changelog/unreleased/enhancement-sse-messaging.md new file mode 100644 index 00000000000..31c98eb2a07 --- /dev/null +++ b/changelog/unreleased/enhancement-sse-messaging.md @@ -0,0 +1,10 @@ +Enhancement: SSE for messaging + +So far, sse has only been used to exchange messages between the server and the client. +In order to be able to send more content to the client, we have moved the endpoint to a separate service and are now also using it for other notifications like: + +* notify postprocessing state changes. +* notify file locking and unlocking. +* ... @toDo + +https://github.com/owncloud/ocis/pull/6992 diff --git a/docs/services/general-info/port-ranges.md b/docs/services/general-info/port-ranges.md index f2286dadc5a..e3baac04d54 100644 --- a/docs/services/general-info/port-ranges.md +++ b/docs/services/general-info/port-ranges.md @@ -30,7 +30,7 @@ We also suggest to use the last port in your extensions' range as a debug/metric | 9120-9124 | [graph]({{< ref "../graph/_index.md" >}}) | | 9125-9129 | [policies]({{< ref "../policies/_index.md" >}}) | | 9130-9134 | [idp]({{< ref "../idp/_index.md" >}}) | -| 9135-9139 | FREE (formerly used by graph-explorer) | +| 9135-9139 | [sse]({{< ref "../sse/_index.md" >}}) | | 9140-9141 | [frontend]({{< ref "../frontend/_index.md" >}}) | | 9142-9143 | [gateway]({{< ref "../gateway/_index.md" >}}) | | 9144-9145 | [users]({{< ref "../users/_index.md" >}}) | diff --git a/docs/services/sse/_index.md b/docs/services/sse/_index.md new file mode 100644 index 00000000000..d839d16de50 --- /dev/null +++ b/docs/services/sse/_index.md @@ -0,0 +1,16 @@ +--- +title: SSE +date: 2022-08-08T00:00:00+00:00 +weight: 20 +geekdocRepo: https://github.com/owncloud/ocis +geekdocEditPath: edit/master/docs/services/sse +geekdocFilePath: _index.md +geekdocCollapseSection: true +--- + +## Abstract + + +## Table of Contents + +{{< toc-tree >}} diff --git a/docs/services/sse/configuration.md b/docs/services/sse/configuration.md new file mode 100644 index 00000000000..902a2b55295 --- /dev/null +++ b/docs/services/sse/configuration.md @@ -0,0 +1,15 @@ +--- +title: Service Configuration +date: 2018-08-08T00:00:00+00:00 +weight: 20 +geekdocRepo: https://github.com/owncloud/ocis +geekdocEditPath: edit/master/docs/services/sse +geekdocFilePath: configuration.md +geekdocCollapseSection: true +--- + +## Example YAML Config + +{{< include file="services/_includes/app-provider-config-example.yaml" language="yaml" >}} + +{{< include file="services/_includes/app-provider_configvars.md" >}} diff --git a/ocis-pkg/config/config.go b/ocis-pkg/config/config.go index 394945fb227..bf16fd67174 100644 --- a/ocis-pkg/config/config.go +++ b/ocis-pkg/config/config.go @@ -27,6 +27,7 @@ import ( search "github.com/owncloud/ocis/v2/services/search/pkg/config" settings "github.com/owncloud/ocis/v2/services/settings/pkg/config" sharing "github.com/owncloud/ocis/v2/services/sharing/pkg/config" + sse "github.com/owncloud/ocis/v2/services/sse/pkg/config" storagepublic "github.com/owncloud/ocis/v2/services/storage-publiclink/pkg/config" storageshares "github.com/owncloud/ocis/v2/services/storage-shares/pkg/config" storagesystem "github.com/owncloud/ocis/v2/services/storage-system/pkg/config" @@ -99,6 +100,7 @@ type Config struct { Proxy *proxy.Config `yaml:"proxy"` Settings *settings.Config `yaml:"settings"` Sharing *sharing.Config `yaml:"sharing"` + SSE *sse.Config `yaml:"sse"` StorageSystem *storagesystem.Config `yaml:"storage_system"` StoragePublicLink *storagepublic.Config `yaml:"storage_public"` StorageShares *storageshares.Config `yaml:"storage_shares"` diff --git a/ocis-pkg/config/defaultconfig.go b/ocis-pkg/config/defaultconfig.go index 3afd0896c8a..bfa03aee634 100644 --- a/ocis-pkg/config/defaultconfig.go +++ b/ocis-pkg/config/defaultconfig.go @@ -26,6 +26,7 @@ import ( search "github.com/owncloud/ocis/v2/services/search/pkg/config/defaults" settings "github.com/owncloud/ocis/v2/services/settings/pkg/config/defaults" sharing "github.com/owncloud/ocis/v2/services/sharing/pkg/config/defaults" + sse "github.com/owncloud/ocis/v2/services/sse/pkg/config/defaults" storagepublic "github.com/owncloud/ocis/v2/services/storage-publiclink/pkg/config/defaults" storageshares "github.com/owncloud/ocis/v2/services/storage-shares/pkg/config/defaults" storageSystem "github.com/owncloud/ocis/v2/services/storage-system/pkg/config/defaults" @@ -72,6 +73,7 @@ func DefaultConfig() *Config { Search: search.FullDefaultConfig(), Settings: settings.DefaultConfig(), Sharing: sharing.DefaultConfig(), + SSE: sse.DefaultConfig(), StoragePublicLink: storagepublic.DefaultConfig(), StorageShares: storageshares.DefaultConfig(), StorageSystem: storageSystem.DefaultConfig(), diff --git a/ocis/pkg/command/services.go b/ocis/pkg/command/services.go index 960abd452d1..df143b623e6 100644 --- a/ocis/pkg/command/services.go +++ b/ocis/pkg/command/services.go @@ -1,13 +1,13 @@ package command import ( + "github.com/urfave/cli/v2" + "github.com/owncloud/ocis/v2/ocis-pkg/config" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" "github.com/owncloud/ocis/v2/ocis-pkg/config/parser" "github.com/owncloud/ocis/v2/ocis/pkg/command/helper" "github.com/owncloud/ocis/v2/ocis/pkg/register" - "github.com/urfave/cli/v2" - antivirus "github.com/owncloud/ocis/v2/services/antivirus/pkg/command" appprovider "github.com/owncloud/ocis/v2/services/app-provider/pkg/command" appregistry "github.com/owncloud/ocis/v2/services/app-registry/pkg/command" @@ -33,6 +33,7 @@ import ( search "github.com/owncloud/ocis/v2/services/search/pkg/command" settings "github.com/owncloud/ocis/v2/services/settings/pkg/command" sharing "github.com/owncloud/ocis/v2/services/sharing/pkg/command" + sse "github.com/owncloud/ocis/v2/services/sse/pkg/command" storagepubliclink "github.com/owncloud/ocis/v2/services/storage-publiclink/pkg/command" storageshares "github.com/owncloud/ocis/v2/services/storage-shares/pkg/command" storagesystem "github.com/owncloud/ocis/v2/services/storage-system/pkg/command" @@ -172,6 +173,11 @@ var svccmds = []register.Command{ cfg.Sharing.Commons = cfg.Commons }) }, + func(cfg *config.Config) *cli.Command { + return ServiceCommand(cfg, cfg.SSE.Service.Name, sse.GetCommands(cfg.SSE), func(c *config.Config) { + cfg.SSE.Commons = cfg.Commons + }) + }, func(cfg *config.Config) *cli.Command { return ServiceCommand(cfg, cfg.StoragePublicLink.Service.Name, storagepubliclink.GetCommands(cfg.StoragePublicLink), func(c *config.Config) { cfg.StoragePublicLink.Commons = cfg.Commons diff --git a/ocis/pkg/runtime/service/service.go b/ocis/pkg/runtime/service/service.go index 958d2d16718..027ef7e5514 100644 --- a/ocis/pkg/runtime/service/service.go +++ b/ocis/pkg/runtime/service/service.go @@ -15,6 +15,8 @@ import ( "github.com/mohae/deepcopy" "github.com/olekukonko/tablewriter" + "github.com/thejerf/suture/v4" + ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/ocis-pkg/shared" @@ -42,6 +44,7 @@ import ( search "github.com/owncloud/ocis/v2/services/search/pkg/command" settings "github.com/owncloud/ocis/v2/services/settings/pkg/command" sharing "github.com/owncloud/ocis/v2/services/sharing/pkg/command" + sse "github.com/owncloud/ocis/v2/services/sse/pkg/command" storagepublic "github.com/owncloud/ocis/v2/services/storage-publiclink/pkg/command" storageshares "github.com/owncloud/ocis/v2/services/storage-shares/pkg/command" storageSystem "github.com/owncloud/ocis/v2/services/storage-system/pkg/command" @@ -53,7 +56,6 @@ import ( web "github.com/owncloud/ocis/v2/services/web/pkg/command" webdav "github.com/owncloud/ocis/v2/services/webdav/pkg/command" webfinger "github.com/owncloud/ocis/v2/services/webfinger/pkg/command" - "github.com/thejerf/suture/v4" ) var ( @@ -298,11 +300,16 @@ func NewService(options ...Option) (*Service, error) { cfg.Sharing.Commons = cfg.Commons return sharing.Execute(cfg.Sharing) }) + dreg(opts.Config.SSE.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error { + cfg.SSE.Context = ctx + cfg.SSE.Commons = cfg.Commons + return sse.Execute(cfg.SSE) + }) return s, nil } -// Start an rpc service. By default the package scope Start will run all default services to provide with a working +// Start a rpc service. By default, the package scope Start will run all default services to provide with a working // oCIS instance. func Start(o ...Option) error { // Start the runtime. Most likely this was called ONLY by the `ocis server` subcommand, but since we cannot protect diff --git a/services/sse/Makefile b/services/sse/Makefile new file mode 100644 index 00000000000..5381f8cfafa --- /dev/null +++ b/services/sse/Makefile @@ -0,0 +1,37 @@ +SHELL := bash +NAME := sse + +include ../../.make/recursion.mk + +############ tooling ############ +ifneq (, $(shell command -v go 2> /dev/null)) # suppress `command not found warnings` for non go targets in CI +include ../../.bingo/Variables.mk +endif + +############ go tooling ############ +include ../../.make/go.mk + +############ release ############ +include ../../.make/release.mk + +############ docs generate ############ +include ../../.make/docs.mk + +.PHONY: docs-generate +docs-generate: config-docs-generate + +############ generate ############ +include ../../.make/generate.mk + +.PHONY: ci-go-generate +ci-go-generate: # CI runs ci-node-generate automatically before this target + +.PHONY: ci-node-generate +ci-node-generate: + +############ licenses ############ +.PHONY: ci-node-check-licenses +ci-node-check-licenses: + +.PHONY: ci-node-save-licenses +ci-node-save-licenses: diff --git a/services/sse/README.md b/services/sse/README.md new file mode 100644 index 00000000000..4c0aabfc0dd --- /dev/null +++ b/services/sse/README.md @@ -0,0 +1,3 @@ +# SSE + +@todo diff --git a/services/sse/cmd/sse/main.go b/services/sse/cmd/sse/main.go new file mode 100644 index 00000000000..8bd438f1148 --- /dev/null +++ b/services/sse/cmd/sse/main.go @@ -0,0 +1,14 @@ +package main + +import ( + "os" + + "github.com/owncloud/ocis/v2/services/sse/pkg/command" + "github.com/owncloud/ocis/v2/services/sse/pkg/config/defaults" +) + +func main() { + if err := command.Execute(defaults.DefaultConfig()); err != nil { + os.Exit(1) + } +} diff --git a/services/sse/pkg/command/health.go b/services/sse/pkg/command/health.go new file mode 100644 index 00000000000..195dceade24 --- /dev/null +++ b/services/sse/pkg/command/health.go @@ -0,0 +1,62 @@ +package command + +import ( + "fmt" + "net/http" + + "github.com/owncloud/ocis/v2/ocis-pkg/log" + + "github.com/urfave/cli/v2" + + "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" + "github.com/owncloud/ocis/v2/services/sse/pkg/config" + "github.com/owncloud/ocis/v2/services/sse/pkg/config/parser" +) + +// Health is the entrypoint for the health command. +func Health(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "health", + Usage: "check health status", + Category: "info", + Before: func(c *cli.Context) error { + return configlog.ReturnError(parser.ParseConfig(cfg)) + }, + Action: func(c *cli.Context) error { + logger := log.NewLogger( + log.Name(cfg.Service.Name), + log.Level(cfg.Log.Level), + log.Pretty(cfg.Log.Pretty), + log.Color(cfg.Log.Color), + log.File(cfg.Log.File), + ) + + resp, err := http.Get( + fmt.Sprintf( + "http://%s/healthz", + cfg.Debug.Addr, + ), + ) + + if err != nil { + logger.Fatal(). + Err(err). + Msg("Failed to request health check") + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + logger.Fatal(). + Int("code", resp.StatusCode). + Msg("Health seems to be in bad state") + } + + logger.Debug(). + Int("code", resp.StatusCode). + Msg("Health got a good state") + + return nil + }, + } +} diff --git a/services/sse/pkg/command/root.go b/services/sse/pkg/command/root.go new file mode 100644 index 00000000000..f3a6e4b2769 --- /dev/null +++ b/services/sse/pkg/command/root.go @@ -0,0 +1,30 @@ +package command + +import ( + "os" + + "github.com/urfave/cli/v2" + + "github.com/owncloud/ocis/v2/ocis-pkg/clihelper" + "github.com/owncloud/ocis/v2/services/sse/pkg/config" +) + +// GetCommands provides all commands for this service +func GetCommands(cfg *config.Config) cli.Commands { + return []*cli.Command{ + Server(cfg), + Health(cfg), + Version(cfg), + } +} + +// Execute is the entry point for the sse command. +func Execute(cfg *config.Config) error { + app := clihelper.DefaultApp(&cli.App{ + Name: "sse", + Usage: "Serve ownCloud sse for oCIS", + Commands: GetCommands(cfg), + }) + + return app.Run(os.Args) +} diff --git a/services/sse/pkg/command/server.go b/services/sse/pkg/command/server.go new file mode 100644 index 00000000000..7d3d8c32ab8 --- /dev/null +++ b/services/sse/pkg/command/server.go @@ -0,0 +1,81 @@ +package command + +import ( + "context" + "fmt" + + "github.com/oklog/run" + "github.com/urfave/cli/v2" + + "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" + "github.com/owncloud/ocis/v2/ocis-pkg/handlers" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" + "github.com/owncloud/ocis/v2/ocis-pkg/version" + "github.com/owncloud/ocis/v2/services/sse/pkg/config" + "github.com/owncloud/ocis/v2/services/sse/pkg/config/parser" + "github.com/owncloud/ocis/v2/services/sse/pkg/service" +) + +// Server is the entrypoint for the server command. +func Server(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "server", + Usage: fmt.Sprintf("start the %s service without runtime (unsupervised mode)", cfg.Service.Name), + Category: "server", + Before: func(c *cli.Context) error { + return configlog.ReturnFatal(parser.ParseConfig(cfg)) + }, + Action: func(c *cli.Context) error { + var ( + gr = run.Group{} + ctx, cancel = func() (context.Context, context.CancelFunc) { + if cfg.Context == nil { + return context.WithCancel(context.Background()) + } + return context.WithCancel(cfg.Context) + }() + logger = log.NewLogger( + log.Name(cfg.Service.Name), + log.Level(cfg.Log.Level), + log.Pretty(cfg.Log.Pretty), + log.Color(cfg.Log.Color), + log.File(cfg.Log.File), + ) + ) + defer cancel() + + { + svc, err := service.NewSSE(cfg, logger) + if err != nil { + return err + } + + gr.Add(svc.Run, func(_ error) { + cancel() + }) + } + + { + server := debug.NewService( + debug.Logger(logger), + debug.Name(cfg.Service.Name), + debug.Version(version.GetString()), + debug.Address(cfg.Debug.Addr), + debug.Token(cfg.Debug.Token), + debug.Pprof(cfg.Debug.Pprof), + debug.Zpages(cfg.Debug.Zpages), + debug.Health(handlers.Health), + debug.Ready(handlers.Ready), + ) + + gr.Add(server.ListenAndServe, func(_ error) { + _ = server.Shutdown(ctx) + cancel() + }) + } + + return gr.Run() + }, + } +} diff --git a/services/sse/pkg/command/version.go b/services/sse/pkg/command/version.go new file mode 100644 index 00000000000..fe3b344f0fb --- /dev/null +++ b/services/sse/pkg/command/version.go @@ -0,0 +1,27 @@ +package command + +import ( + "fmt" + + "github.com/owncloud/ocis/v2/ocis-pkg/version" + + "github.com/urfave/cli/v2" + + "github.com/owncloud/ocis/v2/services/sse/pkg/config" +) + +// Version prints the service versions of all running instances. +func Version(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "version", + Usage: "print the version of this binary and the running service instances", + Category: "info", + Action: func(c *cli.Context) error { + fmt.Println("Version: " + version.GetString()) + fmt.Printf("Compiled: %s\n", version.Compiled()) + fmt.Println("") + + return nil + }, + } +} diff --git a/services/sse/pkg/config/config.go b/services/sse/pkg/config/config.go new file mode 100644 index 00000000000..a03c63f459f --- /dev/null +++ b/services/sse/pkg/config/config.go @@ -0,0 +1,51 @@ +package config + +import ( + "context" + + "github.com/owncloud/ocis/v2/ocis-pkg/shared" +) + +// Config combines all available configuration parts. +type Config struct { + Commons *shared.Commons `yaml:"-"` // don't use this directly as configuration for a service + Log *Log + + Debug Debug `mask:"struct" yaml:"debug"` + + Service Service `yaml:"-"` + + Events Events + + Context context.Context `yaml:"-" json:"-"` +} + +// Service defines the available service configuration. +type Service struct { + Name string `yaml:"-"` +} + +// Log defines the available log configuration. +type Log struct { + Level string `mapstructure:"level" env:"OCIS_LOG_LEVEL;SSE_LOG_LEVEL" desc:"The log level. Valid values are: 'panic', 'fatal', 'error', 'warn', 'info', 'debug', 'trace'."` + Pretty bool `mapstructure:"pretty" env:"OCIS_LOG_PRETTY;SSE_LOG_PRETTY" desc:"Activates pretty log output."` + Color bool `mapstructure:"color" env:"OCIS_LOG_COLOR;SSE_LOG_COLOR" desc:"Activates colorized log output."` + File string `mapstructure:"file" env:"OCIS_LOG_FILE;SSE_LOG_FILE" desc:"The path to the log file. Activates logging to this file if set."` +} + +// Debug defines the available debug configuration. +type Debug struct { + Addr string `yaml:"addr" env:"SSE_DEBUG_ADDR" desc:"Bind address of the debug server, where metrics, health, config and debug endpoints will be exposed."` + Token string `yaml:"token" env:"SSE_DEBUG_TOKEN" desc:"Token to secure the metrics endpoint."` + Pprof bool `yaml:"pprof" env:"SSE_DEBUG_PPROF" desc:"Enables pprof, which can be used for profiling."` + Zpages bool `yaml:"zpages" env:"SSE_DEBUG_ZPAGES" desc:"Enables zpages, which can be used for collecting and viewing in-memory traces."` +} + +// Events combines the configuration options for the event bus. +type Events struct { + Endpoint string `yaml:"endpoint" env:"OCIS_EVENTS_ENDPOINT;SSE_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."` + Cluster string `yaml:"cluster" env:"OCIS_EVENTS_CLUSTER;SSE_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."` + TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;SSE_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."` + TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"OCIS_EVENTS_TLS_ROOT_CA_CERTIFICATE;SSE_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided SSE_EVENTS_TLS_INSECURE will be seen as false."` + EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;SSE_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services."` +} diff --git a/services/sse/pkg/config/defaults/defaultconfig.go b/services/sse/pkg/config/defaults/defaultconfig.go new file mode 100644 index 00000000000..c1e77c262bb --- /dev/null +++ b/services/sse/pkg/config/defaults/defaultconfig.go @@ -0,0 +1,42 @@ +package defaults + +import ( + "github.com/owncloud/ocis/v2/services/sse/pkg/config" +) + +// FullDefaultConfig returns a fully initialized default configuration which is needed for doc generation. +func FullDefaultConfig() *config.Config { + cfg := DefaultConfig() + EnsureDefaults(cfg) + Sanitize(cfg) + return cfg +} + +// DefaultConfig returns the services default config +func DefaultConfig() *config.Config { + return &config.Config{ + Debug: config.Debug{ + Addr: "127.0.0.1:9135", + Token: "", + }, + Service: config.Service{ + Name: "sse", + }, + Events: config.Events{ + Endpoint: "127.0.0.1:9233", + Cluster: "ocis-cluster", + }, + } +} + +// EnsureDefaults adds default values to the configuration if they are not set yet +func EnsureDefaults(cfg *config.Config) { + if cfg.Log == nil { + cfg.Log = &config.Log{} + } +} + +// Sanitize sanitizes the configuration +func Sanitize(cfg *config.Config) { + +} diff --git a/services/sse/pkg/config/parser/parse.go b/services/sse/pkg/config/parser/parse.go new file mode 100644 index 00000000000..dc9375210de --- /dev/null +++ b/services/sse/pkg/config/parser/parse.go @@ -0,0 +1,38 @@ +package parser + +import ( + "errors" + + ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config" + "github.com/owncloud/ocis/v2/services/sse/pkg/config" + "github.com/owncloud/ocis/v2/services/sse/pkg/config/defaults" + + "github.com/owncloud/ocis/v2/ocis-pkg/config/envdecode" +) + +// ParseConfig loads configuration from known paths. +func ParseConfig(cfg *config.Config) error { + _, err := ociscfg.BindSourcesToStructs(cfg.Service.Name, cfg) + if err != nil { + return err + } + + defaults.EnsureDefaults(cfg) + + // load all env variables relevant to the config in the current context. + if err := envdecode.Decode(cfg); err != nil { + // no environment variable set for this config is an expected "error" + if !errors.Is(err, envdecode.ErrNoTargetFieldsAreSet) { + return err + } + } + + defaults.Sanitize(cfg) + + return Validate(cfg) +} + +// Validate validates our little config +func Validate(cfg *config.Config) error { + return nil +} diff --git a/services/sse/pkg/service/service.go b/services/sse/pkg/service/service.go new file mode 100644 index 00000000000..72be9d020a8 --- /dev/null +++ b/services/sse/pkg/service/service.go @@ -0,0 +1,71 @@ +package service + +import ( + "bytes" + "crypto/x509" + "fmt" + "io" + "net/http" + "os" + + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/events/stream" + "github.com/cs3org/reva/v2/pkg/rhttp" + + "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/services/sse/pkg/config" +) + +// NewSSE returns a service implementation for Service. +func NewSSE(c *config.Config, l log.Logger) (SSE, error) { + s := SSE{c: c, l: l, client: rhttp.GetHTTPClient(rhttp.Insecure(true))} + + return s, nil +} + +// SSE defines implements the business logic for Service. +type SSE struct { + c *config.Config + l log.Logger + m uint64 + + client *http.Client +} + +// Run runs the service +func (s SSE) Run() error { + evtsCfg := s.c.Events + + var rootCAPool *x509.CertPool + if evtsCfg.TLSRootCACertificate != "" { + rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) + if err != nil { + return err + } + + var certBytes bytes.Buffer + if _, err := io.Copy(&certBytes, rootCrtFile); err != nil { + return err + } + + rootCAPool = x509.NewCertPool() + rootCAPool.AppendCertsFromPEM(certBytes.Bytes()) + evtsCfg.TLSInsecure = false + } + + natsStream, err := stream.NatsFromConfig(stream.NatsConfig(s.c.Events)) + if err != nil { + return err + } + + ch, err := events.Consume(natsStream, "sse", events.StartPostprocessingStep{}) + if err != nil { + return err + } + + for e := range ch { + fmt.Println(e) // todo + } + + return nil +} From 3eb66d8dac4c793cbf2bfa0351b96974caf5ba54 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Thu, 17 Aug 2023 15:51:38 +0200 Subject: [PATCH 2/4] more functionality for the sse service Signed-off-by: jkoberg --- .drone.star | 1 + Makefile | 1 + services/sse/README.md | 6 +- services/sse/pkg/command/server.go | 28 ++++- services/sse/pkg/config/config.go | 29 ++++- .../sse/pkg/config/defaults/defaultconfig.go | 51 ++++++++- services/sse/pkg/config/tracing.go | 21 ++++ services/sse/pkg/server/http/option.go | 76 +++++++++++++ services/sse/pkg/server/http/server.go | 96 ++++++++++++++++ services/sse/pkg/service/service.go | 104 ++++++++++-------- services/userlog/README.md | 4 - 11 files changed, 363 insertions(+), 54 deletions(-) create mode 100644 services/sse/pkg/config/tracing.go create mode 100644 services/sse/pkg/server/http/option.go create mode 100644 services/sse/pkg/server/http/server.go diff --git a/.drone.star b/.drone.star index 951690839d1..dcf05705a0b 100644 --- a/.drone.star +++ b/.drone.star @@ -78,6 +78,7 @@ config = { "services/search", "services/settings", "services/sharing", + "services/sse", "services/storage-system", "services/storage-publiclink", "services/storage-shares", diff --git a/Makefile b/Makefile index 92d5b88573e..8e648ffc237 100644 --- a/Makefile +++ b/Makefile @@ -44,6 +44,7 @@ OCIS_MODULES = \ services/search \ services/settings \ services/sharing \ + services/sse \ services/storage-system \ services/storage-publiclink \ services/storage-shares \ diff --git a/services/sse/README.md b/services/sse/README.md index 4c0aabfc0dd..ac5e43ae415 100644 --- a/services/sse/README.md +++ b/services/sse/README.md @@ -1,3 +1,7 @@ # SSE -@todo +The `sse` service is responsible for sending sse (Server-Sent Events) to a user. See [What is Server-Sent Events](https://medium.com/yemeksepeti-teknoloji/what-is-server-sent-events-sse-and-how-to-implement-it-904938bffd73) for a simple introduction and examples of server sent events. + +## Subscribing + +Clients can subscribe to the `/sse` endpoint to be informed by the server when an event happens. The `sse` endpoint will respect language changes of the user without needing to reconnect. Note that SSE has a limitation of six open connections per browser which can be reached if one has opened various tabs of the Web UI pointing to the same Infinite Scale instance. diff --git a/services/sse/pkg/command/server.go b/services/sse/pkg/command/server.go index 7d3d8c32ab8..b5da4063da4 100644 --- a/services/sse/pkg/command/server.go +++ b/services/sse/pkg/command/server.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/oklog/run" "github.com/urfave/cli/v2" @@ -11,12 +13,18 @@ import ( "github.com/owncloud/ocis/v2/ocis-pkg/handlers" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" + "github.com/owncloud/ocis/v2/ocis-pkg/tracing" "github.com/owncloud/ocis/v2/ocis-pkg/version" "github.com/owncloud/ocis/v2/services/sse/pkg/config" "github.com/owncloud/ocis/v2/services/sse/pkg/config/parser" - "github.com/owncloud/ocis/v2/services/sse/pkg/service" + "github.com/owncloud/ocis/v2/services/sse/pkg/server/http" ) +// all events we care about +var _registeredEvents = []events.Unmarshaller{ + events.SendSSE{}, +} + // Server is the entrypoint for the server command. func Server(cfg *config.Config) *cli.Command { return &cli.Command{ @@ -45,13 +53,27 @@ func Server(cfg *config.Config) *cli.Command { ) defer cancel() + tracerProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name) + if err != nil { + return err + } + { - svc, err := service.NewSSE(cfg, logger) + natsStream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) if err != nil { return err } - gr.Add(svc.Run, func(_ error) { + server, err := http.Server( + http.Logger(logger), + http.Context(ctx), + http.Config(cfg), + http.Consumer(natsStream), + http.RegisteredEvents(_registeredEvents), + http.TracerProvider(tracerProvider), + ) + + gr.Add(server.Run, func(_ error) { cancel() }) } diff --git a/services/sse/pkg/config/config.go b/services/sse/pkg/config/config.go index a03c63f459f..b7e9509c485 100644 --- a/services/sse/pkg/config/config.go +++ b/services/sse/pkg/config/config.go @@ -11,11 +11,14 @@ type Config struct { Commons *shared.Commons `yaml:"-"` // don't use this directly as configuration for a service Log *Log - Debug Debug `mask:"struct" yaml:"debug"` + Debug Debug `mask:"struct" yaml:"debug"` + Tracing *Tracing `yaml:"tracing"` Service Service `yaml:"-"` - Events Events + Events Events + HTTP HTTP `yaml:"http"` + TokenManager *TokenManager `yaml:"token_manager"` Context context.Context `yaml:"-" json:"-"` } @@ -49,3 +52,25 @@ type Events struct { TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"OCIS_EVENTS_TLS_ROOT_CA_CERTIFICATE;SSE_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided SSE_EVENTS_TLS_INSECURE will be seen as false."` EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;SSE_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services."` } + +// CORS defines the available cors configuration. +type CORS struct { + AllowedOrigins []string `yaml:"allow_origins" env:"OCIS_CORS_ALLOW_ORIGINS;SSE_CORS_ALLOW_ORIGINS" desc:"A comma-separated list of allowed CORS origins. See following chapter for more details: *Access-Control-Allow-Origin* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Origin"` + AllowedMethods []string `yaml:"allow_methods" env:"OCIS_CORS_ALLOW_METHODS;SSE_CORS_ALLOW_METHODS" desc:"A comma-separated list of allowed CORS methods. See following chapter for more details: *Access-Control-Request-Method* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Request-Method"` + AllowedHeaders []string `yaml:"allow_headers" env:"OCIS_CORS_ALLOW_HEADERS;SSE_CORS_ALLOW_HEADERS" desc:"A blank or comma-separated list of allowed CORS headers. See following chapter for more details: *Access-Control-Request-Headers* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Request-Headers."` + AllowCredentials bool `yaml:"allow_credentials" env:"OCIS_CORS_ALLOW_CREDENTIALS;SSE_CORS_ALLOW_CREDENTIALS" desc:"Allow credentials for CORS.See following chapter for more details: *Access-Control-Allow-Credentials* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Credentials."` +} + +// HTTP defines the available http configuration. +type HTTP struct { + Addr string `yaml:"addr" env:"SSE_HTTP_ADDR" desc:"The bind address of the HTTP service."` + Namespace string `yaml:"-"` + Root string `yaml:"root" env:"SSE_HTTP_ROOT" desc:"Subdirectory that serves as the root for this HTTP service."` + CORS CORS `yaml:"cors"` + TLS shared.HTTPServiceTLS `yaml:"tls"` +} + +// TokenManager is the config for using the reva token manager +type TokenManager struct { + JWTSecret string `yaml:"jwt_secret" env:"OCIS_JWT_SECRET;SSE_JWT_SECRET" desc:"The secret to mint and validate jwt tokens."` +} diff --git a/services/sse/pkg/config/defaults/defaultconfig.go b/services/sse/pkg/config/defaults/defaultconfig.go index c1e77c262bb..0aba9557fc3 100644 --- a/services/sse/pkg/config/defaults/defaultconfig.go +++ b/services/sse/pkg/config/defaults/defaultconfig.go @@ -1,6 +1,8 @@ package defaults import ( + "strings" + "github.com/owncloud/ocis/v2/services/sse/pkg/config" ) @@ -26,17 +28,64 @@ func DefaultConfig() *config.Config { Endpoint: "127.0.0.1:9233", Cluster: "ocis-cluster", }, + HTTP: config.HTTP{ + Addr: "127.0.0.1:0", + Root: "/", + Namespace: "com.owncloud.sse", + CORS: config.CORS{ + AllowedOrigins: []string{"*"}, + AllowedMethods: []string{"GET"}, + AllowedHeaders: []string{"Authorization", "Origin", "Content-Type", "Accept", "X-Requested-With", "X-Request-Id", "Ocs-Apirequest"}, + AllowCredentials: true, + }, + }, } } // EnsureDefaults adds default values to the configuration if they are not set yet func EnsureDefaults(cfg *config.Config) { - if cfg.Log == nil { + // provide with defaults for shared logging, since we need a valid destination address for "envdecode". + if cfg.Log == nil && cfg.Commons != nil && cfg.Commons.Log != nil { + cfg.Log = &config.Log{ + Level: cfg.Commons.Log.Level, + Pretty: cfg.Commons.Log.Pretty, + Color: cfg.Commons.Log.Color, + File: cfg.Commons.Log.File, + } + } else if cfg.Log == nil { cfg.Log = &config.Log{} } + + if cfg.TokenManager == nil && cfg.Commons != nil && cfg.Commons.TokenManager != nil { + cfg.TokenManager = &config.TokenManager{ + JWTSecret: cfg.Commons.TokenManager.JWTSecret, + } + } else if cfg.TokenManager == nil { + cfg.TokenManager = &config.TokenManager{} + } + + if cfg.Commons != nil { + cfg.HTTP.TLS = cfg.Commons.HTTPServiceTLS + } + + // provide with defaults for shared tracing, since we need a valid destination address for "envdecode". + if cfg.Tracing == nil && cfg.Commons != nil && cfg.Commons.Tracing != nil { + cfg.Tracing = &config.Tracing{ + Enabled: cfg.Commons.Tracing.Enabled, + Type: cfg.Commons.Tracing.Type, + Endpoint: cfg.Commons.Tracing.Endpoint, + Collector: cfg.Commons.Tracing.Collector, + } + } else if cfg.Tracing == nil { + cfg.Tracing = &config.Tracing{} + } } // Sanitize sanitizes the configuration func Sanitize(cfg *config.Config) { + // sanitize config + if cfg.HTTP.Root != "/" { + cfg.HTTP.Root = strings.TrimSuffix(cfg.HTTP.Root, "/") + } } diff --git a/services/sse/pkg/config/tracing.go b/services/sse/pkg/config/tracing.go new file mode 100644 index 00000000000..5a177c3f594 --- /dev/null +++ b/services/sse/pkg/config/tracing.go @@ -0,0 +1,21 @@ +package config + +import "github.com/owncloud/ocis/v2/ocis-pkg/tracing" + +// Tracing defines the available tracing configuration. +type Tracing struct { + Enabled bool `yaml:"enabled" env:"OCIS_TRACING_ENABLED;SSE_TRACING_ENABLED" desc:"Activates tracing."` + Type string `yaml:"type" env:"OCIS_TRACING_TYPE;SSE_TRACING_TYPE" desc:"The type of tracing. Defaults to '', which is the same as 'jaeger'. Allowed tracing types are 'jaeger' and '' as of now."` + Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;SSE_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."` + Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;SSE_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."` +} + +// Convert Tracing to the tracing package's Config struct. +func (t Tracing) Convert() tracing.Config { + return tracing.Config{ + Enabled: t.Enabled, + Type: t.Type, + Endpoint: t.Endpoint, + Collector: t.Collector, + } +} diff --git a/services/sse/pkg/server/http/option.go b/services/sse/pkg/server/http/option.go new file mode 100644 index 00000000000..3640ba9ea90 --- /dev/null +++ b/services/sse/pkg/server/http/option.go @@ -0,0 +1,76 @@ +package http + +import ( + "context" + + "github.com/cs3org/reva/v2/pkg/events" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/services/sse/pkg/config" + "go.opentelemetry.io/otel/trace" +) + +// Option defines a single option function. +type Option func(o *Options) + +// Options defines the available options for this package. +type Options struct { + Logger log.Logger + Context context.Context + Config *config.Config + Consumer events.Consumer + RegisteredEvents []events.Unmarshaller + TracerProvider trace.TracerProvider +} + +// newOptions initializes the available default options. +func newOptions(opts ...Option) Options { + opt := Options{} + + for _, o := range opts { + o(&opt) + } + + return opt +} + +// Logger provides a function to set the logger option. +func Logger(val log.Logger) Option { + return func(o *Options) { + o.Logger = val + } +} + +// Context provides a function to set the context option. +func Context(val context.Context) Option { + return func(o *Options) { + o.Context = val + } +} + +// Config provides a function to set the config option. +func Config(val *config.Config) Option { + return func(o *Options) { + o.Config = val + } +} + +// Consumer provides a function to configure the consumer +func Consumer(consumer events.Consumer) Option { + return func(o *Options) { + o.Consumer = consumer + } +} + +// RegisteredEvents provides a function to register events +func RegisteredEvents(evs []events.Unmarshaller) Option { + return func(o *Options) { + o.RegisteredEvents = evs + } +} + +// TracerProvider provides a function to set the TracerProvider option +func TracerProvider(val trace.TracerProvider) Option { + return func(o *Options) { + o.TracerProvider = val + } +} diff --git a/services/sse/pkg/server/http/server.go b/services/sse/pkg/server/http/server.go new file mode 100644 index 00000000000..430ae0db3d3 --- /dev/null +++ b/services/sse/pkg/server/http/server.go @@ -0,0 +1,96 @@ +package http + +import ( + "fmt" + + stdhttp "net/http" + + "github.com/cs3org/reva/v2/pkg/events" + "github.com/go-chi/chi/v5" + chimiddleware "github.com/go-chi/chi/v5/middleware" + "github.com/owncloud/ocis/v2/ocis-pkg/account" + "github.com/owncloud/ocis/v2/ocis-pkg/cors" + "github.com/owncloud/ocis/v2/ocis-pkg/middleware" + "github.com/owncloud/ocis/v2/ocis-pkg/service/http" + "github.com/owncloud/ocis/v2/ocis-pkg/tracing" + "github.com/owncloud/ocis/v2/ocis-pkg/version" + svc "github.com/owncloud/ocis/v2/services/sse/pkg/service" + "github.com/riandyrn/otelchi" + "go-micro.dev/v4" +) + +// Service is the service interface +type Service interface { +} + +// Server initializes the http service and server. +func Server(opts ...Option) (http.Service, error) { + options := newOptions(opts...) + + service, err := http.NewService( + http.TLSConfig(options.Config.HTTP.TLS), + http.Logger(options.Logger), + http.Namespace(options.Config.HTTP.Namespace), + http.Name(options.Config.Service.Name), + http.Version(version.GetString()), + http.Address(options.Config.HTTP.Addr), + http.Context(options.Context), + ) + if err != nil { + options.Logger.Error(). + Err(err). + Msg("Error initializing http service") + return http.Service{}, fmt.Errorf("could not initialize http service: %w", err) + } + + middlewares := []func(stdhttp.Handler) stdhttp.Handler{ + chimiddleware.RequestID, + middleware.Version( + "userlog", + version.GetString(), + ), + middleware.Logger( + options.Logger, + ), + middleware.ExtractAccountUUID( + account.Logger(options.Logger), + account.JWTSecret(options.Config.TokenManager.JWTSecret), + ), + middleware.Cors( + cors.Logger(options.Logger), + cors.AllowedOrigins(options.Config.HTTP.CORS.AllowedOrigins), + cors.AllowedMethods(options.Config.HTTP.CORS.AllowedMethods), + cors.AllowedHeaders(options.Config.HTTP.CORS.AllowedHeaders), + cors.AllowCredentials(options.Config.HTTP.CORS.AllowCredentials), + ), + middleware.Secure, + } + + mux := chi.NewMux() + mux.Use(middlewares...) + + mux.Use( + otelchi.Middleware( + "sse", + otelchi.WithChiRoutes(mux), + otelchi.WithTracerProvider(options.TracerProvider), + otelchi.WithPropagators(tracing.GetPropagator()), + ), + ) + + ch, err := events.Consume(options.Consumer, "sse", options.RegisteredEvents...) + if err != nil { + return http.Service{}, err + } + + handle, err := svc.NewSSE(options.Config, options.Logger, ch, mux) + if err != nil { + return http.Service{}, err + } + + if err := micro.RegisterHandler(service.Server(), handle); err != nil { + return http.Service{}, err + } + + return service, nil +} diff --git a/services/sse/pkg/service/service.go b/services/sse/pkg/service/service.go index 72be9d020a8..e0043f2a867 100644 --- a/services/sse/pkg/service/service.go +++ b/services/sse/pkg/service/service.go @@ -1,71 +1,89 @@ package service import ( - "bytes" - "crypto/x509" - "fmt" - "io" "net/http" - "os" + revactx "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/events" - "github.com/cs3org/reva/v2/pkg/events/stream" - "github.com/cs3org/reva/v2/pkg/rhttp" + "github.com/go-chi/chi/v5" + "github.com/r3labs/sse/v2" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/sse/pkg/config" ) +// SSE defines implements the business logic for Service. +type SSE struct { + c *config.Config + l log.Logger + m *chi.Mux + sse *sse.Server + evChannel <-chan events.Event +} + // NewSSE returns a service implementation for Service. -func NewSSE(c *config.Config, l log.Logger) (SSE, error) { - s := SSE{c: c, l: l, client: rhttp.GetHTTPClient(rhttp.Insecure(true))} +func NewSSE(c *config.Config, l log.Logger, ch <-chan events.Event, mux *chi.Mux) (SSE, error) { + s := SSE{ + c: c, + l: l, + m: mux, + sse: sse.New(), + evChannel: ch, + } + mux.Route("/ocs/v2.php/apps/notifications/api/v1/notifications", func(r chi.Router) { + r.Get("/sse", s.HandleSSE) + }) + + go s.ListenForEvents() return s, nil } -// SSE defines implements the business logic for Service. -type SSE struct { - c *config.Config - l log.Logger - m uint64 - - client *http.Client +// ServeHTTP fulfills Handler interface +func (s SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.m.ServeHTTP(w, r) } -// Run runs the service -func (s SSE) Run() error { - evtsCfg := s.c.Events - - var rootCAPool *x509.CertPool - if evtsCfg.TLSRootCACertificate != "" { - rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) - if err != nil { - return err +// ListenForEvents listens for events +func (s SSE) ListenForEvents() error { + for e := range s.evChannel { + switch ev := e.Event.(type) { + default: + s.l.Error().Interface("event", ev).Msg("unhandled event") + case events.SendSSE: + s.sse.Publish(ev.UserID, &sse.Event{ + Event: []byte(ev.Type), + Data: ev.Message, + }) } + } - var certBytes bytes.Buffer - if _, err := io.Copy(&certBytes, rootCrtFile); err != nil { - return err - } + return nil +} - rootCAPool = x509.NewCertPool() - rootCAPool.AppendCertsFromPEM(certBytes.Bytes()) - evtsCfg.TLSInsecure = false +// HandleSSE is the GET handler for events +func (s SSE) HandleSSE(w http.ResponseWriter, r *http.Request) { + u, ok := revactx.ContextGetUser(r.Context()) + if !ok { + s.l.Error().Msg("sse: no user in context") + w.WriteHeader(http.StatusInternalServerError) + return } - natsStream, err := stream.NatsFromConfig(stream.NatsConfig(s.c.Events)) - if err != nil { - return err + uid := u.GetId().GetOpaqueId() + if uid == "" { + s.l.Error().Msg("sse: user in context is broken") + w.WriteHeader(http.StatusInternalServerError) + return } - ch, err := events.Consume(natsStream, "sse", events.StartPostprocessingStep{}) - if err != nil { - return err - } + stream := s.sse.CreateStream(uid) + stream.AutoReplay = false - for e := range ch { - fmt.Println(e) // todo - } + // add stream to URL + q := r.URL.Query() + q.Set("stream", uid) + r.URL.RawQuery = q.Encode() - return nil + s.sse.ServeHTTP(w, r) } diff --git a/services/userlog/README.md b/services/userlog/README.md index 474fcd07f76..aaaf18f4091 100644 --- a/services/userlog/README.md +++ b/services/userlog/README.md @@ -30,10 +30,6 @@ For the time being, the configuration which user related events are of interest The `userlog` service provides an API to retrieve configured events. For now, this API is mostly following the [oc10 notification GET API](https://doc.owncloud.com/server/next/developer_manual/core/apis/ocs-notification-endpoint-v1.html#get-user-notifications). -## Subscribing - -Additionally to the oc10 API, the `userlog` service also provides an `/sse` (Server-Sent Events) endpoint to be informed by the server when an event happens. See [What is Server-Sent Events](https://medium.com/yemeksepeti-teknoloji/what-is-server-sent-events-sse-and-how-to-implement-it-904938bffd73) for a simple introduction and examples of server sent events. The `sse` endpoint will respect language changes of the user without needing to reconnect. Note that SSE has a limitation of six open connections per browser which can be reached if one has opened various tabs of the Web UI pointing to the same Infinite Scale instance. - ## Posting The userlog service is able to store global messages that will be displayed in the Web UI to all users. If a user deletes the message in the Web UI, it reappears on reload. Global messages use the endpoint `/ocs/v2.php/apps/notifications/api/v1/notifications/global` and are activated by sending a `POST` request. Note that sending another `POST` request of the same type overwrites the previous one. For the time being, only the type `deprovision` is supported. From 1bfdc430548bb653c3a9015116fb4caa4ea9e769 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Fri, 18 Aug 2023 11:05:03 +0200 Subject: [PATCH 3/4] reroute sse path to sse service Signed-off-by: jkoberg --- services/proxy/pkg/config/defaults/defaultconfig.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/services/proxy/pkg/config/defaults/defaultconfig.go b/services/proxy/pkg/config/defaults/defaultconfig.go index e8a93e0de52..4c561e66e5e 100644 --- a/services/proxy/pkg/config/defaults/defaultconfig.go +++ b/services/proxy/pkg/config/defaults/defaultconfig.go @@ -122,6 +122,11 @@ func DefaultPolicies() []config.Policy { Endpoint: "/archiver", Service: "com.owncloud.web.frontend", }, + { + // reroute oc10 notifications endpoint to userlog service + Endpoint: "/ocs/v2.php/apps/notifications/api/v1/notifications/sse", + Service: "com.owncloud.sse.sse", + }, { // reroute oc10 notifications endpoint to userlog service Endpoint: "/ocs/v2.php/apps/notifications/api/v1/notifications", From 91176db30d009af2d3724cc0e99e3a31fde8d2f5 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Fri, 18 Aug 2023 11:20:32 +0200 Subject: [PATCH 4/4] adjust userlog service Signed-off-by: jkoberg --- services/userlog/pkg/command/server.go | 4 +-- services/userlog/pkg/config/config.go | 2 +- services/userlog/pkg/server/http/option.go | 8 +++--- services/userlog/pkg/server/http/server.go | 2 +- services/userlog/pkg/service/http.go | 27 -------------------- services/userlog/pkg/service/options.go | 8 +++--- services/userlog/pkg/service/service.go | 25 ++++++++---------- services/userlog/pkg/service/service_test.go | 14 ++++++---- 8 files changed, 31 insertions(+), 59 deletions(-) diff --git a/services/userlog/pkg/command/server.go b/services/userlog/pkg/command/server.go index 6002bf4f3e5..3ec49220cdb 100644 --- a/services/userlog/pkg/command/server.go +++ b/services/userlog/pkg/command/server.go @@ -81,7 +81,7 @@ func Server(cfg *config.Config) *cli.Command { defer cancel() - consumer, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) + stream, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) if err != nil { return err } @@ -121,7 +121,7 @@ func Server(cfg *config.Config) *cli.Command { http.Config(cfg), http.Metrics(mtrcs), http.Store(st), - http.Consumer(consumer), + http.Stream(stream), http.GatewaySelector(gatewaySelector), http.History(hClient), http.Value(vClient), diff --git a/services/userlog/pkg/config/config.go b/services/userlog/pkg/config/config.go index 11e81ccab65..782fb079172 100644 --- a/services/userlog/pkg/config/config.go +++ b/services/userlog/pkg/config/config.go @@ -28,7 +28,7 @@ type Config struct { Events Events `yaml:"events"` Persistence Persistence `yaml:"persistence"` - DisableSSE bool `yaml:"disable_sse" env:"OCIS_DISABLE_SSE,USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer be able to connect to the sse endpoint."` + DisableSSE bool `yaml:"disable_sse" env:"OCIS_DISABLE_SSE,USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer receive sse notifications."` GlobalNotificationsSecret string `yaml:"global_notifications_secret" env:"USERLOG_GLOBAL_NOTIFICATIONS_SECRET" desc:"The secret to secure the global notifications endpoint. Only system admins and users knowing that secret can call the global notifications POST/DELETE endpoints."` diff --git a/services/userlog/pkg/server/http/option.go b/services/userlog/pkg/server/http/option.go index 02ffe0d6deb..44443d57bd4 100644 --- a/services/userlog/pkg/server/http/option.go +++ b/services/userlog/pkg/server/http/option.go @@ -28,7 +28,7 @@ type Options struct { Flags []cli.Flag Namespace string Store store.Store - Consumer events.Consumer + Stream events.Stream GatewaySelector pool.Selectable[gateway.GatewayAPIClient] HistoryClient ehsvc.EventHistoryService ValueClient settingssvc.ValueService @@ -97,10 +97,10 @@ func Store(store store.Store) Option { } } -// Consumer provides a function to configure the consumer -func Consumer(consumer events.Consumer) Option { +// Stream provides a function to configure the stream +func Stream(stream events.Stream) Option { return func(o *Options) { - o.Consumer = consumer + o.Stream = stream } } diff --git a/services/userlog/pkg/server/http/server.go b/services/userlog/pkg/server/http/server.go index 4adcaec018a..1766f1d566e 100644 --- a/services/userlog/pkg/server/http/server.go +++ b/services/userlog/pkg/server/http/server.go @@ -80,7 +80,7 @@ func Server(opts ...Option) (http.Service, error) { handle, err := svc.NewUserlogService( svc.Logger(options.Logger), - svc.Consumer(options.Consumer), + svc.Stream(options.Stream), svc.Mux(mux), svc.Store(options.Store), svc.Config(options.Config), diff --git a/services/userlog/pkg/service/http.go b/services/userlog/pkg/service/http.go index c689e18eb04..74167e4219f 100644 --- a/services/userlog/pkg/service/http.go +++ b/services/userlog/pkg/service/http.go @@ -91,33 +91,6 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request w.Write(b) } -// HandleSSE is the GET handler for events -func (ul *UserlogService) HandleSSE(w http.ResponseWriter, r *http.Request) { - u, ok := revactx.ContextGetUser(r.Context()) - if !ok { - ul.log.Error().Msg("sse: no user in context") - w.WriteHeader(http.StatusInternalServerError) - return - } - - uid := u.GetId().GetOpaqueId() - if uid == "" { - ul.log.Error().Msg("sse: user in context is broken") - w.WriteHeader(http.StatusInternalServerError) - return - } - - stream := ul.sse.CreateStream(uid) - stream.AutoReplay = false - - // add stream to URL - q := r.URL.Query() - q.Set("stream", uid) - r.URL.RawQuery = q.Encode() - - ul.sse.ServeHTTP(w, r) -} - // HandlePostGlobaelEvent is the POST handler for global events func (ul *UserlogService) HandlePostGlobalEvent(w http.ResponseWriter, r *http.Request) { var req PostEventsRequest diff --git a/services/userlog/pkg/service/options.go b/services/userlog/pkg/service/options.go index a4422926df8..9cd9f97ec01 100644 --- a/services/userlog/pkg/service/options.go +++ b/services/userlog/pkg/service/options.go @@ -19,7 +19,7 @@ type Option func(*Options) // Options for the userlog service type Options struct { Logger log.Logger - Consumer events.Consumer + Stream events.Stream Mux *chi.Mux Store store.Store Config *config.Config @@ -38,10 +38,10 @@ func Logger(log log.Logger) Option { } } -// Consumer configures an event consumer for the userlog service -func Consumer(c events.Consumer) Option { +// Stream configures an event stream for the userlog service +func Stream(s events.Stream) Option { return func(o *Options) { - o.Consumer = c + o.Stream = s } } diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index dd82fae18d7..7ebee777684 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -26,7 +26,6 @@ import ( settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" "github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" - "github.com/r3labs/sse/v2" micrometadata "go-micro.dev/v4/metadata" "go-micro.dev/v4/store" "go.opentelemetry.io/otel/trace" @@ -42,10 +41,10 @@ type UserlogService struct { historyClient ehsvc.EventHistoryService gatewaySelector pool.Selectable[gateway.GatewayAPIClient] valueClient settingssvc.ValueService - sse *sse.Server registeredEvents map[string]events.Unmarshaller tp trace.TracerProvider tracer trace.Tracer + publisher events.Publisher } // NewUserlogService returns an EventHistory service @@ -55,11 +54,11 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) { opt(o) } - if o.Consumer == nil || o.Store == nil { - return nil, fmt.Errorf("need non nil consumer (%v) and store (%v) to work properly", o.Consumer, o.Store) + if o.Stream == nil || o.Store == nil { + return nil, fmt.Errorf("need non nil stream (%v) and store (%v) to work properly", o.Stream, o.Store) } - ch, err := events.Consume(o.Consumer, "userlog", o.RegisteredEvents...) + ch, err := events.Consume(o.Stream, "userlog", o.RegisteredEvents...) if err != nil { return nil, err } @@ -75,10 +74,7 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) { registeredEvents: make(map[string]events.Unmarshaller), tp: o.TraceProvider, tracer: o.TraceProvider.Tracer("github.com/owncloud/ocis/services/userlog/pkg/service"), - } - - if !ul.cfg.DisableSSE { - ul.sse = sse.New() + publisher: o.Stream, } for _, e := range o.RegisteredEvents { @@ -97,10 +93,6 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) { r.Delete("/", ul.HandleDeleteEvents) r.Post("/global", RequireAdminOrSecret(&m, o.Config.GlobalNotificationsSecret)(ul.HandlePostGlobalEvent)) r.Delete("/global", RequireAdminOrSecret(&m, o.Config.GlobalNotificationsSecret)(ul.HandleDeleteGlobalEvent)) - - if !ul.cfg.DisableSSE { - r.Get("/sse", ul.HandleSSE) - } }) go ul.MemorizeEvents(ch) @@ -348,8 +340,11 @@ func (ul *UserlogService) sendSSE(userid string, event events.Event) error { return err } - ul.sse.Publish(userid, &sse.Event{Data: b}) - return nil + return events.Publish(context.Background(), ul.publisher, events.SendSSE{ + UserID: userid, + Type: "userlog-notification", + Message: b, + }) } func (ul *UserlogService) removeExpiredEvents(userid string, all []string, received []*ehmsg.Event) error { diff --git a/services/userlog/pkg/service/service_test.go b/services/userlog/pkg/service/service_test.go index 6c79b5f6b35..cc34f9c0c42 100644 --- a/services/userlog/pkg/service/service_test.go +++ b/services/userlog/pkg/service/service_test.go @@ -79,7 +79,7 @@ var _ = Describe("UserlogService", func() { ul, err = service.NewUserlogService( service.Config(cfg), - service.Consumer(bus), + service.Stream(bus), service.Store(sto), service.Logger(log.NewLogger()), service.Mux(chi.NewMux()), @@ -96,9 +96,9 @@ var _ = Describe("UserlogService", func() { It("it stores, returns and deletes a couple of events", func() { ids := make(map[string]struct{}) - ids[bus.Publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{} - ids[bus.Publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{} - ids[bus.Publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{} + ids[bus.publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{} + ids[bus.publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{} + ids[bus.publish(events.SpaceDisabled{Executant: &user.UserId{OpaqueId: "executinguserid"}})] = struct{}{} // ids[bus.Publish(events.SpaceMembershipExpired{SpaceOwner: &user.UserId{OpaqueId: "userid"}})] = struct{}{} // ids[bus.Publish(events.ShareCreated{Executant: &user.UserId{OpaqueId: "userid"}})] = struct{}{} @@ -156,7 +156,11 @@ func (tb testBus) Consume(_ string, _ ...microevents.ConsumeOption) (<-chan micr return ch, nil } -func (tb testBus) Publish(e interface{}) string { +func (tb testBus) Publish(_ string, _ interface{}, _ ...microevents.PublishOption) error { + return nil +} + +func (tb testBus) publish(e interface{}) string { ev := events.Event{ ID: uuid.New().String(), Type: reflect.TypeOf(e).String(),