Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: added retry mechanism when storing and fetching the access token #886

Merged
merged 2 commits into from
Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package io.renku.tokenrepository.repository.association

import cats.MonadThrow
import cats.effect.Async
import cats.effect.kernel.Temporal
import cats.syntax.all._
import io.renku.db.{SessionResource, SqlStatement}
import io.renku.graph.model.projects.{Id, Path}
Expand All @@ -29,6 +28,10 @@ import io.renku.metrics.LabeledHistogram
import io.renku.tokenrepository.repository.deletion.{TokenRemover, TokenRemoverImpl}
import io.renku.tokenrepository.repository.{AccessTokenCrypto, ProjectsTokensDB}
import org.typelevel.log4cats.Logger
import io.renku.tokenrepository.repository.fetching.PersistedTokensFinderImpl
import io.renku.tokenrepository.repository.fetching.PersistedTokensFinder
import scala.util.control.NonFatal
import eu.timepit.refined.types.numeric

private trait TokenAssociator[F[_]] {
def associate(projectId: Id, token: AccessToken): F[Unit]
Expand All @@ -38,7 +41,9 @@ private class TokenAssociatorImpl[F[_]: MonadThrow](
projectPathFinder: ProjectPathFinder[F],
accessTokenCrypto: AccessTokenCrypto[F],
associationPersister: AssociationPersister[F],
tokenRemover: TokenRemover[F]
tokenRemover: TokenRemover[F],
tokenFinder: PersistedTokensFinder[F],
maxRetries: numeric.NonNegInt
) extends TokenAssociator[F] {

import accessTokenCrypto._
Expand All @@ -53,18 +58,49 @@ private class TokenAssociatorImpl[F[_]: MonadThrow](

private def encryptAndPersist(projectId: Id, projectPath: Path, token: AccessToken) = for {
encryptedToken <- encrypt(token)
_ <- persistAssociation(projectId, projectPath, encryptedToken)
_ <- persistOrRetry(projectId, projectPath, encryptedToken)
} yield ()

private def persistOrRetry(projectId: Id,
projectPath: Path,
encryptedToken: AccessTokenCrypto.EncryptedAccessToken,
numberOfRetries: Int = 0
): F[Unit] = for {
_ <- persistAssociation(projectId, projectPath, encryptedToken)
_ <- verifyTokenIntegrity(projectPath) recoverWith retry(projectId, projectPath, encryptedToken, numberOfRetries)
} yield ()

private def verifyTokenIntegrity(projectPath: Path) =
tokenFinder.findToken(projectPath).value.flatMap {
case Some(savedToken) => decrypt(savedToken).void
case _ =>
new Exception(show"Token associator - saved encrypted token cannot be found for project: $projectPath")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we maybe raise the exception only when we run out of attempts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes good idea!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this error is thrown only when the token is not found after it being saved to the DB. It should not happen, it is an unexpected behaviour of the program. With the retry being after this exception we would insert it again and verify if it was saved properly

.raiseError[F, Unit]
}

private def retry(projectId: Id,
projectPath: Path,
encryptedToken: AccessTokenCrypto.EncryptedAccessToken,
numberOfRetries: Int
): PartialFunction[Throwable, F[Unit]] = { case NonFatal(error) =>
if (numberOfRetries < maxRetries.value) {
persistOrRetry(projectId, projectPath, encryptedToken, numberOfRetries + 1)
} else error.raiseError[F, Unit]
}
}

private object TokenAssociator {
def apply[F[_]: Async: Temporal: Logger](

private val maxRetries: numeric.NonNegInt = numeric.NonNegInt.unsafeFrom(3)

def apply[F[_]: Async: Logger](
sessionResource: SessionResource[F, ProjectsTokensDB],
queriesExecTimes: LabeledHistogram[F, SqlStatement.Name]
): F[TokenAssociator[F]] = for {
pathFinder <- ProjectPathFinder[F]
accessTokenCrypto <- AccessTokenCrypto[F]()
persister = new AssociationPersisterImpl(sessionResource, queriesExecTimes)
tokenRemover = new TokenRemoverImpl[F](sessionResource, queriesExecTimes)
} yield new TokenAssociatorImpl[F](pathFinder, accessTokenCrypto, persister, tokenRemover)
tokenFinder = new PersistedTokensFinderImpl[F](sessionResource, queriesExecTimes)
} yield new TokenAssociatorImpl[F](pathFinder, accessTokenCrypto, persister, tokenRemover, tokenFinder, maxRetries)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ import io.renku.tokenrepository.repository.AccessTokenCrypto.EncryptedAccessToke
import io.renku.tokenrepository.repository.{ProjectsTokensDB, TokenRepositoryTypeSerializers}
import skunk.implicits._

private trait PersistedTokensFinder[F[_]] {
private[repository] trait PersistedTokensFinder[F[_]] {
def findToken(projectId: Id): OptionT[F, EncryptedAccessToken]
def findToken(projectPath: Path): OptionT[F, EncryptedAccessToken]
}

private class PersistedTokensFinderImpl[F[_]: MonadCancelThrow](
private[repository] class PersistedTokensFinderImpl[F[_]: MonadCancelThrow](
sessionResource: SessionResource[F, ProjectsTokensDB],
queriesExecTimes: LabeledHistogram[F, SqlStatement.Name]
) extends DbClient[F](Some(queriesExecTimes))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import io.renku.graph.model.projects.{Id, Path}
import io.renku.http.client.AccessToken
import io.renku.metrics.LabeledHistogram
import io.renku.tokenrepository.repository.{AccessTokenCrypto, ProjectsTokensDB}
import eu.timepit.refined.auto._
import scala.util.control.NonFatal
import eu.timepit.refined.types.numeric

private trait TokenFinder[F[_]] {
def findToken(projectPath: Path): OptionT[F, AccessToken]
Expand All @@ -35,30 +38,50 @@ private trait TokenFinder[F[_]] {

private class TokenFinderImpl[F[_]: MonadThrow](
tokenInRepoFinder: PersistedTokensFinder[F],
accessTokenCrypto: AccessTokenCrypto[F]
accessTokenCrypto: AccessTokenCrypto[F],
maxRetries: numeric.NonNegInt
) extends TokenFinder[F] {

import accessTokenCrypto._

override def findToken(projectPath: Path): OptionT[F, AccessToken] = for {
encryptedToken <- tokenInRepoFinder.findToken(projectPath)
accessToken <- OptionT.liftF(decrypt(encryptedToken))
accessToken <-
OptionT.liftF(decrypt(encryptedToken)) recoverWith retry(() => tokenInRepoFinder.findToken(projectPath))
} yield accessToken

override def findToken(projectId: Id): OptionT[F, AccessToken] = for {
encryptedToken <- tokenInRepoFinder.findToken(projectId)
accessToken <- OptionT.liftF(decrypt(encryptedToken))
accessToken <-
OptionT.liftF(decrypt(encryptedToken)) recoverWith retry(() => tokenInRepoFinder.findToken(projectId))
} yield accessToken

private def retry(fetchToken: () => OptionT[F, AccessTokenCrypto.EncryptedAccessToken],
numberOfRetries: Int = 0
): PartialFunction[Throwable, OptionT[F, AccessToken]] = { case NonFatal(exception) =>
if (numberOfRetries < maxRetries.value) {
for {
encryptedToken <- fetchToken()
accessToken <- OptionT.liftF(decrypt(encryptedToken)) recoverWith retry(fetchToken, numberOfRetries + 1)
} yield accessToken
} else {
OptionT.liftF(exception.raiseError[F, AccessToken])
}
}
}

private object TokenFinder {

private val maxRetries = numeric.NonNegInt.unsafeFrom(3)

def apply[F[_]: MonadCancelThrow](
sessionResource: SessionResource[F, ProjectsTokensDB],
queriesExecTimes: LabeledHistogram[F, SqlStatement.Name]
): F[TokenFinder[F]] = for {
accessTokenCrypto <- AccessTokenCrypto[F]()
} yield new TokenFinderImpl[F](
new PersistedTokensFinderImpl[F](sessionResource, queriesExecTimes),
accessTokenCrypto
accessTokenCrypto,
maxRetries
)
}
Loading