Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: implement support for "unix" resolver scheme #3890

Merged
merged 18 commits into from
Oct 16, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"math"
"net"
"reflect"
"strings"
"sync"
Expand All @@ -48,6 +47,7 @@ import (
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
_ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
_ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
)

const (
Expand Down Expand Up @@ -191,21 +191,14 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
cc.mkp = cc.dopts.copts.KeepaliveParams

if cc.dopts.copts.Dialer == nil {
cc.dopts.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
network, addr := parseDialTarget(addr)
return (&net.Dialer{}).DialContext(ctx, network, addr)
}
if cc.dopts.withProxy {
cc.dopts.copts.Dialer = newProxyDialer(cc.dopts.copts.Dialer)
}
}
cc.dopts.copts.UseProxy = cc.dopts.withProxy

if cc.dopts.copts.UserAgent != "" {
cc.dopts.copts.UserAgent += " " + grpcUA
} else {
cc.dopts.copts.UserAgent = grpcUA
}
cc.dopts.copts.GrpcUA = grpcUA

if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
Expand Down
59 changes: 59 additions & 0 deletions internal/resolver/unix/unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package unix implements a resolver for unix targets.
package unix

import (
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
)

const scheme = "unix"

type unixBuilder struct{}

func (*unixBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r := &unixResolver{
target: target,
cc: cc,
}
r.start()
return r, nil
}

func (*unixBuilder) Scheme() string {
return scheme
}

type unixResolver struct {
target resolver.Target
cc resolver.ClientConn
}

func (r *unixResolver) start() {
r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "/" + r.target.Endpoint, Attributes: attributes.New("network_type", "unix")}}})
}

func (*unixResolver) ResolveNow(o resolver.ResolveNowOptions) {}

func (*unixResolver) Close() {}

func init() {
resolver.Register(&unixBuilder{})
}
22 changes: 17 additions & 5 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,23 @@ type http2Client struct {
connectionID uint64
}

func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
if fn != nil {
return fn(ctx, addr)
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
if fn == nil {
fn = func(fCtx context.Context, fAddr string) (net.Conn, error) {
if networkType := addr.Attributes.Value("network_type"); networkType != nil {
if networkTypeStr, ok := networkType.(string); ok {
return (&net.Dialer{}).DialContext(ctx, networkTypeStr, addr.Addr)
}
return nil, fmt.Errorf("network_type %v not of type string", networkType)
}
network, fAddr := parseDialTarget(fAddr)
return (&net.Dialer{}).DialContext(ctx, network, fAddr)
}
}
if useProxy {
fn = newProxyDialer(fn, grpcUA)
}
return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
return fn(ctx, addr.Addr)
}

func isTemporary(err error) bool {
Expand Down Expand Up @@ -172,7 +184,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
}()

conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.GrpcUA)
if err != nil {
if opts.FailOnNonTempDialError {
return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
Expand Down
35 changes: 35 additions & 0 deletions internal/transport/http_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"math"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -598,3 +599,37 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList
f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
return f
}

// parseDialTarget returns the network and address to pass to dialer
func parseDialTarget(target string) (net string, addr string) {
net = "tcp"

m1 := strings.Index(target, ":")
m2 := strings.Index(target, ":/")

// handle unix:addr which will fail with url.Parse
if m1 >= 0 && m2 < 0 {
if n := target[0:m1]; n == "unix" {
net = n
addr = target[m1+1:]
return net, addr
}
}
if m2 >= 0 {
t, err := url.Parse(target)
if err != nil {
return net, target
}
scheme := t.Scheme
addr = t.Path
if scheme == "unix" {
net = scheme
if addr == "" {
addr = t.Host
}
return net, addr
}
}

return net, target
}
21 changes: 21 additions & 0 deletions internal/transport/http_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,24 @@ func (s) TestDecodeHeaderH2ErrCode(t *testing.T) {
})
}
}

func (s) TestParseDialTarget(t *testing.T) {
for _, test := range []struct {
target, wantNet, wantAddr string
}{
{"unix:etcd:0", "unix", "etcd:0"},
{"unix:///tmp/unix-3", "unix", "/tmp/unix-3"},
{"unix://domain", "unix", "domain"},
{"unix://etcd:0", "unix", "etcd:0"},
{"unix:///etcd:0", "unix", "/etcd:0"},
{"passthrough://unix://domain", "tcp", "passthrough://unix://domain"},
{"https://google.com:443", "tcp", "https://google.com:443"},
{"dns:///google.com", "tcp", "dns:///google.com"},
{"/unix/socket/address", "tcp", "/unix/socket/address"},
} {
gotNet, gotAddr := parseDialTarget(test.target)
if gotNet != test.wantNet || gotAddr != test.wantAddr {
t.Errorf("parseDialTarget(%q) = %s, %s want %s, %s", test.target, gotNet, gotAddr, test.wantNet, test.wantAddr)
}
}
}
8 changes: 4 additions & 4 deletions proxy.go → internal/transport/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package grpc
package transport

