Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.4] backport #13359 Fix http2 authority header in single endpoint scenario #16988

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,15 +281,28 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
}

initialEndpoints := strings.Join(c.Endpoints(), ";")
target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.Endpoints()[0]))
conn, err := grpc.DialContext(dctx, target, opts...)
if err != nil {
return nil, err
}
return conn, nil
}

func authority(endpoint string) string {
spl := strings.SplitN(endpoint, "://", 2)
if len(spl) < 2 {
if strings.HasPrefix(endpoint, "unix:") {
return endpoint[len("unix:"):]
}
if strings.HasPrefix(endpoint, "unixs:") {
return endpoint[len("unixs:"):]
}
return endpoint
}
return spl[1]
}

func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
r := endpoint.RequiresCredentials(ep)
switch r {
Expand Down
4 changes: 4 additions & 0 deletions tests/e2e/cluster_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
return p.etcdProc.WithStopSignal(sig)
}

func (p *proxyEtcdProcess) Logs() logsExpect {
return p.etcdProc.Logs()
}

type proxyProc struct {
execPath string
args []string
Expand Down
2 changes: 2 additions & 0 deletions tests/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type etcdProcessClusterConfig struct {
execPath string
dataDirPath string
keepDataDir bool
envVars map[string]string

clusterSize int

Expand Down Expand Up @@ -291,6 +292,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
etcdCfgs[i] = &etcdServerProcessConfig{
execPath: cfg.execPath,
args: args,
envVars: cfg.envVars,
tlsArgs: cfg.tlsArgs(),
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,
Expand Down
214 changes: 214 additions & 0 deletions tests/e2e/ctl_v3_grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy
// +build !cluster_proxy

package e2e

import (
"fmt"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.etcd.io/etcd/pkg/testutil"
)

func TestAuthority(t *testing.T) {
tcs := []struct {
name string
useTLS bool
useInsecureTLS bool
// Pattern used to generate endpoints for client. Fields filled
// %d - will be filled with member grpc port
clientURLPattern string

// Pattern used to validate authority received by server. Fields filled:
// %d - will be filled with first member grpc port
expectAuthorityPattern string
}{
{
name: "http://domain[:port]",
clientURLPattern: "http://localhost:%d",
expectAuthorityPattern: "localhost:%d",
},
{
name: "http://address[:port]",
clientURLPattern: "http://127.0.0.1:%d",
expectAuthorityPattern: "127.0.0.1:%d",
},
{
name: "https://domain[:port] insecure",
useTLS: true,
useInsecureTLS: true,
clientURLPattern: "https://localhost:%d",
expectAuthorityPattern: "localhost:%d",
},
{
name: "https://address[:port] insecure",
useTLS: true,
useInsecureTLS: true,
clientURLPattern: "https://127.0.0.1:%d",
expectAuthorityPattern: "127.0.0.1:%d",
},
{
name: "https://domain[:port]",
useTLS: true,
clientURLPattern: "https://localhost:%d",
expectAuthorityPattern: "localhost:%d",
},
{
name: "https://address[:port]",
useTLS: true,
clientURLPattern: "https://127.0.0.1:%d",
expectAuthorityPattern: "127.0.0.1:%d",
},
}
for _, tc := range tcs {
for _, clusterSize := range []int{1, 3} {
t.Run(fmt.Sprintf("Size: %d, Scenario: %q", clusterSize, tc.name), func(t *testing.T) {
defer testutil.AfterTest(t)

cfg := configNoTLS
cfg.clusterSize = clusterSize
if tc.useTLS {
cfg.clientTLS = clientTLS
}
cfg.isClientAutoTLS = tc.useInsecureTLS
// Enable debug mode to get logs with http2 headers (including authority)
cfg.envVars = map[string]string{"GODEBUG": "http2debug=2"}

epc, err := newEtcdProcessCluster(&cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
defer epc.Close()
endpoints := templateEndpoints(t, tc.clientURLPattern, epc)

client := clusterEtcdctlV3(&cfg, endpoints)
err = client.Put("foo", "bar")
if err != nil {
t.Fatal(err)
}

executeWithTimeout(t, 5*time.Second, func() {
assertAuthority(t, fmt.Sprintf(tc.expectAuthorityPattern, 20000), epc)
})
})

}
}
}

func templateEndpoints(t *testing.T, pattern string, clus *etcdProcessCluster) []string {
t.Helper()
endpoints := []string{}
for i := 0; i < clus.cfg.clusterSize; i++ {
ent := pattern
if strings.Contains(ent, "%d") {
ent = fmt.Sprintf(ent, etcdProcessBasePort+i*5)
}
if strings.Contains(ent, "%") {
t.Fatalf("Failed to template pattern, %% symbol left %q", ent)
}
endpoints = append(endpoints, ent)
}
return endpoints
}

func assertAuthority(t *testing.T, expectAurhority string, clus *etcdProcessCluster) {
logs := []logsExpect{}
for _, proc := range clus.procs {
logs = append(logs, proc.Logs())
}
line := firstMatch(t, `http2: decoded hpack field header field ":authority"`, logs...)
line = strings.TrimSuffix(line, "\n")
line = strings.TrimSuffix(line, "\r")
expectLine := fmt.Sprintf(`http2: decoded hpack field header field ":authority" = %q`, expectAurhority)
assert.True(t, strings.HasSuffix(line, expectLine), fmt.Sprintf("Got %q expected suffix %q", line, expectLine))
}

func firstMatch(t *testing.T, expectLine string, logs ...logsExpect) string {
t.Helper()
match := make(chan string, len(logs))
for i := range logs {
go func(l logsExpect) {
line, _ := l.Expect(expectLine)
match <- line
}(logs[i])
}
return <-match
}

func executeWithTimeout(t *testing.T, timeout time.Duration, f func()) {
donec := make(chan struct{})
go func() {
defer close(donec)
f()
}()

select {
case <-time.After(timeout):
testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
case <-donec:
}
}

type etcdctlV3 struct {
cfg *etcdProcessClusterConfig
endpoints []string
}

func clusterEtcdctlV3(cfg *etcdProcessClusterConfig, endpoints []string) *etcdctlV3 {
return &etcdctlV3{
cfg: cfg,
endpoints: endpoints,
}
}

func (ctl *etcdctlV3) Put(key, value string) error {
return ctl.runCmd("put", key, value)
}

func (ctl *etcdctlV3) runCmd(args ...string) error {
cmdArgs := []string{ctlBinPath + "3"}
for k, v := range ctl.flags() {
cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v))
}
cmdArgs = append(cmdArgs, args...)
return spawnWithExpect(cmdArgs, "OK")
}

func (ctl *etcdctlV3) flags() map[string]string {
fmap := make(map[string]string)
if ctl.cfg.clientTLS == clientTLS {
if ctl.cfg.isClientAutoTLS {
fmap["insecure-transport"] = "false"
fmap["insecure-skip-tls-verify"] = "true"
} else if ctl.cfg.isClientCRL {
fmap["cacert"] = caPath
fmap["cert"] = revokedCertPath
fmap["key"] = revokedPrivateKeyPath
} else {
fmap["cacert"] = caPath
fmap["cert"] = certPath
fmap["key"] = privateKeyPath
}
}
fmap["endpoints"] = strings.Join(ctl.endpoints, ",")
return fmap
}
16 changes: 15 additions & 1 deletion tests/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ type etcdProcess interface {
Close() error
WithStopSignal(sig os.Signal) os.Signal
Config() *etcdServerProcessConfig

Logs() logsExpect
}

type logsExpect interface {
Expect(string) (string, error)
}

type etcdServerProcess struct {
Expand All @@ -54,6 +60,7 @@ type etcdServerProcess struct {
type etcdServerProcessConfig struct {
execPath string
args []string
envVars map[string]string
tlsArgs []string

dataDirPath string
Expand Down Expand Up @@ -98,7 +105,7 @@ func (ep *etcdServerProcess) Start() error {
if ep.proc != nil {
panic("already started")
}
proc, err := spawnCmd(append([]string{ep.cfg.execPath}, ep.cfg.args...))
proc, err := spawnCmdWithEnv(append([]string{ep.cfg.execPath}, ep.cfg.args...), ep.cfg.envVars)
if err != nil {
return err
}
Expand Down Expand Up @@ -153,3 +160,10 @@ func (ep *etcdServerProcess) waitReady() error {
}

func (ep *etcdServerProcess) Config() *etcdServerProcessConfig { return ep.cfg }

func (ep *etcdServerProcess) Logs() logsExpect {
if ep.proc == nil {
panic("please grab logs before process is stopped")
}
return ep.proc
}
27 changes: 26 additions & 1 deletion tests/e2e/etcd_spawn_nocov.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package e2e

import (
"fmt"
"os"
"strings"

Expand All @@ -27,7 +28,11 @@ import (
const noOutputLineCount = 0 // regular binaries emit no extra lines

func spawnCmd(args []string) (*expect.ExpectProcess, error) {
env := os.Environ()
return spawnCmdWithEnv(args, nil)
}

func spawnCmdWithEnv(args []string, envVars map[string]string) (*expect.ExpectProcess, error) {
env := mergeEnvVariables(envVars)
switch {
case strings.HasSuffix(args[0], ctlBinPath+"2"):
env = append(env, "ETCDCTL_API=2")
Expand All @@ -38,3 +43,23 @@ func spawnCmd(args []string) (*expect.ExpectProcess, error) {
}
return expect.NewExpectWithEnv(args[0], args[1:], env)
}

func mergeEnvVariables(envVars map[string]string) []string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it from main branch?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YES.

There is no function SpawnCmdWithLogger in releasae-3.4, and a new function spawnCmdWithEnv is added in this PR. It is a minor difference from the original PR. It should be OK.

var env []string
// Environment variables are passed as parameter have higher priority
// than os environment variables.
for k, v := range envVars {
env = append(env, fmt.Sprintf("%s=%s", k, v))
}

// Now, we can set os environment variables not passed as parameter.
currVars := os.Environ()
for _, v := range currVars {
p := strings.Split(v, "=")
if _, ok := envVars[p[0]]; !ok {
env = append(env, fmt.Sprintf("%s=%s", p[0], p[1]))
}
}

return env
}
Loading