From 2d7aa0fc8d1c8de09b629274d55c057c6cc86905 Mon Sep 17 00:00:00 2001 From: f1yegor Date: Sun, 8 May 2016 01:09:40 +0200 Subject: [PATCH 1/4] Support for username / password credentials for Cassandra --- docs/docs/configuration.md | 4 ++++ .../config/CassandraConfiguration.scala | 8 ++++++++ .../scheduler/config/JobStatsModule.scala | 19 +++++++++++-------- .../scheduler/jobs/stats/JobStats.scala | 3 +++ 4 files changed, 26 insertions(+), 8 deletions(-) diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md index f7938891c..32f025ee4 100644 --- a/docs/docs/configuration.md +++ b/docs/docs/configuration.md @@ -24,6 +24,10 @@ If you installed chronos via package, run `/usr/bin/chronos run_jar --help`. (default = metrics) --cassandra_port Port for Cassandra (default = 9042) + --cassandra_user User for Cassandra + (default = None) + --cassandra_password Password for Cassandra + (default = None) --cassandra_stat_count_table Table to track stat counts in Cassandra (default = chronos_stat_count) diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/config/CassandraConfiguration.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/config/CassandraConfiguration.scala index 8a0f37ca2..d04fe6716 100644 --- a/src/main/scala/org/apache/mesos/chronos/scheduler/config/CassandraConfiguration.scala +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/config/CassandraConfiguration.scala @@ -12,6 +12,14 @@ trait CassandraConfiguration extends ScallopConf { descr = "Port for Cassandra", default = Some(9042)) + lazy val cassandraUser = opt[String]("cassandra_user", + descr = "User", + default = None) + + lazy val cassandraPassword = opt[String]("cassandra_password", + descr = "Password", + default = None) + lazy val cassandraKeyspace = opt[String]("cassandra_keyspace", descr = "Keyspace to use for Cassandra", default = Some("metrics")) diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/config/JobStatsModule.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/config/JobStatsModule.scala index 990172e07..5ad9ce622 100644 --- a/src/main/scala/org/apache/mesos/chronos/scheduler/config/JobStatsModule.scala +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/config/JobStatsModule.scala @@ -16,14 +16,17 @@ class JobStatsModule(config: CassandraConfiguration) extends AbstractModule { def provideCassandraClusterBuilder() = { config.cassandraContactPoints.get match { case Some(contactPoints) => - Some( - Cluster.builder() - .addContactPoints(contactPoints.split(","): _*) - .withPort(config.cassandraPort()) - .withCompression(Compression.LZ4) - .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) - .withLoadBalancingPolicy(LatencyAwarePolicy.builder(new RoundRobinPolicy).build) - ) + var builder = Cluster.builder() + .addContactPoints(contactPoints.split(","): _*) + .withPort(config.cassandraPort()) + .withCompression(Compression.LZ4) + .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) + .withLoadBalancingPolicy(LatencyAwarePolicy.builder(new RoundRobinPolicy).build) + + if (config.cassandraUser.isDefined && config.cassandraPassword.isDefined) { + builder = builder.withCredentials(config.cassandraUser.get.orNull, config.cassandraPassword.get.orNull) + } + Some(builder) case _ => None } diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala index c936ac255..3354af0a2 100644 --- a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala @@ -360,6 +360,9 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan case Some(c) => try { val session = c.build.connect() + session.execute(new SimpleStatement( + s"CREATE KEYSPACE IF NOT EXISTS ${config.cassandraKeyspace()} WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" + )) session.execute(new SimpleStatement( s"USE ${config.cassandraKeyspace()};" )) From 56c2a355539c95c433adce0634194148e4763337 Mon Sep 17 00:00:00 2001 From: f1yegor Date: Sun, 8 May 2016 01:39:05 +0200 Subject: [PATCH 2/4] Support Cassandra 3.x --- pom.xml | 2 +- .../apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 11bc467d8..95e0ede51 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ 2.3.6 - 2.1.0 + 3.0.1 0.6.3 3.2 2.6.0 diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala index 3354af0a2..10aeb2cd1 100644 --- a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala @@ -190,7 +190,7 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan * @return updated TaskStat object */ private def updateTaskStat(taskStat: TaskStat, row: Row): TaskStat = { - val taskTimestamp = row.getDate(TIMESTAMP) + val taskTimestamp = row.getTimestamp(TIMESTAMP) val taskState = row.getString(TASK_STATE) if (taskState == TaskState.TASK_RUNNING.toString) { From 662361ab5c08e82ecab499793df9582f9a4d0f8a Mon Sep 17 00:00:00 2001 From: f1yegor Date: Fri, 2 Dec 2016 16:48:35 +0100 Subject: [PATCH 3/4] fix merge --- .../apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala index 67157ffe0..29f271ac1 100644 --- a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala @@ -340,7 +340,7 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan * @return updated TaskStat object */ private def updateTaskStat(taskStat: TaskStat, row: Row): TaskStat = { - val taskTimestamp = new Date(row.getTimestamp(TIMESTAMP)) + val taskTimestamp = new Date(row.getDate(TIMESTAMP).getMillisSinceEpoch) val taskState = row.getString(TASK_STATE) if (taskState == TaskState.TASK_RUNNING.toString) { From 77792edd9e43db4ec4a2cd53ca28bced2ea91158 Mon Sep 17 00:00:00 2001 From: f1yegor Date: Fri, 2 Dec 2016 17:10:57 +0100 Subject: [PATCH 4/4] create keyspace if not exists --- .../apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala index 29f271ac1..dfb83fbf4 100644 --- a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala @@ -164,6 +164,9 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan case Some(c) => try { val session = c.build.connect() + session.execute(new SimpleStatement( + s"CREATE KEYSPACE IF NOT EXISTS ${config.cassandraKeyspace()} WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" + )) session.execute(new SimpleStatement( s"USE ${config.cassandraKeyspace()};" ))