Skip to content

Commit

Permalink
Make TLS serialisation optional.
Browse files Browse the repository at this point in the history
  • Loading branch information
mikehearn committed Mar 16, 2016
1 parent 60365a5 commit 9c0b63a
Showing 1 changed file with 60 additions and 27 deletions.
87 changes: 60 additions & 27 deletions quasar-core/src/main/java/co/paralleluniverse/fibers/Fiber.java
Original file line number Diff line number Diff line change
Expand Up @@ -1934,8 +1934,20 @@ public static <V> Fiber<V> unparkDeserialized(Fiber<V> f, FiberScheduler schedul
* Returns a {@link ByteArraySerializer} capable of serializing an object graph containing fibers.
*/
public static ByteArraySerializer getFiberSerializer() {
return getFiberSerializer(true);
}

/**
* Returns a {@link ByteArraySerializer} capable of serializing an object graph containing fibers.
*
* @param includeThreadLocals if true, thread/fiber local storage slots will also be serialised.
* You may want to set this to false if you are using frameworks that put
* things that cannot be properly serialised into TLS slots, or if the feature
* causes other issues.
*/
public static ByteArraySerializer getFiberSerializer(boolean includeThreadLocals) {
final KryoSerializer s = new KryoSerializer();
s.getKryo().addDefaultSerializer(Fiber.class, new FiberSerializer());
s.getKryo().addDefaultSerializer(Fiber.class, new FiberSerializer(includeThreadLocals));
s.getKryo().addDefaultSerializer(ThreadLocal.class, new ThreadLocalSerializer());
s.getKryo().addDefaultSerializer(FiberWriter.class, new FiberWriterSerializer());
s.getKryo().register(Fiber.class);
Expand All @@ -1947,38 +1959,59 @@ public static ByteArraySerializer getFiberSerializer() {
}

private static class FiberSerializer extends Serializer<Fiber> {
public FiberSerializer() {
private boolean includeThreadLocals;

public FiberSerializer(boolean includeThreadLocals) {
this.includeThreadLocals = includeThreadLocals;
setImmutable(true);
}

@Override
@SuppressWarnings("CallToPrintStackTrace")
@SuppressWarnings({"CallToPrintStackTrace", "unchecked"})
public void write(Kryo kryo, Output output, Fiber f) {
final Thread currentThread = Thread.currentThread();
final Object tmpThreadLocals = ThreadAccess.getThreadLocals(currentThread);
final Object tmpInheritableThreadLocals = ThreadAccess.getInheritableThreadLocals(currentThread);
ThreadAccess.setThreadLocals(currentThread, f.fiberLocals);
ThreadAccess.setInheritableThreadLocals(currentThread, f.inheritableFiberLocals);
Object realFiberLocals = f.fiberLocals;
Object realInheritableFiberLocals = f.inheritableFiberLocals;
try {
// Switch the type of fiberLocals here to Object[] from ThreadLocalMap, to ease serialisation.
// We must switch it back when we are done.
f.fiberLocals = realFiberLocals != null ? ThreadAccess.toMap(realFiberLocals).keySet().toArray() : null;
f.inheritableFiberLocals = realInheritableFiberLocals != null ?
ThreadAccess.toMap(realInheritableFiberLocals).keySet().toArray() : null;
f.stack.resumeStack();

kryo.writeClass(output, f.getClass());
new FieldSerializer(kryo, f.getClass()).write(kryo, output, f);
} catch (Throwable t) {
t.printStackTrace();
throw t;
} finally {
f.fiberLocals = realFiberLocals;
f.inheritableFiberLocals = realInheritableFiberLocals;
ThreadAccess.setThreadLocals(currentThread, tmpThreadLocals);
ThreadAccess.setInheritableThreadLocals(currentThread, tmpInheritableThreadLocals);

// If we need to serialise thread local storage slots as well, then we have to do a swap to avoid
// type problems. If we don't, then it's much easier to serialise things.
if (!includeThreadLocals) {
Object tmpFiberLocals = f.fiberLocals;
Object tmpInheritableFiberLocals = f.inheritableFiberLocals;
f.fiberLocals = null;
f.inheritableFiberLocals = null;
try {
f.stack.resumeStack();
kryo.writeClass(output, f.getClass());
new FieldSerializer(kryo, f.getClass()).write(kryo, output, f);
} finally {
f.fiberLocals = tmpFiberLocals;
f.inheritableFiberLocals = tmpInheritableFiberLocals;
}
} else {
final Object tmpThreadLocals = ThreadAccess.getThreadLocals(currentThread);
final Object tmpInheritableThreadLocals = ThreadAccess.getInheritableThreadLocals(currentThread);
ThreadAccess.setThreadLocals(currentThread, f.fiberLocals);
ThreadAccess.setInheritableThreadLocals(currentThread, f.inheritableFiberLocals);
Object realFiberLocals = f.fiberLocals;
Object realInheritableFiberLocals = f.inheritableFiberLocals;
try {
// Switch the type of fiberLocals here to Object[] from ThreadLocalMap, to ease serialisation.
// We must switch it back when we are done.
f.fiberLocals = realFiberLocals != null ? ThreadAccess.toMap(realFiberLocals).keySet().toArray() : null;
f.inheritableFiberLocals = realInheritableFiberLocals != null ?
ThreadAccess.toMap(realInheritableFiberLocals).keySet().toArray() : null;
f.stack.resumeStack();

kryo.writeClass(output, f.getClass());
new FieldSerializer(kryo, f.getClass()).write(kryo, output, f);
} catch (Throwable t) {
t.printStackTrace();
throw t;
} finally {
f.fiberLocals = realFiberLocals;
f.inheritableFiberLocals = realInheritableFiberLocals;
ThreadAccess.setThreadLocals(currentThread, tmpThreadLocals);
ThreadAccess.setInheritableThreadLocals(currentThread, tmpInheritableThreadLocals);
}
}
}

Expand Down

0 comments on commit 9c0b63a

Please sign in to comment.