Skip to content

Add monitor mode #145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 23, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 109 additions & 19 deletions PublicGitArchive/pga2uast/pga2uast.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io/ioutil"
"log"
"os"
"os/exec"
"path"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -37,7 +38,7 @@ import (

func parseFlags() (
inputDirectory, bblfshEndpoint, outputDirectory, outputFormat string,
languages map[string]struct{}) {
languages map[string]struct{}, workers int, monitor bool) {

var langsList []string
pflag.StringVarP(&outputDirectory, "output", "o", "uast", "Output directory where to save the results.")
Expand All @@ -47,6 +48,9 @@ func parseFlags() (
"value \"all\" disables any filtering. Example: --languages=c++,python")
pflag.StringVarP(&outputFormat, "format", "f", "zip", "Output format: choose one of zip, parquet.")
pflag.StringVarP(&bblfshEndpoint, "bblfsh", "b", "0.0.0.0:9432", "Babelfish server address.")
pflag.IntVarP(&workers, "workers", "n", runtime.NumCPU()*2, "Number of goroutines to parse UASTs.")
pflag.BoolVarP(&monitor, "monitor", "m", false, "Activate the advanced detection of \"bad\" " +
"repositories and automatic restart on failures.")
pflag.Parse()
if pflag.NArg() != 1 {
log.Fatalf("usage: pga2uast /path/to/directory/with/siva")
Expand Down Expand Up @@ -161,19 +165,45 @@ func decompressBytes(buffer []byte) []byte {
return output
}

// parseTask is a unit of work sent over the parseTasks channel to the parsing
// goroutines started by processRepository.
type parseTask struct {
// FileName is the key under which the parsed result is stored in HeadUasts.
FileName string
// FullPath is the path handed to parseFile; the producer builds it as
// "<repository id>/<head name>/<file name>".
FullPath string
// CompressedContents holds the file contents after compressBytes; the worker
// runs decompressBytes on them right before parsing.
CompressedContents []byte
// HeadUasts is the per-head result map that the worker writes the compressed
// UAST into. Writes are done while holding the lock shared by the workers.
HeadUasts map[string][]byte
}

// BlacklistFileName is the path of the text file that lists repository IDs to
// skip, one per line. It is a relative path, so it resolves against the
// current working directory. processRepository reads it and becomeMonitor
// appends to it.
const BlacklistFileName = "blacklist.txt"

// getCurrentTaskFilePath returns the location of the marker file that holds
// the ID of the repository currently being processed. The file lives in the
// system temporary directory.
func getCurrentTaskFilePath() string {
	const taskFileName = "pga2uast-current-task.txt"
	tmpDir := os.TempDir()
	return filepath.Join(tmpDir, taskFileName)
}

func processRepository(
r borges.Repository, bblfshEndpoint, outputDirectory, outputFormat string, languages map[string]struct{},
bar *progress.ProgressBar, filesProcessed *int) (elapsed time.Duration) {
workers int, bar *progress.ProgressBar, filesProcessed *int) (elapsed time.Duration) {

startTime := time.Now()
defer func() {
elapsed = time.Now().Sub(startTime)
}()
defer bar.Increment()
rid := r.ID().String()

if blacklist, err := ioutil.ReadFile(BlacklistFileName); err != nil {
for _, black := range strings.Split(string(blacklist), "\n") {
if black == rid {
log.Printf("skipped %s because it is blacklisted", rid)
return
}
}
}

if _, err := os.Stat(getOutputFileName(outputDirectory, rid, outputFormat)); err == nil {
return
}
if err := ioutil.WriteFile(getCurrentTaskFilePath(), []byte(rid), 0666); err != nil {
log.Fatalf("cannot write %s: %v", getCurrentTaskFilePath(), err)
}
heads, names, err := listHeads(r.R())
if len(heads) == 0 {
log.Printf("%s: no heads: %v", rid, err)
Expand All @@ -182,6 +212,29 @@ func processRepository(
wg := sync.WaitGroup{}
uasts := map[string]map[string][]byte{}
headLock := sync.Mutex{}
parseTasks := make(chan parseTask, workers*2)
for i:=0; i<workers; i++ {
go func() {
client, err := bblfsh.NewClient(bblfshEndpoint)
if err != nil {
log.Panicf("cannot initialize the Bablefish client on %s: %v", bblfshEndpoint, err)
}
defer client.Close()
for {
task, more := <-parseTasks
if !more {
break
}
uast, err := parseFile(client, task.FullPath, decompressBytes(task.CompressedContents))
if err == nil {
headLock.Lock()
task.HeadUasts[task.FileName] = compressBytes(uast)
headLock.Unlock()
}
wg.Done()
}
}()
}
for headIndex, head := range heads {
headUasts := map[string][]byte{}
uasts[names[headIndex]] = headUasts
Expand Down Expand Up @@ -216,15 +269,12 @@ func processRepository(
}
}
wg.Add(1)
go func(fileName string, contents []byte, headUasts map[string][]byte) {
defer wg.Done()
uast, err := parseFile(bblfshEndpoint, fileName, decompressBytes(contents))
if err == nil {
headLock.Lock()
headUasts[file.Name] = compressBytes(uast)
headLock.Unlock()
}
}(fmt.Sprintf("%s/%s/%s", rid, names[headIndex], file.Name), compressBytes(contents), headUasts)
parseTasks <- parseTask{
FileName: file.Name,
FullPath: fmt.Sprintf("%s/%s/%s", rid, names[headIndex], file.Name),
CompressedContents: compressBytes(contents),
HeadUasts: headUasts,
}
*filesProcessed++
bar.Postfix(fmt.Sprintf(" %s/%s %d", rid, names[headIndex], *filesProcessed))
return nil
Expand All @@ -233,6 +283,7 @@ func processRepository(
log.Printf("%s: failed to iter files in %s: %v", rid, head.String(), err)
}
}
close(parseTasks)
wg.Wait()
if err := writeOutput(rid, uasts, outputDirectory, outputFormat); err != nil {
log.Fatalf("failed to write the results for %s: %v", rid, err)
Expand All @@ -257,12 +308,7 @@ func printElapsedTimes(times map[string]time.Duration) {
}
}

func parseFile(endpoint string, path string, contents []byte) ([]byte, error) {
client, err := bblfsh.NewClient(endpoint)
if err != nil {
return nil, err
}
defer client.Close()
func parseFile(client *bblfsh.Client, path string, contents []byte) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()
request := client.NewParseRequest().
Expand Down Expand Up @@ -371,8 +417,52 @@ func writeParquet(uasts map[string]map[string][]byte, file *os.File) (err error)
return
}

// SlaveEnvVar is the name of the environment variable that launchSlave sets
// to "1" in the environment of every child process it starts.
const SlaveEnvVar = "pga2uast-slave"

// launchSlave starts a child copy of the current executable with the same
// command line minus the monitor flag, and with SlaveEnvVar set to "1" in its
// environment. The child shares stdin, stdout and stderr with the parent.
//
// The returned command has already been started; the caller is expected to
// Wait on it. The process is terminated via log.Fatalf if the child cannot be
// started.
func launchSlave() *exec.Cmd {
	cmd := exec.Command(os.Args[0])
	cmd.Env = append(os.Environ(), SlaveEnvVar+"=1")
	for _, arg := range os.Args[1:] {
		// The child must not become a monitor itself, otherwise every
		// child would spawn yet another child and never do any work.
		if isMonitorFlag(arg) {
			continue
		}
		cmd.Args = append(cmd.Args, arg)
	}
	log.Printf("running %s", strings.Join(cmd.Args, " "))
	cmd.Stdin = os.Stdin
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if err := cmd.Start(); err != nil {
		log.Fatalf("Failed to start the slave process: %v", err)
	}
	return cmd
}

// isMonitorFlag reports whether arg is a stand-alone spelling of the monitor
// flag: "-m", "--monitor", or either of them with an explicit "=value".
// NOTE(review): a shorthand cluster that merely contains "m" (e.g. "-mf") is
// not recognised and is passed through unchanged.
func isMonitorFlag(arg string) bool {
	switch {
	case arg == "-m", arg == "--monitor":
		return true
	case strings.HasPrefix(arg, "-m="), strings.HasPrefix(arg, "--monitor="):
		return true
	}
	return false
}

// becomeMonitor runs the actual work in child processes and restarts them on
// failure. After each failed child it reads the ID stored in the current-task
// file, appends that ID to the blacklist file, and launches a new child. It
// returns once a child exits successfully.
//
// The process is terminated via log.Fatalf if the current-task file cannot be
// read or the blacklist cannot be written.
func becomeMonitor() {
	for {
		if err := launchSlave().Wait(); err == nil {
			break
		}
		task, err := ioutil.ReadFile(getCurrentTaskFilePath())
		if err != nil {
			log.Fatalf("cannot read %s: %v", getCurrentTaskFilePath(), err)
		}
		log.Printf("blacklisting %s", string(task))
		// The read error is deliberately ignored: the blacklist does not
		// exist until the first failure, and a nil slice is a valid
		// starting point for append.
		blacklist, _ := ioutil.ReadFile(BlacklistFileName)
		// The blacklist is consumed by splitting on "\n", so every entry
		// must sit on its own line. Without the terminators consecutive
		// IDs would fuse into one string that never matches anything.
		if n := len(blacklist); n > 0 && blacklist[n-1] != '\n' {
			blacklist = append(blacklist, '\n')
		}
		blacklist = append(blacklist, task...)
		blacklist = append(blacklist, '\n')
		if err = ioutil.WriteFile(BlacklistFileName, blacklist, 0666); err != nil {
			log.Fatalf("cannot write %s: %v", BlacklistFileName, err)
		}
	}
}

func main() {
inputDirectory, bblfshEndpoint, outputDirectory, outputFormat, languages := parseFlags()
inputDirectory, bblfshEndpoint, outputDirectory, outputFormat, languages, workers, monitor := parseFlags()
if monitor {
becomeMonitor()
return
}
fs := osfs.New(inputDirectory)
lib, err := legacysiva.NewLibrary("pga2siva", fs, &legacysiva.LibraryOptions{})
if err != nil {
Expand All @@ -390,7 +480,7 @@ func main() {
defer printElapsedTimes(times)
err = repos.ForEach(func(r borges.Repository) error {
times[r.ID().String()] = processRepository(
r, bblfshEndpoint, outputDirectory, outputFormat, languages, bar, &filesProcessed)
r, bblfshEndpoint, outputDirectory, outputFormat, languages, workers, bar, &filesProcessed)
return nil
})
if err != nil {
Expand Down