Skip to content

Commit

Permalink
Merge pull request filecoin-project#1 from EntropyPool/feature/entrop…
Browse files Browse the repository at this point in the history
…ool-preset-sector

Feature/entropool preset sector
  • Loading branch information
kikakkz authored Aug 22, 2020
2 parents db5c9ca + 821a01e commit 3b37009
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 60 deletions.
226 changes: 173 additions & 53 deletions extern/sector-storage/ffiwrapper/sealer_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"math/bits"
"os"
"runtime"
Expand Down Expand Up @@ -52,83 +54,147 @@ func (sb *Sealer) NewSector(ctx context.Context, sector abi.SectorID) error {
return nil
}

func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) {
log.Debugf("tropy: add pieces for %+v pieces ~", len(existingPieceSizes))
var offset abi.UnpaddedPieceSize
for _, size := range existingPieceSizes {
offset += size
func presetDir() string {
tmpdir, ok := os.LookupEnv("TMPDIR")
if !ok {
tmpdir = "/var/tmp"
}
presetDir := fmt.Sprintf("%s/preset", tmpdir)
os.MkdirAll(presetDir, os.ModePerm)
return presetDir
}

maxPieceSize := abi.PaddedPieceSize(sb.ssize)
func (sb *Sealer) presetSectorFilename() string {
return fmt.Sprintf("%s/sector-%v", presetDir(), abi.PaddedPieceSize(sb.ssize))
}

if offset.Padded()+pieceSize.Padded() > maxPieceSize {
return abi.PieceInfo{}, xerrors.Errorf("can't add %d byte piece to sector %v with %d bytes of existing pieces", pieceSize, sector, offset)
func (sb *Sealer) presetSectorPieceCidFilename() string {
return fmt.Sprintf("%s/sector-piece-cid-%v", presetDir(), abi.PaddedPieceSize(sb.ssize))
}

func (sb *Sealer) presetSectorPieceCid() (string, error) {
cidFilename := sb.presetSectorPieceCidFilename()
if _, err := os.Stat(cidFilename); os.IsNotExist(err) {
return "", xerrors.Errorf("preset sector piece cid is not exist")
}
cid, err := ioutil.ReadFile(cidFilename)
if nil != err {
return "", xerrors.Errorf("cannot read preset sector piece cid")
}
return string(cid), nil
}

var err error
var done func()
var stagedFile *partialFile
func copyFile(dst string, src string) error {
sFile, err := os.Open(src)
if err != nil {
log.Warnf("cannot open %+v: %+v", src, err)
return err
}
defer sFile.Close()

defer func() {
if done != nil {
done()
}
eFile, err := os.Create(dst)
if err != nil {
log.Warnf("cannot create %+v: %+v", dst, err)
return err
}
defer eFile.Close()

if stagedFile != nil {
if err := stagedFile.Close(); err != nil {
log.Errorf("closing staged file: %+v", err)
_, err = io.Copy(eFile, sFile) // first var shows number of bytes
if err != nil {
log.Warnf("cannot copy %+v -> %+v: %+v", src, dst, err)
return err
}

err = eFile.Sync()
if err != nil {
log.Warnf("cannot sync %+v: %+v", dst, err)
return err
}

return nil
}

func (sb *Sealer) tryCreateUnsealedFileFromPreset(ctx context.Context, sector abi.SectorID, maxPieceSize abi.PaddedPieceSize) (*partialFile, bool, func(), error) {
stagedPath, done, err := sb.sectors.AcquireSector(ctx, sector, 0, stores.FTUnsealed, stores.PathSealing)
if err != nil {
return nil, false, done, xerrors.Errorf("acquire unsealed sector: %w", err)
}

presetFile := sb.presetSectorFilename()
if _, err := os.Stat(presetFile); nil == err {
err = copyFile(stagedPath.Unsealed, presetFile)
if nil == err {
stagedFile, err := openPartialFile(maxPieceSize, stagedPath.Unsealed)
if nil == err {
log.Debugf("success to create unseal from preset file")
return stagedFile, true, done, nil
}
}
}()
}

var stagedPath stores.SectorPaths
log.Debugf("tropy: acruire sector for %+v pieces ~", len(existingPieceSizes))
if len(existingPieceSizes) == 0 {
log.Debugf("tropy: acruire sector ~")
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, 0, stores.FTUnsealed, stores.PathSealing)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
}
stagedFile, err := createPartialFile(maxPieceSize, stagedPath.Unsealed)
if err != nil {
return nil, false, done, xerrors.Errorf("creating unsealed sector file: %w", err)
}
return stagedFile, false, done, nil

log.Debugf("tropy: create unsealed sector file %+v ~", maxPieceSize)
stagedFile, err = createPartialFile(maxPieceSize, stagedPath.Unsealed)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("creating unsealed sector file: %w", err)
}
log.Debugf("tropy: success to create unsealed sector file %+v ~", stagedFile)
} else {
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, stores.PathSealing)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
}
}

stagedFile, err = openPartialFile(maxPieceSize, stagedPath.Unsealed)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("opening unsealed sector file: %w", err)
}
func (sb *Sealer) presetPieceCids(stagedFile *partialFile, pieceSize abi.UnpaddedPieceSize, chunk abi.PaddedPieceSize) ([]abi.PieceInfo, error) {
var pieceCids []abi.PieceInfo

presetCid, err := sb.presetSectorPieceCid()
if nil != err {
return pieceCids, err
}

w, err := stagedFile.Writer(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded())
presetCidV, err := cid.Decode(presetCid)
if nil != err {
return pieceCids, err
}

for total := 0; total < int(pieceSize.Padded()); total += int(chunk) {
pieceCids = append(pieceCids, abi.PieceInfo{
Size: chunk.Unpadded().Padded(),
PieceCID: presetCidV,
})
}

return pieceCids, nil
}

