Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick "Fix http2 authority header in single endpoint scenario" to release-3.5 #13375

Merged
merged 10 commits into from
Sep 30, 2021
18 changes: 15 additions & 3 deletions client/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,16 +296,28 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
}

initialEndpoints := strings.Join(c.cfg.Endpoints, ";")
target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.Endpoints()[0]))
conn, err := grpc.DialContext(dctx, target, opts...)
if err != nil {
return nil, err
}
return conn, nil
}

func authority(endpoint string) string {
spl := strings.SplitN(endpoint, "://", 2)
if len(spl) < 2 {
if strings.HasPrefix(endpoint, "unix:") {
return endpoint[len("unix:"):]
}
if strings.HasPrefix(endpoint, "unixs:") {
return endpoint[len("unixs:"):]
}
return endpoint
}
return spl[1]
}

func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
r := endpoint.RequiresCredentials(ep)
switch r {
Expand Down
69 changes: 69 additions & 0 deletions pkg/grpc_testing/recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc_testing

import (
"context"
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type GrpcRecorder struct {
mux sync.RWMutex
requests []RequestInfo
}

type RequestInfo struct {
FullMethod string
Authority string
}

func (ri *GrpcRecorder) UnaryInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ri.record(toRequestInfo(ctx, info))
resp, err := handler(ctx, req)
return resp, err
}
}

func (ri *GrpcRecorder) RecordedRequests() []RequestInfo {
ri.mux.RLock()
defer ri.mux.RUnlock()
reqs := make([]RequestInfo, len(ri.requests))
copy(reqs, ri.requests)
return reqs
}

func toRequestInfo(ctx context.Context, info *grpc.UnaryServerInfo) RequestInfo {
req := RequestInfo{
FullMethod: info.FullMethod,
}
md, ok := metadata.FromIncomingContext(ctx)
if ok {
as := md.Get(":authority")
if len(as) != 0 {
req.Authority = as[0]
}
}
return req
}

