diff --git a/ingestion-beam/pom.xml b/ingestion-beam/pom.xml index 004391de4..5b5dc418c 100644 --- a/ingestion-beam/pom.xml +++ b/ingestion-beam/pom.xml @@ -59,6 +59,10 @@ org.apache.beam beam-sdks-java-extensions-json-jackson + + org.apache.beam + beam-sdks-java-extensions-avro + org.apache.beam beam-sdks-java-io-google-cloud-platform diff --git a/ingestion-beam/src/main/java/com/mozilla/telemetry/avro/BinaryRecordFormatter.java b/ingestion-beam/src/main/java/com/mozilla/telemetry/avro/BinaryRecordFormatter.java index fcf28f068..a86af2e27 100644 --- a/ingestion-beam/src/main/java/com/mozilla/telemetry/avro/BinaryRecordFormatter.java +++ b/ingestion-beam/src/main/java/com/mozilla/telemetry/avro/BinaryRecordFormatter.java @@ -10,7 +10,7 @@ import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; -import org.apache.beam.sdk.io.AvroIO.RecordFormatter; +import org.apache.beam.sdk.extensions.avro.io.AvroIO.RecordFormatter; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; /** diff --git a/ingestion-beam/src/main/java/com/mozilla/telemetry/avro/PubsubMessageRecordFormatter.java b/ingestion-beam/src/main/java/com/mozilla/telemetry/avro/PubsubMessageRecordFormatter.java index 5dd1a4d99..f72c392d0 100644 --- a/ingestion-beam/src/main/java/com/mozilla/telemetry/avro/PubsubMessageRecordFormatter.java +++ b/ingestion-beam/src/main/java/com/mozilla/telemetry/avro/PubsubMessageRecordFormatter.java @@ -10,7 +10,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; -import org.apache.beam.sdk.io.AvroIO.RecordFormatter; +import org.apache.beam.sdk.extensions.avro.io.AvroIO.RecordFormatter; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; /** diff --git a/ingestion-beam/src/main/java/com/mozilla/telemetry/io/Write.java b/ingestion-beam/src/main/java/com/mozilla/telemetry/io/Write.java index e86b667c6..5e8ebabc8 100644 --- a/ingestion-beam/src/main/java/com/mozilla/telemetry/io/Write.java +++ b/ingestion-beam/src/main/java/com/mozilla/telemetry/io/Write.java @@ -37,7 +37,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.extensions.avro.io.AvroIO; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.TextIO; diff --git a/ingestion-beam/src/test/java/com/mozilla/telemetry/avro/PubsubMessageRecordFormatterTest.java b/ingestion-beam/src/test/java/com/mozilla/telemetry/avro/PubsubMessageRecordFormatterTest.java index 403934208..28eecdd9b 100644 --- a/ingestion-beam/src/test/java/com/mozilla/telemetry/avro/PubsubMessageRecordFormatterTest.java +++ b/ingestion-beam/src/test/java/com/mozilla/telemetry/avro/PubsubMessageRecordFormatterTest.java @@ -1,6 +1,7 @@ package com.mozilla.telemetry.avro; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import com.fasterxml.jackson.databind.node.NullNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -96,7 +97,7 @@ public void testFormatWithNestedObjectStruct() { GenericRecord shape = (GenericRecord) record.get("shape"); GenericRecord quad = (GenericRecord) shape.get("quadrilateral"); assertEquals(true, quad.get("rhombus")); - assertEquals(null, shape.get("triangle")); + assertFalse(shape.hasField("triangle")); } @Test @@ -338,6 +339,6 @@ public void testFormatCorrectsFieldNames() { assertEquals(true, record.get("test_dot")); assertEquals(true, record.get("_test_prefix_hyphen")); assertEquals(true, record.get("_0_test_prefix_number")); - assertEquals(null, record.get("$test_bad_symbol")); + assertFalse(record.hasField("$test_bad_symbol")); } } diff --git a/ingestion-beam/src/test/java/com/mozilla/telemetry/integration/BeamDependenciesIntegrationTest.java b/ingestion-beam/src/test/java/com/mozilla/telemetry/integration/BeamDependenciesIntegrationTest.java index fa53afa59..9c6f6769d 100644 --- a/ingestion-beam/src/test/java/com/mozilla/telemetry/integration/BeamDependenciesIntegrationTest.java +++ b/ingestion-beam/src/test/java/com/mozilla/telemetry/integration/BeamDependenciesIntegrationTest.java @@ -20,9 +20,11 @@ public class BeamDependenciesIntegrationTest { public void checkVersions() throws Exception { final String beamVersion = System.getProperty("beam.version"); final Pom beamCore = getPom("org.apache.beam", "beam-sdks-java-core", beamVersion); + final Pom avroExtension = getPom("org.apache.beam", "beam-sdks-java-extensions-avro", + beamVersion); final Map> expectedVersions = ImmutableMap.of(// - "avro.version", beamCore.getVersion("org.apache.avro", "avro"), // + "avro.version", avroExtension.getVersion("org.apache.avro", "avro"), // "jackson.version", beamCore.getVersion("com.fasterxml.jackson.core", "jackson-core")); final Map> actualVersions = ImmutableMap.of(// diff --git a/pom.xml b/pom.xml index 1ff273da1..c399c4c77 100644 --- a/pom.xml +++ b/pom.xml @@ -55,12 +55,12 @@ -Xmx1024m -Djdk.net.URLClassPath.disableClassPathURLCheck=true 1.10.4 - 2.51.0 + 2.60.0 - 2.14.1 - 1.8.2 + 2.15.4 + 1.11.3 1.1.10.5