Skip to content

Commit

Permalink
refactor: remove use of port in probing metric (#185)
Browse files Browse the repository at this point in the history
* refactor: remove use of port in proving metric

* Fix tests.
  • Loading branch information
sbruens authored Jul 2, 2024
1 parent e8c9080 commit 6a0a242
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 26 deletions.
2 changes: 1 addition & 1 deletion cmd/outline-ss-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *SSServer) startPort(portNum int) error {
port := &ssPort{tcpListener: listener, packetConn: packetConn, cipherList: service.NewCipherList()}
authFunc := service.NewShadowsocksStreamAuthenticator(port.cipherList, &s.replayCache, s.m)
// TODO: Register initial data metrics at zero.
tcpHandler := service.NewTCPHandler(portNum, authFunc, s.m, tcpReadTimeout)
tcpHandler := service.NewTCPHandler(listener.Addr().String(), authFunc, s.m, tcpReadTimeout)
packetHandler := service.NewPacketHandler(s.natTimeout, port.cipherList, s.m)
s.ports[portNum] = port
go service.StreamServe(service.WrapStreamListener(listener.AcceptTCP), tcpHandler.Handle)
Expand Down
5 changes: 2 additions & 3 deletions cmd/outline-ss-server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"fmt"
"net"
"net/netip"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -357,8 +356,8 @@ func (m *outlineMetrics) RemoveUDPNatEntry(clientAddr net.Addr, accessKey string
}
}

