Skip to content

Commit

Permalink
#684 data retention - original code from PR#762 - brought up to date …
Browse files Browse the repository at this point in the history
…with current develop - TODO test
  • Loading branch information
dk1844 committed Oct 17, 2022
1 parent 6bec89a commit a3fcaca
Show file tree
Hide file tree
Showing 13 changed files with 507 additions and 27 deletions.
28 changes: 28 additions & 0 deletions admin/src/main/scala/za/co/absa/spline/admin/AdminCLI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.slf4j.Logger.ROOT_LOGGER_NAME
import org.slf4j.LoggerFactory
import scopt.{OptionDef, OptionParser}
import za.co.absa.spline.admin.AdminCLI.AdminCLIConfig
import za.co.absa.spline.admin.DateTimeUtils.parseZonedDateTime
import za.co.absa.spline.arango.AuxiliaryDBAction._
import za.co.absa.spline.arango.OnDBExistsAction.{Drop, Fail, Skip}
import za.co.absa.spline.arango.{ArangoManagerFactory, ArangoManagerFactoryImpl}
Expand Down Expand Up @@ -214,13 +215,32 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) {
)
.children(this.dbCommandOptions: _*)

// `db-prune` command: accepts the common DB options plus the threshold given
// either as a retention period (`--retain-for`) or as a date (`--before-date`).
cmd("db-prune")
  .action((_, conf) => conf.copy(cmd = DBPrune()))
  .text("Prune old data to decrease the database footprint and speed up queries.")
  .children(this.dbCommandOptions: _*)
  .children(
    opt[String]("retain-for")
      .text(
        "Retention period in format <length><unit>. " +
          "Example: `--retain-for 30d` means to retain data that is NOT older than 30 days from now.")
      .action {
        case (periodStr, conf @ AdminCLIConfig(pruneCmd: DBPrune, _, _)) =>
          conf.copy(cmd = pruneCmd.copy(retentionPeriod = Some(Duration(periodStr))))
      },
    opt[String]("before-date")
      .text(
        "A datetime with an optional time and zone parts in ISO-8601 format. " +
          "The data older than the specified datetime is subject for removal.")
      .action {
        case (dateStr, conf @ AdminCLIConfig(pruneCmd: DBPrune, _, _)) =>
          conf.copy(cmd = pruneCmd.copy(thresholdDate = Some(parseZonedDateTime(dateStr))))
      }
  )

// Cross-option validation, evaluated against the fully parsed configuration.
// Cases are tried top to bottom; the first matching one decides the outcome.
checkConfig {
  case AdminCLIConfig(null, _, _) =>
    failure("No command given")
  case AdminCLIConfig(cmd: DBCommand, _, _) if cmd.dbUrl == null =>
    failure("DB connection string is required")
  case AdminCLIConfig(cmd: DBInit, _, _) if cmd.force && cmd.skip =>
    failure("Options '--force' and '--skip' cannot be used together")
  case AdminCLIConfig(cmd: DBPrune, _, _) if cmd.retentionPeriod.isEmpty && cmd.thresholdDate.isEmpty =>
    failure("One of the following options must be specified: --retain-for or --before-date")
  case AdminCLIConfig(cmd: DBPrune, _, _) if cmd.retentionPeriod.isDefined && cmd.thresholdDate.isDefined =>
    failure("Options --retain-for and --before-date cannot be used together")
  // A negative period would move the pruning threshold into the future (i.e. prune everything),
  // and an infinite one (e.g. `Inf`) cannot be converted to milliseconds later on.
  case AdminCLIConfig(cmd: DBPrune, _, _)
    if cmd.retentionPeriod.exists(period => !period.isFinite || period < Duration.Zero) =>
    failure("Option --retain-for must be a finite, non-negative duration")
  case _ =>
    success
}
Expand Down Expand Up @@ -255,6 +275,14 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) {
case DBExec(url, actions) =>
val dbManager = dbManagerFactory.create(url, sslCtxOpt)
Await.result(dbManager.execute(actions: _*), Duration.Inf)

case DBPrune(url, Some(retentionPeriod), _) =>
val dbManager = dbManagerFactory.create(url, sslCtxOpt)
Await.result(dbManager.prune(retentionPeriod), Duration.Inf)

case DBPrune(url, _, Some(dateTime)) =>
val dbManager = dbManagerFactory.create(url, sslCtxOpt)
Await.result(dbManager.prune(dateTime), Duration.Inf)
}

