diff --git a/mmap_span/mmap_span.go b/mmap_span/mmap_span.go index 6a6c8392ed..d9970be4be 100644 --- a/mmap_span/mmap_span.go +++ b/mmap_span/mmap_span.go @@ -19,6 +19,18 @@ func (ms *MMapSpan) Append(mMap mmap.MMap) { ms.mMaps = append(ms.mMaps, mMap) } +func (ms *MMapSpan) Flush() (errs []error) { + ms.mu.RLock() + defer ms.mu.RUnlock() + for _, mMap := range ms.mMaps { + err := mMap.Flush() + if err != nil { + errs = append(errs, err) + } + } + return +} + func (ms *MMapSpan) Close() (errs []error) { ms.mu.Lock() defer ms.mu.Unlock() @@ -69,6 +81,7 @@ func (ms *MMapSpan) locateCopy(copyArgs func(remainingArgument, mmapped []byte) _n := copyBytes(copyArgs(p, mMapBytes)) p = p[_n:] n += _n + if segments.Int(_n) != e.Length { panic(fmt.Sprintf("did %d bytes, expected to do %d", _n, e.Length)) } diff --git a/piece.go b/piece.go index 680675bab3..3ef3576fdd 100644 --- a/piece.go +++ b/piece.go @@ -54,6 +54,12 @@ func (p *Piece) Storage() storage.Piece { return p.t.storage.Piece(p.Info()) } +func (p *Piece) Flush() { + if p.t.storage.Flush != nil { + _ = p.t.storage.Flush() + } +} + func (p *Piece) pendingChunkIndex(chunkIndex chunkIndexType) bool { return !p.chunkIndexDirty(chunkIndex) } diff --git a/storage/interface.go b/storage/interface.go index ee6e6336cd..3d1bfb3bfe 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -22,6 +22,7 @@ type TorrentCapacity *func() (cap int64, capped bool) type TorrentImpl struct { Piece func(p metainfo.Piece) PieceImpl Close func() error + Flush func() error // Storages that share the same space, will provide equal pointers. The function is called once // to determine the storage for torrents sharing the same function pointer, and mutated in // place. diff --git a/storage/mmap.go b/storage/mmap.go index 300d863510..a9d922ac20 100644 --- a/storage/mmap.go +++ b/storage/mmap.go @@ -41,7 +41,7 @@ func (s *mmapClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash span: span, pc: s.pc, } - return TorrentImpl{Piece: t.Piece, Close: t.Close}, err + return TorrentImpl{Piece: t.Piece, Close: t.Close, Flush: t.Flush}, err } func (s *mmapClientImpl) Close() error { @@ -71,6 +71,13 @@ func (ts *mmapTorrentStorage) Close() error { } return nil } +func (ts *mmapTorrentStorage) Flush() error { + errs := ts.span.Flush() + if len(errs) > 0 { + return errs[0] + } + return nil +} type mmapStoragePiece struct { pc PieceCompletionGetSetter diff --git a/torrent.go b/torrent.go index 7ce097fa62..e53b6bef23 100644 --- a/torrent.go +++ b/torrent.go @@ -2007,7 +2007,11 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { c._stats.incrementPiecesDirtiedGood() } t.clearPieceTouchers(piece) + hasDirty := p.hasDirtyChunks() t.cl.unlock() + if hasDirty { + p.Flush() // You can be synchronous here! + } err := p.Storage().MarkComplete() if err != nil { t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)