func (m *outlineMetrics) AddTCPProbe(status, drainResult string, port int, clientProxyBytes int64) {
m.tcpProbes.WithLabelValues(strconv.Itoa(port), status, drainResult).Observe(float64(clientProxyBytes))
func (m *outlineMetrics) AddTCPProbe(status, drainResult string, listenerId string, clientProxyBytes int64) {
m.tcpProbes.WithLabelValues(listenerId, status, drainResult).Observe(float64(clientProxyBytes))
}

func (m *outlineMetrics) AddTCPCipherSearch(accessKeyFound bool, timeToCipher time.Duration) {
Expand Down
5 changes: 2 additions & 3 deletions cmd/outline-ss-server/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestMethodsDontPanic(t *testing.T) {
ssMetrics.AddUDPPacketFromTarget(ipInfo, "3", "OK", 10, 20)
ssMetrics.AddUDPNatEntry(fakeAddr("127.0.0.1:9"), "key-1")
ssMetrics.RemoveUDPNatEntry(fakeAddr("127.0.0.1:9"), "key-1")
ssMetrics.AddTCPProbe("ERR_CIPHER", "eof", 443, proxyMetrics.ClientProxy)
ssMetrics.AddTCPProbe("ERR_CIPHER", "eof", "127.0.0.1:443", proxyMetrics.ClientProxy)
ssMetrics.AddTCPCipherSearch(true, 10*time.Millisecond)
ssMetrics.AddUDPCipherSearch(true, 10*time.Millisecond)
}
Expand Down Expand Up @@ -164,11 +164,10 @@ func BenchmarkProbe(b *testing.B) {
ssMetrics := newPrometheusOutlineMetrics(nil, prometheus.NewRegistry())
status := "ERR_REPLAY"
drainResult := "other"
port := 12345
data := metrics.ProxyMetrics{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
ssMetrics.AddTCPProbe(status, drainResult, port, data.ClientProxy)
ssMetrics.AddTCPProbe(status, drainResult, "127.0.0.1:12345", data.ClientProxy)
}
}

Expand Down
8 changes: 4 additions & 4 deletions internal/integration_test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestTCPEcho(t *testing.T) {
const testTimeout = 200 * time.Millisecond
testMetrics := &service.NoOpTCPMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics)
handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout)
handler := service.NewTCPHandler(proxyListener.Addr().String(), authFunc, testMetrics, testTimeout)
handler.SetTargetDialer(&transport.TCPDialer{})
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestRestrictedAddresses(t *testing.T) {
const testTimeout = 200 * time.Millisecond
testMetrics := &statusMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout)
handler := service.NewTCPHandler(proxyListener.Addr().String(), authFunc, testMetrics, testTimeout)
done := make(chan struct{})
go func() {
service.StreamServe(service.WrapStreamListener(proxyListener.AcceptTCP), handler.Handle)
Expand Down Expand Up @@ -384,7 +384,7 @@ func BenchmarkTCPThroughput(b *testing.B) {
const testTimeout = 200 * time.Millisecond
testMetrics := &service.NoOpTCPMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout)
handler := service.NewTCPHandler(proxyListener.Addr().String(), authFunc, testMetrics, testTimeout)
handler.SetTargetDialer(&transport.TCPDialer{})
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -448,7 +448,7 @@ func BenchmarkTCPMultiplexing(b *testing.B) {
const testTimeout = 200 * time.Millisecond
testMetrics := &service.NoOpTCPMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics)
handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout)
handler := service.NewTCPHandler(proxyListener.Addr().String(), authFunc, testMetrics, testTimeout)
handler.SetTargetDialer(&transport.TCPDialer{})
done := make(chan struct{})
go func() {
Expand Down
12 changes: 6 additions & 6 deletions service/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type TCPMetrics interface {
AddOpenTCPConnection(clientInfo ipinfo.IPInfo)
AddAuthenticatedTCPConnection(clientAddr net.Addr, accessKey string)
AddClosedTCPConnection(clientInfo ipinfo.IPInfo, clientAddr net.Addr, accessKey string, status string, data metrics.ProxyMetrics, duration time.Duration)
AddTCPProbe(status, drainResult string, port int, clientProxyBytes int64)
AddTCPProbe(status, drainResult string, listenerId string, clientProxyBytes int64)
}

func remoteIP(conn net.Conn) netip.Addr {
Expand Down Expand Up @@ -162,17 +162,17 @@ func NewShadowsocksStreamAuthenticator(ciphers CipherList, replayCache *ReplayCa
}

type tcpHandler struct {
port int
listenerId string
m TCPMetrics
readTimeout time.Duration
authenticate StreamAuthenticateFunc
dialer transport.StreamDialer
}

// NewTCPService creates a TCPService
func NewTCPHandler(port int, authenticate StreamAuthenticateFunc, m TCPMetrics, timeout time.Duration) TCPHandler {
func NewTCPHandler(listenerId string, authenticate StreamAuthenticateFunc, m TCPMetrics, timeout time.Duration) TCPHandler {
return &tcpHandler{
port: port,
listenerId: listenerId,
m: m,
readTimeout: timeout,
authenticate: authenticate,
Expand Down Expand Up @@ -375,7 +375,7 @@ func (h *tcpHandler) absorbProbe(clientConn io.ReadCloser, status string, proxyM
_, drainErr := io.Copy(io.Discard, clientConn) // drain socket
drainResult := drainErrToString(drainErr)
logger.Debugf("Drain error: %v, drain result: %v", drainErr, drainResult)
h.m.AddTCPProbe(status, drainResult, h.port, proxyMetrics.ClientProxy)
h.m.AddTCPProbe(status, drainResult, h.listenerId, proxyMetrics.ClientProxy)
}

func drainErrToString(drainErr error) string {
Expand Down Expand Up @@ -404,6 +404,6 @@ func (m *NoOpTCPMetrics) GetIPInfo(net.IP) (ipinfo.IPInfo, error) {
func (m *NoOpTCPMetrics) AddOpenTCPConnection(clientInfo ipinfo.IPInfo) {}
func (m *NoOpTCPMetrics) AddAuthenticatedTCPConnection(clientAddr net.Addr, accessKey string) {
}
func (m *NoOpTCPMetrics) AddTCPProbe(status, drainResult string, port int, clientProxyBytes int64) {
func (m *NoOpTCPMetrics) AddTCPProbe(status, drainResult string, listenerId string, clientProxyBytes int64) {
}
func (m *NoOpTCPMetrics) AddTCPCipherSearch(accessKeyFound bool, timeToCipher time.Duration) {}
18 changes: 9 additions & 9 deletions service/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (m *probeTestMetrics) AddOpenTCPConnection(clientInfo ipinfo.IPInfo) {
func (m *probeTestMetrics) AddAuthenticatedTCPConnection(clientAddr net.Addr, accessKey string) {
}

func (m *probeTestMetrics) AddTCPProbe(status, drainResult string, port int, clientProxyBytes int64) {
func (m *probeTestMetrics) AddTCPProbe(status, drainResult string, listenerId string, clientProxyBytes int64) {
m.mu.Lock()
m.probeData = append(m.probeData, clientProxyBytes)
m.probeStatus = append(m.probeStatus, status)
Expand Down Expand Up @@ -281,7 +281,7 @@ func TestProbeRandom(t *testing.T) {
require.NoError(t, err, "MakeTestCiphers failed: %v", err)
testMetrics := &probeTestMetrics{}
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, 200*time.Millisecond)
handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, 200*time.Millisecond)
done := make(chan struct{})
go func() {
StreamServe(WrapStreamListener(listener.AcceptTCP), handler.Handle)
Expand Down Expand Up @@ -358,7 +358,7 @@ func TestProbeClientBytesBasicTruncated(t *testing.T) {
cipher := firstCipher(cipherList)
testMetrics := &probeTestMetrics{}
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, 200*time.Millisecond)
handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, 200*time.Millisecond)
handler.SetTargetDialer(makeValidatingTCPStreamDialer(allowAll))
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -393,7 +393,7 @@ func TestProbeClientBytesBasicModified(t *testing.T) {
cipher := firstCipher(cipherList)
testMetrics := &probeTestMetrics{}
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, 200*time.Millisecond)
handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, 200*time.Millisecond)
handler.SetTargetDialer(makeValidatingTCPStreamDialer(allowAll))
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -429,7 +429,7 @@ func TestProbeClientBytesCoalescedModified(t *testing.T) {
cipher := firstCipher(cipherList)
testMetrics := &probeTestMetrics{}
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, 200*time.Millisecond)
handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, 200*time.Millisecond)
handler.SetTargetDialer(makeValidatingTCPStreamDialer(allowAll))
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestProbeServerBytesModified(t *testing.T) {
cipher := firstCipher(cipherList)
testMetrics := &probeTestMetrics{}
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, 200*time.Millisecond)
handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, 200*time.Millisecond)
done := make(chan struct{})
go func() {
StreamServe(WrapStreamListener(listener.AcceptTCP), handler.Handle)
Expand Down Expand Up @@ -503,7 +503,7 @@ func TestReplayDefense(t *testing.T) {
testMetrics := &probeTestMetrics{}
const testTimeout = 200 * time.Millisecond
authFunc := NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics)
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout)
handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, testTimeout)
snapshot := cipherList.SnapshotForClientIP(netip.Addr{})
cipherEntry := snapshot[0].Value.(*CipherEntry)
cipher := cipherEntry.CryptoKey
Expand Down Expand Up @@ -582,7 +582,7 @@ func TestReverseReplayDefense(t *testing.T) {
testMetrics := &probeTestMetrics{}
const testTimeout = 200 * time.Millisecond
authFunc := NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics)
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout)
handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, testTimeout)
snapshot := cipherList.SnapshotForClientIP(netip.Addr{})
cipherEntry := snapshot[0].Value.(*CipherEntry)
cipher := cipherEntry.CryptoKey
Expand Down Expand Up @@ -653,7 +653,7 @@ func probeExpectTimeout(t *testing.T, payloadSize int) {
require.NoError(t, err, "MakeTestCiphers failed: %v", err)
testMetrics := &probeTestMetrics{}
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout)
handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, testTimeout)

done := make(chan struct{})
go func() {
Expand Down

0 comments on commit 6a0a242

Please sign in to comment.