Skip to content

Commit

Permalink
Merge pull request #561 from fejta/ancient
Browse files Browse the repository at this point in the history
Check preconditions when reading.
  • Loading branch information
google-oss-robot authored Jul 20, 2021
2 parents 1bd8237 + d384fbd commit 9ae7fd8
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 23 deletions.
2 changes: 1 addition & 1 deletion cmd/updater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func main() {
"build": opt.buildConcurrency,
}).Info("Configured concurrency")

groupUpdater := updater.GCS(opt.groupTimeout, opt.buildTimeout, opt.buildConcurrency, opt.confirm, updater.SortStarted)
groupUpdater := updater.GCS(client, opt.groupTimeout, opt.buildTimeout, opt.buildConcurrency, opt.confirm, updater.SortStarted)

mets := setupMetrics(ctx)

Expand Down
6 changes: 3 additions & 3 deletions config/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func (q *TestGroupQueue) FixAll(whens map[string]time.Time) error {
"group": name,
"when": when,
}).Info("Fixing groups")
it.when = when
}
it.when = when
}
heap.Init(&q.queue)
if len(missing) > 0 {
Expand All @@ -136,9 +136,9 @@ func (q *TestGroupQueue) Fix(name string, when time.Time) error {
"group": name,
"when": when,
}).Info("Fixed group")
it.when = when
heap.Fix(&q.queue, it.index)
}
it.when = when
heap.Fix(&q.queue, it.index)
return nil
}

