Skip to content

Commit

Permalink
Close read connection after write (#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyprime committed Feb 10, 2022
1 parent 67daea2 commit cd86d87
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ class DSWriter(config: WriteConfig, uniqueId: String, pipeFactory: VerticaPipeFa
}

def commitRows(): ConnectorResult[Unit] = {
pipe.commit()
val ret = pipe.commit()
// Ensure all connections are closed, including read connections used by the write operation
val _ = pipeFactory.closeJdbcLayers()
ret
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ trait VerticaPipeFactoryInterface {
def getReadPipe(config: ReadConfig): VerticaPipeInterface with VerticaPipeReadInterface

def getWritePipe(config: WriteConfig): VerticaPipeInterface with VerticaPipeWriteInterface

def closeJdbcLayers()
}

/**
Expand Down Expand Up @@ -82,4 +84,15 @@ object VerticaPipeFactory extends VerticaPipeFactoryInterface {
}
}

override def closeJdbcLayers(): Unit = {
readLayer match {
case Some(layer) => val _ = layer.close
case None =>
}
writeLayer match {
case Some(layer) => val _ = layer.close
case None =>
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ class DSWriterTest extends AnyFlatSpec with BeforeAndAfterAll with MockFactory {
(pipe.commit _).expects().returning(Right())
val pipeFactory = mock[VerticaPipeFactoryInterface]
(pipeFactory.getWritePipe _).expects(*).returning(pipe)
(pipeFactory.closeJdbcLayers _).expects().returning(())

val writer = new DSWriter(config, "unique-id", pipeFactory)
checkResult(writer.commitRows())
Expand Down

0 comments on commit cd86d87

Please sign in to comment.