Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[COST-3571] generate daily reports #184

Merged
merged 52 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
b97bc77
only collect full days of previous data
maskarb Apr 25, 2023
3a9d244
update test
maskarb Apr 25, 2023
2271bdf
fix some logic
maskarb Apr 25, 2023
9f77411
fix test
maskarb Apr 25, 2023
f5470a7
update logging for ErrNoData
maskarb Apr 25, 2023
490c109
collect 4 days data before packaging
maskarb Apr 25, 2023
46841d8
add comment
maskarb Apr 25, 2023
32c359f
update variable name
maskarb Apr 25, 2023
681662d
move status update to daily loop rather than in the packing branch
maskarb Apr 25, 2023
c569a41
be more specific when breaking from loop
maskarb Apr 25, 2023
f1f1775
oof
maskarb Apr 26, 2023
ed688bd
use enum
maskarb Apr 26, 2023
8f4ee2a
update comment
maskarb Apr 26, 2023
0061453
update comment
maskarb May 1, 2023
d2e9d37
remove panic
maskarb May 1, 2023
3f5326f
make FilesAction a func type
maskarb May 1, 2023
9b61390
simplify copy method
maskarb May 1, 2023
19b9b7e
update tests
maskarb May 1, 2023
b32c9d4
update workflow
maskarb May 1, 2023
5c3aa66
fix the workflow
maskarb May 1, 2023
98ab8cb
fix the workflow
maskarb May 1, 2023
2231126
update CI workflow
maskarb May 1, 2023
ec20c49
Merge branch 'main' into collect-full-days-previous
maskarb May 2, 2023
37f3912
use constant
maskarb May 9, 2023
d762111
Merge branch 'main' into collect-full-days-previous
maskarb May 10, 2023
1672bd7
add gomock and update ginkgo to v2
maskarb May 10, 2023
7e45f59
add mocks
maskarb May 10, 2023
0951326
create mocks package
maskarb May 10, 2023
a2e78db
remove focus
maskarb May 10, 2023
91eb659
update workflow
maskarb May 10, 2023
f861725
idk why this doesn't work :(
maskarb May 10, 2023
7ad570f
eliminate the need for renaming the CR
maskarb May 10, 2023
49d4b60
finallyyyyyy
maskarb May 11, 2023
bdf731b
more updates
maskarb May 11, 2023
ba3b048
ITS WORKING SO MUCH BETTER
maskarb May 11, 2023
53fafdf
move status setting
maskarb May 11, 2023
5c184d6
remove focus
maskarb May 11, 2023
8c93bd1
add test
maskarb May 11, 2023
fe8e4de
add very complicated test
maskarb May 11, 2023
7735f6a
account for single line item in pod report
maskarb May 11, 2023
6ab3e1e
rename to test file
maskarb May 11, 2023
74d9ed1
add bigger tests
maskarb May 12, 2023
0296845
add end of day test
maskarb May 15, 2023
eb4a087
some cleanup
maskarb May 15, 2023
ae5bce8
remove interface
maskarb May 15, 2023
010f02f
format timestamp
maskarb May 15, 2023
59e7820
fix test assertions
maskarb May 15, 2023
1e7a74c
remove unused var
maskarb May 15, 2023
af50e17
add comment
maskarb May 15, 2023
196b0b2
add flag to manifest
maskarb May 15, 2023
22e70fb
update tests to correctly count reports
maskarb May 16, 2023
ab2203d
use constant
maskarb May 16, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 6 additions & 15 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ jobs:
runs-on: ubuntu-22.04
steps:
- name: Checkout repository
uses: actions/checkout@v3.1.0
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Setup Go
uses: actions/setup-go@v3
uses: actions/setup-go@v4
with:
go-version: 1.18
- name: golangci-lint
Expand All @@ -31,27 +31,18 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v3.1.0
uses: actions/checkout@v3
with:
fetch-depth: 0

- name: Display build environment
run: printenv

- name: Setup Go
uses: actions/setup-go@v3
uses: actions/setup-go@v4
with:
stable: 'true'
go-version: 1.18 # The Go version to download (if necessary) and use.

- name: Cache Go modules
uses: actions/cache@v2
with:
path: |
~/go/pkg/mod
key: ${{ runner.os }}-build-${{ hashFiles('**/go.sum') }}