Expand Down
29 changes: 12 additions & 17 deletions pkg/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,15 @@ func (mets *Metrics) delay(dur time.Duration) {
type GroupUpdater func(parent context.Context, log logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path) (bool, error)

// GCS returns a GCS-based GroupUpdater, which knows how to process result data stored in GCS.
func GCS(groupTimeout, buildTimeout time.Duration, concurrency int, write bool, sortCols ColumnSorter) GroupUpdater {
func GCS(colClient gcs.Client, groupTimeout, buildTimeout time.Duration, concurrency int, write bool, sortCols ColumnSorter) GroupUpdater {
return func(parent context.Context, log logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path) (bool, error) {
if !tg.UseKubernetesClient {
log.Debug("Skipping non-kubernetes client group")
return false, nil
}
ctx, cancel := context.WithTimeout(parent, groupTimeout)
defer cancel()
gcsColReader := gcsColumnReader(client, buildTimeout, concurrency)
gcsColReader := gcsColumnReader(colClient, buildTimeout, concurrency)
reprocess := 20 * time.Minute // allow 20m for prow to finish uploading artifacts
return InflateDropAppend(ctx, log, client, tg, gridPath, write, gcsColReader, sortCols, reprocess)
}
Expand Down Expand Up @@ -195,13 +195,8 @@ func update(ctx context.Context, client gcs.ConditionalClient, log logrus.FieldL
return more, nil
}

type testGroupClient interface {
gcs.Opener
gcs.Stater
}

func updateTestGroups(ctx context.Context, client testGroupClient, q *config.TestGroupQueue, configPath gcs.Path, gridPrefix string, groupNames []string, freq time.Duration) (int64, map[string]int64, error) {
r, attrs, err := client.Open(ctx, configPath)
func updateTestGroups(ctx context.Context, opener gcs.Opener, stater gcs.Stater, q *config.TestGroupQueue, configPath gcs.Path, gridPrefix string, groupNames []string, freq time.Duration) (int64, map[string]int64, error) {
r, attrs, err := opener.Open(ctx, configPath)
if err != nil {
if !isPreconditionFailed(err) {
err = fmt.Errorf("read: %v", err)
Expand Down Expand Up @@ -239,17 +234,15 @@ func updateTestGroups(ctx context.Context, client testGroupClient, q *config.Tes
if err != nil {
return configGen, nil, err
}
attrs := gcs.Stat(ctx, client, 20, paths...)
attrs := gcs.Stat(ctx, stater, 20, paths...)
updates := make(map[string]time.Time, len(attrs))
now := time.Now()
for i, attrs := range attrs {
name := groups[i].Name
switch {
case attrs.Attrs != nil:
updates[name] = attrs.Attrs.Updated.Add(freq)
generations[name] = attrs.Attrs.Generation
case attrs.Err == storage.ErrObjectNotExist:
updates[name] = now
generations[name] = 0
default:
// no change
Expand All @@ -274,7 +267,7 @@ func Update(parent context.Context, client gcs.ConditionalClient, mets *Metrics,
var q config.TestGroupQueue

log.Debug("Fetching testgroup metadata state...")
gen, generations, err := updateTestGroups(ctx, client, &q, configPath, gridPrefix, groupNames, freq)
gen, generations, err := updateTestGroups(ctx, client, client, &q, configPath, gridPrefix, groupNames, freq)
if err != nil {
return err
}
Expand Down Expand Up @@ -323,7 +316,7 @@ func Update(parent context.Context, client gcs.ConditionalClient, mets *Metrics,

go func() {
cond := storage.Conditions{GenerationNotMatch: gen}
client := client.If(&cond, nil)
opener := client.If(&cond, &cond)
ticker := time.NewTicker(time.Minute)
for {
depth, next, when := q.Status()
Expand All @@ -344,10 +337,12 @@ func Update(parent context.Context, client gcs.ConditionalClient, mets *Metrics,
ticker.Stop()
return
case <-ticker.C:
if gen, _, err := updateTestGroups(ctx, client, &q, configPath, gridPrefix, groupNames, freq); err != nil {
log.WithError(err).Error("Failed to update configuration")
} else {
gen, _, err := updateTestGroups(ctx, opener, client, &q, configPath, gridPrefix, groupNames, freq)
switch {
case err == nil:
cond.GenerationNotMatch = gen
case !isPreconditionFailed(err):
log.WithError(err).Error("Failed to update configuration")
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/updater/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestGCS(t *testing.T) {
// either because the context is canceled or things like client are unset)
ctx, cancel := context.WithCancel(context.Background())
cancel()
updater := GCS(0, 0, 0, false, SortStarted)
updater := GCS(nil, 0, 0, 0, false, SortStarted)
defer func() {
if r := recover(); r != nil {
if !tc.fail {
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestUpdate(t *testing.T) {
client.Lister[buildsPath] = fi
}

groupUpdater := GCS(*tc.groupTimeout, *tc.buildTimeout, tc.buildConcurrency, !tc.skipConfirm, SortStarted)
groupUpdater := GCS(client, *tc.groupTimeout, *tc.buildTimeout, tc.buildConcurrency, !tc.skipConfirm, SortStarted)
mets := &Metrics{
Successes: &fakeCounter{},
Errors: &fakeCounter{},
Expand Down
1 change: 1 addition & 0 deletions util/gcs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"@com_github_sirupsen_logrus//:go_default_library",
"@com_google_cloud_go_storage//:go_default_library",
"@io_k8s_api//core/v1:go_default_library",
"@org_golang_google_api//googleapi:go_default_library",
"@org_golang_google_api//iterator:go_default_library",
"@org_golang_google_api//option:go_default_library",
],
Expand Down
21 changes: 21 additions & 0 deletions util/gcs/real_gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package gcs
import (
"context"
"io"
"net/http"
"strings"

"cloud.google.com/go/storage"
"google.golang.org/api/googleapi"
)

var (
Expand Down Expand Up @@ -65,9 +67,28 @@ func (rgc realGCSClient) Open(ctx context.Context, path Path) (io.ReadCloser, *s
if r == nil {
return nil, nil, err
}
if err == nil && rgc.readCond != nil {
err = checkPreconditions(r.Attrs, rgc.readCond)
}
return r, &r.Attrs, err
}

var (
errPreconditions = googleapi.Error{
Code: http.StatusPreconditionFailed,
}
)

func checkPreconditions(attrs storage.ReaderObjectAttrs, cond *storage.Conditions) error {
if g := cond.GenerationMatch; g > 0 && g != attrs.Generation {
return &errPreconditions
}
if g := cond.GenerationNotMatch; g > 0 && g == attrs.Generation {
return &errPreconditions
}
return nil
}

func (rgc realGCSClient) Objects(ctx context.Context, path Path, delimiter, startOffset string) Iterator {
p := path.Object()
if !strings.HasSuffix(p, "/") {
Expand Down

0 comments on commit 9ae7fd8

Please sign in to comment.