println(ansi"%green{DONE}")
Expand Down
60 changes: 60 additions & 0 deletions admin/src/main/scala/za/co/absa/spline/admin/DateTimeUtils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.admin

import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import java.time.temporal.ChronoField
import java.time.{ZoneId, ZonedDateTime}

import scala.util.control.NonFatal

/**
 * Date-time parsing helpers.
 */
object DateTimeUtils {

  // Expected layout: <local date or date-time>, then an optional offset, then an optional [zone ID].
  private val ZonedDateTimeRegexp = Seq(
    "^",
    """([\dT:.+\-]+?)""", // local datetime
    """(Z|[+\-]\d\d:\d\d)?""", // timezone offset
    """(?:\[([\w/\-+]+)])?""", // timezone name
    "$"
  ).mkString.r

  // ISO local date optionally followed by 'T' and an ISO local time; a missing hour defaults to 0.
  private val ZonedDateTimeFormatter: DateTimeFormatter =
    new DateTimeFormatterBuilder()
      .parseCaseInsensitive()
      .append(DateTimeFormatter.ISO_LOCAL_DATE)
      .optionalStart()
      .appendLiteral('T')
      .append(DateTimeFormatter.ISO_LOCAL_TIME)
      .optionalEnd()
      .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
      .toFormatter()

  /**
   * Parses a date or date-time with an optional zone offset or bracketed zone ID,
   * e.g. `2020-04-11`, `2020-04-11T22:33Z` or `2020-04-11T22:33[Europe/Paris]`.
   *
   * @param s             the text to parse
   * @param defaultZoneId the zone used when `s` carries neither an offset nor a zone ID
   * @return the parsed date-time in the resolved zone
   * @throws IllegalArgumentException if `s` cannot be parsed, or if it specifies both an offset and a zone ID
   */
  def parseZonedDateTime(s: String, defaultZoneId: ZoneId = ZoneId.systemDefault): ZonedDateTime =
    try {
      // Unmatched optional groups come back as null, hence the Option wrapping below.
      val ZonedDateTimeRegexp(localPart, rawOffset, rawZoneName) = s
      val zoneName = Option(rawZoneName)
      val offset = Option(rawOffset)

      require(zoneName.isEmpty || offset.isEmpty, "Either timezone ID or offset should be specified, not both")

      // The zone ID takes precedence over the offset; fall back to the default zone otherwise.
      val zone = zoneName.orElse(offset).fold(defaultZoneId)(id => ZoneId.of(id))

      ZonedDateTime.parse(localPart, ZonedDateTimeFormatter.withZone(zone))
    } catch {
      case NonFatal(cause) => throw new IllegalArgumentException(s"Cannot parse zoned datetime: $s", cause)
    }
}
12 changes: 12 additions & 0 deletions admin/src/main/scala/za/co/absa/spline/admin/commands.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import za.co.absa.spline.admin.DBCommand._
import za.co.absa.spline.arango.{AuxiliaryDBAction, DatabaseCreateOptions}
import za.co.absa.spline.persistence.ArangoConnectionURL

import java.time.ZonedDateTime
import scala.concurrent.duration.Duration

sealed trait Command

