diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThread.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThread.java new file mode 100644 index 00000000000..a836561c50a --- /dev/null +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThread.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.common.threadlocal; + +/** + * InternalThread + */ +public class InternalThread extends Thread { + + private InternalThreadLocalMap threadLocalMap; + + public InternalThread() { + } + + public InternalThread(Runnable target) { + super(target); + } + + public InternalThread(ThreadGroup group, Runnable target) { + super(group, target); + } + + public InternalThread(String name) { + super(name); + } + + public InternalThread(ThreadGroup group, String name) { + super(group, name); + } + + public InternalThread(Runnable target, String name) { + super(target, name); + } + + public InternalThread(ThreadGroup group, Runnable target, String name) { + super(group, target, name); + } + + public InternalThread(ThreadGroup group, Runnable target, String name, long stackSize) { + super(group, target, name, stackSize); + } + + /** + * Returns the internal data structure that keeps the threadLocal variables bound to this thread. + * Note that this method is for internal use only, and thus is subject to change at any time. + */ + public final InternalThreadLocalMap threadLocalMap() { + return threadLocalMap; + } + + /** + * Sets the internal data structure that keeps the threadLocal variables bound to this thread. + * Note that this method is for internal use only, and thus is subject to change at any time. + */ + public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) { + this.threadLocalMap = threadLocalMap; + } +} diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocal.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocal.java new file mode 100644 index 00000000000..4eed57c4e84 --- /dev/null +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocal.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.common.threadlocal; + +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Set; + +/** + * InternalThreadLocal + * A special variant of {@link ThreadLocal} that yields higher access performance when accessed from a + * {@link InternalThread}. + *

+ * Internally, a {@link InternalThread} uses a constant index in an array, instead of using hash code and hash table, + * to look for a variable. Although seemingly very subtle, it yields slight performance advantage over using a hash + * table, and it is useful when accessed frequently. + *

+ * This design is learning from {@see io.netty.util.concurrent.FastThreadLocal} which is in Netty. + */ +public class InternalThreadLocal { + + private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex(); + + private final int index; + + public InternalThreadLocal() { + index = InternalThreadLocalMap.nextVariableIndex(); + } + + /** + * Removes all {@link InternalThreadLocal} variables bound to the current thread. This operation is useful when you + * are in a container environment, and you don't want to leave the thread local variables in the threads you do not + * manage. + */ + @SuppressWarnings("unchecked") + public static void removeAll() { + InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet(); + if (threadLocalMap == null) { + return; + } + + try { + Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); + if (v != null && v != InternalThreadLocalMap.UNSET) { + Set> variablesToRemove = (Set>) v; + for (InternalThreadLocal tlv : variablesToRemove) { + tlv.remove(threadLocalMap); + } + } + } finally { + InternalThreadLocalMap.remove(); + } + } + + /** + * Returns the number of thread local variables bound to the current thread. + */ + public static int size() { + InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet(); + if (threadLocalMap == null) { + return 0; + } else { + return threadLocalMap.size(); + } + } + + public static void destroy() { + InternalThreadLocalMap.destroy(); + } + + @SuppressWarnings("unchecked") + private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, InternalThreadLocal variable) { + Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); + Set> variablesToRemove; + if (v == InternalThreadLocalMap.UNSET || v == null) { + variablesToRemove = Collections.newSetFromMap(new IdentityHashMap, Boolean>()); + threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); + } else { + variablesToRemove = (Set>) v; + } + + variablesToRemove.add(variable); + } + + @SuppressWarnings("unchecked") + private static void removeFromVariablesToRemove(InternalThreadLocalMap threadLocalMap, InternalThreadLocal variable) { + + Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); + + if (v == InternalThreadLocalMap.UNSET || v == null) { + return; + } + + Set> variablesToRemove = (Set>) v; + variablesToRemove.remove(variable); + } + + /** + * Returns the current value for the current thread + */ + @SuppressWarnings("unchecked") + public final V get() { + InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); + Object v = threadLocalMap.indexedVariable(index); + if (v != InternalThreadLocalMap.UNSET) { + return (V) v; + } + + return initialize(threadLocalMap); + } + + private V initialize(InternalThreadLocalMap threadLocalMap) { + V v = null; + try { + v = initialValue(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + threadLocalMap.setIndexedVariable(index, v); + addToVariablesToRemove(threadLocalMap, this); + return v; + } + + /** + * Sets the value for the current thread. + */ + public final void set(V value) { + if (value == null || value == InternalThreadLocalMap.UNSET) { + remove(); + } else { + InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); + if (threadLocalMap.setIndexedVariable(index, value)) { + addToVariablesToRemove(threadLocalMap, this); + } + } + } + + /** + * Sets the value to uninitialized; a proceeding call to get() will trigger a call to initialValue(). + */ + @SuppressWarnings("unchecked") + public final void remove() { + remove(InternalThreadLocalMap.getIfSet()); + } + + /** + * Sets the value to uninitialized for the specified thread local map; + * a proceeding call to get() will trigger a call to initialValue(). + * The specified thread local map must be for the current thread. + */ + @SuppressWarnings("unchecked") + public final void remove(InternalThreadLocalMap threadLocalMap) { + if (threadLocalMap == null) { + return; + } + + Object v = threadLocalMap.removeIndexedVariable(index); + removeFromVariablesToRemove(threadLocalMap, this); + + if (v != InternalThreadLocalMap.UNSET) { + try { + onRemoval((V) v); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * Returns the initial value for this thread-local variable. + */ + protected V initialValue() throws Exception { + return null; + } + + /** + * Invoked when this thread local variable is removed by {@link #remove()}. + */ + protected void onRemoval(@SuppressWarnings("unused") V value) throws Exception { + } +} diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocalMap.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocalMap.java new file mode 100644 index 00000000000..e2aa0fd30dc --- /dev/null +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocalMap.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.common.threadlocal; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The internal data structure that stores the threadLocal variables for Netty and all {@link InternalThread}s. + * Note that this class is for internal use only. Use {@link InternalThread} + * unless you know what you are doing. + */ +public final class InternalThreadLocalMap { + + private Object[] indexedVariables; + + private static ThreadLocal slowThreadLocalMap = new ThreadLocal(); + + private static final AtomicInteger nextIndex = new AtomicInteger(); + + public static final Object UNSET = new Object(); + + public static InternalThreadLocalMap getIfSet() { + Thread thread = Thread.currentThread(); + if (thread instanceof InternalThread) { + return ((InternalThread) thread).threadLocalMap(); + } + return slowThreadLocalMap.get(); + } + + public static InternalThreadLocalMap get() { + Thread thread = Thread.currentThread(); + if (thread instanceof InternalThread) { + return fastGet((InternalThread) thread); + } + return slowGet(); + } + + public static void remove() { + Thread thread = Thread.currentThread(); + if (thread instanceof InternalThread) { + ((InternalThread) thread).setThreadLocalMap(null); + } else { + slowThreadLocalMap.remove(); + } + } + + public static void destroy() { + slowThreadLocalMap = null; + } + + public static int nextVariableIndex() { + int index = nextIndex.getAndIncrement(); + if (index < 0) { + nextIndex.decrementAndGet(); + throw new IllegalStateException("Too many thread-local indexed variables"); + } + return index; + } + + public static int lastVariableIndex() { + return nextIndex.get() - 1; + } + + private InternalThreadLocalMap() { + indexedVariables = newIndexedVariableTable(); + } + + public Object indexedVariable(int index) { + Object[] lookup = indexedVariables; + return index < lookup.length ? lookup[index] : UNSET; + } + + /** + * @return {@code true} if and only if a new thread-local variable has been created + */ + public boolean setIndexedVariable(int index, Object value) { + Object[] lookup = indexedVariables; + if (index < lookup.length) { + Object oldValue = lookup[index]; + lookup[index] = value; + return oldValue == UNSET; + } else { + expandIndexedVariableTableAndSet(index, value); + return true; + } + } + + public Object removeIndexedVariable(int index) { + Object[] lookup = indexedVariables; + if (index < lookup.length) { + Object v = lookup[index]; + lookup[index] = UNSET; + return v; + } else { + return UNSET; + } + } + + public int size() { + int count = 0; + for (Object o : indexedVariables) { + if (o != UNSET) { + ++count; + } + } + + //the fist element in `indexedVariables` is a set to keep all the InternalThreadLocal to remove + //look at method `addToVariablesToRemove` + return count - 1; + } + + private static Object[] newIndexedVariableTable() { + Object[] array = new Object[32]; + Arrays.fill(array, UNSET); + return array; + } + + private static InternalThreadLocalMap fastGet(InternalThread thread) { + InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); + if (threadLocalMap == null) { + thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap()); + } + return threadLocalMap; + } + + private static InternalThreadLocalMap slowGet() { + ThreadLocal slowThreadLocalMap = InternalThreadLocalMap.slowThreadLocalMap; + InternalThreadLocalMap ret = slowThreadLocalMap.get(); + if (ret == null) { + ret = new InternalThreadLocalMap(); + slowThreadLocalMap.set(ret); + } + return ret; + } + + private void expandIndexedVariableTableAndSet(int index, Object value) { + Object[] oldArray = indexedVariables; + final int oldCapacity = oldArray.length; + int newCapacity = index; + newCapacity |= newCapacity >>> 1; + newCapacity |= newCapacity >>> 2; + newCapacity |= newCapacity >>> 4; + newCapacity |= newCapacity >>> 8; + newCapacity |= newCapacity >>> 16; + newCapacity++; + + Object[] newArray = Arrays.copyOf(oldArray, newCapacity); + Arrays.fill(newArray, oldCapacity, newArray.length, UNSET); + newArray[index] = value; + indexedVariables = newArray; + } +} diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/NamedInternalThreadFactory.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/NamedInternalThreadFactory.java new file mode 100644 index 00000000000..03868b91751 --- /dev/null +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadlocal/NamedInternalThreadFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.common.threadlocal; + +import com.alibaba.dubbo.common.utils.NamedThreadFactory; + +/** + * NamedInternalThreadFactory + * This is a threadFactory which produce {@link InternalThread} + */ +public class NamedInternalThreadFactory extends NamedThreadFactory { + + public NamedInternalThreadFactory() { + super(); + } + + public NamedInternalThreadFactory(String prefix) { + super(prefix, false); + } + + public NamedInternalThreadFactory(String prefix, boolean daemon) { + super(prefix, daemon); + } + + @Override + public Thread newThread(Runnable runnable) { + String name = mPrefix + mThreadNum.getAndIncrement(); + InternalThread ret = new InternalThread(mGroup, runnable, name, 0); + ret.setDaemon(mDaemon); + return ret; + } +} diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/cached/CachedThreadPool.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/cached/CachedThreadPool.java index f808ab926f3..c5aa285930c 100644 --- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/cached/CachedThreadPool.java +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/cached/CachedThreadPool.java @@ -18,9 +18,9 @@ import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.threadlocal.NamedInternalThreadFactory; import com.alibaba.dubbo.common.threadpool.ThreadPool; import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport; -import com.alibaba.dubbo.common.utils.NamedThreadFactory; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; @@ -47,7 +47,6 @@ public Executor getExecutor(URL url) { queues == 0 ? new SynchronousQueue() : (queues < 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues)), - new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); + new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } - } diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPool.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPool.java index eb4e1f3feb7..2f97b6b517a 100644 --- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPool.java +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPool.java @@ -19,9 +19,9 @@ import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.threadlocal.NamedInternalThreadFactory; import com.alibaba.dubbo.common.threadpool.ThreadPool; import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport; -import com.alibaba.dubbo.common.utils.NamedThreadFactory; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -48,7 +48,7 @@ public Executor getExecutor(URL url) { alive, TimeUnit.MILLISECONDS, taskQueue, - new NamedThreadFactory(name, true), + new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); taskQueue.setExecutor(executor); return executor; diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/fixed/FixedThreadPool.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/fixed/FixedThreadPool.java index 15ebb20d08d..d3b61a58691 100644 --- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/fixed/FixedThreadPool.java +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/fixed/FixedThreadPool.java @@ -18,9 +18,9 @@ import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.threadlocal.NamedInternalThreadFactory; import com.alibaba.dubbo.common.threadpool.ThreadPool; import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport; -import com.alibaba.dubbo.common.utils.NamedThreadFactory; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; @@ -44,7 +44,7 @@ public Executor getExecutor(URL url) { queues == 0 ? new SynchronousQueue() : (queues < 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues)), - new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); + new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } } diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/limited/LimitedThreadPool.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/limited/LimitedThreadPool.java index aa5a71f3da0..b50bc9ee715 100644 --- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/limited/LimitedThreadPool.java +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/limited/LimitedThreadPool.java @@ -19,9 +19,9 @@ import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.threadlocal.NamedInternalThreadFactory; import com.alibaba.dubbo.common.threadpool.ThreadPool; import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport; -import com.alibaba.dubbo.common.utils.NamedThreadFactory; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; @@ -45,7 +45,7 @@ public Executor getExecutor(URL url) { queues == 0 ? new SynchronousQueue() : (queues < 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues)), - new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); + new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } } diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NamedThreadFactory.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NamedThreadFactory.java index 3d30a037239..36ace7ddf54 100755 --- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NamedThreadFactory.java +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NamedThreadFactory.java @@ -23,15 +23,16 @@ * InternalThreadFactory. */ public class NamedThreadFactory implements ThreadFactory { - private static final AtomicInteger POOL_SEQ = new AtomicInteger(1); - private final AtomicInteger mThreadNum = new AtomicInteger(1); + protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1); - private final String mPrefix; + protected final AtomicInteger mThreadNum = new AtomicInteger(1); - private final boolean mDaemon; + protected final String mPrefix; - private final ThreadGroup mGroup; + protected final boolean mDaemon; + + protected final ThreadGroup mGroup; public NamedThreadFactory() { this("pool-" + POOL_SEQ.getAndIncrement(), false); diff --git a/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocalTest.java b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocalTest.java new file mode 100644 index 00000000000..b0a8c28c294 --- /dev/null +++ b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadlocal/InternalThreadLocalTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.dubbo.common.threadlocal; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; + +public class InternalThreadLocalTest { + + private static final int THREADS = 10; + + private static final int PERFORMANCE_THREAD_COUNT = 1000; + + private static final int GET_COUNT = 1000000; + + @Test + public void testInternalThreadLocal() throws InterruptedException { + final AtomicInteger index = new AtomicInteger(0); + + final InternalThreadLocal internalThreadLocal = new InternalThreadLocal() { + + @Override + protected Integer initialValue() throws Exception { + Integer v = index.getAndIncrement(); + System.out.println("thread : " + Thread.currentThread().getName() + " init value : " + v); + return v; + } + }; + + for (int i = 0; i < THREADS; i++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + internalThreadLocal.get(); + } + }); + t.start(); + } + + Thread.sleep(2000); + } + + @Test + public void testSetAndGet() { + final Integer testVal = 10; + final InternalThreadLocal internalThreadLocal = new InternalThreadLocal(); + internalThreadLocal.set(testVal); + Assert.assertTrue("set is not equals get", + Objects.equals(testVal, internalThreadLocal.get())); + } + + @Test + public void testMultiThreadSetAndGet() throws InterruptedException { + final Integer testVal1 = 10; + final Integer testVal2 = 20; + final InternalThreadLocal internalThreadLocal = new InternalThreadLocal(); + final CountDownLatch countDownLatch = new CountDownLatch(2); + Thread t1 = new Thread(new Runnable() { + @Override + public void run() { + + internalThreadLocal.set(testVal1); + Assert.assertTrue("set is not equals get", + Objects.equals(testVal1, internalThreadLocal.get())); + countDownLatch.countDown(); + } + }); + t1.start(); + + Thread t2 = new Thread(new Runnable() { + @Override + public void run() { + internalThreadLocal.set(testVal2); + Assert.assertTrue("set is not equals get", + Objects.equals(testVal2, internalThreadLocal.get())); + countDownLatch.countDown(); + } + }); + t2.start(); + countDownLatch.await(); + } + + /** + * print + * take[2689]ms + *

+ * This test is based on a Machine with 4 core and 16g memory. + */ + @Test + public void testPerformanceTradition() { + final ThreadLocal[] caches1 = new ThreadLocal[PERFORMANCE_THREAD_COUNT]; + final Thread mainThread = Thread.currentThread(); + for (int i = 0; i < PERFORMANCE_THREAD_COUNT; i++) { + caches1[i] = new ThreadLocal(); + } + Thread t1 = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 0; i < PERFORMANCE_THREAD_COUNT; i++) { + caches1[i].set("float.lu"); + } + long start = System.nanoTime(); + for (int i = 0; i < PERFORMANCE_THREAD_COUNT; i++) { + for (int j = 0; j < GET_COUNT; j++) { + caches1[i].get(); + } + } + long end = System.nanoTime(); + System.out.println("take[" + TimeUnit.NANOSECONDS.toMillis(end - start) + + "]ms"); + LockSupport.unpark(mainThread); + } + }); + t1.start(); + LockSupport.park(mainThread); + } + + /** + * print + * take[14]ms + *

+ * This test is based on a Machine with 4 core and 16g memory. + */ + @Test + public void testPerformance() { + final InternalThreadLocal[] caches = new InternalThreadLocal[PERFORMANCE_THREAD_COUNT]; + final Thread mainThread = Thread.currentThread(); + for (int i = 0; i < PERFORMANCE_THREAD_COUNT; i++) { + caches[i] = new InternalThreadLocal(); + } + Thread t = new InternalThread(new Runnable() { + @Override + public void run() { + for (int i = 0; i < PERFORMANCE_THREAD_COUNT; i++) { + caches[i].set("float.lu"); + } + long start = System.nanoTime(); + for (int i = 0; i < PERFORMANCE_THREAD_COUNT; i++) { + for (int j = 0; j < GET_COUNT; j++) { + caches[i].get(); + } + } + long end = System.nanoTime(); + System.out.println("take[" + TimeUnit.NANOSECONDS.toMillis(end - start) + + "]ms"); + LockSupport.unpark(mainThread); + } + }); + t.start(); + LockSupport.park(mainThread); + } +} \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java index a542cd64888..bf56e2d12d4 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcContext.java @@ -18,6 +18,7 @@ import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.threadlocal.InternalThreadLocal; import com.alibaba.dubbo.common.utils.NetUtils; import java.net.InetSocketAddress; @@ -45,12 +46,16 @@ */ public class RpcContext { - private static final ThreadLocal LOCAL = new ThreadLocal() { + /** + * use internal thread local to improve performance + */ + private static final InternalThreadLocal LOCAL = new InternalThreadLocal() { @Override protected RpcContext initialValue() { return new RpcContext(); } }; + private final Map attachments = new HashMap(); private final Map values = new HashMap(); private Future future;