`

rabbitmq 使用

    博客分类:
  • java
阅读更多

API模块接收请求,推送到消息队列

router模块消费消息,分发到各个模块

每个模块消费消息,再推回API模块,因为API模块需要知道最终执行结果

 

 

API模块配置:

# API module: Spring Cloud Stream bindings for sending agent-state-list requests
# and for receiving the replies that are pushed back to this module.
spring:

  cloud:

    stream:

      bindings:

        outbound-agent-state-list.destination: outbound.agent-state-list   # producer side

        agent-state-list-reply-channel:                                    # consumes the replies sent back to this module

          destination: outbound.agent-state-list-reply

          # Consumer group comes from the nodeNo placeholder and falls back to 0 when it is not set.
          group: ${nodeNo:0}

          # NOTE(review): durableSubscription and maxPriority look like RabbitMQ-binder consumer
          # properties; confirm they take effect here under `bindings` rather than `rabbit.bindings`.
          durableSubscription: false

          consumer.maxPriority: 10

      rabbit.bindings:

        # The doubled single quotes are YAML escaping: the value is the SpEL string literal 'router'.
        outbound-agent-state-list.producer.routing-key-expression: '''router'''

        agent-state-list-reply-channel.consumer.durableSubscription: false

 

@Component

public interface OutboundOutputChannels {

String OUTBOUND_AGENT_STATE_LIST = "outbound-agent-state-list";

 

@Output(value = OUTBOUND_AGENT_STATE_LIST)

    MessageChannel agentStateListOutput();

}

 

@Component

public interface OutboundInputChannels {

 

String OUTBOUND_AGENT_STATE_LIST_REPLY = "agent-state-list-reply-channel";

 

@Input(value = OUTBOUND_AGENT_STATE_LIST_REPLY)

    SubscribableChannel agentStateListReply();

}

 

接收回调消息并处理业务逻辑

@Slf4j

@RequiredArgsConstructor

@EnableBinding(OutboundInputChannels.class)

public class OutboundReplyMonitor extends DestroyableMonitor {

 

@StreamListener(OutboundInputChannels.OUTBOUND_AGENT_STATE_LIST_REPLY)

    public void agentStateListReply(AgentStateListReplyDTO payload) {

        countUp();

        agentStateService.processAgentStateListReply(payload);

        countDown();

    }

}

 

API接口入口

@Slf4j

@RestController

@RequiredArgsConstructor

@RequestMapping("/api")

@Api(value = "外呼接口", tags = "精准智能请求人工及人工接起")

public class AgentStateController {

 

    private final AgentStateService agentStateService;

 

    /**

     * 5.9 精准智能请求人工及人工接起

     * 调用商路通服务,获取所有坐席首页数据

     *

     * @param agentStateRequestDTO 请求参数

     * @return 返回ResponseServer

     */

    @ApiOperation(value = "用商路通服务,获取所有坐席首页数据", httpMethod = "POST")

    @PostMapping("/agentStateList")

    public ApiResponse agentStateList(@Validated @RequestBody AgentStateListRequestDTO agentStateRequestDTO, @RequestHeader(value = WebConstant.PRIORITY, required = false, defaultValue = "2") Integer priority) throws Exception {

        log.info("请求:调用商路通服务,获取所有坐席首页数据,param:{}", agentStateRequestDTO.toString());

        return agentStateService.agentStateList(agentStateRequestDTO, priority);

    }

 

}

 

API接收请求后,数据校验,发送请求到消息队列,并等待消息的响应

@Slf4j

@Service

@RequiredArgsConstructor

public class AgentStateService {

 

    private final MessageService messageService;

 

    private final CallSupplierTypeRelationService callSupplierTypeRelationService;

 

    private final CallSupplierService callSupplierService;

 

    private final CallTypeService callTypeService;

 

    protected final ApplicationProperties properties;

 

    private final Map<String, CompletableFuture<ApiResponse>> queryFutureMap = new ConcurrentHashMap<>();

 

    public ApiResponse agentStateList(AgentStateListRequestDTO queue, Integer priority) throws Exception {

        long startTime = System.currentTimeMillis();

 

        String businessId = SltBusinessEnum.getNameByCallType(queue.getCallType());

        if (StringUtils.isEmpty(businessId)) {

            throw new BusinessException("商路通不支持的外呼类型:" + queue.getCallType());

        }

 

        CallSupplierPO supplierPO = callSupplierService.findByCode(queue.getSupplier());

        if (supplierPO == null) {

            throw new BusinessException("供应商不存在:" + queue.getSupplier());

        }

 

        CallTypePO callTypePO = callTypeService.findByCode(queue.getCallType());

        if (callTypePO == null) {

            throw new BusinessException("外呼类型不存在:" + queue.getCallType());

        }

 

        CallSupplierTypeRelationPO callSupplierTypeRelationPo = callSupplierTypeRelationService.findBySupplierIdAndCallTypeId(supplierPO.getId(), callTypePO.getId());

        if (null == callSupplierTypeRelationPo) {

            throw new BusinessException("供应商外呼类型关系不存在");

        }

        queue.setCallType(callTypePO.getCode());

        queue.setSupplierCode(supplierPO.getCode());

        queue.setQueueId(StringCheckUtil.uuid());

        messageService.sendToAgentStateListChannel(queue, priority);

        CompletableFuture<ApiResponse> future = new CompletableFuture<>();

        queryFutureMap.put(queue.getQueueId(), future);

 

        long maxWaitMillis = properties.getMaxWaitMillis().toMillis();

        Duration duration = properties.getMaxWaitMillis().minusMillis(System.currentTimeMillis() - startTime);

        if (duration.isNegative() || duration.isZero()) {

            log.info("调用商路通服务,获取所有坐席首页数据,已超过最大等待时间{}ms,param:{}", properties.getMaxWaitMillis().toMillis(), queue.toString());

            throw new RequestTimeoutException(maxWaitMillis, queue.getQueueId());

        }

        log.debug("等待返回调用商路通服务,获取所有坐席首页数据请求结果,允许等待时间{}ms", duration.toMillis());

        try {

            return future.get(duration.toMillis(), TimeUnit.MILLISECONDS);

        } catch (TimeoutException e) {

            throw new RequestTimeoutException(maxWaitMillis, queue.getQueueId());

        } finally {

            queryFutureMap.remove(queue.getQueueId());

        }

    }

 

    public void processAgentStateListReply(AgentStateListReplyDTO payload) {

        String queueId = payload.getQueueId();

        log.info("API模块-接收-调用商路通服务,获取所有坐席首页数据结果,{}", payload.toString());

        if (queryFutureMap.containsKey(queueId)) {

            queryFutureMap.get(queueId).complete(ApiResponse.success(payload.getRows()));

        } else {

            log.debug("未找到queueId记录:{}", queueId);

        }

    }

}

 

 

停止服务前,检查是否有消息正在处理

@Slf4j

public class DestroyableMonitor {

 

    private static final Integer MAX_WAIT_COUNT = 20;

 

    private AtomicLong messageCount = new AtomicLong();

 

    protected Long countUp() {

        return messageCount.incrementAndGet();

    }

 

    protected Long countDown() {

        return messageCount.decrementAndGet();

    }

 

    @PreDestroy

    private void tearDown() throws InterruptedException {

        int waitCount = 0;

        while (messageCount.get() > 0 && waitCount++ < MAX_WAIT_COUNT) {

            log.info("正在关闭消息监听程序{},等待3秒[{}/{}]...", this.getClass().getCanonicalName(), waitCount, MAX_WAIT_COUNT);

            Thread.sleep(3000L);

        }

        if (messageCount.get() > 0) {

            log.warn("应用非安全关闭,当前仍有{}条正在处理的消息", messageCount.get());

        }

    }

}

 

 

 

router模块队列配置:

# Router module: consumes requests from outbound.agent-state-list and republishes them to the
# same destination under a different routing key per target module.
spring.cloud.stream:

  bindings:

    agent-state-list-input-channel:

      destination: outbound.agent-state-list

      group: router

      consumer:

        # NOTE(review): maxAttempts 1 presumably disables binder-level retries; confirm intended.
        maxAttempts: 1

        concurrency: 10

    # Both output channels publish to the destination the router itself consumes from;
    # only the routing keys configured below tell the messages apart.
    agent-state-list-slt-acv-channel.destination: outbound.agent-state-list

    agent-state-list-slt-ivr-channel.destination: outbound.agent-state-list

  rabbit.bindings:

    # Matches the 'router' routing key the API module publishes with.
    agent-state-list-input-channel.consumer.bindingRoutingKey: router

    # The doubled single quotes are YAML escaping: the values are the SpEL literals 'slt-acv' / 'slt-ivr'.
    agent-state-list-slt-acv-channel.producer.routing-key-expression: '''slt-acv'''

    agent-state-list-slt-ivr-channel.producer.routing-key-expression: '''slt-ivr'''

 

 

@Component

public interface RouterInputChannels {

String AGENT_STATE_LIST_INPUT_CHANNEL = "agent-state-list-input-channel";

 

@Input(value = AGENT_STATE_LIST_INPUT_CHANNEL)

    SubscribableChannel agentStateListInput();

}

 

@Component

public interface RouterOutputChannels {

String AGENT_STATE_LIST_SLT_ACV = "agent-state-list-slt-acv-channel";

    String AGENT_STATE_LIST_SLT_IVR = "agent-state-list-slt-ivr-channel";

 

    @Output(AGENT_STATE_LIST_SLT_ACV)

    MessageChannel agentStateListSltACR();

 

    @Output(AGENT_STATE_LIST_SLT_IVR)

    MessageChannel agentStateListSltIVR();

}

 

router 模块监听

@Slf4j

@Component

@EnableBinding(RouterInputChannels.class)

@RequiredArgsConstructor

public class AgentStatusListMonitor extends DestroyableMonitor {

    private final AgentStateListService agentStateListService;

 

    @StreamListener(RouterInputChannels.AGENT_STATE_LIST_INPUT_CHANNEL)

    public void onMessage(AgentStateListRequestDTO req) {

        countUp();

        try {

            log.info("AgentStatusListMonitor收到请求,调用商路通服务,获取所有坐席首页数据:查询参数:{}", req.toString());

            agentStateListService.process(req);

        } finally {

            countDown();

        }

    }

}

 

router业务处理和消息分发

@Slf4j

@Service

@RequiredArgsConstructor

public class AgentStateListService {

 

    private final AgentStateListMessageService agentStateListMessageService;

 

    private final RouterOutputChannels routerOutputChannels;

 

    public void process(AgentStateListRequestDTO payload) {

        agentStateListMessageService.send(payload);

 

        Message<AgentStateListRequestDTO> message = MessageBuilder.withPayload(payload).build();

        switch (payload.getSupplier()) {

            case OutboundSupplierConstant.SLT:

                switch (payload.getCallType()) {

                    case OutboundTypeConstant.IVR:

                        routerOutputChannels.agentStateListSltIVR().send(message);

                        return;

                    case OutboundTypeConstant.ACV:

                        routerOutputChannels.agentStateListSltACR().send(message);

                        return;

                    default:

                        routerOutputChannels.agentStateListSltACR().send(message);

                        routerOutputChannels.agentStateListSltIVR().send(message);

                        return;

                }

            default:

                log.error("调用商路通服务,获取所有坐席首页数据的消息转发失败,供应商不支持,[payload:{}]", payload);

        }

    }

}

 

分发后的模块

base模块

application-slt.yml

# Base (SLT) module, application-slt.yml: the reply output channel publishes to the
# destination the API module consumes its replies from.
spring.cloud.stream:

  bindings:

    agent-state-list-reply-channel.destination: outbound.agent-state-list-reply

 

@Component

public interface SltBoundInputChannels {

String AGENT_STATE_LIST_CHANNEL = "agent-state-list-channel";

 

@Input(value = AGENT_STATE_LIST_CHANNEL)

    SubscribableChannel agentStateListSltAcvInput();

}

 

@Component

public interface SltBoundOutputChannels {

String AGENT_STATE_LIST_REPLY_CHANNEL = "agent-state-list-reply-channel";

 

@Output(AGENT_STATE_LIST_REPLY_CHANNEL)

    MessageChannel agentStateListReplyOutput();

}

 

base模块的monitor

@Slf4j

@Component

@RequiredArgsConstructor

@EnableBinding(SltBoundInputChannels.class)

public class SltAgentInfoMonitor extends DestroyableMonitor {

 

    private final SltAgentStateListService sltAgentStateListService;

 

    @StreamListener(SltBoundInputChannels.AGENT_STATE_LIST_CHANNEL)

    public void process(Message<AgentStateListRequestDTO> message) {

        log.info("外呼平台:SLT,代理模块收到消息,调用商路通服务,获取所有坐席首页数据, params: {}", message.getPayload());

        countUp();

        sltAgentStateListService.process(message.getPayload());

        countDown();

    }

}

 

service实现了一些slt请求的方法

@Slf4j

@Service

public class SltAgentStateListService extends AbstractSltRequestServiceTemplate<AgentStateListRequestDTO, AgentStateListResponseDTO> {

 

    private final SltBoundOutputChannels sltBoundOutputChannels;

 

    public SltAgentStateListService(ApplicationProperties properties, ObjectMapper objectMapper, RestTemplateService restTemplateService, SltBoundOutputChannels sltBoundOutputChannels) {

        super(properties, objectMapper, restTemplateService);

        this.sltBoundOutputChannels = sltBoundOutputChannels;

    }

 

    @Override

    protected void handleException(Exception exception, AgentStateListRequestDTO payload) {

        log.info("外呼平台:SLT,代理模块执行,调用商路通服务,获取所有坐席首页数据-指令,调用异常, params: {}", payload, exception);

    }

 

    @Override

    public RequestDTO generateRequest(AgentStateListRequestDTO payload) {

        log.info("外呼平台:SLT,代理模块执行,调用商路通服务,获取所有坐席首页数据-指令,创建请求,params:{}", payload);

        SupplierRequestInfoDTO requestInfo = properties.getRequestInfo(payload.getSupplierCode());

        RequestDTO request = new RequestDTO();

        request.setAction(OUTBOUND_AGENT_STATE_LIST_ACTION);

        request.setStartTime(payload.getStartTime());

        request.setEndTime(payload.getEndTime());

        request.setBusinessID(SltBusinessEnum.getNameByCallType(payload.getCallType()));

        request.setBaseUrl(requestInfo.getBaseUrl());

        request.setUrl(requestInfo.getBaseUrl().concat(properties.getRequestUrlMap().get(OUTBOUND_AGENT_STATE_LIST_SERVLET)));

        request.setLoginUser(requestInfo.getUser());

        request.setLoginPwd(requestInfo.getPassword());

        return request;

    }

 

 

    @Override

    public void handleResponse(AgentStateListResponseDTO responseDTO, AgentStateListRequestDTO payload) {

        if (SltResponseCode.RESPONSE_SUCCESS_STR.equals(responseDTO.getReturnCode())) {

            processReply(responseDTO, payload);

        } else {

            log.warn("外呼平台:SLT,代理模块执行,调用商路通服务,获取所有坐席首页数据-处理响应,获取失败,params:{}, result:{}", payload, responseDTO);

        }

    }

 

    @Override

    protected TypeReference<AgentStateListResponseDTO> instanceReference() {

        return new TypeReference<>() {

        };

    }

    

//消息写会API模块

    private void processReply(AgentStateListResponseDTO responseDTO, AgentStateListRequestDTO payload) {

        List<AgentStateListJsonTO> rowsJsonObject = responseDTO.getRows();

        List<AgentStateListDTO> rowsList = new ArrayList<>();

        for (AgentStateListJsonTO e : rowsJsonObject) {

            AgentStateListDTO agentStateListDTO = new AgentStateListDTO();

            BeanUtils.copyProperties(e, agentStateListDTO);

            rowsList.add(agentStateListDTO);

        }

        AgentStateListReplyDTO reply = new AgentStateListReplyDTO();

        reply.setReturnCode(responseDTO.getReturnCode());

        reply.setMessage(responseDTO.getReturnMessage());

        reply.setRows(rowsList);

        reply.setQueueId(payload.getQueueId());

        reply.setTotal(responseDTO.getTotal());

        sltBoundOutputChannels.agentStateListReplyOutput().send(MessageBuilder.withPayload(reply).build());

    }

 

}

 

@Slf4j

public abstract class AbstractSltRequestServiceTemplate<T extends BaseQueue, K extends ResponseDTO> extends SltSendRequestService<K>

        implements IEncapsulationRequestEntityInterface<T, RequestDTO>, IParsingResponseBodyInterface<T, K> {

 

 

    public AbstractSltRequestServiceTemplate(ApplicationProperties properties, ObjectMapper objectMapper, RestTemplateService restTemplateService) {

        super(properties, objectMapper, restTemplateService);

    }

 

    public void process(T payload) {

        try {

            RequestDTO requestDTO = generateRequest(payload);

            K response = sendRequest(requestDTO);

            handleResponse(response, payload);

        } catch (Exception e) {

            handleException(e, payload);

        }

    }

 

    /**

     * 处理异常

     *

     * @param exception 异常对象

     * @param payload   传递数据

     */

    protected abstract void handleException(Exception exception, T payload);

}

 

下面几个类是对SLT的所有请求的封装

@Slf4j

@Component

@RequiredArgsConstructor

public class SltSendRequestService<T extends ResponseDTO> implements ISendRequestInterface<RequestDTO, T> {

 

    protected final ApplicationProperties properties;

    protected final ObjectMapper objectMapper;

    protected final RestTemplateService restTemplateService;

 

    ThreadLocal<Boolean> checkLogin = new ThreadLocal<>();

 

    /**

     * 发送请求

     *

     * @param request 请求参数

     * @return 响应实体

     * @throws Exception 异常

     */

    @Override

    public T sendRequest(RequestDTO request) throws Exception {

        HttpHeaders headers = new HttpHeaders();

        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);

        String requestParam = "param=".concat(objectMapper.writeValueAsString(request));

        HttpEntity<String> requestEntity = new HttpEntity<>(requestParam, headers);

        long startTime = System.currentTimeMillis();

        ResponseEntity<String> responseEntity = null;

        log.info("请求:[{}],header: {}", requestParam, headers.toString());

        try {

            responseEntity = restTemplateService.post(request.getUrl(), requestEntity, properties.getRequestTimeoutMaximum(), String.class);

            Assert.hasText(responseEntity.getBody(), "接口调用异常:商路通接口响应体为空");

            if (responseEntity.getStatusCode().equals(HttpStatus.OK)) {

                T response = objectMapper.readValue(responseEntity.getBody(), instanceReference());

                return checkResponse(response, request);

            } else {

                log.error("商路通外呼请求:接口调用失败 [statusCode:{},body:{}]", responseEntity.getStatusCodeValue(), responseEntity.getBody());

                throw new BusinessException(CommonEnum.REQUEST_EXCEPTION.getCode(), "商路通外呼请求:接口调用失败");

            }

        } finally {

            checkLogin.remove();

            long elapsedTime = System.currentTimeMillis() - startTime;

            if (responseEntity == null) {

                log.error("商路通外呼请求详情:[请求地址:[{}],请求体:[{}],响应体:null]", request.getUrl(), requestParam);

            } else {

                log.info("商路通外呼请求详情:[请求地址:[{}],请求时间:{},请求体:[{}],响应体:[{}]]",

                        request.getUrl(), elapsedTime, requestParam, responseEntity.getBody());

            }

        }

    }

 

    protected TypeReference<T> instanceReference() {

        return new TypeReference<T>() {

        };

    }

 

    /**

     * 校验响应体是否合法

     *

     * @param response 响应体对象

     * @param request  请求体对象

     * @return 响应体对象

     * @throws Exception 异常

     */

    T checkResponse(T response, RequestDTO request) throws Exception {

        if (StringUtils.equals(SltResponseCode.RESPONSE_TIME_OUT_STR, response.getReturnCode()) &&

                StringUtils.equals("超时", response.getReturnMessage())) {

            log.info("商路通外呼请求:接口请求响应码超时:[{}],调用登录接口", response.toString());

            return login(request);

        } else {

            return response;

        }

    }

 

    /**

     * 商路通登陆操作

     *

     * @param request 请求参数对象

     * @return 响应体对象

     * @throws Exception 异常

     */

    private T login(RequestDTO request) throws Exception {

        checkLogin();

        HttpHeaders headers = new HttpHeaders();

        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);

        String loginParam = "userId=".concat(request.getLoginUser()).concat("&md5pwd=").concat(request.getLoginPwd());

        HttpEntity<String> requestEntity = new HttpEntity<>(loginParam, headers);

        ResponseEntity<String> loginResponseEntity = restTemplateService.post(request.getBaseUrl().concat(properties.getRequestUrlMap().get(OUTBOUND_LOGIN)), requestEntity);

        log.info("商路通外呼请求:登陆接口响应:[body:{}]", loginResponseEntity.getBody());

        Assert.hasText(loginResponseEntity.getBody(), "接口调用异常:商路通登录接口响应体为空");

        LoginResponseDTO loginResponse = objectMapper.readValue(loginResponseEntity.getBody(), LoginResponseDTO.class);

        if (SltResponseCode.RESPONSE_SUCCESS.equals(loginResponse.getReturnCode())) {

            return sendRequest(request);

        } else {

            throw new BusinessException(CommonEnum.REQUEST_EXCEPTION.getCode(),

                    "商路通外呼请求:登录接口调用失败:".concat(Objects.requireNonNull(loginResponseEntity.getBody())));

        }

    }

 

    /**

     * 校验请求而否二次登陆

     */

    private void checkLogin() {

        if (checkLogin.get() == null || !checkLogin.get()) {

            checkLogin.set(true);

        } else {

            throw new BusinessException(CommonEnum.REQUEST_EXCEPTION.getCode(), "操作失败: 商路通登录接口已经调用过");

        }

    }

 

}

 

public interface ISendRequestInterface<T extends BaseRequestEntity,K extends BaseResponseEntity> {

 

 

    /**

     * 发送请求

     *

     * @param request 请求实体

     * @return 响应实体

     * @throws Exception  异常

     */

    K sendRequest(T request) throws Exception;

 

}

 

public interface IEncapsulationRequestEntityInterface<Q extends BaseQueue,T extends BaseRequestEntity> {

 

    /**

     * 初始化请求对象

     *

     * @param payload 消息队列传递请求相关参数信息

     * @return 请求参数

     */

     T generateRequest(Q payload) throws Exception;

 

}

 

/**
 * Empty base type used as the upper bound for request entities in
 * {@code ISendRequestInterface} and {@code IEncapsulationRequestEntityInterface}.
 */
@Data

public class BaseRequestEntity {

}

 

 

下面是IVR模块的示例,ACV模块省略

配置队列:

# IVR module: consumes from outbound.agent-state-list in group slt-ivr, bound with the
# slt-ivr routing key.
spring.cloud.stream:

  bindings:

    agent-state-list-channel:

      destination: outbound.agent-state-list

      group: slt-ivr

      consumer:

        # NOTE(review): maxAttempts 1 presumably disables binder-level retries; confirm intended.
        maxAttempts: 1

        concurrency: 10

  rabbit.bindings:

    agent-state-list-channel.consumer.bindingRoutingKey: slt-ivr

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics