Skip to content

Commit

Permalink
Move callback invocation in c.s.j.p.w.C.util.CallbackProxy into calli…
Browse files Browse the repository at this point in the history
…ng thread

Dispatching the invocation of the callback handler into an executor
makes it impossible to fill [out] parameters, as the return has already
happend and [in] parameters can not be savely used if they are not 
marshalled to java code because the calling code will free the parameters
after the call.

To prevent deadlocks ComThread is modified to allow COM calls from the
callback by modifying the ComThread helper to only dispatch the COM call
into the ComThread only if the calling thread has not COM already enabled.

Reference counting was modified, so that now on construction of a
ProxyObject the reference count is AddRef'ed once and Released once on
finalization.
  • Loading branch information
matthiasblaesing committed Jan 24, 2016
1 parent 37060cb commit 8d60f93
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@
*/
package com.sun.jna.platform.win32.COM.util;

import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import com.sun.jna.Pointer;
import com.sun.jna.WString;
Expand Down Expand Up @@ -61,20 +57,6 @@ public CallbackProxy(Factory factory, Class<?> comEventCallbackInterface,
this.listenedToRiid = this.createRIID(comEventCallbackInterface);
this.dsipIdMap = this.createDispIdMap(comEventCallbackInterface);
this.dispatchListener = new DispatchListener(this);
this.executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "COM Event Callback executor");
thread.setDaemon(true);
thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
CallbackProxy.this.factory.comThread.uncaughtExceptionHandler.uncaughtException(t, e);
}
});
return thread;
}
});
}

Factory factory;
Expand All @@ -83,7 +65,6 @@ public void uncaughtException(Thread t, Throwable e) {
REFIID listenedToRiid;
public DispatchListener dispatchListener;
Map<DISPID, Method> dsipIdMap;
ExecutorService executorService;

REFIID createRIID(Class<?> comEventCallbackInterface) {
ComInterface comInterfaceAnnotation = comEventCallbackInterface.getAnnotation(ComInterface.class);
Expand Down Expand Up @@ -138,76 +119,62 @@ void invokeOnThread(final DISPID dispIdMember, final REFIID riid, LCID lcid, WOR
return;
}

// decode arguments
// must decode them on this thread, and create a proxy for any COM objects (IDispatch)
// this will AddRef on the COM object so that it is not cleaned up before we can use it
// on the thread that does the java callback.
// Arguments are converted to the JAVA side and IDispatch Interfaces
// are wrapped into an ProxyObject if so requested.
//
// Out-Parameter need to be specified as VARIANT, VARIANT args are
// not converted, so COM memory allocation rules apply.
final Class<?>[] params = eventMethod.getParameterTypes();
List<Object> rjargs = new ArrayList<Object>();
if (pDispParams.cArgs.intValue() > 0) {
VariantArg vargs = pDispParams.rgvarg;
vargs.setArraySize(pDispParams.cArgs.intValue());
for ( int i = 0; i < vargs.variantArg.length; i++) {
Class targetClass = params[vargs.variantArg.length - 1 - i];
Variant.VARIANT varg = vargs.variantArg[i];
Object jarg = Convert.toJavaObject(varg, params[vargs.variantArg.length - 1 - i]);
Object jarg = Convert.toJavaObject(varg, targetClass);
if (jarg instanceof IDispatch) {
// If a dispatch is returned try to wrap it into a proxy
// helper if the target is ComInterface annotated
IDispatch dispatch = (IDispatch) jarg;
//get raw IUnknown interface
PointerByReference ppvObject = new PointerByReference();
IID iid = com.sun.jna.platform.win32.COM.IUnknown.IID_IUNKNOWN;
dispatch.QueryInterface(new REFIID(iid), ppvObject);
Unknown rawUnk = new Unknown(ppvObject.getValue());
long unknownId = Pointer.nativeValue( rawUnk.getPointer() );
int n = rawUnk.Release();
//Note: unlike in other places, there is currently no COM ref already added for this pointer
IUnknown unk = CallbackProxy.this.factory.createProxy(IUnknown.class, unknownId, dispatch);
rjargs.add(unk);
if(targetClass.getAnnotation(ComInterface.class) != null) {
rjargs.add(unk.queryInterface(targetClass));
} else {
rjargs.add(unk);
}
} else {
rjargs.add(jarg);
}
}
}
final List<Object> jargs = new ArrayList<Object>(rjargs);
Runnable invokation = new Runnable() {
@Override
public void run() {
// need to convert arguments maybe
List<Object> margs = new ArrayList<Object>();
try {
// Reverse order from calling convention
int lastParamIdx = eventMethod.getParameterTypes().length - 1;
for (int i = lastParamIdx; i >= 0; i--) {
Class<?> paramType = params[lastParamIdx - i];
Object jobj = jargs.get(i);
if (jobj != null && paramType.getAnnotation(ComInterface.class) != null) {
if (jobj instanceof IUnknown) {
IUnknown unk = (IUnknown) jobj;
Object mobj = unk.queryInterface(paramType);
margs.add(mobj);
} else {
throw new RuntimeException("Cannot convert argument " + jobj.getClass()
+ " to ComInterface " + paramType);
}
} else {
margs.add(jobj);
}
}
eventMethod.invoke(comEventCallbackListener, margs.toArray());
} catch (Exception e) {
List<String> decodedClassNames = new ArrayList<String>(margs.size());
for(Object o: margs) {
if(o == null) {
decodedClassNames.add("NULL");
} else {
decodedClassNames.add(o.getClass().getName());
}
}
CallbackProxy.this.comEventCallbackListener.errorReceivingCallbackEvent(
"Exception invoking method " + eventMethod + " supplied: " + decodedClassNames.toString(), e);

List<Object> margs = new ArrayList<Object>();
try {
// Reverse order from calling convention
int lastParamIdx = eventMethod.getParameterTypes().length - 1;
for (int i = lastParamIdx; i >= 0; i--) {
margs.add(rjargs.get(i));
}
eventMethod.invoke(comEventCallbackListener, margs.toArray());
} catch (Exception e) {
List<String> decodedClassNames = new ArrayList<String>(margs.size());
for(Object o: margs) {
if(o == null) {
decodedClassNames.add("NULL");
} else {
decodedClassNames.add(o.getClass().getName());
}
}
};
this.executorService.execute(invokation);
CallbackProxy.this.comEventCallbackListener.errorReceivingCallbackEvent(
"Exception invoking method " + eventMethod + " supplied: " + decodedClassNames.toString(), e);
}
}

@Override
Expand Down Expand Up @@ -237,7 +204,13 @@ public HRESULT Invoke(DISPID dispIdMember, REFIID riid, LCID lcid, WORD wFlags,
DISPPARAMS.ByReference pDispParams, VARIANT.ByReference pVarResult, EXCEPINFO.ByReference pExcepInfo,
IntByReference puArgErr) {

this.invokeOnThread(dispIdMember, riid, lcid, wFlags, pDispParams);
assert (! ComThread.getCurrentThreadIsCOM()) : "Assumption about COM threading broken.";
ComThread.setCurrentThreadIsCOM(true);
try {
this.invokeOnThread(dispIdMember, riid, lcid, wFlags, pDispParams);
} finally {
ComThread.setCurrentThreadIsCOM(false);
}

return WinError.S_OK;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.sun.jna.platform.win32.COM.COMUtils;

public class ComThread {

ExecutorService executor;
Runnable firstTask;
boolean requiresInitialisation;
Expand All @@ -44,11 +43,14 @@ public ComThread(final String threadName, long timeoutMilliseconds, UncaughtExce
this.firstTask = new Runnable() {
@Override
public void run() {
// By definition this is a COM thread
ComThread.setCurrentThreadIsCOM(true);
try {
//If we do not use COINIT_MULTITHREADED, it is necessary to have
// a message loop see -
// [http://www.codeguru.com/cpp/com-tech/activex/apts/article.php/c5529/Understanding-COM-Apartments-Part-I.htm]
// [http://www.codeguru.com/cpp/com-tech/activex/apts/article.php/c5533/Understanding-COM-Apartments-Part-II.htm]
assert ((coinitialiseExFlag & Ole32.COINIT_APARTMENTTHREADED) == 0) : "ComThread only supports MTA threading model";
WinNT.HRESULT hr = Ole32.INSTANCE.CoInitializeEx(null, coinitialiseExFlag);
COMUtils.checkRC(hr);
ComThread.this.requiresInitialisation = false;
Expand Down Expand Up @@ -119,11 +121,38 @@ protected void finalize() throws Throwable {
}
}

// The currentThreadIsCOM is used if wrapper are used in a callback
// the callback is called in a new thread by the JNA runtime. As the
// call comes from COM it is asumed, that the thread is correctly
// initialized and can be used for COM calls (see MTA assumption above)
private static ThreadLocal<Boolean> currentThreadIsCOM = new ThreadLocal<Boolean>();

static void setCurrentThreadIsCOM(boolean isCOM) {
currentThreadIsCOM.set(isCOM);
}

static boolean getCurrentThreadIsCOM() {
Boolean res = currentThreadIsCOM.get();
if(res == null) {
return false;
} else {
return currentThreadIsCOM.get();
}
}

public <T> T execute(Callable<T> task) throws TimeoutException, InterruptedException, ExecutionException {
if (this.requiresInitialisation) {
executor.execute(firstTask);
}
return executor.submit(task).get(this.timeoutMilliseconds, TimeUnit.MILLISECONDS);
if(getCurrentThreadIsCOM()) {
try {
return task.call();
} catch (Exception ex) {
throw new ExecutionException(ex);
}
} else {
if (this.requiresInitialisation) {
executor.execute(firstTask);
}
return executor.submit(task).get(this.timeoutMilliseconds, TimeUnit.MILLISECONDS);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
package com.sun.jna.platform.win32.COM.util;

import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
Expand All @@ -33,6 +30,11 @@
import com.sun.jna.platform.win32.COM.IDispatch;
import com.sun.jna.platform.win32.COM.util.annotation.ComObject;
import com.sun.jna.ptr.PointerByReference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

public class Factory {

Expand All @@ -52,7 +54,6 @@ public void uncaughtException(Thread t, Throwable e) {

public Factory(ComThread comThread) {
this.comThread = comThread;
this.registeredObjects = new WeakHashMap<ProxyObject, Integer>();
}

@Override
Expand Down Expand Up @@ -232,47 +233,43 @@ public WinNT.HRESULT call() throws Exception {
}
}

//factory needs to keep a register of all handles to COM objects so that it can clean them up properly
// (if java had an out of scope clean up destructor like C++, this wouldn't be needed)
WeakHashMap<ProxyObject, Integer> registeredObjects;
// Proxy object release their COM interface reference latest in the
// finalize method, which is run when garbadge collection removes the
// object.
// When the factory is finished, the referenced objects loose their
// environment and can't be used anymore. registeredObjects is used
// to dispose interfaces even if garbadge collection has not yet collected
// the proxy objects.
private final List<WeakReference<ProxyObject>> registeredObjects = new LinkedList<WeakReference<ProxyObject>>();
public void register(ProxyObject proxyObject) {
synchronized (this.registeredObjects) {
//ProxyObject identity resolves to the underlying native pointer value
// different java ProxyObjects will resolve to the same pointer
// thus we need to count the number of references.
if (this.registeredObjects.containsKey(proxyObject)) {
int r = this.registeredObjects.get(proxyObject);
this.registeredObjects.put(proxyObject, r+1);
} else {
this.registeredObjects.put(proxyObject, 1);
}
}
synchronized (this.registeredObjects) {
this.registeredObjects.add(new WeakReference<ProxyObject>(proxyObject));
}
}

public void unregister(ProxyObject proxyObject, int d) {
synchronized (this.registeredObjects) {
if (this.registeredObjects.containsKey(proxyObject)) {
int r = this.registeredObjects.get(proxyObject);
if (r > 1) {
this.registeredObjects.put(proxyObject, r-d);
} else {
this.registeredObjects.remove(proxyObject);
}
} else {
throw new RuntimeException("Tried to dispose a ProxyObject that is not registered");
}

}
}
public void unregister(ProxyObject proxyObject) {
synchronized (this.registeredObjects) {
Iterator<WeakReference<ProxyObject>> iterator = this.registeredObjects.iterator();
while(iterator.hasNext()) {
WeakReference<ProxyObject> weakRef = iterator.next();
ProxyObject po = weakRef.get();
if(po == null || po == proxyObject) {
iterator.remove();
}
}
}
}

public void disposeAll() {
synchronized (this.registeredObjects) {
Set<ProxyObject> s = new HashSet<ProxyObject>(this.registeredObjects.keySet());
for(ProxyObject proxyObject : s) {
int r = this.registeredObjects.get(proxyObject);
proxyObject.dispose(r);
}
this.registeredObjects.clear();
}
synchronized (this.registeredObjects) {
List<WeakReference<ProxyObject>> s = new ArrayList<WeakReference<ProxyObject>>(this.registeredObjects);
for(WeakReference<ProxyObject> weakRef : s) {
ProxyObject po = weakRef.get();
if(po != null) {
po.dispose();
}
}
this.registeredObjects.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
*/
public class ProxyObject implements InvocationHandler, com.sun.jna.platform.win32.COM.util.IDispatch,
IRawDispatchHandle {

public ProxyObject(Class<?> theInterface, IDispatch rawDispatch, Factory factory) {
this.unknownId = -1;
this.rawDispatch = rawDispatch;
Expand Down Expand Up @@ -94,7 +94,7 @@ public ProxyObject(Class<?> theInterface, IDispatch rawDispatch, Factory factory
int n = this.rawDispatch.AddRef();
factory.register(this);
}
// cached value of the IUnknown interface pointer
// Rules of COM state that querying for the IUnknown interface must return
// an identical pointer value
Expand Down Expand Up @@ -137,20 +137,14 @@ public HRESULT call() throws Exception {

@Override
protected void finalize() throws Throwable {
this.dispose(1);
this.dispose();
}

public void dispose(int r) {
if (((Dispatch) this.rawDispatch).getPointer().equals(Pointer.NULL)) {
// do nothing, already disposed
} else {
for (int i = 0; i < r; ++i) {
// catch result to help with debug
int n = this.rawDispatch.Release();
int n2 = n;
}
this.factory.unregister(this, r);
((Dispatch) this.rawDispatch).setPointer(Pointer.NULL);
public synchronized void dispose() {
if (! ((Dispatch) this.rawDispatch).getPointer().equals(Pointer.NULL)) {
this.rawDispatch.Release();
((Dispatch) this.rawDispatch).setPointer(Pointer.NULL);
factory.unregister(this);
}
}

Expand Down
Loading

0 comments on commit 8d60f93

Please sign in to comment.