Skip to content

Commit

Permalink
feat: experimental connection-pools and hosts-cache
Browse files Browse the repository at this point in the history
  • Loading branch information
momiji committed Jan 30, 2024
1 parent a4c9100 commit 4bbd1ca
Show file tree
Hide file tree
Showing 12 changed files with 211 additions and 112 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- **internally developed**, allowing to add features when necessary, like **proxy failover**, **regex rules**, **password caching**, **PAC support**, ...
- **multi-platform binaries**, for Windows, Linux and MacOS
- support automatic **update** and **restart** when configured
- experimental features like `connection-pools` (reuse http connections when possible) and `hosts-cache` (cache proxy lookup result by host:port, incompatible with url matching)

Alternatives tools that can be used:

Expand Down
101 changes: 72 additions & 29 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@ package kpx
import (
"encoding/json"
"fmt"
"github.com/google/uuid"
"github.com/howeyc/gopass"
"github.com/palantir/stacktrace"
"golang.org/x/text/encoding/charmap"
yaml2 "gopkg.in/yaml.v2"
"io"
"net/http"
"os"
Expand All @@ -16,6 +11,12 @@ import (
"strings"
"sync"
"time"

"github.com/google/uuid"
"github.com/howeyc/gopass"
"github.com/palantir/stacktrace"
"golang.org/x/text/encoding/charmap"
yaml2 "gopkg.in/yaml.v2"
)

type Config struct {
Expand All @@ -25,8 +26,18 @@ type Config struct {
lastMMutex sync.RWMutex
certsManager *CertsManager
disableAutoUpdate bool
hostsCache map[string]*HostCache
hostsCacheMutex sync.RWMutex
}

type HostCache struct {
rule *ConfRule
proxy []*ConfProxy
}

const EXPERIMENTAL_CONNETION_POOLS = "connection-pools"
const EXPERIMENTAL_HOSTS_CACHE = "hosts-cache"

func NewConfig(name string) (*Config, error) {
var config = Config{
conf: Conf{
Expand All @@ -35,7 +46,7 @@ func NewConfig(name string) (*Config, error) {
CloseTimeout: DEFAULT_CLOSE_TIMEOUT,
},
lastProxies: map[string]time.Time{},
lastMMutex: sync.RWMutex{},
hostsCache: map[string]*HostCache{},
}
var err error
if name == "" {
Expand All @@ -46,6 +57,8 @@ func NewConfig(name string) (*Config, error) {
config.conf.Trace = config.conf.Trace || options.Trace
config.conf.Debug = config.conf.Debug || options.Debug || config.conf.Trace
config.conf.Verbose = config.conf.Verbose || options.Verbose || config.conf.Debug
config.conf.experimentalConnectionPools = isExperimental(config.conf.Experimental, EXPERIMENTAL_CONNETION_POOLS)
config.conf.experimentalHostsCache = isExperimental(config.conf.Experimental, EXPERIMENTAL_HOSTS_CACHE)
if err != nil {
return nil, stacktrace.Propagate(err, "unable to read config")
}
Expand All @@ -68,6 +81,10 @@ func NewConfig(name string) (*Config, error) {
return &config, nil
}

func isExperimental(conf string, name string) bool {
return strings.Contains(" "+strings.ReplaceAll(conf, ",", " ")+" ", " "+name+" ")
}

func (c *Config) readFromConfig() error {
c.conf.Bind = options.bindHost
c.conf.Port = options.bindPort
Expand Down Expand Up @@ -555,13 +572,16 @@ func (c *Config) askCredentials() error {
}

func (c *Config) match(url string, hostPort string) (*ConfRule, []*ConfProxy) {
if hc, ok := c.getCachedHost(hostPort); ok {
return hc.rule, hc.proxy
}
hostOnly := strings.Split(hostPort, ":")[0]
var direct *ConfRule
for _, rule := range c.conf.Rules {
match := false
if rule.regex.pattern == nil {
match = true
} else if strings.Contains(rule.regex.regex, "/") {
} else if !c.conf.experimentalHostsCache && strings.Contains(rule.regex.regex, "/") {
match = rule.regex.pattern.MatchString(url) != rule.regex.exclude
} else if strings.Contains(rule.regex.regex, ":") {
match = rule.regex.pattern.MatchString(hostPort) != rule.regex.exclude
Expand All @@ -571,6 +591,7 @@ func (c *Config) match(url string, hostPort string) (*ConfRule, []*ConfProxy) {
if match {
proxy := c.resolve(url, hostOnly, rule)
if proxy != nil && *proxy[0] != ConfProxyContinue {
c.addCachedHost(hostPort, rule, proxy)
return rule, proxy
}
direct = rule
Expand All @@ -579,11 +600,30 @@ func (c *Config) match(url string, hostPort string) (*ConfRule, []*ConfProxy) {
// if last successful rule is a pac rule which returned DIRECT, then return a "direct" proxy
// otherwise, return nil
if direct != nil {
return direct, []*ConfProxy{c.conf.Proxies[ProxyDirect.Name()]}
rule := direct
proxy := []*ConfProxy{c.conf.Proxies[ProxyDirect.Name()]}
c.addCachedHost(hostPort, rule, proxy)
return rule, proxy
}
c.addCachedHost(hostPort, nil, nil)
return nil, nil
}

func (c *Config) addCachedHost(hostPort string, rule *ConfRule, proxy []*ConfProxy) {
c.hostsCacheMutex.Lock()
defer c.hostsCacheMutex.Unlock()
c.hostsCache[hostPort] = &HostCache{rule, proxy}
}

func (c *Config) getCachedHost(hostPort string) (*HostCache, bool) {
c.hostsCacheMutex.RLock()
defer c.hostsCacheMutex.RUnlock()
if hc, ok := c.hostsCache[hostPort]; ok {
return hc, true
}
return nil, false
}

func (c *Config) resolve(url, host string, rule *ConfRule) []*ConfProxy {
proxy := c.conf.Proxies[rule.firstProxy()]
if proxy == nil {
Expand Down Expand Up @@ -789,23 +829,26 @@ func (pt ProxyType) Value() int {
}

type Conf struct {
Bind string
Port int
Verbose bool
Debug bool
Trace bool
Proxies map[string]*ConfProxy
Credentials map[string]*ConfCred
Domains map[string]*string
Rules []*ConfRule
pacProxy string
Krb5 string
ConnectTimeout int `yaml:"connectTimeout"`
IdleTimeout int `yaml:"idleTimeout"`
CloseTimeout int `yaml:"closeTimeout"`
Check *bool
Update bool
Restart bool
Bind string
Port int
Verbose bool
Debug bool
Trace bool
Proxies map[string]*ConfProxy
Credentials map[string]*ConfCred
Domains map[string]*string
Rules []*ConfRule
pacProxy string
Krb5 string
ConnectTimeout int `yaml:"connectTimeout"`
IdleTimeout int `yaml:"idleTimeout"`
CloseTimeout int `yaml:"closeTimeout"`
Check *bool
Update bool
Restart bool
Experimental string // space/comma separated list of features
experimentalConnectionPools bool // add a connection pool for http
experimentalHostsCache bool // add a hosts cache for proxy lookup - fine grained url lookup is then disabled
}

type ConfCred struct {
Expand Down Expand Up @@ -834,10 +877,10 @@ type ConfProxy struct {
pacRegex *ConfRegex
Url *string
pacJs *string
proxy string
pacProxy *string
isUsed bool
pacRuntime *PacExecutor
// proxy string
pacProxy *string
isUsed bool
pacRuntime *PacExecutor
}

type ConfRule struct {
Expand Down
5 changes: 3 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"
"flag"
"fmt"
"github.com/jcmturner/gokrb5/v8/client"
"io"
"net/http"
"os"
Expand All @@ -16,6 +15,8 @@ import (
"syscall"
"text/template"
"time"

"github.com/jcmturner/gokrb5/v8/client"
)

var VersionValue = ""
Expand Down Expand Up @@ -395,7 +396,7 @@ func start() {

func update(proxy *Proxy) {
// check for updates ?
config := proxy.safeGetConfig()
config := proxy.getConfig()
conf := config.conf
if conf.Check != nil && *conf.Check == false {
return
Expand Down
8 changes: 5 additions & 3 deletions pac.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@ package kpx

import (
"fmt"
"github.com/dop251/goja"
"github.com/palantir/stacktrace"
"math/big"
"net"
"regexp"
"strings"
"sync"
"time"

"github.com/dop251/goja"
"github.com/palantir/stacktrace"
)

type PacExecutor struct {
js string
program *goja.Program
pool sync.Pool
pool *sync.Pool
}

func NewPac(pacJs string) (*PacExecutor, error) {
Expand All @@ -33,6 +34,7 @@ return FindProxyForURL(url,host);
return &PacExecutor{
js: js,
program: program,
pool: &sync.Pool{},
}, nil
}

Expand Down
9 changes: 5 additions & 4 deletions process.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"encoding/base64"
"errors"
"fmt"
"github.com/palantir/stacktrace"
netproxy "golang.org/x/net/proxy"
"io"
"net"
"sort"
"strings"
"sync"
"time"

"github.com/palantir/stacktrace"
netproxy "golang.org/x/net/proxy"
)

type Process struct {
Expand All @@ -37,7 +38,7 @@ func NewProcess(proxy *Proxy, conn net.Conn) *Process {
logTrace(ti, "create process")
}
return &Process{
config: proxy.safeGetConfig(),
config: proxy.getConfig(),
proxy: proxy,
conn: NewTimedConn(conn, newTraceInfo(reqId, "client")),
reqId: reqId,
Expand All @@ -55,7 +56,7 @@ func (p *Process) process() {
}
// loop
var proxyChannel *ProxyRequest
for !p.proxy.forceStop.IsSet() {
for !p.proxy.stopped() {
// we don't reuse the proxyChannel as the target can change,
// however we use a pool to reuse connection once target is identified
proxyChannel = p.processChannel(clientChannel, nil)
Expand Down
Loading

0 comments on commit 4bbd1ca

Please sign in to comment.