diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala
new file mode 100644
index 000000000..5bd74bb18
--- /dev/null
+++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2014-2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.atlas.eval.stream
+
+import com.netflix.atlas.eval.stream.Evaluator.DataSource
+import com.netflix.atlas.eval.stream.Evaluator.DataSources
+import com.netflix.atlas.eval.stream.HostSource.unzipIfNeeded
+import com.netflix.atlas.json.Json
+import com.netflix.atlas.pekko.DiagnosticMessage
+import com.netflix.atlas.pekko.OpportunisticEC.ec
+import com.netflix.atlas.pekko.PekkoHttpClient
+import com.netflix.atlas.pekko.StreamOps
+import com.netflix.spectator.api.Registry
+import com.typesafe.config.Config
+import com.typesafe.scalalogging.StrictLogging
+import org.apache.pekko.NotUsed
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.http.scaladsl.model.ContentTypes
+import org.apache.pekko.http.scaladsl.model.HttpEntity
+import org.apache.pekko.http.scaladsl.model.HttpMethods
+import org.apache.pekko.http.scaladsl.model.HttpRequest
+import org.apache.pekko.http.scaladsl.model.StatusCodes
+import org.apache.pekko.stream.scaladsl.BroadcastHub
+import org.apache.pekko.stream.scaladsl.Flow
+import org.apache.pekko.stream.scaladsl.Keep
+import org.apache.pekko.stream.scaladsl.RetryFlow
+import org.apache.pekko.stream.scaladsl.Source
+
+import java.io.ByteArrayOutputStream
+import java.util.concurrent.ConcurrentHashMap
+import scala.concurrent.duration.DurationInt
+import scala.jdk.CollectionConverters.CollectionHasAsScala
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Using
+
+class DataSourceRewriter(
+  config: Config,
+  registry: Registry,
+  implicit val system: ActorSystem
+) extends StrictLogging {
+
+  private val (enabled, rewriteUrl) = {
+    val enabled = config.hasPath("atlas.eval.stream.rewrite-url")
+    val url = if (enabled) config.getString("atlas.eval.stream.rewrite-url") else ""
+    if (enabled) {
+      logger.info(s"Rewriting enabled with url: ${url}")
+    } else {
+      logger.info("Rewriting is disabled")
+    }
+    (enabled, url)
+  }
+
+  private val client = PekkoHttpClient
+    .create("datasource-rewrite", system)
+    .superPool[List[DataSource]]()
+
+  private val rewriteCache = new ConcurrentHashMap[String, String]()
+
+  private val rewriteSuccess = registry.counter("atlas.eval.stream.rewrite.success")
+  private val rewriteFailures = registry.createId("atlas.eval.stream.rewrite.failures")
+  private val rewriteCacheHits = registry.counter("atlas.eval.stream.rewrite.cache", "id", "hits")
+
+  private val rewriteCacheMisses =
+    registry.counter("atlas.eval.stream.rewrite.cache", "id", "misses")
+
+  def rewrite(
+    context: StreamContext,
+    keepRetrying: Boolean = true
+  ): Flow[DataSources, DataSources, NotUsed] = {
+    rewrite(client, context, keepRetrying)
+  }
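+
+  // Overload that takes an explicit client so the test suite can inject a mocked HTTP flow.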
+  def rewrite(
+    client: SuperPoolClient,
+    context: StreamContext,
+    keepRetrying: Boolean
+  ): Flow[DataSources, DataSources, NotUsed] = {
+    if (!enabled) {
+      return Flow[DataSources]
+    }
+
+    val (cachedQueue, cachedSource) = StreamOps
+      .blockingQueue[DataSources](registry, "cachedRewrites", 1)
+      .toMat(BroadcastHub.sink(1))(Keep.both)
+      .run()
+    var sentCacheData = false
+
+    val retryFlow = RetryFlow
+      .withBackoff(
+        minBackoff = 100.milliseconds,
+        maxBackoff = 5.seconds,
+        randomFactor = 0.35,
+        maxRetries = if (keepRetrying) -1 else 0,
+        flow = httpFlow(client, context)
+      ) {
+        case (original, resp) =>
+          resp match {
+            case Success(_) => None
+            case Failure(ex) =>
+              val (request, dsl) = original
+              logger.debug("Retrying the rewrite request due to error", ex)
+              if (!sentCacheData) {
+                if (!cachedQueue.offer(returnFromCache(dsl))) {
+                  // note that this should never happen.
+                  logger.error("Unable to send cached results to queue.")
+                } else {
+                  sentCacheData = true
+                }
+              }
+              Some(request -> dsl)
+          }
+      }
+      .watchTermination() { (_, f) =>
+        f.onComplete { _ =>
+          cachedQueue.complete()
+        }
+      }
+
+    Flow[DataSources]
+      .map(_.sources().asScala.toList)
+      .map(dsl => constructRequest(dsl) -> dsl)
+      .via(retryFlow)
+      .filter(_.isSuccess)
+      .map { result =>
+        // reset the cached flag
+        sentCacheData = false
+        result.get
+      }
+      .merge(cachedSource)
+  }
+
+  private[stream] def httpFlow(client: SuperPoolClient, context: StreamContext) = {
+    Flow[(HttpRequest, List[DataSource])]
+      .via(client)
+      .flatMapConcat {
+        case (Success(resp), dsl) =>
+          unzipIfNeeded(resp)
+            .map(_.utf8String)
+            .map { body =>
+              resp.status match {
+                case StatusCodes.OK =>
+                  val rewrites = List.newBuilder[DataSource]
+                  Json
+                    .decode[List[Rewrite]](body)
+                    .zip(dsl)
+                    .map {
+                      case (r, ds) =>
+                        if (!r.status.equals("OK")) {
+                          val msg =
+                            DiagnosticMessage.error(s"failed rewrite of ${ds.uri()}: ${r.message}")
+                          context.dsLogger(ds, msg)
+                        } else {
+                          rewriteCache.put(ds.uri(), r.rewrite)
+                          rewrites += new DataSource(ds.id, ds.step(), r.rewrite)
+                        }
+                    }
+                    .toArray
+                  // NOTE: We're assuming that the number of items returned will be the same as the
+                  // number of uris sent to the rewrite service. If they differ, data sources may be
+                  // mapped to IDs and steps incorrectly.
+                  rewriteSuccess.increment()
+                  Success(DataSources.of(rewrites.result().toArray: _*))
+                case _ =>
+                  logger.error(
+                    "Error from rewrite service. status={}, resp={}",
+                    resp.status,
+                    body
+                  )
+                  registry
+                    .counter(
+                      rewriteFailures.withTags("status", resp.status.toString(), "exception", "NA")
+                    )
+                    .increment()
+                  Failure(
+                    new RuntimeException(
+                      s"Error from rewrite service. status=${resp.status}, resp=$body"
+                    )
+                  )
+              }
+            }
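+        // Connection level failures surface here as a Failure rather than an HTTP
+        // status code; the retry flow above treats them the same and backs off.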
status=${resp.status}, resp=$body" + ) + ) + } + } + case (Failure(ex), _) => + logger.error("Failure from rewrite service", ex) + registry + .counter( + rewriteFailures.withTags("status", "0", "exception", ex.getClass.getSimpleName) + ) + .increment() + Source.single(Failure(ex)) + } + } + + private[stream] def returnFromCache(dsl: List[DataSource]): DataSources = { + val rewrites = dsl.flatMap { ds => + val rewrite = rewriteCache.get(ds.uri()) + if (rewrite == null) { + rewriteCacheMisses.increment() + None + } else { + rewriteCacheHits.increment() + Some(new DataSource(ds.id, ds.step(), rewrite)) + } + } + DataSources.of(rewrites: _*) + } + + private[stream] def constructRequest(dss: List[DataSource]): HttpRequest = { + val baos = new ByteArrayOutputStream + Using(Json.newJsonGenerator(baos)) { json => + json.writeStartArray() + dss.foreach(s => json.writeString(s.uri())) + json.writeEndArray() + } + HttpRequest( + uri = rewriteUrl, + method = HttpMethods.POST, + entity = HttpEntity(ContentTypes.`application/json`, baos.toByteArray) + ) + } + +} + +case class Rewrite(status: String, rewrite: String, original: String, message: String) diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala index e184247b6..c9e8935d7 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala @@ -60,6 +60,7 @@ import com.netflix.atlas.eval.stream.Evaluator.MessageEnvelope import com.netflix.atlas.json.Json import com.netflix.atlas.json.JsonSupport import com.netflix.atlas.pekko.ClusterOps +import com.netflix.atlas.pekko.DiagnosticMessage import com.netflix.atlas.pekko.StreamOps import com.netflix.atlas.pekko.ThreadPools import com.netflix.spectator.api.Registry @@ -86,8 +87,11 @@ private[stream] abstract class EvaluatorImpl( private val logger = LoggerFactory.getLogger(getClass) + // Calls out to a rewrite service in case URIs need mutating to pick the proper backend. + private[stream] var dataSourceRewriter = new DataSourceRewriter(config, registry, system) + // Cached context instance used for things like expression validation. 
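+// Shape of a single entry in the rewrite service response; the JSON field names
+// match the case class, e.g.:
+// {"status":"OK","rewrite":"http://.../graph?q=...","original":"http://.../graph?q=...","message":""}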
+case class Rewrite(status: String, rewrite: String, original: String, message: String)
diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala
index e184247b6..c9e8935d7 100644
--- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala
+++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala
@@ -60,6 +60,7 @@ import com.netflix.atlas.eval.stream.Evaluator.MessageEnvelope
 import com.netflix.atlas.json.Json
 import com.netflix.atlas.json.JsonSupport
 import com.netflix.atlas.pekko.ClusterOps
+import com.netflix.atlas.pekko.DiagnosticMessage
 import com.netflix.atlas.pekko.StreamOps
 import com.netflix.atlas.pekko.ThreadPools
 import com.netflix.spectator.api.Registry
@@ -86,8 +87,11 @@ private[stream] abstract class EvaluatorImpl(
 
   private val logger = LoggerFactory.getLogger(getClass)
 
+  // Calls out to a rewrite service in case URIs need mutating to pick the proper backend.
+  private[stream] var dataSourceRewriter = new DataSourceRewriter(config, registry, system)
+
   // Cached context instance used for things like expression validation.
-  private val validationStreamContext = newStreamContext()
+  private val validationStreamContext = newStreamContext(new ThrowingDSLogger)
 
   // Timeout for DataSources unique operator: emit repeating DataSources after timeout exceeds
   private val uniqueTimeout: Long = config.getDuration("atlas.eval.stream.unique-timeout").toMillis
@@ -129,7 +133,13 @@ private[stream] abstract class EvaluatorImpl(
   }
 
   protected def validateImpl(ds: DataSource): Unit = {
-    validationStreamContext.validateDataSource(ds).get
+    val future = Source
+      .single(DataSources.of(ds))
+      .via(dataSourceRewriter.rewrite(validationStreamContext, false))
+      .map(_.sources().asScala.map(validationStreamContext.validateDataSource).map(_.get))
+      .toMat(Sink.head)(Keep.right)
+      .run()
+    Await.result(future, 60.seconds)
   }
 
   protected def writeInputToFileImpl(uri: String, file: Path, duration: Duration): Unit = {
@@ -212,6 +222,7 @@ private[stream] abstract class EvaluatorImpl(
   def createStreamsFlow: Flow[DataSources, MessageEnvelope, NotUsed] = {
     val (logSrc, context) = createStreamContextSource
     Flow[DataSources]
+      .via(dataSourceRewriter.rewrite(context))
       .map(dss => groupByHost(dss))
       // Emit empty DataSource if no more DataSource for a host, so that the sub-stream get the info
       .via(new FillRemovedKeysWith[String, DataSources](_ => DataSources.empty()))
@@ -581,4 +592,16 @@ private[stream] abstract class EvaluatorImpl(
   private def isPrintable(c: Int): Boolean = {
     c >= 32 && c < 127
   }
+
+  private class ThrowingDSLogger extends DataSourceLogger {
+
+    override def apply(ds: DataSource, msg: JsonSupport): Unit = {
+      msg match {
+        case dsg: DiagnosticMessage =>
+          throw new IllegalArgumentException(dsg.message)
+      }
+    }
+
+    override def close(): Unit = {}
+  }
 }
diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/HostSource.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/HostSource.scala
index 357009424..0f44617dd 100644
--- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/HostSource.scala
+++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/HostSource.scala
@@ -103,7 +103,7 @@ private[stream] object HostSource extends StrictLogging {
     Source.empty[ByteString]
   }
 
-  private def unzipIfNeeded(res: HttpResponse): Source[ByteString, Any] = {
+  def unzipIfNeeded(res: HttpResponse): Source[ByteString, Any] = {
     val isCompressed = res.headers.contains(`Content-Encoding`(HttpEncodings.gzip))
     val dataBytes = res.entity.withoutSizeLimit().dataBytes
     if (isCompressed) dataBytes.via(Compression.gunzip()) else dataBytes
diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/package.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/package.scala
index bb68511fc..d065fd840 100644
--- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/package.scala
+++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/package.scala
@@ -30,5 +30,8 @@ package object stream {
 
   type SimpleClient = Flow[HttpRequest, Try[HttpResponse], NotUsed]
 
+  type SuperPoolClient =
+    Flow[(HttpRequest, List[DataSource]), (Try[HttpResponse], List[DataSource]), NotUsed]
+
   type SourcesAndGroups = (DataSources, EddaSource.Groups)
 }
diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/DataSourceRewriterSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/DataSourceRewriterSuite.scala
new file mode 100644
index 000000000..e9aafc407
--- /dev/null
+++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/DataSourceRewriterSuite.scala
@@ -0,0 +1,409 @@
+/*
+ * Copyright 2014-2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.atlas.eval.stream
+
+import com.fasterxml.jackson.databind.JsonMappingException
+import com.netflix.atlas.eval.stream.Evaluator.DataSource
+import com.netflix.atlas.eval.stream.Evaluator.DataSources
+import com.netflix.atlas.json.Json
+import com.netflix.atlas.json.JsonSupport
+import com.netflix.atlas.pekko.PekkoHttpClient
+import com.netflix.spectator.api.NoopRegistry
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigValueFactory
+import munit.FunSuite
+import org.apache.pekko.NotUsed
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.http.scaladsl.model.ContentTypes
+import org.apache.pekko.http.scaladsl.model.HttpEntity
+import org.apache.pekko.http.scaladsl.model.HttpRequest
+import org.apache.pekko.http.scaladsl.model.HttpResponse
+import org.apache.pekko.http.scaladsl.model.StatusCode
+import org.apache.pekko.http.scaladsl.model.StatusCodes
+import org.apache.pekko.stream.Materializer
+import org.apache.pekko.stream.scaladsl.Flow
+import org.apache.pekko.stream.scaladsl.Keep
+import org.apache.pekko.stream.scaladsl.Sink
+import org.apache.pekko.stream.scaladsl.Source
+import org.apache.pekko.testkit.TestKitBase
+
+import java.time.Duration
+import scala.concurrent.Await
+import scala.concurrent.Future
+import scala.concurrent.duration.DurationInt
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+
+class DataSourceRewriterSuite extends FunSuite with TestKitBase {
+
+  val dss = DataSources.of(
+    new DataSource("foo", Duration.ofSeconds(60), "http://localhost/api/v1/graph?q=name,foo,:eq"),
+    new DataSource(
+      "bar",
+      Duration.ofSeconds(60),
+      "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and"
+    )
+  )
+
+  var config: Config = _
+  var logger: MockLogger = _
+  var ctx: StreamContext = _
+
+  override implicit lazy val system: ActorSystem = ActorSystem("Test")
+
+  override def beforeEach(context: BeforeEach): Unit = {
+    config = ConfigFactory
+      .load()
+      .withValue(
+        "atlas.eval.stream.rewrite-url",
+        ConfigValueFactory.fromAnyRef("http://localhost/api/v1/rewrite")
+      )
+    logger = new MockLogger()
+    ctx = new StreamContext(config, Materializer(system), dsLogger = logger)
+  }
+
+  test("rewrite: Disabled") {
+    config = ConfigFactory.load()
+    val obtained = rewrite(dss, null)
+    assertEquals(obtained, dss)
+  }
+
+  test("rewrite: OK") {
+    val client = mockClient(
+      StatusCode.int2StatusCode(200),
+      okRewrite()
+    )
+    val expected = DataSources.of(
+      new DataSource("foo", Duration.ofSeconds(60), "http://localhost/api/v1/graph?q=name,foo,:eq"),
+      new DataSource(
+        "bar",
+        Duration.ofSeconds(60),
+        "http://atlas-seg.prod.netflix.net/api/v1/graph?q=name,bar,:eq"
+      )
+    )
+    val obtained = rewrite(dss, client)
+    assertEquals(obtained, expected)
+    ctx.dsLogger.asInstanceOf[MockLogger].assertSize(0)
+  }
+
+  test("rewrite: Bad URI in datasources") {
+    val client = mockClient(
+      StatusCode.int2StatusCode(200),
+      Map(
+        "http://localhost/api/v1/graph?q=name,foo,:eq" -> Rewrite(
+          "OK",
+          "http://localhost/api/v1/graph?q=name,foo,:eq",
+          "http://localhost/api/v1/graph?q=name,foo,:eq",
+          ""
+        ),
+        "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and" -> Rewrite(
+          "NOT_FOUND",
+          "http://atlas-seg.prod.netflix.net/api/v1/graph?q=name,bar,:eq",
+          "http://atlas-seg.prod.netflix.net/api/v1/graph?q=name,bar,:eq",
+          "No namespace found for seg"
+        )
+      )
+    )
+    val expected = DataSources.of(
+      new DataSource("foo", Duration.ofSeconds(60), "http://localhost/api/v1/graph?q=name,foo,:eq")
+    )
+    val obtained = rewrite(dss, client)
+    assertEquals(obtained, expected)
+    ctx.dsLogger.asInstanceOf[MockLogger].assertSize(1)
+  }
+
+  test("rewrite: Malformed response JSON") {
+    val client = mockClient(
+      StatusCode.int2StatusCode(200),
+      Map(
+        "http://localhost/api/v1/graph?q=name,foo,:eq" -> Rewrite(
+          "OK",
+          "http://localhost/api/v1/graph?q=name,foo,:eq",
+          "http://localhost/api/v1/graph?q=name,foo,:eq",
+          ""
+        ),
+        "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and" -> Rewrite(
+          "OK",
+          "http://atlas-seg.prod.netflix.net/api/v1/graph?q=name,bar,:eq",
+          "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and",
+          ""
+        )
+      ),
+      malformed = true
+    )
+    intercept[JsonMappingException] {
+      rewrite(dss, client)
+    }
+  }
+
+  test("rewrite: 500") {
+    val client = mockClient(
+      StatusCode.int2StatusCode(500),
+      Map.empty
+    )
+    val expected = DataSources.of()
+    val obtained = rewrite(dss, client)
+    assertEquals(obtained, expected)
+    ctx.dsLogger.asInstanceOf[MockLogger].assertSize(0)
+  }
+
+  test("rewrite: Missing a rewrite") {
+    val client = mockClient(
+      StatusCode.int2StatusCode(200),
+      Map(
+        "http://localhost/api/v1/graph?q=name,foo,:eq" -> Rewrite(
+          "OK",
+          "http://localhost/api/v1/graph?q=name,foo,:eq",
+          "http://localhost/api/v1/graph?q=name,foo,:eq",
+          ""
+        )
+      )
+    )
+    val expected = DataSources.of(
+      new DataSource("foo", Duration.ofSeconds(60), "http://localhost/api/v1/graph?q=name,foo,:eq")
+    )
+    val obtained = rewrite(dss, client)
+    assertEquals(obtained, expected)
+    ctx.dsLogger.asInstanceOf[MockLogger].assertSize(1)
+  }
+
+  test("rewrite: source changes with good, bad, good") {
+    val client = new MockClient(
+      List(
+        StatusCode.int2StatusCode(200),
+        StatusCode.int2StatusCode(500),
+        StatusCode.int2StatusCode(200)
+      ),
+      List(
+        okRewrite(true),
+        Map.empty,
+        okRewrite()
+      )
+    ).superPool[List[DataSource]]()
+
+    val rewriter = new DataSourceRewriter(config, new NoopRegistry(), system)
+    val future = Source
+      .fromIterator(() =>
+        List(
+          DataSources.of(
+            new DataSource(
+              "foo",
+              Duration.ofSeconds(60),
+              "http://localhost/api/v1/graph?q=name,foo,:eq"
+            )
+          ),
+          dss,
+          dss
+        ).iterator
+      )
+      .via(rewriter.rewrite(client, ctx, true))
+      .grouped(3)
+      .toMat(Sink.head)(Keep.right)
+      .run()
+    val res = Await.result(future, 30.seconds)
+    assertEquals(res(0), expectedRewrite(true))
+    assertEquals(res(1), expectedRewrite(true))
+    assertEquals(res(2), expectedRewrite())
+  }
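+
+  // In the retry tests below, the first element emitted downstream comes from the
+  // cache fallback (empty for a fresh rewriter), with the rewritten sources
+  // following once the service recovers.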
+
+  test("rewrite: retry initial flow with 500s") {
+    val client = new MockClient(
+      List(
+        StatusCode.int2StatusCode(500),
+        StatusCode.int2StatusCode(500),
+        StatusCode.int2StatusCode(200)
+      ),
+      List(
+        Map.empty,
+        Map.empty,
+        okRewrite()
+      )
+    ).superPool[List[DataSource]]()
+
+    val rewriter = new DataSourceRewriter(config, new NoopRegistry(), system)
+    val future = Source
+      .single(dss)
+      .via(rewriter.rewrite(client, ctx, true))
+      .grouped(3)
+      .toMat(Sink.head)(Keep.right)
+      .run()
+    val res = Await.result(future, 30.seconds)
+    assertEquals(res(0), DataSources.of())
+    assertEquals(res(1), expectedRewrite())
+  }
+
+  test("rewrite: retry initial flow with 500, exception, ok") {
+    val client = new MockClient(
+      List(
+        StatusCode.int2StatusCode(500),
+        StatusCodes.custom(0, "no conn", "no conn", false, true),
+        StatusCode.int2StatusCode(200)
+      ),
+      List(
+        Map.empty,
+        Map.empty,
+        okRewrite()
+      )
+    ).superPool[List[DataSource]]()
+
+    val rewriter = new DataSourceRewriter(config, new NoopRegistry(), system)
+    val future = Source
+      .single(dss)
+      .via(rewriter.rewrite(client, ctx, true))
+      .grouped(3)
+      .toMat(Sink.head)(Keep.right)
+      .run()
+    val res = Await.result(future, 30.seconds)
+    assertEquals(res(0), DataSources.of())
+    assertEquals(res(1), expectedRewrite())
+  }
+
+  def mockClient(
+    status: StatusCode,
+    response: Map[String, Rewrite],
+    returnEx: Option[Exception] = None,
+    malformed: Boolean = false
+  ): SuperPoolClient = {
+    returnEx
+      .map { ex =>
+        PekkoHttpClient.create(Failure(ex)).superPool[List[DataSource]]()
+      }
+      .getOrElse {
+        new MockClient(List(status), List(response), malformed).superPool[List[DataSource]]()
+      }
+  }
+
+  def rewrite(dss: DataSources, client: SuperPoolClient): DataSources = {
+    val rewriter = new DataSourceRewriter(config, new NoopRegistry(), system)
+    val future = Source
+      .single(dss)
+      .via(rewriter.rewrite(client, ctx, true))
+      .toMat(Sink.head)(Keep.right)
+      .run()
+    Await.result(future, 30.seconds)
+  }
+
+  def okRewrite(dropSecond: Boolean = false): Map[String, Rewrite] = {
+    val builder = Map.newBuilder[String, Rewrite]
+    builder +=
+      "http://localhost/api/v1/graph?q=name,foo,:eq" -> Rewrite(
+        "OK",
+        "http://localhost/api/v1/graph?q=name,foo,:eq",
+        "http://localhost/api/v1/graph?q=name,foo,:eq",
+        ""
+      )
+
+    if (!dropSecond) {
+      builder += "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and" -> Rewrite(
+        "OK",
+        "http://atlas-seg.prod.netflix.net/api/v1/graph?q=name,bar,:eq",
+        "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and",
+        ""
+      )
+    }
+
+    builder.result()
+  }
+
+  def expectedRewrite(dropSecond: Boolean = false): DataSources = {
+    val ds1 =
+      new DataSource("foo", Duration.ofSeconds(60), "http://localhost/api/v1/graph?q=name,foo,:eq")
+    val ds2 = new DataSource(
+      "bar",
+      Duration.ofSeconds(60),
+      "http://atlas-seg.prod.netflix.net/api/v1/graph?q=name,bar,:eq"
+    )
+    if (dropSecond) {
+      DataSources.of(ds1)
+    } else {
+      DataSources.of(ds1, ds2)
+    }
+  }
+
+  class MockClient(
+    status: List[StatusCode],
+    response: List[Map[String, Rewrite]],
+    malformed: Boolean = false
+  ) extends PekkoHttpClient {
+
+    var called = 0
+
+    override def singleRequest(request: HttpRequest): Future[HttpResponse] = ???
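+
+    // Only the superPool flow is exercised by DataSourceRewriter, so the
+    // single-request path is intentionally left unimplemented.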
+    override def superPool[C](
+      config: PekkoHttpClient.ClientConfig
+    ): Flow[(HttpRequest, C), (Try[HttpResponse], C), NotUsed] = {
+      Flow[(HttpRequest, C)]
+        .flatMapConcat {
+          case (req, context) =>
+            req.entity.withoutSizeLimit().dataBytes.map { body =>
+              val httpResp = status(called) match {
+                case status if status.intValue() == 0 => null
+
+                case status if status.intValue() != 200 =>
+                  HttpResponse(
+                    status,
+                    entity = HttpEntity(ContentTypes.`application/json`, "{\"error\":\"whoops\"}")
+                  )
+
+                case status =>
+                  val uris = Json.decode[List[String]](body.toArray)
+                  val rewrites = uris.map { uri =>
+                    response(called).get(uri) match {
+                      case Some(r) => r
+                      case None    => Rewrite("NOT_FOUND", uri, uri, "Empty")
+                    }
+                  }
+
+                  val json =
+                    if (malformed) Json.encode(rewrites).substring(0, 25) else Json.encode(rewrites)
+
+                  HttpResponse(
+                    status,
+                    entity = HttpEntity(ContentTypes.`application/json`, json)
+                  )
+              }
+
+              called += 1
+              if (httpResp == null) {
+                Failure(new RuntimeException("no conn")) -> context
+              } else {
+                Success(httpResp) -> context
+              }
+            }
+        }
+    }
+  }
+
+  class MockLogger extends DataSourceLogger {
+
+    var tuples: List[(DataSource, JsonSupport)] = Nil
+
+    override def apply(ds: DataSource, msg: JsonSupport): Unit = {
+      tuples = (ds, msg) :: tuples
+    }
+
+    override def close(): Unit = {
+      tuples = Nil
+    }
+
+    def assertSize(n: Int): Unit = {
+      assertEquals(tuples.size, n)
+    }
+  }
+}
diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala
index c4ab5efc0..35004c643 100644
--- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala
+++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala
@@ -21,11 +21,15 @@ import java.time.Duration
 import java.util.concurrent.TimeoutException
 import java.util.concurrent.atomic.AtomicInteger
 import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.stream.scaladsl.BroadcastHub
 import org.apache.pekko.stream.scaladsl.Flow
 import org.apache.pekko.stream.scaladsl.Keep
 import org.apache.pekko.stream.scaladsl.Sink
 import org.apache.pekko.stream.scaladsl.Source
+import org.apache.pekko.stream.scaladsl.StreamConverters
 import com.netflix.atlas.chart.util.SrcPath
+import com.netflix.atlas.core.model.FilterExpr.Filter
+import com.netflix.atlas.core.util.Streams
 import com.netflix.atlas.eval.model.ArrayData
 import com.netflix.atlas.eval.model.LwcDatapoint
 import com.netflix.atlas.eval.model.LwcDiagnosticMessage
@@ -35,20 +39,29 @@ import com.netflix.atlas.eval.model.LwcHeartbeat
 import com.netflix.atlas.eval.model.LwcMessages
 import com.netflix.atlas.eval.model.LwcSubscription
 import com.netflix.atlas.eval.model.TimeSeriesMessage
+import com.netflix.atlas.eval.stream.Evaluator.DataSources
 import com.netflix.atlas.eval.stream.Evaluator.MessageEnvelope
 import com.netflix.atlas.json.Json
 import com.netflix.atlas.json.JsonSupport
 import com.netflix.atlas.pekko.DiagnosticMessage
+import com.netflix.atlas.pekko.StreamOps
 import com.netflix.spectator.api.DefaultRegistry
+import com.netflix.spectator.api.NoopRegistry
 import com.typesafe.config.ConfigFactory
 import com.typesafe.config.ConfigValueFactory
 import nl.jqno.equalsverifier.EqualsVerifier
 import nl.jqno.equalsverifier.Warning
 import munit.FunSuite
+import org.apache.pekko.NotUsed
+import org.apache.pekko.util.ByteString
 
 import java.nio.file.Path
 import scala.concurrent.Await
+import scala.concurrent.Future
 import scala.concurrent.Promise
+import scala.concurrent.duration.DurationInt
+import scala.jdk.CollectionConverters.CollectionHasAsScala
+import scala.jdk.CollectionConverters.SetHasAsScala
 import scala.util.Success
 import scala.util.Using
@@ -365,6 +378,133 @@ class EvaluatorSuite extends FunSuite {
     testError(ds1, msg)
   }
 
+  // TODO - these datasources changes tests are ignored as they are currently relying
+  // on a sleep since making it deterministic will be a challenge. But they do show how
+  // the last datasources wins. Previous streams are stopped.
+  def dataSourcesChanges(state: Int): Unit = {
+    val evaluator = new Evaluator(config, registry, system)
+
+    val baseUri = "resource:///gc-pause.dat"
+    val uri = s"$baseUri?q=name,jvm.gc.pause,:eq,:dist-max,(,nf.asg,nf.node,),:by"
+
+    def getSources: List[DataSources] = {
+      state match {
+        // duplicates
+        case 0 =>
+          List(
+            Evaluator.DataSources.of(ds("one", uri), ds("two", uri)),
+            Evaluator.DataSources.of(ds("one", uri), ds("two", uri))
+          )
+        // disjoint
+        case 1 =>
+          List(
+            Evaluator.DataSources.of(ds("one", uri)),
+            Evaluator.DataSources.of(ds("two", uri))
+          )
+        // overlap
+        case 2 =>
+          List(
+            Evaluator.DataSources.of(ds("one", uri)),
+            Evaluator.DataSources.of(ds("one", uri), ds("two", uri))
+          )
+        case x =>
+          throw new IllegalArgumentException(s"Haven't set up a test for ${x}")
+      }
+    }
+
+    val sourceRef = EvaluationFlows.stoppableSource(
+      Source
+        .fromIterator(() => getSources.iterator)
+        .via(Flow.fromProcessor(() => evaluator.createStreamsProcessor()))
+    )
+
+    val oneCount = new AtomicInteger()
+    val twoCount = new AtomicInteger()
+    val sink = Sink.foreach[Evaluator.MessageEnvelope] { msg =>
+      if (msg.message().isInstanceOf[TimeSeriesMessage]) {
+        msg.id match {
+          case "one" => oneCount.incrementAndGet()
+          case "two" => twoCount.incrementAndGet()
+        }
+      }
+    }
+
+    val future = sourceRef.source
+      .toMat(sink)(Keep.right)
+      .run()
+
+    // TODO - We should have a better trigger. Can't trigger off datapoints since, if
+    // duplication breaks, we don't want to stop at 264 and miss dupes.
+    Thread.sleep(5000)
+    sourceRef.stop()
+
+    Await.result(future, 1.minute)
+    // NOTE: Last source wins. So in the disjoint case, one is never processed.
+    assertEquals(oneCount.get(), if (state == 1) 0 else 255)
+    assertEquals(twoCount.get(), 255)
+  }
+
+  test("datasources changes, duplicates".ignore) {
+    dataSourcesChanges(0)
+  }
+
+  test("datasources changes, disjoint".ignore) {
+    dataSourcesChanges(1)
+  }
+
+  test("datasources changes, overlap".ignore) {
+    dataSourcesChanges(2)
+  }
+
+  def testRewrite(skipOne: Boolean = false, throwEx: Boolean = false): Unit = {
+    val evaluator = new Evaluator(config, registry, system)
+    evaluator.dataSourceRewriter = new UTRewriter(skipOne, throwEx)
+
+    val baseUri = "resource:///gc-pause.dat"
+    val uri = s"$baseUri?q=name,jvm.gc.pause,:eq,:dist-max,(,nf.asg,nf.node,),:by"
+    val ds1 = Evaluator.DataSources.of(ds("one", uri), ds("two", uri))
+    val sourceRef = EvaluationFlows.stoppableSource(
+      Source
+        .single(ds1)
+        .via(Flow.fromProcessor(() => evaluator.createStreamsProcessor()))
+    )
+
+    val oneCount = new AtomicInteger()
+    val twoCount = new AtomicInteger()
+    val sink = Sink.foreach[Evaluator.MessageEnvelope] { msg =>
+      msg.id match {
+        case "one" => oneCount.incrementAndGet()
+        case "two" =>
+          if (skipOne && msg.message.isInstanceOf[DiagnosticMessage]) twoCount.incrementAndGet()
+          else twoCount.incrementAndGet()
+      }
+      if (oneCount.get() > 0 && twoCount.get() > 0) sourceRef.stop()
+    }
+
+    val future = sourceRef.source
+      .toMat(sink)(Keep.right)
+      .run()
+
+    if (throwEx) {
+      intercept[RuntimeException] {
+        Await.result(future, 1.minute)
+      }
+    } else {
+      Await.result(future, 1.minute)
+    }
+  }
+
+  test("create processor, rewrites ok") {
+    testRewrite()
+  }
+
+  test("create processor, one of two failed rewrite") {
+    testRewrite(skipOne = true)
+  }
+
+  test("create processor, rewrite call failed") {
+    testRewrite(throwEx = true)
+  }
+
   test("processor handles multiple steps") {
     val evaluator = new Evaluator(config, registry, system)
 
@@ -604,6 +744,18 @@ class EvaluatorSuite extends FunSuite {
     )
   }
 
+  test("validate: invalid rewrite") {
+    val evaluator = new Evaluator(config, registry, system)
+    evaluator.dataSourceRewriter = new UTRewriter(true)
+    val ds = new Evaluator.DataSource(
+      "test",
+      "synthetic://test/?q=nf.ns,none,:eq,name,foo,:eq,:and"
+    )
+    intercept[IllegalArgumentException] {
+      evaluator.validate(ds)
+    }
+  }
+
   private def invalidHiResQuery(expr: String): Unit = {
     val evaluator = new Evaluator(config, registry, system)
     val ds = new Evaluator.DataSource(
@@ -791,4 +943,46 @@ class EvaluatorSuite extends FunSuite {
     val result = Await.result(future, scala.concurrent.duration.Duration.Inf)
     assertEquals(result.size, 10)
   }
+
+  def getMessages(file: String, filter: Option[String] = None): Seq[ByteString] = {
+    Files
+      .readAllLines(Paths.get(file))
+      .asScala
+      .filter(!_.isBlank)
+      .filter(line => filter.forall(line.contains))
+      .map(ByteString(_))
+      .toSeq
+  }
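+
+  // Stub rewriter standing in for the real rewrite service so the evaluator code
+  // paths can be exercised deterministically.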
+  class UTRewriter(skipOne: Boolean = false, throwEx: Boolean = false)
+    extends DataSourceRewriter(config, registry, system) {
+
+    override def rewrite(
+      context: StreamContext,
+      keepRetrying: Boolean = true
+    ): Flow[DataSources, DataSources, NotUsed] = {
+      if (throwEx) {
+        Flow[DataSources]
+          .map(_ => throw new RuntimeException("test"))
+      } else if (skipOne) {
+        // mimic dropping one.
+        Flow[DataSources]
+          .map { dss =>
+            val dsl = dss.sources().asScala.toList
+            dsl.size match {
+              case 1 =>
+                val msg = DiagnosticMessage.error("failed rewrite")
+                context.dsLogger(dsl(0), msg)
+                DataSources.empty()
+              case 2 =>
+                val msg = DiagnosticMessage.error("failed rewrite")
+                context.dsLogger(dsl(1), msg)
+                DataSources.of(dsl(0))
+            }
+          }
+      } else {
+        Flow[DataSources]
+      }
+    }
+  }
 }
diff --git a/build.sbt b/build.sbt
index 26a4b7948..1b7df2295 100644
--- a/build.sbt
+++ b/build.sbt
@@ -68,10 +68,10 @@ lazy val `atlas-eval` = project
   .configure(BuildSettings.profile)
   .dependsOn(`atlas-pekko`, `atlas-chart`, `atlas-core`)
   .settings(libraryDependencies ++= Seq(
+    Dependencies.equalsVerifier % "test",
     Dependencies.pekkoHttpTestkit % "test",
     Dependencies.pekkoStreamTestkit % "test",
-    Dependencies.pekkoTestkit % "test",
-    Dependencies.equalsVerifier % "test"
+    Dependencies.pekkoTestkit % "test"
   ))
 
 lazy val `atlas-jmh` = project