Skip to content

Commit

Permalink
s2c: Add Snappy/S2 stream recompression (#611)
Browse files Browse the repository at this point in the history
Allows easy recompression from commandline.
  • Loading branch information
klauspost authored Jun 1, 2022
1 parent d0cd1a9 commit 9ca8064
Showing 1 changed file with 57 additions and 2 deletions.
59 changes: 57 additions & 2 deletions s2/cmd/s2c/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
faster = flag.Bool("faster", false, "Compress faster, but with a minor compression loss")
slower = flag.Bool("slower", false, "Compress more, but a lot slower")
snappy = flag.Bool("snappy", false, "Generate Snappy compatible output stream")
recomp = flag.Bool("recomp", false, "Recompress Snappy or S2 input")
cpu = flag.Int("cpu", runtime.GOMAXPROCS(0), "Compress using this amount of threads")
blockSize = flag.String("blocksize", "4M", "Max block size. Examples: 64K, 256K, 1M, 4M. Must be power of two and <= 4MB")
block = flag.Bool("block", false, "Compress as a single block. Will load content into memory.")
Expand Down Expand Up @@ -342,6 +343,9 @@ Options:`)
}
for _, filename := range files {
if *block {
if *recomp {
exitErr(errors.New("cannot recompress blocks (yet)"))
}
func() {
var closeOnce sync.Once
dstFilename := cleanFileName(fmt.Sprintf("%s%s", filename, ext))
Expand Down Expand Up @@ -429,20 +433,55 @@ Options:`)
}
func() {
var closeOnce sync.Once
dstFilename := cleanFileName(fmt.Sprintf("%s%s", filename, ext))
outFileBase := filename
if *recomp {
switch {
case strings.HasSuffix(outFileBase, s2Ext):
outFileBase = strings.TrimSuffix(outFileBase, s2Ext)
case strings.HasSuffix(outFileBase, snappyExt):
outFileBase = strings.TrimSuffix(outFileBase, snappyExt)
case strings.HasSuffix(outFileBase, ".snappy"):
outFileBase = strings.TrimSuffix(outFileBase, ".snappy")
}
}
dstFilename := cleanFileName(fmt.Sprintf("%s%s", outFileBase, ext))
if *out != "" {
dstFilename = *out
}
if !*quiet {
fmt.Print("Compressing ", filename, " -> ", dstFilename)
}

if dstFilename == filename && !*stdout {
if *remove {
exitErr(errors.New("cannot remove when input = output"))
}
renameDst := dstFilename
dstFilename = fmt.Sprintf(".tmp-%s%s", time.Now().Format("2006-01-02T15-04-05Z07"), ext)
defer func() {
exitErr(os.Rename(dstFilename, renameDst))
}()
}

// Input file.
file, _, mode := openFile(filename)
exitErr(err)
defer closeOnce.Do(func() { file.Close() })
src, err := readahead.NewReaderSize(file, *cpu+1, 1<<20)
exitErr(err)
defer src.Close()
var rc = &rCounter{
in: src,
}
if !*quiet {
// We only need to count for printing
src = rc
}
if *recomp {
dec := s2.NewReader(src)
src = ioutil.NopCloser(dec)
}

var out io.Writer
switch {
case *stdout:
Expand All @@ -466,11 +505,12 @@ Options:`)
wr.Reset(&wc)
defer wr.Close()
start := time.Now()
input, err := wr.ReadFrom(src)
_, err = wr.ReadFrom(src)
exitErr(err)
err = wr.Close()
exitErr(err)
if !*quiet {
input := rc.n
elapsed := time.Since(start)
mbpersec := (float64(input) / (1024 * 1024)) / (float64(elapsed) / (float64(time.Second)))
pct := float64(wc.n) * 100 / float64(input)
Expand Down Expand Up @@ -608,3 +648,18 @@ func (w *wCounter) Write(p []byte) (n int, err error) {
return n, err

}

type rCounter struct {
n int64
in io.Reader
}

func (w *rCounter) Read(p []byte) (n int, err error) {
n, err = w.in.Read(p)
w.n += int64(n)
return n, err
}

func (w *rCounter) Close() (err error) {
return nil
}

0 comments on commit 9ca8064

Please sign in to comment.