From fe332d314ba6f56781e527623890b5e8d8cb6190 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Babi=C4=87?= Date: Thu, 22 Aug 2024 12:07:54 -0700 Subject: [PATCH 1/5] feat: bump framework for connection api retries --- backend/project/LibraryDependencies.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/project/LibraryDependencies.scala b/backend/project/LibraryDependencies.scala index c6d775b..f795f2a 100644 --- a/backend/project/LibraryDependencies.scala +++ b/backend/project/LibraryDependencies.scala @@ -2,7 +2,7 @@ import sbt._ object LibraryDependencies { object NarrativeBackend { - val version = "20.11.5" + val version = "21.1.0" val `narrative-common-ssm` = "io.narrative" %% "common-ssm" % version val `narrative-common-catsretry` = "io.narrative" %% "common-catsretry" % version val `narrative-common-doobie-testkit` = "io.narrative" %% "common-doobie-testkit" % version @@ -15,14 +15,14 @@ object LibraryDependencies { } object NarrativeConnectorFramework { - val version = "0.2.2" + val version = "0.2.4" val `connector-framework-core` = "io.narrative" %% "connector-framework-core" % version } object Aws { - val version = "1.12.170" + val version = "1.12.656" val `aws-java-sdk-core` = "com.amazonaws" % "aws-java-sdk-core" % version val `aws-java-sdk-kms` = "com.amazonaws" % "aws-java-sdk-kms" % version val `aws-java-sdk-ssm` = "com.amazonaws" % "aws-java-sdk-ssm" % version @@ -83,7 +83,7 @@ object LibraryDependencies { } object Jackson { - val version = "2.13.3" + val version = "2.15.2" def overrides = Seq( "com.fasterxml.jackson.core" % "jackson-databind" % version, "com.fasterxml.jackson.core" % "jackson-annotations" % version, From ea7d6035e9cedc31117cde697a57845083533917 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Babi=C4=87?= Date: Fri, 23 Aug 2024 11:18:24 -0700 Subject: [PATCH 2/5] chore: bump backend to 21.1.1 --- backend/project/LibraryDependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/project/LibraryDependencies.scala b/backend/project/LibraryDependencies.scala index f795f2a..b94f762 100644 --- a/backend/project/LibraryDependencies.scala +++ b/backend/project/LibraryDependencies.scala @@ -2,7 +2,7 @@ import sbt._ object LibraryDependencies { object NarrativeBackend { - val version = "21.1.0" + val version = "21.1.1" val `narrative-common-ssm` = "io.narrative" %% "common-ssm" % version val `narrative-common-catsretry` = "io.narrative" %% "common-catsretry" % version val `narrative-common-doobie-testkit` = "io.narrative" %% "common-doobie-testkit" % version From b8ab08fb05d61abfd8f5aac02fe9c797a899b53b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Babi=C4=87?= Date: Fri, 23 Aug 2024 11:52:26 -0700 Subject: [PATCH 3/5] chore: reduce log noise in store tests --- backend/stores/src/test/resources/logback-test.xml | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 backend/stores/src/test/resources/logback-test.xml diff --git a/backend/stores/src/test/resources/logback-test.xml b/backend/stores/src/test/resources/logback-test.xml new file mode 100644 index 0000000..0a679fb --- /dev/null +++ b/backend/stores/src/test/resources/logback-test.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n%ex{short} + + + + + + + \ No newline at end of file From 5285349675d2219212e00bcc6f8d4745718ee04c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Babi=C4=87?= Date: Fri, 23 Aug 2024 12:04:20 -0700 Subject: [PATCH 4/5] chore: fix compilation errors --- .../facebook/CommandProcessor.scala | 114 ++++++++++-------- .../facebook/DeliveryProcessor.scala | 31 +++-- 2 files changed, 84 insertions(+), 61 deletions(-) diff --git a/backend/worker/src/main/scala/io/narrative/connectors/facebook/CommandProcessor.scala b/backend/worker/src/main/scala/io/narrative/connectors/facebook/CommandProcessor.scala index 9b09439..97612a0 100644 --- a/backend/worker/src/main/scala/io/narrative/connectors/facebook/CommandProcessor.scala +++ b/backend/worker/src/main/scala/io/narrative/connectors/facebook/CommandProcessor.scala @@ -1,10 +1,10 @@ package io.narrative.connectors.facebook -import cats.data.EitherT +import cats.data.{EitherT, OptionT} import cats.effect.IO import cats.implicits._ -import cats.~> -import doobie.ConnectionIO +import cats.{Monad, ~>} +import doobie.{ConnectionIO, WeakAsync} import io.circe.JsonObject import io.circe.syntax.EncoderOps import io.narrative.connectors.api.connections.ConnectionsApi @@ -31,6 +31,9 @@ class CommandProcessor( )(implicit loggerFactory: LoggerFactory[ConnectionIO]) extends CommandProcessor.Ops[ConnectionIO] { private val logger: SelfAwareStructuredLogger[ConnectionIO] = loggerFactory.getLogger + // Resolves the ambiguous implicit error for Monad[ConnectionIO]. + // Both WeakAsync[ConnectionIO] and catsFreeMonadForFree provide a Monad instance. + private implicit val monadConnectionIO: Monad[ConnectionIO] = WeakAsync[ConnectionIO] import CommandProcessor._ @@ -54,41 +57,48 @@ class CommandProcessor( private def processConnectionCreated( event: ConnectionCreatedCPEvent, toConnectionIO: IO ~> ConnectionIO - ): ConnectionIO[CommandProcessor.Result] = for { - connection <- toConnectionIO(connectionsApi.connection(event.connectionCreated.connectionId)) - quickSettings <- toConnectionIO(extractQuickSettings(connection.quickSettings)) - settings <- toConnectionIO( - getSettings( - Profile.Id.apply(connection.profileId), - ConnectionId.apply(event.connectionCreated.connectionId), - quickSettings - ) - ) - deliverHistoricalData = quickSettings.flatMap(_.historicalDataEnabled).getOrElse(false) - datasetId = connection.datasetId - resp <- - if (deliverHistoricalData) + ): ConnectionIO[CommandProcessor.Result] = + OptionT(toConnectionIO(connectionsApi.connection(event.connectionCreated.connectionId))) + .semiflatMap(connection => for { - _ <- logger.info( - show"historical data delivery enabled: enqueuing delivery of all files for dataset_id=${datasetId}" - ) - searchPeriod = DatasetFilesAPI.RightOpenPeriod(until = event.metadata.timestamp) - apiFiles <- toConnectionIO(datasetFilesApi.datasetFilesForPeriod(datasetId, searchPeriod)) - files = apiFiles.map(apiFile => FileToProcess(name = apiFile.file, snapshotId = apiFile.snapshotId)) - resp <- enqueueCommandAndJobs( - data = event.connectionCreated, - files = files, - metadata = event.metadata, - settingsId = settings.id + quickSettings <- toConnectionIO(extractQuickSettings(connection.quickSettings)) + settings <- toConnectionIO( + getSettings( + Profile.Id.apply(connection.profileId), + ConnectionId.apply(event.connectionCreated.connectionId), + quickSettings + ) ) + deliverHistoricalData = quickSettings.flatMap(_.historicalDataEnabled).getOrElse(false) + datasetId = connection.datasetId + resp <- + if (deliverHistoricalData) + for { + _ <- logger.info( + show"historical data delivery enabled: enqueuing delivery of all files for dataset_id=${datasetId}" + ) + searchPeriod = DatasetFilesAPI.RightOpenPeriod(until = event.metadata.timestamp) + apiFiles <- toConnectionIO(datasetFilesApi.datasetFilesForPeriod(datasetId, searchPeriod)) + files = apiFiles.map(apiFile => FileToProcess(name = apiFile.file, snapshotId = apiFile.snapshotId)) + resp <- enqueueCommandAndJobs( + data = event.connectionCreated, + files = files, + metadata = event.metadata, + settingsId = settings.id + ) + } yield resp + else + for { + _ <- logger.info( + show"historical data delivery disabled: skipping enqueuing of existing files for dataset_id=${datasetId}" + ) + } yield Result.empty } yield resp - else - for { - _ <- logger.info( - show"historical data delivery disabled: skipping enqueuing of existing files for dataset_id=${datasetId}" - ) - } yield Result.empty - } yield resp + ) + .getOrElseF( + logger.info(show"could not find connection ${event.connectionCreated.connectionId}. skipping processing") *> + Result.empty.pure[ConnectionIO] + ) private def enqueueCommandAndJobs( data: DeliveryEvent.Payload, @@ -162,22 +172,28 @@ class CommandProcessor( event: SnapshotAppendedCPEvent, toConnectionIO: IO ~> ConnectionIO ): ConnectionIO[CommandProcessor.Result] = - for { - connection <- toConnectionIO(connectionsApi.connection(event.snapshotAppended.connectionId)) - quickSettings <- toConnectionIO(extractQuickSettings(connection.quickSettings)) - settings <- toConnectionIO( - getSettings( - Profile.Id.apply(connection.profileId), - ConnectionId.apply(event.snapshotAppended.connectionId), - quickSettings - ) + OptionT(toConnectionIO(connectionsApi.connection(event.snapshotAppended.connectionId))) + .semiflatMap(connection => + for { + quickSettings <- toConnectionIO(extractQuickSettings(connection.quickSettings)) + settings <- toConnectionIO( + getSettings( + Profile.Id.apply(connection.profileId), + ConnectionId.apply(event.snapshotAppended.connectionId), + quickSettings + ) + ) + apiFiles <- toConnectionIO(filesApi.getFiles(event.metadata.revision.value)) + files = apiFiles.map(apiFile => + FileToProcess(name = apiFile.name, snapshotId = SnapshotId(event.snapshotAppended.snapshotId)) + ) + res <- enqueueCommandAndJobs(event.snapshotAppended, files, event.metadata, settings.id) + } yield res ) - apiFiles <- toConnectionIO(filesApi.getFiles(event.metadata.revision.value)) - files = apiFiles.map(apiFile => - FileToProcess(name = apiFile.name, snapshotId = SnapshotId(event.snapshotAppended.snapshotId)) + .getOrElseF( + logger.info(show"could not find connection ${event.snapshotAppended.connectionId}. skipping processing") *> + Result.empty.pure[ConnectionIO] ) - res <- enqueueCommandAndJobs(event.snapshotAppended, files, event.metadata, settings.id) - } yield res private def getSettings(profileId: Profile.Id, connectionId: ConnectionId, quickSettings: Option[QuickSettings]) = settingsService diff --git a/backend/worker/src/main/scala/io/narrative/connectors/facebook/DeliveryProcessor.scala b/backend/worker/src/main/scala/io/narrative/connectors/facebook/DeliveryProcessor.scala index 8f1c8a5..39fc2f5 100644 --- a/backend/worker/src/main/scala/io/narrative/connectors/facebook/DeliveryProcessor.scala +++ b/backend/worker/src/main/scala/io/narrative/connectors/facebook/DeliveryProcessor.scala @@ -57,19 +57,26 @@ class DeliveryProcessor( } private def processDeliverable(connectionId: ConnectionId, settingsId: Settings.Id, job: Job): IO[Unit] = - for { - connection <- connectionsApi.connection(connectionId.value) - profileId = Profile.Id(connection.profileId) - settings <- OptionT(settingsStore.settings(settingsId)) - .getOrRaise(new RuntimeException(s"could not find settings with id ${settingsId.show}")) - profile <- profile_!(profileId) - token <- encryption.decrypt(profile.token.encrypted) - _ <- IO.raiseWhen(job.snapshotId.isEmpty)( - new RuntimeException(s"Job($job) requires a snapshot id for snapshots/connection created events.") + OptionT(connectionsApi.connection(connectionId.value)) + .semiflatMap { connection => + val profileId = Profile.Id(connection.profileId) + for { + settings <- OptionT(settingsStore.settings(settingsId)) + .getOrRaise(new RuntimeException(s"could not find settings with id ${settingsId.show}")) + profile <- profile_!(profileId) + token <- encryption.decrypt(profile.token.encrypted) + _ <- IO.raiseWhen(job.snapshotId.isEmpty)( + new RuntimeException(s"Job($job) requires a snapshot id for snapshots/connection created events.") + ) + _ <- deliverFile(settings.audienceId, connection.datasetId, job, token) + _ <- markDelivered(job, job.file) + } yield () + } + .getOrRaise( + new RuntimeException( + show"could not find connection with id $connectionId while processing job ${job.file}. failing job." + ) ) - _ <- deliverFile(settings.audienceId, connection.datasetId, job, token) - _ <- markDelivered(job, job.file) - } yield () private def deliverFile( audienceId: Audience.Id, From 28819963ebbd49652d60920c875e0bd49eccbcd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Babi=C4=87?= Date: Mon, 26 Aug 2024 14:15:46 -0700 Subject: [PATCH 5/5] chore: bump backend --- backend/project/LibraryDependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/project/LibraryDependencies.scala b/backend/project/LibraryDependencies.scala index b94f762..0b80a88 100644 --- a/backend/project/LibraryDependencies.scala +++ b/backend/project/LibraryDependencies.scala @@ -2,7 +2,7 @@ import sbt._ object LibraryDependencies { object NarrativeBackend { - val version = "21.1.1" + val version = "21.1.2" val `narrative-common-ssm` = "io.narrative" %% "common-ssm" % version val `narrative-common-catsretry` = "io.narrative" %% "common-catsretry" % version val `narrative-common-doobie-testkit` = "io.narrative" %% "common-doobie-testkit" % version