Prevent split-brain active node writes when using Consul #23013

Merged · 3 commits · Sep 22, 2023
7 changes: 7 additions & 0 deletions changelog/23013.txt
@@ -0,0 +1,7 @@
```release-note:bug
storage/consul: fix a bug where an active node in a specific sort of network
partition could continue to write data to Consul after a new leader is elected,
potentially causing data loss or corruption for keys with many concurrent
writers. For Enterprise clusters this could cause corruption of the Merkle
trees, leading to failure to complete Merkle sync without a full re-index.
```
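The diff in this PR only touches test helpers; the fix itself lives in Vault's consul physical backend and is not shown here. As background on the failure mode, Consul offers session-bound writes whose success is tied to a session's validity — the kind of mechanism that can fence off a deposed leader. A minimal sketch of that pattern using the public github.com/hashicorp/consul/api package (the key name, session name, and TTL are hypothetical, and this is not the PR's actual implementation):

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Create a session. If this node is partitioned away and loses leadership,
	// the session expires or is invalidated, and the session-bound writes
	// below stop succeeding instead of silently landing in the KV store.
	sessionID, _, err := client.Session().Create(&api.SessionEntry{
		Name:     "vault-active-node", // hypothetical name
		Behavior: api.SessionBehaviorDelete,
		TTL:      "15s",
	}, nil)
	if err != nil {
		log.Fatal(err)
	}

	// Acquire-style write: succeeds only while sessionID is still valid, so a
	// partitioned ex-leader cannot keep writing after a new election.
	ok, _, err := client.KV().Acquire(&api.KVPair{
		Key:     "vault/demo/leader-guarded-key", // hypothetical key
		Value:   []byte("data"),
		Session: sessionID,
	}, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("write accepted:", ok)
}
```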
70 changes: 70 additions & 0 deletions helper/testhelpers/consul/cluster_storage.go
@@ -0,0 +1,70 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package consul

import (
	"context"
	"fmt"

	"github.com/hashicorp/vault/sdk/helper/testcluster"
)

type ClusterStorage struct {
	// Set these after calling `NewClusterStorage` but before `Start` (or
	// passing into NewDockerCluster) to control the Consul version used in
	// your test. Leave empty for the latest OSS version (defined in consulhelper.go).
	ConsulVersion    string
	ConsulEnterprise bool

	cleanup func()
	config  *Config
}

var _ testcluster.ClusterStorage = &ClusterStorage{}

func NewClusterStorage() *ClusterStorage {
	return &ClusterStorage{}
}

func (s *ClusterStorage) Start(ctx context.Context, opts *testcluster.ClusterOptions) error {
	prefix := ""
	if opts != nil && opts.ClusterName != "" {
		prefix = fmt.Sprintf("%s-", opts.ClusterName)
	}
	cleanup, config, err := RunContainer(ctx, prefix, s.ConsulVersion, s.ConsulEnterprise, true)
	if err != nil {
		return err
	}
	s.cleanup = cleanup
	s.config = config

	return nil
}

func (s *ClusterStorage) Cleanup() error {
	if s.cleanup != nil {
		s.cleanup()
		s.cleanup = nil
	}
	return nil
}

func (s *ClusterStorage) Opts() map[string]interface{} {
	if s.config == nil {
		return nil
	}
	return map[string]interface{}{
		"address":      s.config.ContainerHTTPAddr,
		"token":        s.config.Token,
		"max_parallel": "32",
	}
}

func (s *ClusterStorage) Type() string {
	return "consul"
}

func (s *ClusterStorage) Config() *Config {
	return s.config
}
88 changes: 72 additions & 16 deletions helper/testhelpers/consul/consulhelper.go
@@ -5,6 +5,7 @@ package consul

import (
	"context"
	"fmt"
	"os"
	"strings"
	"testing"
@@ -14,9 +15,16 @@ import (
	"github.com/hashicorp/vault/sdk/helper/docker"
)

// LatestConsulVersion is the most recent version of Consul which is used unless
// another version is specified in the test config or environment. This will
// probably go stale, as we don't always update it on every release, but we
// rarely rely on specific new Consul functionality, so that's probably not a
// problem.
const LatestConsulVersion = "1.15.3"

type Config struct {
	docker.ServiceHostPort
	Token string
	Token             string
	ContainerHTTPAddr string
}

func (c *Config) APIConfig() *consulapi.Config {
@@ -26,19 +34,39 @@ func (c *Config) APIConfig() *consulapi.Config {
	return apiConfig
}

// PrepareTestContainer creates a Consul docker container. If version is empty,
// the Consul version used will be given by the environment variable
// CONSUL_DOCKER_VERSION, or if that's empty, whatever we've hardcoded as the
// the latest Consul version.
// PrepareTestContainer is a test helper that creates a Consul docker container
// or fails the test if unsuccessful. See RunContainer for more details on the
// configuration.
func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBootstrapSetup bool) (func(), *Config) {
	t.Helper()

	cleanup, config, err := RunContainer(context.Background(), "", version, isEnterprise, doBootstrapSetup)
	if err != nil {
		t.Fatalf("failed starting consul: %s", err)
	}
	return cleanup, config
}

// RunContainer runs Consul in a Docker container unless CONSUL_HTTP_ADDR is
// already found in the environment. The Consul version is determined by the
// version argument. If version is the empty string, the CONSUL_DOCKER_VERSION
// environment variable is used, and if that is empty too, LatestConsulVersion
// is used (defined above). If namePrefix is provided we assume you have chosen
// a unique enough prefix to avoid collision with other tests that may be
// running in parallel and so _do not_ add an additional unique ID suffix. We
// will also ensure previous instances are deleted and leave the container
// running for debugging. This is useful for using Consul as part of a
// testcluster (i.e. when Vault is in Docker too). If namePrefix is empty then
// a unique suffix is added, since many older tests rely on a unique instance
// of the container. This is used by `PrepareTestContainer`, which is typically
// used in tests that rely on Consul but run the tested code within the test
// process.
func RunContainer(ctx context.Context, namePrefix, version string, isEnterprise bool, doBootstrapSetup bool) (func(), *Config, error) {
	if retAddress := os.Getenv("CONSUL_HTTP_ADDR"); retAddress != "" {
		shp, err := docker.NewServiceHostPortParse(retAddress)
		if err != nil {
			t.Fatal(err)
			return nil, nil, err
		}
		return func() {}, &Config{ServiceHostPort: *shp, Token: os.Getenv("CONSUL_HTTP_TOKEN")}
		return func() {}, &Config{ServiceHostPort: *shp, Token: os.Getenv("CONSUL_HTTP_TOKEN")}, nil
	}

	config := `acl { enabled = true default_policy = "deny" }`
@@ -47,7 +75,7 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo
		if consulVersion != "" {
			version = consulVersion
		} else {
			version = "1.11.3" // Latest Consul version, update as new releases come out
			version = LatestConsulVersion
		}
	}
	if strings.HasPrefix(version, "1.3") {
@@ -66,15 +94,18 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo
		envVars = append(envVars, "CONSUL_LICENSE="+license)

		if !hasLicense {
			t.Fatalf("Failed to find enterprise license")
			return nil, nil, fmt.Errorf("Failed to find enterprise license")
		}
	}
	if namePrefix != "" {
		name = namePrefix + name
	}

	if dockerRepo, hasEnvRepo := os.LookupEnv("CONSUL_DOCKER_REPO"); hasEnvRepo {
		repo = dockerRepo
	}

	runner, err := docker.NewServiceRunner(docker.RunOptions{
	dockerOpts := docker.RunOptions{
		ContainerName: name,
		ImageRepo:     repo,
		ImageTag:      version,
@@ -83,12 +114,25 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo
		Ports:         []string{"8500/tcp"},
		AuthUsername:  os.Getenv("CONSUL_DOCKER_USERNAME"),
		AuthPassword:  os.Getenv("CONSUL_DOCKER_PASSWORD"),
	})
	}

	// Add a unique suffix if there is no per-test prefix provided
	addSuffix := true
	if namePrefix != "" {
		// Don't add a suffix if the caller already provided a prefix
		addSuffix = false
		// Also enable predelete and non-removal to make debugging easier for
		// test cases with named containers.
		dockerOpts.PreDelete = true
		dockerOpts.DoNotAutoRemove = true
	}

	runner, err := docker.NewServiceRunner(dockerOpts)
	if err != nil {
		t.Fatalf("Could not start docker Consul: %s", err)
		return nil, nil, fmt.Errorf("Could not start docker Consul: %s", err)
	}

	svc, err := runner.StartService(context.Background(), func(ctx context.Context, host string, port int) (docker.ServiceConfig, error) {
	svc, _, err := runner.StartNewService(ctx, addSuffix, false, func(ctx context.Context, host string, port int) (docker.ServiceConfig, error) {
		shp := docker.NewServiceHostPort(host, port)
		apiConfig := consulapi.DefaultNonPooledConfig()
		apiConfig.Address = shp.Address()
@@ -165,7 +209,7 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo
			}
		}

		// Configure a namespace and parition if testing enterprise Consul
		// Configure a namespace and partition if testing enterprise Consul
		if isEnterprise {
			// Namespaces require Consul 1.7 or newer
			namespaceVersion, _ := goversion.NewVersion("1.7")
@@ -229,8 +273,20 @@ func PrepareTestContainer(t *testing.T, version string, isEnterprise bool, doBoo
		}, nil
	})
	if err != nil {
		t.Fatalf("Could not start docker Consul: %s", err)
		return nil, nil, err
	}

	return svc.Cleanup, svc.Config.(*Config)

	// Find the container network info.
	if len(svc.Container.NetworkSettings.Networks) < 1 {
		svc.Cleanup()
		return nil, nil, fmt.Errorf("failed to find any network settings for container")
	}
	cfg := svc.Config.(*Config)
	for _, eps := range svc.Container.NetworkSettings.Networks {
		// Just pick the first network; we assume only one for now.
		// Pull out the real container IP and use it as the in-network address.
		cfg.ContainerHTTPAddr = fmt.Sprintf("http://%s:8500", eps.IPAddress)
		break
	}
	return svc.Cleanup, cfg, nil
}
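An illustrative direct use of RunContainer outside the test wrappers (assumes a local Docker daemon; the "demo-" prefix is arbitrary):

```go
package main

import (
	"context"
	"fmt"
	"log"

	consulapi "github.com/hashicorp/consul/api"
	"github.com/hashicorp/vault/helper/testhelpers/consul"
)

func main() {
	// An empty version falls back to CONSUL_DOCKER_VERSION, then
	// LatestConsulVersion. A non-empty prefix skips the unique name suffix and
	// enables predelete plus keep-container-running for debugging.
	cleanup, cfg, err := consul.RunContainer(context.Background(), "demo-", "", false, true)
	if err != nil {
		log.Fatal(err)
	}
	defer cleanup()

	client, err := consulapi.NewClient(cfg.APIConfig())
	if err != nil {
		log.Fatal(err)
	}
	leader, _ := client.Status().Leader()
	fmt.Println("host-mapped address:", cfg.Address())
	fmt.Println("in-network address:", cfg.ContainerHTTPAddr)
	fmt.Println("consul leader:", leader)
}
```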
92 changes: 90 additions & 2 deletions helper/testhelpers/teststorage/consul/consul.go
@@ -4,11 +4,11 @@
package consul

import (
	"sync"
	realtesting "testing"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/vault/helper/testhelpers/consul"
	"github.com/hashicorp/vault/helper/testhelpers/teststorage"
	physConsul "github.com/hashicorp/vault/physical/consul"
	"github.com/hashicorp/vault/vault"
	"github.com/mitchellh/go-testing-interface"
@@ -33,5 +33,93 @@ func MakeConsulBackend(t testing.T, logger hclog.Logger) *vault.PhysicalBackendB
}

func ConsulBackendSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions) {
	opts.PhysicalFactory = teststorage.SharedPhysicalFactory(MakeConsulBackend)
	m := &consulContainerManager{}
	opts.PhysicalFactory = m.Backend
}

// consulContainerManager exposes Backend, which matches the PhysicalFactory
// func type. When called, it ensures that a separate Consul container is
// started for each distinct vault cluster that calls it, and that each Vault
// core gets a separate Consul backend instance, since that instance contains
// state related to lock sessions. The test framework doesn't have a concept of
// "cluster names" beyond the prefix attached to the logger, and other backend
// factories (mostly via SharedPhysicalFactory) currently implicitly rely on
// being called in a sequence of core 0, 1, 2... on one cluster and then
// core 0, 1, 2... on the next, and so on. Refactoring lots of things to make
// first-class cluster identifiers a thing seems like a heavy lift given that
// we already rely on the sequence of calls everywhere else anyway, so we do
// the same here: each time the Backend method is called with coreIdx == 0 we
// create a whole new Consul and assume subsequent non-zero index cores are in
// the same cluster.
type consulContainerManager struct {
	mu      sync.Mutex
	current *consulContainerBackendFactory
}

func (m *consulContainerManager) Backend(t testing.T, coreIdx int,
	logger hclog.Logger, conf map[string]interface{},
) *vault.PhysicalBackendBundle {
	m.mu.Lock()
	if coreIdx == 0 || m.current == nil {
		// Create a new consul container factory
		m.current = &consulContainerBackendFactory{}
	}
	f := m.current
	m.mu.Unlock()

	return f.Backend(t, coreIdx, logger, conf)
}

type consulContainerBackendFactory struct {
	mu        sync.Mutex
	refCount  int
	cleanupFn func()
	config    map[string]string
}

func (f *consulContainerBackendFactory) Backend(t testing.T, coreIdx int,
	logger hclog.Logger, conf map[string]interface{},
) *vault.PhysicalBackendBundle {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.refCount == 0 {
		f.startContainerLocked(t)
		logger.Debug("started consul container", "clusterID", conf["cluster_id"],
			"address", f.config["address"])
	}

	f.refCount++
	consulBackend, err := physConsul.NewConsulBackend(f.config, logger.Named("consul"))
	if err != nil {
		t.Fatal(err)
	}
	return &vault.PhysicalBackendBundle{
		Backend: consulBackend,
		Cleanup: f.cleanup,
	}
}

func (f *consulContainerBackendFactory) startContainerLocked(t testing.T) {
	cleanup, config := consul.PrepareTestContainer(t.(*realtesting.T), "", false, true)
	f.config = map[string]string{
		"address":      config.Address(),
		"token":        config.Token,
		"max_parallel": "32",
	}
	f.cleanupFn = cleanup
}

func (f *consulContainerBackendFactory) cleanup() {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.refCount < 1 || f.cleanupFn == nil {
		return
	}
	f.refCount--
	if f.refCount == 0 {
		f.cleanupFn()
		f.cleanupFn = nil
	}
}
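To make the call-sequence convention above concrete, a hedged sketch of a test that brings up a three-core Vault cluster on Consul storage (the test name and NumCores value are illustrative; exact framework wiring may differ):

```go
package consul_test

import (
	"testing"

	consulstorage "github.com/hashicorp/vault/helper/testhelpers/teststorage/consul"
	"github.com/hashicorp/vault/vault"
)

func TestThreeCoreConsulCluster(t *testing.T) {
	conf := &vault.CoreConfig{}
	opts := &vault.TestClusterOptions{NumCores: 3}
	consulstorage.ConsulBackendSetup(conf, opts)

	cluster := vault.NewTestCluster(t, conf, opts)
	cluster.Start()
	defer cluster.Cleanup()

	// Core 0 triggered a fresh Consul container; cores 1 and 2 reuse it. The
	// refcounted cleanup tears the container down once every core's bundle has
	// been cleaned up, and a second cluster in the same test would get a brand
	// new container when its core 0 calls the factory.
}
```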