Skip to content

Commit 506f86a

Browse files
committed
[usage] Implement CollectUsage
1 parent 33d362b commit 506f86a

File tree

5 files changed

+391
-376
lines changed

5 files changed

+391
-376
lines changed

components/usage/pkg/apiv1/usage.go

Lines changed: 219 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@ package apiv1
66

77
import (
88
context "context"
9+
"database/sql"
10+
"fmt"
911
"github.com/gitpod-io/gitpod/common-go/log"
12+
"github.com/gitpod-io/gitpod/usage/pkg/contentservice"
13+
"github.com/gitpod-io/gitpod/usage/pkg/controller"
1014
"time"
1115

1216
v1 "github.com/gitpod-io/gitpod/usage-api/v1"
@@ -21,12 +25,17 @@ var _ v1.UsageServiceServer = (*UsageService)(nil)
2125

2226
type UsageService struct {
2327
conn *gorm.DB
28+
29+
contentService contentservice.Interface
30+
31+
reportGenerator *ReportGenerator
32+
2433
v1.UnimplementedUsageServiceServer
2534
}
2635

2736
const maxQuerySize = 31 * 24 * time.Hour
2837

29-
func (us *UsageService) ListBilledUsage(ctx context.Context, in *v1.ListBilledUsageRequest) (*v1.ListBilledUsageResponse, error) {
38+
func (s *UsageService) ListBilledUsage(ctx context.Context, in *v1.ListBilledUsageRequest) (*v1.ListBilledUsageResponse, error) {
3039
to := time.Now()
3140
if in.To != nil {
3241
to = in.To.AsTime()
@@ -52,7 +61,7 @@ func (us *UsageService) ListBilledUsage(ctx context.Context, in *v1.ListBilledUs
5261
order = db.DescendingOrder
5362
}
5463

55-
usageRecords, err := db.ListUsage(ctx, us.conn, db.AttributionID(in.GetAttributionId()), from, to, order)
64+
usageRecords, err := db.ListUsage(ctx, s.conn, db.AttributionID(in.GetAttributionId()), from, to, order)
5665
if err != nil {
5766
log.Log.
5867
WithField("attribution_id", in.AttributionId).
@@ -88,6 +97,212 @@ func (us *UsageService) ListBilledUsage(ctx context.Context, in *v1.ListBilledUs
8897
}, nil
8998
}
9099

91-
func NewUsageService(conn *gorm.DB) *UsageService {
92-
return &UsageService{conn: conn}
100+
func (s *UsageService) ReconcileUsage(ctx context.Context, req *v1.ReconcileUsageRequest) (*v1.ReconcileUsageResponse, error) {
101+
from := req.GetStartTime().AsTime()
102+
to := req.GetEndTime().AsTime()
103+
104+
if to.Before(from) {
105+
return nil, status.Errorf(codes.InvalidArgument, "End time must be after start time")
106+
}
107+
108+
report, err := s.reportGenerator.GenerateUsageReport(ctx, from, to)
109+
if err != nil {
110+
log.Log.WithError(err).Error("Failed to reconcile time range.")
111+
return nil, status.Error(codes.Internal, "failed to reconcile time range")
112+
}
113+
114+
err = db.CreateUsageRecords(ctx, s.conn, report.UsageRecords)
115+
if err != nil {
116+
log.Log.WithError(err).Error("Failed to persist usage records.")
117+
return nil, status.Error(codes.Internal, "failed to persist usage records")
118+
}
119+
120+
filename := fmt.Sprintf("%s.gz", time.Now().Format(time.RFC3339))
121+
err = s.contentService.UploadUsageReport(ctx, filename, report.UsageRecords)
122+
if err != nil {
123+
log.Log.WithError(err).Error("Failed to persist usage report to content service.")
124+
return nil, status.Error(codes.Internal, "failed to persist usage report to content service")
125+
}
126+
127+
var sessions []*v1.BilledSession
128+
for _, instance := range report.UsageRecords {
129+
sessions = append(sessions, usageRecordToBilledUsageProto(instance))
130+
}
131+
132+
return &v1.ReconcileUsageResponse{
133+
Sessions: sessions,
134+
}, nil
135+
136+
}
137+
138+
func NewUsageService(conn *gorm.DB, reportGenerator *ReportGenerator, contentSvc contentservice.Interface) *UsageService {
139+
return &UsageService{
140+
conn: conn,
141+
reportGenerator: reportGenerator,
142+
contentService: contentSvc,
143+
}
144+
}
145+
146+
func usageRecordToBilledUsageProto(usageRecord db.WorkspaceInstanceUsage) *v1.BilledSession {
147+
var endTime *timestamppb.Timestamp
148+
if usageRecord.StoppedAt.Valid {
149+
endTime = timestamppb.New(usageRecord.StoppedAt.Time)
150+
}
151+
return &v1.BilledSession{
152+
AttributionId: string(usageRecord.AttributionID),
153+
UserId: usageRecord.UserID.String(),
154+
WorkspaceId: usageRecord.WorkspaceID,
155+
TeamId: "",
156+
WorkspaceType: string(usageRecord.WorkspaceType),
157+
ProjectId: usageRecord.ProjectID,
158+
InstanceId: usageRecord.InstanceID.String(),
159+
WorkspaceClass: usageRecord.WorkspaceClass,
160+
StartTime: timestamppb.New(usageRecord.StartedAt),
161+
EndTime: endTime,
162+
Credits: usageRecord.CreditsUsed,
163+
}
164+
}
165+
166+
type InvalidSession struct {
167+
Reason string
168+
Session db.WorkspaceInstanceForUsage
169+
}
170+
171+
type UsageReport struct {
172+
GenerationTime time.Time
173+
174+
From time.Time
175+
To time.Time
176+
177+
RawSessions []db.WorkspaceInstanceForUsage
178+
InvalidSessions []InvalidSession
179+
180+
UsageRecords []db.WorkspaceInstanceUsage
181+
}
182+
183+
func NewReportGenerator(conn *gorm.DB, pricer *controller.WorkspacePricer) *ReportGenerator {
184+
return &ReportGenerator{
185+
conn: conn,
186+
pricer: pricer,
187+
nowFunc: time.Now,
188+
}
189+
}
190+
191+
type ReportGenerator struct {
192+
conn *gorm.DB
193+
pricer *controller.WorkspacePricer
194+
nowFunc func() time.Time
195+
}
196+
197+
func (g *ReportGenerator) GenerateUsageReport(ctx context.Context, from, to time.Time) (UsageReport, error) {
198+
now := g.nowFunc().UTC()
199+
log.Infof("Gathering usage data from %s to %s", from, to)
200+
201+
report := UsageReport{
202+
GenerationTime: now,
203+
From: from,
204+
To: to,
205+
}
206+
207+
instances, err := db.ListWorkspaceInstancesInRange(ctx, g.conn, from, to)
208+
if err != nil {
209+
return report, fmt.Errorf("failed to list instances from db: %w", err)
210+
}
211+
report.RawSessions = instances
212+
213+
valid, invalid := validateInstances(instances)
214+
report.InvalidSessions = invalid
215+
216+
if len(invalid) > 0 {
217+
log.WithField("invalid_workspace_instances", invalid).Errorf("Detected %d invalid instances. These will be skipped in the current run.", len(invalid))
218+
}
219+
log.WithField("workspace_instances", instances).Debug("Successfully loaded workspace instances.")
220+
221+
trimmed := trimStartStopTime(valid, from, to)
222+
223+
report.UsageRecords = instancesToUsageRecords(trimmed, g.pricer, now)
224+
return report, nil
225+
}
226+
227+
func validateInstances(instances []db.WorkspaceInstanceForUsage) (valid []db.WorkspaceInstanceForUsage, invalid []InvalidSession) {
228+
for _, i := range instances {
229+
// i is a pointer to the current element, we need to assign it to ensure we're copying the value, not the current pointer.
230+
instance := i
231+
232+
// Each instance must have a start time, without it, we do not have a baseline for usage computation.
233+
if !instance.CreationTime.IsSet() {
234+
invalid = append(invalid, InvalidSession{
235+
Reason: "missing creation time",
236+
Session: instance,
237+
})
238+
continue
239+
}
240+
241+
start := instance.CreationTime.Time()
242+
243+
// Currently running instances do not have a stopped time set, so we ignore these.
244+
if instance.StoppedTime.IsSet() {
245+
stop := instance.StoppedTime.Time()
246+
if stop.Before(start) {
247+
invalid = append(invalid, InvalidSession{
248+
Reason: "stop time is before start time",
249+
Session: instance,
250+
})
251+
continue
252+
}
253+
}
254+
255+
valid = append(valid, instance)
256+
}
257+
return valid, invalid
258+
}
259+
260+
// trimStartStopTime ensures that start time or stop time of an instance is never outside of specified start or stop time range.
261+
func trimStartStopTime(instances []db.WorkspaceInstanceForUsage, maximumStart, minimumStop time.Time) []db.WorkspaceInstanceForUsage {
262+
var updated []db.WorkspaceInstanceForUsage
263+
264+
for _, instance := range instances {
265+
if instance.CreationTime.Time().Before(maximumStart) {
266+
instance.CreationTime = db.NewVarcharTime(maximumStart)
267+
}
268+
269+
if instance.StoppedTime.Time().After(minimumStop) {
270+
instance.StoppedTime = db.NewVarcharTime(minimumStop)
271+
}
272+
273+
updated = append(updated, instance)
274+
}
275+
return updated
276+
}
277+
278+
func instancesToUsageRecords(instances []db.WorkspaceInstanceForUsage, pricer *controller.WorkspacePricer, now time.Time) []db.WorkspaceInstanceUsage {
279+
var usageRecords []db.WorkspaceInstanceUsage
280+
281+
for _, instance := range instances {
282+
var stoppedAt sql.NullTime
283+
if instance.StoppedTime.IsSet() {
284+
stoppedAt = sql.NullTime{Time: instance.StoppedTime.Time(), Valid: true}
285+
}
286+
287+
projectID := ""
288+
if instance.ProjectID.Valid {
289+
projectID = instance.ProjectID.String
290+
}
291+
292+
usageRecords = append(usageRecords, db.WorkspaceInstanceUsage{
293+
InstanceID: instance.ID,
294+
AttributionID: instance.UsageAttributionID,
295+
WorkspaceID: instance.WorkspaceID,
296+
ProjectID: projectID,
297+
UserID: instance.OwnerID,
298+
WorkspaceType: instance.Type,
299+
WorkspaceClass: instance.WorkspaceClass,
300+
StartedAt: instance.CreationTime.Time(),
301+
StoppedAt: stoppedAt,
302+
CreditsUsed: pricer.CreditsUsedByInstance(&instance, now),
303+
GenerationID: 0,
304+
})
305+
}
306+
307+
return usageRecords
93308
}

0 commit comments

Comments
 (0)