import (
"bufio"
Expand Down Expand Up @@ -76,7 +76,7 @@ func basicAuth(username, password string) string {
return base64.StdEncoding.EncodeToString([]byte(auth))
}

func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr string, proxyURL *url.URL) (_ net.Conn, err error) {
func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr string, proxyURL *url.URL, grpcUA string) (_ net.Conn, err error) {
defer func() {
if err != nil {
conn.Close()
Expand Down Expand Up @@ -118,7 +118,7 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri
// newProxyDialer returns a dialer that connects to proxy first if necessary.
// The returned dialer checks if a proxy is necessary, dial to the proxy with the
// provided dialer, does HTTP CONNECT handshake and returns the connection.
func newProxyDialer(dialer func(context.Context, string) (net.Conn, error)) func(context.Context, string) (net.Conn, error) {
func newProxyDialer(dialer func(context.Context, string) (net.Conn, error), grpcUA string) func(context.Context, string) (net.Conn, error) {
return func(ctx context.Context, addr string) (conn net.Conn, err error) {
var newAddr string
proxyURL, err := mapAddress(ctx, addr)
Expand All @@ -137,7 +137,7 @@ func newProxyDialer(dialer func(context.Context, string) (net.Conn, error)) func
}
if proxyURL != nil {
// proxy is disabled if proxyURL is nil.
conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL)
conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA)
}
return
}
Expand Down
10 changes: 2 additions & 8 deletions proxy_test.go → internal/transport/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*
*/

package grpc
package transport

import (
"bufio"
Expand Down Expand Up @@ -143,7 +143,7 @@ func testHTTPConnect(t *testing.T, proxyURLModify func(*url.URL) *url.URL, proxy
return net.DialTimeout("tcp", addr, time.Until(deadline))
}
return net.Dial("tcp", addr)
})
}, "test")
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
c, err := dialer(ctx, blis.Addr().String())
Expand Down Expand Up @@ -173,9 +173,6 @@ func (s) TestHTTPConnect(t *testing.T) {
if req.Method != http.MethodConnect {
return fmt.Errorf("unexpected Method %q, want %q", req.Method, http.MethodConnect)
}
if req.UserAgent() != grpcUA {
return fmt.Errorf("unexpect user agent %q, want %q", req.UserAgent(), grpcUA)
}
return nil
},
)
Expand All @@ -195,9 +192,6 @@ func (s) TestHTTPConnectBasicAuth(t *testing.T) {
if req.Method != http.MethodConnect {
return fmt.Errorf("unexpected Method %q, want %q", req.Method, http.MethodConnect)
}
if req.UserAgent() != grpcUA {
return fmt.Errorf("unexpect user agent %q, want %q", req.UserAgent(), grpcUA)
}
wantProxyAuthStr := "Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+password))
if got := req.Header.Get(proxyAuthHeaderKey); got != wantProxyAuthStr {
gotDecoded, _ := base64.StdEncoding.DecodeString(got)
Expand Down
4 changes: 4 additions & 0 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,10 @@ type ConnectOptions struct {
ChannelzParentID int64
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
MaxHeaderListSize *uint32
// UseProxy specifies if a proxy should be used.
UseProxy bool
// GrpcUA
GrpcUA string
}

// NewClientTransport establishes the transport with the required ConnectOptions
Expand Down
3 changes: 0 additions & 3 deletions resolver_conn_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ func (s) TestDialParseTargetUnknownScheme(t *testing.T) {
}{
{"/unix/socket/address", "/unix/socket/address"},

// Special test for "unix:///".
{"unix:///unix/socket/address", "unix:///unix/socket/address"},

// For known scheme.
{"passthrough://a.server.com/google.com", "google.com"},
} {
Expand Down
35 changes: 0 additions & 35 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"io"
"io/ioutil"
"math"
"net/url"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -817,40 +816,6 @@ func setCallInfoCodec(c *callInfo) error {
return nil
}

// parseDialTarget returns the network and address to pass to dialer
func parseDialTarget(target string) (net string, addr string) {
net = "tcp"

m1 := strings.Index(target, ":")
m2 := strings.Index(target, ":/")

// handle unix:addr which will fail with url.Parse
if m1 >= 0 && m2 < 0 {
if n := target[0:m1]; n == "unix" {
net = n
addr = target[m1+1:]
return net, addr
}
}
if m2 >= 0 {
t, err := url.Parse(target)
if err != nil {
return net, target
}
scheme := t.Scheme
addr = t.Path
if scheme == "unix" {
net = scheme
if addr == "" {
addr = t.Host
}
return net, addr
}
}

return net, target
}

// channelzData is used to store channelz related data for ClientConn, addrConn and Server.
// These fields cannot be embedded in the original structs (e.g. ClientConn), since to do atomic
// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
Expand Down
21 changes: 0 additions & 21 deletions rpc_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,27 +191,6 @@ func (s) TestToRPCErr(t *testing.T) {
}
}

func (s) TestParseDialTarget(t *testing.T) {
for _, test := range []struct {
target, wantNet, wantAddr string
}{
{"unix:etcd:0", "unix", "etcd:0"},
{"unix:///tmp/unix-3", "unix", "/tmp/unix-3"},
{"unix://domain", "unix", "domain"},
{"unix://etcd:0", "unix", "etcd:0"},
{"unix:///etcd:0", "unix", "/etcd:0"},
{"passthrough://unix://domain", "tcp", "passthrough://unix://domain"},
{"https://google.com:443", "tcp", "https://google.com:443"},
{"dns:///google.com", "tcp", "dns:///google.com"},
{"/unix/socket/address", "tcp", "/unix/socket/address"},
} {
gotNet, gotAddr := parseDialTarget(test.target)
if gotNet != test.wantNet || gotAddr != test.wantAddr {
t.Errorf("parseDialTarget(%q) = %s, %s want %s, %s", test.target, gotNet, gotAddr, test.wantNet, test.wantAddr)
}
}
}

// bmEncode benchmarks encoding a Protocol Buffer message containing mSize
// bytes.
func bmEncode(b *testing.B, mSize int) {
Expand Down