From 9ca80648bbe7e6cce76f4f2ad6a7ccf48b19359a Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Wed, 1 Jun 2022 05:50:40 -0700 Subject: [PATCH] s2c: Add Snappy/S2 stream recompression (#611) Allows easy recompression from the command line. --- s2/cmd/s2c/main.go | 59 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/s2/cmd/s2c/main.go b/s2/cmd/s2c/main.go index 9d4af2a1ee..4292d62783 100644 --- a/s2/cmd/s2c/main.go +++ b/s2/cmd/s2c/main.go @@ -31,6 +31,7 @@ var ( faster = flag.Bool("faster", false, "Compress faster, but with a minor compression loss") slower = flag.Bool("slower", false, "Compress more, but a lot slower") snappy = flag.Bool("snappy", false, "Generate Snappy compatible output stream") + recomp = flag.Bool("recomp", false, "Recompress Snappy or S2 input") cpu = flag.Int("cpu", runtime.GOMAXPROCS(0), "Compress using this amount of threads") blockSize = flag.String("blocksize", "4M", "Max block size. Examples: 64K, 256K, 1M, 4M. Must be power of two and <= 4MB") block = flag.Bool("block", false, "Compress as a single block. 
Will load content into memory.") @@ -342,6 +343,9 @@ Options:`) } for _, filename := range files { if *block { + if *recomp { + exitErr(errors.New("cannot recompress blocks (yet)")) + } func() { var closeOnce sync.Once dstFilename := cleanFileName(fmt.Sprintf("%s%s", filename, ext)) @@ -429,13 +433,36 @@ Options:`) } func() { var closeOnce sync.Once - dstFilename := cleanFileName(fmt.Sprintf("%s%s", filename, ext)) + outFileBase := filename + if *recomp { + switch { + case strings.HasSuffix(outFileBase, s2Ext): + outFileBase = strings.TrimSuffix(outFileBase, s2Ext) + case strings.HasSuffix(outFileBase, snappyExt): + outFileBase = strings.TrimSuffix(outFileBase, snappyExt) + case strings.HasSuffix(outFileBase, ".snappy"): + outFileBase = strings.TrimSuffix(outFileBase, ".snappy") + } + } + dstFilename := cleanFileName(fmt.Sprintf("%s%s", outFileBase, ext)) if *out != "" { dstFilename = *out } if !*quiet { fmt.Print("Compressing ", filename, " -> ", dstFilename) } + + if dstFilename == filename && !*stdout { + if *remove { + exitErr(errors.New("cannot remove when input = output")) + } + renameDst := dstFilename + dstFilename = fmt.Sprintf(".tmp-%s%s", time.Now().Format("2006-01-02T15-04-05Z07"), ext) + defer func() { + exitErr(os.Rename(dstFilename, renameDst)) + }() + } + // Input file. 
file, _, mode := openFile(filename) exitErr(err) @@ -443,6 +470,18 @@ Options:`) src, err := readahead.NewReaderSize(file, *cpu+1, 1<<20) exitErr(err) defer src.Close() + var rc = &rCounter{ + in: src, + } + if !*quiet { + // We only need to count for printing + src = rc + } + if *recomp { + dec := s2.NewReader(src) + src = ioutil.NopCloser(dec) + } + var out io.Writer switch { case *stdout: @@ -466,11 +505,12 @@ Options:`) wr.Reset(&wc) defer wr.Close() start := time.Now() - input, err := wr.ReadFrom(src) + _, err = wr.ReadFrom(src) exitErr(err) err = wr.Close() exitErr(err) if !*quiet { + input := rc.n elapsed := time.Since(start) mbpersec := (float64(input) / (1024 * 1024)) / (float64(elapsed) / (float64(time.Second))) pct := float64(wc.n) * 100 / float64(input) @@ -608,3 +648,18 @@ func (w *wCounter) Write(p []byte) (n int, err error) { return n, err } + +type rCounter struct { + n int64 + in io.Reader +} + +func (w *rCounter) Read(p []byte) (n int, err error) { + n, err = w.in.Read(p) + w.n += int64(n) + return n, err +} + +func (w *rCounter) Close() (err error) { + return nil +}