diff --git a/src/main/java/org/culturegraph/mf/stream/converter/TriplesToStream.java b/src/main/java/org/culturegraph/mf/stream/converter/TriplesToStream.java new file mode 100644 index 000000000..b62bb10d2 --- /dev/null +++ b/src/main/java/org/culturegraph/mf/stream/converter/TriplesToStream.java @@ -0,0 +1,51 @@ +/** + * + */ +package org.culturegraph.mf.stream.converter; + +import org.culturegraph.mf.formeta.parser.FormetaParser; +import org.culturegraph.mf.formeta.parser.PartialRecordEmitter; +import org.culturegraph.mf.framework.DefaultObjectPipe; +import org.culturegraph.mf.framework.StreamReceiver; +import org.culturegraph.mf.framework.annotations.Description; +import org.culturegraph.mf.framework.annotations.In; +import org.culturegraph.mf.framework.annotations.Out; +import org.culturegraph.mf.types.Triple; +import org.culturegraph.mf.types.Triple.ObjectType; + +/** + * @author schaeferd + * + */ +@Description("Converts a triple into a record stream") +@In(Triple.class) +@Out(StreamReceiver.class) +public final class TriplesToStream extends + DefaultObjectPipe { + + private final FormetaParser parser = new FormetaParser(); + private final PartialRecordEmitter emitter = new PartialRecordEmitter(); + + public TriplesToStream() { + parser.setEmitter(emitter); + } + + public void process(final Triple triple) { + getReceiver().startRecord(triple.getSubject()); + if(triple.getObjectType() == ObjectType.STRING){ + getReceiver().literal(triple.getPredicate(), triple.getObject()); + }else if (triple.getObjectType() == ObjectType.ENTITY){ + emitter.setDefaultName(triple.getPredicate()); + parser.parse(triple.getObject()); + }else{ + throw new UnsupportedOperationException(triple.getObjectType() + " can not yet be decoded"); + } + getReceiver().endRecord(); + } + + @Override + protected void onSetReceiver() { + emitter.setReceiver(getReceiver()); + } + +} diff --git a/src/main/resources/flux-commands.properties b/src/main/resources/flux-commands.properties index c37aedba3..2a0218d50 100644 --- a/src/main/resources/flux-commands.properties +++ b/src/main/resources/flux-commands.properties @@ -15,7 +15,7 @@ count-triples org.culturegraph.mf.stream.pipe.sort.TripleCount collect-triples org.culturegraph.mf.stream.pipe.sort.TripleCollect stream-to-triples org.culturegraph.mf.stream.converter.StreamToTriples filter-triples org.culturegraph.mf.stream.pipe.TripleFilter - +triples-to-stream org.culturegraph.mf.stream.converter.TriplesToStream calculate-metrics org.culturegraph.mf.stream.pipe.stat.CooccurrenceMetricCalculator jscript org.culturegraph.mf.stream.pipe.JScriptObjectPipe