Skip to content

Commit

Permalink
Merge pull request #38 from yuhan6665/master
Browse files Browse the repository at this point in the history
Add stats counter for outbounds
  • Loading branch information
kslr authored Jun 28, 2020
2 parents 2af1a8f + cef1836 commit a8f3450
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 148 deletions.
6 changes: 4 additions & 2 deletions app/policy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ func (p *Policy) ToCorePolicy() policy.Session {
func (p *SystemPolicy) ToCorePolicy() policy.System {
return policy.System{
Stats: policy.SystemStats{
InboundUplink: p.Stats.InboundUplink,
InboundDownlink: p.Stats.InboundDownlink,
InboundUplink: p.Stats.InboundUplink,
InboundDownlink: p.Stats.InboundDownlink,
OutboundUplink: p.Stats.OutboundUplink,
OutboundDownlink: p.Stats.OutboundDownlink,
},
}
}
282 changes: 154 additions & 128 deletions app/policy/config.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions app/policy/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ message SystemPolicy {
message Stats {
bool inbound_uplink = 1;
bool inbound_downlink = 2;
bool outbound_uplink = 3;
bool outbound_downlink = 4;
}

Stats stats = 1;
Expand Down
6 changes: 3 additions & 3 deletions app/proxyman/inbound/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func (w *tcpWorker) callback(conn internet.Connection) {
ctx = session.ContextWithContent(ctx, content)
if w.uplinkCounter != nil || w.downlinkCounter != nil {
conn = &internet.StatCouterConnection{
Connection: conn,
Uplink: w.uplinkCounter,
Downlink: w.downlinkCounter,
Connection: conn,
ReadCounter: w.uplinkCounter,
WriteCounter: w.downlinkCounter,
}
}
if err := w.proxy.Process(ctx, net.Network_TCP, conn, w.dispatcher); err != nil {
Expand Down
48 changes: 46 additions & 2 deletions app/proxyman/outbound/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,40 @@ import (
"v2ray.com/core/common/net"
"v2ray.com/core/common/session"
"v2ray.com/core/features/outbound"
"v2ray.com/core/features/policy"
"v2ray.com/core/features/stats"
"v2ray.com/core/proxy"
"v2ray.com/core/transport"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/tls"
"v2ray.com/core/transport/pipe"
)

func getStatCounter(v *core.Instance, tag string) (stats.Counter, stats.Counter) {
var uplinkCounter stats.Counter
var downlinkCounter stats.Counter

policy := v.GetFeature(policy.ManagerType()).(policy.Manager)
if len(tag) > 0 && policy.ForSystem().Stats.OutboundUplink {
statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
name := "outbound>>>" + tag + ">>>traffic>>>uplink"
c, _ := stats.GetOrRegisterCounter(statsManager, name)
if c != nil {
uplinkCounter = c
}
}
if len(tag) > 0 && policy.ForSystem().Stats.OutboundDownlink {
statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
name := "outbound>>>" + tag + ">>>traffic>>>downlink"
c, _ := stats.GetOrRegisterCounter(statsManager, name)
if c != nil {
downlinkCounter = c
}
}

return uplinkCounter, downlinkCounter
}

// Handler is an implements of outbound.Handler.
type Handler struct {
tag string
Expand All @@ -25,14 +52,19 @@ type Handler struct {
proxy proxy.Outbound
outboundManager outbound.Manager
mux *mux.ClientManager
uplinkCounter stats.Counter
downlinkCounter stats.Counter
}

// NewHandler create a new Handler based on the given configuration.
func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (outbound.Handler, error) {
v := core.MustFromContext(ctx)
uplinkCounter, downlinkCounter := getStatCounter(v, config.Tag)
h := &Handler{
tag: config.Tag,
outboundManager: v.GetFeature(outbound.ManagerType()).(outbound.Manager),
uplinkCounter: uplinkCounter,
downlinkCounter: downlinkCounter,
}

if config.SenderSettings != nil {
Expand Down Expand Up @@ -148,7 +180,7 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn
conn = tls.Client(conn, tlsConfig)
}

return conn, nil
return h.getStatCouterConnection(conn), nil
}

newError("failed to get outbound handler with tag: ", tag).AtWarning().WriteToLog(session.ExportIDToError(ctx))
Expand All @@ -164,7 +196,19 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn
}
}

return internet.Dial(ctx, dest, h.streamSettings)
conn, err := internet.Dial(ctx, dest, h.streamSettings)
return h.getStatCouterConnection(conn), err
}

func (h *Handler) getStatCouterConnection(conn internet.Connection) (internet.Connection) {
if h.uplinkCounter != nil || h.downlinkCounter != nil {
return &internet.StatCouterConnection{
Connection: conn,
ReadCounter: h.downlinkCounter,
WriteCounter: h.uplinkCounter,
}
}
return conn
}

// GetOutbound implements proxy.GetOutbound.
Expand Down
67 changes: 67 additions & 0 deletions app/proxyman/outbound/handler_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,80 @@
package outbound_test

import (
"context"
"testing"

"v2ray.com/core"
"v2ray.com/core/app/policy"
. "v2ray.com/core/app/proxyman/outbound"
"v2ray.com/core/app/stats"
"v2ray.com/core/common/net"
"v2ray.com/core/common/serial"
"v2ray.com/core/features/outbound"
"v2ray.com/core/proxy/freedom"
"v2ray.com/core/transport/internet"
)

func TestInterfaces(t *testing.T) {
_ = (outbound.Handler)(new(Handler))
_ = (outbound.Manager)(new(Manager))
}

const v2rayKey core.V2rayKey = 1

func TestOutboundWithoutStatCounter(t *testing.T) {
config := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&stats.Config{}),
serial.ToTypedMessage(&policy.Config{
System: &policy.SystemPolicy{
Stats: &policy.SystemPolicy_Stats{
InboundUplink: true,
},
},
}),
},
}

