diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/AvroWriter.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/AvroWriter.java
new file mode 100644
index 00000000..fff647ae
--- /dev/null
+++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/AvroWriter.java
@@ -0,0 +1,75 @@
+/*-
+ * -\-\-
+ * DBeam Core
+ * --
+ * Copyright (C) 2016 - 2019 Spotify AB
+ * --
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * -/-/-
+ */
+
+package com.spotify.dbeam.avro;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Consumes Avro-encoded datums ({@link ByteBuffer}s) from a {@link BlockingQueue} and appends
+ * them to a {@link DataFileWriter}. A zero-capacity buffer is the end-of-stream sentinel:
+ * on receiving it the writer syncs the file and terminates.
+ */
+public class AvroWriter implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(AvroWriter.class);
+  private final DataFileWriter<GenericRecord> dataFileWriter;
+  private final BlockingQueue<ByteBuffer> queue;
+
+  public AvroWriter(
+      DataFileWriter<GenericRecord> dataFileWriter,
+      BlockingQueue<ByteBuffer> queue) {
+    this.dataFileWriter = dataFileWriter;
+    this.queue = queue;
+  }
+
+  @Override
+  public void run() {
+    LOGGER.debug("AvroWriter started");
+    try {
+      while (true) {
+        final ByteBuffer datum = queue.take();
+        if (datum.capacity() == 0) {
+          // Zero-capacity buffer is the producer's end-of-stream marker.
+          this.dataFileWriter.sync();
+          return;
+        } else {
+          this.dataFileWriter.appendEncoded(datum);
+        }
+      }
+    } catch (InterruptedException ex) {
+      // Restore the interrupt flag so the owning executor can observe the interruption.
+      Thread.currentThread().interrupt();
+      LOGGER.warn("AvroWriter interrupted", ex);
+    } catch (IOException e) {
+      LOGGER.error("Error on AvroWriter", e);
+      // Rethrow so Future.get() on this task fails instead of reporting silent success.
+      throw new UncheckedIOException(e);
+    }
+  }
+}
diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java
index 2dfef317..60910962 100644
--- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java
+++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java
@@ -25,12 +25,24 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.CountingOutputStream;
 import com.spotify.dbeam.args.JdbcAvroArgs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
@@ -136,6 +148,7 @@ private static class JdbcAvroWriter extends FileBasedSink.Writer {
     private Connection connection;
     private JdbcAvroMetering metering;
     private CountingOutputStream countingOutputStream;
+    private BlockingQueue<ByteBuffer> queue;
 
     JdbcAvroWriter(
         FileBasedSink.WriteOperation writeOperation,
@@ -145,6 +158,8 @@ private static class JdbcAvroWriter extends FileBasedSink.Writer {
       this.dynamicDestinations = dynamicDestinations;
       this.jdbcAvroArgs = jdbcAvroArgs;
       this.metering = JdbcAvroMetering.create();
+      // Bounded queue: provides backpressure so the producer cannot outrun the writer.
+      this.queue = new LinkedBlockingQueue<>(jdbcAvroArgs.fetchSize() * 4);
     }
 
     public Void getDestination() {
@@ -154,7 +169,7 @@ public Void getDestination() {
     @SuppressWarnings("deprecation") // uses internal test functionality.
     @Override
     protected void prepareWrite(final WritableByteChannel channel) throws Exception {
-      LOGGER.info("jdbcavroio : Preparing write...");
+      LOGGER.debug("jdbcavroio : Preparing write...");
       connection = jdbcAvroArgs.jdbcConnectionConfiguration().createConnection();
       final Void destination = getDestination();
       final Schema schema = dynamicDestinations.getSchema(destination);
@@ -165,7 +180,7 @@ protected void prepareWrite(final WritableByteChannel channel) throws Exception
       dataFileWriter.setMeta("created_by", this.getClass().getCanonicalName());
       this.countingOutputStream = new CountingOutputStream(Channels.newOutputStream(channel));
       dataFileWriter.create(schema, this.countingOutputStream);
-      LOGGER.info("jdbcavroio : Write prepared");
+      LOGGER.debug("jdbcavroio : Write prepared");
     }
 
     private ResultSet executeQuery(final String query) throws Exception {
@@ -199,22 +214,46 @@ private ResultSet executeQuery(final String query) throws Exception {
     public void write(final String query) throws Exception {
       checkArgument(dataFileWriter != null, "Avro DataFileWriter was not properly created");
       LOGGER.info("jdbcavroio : Starting write...");
+      final ExecutorService executorService = Executors.newSingleThreadExecutor();
       try (ResultSet resultSet = executeQuery(query)) {
+        final Future<?> future = executorService.submit(new AvroWriter(dataFileWriter, queue));
         metering.startWriteMeter();
-        final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet);
-        while (resultSet.next()) {
-          dataFileWriter.appendEncoded(converter.convertResultSetIntoAvroBytes());
-          this.metering.incrementRecordCount();
-        }
+        convertAllResultSet(resultSet, JdbcAvroRecordConverter.create(resultSet), future);
+        queue.put(ByteBuffer.allocate(0)); // empty buffer is the end-of-stream sentinel
+        final long waitStart = System.nanoTime();
+        future.get(); // propagates any failure raised inside AvroWriter
+        LOGGER.info(String.format("jdbcavroio : Waited %5.2f seconds for finishing write operation",
+            (System.nanoTime() - waitStart) / (1000000000.0)));
         this.dataFileWriter.flush();
         this.metering.exposeWriteElapsed();
        this.metering.exposeWrittenBytes(this.countingOutputStream.getCount());
-      }
+      } finally {
+        // Always release the writer thread, even when the query or conversion fails.
+        executorService.shutdownNow();
+      }
     }
 
+    // Streams every row of the ResultSet into the queue as encoded Avro bytes.
+    // Uses a timed offer plus a liveness check on the writer future so a failed
+    // writer cannot leave the producer blocked forever on a full queue.
+    private void convertAllResultSet(
+        ResultSet resultSet, JdbcAvroRecordConverter converter, Future<?> future)
+        throws SQLException, InterruptedException, IOException, ExecutionException {
+      while (resultSet.next()) {
+        final ByteBuffer datum = converter.convertResultSetIntoAvroBytes();
+        while (!queue.offer(datum, 1, TimeUnit.SECONDS)) {
+          if (future.isDone()) {
+            future.get(); // throws ExecutionException carrying the writer's failure
+            throw new IllegalStateException("AvroWriter stopped before consuming all records");
+          }
+        }
+        this.metering.incrementRecordCount();
+      }
+    }
+
     @Override
     protected void finishWrite() throws Exception {
-      LOGGER.info("jdbcavroio : Closing connection, flushing writer...");
+      LOGGER.debug("jdbcavroio : Closing connection, flushing writer...");
       if (connection != null) {
         connection.close();
       }