func (ri *GrpcRecorder) record(r RequestInfo) {
ri.mux.Lock()
defer ri.mux.Unlock()
ri.requests = append(ri.requests, r)
}
2 changes: 1 addition & 1 deletion server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func (e *Etcd) servePeers() (err error) {

for _, p := range e.Peers {
u := p.Listener.Addr().String()
gs := v3rpc.Server(e.Server, peerTLScfg)
gs := v3rpc.Server(e.Server, peerTLScfg, nil)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Expand Down
4 changes: 2 additions & 2 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (sctx *serveCtx) serve(
}()

if sctx.insecure {
gs = v3rpc.Server(s, nil, gopts...)
gs = v3rpc.Server(s, nil, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand Down Expand Up @@ -148,7 +148,7 @@ func (sctx *serveCtx) serve(
if tlsErr != nil {
return tlsErr
}
gs = v3rpc.Server(s, tlscfg, gopts...)
gs = v3rpc.Server(s, tlscfg, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand Down
6 changes: 4 additions & 2 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,21 @@ const (
maxSendBytes = math.MaxInt32
)

func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{}))
if tls != nil {
bundle := credentials.NewBundle(credentials.Config{TLSConfig: tls})
opts = append(opts, grpc.Creds(bundle.TransportCredentials()))
}

chainUnaryInterceptors := []grpc.UnaryServerInterceptor{
newLogUnaryInterceptor(s),
newUnaryInterceptor(s),
grpc_prometheus.UnaryServerInterceptor,
}
if interceptor != nil {
chainUnaryInterceptors = append(chainUnaryInterceptors, interceptor)
}

chainStreamInterceptors := []grpc.StreamServerInterceptor{
newStreamInterceptor(s),
Expand Down
6 changes: 5 additions & 1 deletion tests/e2e/cluster_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
return p.etcdProc.WithStopSignal(sig)
}

func (p *proxyEtcdProcess) Logs() logsExpect {
return p.etcdProc.Logs()
}

type proxyProc struct {
lg *zap.Logger
execPath string
Expand All @@ -132,7 +136,7 @@ func (pp *proxyProc) start() error {
if pp.proc != nil {
panic("already started")
}
proc, err := spawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...))
proc, err := spawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...), nil)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions tests/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ type etcdProcessClusterConfig struct {
execPath string
dataDirPath string
keepDataDir bool
envVars map[string]string

clusterSize int

Expand Down Expand Up @@ -318,6 +319,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
lg: lg,
execPath: cfg.execPath,
args: args,
envVars: cfg.envVars,
tlsArgs: cfg.tlsArgs(),
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func etcdctlBackup(t testing.TB, clus *etcdProcessCluster, dataDir, backupDir st
cmdArgs = append(cmdArgs, "--with-v3=false")
}
t.Logf("Running: %v", cmdArgs)
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_alarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,5 @@ func alarmTest(cx ctlCtx) {

func ctlV3Alarm(cx ctlCtx, cmd string, as ...string) error {
cmdArgs := append(cx.PrefixArgs(), "alarm", cmd)
return spawnWithExpects(cmdArgs, as...)
return spawnWithExpects(cmdArgs, cx.envMap, as...)
}
38 changes: 19 additions & 19 deletions tests/e2e/ctl_v3_auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func authEnable(cx ctlCtx) error {

func ctlV3AuthEnable(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "auth", "enable")
return spawnWithExpect(cmdArgs, "Authentication Enabled")
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "Authentication Enabled")
}

func authDisableTest(cx ctlCtx) {
Expand Down Expand Up @@ -139,12 +139,12 @@ func authDisableTest(cx ctlCtx) {

func ctlV3AuthDisable(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "auth", "disable")
return spawnWithExpect(cmdArgs, "Authentication Disabled")
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "Authentication Disabled")
}

func authStatusTest(cx ctlCtx) {
cmdArgs := append(cx.PrefixArgs(), "auth", "status")
if err := spawnWithExpects(cmdArgs, "Authentication Status: false", "AuthRevision:"); err != nil {
if err := spawnWithExpects(cmdArgs, cx.envMap, "Authentication Status: false", "AuthRevision:"); err != nil {
cx.t.Fatal(err)
}

Expand All @@ -155,15 +155,15 @@ func authStatusTest(cx ctlCtx) {
cx.user, cx.pass = "root", "root"
cmdArgs = append(cx.PrefixArgs(), "auth", "status")

if err := spawnWithExpects(cmdArgs, "Authentication Status: true", "AuthRevision:"); err != nil {
if err := spawnWithExpects(cmdArgs, cx.envMap, "Authentication Status: true", "AuthRevision:"); err != nil {
cx.t.Fatal(err)
}

cmdArgs = append(cx.PrefixArgs(), "auth", "status", "--write-out", "json")
if err := spawnWithExpect(cmdArgs, "enabled"); err != nil {
if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, "enabled"); err != nil {
cx.t.Fatal(err)
}
if err := spawnWithExpect(cmdArgs, "authRevision"); err != nil {
if err := spawnWithExpectWithEnv(cmdArgs, cx.envMap, "authRevision"); err != nil {
cx.t.Fatal(err)
}
}
Expand Down Expand Up @@ -381,25 +381,25 @@ func authRoleRevokeDuringOpsTest(cx ctlCtx) {
}

func ctlV3PutFailAuth(cx ctlCtx, key, val string) error {
return spawnWithExpect(append(cx.PrefixArgs(), "put", key, val), "authentication failed")
return spawnWithExpectWithEnv(append(cx.PrefixArgs(), "put", key, val), cx.envMap, "authentication failed")
}

func ctlV3PutFailPerm(cx ctlCtx, key, val string) error {
return spawnWithExpect(append(cx.PrefixArgs(), "put", key, val), "permission denied")
return spawnWithExpectWithEnv(append(cx.PrefixArgs(), "put", key, val), cx.envMap, "permission denied")
}

func authSetupTestUser(cx ctlCtx) {
if err := ctlV3User(cx, []string{"add", "test-user", "--interactive=false"}, "User test-user created", []string{"pass"}); err != nil {
cx.t.Fatal(err)
}
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "add", "test-role"), "Role test-role created"); err != nil {
if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "add", "test-role"), cx.envMap, "Role test-role created"); err != nil {
cx.t.Fatal(err)
}
if err := ctlV3User(cx, []string{"grant-role", "test-user", "test-role"}, "Role test-role is granted to user test-user", nil); err != nil {
cx.t.Fatal(err)
}
cmd := append(cx.PrefixArgs(), "role", "grant-permission", "test-role", "readwrite", "foo")
if err := spawnWithExpect(cmd, "Role test-role updated"); err != nil {
if err := spawnWithExpectWithEnv(cmd, cx.envMap, "Role test-role updated"); err != nil {
cx.t.Fatal(err)
}
}
Expand Down Expand Up @@ -611,7 +611,7 @@ func authTestCertCN(cx ctlCtx) {
if err := ctlV3User(cx, []string{"add", "example.com", "--interactive=false"}, "User example.com created", []string{""}); err != nil {
cx.t.Fatal(err)
}
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "add", "test-role"), "Role test-role created"); err != nil {
if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "add", "test-role"), cx.envMap, "Role test-role created"); err != nil {
cx.t.Fatal(err)
}
if err := ctlV3User(cx, []string{"grant-role", "example.com", "test-role"}, "Role test-role is granted to user example.com", nil); err != nil {
Expand Down Expand Up @@ -921,21 +921,21 @@ func authTestRoleGet(cx ctlCtx) {
"KV Read:", "foo",
"KV Write:", "foo",
}
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}

// test-user can get the information of test-role because it belongs to the role
cx.user, cx.pass = "test-user", "pass"
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "test-role"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}

// test-user cannot get the information of root because it doesn't belong to the role
expected = []string{
"Error: etcdserver: permission denied",
}
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "root"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "role", "get", "root"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}
}
Expand All @@ -952,21 +952,21 @@ func authTestUserGet(cx ctlCtx) {
"Roles: test-role",
}

