Skip to content

Commit 3efb9d2

Browse files
committed
attempt to fix the Subject propagation nonsense
1 parent c202601 commit 3efb9d2

File tree

3 files changed

+82
-0
lines changed

3 files changed

+82
-0
lines changed

hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/util/SubjectUtil.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
public class SubjectUtil {
1919
private static final MethodHandle CALL_AS = lookupCallAs();
2020
private static final MethodHandle CURRENT = lookupCurrent();
21+
private static boolean HAS_CALL_AS = true;
2122

2223
private SubjectUtil() {
2324
}
@@ -33,6 +34,7 @@ private static MethodHandle lookupCallAs() {
3334
return lookup.findStatic(Subject.class, "callAs",
3435
MethodType.methodType(Object.class, Subject.class, Callable.class));
3536
} catch (NoSuchMethodException x) {
37+
HAS_CALL_AS = false;
3638
try {
3739
// Lookup the old API.
3840
MethodType oldSignature = MethodType.methodType(Object.class, Subject.class,
@@ -168,6 +170,25 @@ public static Subject current() {
168170
}
169171
}
170172

173+
public static Runnable wrap(Runnable r) {
174+
if (!HAS_CALL_AS) {
175+
return r;
176+
}
177+
Subject s = current();
178+
return () -> callAs(s, () -> {
179+
r.run();
180+
return null;
181+
});
182+
}
183+
184+
public static <T> Callable<T> wrap(Callable<T> c) {
185+
if (!HAS_CALL_AS) {
186+
return c;
187+
}
188+
Subject s = current();
189+
return () -> callAs(s, () -> c.call());
190+
}
191+
171192
@SuppressWarnings("unused")
172193
private static <T> PrivilegedExceptionAction<T> callableToPrivilegedExceptionAction(Callable<T> callable) {
173194
return callable::call;

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopScheduledThreadPoolExecutor.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,18 @@
2020

2121
package org.apache.hadoop.util.concurrent;
2222

23+
import org.apache.hadoop.util.SubjectUtil;
24+
2325
import org.slf4j.Logger;
2426
import org.slf4j.LoggerFactory;
2527

28+
import java.util.concurrent.Callable;
29+
import java.util.concurrent.RejectedExecutionException;
2630
import java.util.concurrent.RejectedExecutionHandler;
31+
import java.util.concurrent.ScheduledFuture;
2732
import java.util.concurrent.ScheduledThreadPoolExecutor;
2833
import java.util.concurrent.ThreadFactory;
34+
import java.util.concurrent.TimeUnit;
2935

3036
/** An extension of ScheduledThreadPoolExecutor that provides additional
3137
* functionality. */
@@ -68,4 +74,53 @@ protected void afterExecute(Runnable r, Throwable t) {
6874
super.afterExecute(r, t);
6975
ExecutorHelper.logThrowableFromAfterExecute(r, t);
7076
}
77+
78+
/**
79+
* @throws RejectedExecutionException {@inheritDoc}
80+
* @throws NullPointerException {@inheritDoc}
81+
*/
82+
@Override
83+
public ScheduledFuture<?> schedule(Runnable command,
84+
long delay,
85+
TimeUnit unit) {
86+
return super.schedule(SubjectUtil.wrap(command), delay, unit);
87+
}
88+
89+
/**
90+
* @throws RejectedExecutionException {@inheritDoc}
91+
* @throws NullPointerException {@inheritDoc}
92+
*/
93+
@Override
94+
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
95+
long delay,
96+
TimeUnit unit) {
97+
return super.schedule(SubjectUtil.wrap(callable), delay, unit);
98+
}
99+
100+
/**
101+
* @throws RejectedExecutionException {@inheritDoc}
102+
* @throws NullPointerException {@inheritDoc}
103+
* @throws IllegalArgumentException {@inheritDoc}
104+
*/
105+
@Override
106+
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
107+
long initialDelay,
108+
long period,
109+
TimeUnit unit) {
110+
return super.scheduleAtFixedRate(SubjectUtil.wrap(command), initialDelay, period, unit);
111+
}
112+
113+
/**
114+
* @throws RejectedExecutionException {@inheritDoc}
115+
* @throws NullPointerException {@inheritDoc}
116+
* @throws IllegalArgumentException {@inheritDoc}
117+
*/
118+
@Override
119+
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
120+
long initialDelay,
121+
long delay,
122+
TimeUnit unit) {
123+
return super.scheduleWithFixedDelay(SubjectUtil.wrap(command), initialDelay, delay, unit);
124+
}
125+
71126
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopThreadPoolExecutor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package org.apache.hadoop.util.concurrent;
2222

23+
import org.apache.hadoop.util.SubjectUtil;
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
2526

@@ -76,6 +77,11 @@ public HadoopThreadPoolExecutor(int corePoolSize,
7677
threadFactory, handler);
7778
}
7879

80+
@Override
81+
public void execute(Runnable command) {
82+
super.execute(SubjectUtil.wrap(command));
83+
}
84+
7985
@Override
8086
protected void beforeExecute(Thread t, Runnable r) {
8187
if (LOG.isDebugEnabled()) {

0 commit comments

Comments
 (0)