From f4a4f05180583fbfd4f52a53960f992d094038eb Mon Sep 17 00:00:00 2001 From: Rob Title Date: Tue, 12 Feb 2019 22:38:58 -0500 Subject: [PATCH 1/2] More efficient database unmarshaling --- project/Dependencies.scala | 5 +-- .../dao/google/HttpGoogleDataprocDAO.scala | 2 +- .../leonardo/db/ClusterComponent.scala | 32 +++++++++++++++---- .../leonardo/dns/ClusterDnsCache.scala | 2 +- .../monitor/ZombieClusterMonitor.scala | 4 +-- .../leonardo/service/LeonardoService.scala | 4 +-- .../leonardo/ClusterEnrichments.scala | 2 +- .../leonardo/db/ClusterComponentSpec.scala | 12 +++++++ 8 files changed, 47 insertions(+), 16 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index f0bbd6e5136..509b93bf2b7 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -8,8 +8,9 @@ object Dependencies { val scalaLoggingV = "3.9.0" val scalaTestV = "3.0.5" val slickV = "3.2.3" + val catsV = "1.3.1" - val workbenchUtilV = "0.3-0e9d080" + val workbenchUtilV = "0.5-6942040" val workbenchModelV = "0.11-2bddd5b" val workbenchGoogleV = "0.16-4fe117d" val workbenchMetricsV = "0.3-c5b80d2" @@ -48,7 +49,7 @@ object Dependencies { val scalaLogging: ModuleID = "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV val swaggerUi: ModuleID = "org.webjars" % "swagger-ui" % "2.2.5" val ficus: ModuleID = "com.iheart" %% "ficus" % "1.4.3" - val cats: ModuleID = "org.typelevel" %% "cats" % "0.9.0" + val cats: ModuleID = "org.typelevel" %% "cats-core" % catsV val httpClient: ModuleID = "org.apache.httpcomponents" % "httpclient" % "4.5.5" // upgrading a transitive dependency to avoid security warnings val enumeratum: ModuleID = "com.beachape" %% "enumeratum" % "1.5.13" diff --git a/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/dao/google/HttpGoogleDataprocDAO.scala b/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/dao/google/HttpGoogleDataprocDAO.scala index ccee07cb172..de47c6be98a 100644 --- a/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/dao/google/HttpGoogleDataprocDAO.scala +++ b/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/dao/google/HttpGoogleDataprocDAO.scala @@ -97,7 +97,7 @@ class HttpGoogleDataprocDAO(appName: String, override def getClusterStatus(googleProject: GoogleProject, clusterName: ClusterName): Future[ClusterStatus] = { val transformed = for { cluster <- OptionT(getCluster(googleProject, clusterName)) - status <- OptionT.pure[Future, ClusterStatus]( + status <- OptionT.pure[Future]( Try(ClusterStatus.withNameInsensitive(cluster.getStatus.getState)).toOption.getOrElse(ClusterStatus.Unknown)) } yield status diff --git a/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala b/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala index 3fe342afc4b..68679a2d141 100644 --- a/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala +++ b/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala @@ -4,6 +4,7 @@ import java.time.Instant import java.sql.Timestamp import java.util.UUID +import cats.data.Chain import cats.implicits._ import org.broadinstitute.dsde.workbench.leonardo.model.Cluster.LabelMap import org.broadinstitute.dsde.workbench.leonardo.model._ @@ -193,6 +194,19 @@ trait ClusterComponent extends LeoComponent { } } + def getActiveClusterForDnsCache(project: GoogleProject, name: ClusterName): DBIO[Option[Cluster]] = { + clusterQuery + .filter { _.googleProject === project.value } + .filter { _.clusterName === name.value } + .filter { _.destroyedDate === Timestamp.from(dummyDate) } + .result + .map { recs => + recs.headOption.map { clusterRec => + unmarshalCluster(clusterRec, Seq.empty, List.empty, Map.empty, List.empty, List.empty, List.empty) + } + } + } + def getClusterById(id: Long): DBIO[Option[Cluster]] = { fullClusterQuery.filter { _._1.id === id }.result map { recs => unmarshalFullCluster(recs).headOption @@ -404,32 +418,36 @@ trait ClusterComponent extends LeoComponent { private def unmarshalMinimalCluster(clusterLabels: Seq[(ClusterRecord, Option[LabelRecord])]): Seq[Cluster] = { // Call foldMap to aggregate a Seq[(ClusterRecord, LabelRecord)] returned by the query to a Map[ClusterRecord, Map[labelKey, labelValue]]. - val clusterLabelMap: Map[ClusterRecord, Map[String, List[String]]] = clusterLabels.toList.foldMap { case (clusterRecord, labelRecordOpt) => - val labelMap = labelRecordOpt.map(labelRecordOpt => labelRecordOpt.key -> List(labelRecordOpt.value)).toMap + // Note we use Chain instead of List inside the foldMap because the Chain monoid is much more efficient than the List monoid. + // See: https://typelevel.org/cats/datatypes/chain.html + val clusterLabelMap: Map[ClusterRecord, Map[String, Chain[String]]] = clusterLabels.toList.foldMap { case (clusterRecord, labelRecordOpt) => + val labelMap = labelRecordOpt.map(labelRecord => labelRecord.key -> Chain(labelRecord.value)).toMap Map(clusterRecord -> labelMap) } // Unmarshal each (ClusterRecord, Map[labelKey, labelValue]) to a Cluster object clusterLabelMap.map { case (clusterRec, labelMap) => - unmarshalCluster(clusterRec, Seq.empty, List.empty, labelMap.mapValues(_.toSet.head), List.empty, List.empty, List.empty) + unmarshalCluster(clusterRec, Seq.empty, List.empty, labelMap.mapValues(_.toList.toSet.head), List.empty, List.empty, List.empty) }.toSeq } private def unmarshalFullCluster(clusterRecords: Seq[(ClusterRecord, Option[InstanceRecord], Option[ClusterErrorRecord], Option[LabelRecord], Option[ExtensionRecord], Option[ClusterImageRecord], Option[ScopeRecord])]): Seq[Cluster] = { // Call foldMap to aggregate a flat sequence of (cluster, instance, label) triples returned by the query // to a grouped (cluster -> (instances, labels)) structure. - val clusterRecordMap: Map[ClusterRecord, (List[InstanceRecord], List[ClusterErrorRecord], Map[String, List[String]], List[ExtensionRecord], List[ClusterImageRecord], List[ScopeRecord])] = clusterRecords.toList.foldMap { case (clusterRecord, instanceRecordOpt, errorRecordOpt, labelRecordOpt, extensionOpt, clusterImageOpt, scopeOpt) => + // Note we use Chain instead of List inside the foldMap because the Chain monoid is much more efficient than the List monoid. + // See: https://typelevel.org/cats/datatypes/chain.html + val clusterRecordMap: Map[ClusterRecord, (Chain[InstanceRecord], Chain[ClusterErrorRecord], Map[String, Chain[String]], Chain[ExtensionRecord], Chain[ClusterImageRecord], Chain[ScopeRecord])] = clusterRecords.toList.foldMap { case (clusterRecord, instanceRecordOpt, errorRecordOpt, labelRecordOpt, extensionOpt, clusterImageOpt, scopeOpt) => val instanceList = instanceRecordOpt.toList - val labelMap = labelRecordOpt.map(labelRecordOpt => labelRecordOpt.key -> List(labelRecordOpt.value)).toMap + val labelMap = labelRecordOpt.map(labelRecordOpt => labelRecordOpt.key -> Chain(labelRecordOpt.value)).toMap val errorList = errorRecordOpt.toList val extList = extensionOpt.toList val clusterImageList = clusterImageOpt.toList val scopeList = scopeOpt.toList - Map(clusterRecord -> (instanceList, errorList, labelMap, extList, clusterImageList, scopeList)) + Map(clusterRecord -> (Chain.fromSeq(instanceList), Chain.fromSeq(errorList), labelMap, Chain.fromSeq(extList), Chain.fromSeq(clusterImageList), Chain.fromSeq(scopeList))) } clusterRecordMap.map { case (clusterRecord, (instanceRecords, errorRecords, labels, extensions, clusterImages, scopes)) => - unmarshalCluster(clusterRecord, instanceRecords.toSet.toSeq, errorRecords.groupBy(_.timestamp).map(_._2.head).toList, labels.mapValues(_.toSet.head), extensions, clusterImages.toSet.toList, scopes) + unmarshalCluster(clusterRecord, instanceRecords.toList, errorRecords.toList.groupBy(_.timestamp).map(_._2.head).toList, labels.mapValues(_.toList.toSet.head), extensions.toList, clusterImages.toList, scopes.toList) }.toSeq } diff --git a/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/dns/ClusterDnsCache.scala b/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/dns/ClusterDnsCache.scala index 8e610942b11..360284fc0c0 100644 --- a/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/dns/ClusterDnsCache.scala +++ b/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/dns/ClusterDnsCache.scala @@ -60,7 +60,7 @@ class ClusterDnsCache(proxyConfig: ProxyConfig, dbRef: DbReference, dnsCacheConf def load(key: DnsCacheKey) = { logger.debug(s"DNS Cache miss for ${key.clusterName} / ${key.clusterName}...loading from DB...") dbRef - .inTransaction { _.clusterQuery.getActiveClusterByName(key.googleProject, key.clusterName) } + .inTransaction { _.clusterQuery.getActiveClusterForDnsCache(key.googleProject, key.clusterName) } .map { case Some(cluster) => getHostStatusAndUpdateHostToIpIfHostReady(cluster) case None => HostNotFound diff --git a/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/ZombieClusterMonitor.scala b/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/ZombieClusterMonitor.scala index 8ce816f01b7..e0446e93ad3 100644 --- a/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/ZombieClusterMonitor.scala +++ b/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/ZombieClusterMonitor.scala @@ -86,8 +86,8 @@ class ZombieClusterMonitor(config: ZombieClusterConfig, gdDAO: GoogleDataprocDAO private def isProjectActiveInGoogle(googleProject: GoogleProject): Future[Boolean] = { // Check the project and its billing info - (googleProjectDAO.isProjectActive(googleProject.value) |@| googleProjectDAO.isBillingActive(googleProject.value)) - .map(_ && _) + (googleProjectDAO.isProjectActive(googleProject.value), googleProjectDAO.isBillingActive(googleProject.value)) + .mapN(_ && _) .recover { case e => logger.warn(s"Unable to check status of project ${googleProject.value} for zombie cluster detection", e) true diff --git a/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/service/LeonardoService.scala b/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/service/LeonardoService.scala index 6b98e8f4c1f..69584efb456 100644 --- a/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/service/LeonardoService.scala +++ b/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/service/LeonardoService.scala @@ -728,7 +728,7 @@ class LeonardoService(protected val dataprocConfig: DataprocConfig, // Validate the user script URI _ <- clusterRequest.jupyterUserScriptUri match { case Some(userScriptUri) => OptionT.liftF[Future, Unit](validateBucketObjectUri(userEmail, petToken, userScriptUri.toUri)) - case None => OptionT.pure[Future, Unit](()) + case None => OptionT.pure[Future](()) } // Validate the extension URIs @@ -736,7 +736,7 @@ class LeonardoService(protected val dataprocConfig: DataprocConfig, case Some(config) => val extensionsToValidate = (config.nbExtensions.values ++ config.serverExtensions.values ++ config.combinedExtensions.values).filter(_.startsWith("gs://")) OptionT.liftF(Future.traverse(extensionsToValidate)(x => validateBucketObjectUri(userEmail, petToken, x))) - case None => OptionT.pure[Future, Unit](()) + case None => OptionT.pure[Future](()) } } yield () diff --git a/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/ClusterEnrichments.scala b/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/ClusterEnrichments.scala index 766b877d912..6c5b0b705a2 100644 --- a/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/ClusterEnrichments.scala +++ b/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/ClusterEnrichments.scala @@ -52,7 +52,7 @@ object ClusterEnrichments { fcs1 == fcs2 } - def stripFieldsForListCluster(cluster: Cluster): Cluster = { + def stripFieldsForListCluster: Cluster => Cluster = { cluster => cluster.copy( instances = Set.empty, clusterImages = Set.empty, diff --git a/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponentSpec.scala b/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponentSpec.scala index c795d0d059b..01f22751f63 100644 --- a/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponentSpec.scala +++ b/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponentSpec.scala @@ -209,4 +209,16 @@ class ClusterComponentSpec extends TestComponent with FlatSpecLike with CommonTe dbFutureValue { _.clusterQuery.listByLabels(Map("a" -> "b"), true, Some(project)) }.toSet shouldEqual Set(savedCluster3).map(stripFieldsForListCluster) dbFutureValue { _.clusterQuery.listByLabels(Map("a" -> "b"), true, Some(project2)) }.toSet shouldEqual Set.empty[Cluster] } + + it should "get for dns cache" in isolatedDbTest { + val savedCluster1 = makeCluster(1) + .copy( + labels = Map("bam" -> "yes", "vcf" -> "no", "foo" -> "bar"), + instances = Set(masterInstance, workerInstance1, workerInstance2)) + .save(Some(serviceAccountKey.id)) + + // Result should not include labels or instances + dbFutureValue { _.clusterQuery.getActiveClusterForDnsCache(savedCluster1.googleProject, savedCluster1.clusterName) } shouldEqual + Some(savedCluster1).map(stripFieldsForListCluster andThen (_.copy(labels = Map.empty))) + } } From 7ac341db49fb87a374bb0e962bb14693add92e4c Mon Sep 17 00:00:00 2001 From: Rob Title Date: Wed, 13 Feb 2019 14:22:36 -0500 Subject: [PATCH 2/2] Update workbenchGoogle to the latest --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 509b93bf2b7..31055b23bbd 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -12,7 +12,7 @@ object Dependencies { val workbenchUtilV = "0.5-6942040" val workbenchModelV = "0.11-2bddd5b" - val workbenchGoogleV = "0.16-4fe117d" + val workbenchGoogleV = "0.18-6942040" val workbenchMetricsV = "0.3-c5b80d2" val samV = "1.0-5cdffb4"