Skip to content

Commit

Permalink
fix: Triples Transformation flow not to log data related issues
Browse files Browse the repository at this point in the history
  • Loading branch information
jachro committed Mar 1, 2022
1 parent 01f0cc0 commit 491c030
Show file tree
Hide file tree
Showing 14 changed files with 141 additions and 103 deletions.
29 changes: 19 additions & 10 deletions graph-commons/src/test/scala/io/renku/interpreters/TestLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ class TestLogger[F[_]: Monad] extends Logger[F] with should.Matchers {
private[this] val invocations = new ConcurrentLinkedQueue[LogEntry]()

def getMessages(severity: Level): List[LogMessage] =
invocations.asScala.filter(_.level === severity).toList.map(_.message)
invocations(of = severity).toList.map(_.message)

private def invocations(of: Level): Iterable[LogEntry] =
invocations.asScala.filter(_.level === of)

def logged(expected: LogEntry*): Assertion =
invocations should contain allElementsOf expected
Expand All @@ -56,6 +59,11 @@ class TestLogger[F[_]: Monad] extends Logger[F] with should.Matchers {
def expectNoLogs(): Unit =
if (!invocations.isEmpty) fail(s"No logs expected but got $invocationsPrettyPrint")

def expectNoLogs(severity: Level): Unit = invocations(of = severity).toList match {
case Nil => ()
case invocations => fail(s"No $severity logs expected but got ${prettyPrint(invocations)}")
}

def reset(): Unit = invocations.clear()

override def error(t: Throwable)(message: => String): F[Unit] = {
Expand Down Expand Up @@ -108,15 +116,16 @@ class TestLogger[F[_]: Monad] extends Logger[F] with should.Matchers {
implicitly[Monad[F]].pure(())
}

private def invocationsPrettyPrint: String =
invocations.asScala
.map {
case LogEntry(level, Message(message)) => s"$level '$message'"
case LogEntry(level, MessageAndThrowable(message, throwable)) => s"$level '$message' $throwable"
case LogEntry(level, MessageAndThrowableMatcher(message, throwableMatcher)) =>
s"$level '$message' $throwableMatcher"
}
.mkString("\n", "\n", "")
private def invocationsPrettyPrint: String = prettyPrint(invocations.asScala)

private def prettyPrint(invocations: Iterable[LogEntry]): String = invocations
.map {
case LogEntry(level, Message(message)) => s"$level '$message'"
case LogEntry(level, MessageAndThrowable(message, throwable)) => s"$level '$message' $throwable"
case LogEntry(level, MessageAndThrowableMatcher(message, throwableMatcher)) =>
s"$level '$message' $throwableMatcher"
}
.mkString("\n", "\n", "")
}

object TestLogger {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ object ProcessingRecoverableError {
abstract class ProcessingNonRecoverableError(message: String, cause: Throwable) extends Exception(message, cause)
object ProcessingNonRecoverableError {

final case class DataError(message: String, cause: Throwable) extends ProcessingNonRecoverableError(message, cause)
object DataError {
def apply(message: String): DataError = DataError(message, null)
final case class MalformedRepository(message: String, cause: Throwable)
extends ProcessingNonRecoverableError(message, cause)
object MalformedRepository {
def apply(message: String): MalformedRepository = MalformedRepository(message, null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private class EventProcessorImpl[F[_]: MonadThrow: Logger](
private def toNonRecoverableFailure(
commit: CommitEvent
): PartialFunction[Throwable, F[TriplesGenerationResult]] = {
case exception: ProcessingNonRecoverableError.DataError =>
case exception: ProcessingNonRecoverableError.MalformedRepository =>
NonRecoverableError(commit, exception).pure[F].widen[TriplesGenerationResult]
case NonFatal(exception) =>
Logger[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ private object Commands {
.catchNonFatal(renkuMigrate(directory.value))
.void
.recoverWith { case NonFatal(exception) =>
new ProcessingNonRecoverableError.DataError(
new ProcessingNonRecoverableError.MalformedRepository(
s"'renku migrate' failed for commit: ${commitEvent.commitId}, project: ${commitEvent.project.id}",
exception
).raiseError[F, Unit]
Expand All @@ -236,7 +236,7 @@ private object Commands {
LogWorthyRecoverableError("Not enough memory").asLeft[JsonLD].leftWiden[ProcessingRecoverableError].pure[F]
case NonFatal(exception) =>
ProcessingNonRecoverableError
.DataError("'renku graph export' failed", exception)
.MalformedRepository("'renku graph export' failed", exception)
.raiseError[F, Either[ProcessingRecoverableError, JsonLD]]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ private[awaitinggeneration] class RenkuLogTriplesGenerator[F[_]: Async] private[
commitEvent: CommitEvent,
maybeAccessToken: Option[AccessToken]
): PartialFunction[Throwable, F[Either[ProcessingRecoverableError, JsonLD]]] = {
case ProcessingNonRecoverableError.DataError(message, cause) =>
case ProcessingNonRecoverableError.MalformedRepository(message, cause) =>
ProcessingNonRecoverableError
.DataError(s"${logMessageCommon(commitEvent)} $message", cause)
.MalformedRepository(s"${logMessageCommon(commitEvent)} $message", cause)
.raiseError[F, Either[ProcessingRecoverableError, JsonLD]]
case NonFatal(exception) =>
(Option(exception.getMessage) -> maybeAccessToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import io.renku.logging.ExecutionTimeRecorder.ElapsedTime
import io.renku.metrics.MetricsRegistry
import io.renku.rdfstore.SparqlQueryTimeRecorder
import io.renku.triplesgenerator.events.categories.EventStatusUpdater._
import io.renku.triplesgenerator.events.categories.{EventStatusUpdater, ProcessingRecoverableError}
import io.renku.triplesgenerator.events.categories.{EventStatusUpdater, ProcessingNonRecoverableError, ProcessingRecoverableError}
import io.renku.triplesgenerator.events.categories.ProcessingRecoverableError._
import io.renku.triplesgenerator.events.categories.triplesgenerated.transformation.TransformationStepsCreator
import io.renku.triplesgenerator.events.categories.triplesgenerated.triplesuploading.TriplesUploadResult._
Expand Down Expand Up @@ -111,10 +111,13 @@ private class EventProcessorImpl[F[_]: MonadThrow: Logger](

private def nonRecoverableFailure(
triplesGeneratedEvent: TriplesGeneratedEvent
): PartialFunction[Throwable, F[UploadingResult]] = { case NonFatal(exception) =>
Logger[F]
.error(exception)(s"${logMessageCommon(triplesGeneratedEvent)} ${exception.getMessage}")
.map(_ => NonRecoverableError(triplesGeneratedEvent, exception))
): PartialFunction[Throwable, F[UploadingResult]] = {
case exception: ProcessingNonRecoverableError.MalformedRepository =>
NonRecoverableError(triplesGeneratedEvent, exception).pure[F].widen[UploadingResult]
case NonFatal(exception) =>
Logger[F]
.error(exception)(s"${logMessageCommon(triplesGeneratedEvent)} ${exception.getMessage}")
.map(_ => NonRecoverableError(triplesGeneratedEvent, exception))
}

private def toUploadingError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import io.renku.graph.model.entities.Project.GitLabProjectInfo
import io.renku.graph.model.entities._
import io.renku.http.client.AccessToken
import io.renku.jsonld.JsonLDDecoder.decodeList
import io.renku.triplesgenerator.events.categories.ProcessingRecoverableError
import io.renku.triplesgenerator.events.categories.{ProcessingNonRecoverableError, ProcessingRecoverableError}
import io.renku.triplesgenerator.events.categories.triplesgenerated.projectinfo.ProjectInfoFinder
import org.typelevel.log4cats.Logger

Expand Down Expand Up @@ -64,7 +64,8 @@ private class JsonLDDeserializerImpl[F[_]: MonadThrow](
) = findProjectInfo(event.project.path) semiflatMap {
case Some(projectInfo) => projectInfo.pure[F]
case None =>
new IllegalStateException(s"No project ${event.project.show} found in GitLab")
ProcessingNonRecoverableError
.MalformedRepository(show"${event.project} not found in GitLab")
.raiseError[F, GitLabProjectInfo]
}

Expand All @@ -87,9 +88,11 @@ private class JsonLDDeserializerImpl[F[_]: MonadThrow](
} yield project
}

private def raiseError[T](event: TriplesGeneratedEvent): DecodingFailure => F[T] = err =>
new IllegalStateException(show"Finding Project entity in the JsonLD for ${event.project} failed", err)
.raiseError[F, T]
private def raiseError[T](event: TriplesGeneratedEvent): DecodingFailure => F[T] = failure =>
new ProcessingNonRecoverableError.MalformedRepository(
show"Finding Project entity in the JsonLD for ${event.project} failed",
failure
).raiseError[F, T]
}

private object JsonLDDeserializer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@ private[triplesgenerated] trait ProjectInfoFinder[F[_]] {
private[triplesgenerated] object ProjectInfoFinder {
def apply[F[_]: Async: NonEmptyParallel: Parallel: Logger](
gitLabThrottler: Throttler[F, GitLab]
): F[ProjectInfoFinder[F]] =
for {
projectFinder <- ProjectFinder[F](gitLabThrottler)
membersFinder <- ProjectMembersFinder[F](gitLabThrottler)
memberEmailFinder <- MemberEmailFinder[F](gitLabThrottler)
} yield new ProjectInfoFinderImpl(projectFinder, membersFinder, memberEmailFinder)
): F[ProjectInfoFinder[F]] = for {
projectFinder <- ProjectFinder[F](gitLabThrottler)
membersFinder <- ProjectMembersFinder[F](gitLabThrottler)
memberEmailFinder <- MemberEmailFinder[F](gitLabThrottler)
} yield new ProjectInfoFinderImpl(projectFinder, membersFinder, memberEmailFinder)
}

private[triplesgenerated] class ProjectInfoFinderImpl[F[_]: MonadThrow: Parallel: Logger](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import io.renku.triplesgenerator.events.categories.awaitinggeneration.EventProce
import io.renku.triplesgenerator.events.categories.awaitinggeneration.EventProcessor.eventsProcessingTimesBuilder
import io.renku.triplesgenerator.events.categories.awaitinggeneration.triplesgeneration.TriplesGenerator
import io.renku.triplesgenerator.events.categories.{EventStatusUpdater, ProcessingRecoverableError}
import io.renku.triplesgenerator.generators.ErrorGenerators.{authRecoverableErrors, logWorthyRecoverableErrors, nonRecoverableDataErrors}
import io.renku.triplesgenerator.generators.ErrorGenerators.{authRecoverableErrors, logWorthyRecoverableErrors, nonRecoverableMalformedRepoErrors}
import org.scalacheck.Gen
import org.scalamock.scalatest.MockFactory
import org.scalatest.concurrent.{Eventually, IntegrationPatience}
Expand Down Expand Up @@ -84,14 +84,14 @@ class EventProcessorSpec
verifyMetricsCollected()
}

"succeed if event fails during triples generation with a DataError" in new TestCase {
"succeed if event fails during triples generation with a ProcessingNonRecoverableError.MalformedRepository" in new TestCase {

val commitEvent = commitEvents.generateOne

givenFetchingAccessToken(forProjectPath = commitEvent.project.path)
.returning(maybeAccessToken.pure[Try])

val exception = nonRecoverableDataErrors.generateOne
val exception = nonRecoverableMalformedRepoErrors.generateOne
(triplesFinder
.generateTriples(_: CommitEvent)(_: Option[AccessToken]))
.expects(commitEvent, maybeAccessToken)
Expand All @@ -104,27 +104,28 @@ class EventProcessorSpec
logger.expectNoLogs()
}

"log an error and succeed if event fails during triples generation with a non data problem" in new TestCase {
"log an error and succeed " +
"if event fails during triples generation with a non-ProcessingNonRecoverableError.MalformedRepository" in new TestCase {

val commitEvent = commitEvents.generateOne
val commitEvent = commitEvents.generateOne

givenFetchingAccessToken(forProjectPath = commitEvent.project.path)
.returning(maybeAccessToken.pure[Try])
givenFetchingAccessToken(forProjectPath = commitEvent.project.path)
.returning(maybeAccessToken.pure[Try])

val exception = exceptions.generateOne
(triplesFinder
.generateTriples(_: CommitEvent)(_: Option[AccessToken]))
.expects(commitEvent, maybeAccessToken)
.returning(EitherT.liftF(exception.raiseError[Try, JsonLD]))
val exception = exceptions.generateOne
(triplesFinder
.generateTriples(_: CommitEvent)(_: Option[AccessToken]))
.expects(commitEvent, maybeAccessToken)
.returning(EitherT.liftF(exception.raiseError[Try, JsonLD]))

expectEventMarkedAsNonRecoverableFailure(commitEvent, exception)
expectEventMarkedAsNonRecoverableFailure(commitEvent, exception)

eventProcessor.process(commitEvent) shouldBe ().pure[Try]
eventProcessor.process(commitEvent) shouldBe ().pure[Try]

logError(commitEvent, exception)
}
logError(commitEvent, exception)
}

s"mark event as RecoverableFailure and log an error if finding triples fails with $LogWorthyRecoverableError" in new TestCase {
"mark event as RecoverableFailure and log an error if finding triples fails with LogWorthyRecoverableError" in new TestCase {

val commitEvent = commitEvents.generateOne

Expand All @@ -145,7 +146,7 @@ class EventProcessorSpec
}

"mark event as RecoverableFailure and refrain from loggin an error " +
s"if finding triples fails with $AuthRecoverableError" in new TestCase {
"if finding triples fails with AuthRecoverableError" in new TestCase {

val commitEvent = commitEvents.generateOne

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import io.renku.triplesgenerator.events.categories.{ProcessingNonRecoverableErro
import io.renku.triplesgenerator.events.categories.ProcessingRecoverableError._
import io.renku.triplesgenerator.events.categories.awaitinggeneration.triplesgeneration.renkulog.Commands.{GitLabRepoUrlFinder, RepositoryPath}
import io.renku.triplesgenerator.events.categories.awaitinggeneration.{CommitEvent, categoryName}
import io.renku.triplesgenerator.generators.ErrorGenerators.nonRecoverableDataErrors
import io.renku.triplesgenerator.generators.ErrorGenerators.nonRecoverableMalformedRepoErrors
import org.scalacheck.Gen
import org.scalamock.scalatest.MockFactory
import org.scalatest.matchers.should
Expand Down Expand Up @@ -601,7 +601,7 @@ class RenkuLogTriplesGeneratorSpec extends AnyWordSpec with IOSpec with MockFact
actual.getCause shouldBe exception
}

"fail if calling 'renku migrate' fails with DataError" in new TestCase {
"fail if calling 'renku migrate' fails with ProcessingNonRecoverableError.MalformedRepository" in new TestCase {

(file
.mkdir(_: Path))
Expand All @@ -626,7 +626,7 @@ class RenkuLogTriplesGeneratorSpec extends AnyWordSpec with IOSpec with MockFact
(file.exists(_: Path)).expects(renkuRepoFilePath).returning(true.pure[IO])
(file.exists(_: Path)).expects(dirtyRepoFilePath).returning(false.pure[IO])

val exception = nonRecoverableDataErrors.generateOne
val exception = nonRecoverableMalformedRepoErrors.generateOne
(renku
.migrate(_: CommitEvent)(_: RepositoryPath))
.expects(commitEvent, repositoryDirectory)
Expand All @@ -638,14 +638,14 @@ class RenkuLogTriplesGeneratorSpec extends AnyWordSpec with IOSpec with MockFact
.returning(IO.unit)
.atLeastOnce()

val actual = intercept[ProcessingNonRecoverableError.DataError] {
val actual = intercept[ProcessingNonRecoverableError.MalformedRepository] {
triplesGenerator.generateTriples(commitEvent)(maybeAccessToken).value.unsafeRunSync()
}
actual.getMessage shouldBe s"${commonLogMessage(commitEvent)} ${exception.message}"
actual.getCause shouldBe exception.cause
}

"fail if calling 'renku migrate' fails with non-DataError" in new TestCase {
"fail if calling 'renku migrate' fails with non-ProcessingNonRecoverableError.MalformedRepository" in new TestCase {

(file
.mkdir(_: Path))
Expand Down Expand Up @@ -689,7 +689,7 @@ class RenkuLogTriplesGeneratorSpec extends AnyWordSpec with IOSpec with MockFact
actual.getCause shouldBe exception
}

"fail if calling 'renku graph export' fails with DataError" in new TestCase {
"fail if calling 'renku graph export' fails with ProcessingNonRecoverableError.MalformedRepository" in new TestCase {

(file
.mkdir(_: Path))
Expand Down Expand Up @@ -719,7 +719,7 @@ class RenkuLogTriplesGeneratorSpec extends AnyWordSpec with IOSpec with MockFact
.expects(commitEvent, repositoryDirectory)
.returning(IO.unit)

val exception = nonRecoverableDataErrors.generateOne
val exception = nonRecoverableMalformedRepoErrors.generateOne
(renku
.graphExport(_: RepositoryPath))
.expects(repositoryDirectory)
Expand All @@ -731,14 +731,14 @@ class RenkuLogTriplesGeneratorSpec extends AnyWordSpec with IOSpec with MockFact
.returning(IO.unit)
.atLeastOnce()

val actual = intercept[ProcessingNonRecoverableError.DataError] {
val actual = intercept[ProcessingNonRecoverableError.MalformedRepository] {
triplesGenerator.generateTriples(commitEvent)(maybeAccessToken).value.unsafeRunSync()
}
actual.getMessage shouldBe s"${commonLogMessage(commitEvent)} ${exception.message}"
actual.getCause shouldBe exception.cause
}

"fail if calling 'renku graph export' fails with non-DataError" in new TestCase {
"fail if calling 'renku graph export' fails with non-ProcessingNonRecoverableError.MalformedRepository" in new TestCase {

(file
.mkdir(_: Path))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,20 @@ class RenkuSpec extends AnyWordSpec with IOSpec with should.Matchers with MockFa
renku.migrate(event)(path).unsafeRunSync() shouldBe ()
}

"fail with some ProcessingNonRecoverableError.DataError if 'renku migrate' fails" in new TestCase {
val exception = exceptions.generateOne
renkuMigrate.expects(path.value).throws(exception)
"fail with some ProcessingNonRecoverableError.MalformedRepository " +
"if 'renku migrate' fails" in new TestCase {
val exception = exceptions.generateOne
renkuMigrate.expects(path.value).throws(exception)

val event = commitEvents.generateOne
val event = commitEvents.generateOne

val failure = intercept[ProcessingNonRecoverableError.DataError] {
renku.migrate(event)(path).unsafeRunSync()
}
val failure = intercept[ProcessingNonRecoverableError.MalformedRepository] {
renku.migrate(event)(path).unsafeRunSync()
}

failure.getMessage shouldBe s"'renku migrate' failed for commit: ${event.commitId}, project: ${event.project.id}"
failure.getCause shouldBe exception
}
failure.getMessage shouldBe s"'renku migrate' failed for commit: ${event.commitId}, project: ${event.project.id}"
failure.getCause shouldBe exception
}
}

"graphExport" should {
Expand All @@ -76,11 +77,11 @@ class RenkuSpec extends AnyWordSpec with IOSpec with should.Matchers with MockFa
triples shouldBe commandBody
}

"fail if calling 'renku export' results with a DataError" in new TestCase {
"fail if calling 'renku export' results with a ProcessingNonRecoverableError.MalformedRepository" in new TestCase {
val exception = exceptions.generateOne
renkuExport.expects(path.value).throws(exception)

val failure = intercept[ProcessingNonRecoverableError.DataError] {
val failure = intercept[ProcessingNonRecoverableError.MalformedRepository] {
renku.graphExport(path).value.unsafeRunSync()
}

Expand Down
Loading

0 comments on commit 491c030

Please sign in to comment.