`

Apache Commons Pipeline 使用学习(二)

阅读更多

 

接上篇:Apache Commons Pipeline 使用学习(一)

 

stage生命周期

     当pipeline组装、运行的时候,每个stage通常是在它自己的线程中运行(pipeline的所有线程由同一个JVM实例所拥有)。这种多线程的方法在多处理器系统中有处理上的优势。对于给定的stage,各种stage的方法按照顺序运行: init(), preprocess(), process(), postprocess() and release()。然而,stage之间各方法开始和完成的顺序是不确定性。换言之,在具有多个stage的pipeline不能指望任何特定stage的preprocess()方法开始或者完成在另一stage的任何方法之后。如果你的stage之间有依赖关系,请参阅下面部分讨论的stage之前的Events和Listeners。
    

      pipeline中stage的顺序由配置文件来确定。配置文件通过Digester定义,这是一个XML文件,其中会列出使用的stage和初始化参数。每个stage被添加到pipeline中,并执行其init()方法。当所有stage都被装入到pipeline中,pipeline被设置为开始运行。调用的各个stage的preprocess()方法。当使用DedicatedThreadStageDriver每个stage在它自己的线程中运行,并且preprocess()方法被异步运行。

     当pipeline的第一个stage的preprocess方法完成,将在由关联的stage driver传入的数据对象上开始运行process()方法。当第一stage完成处理后,数据对象将传递到下一个stage。如果此时下一stage没有完成自己的preprocess()方法,传递的数据对象将会在第二stage的stage driver中排队。当所有的初始化的对象被第一个stage的process()完成之后,将调用postprocess()方法。当postprocess()方法完成后,STOP_REQUESTED信号被发送到下一个stage,以表明没有更多的对象进入pipeline。下一stage将处理队列中的对象,然后调用它自己的postprocess()方法。这中处理完队列中的数据然后调用postprocess方法向下传播。每个stage完成 postprocess方法后,运行它的release()方法()。init()和release()不依赖自己stage之外的任何东西。

     每个stage在处理过程中发生异常时可以配置为停止或继续。stage在preprocess(), process(), or postprocess()中可能抛出一个StageException()。如果配置为继续运行,stage将处理下一个数据对象。如果配置停止,stage将结束处理,并且任何后续process() 、postprocess() 方法将不会被调用。release()方法总是被调用,因为它写在stage处理代码try-catch结构中的finally块中。

 

stage之间的通信

    stage彼此通信有两种主要机制。为了保持数据流和“管道(Pipeline)”的比喻,两种都是发送消息到“下游”到后续stage。

  • 正常EMIT()到下一stage(的队列) - 有序的传递数据对象。这些对象通常实现为Java bean,并且有时被称为"data beans".
  • 事件和监听器 -  通常传递控制或stage之间同步元数据。使用此机制时,在Pipeline中较晚的stage需要的信息只能由较早的stage提供,不属于数据bean提供。

     作为事件和监听器的例子,假设你有一个从数据库表中读取数据的stage,而后面的stage将数据写入到另一个数据库。读取该表的stage需要将表的布局信息传递给写表操作的stage,这样当目标表不存在时,写表的stage可以通过事件中的信息创建一个表。TableReader.preprocess()方法触发一个事件,并且携带表的布局数据。TableWriter stage的 preprocess()方法设置为侦听表事件,并且等待该事件发生之后,才处理数据,这样的TableWriter不会处理对象,直到目标表已准备就绪。

 

三、使用Digester 配置 Pipeline

现在是时候展示的Pipeline的配置文件,当使用Digester时为XML格式。

例子一:

下面是一个展示基本结构的例子。这个Pipeline有三个stage,一个环境变量。示例代码如下。

 

<?xml version="1.0" encoding="UTF-8"?>

<!--
    Document   : configMyPipeline.xml
    Description: An example Pipeline configuration file
-->

<pipeline>
    
    <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
                   id="df0"/>

    <!-- The <env> element can be used to add global environment variable values to the pipeline.
         In this instance almost all of the stages need a key to tell them what type of data
         to process.
    -->
    <env>
        <value key="dataType">STLD</value>
    </env>               
        
    <!-- The initial stage traverses a directory so that it can feed the filenames of
         the files to be processed to the subsequent stages.
         
         The directory path to be traversed is in the feed block following this stage.
  
         The filePattern in the stage block is the pattern to look for within that directory.
    -->
    
    <stage className="org.apache.commons.pipeline.stage.FileFinderStage"
           driverFactoryId="df0"
           filePattern="SALES\.(ASWK|ST(GD|GL|LD))\.N.?\.D\d{5}"/>
      
    <feed>
        <value>/mnt/data2/gdsg/sst/npr</value>
    </feed>

    <stage className="gov.noaa.eds.example.Stage2"
           driverFactoryId="df0" />

    <!-- Write the data from the SstFileReader stage into the Rich Inventory database.  -->
    <stage className="gov.noaa.eds.sst2ri.SstWriterRI"
           driverFactoryId="df0"/>

</pipeline>

下面是上面例子的总结:

 

 

<?xml version="1.0" encoding="UTF-8"?>

 这些pipeline的配置文件总是以这个XML声明开始。

 

 

<pipeline>...</pipeline>

 顶级元素是<pipeline>包围其余部分。

 

 

<driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory" id="df0"/>

 设置一个StageDriverFactory来输入和控制stage。stage被DedicatedThreadStageDriver控制,它从一个名为“DF0”的工厂中获得。

 

 

<env> <value key="dataType">STLD</value> </env>

 设置一个名为“dataType”的常量,各个stage都可以访问“STLD”数据并且运行中使用。如果有分支,环境常量是局部的,他们只是在它们所在分支中有效,分支之间不共享。但是,你可以定义相同的环境在不同分支。

 

 

<stage className="org.apache.commons.pipeline.stage.FileFinderStage" driverFactoryId="df0" filePattern="SALES\.(ASWK|ST(GD|GL|LD))\.N.?\.D\d{5}"/>

定义stage,FileFinderStage将为下一stage的处理选择文件。本例中有一个“filePattern”的参数限制了传递到下一stage的文件。仅仅匹配到给定的正则表达式的文件会被使用。注意,“driverFactoryId”是“DF0”,它匹配给先前在此文件中的driverFactory元素的名称。

 

 

<feed> <value>/mnt/data2/gdsg/sst/npr</value> </feed>

 <feed> 中的值用于第一个stage的初始数据。在这个例子中,FileFinderStage期望获取的文件,至少是这些开始的目录。注意, <feed>必须配置在的pipeline中的第一个stage之后。在stage创建时,如果之前没有任何stage,feed的值将被舍弃。

 

例二:

第二个示例显示了两个stage的最小的pipeline。第一个stage是FileFinderStage,它从起始目录“"/data/sample" 中读取的文件名和匹配任何已“HelloWorld”开头的文件。第二个status是LogStage,它在通常用在调试过程中。 LogStage调用输入对象的toString方法,然后写到日志文件,然后传递它所接收到对象到下一个stage,因此很容易在任意两个stage之间使用,在不改变它们之间传递的对象的情况下记录日志文件。



  对应上图,配置文件有一些彩色文本,使其更容易匹配到的图像中的对象。

 

<?xml version="1.0" encoding="UTF-8"?>

<!--
    Document   : configSimplePipeline.xml
    Description: A sample configuration file for a very simple pipeline
-->

<pipeline>

    <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
                   id="driverFactory"/>

    <!--
        ((1)) The first stage recursively searches the directory given in the feed statement.
        The filePattern given will match any files beginning with "HelloWorld".
    -->
    <stage className="org.apache.commons.pipeline.stage.FileFinderStage"
           driverFactoryId="driverFactory"
           filePattern="HelloWorld.*"/><!-- ((3)) -->

    <!-- Starting directory for the first stage. -->
    <feed>
        <value>/data/sample</value> <!-- ((4)) -->
    </feed>

    <!-- ((2)) Report the files found. -->
    <stage className="org.apache.commons.pipeline.stage.LogStage"
           driverFactoryId="driverFactory" />

</pipeline>

 

一个driver factory 服务两个stage。driver factory  ID是“driverFactory”,并且这个值被用于两个stage上

    理论上,pipeline可以仅仅有一个stage,但是这中退化的情况与普通的程序没有什么不同,只是它可以方便的扩展为多个stage。

 

例三:

 

 带颜色的配置文件如下:

 

<?xml version="1.0" encoding="UTF-8"?>

<!--
   Document   : branchingPipeline.xml
   Description: Configuration file for a pipeline that takes
                user provided files as input, and from that both generates HTML files and
                puts data into a database.
-->

<pipeline>

    <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
        id="df0"/>


    <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
        id="df1">
        <property propName="queueFactory"
            className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
            capacity="4" fair="false"/>
    </driverFactory>


    <!-- 
        The <env> element can be used to add global environment variable values to the pipeline.
        In this instance almost all of the stages need a key to tell them what type of data
        to process.
    -->
    <env>
        <value key="division">West</value> <!-- ((9)) -->
    </env>


    <!-- 
        ((1)) The initial stage traverses a directory so that it can feed the filenames of
        of the files to be processed to the subsequent stages.

        The directory path to be traversed is in the feed block at the end of this file.

        The filePattern in the stage block is the pattern to look for within that directory.
    -->
    <stage className="org.apache.commons.pipeline.stage.FileFinderStage"
        driverFactoryId="df0"
        filePattern="SALES\.(ASWK|ST(GD|GL|LD))\.N.?\.D\d{5}"/><!-- ((8)) --><feed>
        <value>/data/INPUT/raw</value> <!-- ((7)), ((11)) -->
    </feed>


    <!--  
        ((2)) This stage is going to select a subset of the files from the previous stage
        and orders them for time sequential processing using the date embedded in
        the last several characters of the file name.

        The filesToProcess is the number of files to emit to the next stage, before
        terminating processing.  Zero (0) has the special meaning that ALL available
        files should be processed.
    -->
    <stage className="com.demo.pipeline.stages.FileSorterStage"
        driverFactoryId="df1"
        filesToProcess="0"/>


    <!-- 
        ((3)) Read the files and create the objects to be passed to stage that writes to
        the database and to the stage that writes the data to
        HTML files.

        WARNING:  The value for htmlPipelineKey in the stage declaration here
        must exactly match the branch pipeline key further down in this file.
    -->
    <stage className="com.demo.pipeline.stages.FileReaderStage"
        driverFactoryId="df1"
        htmlPipelineKey="sales2html"/>


    <!-- 
        ((4)) Write the data from the FileReaderStage stage into the database.
    -->
    <stage className="com.demo.pipeline.stages.DatabaseWriterStage"
        driverFactoryId="df1">

        <datasource user="test"
        password="abc123"
        type="oracle"
        host="brain.demo.com"
        port="1521"
        database="SALES" />

        <database-proxy className="gov.noaa.gdsg.sql.oracle.OracleDatabaseProxy" />

        <tablePath path="summary.inventory" /> <!-- ((13)) -->
    </stage>


    <!-- 
        Write the data from the FileReaderStage stage to HTML files.

        The outputFilePath is the path to which we will be writing our summary HTML files.

        WARNING:  The value for the branch pipeline key declaration here must
        exactly match the htmlPipelineKey in the FileReaderStage stage in this file.
    -->
    <branch>
        <pipeline key="sales2html"> <!-- ((10)) --><env>
                <value key="division">West</value> <!-- ((14)) -->
            </env>

            <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
                id="df2">
                <property propName="queueFactory"
                    className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
                    capacity="4" fair="false"/>
            </driverFactory>


            <!-- ((5)) HTMLWriterStage -->
            <stage className="com.demo.pipeline.stages.HTMLWriterStage"
                driverFactoryId="df2"
                outputFilePath="/data/OUTPUT/web"/> <!-- ((12)) -->


            <!-- ((6)) StatPlotterStage -->
            <stage className="com.demo.pipeline.stages.StatPlotterStage"
                driverFactoryId="df2"
                outputFilePath="/data/OUTPUT/web"/> <!-- ((12)) --></pipeline>
    </branch>

</pipeline>

 

 

注:在这个例子中配置为“West” 的常量“division”,定义在两个地方。在主pipeline和分支pipeline都是相同的值。这是因为分支不共享相同的环境常数。

    该driverFactories“DF1”和“DF2”通过指定ArrayBlockingQueueFactory覆盖默认queueFactory。设置容量为4个对象,这样做是为了限制使用DF1或DF2的stage的队列大小。这通常是要限制pipeline使用的资源,并且是必要的,防止无界队列使用了所有可用的java存储或超过了所允许打开的文件句柄的数量。创建队列之后的队列大小不能被改变。因为只有一个线程正在访问的队列,公平属性可以被设置为“false”。如果公平=“true”,则有额外的开销,以确保访问队列中的所有线程的顺序处理(FIFO)。

  • 大小: 88.8 KB
  • 大小: 184 KB
1
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics