diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index 6d7931b4e33a..ede180696b8c 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -15,7 +15,9 @@ package proxy import ( - "context" + "bufio" + "bytes" + "crypto/tls" "fmt" "io" mrand "math/rand" @@ -31,12 +33,14 @@ import ( "go.uber.org/zap" "go.etcd.io/etcd/client/pkg/v3/transport" + "go.etcd.io/etcd/tests/v3/framework/testutils" ) var ( defaultDialTimeout = 3 * time.Second defaultBufferSize = 48 * 1024 defaultRetryInterval = 10 * time.Millisecond + FixturesDir = testutils.MustAbsPath("../fixtures") ) // Server defines proxy server layer that simulates common network faults: @@ -137,6 +141,15 @@ type ServerConfig struct { DialTimeout time.Duration BufferSize int RetryInterval time.Duration + + IsSSLTerminatingProxy bool + TerminatingTLSInfo transport.TLSInfo + // for SSL termination proxy: put outgoing connection local addr - connection initiated by peerURLs + // for transparent proxy: read incoming connection remote addr - check BlackholeMap to see if peerURLs is blocked + ConnectionMap map[string]string + ConnectionMapMu *sync.RWMutex + BlackholeMap map[string]bool // peerURLs to be blackholed - true + BlackholeMapMu *sync.RWMutex } type server struct { @@ -150,6 +163,13 @@ type server struct { tlsInfo transport.TLSInfo dialTimeout time.Duration + terminatingTLSInfo transport.TLSInfo + isSSLTerminatingProxy bool + connectionMap map[string]string + connectionMapMu *sync.RWMutex + blackholeMap map[string]bool + blackholeMapMu *sync.RWMutex + bufferSize int retryInterval time.Duration @@ -200,6 +220,13 @@ func NewServer(cfg ServerConfig) Server { tlsInfo: cfg.TLSInfo, dialTimeout: cfg.DialTimeout, + isSSLTerminatingProxy: cfg.IsSSLTerminatingProxy, + terminatingTLSInfo: cfg.TerminatingTLSInfo, + connectionMap: cfg.ConnectionMap, + connectionMapMu: cfg.ConnectionMapMu, + blackholeMap: cfg.BlackholeMap, + blackholeMapMu: cfg.BlackholeMapMu, + bufferSize: cfg.BufferSize, retryInterval: cfg.RetryInterval, @@ -216,6 +243,7 @@ func NewServer(cfg ServerConfig) Server { if err == nil { s.fromPort, _ = strconv.Atoi(fromPort) } + var toPort string _, toPort, err = net.SplitHostPort(cfg.To.Host) if err == nil { @@ -250,7 +278,11 @@ func NewServer(cfg ServerConfig) Server { var ln net.Listener if !s.tlsInfo.Empty() { - ln, err = transport.NewListener(addr, s.from.Scheme, &s.tlsInfo) + if s.isSSLTerminatingProxy { + ln, err = transport.NewListener(addr, "https", &s.tlsInfo) + } else { + ln, err = transport.NewListener(addr, s.from.Scheme, &s.tlsInfo) + } } else { ln, err = net.Listen(s.from.Scheme, addr) } @@ -265,6 +297,7 @@ func NewServer(cfg ServerConfig) Server { go s.listenAndServe() s.lg.Info("started proxying", zap.String("from", s.From()), zap.String("to", s.To())) + return s } @@ -284,7 +317,7 @@ func (s *server) To() string { func (s *server) listenAndServe() { defer s.closeWg.Done() - ctx := context.Background() + // ctx := context.Background() s.lg.Info("proxy is listening on", zap.String("from", s.From())) close(s.readyc) @@ -355,22 +388,37 @@ func (s *server) listenAndServe() { var out net.Conn if !s.tlsInfo.Empty() { - var tp *http.Transport - tp, err = transport.NewTransport(s.tlsInfo, s.dialTimeout) + // Not sure why this is not working... + // var tp *http.Transport + // tp, err = transport.NewTransport(s.tlsInfo, s.dialTimeout) + // if err != nil { + // select { + // case s.errc <- err: + // select { + // case <-s.donec: + // return + // default: + // } + // case <-s.donec: + // return + // } + // continue + // } + // out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host) + + // using simple Cert loading for now + var cert tls.Certificate + CertPath := FixturesDir + "/server.crt" + PrivateKeyPath := FixturesDir + "/server.key.insecure" + cert, err = tls.LoadX509KeyPair(CertPath, PrivateKeyPath) if err != nil { - select { - case s.errc <- err: - select { - case <-s.donec: - return - default: - } - case <-s.donec: - return - } - continue + panic(err) } - out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host) + conf := &tls.Config{ + InsecureSkipVerify: true, + Certificates: []tls.Certificate{cert}, + } + out, err = tls.Dial(s.to.Scheme, s.to.Host, conf) } else { out, err = net.Dial(s.to.Scheme, s.to.Host) } @@ -393,9 +441,21 @@ func (s *server) listenAndServe() { go func() { defer s.closeWg.Done() // read incoming bytes from listener, dispatch to outgoing connection + s.transmit(out, in) out.Close() in.Close() + + if s.isSSLTerminatingProxy { + // when the connection is closed, we delete the entry from the map + s.connectionMapMu.Lock() + port, err := getPort(out.LocalAddr().String()) + if err != nil { + panic("") + } + delete(s.connectionMap, port) + s.connectionMapMu.Unlock() + } }() go func() { defer s.closeWg.Done() @@ -403,18 +463,42 @@ func (s *server) listenAndServe() { s.receive(in, out) in.Close() out.Close() + + if s.isSSLTerminatingProxy { + // when the connection is closed, we delete the entry from the map + s.connectionMapMu.Lock() + port, err := getPort(out.LocalAddr().String()) + if err != nil { + panic("") + } + delete(s.connectionMap, port) + s.connectionMapMu.Unlock() + } }() } } -func (s *server) transmit(dst io.Writer, src io.Reader) { +func (s *server) transmit(dst, src net.Conn) { s.ioCopy(dst, src, proxyTx) } -func (s *server) receive(dst io.Writer, src io.Reader) { +func (s *server) receive(dst, src net.Conn) { s.ioCopy(dst, src, proxyRx) } +func getPort(host string) (string, error) { + if strings.HasPrefix(host, "https") { + return strings.Replace(host, "https://localhost:", "", 1), nil + } + + _, port, err := net.SplitHostPort(host) + if err != nil { + return "", nil + } + // return strconv.Atoi(port) + return port, nil +} + type proxyType uint8 const ( @@ -422,7 +506,7 @@ const ( proxyRx ) -func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { +func (s *server) ioCopy(dst, src net.Conn, ptype proxyType) { buf := make([]byte, s.bufferSize) for { nr1, err := src.Read(buf) @@ -455,6 +539,53 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { } data := buf[:nr1] + // attempt to obtain the header information + // this approach won't work since we are looking at encrypted bytestream, and headers are also encrypted + // unless we are able to decrypt this + if s.isSSLTerminatingProxy && ptype == proxyTx { + /* + if we are isolating A from B and C + case 1 - Proxy of A + The incoming traffic and outgoing traffic are already handled by the original design + case 2 - Proxy of B and C + The outgoing traffic to A is already handled by the original design + + The incoming traffic from A will have to be inspected at all SSL termination proxy, by extracting + X-PeerURLs field from the Header. Since the SSL termination proxy will initiate a new connection + to forward the traffic to the transparent proxy (the original proxy), we will record which port + that the new forwarding connection is using in a map shared by SSL termination proxy and the + transparent proxy, containing the key-value pair of port-peer mapping. So now at the transparent + proxy, we can identify which traffic is coming from which node! + + Note A: failed to decode headers and no X-PeerURLs + After looking into the logs, we can see the following traffic that doesn't have the X-PeerURLs present + - map[Accept-Encoding:[gzip] User-Agent:[Go-http-client/1.1]] /members + - map[Accept-Encoding:[gzip] User-Agent:[Go-http-client/1.1]] /version + - map[Accept-Encoding:[gzip] User-Agent:[Go-http-client/1.1]] /raft/probing + */ + if req, err := http.ReadRequest(bufio.NewReader(bytes.NewBuffer(data))); err != nil { + // check note A + } else { + peerURLs := req.Header.Get("X-PeerURLs") + + if len(peerURLs) == 0 { + // check note A + } else { + s.connectionMapMu.Lock() + port, err := getPort(dst.LocalAddr().String()) + if err != nil { + panic("") + } + blockingPort, err := getPort(peerURLs) // TODO: make this able to parse multiple peerURLs? + if err != nil { + panic("") + } + s.connectionMap[port] = blockingPort + s.connectionMapMu.Unlock() + } + } + } + // alters/corrupts/drops data switch ptype { case proxyTx: @@ -472,6 +603,40 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) { default: panic("unknown proxy type") } + + // we also block off the traffic to targetted node + if !s.isSSLTerminatingProxy && data != nil { + var port string + switch ptype { + case proxyTx: + port, err = getPort(src.RemoteAddr().String()) + if err != nil { + panic("") + } + + case proxyRx: + port, err = getPort(dst.RemoteAddr().String()) + if err != nil { + panic("") + } + default: + panic("unknown proxy type") + } + + s.connectionMapMu.RLock() + if peerPort, ok := s.connectionMap[port]; ok { + + s.blackholeMapMu.RLock() + if _, ok := s.blackholeMap[peerPort]; ok { + data = nil + } + s.blackholeMapMu.RUnlock() + } else { + // we might have non-peer messages on the network + } + s.connectionMapMu.RUnlock() + } + nr2 := len(data) switch ptype { case proxyTx: @@ -867,6 +1032,21 @@ func (s *server) BlackholeTx() { zap.String("from", s.From()), zap.String("to", s.To()), ) + + s.blackholeMapMu.Lock() + + port, err := getPort(s.listener.Addr().String()) + if err != nil { + panic("") + } + portInt, err := strconv.Atoi(port) + if err != nil { + panic("") + } + port = strconv.Itoa((portInt - 2)) + s.blackholeMap[port] = true + + s.blackholeMapMu.Unlock() } func (s *server) UnblackholeTx() { @@ -876,6 +1056,19 @@ func (s *server) UnblackholeTx() { zap.String("from", s.From()), zap.String("to", s.To()), ) + + s.blackholeMapMu.Lock() + port, err := getPort(s.listener.Addr().String()) + if err != nil { + panic("") + } + portInt, err := strconv.Atoi(port) + if err != nil { + panic("") + } + port = strconv.Itoa((portInt - 2)) + delete(s.blackholeMap, port) + s.blackholeMapMu.Unlock() } func (s *server) BlackholeRx() { @@ -972,7 +1165,11 @@ func (s *server) ResetListener() error { var ln net.Listener var err error if !s.tlsInfo.Empty() { - ln, err = transport.NewListener(s.from.Host, s.from.Scheme, &s.tlsInfo) + if s.isSSLTerminatingProxy { + ln, err = transport.NewListener(s.from.Host, "https", &s.tlsInfo) + } else { + ln, err = transport.NewListener(s.from.Host, s.from.Scheme, &s.tlsInfo) + } } else { ln, err = net.Listen(s.from.Scheme, s.from.Host) } diff --git a/tests/e2e/blackhole_test.go b/tests/e2e/blackhole_test.go index 68150cb8608a..28989b44673a 100644 --- a/tests/e2e/blackhole_test.go +++ b/tests/e2e/blackhole_test.go @@ -43,6 +43,7 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea e2e.WithClusterSize(clusterSize), e2e.WithSnapshotCount(10), e2e.WithSnapshotCatchUpEntries(10), + e2e.WithSSLTerminationProxy(true), e2e.WithIsPeerTLS(true), e2e.WithPeerProxy(true), ) @@ -68,6 +69,7 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea t.Logf("Wait for new leader election with remaining members") leaderEPC := epc.Procs[waitLeader(t, epc, mockPartitionNodeIndex)] + t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot.)") writeKVs(t, leaderEPC.Etcdctl(), 0, 20) e2e.AssertProcessLogs(t, leaderEPC, "saved snapshot") @@ -76,7 +78,8 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea assertRevision(t, leaderEPC, 21) assertRevision(t, partitionedMember, 1) - // Wait for some time to restore the network + // Wait for 1s before restoring the network + t.Logf("Wait 1s before restoring the network") time.Sleep(1 * time.Second) t.Logf("Unblackholing traffic from and to member %q", partitionedMember.Config().Name) proxy.UnblackholeTx() diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 8f3a102c3059..d2fd4ccba57d 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -24,6 +24,7 @@ import ( "path/filepath" "regexp" "strings" + "sync" "testing" "time" @@ -31,6 +32,7 @@ import ( "go.uber.org/zap/zaptest" "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/client/pkg/v3/transport" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/pkg/v3/proxy" "go.etcd.io/etcd/server/v3/embed" @@ -143,6 +145,9 @@ type EtcdProcessClusterConfig struct { GoFailClientTimeout time.Duration LazyFSEnabled bool PeerProxy bool + SSLTerminationProxy bool + BlackholeMap map[string]bool + BlackholeMapMu *sync.RWMutex // Process config @@ -255,6 +260,10 @@ func WithIsPeerAutoTLS(isPeerAutoTLS bool) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.IsPeerAutoTLS = isPeerAutoTLS } } +func WithSSLTerminationProxy(SSLTerminationProxy bool) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.SSLTerminationProxy = SSLTerminationProxy } +} + func WithClientAutoTLS(isClientAutoTLS bool) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.Client.AutoTLS = isClientAutoTLS } } @@ -397,6 +406,10 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP if cfg.ServerConfig.SnapshotCount == 0 { cfg.ServerConfig.SnapshotCount = etcdserver.DefaultSnapshotCount } + if cfg.SSLTerminationProxy { + cfg.BlackholeMap = make(map[string]bool) + cfg.BlackholeMapMu = &sync.RWMutex{} + } etcdCfgs := cfg.EtcdAllServerProcessConfigs(t) epc := &EtcdProcessCluster{ @@ -481,12 +494,25 @@ func (cfg *EtcdProcessClusterConfig) SetInitialOrDiscovery(serverCfg *EtcdServer func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i int) *EtcdServerProcessConfig { var curls []string var curl string - port := cfg.BasePort + 5*i + port := cfg.BasePort + 6*i clientPort := port - peerPort := port + 1 + peerPort := port + 1 // the port the the peer actually listens on metricsPort := port + 2 - peer2Port := port + 3 + peer2Port := port + 3 // the port that the peer advertises clientHTTPPort := port + 4 + transparentProxyPort := port + 5 // used when SSL termination proxy is turned on + + /* + 0 + out = 20003 + listen = 20005 + 1 + out = 20009 + listen = 20011 + 2 + out = 20015 + listen = 20017 + */ if cfg.Client.ConnectionType == ClientTLSAndNonTLS { curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS) @@ -499,15 +525,60 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in peerListenURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)} peerAdvertiseURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)} var proxyCfg *proxy.ServerConfig + var SSLTerminationProxyCfg *proxy.ServerConfig if cfg.PeerProxy { if !cfg.IsPeerTLS { panic("Can't use peer proxy without peer TLS as it can result in malformed packets") } + + /* + if the SSL terminating proxy is switch off, the traffic will go like this + peer2Port -- (transparent proxy)-- peerPort + peerAdvertiseURL peerListenURL + + if the SSL terminating proxy is switch on, the traffic will go like this + peer2Port --(SSL terminating proxy)-- transparentProxyPort -- (transparent proxy)-- peerPort + peerAdvertiseURL transparentProxyURL peerListenURL + */ peerAdvertiseURL.Host = fmt.Sprintf("localhost:%d", peer2Port) - proxyCfg = &proxy.ServerConfig{ - Logger: zap.NewNop(), - To: peerListenURL, - From: peerAdvertiseURL, + transparentProxyURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", transparentProxyPort)} + if cfg.SSLTerminationProxy { + connectionMap := make(map[string]string) + var connectionMapMu sync.RWMutex + + SSLTerminationProxyCfg = &proxy.ServerConfig{ + Logger: zap.NewNop(), + To: transparentProxyURL, + From: peerAdvertiseURL, + TLSInfo: transport.TLSInfo{ + CertFile: CertPath, + KeyFile: PrivateKeyPath, + TrustedCAFile: CaPath, + }, + IsSSLTerminatingProxy: true, + ConnectionMap: connectionMap, + ConnectionMapMu: &connectionMapMu, + BlackholeMap: cfg.BlackholeMap, + BlackholeMapMu: cfg.BlackholeMapMu, + } + + proxyCfg = &proxy.ServerConfig{ + Logger: zap.NewNop(), + To: peerListenURL, + From: transparentProxyURL, + IsSSLTerminatingProxy: false, + ConnectionMap: connectionMap, + ConnectionMapMu: &connectionMapMu, + BlackholeMap: cfg.BlackholeMap, + BlackholeMapMu: cfg.BlackholeMapMu, + } + } else { + proxyCfg = &proxy.ServerConfig{ + Logger: zap.NewNop(), + To: peerListenURL, + From: peerAdvertiseURL, + IsSSLTerminatingProxy: false, + } } } @@ -624,6 +695,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in KeepDataDir: cfg.KeepDataDir, Name: name, PeerURL: peerAdvertiseURL, + PeerListenURL: peerListenURL, ClientURL: curl, ClientHTTPURL: clientHTTPURL, MetricsURL: murl, @@ -631,6 +703,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in GoFailPort: gofailPort, GoFailClientTimeout: cfg.GoFailClientTimeout, Proxy: proxyCfg, + SSLTerminationProxy: SSLTerminationProxyCfg, LazyFSEnabled: cfg.LazyFSEnabled, } } diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index af5c437c7c75..f3d12892c738 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -69,12 +69,13 @@ type LogsExpect interface { } type EtcdServerProcess struct { - cfg *EtcdServerProcessConfig - proc *expect.ExpectProcess - proxy proxy.Server - lazyfs *LazyFS - failpoints *BinaryFailpoints - donec chan struct{} // closed when Interact() terminates + cfg *EtcdServerProcessConfig + proc *expect.ExpectProcess + proxy proxy.Server + SSLTerminationProxy proxy.Server + lazyfs *LazyFS + failpoints *BinaryFailpoints + donec chan struct{} // closed when Interact() terminates } type EtcdServerProcessConfig struct { @@ -91,6 +92,7 @@ type EtcdServerProcessConfig struct { Name string PeerURL url.URL + PeerListenURL url.URL ClientURL string ClientHTTPURL string MetricsURL string @@ -100,8 +102,9 @@ type EtcdServerProcessConfig struct { GoFailPort int GoFailClientTimeout time.Duration - LazyFSEnabled bool - Proxy *proxy.ServerConfig + LazyFSEnabled bool + Proxy *proxy.ServerConfig + SSLTerminationProxy *proxy.ServerConfig } func NewEtcdServerProcess(t testing.TB, cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) { @@ -151,6 +154,15 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error { if ep.proc != nil { panic("already started") } + if ep.cfg.SSLTerminationProxy != nil && ep.SSLTerminationProxy == nil { + ep.cfg.lg.Info("starting SSL termination proxy...", zap.String("name", ep.cfg.Name), zap.String("from", ep.cfg.SSLTerminationProxy.From.String()), zap.String("to", ep.cfg.SSLTerminationProxy.To.String())) + ep.SSLTerminationProxy = proxy.NewServer(*ep.cfg.SSLTerminationProxy) + select { + case <-ep.SSLTerminationProxy.Ready(): + case err := <-ep.SSLTerminationProxy.Error(): + return err + } + } if ep.cfg.Proxy != nil && ep.proxy == nil { ep.cfg.lg.Info("starting proxy...", zap.String("name", ep.cfg.Name), zap.String("from", ep.cfg.Proxy.From.String()), zap.String("to", ep.cfg.Proxy.To.String())) ep.proxy = proxy.NewServer(*ep.cfg.Proxy) @@ -200,23 +212,12 @@ func (ep *EtcdServerProcess) Stop() (err error) { ep.cfg.lg.Info("stopping server...", zap.String("name", ep.cfg.Name)) - defer func() { - ep.proc = nil - }() - - err = ep.proc.Stop() - if err != nil { - return err - } - err = ep.proc.Close() - if err != nil && !strings.Contains(err.Error(), "unexpected exit code") { - return err - } - <-ep.donec - ep.donec = make(chan struct{}) - if ep.cfg.PeerURL.Scheme == "unix" || ep.cfg.PeerURL.Scheme == "unixs" { - err = os.Remove(ep.cfg.PeerURL.Host + ep.cfg.PeerURL.Path) - if err != nil && !os.IsNotExist(err) { + // TODO: if we terminate the proxies first, stopping SSL termination proxy will not hang as long. Why?! (timeout setting?) + if ep.SSLTerminationProxy != nil { + ep.cfg.lg.Info("stopping SSL termination proxy...", zap.String("name", ep.cfg.Name)) + err = ep.SSLTerminationProxy.Close() + ep.SSLTerminationProxy = nil + if err != nil { return err } } @@ -237,6 +238,28 @@ func (ep *EtcdServerProcess) Stop() (err error) { return err } } + + defer func() { + ep.proc = nil + }() + + err = ep.proc.Stop() + if err != nil { + return err + } + err = ep.proc.Close() + if err != nil && !strings.Contains(err.Error(), "unexpected exit code") { + return err + } + <-ep.donec + ep.donec = make(chan struct{}) + if ep.cfg.PeerURL.Scheme == "unix" || ep.cfg.PeerURL.Scheme == "unixs" { + err = os.Remove(ep.cfg.PeerURL.Host + ep.cfg.PeerURL.Path) + if err != nil && !os.IsNotExist(err) { + return err + } + } + return nil } @@ -375,7 +398,11 @@ func (f *BinaryFailpoints) SetupHTTP(ctx context.Context, failpoint, payload str } defer resp.Body.Close() if resp.StatusCode != http.StatusNoContent { - return fmt.Errorf("bad status code: %d", resp.StatusCode) + errorBody, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + return fmt.Errorf("bad status code: %d (%#v)", resp.StatusCode, string(errorBody)) } return nil } diff --git a/tests/robustness/failpoint/network.go b/tests/robustness/failpoint/network.go index 5d59fba3d99c..c8b351510587 100644 --- a/tests/robustness/failpoint/network.go +++ b/tests/robustness/failpoint/network.go @@ -64,9 +64,6 @@ func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, proces func Blackhole(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error { proxy := member.PeerProxy() - // Blackholing will cause peers to not be able to use streamWriters registered with member - // but peer traffic is still possible because member has 'pipeline' with peers - // TODO: find a way to stop all traffic t.Logf("Blackholing traffic from and to member %q", member.Config().Name) proxy.BlackholeTx() proxy.BlackholeRx()