diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md index 27a2738d1..518534ae4 100644 --- a/docs/docs/configuration.md +++ b/docs/docs/configuration.md @@ -17,6 +17,10 @@ If you installed chronos via package, run `/usr/bin/chronos run_jar --help`. points for Cassandra --cassandra_keyspace Keyspace to use for Cassandra --cassandra_port Port for Cassandra +--cassandra_user User for Cassandra + (default = None) +--cassandra_password Password for Cassandra + (default = None) --cassandra_stat_count_table Table to track stat counts in Cassandra --cassandra_table Table to use for Cassandra diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/config/CassandraConfiguration.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/config/CassandraConfiguration.scala index 8a0f37ca2..d04fe6716 100644 --- a/src/main/scala/org/apache/mesos/chronos/scheduler/config/CassandraConfiguration.scala +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/config/CassandraConfiguration.scala @@ -12,6 +12,14 @@ trait CassandraConfiguration extends ScallopConf { descr = "Port for Cassandra", default = Some(9042)) + lazy val cassandraUser = opt[String]("cassandra_user", + descr = "User", + default = None) + + lazy val cassandraPassword = opt[String]("cassandra_password", + descr = "Password", + default = None) + lazy val cassandraKeyspace = opt[String]("cassandra_keyspace", descr = "Keyspace to use for Cassandra", default = Some("metrics")) diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/config/JobStatsModule.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/config/JobStatsModule.scala index 58c844509..f4643d347 100644 --- a/src/main/scala/org/apache/mesos/chronos/scheduler/config/JobStatsModule.scala +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/config/JobStatsModule.scala @@ -16,14 +16,17 @@ class JobStatsModule(config: CassandraConfiguration) extends AbstractModule { def provideCassandraClusterBuilder(): Option[Cluster.Builder] = { config.cassandraContactPoints.get match { case Some(contactPoints) => - Some( - Cluster.builder() - .addContactPoints(contactPoints.split(","): _*) - .withPort(config.cassandraPort()) - .withCompression(Compression.LZ4) - .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) - .withLoadBalancingPolicy(LatencyAwarePolicy.builder(new RoundRobinPolicy).build) - ) + var builder = Cluster.builder() + .addContactPoints(contactPoints.split(","): _*) + .withPort(config.cassandraPort()) + .withCompression(Compression.LZ4) + .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) + .withLoadBalancingPolicy(LatencyAwarePolicy.builder(new RoundRobinPolicy).build) + + if (config.cassandraUser.isDefined && config.cassandraPassword.isDefined) { + builder = builder.withCredentials(config.cassandraUser.get.orNull, config.cassandraPassword.get.orNull) + } + Some(builder) case _ => None } diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala index 29f271ac1..dfb83fbf4 100644 --- a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala @@ -164,6 +164,9 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan case Some(c) => try { val session = c.build.connect() + session.execute(new SimpleStatement( + s"CREATE KEYSPACE IF NOT EXISTS ${config.cassandraKeyspace()} WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" + )) session.execute(new SimpleStatement( s"USE ${config.cassandraKeyspace()};" ))