From 7151b070f78f6796153aeef841c7bdcf437ff651 Mon Sep 17 00:00:00 2001 From: Pascal Christoph Date: Thu, 7 Mar 2019 12:03:37 +0100 Subject: [PATCH 1/2] Add ObjectThreader for multithreading using pipelines The ObjectThreader divides incoming objects and passes them to receivers which, in conjunction with an ObjectPipeDecoupler, are running in their own threads. The receivers can be added like Tee receivers. Thus, a multithreaded object pipeline is created. - change methods of DefaultTee to not be final The ObjectThreader makes use of the DefaultTee and overrides two methods, so they musn't be final. See hbz/lobid-resources#967. --- .../flowcontrol/ObjectThreader.java | 67 +++++++++++++++++++ .../framework/helpers/DefaultTee.java | 4 +- 2 files changed, 69 insertions(+), 2 deletions(-) create mode 100644 metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectThreader.java diff --git a/metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectThreader.java b/metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectThreader.java new file mode 100644 index 000000000..41495378f --- /dev/null +++ b/metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectThreader.java @@ -0,0 +1,67 @@ +/* Copyright 2019 hbz, Pascal Christoph. + * + * Licensed under the Apache License, Version 2.0 the "License"; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.metafacture.flowcontrol; + +import org.metafacture.flowcontrol.ObjectPipeDecoupler; +import org.metafacture.framework.ObjectPipe; +import org.metafacture.framework.ObjectReceiver; +import org.metafacture.framework.Tee; +import org.metafacture.framework.annotations.In; +import org.metafacture.framework.annotations.Out; +import org.metafacture.framework.helpers.DefaultTee; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Divides incoming objects and distributes them to added receivers. These + * receivers are coupled with an + * {@link org.metafacture.flowcontrol.ObjectPipeDecoupler}, so each added + * receiver runs in its own thread. + * + * @param Object type + * + * @author Pascal Christoph(dr0i) + * + */ +@In(Object.class) +@Out(Object.class) +public class ObjectThreader extends DefaultTee> implements ObjectPipe> { + + private static final Logger LOG = LoggerFactory.getLogger(ObjectThreader.class); + private int objectNumber = 0; + + @Override + public void process(final T obj) { + getReceivers().get(objectNumber).process(obj); + if (objectNumber == getReceivers().size() - 1) + objectNumber = 0; + else + objectNumber++; + } + + @Override + public > R setReceiver(final R receiver) { + return super.setReceiver(new ObjectPipeDecoupler().setReceiver(receiver)); + } + + @Override + public Tee> addReceiver(final ObjectReceiver receiver) { + LOG.info("Adding thread " + (getReceivers().size() + 1)); + ObjectPipeDecoupler opd = new ObjectPipeDecoupler<>(); + opd.setReceiver(receiver); + return super.addReceiver(opd); + } +} diff --git a/metafacture-framework/src/main/java/org/metafacture/framework/helpers/DefaultTee.java b/metafacture-framework/src/main/java/org/metafacture/framework/helpers/DefaultTee.java index 26cceadc4..db420ebcf 100644 --- a/metafacture-framework/src/main/java/org/metafacture/framework/helpers/DefaultTee.java +++ b/metafacture-framework/src/main/java/org/metafacture/framework/helpers/DefaultTee.java @@ -35,7 +35,7 @@ public class DefaultTee implements Tee { private final List receivers = new ArrayList(); @Override - public final R setReceiver(final R receiver) { + public R setReceiver(final R receiver) { receivers.clear(); receivers.add(receiver); onChangeReceivers(); @@ -52,7 +52,7 @@ public final R setReceivers(final R receiver, final T lateralRecei } @Override - public final Tee addReceiver(final T receiver) { + public Tee addReceiver(final T receiver) { receivers.add(receiver); onChangeReceivers(); return this; From 87b0b982e03b363c2d0e762007087bc7429d67ef Mon Sep 17 00:00:00 2001 From: Pascal Christoph Date: Fri, 15 Mar 2019 13:40:51 +0100 Subject: [PATCH 2/2] Use put() instead of add() While add() throws an IllegalStateException if the Queue is full, a put() just waits until there is space left in Queue. - add "return" The usage of "return" is encouraged to make sure to stop a thread when it's interrupted. See #980. --- .../org/metafacture/flowcontrol/ObjectPipeDecoupler.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectPipeDecoupler.java b/metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectPipeDecoupler.java index 9b4fda69c..a9882dbe9 100644 --- a/metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectPipeDecoupler.java +++ b/metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectPipeDecoupler.java @@ -97,12 +97,12 @@ public > R setReceiver(final R receiver) { @Override public void resetStream() { - queue.add(Feeder.BLUE_PILL); + queue.put(Feeder.BLUE_PILL); } @Override public void closeStream() { - queue.add(Feeder.RED_PILL); + queue.put(Feeder.RED_PILL); try { thread.join(); } catch (InterruptedException e) { @@ -147,6 +147,7 @@ public void run() { } } catch (InterruptedException e) { Thread.currentThread().interrupt(); + return; } } }