- name: Install kubebuilder
run : |
os=$(go env GOOS)
Expand Down Expand Up @@ -80,7 +71,7 @@ jobs:
- name: Checkout
# this checkout is required for the coverage report. If we don't do this, then
# the uploaded report is invalid and codecov doesn't know how to process it.
uses: actions/checkout@v3.1.0
uses: actions/checkout@v3
with:
fetch-depth: 0

Expand All @@ -90,7 +81,7 @@ jobs:
name: coverage

- name: Upload coverage
uses: codecov/codecov-action@v3.1.1
uses: codecov/codecov-action@v3
with:
file: ./cover.out
flags: unittests
Expand Down
10 changes: 4 additions & 6 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package collector

import (
"errors"
"fmt"
"regexp"
"sort"
Expand Down Expand Up @@ -33,6 +34,8 @@ var (
statusTimeFormat = "2006-01-02 15:04:05"

log = logr.Log.WithName("collector")

ErrNoData = errors.New("no data to collect")
)

type mappedCSVStruct map[string]csvStruct
Expand Down Expand Up @@ -197,10 +200,8 @@ func GenerateReports(cr *metricscfgv1beta1.MetricsConfig, dirCfg *dirconfig.Dire

if len(nodeResults) <= 0 {
log.Info("no data to report")
cr.Status.Reports.DataCollected = false
cr.Status.Reports.DataCollectionMessage = "No data to report for the hour queried."
// there is no data for the hour queried. Return nothing
return nil
return ErrNoData
}
for node, val := range nodeResults {
resourceID := getResourceID(val["provider_id"].(string))
Expand Down Expand Up @@ -248,9 +249,6 @@ func GenerateReports(cr *metricscfgv1beta1.MetricsConfig, dirCfg *dirconfig.Dire

//################################################################################################################

cr.Status.Reports.DataCollected = true
cr.Status.Reports.DataCollectionMessage = ""

return nil
}

Expand Down
6 changes: 1 addition & 5 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,9 @@ func TestGenerateReportsNoNodeData(t *testing.T) {
},
TimeSeries: &fakeTimeRange,
}
if err := GenerateReports(fakeCR, fakeDirCfg, fakeCollector); err != nil {
if err := GenerateReports(fakeCR, fakeDirCfg, fakeCollector); err != nil && err != ErrNoData {
t.Errorf("Failed to generate reports: %v", err)
}
wanted := "No data to report for the hour queried."
if fakeCR.Status.Reports.DataCollectionMessage != wanted {
t.Errorf("Status not updated correctly: got %s want %s", fakeCR.Status.Reports.DataCollectionMessage, wanted)
}
filelist, err := ioutil.ReadDir(filepath.Join("test_files", "test_reports"))
if err != nil {
t.Fatalf("Failed to read expected reports dir")
Expand Down
123 changes: 77 additions & 46 deletions controllers/kokumetricsconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"github.com/project-koku/koku-metrics-operator/storage"
)

const HOURS_IN_DAY int = 23 // first hour is 0: 0 -> 23 == 24 hrs

var (
GitCommit string

Expand Down Expand Up @@ -67,11 +69,11 @@ type MetricsConfigReconciler struct {
InCluster bool
Namespace string

apiReader client.Reader
cvClientBuilder cv.ClusterVersionBuilder
promCollector *collector.PrometheusCollector
disablePreviousDataCollection bool
overrideSecretPath bool
apiReader client.Reader
cvClientBuilder cv.ClusterVersionBuilder
promCollector *collector.PrometheusCollector
initialDataCollection bool
overrideSecretPath bool
}

type previousAuthValidation struct {
Expand Down Expand Up @@ -426,20 +428,20 @@ func checkSource(r *MetricsConfigReconciler, handler *sources.SourceHandler, cr
}
}

func packageFiles(p *packaging.FilePackager) {
func packageFiles(p *packaging.FilePackager, cr *metricscfgv1beta1.MetricsConfig) {
log := log.WithName("packageAndUpload")

// if its time to package
if !checkCycle(log, *p.CR.Status.Upload.UploadCycle, p.CR.Status.Packaging.LastSuccessfulPackagingTime, "file packaging") {
if !checkCycle(log, *cr.Status.Upload.UploadCycle, cr.Status.Packaging.LastSuccessfulPackagingTime, "file packaging") {
return
}

// Package and split the payload if necessary
p.CR.Status.Packaging.PackagingError = ""
if err := p.PackageReports(); err != nil {
cr.Status.Packaging.PackagingError = ""
if err := p.PackageReports(cr); err != nil {
log.Error(err, "PackageReports failed")
// update the CR packaging error status
p.CR.Status.Packaging.PackagingError = err.Error()
cr.Status.Packaging.PackagingError = err.Error()
}
}

Expand Down Expand Up @@ -546,9 +548,7 @@ func configurePVC(r *MetricsConfigReconciler, req ctrl.Request, cr *metricscfgv1

if strings.Contains(cr.Status.Storage.VolumeType, "EmptyDir") {
cr.Status.Storage.VolumeMounted = false
if err := r.Status().Update(ctx, cr); err != nil {
log.Error(err, "failed to update MetricsConfig status")
}
r.updateStatusAndLogError(ctx, cr)
return &ctrl.Result{}, fmt.Errorf("PVC not mounted")
}
return nil, nil
Expand Down Expand Up @@ -594,9 +594,7 @@ func (r *MetricsConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// set the cluster ID & return if there are errors
if err := setClusterID(r, cr); err != nil {
log.Error(err, "failed to obtain clusterID")
if err := r.Status().Update(ctx, cr); err != nil {
log.Error(err, "failed to update MetricsConfig status")
}
r.updateStatusAndLogError(ctx, cr)
return ctrl.Result{}, err
}

Expand All @@ -610,6 +608,7 @@ func (r *MetricsConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// so we need to package the old files before generating new reports.
// We set this packaging time to zero so that the next call to packageFiles
// will force file packaging to occur.
log.Info("commit changed, resetting packaging time to force packaging of old data files")
cr.Status.Packaging.LastSuccessfulPackagingTime = metav1.Time{}
cr.Status.OperatorCommit = GitCommit
}
Expand All @@ -623,9 +622,11 @@ func (r *MetricsConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}
}

startTime, endTime := getTimeRange(ctx, r, cr)

packager := &packaging.FilePackager{
CR: cr,
DirCfg: dirCfg,
DirCfg: dirCfg,
FilesAction: packaging.MoveFiles,
}

// if packaging time is zero but there are files in the data dir, this is an upgraded operator.
Expand All @@ -635,44 +636,70 @@ func (r *MetricsConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
files, err := dirCfg.Reports.GetFiles()
if err == nil && len(files) > 0 {
log.Info("packaging files from an old operator version")
packageFiles(packager)
packageFiles(packager, cr)
// after packaging files after an upgrade, truncate the start time so we recollect
// all of today's data. This ensures that today's report contains any new report changes.
startTime = startTime.Truncate(24 * time.Hour)
}
}

// attempt to collect prometheus stats and create reports
if err := getPromCollector(r, cr); err != nil {
log.Error(err, "failed to get prometheus connection")
log.Info("failed to get prometheus connection", "error", err)
r.updateStatusAndLogError(ctx, cr)
return ctrl.Result{RequeueAfter: time.Minute * 2}, err // give things a break and try again in 2 minutes
}
originalStartTime, endTime := getTimeRange(ctx, r, cr)
startTime := originalStartTime
for startTime.Before(endTime) {
t := startTime
timeRange := promv1.Range{
Start: t,
End: t.Add(59*time.Minute + 59*time.Second),
Step: time.Minute,

for start := startTime; !start.After(endTime); start = start.AddDate(0, 0, 1) {
t := start
hours := int(endTime.Sub(t).Hours())
if hours > HOURS_IN_DAY {
hours = HOURS_IN_DAY
}
collectPromStats(r, cr, dirCfg, timeRange)
if startTime.Sub(originalStartTime) == 48*time.Hour {
// after collecting 48 hours of data, package the report to compress the files
for i := 0; i <= hours; i++ {
timeRange := promv1.Range{
Start: t,
End: t.Add(59*time.Minute + 59*time.Second),
Step: time.Minute,
}
if err := collectPromStats(r, cr, dirCfg, timeRange); err != nil {
if err == collector.ErrNoData && t.Hour() == 0 && t.Day() != endTime.Day() && r.initialDataCollection {
// if there is no data for the first hour of the day, and we are doing the
// initial data collection, skip to the next day so we avoid collecting
// partial data for a full day. This ensures we are generating a full daily
// report upon initial ingest.
log.Info("skipping data collection for day", "datetime", timeRange.Start)
break
}
}
t = t.Add(1 * time.Hour)
}

if r.initialDataCollection && t.Sub(startTime).Hours() == 96 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might still split into multiple files for the 4 day period? Also anything specific about the 96 mark or just feels right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here was to fit the number of reports into the default max of 30. Assuming a cluster has 90 days of data, this would generate 90*24/96 ~= 23 reports.

If the reports are too big and require splitting, then yea, this logic goes out the window. But it is a best effort to fit a full 90 days into 30 reports.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this some more, when we split a payload, that is still counted as a single report. Here is the logic for that:
https://github.com/project-koku/koku-metrics-operator/blob/main/packaging/packaging.go#L428-L435
We get the timestamp from the tar.gz and insert into a set, then count the length of the set.

All payloads from a split report use this basename:
https://github.com/project-koku/koku-metrics-operator/blob/main/packaging/packaging.go#L517

which is just TIMESTAMP-cost-mgmt...

// only perform these steps during the initial data collection.
// after collecting 96 hours of data, package the report to compress the files
// packaging is guarded by this LastSuccessfulPackagingTime, so setting it to
// zero enables packaging to occur thruout this loop
log.Info("collected 96 hours of data, resetting packaging time to force packaging")
cr.Status.Packaging.LastSuccessfulPackagingTime = metav1.Time{}
packageFiles(packager)
originalStartTime = startTime
}
startTime = startTime.Add(1 * time.Hour)
if err := r.Status().Update(ctx, cr); err != nil {
// it's not critical to handle this error. We update the status here to show progress
// if this loop takes a long time to complete. A missed update here does not impact
// data collection here.
log.Info("failed to update MetricsConfig status")
packageFiles(packager, cr)
startTime = t
// update status to show progress
r.updateStatusAndLogError(ctx, cr)
}
}

r.initialDataCollection = false
packager.FilesAction = packaging.CopyFiles
if endTime.Hour() == HOURS_IN_DAY && !cr.Status.Prometheus.LastQuerySuccessTime.Equal(&metav1.Time{Time: startTime}) {
// when we've reached the end of the day, force packaging to occur to generate the daily report
log.Info("collected a full day of data, resetting packaging time to force packaging")
cr.Status.Packaging.LastSuccessfulPackagingTime = metav1.Time{}
packager.FilesAction = packaging.MoveFiles
}

// package report files
packageFiles(packager)
packageFiles(packager, cr)

// Initial returned result -> requeue reconcile after 5 min.
// This result is replaced if upload or status update results in error.
Expand All @@ -693,9 +720,7 @@ func (r *MetricsConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques

// obtain credentials token/basic & return if there are authentication credential errors
if err := setAuthentication(r, authConfig, cr, req.NamespacedName); err != nil {
if err := r.Status().Update(ctx, cr); err != nil {
log.Error(err, "failed to update MetricsConfig status")
}
r.updateStatusAndLogError(ctx, cr)
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -727,7 +752,7 @@ func (r *MetricsConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

// remove old reports if maximum report count has been exceeded
if err := packager.TrimPackages(); err != nil {
if err := packager.TrimPackages(cr); err != nil {
result = ctrl.Result{}
errors = append(errors, err)
}
Expand All @@ -740,7 +765,7 @@ func (r *MetricsConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
cr.Status.Packaging.PackagedFiles = uploadFiles

if err := r.Status().Update(ctx, cr); err != nil {
log.Error(err, "failed to update MetricsConfig status")
log.Info("failed to update MetricsConfig status", "error", err)
result = ctrl.Result{}
errors = append(errors, err)
}
Expand All @@ -757,6 +782,12 @@ func (r *MetricsConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *MetricsConfigReconciler) updateStatusAndLogError(ctx context.Context, cr *metricscfgv1beta1.MetricsConfig) {
if err := r.Status().Update(ctx, cr); err != nil {
log.Info("failed to update MetricsConfig status", "error", err)
}
}

// concatErrs combines all the errors into one error
func concatErrs(errors ...error) error {
var err error
Expand Down
Loading