From 7151b070f78f6796153aeef841c7bdcf437ff651 Mon Sep 17 00:00:00 2001 From: Pascal Christoph Date: Thu, 7 Mar 2019 12:03:37 +0100 Subject: [PATCH] Add ObjectThreader for multithreading using pipelines The ObjectThreader divides incoming objects and passes them to receivers which, in conjunction with an ObjectPipeDecoupler, are running in their own threads. The receivers can be added like Tee receivers. Thus, a multithreaded object pipeline is created. - change methods of DefaultTee to not be final The ObjectThreader makes use of the DefaultTee and overrides two methods, so they musn't be final. See hbz/lobid-resources#967. --- .../flowcontrol/ObjectThreader.java | 67 +++++++++++++++++++ .../framework/helpers/DefaultTee.java | 4 +- 2 files changed, 69 insertions(+), 2 deletions(-) create mode 100644 metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectThreader.java diff --git a/metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectThreader.java b/metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectThreader.java new file mode 100644 index 000000000..41495378f --- /dev/null +++ b/metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectThreader.java @@ -0,0 +1,67 @@ +/* Copyright 2019 hbz, Pascal Christoph. + * + * Licensed under the Apache License, Version 2.0 the "License"; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.metafacture.flowcontrol; + +import org.metafacture.flowcontrol.ObjectPipeDecoupler; +import org.metafacture.framework.ObjectPipe; +import org.metafacture.framework.ObjectReceiver; +import org.metafacture.framework.Tee; +import org.metafacture.framework.annotations.In; +import org.metafacture.framework.annotations.Out; +import org.metafacture.framework.helpers.DefaultTee; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Divides incoming objects and distributes them to added receivers. These + * receivers are coupled with an + * {@link org.metafacture.flowcontrol.ObjectPipeDecoupler}, so each added + * receiver runs in its own thread. + * + * @param Object type + * + * @author Pascal Christoph(dr0i) + * + */ +@In(Object.class) +@Out(Object.class) +public class ObjectThreader extends DefaultTee> implements ObjectPipe> { + + private static final Logger LOG = LoggerFactory.getLogger(ObjectThreader.class); + private int objectNumber = 0; + + @Override + public void process(final T obj) { + getReceivers().get(objectNumber).process(obj); + if (objectNumber == getReceivers().size() - 1) + objectNumber = 0; + else + objectNumber++; + } + + @Override + public > R setReceiver(final R receiver) { + return super.setReceiver(new ObjectPipeDecoupler().setReceiver(receiver)); + } + + @Override + public Tee> addReceiver(final ObjectReceiver receiver) { + LOG.info("Adding thread " + (getReceivers().size() + 1)); + ObjectPipeDecoupler opd = new ObjectPipeDecoupler<>(); + opd.setReceiver(receiver); + return super.addReceiver(opd); + } +} diff --git a/metafacture-framework/src/main/java/org/metafacture/framework/helpers/DefaultTee.java b/metafacture-framework/src/main/java/org/metafacture/framework/helpers/DefaultTee.java index 26cceadc4..db420ebcf 100644 --- a/metafacture-framework/src/main/java/org/metafacture/framework/helpers/DefaultTee.java +++ b/metafacture-framework/src/main/java/org/metafacture/framework/helpers/DefaultTee.java @@ -35,7 +35,7 @@ public class DefaultTee implements Tee { private final List receivers = new ArrayList(); @Override - public final R setReceiver(final R receiver) { + public R setReceiver(final R receiver) { receivers.clear(); receivers.add(receiver); onChangeReceivers(); @@ -52,7 +52,7 @@ public final R setReceivers(final R receiver, final T lateralRecei } @Override - public final Tee addReceiver(final T receiver) { + public Tee addReceiver(final T receiver) { receivers.add(receiver); onChangeReceivers(); return this;