From bcb0cb5688735a1d2252b4d2974ba72d812350cc Mon Sep 17 00:00:00 2001 From: YUNRU Date: Sun, 11 Aug 2024 08:33:47 +0800 Subject: [PATCH] SplitHTTP Client: Multiplexing Config --- infra/conf/transport_internet.go | 31 +++ transport/internet/splithttp/config.go | 24 +- transport/internet/splithttp/config.pb.go | 257 ++++++++++++++++++---- transport/internet/splithttp/config.proto | 13 ++ transport/internet/splithttp/dialer.go | 35 +-- transport/internet/splithttp/mux.go | 115 ++++++++++ 6 files changed, 418 insertions(+), 57 deletions(-) create mode 100644 transport/internet/splithttp/mux.go diff --git a/infra/conf/transport_internet.go b/infra/conf/transport_internet.go index 9e13f246a5af..36d59414fa9f 100644 --- a/infra/conf/transport_internet.go +++ b/infra/conf/transport_internet.go @@ -234,6 +234,14 @@ type SplitHTTPConfig struct { ScMinPostsIntervalMs *Int32Range `json:"scMinPostsIntervalMs"` NoSSEHeader bool `json:"noSSEHeader"` XPaddingBytes *Int32Range `json:"xPaddingBytes"` + Mux SplitHTTPMux `json:"mux"` +} + +type SplitHTTPMux struct { + Mode string `json:"mode"` + MaxConnectionConcurrency Int32Range `json:"maxConnectionConcurrency"` + MaxConnectionLifetime Int32Range `json:"maxConnectionLifetime"` + MaxConnection int32 `json:"maxConnection"` } func splithttpNewRandRangeConfig(input *Int32Range) *splithttp.RandRangeConfig { @@ -257,6 +265,28 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) { } else if c.Host == "" && c.Headers["Host"] != "" { c.Host = c.Headers["Host"] } + + // Multiplexing config + muxProtobuf := splithttp.Multiplexing{} + switch strings.ToLower(c.Mux.Mode) { + case "disabled", "off", "none": + muxProtobuf.Mode = splithttp.Multiplexing_DISABLED + case "prefer_reuse", "preferreuse", "prefer_existing", "preferexisting", "": // Default: Reuse existing connections before opening new ones + muxProtobuf.Mode = splithttp.Multiplexing_PREFER_EXTISTING + case "prefer_new", "prefernew": // Open new connections until max limit, then reuse + muxProtobuf.Mode = splithttp.Multiplexing_PREFER_NEW + default: + return nil, errors.New("unsupported splithttp multiplexing mode: ", c.Mux.Mode) + } + muxProtobuf.MaxConnectionConcurrency = &splithttp.RandRangeConfig{ + From: c.Mux.MaxConnectionConcurrency.From, + To: c.Mux.MaxConnectionConcurrency.To, + } + muxProtobuf.MaxConnectionLifetime = &splithttp.RandRangeConfig{ + From: c.Mux.MaxConnectionLifetime.From, + To: c.Mux.MaxConnectionLifetime.To, + } + muxProtobuf.MaxConnections = c.Mux.MaxConnection config := &splithttp.Config{ Path: c.Path, Host: c.Host, @@ -266,6 +296,7 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) { ScMinPostsIntervalMs: splithttpNewRandRangeConfig(c.ScMinPostsIntervalMs), NoSSEHeader: c.NoSSEHeader, XPaddingBytes: splithttpNewRandRangeConfig(c.XPaddingBytes), + Mux: &muxProtobuf, } return config, nil } diff --git a/transport/internet/splithttp/config.go b/transport/internet/splithttp/config.go index 6b5a2005ea21..4681fd1a2fca 100644 --- a/transport/internet/splithttp/config.go +++ b/transport/internet/splithttp/config.go @@ -53,7 +53,6 @@ func (c *Config) GetRequestHeader() http.Header { return header } - func (c *Config) WriteResponseHeader(writer http.ResponseWriter) { paddingLen := c.GetNormalizedXPaddingBytes().roll() if paddingLen > 0 { @@ -72,6 +71,27 @@ func (c *Config) GetNormalizedScMaxConcurrentPosts() RandRangeConfig { return *c.ScMaxConcurrentPosts } +func (m *Multiplexing) GetNormalizedMaxConnectionConcurrency() RandRangeConfig { + if m.MaxConnectionConcurrency == nil || m.MaxConnectionConcurrency.To == 0 { + return RandRangeConfig{ + From: 1, + To: 3, + } + } + + return *m.MaxConnectionConcurrency +} + +func (c *Multiplexing) GetNormalizedConnectionLifetime() RandRangeConfig { + if c.MaxConnectionLifetime == nil || c.MaxConnectionLifetime.To == 0 { + return RandRangeConfig{ + From: 60000, + To: 90000, + } + } + return *c.MaxConnectionLifetime +} + func (c *Config) GetNormalizedScMaxEachPostBytes() RandRangeConfig { if c.ScMaxEachPostBytes == nil || c.ScMaxEachPostBytes.To == 0 { return RandRangeConfig{ @@ -79,10 +99,8 @@ func (c *Config) GetNormalizedScMaxEachPostBytes() RandRangeConfig { To: 1000000, } } - return *c.ScMaxEachPostBytes } - func (c *Config) GetNormalizedScMinPostsIntervalMs() RandRangeConfig { if c.ScMinPostsIntervalMs == nil || c.ScMinPostsIntervalMs.To == 0 { return RandRangeConfig{ diff --git a/transport/internet/splithttp/config.pb.go b/transport/internet/splithttp/config.pb.go index 26dbb0b7da53..1c1be9b72e18 100644 --- a/transport/internet/splithttp/config.pb.go +++ b/transport/internet/splithttp/config.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.1 -// protoc v5.27.0 +// protoc v5.27.2 // source: transport/internet/splithttp/config.proto package splithttp @@ -20,6 +20,55 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type Multiplexing_MultiplexingMode int32 + +const ( + Multiplexing_DISABLED Multiplexing_MultiplexingMode = 0 + Multiplexing_PREFER_EXTISTING Multiplexing_MultiplexingMode = 1 + Multiplexing_PREFER_NEW Multiplexing_MultiplexingMode = 2 +) + +// Enum value maps for Multiplexing_MultiplexingMode. +var ( + Multiplexing_MultiplexingMode_name = map[int32]string{ + 0: "DISABLED", + 1: "PREFER_EXTISTING", + 2: "PREFER_NEW", + } + Multiplexing_MultiplexingMode_value = map[string]int32{ + "DISABLED": 0, + "PREFER_EXTISTING": 1, + "PREFER_NEW": 2, + } +) + +func (x Multiplexing_MultiplexingMode) Enum() *Multiplexing_MultiplexingMode { + p := new(Multiplexing_MultiplexingMode) + *p = x + return p +} + +func (x Multiplexing_MultiplexingMode) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Multiplexing_MultiplexingMode) Descriptor() protoreflect.EnumDescriptor { + return file_transport_internet_splithttp_config_proto_enumTypes[0].Descriptor() +} + +func (Multiplexing_MultiplexingMode) Type() protoreflect.EnumType { + return &file_transport_internet_splithttp_config_proto_enumTypes[0] +} + +func (x Multiplexing_MultiplexingMode) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Multiplexing_MultiplexingMode.Descriptor instead. +func (Multiplexing_MultiplexingMode) EnumDescriptor() ([]byte, []int) { + return file_transport_internet_splithttp_config_proto_rawDescGZIP(), []int{2, 0} +} + type Config struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -33,6 +82,7 @@ type Config struct { ScMinPostsIntervalMs *RandRangeConfig `protobuf:"bytes,6,opt,name=scMinPostsIntervalMs,proto3" json:"scMinPostsIntervalMs,omitempty"` NoSSEHeader bool `protobuf:"varint,7,opt,name=noSSEHeader,proto3" json:"noSSEHeader,omitempty"` XPaddingBytes *RandRangeConfig `protobuf:"bytes,8,opt,name=xPaddingBytes,proto3" json:"xPaddingBytes,omitempty"` + Mux *Multiplexing `protobuf:"bytes,9,opt,name=mux,proto3" json:"mux,omitempty"` } func (x *Config) Reset() { @@ -123,6 +173,13 @@ func (x *Config) GetXPaddingBytes() *RandRangeConfig { return nil } +func (x *Config) GetMux() *Multiplexing { + if x != nil { + return x.Mux + } + return nil +} + type RandRangeConfig struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -178,6 +235,77 @@ func (x *RandRangeConfig) GetTo() int32 { return 0 } +type Multiplexing struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Mode Multiplexing_MultiplexingMode `protobuf:"varint,3,opt,name=mode,proto3,enum=xray.transport.internet.splithttp.Multiplexing_MultiplexingMode" json:"mode,omitempty"` + MaxConnectionConcurrency *RandRangeConfig `protobuf:"bytes,4,opt,name=maxConnectionConcurrency,proto3" json:"maxConnectionConcurrency,omitempty"` + MaxConnectionLifetime *RandRangeConfig `protobuf:"bytes,5,opt,name=maxConnectionLifetime,proto3" json:"maxConnectionLifetime,omitempty"` + MaxConnections int32 `protobuf:"varint,6,opt,name=maxConnections,proto3" json:"maxConnections,omitempty"` +} + +func (x *Multiplexing) Reset() { + *x = Multiplexing{} + if protoimpl.UnsafeEnabled { + mi := &file_transport_internet_splithttp_config_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Multiplexing) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Multiplexing) ProtoMessage() {} + +func (x *Multiplexing) ProtoReflect() protoreflect.Message { + mi := &file_transport_internet_splithttp_config_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Multiplexing.ProtoReflect.Descriptor instead. +func (*Multiplexing) Descriptor() ([]byte, []int) { + return file_transport_internet_splithttp_config_proto_rawDescGZIP(), []int{2} +} + +func (x *Multiplexing) GetMode() Multiplexing_MultiplexingMode { + if x != nil { + return x.Mode + } + return Multiplexing_DISABLED +} + +func (x *Multiplexing) GetMaxConnectionConcurrency() *RandRangeConfig { + if x != nil { + return x.MaxConnectionConcurrency + } + return nil +} + +func (x *Multiplexing) GetMaxConnectionLifetime() *RandRangeConfig { + if x != nil { + return x.MaxConnectionLifetime + } + return nil +} + +func (x *Multiplexing) GetMaxConnections() int32 { + if x != nil { + return x.MaxConnections + } + return 0 +} + var File_transport_internet_splithttp_config_proto protoreflect.FileDescriptor var file_transport_internet_splithttp_config_proto_rawDesc = []byte{ @@ -185,8 +313,8 @@ var file_transport_internet_splithttp_config_proto_rawDesc = []byte{ 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x21, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, - 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x22, 0xec, - 0x04, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, + 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x22, 0xad, + 0x05, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x4d, 0x0a, 0x06, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x03, 0x20, 0x03, 0x28, @@ -215,29 +343,60 @@ var file_transport_internet_splithttp_config_proto_rawDesc = []byte{ 0x73, 0x63, 0x4d, 0x69, 0x6e, 0x50, 0x6f, 0x73, 0x74, 0x73, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x4d, 0x73, 0x12, 0x20, 0x0a, 0x0b, 0x6e, 0x6f, 0x53, 0x53, 0x45, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x07, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x6e, 0x6f, 0x53, 0x53, 0x45, - 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x5a, 0x0a, 0x0e, 0x78, 0x50, 0x61, 0x64, 0x64, 0x69, - 0x6e, 0x67, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, + 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x58, 0x0a, 0x0d, 0x78, 0x50, 0x61, 0x64, 0x64, 0x69, + 0x6e, 0x67, 0x42, 0x79, 0x74, 0x65, 0x73, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, + 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, + 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, + 0x70, 0x2e, 0x52, 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x52, 0x0d, 0x78, 0x50, 0x61, 0x64, 0x64, 0x69, 0x6e, 0x67, 0x42, 0x79, 0x74, 0x65, 0x73, + 0x12, 0x41, 0x0a, 0x03, 0x6d, 0x75, 0x78, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2f, 0x2e, + 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, + 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, + 0x70, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x70, 0x6c, 0x65, 0x78, 0x69, 0x6e, 0x67, 0x52, 0x03, + 0x6d, 0x75, 0x78, 0x1a, 0x39, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, + 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x35, + 0x0a, 0x0f, 0x52, 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x12, 0x12, 0x0a, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, + 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x12, 0x0e, 0x0a, 0x02, 0x74, 0x6f, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x02, 0x74, 0x6f, 0x22, 0xae, 0x03, 0x0a, 0x0c, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x70, + 0x6c, 0x65, 0x78, 0x69, 0x6e, 0x67, 0x12, 0x54, 0x0a, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0e, 0x32, 0x40, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, + 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, + 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x70, 0x6c, + 0x65, 0x78, 0x69, 0x6e, 0x67, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x70, 0x6c, 0x65, 0x78, 0x69, + 0x6e, 0x67, 0x4d, 0x6f, 0x64, 0x65, 0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x12, 0x6e, 0x0a, 0x18, + 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, + 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x52, 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x52, 0x0e, 0x78, 0x50, 0x61, 0x64, 0x64, 0x69, 0x6e, 0x67, 0x48, 0x65, 0x61, 0x64, - 0x65, 0x72, 0x1a, 0x39, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, - 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, - 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x35, 0x0a, - 0x0f, 0x52, 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x12, 0x12, 0x0a, 0x04, 0x66, 0x72, 0x6f, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, - 0x66, 0x72, 0x6f, 0x6d, 0x12, 0x0e, 0x0a, 0x02, 0x74, 0x6f, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, - 0x52, 0x02, 0x74, 0x6f, 0x42, 0x85, 0x01, 0x0a, 0x25, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, - 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, 0x65, - 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x50, 0x01, - 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, - 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x74, 0x72, 0x61, 0x6e, - 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2f, 0x73, - 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0xaa, 0x02, 0x21, 0x58, 0x72, 0x61, 0x79, 0x2e, - 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, - 0x65, 0x74, 0x2e, 0x53, 0x70, 0x6c, 0x69, 0x74, 0x48, 0x74, 0x74, 0x70, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x69, 0x67, 0x52, 0x18, 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x12, 0x68, 0x0a, 0x15, + 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x69, 0x66, + 0x65, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x78, 0x72, + 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, 0x74, + 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0x2e, + 0x52, 0x61, 0x6e, 0x64, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, + 0x15, 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x69, + 0x66, 0x65, 0x74, 0x69, 0x6d, 0x65, 0x12, 0x26, 0x0a, 0x0e, 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, + 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0e, + 0x6d, 0x61, 0x78, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x46, + 0x0a, 0x10, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x70, 0x6c, 0x65, 0x78, 0x69, 0x6e, 0x67, 0x4d, 0x6f, + 0x64, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x44, 0x49, 0x53, 0x41, 0x42, 0x4c, 0x45, 0x44, 0x10, 0x00, + 0x12, 0x14, 0x0a, 0x10, 0x50, 0x52, 0x45, 0x46, 0x45, 0x52, 0x5f, 0x45, 0x58, 0x54, 0x49, 0x53, + 0x54, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x50, 0x52, 0x45, 0x46, 0x45, 0x52, + 0x5f, 0x4e, 0x45, 0x57, 0x10, 0x02, 0x42, 0x85, 0x01, 0x0a, 0x25, 0x63, 0x6f, 0x6d, 0x2e, 0x78, + 0x72, 0x61, 0x79, 0x2e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x69, 0x6e, + 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, + 0x50, 0x01, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, + 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x74, 0x72, + 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x65, 0x74, + 0x2f, 0x73, 0x70, 0x6c, 0x69, 0x74, 0x68, 0x74, 0x74, 0x70, 0xaa, 0x02, 0x21, 0x58, 0x72, 0x61, + 0x79, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x49, 0x6e, 0x74, 0x65, + 0x72, 0x6e, 0x65, 0x74, 0x2e, 0x53, 0x70, 0x6c, 0x69, 0x74, 0x48, 0x74, 0x74, 0x70, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -252,23 +411,30 @@ func file_transport_internet_splithttp_config_proto_rawDescGZIP() []byte { return file_transport_internet_splithttp_config_proto_rawDescData } -var file_transport_internet_splithttp_config_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_transport_internet_splithttp_config_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_transport_internet_splithttp_config_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_transport_internet_splithttp_config_proto_goTypes = []interface{}{ - (*Config)(nil), // 0: xray.transport.internet.splithttp.Config - (*RandRangeConfig)(nil), // 1: xray.transport.internet.splithttp.RandRangeConfig - nil, // 2: xray.transport.internet.splithttp.Config.HeaderEntry + (Multiplexing_MultiplexingMode)(0), // 0: xray.transport.internet.splithttp.Multiplexing.MultiplexingMode + (*Config)(nil), // 1: xray.transport.internet.splithttp.Config + (*RandRangeConfig)(nil), // 2: xray.transport.internet.splithttp.RandRangeConfig + (*Multiplexing)(nil), // 3: xray.transport.internet.splithttp.Multiplexing + nil, // 4: xray.transport.internet.splithttp.Config.HeaderEntry } var file_transport_internet_splithttp_config_proto_depIdxs = []int32{ - 2, // 0: xray.transport.internet.splithttp.Config.header:type_name -> xray.transport.internet.splithttp.Config.HeaderEntry - 1, // 1: xray.transport.internet.splithttp.Config.scMaxConcurrentPosts:type_name -> xray.transport.internet.splithttp.RandRangeConfig - 1, // 2: xray.transport.internet.splithttp.Config.scMaxEachPostBytes:type_name -> xray.transport.internet.splithttp.RandRangeConfig - 1, // 3: xray.transport.internet.splithttp.Config.scMinPostsIntervalMs:type_name -> xray.transport.internet.splithttp.RandRangeConfig - 1, // 4: xray.transport.internet.splithttp.Config.xPaddingBytes:type_name -> xray.transport.internet.splithttp.RandRangeConfig - 5, // [5:5] is the sub-list for method output_type - 5, // [5:5] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 4, // 0: xray.transport.internet.splithttp.Config.header:type_name -> xray.transport.internet.splithttp.Config.HeaderEntry + 2, // 1: xray.transport.internet.splithttp.Config.scMaxConcurrentPosts:type_name -> xray.transport.internet.splithttp.RandRangeConfig + 2, // 2: xray.transport.internet.splithttp.Config.scMaxEachPostBytes:type_name -> xray.transport.internet.splithttp.RandRangeConfig + 2, // 3: xray.transport.internet.splithttp.Config.scMinPostsIntervalMs:type_name -> xray.transport.internet.splithttp.RandRangeConfig + 2, // 4: xray.transport.internet.splithttp.Config.xPaddingBytes:type_name -> xray.transport.internet.splithttp.RandRangeConfig + 3, // 5: xray.transport.internet.splithttp.Config.mux:type_name -> xray.transport.internet.splithttp.Multiplexing + 0, // 6: xray.transport.internet.splithttp.Multiplexing.mode:type_name -> xray.transport.internet.splithttp.Multiplexing.MultiplexingMode + 2, // 7: xray.transport.internet.splithttp.Multiplexing.maxConnectionConcurrency:type_name -> xray.transport.internet.splithttp.RandRangeConfig + 2, // 8: xray.transport.internet.splithttp.Multiplexing.maxConnectionLifetime:type_name -> xray.transport.internet.splithttp.RandRangeConfig + 9, // [9:9] is the sub-list for method output_type + 9, // [9:9] is the sub-list for method input_type + 9, // [9:9] is the sub-list for extension type_name + 9, // [9:9] is the sub-list for extension extendee + 0, // [0:9] is the sub-list for field type_name } func init() { file_transport_internet_splithttp_config_proto_init() } @@ -301,19 +467,32 @@ func file_transport_internet_splithttp_config_proto_init() { return nil } } + file_transport_internet_splithttp_config_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Multiplexing); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_transport_internet_splithttp_config_proto_rawDesc, - NumEnums: 0, - NumMessages: 3, + NumEnums: 1, + NumMessages: 4, NumExtensions: 0, NumServices: 0, }, GoTypes: file_transport_internet_splithttp_config_proto_goTypes, DependencyIndexes: file_transport_internet_splithttp_config_proto_depIdxs, + EnumInfos: file_transport_internet_splithttp_config_proto_enumTypes, MessageInfos: file_transport_internet_splithttp_config_proto_msgTypes, }.Build() File_transport_internet_splithttp_config_proto = out.File diff --git a/transport/internet/splithttp/config.proto b/transport/internet/splithttp/config.proto index 3f24cfd3ba54..02ca552512b5 100644 --- a/transport/internet/splithttp/config.proto +++ b/transport/internet/splithttp/config.proto @@ -15,9 +15,22 @@ message Config { RandRangeConfig scMinPostsIntervalMs = 6; bool noSSEHeader = 7; RandRangeConfig xPaddingBytes = 8; + Multiplexing mux = 9; } message RandRangeConfig { int32 from = 1; int32 to = 2; } + +message Multiplexing { + enum MultiplexingMode { + DISABLED = 0; + PREFER_EXTISTING = 1; + PREFER_NEW = 2; + } + MultiplexingMode mode = 3; + RandRangeConfig maxConnectionConcurrency = 4; + RandRangeConfig maxConnectionLifetime = 5; + int32 maxConnections = 6; +} diff --git a/transport/internet/splithttp/dialer.go b/transport/internet/splithttp/dialer.go index 45bdc6459c7b..296d22595f62 100644 --- a/transport/internet/splithttp/dialer.go +++ b/transport/internet/splithttp/dialer.go @@ -43,8 +43,8 @@ type dialerConf struct { } var ( - globalDialerMap map[dialerConf]DialerClient - globalDialerAccess sync.Mutex + globalDialerAccess sync.Mutex + globalMuxManagerMap map[dialerConf]muxManager ) func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) DialerClient { @@ -52,23 +52,29 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in return &BrowserDialerClient{} } - tlsConfig := tls.ConfigFromStreamSettings(streamSettings) - isH2 := tlsConfig != nil && !(len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "http/1.1") - isH3 := tlsConfig != nil && (len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "h3") - globalDialerAccess.Lock() defer globalDialerAccess.Unlock() - if globalDialerMap == nil { - globalDialerMap = make(map[dialerConf]DialerClient) + if globalMuxManagerMap == nil { + globalMuxManagerMap = make(map[dialerConf]muxManager) } + if muxMan, found := globalMuxManagerMap[dialerConf{dest, streamSettings}]; found { + return muxMan.getClient(ctx, dest, streamSettings) + } + muxMan := muxManager{} + globalMuxManagerMap[dialerConf{dest, streamSettings}] = muxMan + return muxMan.getClient(ctx, dest, streamSettings) +} + +func createHTTPClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) *DefaultDialerClient { + + tlsConfig := tls.ConfigFromStreamSettings(streamSettings) + isH2 := tlsConfig != nil && !(len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "http/1.1") + isH3 := tlsConfig != nil && (len(tlsConfig.NextProtocol) == 1 && tlsConfig.NextProtocol[0] == "h3") if isH3 { dest.Network = net.Network_UDP } - if client, found := globalDialerMap[dialerConf{dest, streamSettings}]; found { - return client - } var gotlsConfig *gotls.Config @@ -173,7 +179,7 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in uploadTransport = nil } - client := &DefaultDialerClient{ + client := DefaultDialerClient{ transportConfig: streamSettings.ProtocolSettings.(*Config), download: &http.Client{ Transport: downloadTransport, @@ -187,8 +193,7 @@ func getHTTPClient(ctx context.Context, dest net.Destination, streamSettings *in dialUploadConn: dialContext, } - globalDialerMap[dialerConf{dest, streamSettings}] = client - return client + return &client } func init() { @@ -202,7 +207,6 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me transportConfiguration := streamSettings.ProtocolSettings.(*Config) tlsConfig := tls.ConfigFromStreamSettings(streamSettings) - scMaxConcurrentPosts := transportConfiguration.GetNormalizedScMaxConcurrentPosts() scMaxEachPostBytes := transportConfiguration.GetNormalizedScMaxEachPostBytes() scMinPostsIntervalMs := transportConfiguration.GetNormalizedScMinPostsIntervalMs() @@ -235,6 +239,7 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me // calls get automatically batched together into larger POST requests. // without batching, bandwidth is extremely limited. for { + chunk, err := uploadPipeReader.ReadMultiBuffer() if err != nil { break diff --git a/transport/internet/splithttp/mux.go b/transport/internet/splithttp/mux.go new file mode 100644 index 000000000000..3fa1c0596920 --- /dev/null +++ b/transport/internet/splithttp/mux.go @@ -0,0 +1,115 @@ +package splithttp + +import ( + "context" + "sync" + "time" + + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/transport/internet" +) + +type muxDialerClient struct { + *DefaultDialerClient + leftUsage int32 + expirationTime time.Time +} + +type muxManager struct { + sync.Mutex + config *Multiplexing + dialerClients []muxDialerClient +} + +func newMuxManager(config *Multiplexing) *muxManager { + return &muxManager{ + config: config, + dialerClients: make([]muxDialerClient, 0), + } +} + +func (m *muxManager) getClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) *muxDialerClient { + m.Lock() + defer m.Unlock() + if len(m.dialerClients) > 0 { + m.removeExpiredConnections() + } + switch m.config.GetMode() { + case Multiplexing_PREFER_EXTISTING: + return m.dialPreferExisting(ctx, dest, streamSettings) + case Multiplexing_PREFER_NEW: + return m.dialPreferNew(ctx, dest, streamSettings) + default: + return &muxDialerClient{ + DefaultDialerClient: createHTTPClient(ctx, dest, streamSettings), + } + } +} + +func (m *muxManager) dialPreferExisting(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) *muxDialerClient { + for { + for _, client := range m.dialerClients { + if m.canReuseClient(client) { + client.leftUsage-- + return &client + } + } + if int32(len(m.dialerClients)) >= m.config.GetMaxConnections() || m.config.GetMaxConnections() == 0 { + if streamSettings.ProtocolSettings.(*Config).GetNormalizedScMinPostsIntervalMs().From > 0 { + time.Sleep(time.Duration(streamSettings.ProtocolSettings.(*Config).GetNormalizedScMinPostsIntervalMs().roll()) * time.Millisecond) + } + continue + } + break + } + return m.newClient(ctx, dest, streamSettings) +} + +func (m *muxManager) dialPreferNew(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) *muxDialerClient { + for { + if int32(len(m.dialerClients)) < m.config.MaxConnections || m.config.MaxConnections == 0 { + return m.newClient(ctx, dest, streamSettings) + } + + for _, client := range m.dialerClients { + if m.canReuseClient(client) { + client.leftUsage-- + return &client + } + } + if streamSettings.ProtocolSettings.(*Config).GetNormalizedScMinPostsIntervalMs().From > 0 { + time.Sleep(time.Duration(streamSettings.ProtocolSettings.(*Config).GetNormalizedScMinPostsIntervalMs().roll()) * time.Millisecond) + } + continue + } +} + +func (m *muxManager) newClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) *muxDialerClient { + m.Lock() + defer m.Unlock() + + Client := muxDialerClient{ + DefaultDialerClient: createHTTPClient(ctx, dest, streamSettings), + leftUsage: m.config.GetNormalizedMaxConnectionConcurrency().roll(), + expirationTime: time.Now().Add(time.Duration(m.config.GetNormalizedConnectionLifetime().roll()) * time.Second), + } + m.dialerClients = append(m.dialerClients, Client) + return &Client +} + +func (m *muxManager) removeExpiredConnections() { + m.Lock() + defer m.Unlock() + + for i := 0; i < len(m.dialerClients); i++ { + client := m.dialerClients[i] + if time.Now().After(client.expirationTime) || client.leftUsage <= 0 { + m.dialerClients = append(m.dialerClients[:i], m.dialerClients[i+1:]...) + i-- + } + } +} + +func (m *muxManager) canReuseClient(c muxDialerClient) bool { + return c.leftUsage > 0 && time.Now().Before(c.expirationTime) +}