`
raymond.chen
  • 浏览: 1418102 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

用Spring Cloud Stream构建消息驱动的微服务

 
阅读更多

Spring Cloud Stream 是一个用于构建“基于事件驱动的、与共享消息系统相连接的高度可扩展微服务”的框架,并提供了许多抽象和原语,以简化Spring生态系统消息驱动应用程序的开发。

 

核心概念

    Spring Cloud Stream的应用程序模型

          应用程序通过inputs或者outputs来与Binder交互,其通过配置来绑定,Binder负责与中间件交互。

     Binder抽象

          提供与外部消息中间件集成的组件。

          目前只提供了RabbitMQ和Kafka的Binder实现。

          通过使用它所提供的扩展API来实现其他中间件的Binder。

     持久的发布-订阅模型支持

          消息通信方式遵循发布-订阅模式。

     消费者组支持

          当一个应用程序不同实例放置在一个具有竞争关系的消费组中,组里面的实例中只有一个能够消费消息。

          消费者类型:

                 Message-driven (消息驱动型,有时简称为异步)

                 Polled (轮询型,有时简称为同步)

     分区支持

           分区的作用就是为了确保具有共同特征标识的数据由同一个消费者实例进行处理。

     可拔插的Binder API

 

Spring Cloud Stream 提供了三个绑定消息通道的默认实现

       Sink:通过指定消费消息的目标来标识消息使用者的约定。

       Source:与Sink相反,用于标识消息生产者的约定。

       Processor:集成了Sink和Source的作用,标识消息生产者和使用者。

 

       也可以自定义消息通道:

public interface OrderOutputChannel {
	String OUTPUT = "output";
	
	@Output(OrderOutputChannel.OUTPUT)
	MessageChannel output();
}

public interface OrderInputChannel {
	String INPUT = "input";
	
	@Input(OrderInputChannel.INPUT)
	SubscribableChannel input();
}

 

创建消息生产者工程

     pom.xml的关键配置:

<parent>
  	<groupId>org.springframework.boot</groupId>
  	<artifactId>spring-boot-starter-parent</artifactId>
  	<version>2.0.7.RELEASE</version>
  	<relativePath/>
  </parent>

  <dependencies> 
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
  	<dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  </dependencies>
  
  <dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Finchley.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
  </dependencyManagement>

 

    application.yml文件的配置信息:

server:
  port: 5502
  
spring:
  application:
    name: service-stream-sender
  rabbitmq:
    host: 192.168.134.134
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        output: #通道名
          destination: order  #目的地
          content-type: application/json  #消息格式
          group: default  #消费组名

 

    启动类:

@SpringBootApplication
@RestController
@EnableBinding(OrderOutputChannel.class) //启用与消息通道的绑定
public class Main {
	/**
	 * 此处使用自定义的消息通道
	 */
	@Autowired
	private OrderOutputChannel outputChannel;
	
	public static void main(String[] args) {
		SpringApplication.run(Main.class, args);
	}
	
	@GetMapping("/index")
	public String index(){
		//将消息通过channel发送到目的地
		User user = new User("cjm", "123");
		Message<User> message = MessageBuilder.withPayload(user).build();
		outputChannel.output().send(message); //Bean对象会转成json字符串存储到目的地
		
		return "service stream sender";
	}
}

 

 创建消息消费者工程

     pom.xml关键配置:

           参考消息生产者工程。

 

     application.yml文件的配置信息:

server:
  port: 5501
  
spring:
  application:
    name: service-stream-receiver
  rabbitmq:
    host: 192.168.134.134
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input: #通道名
          destination: order  #目的地
          content-type: application/json  #消息格式
          group: default  #消费组名,添加group后队列就是持久化的了

 

    启动类:

@SpringBootApplication
@RestController
@EnableBinding({OrderInputChannel.class}) //启用与消息通道的绑定
public class Main {
	private String message = "";
	
	public static void main(String[] args) {
		SpringApplication.run(Main.class, args);
	}
	
	@GetMapping("/index")
	public String index(){
		return "service stream receiver: " + message;
	}
	
	/**
	 * 监听指定通道,通过该通道接收指定目的地的消息
	 */
	@StreamListener(OrderInputChannel.INPUT)
    public void receive(String payload) {
		message = payload;
        System.out.println("Received1: " + payload);
    }
	
	/**
	 * 将json格式的消息转成User对象
	 */
	@StreamListener(OrderInputChannel.INPUT)
    public void receive2(User user) {
		System.out.println(user.getClass().getName());
        System.out.println("usernaem=" + user.getUsername() + ", password=" + user.getPassword());
    }
}

 



 

  • 大小: 18.5 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics