`
zhangwei_david
  • 浏览: 468820 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Spring 基于事件的通信

 
阅读更多

Spring 应用上下文支持基于事件的Bean间通信。在基于事件的通信模式中,事件的发送者不需要关系,事件的监听者。这样可以使消息的发送者和监听者进行解耦。

在Spring中所有事件类必须继承自ApplicationEvent,这样任何bean都可以调用事件发布者的publishEvent()方法,发布一个事件。

 

 

 

public class MyEvent extends ApplicationEvent {

	/**  */
	private static final long serialVersionUID = 1L;


	/**
	 * @param source
	 */
	public MyEvent(Object source) {
		super(source);
		
	}

}

 

@Component("eventPublisher")
public class EventPublisher implements ApplicationEventPublisherAware {
	
	private ApplicationEventPublisher applicationEventPublisher;
	
	
	public void ckeckout(){
		applicationEventPublisher.publishEvent(new MyEvent(this));
	}

	/** 
	 * @see org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher)
	 */
	@Override
	public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
		this.applicationEventPublisher=applicationEventPublisher;
	}

}

 

@Component
public class MyListener implements ApplicationListener<MyEvent> {

	/** 
	 * @see org.springframework.context.ApplicationListener#onApplicationEvent(org.springframework.context.ApplicationEvent)
	 */
	@Override
	public void onApplicationEvent(MyEvent event) {
		System.out.println(Thread.currentThread().getName()+";"+event.getTimestamp());
	}

}

 

public class Test {

	/**
	 * 
	 * @param args
	 * @author zhangwei<wei.zw@corp.netease.com>
	 */
	public static void main(String[] args) {
		ApplicationContext context = new ClassPathXmlApplicationContext("spring.xml");
		EventPublisher cashier = (EventPublisher) context.getBean("eventPublisher");
		for(int i=0;i<20;i++) {
			cashier.ckeckout();
		}
	}

}

 

	<bean id="taskExecutor"
		class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
		<property name="corePoolSize" value="5" />
		<property name="keepAliveSeconds" value="30000" />
		<property name="maxPoolSize" value="1000" />
		<property name="queueCapacity" value="200" />
	</bean>
	<bean id="applicationEventMulticaster"
		class="org.springframework.context.event.SimpleApplicationEventMulticaster">
		<property name="taskExecutor" ref="taskExecutor" />
	</bean>

 

taskExecutor-1;1454033201241
taskExecutor-2;1454033201242
taskExecutor-3;1454033201242
taskExecutor-4;1454033201242
taskExecutor-5;1454033201243
taskExecutor-1;1454033201243
taskExecutor-3;1454033201243
taskExecutor-3;1454033201243
taskExecutor-2;1454033201243
taskExecutor-5;1454033201243
taskExecutor-1;1454033201243
taskExecutor-4;1454033201243
taskExecutor-3;1454033201243
taskExecutor-2;1454033201244
taskExecutor-5;1454033201244
taskExecutor-5;1454033201244
taskExecutor-4;1454033201244
taskExecutor-3;1454033201244
taskExecutor-2;1454033201244
taskExecutor-5;1454033201244

 

 

 

通过上面的一个示例,实现了异步的基于事件通信。

 

在AbstractApplicationContext中发布事件的实现如下,首先获取applicationEventMulticaster,通过其发布事件。

 

 

/**
	 * Publish the given event to all listeners.
	 * <p>Note: Listeners get initialized after the MessageSource, to be able
	 * to access it within listener implementations. Thus, MessageSource
	 * implementations cannot publish events.
	 * @param event the event to publish (may be application-specific or a
	 * standard framework event)
	 */
	public void publishEvent(ApplicationEvent event) {
		Assert.notNull(event, "Event must not be null");
		if (logger.isTraceEnabled()) {
			logger.trace("Publishing event in " + getDisplayName() + ": " + event);
		}
		getApplicationEventMulticaster().multicastEvent(event);
		if (this.parent != null) {
			this.parent.publishEvent(event);
		}
	}

     在看看ApplicaitonEventMulticaster的初始化逻辑,如果在有配置过applicationEventMulticaster则直接使用,否则创建一个;注意,配置是ID必须是applicationEventMulticaster

/**
	 * Initialize the ApplicationEventMulticaster.
	 * Uses SimpleApplicationEventMulticaster if none defined in the context.
	 * @see org.springframework.context.event.SimpleApplicationEventMulticaster
	 */
	protected void initApplicationEventMulticaster() {
		ConfigurableListableBeanFactory beanFactory = getBeanFactory();
		if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
			this.applicationEventMulticaster =
					beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
			if (logger.isDebugEnabled()) {
				logger.debug("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
			}
		}
		else {
			this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
			beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
			if (logger.isDebugEnabled()) {
				logger.debug("Unable to locate ApplicationEventMulticaster with name '" +
						APPLICATION_EVENT_MULTICASTER_BEAN_NAME +
						"': using default [" + this.applicationEventMulticaster + "]");
			}
		}
	}

 再看看ApplicationEventMulticaster中又是如何发布事件的。如果有配置线程池,则异步处理,否则同步处理。

 

public void multicastEvent(final ApplicationEvent event) {
		for (final ApplicationListener listener : getApplicationListeners(event)) {
			Executor executor = getTaskExecutor();
			if (executor != null) {
				executor.execute(new Runnable() {
					@SuppressWarnings("unchecked")
					public void run() {
						listener.onApplicationEvent(event);
					}
				});
			}
			else {
				listener.onApplicationEvent(event);
			}
		}
	}

 

 

 

1
0
分享到:
评论
3 楼 cywhoyi 2016-02-02  
我现在采用的是RXJava的方式,不过eventbus可能更加舒服点
2 楼 cywhoyi 2016-02-02  
我会是微博 写道
用于什么业务场景?

消息消费等渠道
1 楼 我会是微博 2016-02-01  
用于什么业务场景?

相关推荐

Global site tag (gtag.js) - Google Analytics