Skip to content

Commit

Permalink
Implement SASL-wrapped RPC
Browse files Browse the repository at this point in the history
This supports encrypted or signed RPC communication with the namenode,
and brings us one step closer to full KRB support.
  • Loading branch information
dtaniwaki authored and colinmarc committed Nov 24, 2019
1 parent 264fe94 commit 1c841f7
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 82 deletions.
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ language: go
go_import_path: github.com/colinmarc/hdfs
go: 1.x
env:
- PLATFORM=hdp2
- PLATFORM=cdh5
- PLATFORM=cdh6
- PLATFORM=cdh6 KERBEROS=true
- PLATFORM=hdp2
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=authentication
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=integrity
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy
before_install:
- export GO111MODULE=on # Travis installs into $GOPATH/src, which disables module support by default.
install:
Expand Down
66 changes: 66 additions & 0 deletions internal/rpc/challenge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package rpc

import (
"fmt"
"regexp"
"strings"

hadoop "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common"
)

const (
// qopAuthenication is how the namenode refers to authentication mode, which
// only establishes mutual authentication without encryption (the default).
qopAuthentication = "auth"
// qopIntegrity is how the namenode refers to integrity mode, which, in
// in addition to authentication, verifies the signature of RPC messages.
qopIntegrity = "auth-int"
// qopPrivacy is how the namenode refers to privacy mode, which, in addition
// to authentication and integrity, provides full end-to-end encryption for
// RPC messages.
qopPrivacy = "auth-conf"
)

var challengeRegexp = regexp.MustCompile(",?([a-zA-Z0-9]+)=(\"([^\"]+)\"|([^,]+)),?")

type tokenChallenge struct {
realm string
nonce string
qop string
charset string
cipher []string
algorithm string
}

// parseChallenge returns a tokenChallenge parsed from a challenge response from
// the namenode.
func parseChallenge(auth *hadoop.RpcSaslProto_SaslAuth) (*tokenChallenge, error) {
tokenChallenge := tokenChallenge{}

matched := challengeRegexp.FindAllSubmatch(auth.Challenge, -1)
if matched == nil {
return nil, fmt.Errorf("invalid token challenge: %s", auth.Challenge)
}

for _, m := range matched {
key := string(m[1])
val := string(m[3])
switch key {
case "realm":
tokenChallenge.realm = val
case "nonce":
tokenChallenge.nonce = val
case "qop":
tokenChallenge.qop = val
case "charset":
tokenChallenge.charset = val
case "cipher":
tokenChallenge.cipher = strings.Split(val, ",")
case "algorithm":
tokenChallenge.algorithm = val
default:
}
}

return &tokenChallenge, nil
}
65 changes: 52 additions & 13 deletions internal/rpc/kerberos.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,31 @@ var (
)

