Commit eef39ca

refactor logging to closer match other connectors.

jdneumeyer77 committed Jul 8, 2024
1 parent 1a47cd1 commit eef39ca
Showing 4 changed files with 20 additions and 16 deletions.
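All four files move from scalalogging's LazyLogging mixin to a log4cats LoggerFactory supplied implicitly, the pattern the other connectors already use. A minimal before/after sketch of that pattern (the class names below are illustrative, not the connector's actual code):

// Before: LazyLogging exposes a synchronous SLF4J logger, so code running in an
// effect type has to lift each call by hand (as the removed logInfo helper did).
import com.typesafe.scalalogging.LazyLogging
import doobie.ConnectionIO

class BeforeSketch extends LazyLogging {
  private def logInfo(msg: String): ConnectionIO[Unit] =
    doobie.free.connection.delay(logger.info(msg))
}

// After: a log4cats LoggerFactory[F] is injected implicitly and hands back a
// logger whose methods already return F[Unit], so calls sequence directly in for-comprehensions.
import org.typelevel.log4cats.{LoggerFactory, SelfAwareStructuredLogger}

class AfterSketch[F[_]](implicit loggerFactory: LoggerFactory[F]) {
  private val logger: SelfAwareStructuredLogger[F] = loggerFactory.getLogger

  def process(event: String): F[Unit] =
    logger.info(s"processing $event")
}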
File 1 of 4: CommandProcessor
@@ -4,7 +4,6 @@ import cats.data.EitherT
 import cats.effect.IO
 import cats.implicits._
 import cats.~>
-import com.typesafe.scalalogging.LazyLogging
 import doobie.ConnectionIO
 import io.circe.JsonObject
 import io.circe.syntax.EncoderOps
@@ -17,6 +16,7 @@ import io.narrative.connectors.queue.QueueStore
 import io.narrative.connectors.api.datasets.DatasetFilesAPI
 import io.narrative.connectors.model.SnapshotId
 import io.narrative.connectors.facebook.domain.Profile.QuickSettings
+import org.typelevel.log4cats.{LoggerFactory, SelfAwareStructuredLogger}

 /** Resolves commands saved by the [[CommandConsumer]]. Outputs a resolved, stored command and jobs for processing each
   * file in the delivery command.
@@ -28,8 +28,9 @@ class CommandProcessor(
     connectionsApi: ConnectionsApi.Ops[IO],
     filesApi: BackwardsCompatibleFilesApi.Ops[IO],
     datasetFilesApi: DatasetFilesAPI.Ops[IO]
-) extends CommandProcessor.Ops[ConnectionIO]
-    with LazyLogging {
+)(implicit loggerFactory: LoggerFactory[ConnectionIO])
+    extends CommandProcessor.Ops[ConnectionIO] {
+  private val logger: SelfAwareStructuredLogger[ConnectionIO] = loggerFactory.getLogger

   import CommandProcessor._

@@ -38,7 +39,7 @@ class CommandProcessor(
       toConnectionIO: IO ~> ConnectionIO
   ): ConnectionIO[CommandProcessor.Result] =
     for {
-      _ <- logInfo(s"processing ${event}")
+      _ <- logger.info(s"processing ${event}")
       result <- event match {
         case event: ConnectionCreatedCPEvent => processConnectionCreated(event, toConnectionIO)
         case event: SnapshotAppendedCPEvent  => processSnapshotAppended(event, toConnectionIO)
@@ -50,8 +51,6 @@
       EitherT.fromEither[IO](jsonObject.asJson.as[Profile.QuickSettings]).rethrowT
     }.sequence

-  private def logInfo(str: String) = doobie.free.connection.delay(logger.info(str))
-
   private def processConnectionCreated(
       event: ConnectionCreatedCPEvent,
       toConnectionIO: IO ~> ConnectionIO
@@ -70,7 +69,7 @@
       resp <-
         if (deliverHistoricalData)
           for {
-            _ <- logInfo(
+            _ <- logger.info(
               show"historical data delivery enabled: enqueuing delivery of all files for dataset_id=${datasetId}"
             )
             searchPeriod = DatasetFilesAPI.RightOpenPeriod(until = event.metadata.timestamp)
@@ -85,7 +84,7 @@
           } yield resp
         else
           for {
-            _ <- logInfo(
+            _ <- logger.info(
               show"historical data delivery disabled: skipping enqueuing of existing files for dataset_id=${datasetId}"
             )
           } yield Result.empty
@@ -114,7 +113,7 @@
           settingsId = settingsId
         )
       )
-      _ <- logInfo(show"generated command. revision=${command.eventRevision.show}, payload=${command.payload.show}")
+      _ <- logger.info(show"generated command. revision=${command.eventRevision.show}, payload=${command.payload.show}")
     } yield Result(command = command, jobs = newJobs)
   }

@@ -156,7 +155,7 @@
         status = Command.Status.ProcessingFiles(files.map(file => FileName(file.name)))
       )
     )
-      _ <- logInfo(s"generated command. revision=${command.eventRevision.show}, payload=${command.payload.show}")
+      _ <- logger.info(s"generated command. revision=${command.eventRevision.show}, payload=${command.payload.show}")
     } yield Result(command = command, jobs = newJobs)

   private def processSnapshotAppended(
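CommandProcessor is the one component whose effect type is ConnectionIO rather than IO, since it runs inside database transactions. It now takes an implicit LoggerFactory[ConnectionIO] and drops the hand-rolled logInfo helper; the log4cats logger is built for ConnectionIO, so log statements sequence with the other transactional steps. An illustrative sketch, assuming doobie's Sync instance for ConnectionIO is in implicit scope:

import doobie.ConnectionIO
import org.typelevel.log4cats.{LoggerFactory, SelfAwareStructuredLogger}

// Illustrative only: no doobie.free.connection.delay wrapping is needed,
// because the logger's methods already return ConnectionIO[Unit].
def logStep(implicit lf: LoggerFactory[ConnectionIO]): ConnectionIO[Unit] = {
  val logger: SelfAwareStructuredLogger[ConnectionIO] = lf.getLogger
  logger.info("processing event")
}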
File 2 of 4: DeliveryProcessor
@@ -3,7 +3,6 @@ package io.narrative.connectors.facebook
 import cats.data.OptionT
 import cats.effect.IO
 import cats.syntax.show._
-import com.typesafe.scalalogging.LazyLogging
 import fs2.io.file.Flags
 import io.circe.parser.parse
 import io.narrative.connectors.api.connections.ConnectionsApi
@@ -26,6 +25,7 @@ import io.narrative.connectors.facebook.stores.CommandStore.StatusUpdate.FileUpdate
 import io.narrative.connectors.facebook.stores.{CommandStore, ProfileStore, SettingsStore}
 import io.narrative.connectors.spark.ParquetTransformer
 import io.narrative.connectors.facebook.DeliveryProcessor.ConnectionCreatedEntry
+import org.typelevel.log4cats.{LoggerFactory, SelfAwareStructuredLogger}

 // TODO: handle connection creation events?
 class DeliveryProcessor(
@@ -37,8 +37,10 @@ class DeliveryProcessor(
     fb: FacebookClient.Ops[IO],
     profileStore: ProfileStore.Ops[IO],
     parquetTransformer: ParquetTransformer
-) extends DeliveryProcessor.Ops[IO]
-    with LazyLogging {
+)(implicit loggerFactory: LoggerFactory[IO])
+    extends DeliveryProcessor.Ops[IO] {
+
+  private val logger: SelfAwareStructuredLogger[IO] = loggerFactory.getLogger

   override def processIfDeliverable(job: Job): IO[Unit] = {
     val deliverIO = for {
@@ -116,13 +118,13 @@
       .getOrRaise(new RuntimeException(s"profile ${profileId.show} does not exist."))

   private def markDelivered(job: Job, fileName: FileName): IO[Unit] = for {
-    _ <- IO(logger.info(s"delivery success. $job"))
+    _ <- logger.info(s"delivery success. $job")
     _ <- commandStore.updateStatus(job.eventRevision, FileUpdate(fileName, FileStatus.Delivered))
   } yield ()

   private def markFailure(job: Job, fileName: FileName, err: Throwable): IO[Unit] =
     for {
-      _ <- IO(logger.error(s"delivery failed. $job", err))
+      _ <- logger.error(err)(s"delivery failed. $job")
       _ <- commandStore.updateStatus(job.eventRevision, FileUpdate(fileName, FileStatus.Failed))
     } yield ()
 }
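One small API difference surfaces in markFailure: scalalogging takes the message and the Throwable in a single parameter list (logger.error(msg, err)), while log4cats takes the Throwable first in its own parameter list. A minimal sketch against the log4cats Logger[F] API:

import cats.effect.IO
import org.typelevel.log4cats.Logger

// error(t: Throwable)(message: => String): F[Unit] -- throwable first, then the lazy message.
def reportFailure(logger: Logger[IO], job: String, err: Throwable): IO[Unit] =
  logger.error(err)(s"delivery failed. $job")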
File 3 of 4: Main
@@ -13,6 +13,8 @@ import io.narrative.connectors.facebook.CommandProcessor.ConnectionCreatedCPEvent

 object Main extends IOApp.Simple with LazyLogging {
   private implicit val logging: LoggerFactory[IO] = Slf4jFactory.create[IO]
+  private implicit val loggerFactoryConnectionIO: LoggerFactory[ConnectionIO] = Slf4jFactory.create[ConnectionIO]
+
   val parallelizationFactor = 1
   override def run: IO[Unit] = {
     val worker = for {
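Main now materialises two implicit factories, one per effect type that logs: the existing LoggerFactory[IO] and a new LoggerFactory[ConnectionIO] for CommandProcessor, both built with Slf4jFactory.create. Slf4jFactory.create[F] needs a Sync[F] instance, which cats-effect provides for IO and doobie provides for ConnectionIO. A wiring sketch under those assumptions (the object name is illustrative, and an extra doobie import may be needed to bring its ConnectionIO instance into scope):

import cats.effect.IO
import doobie.ConnectionIO
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory

object LoggingWiringSketch {
  // One SLF4J-backed factory per effect type the application logs in.
  implicit val loggerFactoryIO: LoggerFactory[IO] = Slf4jFactory.create[IO]
  implicit val loggerFactoryConnectionIO: LoggerFactory[ConnectionIO] =
    Slf4jFactory.create[ConnectionIO] // assumes doobie's Sync[ConnectionIO] is in implicit scope
}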
File 4 of 4: Resources
@@ -43,7 +43,8 @@ object Resources extends LazyLogging {

   def apply(config: Config, parallelizationFactor: Int)(implicit
       timer: Temporal[IO],
-      logging: LoggerFactory[IO]
+      logging: LoggerFactory[IO],
+      loggingConnectionIO: LoggerFactory[ConnectionIO]
   ): Resource[IO, Resources] =
     for {
       awsCredentials <- Resource.eval(IO.blocking(new DefaultAWSCredentialsProviderChain()))