Skip to content

Commit

Permalink
Fix race condition (#415) (#451)
Browse files Browse the repository at this point in the history
In a rare condition hydration-controller and parser may proceed with file
hydration or listing when git-sync see and updates the commit hash
in status, which causes resource mismatch and accidental deletion.

This change blocks the risk by double checking if commit has changed during
hydration or listing.

go/cs-race-condition
  • Loading branch information
tiffanny29631 authored Mar 6, 2023
1 parent 4f50b86 commit 579018f
Show file tree
Hide file tree
Showing 5 changed files with 341 additions and 3 deletions.
27 changes: 26 additions & 1 deletion pkg/hydrate/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (h *Hydrator) Run(ctx context.Context) {
defer runTimer.Stop()
rehydrateTimer := time.NewTimer(h.RehydratePeriod)
defer rehydrateTimer.Stop()
absSourceDir := h.SourceRoot.Join(cmpath.RelativeSlash(h.SourceLink))
absSourceDir := h.absSourceDir()
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -115,13 +115,38 @@ func (h *Hydrator) runHydrate(sourceCommit, syncDir string) HydrationError {
if err := kustomizeBuild(syncDir, dest, true); err != nil {
return err
}

newCommit, err := ComputeCommit(h.absSourceDir())
if err != nil {
return NewTransientError(err)
} else if sourceCommit != newCommit {
return NewTransientError(fmt.Errorf("source commit changed while running Kustomize build, was %s, now %s. It will be retried in the next sync", sourceCommit, newCommit))
}

if err := updateSymlink(h.HydratedRoot.OSPath(), h.HydratedLink, newHydratedDir.OSPath()); err != nil {
return NewInternalError(errors.Wrapf(err, "unable to update the symbolic link to %s", newHydratedDir.OSPath()))
}
klog.Infof("Successfully rendered %s for commit %s", syncDir, sourceCommit)
return nil
}

// ComputeCommit returns the computed commit from given sourceDir, or error
// if the sourceDir fails symbolic link evaluation
func ComputeCommit(sourceDir cmpath.Absolute) (string, error) {
dir, err := sourceDir.EvalSymlinks()
if err != nil {
return "", errors.Wrapf(err, "unable to evaluate the symbolic link of sourceDir %s", dir)
}
newCommit := filepath.Base(dir.OSPath())
return newCommit, nil
}

// absSourceDir returns the absolute path of a source directory by joining the
// root source directory path and a relative path to the source directory
func (h *Hydrator) absSourceDir() cmpath.Absolute {
return h.SourceRoot.Join(cmpath.RelativeSlash(h.SourceLink))
}

// hydrate renders the source git repo to hydrated configs.
func (h *Hydrator) hydrate(sourceCommit, syncDir string) HydrationError {
hydrate, err := needsKustomize(syncDir)
Expand Down
175 changes: 175 additions & 0 deletions pkg/hydrate/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package hydrate

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

"kpt.dev/configsync/pkg/api/configsync/v1beta1"
"kpt.dev/configsync/pkg/importer/filesystem/cmpath"
ft "kpt.dev/configsync/pkg/importer/filesystem/filesystemtest"
"sigs.k8s.io/cli-utils/pkg/testutil"
)

var originCommit = "1234567890abcdef"
var differentCommit = "abcdef1234567890"

func TestRunHydrate(t *testing.T) {
testCases := []struct {
name string
commit string // the commit in the hydrator object that might differ from origin commit
wantedErr error
}{
{
name: "Run hydrate when source commit is not changed",
commit: originCommit,
wantedErr: nil,
},
{
name: "Run hydrate when source commit is changed",
commit: differentCommit,
wantedErr: testutil.EqualError(NewTransientError(fmt.Errorf("source commit changed while running Kustomize build, was %s, now %s. It will be retried in the next sync", originCommit, differentCommit))),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// create a temporary directory with a commit hash
tempDir, err := ioutil.TempDir(os.TempDir(), "run-hydrate-test")
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.RemoveAll(tempDir)
if err != nil {
t.Fatal(err)
}
}()

commitDir := filepath.Join(tempDir, tc.commit)
err = os.Mkdir(commitDir, os.ModePerm)
if err != nil {
t.Fatal(err)
}

kustFileGenerated, err := ioutil.TempFile(commitDir, "kustomization.yaml")
if err != nil {
t.Fatal(err)
}
err = os.Rename(kustFileGenerated.Name(), filepath.Join(commitDir, "kustomization.yaml"))
if err != nil {
t.Fatal(err)
}

// create a symlink to point to the temporary directory
dir := ft.NewTestDir(t)
symDir := dir.Root().Join(cmpath.RelativeSlash("run-hydrate-symlink"))
err = os.Symlink(commitDir, symDir.OSPath())
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.Remove(symDir.OSPath())
if err != nil {
t.Fatal(err)
}
}()

hydrator := &Hydrator{
DonePath: "",
SourceType: v1beta1.HelmSource,
SourceRoot: cmpath.Absolute(commitDir),
HydratedRoot: cmpath.Absolute(commitDir),
SourceLink: "",
HydratedLink: "tmp-link",
SyncDir: "",
PollingPeriod: 1 * time.Minute,
RehydratePeriod: 1 * time.Minute,
ReconcilerName: "root-reconciler",
}

absSourceDir := hydrator.SourceRoot.Join(cmpath.RelativeSlash(hydrator.SourceLink))
_, syncDir, err := SourceCommitAndDir(hydrator.SourceType, absSourceDir, hydrator.SyncDir, hydrator.ReconcilerName)
if err != nil {
t.Fatal(fmt.Errorf("failed to get commit and sync directory from the source directory %s: %v", commitDir, err))
}

err = hydrator.runHydrate(originCommit, syncDir.OSPath())
testutil.AssertEqual(t, tc.wantedErr, err)
})
}
}