func (c *NamenodeConnection) doKerberosHandshake() error {
// All SASL requests/responses use this sequence number.
c.currentRequestID = saslRpcCallId

// Start negotiation, and get the list of supported mechanisms in reply.
c.writeSaslRequest(&hadoop.RpcSaslProto{State: hadoop.RpcSaslProto_NEGOTIATE.Enum()})
err := c.writeSaslRequest(&hadoop.RpcSaslProto{
State: hadoop.RpcSaslProto_NEGOTIATE.Enum(),
})
if err != nil {
return err
}

resp, err := c.readSaslResponse(hadoop.RpcSaslProto_NEGOTIATE)
if err != nil {
return err
}

var mechanism *hadoop.RpcSaslProto_SaslAuth
var krbAuth, tokenAuth *hadoop.RpcSaslProto_SaslAuth
for _, m := range resp.GetAuths() {
if *m.Method == "KERBEROS" {
mechanism = m
switch *m.Method {
case "KERBEROS":
krbAuth = m
case "TOKEN":
tokenAuth = m
default:
}
}

if mechanism == nil {
if krbAuth == nil {
return errKerberosNotSupported
}

Expand All @@ -48,12 +55,34 @@ func (c *NamenodeConnection) doKerberosHandshake() error {
return err
}

if tokenAuth != nil {
challenge, err := parseChallenge(tokenAuth)
if err != nil {
return err
}

switch challenge.qop {
case qopPrivacy, qopIntegrity:
// Switch to SASL RPC handler
c.transport = &saslTransport{
basicTransport: basicTransport{
clientID: c.ClientID,
},
sessionKey: sessionKey,
privacy: challenge.qop == qopPrivacy,
}
case qopAuthentication:
// No special transport is required.
default:
return errors.New("unexpected QOP in challenge")
}
}

err = c.writeSaslRequest(&hadoop.RpcSaslProto{
State: hadoop.RpcSaslProto_INITIATE.Enum(),
Token: token.MechTokenBytes,
Auths: []*hadoop.RpcSaslProto_SaslAuth{mechanism},
Auths: []*hadoop.RpcSaslProto_SaslAuth{krbAuth},
})

if err != nil {
return err
}
Expand Down Expand Up @@ -92,7 +121,6 @@ func (c *NamenodeConnection) doKerberosHandshake() error {
State: hadoop.RpcSaslProto_RESPONSE.Enum(),
Token: signedBytes,
})

if err != nil {
return err
}
Expand All @@ -103,7 +131,8 @@ func (c *NamenodeConnection) doKerberosHandshake() error {
}

func (c *NamenodeConnection) writeSaslRequest(req *hadoop.RpcSaslProto) error {
packet, err := makeRPCPacket(newRPCRequestHeader(saslRpcCallId, c.ClientID), req)
rrh := newRPCRequestHeader(saslRpcCallId, c.ClientID)
packet, err := makeRPCPacket(rrh, req)
if err != nil {
return err
}
Expand All @@ -113,10 +142,20 @@ func (c *NamenodeConnection) writeSaslRequest(req *hadoop.RpcSaslProto) error {
}

func (c *NamenodeConnection) readSaslResponse(expectedState hadoop.RpcSaslProto_SaslState) (*hadoop.RpcSaslProto, error) {
rrh := &hadoop.RpcResponseHeaderProto{}
resp := &hadoop.RpcSaslProto{}
err := c.readResponse("sasl", resp)
err := readRPCPacket(c.conn, rrh, resp)
if err != nil {
return nil, err
} else if int32(rrh.GetCallId()) != saslRpcCallId {
return nil, errors.New("unexpected sequence number")
} else if rrh.GetStatus() != hadoop.RpcResponseHeaderProto_SUCCESS {
return nil, &NamenodeError{
method: "sasl",
message: rrh.GetErrorMsg(),
code: int(rrh.GetErrorDetail()),
exception: rrh.GetExceptionClassName(),
}
} else if resp.GetState() != expectedState {
return nil, fmt.Errorf("unexpected SASL state: %s", resp.GetState().String())
}
Expand Down
78 changes: 11 additions & 67 deletions internal/rpc/namenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ type NamenodeConnection struct {
kerberosServicePrincipleName string
kerberosRealm string

dialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
conn net.Conn
host *namenodeHost
hostList []*namenodeHost
dialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
conn net.Conn
host *namenodeHost
hostList []*namenodeHost
transport transport

reqLock sync.Mutex
done chan struct{}
Expand Down Expand Up @@ -115,8 +116,9 @@ func NewNamenodeConnection(options NamenodeConnectionOptions) (*NamenodeConnecti
kerberosServicePrincipleName: options.KerberosServicePrincipleName,
kerberosRealm: realm,

dialFunc: options.DialFunc,
hostList: hostList,
dialFunc: options.DialFunc,
hostList: hostList,
transport: &basicTransport{clientID: clientId},

done: make(chan struct{}),
}
Expand Down Expand Up @@ -190,20 +192,21 @@ func (c *NamenodeConnection) Execute(method string, req proto.Message, resp prot
defer c.reqLock.Unlock()

c.currentRequestID++
requestID := c.currentRequestID

for {
err := c.resolveConnection()
if err != nil {
return err
}

err = c.writeRequest(method, req)
err = c.transport.writeRequest(c.conn, method, requestID, req)
if err != nil {
c.markFailure(err)
continue
}

err = c.readResponse(method, resp)
err = c.transport.readResponse(c.conn, method, requestID, resp)
if err != nil {
// Only retry on a standby exception.
if nerr, ok := err.(*NamenodeError); ok && nerr.exception == standbyExceptionClass {
Expand All @@ -220,62 +223,6 @@ func (c *NamenodeConnection) Execute(method string, req proto.Message, resp prot
return nil
}

// addLease increases the lease counter on the namenode. As long as the lease
// counter is greater than zero, all leases will automatically be renewed every
//

// RPC definitions

// A request packet:
// +-----------------------------------------------------------+
// | uint32 length of the next three parts |
// +-----------------------------------------------------------+
// | varint length + RpcRequestHeaderProto |
// +-----------------------------------------------------------+
// | varint length + RequestHeaderProto |
// +-----------------------------------------------------------+
// | varint length + Request |
// +-----------------------------------------------------------+
func (c *NamenodeConnection) writeRequest(method string, req proto.Message) error {
rrh := newRPCRequestHeader(c.currentRequestID, c.ClientID)
rh := newRequestHeader(method)

reqBytes, err := makeRPCPacket(rrh, rh, req)
if err != nil {
return err
}

_, err = c.conn.Write(reqBytes)
return err
}

// A response from the namenode:
// +-----------------------------------------------------------+
// | uint32 length of the next two parts |
// +-----------------------------------------------------------+
// | varint length + RpcResponseHeaderProto |
// +-----------------------------------------------------------+
// | varint length + Response |
// +-----------------------------------------------------------+
func (c *NamenodeConnection) readResponse(method string, resp proto.Message) error {
rrh := &hadoop.RpcResponseHeaderProto{}
err := readRPCPacket(c.conn, rrh, resp)
if err != nil {
return err
} else if int32(rrh.GetCallId()) != c.currentRequestID {
return errors.New("unexpected sequence number")
} else if rrh.GetStatus() != hadoop.RpcResponseHeaderProto_SUCCESS {
return &NamenodeError{
method: method,
message: rrh.GetErrorMsg(),
code: int(rrh.GetErrorDetail()),
exception: rrh.GetExceptionClassName(),
}
}

return nil
}

// A handshake packet:
// +-----------------------------------------------------------+
// | Header, 4 bytes ("hrpc") |
Expand Down Expand Up @@ -320,9 +267,6 @@ func (c *NamenodeConnection) doNamenodeHandshake() error {
if err != nil {
return fmt.Errorf("SASL handshake: %s", err)
}

// Reset the sequence number here, since we set it to -33 for the SASL bits.
c.currentRequestID = 0
}

rrh := newRPCRequestHeader(handshakeCallID, c.ClientID)
Expand Down
Loading

0 comments on commit 1c841f7

Please sign in to comment.