Commit 37e2e01
Hash: src repo listing using Engine
Although slower, this fixes the user experience, i.e. src-d#138.

The only possible workaround without Engine, using
`org.apache.hadoop.fs.FileSystem#globStatus(org.apache.hadoop.fs.Path)`,
did not work: the `"$path/*"` glob there does not match recursively, so

```
fs.globStatus(new Path(s"$path/*"))
  .flatMap(file => if (file.getPath.getName.endsWith(".siva")) {
    Some(file.getPath)
  } else {
    None
  })
```

would not support bucketed repos (e.g. a `.siva` file nested one level deeper,
like `$path/bucket/repo.siva`), and thus we would need to mimic
`org.apache.hadoop.mapreduce.lib.input.FileInputFormat#listStatus` manually.
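
For reference, a minimal sketch of what that manual mimicking could look like, assuming plain Hadoop `FileSystem` calls (`listSivaRecursive` is a hypothetical name, not code from this repo):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical sketch: walk the whole tree under `p` and keep every .siva
// file, so bucketed layouts like $path/bucket/repo.siva are found too.
def listSivaRecursive(fs: FileSystem, p: Path): Array[Path] =
  fs.listStatus(p).flatMap { status =>
    if (status.isDirectory) {
      listSivaRecursive(fs, status.getPath) // descend into buckets
    } else if (status.getPath.getName.endsWith(".siva")) {
      Array(status.getPath)
    } else {
      Array.empty[Path]
    }
  }
```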

Signed-off-by: Alexander Bezzubov <bzz@apache.org>
bzz committed Jun 26, 2018
1 parent a1c7589 commit 37e2e01
Showing 2 changed files with 30 additions and 26 deletions.
src/main/scala/tech/sourced/gemini/cmd/HashSparkApp.scala (6 additions, 18 deletions)
```diff
@@ -4,10 +4,11 @@ import java.net.URI
 
 import com.datastax.spark.connector.cql.CassandraConnector
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.log4j.{Level, LogManager}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
+import tech.sourced.engine.provider.RepositoryRDDProvider
 import tech.sourced.gemini.Gemini
 
 import scala.util.Properties
@@ -95,7 +96,7 @@ object HashSparkApp extends App with Logging {
     .getOrCreate()
 
   val reposPath = config.reposPath
-  val repos = listRepositories(reposPath, config.format, spark.sparkContext.hadoopConfiguration, config.limit)
+  val repos = listRepositories(reposPath, config.format, spark, config.limit)
   printRepositories(reposPath, repos)
 
   val gemini = Gemini(spark, log, config.keyspace)
@@ -111,7 +112,7 @@
     System.exit(2)
   }
 
-  private def printRepositories(reposPath: String, repos: Array[Path]): Unit = {
+  private def printRepositories(reposPath: String, repos: Array[String]): Unit = {
     val numToPrint = Math.min(repos.length, printLimit)
     println(s"Hashing ${repos.length} repositories in: '$reposPath' " +
       s"${if (numToPrint < repos.length) s"(only $numToPrint shown)"}")
@@ -120,22 +121,9 @@
       .foreach(repo => println(s"\t$repo"))
   }
 
-  private def listRepositories(path: String, format: String, conf: Configuration, limit: Int): Array[Path] = {
-    val fs = FileSystem.get(new URI(path), conf)
-    val p = new Path(path)
-
-    val paths = format match {
-      case "siva" => findSivaRecursive(p, fs)
-      case _ => fs.listStatus(p).filter(_.isDirectory).map(_.getPath)
-    }
-
+  def listRepositories(path: String, format: String, ss: SparkSession, limit: Int = 0): Array[String] = {
+    val paths = RepositoryRDDProvider(ss.sparkContext).get(path, format).map(_.root).collect()
     if (limit <= 0) paths else paths.take(limit)
   }
 
-  // we don't filter files by extension here, because engine doesn't do it too
-  // it will try to process any file
-  private def findSivaRecursive(p: Path, fs: FileSystem): Array[Path] =
-    fs.listStatus(p).flatMap{ file =>
-      if (file.isDirectory) findSivaRecursive(file.getPath, fs) else Array(file.getPath)
-    }
 }
```
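
For context, a self-contained sketch of driving the new Engine-backed listing, assuming a local Spark session with the source{d} Engine on the classpath (the fixture path is illustrative):

```scala
import org.apache.spark.sql.SparkSession
import tech.sourced.engine.provider.RepositoryRDDProvider

// Sketch: RepositoryRDDProvider enumerates repository roots the same way
// the hashing job later reads them, so listing and processing agree.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("list-repos")
  .getOrCreate()

val roots: Array[String] = RepositoryRDDProvider(spark.sparkContext)
  .get("src/test/resources/siva", "siva")
  .map(_.root)
  .collect()

roots.foreach(println)
```

Delegating the listing to `RepositoryRDDProvider` trades speed for consistency: the app now reports exactly the repositories Engine will hash.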
src/test/scala/tech/sourced/gemini/SparkHashSpec.scala (24 additions, 8 deletions)
```diff
@@ -5,6 +5,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.functions._
 import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 import org.slf4j.{Logger => Slf4jLogger}
+import tech.sourced.gemini.cmd.HashSparkApp
 
 @tags.Bblfsh
 @tags.FeatureExtractor
@@ -25,7 +26,6 @@ class SparkHashSpec extends FlatSpec
     "archiver.go",
     "archiver_test.go"
   )
-  var hashResult: HashResult = _
 
   // don't process content of repos to speedup tests
   class LimitedHash(s: SparkSession, log: Slf4jLogger) extends Hash(s, log) {
@@ -36,16 +36,15 @@
     def apply(s: SparkSession, log: Slf4jLogger): LimitedHash = new LimitedHash(s, log)
   }
 
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-
+  def hashWithNewGemini(): HashResult = {
     val gemini = Gemini(sparkSession)
     val hash = LimitedHash(sparkSession, log)
     val repos = gemini.getRepos("src/test/resources/siva/duplicate-files")
-    hashResult = hash.forRepos(repos)
+    hash.forRepos(repos)
   }
 
   "Hash" should "return correct files" in {
+    val hashResult = hashWithNewGemini()
     val files = hashResult.files
     // num of files * num of repos
     files.count() shouldEqual 6
@@ -58,7 +57,7 @@
   }
 
   "Hash" should "calculate hashes" in {
-    val hashes = hashResult.hashes
+    val hashes = hashWithNewGemini().hashes
 
     // num of not-ignored files * num of repos
     hashes.count() shouldEqual 4
@@ -68,11 +67,11 @@
   }
 
   "Hash" should "generate docFreq" in {
-    val docFreq = hashResult.docFreq
+    val docFreq = hashWithNewGemini().docFreq
     // num of processed files * 2 repo
     docFreq.docs shouldEqual 4
     docFreq.tokens.size shouldEqual 738
-    docFreq.df(docFreq.tokens(0)) shouldEqual 3
+    docFreq.df(docFreq.tokens.head) shouldEqual 3
   }
 
   "Hash with limit" should "collect files only from limit repos" in {
@@ -81,4 +80,21 @@
     repos should be(1)
   }
 
+  ".siva files listing" should "include only files reachable by Engine" in {
+    val path = "src/test/resources/siva"
+    val engine = tech.sourced.engine.Engine(sparkSession, path, "siva")
+    val repos = engine.getRepositories.select("repository_path").distinct
+    println(s"Expected: \n\t${repos.collect.mkString("\n\t")}")
+
+    val reposPaths = HashSparkApp.listRepositories(path, "siva", sparkSession)
+    println(s"Actual: \n\t${reposPaths.mkString("\n\t")}")
+
+    reposPaths.size shouldEqual repos.count
+  }
+
+  ".siva files listing" should "not include unreachable .siva files" in {
+    val repos = HashSparkApp.listRepositories("src/test", "siva", sparkSession)
+    repos.length shouldBe 0
+  }
+
 }
```
