Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.junit.Test;
import org.apache.parquet.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

public class TestInputOutputFormat {
private static final Log LOG = Log.getLog(TestInputOutputFormat.class);
private static final Logger LOG = LoggerFactory.getLogger(TestInputOutputFormat.class);

private static Schema avroSchema;
static {
Expand Down Expand Up @@ -132,10 +133,10 @@ public void testReadWrite() throws Exception {
private void waitForJob(Job job) throws Exception {
job.submit();
while (!job.isComplete()) {
LOG.debug("waiting for job " + job.getJobName());
LOG.debug("waiting for job {}", job.getJobName());
sleep(100);
}
LOG.info("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
LOG.info("status for job {}: {}", job.getJobName(), (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
if (!job.isSuccessful()) {
throw new RuntimeException("job failed " + job.getJobName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.parquet.Log;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.filter.ColumnPredicates;
import org.apache.parquet.filter.ColumnRecordFilter;
Expand All @@ -46,6 +45,8 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.lang.Thread.sleep;
import static org.junit.Assert.assertArrayEquals;
Expand All @@ -55,7 +56,7 @@
import static org.junit.Assert.fail;

public class TestReflectInputOutputFormat {
private static final Log LOG = Log.getLog(TestReflectInputOutputFormat.class);
private static final Logger LOG = LoggerFactory.getLogger(TestReflectInputOutputFormat.class);


public static class Service {
Expand Down Expand Up @@ -477,10 +478,10 @@ public void testReadWriteChangedCar() throws Exception {
private void waitForJob(Job job) throws Exception {
job.submit();
while (!job.isComplete()) {
LOG.debug("waiting for job " + job.getJobName());
LOG.debug("waiting for job {}", job.getJobName());
sleep(100);
}
LOG.info("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
LOG.info("status for job {}: {}", job.getJobName(), (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
if (!job.isSuccessful()) {
throw new RuntimeException("job failed " + job.getJobName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.apache.parquet.Log;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.filter.ColumnPredicates;
import org.apache.parquet.filter.ColumnRecordFilter;
import org.apache.parquet.filter.RecordFilter;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestSpecificInputOutputFormat {
private static final Log LOG = Log.getLog(TestSpecificInputOutputFormat.class);
private static final Logger LOG = LoggerFactory.getLogger(TestSpecificInputOutputFormat.class);

public static Car nextRecord(int i) {
String vin = "1VXBR12EXCP000000";
Expand Down Expand Up @@ -268,10 +269,10 @@ public void testReadWriteChangedCar() throws Exception {
private void waitForJob(Job job) throws Exception {
job.submit();
while (!job.isComplete()) {
LOG.debug("waiting for job " + job.getJobName());
LOG.debug("waiting for job {}", job.getJobName());
sleep(100);
}
LOG.info("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
LOG.info("status for job {}: {}", job.getJobName(), (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
if (!job.isSuccessful()) {
throw new RuntimeException("job failed " + job.getJobName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.column.Encoding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CorruptDeltaByteArrays {
// Logger is keyed on this class (the original passed CorruptStatistics.class, a copy-paste slip)
// so that messages emitted here are attributed to the CorruptDeltaByteArrays category.
private static final Logger LOG = LoggerFactory.getLogger(CorruptDeltaByteArrays.class);

private static final SemanticVersion PARQUET_246_FIXED_VERSION =
new SemanticVersion(1, 8, 0);
Expand All @@ -43,7 +45,7 @@ public static boolean requiresSequentialReads(ParsedVersion version, Encoding en

if (!version.hasSemanticVersion()) {
LOG.warn("Requiring sequential reads because created_by did not " +
"contain a valid version (see PARQUET-246): " + version.version);
"contain a valid version (see PARQUET-246): {}", version.version);
return true;
}

Expand All @@ -61,7 +63,7 @@ public static boolean requiresSequentialReads(SemanticVersion semver, Encoding e

if (semver.compareTo(PARQUET_246_FIXED_VERSION) < 0) {
LOG.info("Requiring sequential reads because this file was created " +
"prior to " + PARQUET_246_FIXED_VERSION + ". See PARQUET-246" );
"prior to {}. See PARQUET-246", PARQUET_246_FIXED_VERSION );
return true;
}

Expand All @@ -75,8 +77,7 @@ public static boolean requiresSequentialReads(String createdBy, Encoding encodin
}

if (Strings.isNullOrEmpty(createdBy)) {
LOG.info("Requiring sequential reads because file version is empty. " +
"See PARQUET-246");
LOG.info("Requiring sequential reads because file version is empty. See PARQUET-246");
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.VersionParser.VersionParseException;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* There was a bug (PARQUET-251) that caused the statistics metadata
Expand All @@ -35,7 +37,7 @@
public class CorruptStatistics {
private static final AtomicBoolean alreadyLogged = new AtomicBoolean(false);

private static final Log LOG = Log.getLog(CorruptStatistics.class);
private static final Logger LOG = LoggerFactory.getLogger(CorruptStatistics.class);

// the version in which the bug described by jira: PARQUET-251 was fixed
// the bug involved writing invalid binary statistics, so stats written prior to this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.parquet.column.impl;

import static java.lang.String.format;
import static org.apache.parquet.Log.DEBUG;
import static org.apache.parquet.Preconditions.checkNotNull;
import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
Expand All @@ -30,7 +29,6 @@
import java.nio.ByteBuffer;

import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.Log;
import org.apache.parquet.VersionParser.ParsedVersion;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
Expand All @@ -51,6 +49,8 @@
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeNameConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* ColumnReader implementation
Expand All @@ -59,7 +59,7 @@
*
*/
public class ColumnReaderImpl implements ColumnReader {
private static final Log LOG = Log.getLog(ColumnReaderImpl.class);
private static final Logger LOG = LoggerFactory.getLogger(ColumnReaderImpl.class);

/**
* binds the lower level page decoder to the record converter materializing the records
Expand Down Expand Up @@ -523,7 +523,7 @@ private void readRepetitionAndDefinitionLevels() {
private void checkRead() {
if (isPageFullyConsumed()) {
if (isFullyConsumed()) {
if (DEBUG) LOG.debug("end reached");
LOG.debug("end reached");
repetitionLevel = 0; // the next repetition level
return;
}
Expand All @@ -533,7 +533,7 @@ private void checkRead() {
}

private void readPage() {
if (DEBUG) LOG.debug("loading page");
LOG.debug("loading page");
DataPage page = pageReader.readPage();
page.accept(new DataPage.Visitor<Void>() {
@Override
Expand Down Expand Up @@ -590,14 +590,14 @@ private void readPageV1(DataPageV1 page) {
this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
try {
ByteBuffer bytes = page.getBytes().toByteBuffer();
if (DEBUG) LOG.debug("page size " + bytes.remaining() + " bytes and " + pageValueCount + " records");
if (DEBUG) LOG.debug("reading repetition levels at 0");
LOG.debug("page size {} bytes and {} records", bytes.remaining(), pageValueCount);
LOG.debug("reading repetition levels at 0");
rlReader.initFromPage(pageValueCount, bytes, 0);
int next = rlReader.getNextOffset();
if (DEBUG) LOG.debug("reading definition levels at " + next);
LOG.debug("reading definition levels at {}", next);
dlReader.initFromPage(pageValueCount, bytes, next);
next = dlReader.getNextOffset();
if (DEBUG) LOG.debug("reading data at " + next);
LOG.debug("reading data at {}", next);
initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
} catch (IOException e) {
throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
Expand All @@ -608,7 +608,7 @@ private void readPageV2(DataPageV2 page) {
this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
try {
if (DEBUG) LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
initDataReader(page.getDataEncoding(), page.getData().toByteBuffer(), 0, page.getValueCount());
} catch (IOException e) {
throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.io.IOException;

import org.apache.parquet.Log;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.ParquetProperties;
Expand All @@ -32,8 +31,8 @@
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.io.api.Binary;

import static java.lang.Math.max;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
Expand All @@ -42,8 +41,11 @@
*
*/
final class ColumnWriterV1 implements ColumnWriter {
private static final Log LOG = Log.getLog(ColumnWriterV1.class);
private static final boolean DEBUG = Log.DEBUG;
private static final Logger LOG = LoggerFactory.getLogger(ColumnWriterV1.class);

// Debug tracing is disabled by default. Guarding the calls with "if (DEBUG)" inside the methods,
// against this compile-time constant, lets the Java compiler (not just the JIT) remove the unused statements at build time.
private static final boolean DEBUG = false;

private final ColumnDescriptor path;
private final PageWriter pageWriter;
Expand Down Expand Up @@ -74,7 +76,7 @@ public ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter,
}

/**
 * Emits a debug trace line for one written triplet of this column.
 *
 * @param value the value to include in the trace
 * @param r the level printed after the "r:" label (presumably the repetition level)
 * @param d the level printed after the "d:" label (presumably the definition level)
 */
private void log(Object value, int r, int d) {
  // One parameterized call only: no eager string concatenation, and the constant
  // DEBUG guard keeps the statement out of the way when tracing is disabled.
  if (DEBUG) {
    LOG.debug("{} {} r:{} d:{}", path, value, r, d);
  }
}

private void resetStatistics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,10 @@
*/
package org.apache.parquet.column.impl;

import static java.lang.Math.max;
import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;

import java.io.IOException;

import org.apache.parquet.Ints;
import org.apache.parquet.Log;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.Encoding;
Expand All @@ -38,6 +33,8 @@
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.io.api.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
Expand All @@ -46,8 +43,11 @@
*
*/
final class ColumnWriterV2 implements ColumnWriter {
private static final Log LOG = Log.getLog(ColumnWriterV2.class);
private static final boolean DEBUG = Log.DEBUG;
private static final Logger LOG = LoggerFactory.getLogger(ColumnWriterV2.class);

// Debug tracing is disabled by default. Guarding the calls with "if (DEBUG)" inside the methods,
// against this compile-time constant, lets the Java compiler (not just the JIT) remove the unused statements at build time.
private static final boolean DEBUG = false;

private final ColumnDescriptor path;
private final PageWriter pageWriter;
Expand All @@ -73,7 +73,7 @@ public ColumnWriterV2(
}

/**
 * Emits a debug trace line for one written triplet of this column.
 *
 * @param value the value to include in the trace
 * @param r the level printed after the "r:" label (presumably the repetition level)
 * @param d the level printed after the "d:" label (presumably the definition level)
 */
private void log(Object value, int r, int d) {
  // Guarded by the DEBUG constant, matching ColumnWriterV1.log and the comment on DEBUG;
  // the message is parameterized so no string is built unless debug output is enabled.
  if (DEBUG) {
    LOG.debug("{} {} r:{} d:{}", path, value, r, d);
  }
}

private void resetStatistics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
import java.nio.ByteBuffer;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.Log;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingReader;
import org.apache.parquet.io.ParquetDecodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* a column reader that packs the ints in the number of bits required based on the maximum size.
Expand All @@ -38,7 +39,7 @@
*
*/
public class BitPackingValuesReader extends ValuesReader {
private static final Log LOG = Log.getLog(BitPackingValuesReader.class);
private static final Logger LOG = LoggerFactory.getLogger(BitPackingValuesReader.class);

private ByteBufferInputStream in;
private BitPackingReader bitPackingReader;
Expand Down Expand Up @@ -73,7 +74,7 @@ public int readInteger() {
public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
int effectiveBitLength = valueCount * bitsPerValue;
int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
if (Log.DEBUG) LOG.debug("reading " + length + " bytes for " + valueCount + " values of size " + bitsPerValue + " bits." );
LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitsPerValue);
this.in = new ByteBufferInputStream(in, offset, length);
this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
this.nextOffset = offset + length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
import java.util.Arrays;
import java.nio.ByteBuffer;

import org.apache.parquet.Log;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ByteBitPackingValuesReader extends ValuesReader {
private static final int VALUES_AT_A_TIME = 8; // because we're using unpack8Values()

private static final Log LOG = Log.getLog(ByteBitPackingValuesReader.class);
private static final Logger LOG = LoggerFactory.getLogger(ByteBitPackingValuesReader.class);

private final int bitWidth;
private final BytePacker packer;
Expand Down Expand Up @@ -69,7 +70,7 @@ public void initFromPage(int valueCount, ByteBuffer page, int offset)
throws IOException {
int effectiveBitLength = valueCount * bitWidth;
int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); // ceil
if (Log.DEBUG) LOG.debug("reading " + length + " bytes for " + valueCount + " values of size " + bitWidth + " bits." );
LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitWidth);
this.encoded = page;
this.encodedPos = offset;
this.decodedPosition = VALUES_AT_A_TIME - 1;
Expand Down
Loading