-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: project viewed events to be tackled in chunks #1729
Conversation
…ng dequeued events
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!! 🚀
|
||
private def returnToQueue(de: DequeuedEvent): Throwable => F[Unit] = | ||
Logger[F].error(_)(show"$categoryName: persisting event failed: ${de.id}; returning to the queue") >> | ||
eventsDequeuer.returnToQueue(de) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It sounds a bit strange to me to have separate interfaces for deqeue and enqueue but then the dequeue also puts something into the queue?
import io.renku.triplesstore.{ProjectsConnectionConfig, SparqlQueryTimeRecorder} | ||
import org.typelevel.log4cats.Logger | ||
|
||
private trait EventProcessor[F[_]] extends Pipe[F, DequeuedEvent, DequeuedEvent] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering why we need to return an event - so why it's not Pipe[F, Event, Unit]
, more of a sink?
SqlStatement | ||
.named(s"$queryPrefix stats") | ||
.select[Void, (CategoryName, Long)]( | ||
sql"""SELECT category, COUNT(id) AS count |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we have now the stuff in the DbInfra
object, we could use it here as well maybe?
SqlStatement | ||
.named(s"$queryPrefix insert") | ||
.command[CategoryName *: String *: OffsetDateTime *: OffsetDateTime *: EnqueueStatus *: EmptyTuple]( | ||
sql"""INSERT INTO enqueued_event (category, payload, created, updated, status) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also here it seems we could use references to DbInfra
perhaps?
private[viewed] def kickOffEventsDequeueing[F[_]: Async: Logger](dequeuer: EventsDequeuer[F], | ||
processor: EventProcessor[F] | ||
) = Async[F].start { | ||
Logger[F].info(show"Starting events enqueuer for $categoryName") >> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a typo: "Starting events enqueuer" -> "Starting events dequeuer" ?
This PR adds a new queue-like mechanism to deal with the ProjectViewed events in chunks instead of separately.
/deploy
closes #1649