
Create a camel 'Pipeline' #449

Closed
dannylamb opened this issue Dec 7, 2016 · 29 comments

@dannylamb
Contributor

Similar to #448

Create a small camel route that sends a message through a dynamic list of recipients, with the output of each being passed as input to the next. This can be accomplished using the Routing Slip and Request / Reply patterns. The list should be read from a header, which you can assume comes with the message.
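To make the intended data flow concrete, here is a minimal plain-Java model (not Camel code) of a routing slip run with request/reply semantics. The slip is read from a comma-delimited header value, and each step's reply becomes the next step's input. The class name, the step names (testA, testB) and the in-memory registry of steps are illustrative assumptions standing in for real endpoints.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.regex.Pattern;

// Plain-Java model of a routing slip run with request/reply semantics.
// Step names stand in for endpoint URIs and the registry stands in for the
// endpoints themselves; none of this is Camel's API.
final class PipelineModel {

    // Splits a delimited slip header such as "testA,testB" into trimmed,
    // non-empty step names. A missing header yields an empty slip.
    static List<String> parseSlip(String header, String delimiter) {
        List<String> steps = new ArrayList<>();
        if (header == null) {
            return steps;
        }
        for (String part : header.split(Pattern.quote(delimiter))) {
            String name = part.trim();
            if (!name.isEmpty()) {
                steps.add(name);
            }
        }
        return steps;
    }

    // Sends the body through each step in slip order; each step's reply
    // becomes the next step's input, and the last reply is returned.
    static String run(String body, List<String> slip,
                      Map<String, UnaryOperator<String>> registry) {
        String current = body;
        for (String name : slip) {
            UnaryOperator<String> step = registry.get(name);
            if (step == null) {
                throw new IllegalArgumentException("No endpoint registered for " + name);
            }
            current = step.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, UnaryOperator<String>> registry = Map.of(
            "testA", s -> s + " -> A",
            "testB", s -> s + " -> B");
        List<String> slip = parseSlip("testA,testB", ",");
        System.out.println(run("msg", slip, registry)); // msg -> A -> B
    }
}
```

In a Camel route, roughly the same shape comes from a routing slip on an InOut exchange; the model above only illustrates how each output feeds the next input.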

Check out this gist for a POC of this concept.

@dannylamb
Contributor Author

FYI, this is how we solve the "what derivatives go where" conundrum for everybody. Making a derivative would become a list of operations, and the user could configure the last one to ingest into Fedora, Drupal, or both. We're gonna get some insane flexibility from this.
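As a rough sketch of that idea, assume a derivative is just an ordered list of operations plus a set of ingest targets chosen by the user. The Destination enum, the string operations and the ingest log below are hypothetical placeholders, not Islandora APIs.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.function.UnaryOperator;

// Hypothetical model of a derivative "recipe": operations applied in order,
// with the final output ingested into a configurable set of destinations.
// Destination names and the log strings are placeholders.
final class DerivativeRecipe {

    enum Destination { FEDORA, DRUPAL }

    // Applies the operations in order, then records one ingest per destination.
    // Ingest order follows the Set passed in (EnumSet: declaration order).
    static List<String> process(String source, List<UnaryOperator<String>> operations,
                                Set<Destination> destinations) {
        String derivative = source;
        for (UnaryOperator<String> op : operations) {
            derivative = op.apply(derivative);
        }
        List<String> ingests = new ArrayList<>();
        for (Destination d : destinations) {
            ingests.add(d + " <- " + derivative);
        }
        return ingests;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> ops = List.of(s -> "resized(" + s + ")", s -> "jpeg(" + s + ")");
        System.out.println(process("video", ops, EnumSet.of(Destination.DRUPAL)));
        System.out.println(process("video", ops, EnumSet.allOf(Destination.class)));
    }
}
```

Keeping the destinations as a set, rather than hard-coding the last step, is what would let a site choose Fedora, Drupal, or both without touching the operations.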

@whikloj
Member

whikloj commented Jan 20, 2017

Couple confirmation questions:

  1. Java DSL or Spring DSL?
  2. Gradle
  3. Final list of routes supplied as a header, which header?

@ajs6f

ajs6f commented Jan 20, 2017

This seems like it's entirely overlapping with fcrepo-camel-toolbox. Is there a danger of duplicated effort here? If it's a question of synchronous workflow as opposed to asynchronous, @acoburn and I have already begun exploring thoughts about synchronous repository-driven workflows, but using the standardized repo services and not anything new.

@whikloj
Member

whikloj commented Jan 20, 2017

@ajs6f I think this is far too simple to be part of fcrepo-camel-toolbox. This would be used to have end-users in Drupal decide which endpoints (out of a curated list) to send a request to. Those endpoints could be fcrepo-camel-toolbox, but more likely the API-X derivative service @acoburn is (apparently) working on.

@ajs6f

ajs6f commented Jan 20, 2017

I'm not asking about this exact piece of technology-- I'm asking about its uses. IOW, it's hard for me to imagine a use for this piece of work that wouldn't be better served as part of -camel-toolbox, but maybe I am misunderstanding the expected kinds of requests it would receive. Is the work here not going to fall under the general heading of "make and store derivatives"?

@dannylamb
Contributor Author

@whikloj

  1. Java
  2. Gradle
  3. With the broadcaster we used IslandoraBroadcastRecipients, so IslandoraPipelineRecipients?

@ajs6f Yes, the end result here could be used to create and store derivatives, or any other sequence of well-defined asynchronous operations. The long-term goal for me (post-MVP) is to provide config entities per asynchronous operation with nice descriptions and labels. That way we can work with them in a UI so repository admins can control their asynchronous workflows outside of Karaf config files.

If you think this core piece would be better suited for fcrepo-camel-toolbox, that's a conversation I'll gladly have.

@whikloj
Member

whikloj commented Jan 20, 2017

@ajs6f Oh, I see what you mean; yes, I guess this could fit in fcrepo-camel-toolbox. We could make the header name configurable. What does @acoburn think of that? Or is there maybe a feature that already exists?

@ajs6f

ajs6f commented Jan 20, 2017

Yeah, that's where I was going, and to be clear, I'm not claiming that this is a sure thing for -camel-toolbox, just that it might be and we should have the conversation to which @dannylamb refers lest anyone do unneedful work. Not enough hours in the day to let that happen.

@dannylamb
Contributor Author

dannylamb commented Jan 20, 2017

@ajs6f At the end of the day, this is nothing more than a question of where do we deposit the work. I'm not expecting the greater community to do that work for us.

@ajs6f

ajs6f commented Jan 20, 2017

Sure, and where you deposit the work has a significant impact on how expensive it is and its future maintenance costs. This ticket might be developing new Camel gear, or it might be merely configuring Camel gear the maintenance of which is a shared expense. It's worth asking that question. I think we might easily be overestimating the novelty of this need.

@dannylamb
Contributor Author

@ajs6f Fair enough.

@ajs6f

ajs6f commented Jan 23, 2017

And having shoved my neck out, I'll bring the axe down by taking this ticket through the exploratory stage that I just vociferously demanded. I'll take responsibility for talking with @acoburn and determining what, if anything, we can get done as part of the larger -camel-toolbox context and recording that (with any design notes) here. I don't mean by that to slow anyone down or confuse any scheduling or logistics, and if that seems inappropriate, I can take my hands off and leave it be.

@ajs6f

ajs6f commented Jan 23, 2017

I just don't want to basically come in here and say @whikloj, I demand that you work this ticket in the way that I think is best. That's not cool.

@acoburn
Contributor

acoburn commented Jan 23, 2017

Let me preface this by saying that I haven't thought this through in any sort of rigorous manner, but let me argue the other side for now, namely that such a pipeline should not go into fcrepo-camel-toolbox.

  • First, the fcrepo-camel-toolbox release schedule is pretty slow; putting application-specific code there will make it harder to make application-specific changes. As a case in point, the next fcrepo-camel-toolbox release will probably happen only after the Fedora community impl is aligned with the API specification.
  • Second, the fcrepo-camel-toolbox code is really designed for very generic sorts of workflows; I am going to guess that some of the details here aren't quite so generic.
  • Third, if some of the structure of this pipeline is really generic, I'd highly recommend the sort of structure I've adopted w/r/t fcrepo-ldpath. The code there is useful and pretty generic, but it doesn't work with certain endpoints (e.g. Getty), so I've extended our local code to build on the fcrepo-ldpath structure while allowing us to support Getty endpoints (and I will argue strongly that this sort of "special purpose, endpoint-specific" code should not go into fcrepo-ldpath)

@ajs6f

ajs6f commented Jan 23, 2017

@acoburn, this all makes sense-- but it's exactly your third point/recommendation that is what I am suggesting here. I'm suggesting that either we find the generic pattern in -camel-toolbox or we offer it as a contribution to -camel-toolbox of the ilk of fcrepo-ldpath. (Like fcrepo-simple-chained-configurable-workflow or... maybe a better name.) Then the CLAW-specific stuff (e.g. dealing with Drupal) would stay CLAW-side, but some non-CLAW site might use the basic recipe for another purpose (e.g. maybe for dropping preservation copies into some other store or that sort of thing).

Does that make sense to you, @acoburn, or am I missing your point? Thanks for checking in with this discussion!

@acoburn
Contributor

acoburn commented Jan 23, 2017

@ajs6f if your suggestion here follows what I describe in the third point, then, yes, I would completely support that.

@ajs6f

ajs6f commented Jan 23, 2017

@acoburn I think that it does. The crucial question is: can we extract the generic pattern in this ticket as a contribution to -camel-toolbox? I believe that we can. I suspect that it is something like:

"Configure a chained series of simple (i.e. one Camel endpoint) steps, each feeding the next. This pipeline is fed by resources that change in the repository as they change, in the usual -camel-toolbox way. The output of the last step goes to some configurable Camel endpoint, by default back into the repository to some location related to the location of the original resource in some simple, well-known way."
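That generic recipe can be sketched in plain Java under some loudly stated assumptions: steps are modelled as string transformations rather than Camel endpoints, the repository is an in-memory map, and the "simple, well-known" default location is taken to be the original URI plus a /derivative suffix, a rule invented here purely for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Sketch of a generic chained workflow: single-step transformations applied
// in order, with the result written to a configured target or, by default, to
// a location derived from the original resource URI. The "/derivative" rule
// and the Map-backed "repository" are hypothetical.
final class ChainedWorkflow {

    // Hypothetical default-location rule: append "derivative" as a child path.
    static String defaultTarget(String resourceUri) {
        return resourceUri.endsWith("/") ? resourceUri + "derivative"
                                         : resourceUri + "/derivative";
    }

    // Runs every step, each feeding the next, then stores the final output.
    // Returns the key under which the output was stored.
    static String process(String resourceUri, String body,
                          List<UnaryOperator<String>> steps,
                          String configuredTarget,
                          Map<String, String> repository) {
        String current = body;
        for (UnaryOperator<String> step : steps) {
            current = step.apply(current);
        }
        String target = (configuredTarget != null) ? configuredTarget
                                                   : defaultTarget(resourceUri);
        repository.put(target, current);
        return target;
    }

    public static void main(String[] args) {
        Map<String, String> repository = new HashMap<>();
        List<UnaryOperator<String>> steps = List.of(s -> s + "|resized", s -> s + "|jpeg");
        String where = process("http://localhost:8080/rest/obj1", "image", steps, null, repository);
        System.out.println(where + " = " + repository.get(where));
    }
}
```

A site-specific variant (for example, one that sends the result only to Drupal) would then only need to supply a different configured target, leaving the chaining logic generic.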

@dannylamb @whikloj Does that sound like it captures a generic form of what you are trying to do here? Now that I read it back, it sounds almost like a generic version of several of the -camel-toolbox recipes. I certainly am not trying to drag us down the road of building a generic workflow engine, but I do want to spot any architectural wins we pass on whatever road on which we are all traveling together.

@dannylamb
Contributor Author

To go through @acoburn's points:

  • The release cycle could turn into a real concern
  • The specifics of it are that the endpoints are passed in as a header, which comes from a UI like the one in this PR I have going: Content model objects in Islandora/F4 #31. You could always make the header configurable, but you could just as easily put the endpoints in OSGi config.
  • And I'll extend/layer on whatever is necessary if it's determined there's a core nugget of common use. I'm fine with that.
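The header-versus-config choice above can be sketched as a small resolver that prefers the header and falls back to configuration. The header name is the one proposed earlier in this thread; the property key is hypothetical, and java.util.Properties only stands in for OSGi configuration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Resolves the pipeline's endpoints from a message header, falling back to
// configuration when the header is missing or blank. Properties is only a
// stand-in for OSGi config; the property key is hypothetical.
final class RecipientResolver {

    static final String HEADER = "IslandoraPipelineRecipients";
    static final String CONFIG_KEY = "pipeline.recipients"; // hypothetical key

    static List<String> resolve(Map<String, Object> headers, Properties config) {
        Object value = headers.get(HEADER);
        String raw = (value instanceof String && !((String) value).trim().isEmpty())
                ? (String) value
                : config.getProperty(CONFIG_KEY, "");
        List<String> endpoints = new ArrayList<>();
        for (String part : raw.split(",")) {
            String uri = part.trim();
            if (!uri.isEmpty()) {
                endpoints.add(uri);
            }
        }
        return endpoints;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(CONFIG_KEY, "jms:queue:resize, jms:queue:ingest");
        Map<String, Object> headers = new HashMap<>();
        System.out.println(resolve(headers, config)); // no header: falls back to config
        headers.put(HEADER, "jms:queue:ocr");
        System.out.println(resolve(headers, config)); // header takes precedence
    }
}
```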

@ajs6f You should also see https://github.com/Islandora-CLAW/Alpaca/tree/master/islandora-connector-broadcast and https://gitlab.amherst.edu/acdc/repository-extension-services/tree/master/acrepo-connector-broadcast, which are pretty similar and differ in the manner described above.

@dannylamb
Contributor Author

dannylamb commented Jan 23, 2017

@ajs6f To more directly answer the question you posed, that is pretty much exactly the long-term goal, except there are folks in the Islandora audience who will want the derivative stored only in Drupal, and not in Fedora. That usually comes from people with large binaries like video.

So that's a curveball from our end. But then again, if we can just layer in our step at the end, or swap something out, etc...

@acoburn
Contributor

acoburn commented Jan 23, 2017

Here's a suggestion for moving forward, which is pretty much what @bseeger and I do at Amherst:

  1. Implement whatever you need to implement yourself in a CLAW-based repo. Our corollary at Amherst is https://gitlab.amherst.edu/acdc/repository-extension-services
  2. If there is a nugget in that implementation that could be extracted into a more general bit of code, push that into fcrepo-camel-toolbox.

That has been the process for much of what is currently in fcrepo-camel-toolbox.

@ajs6f

ajs6f commented Jan 23, 2017

@acoburn Yes, that's the concrete MO. The reason I wanted to start this discussion earlier is to clarify whether there might be such a nugget. It sounds like there is, so whoever is doing this ticket should have that in mind. First things first, the functionality has to work for CLAW, but it would be good to keep the longer term in mind.

@whikloj
Member

whikloj commented Feb 8, 2017

I am going to un-assign myself from this. I think there are others who could benefit from this small, well-defined need for some Camel. @Natkeeran??

Also based on @acoburn's statement above, I think we go ahead and implement this for CLAW and if we discover some generalities we can open a discussion at that point.

whikloj removed their assignment Feb 8, 2017
whikloj assigned and then unassigned themselves Feb 22, 2017
@Natkeeran
Contributor

Natkeeran commented Mar 3, 2017

@dannylamb

I am trying out your POC.

While implementing it in the Java DSL, I am getting the following exception. Any suggestions as to the cause?

package com.lab.hellocamel;

import java.io.File;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.file.FileComponent;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.pollconsumer.quartz2.QuartzScheduledPollConsumerScheduler;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

public class App
{
    public static final String URL = "tcp://localhost:61616";

    public static void main( String[] args ) throws Exception
    {
        System.out.println( "Testing Apache Camel");

        Logger logger = Logger.getLogger(App.class);

        String log4JPropertyFile = "/home/nat/workspace/hellocamel/resources/log4j.properties";
        PropertyConfigurator.configure(log4JPropertyFile);

        ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory( URL );

        CamelContext context = new DefaultCamelContext();
        context.addComponent("jms",
                JmsComponent.jmsComponentAutoAcknowledge(connFactory));

        context.addRoutes(new RouteBuilder() {

            public void configure() {
                from("timer:foo?period=1s")
                    .setHeader("slip", simple("jms:queue:activemq/queue/testA,jms:queue:activemq/queue/testB"))
                    .setBody(simple("Message222 at ${date:now:yyyy-MM-dd HH:mm:ss}"))
                    .setExchangePattern(ExchangePattern.InOut)
                    .log("FINISHED: ${body}")
                    .routingSlip(header("slip"), ",")
                    .to("jms:queue:activemq/queue/testA")
                    .transform(simple("DEFP"))
                    .to("jms:queue:activemq/queue/testB")
                    .log("FROM A: ${body}")
                    .transform(simple("HERP"));
            }
        });

        context.start();
        Thread.sleep(5000);
        context.stop();
    }
}
Output:

Testing Apache Camel
15:34| INFO | DefaultCamelContext.java 2418 | Apache Camel 2.15.1 (CamelContext: camel-1) is starting
15:34| INFO | ManagedManagementStrategy.java 187 | JMX is enabled
15:34| INFO | DefaultTypeConverter.java 56 | Loaded 183 type converters
15:34| INFO | DefaultCamelContext.java 2633 | AllowUseOriginalMessage is enabled. If access to the original message is not needed, then its recommended to turn this option off as it may improve performance.
15:34| INFO | DefaultCamelContext.java 2643 | StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
15:34| INFO | DefaultCamelContext.java 3164 | Route: route1 started and consuming from: Endpoint[timer://foo?period=1s]
15:34| INFO | DefaultCamelContext.java 2453 | Total 1 routes, of which 1 is started.
15:34| INFO | DefaultCamelContext.java 2454 | Apache Camel 2.15.1 (CamelContext: camel-1) started in 0.242 seconds
15:34| INFO | MarkerIgnoringBase.java 95 | FINISHED: Message222 at 2017-03-03 15:34:30
15:34| ERROR | MarkerIgnoringBase.java 159 | Failed delivery for (MessageId: ID-natubuntu-33252-1488573268679-0-1 on ExchangeId: ID-natubuntu-33252-1488573268679-0-2). Exhausted after delivery attempt: 1 caught: org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[Message: Message222 at 2017-03-03 15:34:30]

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [timer://foo?period=1s                                                         ] [       196]
[route1            ] [setHeader1        ] [setHeader[slip]                                                               ] [         2]
[route1            ] [setBody1          ] [setBody[simple{Simple: Message222 at ${date:now:yyyy-MM-dd HH:mm:ss}}]        ] [         1]
[route1            ] [setExchangePattern] [setExchangePattern[InOut]                                                     ] [         0]
[route1            ] [log1              ] [log                                                                           ] [         1]
[route1            ] [routingSlip1      ] [routingSlip[header{header(slip)}]                                             ] [       190]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
	Id                  ID-natubuntu-33252-1488573268679-0-2
	ExchangePattern     InOut
	Headers             {breadcrumbId=ID-natubuntu-33252-1488573268679-0-1, CamelRedelivered=false, CamelRedeliveryCounter=0, firedTime=Fri Mar 03 15:34:30 EST 2017, JMSCorrelationID=Camel-ID-natubuntu-33252-1488573268679-0-3, slip=jms:queue:activemq/queue/testA,jms:queue:activemq/queue/testB}
	BodyType            String
	Body                Message222 at 2017-03-03 15:34:30
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[Message: Message222 at 2017-03-03 15:34:30]
	at org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1635)
	at org.apache.camel.impl.DefaultExchange.setException(DefaultExchange.java:308)
	at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:158)
	at org.apache.camel.processor.RoutingSlip$2.doInAsyncProducer(RoutingSlip.java:301)
	at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:304)
	at org.apache.camel.processor.RoutingSlip.processExchange(RoutingSlip.java:294)
	at org.apache.camel.processor.RoutingSlip.doRoutingSlip(RoutingSlip.java:215)
	at org.apache.camel.processor.RoutingSlip.process(RoutingSlip.java:146)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
	at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:165)
	at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:73)
	at java.util.TimerThread.mainLoop(Timer.java:555)
	at java.util.TimerThread.run(Timer.java:505)
Caused by: java.lang.NoSuchMethodError: org.apache.camel.support.DefaultTimeoutMap.put(Ljava/lang/Object;Ljava/lang/Object;J)V
	at org.apache.camel.component.jms.reply.CorrelationTimeoutMap.put(CorrelationTimeoutMap.java:86)
	at org.apache.camel.component.jms.reply.TemporaryQueueReplyManager.registerReply(TemporaryQueueReplyManager.java:64)
	at org.apache.camel.component.jms.JmsProducer$1.createMessage(JmsProducer.java:221)
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:274)
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:217)
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:231)
	at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:228)
	at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:427)
	at org.apache.camel.component.jms.JmsProducer.processInOut(JmsProducer.java:233)
	at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:150)
	... 15 more
15:34| WARN | MarkerIgnoringBase.java 135 | Error processing exchange. Exchange[Message: Message222 at 2017-03-03 15:34:30]. Caused by: [org.apache.camel.CamelExecutionException - Exception occurred during execution on the exchange: Exchange[Message: Message222 at 2017-03-03 15:34:30]]
org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[Message: Message222 at 2017-03-03 15:34:30]
	at org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1635)
	at org.apache.camel.impl.DefaultExchange.setException(DefaultExchange.java:308)
	at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:158)
	at org.apache.camel.processor.RoutingSlip$2.doInAsyncProducer(RoutingSlip.java:301)
	at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:304)
	at org.apache.camel.processor.RoutingSlip.processExchange(RoutingSlip.java:294)
	at org.apache.camel.processor.RoutingSlip.doRoutingSlip(RoutingSlip.java:215)
	at org.apache.camel.processor.RoutingSlip.process(RoutingSlip.java:146)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
	at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:165)
	at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:73)
	at java.util.TimerThread.mainLoop(Timer.java:555)
	at java.util.TimerThread.run(Timer.java:505)
Caused by: java.lang.NoSuchMethodError: org.apache.camel.support.DefaultTimeoutMap.put(Ljava/lang/Object;Ljava/lang/Object;J)V
	at org.apache.camel.component.jms.reply.CorrelationTimeoutMap.put(CorrelationTimeoutMap.java:86)
	at org.apache.camel.component.jms.reply.TemporaryQueueReplyManager.registerReply(TemporaryQueueReplyManager.java:64)
	at org.apache.camel.component.jms.JmsProducer$1.createMessage(JmsProducer.java:221)
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:274)
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:217)
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:231)
	at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:228)
	at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:427)
	at org.apache.camel.component.jms.JmsProducer.processInOut(JmsProducer.java:233)
	at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:150)
	... 15 more

@whikloj
Member

whikloj commented Mar 3, 2017

@Natkeeran Not sure if this is helpful at all, but here is a Java DSL Camel route I wrote. It is a WAR webapp, though.
https://github.com/uml-digitalinitiatives/derivative-route/blob/master/src/main/java/ca/umanitoba/dam/DerivativeRoute.java

@dannylamb
Contributor Author

@Natkeeran I'd try and set this up using blueprint within Alpaca and its build process. You may be down a sinkhole you wouldn't encounter in that context.

If you'd like, I can help you get that set up with a feature and all.

@dannylamb
Contributor Author

And just after posting that I see the problem:
.routingSlip(header("slip"), ",") should just be .routingSlip("slip", ",")

@dannylamb
Contributor Author

@Natkeeran Offer still stands if you want help getting that set up all the way with a feature. Just tag me.

@dannylamb
Contributor Author

@Natkeeran I've done some exploring after our last few rounds of discussion, and I think we can merge the broadcaster and pipeline code if we make the exchange pattern come from a header. Tag me or Skype me if you want to discuss this.

Basically just take the work you've already done and replace the broadcaster with it, hacking out the ExchangePattern (InOnly, InOut) from a header.
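A plain-Java model of that merge (not Camel code) might look like the following: one router whose behaviour is switched by a pattern read from a header. The enum constants mirror the names of Camel's ExchangePattern values; the header name routingPattern and the InOnly default are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Plain-Java model of one router covering both the broadcaster and the
// pipeline, switched by an exchange pattern read from a header:
//   InOnly: every recipient gets the original message; replies are ignored.
//   InOut:  each recipient's reply becomes the next recipient's input.
// The header name "routingPattern" is hypothetical.
final class PatternRouter {

    enum Pattern { InOnly, InOut }

    static final String PATTERN_HEADER = "routingPattern"; // hypothetical

    // Returns the message each recipient received, in order.
    static List<String> route(String body, Map<String, String> headers,
                              List<UnaryOperator<String>> recipients) {
        // Defaulting to InOnly (broadcast) when the header is absent is an assumption.
        Pattern pattern = Pattern.valueOf(headers.getOrDefault(PATTERN_HEADER, "InOnly"));
        List<String> received = new ArrayList<>();
        String current = body;
        for (UnaryOperator<String> recipient : recipients) {
            received.add(current);
            String reply = recipient.apply(current);
            if (pattern == Pattern.InOut) {
                current = reply; // pipeline: the reply feeds the next recipient
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> recipients = List.of(s -> s + "A", s -> s + "B");
        System.out.println(route("m", Map.of(PATTERN_HEADER, "InOnly"), recipients)); // [m, m]
        System.out.println(route("m", Map.of(PATTERN_HEADER, "InOut"), recipients));  // [m, mA]
    }
}
```

The only behavioural difference between the two modes is whether the reply replaces the current message, which is why a single component driven by a header seems plausible.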

This was referenced Mar 14, 2017
@dannylamb
Contributor Author

Resolved with Islandora/Alpaca@558ef69

Additional changes to be made in a subsequent ticket.


5 participants