Skip to content

Commit

Permalink
Chunk (#19)
Browse files Browse the repository at this point in the history
* chunk

* WIP

* update chunk
  • Loading branch information
ls0f authored Nov 8, 2019
1 parent 4ad36a9 commit 6a59682
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 2 deletions.
64 changes: 64 additions & 0 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ const (
PING = "/ping"
PULL = "/pull"
PUSH = "/push"
DOWNLOAD = "/download"
CHUNK_PULL = "chunk_pull"
CHUNK_PUSH = "chunk_push"
)
const (
DATA_TYP = "data"
Expand All @@ -50,6 +53,17 @@ const (
version = "20170803"
)

type DevZero struct {
}

func (z DevZero) Read(b []byte) (n int, err error) {
for i := range b {
b[i] = 0
}

return len(b), nil
}

var bufPool = &sync.Pool{New: func() interface{} { return make([]byte, 1024*8) }}

type httpProxy struct {
Expand All @@ -73,6 +87,8 @@ func (hp *httpProxy) handler() {
http.HandleFunc(PULL, hp.pull)
http.HandleFunc(PUSH, hp.push)
http.HandleFunc(PING, hp.ping)
http.HandleFunc(CHUNK_PULL, hp.chunkPull)
http.HandleFunc(CHUNK_PUSH, hp.chunkPush)
}

func (hp *httpProxy) ListenHTTPS(cert, key string) {
Expand All @@ -87,6 +103,11 @@ func (hp *httpProxy) Listen() {
g.Fatal("ListenAndServe: ", http.ListenAndServe(hp.addr, nil))
}

func (hp *httpProxy) download(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Length", fmt.Sprintf("%d", 100 << 20))
io.CopyN(w, DevZero{}, 100 << 20)
}

func (hp *httpProxy) verify(r *http.Request) error {
ts := r.Header.Get("timestamp")
sign := r.Header.Get("sign")
Expand Down Expand Up @@ -245,3 +266,46 @@ func (hp *httpProxy) connect(w http.ResponseWriter, r *http.Request) {
}()
WriteHTTPOK(w, proxyID)
}


// not used by now

func (hp *httpProxy) chunkPush(w http.ResponseWriter, r *http.Request) {
if err := hp.before(w, r); err != nil {
return
}
// 消息不超过8k
chunk := bufPool.Get().([]byte)
defer bufPool.Put(chunk)
for {
n, err := r.Body.Read(chunk)
if n > 0 {
// unpack chunk
}
if err != nil {
g.V(LERROR).Info(err)
break
}
}
}

// not used by now

func (hp *httpProxy) chunkPull(w http.ResponseWriter, r *http.Request) {
if err := hp.before(w, r); err != nil {
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Transfer-Encoding", "chunked")
flusher, _ := w.(http.Flusher)
flusher.Flush()
buf := make([]byte, 10)
for {
_, err := w.Write(buf)
if err != nil {
g.V(LERROR).Info(err)
break
}
flusher.Flush()
}
}
45 changes: 43 additions & 2 deletions local.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type localProxyConn struct {
source io.ReadCloser
close chan bool
interval time.Duration
dst io.WriteCloser
}

func (c *localProxyConn) gen_sign(req *http.Request) {
Expand All @@ -60,11 +61,45 @@ func (c *localProxyConn) gen_sign(req *http.Request) {
req.Header.Set("sign", GenHMACSHA1(c.secret, ts))
}

func (c *localProxyConn) chunkPush(data []byte, typ string) error {
if c.dst != nil {
_, err := c.dst.Write(data)
return err
}
wr, ww := io.Pipe()
req, _ := http.NewRequest("POST", c.server+PUSH, wr)
req.Header.Set("TYP", typ)
req.Header.Set("Transfer-Encoding", "chunked")
c.gen_sign(req)
req.Header.Set("Content-Type", "image/jpeg")
go func() (err error) {
defer wr.Close()
defer ww.Close()
res, err := hc.Do(req)
if err != nil {
return
}
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
switch res.StatusCode {
case HeadOK:
return nil
default:
return errors.New(fmt.Sprintf("status code is %d, body is: %s", res.StatusCode, string(body)))
}
return nil
}()

c.dst = ww
_, err := c.dst.Write(data)
return err
}

func (c *localProxyConn) push(data []byte, typ string) error {
buf := bufio.NewBuffer(data)
req, _ := http.NewRequest("POST", c.server+PUSH, buf)
c.gen_sign(req)
req.Header.Set("TYP", typ)
c.gen_sign(req)
req.ContentLength = int64(len(data))
req.Header.Set("Content-Type", "image/jpeg")
res, err := hc.Do(req)
Expand Down Expand Up @@ -150,7 +185,13 @@ func (c *localProxyConn) Read(b []byte) (n int, err error) {

func (c *localProxyConn) Write(b []byte) (int, error) {

err := c.push(b, DATA_TYP)
var err error
if c.interval > 0 {
err = c.push(b, DATA_TYP)
} else {
//err = c.push(b, DATA_TYP)
err = c.chunkPush(b, DATA_TYP)
}
if err != nil {
g.V(LDEBUG).Infof("push: %v", err)
return 0, err
Expand Down

0 comments on commit 6a59682

Please sign in to comment.