`
Everyday都不同
  • 浏览: 714531 次
  • 性别: Icon_minigender_1
  • 来自: 宇宙
社区版块
存档分类
最新评论

Camel流程扩展探索——多个流程是否可行?

阅读更多

camel框架是一个成熟的流程框架,一般而言我们只是把它应用到一个完整的流程中,而一些逻辑的分支也是在“一个”流程中去控制的。现在如果在流程的源头就需要分支,即拿到源数据,

但是我们需要走不同的流程。(这里不再局限在“一个”流程了!),是否可行呢?下面来探讨。

 

首先,建立流程的配置:

context-route.xml配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:drools="http://drools.org/schema/drools-spring"
	xmlns:camel="http://camel.apache.org/schema/spring"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://drools.org/schema/drools-spring http://anonsvn.jboss.org/repos/labs/labs/jbossrules/trunk/drools-container/drools-spring/src/main/resources/org/drools/container/spring/drools-spring-1.0.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
	
	<bean id="fileConverter" class="com.kayak.itmon.web.demo.camel.FileConvertProcessor" />
	<bean id="fileConverter2" class="com.kayak.itmon.web.demo.camel.FileConvertProcessor2" />
	<bean id="fileConverter3" class="com.kayak.itmon.web.demo.camel.FileConvertProcessor3" />
	
	<camelContext id="camel" autoStartup="true" xmlns="http://camel.apache.org/schema/spring">
		 <template id="producerTemplate" />
		<threadPool id="pool" threadName="Thread-dataformat"
			poolSize="50" maxPoolSize="200" maxQueueSize="250" rejectedPolicy="CallerRuns" />
		
		 
		 <route>  
            <from uri="direct://AstartData"/>  
            <process ref="A1"/>  
            <to uri="direct://startA" />
            
        </route> 
        
         <route> 
	        <from uri="direct://startA"/>  
         	<threads executorServiceRef="pool">
         		<process ref="A2" />
         	</threads> 
        </route>
        
        <!-- B流程 -->
         <route>  
            <from uri="direct://BstartData"/>  
            <process ref="B1"/>  
            <to uri="direct://startB" />
        </route> 
        
         <route> 
	        <from uri="direct://startB"/>  
         	<threads executorServiceRef="pool">
         		<process ref="B2" />
         	</threads> 
        </route>
	</camelContext>
	
	<!-- camel的一些bean -->
	<bean id="A1" class="com.kayak.itmon.web.demo.camel.ACamel1"></bean>
	<bean id="A2" class="com.kayak.itmon.web.demo.camel.ACamel2"></bean>
	<bean id="B1" class="com.kayak.itmon.web.demo.camel.BCamel1"></bean>
	<bean id="B2" class="com.kayak.itmon.web.demo.camel.BCamel2"></bean>
</beans>

 

其中,流程类如下

public class ACamel1 implements Processor {

	@Override
	public void process(Exchange exchange) throws Exception {
		String ret = (String) exchange.getIn().getBody();
		System.out.println("进入到A流程的第一步。。 " + ret);
		ret = ret.toUpperCase();
		exchange.getIn().setBody(ret);
	}

}

public class ACamel2 implements Processor {

	@Override
	public void process(Exchange exchange) throws Exception {
		String ret = (String) exchange.getIn().getBody();
		System.out.println("进入到A流程的第二步。。" + ret);
	}

}

public class BCamel1 implements Processor {

	@Override
	public void process(Exchange exchange) throws Exception {
		String ret = (String) exchange.getIn().getBody();
		System.out.println("进入到B流程的第一步。。  " + ret);
		ret = ret + "hahha";
		exchange.getIn().setBody(ret);
	}

}

public class BCamel2 implements Processor {

	@Override
	public void process(Exchange exchange) throws Exception {
		String ret = (String) exchange.getIn().getBody();
		System.out.println("进入到B流程的第二步。。 " + ret);
	}

}

 

在测试类里发起流程:

public static void main(String[] args) {
		ApplicationContext ctx = new FileSystemXmlApplicationContext("WebRoot/WEB-INF/conf/xxxxx/context-route.xml");
		System.out.println(ctx);
		
		ProducerTemplate template = ctx.getBean("producerTemplate", ProducerTemplate.class);
		//template.sendBody("direct://AstartData");
		template.sendBody("direct://AstartData", "okoko");
        template.sendBody("direct://BstartData", "okoko");
		try {
			template.start();
		} catch (Exception e) {
			e.printStackTrace();
		}
}

 

会发现,打印结果如下:

进入到A流程的第一步。。 okoko

进入到A流程的第二步。。OKOKO

进入到B流程的第一步。。  okoko

进入到B流程的第二步。。 okokohahha

 

说明,同一个数据源(这个demo用的是okoko模拟),可以“分发”给若干个(这里是用2个测试)流程来处理,且流程与流程之间拿到的同一数据源,

但他们的处理互不干涉。只要在ProducerTemplate.sendBody("direct://xxx", data); send多个即可

(注:不要嵌套流程,否则容易出错。即在本例中,

<route>  
            <from uri="direct://AstartData"/>  
            <process ref="A1"/>  
            <to uri="direct://startA" />
            <to uri="direct://BstartData" />
</route>

 

构成了嵌套,这是不合理的!direct://AstartData和direct://BstartData(这两个都是流程的源头uri)都应该在java类中启动!)

 

btw, 说下在测试的过程中,出现的一些异常及解决办法吧,记录一下。

 

1)xml里from uri="xxx"里面要加direct://前缀  形如from uri="direct://xxx", 否则会出现异常:

Exception in thread "main" org.apache.camel.RuntimeCamelException: org.apache.camel.FailedToStartRouteException: Failed to start route route4 because of Multiple consumers for the same endpoint is not allowed: Endpoint[direct://start]

at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1316)

at org.apache.camel.spring.SpringCamelContext.onApplicationEvent(SpringCamelContext.java:120)

at org.apache.camel.spring.CamelContextFactoryBean.onApplicationEvent(CamelContextFactoryBean.java:280)

at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:151)

at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:128)

at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:331)

at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:773)

at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:483)

at org.springframework.context.support.FileSystemXmlApplicationContext.<init>(FileSystemXmlApplicationContext.java:140)

at org.springframework.context.support.FileSystemXmlApplicationContext.<init>(FileSystemXmlApplicationContext.java:84)

at com.kayak.itmon.web.demo.camel.CamelStart.main(CamelStart.java:9)

Caused by: org.apache.camel.FailedToStartRouteException: Failed to start route route4 because of Multiple consumers for the same endpoint is not allowed: Endpoint[direct://start]

at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRouteConsumers(DefaultCamelContext.java:2019)

at org.apache.camel.impl.DefaultCamelContext.doStartRouteConsumers(DefaultCamelContext.java:1995)

at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:1923)

at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRoutes(DefaultCamelContext.java:1702)

at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:1583)

at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:1444)

at org.apache.camel.spring.SpringCamelContext.doStart(SpringCamelContext.java:179)

at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:60)

at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:1412)

at org.apache.camel.spring.SpringCamelContext.maybeStart(SpringCamelContext.java:228)

at org.apache.camel.spring.SpringCamelContext.onApplicationEvent(SpringCamelContext.java:118)

... 9 more

 

2)不要有相同的uri="direct://xxx"  否则会出现异常:

Exception in thread "main" org.apache.camel.RuntimeCamelException: org.apache.camel.FailedToCreateRouteException: Failed to create route route1: Route[[From[AstartData]] -> [process[ref:A1], To[direct://st... because of No endpoint could be found for: AstartData, please check your classpath contains the needed Camel component jar.

at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1316)

at org.apache.camel.spring.SpringCamelContext.onApplicationEvent(SpringCamelContext.java:120)

at org.apache.camel.spring.CamelContextFactoryBean.onApplicationEvent(CamelContextFactoryBean.java:280)

at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:151)

at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:128)

at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:331)

at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:773)

at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:483)

at org.springframework.context.support.FileSystemXmlApplicationContext.<init>(FileSystemXmlApplicationContext.java:140)

at org.springframework.context.support.FileSystemXmlApplicationContext.<init>(FileSystemXmlApplicationContext.java:84)

at com.kayak.itmon.web.demo.camel.CamelStart.main(CamelStart.java:9)

Caused by: org.apache.camel.FailedToCreateRouteException: Failed to create route route1: Route[[From[AstartData]] -> [process[ref:A1], To[direct://st... because of No endpoint could be found for: AstartData, please check your classpath contains the needed Camel component jar.

at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:177)

at org.apache.camel.impl.DefaultCamelContext.startRoute(DefaultCamelContext.java:722)

at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:1789)

at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:1575)

at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:1444)

at org.apache.camel.spring.SpringCamelContext.doStart(SpringCamelContext.java:179)

at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:60)

at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:1412)

at org.apache.camel.spring.SpringCamelContext.maybeStart(SpringCamelContext.java:228)

at org.apache.camel.spring.SpringCamelContext.onApplicationEvent(SpringCamelContext.java:118)

... 9 more

Caused by: org.apache.camel.NoSuchEndpointException: No endpoint could be found for: AstartData, please check your classpath contains the needed Camel component jar.

at org.apache.camel.util.CamelContextHelper.getMandatoryEndpoint(CamelContextHelper.java:52)

at org.apache.camel.model.RouteDefinition.resolveEndpoint(RouteDefinition.java:187)

at org.apache.camel.impl.DefaultRouteContext.resolveEndpoint(DefaultRouteContext.java:108)

at org.apache.camel.impl.DefaultRouteContext.resolveEndpoint(DefaultRouteContext.java:114)

at org.apache.camel.model.FromDefinition.resolveEndpoint(FromDefinition.java:72)

at org.apache.camel.impl.DefaultRouteContext.getEndpoint(DefaultRouteContext.java:90)

at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:857)

at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:172)

... 18 more

 

3)java代码里templete.sendBody("direct://xxxx")必须要加第2个参数 如:templete.sendBody("direct://xxxx", data) 否则会出现异常:

Exception in thread "main" java.lang.IllegalArgumentException: defaultEndpoint must be specified

at org.apache.camel.util.ObjectHelper.notNull(ObjectHelper.java:294)

at org.apache.camel.impl.DefaultProducerTemplate.getMandatoryDefaultEndpoint(DefaultProducerTemplate.java:445)

at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:342)

at com.kayak.itmon.web.demo.camel.CamelStart.main(CamelStart.java:13)

 

4
5
分享到:
评论
6 楼 Everyday都不同 2015-08-27  
string2020 写道
Everyday都不同 写道
string2020 写道
direct 和 vm有什么区别?

他们是不同的通讯组件,在camel中代表不同的通讯协议。direct接到消息后不做任何处理直接路由到下一个节点;vm 接收到消息后先存入队列中,然后由监听线程取出消息送入下一个节点进行处理.


就是说,功能和效果一样,只是一个是同步,一个是异步。
是吗?

5 楼 string2020 2015-08-27  
Everyday都不同 写道
string2020 写道
direct 和 vm有什么区别?

他们是不同的通讯组件,在camel中代表不同的通讯协议。direct接到消息后不做任何处理直接路由到下一个节点;vm 接收到消息后先存入队列中,然后由监听线程取出消息送入下一个节点进行处理.


就是说,功能和效果一样,只是一个是同步,一个是异步。
是吗?
4 楼 Everyday都不同 2015-08-26  
string2020 写道
direct 和 vm有什么区别?

他们是不同的通讯组件,在camel中代表不同的通讯协议。direct接到消息后不做任何处理直接路由到下一个节点;vm 接收到消息后先存入队列中,然后由监听线程取出消息送入下一个节点进行处理.
3 楼 string2020 2015-08-26  
direct 和 vm有什么区别?
2 楼 Everyday都不同 2015-08-26  
comsci 写道


   通过修改代码来实现对不同流程的同步访问?  运行时? 实时的?

   在修改代码之前,用户的运行过程必须全部终止吗?

额,我这篇的意思是说在数据的源头“分发”给不同流程,同时配置文件必须配置相应的不同流程。并不是说在现有流程运行的情况下修改代码。不知道这样说是否解决了你的困惑。
1 楼 comsci 2015-08-26  


   通过修改代码来实现对不同流程的同步访问?  运行时? 实时的?

   在修改代码之前,用户的运行过程必须全部终止吗?

相关推荐

    CAMEL 呼叫流程和信令流程

    CAMEL 呼叫流程和信令流程 CAMEL 呼叫流程和信令流程 CAMEL 呼叫流程和信令流程 CAMEL 呼叫流程和信令流程

    Camel服务集成,服务编排操作文档

    Camel服务集成,服务编排操作文档

    亲测好用——Apache Camel简介以及使用场景.pptx.zip

    Apache Camel简介以及使用场景

    Camel in action(camel实战)

    Apache Camel is a Java framework that lets you implement the standard enterprise integration patterns in a few lines of code. With a concise but sophisticated DSL you snap integration logic into your ...

    Camel_Camel3Camel6函数_

    Camel3 Camel6函数等matlab源代码

    [Camel实战].(Camel.in.Action).Claus.Ibsen&Jonathan;.Anstey.文字版

    中文名: Camel 实战 原名: Camel in Action 作者: Claus Ibsen Jonathan Anstey 资源格式: PDF 版本: 英文文字版/更新源代码 出版社: Manning书号: 9781935182368发行时间: 2010年12月 地区: 美国 语言: 英文 简介: ...

    ApacheCamel-JDBC

    ApacheCamel-JDBC Apache Camel JDBC组件 代码样例Demo

    大众点评开源软负载管理中间件 Camel.zip

    Camel项目由camel-admin, Dengine(基于Tengine开发的Web服务器), camel-agent三个模块组成: camel-admin:  Camel管理端:可以通过接口及页面两种方式对Nginx集群进行发布、重启、监控等操作。 ...

    Apache Camel中文开发使用指南.zip

    Apache Camel 开发使用指南中文版

    apache camel 简介

    Apache Camel(http://camel.apache.org/) 是一个非常强大的基于规则的路由以及媒介引擎,该引擎提供了基于POJO的 企业应用模式(EIP--Enterprise Integration Patterns)的实现。

    ApacheCamel-FTP

    ApacheCamel-FTP ApacheCamel-FTP Apache Camel FTP组件 Demo 样例

    camel, Apache camel 镜像.zip

    camel, Apache camel 镜像 Apache camel 是基于已知企业集成模式的强大开放源代码集成框架,它具有强大的Bean集成。简介flex允许你创建企业集成模式,以基于基于Java的域特定语言( 或者 Fluent API ) 或者基于 Sc

    ApacheCamel-Timer

    09-ApacheCamel-Timer Apache Camel Timer组件 定时器 代码Demo

    Camel in Action ch1

    1: Meet Camel - FREE 2: Routing with Camel - AVAILABLE Part 2 Core Camel 3: Transforming Data with Camel - AVAILABLE 4: Using Beans with Camel - AVAILABLE 5: Error Handling - AVAILABLE 6:...

    camel-manual-2.0

    camel-manual-2.0 camel ESB EIP EAI

    Apache Camel Developer's Cookbook

    camel的介绍书籍比较少,该书能帮助我们理解apache camel的用发,对于做企业应用集成,规则流程开发等需求的开发者来说,该书是一个不错的参考

    camel文档

    camel 文档

    apache-camel-demo

    Apache Camel是一个基于规则路由和中介引擎,提供企业集成模式的Java对象(POJO)的实现,通过应用程序接口(或称为陈述式的Java领域特定语言(DSL))来配置路由和中介的规则。领域特定语言意味着Apache Camel支持你...

    Camel in Action.zip

    Apache Camel 作为集成项目的利器,针对应用集成场景的抽象出了一套消息交互模型,...与传统的企业集成服务总线(ESB)相比,Apache Camel的核心库非常小巧(是一个只有几M的jar包),可以方便地与其他系统进行集成。

    Mastering.Apache.Camel

    An advanced guide to Enterprise Integration using Apache Camel About This Book Integrate your applications with Apache Camel and enhance efficiency and scalability Master all the EIPs supported by ...

Global site tag (gtag.js) - Google Analytics