Skip to content

Commit

Permalink
fix some code by CR
Browse files Browse the repository at this point in the history
  • Loading branch information
wangforthinker committed Mar 19, 2020
1 parent f241659 commit 5b9b471
Show file tree
Hide file tree
Showing 10 changed files with 262 additions and 210 deletions.
148 changes: 0 additions & 148 deletions dfdaemon/localManager/finite_queue.go

This file was deleted.

4 changes: 2 additions & 2 deletions dfdaemon/localManager/local_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,13 @@ func (ld *LocalDownloader) reportResource(info *downloadNodeInfo) {
func (ld *LocalDownloader) processPiece(ctx context.Context, info* downloadNodeInfo) {
logrus.Debugf("pieces to be processed:%v", info)
pieceTask := &types.PullPieceTaskResponseContinueData{
Range: fmt.Sprintf("0-%d", ld.length - 1),
Range: fmt.Sprintf("0-%d", ld.length - 1 + config.PieceMetaSize),
PieceNum: 0,
PieceSize: int32(ld.length),
Cid: info.peerID,
PeerIP: info.ip,
PeerPort: info.port,
Path: info.path,
Path: fmt.Sprintf("%s%s", config.PeerHTTPPathPrefix, info.path),
Url: info.url,
Header: info.header,
DirectSource: info.directSource,
Expand Down
17 changes: 10 additions & 7 deletions dfdaemon/localManager/request_manager.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
package localManager

import (
"time"

"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/pkg/queue"

"github.com/pkg/errors"
"time"
)

// requestManager manage the recent the requests, it provides the
type requestManager struct {
q *finiteQueue
q *queue.CircleQueue
}

func newRequestManager() *requestManager {
return &requestManager{
q: newFiniteQueue(100),
q: queue.NewCircleQueue(100),
}
}

func (rm *requestManager) addRequest(url string, directReturnSrc bool) error {
data, err := rm.q.getItemByKey(url)
data, err := rm.q.GetItemByKey(url)
if err != nil && err != errortypes.ErrDataNotFound {
return err
}
Expand All @@ -42,7 +45,7 @@ func (rm *requestManager) addRequest(url string, directReturnSrc bool) error {
}

rs.updateRecentTime()
rm.q.putFront(url, rs)
rm.q.PutFront(url, rs)

return nil
}
Expand All @@ -52,7 +55,7 @@ func (rm *requestManager) getRecentRequest(count int) []string {
if count == 0 {
count = 5
}
arr := rm.q.getFront(count)
arr := rm.q.GetFront(count)
result := []string{}

for _, d := range arr {
Expand All @@ -65,7 +68,7 @@ func (rm *requestManager) getRecentRequest(count int) []string {
}

func (rm *requestManager) getRequestState(url string) (*requestState, error) {
data, err := rm.q.getItemByKey(url)
data, err := rm.q.GetItemByKey(url)
if err != nil {
return nil, err
}
Expand Down
13 changes: 6 additions & 7 deletions dfdaemon/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ package proxy
import (
"crypto/tls"
"fmt"
"github.com/dragonflyoss/Dragonfly/dfdaemon/downloader/dfget"
"github.com/dragonflyoss/Dragonfly/dfdaemon/downloader/p2p"
"github.com/dragonflyoss/Dragonfly/dfdaemon/localManager"
"io"
"net"
"net/http"
Expand All @@ -32,6 +29,9 @@ import (

"github.com/dragonflyoss/Dragonfly/dfdaemon/config"
"github.com/dragonflyoss/Dragonfly/dfdaemon/downloader"
"github.com/dragonflyoss/Dragonfly/dfdaemon/downloader/dfget"
"github.com/dragonflyoss/Dragonfly/dfdaemon/downloader/p2p"
"github.com/dragonflyoss/Dragonfly/dfdaemon/localManager"
"github.com/dragonflyoss/Dragonfly/dfdaemon/transport"

"github.com/pkg/errors"
Expand Down Expand Up @@ -145,7 +145,6 @@ func NewFromConfig(c config.Properties) (*Proxy, error) {
WithRules(c.Proxies),
WithRegistryMirror(c.RegistryMirror),
WithStreamDownloaderFactory(func() downloader.Stream {
dfget.NewGetter(c.DFGetConfig())
return p2p.NewClient(c.DFGetConfig())
}),
WithExtremeDownloaderFactory(func() downloader.Stream {
Expand Down Expand Up @@ -202,10 +201,10 @@ type Proxy struct {
// directHandler are used to handle non proxy requests
directHandler http.Handler
// downloadFactory returns the downloader used for p2p downloading
downloadFactory downloader.Factory
streamDownloadFactory downloader.StreamFactory
downloadFactory downloader.Factory
streamDownloadFactory downloader.StreamFactory
extremeDownloadFactory downloader.StreamFactory
config *config.Properties
config *config.Properties
}

func (proxy *Proxy) mirrorRegistry(w http.ResponseWriter, r *http.Request) {
Expand Down
85 changes: 51 additions & 34 deletions dfdaemon/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,42 +151,14 @@ func WithCondition(c func(r *http.Request) bool) Option {
// RoundTrip only process first redirect at present
// fix resource release
func (roundTripper *DFRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
if roundTripper.config.Extreme.SpecKeyOfDirectRet != "" {
if req.Header.Get(roundTripper.config.Extreme.SpecKeyOfDirectRet) != "" {
return &http.Response{
StatusCode: 200,
// add a body with no data
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}, nil
}
ret, resp, err := roundTripper.directReturn(req)
if ret {
return resp, err
}

if req.Header.Get("x-numerical-ware") != "" {
var baseLine int64 = 0
var msgErr error
baseLineStr := req.Header.Get("x-numerical-ware-baseline")
if baseLineStr != "" {
bl, err := strconv.ParseInt(baseLineStr, 10, 64)
if err != nil {
msgErr = fmt.Errorf("base line parse failed: %v", err)
}

baseLine = bl
}

reset := req.Header.Get("x-numerical-ware-reset")
if strings.TrimSpace(reset) == "true" {
defer roundTripper.nWare.Reset()
}

rs := roundTripper.nWare.OutputWithBaseLine(baseLine)
rs.Err = msgErr
rsData,_ := json.Marshal(rs)

return &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewReader(rsData)),
}, nil
ret, resp, err = roundTripper.isNumericalResult(req)
if ret {
return resp, err
}

if roundTripper.ShouldUseDfget(req) {
Expand Down Expand Up @@ -247,3 +219,48 @@ func (roundTripper *DFRoundTripper) downloadByStream(ctx context.Context, url st
func NeedUseGetter(req *http.Request) bool {
return req.Method == http.MethodGet && layerReg.MatchString(req.URL.Path)
}

func (roundTripper *DFRoundTripper) directReturn(req *http.Request) (bool, *http.Response, error) {
if roundTripper.config.Extreme.SpecKeyOfDirectRet != "" &&
req.Header.Get(roundTripper.config.Extreme.SpecKeyOfDirectRet) != ""{
return true, &http.Response{
StatusCode: 200,
// add a body with no data
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}, nil
}

return false, nil, nil
}

func (roundTripper *DFRoundTripper) isNumericalResult(req *http.Request) (bool, *http.Response, error) {
if req.Header.Get("x-numerical-ware") == "" {
return false, nil, nil
}

var baseLine int64 = 0
var msgErr error
baseLineStr := req.Header.Get("x-numerical-ware-baseline")
if baseLineStr != "" {
bl, err := strconv.ParseInt(baseLineStr, 10, 64)
if err != nil {
msgErr = fmt.Errorf("base line parse failed: %v", err)
}

baseLine = bl
}

reset := req.Header.Get("x-numerical-ware-reset")
if strings.TrimSpace(reset) == "true" {
defer roundTripper.nWare.Reset()
}

rs := roundTripper.nWare.OutputWithBaseLine(baseLine)
rs.Err = msgErr
rsData,_ := json.Marshal(rs)

return true, &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewReader(rsData)),
}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (csw *ClientStreamWriter) writePieceToPipe(p *Piece) error {
func (csw *ClientStreamWriter) Read(p []byte) (n int, err error) {
n, err = csw.limitReader.Read(p)
csw.alreadyReadSize += int64(n)
if csw.alreadyReadSize == csw.expectReadSize {
if csw.expectReadSize > 0 && csw.alreadyReadSize >= csw.expectReadSize {
go csw.notifyCloseStream()
if csw.nWare != nil {
csw.nWare.Add(csw.nKey, transport.RequestReadName, time.Since(csw.startTime).Nanoseconds())
Expand Down
Loading

0 comments on commit 5b9b471

Please sign in to comment.