-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat/fetch p2p network info dfdaemon #5
base: feat/antsystem
Are you sure you want to change the base?
Feat/fetch p2p network info dfdaemon #5
Conversation
…work if need update. fix bug
add some log for test.
fix not report to uploader.
fix requestManager get wrong front urls.
Feat/support local task reload
add heart beat for peer
79ef05b
to
d92072a
Compare
e8ae41e
to
f241659
Compare
"sync" | ||
) | ||
|
||
// finiteQueue provides a circle queue with capacity, and it will weed out the earlier item if full |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pkg/queue
下面有个有限队列的实现,跟这个区别在于队列满时会阻塞。可否考虑将本实现也放到pkg/queue
下面。
另外我觉得应该叫循环队列,但是我从实现上看更像个cache。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
嗯 这是一个有限长度的循环队列。已放在pkg/queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
type finiteQueue struct { | ||
sync.Mutex | ||
capacity int | ||
head *itemNode |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
是否可以考虑使用 list.List
使代码更简洁。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
uploaderAPI api.UploaderAPI | ||
|
||
// postNotifyUploader should be called after notify the local uploader finish | ||
postNotifyUploader func(req *api.FinishTaskRequest) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
没有初始化
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dfget/core/uploader/uploader.go
Outdated
@@ -136,7 +136,7 @@ func launch(cfg *config.Config, p2pPtr *unsafe.Pointer) error { | |||
func waitForStartup(result chan error, p2pPtr *unsafe.Pointer) (err error) { | |||
ticker := time.NewTicker(5 * time.Millisecond) | |||
defer ticker.Stop() | |||
timeout := time.After(233 * time.Millisecond) | |||
timeout := time.After(10 * time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个为什么要改掉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
之前是为了测试,已改回去。
logrus.Infof("in downloadPiece by returnSrc, url: %s, header: %v, err: %d", pc.pieceTask.Url, header, err) | ||
}else{ | ||
downloadRequest := pc.createDownloadRequest() | ||
downloadRequest.PieceRange = fmt.Sprintf("0-%d", downloadRequest.PieceSize + config.PieceMetaSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这样改原有的下载逻辑都变了
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
已修改回原本的逻辑。
buf := make([]byte, 128) | ||
binary.BigEndian.PutUint32(buf, uint32((pieceSize)|(pieceSize)<<4)) | ||
content.Write(buf[:config.PieceHeadSize]) | ||
defer content.Write([]byte{config.PieceTailChar}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
直接添加分片头尾信息有问题,因为分片的md5值是包含了:piece head + piece data + piece tail。
这种情况下考虑使用 NewLimitReaderWithLimiterAndMD5Sum
,下载前后将分片头尾信息放到md5计算中。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
已修改。
@@ -177,6 +212,14 @@ func (csw *ClientStreamWriter) writePieceToPipe(p *Piece) error { | |||
|
|||
func (csw *ClientStreamWriter) Read(p []byte) (n int, err error) { | |||
n, err = csw.limitReader.Read(p) | |||
csw.alreadyReadSize += int64(n) | |||
if csw.alreadyReadSize == csw.expectReadSize { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里是为了做什么?csw.alreadyReadSize += int64(n)
结果可能也大于 csw. expectReadSize
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
当读到的内容跟预计的一样多时,主动close(pipewriter)。 不然请求方不会收到EOF。
dfdaemon/transport/transport.go
Outdated
@@ -126,11 +151,41 @@ func WithCondition(c func(r *http.Request) bool) Option { | |||
// RoundTrip only process first redirect at present | |||
// fix resource release | |||
func (roundTripper *DFRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { | |||
if req.Header.Get("x-nydus-proxy-healthcheck") != "" { | |||
if roundTripper.config.Extreme.SpecKeyOfDirectRet != "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这一段改造可否独立个函数出来,因为跟之前的逻辑基本都不一样,可在这里判断进入不同的处理函数中。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done。
0364ad6
to
4d1a730
Compare
pkg/queue/circle_queue.go
Outdated
// remove the earliest item | ||
i := q.internalRemoveTail() | ||
if i != nil { | ||
delete(q.itemMap, key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The deleted element is not the tail of the list, it may delete nothing actually.
The stored value in list.Element
may be like this:
type elementData struct {
key string
data interface{}
}
So delete the value from map can be written:
delete(q.itemMap, i.Value.(*elementData).key)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
|
||
// if pieceRange == "" means all Pieces of file | ||
func (sm *SchedulerManager) SchedulerByTaskID(ctx context.Context, taskID string, srcCid string, pieceRange string, pieceSize int32) ([]*Result, error) { | ||
sm.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个锁的粒度似乎太大了,所有任务都需要串行调度,而且对于peer负载的更新也依赖于此锁
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
实际测试下来,调度所占耗时不到1%,暂时不影响。
PeerInfo: node.Basic, | ||
Generation: sm.generation, | ||
StartDownload: func(peerID string, generation int64) { | ||
sm.downloadStartCh <- notifySt{peerID: peerID, generation: generation} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里使用chan是为了实现异步更新?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
是的
dfdaemon/proxy/proxy.go
Outdated
@@ -129,9 +145,16 @@ func NewFromConfig(c config.Properties) (*Proxy, error) { | |||
WithRules(c.Proxies), | |||
WithRegistryMirror(c.RegistryMirror), | |||
WithStreamDownloaderFactory(func() downloader.Stream { | |||
// dfget.NewGetter(c.DFGetConfig()) | |||
dfget.NewGetter(c.DFGetConfig()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这行是不是多余了?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
4d1a730
to
5b9b471
Compare
Ⅰ. Describe what this PR did
Ⅱ. Does this pull request fix one issue?
Ⅲ. Why don't you add test cases (unit test/integration test)? (你真的觉得不需要加测试吗?)
Ⅳ. Describe how to verify it
Ⅴ. Special notes for reviews