[FLINK-27805][Connectors/ORC] bump orc version to 1.7.2
liujiawinds authored and U-CORP\liujia10 committed May 30, 2022
1 parent 0b139a1 commit f3bf29e
Showing 13 changed files with 918 additions and 278 deletions.
12 changes: 12 additions & 0 deletions flink-formats/flink-orc-nohive/pom.xml
@@ -82,6 +82,18 @@ under the License.
</exclusions>
</dependency>

+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>provided</scope>
+ </dependency>

<!-- Tests -->

<dependency>
@@ -39,27 +39,27 @@ public class NoHivePhysicalWriterImpl extends PhysicalWriterImpl {
public NoHivePhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts)
throws IOException {
super(out, opts);
- noHiveProtobufWriter = CodedOutputStream.newInstance(writer);
+ noHiveProtobufWriter = CodedOutputStream.newInstance(compressStream);
}

@Override
protected void writeMetadata(OrcProto.Metadata metadata) throws IOException {
metadata.writeTo(noHiveProtobufWriter);
noHiveProtobufWriter.flush();
- writer.flush();
+ compressStream.flush();
}

@Override
protected void writeFileFooter(OrcProto.Footer footer) throws IOException {
footer.writeTo(noHiveProtobufWriter);
noHiveProtobufWriter.flush();
- writer.flush();
+ compressStream.flush();
}

@Override
protected void writeStripeFooter(OrcProto.StripeFooter footer) throws IOException {
footer.writeTo(noHiveProtobufWriter);
noHiveProtobufWriter.flush();
- writer.flush();
+ compressStream.flush();
}
}
@@ -47,11 +47,11 @@ protected void prepareReadFileWithTypes(String file, int rowSize) throws IOExcep
TypeDescription schema =
TypeDescription.fromString(
"struct<"
+ "f0:float,"
+ "f1:double,"
+ "f2:timestamp,"
+ "f3:tinyint,"
+ "f4:smallint"
+ "_col0:float,"
+ "_col1:double,"
+ "_col2:timestamp,"
+ "_col3:tinyint,"
+ "_col4:smallint"
+ ">");

org.apache.hadoop.fs.Path filePath = new org.apache.hadoop.fs.Path(file);
@@ -105,7 +105,9 @@ protected OrcColumnarRowSplitReader createReader(
throws IOException {
return OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(
new Configuration(),
- IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + i).toArray(String[]::new),
+ IntStream.range(0, fullTypes.length)
+ .mapToObj(i -> "_col" + i)
+ .toArray(String[]::new),
fullTypes,
partitionSpec,
selectedFields,
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.orc.writer;

import org.apache.hadoop.conf.Configuration;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.CryptoUtils;
import org.apache.orc.impl.HadoopShims;
import org.apache.orc.impl.KeyProvider;
import org.apache.orc.impl.writer.WriterEncryptionKey;
import org.apache.orc.impl.writer.WriterEncryptionVariant;

import java.io.IOException;
import java.security.SecureRandom;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/**
* Encryption variant generation code copied from {@link org.apache.orc.impl.WriterImpl} in
* org.apache.orc:orc-core:1.7.2. It is used to produce the same encryption variants that {@link
* org.apache.orc.impl.WriterImpl} would generate.
*
* <p>NOTE: If the ORC dependency version is updated, this file may have to be updated as well to be
* in sync with the new version's WriterImpl.
*/
public class EncryptionProvider {

private final SortedMap<String, WriterEncryptionKey> keys = new TreeMap<>();

private final OrcFile.WriterOptions opts;

public EncryptionProvider(OrcFile.WriterOptions opts) {
this.opts = opts;
}

public WriterEncryptionVariant[] getEncryptionVariants() throws IOException {
TypeDescription schema = opts.getSchema();
schema.annotateEncryption(opts.getEncryption(), opts.getMasks());
return setupEncryption(opts.getKeyProvider(), schema, opts.getKeyOverrides());
}

/**
* Iterate through the encryption options given by the user and set up our data structures.
*
* @param provider the KeyProvider to use to generate keys
* @param schema the type tree that we search for annotations
* @param keyOverrides user specified key overrides
*/
private WriterEncryptionVariant[] setupEncryption(
KeyProvider provider,
TypeDescription schema,
Map<String, HadoopShims.KeyMetadata> keyOverrides)
throws IOException {
KeyProvider keyProvider =
provider != null
? provider
: CryptoUtils.getKeyProvider(new Configuration(), new SecureRandom());
// Load the overrides into the cache so that we use the required key versions.
for (HadoopShims.KeyMetadata key : keyOverrides.values()) {
keys.put(key.getKeyName(), new WriterEncryptionKey(key));
}
int variantCount = visitTypeTree(schema, false, keyProvider);

// Now that we have de-duped the keys and maskDescriptions, make the arrays
int nextId = 0;
int nextVariantId = 0;
WriterEncryptionVariant[] result = new WriterEncryptionVariant[variantCount];
for (WriterEncryptionKey key : keys.values()) {
key.setId(nextId++);
key.sortRoots();
for (WriterEncryptionVariant variant : key.getEncryptionRoots()) {
result[nextVariantId] = variant;
variant.setId(nextVariantId++);
}
}
return result;
}

private int visitTypeTree(TypeDescription schema, boolean encrypted, KeyProvider provider)
throws IOException {
int result = 0;
String keyName = schema.getAttributeValue(TypeDescription.ENCRYPT_ATTRIBUTE);
if (keyName != null) {
if (provider == null) {
throw new IllegalArgumentException("Encryption requires a KeyProvider.");
}
if (encrypted) {
throw new IllegalArgumentException("Nested encryption type: " + schema);
}
encrypted = true;
result += 1;
WriterEncryptionKey key = getKey(keyName, provider);
HadoopShims.KeyMetadata metadata = key.getMetadata();
WriterEncryptionVariant variant =
new WriterEncryptionVariant(key, schema, provider.createLocalKey(metadata));
key.addRoot(variant);
}
List<TypeDescription> children = schema.getChildren();
if (children != null) {
for (TypeDescription child : children) {
result += visitTypeTree(child, encrypted, provider);
}
}
return result;
}

private WriterEncryptionKey getKey(String keyName, KeyProvider provider) throws IOException {
WriterEncryptionKey result = keys.get(keyName);
if (result == null) {
result = new WriterEncryptionKey(provider.getCurrentKeyVersion(keyName));
keys.put(keyName, result);
}
return result;
}
}
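
A minimal usage sketch of the class above (not part of this commit): build writer options with an encrypted column and ask the provider for the same variants that org.apache.orc.impl.WriterImpl would produce. The schema and the "pii:ssn" key specification are illustrative, and a resolvable KeyProvider (from the options or the Hadoop configuration) is assumed; otherwise setupEncryption throws.

    // Illustrative only: schema and key name are assumptions; a KeyProvider must be resolvable.
    OrcFile.WriterOptions opts =
            OrcFile.writerOptions(new Configuration())
                    .setSchema(TypeDescription.fromString("struct<id:int,ssn:string>"))
                    .encrypt("pii:ssn");
    WriterEncryptionVariant[] variants = new EncryptionProvider(opts).getEncryptionVariants();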
@@ -26,6 +26,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+ import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.impl.WriterImpl;

@@ -73,6 +74,13 @@ public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration configuratio
this(vectorizer, null, configuration);
}

+ public OrcBulkWriterFactory(Vectorizer<T> vectorizer, OrcFile.WriterOptions writerOptions) {
+ this.vectorizer = vectorizer;
+ this.writerOptions = writerOptions;
+ this.writerProperties = null;
+ this.confMap = new HashMap<>();
+ }

/**
* Creates a new OrcBulkWriterFactory using the provided Vectorizer, Hadoop Configuration, ORC
* writer properties.
@@ -115,9 +123,12 @@ protected OrcFile.WriterOptions getWriterOptions() {
}

writerOptions = OrcFile.writerOptions(writerProperties, conf);
- writerOptions.setSchema(this.vectorizer.getSchema());
- }

+ // Column encryption configuration
+ writerOptions.encrypt(OrcConf.ENCRYPTION.getString(writerProperties, conf));
+ writerOptions.masks(OrcConf.DATA_MASK.getString(writerProperties, conf));
+ }
+ writerOptions.setSchema(this.vectorizer.getSchema());
return writerOptions;
}
}
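
A hypothetical usage sketch of the encryption wiring above: the encryption and mask specifications can be supplied as ORC writer properties, which getWriterOptions() now forwards through OrcConf.ENCRYPTION and OrcConf.DATA_MASK. The property values, the MyRecord type, and MyRecordVectorizer are placeholders, not part of this commit.

    // Placeholder names: MyRecord / MyRecordVectorizer stand in for a user-defined Vectorizer.
    Properties writerProps = new Properties();
    writerProps.setProperty(OrcConf.ENCRYPTION.getAttribute(), "pii:ssn");
    writerProps.setProperty(OrcConf.DATA_MASK.getAttribute(), "nullify:ssn");
    OrcBulkWriterFactory<MyRecord> factory =
            new OrcBulkWriterFactory<>(
                    new MyRecordVectorizer("struct<id:int,ssn:string>"), writerProps, new Configuration());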