Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stats counter for outbounds #38

Merged
merged 3 commits into from
Jun 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions app/policy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ func (p *Policy) ToCorePolicy() policy.Session {
func (p *SystemPolicy) ToCorePolicy() policy.System {
return policy.System{
Stats: policy.SystemStats{
InboundUplink: p.Stats.InboundUplink,
InboundDownlink: p.Stats.InboundDownlink,
InboundUplink: p.Stats.InboundUplink,
InboundDownlink: p.Stats.InboundDownlink,
OutboundUplink: p.Stats.OutboundUplink,
OutboundDownlink: p.Stats.OutboundDownlink,
},
}
}
282 changes: 154 additions & 128 deletions app/policy/config.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions app/policy/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ message SystemPolicy {
message Stats {
bool inbound_uplink = 1;
bool inbound_downlink = 2;
bool outbound_uplink = 3;
bool outbound_downlink = 4;
}

Stats stats = 1;
Expand Down
6 changes: 3 additions & 3 deletions app/proxyman/inbound/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func (w *tcpWorker) callback(conn internet.Connection) {
ctx = session.ContextWithContent(ctx, content)
if w.uplinkCounter != nil || w.downlinkCounter != nil {
conn = &internet.StatCouterConnection{
Connection: conn,
Uplink: w.uplinkCounter,
Downlink: w.downlinkCounter,
Connection: conn,
ReadCounter: w.uplinkCounter,
WriteCounter: w.downlinkCounter,
}
}
if err := w.proxy.Process(ctx, net.Network_TCP, conn, w.dispatcher); err != nil {
Expand Down
48 changes: 46 additions & 2 deletions app/proxyman/outbound/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,40 @@ import (
"v2ray.com/core/common/net"
"v2ray.com/core/common/session"
"v2ray.com/core/features/outbound"
"v2ray.com/core/features/policy"
"v2ray.com/core/features/stats"
"v2ray.com/core/proxy"
"v2ray.com/core/transport"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/tls"
"v2ray.com/core/transport/pipe"
)

func getStatCounter(v *core.Instance, tag string) (stats.Counter, stats.Counter) {
var uplinkCounter stats.Counter
var downlinkCounter stats.Counter

policy := v.GetFeature(policy.ManagerType()).(policy.Manager)
if len(tag) > 0 && policy.ForSystem().Stats.OutboundUplink {
statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
name := "outbound>>>" + tag + ">>>traffic>>>uplink"
c, _ := stats.GetOrRegisterCounter(statsManager, name)
if c != nil {
uplinkCounter = c
}
}
if len(tag) > 0 && policy.ForSystem().Stats.OutboundDownlink {
statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
name := "outbound>>>" + tag + ">>>traffic>>>downlink"
c, _ := stats.GetOrRegisterCounter(statsManager, name)
if c != nil {
downlinkCounter = c
}
}

return uplinkCounter, downlinkCounter
}

// Handler is an implements of outbound.Handler.
type Handler struct {
tag string
Expand All @@ -25,14 +52,19 @@ type Handler struct {
proxy proxy.Outbound
outboundManager outbound.Manager
mux *mux.ClientManager
uplinkCounter stats.Counter
downlinkCounter stats.Counter
}

// NewHandler create a new Handler based on the given configuration.
func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (outbound.Handler, error) {
v := core.MustFromContext(ctx)
uplinkCounter, downlinkCounter := getStatCounter(v, config.Tag)
h := &Handler{
tag: config.Tag,
outboundManager: v.GetFeature(outbound.ManagerType()).(outbound.Manager),
uplinkCounter: uplinkCounter,
downlinkCounter: downlinkCounter,
}

if config.SenderSettings != nil {
Expand Down Expand Up @@ -148,7 +180,7 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn
conn = tls.Client(conn, tlsConfig)
}

return conn, nil
return h.getStatCouterConnection(conn), nil
}

newError("failed to get outbound handler with tag: ", tag).AtWarning().WriteToLog(session.ExportIDToError(ctx))
Expand All @@ -164,7 +196,19 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn
}
}

return internet.Dial(ctx, dest, h.streamSettings)
conn, err := internet.Dial(ctx, dest, h.streamSettings)
return h.getStatCouterConnection(conn), err
}

func (h *Handler) getStatCouterConnection(conn internet.Connection) (internet.Connection) {
if h.uplinkCounter != nil || h.downlinkCounter != nil {
return &internet.StatCouterConnection{
Connection: conn,
ReadCounter: h.downlinkCounter,
WriteCounter: h.uplinkCounter,
}
}
return conn
}

// GetOutbound implements proxy.GetOutbound.
Expand Down
67 changes: 67 additions & 0 deletions app/proxyman/outbound/handler_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,80 @@
package outbound_test

import (
"context"
"testing"

"v2ray.com/core"
"v2ray.com/core/app/policy"
. "v2ray.com/core/app/proxyman/outbound"
"v2ray.com/core/app/stats"
"v2ray.com/core/common/net"
"v2ray.com/core/common/serial"
"v2ray.com/core/features/outbound"
"v2ray.com/core/proxy/freedom"
"v2ray.com/core/transport/internet"
)

func TestInterfaces(t *testing.T) {
_ = (outbound.Handler)(new(Handler))
_ = (outbound.Manager)(new(Manager))
}

const v2rayKey core.V2rayKey = 1

func TestOutboundWithoutStatCounter(t *testing.T) {
config := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&stats.Config{}),
serial.ToTypedMessage(&policy.Config{
System: &policy.SystemPolicy{
Stats: &policy.SystemPolicy_Stats{
InboundUplink: true,
},
},
}),
},
}