sealed trait DBCommand extends Command {
Expand Down Expand Up @@ -67,3 +70,12 @@ case class DBExec(

def addAction(action: AuxiliaryDBAction): DBExec = copy(actions = actions :+ action)
}

/**
 * Command describing a request to prune old data from the database.
 *
 * The pruning threshold is expressed by one of the two optional fields; both default to `None`.
 *
 * @param dbUrl           database connection URL; defaults to `null` until one is supplied
 * @param retentionPeriod how long data is retained, counted back from the time of pruning
 * @param thresholdDate   date-time before which data is pruned
 */
case class DBPrune(
override val dbUrl: Url = null,
retentionPeriod: Option[Duration] = None,
thresholdDate: Option[ZonedDateTime] = None
) extends DBCommand {
protected override type Self = DBPrune
// NOTE(review): `copy(_)` passes the given props as the first constructor argument (`dbUrl`);
// presumably `DBCommandProps` is compatible with `Url` - confirm against `DBCommand`.
protected override val selfCopy: DBCommandProps => Self = copy(_)
}
17 changes: 17 additions & 0 deletions admin/src/main/scala/za/co/absa/spline/arango/ArangoManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import za.co.absa.spline.persistence.DatabaseVersionManager
import za.co.absa.spline.persistence.migration.Migrator
import za.co.absa.spline.persistence.model.{CollectionDef, GraphDef, SearchAnalyzerDef, SearchViewDef}

import java.time.{Clock, ZonedDateTime}
import scala.collection.JavaConverters._
import scala.collection.immutable._
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source

Expand All @@ -46,13 +48,18 @@ trait ArangoManager {
def initialize(onExistsAction: OnDBExistsAction, options: DatabaseCreateOptions): Future[Boolean]
def upgrade(): Future[Unit]
def execute(actions: AuxiliaryDBAction*): Future[Unit]
def prune(retentionPeriod: Duration): Future[Unit]
def prune(thresholdDate: ZonedDateTime): Future[Unit]

}

class ArangoManagerImpl(
db: ArangoDatabaseAsync,
dbVersionManager: DatabaseVersionManager,
dataRetentionManager: DataRetentionManager,
migrator: Migrator,
foxxManager: FoxxManager,
clock: Clock,
appDBVersion: SemanticVersion)
(implicit val ex: ExecutionContext)
extends ArangoManager
Expand Down Expand Up @@ -115,6 +122,16 @@ class ArangoManagerImpl(
}
}

/**
 * Prunes data older than the given retention period, counted back from the current clock time.
 *
 * @param retentionPeriod how far back from now data is kept
 * @return the future returned by the data retention manager
 */
override def prune(retentionPeriod: Duration): Future[Unit] = {
  log.debug(s"Prune data older than $retentionPeriod")
  val nowMillis = clock.millis
  val thresholdMillis = nowMillis - retentionPeriod.toMillis
  dataRetentionManager.pruneBefore(thresholdMillis)
}

/**
 * Prunes data that precedes the given date-time.
 *
 * @param dateTime the threshold; converted to epoch milliseconds before being passed on
 * @return the future returned by the data retention manager
 */
override def prune(dateTime: ZonedDateTime): Future[Unit] = {
  log.debug(s"Prune data before $dateTime")
  val thresholdMillis = dateTime.toInstant.toEpochMilli
  dataRetentionManager.pruneBefore(thresholdMillis)
}

