Skip to content

Commit

Permalink
OTel tracing
Browse files Browse the repository at this point in the history
- init OTel TracerProvider
- added spans to general flow and db requests
- update astreter/amqpwrapper to v2.0.2 for propagation trace via AMQP
  • Loading branch information
astreter committed Feb 11, 2022
1 parent 2074c5e commit bf8a37f
Show file tree
Hide file tree
Showing 17 changed files with 489 additions and 233 deletions.
4 changes: 3 additions & 1 deletion cmd/httphandler.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package cmd

import (
"github.com/astreter/amqpwrapper"
"github.com/astreter/amqpwrapper/v2"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"
"x-qdo/jiraclick/pkg/contract"

"x-qdo/jiraclick/pkg/config"
Expand Down Expand Up @@ -42,6 +43,7 @@ func NewHTTPHandlerCmd(
"message": "ok",
})
})
router.Use(otelgin.Middleware(config.ServiceName))

router.POST("webhooks/clickup", clickUpHandler.TaskEvent)

Expand Down
2 changes: 1 addition & 1 deletion cmd/worker.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package cmd

import (
"github.com/astreter/amqpwrapper"
"github.com/astreter/amqpwrapper/v2"
"github.com/spf13/cobra"

"x-qdo/jiraclick/pkg/consumer"
Expand Down
21 changes: 17 additions & 4 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package context

import (
"context"
"github.com/astreter/amqpwrapper"
"github.com/astreter/amqpwrapper/v2"
"github.com/x-qdo/otelwrapper"
"sync"
"x-qdo/jiraclick/pkg/contract"

Expand Down Expand Up @@ -41,14 +42,26 @@ func NewContext() (*Context, error) {

ctx.WaitGroup = new(sync.WaitGroup)

amqpProvider, err := amqpwrapper.NewRabbitChannel(ctx.Ctx, ctx.CancelF, ctx.WaitGroup, &amqpwrapper.Config{
URL: cfg.RabbitMQ.URL,
Debug: cfg.Debug,
tp, err := otelwrapper.InitTracerProvider(config.ServiceName, "default")
if err != nil {
return nil, err
}
go otelwrapper.ShutdownWaiting(tp, ctx.Ctx, ctx.WaitGroup)

amqpProvider, err := amqpwrapper.NewRabbitChannel(ctx.Ctx, ctx.WaitGroup, &amqpwrapper.Config{
URL: cfg.RabbitMQ.URL,
Debug: cfg.Debug,
ConfirmSends: true,
})
if err != nil {
panic(err)
}

go func() {
<-amqpProvider.Cancel()
ctx.CancelF()
}()

db, err := provider.NewPostgres(cfg)
if err != nil {
panic(err)
Expand Down
37 changes: 23 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,37 @@ module x-qdo/jiraclick
go 1.16

require (
github.com/andygrunwald/go-jira v1.13.0
github.com/andygrunwald/go-jira v1.14.0
github.com/araddon/dateparse v0.0.0-20201001162425-8aadafed4dc4
github.com/astreter/amqpwrapper v1.2.3
github.com/astreter/amqpwrapper/v2 v2.0.2
github.com/gin-gonic/gin v1.7.7
github.com/go-pg/migrations/v8 v8.0.1 // indirect
github.com/go-pg/pg/v10 v10.7.4
github.com/go-playground/validator/v10 v10.9.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-pg/migrations/v8 v8.1.0
github.com/go-pg/pg/extra/pgotel/v10 v10.10.6
github.com/go-pg/pg/v10 v10.10.6
github.com/go-playground/validator/v10 v10.10.0 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/pkg/errors v0.8.1
github.com/rabbitmq/amqp091-go v1.2.0
github.com/pkg/errors v0.9.1
github.com/rabbitmq/amqp091-go v1.3.0
github.com/sirupsen/logrus v1.8.1
github.com/spf13/cobra v1.0.0
github.com/spf13/viper v1.4.0
github.com/trivago/tgo v1.0.1
github.com/trivago/tgo v1.0.7
github.com/ugorji/go v1.2.6 // indirect
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/protobuf v1.27.1 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
github.com/x-qdo/otelwrapper v1.0.1
go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.28.0
go.opentelemetry.io/otel v1.3.0
go.opentelemetry.io/otel/trace v1.3.0
go.opentelemetry.io/proto/otlp v0.12.0 // indirect
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a // indirect
google.golang.org/genproto v0.0.0-20220204002441-d6cc3cc0770e // indirect
google.golang.org/grpc v1.44.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
254 changes: 143 additions & 111 deletions go.sum

Large diffs are not rendered by default.

44 changes: 31 additions & 13 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@ import (
"github.com/spf13/viper"
)

const ServiceName = "jiraclick"

var envBindings = []string{
"debug",
"rabbitmq.url",
"httphandler.port",
"metrics.port",
"postgres.url",
"postgres.insecure",
"otel.exporter.endpoint",
}

type Config struct {
Debug bool `yaml:"debug"`
RabbitMQ struct {
Expand All @@ -24,22 +36,18 @@ type Config struct {
URL string `yaml:"url"`
Insecure bool `yaml:"insecure"`
} `yaml:"postgres"`
Metrics struct {
Port string `yaml:"port"`
} `yaml:"metrics"`
OTel struct {
Exporter struct {
Endpoint string `yaml:"endpoint"`
} `yaml:"exporter"`
} `yaml:"otel"`
}

func NewConfig() (*Config, error) {
if err := viper.BindEnv("debug"); err != nil {
return nil, err
}
if err := viper.BindEnv("rabbitmq.url"); err != nil {
return nil, err
}
if err := viper.BindEnv("httphandler.port"); err != nil {
return nil, err
}
if err := viper.BindEnv("postgres.url"); err != nil {
return nil, err
}
if err := viper.BindEnv("postgres.insecure"); err != nil {
if err := bindEnvs(envBindings); err != nil {
return nil, err
}
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
Expand All @@ -51,3 +59,13 @@ func NewConfig() (*Config, error) {

return c, nil
}

func bindEnvs(envKeys []string) error {
for _, envKey := range envKeys {
if err := viper.BindEnv(envKey); err != nil {
return fmt.Errorf("error binding '%s': %w", envKey, err)
}
}

return nil
}
2 changes: 1 addition & 1 deletion pkg/consumer/actions.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package consumer

import (
"github.com/astreter/amqpwrapper"
"github.com/astreter/amqpwrapper/v2"
"x-qdo/jiraclick/pkg/contract"
"x-qdo/jiraclick/pkg/provider/clickup"
"x-qdo/jiraclick/pkg/provider/jira"
Expand Down
38 changes: 29 additions & 9 deletions pkg/consumer/task-create-clickup.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package consumer

import (
"context"
"encoding/json"
"github.com/araddon/dateparse"
"go.opentelemetry.io/otel"

"github.com/pkg/errors"
amqp "github.com/rabbitmq/amqp091-go"
Expand All @@ -25,45 +27,63 @@ func NewTaskCreateClickupAction(clickup *clickup.ConnectorPool, p *publisher.Eve
}, nil
}

func (a *TaskCreateClickupAction) ProcessAction(delivery amqp.Delivery) error {
func (a *TaskCreateClickupAction) ProcessAction(ctx context.Context, delivery amqp.Delivery) error {
var (
input inputBody
task *clickup.Task
payload model.TaskPayload
)

ctx, span := otel.Tracer("clickup action").Start(ctx, "ProcessAction")
defer span.End()

err := json.Unmarshal(delivery.Body, &input)
if err != nil {
return errors.Wrap(err, "Can't unmarshall task body")
err = errors.Wrap(err, "Can't unmarshall task body")
span.RecordError(err)
return err
}

err = json.Unmarshal([]byte(input.Data.Payload), &payload)
if err != nil {
return errors.Wrap(err, "Can't unmarshall task body")
err = errors.Wrap(err, "Can't unmarshall task body")
span.RecordError(err)
return err
}

request := a.generateTaskRequest(&payload)
task, err = a.client.GetInstance(payload.SlackChannel).CreateTask(request)
request := a.generateTaskRequest(ctx, &payload)
span.AddEvent("Request payload generated")
task, err = a.client.GetInstance(payload.SlackChannel).CreateTask(ctx, request)
if err != nil {
return errors.Wrap(err, "Can't create a task in ClickUp")
err = errors.Wrap(err, "Can't create a task in ClickUp")
span.RecordError(err)
return err
}

span.AddEvent("task created")

payload.ClickupID = task.ID
payload.Details["clickup_url"] = task.URL
err = a.publisher.ClickUpTaskCreated(payload)
err = a.publisher.ClickUpTaskCreated(ctx, payload)
if err != nil {
span.RecordError(err)
return err
}

span.AddEvent("result sent to BRP")

return nil
}

func (a *TaskCreateClickupAction) generateTaskRequest(payload *model.TaskPayload) *clickup.PutClickUpTaskRequest {
func (a *TaskCreateClickupAction) generateTaskRequest(ctx context.Context, payload *model.TaskPayload) *clickup.PutClickUpTaskRequest {
request := new(clickup.PutClickUpTaskRequest)

ctx, span := otel.Tracer("clickup action").Start(ctx, "generateTaskRequest")
defer span.End()

request.Name = payload.Title
request.NotifyAll = false
request.Status = a.client.GetInstance(payload.SlackChannel).GetInitialTaskStatus()
request.Status = a.client.GetInstance(payload.SlackChannel).GetInitialTaskStatus(ctx)
request.Description = payload.Description + "\n" + payload.AC
request.AddCustomField(clickup.RequestedBy, payload.SlackReporter)
request.AddCustomField(clickup.SlackLink, payload.Details["slack"])
Expand Down
30 changes: 23 additions & 7 deletions pkg/consumer/task-create-jira.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package consumer

import (
"context"
"encoding/json"

"github.com/pkg/errors"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/trivago/tgo/tcontainer"
"go.opentelemetry.io/otel"

"x-qdo/jiraclick/pkg/contract"
"x-qdo/jiraclick/pkg/model"
Expand All @@ -25,36 +26,51 @@ func NewTaskCreateJiraAction(jira *jira.ConnectorPool, p *publisher.EventPublish
}, nil
}

func (a *TaskCreateJiraAction) ProcessAction(delivery amqp.Delivery) error {
func (a *TaskCreateJiraAction) ProcessAction(ctx context.Context, delivery amqp.Delivery) error {
var (
input inputBody
payload model.TaskPayload
)

ctx, span := otel.Tracer("jira action").Start(ctx, "ProcessAction")
defer span.End()

err := json.Unmarshal(delivery.Body, &input)
if err != nil {
return errors.Wrap(err, "Can't unmarshall task body")
err = errors.Wrap(err, "Can't unmarshall task body")
span.RecordError(err)
return err
}

err = json.Unmarshal([]byte(input.Data.Payload), &payload)
if err != nil {
return errors.Wrap(err, "Can't unmarshall task body")
err = errors.Wrap(err, "Can't unmarshall task body")
span.RecordError(err)
return err
}

task := a.generateTaskRequest(payload)
span.AddEvent("Request payload generated")

response, err := a.client.GetInstance(payload.SlackChannel).CreateIssue(task)
response, err := a.client.GetInstance(payload.SlackChannel).CreateIssue(ctx, task)
if err != nil {
return errors.Wrap(err, "Can't create task in Jira")
err = errors.Wrap(err, "Can't create task in Jira")
span.RecordError(err)
return err
}

span.AddEvent("issue created")

payload.JiraID = response.ID
payload.Details["jira_url"] = response.URL
err = a.publisher.JiraTaskCreated(payload)
err = a.publisher.JiraTaskCreated(ctx, payload)
if err != nil {
span.RecordError(err)
return err
}

span.AddEvent("result sent to BRP")

return nil
}

Expand Down
Loading

0 comments on commit bf8a37f

Please sign in to comment.