Skip to content

Commit

Permalink
Add tracking scenario ID in observed_event if defined (close #807)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben authored and spenes committed Feb 20, 2024
1 parent c0d05cb commit c7f528f
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ object Metadata {
)
}

case class TrackingScenarioInfo(
schemaVendor: String,
schemaName: String,
field: String
)
private val trackingScenarioInfo = TrackingScenarioInfo("com.snowplowanalytics.snowplow", "tracking_scenario", "id")

private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]

Expand Down Expand Up @@ -171,15 +178,17 @@ object Metadata {
* @param source - `app_id` for given event
* @param tracker - `v_tracker` for given event
* @param platform - The platform the app runs on for given event (`platform` field)
* @param scenarioId - Identifier for the tracking scenario the event is being tracked for
*/
case class MetadataEvent(
schema: SchemaKey,
source: Option[String],
tracker: Option[String],
platform: Option[String]
platform: Option[String],
scenarioId: Option[String]
)
object MetadataEvent {
def apply(event: EnrichedEvent): MetadataEvent =
def apply(event: EnrichedEvent, scenarioId: Option[String]): MetadataEvent =
MetadataEvent(
SchemaKey(
Option(event.event_vendor).getOrElse("unknown-vendor"),
Expand All @@ -189,7 +198,8 @@ object Metadata {
),
Option(event.app_id),
Option(event.v_tracker),
Option(event.platform)
Option(event.platform),
scenarioId
)
}

Expand Down Expand Up @@ -231,19 +241,39 @@ object Metadata {
} yield MetadataSnapshot(aggregates, periodStart, periodEnd)
}

def unwrapEntities(event: EnrichedEvent): Set[SchemaKey] = {
def unwrap(str: String) =
decode[SelfDescribingData[Json]](str)
def unwrapEntities(event: EnrichedEvent): (Set[SchemaKey], Option[String]) = {
case class Entities(schemas: Set[SchemaKey], scenarioId: Option[String])

def unwrap(str: String): Entities = {
val sdjs = decode[SelfDescribingData[Json]](str)
.traverse(
_.data
.as[List[SelfDescribingData[Json]]]
.traverse(_.map(_.schema))
.sequence
.flatMap(_.toList)
)
.flatMap(_.toList)
.toSet

unwrap(event.contexts) ++ unwrap(event.derived_contexts)
val schemas = sdjs.map(_.schema)

val scenarioId = sdjs.collectFirst {
case sdj if sdj.schema.vendor == trackingScenarioInfo.schemaVendor && sdj.schema.name == trackingScenarioInfo.schemaName =>
sdj.data.hcursor.downField(trackingScenarioInfo.field).as[String] match {
case Right(scenarioId) =>
Some(scenarioId)
case _ =>
None
}
}.flatten

Entities(schemas, scenarioId)
}

val entities = unwrap(event.contexts)
val schemas = entities.schemas ++ unwrap(event.derived_contexts).schemas

(schemas, entities.scenarioId)
}

def schema(event: EnrichedEvent): SchemaKey =
Expand All @@ -255,7 +285,10 @@ object Metadata {
)

def recalculate(previous: Aggregates, events: List[EnrichedEvent]): Aggregates =
previous |+| events.map(e => Map(MetadataEvent(e) -> EntitiesAndCount(unwrapEntities(e), 1))).combineAll
previous |+| events.map { e =>
val (entities, scenarioId) = unwrapEntities(e)
Map(MetadataEvent(e, scenarioId) -> EntitiesAndCount(entities, 1))
}.combineAll

def mkWebhookEvent(
organizationId: UUID,
Expand All @@ -266,7 +299,7 @@ object Metadata {
count: Int
): SelfDescribingData[Json] =
SelfDescribingData(
SchemaKey("com.snowplowanalytics.console", "observed_event", "jsonschema", SchemaVer.Full(6, 0, 0)),
SchemaKey("com.snowplowanalytics.console", "observed_event", "jsonschema", SchemaVer.Full(6, 0, 1)),
Json.obj(
"organizationId" -> organizationId.asJson,
"pipelineId" -> pipelineId.asJson,
Expand All @@ -276,6 +309,7 @@ object Metadata {
"source" -> event.source.getOrElse("unknown-source").asJson,
"tracker" -> event.tracker.getOrElse("unknown-tracker").asJson,
"platform" -> event.platform.getOrElse("unknown-platform").asJson,
"scenario_id" -> event.scenarioId.asJson,
"eventVolume" -> Json.fromInt(count),
"periodStart" -> periodStart.asJson,
"periodEnd" -> periodEnd.asJson
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class MetadataSpec extends Specification with CatsEffect {
case class Report(
periodStart: Instant,
periodEnd: Instant,
event: SchemaKey,
event: MetadataEvent,
entitiesAndCount: EntitiesAndCount
)
case class TestReporter[F[_]](state: Ref[F, List[Report]]) extends MetadataReporter[F] {
Expand All @@ -43,61 +43,74 @@ class MetadataSpec extends Specification with CatsEffect {
entitiesAndCount: EntitiesAndCount
): F[Unit] =
state.update(
_ :+ Report(periodStart, periodEnd, event.schema, entitiesAndCount)
_ :+ Report(periodStart, periodEnd, event, entitiesAndCount)
)
}

"Metadata" should {

"report observed events and entities" in {
val event = new EnrichedEvent()
event.contexts =
"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}},{"schema":"iglu:org.w3/PerformanceTiming/jsonschema/1-0-0","data":{"navigationStart":1581931694397,"unloadEventStart":1581931696046,"unloadEventEnd":1581931694764,"redirectStart":0,"redirectEnd":0,"fetchStart":1581931694397,"domainLookupStart":1581931694440,"domainLookupEnd":1581931694513,"connectStart":1581931694513,"connectEnd":1581931694665,"secureConnectionStart":1581931694572,"requestStart":1581931694665,"responseStart":1581931694750,"responseEnd":1581931694750,"domLoading":1581931694762,"domInteractive":1581931695963,"domContentLoadedEventStart":1581931696039,"domContentLoadedEventEnd":1581931696039,"domComplete":0,"loadEventStart":0,"loadEventEnd":0}}]}"""
val event = MetadataSpec.enriched
event.contexts = """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[
{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}},
{"schema":"iglu:org.w3/PerformanceTiming/jsonschema/1-0-0","data":{"navigationStart":1581931694397,"unloadEventStart":1581931696046,"unloadEventEnd":1581931694764,"redirectStart":0,"redirectEnd":0,"fetchStart":1581931694397,"domainLookupStart":1581931694440,"domainLookupEnd":1581931694513,"connectStart":1581931694513,"connectEnd":1581931694665,"secureConnectionStart":1581931694572,"requestStart":1581931694665,"responseStart":1581931694750,"responseEnd":1581931694750,"domLoading":1581931694762,"domInteractive":1581931695963,"domContentLoadedEventStart":1581931696039,"domContentLoadedEventEnd":1581931696039,"domComplete":0,"loadEventStart":0,"loadEventEnd":0}}
]}"""

val config = MetadataConfig(
Uri.unsafeFromString("https://localhost:443"),
50.millis,
UUID.fromString("dfc1aef8-2656-492b-b5ba-c77702f850bc"),
UUID.fromString("8c121fdd-dc8c-4cdc-bad1-3cefbe2b01ff")
)

for {
state <- Ref.of[IO, List[Report]](List.empty)
system <- Metadata.build[IO](config, TestReporter(state))
_ <- system.observe(List(event))
_ <- system.report.take(1).compile.drain
res <- state.get
report = res.head
} yield {
res.map(_.event) should containTheSameElementsAs(
List(SchemaKey("unknown-vendor", "unknown-name", "unknown-format", SchemaVer.Full(0, 0, 0)))
)
res.flatMap(_.entitiesAndCount.entities) should containTheSameElementsAs(
report.event should beEqualTo(MetadataSpec.expectedMetadaEvent)

report.entitiesAndCount.entities should containTheSameElementsAs(
Seq(
SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 0)),
SchemaKey("org.w3", "PerformanceTiming", "jsonschema", SchemaVer.Full(1, 0, 0))
)
)
res.map(_.entitiesAndCount.count) should beEqualTo(List(1))

report.entitiesAndCount.count should beEqualTo(1)
}
}

"parse schemas for event's entities" in {
"get entities in event's contexts and find scenarioId if present" in {
val event = new EnrichedEvent()
event.contexts =
"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}},{"schema":"iglu:org.w3/PerformanceTiming/jsonschema/1-0-0","data":{"navigationStart":1581931694397,"unloadEventStart":1581931696046,"unloadEventEnd":1581931694764,"redirectStart":0,"redirectEnd":0,"fetchStart":1581931694397,"domainLookupStart":1581931694440,"domainLookupEnd":1581931694513,"connectStart":1581931694513,"connectEnd":1581931694665,"secureConnectionStart":1581931694572,"requestStart":1581931694665,"responseStart":1581931694750,"responseEnd":1581931694750,"domLoading":1581931694762,"domInteractive":1581931695963,"domContentLoadedEventStart":1581931696039,"domContentLoadedEventEnd":1581931696039,"domComplete":0,"loadEventStart":0,"loadEventEnd":0}}]}"""
val expected =
event.contexts = """{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[
{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}},
{"schema":"iglu:org.w3/PerformanceTiming/jsonschema/1-0-0","data":{"navigationStart":1581931694397,"unloadEventStart":1581931696046,"unloadEventEnd":1581931694764,"redirectStart":0,"redirectEnd":0,"fetchStart":1581931694397,"domainLookupStart":1581931694440,"domainLookupEnd":1581931694513,"connectStart":1581931694513,"connectEnd":1581931694665,"secureConnectionStart":1581931694572,"requestStart":1581931694665,"responseStart":1581931694750,"responseEnd":1581931694750,"domLoading":1581931694762,"domInteractive":1581931695963,"domContentLoadedEventStart":1581931696039,"domContentLoadedEventEnd":1581931696039,"domComplete":0,"loadEventStart":0,"loadEventEnd":0}},
{"schema": "iglu:com.snowplowanalytics.snowplow/tracking_scenario/jsonschema/1-0-0", "data": {"id": "scenario_id"}}
]}"""
val expectedEntitites =
Seq(
SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 0)),
SchemaKey("org.w3", "PerformanceTiming", "jsonschema", SchemaVer.Full(1, 0, 0))
SchemaKey("org.w3", "PerformanceTiming", "jsonschema", SchemaVer.Full(1, 0, 0)),
SchemaKey("com.snowplowanalytics.snowplow", "tracking_scenario", "jsonschema", SchemaVer.Full(1, 0, 0))
)
val expectedScenarioId = Some("scenario_id")

val (actualEntities, actualScenarioId) = Metadata.unwrapEntities(event)

Metadata.unwrapEntities(event) should containTheSameElementsAs(expected)
actualEntities should containTheSameElementsAs(expectedEntitites)
actualScenarioId should beEqualTo(expectedScenarioId)
}

"recalculate event aggregates" should {

"add metadata event to empty state" in {
val enriched = MetadataSpec.enriched
Metadata.recalculate(Map.empty, List(enriched)) should containTheSameElementsAs(
Seq(MetadataEvent(enriched) -> EntitiesAndCount(Set.empty, 1))
Seq(MetadataEvent(enriched, None) -> EntitiesAndCount(Set.empty, 1))
)
}

Expand All @@ -106,9 +119,9 @@ class MetadataSpec extends Specification with CatsEffect {
val other = MetadataSpec.enriched
val v1_0_1 = SchemaVer.Full(1, 0, 1)
other.event_version = v1_0_1.asString
val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(Set.empty[SchemaKey], 1))
val previous = Map(MetadataEvent(enriched, None) -> EntitiesAndCount(Set.empty[SchemaKey], 1))
Metadata.recalculate(previous, List(other)) should containTheSameElementsAs(
previous.toSeq ++ Seq(MetadataEvent(other) -> EntitiesAndCount(Set.empty[SchemaKey], 1))
previous.toSeq ++ Seq(MetadataEvent(other, None) -> EntitiesAndCount(Set.empty[SchemaKey], 1))
)
}

Expand All @@ -124,9 +137,9 @@ class MetadataSpec extends Specification with CatsEffect {
enrichedBis.contexts =
"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-1","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}}]}"""
val entityBis = SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 1))
val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(entities, 1))
val previous = Map(MetadataEvent(enriched, None) -> EntitiesAndCount(entities, 1))
Metadata.recalculate(previous, List(enrichedBis)) should containTheSameElementsAs(
Seq(MetadataEvent(enriched) -> EntitiesAndCount(entities + entityBis, 2))
Seq(MetadataEvent(enriched, None) -> EntitiesAndCount(entities + entityBis, 2))
)
}

Expand All @@ -146,12 +159,52 @@ class MetadataSpec extends Specification with CatsEffect {
enrichedTer.contexts =
"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-2","data":{"id":"39a9934a-ddd3-4581-a4ea-d0ba20e63b92"}}]}"""
val entityTer = SchemaKey("com.snowplowanalytics.snowplow", "web_page", "jsonschema", SchemaVer.Full(1, 0, 2))
val previous = Map(MetadataEvent(enriched) -> EntitiesAndCount(entities, 1))
val previous = Map(MetadataEvent(enriched, None) -> EntitiesAndCount(entities, 1))
Metadata.recalculate(previous, List(enrichedBis, enrichedTer)) should containTheSameElementsAs(
Seq(MetadataEvent(enriched) -> EntitiesAndCount(entities + entityBis + entityTer, 3))
Seq(MetadataEvent(enriched, None) -> EntitiesAndCount(entities + entityBis + entityTer, 3))
)
}
}

"put scenario_id in the JSON if defined" in {
val json = Metadata
.mkWebhookEvent(
UUID.randomUUID(),
UUID.randomUUID(),
Instant.now(),
Instant.now(),
MetadataEvent(
SchemaKey("com.snowplowanalytics.snowplow", "whatever", "jsonschema", SchemaVer.Full(1, 0, 2)),
None,
None,
None,
Some("hello")
),
42
)
.toString
json.contains("\"scenario_id\" : \"hello\",") must beTrue
}

"put null as scenario_id in the JSON if not defined" in {
val json = Metadata
.mkWebhookEvent(
UUID.randomUUID(),
UUID.randomUUID(),
Instant.now(),
Instant.now(),
MetadataEvent(
SchemaKey("com.snowplowanalytics.snowplow", "whatever", "jsonschema", SchemaVer.Full(1, 0, 2)),
None,
None,
None,
None
),
42
)
.toString
json.contains("\"scenario_id\" : null,") must beTrue
}
}
}

Expand All @@ -160,19 +213,29 @@ object MetadataSpec {
val eventName = "example"
val eventFormat = "jsonschema"
val eventVersion = SchemaVer.Full(1, 0, 0)
val appId = "app123"
val tracker = "js-tracker-3.0.0"
val platform = "web"

def enriched = {
val appId = "app123"
val tracker = "js-tracker-3.0.0"
val enriched = new EnrichedEvent()
enriched.event_vendor = eventVendor
enriched.event_name = eventName
enriched.event_format = eventFormat
enriched.event_version = eventVersion.asString
enriched.app_id = appId
enriched.v_tracker = tracker
enriched.platform = platform
enriched
}

val eventSchema = SchemaKey(eventVendor, eventName, eventFormat, eventVersion)

val expectedMetadaEvent = MetadataEvent(
eventSchema,
Some(appId),
Some(tracker),
Some(platform),
None
)
}

0 comments on commit c7f528f

Please sign in to comment.