Spring社区上月发布了基于事件驱动的异步框架 reactor。当前reactor还在密集研发中,代码几天大变样,非常的不稳定,这2周还完全重写了它自己的reactor-tcp。sample code也跟不上。这里只选取其其中最最基本和核心的功能,让大家先睹为快。
一:核心:基于事件驱动。
事件注册:
//初始化环境变量,若无,会从默认的读取classpath下默认的default.properties 文件
Environment env = new Environment();
//创建一个Reactor,指定的dispatcher为eventLoop,(dispatcher 种类下面会再细说)
Reactor reactor = R.reactor().using(env).dispatcher("eventLoop").get();
//注册一个名为event的事件,事件为Consumer
reactor.on($("event"), new Consumer<Event<String>>() {
@Override
public void accept(Event<String> t) {
System.out.println("This is sample code" + t.getData());
}
});
事件发布(或者事件触发)
//传入一个String事件,事件的值为Hello World!
reactor.notify("event", new Event<String>("Hello World!"));
运行后的结果为: This is sample code!
由此一个最最简单的事件驱动程序就完成了。
二. Composable的使用
Reactor里另外一个核心的功能就是Composable,当一个业务有很多步阻塞调用的时候,在不用callback的情况下,可以应用Composable将其中的阻塞调用,分割成异步非阻塞执行。
我们想象一个应用场景,一个账号服务,需要根据一个accountId来拿到一个account的余额。业务上可能需要这么几步:
a) 验证是否是有效用户 --> 调用authService.isValidUser
b) 根据accountId拿到整个account信息 -->accountService.getAccountById
c) 传入account获得account balance --> billService.getAccountBalance
那么代码怎么实现的。最最简单的直观的就是顺序阻塞调用。
代码A
public Long getAccountBalance1(Integer accountId){
boolean auth = authService.isValidUser(accountId);
if(auth){
Account account = accountService.getAccountById(accountId);
if(account!=null){
return billService.getAccountBalance(account);
}
else{
return -1l;
}
}
else{
return -1l;
}
}
先不论代码的逻辑正确性,上述的代码肯定是吞吐量不高。为求改善,现改为异步调用。最简单的异步就是加callback
代码B:
public Long getAccountBalance1(Integer id){
final Integer accountId = id;
final Future<Long> balanceFuture;
authService.isValidUser(accountId, new ICallback(){
@Override
public void callback(Object value) {
boolean auth = (boolean)value;
if(auth){
accountService.getAccountById(accountId, new ICallback(){
@Override
public void callback(Object value) {
if(value!=null){
Account account = (Account)value;
Long balance = billService.getAccountBalance(account);
balanceFuture.setValue(balance);
}
}
});
}
}
});
return balanceFuture.get(3000,TimeUnit.MILLISECONDS);
}
这断代码没有测试过。但是可以明显的看到这里用了嵌套的callback,代码很难看,不优雅,并对代码有侵入(需要有一个callback的同名接口)。
那么用了Reactor Composable的代码又将会是如何呢,请看:
public Long getAccountBalance(Integer id){
final Integer accountId = id;
Composable<Long> c = S.each(Arrays.asList(accountId))
.using(new Environment())
.dispatcher("eventLoop")
.get()
.map(new Function<Integer, Boolean>(){
@Override
public Boolean apply(Integer accountId) {
return authService.isValidUser(accountId);
}
})
.map(new Function<Boolean,Account>(){
@Override
public Account apply(Boolean auth) {
if(auth){
return accountService.getAccountById(accountId);
}
return null;
}
}).map(new Function<Account,Long>(){
@Override
public Long apply(Account account) {
return billService.getAccountBalance(account);
}
});
try {
return c.await(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return -1l;
}
}
显然代码优雅了很多,没有代码侵入。但这些都不是关键。
关键是业务中的,a,b,c都被分割成一个独立的事件注册到了reactor,每一步都是独立的线程执行,并且在这个列子中,是顺序执行的,线程安全。
三:Dispatcher
最后这里再详细介绍下之前提到的Reactor的Dispatcher,dispatcher是reactor的核心,顾名思义,就是一个分发器,用作事件的分发,当一个事件到达,(即Reacot.notify被调用)最终会由dispatcher进行任务分发调度(dispatcher.dipatch)
Dispatcher根据线程和队列分为下面几种dispatcher,系统现在默认为SynchronousDispatcher(不知道以后会不会变,之前默认的是BlockingQueueDispatcher)
SynchronousDispatcher
当一个事件到达时,直接由reactor所在的线程直接执行
BlockingQueueDispatcher(eventloop)
事件到达时先存储在一个Blockingqueue中,再由统一的后台线程一一顺序执行
ThreadPoolExecutorDispatcher(threadpool)
事件达到时将事件交由线程池统一调度。该线程池为固定大小线程池,(Executors.newFixedThreadPool)线程大小由配置文件指定。
RingBufferDispatcher(ringbuffer)
该dispatcher是吞吐量最高,使用了名头比较响的lmax的Disruptor构建的ringBuffer作为事件存储数组,其实就是一个不断递增,并可覆盖之前值环。(大家可以关注下lmax disruptpr)还是比较有意思的。
分享到:
相关推荐
spring 最近(2013年5月30日)发布了 spring 的异步事件框架 Reactor. 符合高性能 .耦合性低. 易于使用的原则. 因为刚推出,官方没有 和spring mvc中使用的示例.在写博客的时候发现无法上传附件.放到csdn这里.
我想看看新的闪亮(好吧,不是真的)Spring Reactor与Vert.x的比较 这两个软件包都是用Groovy编写的。 这个想法是将9 hash get命令发送给redis。 这是通过以下方式完成的: 在vert.x示例中,使用Vertx-Redis-...
Reactor 3参考文档,reactor 3是一个围绕Reactive Streams规范构建的库,它在JVM上引入了响应式编程的一个范例。目前Spring5 引入的Webflux就是reactor 3实现的一个响应式web框架
杜万(倚贤)专家从事了 12 年 Java 语言为主的软件开发工作,热衷于整合框架与开发工具,Linux拥趸,问题终结者。合作翻译《Elixir 程序设计》。目前负责阿里云函数计算的工具链开发,正在实践 WebFlux 和 Reactor ...
SpringFramework5 包含响应流(定义响应性API的语言中立尝试)和 Reactor(由Spring Pivotal团队提供的 Reactive Stream 的Java实现), 以用于其自身的用途以及其许多核心API。 Spring Web Reactive 在 spring-...
Reactor3中文帮助文档,帮助初学者了解非阻塞响应式式框架的使用以及原理,帮助了解spring webflux的一些底层实现原理
Spring Cloud Gateway是Spring官方基于Spring 5.0,Spring Boot 2.0和Project Reactor等技术开发的网关,Spring Cloud Gateway旨在为微服务...使用的Webflux中的reactor-netty响应式编程组件,底层使用了Netty通讯框架
Spring Reactor 的通用 Java 库,用于导出/下载或转储各种格式的文件,如 CSV、Excel、PDF 等。 了解更多! 了解更多! 为什么是通用文件导出库? 在大多数应用程序中,存在从从数据库或任何其他来源查询的数据集中...
该项目的目的是跟上Spring框架的发展 内容: -Spring 5 Frawework的测试驱动器(对Spring Boot的最小关注) 参考: 在线的 图书 一,科斯米纳(2019年12月11日)。 Pivotal认证的专业核心Spring 5开发人员考试:...
中间层的业务代码由Reactive Stream方式管理,Reactive Streams默认采用Reactor框架,同时还支持另一款相对庞大的Reactive Stream框架RxJava。可以说WebFlux框架是非常灵活的,选择WebFlux作为响应式异步网络编程是...
特点 1.webflux是一个异步非阻塞的Web框架,它能够充分利用多核CPU的硬件资源去处理大量的并发请求 2.内部使用的是响应式编程,以Reactor库为基础,基于异步和事件驱动,可以让我们在不扩充硬件资源的前提下,提升...
"spring-framework-5.2.8.RELEASE"是Spring Framework的一个发布版本,其中包含了许多更新和改进。...Spring Boot集成:与Spring Boot框架更好地集成,通过自动配置和快速启动器简化了应用程序的开发和部署过程。
React堆净值 Reactor Netty提供了基于Netty框架的非阻塞且可支持背压的TCP / HTTP / UDP客户端和服务器。在做了Reactor Netty需要Java 8或+才能运行。 使用来自或Maven Central存储库的Gradle (仅稳定版本): ...
功能,特性 基于r2dbc ,easy-orm 的通用响应式CRUD H2,Mysql,SqlServer,PostgreSQL 响应式r2dbc事务控制 响应式权限控制,以及权限信息... 在使用hsweb之前,你应该对 project-reactor , spring-boot 有一定的了解.
java ## 核心特性 #### 开放源代码 全部源代码开放,可自由拓展功能,不再受制于人.... #### 统一设备接入,海量设备管理 ...灵活的规则模型配置,支持...4. [Project Reactor](https://projectreactor.io/) 响应式编程框架
本书涵盖了以下激动人心的功能:使用AngularJS,Bootstrap模板和jQuery构建UI了解Spring WebFlux框架以及它如何使用Reactor库与Elasticsearch交互以对数据进行索引,查询和聚合使用Spring Security和Spring ...
3、MqttBroker(支持集群化部署)基于Netty、Reactor3、Reactor-netty。 4、注册中心、配置中心选型Nacos,权限认证使用Redis。 5、流量控制框架选型Sentinel,分布式事务选型Seata。 6、时序数据
后端采用Spring Boot、Spring Cloud & Alibaba。MqttBroker(支持集群化部署)基于Netty、Reactor3、Reactor-netty。注册中心、配置中心选型Nacos,权限认证使用Redis。流量控制框架选型Sentinel,分布式事务选型Seata...
主要介绍spring5 webflux模块相关的内容,包括spring5的一些新特性,webflux的简单使用,异步常见的处理方式,webclient处理微服务的一些场景;Reactive-Streams 编程的实现规范,目前主流的一些响应式编程框架;...