func (sb *Sealer) openExistUnsealedFile(ctx context.Context, sector abi.SectorID, maxPieceSize abi.PaddedPieceSize) (*partialFile, func(), error) {
stagedPath, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, stores.PathSealing)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("getting partial file writer: %w", err)
return nil, done, xerrors.Errorf("acquire unsealed sector: %w", err)
}

pw := fr32.NewPadWriter(w)
stagedFile, err := openPartialFile(maxPieceSize, stagedPath.Unsealed)
if err != nil {
return nil, done, xerrors.Errorf("opening unsealed sector file: %w", err)
}

pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), pw)
return stagedFile, done, nil
}

chunk := abi.PaddedPieceSize(4 << 20)
func (sb *Sealer) pieceCids(stagedFile *partialFile, pieceSize abi.UnpaddedPieceSize, offset abi.UnpaddedPieceSize, file storage.Data, chunk abi.PaddedPieceSize) ([]abi.PieceInfo, error) {
var pieceCids []abi.PieceInfo

w, err := stagedFile.Writer(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded())
if err != nil {
return pieceCids, xerrors.Errorf("getting partial file writer: %w", err)
}

pw := fr32.NewPadWriter(w)
pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), pw)
buf := make([]byte, chunk.Unpadded())
var pieceCids []abi.PieceInfo

log.Debugf("tropy: fill sector ~")
for {
var read int
for rbuf := buf; len(rbuf) > 0; {
n, err := pr.Read(rbuf)
if err != nil && err != io.EOF {
return abi.PieceInfo{}, xerrors.Errorf("pr read error: %w", err)
return pieceCids, xerrors.Errorf("pr read error: %w", err)
}

rbuf = rbuf[n:]
Expand All @@ -144,17 +210,71 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie

c, err := sb.pieceCid(buf[:read])
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("pieceCid error: %w", err)
return pieceCids, xerrors.Errorf("pieceCid error: %w", err)
}
pieceCids = append(pieceCids, abi.PieceInfo{
Size: abi.UnpaddedPieceSize(len(buf[:read])).Padded(),
PieceCID: c,
})
}
log.Debugf("tropy: sector filled %+v ~", len(pieceCids))

