Skip to content

Commit

Permalink
Remote scheme provider (#53)
Browse files Browse the repository at this point in the history
* Allows loading schema from files
* Adds a remote schema provider based on cluster metadata
* Adds tests and improves schema provider
* Removes hardcoded schema data and improves tests
* Adds the metadata schema provider tests
* Moves reserved keywords to TestUtils
  • Loading branch information
Fede Fernández authored Aug 29, 2017
1 parent 9cfe86e commit 76c52cb
Show file tree
Hide file tree
Showing 16 changed files with 1,353 additions and 165 deletions.
7 changes: 7 additions & 0 deletions core/src/main/scala/schema/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ package object schema {
case class SchemaDefinitionProviderError(msg: String, maybeCause: Option[Throwable] = None)
extends SchemaError(msg, maybeCause)

object SchemaDefinitionProviderError {
def apply(e: Throwable): SchemaDefinitionProviderError =
SchemaDefinitionProviderError(e.getMessage, Some(e))
}

type SchemaResult[T] = Either[SchemaDefinitionProviderError, T]

case class SchemaValidatorError(msg: String, maybeCause: Option[Throwable] = None)
extends SchemaError(msg, maybeCause)

Expand Down
97 changes: 97 additions & 0 deletions core/src/main/scala/schema/provider/MetadataSchemaProvider.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package freestyle.cassandra
package schema.provider

import cats.implicits._
import cats.{~>, MonadError}
import com.datastax.driver.core._
import freestyle.{FreeS, _}
import freestyle.cassandra.schema.provider.metadata.SchemaConversions
import freestyle.cassandra.schema.{SchemaDefinition, SchemaDefinitionProviderError, SchemaResult}
import troy.cql.ast.DataDefinition

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.language.postfixOps

class MetadataSchemaProvider(cluster: Cluster)
extends SchemaDefinitionProvider
with SchemaConversions {

def extractTables(keyspaceMetadata: KeyspaceMetadata): List[AbstractTableMetadata] =
keyspaceMetadata.getTables.asScala.toList

def extractIndexes(tableMetadataList: List[AbstractTableMetadata]): List[IndexMetadata] =
tableMetadataList.flatMap {
case (t: TableMetadata) => t.getIndexes.asScala.toList
case _ => Nil
}

def extractUserTypes(keyspaceMetadata: KeyspaceMetadata): List[UserType] =
keyspaceMetadata.getUserTypes.asScala.toList

override def schemaDefinition: SchemaResult[SchemaDefinition] = {

import freestyle.async.implicits._
import freestyle.cassandra.api._
import freestyle.cassandra.handlers.implicits._

import scala.concurrent.ExecutionContext.Implicits.global

implicit val clusterAPIInterpreter: ClusterAPI.Op ~> Future =
clusterAPIHandler[Future] andThen apiInterpreter[Future, Cluster](cluster)

def guarantee[F[_], A](fa: F[A], finalizer: F[Unit])(
implicit M: MonadError[F, Throwable]): F[A] =
M.flatMap(M.attempt(fa)) { e =>
M.flatMap(finalizer)(_ => e.fold(M.raiseError, M.pure))
}

def metadataF[F[_]](implicit clusterAPI: ClusterAPI[F]): FreeS[F, Metadata] =
clusterAPI.connect *> clusterAPI.metadata

def closeF[F[_]](implicit clusterAPI: ClusterAPI[F]): FreeS[F, Unit] =
clusterAPI.close

val fut: Future[SchemaResult[SchemaDefinition]] =
guarantee(
metadataF[ClusterAPI.Op].interpret[Future],
closeF[ClusterAPI.Op].interpret[Future]).attempt.map {
_.leftMap(SchemaDefinitionProviderError(_)) flatMap { metadata =>
val keyspaceList: List[KeyspaceMetadata] = metadata.getKeyspaces.asScala.toList
val tableList: List[AbstractTableMetadata] = keyspaceList.flatMap(extractTables)
val indexList: List[IndexMetadata] = extractIndexes(tableList)
val userTypeList: List[UserType] = keyspaceList.flatMap(extractUserTypes)

keyspaceList.traverse[SchemaResult, DataDefinition](toCreateKeyspace) |+|
tableList.traverse(toCreateTable) |+|
indexList.traverse(toCreateIndex(_)) |+|
userTypeList.traverse(toUserType)
}
}
Await.result(fut, 10.seconds)
}
}

object MetadataSchemaProvider {

implicit def metadataSchemaProvider(implicit cluster: Cluster): SchemaDefinitionProvider =
new MetadataSchemaProvider(cluster)

}
37 changes: 28 additions & 9 deletions core/src/main/scala/schema/provider/TroySchemaProvider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,37 @@
package freestyle.cassandra
package schema.provider

import java.io.InputStream

import cats.syntax.either._
import freestyle.cassandra.schema._
import troy.cql.ast.CqlParser

class TroySchemaProvider(cql: String) extends SchemaDefinitionProvider {
class TroySchemaProvider(cqlF: => SchemaResult[String]) extends SchemaDefinitionProvider {

override def schemaDefinition: Either[SchemaDefinitionProviderError, SchemaDefinition] =
CqlParser.parseSchema(cql) match {
case CqlParser.Success(res, _) => Right(res)
case CqlParser.Failure(msg, next) =>
Left(
SchemaDefinitionProviderError(
s"Parse Failure: $msg, line = ${next.pos.line}, column = ${next.pos.column}"))
case CqlParser.Error(msg, _) => Left(SchemaDefinitionProviderError(msg))
override def schemaDefinition: SchemaResult[SchemaDefinition] =
cqlF.flatMap { cql =>
CqlParser.parseSchema(cql) match {
case CqlParser.Success(res, _) => Right(res)
case CqlParser.Failure(msg, next) =>
Left(
SchemaDefinitionProviderError(
s"Parse Failure: $msg, line = ${next.pos.line}, column = ${next.pos.column}"))
case CqlParser.Error(msg, _) => Left(SchemaDefinitionProviderError(msg))
}
}
}

object TroySchemaProvider {

def apply(cql: String): TroySchemaProvider = new TroySchemaProvider(Right(cql))

def apply(is: InputStream): TroySchemaProvider = new TroySchemaProvider(
Either.catchNonFatal {
scala.io.Source.fromInputStream(is).mkString
} leftMap { e =>
SchemaDefinitionProviderError(e.getMessage, Some(e))
}
)

}
200 changes: 200 additions & 0 deletions core/src/main/scala/schema/provider/metadata/SchemaConversions.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Copyright 2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package freestyle.cassandra
package schema.provider.metadata

import cats.implicits._
import com.datastax.driver.core.{
AbstractTableMetadata,
ColumnMetadata,
IndexMetadata,
KeyspaceMetadata,
TupleType,
UserType,
DataType => DatastaxDataType
}
import freestyle.cassandra.schema.{SchemaDefinitionProviderError, SchemaResult}
import troy.cql.ast._
import troy.cql.ast.ddl.Keyspace.Replication
import troy.cql.ast.ddl.Table.PrimaryKey
import troy.cql.ast.ddl.{Field, Index, Table}

import scala.collection.JavaConverters._
import scala.language.postfixOps

trait SchemaConversions {

def toCreateKeyspace(keyspaceMetadata: KeyspaceMetadata): SchemaResult[CreateKeyspace] =
Either.catchNonFatal {
val name: String = Option(keyspaceMetadata.getName)
.getOrElse(throw new IllegalArgumentException("Schema name is null"))
val replication: Option[Replication] = Option(keyspaceMetadata.getReplication)
.flatMap { m =>
val seq = m.asScala.toSeq
if (seq.isEmpty) None else Option(Replication(seq.sortBy(_._1)))
}
CreateKeyspace(
ifNotExists = false,
keyspaceName = KeyspaceName(name),
properties = replication map (Seq(_)) getOrElse Seq.empty)
} leftMap (SchemaDefinitionProviderError(_))

def toCreateTable(metadata: AbstractTableMetadata): SchemaResult[CreateTable] =
Either.catchNonFatal {
for {
columns <- metadata.getColumns.asScala.toList.traverse(toTableColumn)
primaryKey <- toPrimaryKey(
metadata.getPartitionKey.asScala.toList,
metadata.getClusteringColumns.asScala.toList)
} yield
CreateTable(
ifNotExists = false,
tableName = TableName(Some(KeyspaceName(metadata.getKeyspace.getName)), metadata.getName),
columns = columns,
primaryKey = Some(primaryKey),
options = Seq.empty
)
} leftMap (SchemaDefinitionProviderError(_)) joinRight

def readTable(metadata: IndexMetadata): TableName =
TableName(Some(KeyspaceName(metadata.getTable.getKeyspace.getName)), metadata.getTable.getName)

def toCreateIndex(
metadata: IndexMetadata,
readTable: (IndexMetadata) => TableName = readTable): Either[
SchemaDefinitionProviderError,
CreateIndex] =
Either.catchNonFatal {
CreateIndex(
isCustom = metadata.isCustomIndex,
ifNotExists = false,
indexName = Option(metadata.getName),
tableName = readTable(metadata),
identifier = Index.Identifier(metadata.getTarget),
using =
if (metadata.isCustomIndex)
// The options are not visible in the IndexMetadata class
Some(Index.Using(metadata.getIndexClassName, None))
else None
)
} leftMap (SchemaDefinitionProviderError(_))

def toUserType(userType: UserType): SchemaResult[CreateType] =
Either.catchNonFatal {
userType.getFieldNames.asScala.toList.traverse { fieldName =>
toField(fieldName, userType.getFieldType(fieldName))
} map { list =>
CreateType(
ifNotExists = false,
typeName = TypeName(Some(KeyspaceName(userType.getKeyspace)), userType.getTypeName),
fields = list)
}
} leftMap (SchemaDefinitionProviderError(_)) joinRight

private[this] def toField(
name: String,
datastaxDataType: DatastaxDataType): SchemaResult[Field] =
toDataType(datastaxDataType) map { dataType =>
Field(name, dataType)
}

private[this] def toTableColumn(metadata: ColumnMetadata): SchemaResult[Table.Column] =
toDataType(metadata.getType).map { dataType =>
Table.Column(
name = metadata.getName,
dataType = dataType,
isStatic = metadata.isStatic,
isPrimaryKey = false)
}

private[this] def toDataType(dataType: DatastaxDataType): SchemaResult[DataType] = {

import DatastaxDataType._

def toDataTypeNative(dataType: DatastaxDataType): SchemaResult[DataType.Native] =
dataType.getName match {
case Name.ASCII => DataType.Ascii.asRight
case Name.BIGINT => DataType.BigInt.asRight
case Name.BLOB => DataType.Blob.asRight
case Name.BOOLEAN => DataType.Boolean.asRight
case Name.COUNTER => DataType.Counter.asRight
case Name.DATE => DataType.Date.asRight
case Name.DECIMAL => DataType.Decimal.asRight
case Name.DOUBLE => DataType.Double.asRight
case Name.FLOAT => DataType.Float.asRight
case Name.INET => DataType.Inet.asRight
case Name.INT => DataType.Int.asRight
case Name.SMALLINT => DataType.Smallint.asRight
case Name.TEXT => DataType.Text.asRight
case Name.TIME => DataType.Time.asRight
case Name.TIMESTAMP => DataType.Timestamp.asRight
case Name.TIMEUUID => DataType.Timeuuid.asRight
case Name.TINYINT => DataType.Tinyint.asRight
case Name.UUID => DataType.Uuid.asRight
case Name.VARCHAR => DataType.Varchar.asRight
case Name.VARINT => DataType.Varint.asRight
case _ =>
Left(SchemaDefinitionProviderError(s"Native DataType ${dataType.getName} not supported"))
}

def toCollectionType(collectionType: CollectionType): SchemaResult[DataType] = {

val typeArgs: List[DatastaxDataType] = collectionType.getTypeArguments.asScala.toList

val maybeCol = collectionType.getName match {
case Name.LIST =>
typeArgs.headOption map { typeArg =>
toDataTypeNative(typeArg) map DataType.List
}
case Name.SET =>
typeArgs.headOption map { typeArg =>
toDataTypeNative(typeArg) map DataType.Set
}
case Name.MAP =>
for {
t1 <- typeArgs.headOption
t2 <- typeArgs.tail.headOption
} yield (toDataTypeNative(t1) |@| toDataTypeNative(t2)).map(DataType.Map)
case _ => None
}

maybeCol getOrElse {
Left(
SchemaDefinitionProviderError(
s"Error parsing collection DataType '${collectionType.asFunctionParameterString()}'"))
}
}

def toTupleType(tupleType: TupleType): SchemaResult[DataType] =
tupleType.getComponentTypes.asScala.toList traverse toDataTypeNative map DataType.Tuple

dataType match {
case nativeType: NativeType => toDataTypeNative(nativeType)
case customType: CustomType => Right(DataType.Custom(customType.getCustomTypeClassName))
case collectionType: CollectionType => toCollectionType(collectionType)
case tupleType: TupleType => toTupleType(tupleType)
case userType: UserType =>
Right(DataType.UserDefined(KeyspaceName(userType.getKeyspace), userType.getTypeName))
}
}

private[this] def toPrimaryKey(
partitionKeys: List[ColumnMetadata],
clusteringColumns: List[ColumnMetadata]): SchemaResult[PrimaryKey] =
PrimaryKey(partitionKeys.map(_.getName), clusteringColumns.map(_.getName)).asRight

}
2 changes: 1 addition & 1 deletion core/src/main/scala/schema/provider/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package schema
package object provider {

trait SchemaDefinitionProvider {
def schemaDefinition: Either[SchemaDefinitionProviderError, SchemaDefinition]
def schemaDefinition: SchemaResult[SchemaDefinition]
}

}
Loading

0 comments on commit 76c52cb

Please sign in to comment.