接上篇:Apache Commons Pipeline 使用学习(一)
stage生命周期
当pipeline组装、运行的时候,每个stage通常是在它自己的线程中运行(pipeline的所有线程由同一个JVM实例所拥有)。这种多线程的方法在多处理器系统中有处理上的优势。对于给定的stage,各种stage的方法按照顺序运行: init(), preprocess(), process(), postprocess() and release()。然而,stage之间各方法开始和完成的顺序是不确定性。换言之,在具有多个stage的pipeline不能指望任何特定stage的preprocess()方法开始或者完成在另一stage的任何方法之后。如果你的stage之间有依赖关系,请参阅下面部分讨论的stage之前的Events和Listeners。
pipeline中stage的顺序由配置文件来确定。配置文件通过Digester定义,这是一个XML文件,其中会列出使用的stage和初始化参数。每个stage被添加到pipeline中,并执行其init()方法。当所有stage都被装入到pipeline中,pipeline被设置为开始运行。调用的各个stage的preprocess()方法。当使用DedicatedThreadStageDriver每个stage在它自己的线程中运行,并且preprocess()方法被异步运行。
当pipeline的第一个stage的preprocess方法完成,将在由关联的stage driver传入的数据对象上开始运行process()方法。当第一stage完成处理后,数据对象将传递到下一个stage。如果此时下一stage没有完成自己的preprocess()方法,传递的数据对象将会在第二stage的stage driver中排队。当所有的初始化的对象被第一个stage的process()完成之后,将调用postprocess()方法。当postprocess()方法完成后,STOP_REQUESTED信号被发送到下一个stage,以表明没有更多的对象进入pipeline。下一stage将处理队列中的对象,然后调用它自己的postprocess()方法。这中处理完队列中的数据然后调用postprocess方法向下传播。每个stage完成 postprocess方法后,运行它的release()方法()。init()和release()不依赖自己stage之外的任何东西。
每个stage在处理过程中发生异常时可以配置为停止或继续。stage在preprocess(), process(), or postprocess()中可能抛出一个StageException()。如果配置为继续运行,stage将处理下一个数据对象。如果配置停止,stage将结束处理,并且任何后续process() 、postprocess() 方法将不会被调用。release()方法总是被调用,因为它写在stage处理代码try-catch结构中的finally块中。
stage之间的通信
stage彼此通信有两种主要机制。为了保持数据流和“管道(Pipeline)”的比喻,两种都是发送消息到“下游”到后续stage。
- 正常EMIT()到下一stage(的队列) - 有序的传递数据对象。这些对象通常实现为Java bean,并且有时被称为"data beans".
- 事件和监听器 - 通常传递控制或stage之间同步元数据。使用此机制时,在Pipeline中较晚的stage需要的信息只能由较早的stage提供,不属于数据bean提供。
作为事件和监听器的例子,假设你有一个从数据库表中读取数据的stage,而后面的stage将数据写入到另一个数据库。读取该表的stage需要将表的布局信息传递给写表操作的stage,这样当目标表不存在时,写表的stage可以通过事件中的信息创建一个表。TableReader.preprocess()方法触发一个事件,并且携带表的布局数据。TableWriter stage的 preprocess()方法设置为侦听表事件,并且等待该事件发生之后,才处理数据,这样的TableWriter不会处理对象,直到目标表已准备就绪。
三、使用Digester 配置 Pipeline
现在是时候展示的Pipeline的配置文件,当使用Digester时为XML格式。
例子一:
下面是一个展示基本结构的例子。这个Pipeline有三个stage,一个环境变量。示例代码如下。
<?xml version="1.0" encoding="UTF-8"?> <!-- Document : configMyPipeline.xml Description: An example Pipeline configuration file --> <pipeline> <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory" id="df0"/> <!-- The <env> element can be used to add global environment variable values to the pipeline. In this instance almost all of the stages need a key to tell them what type of data to process. --> <env> <value key="dataType">STLD</value> </env> <!-- The initial stage traverses a directory so that it can feed the filenames of the files to be processed to the subsequent stages. The directory path to be traversed is in the feed block following this stage. The filePattern in the stage block is the pattern to look for within that directory. --> <stage className="org.apache.commons.pipeline.stage.FileFinderStage" driverFactoryId="df0" filePattern="SALES\.(ASWK|ST(GD|GL|LD))\.N.?\.D\d{5}"/> <feed> <value>/mnt/data2/gdsg/sst/npr</value> </feed> <stage className="gov.noaa.eds.example.Stage2" driverFactoryId="df0" /> <!-- Write the data from the SstFileReader stage into the Rich Inventory database. --> <stage className="gov.noaa.eds.sst2ri.SstWriterRI" driverFactoryId="df0"/> </pipeline>
下面是上面例子的总结:
<?xml version="1.0" encoding="UTF-8"?>
这些pipeline的配置文件总是以这个XML声明开始。
<pipeline>...</pipeline>
顶级元素是<pipeline>包围其余部分。
<driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory" id="df0"/>
设置一个StageDriverFactory来输入和控制stage。stage被DedicatedThreadStageDriver控制,它从一个名为“DF0”的工厂中获得。
<env> <value key="dataType">STLD</value> </env>
设置一个名为“dataType”的常量,各个stage都可以访问“STLD”数据并且运行中使用。如果有分支,环境常量是局部的,他们只是在它们所在分支中有效,分支之间不共享。但是,你可以定义相同的环境在不同分支。
<stage className="org.apache.commons.pipeline.stage.FileFinderStage" driverFactoryId="df0" filePattern="SALES\.(ASWK|ST(GD|GL|LD))\.N.?\.D\d{5}"/>
定义stage,FileFinderStage将为下一stage的处理选择文件。本例中有一个“filePattern”的参数限制了传递到下一stage的文件。仅仅匹配到给定的正则表达式的文件会被使用。注意,“driverFactoryId”是“DF0”,它匹配给先前在此文件中的driverFactory元素的名称。
<feed> <value>/mnt/data2/gdsg/sst/npr</value> </feed>
<feed> 中的值用于第一个stage的初始数据。在这个例子中,FileFinderStage期望获取的文件,至少是这些开始的目录。注意, <feed>必须配置在的pipeline中的第一个stage之后。在stage创建时,如果之前没有任何stage,feed的值将被舍弃。
例二:
第二个示例显示了两个stage的最小的pipeline。第一个stage是FileFinderStage,它从起始目录“"/data/sample" 中读取的文件名和匹配任何已“HelloWorld”开头的文件。第二个status是LogStage,它在通常用在调试过程中。 LogStage调用输入对象的toString方法,然后写到日志文件,然后传递它所接收到对象到下一个stage,因此很容易在任意两个stage之间使用,在不改变它们之间传递的对象的情况下记录日志文件。
对应上图,配置文件有一些彩色文本,使其更容易匹配到的图像中的对象。
<?xml version="1.0" encoding="UTF-8"?> <!-- Document : configSimplePipeline.xml Description: A sample configuration file for a very simple pipeline --> <pipeline> <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory" id="driverFactory"/> <!-- ((1)) The first stage recursively searches the directory given in the feed statement. The filePattern given will match any files beginning with "HelloWorld". --> <stage className="org.apache.commons.pipeline.stage.FileFinderStage" driverFactoryId="driverFactory" filePattern="HelloWorld.*"/><!-- ((3)) --> <!-- Starting directory for the first stage. --> <feed> <value>/data/sample</value> <!-- ((4)) --> </feed> <!-- ((2)) Report the files found. --> <stage className="org.apache.commons.pipeline.stage.LogStage" driverFactoryId="driverFactory" /> </pipeline>
一个driver factory 服务两个stage。driver factory ID是“driverFactory”,并且这个值被用于两个stage上
理论上,pipeline可以仅仅有一个stage,但是这中退化的情况与普通的程序没有什么不同,只是它可以方便的扩展为多个stage。
例三:
带颜色的配置文件如下:
<?xml version="1.0" encoding="UTF-8"?> <!-- Document : branchingPipeline.xml Description: Configuration file for a pipeline that takes user provided files as input, and from that both generates HTML files and puts data into a database. --> <pipeline> <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory" id="df0"/> <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory" id="df1"> <property propName="queueFactory" className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory" capacity="4" fair="false"/> </driverFactory> <!-- The <env> element can be used to add global environment variable values to the pipeline. In this instance almost all of the stages need a key to tell them what type of data to process. --> <env> <value key="division">West</value> <!-- ((9)) --> </env> <!-- ((1)) The initial stage traverses a directory so that it can feed the filenames of of the files to be processed to the subsequent stages. The directory path to be traversed is in the feed block at the end of this file. The filePattern in the stage block is the pattern to look for within that directory. --> <stage className="org.apache.commons.pipeline.stage.FileFinderStage" driverFactoryId="df0" filePattern="SALES\.(ASWK|ST(GD|GL|LD))\.N.?\.D\d{5}"/><!-- ((8)) --><feed> <value>/data/INPUT/raw</value> <!-- ((7)), ((11)) --> </feed> <!-- ((2)) This stage is going to select a subset of the files from the previous stage and orders them for time sequential processing using the date embedded in the last several characters of the file name. The filesToProcess is the number of files to emit to the next stage, before terminating processing. Zero (0) has the special meaning that ALL available files should be processed. --> <stage className="com.demo.pipeline.stages.FileSorterStage" driverFactoryId="df1" filesToProcess="0"/> <!-- ((3)) Read the files and create the objects to be passed to stage that writes to the database and to the stage that writes the data to HTML files. WARNING: The value for htmlPipelineKey in the stage declaration here must exactly match the branch pipeline key further down in this file. --> <stage className="com.demo.pipeline.stages.FileReaderStage" driverFactoryId="df1" htmlPipelineKey="sales2html"/> <!-- ((4)) Write the data from the FileReaderStage stage into the database. --> <stage className="com.demo.pipeline.stages.DatabaseWriterStage" driverFactoryId="df1"> <datasource user="test" password="abc123" type="oracle" host="brain.demo.com" port="1521" database="SALES" /> <database-proxy className="gov.noaa.gdsg.sql.oracle.OracleDatabaseProxy" /> <tablePath path="summary.inventory" /> <!-- ((13)) --> </stage> <!-- Write the data from the FileReaderStage stage to HTML files. The outputFilePath is the path to which we will be writing our summary HTML files. WARNING: The value for the branch pipeline key declaration here must exactly match the htmlPipelineKey in the FileReaderStage stage in this file. --> <branch> <pipeline key="sales2html"> <!-- ((10)) --><env> <value key="division">West</value> <!-- ((14)) --> </env> <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory" id="df2"> <property propName="queueFactory" className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory" capacity="4" fair="false"/> </driverFactory> <!-- ((5)) HTMLWriterStage --> <stage className="com.demo.pipeline.stages.HTMLWriterStage" driverFactoryId="df2" outputFilePath="/data/OUTPUT/web"/> <!-- ((12)) --> <!-- ((6)) StatPlotterStage --> <stage className="com.demo.pipeline.stages.StatPlotterStage" driverFactoryId="df2" outputFilePath="/data/OUTPUT/web"/> <!-- ((12)) --></pipeline> </branch> </pipeline>
注:在这个例子中配置为“West” 的常量“division”,定义在两个地方。在主pipeline和分支pipeline都是相同的值。这是因为分支不共享相同的环境常数。
该driverFactories“DF1”和“DF2”通过指定ArrayBlockingQueueFactory覆盖默认queueFactory。设置容量为4个对象,这样做是为了限制使用DF1或DF2的stage的队列大小。这通常是要限制pipeline使用的资源,并且是必要的,防止无界队列使用了所有可用的java存储或超过了所允许打开的文件句柄的数量。创建队列之后的队列大小不能被改变。因为只有一个线程正在访问的队列,公平属性可以被设置为“false”。如果公平=“true”,则有额外的开销,以确保访问队列中的所有线程的顺序处理(FIFO)。
相关推荐
apache commons all 中文api合集
commons-lang3.3.1.jar、Apache Commons包中的一个,包含了一些数据类型工具类,是java.lang.*的扩展。必须使用的jar包。为JRE5.0+的更好的版本所提供 Jar文件包含的类: META-INF/MANIFEST.MFMETA-INF/LICENSE....
apache commons 工具包中提供的一个针对配置文件动态修改的工具类
commons-lang3.3.1.jar、Apache Commons包中的一个,包含了一些数据类型工具类,是java.lang.*的扩展。必须使用的jar包。为JRE5.0+的更好的版本所提供 Jar文件包含的类: META-INF/MANIFEST.MFMETA-INF/LICENSE....
apache Commons Lang 2.4 API apache Commons Lang 2.4 API
apache commons jar(commons所有的jar包,从官网下载提供给大家) 因为涉及jar太多,包括有src源代码,只需要3分,希望大家理解,我也是从官网花了很长时间才一个一个下完,需要的请自取。全部是zip文件,每个对应的...
Apache Commons Collections的使用指南,该jar包提供了多数集合的线程安全版本,以及增强了大多数的集合功能,送给那些不想重复发明轮子的人.
Apache Commons API简介,主要介绍Apache Commons API所包含的包,后续更新补充各类的简介和使用方法
Apache Commons Collections,commons-collections-3.2.1和commons-collections4-4.0,含jar包及源码和api文档。
主要介绍了Apache Commons Math3探索之多项式曲线拟合实现代码,小编觉得挺不错的,这里分享给大家,供需要的朋友参考。
apache-commons下全部官方源码和官方API文档,其中有: commons-beanutils-1.8.0 commons-codec commons-collections commons-dbcp commons-dbutils commons-fileupload commons-io commons-lang commons-lang3 ...
Apache Commons IO 功能是使用 Apache Commons Pool简介 Apache Commons DBCP使用
Commons-beanutils-API Commons-collections-API Commons-configuration-API Commons-lang-API Commons-logging-API Dom4j_API java api javascript Struts API J2EE API
Apache Commons Logging 1.2
Apache Commons是一个非常有用的工具包,解决各种实际的通用问题。(附件中提供了该工具包的jar包,及源文件以供研究) BeanUtils Commons-BeanUtils 提供对 Java 反射和自省API的包装 Betwixt Betwixt提供将 ...
本文将介绍如何在程序中使用Apache Commons-logging
Apache Commons Pool 2.4.1,编译 jedis 2.7.2 时候使用。
import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpException; import org.apache.commons.httpclient.HttpStatus; import org.apache.commons.httpclient.methods....
天天都有人导入Apache的包,但是里面那么多工具类又有多少人使用过,这里面有一些使用介绍
Apache Commons官网jar包,包含io,cli,codec,net,lang,email等等等等