From 3f291bb38ad54333b5bb065df4ba26ad76a612dc Mon Sep 17 00:00:00 2001 From: Leopoldo Pla Date: Mon, 27 Jun 2022 14:03:44 +0200 Subject: [PATCH] Implement new option to avoid Bash argument amount limit Also, include 'go.mod' file for a proper and modern local development --- cmd/giashard/main.go | 88 ++++++++++++++++++++++++++++++++------------ go.mod | 13 +++++++ 2 files changed, 78 insertions(+), 23 deletions(-) create mode 100644 go.mod diff --git a/cmd/giashard/main.go b/cmd/giashard/main.go index 3e5cfa0..895cb6d 100644 --- a/cmd/giashard/main.go +++ b/cmd/giashard/main.go @@ -1,16 +1,18 @@ package main import ( + "bufio" "errors" - "strings" "flag" "fmt" + "github.com/paracrawl/giashard" "log" "os" - "github.com/paracrawl/giashard" + "strings" ) var outdir string +var dirslist string var shards uint var batchsize int64 var fileslist string @@ -20,22 +22,55 @@ var schema = []string{"url", "mime", "plain_text"} func init() { flag.StringVar(&outdir, "o", ".", "Output location") + flag.StringVar(&dirslist, "l", "", "Input file listing all input directories") flag.StringVar(&fileslist, "f", "plain_text,url,mime", "Files to shard, separated by commas") flag.UintVar(&shards, "n", 8, "Number of shards (2^n)") flag.Int64Var(&batchsize, "b", 100, "Batch size in MB") flag.StringVar(&domainList, "d", "", "Additional public suffix entries") flag.Usage = func() { - fmt.Fprintf(flag.CommandLine.Output(), "Usage: %s [flags] input directories\n", os.Args[0]) + _, err := fmt.Fprintf(flag.CommandLine.Output(), "Usage: %s [flags] input directories\n", os.Args[0]) + if err != nil { + return + } flag.PrintDefaults() - fmt.Fprintf(flag.CommandLine.Output(), -`Shards together the directories give on input. They are assumed to be in the + _, err = fmt.Fprintf(flag.CommandLine.Output(), + `Shards together the directories given on input. They are assumed to be in the standard Paracrawl column storage format. The output is a tree of directories of the form: outdir/shard/batch where shard is computed as a hash of the significant part of the hostname in a url and batch is approximately fixed size. `) + if err != nil { + return + } } } +func processfile(source string, schema []string, w *giashard.Shard, hostname string) { + log.Printf("Processing input: %v", source) + r, err := giashard.NewColumnReader(source, schema...) + if err != nil { + log.Printf("Error opening input reader: %v", err) + return + } + + // provenance data - where is this from + provdata := []byte(fmt.Sprintf("%s:%s", hostname, source)) + for row := range r.Rows() { + row["source"] = provdata + if err := w.WriteRow(row); err != nil { + if errors.Is(err, giashard.ShardError) { // not fatal + log.Print(err) + continue + } + log.Fatalf("Error writing row: %v", err) + } + } + + err = r.Close() + if err != nil { + return + } +} func main() { log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) flag.Parse() @@ -50,40 +85,47 @@ func main() { } } - w, err := giashard.NewShard(outdir, shards, batchsize * 1024 * 1024, "url", append(schema, "source")...) + w, err := giashard.NewShard(outdir, shards, batchsize*1024*1024, "url", append(schema, "source")...) if err != nil { log.Fatalf("Error opening output shards: %v", err) } - defer w.Close() + defer func(w *giashard.Shard) { + var err = w.Close() + if err != nil { + + } + }(w) hostname, err := os.Hostname() if err != nil { log.Fatalf("Error getting local hostname: %v", err) } - for i:=0; i