diff --git a/cmd/server/server.go b/cmd/server/server.go index 1ce7439aee7..7b8d8634f67 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -15,7 +15,6 @@ package main import ( - "context" "crypto/tls" "net" "net/http" @@ -264,9 +263,6 @@ func setupEvilGlobals(c *cli.Context, v store.Store, f forge.Forge) { server.Config.Services.Queue = setupQueue(c, v) server.Config.Services.Logs = logging.New() server.Config.Services.Pubsub = pubsub.New() - if err := server.Config.Services.Pubsub.Create(context.Background(), "topic/events"); err != nil { - log.Error().Err(err).Msg("could not create pubsub service") - } server.Config.Services.Registries = setupRegistryService(c, v) // TODO(1544): fix encrypted store diff --git a/server/api/stream.go b/server/api/stream.go index a4914860c7d..8eb004d9b88 100644 --- a/server/api/stream.go +++ b/server/api/stream.go @@ -79,7 +79,7 @@ func EventStreamSSE(c *gin.Context) { }() go func() { - err := server.Config.Services.Pubsub.Subscribe(ctx, "topic/events", func(m pubsub.Message) { + server.Config.Services.Pubsub.Subscribe(ctx, func(m pubsub.Message) { defer func() { obj := recover() // fix #2480 // TODO: check if it's still needed log.Trace().Msgf("pubsub subscribe recover return: %v", obj) @@ -95,10 +95,7 @@ func EventStreamSSE(c *gin.Context) { } } }) - if err != nil { - log.Error().Err(err).Msg("Subscribe failed") - } - cancel(err) + cancel(nil) }() for { diff --git a/server/config.go b/server/config.go index 59a1b73e44c..d69b2d5ef43 100644 --- a/server/config.go +++ b/server/config.go @@ -32,7 +32,7 @@ import ( var Config = struct { Services struct { - Pubsub pubsub.Publisher + Pubsub *pubsub.Publisher Queue queue.Queue Logs logging.Log Secrets model.SecretService diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index 36d7072165d..7cbdd9a82a7 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -43,7 +43,7 @@ import ( type RPC struct { forge forge.Forge queue queue.Queue - pubsub pubsub.Publisher + pubsub *pubsub.Publisher logger logging.Log store store.Store pipelineTime *prometheus.GaugeVec @@ -102,7 +102,7 @@ func (s *RPC) Extend(c context.Context, id string) error { } // Update implements the rpc.Update function -func (s *RPC) Update(c context.Context, id string, state rpc.State) error { +func (s *RPC) Update(_ context.Context, id string, state rpc.State) error { workflowID, err := strconv.ParseInt(id, 10, 64) if err != nil { return err @@ -150,9 +150,7 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error { Repo: *repo, Pipeline: *currentPipeline, }) - if err := s.pubsub.Publish(c, "topic/events", message); err != nil { - log.Error().Err(err).Msg("can not publish step list to") - } + s.pubsub.Publish(message) return nil } @@ -208,9 +206,7 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error { Repo: *repo, Pipeline: *currentPipeline, }) - if err := s.pubsub.Publish(c, "topic/events", message); err != nil { - log.Error().Err(err).Msg("can not publish step list to") - } + s.pubsub.Publish(message) }() workflow, err = pipeline.UpdateWorkflowToStatusStarted(s.store, *workflow, state) @@ -297,7 +293,7 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error { } }() - if err := s.notify(c, repo, currentPipeline); err != nil { + if err := s.notify(repo, currentPipeline); err != nil { return err } @@ -399,7 +395,7 @@ func (s *RPC) updateForgeStatus(ctx context.Context, repo *model.Repo, pipeline } } -func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeline) (err error) { +func (s *RPC) notify(repo *model.Repo, pipeline *model.Pipeline) (err error) { message := pubsub.Message{ Labels: map[string]string{ "repo": repo.FullName, @@ -410,9 +406,7 @@ func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeli Repo: *repo, Pipeline: *pipeline, }) - if err := s.pubsub.Publish(c, "topic/events", message); err != nil { - log.Error().Err(err).Msgf("grpc could not notify event: '%v'", message) - } + s.pubsub.Publish(message) return nil } diff --git a/server/grpc/server.go b/server/grpc/server.go index 353c6c86a4e..44e73277c1d 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -37,7 +37,7 @@ type WoodpeckerServer struct { peer RPC } -func NewWoodpeckerServer(forge forge.Forge, queue queue.Queue, logger logging.Log, pubsub pubsub.Publisher, store store.Store) proto.WoodpeckerServer { +func NewWoodpeckerServer(forge forge.Forge, queue queue.Queue, logger logging.Log, pubsub *pubsub.Publisher, store store.Store) proto.WoodpeckerServer { pipelineTime := promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "woodpecker", Name: "pipeline_time", diff --git a/server/pipeline/cancel.go b/server/pipeline/cancel.go index bd45933b119..49ada76a47e 100644 --- a/server/pipeline/cancel.go +++ b/server/pipeline/cancel.go @@ -93,9 +93,7 @@ func Cancel(ctx context.Context, store store.Store, repo *model.Repo, user *mode if killedPipeline.Workflows, err = store.WorkflowGetTree(killedPipeline); err != nil { return err } - if err := publishToTopic(ctx, killedPipeline, repo); err != nil { - log.Error().Err(err).Msg("publishToTopic") - } + publishToTopic(killedPipeline, repo) return nil } diff --git a/server/pipeline/decline.go b/server/pipeline/decline.go index 71a48dfd67f..79838b46c76 100644 --- a/server/pipeline/decline.go +++ b/server/pipeline/decline.go @@ -24,7 +24,7 @@ import ( "github.com/woodpecker-ci/woodpecker/server/store" ) -// Decline update the status to declined for blocked pipeline because of a gated repo +// Decline updates the status to declined for blocked pipelines because of a gated repo func Decline(ctx context.Context, store store.Store, pipeline *model.Pipeline, user *model.User, repo *model.Repo) (*model.Pipeline, error) { if pipeline.Status != model.StatusBlocked { return nil, fmt.Errorf("cannot decline a pipeline with status %s", pipeline.Status) @@ -41,9 +41,7 @@ func Decline(ctx context.Context, store store.Store, pipeline *model.Pipeline, u updatePipelineStatus(ctx, pipeline, repo, user) - if err := publishToTopic(ctx, pipeline, repo); err != nil { - log.Error().Err(err).Msg("publishToTopic") - } + publishToTopic(pipeline, repo) return pipeline, nil } diff --git a/server/pipeline/start.go b/server/pipeline/start.go index 60503e55049..c6601e28fb8 100644 --- a/server/pipeline/start.go +++ b/server/pipeline/start.go @@ -61,9 +61,6 @@ func start(ctx context.Context, store store.Store, activePipeline *model.Pipelin } func publishPipeline(ctx context.Context, pipeline *model.Pipeline, repo *model.Repo, repoUser *model.User) { - if err := publishToTopic(ctx, pipeline, repo); err != nil { - log.Error().Err(err).Msg("publishToTopic") - } - + publishToTopic(pipeline, repo) updatePipelineStatus(ctx, pipeline, repo, repoUser) } diff --git a/server/pipeline/topic.go b/server/pipeline/topic.go index 02bf3ea5d44..9ec2ee9013b 100644 --- a/server/pipeline/topic.go +++ b/server/pipeline/topic.go @@ -15,7 +15,6 @@ package pipeline import ( - "context" "encoding/json" "strconv" @@ -25,7 +24,7 @@ import ( ) // publishToTopic publishes message to UI clients -func publishToTopic(c context.Context, pipeline *model.Pipeline, repo *model.Repo) (err error) { +func publishToTopic(pipeline *model.Pipeline, repo *model.Repo) { message := pubsub.Message{ Labels: map[string]string{ "repo": repo.FullName, @@ -38,5 +37,5 @@ func publishToTopic(c context.Context, pipeline *model.Pipeline, repo *model.Rep Repo: *repo, Pipeline: pipelineCopy, }) - return server.Config.Services.Pubsub.Publish(c, "topic/events", message) + server.Config.Services.Pubsub.Publish(message) } diff --git a/server/pubsub/README.md b/server/pubsub/README.md deleted file mode 100644 index 42c6c87a588..00000000000 --- a/server/pubsub/README.md +++ /dev/null @@ -1,12 +0,0 @@ -# pubusb package - -Go package provides a common interface for publish-subscriber messaging. - -## History - -This was originally published in: https://github.com/cncd/pubsub -Then it was included in: https://github.com/drone-ci/drone/cncd/pubsub - -## Documentation: - -https://godoc.org/github.com/woodpecker-ci/woodpecker/server/pubsub diff --git a/server/pubsub/pub.go b/server/pubsub/pub.go index 8d64599e098..73d2114c936 100644 --- a/server/pubsub/pub.go +++ b/server/pubsub/pub.go @@ -19,71 +19,48 @@ import ( "sync" ) -type subscriber struct { - receiver Receiver +// Message defines a published message. +type Message struct { + // ID identifies this message. + ID string `json:"id,omitempty"` + + // Data is the actual data in the entry. + Data []byte `json:"data"` + + // Labels represents the key-value pairs the entry is labeled with. + Labels map[string]string `json:"labels,omitempty"` } -type publisher struct { +// Receiver receives published messages. +type Receiver func(Message) + +type Publisher struct { sync.Mutex - topics map[string]*topic + subs map[*Receiver]struct{} } // New creates an in-memory publisher. -func New() Publisher { - return &publisher{ - topics: make(map[string]*topic), +func New() *Publisher { + return &Publisher{ + subs: make(map[*Receiver]struct{}), } } -func (p *publisher) Create(_ context.Context, dest string) error { +func (p *Publisher) Publish(message Message) { p.Lock() - _, ok := p.topics[dest] - if !ok { - t := newTopic(dest) - p.topics[dest] = t + for s := range p.subs { + go (*s)(message) } p.Unlock() - return nil -} - -func (p *publisher) Publish(_ context.Context, dest string, message Message) error { - p.Lock() - t, ok := p.topics[dest] - p.Unlock() - if !ok { - return ErrNotFound - } - t.publish(message) - return nil } -func (p *publisher) Subscribe(c context.Context, dest string, receiver Receiver) error { +func (p *Publisher) Subscribe(c context.Context, receiver Receiver) { p.Lock() - t, ok := p.topics[dest] + p.subs[&receiver] = struct{}{} p.Unlock() - if !ok { - return ErrNotFound - } - s := &subscriber{ - receiver: receiver, - } - t.subscribe(s) - select { - case <-c.Done(): - case <-t.done: - } - t.unsubscribe(s) - return nil -} - -func (p *publisher) Remove(_ context.Context, dest string) error { + <-c.Done() p.Lock() - t, ok := p.topics[dest] - if ok { - delete(p.topics, dest) - t.close() - } + delete(p.subs, &receiver) p.Unlock() - return nil } diff --git a/server/pubsub/pub_test.go b/server/pubsub/pub_test.go index 48c91d485fd..3a0d515580d 100644 --- a/server/pubsub/pub_test.go +++ b/server/pubsub/pub_test.go @@ -16,7 +16,6 @@ package pubsub import ( "context" - "errors" "sync" "testing" "time" @@ -28,7 +27,6 @@ func TestPubsub(t *testing.T) { var ( wg sync.WaitGroup - testTopic = "test" testMessage = Message{ Data: []byte("test"), } @@ -39,81 +37,20 @@ func TestPubsub(t *testing.T) { ) broker := New() - assert.NoError(t, broker.Create(ctx, testTopic)) go func() { - assert.NoError(t, broker.Subscribe(ctx, testTopic, func(message Message) { wg.Done() })) + broker.Subscribe(ctx, func(message Message) { assert.Equal(t, testMessage, message); wg.Done() }) }() go func() { - assert.NoError(t, broker.Subscribe(ctx, testTopic, func(message Message) { wg.Done() })) + broker.Subscribe(ctx, func(message Message) { wg.Done() }) }() <-time.After(500 * time.Millisecond) - if _, ok := broker.(*publisher).topics[testTopic]; !ok { - t.Errorf("Expect topic registered with publisher") - } - wg.Add(2) go func() { - assert.NoError(t, broker.Publish(ctx, testTopic, testMessage)) + broker.Publish(testMessage) }() wg.Wait() cancel(nil) } - -func TestPublishNotFound(t *testing.T) { - var ( - testTopic = "test" - testMessage = Message{ - Data: []byte("test"), - } - ) - broker := New() - err := broker.Publish(context.Background(), testTopic, testMessage) - if !errors.Is(err, ErrNotFound) { - t.Errorf("Expect Not Found error when topic does not exist") - } -} - -func TestSubscribeNotFound(t *testing.T) { - var ( - testTopic = "test" - testCallback = func(message Message) {} - ) - broker := New() - err := broker.Subscribe(context.Background(), testTopic, testCallback) - if !errors.Is(err, ErrNotFound) { - t.Errorf("Expect Not Found error when topic does not exist") - } -} - -func TestSubscriptionClosed(t *testing.T) { - var ( - wg sync.WaitGroup - - testTopic = "test" - testCallback = func(Message) {} - ) - - broker := New() - assert.NoError(t, broker.Create(context.Background(), testTopic)) - go func() { - assert.NoError(t, broker.Subscribe(context.Background(), testTopic, testCallback)) - wg.Done() - }() - - <-time.After(500 * time.Millisecond) - - if _, ok := broker.(*publisher).topics[testTopic]; !ok { - t.Errorf("Expect topic registered with publisher") - } - - wg.Add(1) - assert.NoError(t, broker.Remove(context.Background(), testTopic)) - wg.Wait() - - if _, ok := broker.(*publisher).topics[testTopic]; ok { - t.Errorf("Expect topic removed from publisher") - } -} diff --git a/server/pubsub/pubsub.go b/server/pubsub/pubsub.go deleted file mode 100644 index fcfc99f01cd..00000000000 --- a/server/pubsub/pubsub.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2023 Woodpecker Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package pubsub implements a publish-subscriber messaging system. -package pubsub - -import ( - "context" - "errors" -) - -// ErrNotFound is returned when the named topic does not exist. -var ErrNotFound = errors.New("topic not found") - -// Message defines a published message. -type Message struct { - // ID identifies this message. - ID string `json:"id,omitempty"` - - // Data is the actual data in the entry. - Data []byte `json:"data"` - - // Labels represents the key-value pairs the entry is labeled with. - Labels map[string]string `json:"labels,omitempty"` -} - -// Receiver receives published messages. -type Receiver func(Message) - -// Publisher defines a mechanism for communicating messages from a group -// of senders, called publishers, to a group of consumers. -type Publisher interface { - // Create creates the named topic. - Create(c context.Context, topic string) error - - // Publish publishes the message. - Publish(c context.Context, topic string, message Message) error - - // Subscribe subscribes to the topic. The Receiver function is a callback - // function that receives published messages. - Subscribe(c context.Context, topic string, receiver Receiver) error - - // Remove removes the named topic. - Remove(c context.Context, topic string) error -} diff --git a/server/pubsub/topic.go b/server/pubsub/topic.go deleted file mode 100644 index 298b39f2093..00000000000 --- a/server/pubsub/topic.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright 2023 Woodpecker Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pubsub - -import "sync" - -type topic struct { - sync.Mutex - - name string - done chan struct{} - subs map[*subscriber]struct{} -} - -func newTopic(dest string) *topic { - return &topic{ - name: dest, - done: make(chan struct{}), - subs: make(map[*subscriber]struct{}), - } -} - -func (t *topic) subscribe(s *subscriber) { - t.Lock() - t.subs[s] = struct{}{} - t.Unlock() -} - -func (t *topic) unsubscribe(s *subscriber) { - t.Lock() - delete(t.subs, s) - t.Unlock() -} - -func (t *topic) publish(m Message) { - t.Lock() - for s := range t.subs { - go s.receiver(m) - } - t.Unlock() -} - -func (t *topic) close() { - t.Lock() - close(t.done) - t.Unlock() -} diff --git a/server/pubsub/topic_test.go b/server/pubsub/topic_test.go deleted file mode 100644 index e70e0a26d1f..00000000000 --- a/server/pubsub/topic_test.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2023 Woodpecker Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pubsub - -import ( - "testing" - "time" -) - -func TestTopicSubscribe(t *testing.T) { - sub := new(subscriber) - top := newTopic("foo") - top.subscribe(sub) - if _, ok := top.subs[sub]; !ok { - t.Errorf("Expect subscription registered with topic on subscribe") - } -} - -func TestTopicUnsubscribe(t *testing.T) { - sub := new(subscriber) - top := newTopic("foo") - top.subscribe(sub) - if _, ok := top.subs[sub]; !ok { - t.Errorf("Expect subscription registered with topic on subscribe") - } - top.unsubscribe(sub) - if _, ok := top.subs[sub]; ok { - t.Errorf("Expect subscription de-registered with topic on unsubscribe") - } -} - -func TestTopicClose(t *testing.T) { - sub := new(subscriber) - top := newTopic("foo") - top.subscribe(sub) - go func() { - top.close() - }() - select { - case <-top.done: - case <-time.After(1 * time.Second): - t.Errorf("Expect subscription closed") - } -}