Skip to content
This repository has been archived by the owner on Oct 5, 2023. It is now read-only.

Commit

Permalink
Feature/add acquire host list interval support (#16)
Browse files Browse the repository at this point in the history
* support for acquireHostListInterval

* update version
  • Loading branch information
BCaxelbecker authored and hkernbach committed Jul 19, 2019
1 parent c9c9c27 commit 939ae89
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 6 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.arangodb</groupId>
<artifactId>arangodb-spark-connector_2.12</artifactId>
<version>1.0.10-SNAPSHOT</version>
<version>1.0.11-SNAPSHOT</version>
<inceptionYear>2016</inceptionYear>
<packaging>jar</packaging>

Expand Down Expand Up @@ -245,7 +245,7 @@
<dependency>
<groupId>com.arangodb</groupId>
<artifactId>arangodb-java-driver</artifactId>
<version>5.0.6</version>
<version>5.0.7</version>
</dependency>
<dependency>
<groupId>com.arangodb</groupId>
Expand Down
4 changes: 3 additions & 1 deletion src/main/scala/com/arangodb/spark/ArangoOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ trait ArangoOptions {

def acquireHostList: Option[Boolean] = None

def acquireHostListInterval: Option[Int] = None

def loadBalancingStrategy: Option[LoadBalancingStrategy] = None

}
6 changes: 5 additions & 1 deletion src/main/scala/com/arangodb/spark/ReadOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ case class ReadOptions(override val database: String = "_system",
override val protocol: Option[Protocol] = None,
override val maxConnections: Option[Int] = None,
override val acquireHostList: Option[Boolean] = None,
override val acquireHostListInterval: Option[Int] = None,
override val loadBalancingStrategy: Option[LoadBalancingStrategy] = None) extends ArangoOptions {

def this() = this(database = "_system")
Expand Down Expand Up @@ -71,6 +72,8 @@ case class ReadOptions(override val database: String = "_system",

def acquireHostList(acquireHostList: Boolean): ReadOptions = copy(acquireHostList = Some(acquireHostList))

def acquireHostListInterval(acquireHostListInterval: Int): ReadOptions = copy(acquireHostListInterval = Some(acquireHostListInterval))

def loadBalancingStrategy(loadBalancingStrategy: LoadBalancingStrategy): ReadOptions = copy(loadBalancingStrategy = Some(loadBalancingStrategy))

def copy(database: String = database,
Expand All @@ -86,8 +89,9 @@ case class ReadOptions(override val database: String = "_system",
protocol: Option[Protocol] = protocol,
maxConnections: Option[Int] = maxConnections,
acquireHostList: Option[Boolean] = acquireHostList,
acquireHostListInterval: Option[Int] = acquireHostListInterval,
loadBalancingStrategy: Option[LoadBalancingStrategy] = loadBalancingStrategy): ReadOptions = {
ReadOptions(database, collection, partitioner, hosts, user, password, useSsl, sslKeyStoreFile, sslPassPhrase, sslProtocol, protocol, maxConnections, acquireHostList, loadBalancingStrategy)
ReadOptions(database, collection, partitioner, hosts, user, password, useSsl, sslKeyStoreFile, sslPassPhrase, sslProtocol, protocol, maxConnections, acquireHostList, acquireHostListInterval, loadBalancingStrategy)
}

}
6 changes: 5 additions & 1 deletion src/main/scala/com/arangodb/spark/WriteOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ case class WriteOptions(override val database: String = "_system",
override val protocol: Option[Protocol] = None,
override val maxConnections: Option[Int] = None,
override val acquireHostList: Option[Boolean] = None,
override val acquireHostListInterval: Option[Int] = None,
override val loadBalancingStrategy: Option[LoadBalancingStrategy] = None) extends ArangoOptions {

def this() = this(database = "_system")
Expand All @@ -63,6 +64,8 @@ case class WriteOptions(override val database: String = "_system",

def acquireHostList(acquireHostList: Boolean): WriteOptions = copy(acquireHostList = Some(acquireHostList))

def acquireHostListInterval(acquireHostListInterval: Int): WriteOptions = copy(acquireHostListInterval = Some(acquireHostListInterval))

def loadBalancingStrategy(loadBalancingStrategy: LoadBalancingStrategy): WriteOptions = copy(loadBalancingStrategy = Some(loadBalancingStrategy))

def copy(database: String = database,
Expand All @@ -76,8 +79,9 @@ case class WriteOptions(override val database: String = "_system",
protocol: Option[Protocol] = protocol,
maxConnections: Option[Int] = maxConnections,
acquireHostList: Option[Boolean] = acquireHostList,
acquireHostListInterval: Option[Int] = acquireHostListInterval,
loadBalancingStrategy: Option[LoadBalancingStrategy] = loadBalancingStrategy): WriteOptions = {
WriteOptions(database, hosts, user, password, useSsl, sslKeyStoreFile, sslPassPhrase, sslProtocol, protocol, maxConnections, acquireHostList, loadBalancingStrategy)
WriteOptions(database, hosts, user, password, useSsl, sslKeyStoreFile, sslPassPhrase, sslProtocol, protocol, maxConnections, acquireHostList, acquireHostListInterval, loadBalancingStrategy)
}

}
4 changes: 4 additions & 0 deletions src/main/scala/com/arangodb/spark/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ package object spark {
val PropertyProtocol = "arangodb.protocol"
val PropertyMaxConnections = "arangodb.maxConnections"
val PropertyAcquireHostList = "arangodb.acquireHostList"
val PropertyAcquireHostListInterval = "arangodb.acquireHostListInterval"
val PropertyLoadBalancingStrategy = "arangodb.loadBalancingStrategy"

private[spark] def createReadOptions(options: ReadOptions, sc: SparkConf): ReadOptions = {
Expand All @@ -60,6 +61,7 @@ package object spark {
protocol = options.protocol.orElse(some(Protocol.valueOf(sc.get(PropertyProtocol, "VST")))),
maxConnections = options.maxConnections.orElse(some(Try(sc.get(PropertyMaxConnections, null).toInt).getOrElse(1))),
acquireHostList = options.acquireHostList.orElse(some(Try(sc.get(PropertyAcquireHostList, null).toBoolean).getOrElse(false))),
acquireHostListInterval = options.acquireHostListInterval.orElse(some(Try(sc.get(PropertyAcquireHostListInterval, null).toInt).getOrElse(60000))),
loadBalancingStrategy = options.loadBalancingStrategy.orElse(some(LoadBalancingStrategy.valueOf(sc.get(PropertyLoadBalancingStrategy, "NONE")))))
}

Expand All @@ -75,6 +77,7 @@ package object spark {
protocol = options.protocol.orElse(some(Protocol.valueOf(sc.get(PropertyProtocol, "VST")))),
maxConnections = options.maxConnections.orElse(some(Try(sc.get(PropertyMaxConnections, null).toInt).getOrElse(1))),
acquireHostList = options.acquireHostList.orElse(some(Try(sc.get(PropertyAcquireHostList, null).toBoolean).getOrElse(false))),
acquireHostListInterval = options.acquireHostListInterval.orElse(some(Try(sc.get(PropertyAcquireHostListInterval, null).toInt).getOrElse(60000))),
loadBalancingStrategy = options.loadBalancingStrategy.orElse(some(LoadBalancingStrategy.valueOf(sc.get(PropertyLoadBalancingStrategy, "NONE")))))
}

Expand All @@ -91,6 +94,7 @@ package object spark {
options.protocol.foreach { builder.useProtocol(_) }
options.maxConnections.foreach { builder.maxConnections(_) }
options.acquireHostList.foreach { builder.acquireHostList(_) }
options.acquireHostListInterval.foreach { builder.acquireHostListInterval(_) }
options.loadBalancingStrategy.foreach { builder.loadBalancingStrategy(_) }
builder
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,17 @@ public void loadAllWithHTTP() {
public void loadAllWithLoadBalancing() {
// set acquireHostList to false, due our tests are running inside a nested docker container. Settings this option to true will result in wrong ports beeing used.
// So we need to set those settings explicitly inside: 'src/test/resources/arangodb.properties' file
ArangoJavaRDD<TestJavaEntity> rdd = ArangoSpark.load(sc, COLLECTION, new ReadOptions().user("root").password("test").database(DB).acquireHostList(false).loadBalancingStrategy(LoadBalancingStrategy.ROUND_ROBIN), TestJavaEntity.class);
ArangoJavaRDD<TestJavaEntity> rdd = ArangoSpark.load(
sc,
COLLECTION,
new ReadOptions()
.user("root")
.password("test")
.database(DB)
.acquireHostList(false)
.loadBalancingStrategy(LoadBalancingStrategy.ROUND_ROBIN), TestJavaEntity.class);

assertThat(rdd.count(), is(100L));
}

}

0 comments on commit 939ae89

Please sign in to comment.