Skip to content

[usage] Implement CollectUsage #11681

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package controller
package apiv1

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package controller
package apiv1

import (
"testing"
Expand Down
126 changes: 126 additions & 0 deletions components/usage/pkg/apiv1/usage-report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package apiv1

import (
"context"
"fmt"
"github.com/gitpod-io/gitpod/common-go/log"
"github.com/gitpod-io/gitpod/usage/pkg/db"
"gorm.io/gorm"
"time"
)

type InvalidSession struct {
Reason string
Session db.WorkspaceInstanceForUsage
}

type UsageReport struct {
GenerationTime time.Time

From time.Time
To time.Time

RawSessions []db.WorkspaceInstanceForUsage
InvalidSessions []InvalidSession

UsageRecords []db.WorkspaceInstanceUsage
}

func NewReportGenerator(conn *gorm.DB, pricer *WorkspacePricer) *ReportGenerator {
return &ReportGenerator{
conn: conn,
pricer: pricer,
nowFunc: time.Now,
}
}

type ReportGenerator struct {
conn *gorm.DB
pricer *WorkspacePricer
nowFunc func() time.Time
}

func (g *ReportGenerator) GenerateUsageReport(ctx context.Context, from, to time.Time) (UsageReport, error) {
now := g.nowFunc().UTC()
log.Infof("Gathering usage data from %s to %s", from, to)

report := UsageReport{
GenerationTime: now,
From: from,
To: to,
}

instances, err := db.ListWorkspaceInstancesInRange(ctx, g.conn, from, to)
if err != nil {
return report, fmt.Errorf("failed to list instances from db: %w", err)
}
report.RawSessions = instances

valid, invalid := validateInstances(instances)
report.InvalidSessions = invalid

if len(invalid) > 0 {
log.WithField("invalid_workspace_instances", invalid).Errorf("Detected %d invalid instances. These will be skipped in the current run.", len(invalid))
}
log.WithField("workspace_instances", instances).Debug("Successfully loaded workspace instances.")

trimmed := trimStartStopTime(valid, from, to)

report.UsageRecords = instancesToUsageRecords(trimmed, g.pricer, now)
return report, nil
}

func validateInstances(instances []db.WorkspaceInstanceForUsage) (valid []db.WorkspaceInstanceForUsage, invalid []InvalidSession) {
for _, i := range instances {
// i is a pointer to the current element, we need to assign it to ensure we're copying the value, not the current pointer.
instance := i

// Each instance must have a start time, without it, we do not have a baseline for usage computation.
if !instance.CreationTime.IsSet() {
invalid = append(invalid, InvalidSession{
Reason: "missing creation time",
Session: instance,
})
continue
}

start := instance.CreationTime.Time()

// Currently running instances do not have a stopped time set, so we ignore these.
if instance.StoppedTime.IsSet() {
stop := instance.StoppedTime.Time()
if stop.Before(start) {
invalid = append(invalid, InvalidSession{
Reason: "stop time is before start time",
Session: instance,
})
continue
}
}

valid = append(valid, instance)
}
return valid, invalid
}

// trimStartStopTime ensures that start time or stop time of an instance is never outside of specified start or stop time range.
func trimStartStopTime(instances []db.WorkspaceInstanceForUsage, maximumStart, minimumStop time.Time) []db.WorkspaceInstanceForUsage {
var updated []db.WorkspaceInstanceForUsage

for _, instance := range instances {
if instance.CreationTime.Time().Before(maximumStart) {
instance.CreationTime = db.NewVarcharTime(maximumStart)
}

if instance.StoppedTime.Time().After(minimumStop) {
instance.StoppedTime = db.NewVarcharTime(minimumStop)
}

updated = append(updated, instance)
}
return updated
}
112 changes: 107 additions & 5 deletions components/usage/pkg/apiv1/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
package apiv1

import (
context "context"
"context"
"database/sql"
"fmt"
"github.com/gitpod-io/gitpod/common-go/log"
"github.com/gitpod-io/gitpod/usage/pkg/contentservice"
"time"

v1 "github.com/gitpod-io/gitpod/usage-api/v1"
Expand All @@ -21,12 +24,17 @@ var _ v1.UsageServiceServer = (*UsageService)(nil)

type UsageService struct {
conn *gorm.DB

contentService contentservice.Interface

reportGenerator *ReportGenerator

v1.UnimplementedUsageServiceServer
}

const maxQuerySize = 31 * 24 * time.Hour

func (us *UsageService) ListBilledUsage(ctx context.Context, in *v1.ListBilledUsageRequest) (*v1.ListBilledUsageResponse, error) {
func (s *UsageService) ListBilledUsage(ctx context.Context, in *v1.ListBilledUsageRequest) (*v1.ListBilledUsageResponse, error) {
to := time.Now()
if in.To != nil {
to = in.To.AsTime()
Expand All @@ -52,7 +60,7 @@ func (us *UsageService) ListBilledUsage(ctx context.Context, in *v1.ListBilledUs
order = db.DescendingOrder
}

usageRecords, err := db.ListUsage(ctx, us.conn, db.AttributionID(in.GetAttributionId()), from, to, order)
usageRecords, err := db.ListUsage(ctx, s.conn, db.AttributionID(in.GetAttributionId()), from, to, order)
if err != nil {
log.Log.
WithField("attribution_id", in.AttributionId).
Expand Down Expand Up @@ -88,6 +96,100 @@ func (us *UsageService) ListBilledUsage(ctx context.Context, in *v1.ListBilledUs
}, nil
}

func NewUsageService(conn *gorm.DB) *UsageService {
return &UsageService{conn: conn}
func (s *UsageService) ReconcileUsage(ctx context.Context, req *v1.ReconcileUsageRequest) (*v1.ReconcileUsageResponse, error) {
from := req.GetStartTime().AsTime()
to := req.GetEndTime().AsTime()

if to.Before(from) {
return nil, status.Errorf(codes.InvalidArgument, "End time must be after start time")
}

report, err := s.reportGenerator.GenerateUsageReport(ctx, from, to)
if err != nil {
log.Log.WithError(err).Error("Failed to reconcile time range.")
return nil, status.Error(codes.Internal, "failed to reconcile time range")
}

err = db.CreateUsageRecords(ctx, s.conn, report.UsageRecords)
if err != nil {
log.Log.WithError(err).Error("Failed to persist usage records.")
return nil, status.Error(codes.Internal, "failed to persist usage records")
}

filename := fmt.Sprintf("%s.gz", time.Now().Format(time.RFC3339))
err = s.contentService.UploadUsageReport(ctx, filename, report.UsageRecords)
if err != nil {
log.Log.WithError(err).Error("Failed to persist usage report to content service.")
return nil, status.Error(codes.Internal, "failed to persist usage report to content service")
}

var sessions []*v1.BilledSession
for _, instance := range report.UsageRecords {
sessions = append(sessions, usageRecordToBilledUsageProto(instance))
}

return &v1.ReconcileUsageResponse{
Sessions: sessions,
}, nil

}

func NewUsageService(conn *gorm.DB, reportGenerator *ReportGenerator, contentSvc contentservice.Interface) *UsageService {
return &UsageService{
conn: conn,
reportGenerator: reportGenerator,
contentService: contentSvc,
}
}

func usageRecordToBilledUsageProto(usageRecord db.WorkspaceInstanceUsage) *v1.BilledSession {
var endTime *timestamppb.Timestamp
if usageRecord.StoppedAt.Valid {
endTime = timestamppb.New(usageRecord.StoppedAt.Time)
}
return &v1.BilledSession{
AttributionId: string(usageRecord.AttributionID),
UserId: usageRecord.UserID.String(),
WorkspaceId: usageRecord.WorkspaceID,
TeamId: "",
WorkspaceType: string(usageRecord.WorkspaceType),
ProjectId: usageRecord.ProjectID,
InstanceId: usageRecord.InstanceID.String(),
WorkspaceClass: usageRecord.WorkspaceClass,
StartTime: timestamppb.New(usageRecord.StartedAt),
EndTime: endTime,
Credits: usageRecord.CreditsUsed,
}
}

func instancesToUsageRecords(instances []db.WorkspaceInstanceForUsage, pricer *WorkspacePricer, now time.Time) []db.WorkspaceInstanceUsage {
var usageRecords []db.WorkspaceInstanceUsage

for _, instance := range instances {
var stoppedAt sql.NullTime
if instance.StoppedTime.IsSet() {
stoppedAt = sql.NullTime{Time: instance.StoppedTime.Time(), Valid: true}
}

projectID := ""
if instance.ProjectID.Valid {
projectID = instance.ProjectID.String
}

usageRecords = append(usageRecords, db.WorkspaceInstanceUsage{
InstanceID: instance.ID,
AttributionID: instance.UsageAttributionID,
WorkspaceID: instance.WorkspaceID,
ProjectID: projectID,
UserID: instance.OwnerID,
WorkspaceType: instance.Type,
WorkspaceClass: instance.WorkspaceClass,
StartedAt: instance.CreationTime.Time(),
StoppedAt: stoppedAt,
CreditsUsed: pricer.CreditsUsedByInstance(&instance, now),
GenerationID: 0,
})
}

return usageRecords
}
Loading