v, _ := core.New(config)
v.AddFeature((outbound.Manager)(new(Manager)))
ctx := context.WithValue(context.Background(), v2rayKey, v)
h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
Tag: "tag",
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
})
conn, _ := h.(*Handler).Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13146))
_, ok := conn.(*internet.StatCouterConnection)
if ok {
t.Errorf("Expected conn to not be StatCouterConnection")
}
}

func TestOutboundWithStatCounter(t *testing.T) {
config := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&stats.Config{}),
serial.ToTypedMessage(&policy.Config{
System: &policy.SystemPolicy{
Stats: &policy.SystemPolicy_Stats{
OutboundUplink: true,
OutboundDownlink: true,
},
},
}),
},
}

v, _ := core.New(config)
v.AddFeature((outbound.Manager)(new(Manager)))
ctx := context.WithValue(context.Background(), v2rayKey, v)
h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
Tag: "tag",
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
})
conn, _ := h.(*Handler).Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13146))
_, ok := conn.(*internet.StatCouterConnection)
if !ok {
t.Errorf("Expected conn to be StatCouterConnection")
}
}
5 changes: 3 additions & 2 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"context"
)

type key int
// V2rayKey is the key type of Instance in Context, exported for test.
type V2rayKey int

const v2rayKey key = 1
const v2rayKey V2rayKey = 1

// FromContext returns an Instance from the given context, or nil if the context doesn't contain one.
func FromContext(ctx context.Context) *Instance {
Expand Down
4 changes: 4 additions & 0 deletions features/policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type SystemStats struct {
InboundUplink bool
// Whether or not to enable stat counter for downlink traffic in inbound handlers.
InboundDownlink bool
// Whether or not to enable stat counter for uplink traffic in outbound handlers.
OutboundUplink bool
// Whether or not to enable stat counter for downlink traffic in outbound handlers.
OutboundDownlink bool
}

// System contains policy settings at system level.
Expand Down
12 changes: 8 additions & 4 deletions infra/conf/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,19 @@ func (t *Policy) Build() (*policy.Policy, error) {
}

type SystemPolicy struct {
StatsInboundUplink bool `json:"statsInboundUplink"`
StatsInboundDownlink bool `json:"statsInboundDownlink"`
StatsInboundUplink bool `json:"statsInboundUplink"`
StatsInboundDownlink bool `json:"statsInboundDownlink"`
StatsOutboundUplink bool `json:"statsOutboundUplink"`
StatsOutboundDownlink bool `json:"statsOutboundDownlink"`
}

func (p *SystemPolicy) Build() (*policy.SystemPolicy, error) {
return &policy.SystemPolicy{
Stats: &policy.SystemPolicy_Stats{
InboundUplink: p.StatsInboundUplink,
InboundDownlink: p.StatsInboundDownlink,
InboundUplink: p.StatsInboundUplink,
InboundDownlink: p.StatsInboundDownlink,
OutboundUplink: p.StatsOutboundUplink,
OutboundDownlink: p.StatsOutboundDownlink,
},
}, nil
}
Expand Down
4 changes: 3 additions & 1 deletion release/config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@
},
"system": {
"statsInboundUplink": false,
"statsInboundDownlink": false
"statsInboundDownlink": false,
"statsOutboundUplink": false,
"statsOutboundDownlink": false
}
},

Expand Down
12 changes: 6 additions & 6 deletions transport/internet/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,23 @@ type Connection interface {

type StatCouterConnection struct {
Connection
Uplink stats.Counter
Downlink stats.Counter
ReadCounter stats.Counter
WriteCounter stats.Counter
}

func (c *StatCouterConnection) Read(b []byte) (int, error) {
nBytes, err := c.Connection.Read(b)
if c.Uplink != nil {
c.Uplink.Add(int64(nBytes))
if c.ReadCounter != nil {
c.ReadCounter.Add(int64(nBytes))
}

return nBytes, err
}

func (c *StatCouterConnection) Write(b []byte) (int, error) {
nBytes, err := c.Connection.Write(b)
if c.Downlink != nil {
c.Downlink.Add(int64(nBytes))
if c.WriteCounter != nil {
c.WriteCounter.Add(int64(nBytes))
}
return nBytes, err
}