Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[usage] Implement ledger control loop #12662

Merged
merged 1 commit into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 98 additions & 3 deletions components/usage/pkg/apiv1/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/google/uuid"
"math"
"time"

Expand All @@ -26,7 +27,9 @@ import (
var _ v1.UsageServiceServer = (*UsageService)(nil)

type UsageService struct {
conn *gorm.DB
conn *gorm.DB
nowFunc func() time.Time
pricer *WorkspacePricer

contentService contentservice.Interface

Expand Down Expand Up @@ -193,19 +196,24 @@ func (s *UsageService) ReconcileUsageWithLedger(ctx context.Context, req *v1.Rec
return nil, status.Errorf(codes.InvalidArgument, "To must not be before From")
}

now := s.nowFunc()

var instances []db.WorkspaceInstanceForUsage
stopped, err := db.FindStoppedWorkspaceInstancesInRange(ctx, s.conn, from, to)
if err != nil {
logger.WithError(err).Errorf("Failed to find stopped workspace instances.")
return nil, status.Errorf(codes.Internal, "failed to query for stopped instances")
}
logger.Infof("Found %d stopped workspace instances in range.", len(stopped))
instances = append(instances, stopped...)

running, err := db.FindRunningWorkspaceInstances(ctx, s.conn)
if err != nil {
logger.WithError(err).Errorf("Failed to find running workspace instances.")
return nil, status.Errorf(codes.Internal, "failed to query for running instances")
}
logger.Infof("Found %d running workspaces since the beginning of time.", len(running))
instances = append(instances, running...)

usageDrafts, err := db.FindAllDraftUsage(ctx, s.conn)
if err != nil {
Expand All @@ -214,12 +222,99 @@ func (s *UsageService) ReconcileUsageWithLedger(ctx context.Context, req *v1.Rec
}
logger.Infof("Found %d draft usage records.", len(usageDrafts))

instancesWithUsageInDraft, err := db.FindWorkspaceInstancesByIds(ctx, s.conn, collectWorkspaceInstanceIDs(usageDrafts))
if err != nil {
logger.WithError(err).Errorf("Failed to find workspace instances for usage records in draft.")
return nil, status.Errorf(codes.Internal, "failed to find workspace instances for usage records in draft state")
}
logger.Infof("Found %d workspaces instances for usage records in draft.", len(instancesWithUsageInDraft))
instances = append(instances, instancesWithUsageInDraft...)

inserts, updates := reconcileUsageWithLedger(instances, usageDrafts, s.pricer, now)
logger.WithField("inserts", inserts).WithField("updates", updates).Infof("Identified %d inserts and %d updates against usage records.", len(inserts), len(updates))

return &v1.ReconcileUsageWithLedgerResponse{}, nil
}

func NewUsageService(conn *gorm.DB, reportGenerator *ReportGenerator, contentSvc contentservice.Interface) *UsageService {
func reconcileUsageWithLedger(instances []db.WorkspaceInstanceForUsage, drafts []db.Usage, pricer *WorkspacePricer, now time.Time) (inserts []db.Usage, updates []db.Usage) {

instancesByID := dedupeWorkspaceInstancesForUsage(instances)

draftsByWorkspaceID := map[uuid.UUID]db.Usage{}
for _, draft := range drafts {
draftsByWorkspaceID[draft.WorkspaceInstanceID] = draft
}

for instanceID, instance := range instancesByID {
if usage, exists := draftsByWorkspaceID[instanceID]; exists {
updates = append(updates, updateUsageFromInstance(instance, usage, pricer, now))
continue
}

inserts = append(inserts, newUsageFromInstance(instance, pricer, now))
}

return inserts, updates
}

const usageDescriptionFromController = "Usage collected by automated system."

func newUsageFromInstance(instance db.WorkspaceInstanceForUsage, pricer *WorkspacePricer, now time.Time) db.Usage {
draft := true
if instance.StoppingTime.IsSet() {
draft = false
}

effectiveTime := now
if instance.StoppingTime.IsSet() {
effectiveTime = instance.StoppingTime.Time()
}

return db.Usage{
ID: uuid.New(),
AttributionID: instance.UsageAttributionID,
Description: usageDescriptionFromController,
CreditCents: db.NewCreditCents(pricer.CreditsUsedByInstance(&instance, now)),
EffectiveTime: db.NewVarcharTime(effectiveTime),
Kind: db.WorkspaceInstanceUsageKind,
WorkspaceInstanceID: instance.ID,
Draft: draft,
Metadata: nil,
}
}

func updateUsageFromInstance(instance db.WorkspaceInstanceForUsage, usage db.Usage, pricer *WorkspacePricer, now time.Time) db.Usage {
// We construct a new record to ensure we always take the data from the source of truth - the workspace instance
updated := newUsageFromInstance(instance, pricer, now)
// but we override the ID to the one we already have
updated.ID = usage.ID

return updated
}

func collectWorkspaceInstanceIDs(usage []db.Usage) []uuid.UUID {
var ids []uuid.UUID
for _, u := range usage {
ids = append(ids, u.WorkspaceInstanceID)
}
return ids
}

func dedupeWorkspaceInstancesForUsage(instances []db.WorkspaceInstanceForUsage) map[uuid.UUID]db.WorkspaceInstanceForUsage {
set := map[uuid.UUID]db.WorkspaceInstanceForUsage{}
for _, instance := range instances {
set[instance.ID] = instance
}
return set
}

func NewUsageService(conn *gorm.DB, reportGenerator *ReportGenerator, contentSvc contentservice.Interface, pricer *WorkspacePricer) *UsageService {
return &UsageService{
conn: conn,
conn: conn,
nowFunc: func() time.Time {
return time.Now().UTC()
},
pricer: pricer,
reportGenerator: reportGenerator,
contentService: contentSvc,
}
Expand Down
108 changes: 105 additions & 3 deletions components/usage/pkg/apiv1/usage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestUsageService_ListBilledUsage(t *testing.T) {
)

generator := NewReportGenerator(dbconn, DefaultWorkspacePricer)
v1.RegisterUsageServiceServer(srv.GRPC(), NewUsageService(dbconn, generator, nil))
v1.RegisterUsageServiceServer(srv.GRPC(), NewUsageService(dbconn, generator, nil, DefaultWorkspacePricer))
baseserver.StartServerForTests(t, srv)

conn, err := grpc.Dial(srv.GRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestUsageService_ListBilledUsage_Pagination(t *testing.T) {
)

generator := NewReportGenerator(dbconn, DefaultWorkspacePricer)
v1.RegisterUsageServiceServer(srv.GRPC(), NewUsageService(dbconn, generator, nil))
v1.RegisterUsageServiceServer(srv.GRPC(), NewUsageService(dbconn, generator, nil, DefaultWorkspacePricer))
baseserver.StartServerForTests(t, srv)

conn, err := grpc.Dial(srv.GRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down Expand Up @@ -580,7 +580,7 @@ func TestUsageService_ReconcileUsageWithLedger(t *testing.T) {
baseserver.WithGRPC(baseserver.MustUseRandomLocalAddress(t)),
)

v1.RegisterUsageServiceServer(srv.GRPC(), NewUsageService(dbconn, nil, nil))
v1.RegisterUsageServiceServer(srv.GRPC(), NewUsageService(dbconn, nil, nil, DefaultWorkspacePricer))
baseserver.StartServerForTests(t, srv)

conn, err := grpc.Dial(srv.GRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand All @@ -594,3 +594,105 @@ func TestUsageService_ReconcileUsageWithLedger(t *testing.T) {
})
require.NoError(t, err)
}

func TestReconcileWithLedger(t *testing.T) {
now := time.Date(2022, 9, 1, 10, 0, 0, 0, time.UTC)
pricer, err := NewWorkspacePricer(map[string]float64{
"default": 0.1666666667,
"g1-standard": 0.1666666667,
"g1-standard-pvc": 0.1666666667,
"g1-large": 0.3333333333,
"g1-large-pvc": 0.3333333333,
"gitpodio-internal-xl": 0.3333333333,
})
require.NoError(t, err)

t.Run("no action with no instances and no drafts", func(t *testing.T) {
inserts, updates := reconcileUsageWithLedger(nil, nil, pricer, now)
require.Len(t, inserts, 0)
require.Len(t, updates, 0)
})

t.Run("no action with no instances but existing drafts", func(t *testing.T) {
drafts := []db.Usage{dbtest.NewUsage(t, db.Usage{})}
inserts, updates := reconcileUsageWithLedger(nil, drafts, pricer, now)
require.Len(t, inserts, 0)
require.Len(t, updates, 0)
})

t.Run("creates a new usage record when no draft exists, removing duplicates", func(t *testing.T) {
instance := db.WorkspaceInstanceForUsage{
ID: uuid.New(),
WorkspaceID: dbtest.GenerateWorkspaceID(),
OwnerID: uuid.New(),
ProjectID: sql.NullString{
String: "my-project",
Valid: true,
},
WorkspaceClass: db.WorkspaceClass_Default,
Type: db.WorkspaceType_Regular,
UsageAttributionID: db.NewTeamAttributionID(uuid.New().String()),
CreationTime: db.NewVarcharTime(now.Add(1 * time.Minute)),
}

inserts, updates := reconcileUsageWithLedger([]db.WorkspaceInstanceForUsage{instance, instance}, nil, pricer, now)
require.Len(t, inserts, 1)
require.Len(t, updates, 0)
require.Equal(t, db.Usage{
ID: inserts[0].ID,
AttributionID: instance.UsageAttributionID,
Description: usageDescriptionFromController,
CreditCents: db.NewCreditCents(pricer.CreditsUsedByInstance(&instance, now)),
EffectiveTime: db.NewVarcharTime(now),
Kind: db.WorkspaceInstanceUsageKind,
WorkspaceInstanceID: instance.ID,
Draft: true,
Metadata: nil,
}, inserts[0])
})

t.Run("updates a usage record when a draft exists", func(t *testing.T) {
instance := db.WorkspaceInstanceForUsage{
ID: uuid.New(),
WorkspaceID: dbtest.GenerateWorkspaceID(),
OwnerID: uuid.New(),
ProjectID: sql.NullString{
String: "my-project",
Valid: true,
},
WorkspaceClass: db.WorkspaceClass_Default,
Type: db.WorkspaceType_Regular,
UsageAttributionID: db.NewTeamAttributionID(uuid.New().String()),
CreationTime: db.NewVarcharTime(now.Add(1 * time.Minute)),
}

// the fields in the usage record deliberately do not match the instance, except for the Instance ID.
// we do this to test that the fields in the usage records get updated to reflect the true values from the source of truth - instances.
draft := dbtest.NewUsage(t, db.Usage{
ID: uuid.New(),
AttributionID: db.NewUserAttributionID(uuid.New().String()),
Description: "Some description",
CreditCents: 1,
EffectiveTime: db.VarcharTime{},
Kind: db.WorkspaceInstanceUsageKind,
WorkspaceInstanceID: instance.ID,
Draft: true,
Metadata: nil,
})

inserts, updates := reconcileUsageWithLedger([]db.WorkspaceInstanceForUsage{instance}, []db.Usage{draft}, pricer, now)
require.Len(t, inserts, 0)
require.Len(t, updates, 1)
require.Equal(t, db.Usage{
ID: draft.ID,
AttributionID: instance.UsageAttributionID,
Description: usageDescriptionFromController,
CreditCents: db.NewCreditCents(pricer.CreditsUsedByInstance(&instance, now)),
EffectiveTime: db.NewVarcharTime(now),
Kind: db.WorkspaceInstanceUsageKind,
WorkspaceInstanceID: instance.ID,
Draft: true,
Metadata: nil,
}, updates[0])
})
}
14 changes: 13 additions & 1 deletion components/usage/pkg/db/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package db
import (
"context"
"fmt"
"math"

"github.com/google/uuid"
"gorm.io/datatypes"
Expand All @@ -21,11 +22,22 @@ const (
InvoiceUsageKind = "invoice"
)

func NewCreditCents(n float64) CreditCents {
inCents := n * 100
return CreditCents(int64(math.Round(inCents)))
}

type CreditCents int64

func (cc CreditCents) ToCredits() float64 {
return float64(cc) / 100
}

type Usage struct {
ID uuid.UUID `gorm:"primary_key;column:id;type:char;size:36;" json:"id"`
AttributionID AttributionID `gorm:"column:attributionId;type:varchar;size:255;" json:"attributionId"`
Description string `gorm:"column:description;type:varchar;size:255;" json:"description"`
CreditCents int64 `gorm:"column:creditCents;type:bigint;" json:"creditCents"`
CreditCents CreditCents `gorm:"column:creditCents;type:bigint;" json:"creditCents"`
EffectiveTime VarcharTime `gorm:"column:effectiveTime;type:varchar;size:255;" json:"effectiveTime"`
Kind UsageKind `gorm:"column:kind;type:char;size:10;" json:"kind"`
WorkspaceInstanceID uuid.UUID `gorm:"column:workspaceInstanceId;type:char;size:36;" json:"workspaceInstanceId"`
Expand Down
58 changes: 58 additions & 0 deletions components/usage/pkg/db/usage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,61 @@ func TestFindAllDraftUsage(t *testing.T) {
require.True(t, usage.Draft)
}
}

func TestCreditCents(t *testing.T) {
for _, s := range []struct {
value float64
expected db.CreditCents
expectedAsFloat float64
}{
{
value: 0,
expected: 0,
expectedAsFloat: 0,
},
{
value: 0.1,
expected: 10,
expectedAsFloat: 0.1,
},
{
value: 1.1111,
expected: 111,
expectedAsFloat: 1.11,
},
{
value: 1.4999,
expected: 150,
expectedAsFloat: 1.50,
},
{
value: 1.500,
expected: 150,
expectedAsFloat: 1.50,
},
{
value: 1.501,
expected: 150,
expectedAsFloat: 1.50,
},
{
value: 1.50999,
expected: 151,
expectedAsFloat: 1.51,
},
{
value: 1.9999,
expected: 200,
expectedAsFloat: 2.00,
},
{
value: -1.9999,
expected: -200,
expectedAsFloat: -2.00,
},
} {
cc := db.NewCreditCents(s.value)
require.Equal(t, s.expected, cc)
require.Equal(t, s.expectedAsFloat, cc.ToCredits())
}
}
6 changes: 3 additions & 3 deletions components/usage/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func Start(cfg Config) error {

reportGenerator := apiv1.NewReportGenerator(conn, pricer)

err = registerGRPCServices(srv, conn, stripeClient, reportGenerator, contentService, *cfg.BillInstancesAfter)
err = registerGRPCServices(srv, conn, stripeClient, reportGenerator, contentService, pricer, *cfg.BillInstancesAfter)
if err != nil {
return fmt.Errorf("failed to register gRPC services: %w", err)
}
Expand All @@ -180,8 +180,8 @@ func Start(cfg Config) error {
return nil
}

func registerGRPCServices(srv *baseserver.Server, conn *gorm.DB, stripeClient *stripe.Client, reportGenerator *apiv1.ReportGenerator, contentSvc contentservice.Interface, billInstancesAfter time.Time) error {
v1.RegisterUsageServiceServer(srv.GRPC(), apiv1.NewUsageService(conn, reportGenerator, contentSvc))
func registerGRPCServices(srv *baseserver.Server, conn *gorm.DB, stripeClient *stripe.Client, reportGenerator *apiv1.ReportGenerator, contentSvc contentservice.Interface, pricer *apiv1.WorkspacePricer, billInstancesAfter time.Time) error {
v1.RegisterUsageServiceServer(srv.GRPC(), apiv1.NewUsageService(conn, reportGenerator, contentSvc, pricer))
if stripeClient == nil {
v1.RegisterBillingServiceServer(srv.GRPC(), &apiv1.BillingServiceNoop{})
} else {
Expand Down