Skip to content

Commit

Permalink
feat: example application and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelsmejkal committed Oct 14, 2024
1 parent fca160e commit cefb8e3
Show file tree
Hide file tree
Showing 27 changed files with 692 additions and 38 deletions.
33 changes: 23 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ Also when doing graceful shutdown the tasks needs to be closed in reversed order
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

signaler := plumber.NewErrorSignaler()
signal := plumber.NewSignal()

err := plumber.Start(ctx,
// Serial pipeline. Task are started sequentially and closed in reverse order.
Expand All @@ -105,8 +105,7 @@ err := plumber.Start(ctx,
fmt.Println("pipeline is closing")
return nil
}),
plumber.GracefulRunner(func(ctx context.Context, ready plumber.ReadyFunc) error {
ready()
plumber.GracefulRunner(func(ctx context.Context) error {
fmt.Println("Task 1 starting")
<-ctx.Done()
return nil
Expand All @@ -116,12 +115,28 @@ err := plumber.Start(ctx,
}),
// The parallel pipeline all task are stared and closed in parallel.
plumber.Parallel(
plumber.SimpleRunner(func(ctx context.Context) error {
fmt.Println("Task 2 starting")
<-ctx.Done()
// Runner that implements Runner, Readier, Closeable
plumber.NewRunner(
func(ctx context.Context) error {
go func() {
time.Sleep(1 * time.Second)
fmt.Println("Task 2 is ready")
signal.Notify()
}()
fmt.Println("Task 2 starting")
<-ctx.Done()
return nil
},
plumber.WithClose(func(ctx context.Context) error {
fmt.Println("Task 2 closing")
return nil
}),
plumber.WithReady(signal),
)
plumber.NewRunner(func(ctx context.Context) error {
return nil
}),
plumber.SimpleRunner(func(ctx context.Context) error {
plumber.NewRunner(func(ctx context.Context) error {
fmt.Println("Task 3 starting")
<-ctx.Done()
return nil
Expand All @@ -148,7 +163,7 @@ err := plumber.Start(ctx,
// Dependency graph based runner
&a.D4,
&a.HTTP.Server,
).With(plumber.Signaler(signaler)),
),
// The pipeline needs to finish startup phase within 30 seconds. If not, run context is canceled. Close is initiated.
plumber.Readiness(30*time.Second),
// The pipeline needs to gracefully close with 120 seconds. If not, internal run and close contexts are canceled.
Expand All @@ -157,7 +172,5 @@ err := plumber.Start(ctx,
plumber.TTL(120*time.Second),
// When given signals will be received pipeline will be closed gracefully.
plumber.SignalCloser(),
// When some tasks covered with signaler reports and error pipeline will be closed.
plumber.Closing(signaler),
)
```
2 changes: 1 addition & 1 deletion container.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (c *Container) CleanupError(fn func() error) {
c.cleanup = append(c.cleanup, fn)
}

// Close calls all clenaup functions
// Close calls all cleanup functions
func (c *Container) Close() error {
errs := []error{}
for _, cleanup := range c.cleanup {
Expand Down
31 changes: 31 additions & 0 deletions example/adapter/async/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: async infra for example application

// Package async provides async infra for example application
package async

import (
"context"
"fmt"

"github.com/getoutreach/plumber"
"github.com/getoutreach/plumber/example/contract"
)

// Publisher service
type Publisher struct {
*plumber.BaseLooper
broker string
}

func NewPublisher(broker string) *Publisher {
return &Publisher{
broker: broker,
BaseLooper: contract.NewWorker("async.Publisher"),
}
}

func (p *Publisher) Publish(ctx context.Context, e *contract.Entity) error {
fmt.Printf("Publishing entity #%v name=%s\n", e.ID, e.Name)
return nil
}
64 changes: 64 additions & 0 deletions example/adapter/database/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: database infra for example application

// Package database provides database infra for example application
package database

import (
"context"
"fmt"
"sync/atomic"

"github.com/getoutreach/plumber"
"github.com/getoutreach/plumber/example/contract"
)

// EntityMask provides a mask to format an entity
const EntityMask = "Entity %v"

// Repository represents a plain database repository
type Repository struct {
id int64
}

func NewRepository() (*Repository, error) {
return &Repository{}, nil
}

func (s *Repository) Get(ctx context.Context, id int64) (*contract.Entity, error) {
return &contract.Entity{
ID: id,
Name: fmt.Sprintf(EntityMask, id),
}, nil
}

func (s *Repository) Create(ctx context.Context, name string) (*contract.Entity, error) {
nextID := atomic.AddInt64(&s.id, 1)
return &contract.Entity{
ID: nextID,
Name: fmt.Sprintf(EntityMask, nextID),
}, nil
}

// BatchingRepository represents a database repository that can batch single entity reads into batch query
type BatchingRepository struct {
*plumber.BaseLooper
inner contract.Repository
}

func NewBatchingRepository(inner contract.Repository, batchSize int) (*BatchingRepository, error) {
r := &BatchingRepository{
inner: inner,
BaseLooper: contract.NewWorker("database.BatchingRepository"),
}
return r, nil
}

func (s *BatchingRepository) Get(ctx context.Context, id int64) (*contract.Entity, error) {
// Here suppose to be a logic that batches requests into a single batch query
return s.inner.Get(ctx, id)
}

func (s *BatchingRepository) Create(ctx context.Context, name string) (*contract.Entity, error) {
return s.inner.Create(ctx, name)
}
33 changes: 33 additions & 0 deletions example/adapter/graphql/graphql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: graphql infra for example application

// Package graphql provides graphql infra for example application
package graphql

import (
"github.com/getoutreach/plumber"
"github.com/getoutreach/plumber/example/contract"
"github.com/getoutreach/plumber/example/service"
)

// Server represents a graphql server
type Server struct {
*plumber.BaseLooper
port int32
querier *service.QueryService
mutator contract.MutatorService
}

// NewServer returns intance of the *Server
func NewServer(
port int32,
querier *service.QueryService,
mutator contract.MutatorService,
) (*Server, error) {
return &Server{
port: port,
querier: querier,
mutator: mutator,
BaseLooper: contract.NewWorker("graphql.Server"),
}, nil
}
33 changes: 33 additions & 0 deletions example/adapter/grpc/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: grpc infra for example application

// Package grpc provides grpc infra for example application
package grpc

import (
"github.com/getoutreach/plumber"
"github.com/getoutreach/plumber/example/contract"
"github.com/getoutreach/plumber/example/service"
)

// Server represents a grpc server
type Server struct {
*plumber.BaseLooper
port int32
querier *service.QueryService
mutator contract.MutatorService
}

// NewServer returns intance of the *Server
func NewServer(
port int32,
querier *service.QueryService,
mutator contract.MutatorService,
) (*Server, error) {
return &Server{
port: port,
querier: querier,
mutator: mutator,
BaseLooper: contract.NewWorker("grpc.Server"),
}, nil
}
41 changes: 41 additions & 0 deletions example/application.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: application root dependency container
package example

import (
"context"

"github.com/getoutreach/plumber"
)

// Config represents a application configuration structure
type Config struct {
AsyncBroker string
}

// Definer allows to redefine container on startup
type Definer = func(ctx context.Context, cf *Config, a *Container)

// Container represents root application dependency container
type Container struct {
plumber.Container
Async *Async
Database *Database
GraphQL *GraphQL
GRPC *GRPC
Service *Service
}

// NewApplication returns instance of the root dependency container
func NewApplication(ctx context.Context, cf *Config, definers ...Definer) *Container {
a := &Container{
GRPC: new(GRPC),
Database: new(Database),
GraphQL: new(GraphQL),
Service: new(Service),
Async: new(Async),
}
return plumber.DefineContainers(ctx, cf, definers, a,
a.Async, a.Database, a.GRPC, a.GraphQL, a.Service,
)
}
22 changes: 22 additions & 0 deletions example/application_async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: async related dependencies
package example

import (
"context"

"github.com/getoutreach/plumber"
"github.com/getoutreach/plumber/example/adapter/async"
)

// Async service represents async processing related dependency container
type Async struct {
Publisher plumber.R[*async.Publisher]
}

// Define resolves dependencies
func (c *Async) Define(ctx context.Context, cf *Config, a *Container) {
c.Publisher.Resolve(func(rr *plumber.ResolutionR[*async.Publisher]) {
rr.Resolve(async.NewPublisher(cf.AsyncBroker))
})
}
30 changes: 30 additions & 0 deletions example/application_database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: database related dependencies
package example

import (
"context"

"github.com/getoutreach/plumber"
"github.com/getoutreach/plumber/example/adapter/database"
"github.com/getoutreach/plumber/example/contract"
)

// Database represents database related dependency container
type Database struct {
Repository plumber.D[contract.Repository]
BatchingRepository plumber.R[*database.BatchingRepository]
}

// Define resolves dependencies
func (c *Database) Define(ctx context.Context, cf *Config, a *Container) {
c.Repository.DefineError(func() (contract.Repository, error) {
return database.NewRepository()
})

c.BatchingRepository.Resolve(func(r *plumber.ResolutionR[*database.BatchingRepository]) {
r.Require(&c.Repository).Then(func() {
r.ResolveError(database.NewBatchingRepository(c.Repository.Instance(), 100))
})
})
}
15 changes: 15 additions & 0 deletions example/application_env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: env specific overrides
package example

import "context"

// WithTestEnvironment redefines application dependency graph for test environment
func WithTestEnvironment(ctx context.Context, cf *Config, a *Container) {
a.GRPC.Port.Const(1001)
}

// WithIntegrationEnvironment redefines application graph for integration environment
func WithIntegrationEnvironment(ctx context.Context, cf *Config, a *Container) {
a.GRPC.Port.Const(1000)
}
34 changes: 34 additions & 0 deletions example/application_graphql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: graphql related dependencies
package example

import (
"context"

"github.com/getoutreach/plumber"
"github.com/getoutreach/plumber/example/adapter/graphql"
)

// GraphQL represents graphql related dependency container
type GraphQL struct {
Port plumber.D[int32]
Server plumber.R[*graphql.Server]
}

// Define resolves dependencies
func (c *GraphQL) Define(ctx context.Context, cf *Config, a *Container) {
c.Port.Const(5000)

c.Server.Resolve(func(r *plumber.ResolutionR[*graphql.Server]) {
r.Require(
&a.Service.Querier,
&a.Service.Mutator,
).Then(func() {
r.ResolveError(graphql.NewServer(
c.Port.Instance(),
a.Service.Querier.Instance(),
a.Service.Mutator.Instance(),
))
})
})
}
Loading

0 comments on commit cefb8e3

Please sign in to comment.