Skip to content

Commit

Permalink
bazel: tweak logic for staging test artifacts in bazci
Browse files Browse the repository at this point in the history
Because Bazel aggressively caches build/test artifacts, if we're not
careful, bazci can copy OLD artifacts from previous build/test runs into
the artifacts directory. This is particularly an issue because TeamCity
watches that directory for `test.xml` reports, and if an old `test.xml`
reports a test failure, TeamCity will notice that and report the failure
in the UI -- and importantly, even if we replace that `test.xml` with a
completely different one that reports that the test succeeed, TC will
not amend what's displayed in the UI accordingly. So this can manifest
as reported test failures from unrelated PR's showing up in TC in an
apparently unpredictable (though uncommmon) manner.

We fix this by making bazci a little smarter about when we choose to
stage artifacts:

1. The first time the watcher loops over all the test artifacts, never
   stage anything (the artifacts are probably cached -- not enough time
   has passed for any legitimate artifacts to appear, probably).
2. Only stage artifacts incrementally if their stats have changed since
   the initial round of caching.
3. During the final loop, stage ALL artifacts (if they haven't been
   staged yet), just to make sure we don't miss anything.

Also add lots of comments to make these design decisions and their
motivations a little clearer.

Resolves #63740.

Release note: None
  • Loading branch information
rickystewart committed Apr 21, 2021
1 parent feb357a commit 5fb6a4c
Showing 1 changed file with 173 additions and 45 deletions.
218 changes: 173 additions & 45 deletions pkg/cmd/bazci/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
package main

import (
"bytes"
"encoding/xml"
"io"
"io/ioutil"
"log"
"os"
"path"
"strings"
"time"

"github.com/cockroachdb/errors"
)

// SourceDir is an enumeration of possible output locations.
Expand All @@ -38,6 +38,22 @@ type fileMetadata struct {
modTime time.Time
}

// Phase is an enumeration of the three kinds of phases the watcher goes
// through (as in a state machine).
type Phase int

const (
// The first phase wherein we perform an initial caching of all artifact
// metadata.
initialCachingPhase Phase = iota
// The second phase wherein we may update any files that have been newly
// updated since the first round of caching.
incrementalUpdatePhase
// The final phase wherein we stage ALL artifacts that have not been
// staged yet.
finalizePhase
)

