Log if we cannot deserialize a line into an airbyte message. (#5948)
After #5136, we removed this block of code because it does not conform to how the Airbyte Protocol is meant to work.

However, since existing connectors do violate the protocol, this PR restores the block for the time being as a band-aid to give us time to fix them.

Previously, if a line was not a JSON string, the platform assumed it was a log line and logged it. This PR adds that behavior back.

With the new setup, where for performance reasons we deserialize directly to an AirbyteMessage, we lose the ability to differentiate between a string that is not JSON and a JSON string that cannot be deserialized into an AirbyteMessage.

Overall, this should result in slightly noisier logs. We would be printing these lines anyway as deserialization errors, so this does not affect performance.
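
The fallback described above can be sketched in isolation. This is a minimal illustration, not the platform code: `LineFallbackSketch`, `Message`, `toyDeserialize`, and the in-memory `infoLog` list are all hypothetical stand-ins (for the stream factory, `AirbyteMessage`, the real deserializer, and `logger.info`), and the toy deserializer only recognizes one exact line shape rather than parsing JSON.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical sketch; none of these names come from the Airbyte codebase.
final class LineFallbackSketch {

  // Stand-in for AirbyteMessage, reduced to a single field.
  static final class Message {
    final String type;

    Message(final String type) {
      this.type = type;
    }
  }

  private final Function<String, Optional<Message>> deserializer;

  // Stand-in for logger.info(line): collects every line treated as a log.
  private final List<String> infoLog = new ArrayList<>();

  LineFallbackSketch(final Function<String, Optional<Message>> deserializer) {
    this.deserializer = deserializer;
  }

  // Emits the message if the line deserializes; otherwise records the raw
  // line as a log entry and emits nothing.
  Stream<Message> toMessage(final String line) {
    final Optional<Message> m = deserializer.apply(line);
    if (m.isPresent()) {
      return Stream.of(m.get());
    }
    infoLog.add(line);
    return Stream.empty();
  }

  List<String> infoLog() {
    return infoLog;
  }

  // Toy deserializer (assumption): accepts only lines of the exact form
  // {"type":"..."} and rejects everything else, JSON or not.
  static Optional<Message> toyDeserialize(final String line) {
    final String prefix = "{\"type\":\"";
    final String suffix = "\"}";
    if (line.length() >= prefix.length() + suffix.length()
        && line.startsWith(prefix)
        && line.endsWith(suffix)) {
      final String type = line.substring(prefix.length(), line.length() - suffix.length());
      return Optional.of(new Message(type));
    }
    return Optional.empty();
  }
}
```

In this sketch, a plain string such as `invalid line` and a well-formed JSON object such as `{"foo":"bar"}` both take the same path into `infoLog`, which mirrors the loss of differentiation noted above.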
davinchia committed Apr 17, 2023
1 parent 09f4d66 commit ea2b8b9
Showing 2 changed files with 18 additions and 5 deletions.
@@ -357,7 +357,17 @@ protected Stream<AirbyteMessage> toAirbyteMessage(final String line) {
       return upgradeMessage(m.get());
     }

-    logger.error("Deserialization failed: {}", Jsons.serialize(line));
+    // If a line cannot be deserialized into an AirbyteMessage,
+    // we assume it is a log message that is mistakenly not an
+    // Airbyte Log Message.
+    //
+    // This is because some sources actually log their process on stdout,
+    // so we want to make sure this info is available in the logs.
+    //
+    // When Connector Ops rectifies this, we can remove this.
+    try (final var mdcScope = containerLogMdcBuilder.build()) {
+      logger.info(line);
+    }
     return m.stream();
   }

@@ -7,6 +7,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -80,7 +81,7 @@ void testLoggingLine() {
     final Stream<AirbyteMessage> messageStream = stringToMessageStream(invalidRecord);

     assertEquals(Collections.emptyList(), messageStream.collect(Collectors.toList()));
-    verify(logger).error("Deserialization failed: {}", "\"invalid line\"");
+    verify(logger).info(invalidRecord);
   }

   @Test
@@ -100,7 +101,7 @@ void testFailDeserializationObvious() {
     final Stream<AirbyteMessage> messageStream = stringToMessageStream(invalidRecord);

     assertEquals(Collections.emptyList(), messageStream.collect(Collectors.toList()));
-    verify(logger).error(anyString(), anyString());
+    verify(logger).info(invalidRecord);
   }

   @Test
@@ -110,7 +111,7 @@ void testFailDeserializationSubtle() {
     final Stream<AirbyteMessage> messageStream = stringToMessageStream(invalidRecord);

     assertEquals(Collections.emptyList(), messageStream.collect(Collectors.toList()));
-    verify(logger).error(anyString(), anyString());
+    verify(logger).info(invalidRecord);
   }

   @Test
@@ -120,7 +121,9 @@ void testFailValidation() {
     final Stream<AirbyteMessage> messageStream = stringToMessageStream(invalidRecord);

     assertEquals(Collections.emptyList(), messageStream.collect(Collectors.toList()));
-    verify(logger).error(anyString(), anyString());
+
+    verify(logger, atLeastOnce()).info(anyString(), anyString(), anyString());
+    verify(logger, atLeastOnce()).error(anyString(), anyString());
   }

   @Test
