Skip to content

Commit

Permalink
fix: fix read split buffer function gr block bug
Browse files Browse the repository at this point in the history
  • Loading branch information
LaurenceLiZhixin committed Aug 12, 2021
1 parent afec806 commit 131170d
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 7 deletions.
10 changes: 8 additions & 2 deletions example/dubbo/go-client/cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package main
import (
"context"
"fmt"
"net/http"
"runtime"
"sync"
"time"
)
Expand All @@ -36,6 +38,7 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/registry/protocol"
_ "dubbo.apache.org/dubbo-go/v3/registry/zookeeper"
"go.uber.org/atomic"
_ "net/http/pprof"
)

import (
Expand All @@ -47,10 +50,13 @@ var greeterProvider = new(pkg.GreeterClientImpl)

func init() {
config.SetConsumerService(greeterProvider)
runtime.SetMutexProfileFraction(1)
}

// need to setup environment variable "CONF_CONSUMER_FILE_PATH" to "conf/client.yml" before run
func main() {
go func() {
_ = http.ListenAndServe("0.0.0.0:6061", nil)
}()
config.Load()
time.Sleep(time.Second * 3)
testSayHello()
Expand Down Expand Up @@ -94,7 +100,7 @@ func testSayHelloWithHighParallel() {
wg := sync.WaitGroup{}
goodCounter := atomic.Uint32{}
badCounter := atomic.Uint32{}
for i := 0; i < 1000; i ++{
for i := 0; i < 2000; i ++{
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
10 changes: 10 additions & 0 deletions example/dubbo/go-server/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package main

import (
"fmt"
"net/http"
"os"
"os/signal"
"runtime"
"syscall"
"time"
)
Expand All @@ -36,6 +38,8 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/registry/nacos"
_ "dubbo.apache.org/dubbo-go/v3/registry/protocol"
_ "dubbo.apache.org/dubbo-go/v3/registry/zookeeper"

_ "net/http/pprof"
)

import (
Expand All @@ -46,10 +50,16 @@ import (
var (
survivalTimeout = int(3 * time.Second)
)
func init(){
runtime.SetMutexProfileFraction(1)
}

// need to setup environment variable "CONF_PROVIDER_FILE_PATH" to "conf/server.yml" before run
func main() {
config.SetProviderService(pkg.NewGreeterProvider())
go func() {
_ = http.ListenAndServe("0.0.0.0:6060", nil)
}()
config.Load()
initSignal()
}
Expand Down
1 change: 0 additions & 1 deletion example/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.13

require (
dubbo.apache.org/dubbo-go/v3 v3.0.0-rc2
github.com/dubbogo/gost v1.11.16
github.com/dubbogo/triple v1.0.1
github.com/golang/protobuf v1.5.2
github.com/stretchr/testify v1.7.0
Expand Down
3 changes: 2 additions & 1 deletion pkg/http2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http2

import (
"bytes"
"context"
"crypto/tls"
"net"
"net/http"
Expand Down Expand Up @@ -84,7 +85,7 @@ func (h *Client) StreamPost(addr, path string, sendChan chan *bytes.Buffer, opts
close(closeChan)
return
}
ch := readSplitData(rsp.Body)
ch := readSplitData(context.Background(), rsp.Body)
Loop:
for {
select {
Expand Down
22 changes: 19 additions & 3 deletions pkg/http2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http2

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
Expand Down Expand Up @@ -179,7 +180,7 @@ func skipHeader(frameData []byte) ([]byte, uint32) {
return frameData[5:], length
}

func readSplitData(rBody io.ReadCloser) chan *bytes.Buffer {
func readSplitData(ctx context.Context, rBody io.ReadCloser) chan *bytes.Buffer {
cbm := make(chan *bytes.Buffer)
go func() {
buf := make([]byte, 4098) // todo configurable
Expand Down Expand Up @@ -224,7 +225,14 @@ func readSplitData(rBody io.ReadCloser) chan *bytes.Buffer {
if err != nil {
fmt.Printf("read SplitedDatas error = %v\n", err)
}
cbm <- bytes.NewBuffer(allDataBody)
select {
case <-ctx.Done():
close(cbm)
return
default:
cbm <- bytes.NewBuffer(allDataBody)
}

// temp data is sent, and reset wanting data size
fromFrameHeaderDataSize = 0
}
Expand All @@ -236,7 +244,15 @@ func readSplitData(rBody io.ReadCloser) chan *bytes.Buffer {

func (s *Server) http2HandleFunction(wi http.ResponseWriter, r *http.Request) {
// body data from http
bodyCh := readSplitData(r.Body)
ctx, cancel := context.WithCancel(context.Background())
bodyCh := readSplitData(ctx, r.Body)
defer func() {
cancel()
select {
case <-bodyCh:
default:
}
}()
sendChan := make(chan *bytes.Buffer)
ctrlChan := make(chan http.Header)
errChan := make(chan interface{})
Expand Down

0 comments on commit 131170d

Please sign in to comment.