Skip to content

Commit

Permalink
add arguments: file is boolean, path is string (#113)
Browse files Browse the repository at this point in the history
* add arguments for PARQUET, ORC, JSON, CSV, in order to support a parameterized file path.

* if -p is empty, throw an exception

* rename variables: `file` to `variable` and `path` to `param`, and format code

* add a test case for the variable path
  • Loading branch information
Codelone authored Feb 6, 2023
1 parent bb1389f commit 2d712d9
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,6 @@ final case class Argument(config: String = "application.conf",
hive: Boolean = false,
directly: Boolean = false,
dry: Boolean = false,
reload: String = "")
reload: String = "",
variable: Boolean = false,
param: String = "")
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,16 @@ object Configs {
* @param configPath
* @return
*/
def parse(configPath: String): Configs = {
def parse(configPath: String, variable: Boolean = false, param: String = ""): Configs = {
var config: Config = null
var paths: Map[String,String] = null
if (variable) {
if (param.isEmpty) throw new IllegalArgumentException(s"-p must to set ")
paths = param.split(",").map(path => {
val kv = path.split("=")
(kv(0), kv(1))
}).toMap
}
if (configPath.startsWith("hdfs://")) {
val hadoopConfig: Configuration = new Configuration()
val fs: FileSystem = org.apache.hadoop.fs.FileSystem.get(hadoopConfig)
Expand Down Expand Up @@ -393,7 +401,7 @@ object Configs {
}

val sourceCategory = toSourceCategory(tagConfig.getString("type.source"))
val sourceConfig = dataSourceConfig(sourceCategory, tagConfig, nebulaConfig)
val sourceConfig = dataSourceConfig(sourceCategory, tagConfig, nebulaConfig, variable, paths)
LOG.info(s"Source Config ${sourceConfig}")
hasKafka = sourceCategory == SourceCategory.KAFKA

Expand Down Expand Up @@ -457,7 +465,7 @@ object Configs {
edgeConfig.hasPath("longitude")

val sourceCategory = toSourceCategory(edgeConfig.getString("type.source"))
val sourceConfig = dataSourceConfig(sourceCategory, edgeConfig, nebulaConfig)
val sourceConfig = dataSourceConfig(sourceCategory, edgeConfig, nebulaConfig, variable, paths)
LOG.info(s"Source Config ${sourceConfig}")
hasKafka = sourceCategory == SourceCategory.KAFKA

Expand Down Expand Up @@ -616,14 +624,19 @@ object Configs {
*/
private[this] def dataSourceConfig(category: SourceCategory.Value,
config: Config,
nebulaConfig: Config): DataSourceConfigEntry = {
nebulaConfig: Config,
variable: Boolean,
paths: Map[String,String]): DataSourceConfigEntry = {
category match {
case SourceCategory.PARQUET =>
FileBaseSourceConfigEntry(SourceCategory.PARQUET, config.getString("path"))
if (variable) FileBaseSourceConfigEntry(SourceCategory.PARQUET, paths(config.getString("path")))
else FileBaseSourceConfigEntry(SourceCategory.PARQUET, config.getString("path"))
case SourceCategory.ORC =>
FileBaseSourceConfigEntry(SourceCategory.ORC, config.getString("path"))
if (variable) FileBaseSourceConfigEntry(SourceCategory.ORC, paths(config.getString("path")))
else FileBaseSourceConfigEntry(SourceCategory.ORC, config.getString("path"))
case SourceCategory.JSON =>
FileBaseSourceConfigEntry(SourceCategory.JSON, config.getString("path"))
if (variable) FileBaseSourceConfigEntry(SourceCategory.JSON, paths(config.getString("path")))
else FileBaseSourceConfigEntry(SourceCategory.JSON, config.getString("path"))
case SourceCategory.CSV =>
val separator =
if (config.hasPath("separator"))
Expand All @@ -634,10 +647,16 @@ object Configs {
config.getBoolean("header")
else
false
FileBaseSourceConfigEntry(SourceCategory.CSV,
config.getString("path"),
Some(separator),
Some(header))
if (variable)
FileBaseSourceConfigEntry(SourceCategory.CSV,
paths(config.getString("path")),
Some(separator),
Some(header))
else
FileBaseSourceConfigEntry(SourceCategory.CSV,
config.getString("path"),
Some(separator),
Some(header))
case SourceCategory.HIVE =>
HiveSourceConfigEntry(SourceCategory.HIVE, config.getString("exec"))
case SourceCategory.NEO4J =>
Expand Down Expand Up @@ -968,6 +987,16 @@ object Configs {
.valueName("<path>")
.action((x, c) => c.copy(reload = x))
.text("reload path")

opt[Unit]('v', "variable")
.action((_, c) => c.copy(variable = true))
.text("enable file param")

opt[String]('p', "param")
.valueName("<param>")
.action((x, c) => c.copy(param = x))
.text("file param path")

}
parser.parse(args, Argument())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,66 @@ class ConfigsSuite {
}
}


@Test
def configsWithVariableSuite(): Unit = {
  // Simulates a CLI invocation with variable substitution enabled (-v) and a
  // comma-separated key=value mapping of path variables (-p).
  val cliArgs = Array(
    "-c",
    "src/test/resources/application.conf",
    "-v",
    "-p",
    "path0=/app/test1.parquet,path1=/app/test2.csv,path2=/app/test2.json,path3=/app/test3.json")

  // Parsing must succeed; bail out of the JVM otherwise (mirrors the CLI's behavior).
  val parsed: Argument = Configs.parser(cliArgs, "test") match {
    case Some(argument) => argument
    case _ =>
      assert(false)
      sys.exit(-1)
  }
  assert(parsed.variable)

  val configs = Configs.parse(parsed.config, parsed.variable, parsed.param)

  // Every file-based tag source must have had its path variable resolved to the
  // concrete path supplied via -p.
  for (tagConfig <- configs.tagsConfig) {
    tagConfig.dataSourceConfigEntry.category match {
      case SourceCategory.CSV =>
        val entry = tagConfig.dataSourceConfigEntry.asInstanceOf[FileBaseSourceConfigEntry]
        assert(entry.path.equals("/app/test2.csv"))
      case SourceCategory.JSON =>
        val entry = tagConfig.dataSourceConfigEntry.asInstanceOf[FileDataSourceConfigEntry]
        assert(entry.path.equals("/app/test3.json"))
      case SourceCategory.PARQUET =>
        val entry = tagConfig.dataSourceConfigEntry.asInstanceOf[FileDataSourceConfigEntry]
        assert(entry.path.equals("/app/test1.parquet"))
      case _ => // non-file sources are not affected by path variables
    }
  }

  // Same resolution check for edge sources; additionally, every edge sink must be
  // one of the two supported categories.
  for (edgeConfig <- configs.edgesConfig) {
    val sinkCategory = edgeConfig.dataSinkConfigEntry.category
    assert(sinkCategory == SinkCategory.CLIENT || sinkCategory == SinkCategory.SST)

    edgeConfig.dataSourceConfigEntry.category match {
      case SourceCategory.CSV =>
        val entry = edgeConfig.dataSourceConfigEntry.asInstanceOf[FileBaseSourceConfigEntry]
        assert(entry.path.equals("/app/test2.csv"))
      case SourceCategory.JSON =>
        val entry = edgeConfig.dataSourceConfigEntry.asInstanceOf[FileDataSourceConfigEntry]
        assert(entry.path.equals("/app/test2.json"))
      case SourceCategory.PARQUET =>
        val entry = edgeConfig.dataSourceConfigEntry.asInstanceOf[FileDataSourceConfigEntry]
        assert(entry.path.equals("/app/test1.parquet"))
      case _ => // non-file sources are not affected by path variables
    }
  }
}
/**
* correct com.vesoft.exchange.common.config
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object Exchange {
sys.exit(-1)
}

val configs = Configs.parse(c.config)
val configs = Configs.parse(c.config, c.variable, c.param)
LOG.info(s"Config ${configs}")

val session = SparkSession
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object Exchange {
sys.exit(-1)
}

val configs = Configs.parse(c.config)
val configs = Configs.parse(c.config, c.variable, c.param)
LOG.info(s"Config ${configs}")

val session = SparkSession
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object Exchange {
sys.exit(-1)
}

val configs = Configs.parse(c.config)
val configs = Configs.parse(c.config, c.variable, c.param)
LOG.info(s"Config ${configs}")

val session = SparkSession
Expand Down

0 comments on commit 2d712d9

Please sign in to comment.