`

坚持学习WF(11):工作流通信与队列

阅读更多

 

[置顶]坚持学习WF文章索引

WF 提供的通信模型是构建于队列系统的基础之上,我们可以使用自定义活动来注册以接收关于队列的消息,而宿主应用程序中的服务则发送关于队列的消息。自定义活动可以使用此模型来处理外部事件,也可以传递异步活动执行的完成。这样,您的活动可以先执行到某一点,然后等待激发因素的到来以便继续执行。下图描述了宿主应用程序中的代码与工作流中的代码(或活动)之间的通信模型。

CouQueue

下面这张图是WF类库中和队列相关的三个类:

QueueClass

为了使自定义活动能够侦听消息是否到达某个队列,我们通常有以下步骤:

1.使用WorkflowQueuingService 创建工作流队列,该类还提供了创建、查找或删除工作流队列所需的方法。一般我们会在自定义活动的 Initialize 或 Execute 方法中来实现。

2.自定义活动必须注册才能接收到这些通知,方法是在工作流队列自身中注册 QueueItemAvailable 事件。您可以使用 RegisterForQueueItemAvailable 方法为 QueueItemAvailable 事件注册一个订户。QueueItemAvailable事件用于通知订户项已经传送(以异步方式)至此 WorkflowQueue。在确保队列存在并注册事件后,当队列中有可用项目时,您的活动会得到通知,之后,您可以从队列中取出该项目并对其进行处理。

3.我们自定义活动要能够充当事件接收器的活动(如 HandleExternalEvent 活动),您还需要实现 IEventActivity 接口。如果您的活动要侦听事件,则此接口用于定义该活动的主要职责:

public interface IEventActivity 
{
    
void Subscribe(ActivityExecutionContext parentContext, IActivityEventListener<QueueEventArgs> parentEventHandler);
    
void Unsubscribe(ActivityExecutionContext parentContext, IActivityEventListener<QueueEventArgs> parentEventHandler);
    IComparable QueueName 
get; } 
}

WF中所有的通信的活动都实现了这个接口。

QueueName 属性必须返回 IComparable 值,消息加入队列时,它可以唯一地标识您的活动。对于用于将消息加入队列以通知工作流运行时的代码,也需要使用这同一个队列名。

通过此接口,能够命令活动在其执行前订阅事件并让活动知道何时取消订阅。在订阅和取消订阅方法中,该活动负责确保使用 QueueName 来创建队列并在处理结束时删除队列。此外,这也为您的活动能够向任何本地服务注册信息提供了机会,这些本地服务将代表活动来执行逻辑并通过将消息加入队列予以响应。

本地服务是您定义并从主机添加到工作流运行时的一个类,它可以被您的宿主代码、工作流或您的活动所利用。只要宿主应用程序处于运行状态,本地服务就能够维护事件处理程序或其他侦听程序,从而可通过将消息加入队列来确保相应的数据到达工作流。您传递给本地服务的信息应包括队列名和工作流实例 ID 的相关信息,以及该服务发起工作或向您的活动返回结果时所需的任何信息。

1.下面我们先来实现一个这样的自定义活动,利用该活动得到对列中的信息,然后在将该信息发送给宿主程序代码如下:

 

RequestResponseData.cs
<!--<br><br>Code highlighting produced by Actipro CodeHighlighter (freeware)<br>http://www.CodeHighlighter.com/<br><br>-->using System; 
using System.ComponentModel; 
using System.ComponentModel.Design; 
using System.Collections; 
using System.Drawing; 
using System.Linq; 
using System.Workflow.ComponentModel; 
using System.Workflow.ComponentModel.Design; 
using System.Workflow.ComponentModel.Compiler; 
using System.Workflow.ComponentModel.Serialization; 
using System.Workflow.Runtime; 
using System.Workflow.Activities; 
using System.Workflow.Activities.Rules; 
using System.Collections.Generic; 

namespace CaryQueue 

    [Designer(
typeof(SequentialActivityDesigner),typeof(IDesigner))] 
    
public partial class RequestResponseData: SequenceActivity,IActivityEventListener<QueueEventArgs>,IEventActivity 
    

        
public RequestResponseData() 
        

            InitializeComponent(); 
        }
 

        
Properties#region Properties 

        
public static DependencyProperty OutputValuesProperty = System.Workflow.ComponentModel.DependencyProperty.Register("OutputValues"typeof(Dictionary<stringstring>), typeof(RequestResponseData)); 

        [Description(
"The values to be sent back to the host")] 
        [Category(
"Data")] 
        [Browsable(
true)] 
        [DesignerSerializationVisibility(DesignerSerializationVisibility.Visible)] 
        
public Dictionary<stringstring> OutputValues 
        

            
get 
            

                
return ((Dictionary<stringstring>)(base.GetValue(RequestResponseData.OutputValuesProperty))); 
            }
 
            
set 
            

                
base.SetValue(RequestResponseData.OutputValuesProperty, value); 
            }
 
        }
 
        
public static DependencyProperty InputValuesProperty = System.Workflow.ComponentModel.DependencyProperty.Register("InputValues"typeof(Dictionary<stringstring>), typeof(RequestResponseData)); 

        [Description(
"The data sent to the activity")] 
        [Category(
"Data")] 
        [Browsable(
true)] 
        [DesignerSerializationVisibility(DesignerSerializationVisibility.Visible)] 
        
public Dictionary<stringstring> InputValues 
        

            
get 
            

                
return ((Dictionary<stringstring>)(base.GetValue(RequestResponseData.InputValuesProperty))); 
            }
 
            
set 
            

                
base.SetValue(RequestResponseData.InputValuesProperty, value); 
            }
 
        }
 

        
#endregion
 

        
protected override ActivityExecutionStatus Execute(ActivityExecutionContext executionContext) 
        


            
if (ProcessMessage(GetQueue(executionContext, QueueName))) 
            

                
//let the base class run the children, and return status 
                return base.Execute(executionContext); 
            }
 
            
//if no items are there, then subscribe to get notified when they arrive 
            Subscribe(executionContext, this); 
            
return ActivityExecutionStatus.Executing; 
        }
 

        
IEvent and IActivityListener#region IEvent and IActivityListener 

        [Browsable(
false)] 
        
public IComparable QueueName 
        

            
get return "CaryQueue"; } 
        }
 

        
public void Subscribe(ActivityExecutionContext parentContext, IActivityEventListener<QueueEventArgs> parentEventHandler) 
        

            WorkflowQueue queue 
= parentContext.GetService<WorkflowQueuingService>().CreateWorkflowQueue(QueueName, false); 
            queue.RegisterForQueueItemAvailable(parentEventHandler); 

        }
 

        
public void Unsubscribe(ActivityExecutionContext parentContext, IActivityEventListener<QueueEventArgs> parentEventHandler) 
        

            WorkflowQueue q 
= GetQueue(parentContext, QueueName); 
            
if (q != null
            

                q.UnregisterForQueueItemAvailable(parentEventHandler); 
                parentContext.GetService
<WorkflowQueuingService>().DeleteWorkflowQueue(QueueName); 
            }
 
        }
 

        
public void OnEvent(object sender, QueueEventArgs e) 
        

            ActivityExecutionContext ctx 
= sender as ActivityExecutionContext; 

            
if (ProcessMessage(GetQueue(ctx, e.QueueName))) 
            

                
if (base.Execute(ctx) == ActivityExecutionStatus.Closed) 
                    ctx.CloseActivity(); 
            }
 
        }
 

        
#endregion
 

        
private WorkflowQueue GetQueue(ActivityExecutionContext context, IComparable queueName) 
        

            WorkflowQueuingService qService 
= context.GetService<WorkflowQueuingService>(); 
            
if (qService != null && qService.Exists(queueName)) 
                
return qService.GetWorkflowQueue(queueName); 
            
else 
                
return null
        }
 

        
private bool ProcessMessage(WorkflowQueue queue) 
        

            
if (queue == null || queue.Count == 0
                
return false

            MessageHelper msg 
= queue.Peek() as MessageHelper; 
            
if (msg != null && msg.InputValues != null
            

                InputValues 
= msg.InputValues; 
                Console.WriteLine(
"Request:"+msg.InputValues["inputvalueone"]); 
                Console.WriteLine(
"Request:" + msg.InputValues["inputvaluetwo"]); 
                
return true
            }
 

            
return false
        }
 

        
/**//// <summary> 
        
/// Called when the base class completes executing the 
        
/// child activities. Here we know all the children are complete. 
        
/// </summary> 
        
/// <param name="executionContext"></param> 

        protected override void OnSequenceComplete(ActivityExecutionContext executionContext) 
        

            
//pull the message from the queue and send the 
            
//response back to the host, signalling we are done. 
            WorkflowQueue q = executionContext.GetService<WorkflowQueuingService>().GetWorkflowQueue(QueueName); 
            MessageHelper msg 
= q.Dequeue() as MessageHelper; 
            msg.SendResponse(OutputValues); 

            
//clean up, we
分享到:
评论

相关推荐

    坚持学习WF,WF学习教程

    坚持学习WF(11):工作流通信与队列 WF 提供的通信模型是构建于队列系统的基础之上,我们可以使用自定义活动来注册以接收关于队列的消息,而宿主应用程序中的服务则发送关于队列的消息。自定义活动可以使用此模型来...

    坚持学习WF

    坚持学习WF(11):工作流通信与队列 WF 提供的通信模型是构建于队列系统的基础之上,我们可以使用自定义活动来注册以接收关于队列的消息,而宿主应用程序中的服务则发送关于队列的消息。自定义活动可以使用此模型来...

    .Net.Framework3.5开发技术详解

    .Net.Framework3.5开发...19.1 工作流(WF)简介 19.1.1 工作流的概念 19.1.2 WindowsWorkFlowFoundation 19.2 Activity(活动) 19.3 WF运行时 19.4 基于状态机的工作流 19.5 工作流的持久化 19.6 工作流的XAML表示

    .Net.Framework3.5开发技术详解[中文][PDF][VOL1]

    19.1 工作流(WF)简介 392 19.1.1 工作流的概念 392 19.1.2 WindowsWorkFlowFoundation 393 19.2 Activity(活动) 396 19.3 WF运行时 401 19.4 基于状态机的工作流 405 19.5 工作流的持久化 406 19.6 工作流的XAML表示...

    workflow资料

    WF资料 ├─Activity │ │ 1_Activity 类.doc │ │ 2_状态机与顺序工作流的继承结构.doc │ │ 3_顺序工作流容器 SequentialWorkflowActivity .doc │ │ EventDriven绑定容器 ...

    .Net.Framework3.5开发技术详解[中文][共二卷][PDF][VOL2]

    19.1 工作流(WF)简介 392 19.1.1 工作流的概念 392 19.1.2 WindowsWorkFlowFoundation 393 19.2 Activity(活动) 396 19.3 WF运行时 401 19.4 基于状态机的工作流 405 19.5 工作流的持久化 406 19.6 工作流的XAML表示...

Global site tag (gtag.js) - Google Analytics