Skip to content

Commit

Permalink
[usage] Implment ledger control loop
Browse files Browse the repository at this point in the history
  • Loading branch information
easyCZ authored and roboquat committed Sep 6, 2022
1 parent f7008c1 commit a03681a
Show file tree
Hide file tree
Showing 5 changed files with 277 additions and 10 deletions.
101 changes: 98 additions & 3 deletions components/usage/pkg/apiv1/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/google/uuid"
"math"
"time"

Expand All @@ -26,7 +27,9 @@ import (
var _ v1.UsageServiceServer = (*UsageService)(nil)

type UsageService struct {
conn *gorm.DB
conn *gorm.DB
nowFunc func() time.Time
pricer *WorkspacePricer

contentService contentservice.Interface

Expand Down Expand Up @@ -193,19 +196,24 @@ func (s *UsageService) ReconcileUsageWithLedger(ctx context.Context, req *v1.Rec
return nil, status.Errorf(codes.InvalidArgument, "To must not be before From")
}

now := s.nowFunc()

var instances []db.WorkspaceInstanceForUsage
stopped, err := db.FindStoppedWorkspaceInstancesInRange(ctx, s.conn, from, to)
if err != nil {
logger.WithError(err).Errorf("Failed to find stopped workspace instances.")
return nil, status.Errorf(codes.Internal, "failed to query for stopped instances")
}
logger.Infof("Found %d stopped workspace instances in range.", len(stopped))
instances = append(instances, stopped...)

running, err := db.FindRunningWorkspaceInstances(ctx, s.conn)
if err != nil {
logger.WithError(err).Errorf("Failed to find running workspace instances.")
return nil, status.Errorf(codes.Internal, "failed to query for running instances")
}
logger.Infof("Found %d running workspaces since the beginning of time.", len(running))
instances = append(instances, running...)

usageDrafts, err := db.FindAllDraftUsage(ctx, s.conn)
if err != nil {
Expand All @@ -214,12 +222,99 @@ func (s *UsageService) ReconcileUsageWithLedger(ctx context.Context, req *v1.Rec
}
logger.Infof("Found %d draft usage records.", len(usageDrafts))

instancesWithUsageInDraft, err := db.FindWorkspaceInstancesByIds(ctx, s.conn, collectWorkspaceInstanceIDs(usageDrafts))
if err != nil {
logger.WithError(err).Errorf("Failed to find workspace instances for usage records in draft.")
return nil, status.Errorf(codes.Internal, "failed to find workspace instances for usage records in draft state")
}
logger.Infof("Found %d workspaces instances for usage records in draft.", len(instancesWithUsageInDraft))
instances = append(instances, instancesWithUsageInDraft...)

inserts, updates := reconcileUsageWithLedger(instances, usageDrafts, s.pricer, now)
logger.WithField("inserts", inserts).WithField("updates", updates).Infof("Identified %d inserts and %d updates against usage records.", len(inserts), len(updates))

return &v1.ReconcileUsageWithLedgerResponse{}, nil
}

func NewUsageService(conn *gorm.DB, reportGenerator *ReportGenerator, contentSvc contentservice.Interface) *UsageService {
func reconcileUsageWithLedger(instances []db.WorkspaceInstanceForUsage, drafts []db.Usage, pricer *WorkspacePricer, now time.Time) (inserts []db.Usage, updates []db.Usage) {

instancesByID := dedupeWorkspaceInstancesForUsage(instances)

draftsByWorkspaceID := map[uuid.UUID]db.Usage{}
for _, draft := range drafts {
draftsByWorkspaceID[draft.WorkspaceInstanceID] = draft
}

for instanceID, instance := range instancesByID {
if usage, exists := draftsByWorkspaceID[instanceID]; exists {
updates = append(updates, updateUsageFromInstance(instance, usage, pricer, now))
continue
}

inserts = append(inserts, newUsageFromInstance(instance, pricer, now))
}

return inserts, updates
}

const usageDescriptionFromController = "Usage collected by automated system."

func newUsageFromInstance(instance db.WorkspaceInstanceForUsage, pricer *WorkspacePricer, now time.Time) db.Usage {
draft := true
if instance.StoppingTime.IsSet() {
draft = false
}

effectiveTime := now
if instance.StoppingTime.IsSet() {
effectiveTime = instance.StoppingTime.Time()
}

return db.Usage{
ID: uuid.New(),
AttributionID: instance.UsageAttributionID,
Description: usageDescriptionFromController,
CreditCents: db.NewCreditCents(pricer.CreditsUsedByInstance(&instance, now)),
EffectiveTime: db.NewVarcharTime(effectiveTime),
Kind: db.WorkspaceInstanceUsageKind,
WorkspaceInstanceID: instance.ID,
Draft: draft,
Metadata: nil,
}
}

func updateUsageFromInstance(instance db.WorkspaceInstanceForUsage, usage db.Usage, pricer *WorkspacePricer, now time.Time) db.Usage {
// We construct a new record to ensure we always take the data from the source of truth - the workspace instance
updated := newUsageFromInstance(instance, pricer, now)
// but we override the ID to the one we already have
updated.ID = usage.ID

return updated
}

func collectWorkspaceInstanceIDs(usage []db.Usage) []uuid.UUID {
var ids []uuid.UUID
for _, u := range usage {
ids = append(ids, u.WorkspaceInstanceID)
}
return ids
}

func dedupeWorkspaceInstancesForUsage(instances []db.WorkspaceInstanceForUsage) map[uuid.UUID]db.WorkspaceInstanceForUsage {
set := map[uuid.UUID]db.WorkspaceInstanceForUsage{}
for _, instance := range instances {
set[instance.ID] = instance
}
return set
}

func NewUsageService(conn *gorm.DB, reportGenerator *ReportGenerator, contentSvc contentservice.Interface, pricer *WorkspacePricer) *UsageService {
return &UsageService{
conn: conn,
conn: conn,
nowFunc: func() time.Time {
return time.Now().UTC()
},
pricer: pricer,
reportGenerator: reportGenerator,
contentService: contentSvc,
}
Expand Down
108 changes: 105 additions & 3 deletions components/usage/pkg/apiv1/usage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestUsageService_ListBilledUsage(t *testing.T) {
)

generator := NewReportGenerator(dbconn, DefaultWorkspacePricer)
v1.RegisterUsageServiceServer(srv.GRPC(), NewUsageService(dbconn, generator, nil))
v1.RegisterUsageServiceServer(srv.GRPC(), NewUsageService(dbconn, generator, nil, DefaultWorkspacePricer))
baseserver.StartServerForTests(t, srv)

conn, err := grpc.Dial(srv.GRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestUsageService_ListBilledUsage_Pagination(t *testing.T) {
)

generator := NewReportGenerator(dbconn, DefaultWorkspacePricer)
v1.RegisterUsageServiceServer(srv.GRPC(), NewUsageService(dbconn, generator, nil))
v1.RegisterUsageServiceServer(srv.GRPC(), NewUsageService(dbconn, generator, nil, DefaultWorkspacePricer))
baseserver.StartServerForTests(t, srv)

conn, err := grpc.Dial(srv.GRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down Expand Up @@ -580,7 +580,7 @@ func TestUsageService_ReconcileUsageWithLedger(t *testing.T) {
baseserver.WithGRPC(baseserver.MustUseRandomLocalAddress(t)),
)

v1.RegisterUsageServiceServer(srv.GRPC(), NewUsageService(dbconn, nil, nil))
v1.RegisterUsageServiceServer(srv.GRPC(), NewUsageService(dbconn, nil, nil, DefaultWorkspacePricer))
baseserver.StartServerForTests(t, srv)

conn, err := grpc.Dial(srv.GRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand All @@ -594,3 +594,105 @@ func TestUsageService_ReconcileUsageWithLedger(t *testing.T) {
})
require.NoError(t, err)
}

func TestReconcileWithLedger(t *testing.T) {
now := time.Date(2022, 9, 1, 10, 0, 0, 0, time.UTC)
pricer, err := NewWorkspacePricer(map[string]float64{
"default": 0.1666666667,
"g1-standard": 0.1666666667,
"g1-standard-pvc": 0.1666666667,
"g1-large": 0.3333333333,
"g1-large-pvc": 0.3333333333,
"gitpodio-internal-xl": 0.3333333333,
})
require.NoError(t, err)

t.Run("no action with no instances and no drafts", func(t *testing.T) {
inserts, updates := reconcileUsageWithLedger(nil, nil, pricer, now)
require.Len(t, inserts, 0)
require.Len(t, updates, 0)
})

t.Run("no action with no instances but existing drafts", func(t *testing.T) {
drafts := []db.Usage{dbtest.NewUsage(t, db.Usage{})}
inserts, updates := reconcileUsageWithLedger(nil, drafts, pricer, now)
require.Len(t, inserts, 0)
require.Len(t, updates, 0)
})

t.Run("creates a new usage record when no draft exists, removing duplicates", func(t *testing.T) {
instance := db.WorkspaceInstanceForUsage{
ID: uuid.New(),
WorkspaceID: dbtest.GenerateWorkspaceID(),
OwnerID: uuid.New(),
ProjectID: sql.NullString{
String: "my-project",
Valid: true,
},
WorkspaceClass: db.WorkspaceClass_Default,
Type: db.WorkspaceType_Regular,
UsageAttributionID: db.NewTeamAttributionID(uuid.New().String()),
CreationTime: db.NewVarcharTime(now.Add(1 * time.Minute)),
}

inserts, updates := reconcileUsageWithLedger([]db.WorkspaceInstanceForUsage{instance, instance}, nil, pricer, now)
require.Len(t, inserts, 1)
require.Len(t, updates, 0)
require.Equal(t, db.Usage{
ID: inserts[0].ID,
AttributionID: instance.UsageAttributionID,
Description: usageDescriptionFromController,
CreditCents: db.NewCreditCents(pricer.CreditsUsedByInstance(&instance, now)),
EffectiveTime: db.NewVarcharTime(now),
Kind: db.WorkspaceInstanceUsageKind,
WorkspaceInstanceID: instance.ID,
Draft: true,
Metadata: nil,
}, inserts[0])
})

t.Run("updates a usage record when a draft exists", func(t *testing.T) {
instance := db.WorkspaceInstanceForUsage{
ID: uuid.New(),
WorkspaceID: dbtest.GenerateWorkspaceID(),
OwnerID: uuid.New(),
ProjectID: sql.NullString{
String: "my-project",
Valid: true,
},
WorkspaceClass: db.WorkspaceClass_Default,
Type: db.WorkspaceType_Regular,
UsageAttributionID: db.NewTeamAttributionID(uuid.New().String()),
CreationTime: db.NewVarcharTime(now.Add(1 * time.Minute)),
}

// the fields in the usage record deliberately do not match the instance, except for the Instance ID.
// we do this to test that the fields in the usage records get updated to reflect the true values from the source of truth - instances.
draft := dbtest.NewUsage(t, db.Usage{
ID: uuid.New(),
AttributionID: db.NewUserAttributionID(uuid.New().String()),
Description: "Some description",
CreditCents: 1,
EffectiveTime: db.VarcharTime{},
Kind: db.WorkspaceInstanceUsageKind,
WorkspaceInstanceID: instance.ID,
Draft: true,
Metadata: nil,
})

inserts, updates := reconcileUsageWithLedger([]db.WorkspaceInstanceForUsage{instance}, []db.Usage{draft}, pricer, now)
require.Len(t, inserts, 0)
require.Len(t, updates, 1)
require.Equal(t, db.Usage{
ID: draft.ID,
AttributionID: instance.UsageAttributionID,
Description: usageDescriptionFromController,
CreditCents: db.NewCreditCents(pricer.CreditsUsedByInstance(&instance, now)),
EffectiveTime: db.NewVarcharTime(now),
Kind: db.WorkspaceInstanceUsageKind,
WorkspaceInstanceID: instance.ID,
Draft: true,
Metadata: nil,
}, updates[0])
})
}
14 changes: 13 additions & 1 deletion components/usage/pkg/db/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package db
import (
"context"
"fmt"
"math"

"github.com/google/uuid"
"gorm.io/datatypes"
Expand All @@ -21,11 +22,22 @@ const (
InvoiceUsageKind = "invoice"
)

func NewCreditCents(n float64) CreditCents {
inCents := n * 100
return CreditCents(int64(math.Round(inCents)))
}

type CreditCents int64

func (cc CreditCents) ToCredits() float64 {
return float64(cc) / 100
}

type Usage struct {
ID uuid.UUID `gorm:"primary_key;column:id;type:char;size:36;" json:"id"`
AttributionID AttributionID `gorm:"column:attributionId;type:varchar;size:255;" json:"attributionId"`
Description string `gorm:"column:description;type:varchar;size:255;" json:"description"`
CreditCents int64 `gorm:"column:creditCents;type:bigint;" json:"creditCents"`
CreditCents CreditCents `gorm:"column:creditCents;type:bigint;" json:"creditCents"`
EffectiveTime VarcharTime `gorm:"column:effectiveTime;type:varchar;size:255;" json:"effectiveTime"`
Kind UsageKind `gorm:"column:kind;type:char;size:10;" json:"kind"`
WorkspaceInstanceID uuid.UUID `gorm:"column:workspaceInstanceId;type:char;size:36;" json:"workspaceInstanceId"`
Expand Down
58 changes: 58 additions & 0 deletions components/usage/pkg/db/usage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,61 @@ func TestFindAllDraftUsage(t *testing.T) {
require.True(t, usage.Draft)
}
}

func TestCreditCents(t *testing.T) {
for _, s := range []struct {
value float64
expected db.CreditCents
expectedAsFloat float64
}{
{
value: 0,
expected: 0,
expectedAsFloat: 0,
},
{
value: 0.1,
expected: 10,
expectedAsFloat: 0.1,
},
{
value: 1.1111,
expected: 111,
expectedAsFloat: 1.11,
},
{
value: 1.4999,
expected: 150,
expectedAsFloat: 1.50,
},
{
value: 1.500,
expected: 150,
expectedAsFloat: 1.50,
},
{
value: 1.501,
expected: 150,
expectedAsFloat: 1.50,
},
{
value: 1.50999,
expected: 151,
expectedAsFloat: 1.51,
},
{
value: 1.9999,
expected: 200,
expectedAsFloat: 2.00,
},
{
value: -1.9999,
expected: -200,
expectedAsFloat: -2.00,
},
} {
cc := db.NewCreditCents(s.value)
require.Equal(t, s.expected, cc)
require.Equal(t, s.expectedAsFloat, cc.ToCredits())
}
}
6 changes: 3 additions & 3 deletions components/usage/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func Start(cfg Config) error {

reportGenerator := apiv1.NewReportGenerator(conn, pricer)

err = registerGRPCServices(srv, conn, stripeClient, reportGenerator, contentService, *cfg.BillInstancesAfter)
err = registerGRPCServices(srv, conn, stripeClient, reportGenerator, contentService, pricer, *cfg.BillInstancesAfter)
if err != nil {
return fmt.Errorf("failed to register gRPC services: %w", err)
}
Expand All @@ -180,8 +180,8 @@ func Start(cfg Config) error {
return nil
}

func registerGRPCServices(srv *baseserver.Server, conn *gorm.DB, stripeClient *stripe.Client, reportGenerator *apiv1.ReportGenerator, contentSvc contentservice.Interface, billInstancesAfter time.Time) error {
v1.RegisterUsageServiceServer(srv.GRPC(), apiv1.NewUsageService(conn, reportGenerator, contentSvc))
func registerGRPCServices(srv *baseserver.Server, conn *gorm.DB, stripeClient *stripe.Client, reportGenerator *apiv1.ReportGenerator, contentSvc contentservice.Interface, pricer *apiv1.WorkspacePricer, billInstancesAfter time.Time) error {
v1.RegisterUsageServiceServer(srv.GRPC(), apiv1.NewUsageService(conn, reportGenerator, contentSvc, pricer))
if stripeClient == nil {
v1.RegisterBillingServiceServer(srv.GRPC(), &apiv1.BillingServiceNoop{})
} else {
Expand Down

0 comments on commit a03681a

Please sign in to comment.