func TestComputeCommit(t *testing.T) {
testCases := []struct {
name string
sourceCommit string
}{
{
name: "Computed commit should be the same to the one given in sourceDir",
sourceCommit: originCommit,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// create a temporary directory with a commit hash
tempDir, err := ioutil.TempDir(os.TempDir(), "compute-commit-test")
if err != nil {
t.Fatal(err)
}
defer func(path string) {
err := os.RemoveAll(path)
if err != nil {
t.Fatal(err)
}
}(tempDir)
absTempDir := cmpath.Absolute(tempDir)

commitDir := filepath.Join(tempDir, originCommit)
err = os.Mkdir(commitDir, os.ModePerm)
if err != nil {
t.Fatal(err)
}

// create a symlink to point to the temporary directory
dir := ft.NewTestDir(t)
symDir := dir.Root().Join(cmpath.RelativeSlash("compute-commit-symlink"))
err = os.Symlink(commitDir, symDir.OSPath())
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.Remove(symDir.OSPath())
if err != nil {
t.Fatal(err)
}
}()

absSourceDir := absTempDir.Join(cmpath.RelativeSlash(tc.sourceCommit))
computed, err := ComputeCommit(symDir)
if computed != tc.sourceCommit {
t.Errorf("wanted commit to be %v, got %v", tc.sourceCommit, computed)
} else if err != nil {
t.Errorf("error computing commit from %s: %v ", absSourceDir, err)
}
})
}
}
2 changes: 1 addition & 1 deletion pkg/parse/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func readFromSource(ctx context.Context, p Parser, trigger string, state *reconc
state.resetCache()

// Read all the files under state.syncDir
sourceStatus.errs = opts.readConfigFiles(&sourceState)
sourceStatus.errs = opts.readConfigFiles(&sourceState, p)
if sourceStatus.errs == nil {
// Set `state.cache.source` after `readConfigFiles` succeeded
state.cache.source = sourceState
Expand Down
10 changes: 9 additions & 1 deletion pkg/parse/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type sourceState struct {
// - if rendered is true, state.syncDir contains the hydrated files.
// - if rendered is false, state.syncDir contains the source files.
// readConfigFiles should be called after sourceState is populated.
func (o *files) readConfigFiles(state *sourceState) status.Error {
func (o *files) readConfigFiles(state *sourceState, p Parser) status.Error {
if state == nil || state.commit == "" || state.syncDir.OSPath() == "" {
return status.InternalError("sourceState is not populated yet")
}
Expand All @@ -95,6 +95,14 @@ func (o *files) readConfigFiles(state *sourceState) status.Error {
if err != nil {
return status.PathWrapError(errors.Wrap(err, "listing files in the configs directory"), syncDir.OSPath())
}

newCommit, err := hydrate.ComputeCommit(p.options().SourceDir)
if err != nil {
return status.TransientError(err)
} else if newCommit != state.commit {
return status.TransientError(fmt.Errorf("source commit changed while listing files, was %s, now %s. It will be retried in the next sync", state.commit, newCommit))
}

state.files = fileList
return nil
}
Expand Down
130 changes: 130 additions & 0 deletions pkg/parse/source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package parse

import (
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"

"kpt.dev/configsync/pkg/core"
"kpt.dev/configsync/pkg/declared"
"kpt.dev/configsync/pkg/importer/filesystem"
"kpt.dev/configsync/pkg/importer/filesystem/cmpath"
ft "kpt.dev/configsync/pkg/importer/filesystem/filesystemtest"
"kpt.dev/configsync/pkg/kinds"
"kpt.dev/configsync/pkg/status"
syncertest "kpt.dev/configsync/pkg/syncer/syncertest/fake"
"kpt.dev/configsync/pkg/testing/fake"
"sigs.k8s.io/cli-utils/pkg/testutil"
)

var originCommit = "1234567890abcde"
var differentCommit = "abcde1234567890"

func TestReadConfigFiles(t *testing.T) {
testCases := []struct {
name string
commit string
wantedErr error
}{
{
name: "read config files when commit is not changed",
commit: originCommit,
wantedErr: nil,
},
{
name: "read config files when commit is changed",
commit: differentCommit,
wantedErr: testutil.EqualError(status.TransientError(fmt.Errorf("source commit changed while running Kustomize build, was %s, now %s. It will be retried in the next sync", originCommit, differentCommit))),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// create temporary directory for parser
tempRoot, _ := ioutil.TempDir(os.TempDir(), "read-config-test")
defer func(path string) {
err := os.RemoveAll(path)
if err != nil {
t.Fatal(err)
}
}(tempRoot)

// mock the parser's syncDir that could change while program running
parserCommitDir := filepath.Join(tempRoot, tc.commit)
err := os.Mkdir(parserCommitDir, os.ModePerm)
if err != nil {
t.Fatal(err)
}

// mock the original sourceCommit that is passed by sourceState when
// running readConfigFiles
sourceCommitDir := filepath.Join(tempRoot, originCommit)
if _, err := os.Stat(sourceCommitDir); errors.Is(err, os.ErrNotExist) {
err = os.Mkdir(sourceCommitDir, os.ModePerm)
if err != nil {
t.Fatal(err)
}
}

// create a symlink to point to the temporary directory
dir := ft.NewTestDir(t)
symDir := dir.Root().Join(cmpath.RelativeSlash("list-file-symlink"))
err = os.Symlink(parserCommitDir, symDir.OSPath())
if err != nil {
t.Fatal(err)
}
defer func() {
err := os.Remove(symDir.OSPath())
if err != nil {
t.Fatal(err)
}
}()

sourceState := &sourceState{
commit: originCommit,
syncDir: cmpath.Absolute(sourceCommitDir),
files: nil,
}

parser := &root{
sourceFormat: filesystem.SourceFormatUnstructured,
opts: opts{
parser: &fakeParser{},
syncName: rootSyncName,
reconcilerName: rootReconcilerName,
client: syncertest.NewClient(t, core.Scheme, fake.RootSyncObjectV1Beta1(rootSyncName)),
discoveryInterface: syncertest.NewDiscoveryClient(kinds.Namespace(), kinds.Role()),
updater: updater{
scope: declared.RootReconciler,
resources: &declared.Resources{},
},
mux: &sync.Mutex{},
},
}

// set the necessary FileSource of parser
parser.SourceDir = symDir

err = parser.readConfigFiles(sourceState, parser)
testutil.AssertEqual(t, tc.wantedErr, err)
})
}
}

0 comments on commit 579018f

Please sign in to comment.