if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}

// test-user can get the information of test-user itself
cx.user, cx.pass = "test-user", "pass"
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "test-user"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}

// test-user cannot get the information of root
expected = []string{
"Error: etcdserver: permission denied",
}
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "root"), expected...); err != nil {
if err := spawnWithExpects(append(cx.PrefixArgs(), "user", "get", "root"), cx.envMap, expected...); err != nil {
cx.t.Fatal(err)
}
}
Expand All @@ -977,7 +977,7 @@ func authTestRoleList(cx ctlCtx) {
}
cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "list"), "test-role"); err != nil {
if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "list"), cx.envMap, "test-role"); err != nil {
cx.t.Fatal(err)
}
}
Expand Down Expand Up @@ -1088,7 +1088,7 @@ func certCNAndUsername(cx ctlCtx, noPassword bool) {
cx.t.Fatal(err)
}
}
if err := spawnWithExpect(append(cx.PrefixArgs(), "role", "add", "test-role-cn"), "Role test-role-cn created"); err != nil {
if err := spawnWithExpectWithEnv(append(cx.PrefixArgs(), "role", "add", "test-role-cn"), cx.envMap, "Role test-role-cn created"); err != nil {
cx.t.Fatal(err)
}
if err := ctlV3User(cx, []string{"grant-role", "example.com", "test-role-cn"}, "Role test-role-cn is granted to user example.com", nil); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,5 @@ func ctlV3Compact(cx ctlCtx, rev int64, physical bool) error {
if physical {
cmdArgs = append(cmdArgs, "--physical")
}
return spawnWithExpect(cmdArgs, "compacted revision "+rs)
return spawnWithExpectWithEnv(cmdArgs, cx.envMap, "compacted revision "+rs)
}
4 changes: 2 additions & 2 deletions tests/e2e/ctl_v3_defrag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ func ctlV3OnlineDefrag(cx ctlCtx) error {
for i := range lines {
lines[i] = "Finished defragmenting etcd member"
}
return spawnWithExpects(cmdArgs, lines...)
return spawnWithExpects(cmdArgs, cx.envMap, lines...)
}

func ctlV3OfflineDefrag(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgsUtl(), "defrag", "--data-dir", cx.dataDir)
lines := []string{"finished defragmenting directory"}
return spawnWithExpects(cmdArgs, lines...)
return spawnWithExpects(cmdArgs, cx.envMap, lines...)
}

func defragOfflineTest(cx ctlCtx) {
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_elect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func testElect(cx ctlCtx) {
// ctlV3Elect creates a elect process with a channel listening for when it wins the election.
func ctlV3Elect(cx ctlCtx, name, proposal string) (*expect.ExpectProcess, <-chan string, error) {
cmdArgs := append(cx.PrefixArgs(), "elect", name, proposal)
proc, err := spawnCmd(cmdArgs)
proc, err := spawnCmd(cmdArgs, cx.envMap)
outc := make(chan string, 1)
if err != nil {
close(outc)
Expand Down
Loading