Skip to content

Commit

Permalink
Refactored JDBC layer check (#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyprime committed Feb 10, 2022
1 parent 71b1e9c commit 881d329
Showing 1 changed file with 12 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ object VerticaPipeFactory extends VerticaPipeFactoryInterface {
private var readLayer: Option[VerticaJdbcLayer] = None
private var writeLayer: Option[VerticaJdbcLayer] = None

private def checkJdbcLayer(jdbcLayer: Option[VerticaJdbcLayer], jdbcConfig: JDBCConfig): Option[VerticaJdbcLayer] = {
jdbcLayer match {
case Some(layer) => if (layer.isClosed()) Some(new VerticaJdbcLayer(jdbcConfig)) else jdbcLayer
case None => Some(new VerticaJdbcLayer(jdbcConfig))
}
}

override def getReadPipe(config: ReadConfig): VerticaPipeInterface with VerticaPipeReadInterface = {
config match {
case cfg: DistributedFilesystemReadConfig =>
Expand All @@ -52,20 +59,9 @@ object VerticaPipeFactory extends VerticaPipeFactoryInterface {
}
case _ => None
})
val jdbcLayer = readLayer match {
case Some(layer) => {
if (layer.isClosed()) {
readLayer = Some(new VerticaJdbcLayer(cfg.jdbcConfig))
}
readLayer.get
}
case None => {
readLayer = Some(new VerticaJdbcLayer(cfg.jdbcConfig))
readLayer.get
}
}
readLayer = checkJdbcLayer(readLayer, cfg.jdbcConfig)
new VerticaDistributedFilesystemReadPipe(cfg, hadoopFileStoreLayer,
jdbcLayer,
readLayer.get,
new SchemaTools,
new CleanupUtils
)
Expand All @@ -76,23 +72,12 @@ object VerticaPipeFactory extends VerticaPipeFactoryInterface {
config match {
case cfg: DistributedFilesystemWriteConfig =>
val schemaTools = new SchemaTools
val jdbcLayer = writeLayer match {
case Some(layer) => {
if (layer.isClosed()) {
writeLayer = Some(new VerticaJdbcLayer(cfg.jdbcConfig))
}
writeLayer.get
}
case None => {
writeLayer = Some(new VerticaJdbcLayer(cfg.jdbcConfig))
writeLayer.get
}
}
writeLayer = checkJdbcLayer(writeLayer, cfg.jdbcConfig)
new VerticaDistributedFilesystemWritePipe(cfg,
new HadoopFileStoreLayer(cfg.fileStoreConfig, Some(cfg.schema)),
jdbcLayer,
writeLayer.get,
schemaTools,
new TableUtils(schemaTools, jdbcLayer)
new TableUtils(schemaTools, writeLayer.get)
)
}
}
Expand Down

0 comments on commit 881d329

Please sign in to comment.