v, _ := core.New(config)
v.AddFeature((outbound.Manager)(new(Manager)))
ctx := context.WithValue(context.Background(), v2rayKey, v)
h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
Tag: "tag",
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
})
conn, _ := h.(*Handler).Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13146))
_, ok := conn.(*internet.StatCouterConnection)
if ok {
t.Errorf("Expected conn to not be StatCouterConnection")
}
}

func TestOutboundWithStatCounter(t *testing.T) {
config := &core.Config{
App: []*serial.TypedMessage{
serial.ToTypedMessage(&stats.Config{}),
serial.ToTypedMessage(&policy.Config{
System: &policy.SystemPolicy{
Stats: &policy.SystemPolicy_Stats{
OutboundUplink: true,
OutboundDownlink: true,
},
},
}),
},
}

v, _ := core.New(config)
v.AddFeature((outbound.Manager)(new(Manager)))
ctx := context.WithValue(context.Background(), v2rayKey, v)
h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
Tag: "tag",
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
})
conn, _ := h.(*Handler).Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13146))
_, ok := conn.(*internet.StatCouterConnection)
if !ok {
t.Errorf("Expected conn to be StatCouterConnection")
}
}
5 changes: 3 additions & 2 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"context"
)

type key int
// V2rayKey is the key type of Instance in Context, exported for test.
type V2rayKey int

const v2rayKey key = 1
const v2rayKey V2rayKey = 1

// FromContext returns an Instance from the given context, or nil if the context doesn't contain one.
func FromContext(ctx context.Context) *Instance {
Expand Down
4 changes: 4 additions & 0 deletions features/policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type SystemStats struct {
InboundUplink bool
// Whether or not to enable stat counter for downlink traffic in inbound handlers.
InboundDownlink bool
// Whether or not to enable stat counter for uplink traffic in outbound handlers.
OutboundUplink bool
// Whether or not to enable stat counter for downlink traffic in outbound handlers.
OutboundDownlink bool
}

// System contains policy settings at system level.
Expand Down
12 changes: 8 additions & 4 deletions infra/conf/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,19 @@ func (t *Policy) Build() (*policy.Policy, error) {
}

type SystemPolicy struct {
StatsInboundUplink bool `json:"statsInboundUplink"`
StatsInboundDownlink bool `json:"statsInboundDownlink"`
StatsInboundUplink bool `json:"statsInboundUplink"`
StatsInboundDownlink bool `json:"statsInboundDownlink"`
StatsOutboundUplink bool `json:"statsOutboundUplink"`
StatsOutboundDownlink bool `json:"statsOutboundDownlink"`
}

func (p *SystemPolicy) Build() (*policy.SystemPolicy, error) {
return &policy.SystemPolicy{
Stats: &policy.SystemPolicy_Stats{
InboundUplink: p.StatsInboundUplink,
InboundDownlink: p.StatsInboundDownlink,
InboundUplink: p.StatsInboundUplink,
InboundDownlink: p.StatsInboundDownlink,
OutboundUplink: p.StatsOutboundUplink,
OutboundDownlink: p.StatsOutboundDownlink,
},
}, nil
}
Expand Down
4 changes: 3 additions & 1 deletion release/config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@
},
"system": {
"statsInboundUplink": false,
"statsInboundDownlink": false
"statsInboundDownlink": false,
"statsOutboundUplink": false,
"statsOutboundDownlink": false
}
},

Expand Down
12 changes: 6 additions & 6 deletions transport/internet/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,23 @@ type Connection interface {

type StatCouterConnection struct {
Connection
Uplink stats.Counter
Downlink stats.Counter
ReadCounter stats.Counter
WriteCounter stats.Counter
}

func (c *StatCouterConnection) Read(b []byte) (int, error) {
nBytes, err := c.Connection.Read(b)
if c.Uplink != nil {
c.Uplink.Add(int64(nBytes))
if c.ReadCounter != nil {
c.ReadCounter.Add(int64(nBytes))
}

return nBytes, err
}

func (c *StatCouterConnection) Write(b []byte) (int, error) {
nBytes, err := c.Connection.Write(b)
if c.Downlink != nil {
c.Downlink.Add(int64(nBytes))
if c.WriteCounter != nil {
c.WriteCounter.Add(int64(nBytes))
}
return nBytes, err
}

0 comments on commit a8f3450

Please sign in to comment.