Skip to content

Commit

Permalink
support presentation metadata for fetch (Netflix#1611)
Browse files Browse the repository at this point in the history
Adds initial support for presentation metadata when using
fetch API. It will force hashed palette selection as not
all data is known in advance. It is off by default to
keep data sizes smaller. To enable the user would need
to specify a hint of `presentation-metadata`.
  • Loading branch information
brharrington authored and manolama committed May 22, 2024
1 parent c207632 commit 38807dd
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import scala.util.Using
* `plot-metadata` that corresponds to all lines on a given axis. The plot has an id that
* will be referenced when the line data is emitted.
*/
private[chart] object JsonCodec {
object JsonCodec {

import com.netflix.atlas.json.JsonParserHelper.*
private val factory = new JsonFactory()
Expand Down Expand Up @@ -120,7 +120,7 @@ private[chart] object JsonCodec {
s"data:image/png;base64,$encoded"
}

private def writeGraphDefMetadata(gen: JsonGenerator, config: GraphDef): Unit = {
def writeGraphDefMetadata(gen: JsonGenerator, config: GraphDef): Unit = {
gen.writeStartObject()
gen.writeStringField("type", "graph-metadata")
gen.writeNumberField("startTime", config.startTime.toEpochMilli)
Expand Down Expand Up @@ -170,7 +170,7 @@ private[chart] object JsonCodec {
gen.writeEndObject()
}

private def writePlotDefMetadata(gen: JsonGenerator, plot: PlotDef, id: Int): Unit = {
def writePlotDefMetadata(gen: JsonGenerator, plot: PlotDef, id: Int): Unit = {
gen.writeStartObject()
gen.writeStringField("type", "plot-metadata")
gen.writeNumberField("id", id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@
*/
package com.netflix.atlas.chart.model;

import java.util.Locale;

/**
* Line styles for how to render an time series.
* Line styles for how to render a time series.
*/
public enum LineStyle {
LINE, AREA, STACK, VSPAN, HEATMAP
LINE, AREA, STACK, VSPAN, HEATMAP;

public static LineStyle parse(String lineStyle) {
return valueOf(lineStyle.toUpperCase(Locale.US));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ case class Grapher(settings: DefaultSettings) {

val lineDefs = labelledTS.sortWith(_._1.label < _._1.label).map {
case (t, stats) =>
val lineStyle = s.lineStyle.fold(dfltStyle)(s => LineStyle.valueOf(s.toUpperCase))
val lineStyle = s.lineStyle.fold(dfltStyle)(LineStyle.parse)
val color = s.color.fold {
val c = lineStyle match {
case LineStyle.HEATMAP =>
Expand All @@ -393,7 +393,7 @@ case class Grapher(settings: DefaultSettings) {
query = Some(s.toString),
groupByKeys = s.expr.finalGrouping,
color = color,
lineStyle = s.lineStyle.fold(dfltStyle)(s => LineStyle.valueOf(s.toUpperCase)),
lineStyle = s.lineStyle.fold(dfltStyle)(LineStyle.parse),
lineWidth = s.lineWidth,
legendStats = stats
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,16 @@ case class ImageFlags(
theme: String,
layout: Layout,
hints: Set[String]
)
) {

def presentationMetadataEnabled: Boolean = {
hints.contains("presentation-metadata")
}

def axisPalette(settings: DefaultSettings, index: Int): String = {
axes.get(index) match {
case Some(axis) => axis.palette.getOrElse(settings.primaryPalette(theme))
case None => settings.primaryPalette(theme)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.eval.model

import com.netflix.atlas.chart.model.LineStyle

import java.awt.Color

/**
* Metadata for presentation details related to how to render the line.
*
* @param plot
* Identifies which axis the line should be associated with.
* @param color
* Color to use for rendering the line.
* @param lineStyle
* How to render the line (line, stack, area, etc).
* @param lineWidth
* Width of the stroke when rendering the line.
*/
case class LineStyleMetadata(plot: Int, color: Color, lineStyle: LineStyle, lineWidth: Float)
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
package com.netflix.atlas.eval.model

import com.fasterxml.jackson.core.JsonGenerator
import com.netflix.atlas.chart.model.LineStyle
import com.netflix.atlas.chart.model.Palette
import com.netflix.atlas.chart.model.PlotDef
import com.netflix.atlas.core.model.*
import com.netflix.atlas.core.util.Strings
import com.netflix.atlas.json.JsonSupport

import java.awt.Color
import java.time.Duration

/**
Expand Down Expand Up @@ -52,6 +56,8 @@ import java.time.Duration
* from the query plus any keys used in the group by clause.
* @param data
* Data for the time series.
* @param styleMetadata
* Metadata for presentation details related to how to render the line.
*/
case class TimeSeriesMessage(
id: String,
Expand All @@ -62,7 +68,8 @@ case class TimeSeriesMessage(
step: Long,
label: String,
tags: Map[String, String],
data: ChunkData
data: ChunkData,
styleMetadata: Option[LineStyleMetadata]
) extends JsonSupport {

override def hasCustomEncoding: Boolean = true
Expand All @@ -79,6 +86,12 @@ case class TimeSeriesMessage(
}
gen.writeStringField("label", label)
encodeTags(gen, tags)
styleMetadata.foreach { metadata =>
gen.writeNumberField("plot", metadata.plot)
gen.writeStringField("color", Strings.zeroPad(metadata.color.getRGB, 8))
gen.writeStringField("lineStyle", metadata.lineStyle.name())
gen.writeNumberField("lineWidth", metadata.lineWidth)
}
gen.writeNumberField("start", start)
gen.writeNumberField("end", end)
gen.writeNumberField("step", step)
Expand Down Expand Up @@ -110,8 +123,15 @@ object TimeSeriesMessage {
* for the message.
* @param ts
* Time series to use for the message.
* @param palette
* If defined then include presentation metadata.
*/
def apply(expr: StyleExpr, context: EvalContext, ts: TimeSeries): TimeSeriesMessage = {
def apply(
expr: StyleExpr,
context: EvalContext,
ts: TimeSeries,
palette: Option[String] = None
): TimeSeriesMessage = {
val query = expr.toString
val offset = Strings.toString(Duration.ofMillis(expr.offset))
val outputTags = ts.tags + (TagKey.offset -> offset)
Expand All @@ -126,7 +146,27 @@ object TimeSeriesMessage {
context.step,
ts.label,
outputTags,
ArrayData(data.data)
ArrayData(data.data),
palette.map(p => createStyleMetadata(expr, ts.label, p))
)
}

private def createStyleMetadata(
expr: StyleExpr,
label: String,
dfltPalette: String
): LineStyleMetadata = {
val color = expr.color.fold(colorFromPalette(expr, label, dfltPalette))(Strings.parseColor)
LineStyleMetadata(
plot = expr.axis.getOrElse(0),
color = color,
lineStyle = expr.lineStyle.fold(LineStyle.LINE)(LineStyle.parse),
lineWidth = expr.lineWidth
)
}

private def colorFromPalette(expr: StyleExpr, label: String, dfltPalette: String): Color = {
val palette = expr.palette.getOrElse(dfltPalette)
Palette.create(palette).colors(label.hashCode)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class TimeSeriesMessageSuite extends FunSuite {
step = 60000L,
label = "test",
tags = Map("name" -> "sps", "cluster" -> "www"),
data = ArrayData(Array(42.0))
data = ArrayData(Array(42.0)),
None
)

test("json encoding with empty group by") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
*/
package com.netflix.atlas.webapi

import com.fasterxml.jackson.core.JsonGenerator
import com.netflix.atlas.chart.JsonCodec
import com.netflix.atlas.chart.model.PlotDef

import java.time.Instant
import java.time.temporal.ChronoUnit
import org.apache.pekko.NotUsed
Expand Down Expand Up @@ -43,10 +47,15 @@ import com.netflix.atlas.core.model.TimeSeq
import com.netflix.atlas.core.model.TimeSeries
import com.netflix.atlas.eval.graph.GraphConfig
import com.netflix.atlas.eval.model.TimeSeriesMessage
import com.netflix.atlas.json.Json
import com.netflix.atlas.json.JsonSupport
import com.netflix.atlas.pekko.DiagnosticMessage
import com.netflix.atlas.webapi.GraphApi.DataRequest
import com.netflix.atlas.webapi.GraphApi.DataResponse

import java.io.StringWriter
import scala.util.Using

/**
* Provides the SSE data stream payload for a fetch response. Fetch is an alternative
* to the graph API that is meant for accessing the data rather than rendering as an
Expand Down Expand Up @@ -75,6 +84,8 @@ object FetchRequestSource {
.repeat(DiagnosticMessage.info("heartbeat"))
.throttle(1, 10.seconds, 1, ThrottleMode.Shaping)

val metadataSrc = createMetadataSource(graphCfg)

val dataSrc = Source(chunks)
.flatMapConcat { chunk =>
val req = DataRequest(graphCfg).copy(context = chunk)
Expand All @@ -91,17 +102,40 @@ object FetchRequestSource {
case t: Throwable => DiagnosticMessage.error(t)
}
.merge(heartbeatSrc, eagerComplete = true)
.map(_.toJson)

val closeSrc = Source.single(DiagnosticMessage.close)
val closeSrc = Source.single(DiagnosticMessage.close).map(_.toJson)

Source(List(dataSrc, closeSrc))
.flatMapConcat(s => s)
metadataSrc
.concat(dataSrc.concat(closeSrc))
.map { msg =>
val bytes = ByteString(s"$prefix${msg.toJson}$suffix")
val bytes = ByteString(s"$prefix$msg$suffix")
ChunkStreamPart(bytes)
}
}

private def createMetadataSource(graphCfg: GraphConfig): Source[String, NotUsed] = {
val usedAxes = graphCfg.exprs.map(_.axis.getOrElse(0)).toSet
if (graphCfg.flags.presentationMetadataEnabled) {
val graphDef = toJson(gen => JsonCodec.writeGraphDefMetadata(gen, graphCfg.newGraphDef(Nil)))
val plots = graphCfg.flags.axes.filter(t => usedAxes.contains(t._1)).map {
case (i, axis) => toJson(gen => JsonCodec.writePlotDefMetadata(gen, axis.newPlotDef(), i))
}
Source(graphDef :: plots.toList)
} else {
Source.empty
}
}

private def toJson(encode: JsonGenerator => Unit): String = {
Using.resource(new StringWriter()) { writer =>
Using.resource(Json.newJsonGenerator(writer)) { gen =>
encode(gen)
}
writer.toString
}
}

/**
* Returns an HttpResponse with an entity that is generated by the fetch source.
*/
Expand Down Expand Up @@ -156,18 +190,24 @@ object FetchRequestSource {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) with InHandler with OutHandler {

private val metadata = graphCfg.flags.presentationMetadataEnabled

private var state = Map.empty[StatefulExpr, Any]

override def onPush(): Unit = {
val chunk = grab(in)
val ts = graphCfg.exprs
.flatMap { s =>
val palette =
if (metadata)
Some(graphCfg.flags.axisPalette(graphCfg.settings, s.axis.getOrElse(0)))
else None
val context = chunk.context.copy(state = state)
val result = s.expr.eval(context, chunk.data)
state = result.state
result.data
.filterNot(ts => isAllNaN(ts.data, context.start, context.end, context.step))
.map(ts => TimeSeriesMessage(s, context, ts.withLabel(s.legend(ts))))
.map(ts => TimeSeriesMessage(s, context, ts.withLabel(s.legend(ts)), palette))
}
push(out, ts)
}
Expand Down

0 comments on commit 38807dd

Please sign in to comment.