Skip to content

Commit

Permalink
Implement new option to avoid Bash argument amount limit
Browse files Browse the repository at this point in the history
Also, include 'go.mod' file for a proper and modern local development
  • Loading branch information
lpla committed Jun 27, 2022
1 parent 1e48f2a commit 3f291bb
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 23 deletions.
88 changes: 65 additions & 23 deletions cmd/giashard/main.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package main

import (
"bufio"
"errors"
"strings"
"flag"
"fmt"
"github.com/paracrawl/giashard"
"log"
"os"
"github.com/paracrawl/giashard"
"strings"
)

var outdir string
var dirslist string
var shards uint
var batchsize int64
var fileslist string
Expand All @@ -20,22 +22,55 @@ var schema = []string{"url", "mime", "plain_text"}

func init() {
flag.StringVar(&outdir, "o", ".", "Output location")
flag.StringVar(&dirslist, "l", "", "Input file listing all input directories")
flag.StringVar(&fileslist, "f", "plain_text,url,mime", "Files to shard, separated by commas")
flag.UintVar(&shards, "n", 8, "Number of shards (2^n)")
flag.Int64Var(&batchsize, "b", 100, "Batch size in MB")
flag.StringVar(&domainList, "d", "", "Additional public suffix entries")
flag.Usage = func() {
fmt.Fprintf(flag.CommandLine.Output(), "Usage: %s [flags] input directories\n", os.Args[0])
_, err := fmt.Fprintf(flag.CommandLine.Output(), "Usage: %s [flags] input directories\n", os.Args[0])
if err != nil {
return
}
flag.PrintDefaults()
fmt.Fprintf(flag.CommandLine.Output(),
`Shards together the directories give on input. They are assumed to be in the
_, err = fmt.Fprintf(flag.CommandLine.Output(),
`Shards together the directories given on input. They are assumed to be in the
standard Paracrawl column storage format. The output is a tree of directories
of the form: outdir/shard/batch where shard is computed as a hash of the
significant part of the hostname in a url and batch is approximately fixed size.
`)
if err != nil {
return
}
}
}

func processfile(source string, schema []string, w *giashard.Shard, hostname string) {
log.Printf("Processing input: %v", source)
r, err := giashard.NewColumnReader(source, schema...)
if err != nil {
log.Printf("Error opening input reader: %v", err)
return
}

// provenance data - where is this from
provdata := []byte(fmt.Sprintf("%s:%s", hostname, source))
for row := range r.Rows() {
row["source"] = provdata
if err := w.WriteRow(row); err != nil {
if errors.Is(err, giashard.ShardError) { // not fatal
log.Print(err)
continue
}
log.Fatalf("Error writing row: %v", err)
}
}

err = r.Close()
if err != nil {
return
}
}
func main() {
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
flag.Parse()
Expand All @@ -50,40 +85,47 @@ func main() {
}
}

w, err := giashard.NewShard(outdir, shards, batchsize * 1024 * 1024, "url", append(schema, "source")...)
w, err := giashard.NewShard(outdir, shards, batchsize*1024*1024, "url", append(schema, "source")...)
if err != nil {
log.Fatalf("Error opening output shards: %v", err)
}
defer w.Close()
defer func(w *giashard.Shard) {
var err = w.Close()
if err != nil {

}
}(w)

hostname, err := os.Hostname()
if err != nil {
log.Fatalf("Error getting local hostname: %v", err)
}

for i:=0; i<flag.NArg(); i++ {
for i := 0; i < flag.NArg(); i++ {
source := flag.Arg(i)
processfile(source, schema, w, hostname)
}

log.Printf("Processing input: %v", source)
r, err := giashard.NewColumnReader(source, schema...)
if dirslist != "" {
file, err := os.Open(dirslist)
if err != nil {
log.Printf("Error opening input reader: %v", err)
continue
log.Fatal(err)
}
defer func(file *os.File) {
var err = file.Close()
if err != nil {

// provenance data - where is this from
provdata := []byte(fmt.Sprintf("%s:%s", hostname, source))
for row := range r.Rows() {
row["source"] = provdata
if err := w.WriteRow(row); err != nil {
if errors.Is(err, giashard.ShardError) { // not fatal
log.Print(err)
continue
}
log.Fatalf("Error writing row: %v", err)
}
}(file)

scanner := bufio.NewScanner(file)
for scanner.Scan() {
source := scanner.Text()
processfile(source, schema, w, hostname)
}

r.Close()
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
}
}
13 changes: 13 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module github.com/paracrawl/giashard

go 1.18

require (
github.com/weppos/publicsuffix-go v0.15.0
gopkg.in/yaml.v2 v2.4.0
)

require (
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 // indirect
golang.org/x/text v0.3.0 // indirect
)

0 comments on commit 3f291bb

Please sign in to comment.