diff --git a/metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectThreader.java b/metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectThreader.java new file mode 100644 index 000000000..41495378f --- /dev/null +++ b/metafacture-flowcontrol/src/main/java/org/metafacture/flowcontrol/ObjectThreader.java @@ -0,0 +1,67 @@ +/* Copyright 2019 hbz, Pascal Christoph. + * + * Licensed under the Apache License, Version 2.0 the "License"; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.metafacture.flowcontrol; + +import org.metafacture.flowcontrol.ObjectPipeDecoupler; +import org.metafacture.framework.ObjectPipe; +import org.metafacture.framework.ObjectReceiver; +import org.metafacture.framework.Tee; +import org.metafacture.framework.annotations.In; +import org.metafacture.framework.annotations.Out; +import org.metafacture.framework.helpers.DefaultTee; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Divides incoming objects and distributes them to added receivers. These + * receivers are coupled with an + * {@link org.metafacture.flowcontrol.ObjectPipeDecoupler}, so each added + * receiver runs in its own thread. + * + * @param Object type + * + * @author Pascal Christoph(dr0i) + * + */ +@In(Object.class) +@Out(Object.class) +public class ObjectThreader extends DefaultTee> implements ObjectPipe> { + + private static final Logger LOG = LoggerFactory.getLogger(ObjectThreader.class); + private int objectNumber = 0; + + @Override + public void process(final T obj) { + getReceivers().get(objectNumber).process(obj); + if (objectNumber == getReceivers().size() - 1) + objectNumber = 0; + else + objectNumber++; + } + + @Override + public > R setReceiver(final R receiver) { + return super.setReceiver(new ObjectPipeDecoupler().setReceiver(receiver)); + } + + @Override + public Tee> addReceiver(final ObjectReceiver receiver) { + LOG.info("Adding thread " + (getReceivers().size() + 1)); + ObjectPipeDecoupler opd = new ObjectPipeDecoupler<>(); + opd.setReceiver(receiver); + return super.addReceiver(opd); + } +} diff --git a/metafacture-framework/src/main/java/org/metafacture/framework/helpers/DefaultTee.java b/metafacture-framework/src/main/java/org/metafacture/framework/helpers/DefaultTee.java index 26cceadc4..db420ebcf 100644 --- a/metafacture-framework/src/main/java/org/metafacture/framework/helpers/DefaultTee.java +++ b/metafacture-framework/src/main/java/org/metafacture/framework/helpers/DefaultTee.java @@ -35,7 +35,7 @@ public class DefaultTee implements Tee { private final List receivers = new ArrayList(); @Override - public final R setReceiver(final R receiver) { + public R setReceiver(final R receiver) { receivers.clear(); receivers.add(receiver); onChangeReceivers(); @@ -52,7 +52,7 @@ public final R setReceivers(final R receiver, final T lateralRecei } @Override - public final Tee addReceiver(final T receiver) { + public Tee addReceiver(final T receiver) { receivers.add(receiver); onChangeReceivers(); return this;