// A watcher watches the status of a build and regularly copies over relevant
// artifacts.
type watcher struct {
Expand All @@ -48,32 +64,46 @@ type watcher struct {
// any given file is up-to-date. If the fileMetadata doesn't match the
// latest file `stat`, we assume the file's been updated and read it
// again.
files map[string]fileMetadata
fileToMeta map[string]fileMetadata
// Map of file path -> bool representing whether this file has already
// been staged. (We never set the bool to "false" for any value.) Useful
// because we don't ever WANT to stage the same file more than twice,
// and because we need to make sure we've staged everything in the
// finalize phase.
fileToStaged map[string]bool
}

// makeWatcher returns an appropriate watcher.
func makeWatcher(completion <-chan error, info buildInfo) *watcher {
return &watcher{
completion: completion,
info: info,
files: make(map[string]fileMetadata),
completion: completion,
info: info,
fileToMeta: make(map[string]fileMetadata),
fileToStaged: make(map[string]bool),
}
}

// Watch performs most of the heavy lifting for bazci by monitoring the
// progress of the ongoing Bazel build (represented by the `completion`
// channel in the watcher), staging any test artifacts that are produced.
func (w watcher) Watch() error {
// First, cache the file metadata for all test artifacts.
err := w.stageTestArtifacts(initialCachingPhase)
if err != nil {
return err
}
// The main watch loop.
for {
select {
case buildErr := <-w.completion:
// Note that even if the build failed, we still want to
// try to stage test artifacts.
testErr := w.stageTestArtifacts()
testErr := w.stageTestArtifacts(finalizePhase)
// Also stage binary artifacts this time.
binErr := w.stageBinaryArtifacts()
// Only check errors after we're done staging all
// artifacts.
// artifacts -- we don't want to miss anything because
// the build failed.
if buildErr != nil {
return buildErr
}
Expand All @@ -87,7 +117,7 @@ func (w watcher) Watch() error {
case <-time.After(10 * time.Second):
// Otherwise, every 10 seconds, stage the latest test
// artifacts.
err := w.stageTestArtifacts()
err := w.stageTestArtifacts(incrementalUpdatePhase)
if err != nil {
return err
}
Expand All @@ -98,35 +128,32 @@ func (w watcher) Watch() error {
// stageTestArtifacts copies the latest test artifacts into the artifactsDir.
// The "test artifacts" are the test.log (which is copied verbatim) and test.xml
// (which we need to munge a little bit before copying).
func (w watcher) stageTestArtifacts() error {
func (w watcher) stageTestArtifacts(phase Phase) error {
for _, test := range w.info.tests {
// relDir is the directory under bazel-testlogs where the test
// output files can be found.
relDir := strings.ReplaceAll(strings.TrimPrefix(test, "//"), ":", "/")
// First, copy the log file verbatim.
relLogFilePath := path.Join(relDir, "test.log")
err := w.maybeStageArtifact(testlogsSourceDir, relLogFilePath, 0666, copyContentTo)
err := w.maybeStageArtifact(testlogsSourceDir, relLogFilePath, 0666, phase,
copyContentTo)
if err != nil {
return err
}
// Munge and copy the xml file.
relXMLFilePath := path.Join(relDir, "test.xml")
err = w.maybeStageArtifact(testlogsSourceDir, relXMLFilePath, 0666,
err = w.maybeStageArtifact(testlogsSourceDir, relXMLFilePath, 0666, phase,
func(srcContent []byte, outFile io.Writer) error {
// Parse the XML into a testSuites struct.
suites := testSuites{}
err := xml.Unmarshal(srcContent, &suites)
// Note that we return an error if parsing fails. This isn't
// unexpected -- if we read the XML file before it's been
// completely written to disk, that will happen. Returning the
// error will cancel the write to disk, which is exactly what we
// want.
if err != nil {
switch {
// Syntax errors we can just ignore. (Maybe we read the file
// while it was in the process of being written?) Better to
// have something than nothing, so copy the raw content.
case errors.HasType(err, (*xml.SyntaxError)(nil)):
_, err = outFile.Write(srcContent)
return err
default:
return err
}
return err
}
// We only want the first test suite in the list of suites.
munged, err := xml.MarshalIndent(&suites.Suites[0], "", "\t")
Expand Down Expand Up @@ -182,6 +209,9 @@ type xmlMessage struct {
}

// stageBinaryArtifacts stages the latest binary artifacts from the build.
// (We only ever stage binary artifacts during the finalize phase -- it isn't
// important for them to pop up before the build is complete, as with test
// results.)
func (w watcher) stageBinaryArtifacts() error {
for _, bin := range w.info.goBinaries {
// Convert a target like `//pkg/cmd/cockroach-short` to the
Expand All @@ -190,7 +220,8 @@ func (w watcher) stageBinaryArtifacts() error {
head := strings.ReplaceAll(strings.TrimPrefix(bin, "//"), ":", "/")
components := strings.Split(bin, ":")
relBinPath := path.Join(head+"_", components[len(components)-1])
err := w.maybeStageArtifact(binSourceDir, relBinPath, 0777, copyContentTo)
err := w.maybeStageArtifact(binSourceDir, relBinPath, 0777, finalizePhase,
copyContentTo)
if err != nil {
return err
}
Expand All @@ -205,22 +236,82 @@ func copyContentTo(srcContent []byte, outFile io.Writer) error {
return err
}

// cancelableWriter is an io.WriteCloser with the following properties:
// 1. The entire contents of the file is accumulated in memory in the `buf`.
// 2. The file is written to disk when Close() is called...
// 3. ...UNLESS Canceled is set.
// These properties allow us to make sure we only stage artifacts once --
// if an artifact isn't ready to be staged, just set "Canceled = true" and we
// can trivially skip it for now.
type cancelableWriter struct {
filename string
perm os.FileMode
buf bytes.Buffer
Canceled bool
}

var _ io.WriteCloser = (*cancelableWriter)(nil)

func (w *cancelableWriter) Write(p []byte) (n int, err error) {
n, err = w.buf.Write(p)
if err != nil {
w.Canceled = true
}
return
}

func (w *cancelableWriter) Close() error {
if !w.Canceled {
err := os.MkdirAll(path.Dir(w.filename), 0777)
if err != nil {
return err
}
f, err := os.OpenFile(w.filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, w.perm)
if err != nil {
return err
}
defer f.Close()
_, err = f.Write(w.buf.Bytes())
return err
}
return nil
}

// maybeStageArtifact stages a build or test artifact in the artifactsDir unless
// the cache indicates the file is already up-to-date. maybeStageArtifact stages
// the file from the given `root` (e.g. either `bazel-testlogs` or `bazel-bin`)
// and at the given relative path. If maybeStageArtifact determines that a file
// needs to be staged, it is created in the artifactsDir at the same `root` and
// `relPath` with the given `perm`, and the `stagefn` is called to populate the
// contents of the newly staged file. With the `stagefn`, callers can choose
// whether to copy the file verbatim or edit it. If the source artifact does not
// exist, then maybeStageArtifact does nothing.
// contents of the newly staged file. If the source artifact does not exist,
// or has not been updated since the last time it was staged, maybeStageArtifact
// does nothing. If the `stagefn` returns a non-nil error, then the artifact is
// not staged. (Errors will not be propagated back up to the caller of
// maybeStageArtifact unless we're in the "finalize" phase -- errors can happen
// sporadically if we're not finalizing, especially if we read an artifact while
// it's in the process of being written.)
//
// In the intialCachingPhase, NO artifacts will be staged, but
// maybeStageArtifact will still stat the source file and cache its metadata.
// This is important because Bazel aggressively caches build and test artifacts,
// so just because a file exists, doesn't necessarily mean that it should be
// staged. For example -- if we're running //pkg/server:server_test, and
// bazel-testlogs/pkg/server/server_test/test.xml exists, it may just be because
// that file was created by a PREVIOUS run of :server_test. Staging it right
// away makes no sense in this case -- the XML will contain old data. Instead,
// we note the file metadata so that we know to re-stage it when the file is
// updated. Meanwhile, we force-stage everything that hasn't yet been staged
// once in the finalizePhase, to cover cases where Bazel reused the old
// cached artifact.
//
// For example, one might stage a log file with a call like:
// w.maybeStageArtifact(testlogsSourceDir, "pkg/server/server_test/test.log", 0666, copycontentTo)
// w.maybeStageArtifact(testlogsSourceDir, "pkg/server/server_test/test.log",
// 0666, incrementalUpdatePhase, copycontentTo)
func (w watcher) maybeStageArtifact(
root SourceDir,
relPath string,
perm os.FileMode,
phase Phase,
stagefn func(srcContent []byte, outFile io.Writer) error,
) error {
var rootPath string
Expand All @@ -237,31 +328,68 @@ func (w watcher) maybeStageArtifact(
srcPath := path.Join(rootPath, relPath)
destPath := path.Join(artifactsDir, artifactsSubdir, relPath)

stage := func() error {
contents, err := ioutil.ReadFile(srcPath)
if err != nil {
return err
}
writer := &cancelableWriter{filename: destPath, perm: perm}
err = stagefn(contents, writer)
if err != nil {
// If the stagefn fails, make sure to cancel the write
// so we don't just write garbage to the file.
writer.Canceled = true
// Sporadic failures are OK if we're not finalizing.
if phase == finalizePhase {
return err
}
log.Printf("WARNING: got error %v trying to stage artifact %s", err,
destPath)
return nil
}
err = writer.Close()
if err != nil {
return err
}
w.fileToStaged[srcPath] = true
return nil
}

stat, err := os.Stat(srcPath)
if err == nil {
meta := fileMetadata{size: stat.Size(), modTime: stat.ModTime()}
oldMeta, ok := w.files[srcPath]
// Stage if we haven't staged this file yet, or if the size or modTime has been
// updated since the last time we staged it.
if !ok || meta != oldMeta {
err := os.MkdirAll(path.Dir(destPath), 0777)
if err != nil {
return err
}
f, err := os.OpenFile(destPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}
defer f.Close()
contents, err := ioutil.ReadFile(srcPath)
if err != nil {
return err
oldMeta, oldMetaOk := w.fileToMeta[srcPath]
// If we don't have metadata for this file yet, or if the
// metadata has been updated, stage the file.
if !oldMetaOk || meta != oldMeta {
switch phase {
case initialCachingPhase:
// Don't stage, but do make a note of the
// metadata.
case incrementalUpdatePhase, finalizePhase:
_, staged := w.fileToStaged[srcPath]
// This is not a great situation: we've been
// asked to stage a file that was already
// staged. Do it again, but print a warning.
if staged {
log.Printf("WARNING: re-staging already-staged file at %s",
destPath)
}
err := stage()
if err != nil {
return err
}
}
err = stagefn(contents, f)
w.fileToMeta[srcPath] = meta
}
_, staged := w.fileToStaged[srcPath]
// During the finalize phase, stage EVERYTHING that hasn't been
// staged yet. This is our last chance!
if !staged && phase == finalizePhase {
err := stage()
if err != nil {
return err
}
w.files[srcPath] = meta
}
}
// stat errors can simply be ignored -- if the file doesn't exist, we
Expand Down

0 comments on commit 5fb6a4c

Please sign in to comment.