From 90df3615b53285f4d053a6990fdd03dc10655607 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 30 Jun 2025 12:18:45 -0600 Subject: [PATCH] Ignore unknown sync lines --- crates/core/src/sync/line.rs | 74 ++++++++++++++++++++++---- crates/core/src/sync/streaming_sync.rs | 7 +++ 2 files changed, 71 insertions(+), 10 deletions(-) diff --git a/crates/core/src/sync/line.rs b/crates/core/src/sync/line.rs index ab8c1993..8f730789 100644 --- a/crates/core/src/sync/line.rs +++ b/crates/core/src/sync/line.rs @@ -1,5 +1,6 @@ use alloc::borrow::Cow; use alloc::vec::Vec; +use serde::de::{IgnoredAny, VariantAccess, Visitor}; use serde::Deserialize; use crate::util::{deserialize_optional_string_to_i64, deserialize_string_to_i64}; @@ -13,24 +14,63 @@ use super::Checksum; /// internal copy). type SyncLineStr<'a> = Cow<'a, str>; -#[derive(Deserialize, Debug)] +#[derive(Debug)] pub enum SyncLine<'a> { - #[serde(rename = "checkpoint", borrow)] Checkpoint(Checkpoint<'a>), - #[serde(rename = "checkpoint_diff", borrow)] CheckpointDiff(CheckpointDiff<'a>), - - #[serde(rename = "checkpoint_complete")] CheckpointComplete(CheckpointComplete), - #[serde(rename = "partial_checkpoint_complete")] CheckpointPartiallyComplete(CheckpointPartiallyComplete), - - #[serde(rename = "data", borrow)] Data(DataLine<'a>), - - #[serde(rename = "token_expires_in")] KeepAlive(TokenExpiresIn), + UnknownSyncLine, +} + +impl<'de> Deserialize<'de> for SyncLine<'de> { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct SyncLineVisitor; + + impl<'de> Visitor<'de> for SyncLineVisitor { + type Value = SyncLine<'de>; + + fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(formatter, "a sync line") + } + + fn visit_enum(self, data: A) -> Result + where + A: serde::de::EnumAccess<'de>, + { + let (name, payload) = data.variant::<&'de str>()?; + Ok(match name { + "checkpoint" => SyncLine::Checkpoint(payload.newtype_variant::()?), + "checkpoint_diff" => { + SyncLine::CheckpointDiff(payload.newtype_variant::()?) + } + "checkpoint_complete" => SyncLine::CheckpointComplete( + payload.newtype_variant::()?, + ), + "partial_checkpoint_complete" => SyncLine::CheckpointPartiallyComplete( + payload.newtype_variant::()?, + ), + "data" => SyncLine::Data(payload.newtype_variant::()?), + "token_expires_in" => { + SyncLine::KeepAlive(payload.newtype_variant::()?) + } + _ => { + payload.newtype_variant::()?; + + SyncLine::UnknownSyncLine + } + }) + } + } + + deserializer.deserialize_enum("SyncLine", &[], SyncLineVisitor) + } } #[derive(Deserialize, Debug)] @@ -160,6 +200,8 @@ impl<'a, 'de: 'a> Deserialize<'de> for OplogData<'a> { mod tests { use core::assert_matches::assert_matches; + use alloc::string::ToString; + use super::*; fn deserialize(source: &str) -> SyncLine { @@ -305,4 +347,16 @@ mod tests { } ); } + + #[test] + fn parse_unknown() { + assert_matches!(deserialize("{\"foo\": {}}"), SyncLine::UnknownSyncLine); + assert_matches!(deserialize("{\"foo\": 123}"), SyncLine::UnknownSyncLine); + } + + #[test] + fn parse_invalid_duplicate_key() { + let e = serde_json::from_str::(r#"{"foo": {}, "bar": {}}"#).unwrap_err(); + assert_eq!(e.to_string(), "expected value at line 1 column 10"); + } } diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index 62381f2c..a344cb68 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -398,6 +398,13 @@ impl StreamingSyncIteration { SyncStateMachineTransition::Empty } } + SyncLine::UnknownSyncLine => { + event.instructions.push(Instruction::LogLine { + severity: LogSeverity::DEBUG, + line: "Unknown sync line".into(), + }); + SyncStateMachineTransition::Empty + } }) }