diff --git a/local/main.go b/local/main.go index 2c1b0e5..75277e1 100644 --- a/local/main.go +++ b/local/main.go @@ -5,9 +5,8 @@ import ( "fmt" "logger" "os" - "socks" "proxy" - + "socks" ) var ( @@ -15,6 +14,8 @@ var ( BuildTime = "2000-01-01T00:00:00+0800" ) +var g = logger.GetLogger() + func main() { laddr := flag.String("laddr", "", "listen addr") raddr := flag.String("raddr", "", "remote http url(e.g, https://example.com)") @@ -30,5 +31,9 @@ func main() { } logger.InitLogger(*debug) proxy.Init() - socks.NewSocks5(*laddr, *raddr, *secret) + s, err := socks.NewSocks5(*laddr, *raddr, *secret) + if err != nil { + g.Fatal(err) + } + s.Wait() } diff --git a/src/proxy/http.go b/src/proxy/http.go index 4fcae5d..53b888e 100644 --- a/src/proxy/http.go +++ b/src/proxy/http.go @@ -53,7 +53,6 @@ func NewHttpProxy(addr, secret string, https bool) *httpProxy { secret: secret, proxyMap: make(map[string]*proxyConn), https: https, - } } diff --git a/src/proxy/local.go b/src/proxy/local.go index da9e8d8..c17c0fd 100644 --- a/src/proxy/local.go +++ b/src/proxy/local.go @@ -5,13 +5,14 @@ import ( "crypto/x509" "errors" "fmt" + "io" "io/ioutil" "net/http" + "os" "strings" "sync" "time" - "io" - "os" + "gopkg.in/bufio.v1" ) @@ -21,9 +22,9 @@ const ( var tr = &http.Transport{ - DisableKeepAlives: false, - + DisableKeepAlives: false, MaxIdleConnsPerHost: PerHostNum, + Proxy: http.ProxyFromEnvironment, } func Init() { @@ -47,8 +48,8 @@ type localProxyConn struct { secret string read_buffer []byte read_mutex sync.Mutex - Source io.ReadCloser - Close chan bool + source io.ReadCloser + close chan bool } func (c *localProxyConn) gen_sign(req *http.Request) { @@ -65,6 +66,7 @@ func (c *localProxyConn) push(data []byte, typ string) error { req, _ := http.NewRequest("POST", c.server+PUSH, buf) c.gen_sign(req) req.Header.Set("TYP", typ) + req.ContentLength = int64(len(data)) req.Header.Set("Content-Type", "image/jpeg") res, err := hc.Do(req) if err != nil { @@ -109,10 +111,17 @@ func (c *localProxyConn) pull() error { if err != nil { return err } - c.Source = res.Body + c.source = res.Body return nil } +func (c *localProxyConn) Read(b []byte) (int, error) { + if c.source == nil { + return 0, errors.New("pull http connection is not ready") + } + return c.source.Read(b) +} + func (c *localProxyConn) Write(b []byte) (int, error) { err := c.push(b, DATA_TYP) @@ -124,10 +133,14 @@ func (c *localProxyConn) Write(b []byte) (int, error) { return len(b), nil } +func (c *localProxyConn) CloseRead() error { + return c.source.Close() +} + func (c *localProxyConn) alive() { for { select { - case <-c.Close: + case <-c.close: return case <-time.After(time.Duration(time.Second * timeout)): if err := c.push([]byte("alive"), HEART_TYP); err != nil { @@ -137,10 +150,15 @@ func (c *localProxyConn) alive() { } } -func (c *localProxyConn) Quit() { +func (c *localProxyConn) quit() { c.push([]byte("quit"), QUIT_TYP) } +func (c *localProxyConn) Close() { + close(c.close) + c.quit() +} + func Connect(server, remote, secret string) (*localProxyConn, error) { if strings.HasSuffix(server, "/") { server = server[:len(server)-1] @@ -157,7 +175,7 @@ func Connect(server, remote, secret string) (*localProxyConn, error) { if err != nil { return nil, err } - conn.Close = make(chan bool) + conn.close = make(chan bool) go conn.alive() return &conn, nil } diff --git a/src/socks/socks5.go b/src/socks/socks5.go index 0e9557a..e3d1dae 100644 --- a/src/socks/socks5.go +++ b/src/socks/socks5.go @@ -10,7 +10,13 @@ import ( var g = logger.GetLogger() -func handleConn(conn net.Conn, raddr, secret string) { +type socks5 struct { + raddr string + secret string + wait chan bool +} + +func (s *socks5) handleConn(conn net.Conn) { defer conn.Close() buf := make([]byte, 1024) @@ -46,7 +52,7 @@ func handleConn(conn net.Conn, raddr, secret string) { } g.Debugf("will connect %s ... ", addr) - lp, err := proxy.Connect(raddr, addr, secret) + lp, err := proxy.Connect(s.raddr, addr, s.secret) if err != nil { g.Errorf("proxy connect err:%s", err) return @@ -54,31 +60,42 @@ func handleConn(conn net.Conn, raddr, secret string) { conn.Write([]byte{0x05, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}) //响应客户端连接成功 //进行转发 go func() { - _, err := io.Copy(conn, lp.Source) + _, err := io.Copy(conn, lp) if err != nil { g.Debugf("read err:", err) } - lp.Source.Close() + lp.CloseRead() }() io.Copy(lp, conn) - close(lp.Close) - lp.Quit() + lp.Close() g.Debugf("close connection with %s", conn.RemoteAddr().String()) } -func NewSocks5(addr, raddr, secret string) { - g.Infof("listen at:[%s]", addr) +func (s *socks5) Wait() { + <-s.wait +} + +func NewSocks5(addr, raddr, secret string) (s *socks5, err error) { l, err := net.Listen("tcp", addr) if err != nil { - g.Panic(err) + return nil, err } - for { - conn, err := l.Accept() - if err != nil { - g.Errorf("accept err:%s", err) - } - go handleConn(conn, raddr, secret) - + g.Infof("socks5 proxy listen at:[%s]", addr) + s = &socks5{ + raddr: raddr, + secret: secret, + wait: make(chan bool), } + go func() { + for { + conn, err := l.Accept() + if err != nil { + g.Errorf("accept err:%s", err) + } + go s.handleConn(conn) + + } + }() + return s, nil }