pekko: update client helper to support flow API #1677

Merged: 1 commit, Jul 30, 2024
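In short, this change removes the hand-rolled Client flow, which paired every request with an AccessLogger, and routes HTTP calls through the PekkoHttpClient flow API instead. A rough sketch of the shape change, based only on the type aliases and calls that appear in the hunks below, not a verbatim copy of the sources:

// Old alias from the stream package object: callers paired each request with an
// AccessLogger by hand and unwrapped the logger again on the way out.
//   type Client = Flow[(HttpRequest, AccessLogger), (Try[HttpResponse], AccessLogger), NotUsed]
//
// New shape: a plain request -> Try[response] flow. Access logging happens inside
// PekkoHttpClient, so the alias and the manual pairing code can be deleted.
//   type SimpleClient = Flow[HttpRequest, Try[HttpResponse], NotUsed]
def httpClient(name: String): SimpleClient =
  PekkoHttpClient.create(name, materializer.system).simpleFlow()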

@@ -107,7 +107,6 @@ private[stream] abstract class EvaluatorImpl(
private def newStreamContext(dsLogger: DataSourceLogger = (_, _) => ()): StreamContext = {
new StreamContext(
config,
Http().superPool(),
materializer,
registry,
dsLogger

@@ -38,6 +38,7 @@ import com.netflix.atlas.eval.stream.Evaluator.DataSources
import com.netflix.atlas.json.JsonSupport
import com.netflix.atlas.pekko.AccessLogger
import com.netflix.atlas.pekko.DiagnosticMessage
import com.netflix.atlas.pekko.PekkoHttpClient
import com.netflix.atlas.pekko.StreamOps
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spectator.api.Registry
@@ -50,7 +51,6 @@ import scala.util.Try

private[stream] class StreamContext(
rootConfig: Config,
val client: Client,
val materializer: Materializer,
val registry: Registry = new NoopRegistry,
val dsLogger: DataSourceLogger = (_, _) => ()
@@ -274,14 +274,7 @@ private[stream] class StreamContext(
* Returns a simple http client flow that will log the request using the provide name.
*/
def httpClient(name: String): SimpleClient = {
Flow[HttpRequest]
.map(r => r -> AccessLogger.newClientLogger(name, r))
.via(client)
.map {
case (response, log) =>
log.complete(response)
response
}
PekkoHttpClient.create(name, materializer.system).simpleFlow()
}

def monitorFlow[T](phase: String): Flow[T, T, NotUsed] = {
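A hedged usage sketch for the updated helper: the flow returned by httpClient now accepts bare HttpRequest values and emits Try[HttpResponse], so callers no longer build (request, AccessLogger) pairs themselves. The context value, log name, and URI below are placeholders, and an implicit Materializer is assumed to be in scope:

import org.apache.pekko.http.scaladsl.model.{HttpRequest, HttpResponse}
import org.apache.pekko.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future
import scala.util.Try

// Send a single request through the named client flow. The request is access-logged
// under the given name and the result arrives as a Try, so failures stay in-band.
val result: Future[Try[HttpResponse]] =
  Source
    .single(HttpRequest(uri = "http://localhost:7101/api/v1/datasources"))
    .via(context.httpClient("backend"))
    .runWith(Sink.head)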

@@ -28,7 +28,6 @@ import scala.util.Try

package object stream {

type Client = Flow[(HttpRequest, AccessLogger), (Try[HttpResponse], AccessLogger), NotUsed]
type SimpleClient = Flow[HttpRequest, Try[HttpResponse], NotUsed]

type SourcesAndGroups = (DataSources, EddaSource.Groups)

@@ -17,14 +17,12 @@ package com.netflix.atlas.eval.stream

import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model.HttpRequest
import org.apache.pekko.http.scaladsl.model.HttpResponse
import org.apache.pekko.http.scaladsl.model.StatusCodes
import org.apache.pekko.stream.scaladsl.Flow
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.scaladsl.Source
import com.netflix.atlas.json.Json
import com.netflix.atlas.pekko.AccessLogger
import munit.FunSuite
import org.apache.pekko.stream.Materializer

@@ -34,7 +32,6 @@ import scala.util.Success

class EddaGroupsLookupSuite extends FunSuite {

import EddaSource.*
import Evaluator.*

private implicit val system: ActorSystem = ActorSystem(getClass.getSimpleName)
@@ -63,13 +60,9 @@ class EddaGroupsLookupSuite extends FunSuite {
}

private def lookupFlow: Flow[DataSources, Source[SourcesAndGroups, NotUsed], NotUsed] = {
val client = Flow[(HttpRequest, AccessLogger)]
.map {
case (_, v) =>
val json = Json.encode(List(eddaGroup))
Success(HttpResponse(StatusCodes.OK, entity = json)) -> v
}
val context = TestContext.createContext(mat, client)
val json = Json.encode(List(eddaGroup))
val response = Success(HttpResponse(StatusCodes.OK, entity = json))
val context = TestContext.createContext(mat, response)
Flow[DataSources].via(new EddaGroupsLookup(context, 5.microseconds))
}


@@ -21,22 +21,19 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model.HttpEntity
import org.apache.pekko.http.scaladsl.model.HttpRequest
import org.apache.pekko.http.scaladsl.model.HttpResponse
import org.apache.pekko.http.scaladsl.model.MediaTypes
import org.apache.pekko.http.scaladsl.model.StatusCode
import org.apache.pekko.http.scaladsl.model.StatusCodes
import org.apache.pekko.http.scaladsl.model.headers.*
import org.apache.pekko.stream.ConnectionException
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.Flow
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.ByteString
import com.fasterxml.jackson.databind.exc.ValueInstantiationException
import com.netflix.atlas.core.util.Streams
import com.netflix.atlas.eval.stream.EddaSource.GroupResponse
import com.netflix.atlas.pekko.AccessLogger
import munit.FunSuite

import scala.concurrent.Await
@@ -131,10 +128,7 @@ class EddaSourceSuite extends FunSuite {
|]""".stripMargin

private def run(uri: String, response: Try[HttpResponse]): GroupResponse = {
val client = Flow[(HttpRequest, AccessLogger)].map {
case (_, logger) => response -> logger
}
val context = TestContext.createContext(mat, client)
val context = TestContext.createContext(mat, response)
val future = EddaSource(uri, context).runWith(Sink.head)
Await.result(future, Duration.Inf)
}
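Since the test helper now takes a Try[HttpResponse] directly (see the TestContext hunk further down), a failure path in this style of test is just a Failure value. A sketch, reusing the ConnectionException import already present in this suite; the message is a placeholder:

// Simulate a dropped connection: every request issued through the resulting
// context's httpClient flow completes with this Failure instead of a response.
val failed  = Failure(new ConnectionException("connection reset by peer"))
val context = TestContext.createContext(mat, failed)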

@@ -70,7 +70,6 @@ class LwcToAggrDatapointSuite extends FunSuite {

private val context = new StreamContext(
ConfigFactory.load(),
null,
materializer,
dsLogger = (_, _) => ()
)

@@ -15,16 +15,15 @@
*/
package com.netflix.atlas.eval.stream

import com.netflix.atlas.pekko.AccessLogger
import org.apache.pekko.http.scaladsl.model.HttpRequest
import com.netflix.atlas.pekko.PekkoHttpClient
import org.apache.pekko.http.scaladsl.model.HttpResponse
import org.apache.pekko.http.scaladsl.model.StatusCodes
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.Flow
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spectator.api.Registry
import com.typesafe.config.ConfigFactory
import org.apache.pekko.http.scaladsl.model.StatusCodes

import scala.util.Failure
import scala.util.Success
import scala.util.Try

@@ -98,17 +97,14 @@ object TestContext {

def createContext(
mat: Materializer,
client: Client = defaultClient,
response: Try[HttpResponse] = Success(HttpResponse(StatusCodes.OK)),
registry: Registry = new NoopRegistry
): StreamContext = {
new StreamContext(config, client, mat, registry)
}

def defaultClient: Client = client(HttpResponse(StatusCodes.OK))

def client(response: HttpResponse): Client = client(Success(response))
new StreamContext(config, mat, registry) {

def client(result: Try[HttpResponse]): Client = {
Flow[(HttpRequest, AccessLogger)].map { case (_, v) => result -> v }
override def httpClient(name: String): SimpleClient = {
PekkoHttpClient.create(response).simpleFlow()
}
}
}
}
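For tests, the visible change is the second argument of createContext: instead of wiring up a custom client flow, a test hands over the canned Try[HttpResponse] that every request should produce. A small sketch (the JSON payload is a placeholder):

// Default: every request succeeds with an empty 200 response.
val okContext = TestContext.createContext(mat)

// Custom payload: the overridden httpClient returns this response for any request.
val json        = """{"groups":[]}"""
val jsonContext = TestContext.createContext(mat, Success(HttpResponse(StatusCodes.OK, entity = json)))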

@@ -39,6 +39,14 @@ import scala.util.Try
*/
class AccessLogger private (entry: IpcLogEntry, client: Boolean) {

private var attempt: Int = 1
private var maxAttempts: Int = 1

def withMaxAttempts(max: Int): AccessLogger = {
maxAttempts = max
this
}

/** Complete the log entry and write out the result. */
def complete(request: HttpRequest, result: Try[HttpResponse]): Unit = {
AccessLogger.addRequestInfo(entry, request)
@@ -56,7 +64,13 @@ class AccessLogger private (entry: IpcLogEntry, client: Boolean) {
entry.withException(t)
failure = true
}
entry.markEnd().withLogLevel(if (failure) Level.WARN else Level.DEBUG).log()
entry
.markEnd()
.withLogLevel(if (failure) Level.WARN else Level.DEBUG)
.withAttempt(attempt)
.withAttemptFinal(attempt == maxAttempts)
.log()
attempt += 1
}
}
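The AccessLogger change makes log entries retry-aware: one logger instance can be completed once per attempt, and each entry records the attempt number and whether it was the final allowed attempt. A hedged sketch of how a retrying client might drive it; the request value and the attempt budget are placeholders, and the imports match those already shown in this file:

// One logger per logical call, shared across retries.
val logger = AccessLogger.newClientLogger("backend", request).withMaxAttempts(3)

// First attempt fails, second succeeds. Each complete() call logs the current
// attempt number, marks whether it was the final allowed attempt, and then
// advances the internal attempt counter.
logger.complete(request, Failure(new java.io.IOException("connect timeout")))
logger.complete(request, Success(HttpResponse(StatusCodes.OK)))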
