Skip to content

Commit

Permalink
feat(wire): increase wire adoption and add billing worker (#1958)
Browse files Browse the repository at this point in the history
Co-authored-by: Peter Marton <peter@openmeter.io>
  • Loading branch information
turip and hekike authored Dec 15, 2024
1 parent 478560f commit 36bd2f8
Show file tree
Hide file tree
Showing 53 changed files with 2,386 additions and 1,122 deletions.
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ build-balance-worker: ## Build balance-worker binary
$(call print-target)
go build -o build/balance-worker ./cmd/balance-worker

.PHONY: build-billing-worker
build-billing-worker: ## Build billing-worker binary
$(call print-target)
go build -o build/billing-worker ./cmd/billing-worker

.PHONY: build-notification-service
build-notification-service: ## Build notification-service binary
$(call print-target)
Expand All @@ -86,6 +91,12 @@ balance-worker: ## Run balance-worker
$(call print-target)
air -c ./cmd/balance-worker/.air.toml

.PHONY: billing-worker
billing-worker: ## Run billing-worker
@ if [ config.yaml -ot config.example.yaml ]; then diff -u config.yaml config.example.yaml || (echo "!!! The configuration example changed. Please update your config.yaml file accordingly (or at least touch it). !!!" && false); fi
$(call print-target)
air -c ./cmd/billing-worker/.air.toml

.PHONY: notification-service
notification-service: ## Run notification-service
@ if [ config.yaml -ot config.example.yaml ]; then diff -u config.yaml config.example.yaml || (echo "!!! The configuration example changed. Please update your config.yaml file accordingly (or at least touch it). !!!" && false); fi
Expand Down
107 changes: 81 additions & 26 deletions app/common/app.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,99 @@
package common

import (
"errors"
"context"
"fmt"
"log/slog"
"net/http"

"github.com/oklog/run"
"github.com/google/wire"

"github.com/openmeterio/openmeter/app/config"
"github.com/openmeterio/openmeter/openmeter/app"
appadapter "github.com/openmeterio/openmeter/openmeter/app/adapter"
appsandbox "github.com/openmeterio/openmeter/openmeter/app/sandbox"
appservice "github.com/openmeterio/openmeter/openmeter/app/service"
appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe"
appstripeadapter "github.com/openmeterio/openmeter/openmeter/app/stripe/adapter"
appstripeservice "github.com/openmeterio/openmeter/openmeter/app/stripe/service"
"github.com/openmeterio/openmeter/openmeter/customer"
entdb "github.com/openmeterio/openmeter/openmeter/ent/db"
"github.com/openmeterio/openmeter/openmeter/namespace"
"github.com/openmeterio/openmeter/openmeter/secret"
)

// Metadata provides information about the service to components that need it (eg. telemetry).
type Metadata struct {
ServiceName string
Version string
Environment string
OpenTelemetryName string
}
var App = wire.NewSet(
NewAppService,
NewAppStripeService,
NewAppSandboxProvisioner,
)

type AppSandboxProvisioner func() error

func NewAppService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration) (app.Service, error) {
// TODO: remove this check after enabled by default
if !appsConfig.Enabled || db == nil {
return nil, nil
}

func NewMetadata(conf config.Configuration, version string, serviceName string) Metadata {
return Metadata{
ServiceName: fmt.Sprintf("openmeter-%s", serviceName),
Version: version,
Environment: conf.Environment,
OpenTelemetryName: fmt.Sprintf("openmeter.io/%s", serviceName),
appAdapter, err := appadapter.New(appadapter.Config{
Client: db,
BaseURL: appsConfig.BaseURL,
})
if err != nil {
return nil, fmt.Errorf("failed to create app adapter: %w", err)
}

return appservice.New(appservice.Config{
Adapter: appAdapter,
})
}

// Runner is a helper struct that runs a group of services.
type Runner struct {
Group run.Group
Logger *slog.Logger
func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, appService app.Service, customerService customer.Service, secretService secret.Service) (appstripe.Service, error) {
// TODO: remove this check after enabled by default
if !appsConfig.Enabled || db == nil {
return nil, nil
}

appStripeAdapter, err := appstripeadapter.New(appstripeadapter.Config{
Client: db,
AppService: appService,
CustomerService: customerService,
SecretService: secretService,
})
if err != nil {
return nil, fmt.Errorf("failed to create appstripe adapter: %w", err)
}

return appstripeservice.New(appstripeservice.Config{
Adapter: appStripeAdapter,
AppService: appService,
SecretService: secretService,
})
}

func (r Runner) Run() {
err := r.Group.Run()
if e := (run.SignalError{}); errors.As(err, &e) {
r.Logger.Info("received signal: shutting down", slog.String("signal", e.Signal.String()))
} else if !errors.Is(err, http.ErrServerClosed) {
r.Logger.Error("application stopped due to error", slog.String("error", err.Error()))
func NewAppSandboxProvisioner(ctx context.Context, logger *slog.Logger, appsConfig config.AppsConfiguration, appService app.Service, namespaceManager *namespace.Manager) (AppSandboxProvisioner, error) {
if !appsConfig.Enabled {
return nil, nil
}

_, err := appsandbox.NewFactory(appsandbox.Config{
AppService: appService,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize app sandbox factory: %w", err)
}

return func() error {
app, err := appsandbox.AutoProvision(ctx, appsandbox.AutoProvisionInput{
Namespace: namespaceManager.GetDefaultNamespace(),
AppService: appService,
})
if err != nil {
return fmt.Errorf("failed to auto-provision sandbox app: %w", err)
}

logger.Info("sandbox app auto-provisioned", "app_id", app.GetID().ID)

return nil
}, nil
}
63 changes: 63 additions & 0 deletions app/common/billing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package common

import (
"fmt"
"log/slog"

"github.com/google/wire"

"github.com/openmeterio/openmeter/app/config"
"github.com/openmeterio/openmeter/openmeter/app"
appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe"
"github.com/openmeterio/openmeter/openmeter/billing"
billingadapter "github.com/openmeterio/openmeter/openmeter/billing/adapter"
billingservice "github.com/openmeterio/openmeter/openmeter/billing/service"
"github.com/openmeterio/openmeter/openmeter/customer"
entdb "github.com/openmeterio/openmeter/openmeter/ent/db"
"github.com/openmeterio/openmeter/openmeter/meter"
"github.com/openmeterio/openmeter/openmeter/productcatalog/feature"
"github.com/openmeterio/openmeter/openmeter/streaming"
)

var Billing = wire.NewSet(
BillingService,
)

func BillingService(
logger *slog.Logger,
db *entdb.Client,
appService app.Service,
appStripeService appstripe.Service,
billingConfig config.BillingConfiguration,
customerService customer.Service,
featureConnector feature.FeatureConnector,
meterRepo meter.Repository,
streamingConnector streaming.Connector,
) (billing.Service, error) {
// TODO: remove this check after enabled by default
if db == nil {
return nil, nil
}

if !billingConfig.Enabled {
return nil, nil
}

adapter, err := billingadapter.New(billingadapter.Config{
Client: db,
Logger: logger,
})
if err != nil {
return nil, fmt.Errorf("creating billing adapter: %w", err)
}

return billingservice.New(billingservice.Config{
Adapter: adapter,
AppService: appService,
CustomerService: customerService,
FeatureService: featureConnector,
Logger: logger,
MeterRepo: meterRepo,
StreamingConnector: streamingConnector,
})
}
8 changes: 6 additions & 2 deletions app/common/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ import (
"fmt"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/google/wire"

"github.com/openmeterio/openmeter/app/config"
)

var ClickHouse = wire.NewSet(
NewClickHouse,
)

// TODO: add closer function?
func NewClickHouse(conf config.ClickHouseAggregationConfiguration) (driver.Conn, error) {
func NewClickHouse(conf config.ClickHouseAggregationConfiguration) (clickhouse.Conn, error) {
conn, err := clickhouse.Open(conf.GetClientOptions())
if err != nil {
return nil, fmt.Errorf("failed to initialize clickhouse client: %w", err)
Expand Down
44 changes: 44 additions & 0 deletions app/common/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package common

import (
"github.com/google/wire"

"github.com/openmeterio/openmeter/app/config"
)

// We have configs separatly to be able to reuse wires in other projects
var Config = wire.NewSet(
// App
wire.FieldsOf(new(config.Configuration), "Apps"),
// Aggregation
wire.FieldsOf(new(config.Configuration), "Aggregation"),
// Billing
wire.FieldsOf(new(config.Configuration), "Billing"),
// ClickHouse
wire.FieldsOf(new(config.AggregationConfiguration), "ClickHouse"),
// Database
wire.FieldsOf(new(config.Configuration), "Postgres"),
// Entitlement
wire.FieldsOf(new(config.Configuration), "Entitlements"),
// Events
wire.FieldsOf(new(config.Configuration), "Events"),
// Kafka
// TODO: refactor to move out Kafka config from ingest and consolidate
wire.FieldsOf(new(config.KafkaIngestConfiguration), "KafkaConfiguration"),
wire.FieldsOf(new(config.Configuration), "Ingest"),
wire.FieldsOf(new(config.IngestConfiguration), "Kafka"),
wire.FieldsOf(new(config.KafkaIngestConfiguration), "TopicProvisionerConfig"),
// Namespace
wire.FieldsOf(new(config.Configuration), "Namespace"),
// Notification
wire.FieldsOf(new(config.Configuration), "Notification"),
// ProductCatalog
wire.FieldsOf(new(config.Configuration), "ProductCatalog"),
// Svix
wire.FieldsOf(new(config.Configuration), "Svix"),
// Telemetry
wire.FieldsOf(new(config.Configuration), "Telemetry"),
wire.FieldsOf(new(config.TelemetryConfig), "Metrics"),
wire.FieldsOf(new(config.TelemetryConfig), "Trace"),
wire.FieldsOf(new(config.TelemetryConfig), "Log"),
)
36 changes: 36 additions & 0 deletions app/common/customer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package common

import (
"fmt"
"log/slog"

"github.com/google/wire"

"github.com/openmeterio/openmeter/openmeter/customer"
customeradapter "github.com/openmeterio/openmeter/openmeter/customer/adapter"
customerservice "github.com/openmeterio/openmeter/openmeter/customer/service"
entdb "github.com/openmeterio/openmeter/openmeter/ent/db"
)

var Customer = wire.NewSet(
NewCustomerService,
)

func NewCustomerService(logger *slog.Logger, db *entdb.Client) (customer.Service, error) {
// TODO: remove this check after enabled by default
if db == nil {
return nil, nil
}

customerAdapter, err := customeradapter.New(customeradapter.Config{
Client: db,
Logger: logger.WithGroup("customer.postgres"),
})
if err != nil {
return nil, fmt.Errorf("failed to create customer adapter: %w", err)
}

return customerservice.New(customerservice.Config{
Adapter: customerAdapter,
})
}
10 changes: 10 additions & 0 deletions app/common/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"

"github.com/google/wire"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"

Expand All @@ -16,6 +17,15 @@ import (
"github.com/openmeterio/openmeter/tools/migrate"
)

var Database = wire.NewSet(
wire.Struct(new(Migrator), "*"),

NewPostgresDriver,
NewDB,
NewEntPostgresDriver,
NewEntClient,
)

// Migrator executes database migrations.
type Migrator struct {
Config config.PostgresConfig
Expand Down
41 changes: 41 additions & 0 deletions app/common/entitlement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package common

import (
"log/slog"

"github.com/google/wire"

"github.com/openmeterio/openmeter/app/config"
entdb "github.com/openmeterio/openmeter/openmeter/ent/db"
"github.com/openmeterio/openmeter/openmeter/meter"
"github.com/openmeterio/openmeter/openmeter/registry"
registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder"
"github.com/openmeterio/openmeter/openmeter/streaming"
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
)

var Entitlement = wire.NewSet(
NewEntitlementRegistry,
)

func NewEntitlementRegistry(
logger *slog.Logger,
db *entdb.Client,
entitlementConfig config.EntitlementsConfiguration,
streamingConnector streaming.Connector,
meterRepository meter.Repository,
eventPublisher eventbus.Publisher,
) *registry.Entitlement {
// TODO: remove this check after enabled by default
if db == nil {
return nil
}

return registrybuilder.GetEntitlementRegistry(registrybuilder.EntitlementOptions{
DatabaseClient: db,
StreamingConnector: streamingConnector,
MeterRepository: meterRepository,
Logger: logger,
Publisher: eventPublisher,
})
}
8 changes: 8 additions & 0 deletions app/common/framework.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package common

import "github.com/google/wire"

var Framework = wire.NewSet(
wire.Struct(new(GlobalInitializer), "*"),
wire.Struct(new(Runner), "*"),
)
Loading

0 comments on commit 36bd2f8

Please sign in to comment.