Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/simplify pubsub #2554

Merged
merged 13 commits into from
Oct 13, 2023
4 changes: 0 additions & 4 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package main

import (
"context"
"crypto/tls"
"net"
"net/http"
Expand Down Expand Up @@ -264,9 +263,6 @@ func setupEvilGlobals(c *cli.Context, v store.Store, f forge.Forge) {
server.Config.Services.Queue = setupQueue(c, v)
server.Config.Services.Logs = logging.New()
server.Config.Services.Pubsub = pubsub.New()
if err := server.Config.Services.Pubsub.Create(context.Background(), "topic/events"); err != nil {
log.Error().Err(err).Msg("could not create pubsub service")
}
server.Config.Services.Registries = setupRegistryService(c, v)

// TODO(1544): fix encrypted store
Expand Down
7 changes: 2 additions & 5 deletions server/api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func EventStreamSSE(c *gin.Context) {
}()

go func() {
err := server.Config.Services.Pubsub.Subscribe(ctx, "topic/events", func(m pubsub.Message) {
server.Config.Services.Pubsub.Subscribe(ctx, func(m pubsub.Message) {
defer func() {
obj := recover() // fix #2480 // TODO: check if it's still needed
log.Trace().Msgf("pubsub subscribe recover return: %v", obj)
Expand All @@ -95,10 +95,7 @@ func EventStreamSSE(c *gin.Context) {
}
}
})
if err != nil {
log.Error().Err(err).Msg("Subscribe failed")
}
cancel(err)
cancel(nil)
}()

for {
Expand Down
2 changes: 1 addition & 1 deletion server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

var Config = struct {
Services struct {
Pubsub pubsub.Publisher
Pubsub *pubsub.Publisher
Queue queue.Queue
Logs logging.Log
Secrets model.SecretService
Expand Down
20 changes: 7 additions & 13 deletions server/grpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
type RPC struct {
forge forge.Forge
queue queue.Queue
pubsub pubsub.Publisher
pubsub *pubsub.Publisher
logger logging.Log
store store.Store
pipelineTime *prometheus.GaugeVec
Expand Down Expand Up @@ -102,7 +102,7 @@ func (s *RPC) Extend(c context.Context, id string) error {
}

// Update implements the rpc.Update function
func (s *RPC) Update(c context.Context, id string, state rpc.State) error {
func (s *RPC) Update(_ context.Context, id string, state rpc.State) error {
workflowID, err := strconv.ParseInt(id, 10, 64)
if err != nil {
return err
Expand Down Expand Up @@ -150,9 +150,7 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error {
Repo: *repo,
Pipeline: *currentPipeline,
})
if err := s.pubsub.Publish(c, "topic/events", message); err != nil {
log.Error().Err(err).Msg("can not publish step list to")
}
s.pubsub.Publish(message)

return nil
}
Expand Down Expand Up @@ -208,9 +206,7 @@ func (s *RPC) Init(c context.Context, id string, state rpc.State) error {
Repo: *repo,
Pipeline: *currentPipeline,
})
if err := s.pubsub.Publish(c, "topic/events", message); err != nil {
log.Error().Err(err).Msg("can not publish step list to")
}
s.pubsub.Publish(message)
}()

workflow, err = pipeline.UpdateWorkflowToStatusStarted(s.store, *workflow, state)
Expand Down Expand Up @@ -297,7 +293,7 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error {
}
}()

if err := s.notify(c, repo, currentPipeline); err != nil {
if err := s.notify(repo, currentPipeline); err != nil {
return err
}

Expand Down Expand Up @@ -399,7 +395,7 @@ func (s *RPC) updateForgeStatus(ctx context.Context, repo *model.Repo, pipeline
}
}

func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeline) (err error) {
func (s *RPC) notify(repo *model.Repo, pipeline *model.Pipeline) (err error) {
message := pubsub.Message{
Labels: map[string]string{
"repo": repo.FullName,
Expand All @@ -410,9 +406,7 @@ func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeli
Repo: *repo,
Pipeline: *pipeline,
})
if err := s.pubsub.Publish(c, "topic/events", message); err != nil {
log.Error().Err(err).Msgf("grpc could not notify event: '%v'", message)
}
s.pubsub.Publish(message)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion server/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type WoodpeckerServer struct {
peer RPC
}

func NewWoodpeckerServer(forge forge.Forge, queue queue.Queue, logger logging.Log, pubsub pubsub.Publisher, store store.Store) proto.WoodpeckerServer {
func NewWoodpeckerServer(forge forge.Forge, queue queue.Queue, logger logging.Log, pubsub *pubsub.Publisher, store store.Store) proto.WoodpeckerServer {
pipelineTime := promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "pipeline_time",
Expand Down
4 changes: 1 addition & 3 deletions server/pipeline/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ func Cancel(ctx context.Context, store store.Store, repo *model.Repo, user *mode
if killedPipeline.Workflows, err = store.WorkflowGetTree(killedPipeline); err != nil {
return err
}
if err := publishToTopic(ctx, killedPipeline, repo); err != nil {
log.Error().Err(err).Msg("publishToTopic")
}
publishToTopic(killedPipeline, repo)

return nil
}
Expand Down
6 changes: 2 additions & 4 deletions server/pipeline/decline.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/woodpecker-ci/woodpecker/server/store"
)

// Decline update the status to declined for blocked pipeline because of a gated repo
// Decline updates the status to declined for blocked pipelines because of a gated repo
func Decline(ctx context.Context, store store.Store, pipeline *model.Pipeline, user *model.User, repo *model.Repo) (*model.Pipeline, error) {
if pipeline.Status != model.StatusBlocked {
return nil, fmt.Errorf("cannot decline a pipeline with status %s", pipeline.Status)
Expand All @@ -41,9 +41,7 @@ func Decline(ctx context.Context, store store.Store, pipeline *model.Pipeline, u

updatePipelineStatus(ctx, pipeline, repo, user)

if err := publishToTopic(ctx, pipeline, repo); err != nil {
log.Error().Err(err).Msg("publishToTopic")
}
publishToTopic(pipeline, repo)

return pipeline, nil
}
5 changes: 1 addition & 4 deletions server/pipeline/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ func start(ctx context.Context, store store.Store, activePipeline *model.Pipelin
}

func publishPipeline(ctx context.Context, pipeline *model.Pipeline, repo *model.Repo, repoUser *model.User) {
if err := publishToTopic(ctx, pipeline, repo); err != nil {
log.Error().Err(err).Msg("publishToTopic")
}

publishToTopic(pipeline, repo)
updatePipelineStatus(ctx, pipeline, repo, repoUser)
}
5 changes: 2 additions & 3 deletions server/pipeline/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package pipeline

import (
"context"
"encoding/json"
"strconv"

Expand All @@ -25,7 +24,7 @@ import (
)

// publishToTopic publishes message to UI clients
func publishToTopic(c context.Context, pipeline *model.Pipeline, repo *model.Repo) (err error) {
func publishToTopic(pipeline *model.Pipeline, repo *model.Repo) {
message := pubsub.Message{
Labels: map[string]string{
"repo": repo.FullName,
Expand All @@ -38,5 +37,5 @@ func publishToTopic(c context.Context, pipeline *model.Pipeline, repo *model.Rep
Repo: *repo,
Pipeline: pipelineCopy,
})
return server.Config.Services.Pubsub.Publish(c, "topic/events", message)
server.Config.Services.Pubsub.Publish(message)
}
12 changes: 0 additions & 12 deletions server/pubsub/README.md

This file was deleted.

73 changes: 25 additions & 48 deletions server/pubsub/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,71 +19,48 @@ import (
"sync"
)

type subscriber struct {
receiver Receiver
// Message defines a published message.
type Message struct {
// ID identifies this message.
ID string `json:"id,omitempty"`

// Data is the actual data in the entry.
Data []byte `json:"data"`

// Labels represents the key-value pairs the entry is labeled with.
Labels map[string]string `json:"labels,omitempty"`
}

type publisher struct {
// Receiver receives published messages.
type Receiver func(Message)

type Publisher struct {
sync.Mutex

topics map[string]*topic
subs map[*Receiver]struct{}
qwerty287 marked this conversation as resolved.
Show resolved Hide resolved
}

// New creates an in-memory publisher.
func New() Publisher {
return &publisher{
topics: make(map[string]*topic),
func New() *Publisher {
return &Publisher{
subs: make(map[*Receiver]struct{}),
}
}

func (p *publisher) Create(_ context.Context, dest string) error {
func (p *Publisher) Publish(message Message) {
p.Lock()
_, ok := p.topics[dest]
if !ok {
t := newTopic(dest)
p.topics[dest] = t
for s := range p.subs {
go (*s)(message)
}
p.Unlock()
return nil
}

func (p *publisher) Publish(_ context.Context, dest string, message Message) error {
p.Lock()
t, ok := p.topics[dest]
p.Unlock()
if !ok {
return ErrNotFound
}
t.publish(message)
return nil
}

func (p *publisher) Subscribe(c context.Context, dest string, receiver Receiver) error {
func (p *Publisher) Subscribe(c context.Context, receiver Receiver) {
p.Lock()
t, ok := p.topics[dest]
p.subs[&receiver] = struct{}{}
p.Unlock()
if !ok {
return ErrNotFound
}
s := &subscriber{
receiver: receiver,
}
t.subscribe(s)
select {
case <-c.Done():
case <-t.done:
}
t.unsubscribe(s)
return nil
}

func (p *publisher) Remove(_ context.Context, dest string) error {
<-c.Done()
p.Lock()
t, ok := p.topics[dest]
if ok {
delete(p.topics, dest)
t.close()
}
delete(p.subs, &receiver)
p.Unlock()
return nil
}
Loading