private def checkDBAccess(db: ArangoDatabaseAsync) = {
db.exists.toScala
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import za.co.absa.spline.persistence.ArangoImplicits.ArangoDatabaseAsyncScalaWra
import za.co.absa.spline.persistence.migration.{MigrationScriptRepository, Migrator}
import za.co.absa.spline.persistence.{ArangoConnectionURL, ArangoDatabaseFacade, DatabaseVersionManager}

import java.time.Clock
import javax.net.ssl.SSLContext
import scala.concurrent.ExecutionContext

Expand All @@ -36,21 +37,25 @@ class ArangoManagerFactoryImpl(activeFailover: Boolean)(implicit ec: ExecutionCo

/** Builds an [[ArangoManager]] together with its collaborators for the given database handle. */
def dbManager(db: ArangoDatabaseAsync): ArangoManager = {
  // Shared between the manager itself and the migrator.
  val dbVersionManager = new DatabaseVersionManager(db)
  new ArangoManagerImpl(
    db = db,
    dbVersionManager = dbVersionManager,
    dataRetentionManager = new DataRetentionManager(db),
    migrator = new Migrator(db, scriptRepo, dbVersionManager),
    foxxManager = new FoxxManagerImpl(db.restClient),
    clock = Clock.systemDefaultZone,
    appDBVersion = scriptRepo.latestToVersion
  )
}

// Creates a new database facade from `connectionURL`, `maybeSSLContext` and `activeFailover`;
// every call constructs a separate instance.
def dbFacade(): ArangoDatabaseFacade =
new ArangoDatabaseFacade(connectionURL, maybeSSLContext, activeFailover)

new AutoClosingArangoManagerProxy(dbManager, dbFacade)
AutoClosingArangoManagerProxy.create(dbManager, dbFacade)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,39 @@

package za.co.absa.spline.arango

import java.lang.reflect.{InvocationHandler, Method, Proxy}
import com.arangodb.async.ArangoDatabaseAsync
import za.co.absa.spline.persistence.ArangoDatabaseFacade

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

class AutoClosingArangoManagerProxy(
managerProvider: ArangoDatabaseAsync => ArangoManager,
arangoFacadeProvider: () => ArangoDatabaseFacade)
(implicit val ex: ExecutionContext)
extends ArangoManager {

override def initialize(onExistsAction: OnDBExistsAction, options: DatabaseCreateOptions): Future[Boolean] =
withManager(_.initialize(onExistsAction, options))

override def upgrade(): Future[Unit] =
withManager(_.upgrade())

override def execute(actions: AuxiliaryDBAction*): Future[Unit] =
withManager(_.execute(actions: _*))

private def withManager[A](fn: ArangoManager => Future[A]): Future[A] = {
val dbFacade = arangoFacadeProvider()

(Try(fn(managerProvider(dbFacade.db))) match {
case Failure(e) => Future.failed(e)
case Success(v) => v
}) andThen {
case _ => dbFacade.destroy()
/**
 * Factory of [[ArangoManager]] proxies that open a database facade for every call
 * and destroy it once the call's future has completed.
 */
object AutoClosingArangoManagerProxy {

  /**
   * Creates a dynamic proxy implementing [[ArangoManager]].
   *
   * On each method invocation the proxy obtains a facade from `arangoFacadeProvider`, builds a
   * manager for the facade's database via `managerProvider`, delegates the call to that manager
   * and destroys the facade when the resulting future completes, whether it succeeded or failed.
   *
   * The result of every proxied method is cast to `Future`, so the proxy is only suitable for
   * methods returning a `Future`.
   *
   * @param managerProvider      builds the underlying manager for a database handle
   * @param arangoFacadeProvider supplies the database facade used for a single invocation
   * @param ex                   execution context on which the facade clean-up callback runs
   * @return a proxy delegating every call to a freshly created underlying manager
   */
  def create(
    managerProvider: ArangoDatabaseAsync => ArangoManager,
    arangoFacadeProvider: () => ArangoDatabaseFacade)
    (implicit ex: ExecutionContext): ArangoManager = {

    val handler: InvocationHandler = (_: Any, method: Method, args: Array[AnyRef]) => {
      val dbFacade = arangoFacadeProvider()
      (Try {
        val underlyingManager = managerProvider(dbFacade.db)
        method
          .invoke(underlyingManager, args: _*)
          .asInstanceOf[Future[_]]
      } match {
        // Reflection wraps whatever the target method throws into an InvocationTargetException.
        // Surface the original exception so that callers observe the same failure as they would
        // with a direct (non-reflective) call.
        case Failure(e: java.lang.reflect.InvocationTargetException) if e.getCause != null =>
          Future.failed(e.getCause)
        case Failure(e) => Future.failed(e)
        case Success(v) => v
      }) andThen {
        // Runs for both successful and failed outcomes.
        case _ => dbFacade.destroy()
      }
    }

    Proxy.newProxyInstance(
      getClass.getClassLoader,
      Array(classOf[ArangoManager]),
      handler
    ).asInstanceOf[ArangoManager]
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.arango


import com.arangodb.async.ArangoDatabaseAsync
import com.arangodb.internal.InternalArangoDatabaseOps

import scala.concurrent.{ExecutionContext, Future}

/**
 * Prunes old data by calling the REST client of the given database handle.
 */
class DataRetentionManager(db: ArangoDatabaseAsync)(implicit ec: ExecutionContext) {

  /**
   * Calls `delete` on the REST client for the data preceding the given timestamp.
   *
   * @param timestamp threshold value embedded into the request path
   * @return the future produced by the REST client's `delete` call
   */
  def pruneBefore(timestamp: Long): Future[Unit] = {
    val dbOps = new InternalArangoDatabaseOps(db)
    val client = dbOps.restClient
    val endpoint = s"spline/admin/data/before/$timestamp"
    client.delete(endpoint)
  }

}
38 changes: 37 additions & 1 deletion admin/src/test/scala/za/co/absa/spline/admin/AdminCLISpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package za.co.absa.spline.admin

import java.time.{ZoneId, ZonedDateTime}

import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.Mockito.{inOrder => mockitoInOrder, _} // Mockito.inOrder would collide with Matchers.inOrder
import org.scalatest.OneInstancePerTest
import org.scalatest.OptionValues._
import org.scalatest.flatspec.AnyFlatSpec
Expand All @@ -35,6 +37,7 @@ import za.co.absa.spline.persistence.model.{EdgeDef, NodeDef}

import javax.net.ssl.SSLContext
import scala.concurrent.Future
import scala.concurrent.duration._

class AdminCLISpec
extends AnyFlatSpec
Expand Down Expand Up @@ -91,6 +94,14 @@ class AdminCLISpec
arangoManagerMock.execute(any()))
thenReturn Future.successful({}))

// Stub both `prune` overloads (by retention period and by threshold date-time)
// so that they complete successfully.
(when(
arangoManagerMock.prune(any[Duration]()))
thenReturn Future.successful({}))

(when(
arangoManagerMock.prune(any[ZonedDateTime]()))
thenReturn Future.successful({}))

it should "when called with wrong options, print welcome message" in {
captureStdErr {
captureExitStatus(cli.exec(Array("db-init"))) should be(1)
Expand Down Expand Up @@ -255,5 +266,30 @@ class AdminCLISpec
IndicesCreate,
)
}

behavior of "DB-Prune"

// Without a threshold option the command must exit with status 1 and print the usage hint.
// NOTE(review): the options exercised in this spec are `--retain-for` / `--before-date`;
// no short `-r` / `-d` forms are visible here - confirm the test name is still accurate.
it should "require either -r or -d option" in {
val msg = captureStdErr(captureExitStatus(cli.exec(Array("db-prune", "arangodb://foo/bar"))) should be(1))
msg should include("Try --help for more information")
}

// `--retain-for 30d` must be passed to the manager as a 30-day duration, and nothing else called.
it should "support retention duration" in assertingStdOut(include("DONE")) {
cli.exec(Array("db-prune", "--retain-for", "30d", "arangodb://foo/bar"))
connUrlCaptor.getValue should be(ArangoConnectionURL("arangodb://foo/bar"))
verify(arangoManagerMock).prune(30.days)
verifyNoMoreInteractions(arangoManagerMock)
}

// `--before-date` accepts a bare date (midnight in the system default zone is expected)
// as well as a date-time with an explicit `Z` zone designator.
it should "prune support threshold timestamp" in assertingStdOut(include("DONE")) {
cli.exec(Array("db-prune", "--before-date", "2020-04-11", "arangodb://foo/bar"))
cli.exec(Array("db-prune", "--before-date", "2020-04-11T22:33Z", "arangodb://foo/bar"))

// The two invocations must reach the manager in the same order as issued above.
val inOrder = mockitoInOrder(arangoManagerMock)

inOrder.verify(arangoManagerMock).prune(ZonedDateTime.of(2020, 4, 11, 0, 0, 0, 0, ZoneId.systemDefault))
inOrder.verify(arangoManagerMock).prune(ZonedDateTime.of(2020, 4, 11, 22, 33, 0, 0, ZoneId.of("Z")))
inOrder.verifyNoMoreInteractions()
}
}
}
Loading

0 comments on commit a3fcaca

Please sign in to comment.