diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
index 43f2875271d5..e07200177029 100644
--- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
@@ -177,7 +177,7 @@ kubernetes-model-settings/4.12.0//kubernetes-model-settings-4.12.0.jar
kubernetes-model-storageclass/4.12.0//kubernetes-model-storageclass-4.12.0.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
-libthrift/0.12.0//libthrift-0.12.0.jar
+libthrift/0.16.0//libthrift-0.16.0.jar
log4j/1.2.17//log4j-1.2.17.jar
logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar
lz4-java/1.7.1//lz4-java-1.7.1.jar
diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
index 3fe16b2c7b51..8b83f55ba980 100644
--- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -189,7 +189,7 @@ kubernetes-model-settings/4.12.0//kubernetes-model-settings-4.12.0.jar
kubernetes-model-storageclass/4.12.0//kubernetes-model-storageclass-4.12.0.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
-libthrift/0.12.0//libthrift-0.12.0.jar
+libthrift/0.16.0//libthrift-0.16.0.jar
log4j/1.2.17//log4j-1.2.17.jar
logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar
lz4-java/1.7.1//lz4-java-1.7.1.jar
diff --git a/pom.xml b/pom.xml
index 17995dc9f565..236662fa612d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -188,7 +188,7 @@
2.10.5
3.5.2
3.0.0
- 0.12.0
+ 0.16.0
4.8-1
1.1
3.141.59
@@ -2295,6 +2295,10 @@
org.slf4j
slf4j-api
+
+ javax.annotation
+ javax.annotation-api
+
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HiveAuthFactory.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HiveAuthFactory.java
index 0ef8ef28f116..51377129c544 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HiveAuthFactory.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HiveAuthFactory.java
@@ -251,7 +251,7 @@ public static UserGroupInformation loginFromSpnegoKeytabAndReturnUGI(HiveConf hi
}
}
- public static TTransport getSocketTransport(String host, int port, int loginTimeout) {
+ public static TTransport getSocketTransport(String host, int port, int loginTimeout) throws TTransportException {
return new TSocket(host, port, loginTimeout);
}
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/KerberosSaslHelper.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/KerberosSaslHelper.java
index cb9595009ecf..574d1813d305 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/KerberosSaslHelper.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/KerberosSaslHelper.java
@@ -31,6 +31,7 @@
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
public final class KerberosSaslHelper {
@@ -69,8 +70,8 @@ public static TTransport createSubjectAssumedTransport(String principal,
new TSaslClientTransport("GSSAPI", null, names[0], names[1], saslProps, null,
underlyingTransport);
return new TSubjectAssumingTransport(saslTransport);
- } catch (SaslException se) {
- throw new IOException("Could not instantiate SASL transport", se);
+ } catch (SaslException | TTransportException se) {
+ throw new IOException("Could not instantiate transport", se);
}
}
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PlainSaslHelper.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PlainSaslHelper.java
index 80ab3f8b2465..14c15f2b29e2 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PlainSaslHelper.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/PlainSaslHelper.java
@@ -39,6 +39,7 @@
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
public final class PlainSaslHelper {
@@ -65,7 +66,7 @@ public static TTransportFactory getPlainTransportFactory(String authTypeStr)
}
public static TTransport getPlainTransport(String username, String password,
- TTransport underlyingTransport) throws SaslException {
+ TTransport underlyingTransport) throws SaslException, TTransportException {
return new TSaslClientTransport("PLAIN", null, null, null, new HashMap(),
new PlainCallbackHandler(username, password), underlyingTransport);
}
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java
index 91a30cdefaec..e39052411d5d 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSetIpAddressProcessor.java
@@ -46,11 +46,12 @@ public TSetIpAddressProcessor(Iface iface) {
}
@Override
- public boolean process(final TProtocol in, final TProtocol out) throws TException {
+ public void process(final TProtocol in, final TProtocol out) throws TException {
setIpAddress(in);
setUserName(in);
try {
- return super.process(in, out);
+ super.process(in, out);
+ return;
} finally {
THREAD_LOCAL_USER_NAME.remove();
THREAD_LOCAL_IP_ADDRESS.remove();
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
index 8085c8d12e6b..409b642a36e2 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -91,16 +91,10 @@ protected void initializeServer() {
// Server args
int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
- int requestTimeout = (int) hiveConf.getTimeVar(
- HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS);
- int beBackoffSlotLength = (int) hiveConf.getTimeVar(
- HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
.processorFactory(processorFactory).transportFactory(transportFactory)
.protocolFactory(new TBinaryProtocol.Factory())
.inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
- .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
- .beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
.executorService(executorService);
// TCP Server
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index a65951c8aec2..3cda94b71f4c 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -84,6 +84,16 @@ public void setSessionHandle(SessionHandle sessionHandle) {
public SessionHandle getSessionHandle() {
return sessionHandle;
}
+
+ @Override
+ public T unwrap(Class aClass) {
+ return null;
+ }
+
+ @Override
+ public boolean isWrapperFor(Class> aClass) {
+ return false;
+ }
}
public ThriftCLIService(CLIService service, String serviceName) {
diff --git a/sql/hive/src/main/java/org/apache/thrift/transport/TFramedTransport.java b/sql/hive/src/main/java/org/apache/thrift/transport/TFramedTransport.java
new file mode 100644
index 000000000000..4b32108c7d20
--- /dev/null
+++ b/sql/hive/src/main/java/org/apache/thrift/transport/TFramedTransport.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.thrift.transport;
+
+
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TConfiguration;
+
+/**
+ * This is based on libthrift-0.12.0 {@link org.apache.thrift.transport.TFramedTransport}.
+ * To fix class of org.apache.thrift.transport.TFramedTransport not found after upgrading libthrift.
+ *
+ * TFramedTransport is a buffered TTransport that ensures a fully read message
+ * every time by preceding messages with a 4-byte frame size.
+ */
+public class TFramedTransport extends TTransport {
+
+ protected static final int DEFAULT_MAX_LENGTH = 16384000;
+
+ private int maxLength_;
+
+ /**
+ * Underlying transport
+ */
+ private TTransport transport_ = null;
+
+ /**
+ * Buffer for output
+ */
+ private final TByteArrayOutputStream writeBuffer_ =
+ new TByteArrayOutputStream(1024);
+
+ /**
+ * Buffer for input
+ */
+ private final TMemoryInputTransport readBuffer_ =
+ new TMemoryInputTransport(new byte[0]);
+
+ public static class Factory extends TTransportFactory {
+ private int maxLength_;
+
+ public Factory() {
+ maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ }
+
+ public Factory(int maxLength) {
+ maxLength_ = maxLength;
+ }
+
+ @Override
+ public TTransport getTransport(TTransport base) throws TTransportException {
+ return new TFramedTransport(base, maxLength_);
+ }
+ }
+
+ /**
+ * Constructor wraps around another transport
+ */
+ public TFramedTransport(TTransport transport, int maxLength) throws TTransportException {
+ transport_ = transport;
+ maxLength_ = maxLength;
+ }
+
+ public TFramedTransport(TTransport transport) throws TTransportException {
+ transport_ = transport;
+ maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ }
+
+ public void open() throws TTransportException {
+ transport_.open();
+ }
+
+ public boolean isOpen() {
+ return transport_.isOpen();
+ }
+
+ public void close() {
+ transport_.close();
+ }
+
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ int got = readBuffer_.read(buf, off, len);
+ if (got > 0) {
+ return got;
+ }
+
+ // Read another frame of data
+ readFrame();
+
+ return readBuffer_.read(buf, off, len);
+ }
+
+ @Override
+ public byte[] getBuffer() {
+ return readBuffer_.getBuffer();
+ }
+
+ @Override
+ public int getBufferPosition() {
+ return readBuffer_.getBufferPosition();
+ }
+
+ @Override
+ public int getBytesRemainingInBuffer() {
+ return readBuffer_.getBytesRemainingInBuffer();
+ }
+
+ @Override
+ public void consumeBuffer(int len) {
+ readBuffer_.consumeBuffer(len);
+ }
+
+ @Override
+ public TConfiguration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void updateKnownMessageSize(long l) throws TTransportException {
+
+ }
+
+ @Override
+ public void checkReadBytesAvailable(long l) throws TTransportException {
+
+ }
+
+ public void clear() {
+ readBuffer_.clear();
+ }
+
+ private final byte[] i32buf = new byte[4];
+
+ private void readFrame() throws TTransportException {
+ transport_.readAll(i32buf, 0, 4);
+ int size = decodeFrameSize(i32buf);
+
+ if (size < 0) {
+ close();
+ throw new TTransportException(TTransportException.CORRUPTED_DATA,
+ "Read a negative frame size (" + size + ")!");
+ }
+
+ if (size > maxLength_) {
+ close();
+ throw new TTransportException(TTransportException.CORRUPTED_DATA,
+ "Frame size (" + size + ") larger than max length (" + maxLength_ + ")!");
+ }
+
+ byte[] buff = new byte[size];
+ transport_.readAll(buff, 0, size);
+ readBuffer_.reset(buff);
+ }
+
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ writeBuffer_.write(buf, off, len);
+ }
+
+ @Override
+ public void flush() throws TTransportException {
+ byte[] buf = writeBuffer_.get();
+ int len = writeBuffer_.len();
+ writeBuffer_.reset();
+
+ encodeFrameSize(len, i32buf);
+ transport_.write(i32buf, 0, 4);
+ transport_.write(buf, 0, len);
+ transport_.flush();
+ }
+
+ public static final void encodeFrameSize(final int frameSize, final byte[] buf) {
+ buf[0] = (byte)(0xff & (frameSize >> 24));
+ buf[1] = (byte)(0xff & (frameSize >> 16));
+ buf[2] = (byte)(0xff & (frameSize >> 8));
+ buf[3] = (byte)(0xff & (frameSize));
+ }
+
+ public static final int decodeFrameSize(final byte[] buf) {
+ return
+ ((buf[0] & 0xff) << 24) |
+ ((buf[1] & 0xff) << 16) |
+ ((buf[2] & 0xff) << 8) |
+ ((buf[3] & 0xff));
+ }
+}