Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bump framework for connection api retries #59

Merged
merged 5 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions backend/project/LibraryDependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import sbt._

object LibraryDependencies {
object NarrativeBackend {
val version = "20.11.5"
val version = "21.1.2"
val `narrative-common-ssm` = "io.narrative" %% "common-ssm" % version
val `narrative-common-catsretry` = "io.narrative" %% "common-catsretry" % version
val `narrative-common-doobie-testkit` = "io.narrative" %% "common-doobie-testkit" % version
Expand All @@ -15,14 +15,14 @@ object LibraryDependencies {
}

object NarrativeConnectorFramework {
val version = "0.2.2"
val version = "0.2.4"

val `connector-framework-core` =
"io.narrative" %% "connector-framework-core" % version
}

object Aws {
val version = "1.12.170"
val version = "1.12.656"
val `aws-java-sdk-core` = "com.amazonaws" % "aws-java-sdk-core" % version
val `aws-java-sdk-kms` = "com.amazonaws" % "aws-java-sdk-kms" % version
val `aws-java-sdk-ssm` = "com.amazonaws" % "aws-java-sdk-ssm" % version
Expand Down Expand Up @@ -83,7 +83,7 @@ object LibraryDependencies {
}

object Jackson {
val version = "2.13.3"
val version = "2.15.2"
def overrides = Seq(
"com.fasterxml.jackson.core" % "jackson-databind" % version,
"com.fasterxml.jackson.core" % "jackson-annotations" % version,
Expand Down
11 changes: 11 additions & 0 deletions backend/stores/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<configuration>
<!-- Logback configuration used only by the stores module's test runs
     (src/test/resources), so it overrides any main logback.xml on the
     test classpath. -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<!-- Example output: "12:34:56.789 INFO  i.n.c.facebook.SomeClass - message"
     %ex{short} appends only the first line of any stack trace, keeping
     test output compact. -->
<pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n%ex{short}</pattern>
</encoder>
</appender>

<!-- INFO and above from all loggers goes to the console appender. -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.narrative.connectors.facebook

import cats.data.EitherT
import cats.data.{EitherT, OptionT}
import cats.effect.IO
import cats.implicits._
import cats.~>
import doobie.ConnectionIO
import cats.{Monad, ~>}
import doobie.{ConnectionIO, WeakAsync}
import io.circe.JsonObject
import io.circe.syntax.EncoderOps
import io.narrative.connectors.api.connections.ConnectionsApi
Expand All @@ -31,6 +31,9 @@ class CommandProcessor(
)(implicit loggerFactory: LoggerFactory[ConnectionIO])
extends CommandProcessor.Ops[ConnectionIO] {
private val logger: SelfAwareStructuredLogger[ConnectionIO] = loggerFactory.getLogger
// Resolves the ambiguous implicit error for Monad[ConnectionIO].
// Both WeakAsync[ConnectionIO] and catsFreeMonadForFree provide a Monad instance.
private implicit val monadConnectionIO: Monad[ConnectionIO] = WeakAsync[ConnectionIO]

import CommandProcessor._

Expand All @@ -54,41 +57,48 @@ class CommandProcessor(
private def processConnectionCreated(
event: ConnectionCreatedCPEvent,
toConnectionIO: IO ~> ConnectionIO
): ConnectionIO[CommandProcessor.Result] = for {
connection <- toConnectionIO(connectionsApi.connection(event.connectionCreated.connectionId))
quickSettings <- toConnectionIO(extractQuickSettings(connection.quickSettings))
settings <- toConnectionIO(
getSettings(
Profile.Id.apply(connection.profileId),
ConnectionId.apply(event.connectionCreated.connectionId),
quickSettings
)
)
deliverHistoricalData = quickSettings.flatMap(_.historicalDataEnabled).getOrElse(false)
datasetId = connection.datasetId
resp <-
if (deliverHistoricalData)
): ConnectionIO[CommandProcessor.Result] =
OptionT(toConnectionIO(connectionsApi.connection(event.connectionCreated.connectionId)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bumping the connector framework forced me to update the connector to handle archived connections; that is what all of the code changes relate to.

.semiflatMap(connection =>
for {
_ <- logger.info(
show"historical data delivery enabled: enqueuing delivery of all files for dataset_id=${datasetId}"
)
searchPeriod = DatasetFilesAPI.RightOpenPeriod(until = event.metadata.timestamp)
apiFiles <- toConnectionIO(datasetFilesApi.datasetFilesForPeriod(datasetId, searchPeriod))
files = apiFiles.map(apiFile => FileToProcess(name = apiFile.file, snapshotId = apiFile.snapshotId))
resp <- enqueueCommandAndJobs(
data = event.connectionCreated,
files = files,
metadata = event.metadata,
settingsId = settings.id
quickSettings <- toConnectionIO(extractQuickSettings(connection.quickSettings))
settings <- toConnectionIO(
getSettings(
Profile.Id.apply(connection.profileId),
ConnectionId.apply(event.connectionCreated.connectionId),
quickSettings
)
)
deliverHistoricalData = quickSettings.flatMap(_.historicalDataEnabled).getOrElse(false)
datasetId = connection.datasetId
resp <-
if (deliverHistoricalData)
for {
_ <- logger.info(
show"historical data delivery enabled: enqueuing delivery of all files for dataset_id=${datasetId}"
)
searchPeriod = DatasetFilesAPI.RightOpenPeriod(until = event.metadata.timestamp)
apiFiles <- toConnectionIO(datasetFilesApi.datasetFilesForPeriod(datasetId, searchPeriod))
files = apiFiles.map(apiFile => FileToProcess(name = apiFile.file, snapshotId = apiFile.snapshotId))
resp <- enqueueCommandAndJobs(
data = event.connectionCreated,
files = files,
metadata = event.metadata,
settingsId = settings.id
)
} yield resp
else
for {
_ <- logger.info(
show"historical data delivery disabled: skipping enqueuing of existing files for dataset_id=${datasetId}"
)
} yield Result.empty
} yield resp
else
for {
_ <- logger.info(
show"historical data delivery disabled: skipping enqueuing of existing files for dataset_id=${datasetId}"
)
} yield Result.empty
} yield resp
)
.getOrElseF(
logger.info(show"could not find connection ${event.connectionCreated.connectionId}. skipping processing") *>
Result.empty.pure[ConnectionIO]
)

private def enqueueCommandAndJobs(
data: DeliveryEvent.Payload,
Expand Down Expand Up @@ -162,22 +172,28 @@ class CommandProcessor(
event: SnapshotAppendedCPEvent,
toConnectionIO: IO ~> ConnectionIO
): ConnectionIO[CommandProcessor.Result] =
for {
connection <- toConnectionIO(connectionsApi.connection(event.snapshotAppended.connectionId))
quickSettings <- toConnectionIO(extractQuickSettings(connection.quickSettings))
settings <- toConnectionIO(
getSettings(
Profile.Id.apply(connection.profileId),
ConnectionId.apply(event.snapshotAppended.connectionId),
quickSettings
)
OptionT(toConnectionIO(connectionsApi.connection(event.snapshotAppended.connectionId)))
.semiflatMap(connection =>
for {
quickSettings <- toConnectionIO(extractQuickSettings(connection.quickSettings))
settings <- toConnectionIO(
getSettings(
Profile.Id.apply(connection.profileId),
ConnectionId.apply(event.snapshotAppended.connectionId),
quickSettings
)
)
apiFiles <- toConnectionIO(filesApi.getFiles(event.metadata.revision.value))
files = apiFiles.map(apiFile =>
FileToProcess(name = apiFile.name, snapshotId = SnapshotId(event.snapshotAppended.snapshotId))
)
res <- enqueueCommandAndJobs(event.snapshotAppended, files, event.metadata, settings.id)
} yield res
)
apiFiles <- toConnectionIO(filesApi.getFiles(event.metadata.revision.value))
files = apiFiles.map(apiFile =>
FileToProcess(name = apiFile.name, snapshotId = SnapshotId(event.snapshotAppended.snapshotId))
.getOrElseF(
logger.info(show"could not find connection ${event.snapshotAppended.connectionId}. skipping processing") *>
Result.empty.pure[ConnectionIO]
)
res <- enqueueCommandAndJobs(event.snapshotAppended, files, event.metadata, settings.id)
} yield res

private def getSettings(profileId: Profile.Id, connectionId: ConnectionId, quickSettings: Option[QuickSettings]) =
settingsService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,26 @@ class DeliveryProcessor(
}

private def processDeliverable(connectionId: ConnectionId, settingsId: Settings.Id, job: Job): IO[Unit] =
for {
connection <- connectionsApi.connection(connectionId.value)
profileId = Profile.Id(connection.profileId)
settings <- OptionT(settingsStore.settings(settingsId))
.getOrRaise(new RuntimeException(s"could not find settings with id ${settingsId.show}"))
profile <- profile_!(profileId)
token <- encryption.decrypt(profile.token.encrypted)
_ <- IO.raiseWhen(job.snapshotId.isEmpty)(
new RuntimeException(s"Job($job) requires a snapshot id for snapshots/connection created events.")
OptionT(connectionsApi.connection(connectionId.value))
.semiflatMap { connection =>
val profileId = Profile.Id(connection.profileId)
for {
settings <- OptionT(settingsStore.settings(settingsId))
.getOrRaise(new RuntimeException(s"could not find settings with id ${settingsId.show}"))
profile <- profile_!(profileId)
token <- encryption.decrypt(profile.token.encrypted)
_ <- IO.raiseWhen(job.snapshotId.isEmpty)(
new RuntimeException(s"Job($job) requires a snapshot id for snapshots/connection created events.")
)
_ <- deliverFile(settings.audienceId, connection.datasetId, job, token)
_ <- markDelivered(job, job.file)
} yield ()
}
.getOrRaise(
new RuntimeException(
show"could not find connection with id $connectionId while processing job ${job.file}. failing job."
)
)
_ <- deliverFile(settings.audienceId, connection.datasetId, job, token)
_ <- markDelivered(job, job.file)
} yield ()

private def deliverFile(
audienceId: Audience.Id,
Expand Down
Loading