`

java多线程

阅读更多
1)
package com.sfpay.sypay.msp.process;

public interface ServiceHolder<T> {

T getService();
}



2)

package com.sfpay.sypay.msp.process.service.impl;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sfpay.sypay.msp.common.enums.StatusCode;
import com.sfpay.sypay.msp.dao.ICourierSfjSendDao;
import com.sfpay.sypay.msp.process.ServiceHolder;
import com.sfpay.sypay.order.service.IPrefBusinessService;
import com.sfpay.sypay.order.valueobject.dto.PrefBusinessRequest;
import com.sfpay.sypay.order.valueobject.dto.PrefBusinessResponse;

/**
* 非线支付,给收派员送顺丰金任务
*
* @author sfhq703
*
*/
public class PrefSenderRun implements Runnable {

private Logger logger = LoggerFactory.getLogger(PrefSenderRun.class);

private BlockingQueue<List<PrefBusinessRequest>> senderQueue = null;

private ServiceHolder<IPrefBusinessService> prefBusinessService;// 发送奖励对象

private ServiceHolder<ICourierSfjSendDao> courierSfjSendDao;

private AtomicBoolean isTodayTaskCompletePref = null;

private int runNum = 0;

private AtomicBoolean isExecutePref = null;

/**
*
* @param senderQueue
*            :下发队列
* @param voucherManagerService
*            :下游系统service
* @param courierSfjSendDao
*            :发送DAO
* @param isTodayTaskComplete
*            :调度是否完成标示
*/
public PrefSenderRun(BlockingQueue<List<PrefBusinessRequest>> senderQueue,
ServiceHolder<IPrefBusinessService> prefBusinessService,
ServiceHolder<ICourierSfjSendDao> courierSfjSendDao,
AtomicBoolean isTodayTaskCompletePref, int runNum,
AtomicBoolean isExecutePref) {
super();
this.senderQueue = senderQueue;
this.prefBusinessService = prefBusinessService;
this.courierSfjSendDao = courierSfjSendDao;
this.isTodayTaskCompletePref = isTodayTaskCompletePref;
this.runNum = runNum;
this.isExecutePref = isExecutePref;
}

@Override
public void run() {
while (true) {
try {
List<PrefBusinessRequest> sendParamList = senderQueue.take();
if (isExecutePref.get()) {
isExecutePref.set(false);
}
logger.info(String.format("%s开始发送如下的顺丰金数据%s", runNum,
sendParamList));

if (null != sendParamList && !sendParamList.isEmpty()) {
try {
// 发送接口
List<PrefBusinessResponse> prefBusinessResponses = prefBusinessService
.getService().sendPrefBusiness(sendParamList);

updateError(prefBusinessResponses);
updateFin(sendParamList);
} catch (Exception e) {
logger.error("执行非线支付,处理给收派员送顺丰金线程异常,runNum=" + runNum,
e);
}
}

if (isTodayTaskCompletePref.get() && senderQueue.isEmpty()) {
isExecutePref.set(true);
logger.info("********************非线支付,给收派员送顺丰金线任务执行完毕********************runNum="
+ runNum);
}
} catch (InterruptedException e) {
logger.error("非线支付,给收派员送顺丰金线程被中断,runNum=" + runNum, e);
} catch (Exception ex) {
logger.error("执行非线支付,给收派员送顺丰金线程异常,runNum=" + runNum, ex);
}
}
}

/**
* 批量更新成功
*
* @param prefBusinessRequests
*/
private void updateFin(List<PrefBusinessRequest> prefBusinessRequests) {
List<Long> list = new ArrayList<Long>();
if (prefBusinessRequests != null && !prefBusinessRequests.isEmpty()) {
for (PrefBusinessRequest prefBusinessRequest : prefBusinessRequests) {
if (StringUtils.isNotBlank(prefBusinessRequest.getMarketId())) {
list.add(Long.valueOf(prefBusinessRequest.getMarketId()));
}
}
}

workStatusUpdate(list, StatusCode.FIN.name());
}

/**
* 批量更新失败
*
* @param businessResponses
*/
private void updateError(List<PrefBusinessResponse> businessResponses) {
List<Long> list = new ArrayList<Long>();
if (businessResponses != null && !businessResponses.isEmpty()) {
for (PrefBusinessResponse prefBusinessResponse : businessResponses) {
if (StringUtils.isNotBlank(prefBusinessResponse.getMarketId())) {
list.add(Long.valueOf(prefBusinessResponse.getMarketId()));
}
}
}

workStatusUpdate(list, StatusCode.ERROR.name());
}

/**
* 更新赠送的明细
*/
private void workStatusUpdate(List<Long> list, String status) {
try {
logger.info(String.format("[%s]更新非线支付,给收派员送顺丰金状态[%s],更新数量:[%s]",
runNum, status, list.size()));
if (!list.isEmpty()) {
courierSfjSendDao.getService().updateCourierSfjSendStatus(
StatusCode.SENDING.name(), status, list);
}
} catch (Exception e) {
logger.error(
"courierSfjSendDao.updateCourierSfjSendStatus 修改非线支付,给收派员送顺丰金状态失败,runNum="
+ runNum, e);

}
}

/**
*
* 方法说明:<br>
* 获取本机ip
*
* @return
*/
public static String getHostAddr() {
String hostIp = "";
// 定义网络接口枚举类
Enumeration<NetworkInterface> allNetInterfaces;
try {
// 获得网络接口
allNetInterfaces = NetworkInterface.getNetworkInterfaces();
// 声明一个InetAddress类型ip地址
InetAddress ip = null;
// 遍历所有的网络接口
while (allNetInterfaces.hasMoreElements()) {
NetworkInterface netInterface = allNetInterfaces.nextElement();
// 同样再定义网络地址枚举类
Enumeration<InetAddress> addresses = netInterface
.getInetAddresses();
while (addresses.hasMoreElements()) {
ip = addresses.nextElement();
// InetAddress类包括Inet4Address和Inet6Address
if (ip != null && (ip instanceof Inet4Address)) {
hostIp = ip.getHostAddress();
if ("127.0.0.1".equals(hostIp)) {
hostIp = "";
}
}
}
}
} catch (Exception e) {
}
if (StringUtils.isEmpty(hostIp)) {
// 如果无法获取本机Ip,给默认内网ip
hostIp = "192.168.0.1";
}
return hostIp;
}
}

3)
package com.sfpay.sypay.msp.process.service;


/**
* 方法说明:非线支付,短信发送任务
*
* @author sfhq703
*
*/
public interface IFxSmsSendTaskService {

/**
* 方法说明:方法说明:非线支付,短信发送任务
*
* @return
*/
public void presentPrefSend();

}

4)

package com.sfpay.sypay.msp.process.service.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.sfpay.framework.config.properties.Property;
import com.sfpay.sypay.common.enums.CcyType;
import com.sfpay.sypay.common.enums.TradeType;
import com.sfpay.sypay.msp.common.enums.StatusCode;
import com.sfpay.sypay.msp.dao.ICourierSfjSendDao;
import com.sfpay.sypay.msp.process.ServiceHolder;
import com.sfpay.sypay.msp.process.service.IFxPrefSendTaskService;
import com.sfpay.sypay.msp.valueobject.dto.CourierSfjSend;
import com.sfpay.sypay.order.service.IPrefBusinessService;
import com.sfpay.sypay.order.valueobject.dto.PrefBusinessRequest;

/**
* 非线支付,给收派员送顺丰金
*
* @author sfhq703
*
*/
@Service
public class PrefSendTaskServiceImpl implements IFxPrefSendTaskService {

private Logger logger = LoggerFactory
.getLogger(PrefSendTaskServiceImpl.class);

/**
* 字段说明:批次个数
*/
private static final int BATCH_COUNT = 800;
/**
* 调用订单系统单次发送条数
*/
private int one_send_size = 0;
/**
* 发送队列
*/
private BlockingQueue<List<PrefBusinessRequest>> senderQueue = null;

@Resource
private ICourierSfjSendDao courierSfjSendDao;

@Resource
private IPrefBusinessService prefBusinessService;

private AtomicBoolean isTodayTaskCompletePref = new AtomicBoolean(true);

private AtomicBoolean isExecutePref = new AtomicBoolean(true);

/**
* 非线支付,给收派员送顺丰金实现构造函数
*/
public PrefSendTaskServiceImpl() {
int sender_thread = Integer.parseInt(Property
.getProperty("MSP_PRESENT_SENDER_THREAD_CNT"));
one_send_size = Integer.parseInt(Property
.getProperty("MSP_PRESENT_ONE_SEND_SIZE"));
senderQueue = new LinkedBlockingQueue<List<PrefBusinessRequest>>(
Integer.parseInt(Property
.getProperty("MSP_PRESENT_DATA_QUEUE_SIZE")));
ExecutorService exec = Executors.newFixedThreadPool(sender_thread);// 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
for (int i = 0; i < sender_thread; i++) {
exec.submit(new PrefSenderRun(senderQueue,
new ServiceHolder<IPrefBusinessService>() {
public IPrefBusinessService getService() {
return prefBusinessService;
}
}, new ServiceHolder<ICourierSfjSendDao>() {
public ICourierSfjSendDao getService() {
return courierSfjSendDao;
}
}, isTodayTaskCompletePref, i, isExecutePref));
}
}

/**
* 方法说明:非线支付,给收派员送顺丰金任务实现
*
* @return
*/
@Override
public void presentPrefSend() {
if (!isExecutePref.get()) {
logger.info("非线支付,给收派员送顺丰金任务执行中………………………………");
if (isTodayTaskCompletePref.get() && senderQueue.isEmpty()) {
isExecutePref.set(true);
logger.info("********************非线支付,给收派员送顺丰金线任务执行列表以被线程取完,可继续执行本次个调度********************");
} else {
return;
}
}
isExecutePref.set(false);
isTodayTaskCompletePref.set(false);
int pgIndex = 0;
List<CourierSfjSend> oneBatchList = new ArrayList<CourierSfjSend>();

List<CourierSfjSend> allDataList = null;
try {
courierSfjSendDao.updateCourierSfjSendStatus(
StatusCode.SENDING.name(), StatusCode.INIT.name(), null);
allDataList = courierSfjSendDao.findCourierSfjSendList(pgIndex,
BATCH_COUNT);
} catch (Exception e) {
logger.info("非线支付,给收派员送顺丰金任务,初始化数据异常:", e);
}
if (allDataList == null || allDataList.isEmpty()) {
isExecutePref.set(true);
isTodayTaskCompletePref.set(true);
logger.info("非线支付,给收派员送顺丰金任务,没有需要发送的数据");
return;
}
while (allDataList != null && !allDataList.isEmpty()) {
try {
for (CourierSfjSend awardResult : allDataList) {
oneBatchList.add(awardResult);
// 每ONE_SEND_SIZE次调用一次订单接口
if (oneBatchList.size() > one_send_size) {
workOneJob(oneBatchList);
oneBatchList.clear();
}
}
// 没有凑足 ONE_SEND_SIZE,剩下的直接放入发送队列吧
if (!oneBatchList.isEmpty()) {
workOneJob(oneBatchList);
oneBatchList.clear();
}

} catch (Exception e) {
logger.error("非线支付,给收派员送顺丰金异常", e);
throw new RuntimeException(e);
}
try {

allDataList = courierSfjSendDao.findCourierSfjSendList(pgIndex,
BATCH_COUNT);
} catch (Exception e) {
logger.info("非线支付,给收派员送顺丰金任务,取数据异常:", e);
}
}
isTodayTaskCompletePref.set(true);
}

/**
* 执行赠送逻辑
*
* @param resultList
* @throws Exception
* @return 返回赠送失败id
*/
public void workOneJob(List<CourierSfjSend> resultList) throws Exception {
List<PrefBusinessRequest> sendParamList = new ArrayList<PrefBusinessRequest>();
workStatusUpdate(resultList);
try {
// 批次更新发放状态
// 构造请求报文,调用订单系统发放奖品
for (CourierSfjSend courierSfjSend : resultList) {
// TODO封装数据
PrefBusinessRequest prefBusiness = new PrefBusinessRequest();
prefBusiness.setAmount(courierSfjSend.getSfjinAmt());
// prefBusiness.setActivityId();
prefBusiness.setCcy(CcyType.SFP);
prefBusiness.setLoginName(courierSfjSend.getMobile());
prefBusiness.setMemberNo(courierSfjSend.getMemberNo());
prefBusiness.setMarketId(courierSfjSend.getId() + "");
prefBusiness.setMerchantNo(Property
.getProperty("FX_PREF_MERCHANT_NO"));
prefBusiness.setOutBusinessNo(courierSfjSend.getId() + "");
prefBusiness.setSendMerchantNo(Property
.getProperty("FX_SND_MERCHANT_NO"));
prefBusiness.setBusinessType("FX");
prefBusiness.setPrefType(TradeType.POINT_PREF);
// prefBusiness.setOrderAmont();
// prefBusiness.setCostEntity();
prefBusiness.setRemark("电子支付奖励");
// prefBusiness.setCostSubjectsNo();
// prefBusiness.setCostSubjectsName();
sendParamList.add(prefBusiness);

}
if (!sendParamList.isEmpty()) {
logger.info(String.format("非线支付,给收派员送顺丰金放入发送队列,当前队列长度%s",
senderQueue.size()));
senderQueue.put(sendParamList);
}
} catch (Exception e) {
logger.error("非线支付,给收派员送顺丰金异常", e);
}
}

/**
* 更新赠送的明细
*/
private void workStatusUpdate(List<CourierSfjSend> courierSfjSends) {
try {
List<Long> list = new ArrayList<Long>();
for (CourierSfjSend courierSfjSend : courierSfjSends) {
list.add(courierSfjSend.getId());
}
courierSfjSendDao.updateCourierSfjSendStatus(
StatusCode.INIT.name(), StatusCode.SENDING.name(), list);
} catch (Exception e) {
logger.error(
"courierSfjSendDao.updateCourierSfjSendStatus 修改非线支付,给收派员送顺丰金状态失败",
e);

}
}

}


  • msp.zip (123.8 KB)
  • 下载次数: 0
分享到:
评论

相关推荐

    Java多线程设计模式上传文件

    Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式...

    java多线程读取文件

    Java多线程读大文件 java多线程写文件:多线程往队列中写入数据

    java多线程ppt

    java多线程PPT 多线程基本概念 创建线程的方式 线程的挂起与唤醒 多线程问题

    java 多线程操作数据库

    一个java 多线程操作数据库应用程序!!!

    java多线程经典案例

    java多线程经典案例,线程同步、线程通信、线程阻塞等经典案例

    Java多线程编程技术

    《Java多线程编程核心技术》建议猿友们读两遍,因为其写得没有那么抽象,第一遍有些概念不是很理解,可以先跳过并记录起来,第一遍阅读的目的主要是了解整个架构。第二遍再慢慢品味,并贯穿全部是指点来思考,并将...

    Java多线程编程实战指南(核心篇)

    Java多线程编程实战指南(核心篇) 高清pdf带目录 随着现代处理器的生产工艺从提升处理器主频频率转向多核化,即在一块芯片上集成多个处理器内核(Core),多核处理器(Multicore Processor)离我们越来越近了――如今...

    Java多线程知识点总结

    该文档总结了Java多线程相关的知识点,分享给大家,简单易懂!

    java多线程的讲解和实战

    详细的讲解了java多线程的原理,并配有代码进行实战,适合java初学者和想对多线程有进一步了解的人。

    java多线程通信图解

    一张图方便理解和掌握java 多线程之间通信的实质 java 多线程 其实就是每个线程都拥有自己的内存空间,多线程之间的通信,比例A线程修改了主内存(main方法的线程)变量,需要把A线程修改的结果同步到主线程中,...

    java多线程处理数据库数据

    java多线程处理数据库数据,使用并发包,无框架,可批量处数据库数据,进行增删改。。等等操作。

    java多线程,对多线程,线程池进行封装,方便使用

    java多线程,对多线程,线程池进行封装,方便使用

    Java多线程编程经验

    现在的操作系统是多任务操作系统。多线程是实现多任务的一种方式。 线程是指进程中的一个执行流程,一个进程中可以运行多个线程。...本文档提供Java多线程编程经验,方便广大Java爱好者研究学习Java多线程

    java多线程处理大数据

    java多线程处理大数据,可根据配置的线程数,任务去调度处理

    java多线程并发

    java多线程并发的在新窗口

    Java多线程机制(讲述java里面与多线程有关的函数)

    Java多线程机制 9.1 Java中的线程 9.2 Thread的子类创建线程 9.3 使用Runable接口 9.4 线程的常用方法 9.5 GUI线程 9.6 线程同步 9.7 在同步方法中使用wait()、notify 和notifyAll()方法 9.8 挂起、恢复和终止线程 ...

    java多线程核心技术

    资深Java专家10年经验总结,全程案例式讲解,首本全面介绍Java多线程编程技术的专著 结合大量实例,全面讲解Java多线程编程中的并发访问、线程间通信、锁等最难突破的核心技术与应用实践 Java多线程无处不在,如...

    java多线程实现大批量数据导入源码

    java多线程实现大批量数据切分成指定份数的数据,然后多线程处理入库或者导出,线程的个数和每份数据的数量都可以控制

    java多线程查询数据库

    java多线程并发查询数据库,使用线程池控制分页,并发查询。

    java多线程模拟队列实现排队叫号

    java多线程模拟队列实现排队叫号,多线程模拟排队叫号取号 java多线程模拟队列实现排队叫号,多线程模拟排队叫号取号

Global site tag (gtag.js) - Google Analytics