Skip to content

Commit

Permalink
review comments round #1
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Mar 31, 2021
1 parent 602e624 commit ee2fe56
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 38 deletions.
45 changes: 26 additions & 19 deletions xds/internal/client/filter_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ const (
//
// The logic specified in the documentation around the xDS FilterChainMatch
// proto mentions 8 criteria to match on. gRPC does not support 4 of those, and
// we get rid of filter chains which contain any of these unsupported fields at
// we ignore filter chains which contain any of these unsupported fields at
// parsing time. Here we use the remaining 4 criteria to find a matching filter
// chain in the following order:
// Destination IP address, Source type, Source IP address, Source port.
// TODO: Ignore chains with unsupported fields *only* at connection time.
type FilterChainManager struct {
// Destination prefix is the first match criteria that we support.
// Therefore, this multi-stage map is indexed on destination prefixes
Expand Down Expand Up @@ -145,7 +146,7 @@ func NewFilterChainManager(lis *v3listenerpb.Listener) (*FilterChainManager, err
var def *FilterChain
if dfc := lis.GetDefaultFilterChain(); dfc != nil {
var err error
if def, err = filterChainFromProto(lis.GetDefaultFilterChain()); err != nil {
if def, err = filterChainFromProto(dfc); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -524,31 +525,37 @@ func filterBasedOnSourcePrefixes(srcPrefixes []*sourcePrefixes, srcAddr net.IP)
return matchingSrcPrefixes
}

// filterBasedOnSourcePorts is the last state of the filter chain matching
// filterBasedOnSourcePorts is the last stage of the filter chain matching
// algorithm. It trims the filter chains based on the source ports. It expects
// to be left with a single matching filter chain and returns an error if there
// are multiple matching filter chains at the end.
func filterBasedOnSourcePorts(srcPrefixEntries []*sourcePrefixEntry, srcPort int) (*FilterChain, error) {
// We need to find the most specific match from each of these source prefix
// entries. Matching filter chains are associated with a weight of 0 or 1,
// indicating whether they were a wildcard or a specific port match
// respectively. Once all source prefix entries have been processed, if we
// are left with more than one matching filter chain at the highest weight,
// it means that we have a match conflict.
matchingFCs := make(map[int][]*FilterChain)
// entries. A match could be a wildcard match (this happens when the match
// criteria does not specify source ports) or a specific port match (this
// happens when the match criteria specifies a set of ports and the source
// port of the incoming connection matches one of the specified ports). The
// latter is considered to be a more specific match.
//
// Once all source prefix entries have been processed, we need a single
// most-specific matching filter chain.
var (
wildcardFCs []*FilterChain
portFC *FilterChain
)
for _, spe := range srcPrefixEntries {
if fc := spe.srcPortMap[srcPort]; fc != nil {
// There can only be one specific match. So, the moment we find a
// second specific match, we can error out.
if len(matchingFCs[1]) != 0 {
// There can only be one non-wildcard match. So, the moment we find
// a second one, we can error out.
if portFC != nil {
return nil, errors.New("multiple matching filter chains")
}
matchingFCs[1] = append(matchingFCs[1], fc)
portFC = fc
} else if fc := spe.srcPortMap[0]; fc != nil {
matchingFCs[0] = append(matchingFCs[0], fc)
wildcardFCs = append(wildcardFCs, fc)
}
}
if len(matchingFCs) == 0 {
if portFC == nil && len(wildcardFCs) == 0 {
// This happens when specific source ports are mentioned in the matching
// source prefix (and therefore there is no entry for a wildcard port),
// but none of them match the incoming source port.
Expand All @@ -557,14 +564,14 @@ func filterBasedOnSourcePorts(srcPrefixEntries []*sourcePrefixEntry, srcPort int

// If we have a specific match, we can be sure that there was only one such
// match. So, we can safely return it.
if fcs := matchingFCs[1]; len(fcs) != 0 {
return fcs[0], nil
if portFC != nil {
return portFC, nil
}

// If we did not find a specific match and have more than one wildcard
// match, we have a match conflict.
if fcs := matchingFCs[0]; len(fcs) != 1 {
if len(wildcardFCs) != 1 {
return nil, errors.New("multiple matching filter chains")
}
return matchingFCs[0][0], nil
return wildcardFCs[0], nil
}
41 changes: 22 additions & 19 deletions xds/internal/server/listener_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,38 +157,41 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
// from the net package, and it is useful for us to not shutdown the
// server in these conditions. The listen queue being full is one
// such case.
if ne, ok := err.(interface{ Temporary() bool }); ok && ne.Temporary() {
retries++
timer := time.NewTimer(backoffFunc(retries))
select {
case <-timer.C:
case <-l.closed.Done():
timer.Stop()
// Continuing here will cause us to call Accept() again
// which will return a non-temporary error.
continue
}
if ne, ok := err.(interface{ Temporary() bool }); !ok || !ne.Temporary() {
return nil, err
}
retries++
timer := time.NewTimer(backoffFunc(retries))
select {
case <-timer.C:
case <-l.closed.Done():
timer.Stop()
// Continuing here will cause us to call Accept() again
// which will return a non-temporary error.
continue
}
// Non-temporary errors will be sent upstairs.
return nil, err
continue
}
// Reset retries after a successful Accept().
retries = 0

// TODO: Close connections if in "non-serving" state

// Since the net.Conn represents an incoming connection, the source and
// destination address can be retrieved from the local address and remote
// address of the net.Conn respectively. And since we only accept TCP
// listeners in Serve(), we can safely type assert here.
destAddr := conn.LocalAddr().(*net.TCPAddr).IP
srcAddr := conn.RemoteAddr().(*net.TCPAddr)
// destination address can be retrieved from the local address and
// remote address of the net.Conn respectively. If the incoming
// connection is not a TCP connection, we close it and move on.
destAddr, ok1 := conn.LocalAddr().(*net.TCPAddr)
srcAddr, ok2 := conn.RemoteAddr().(*net.TCPAddr)
if !ok1 || !ok2 {
conn.Close()
continue
}

l.mu.RLock()
fc, err := l.filterChains.Lookup(xdsclient.FilterChainLookupParams{
IsUnspecifiedListener: l.isUnspecifiedAddr,
DestAddr: destAddr,
DestAddr: destAddr.IP,
SourceAddr: srcAddr.IP,
SourcePort: srcAddr.Port,
})
Expand Down

0 comments on commit ee2fe56

Please sign in to comment.