if err := pw.Close(); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("closing padded writer: %w", err)
return pieceCids, xerrors.Errorf("closing padded writer: %w", err)
}

return pieceCids, nil
}

func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) {
var offset abi.UnpaddedPieceSize
for _, size := range existingPieceSizes {
offset += size
}

maxPieceSize := abi.PaddedPieceSize(sb.ssize)

if offset.Padded()+pieceSize.Padded() > maxPieceSize {
return abi.PieceInfo{}, xerrors.Errorf("can't add %d byte piece to sector %v with %d bytes of existing pieces", pieceSize, sector, offset)
}

var err error
var done func()
var stagedFile *partialFile

defer func() {
if done != nil {
done()
}

if stagedFile != nil {
if err := stagedFile.Close(); err != nil {
log.Errorf("closing staged file: %+v", err)
}
}
}()

fromPreset := false
var pieceCids []abi.PieceInfo
chunk := abi.PaddedPieceSize(4 << 20)

if len(existingPieceSizes) == 0 {
stagedFile, fromPreset, done, err = sb.tryCreateUnsealedFileFromPreset(ctx, sector, maxPieceSize)
if fromPreset {
pieceCids, err = sb.presetPieceCids(stagedFile, pieceSize, chunk)
if nil != err {
fromPreset = false
}
}
} else {
stagedFile, done, err = sb.openExistUnsealedFile(ctx, sector, maxPieceSize)
}

if !fromPreset {
pieceCids, err = sb.pieceCids(stagedFile, pieceSize, offset, file, chunk)
presetFile := sb.presetSectorFilename()
copyFile(presetFile, stagedFile.path)
presetCidFile := sb.presetSectorPieceCidFilename()
ioutil.WriteFile(presetCidFile, []byte(pieceCids[0].PieceCID.String()), 0644)
}

if err := stagedFile.MarkAllocated(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded()); err != nil {
Expand Down
3 changes: 0 additions & 3 deletions extern/sector-storage/localworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.
}

func (l *LocalWorker) sb() (ffiwrapper.Storage, error) {
log.Debugf("tropy: get ffi storage implementation ~")
return ffiwrapper.New(&localWorkerPathProvider{w: l}, l.scfg)
}

Expand All @@ -110,7 +109,6 @@ func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error
return err
}

log.Debugf("tropy: create sector for %+v", sector.Number)
return sb.NewSector(ctx, sector)
}

Expand All @@ -119,7 +117,6 @@ func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []
if err != nil {
return abi.PieceInfo{}, err
}
log.Debugf("tropy: add piece in local worker for %+v", sector.Number)
return sb.AddPiece(ctx, sector, epcs, sz, r)
}

Expand Down
4 changes: 0 additions & 4 deletions extern/sector-storage/stores/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header, fetchLimit int
}

func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) {
log.Debugf("tropy: acquire sector %+v from remote", s.Number)
if existing|allocate != existing^allocate {
return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector")
}
Expand Down Expand Up @@ -91,16 +90,13 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
r.fetchLk.Unlock()
}()

log.Debugf("tropy: acquire sector %+v to local storage", s.Number)
paths, stores, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, pathType, op)
if err != nil {
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("local acquire error: %w", err)
}

log.Debugf("tropy: acquire sector %+v to with path type", s.Number)
var toFetch SectorFileType
for _, fileType := range PathTypes {
log.Debugf("tropy: acquire sector %+v filetype %+v existing %+v", s.Number, fileType, existing)
if fileType&existing == 0 {
continue
}
Expand Down

0 comments on commit 3b37009

Please sign in to comment.