`
gaozzsoft
  • 浏览: 413254 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类

JAVA多线程使用研究

    博客分类:
  • J2EE
 
阅读更多

Part One: 一个线程处理

if (CollectionUtils.isNotEmpty(cqcAttachmentDataList)) {

long startTimes = System.currentTimeMillis();

final CountDownLatch latchOnlyOne = new CountDownLatch(1);

new Thread(new CqcAttachmentDataWorker(latchOnlyOne,"车前程附件数据处理-线程-only", cqcAttachmentDataList)).start();

 

try {

latchOnlyOne.await();

long endTimes = System.currentTimeMillis();

logger.info("所有线程(1个线程)执行完毕:" + ((endTimes - startTimes)/1000) + "秒");

} catch (InterruptedException e) {

logger.error("线程(1个线程)处理异常::::",e);

}

 

}

 

//数据处理线程worker

public class CqcAttachmentDataWorker implements Runnable{

List<TCdCqcAttachmentData> cqcAttachmentDataSubList = null;

String name = "";

CountDownLatch latch;

public CqcAttachmentDataWorker(CountDownLatch latch,String name, List<TCdCqcAttachmentData> cqcAttachmentDataListTemp){

this.name = name;

this.latch = latch;

this.cqcAttachmentDataSubList = cqcAttachmentDataListTemp;

}

 

@Override

public void run() {

logger.info(name + "正在执行...");

try {

//2.遍历处理

if (CollectionUtils.isNotEmpty(cqcAttachmentDataSubList)) {

                                      //do it

                                }

                          } catch (Exception e) {

logger.error("车前程-数据处理run异常::::", e);

  } finally {

latch.countDown();

 }

}

}

 

Part Two: 多线程数据处理1-初级

 

List<TCdCqcAttachmentData> cqcAttachmentDataList = tCdCqcAttachmentDataService.findTCdCqcAttachmentDataAllList();

if (CollectionUtils.isNotEmpty(cqcAttachmentDataList)) {

long startTimes = System.currentTimeMillis();

int threadCount = cqcWorkerThreadCount;

int total = cqcAttachmentDataList.size();

 

if (total < threadCount) {

final CountDownLatch latchOnlyOne = new CountDownLatch(1);

new Thread(new CqcAttachmentDataWorker(latchOnlyOne,"车前程附件数据处理-线程-only", cqcAttachmentDataList)).start();

 

try {

latchOnlyOne.await();

long endTimes = System.currentTimeMillis();

logger.info("所有线程(1个线程)执行完毕:" + ((endTimes - startTimes)/1000) + "秒");

} catch (InterruptedException e) {

logger.error("线程(1个线程)处理异常::::",e);

}

} else {

int every = total / threadCount;

final CountDownLatch latch = new CountDownLatch(threadCount);

List<TCdCqcAttachmentData> cqcAttachmentDataSubList = null;

int divideRemainNumber = total % every; //取模后的余数

int divideRemainNumberTmep = total % threadCount; //取模后的余数

 

for (int i = 1; i <= threadCount; i++) {

int startIndex = (i - 1) * every;

int endIndex = startIndex + every;

if (total >= endIndex) {

if(divideRemainNumber > 0){

if (i == threadCount)

{

cqcAttachmentDataSubList = cqcAttachmentDataList.subList(startIndex, total);

} else {

cqcAttachmentDataSubList = cqcAttachmentDataList.subList(startIndex, startIndex + every);

}

} else {

if (divideRemainNumberTmep > 0) {

if (i == threadCount)

{

cqcAttachmentDataSubList = cqcAttachmentDataList.subList(startIndex, total);

} else {

cqcAttachmentDataSubList = cqcAttachmentDataList.subList(startIndex, startIndex + every);

}

} else {

cqcAttachmentDataSubList = cqcAttachmentDataList.subList(startIndex, startIndex + every);

}

}

}

new Thread(new CqcAttachmentDataWorker(latch,"车前程附件数据处理-线程" + i, cqcAttachmentDataSubList)).start();

}

 

try {

latch.await();

long endTimes = System.currentTimeMillis();

logger.info("所有线程执行完毕:" + ((endTimes - startTimes)/1000) + "秒");

} catch (InterruptedException e) {

logger.error("线程处理异常::::",e);

}

}

 

}

 

//数据处理线程worker

public class CqcAttachmentDataWorker implements Runnable{

List<TCdCqcAttachmentData> cqcAttachmentDataSubList = null;

String name = "";

CountDownLatch latch;

public CqcAttachmentDataWorker(CountDownLatch latch,String name, List<TCdCqcAttachmentData> cqcAttachmentDataListTemp){

this.name = name;

this.latch = latch;

this.cqcAttachmentDataSubList = cqcAttachmentDataListTemp;

}

 

@Override

public void run() {

logger.info(name + "正在执行...");

//2.遍历处理

                      if (CollectionUtils.isNotEmpty(cqcAttachmentDataSubList)) {

                        }

                       latch.countDown();

      }

 }

         

Part Three: 多线程数据处理2-高级-线程池实现

代码实现:

// 多线程处理

private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(50000), new ThreadPoolExecutor.CallerRunsPolicy());

 

if (CollectionUtils.isNotEmpty(ycAndyzTCdBusiExtendlist)) {

LinkedBlockingQueue<TCdBusiExtend> queue = new LinkedBlockingQueue<TCdBusiExtend>();

queue.addAll(ycAndyzTCdBusiExtendlist);

int size = queue.size();

CountDownLatch countDownLatch = new CountDownLatch(size);

for (int i = 0; i < size; i++) {

threadPool.execute(new CqcAttachmentDataHandler(queue, countDownLatch, cqcHandlerServiceFactory, cqcAttachmentDomain, cqcAttachmentFolder));

}

countDownLatch.await();

long poolEndTimes = System.currentTimeMillis();

logger.info("线程池对目标数据处理执行完毕:" + ((poolEndTimes - targetBeginTimes)/1000) + "秒");

long endTimes = System.currentTimeMillis();

logger.info("全部执行完毕:" + ((endTimes - startTimes)/1000) + "秒");

}

 

worker:

 

 public class CqcAttachmentDataHandler implements Runnable {

    private static final AppLogger logger = new AppLogger(CqcAttachmentDataHandler.class);

    private LinkedBlockingQueue<TCdBusiExtend> queue;

    private CountDownLatch countDownLatch;

    private String cqcAttachmentDomain;

    private String cqcAttachmentFolder;

 

    private CqcHandlerServiceFactory cqcHandlerServiceFactory;

 

    public CqcAttachmentDataHandler(LinkedBlockingQueue<TCdBusiExtend> queue, CountDownLatch countDownLatch,CqcHandlerServiceFactory cqcHandlerServiceFactoryTemp, String cqcAttachmentDomainTemp, String cqcAttachmentFolderTemp) {

        this.queue = queue;

        this.countDownLatch = countDownLatch;

        this.cqcHandlerServiceFactory=cqcHandlerServiceFactoryTemp;

        this.cqcAttachmentDomain = cqcAttachmentDomainTemp;

        this.cqcAttachmentFolder = cqcAttachmentFolderTemp;

    }

 

    @Override

    public void run() {

        try {

            TCdBusiExtend tCdBusiExtend = queue.poll();

            if(tCdBusiExtend != null) {

    //do it

    }

        } catch (Exception e) {

            logger.error("CqcAttachmentDataHandler.run error", e);

        } finally {

            countDownLatch.countDown();

        }

    }

 }

 

工厂模式:

 

@Service

public class CqcHandlerServiceFactory {

    @Resource

    private ITCdCqcAttachmentDataService tCdCqcAttachmentDataService;

 

    @Autowired

    private TCdBusiExtendMapper tCdBusiExtendMapper;

 

     public ITCdCqcAttachmentDataService gettCdCqcAttachmentDataService() {

        return tCdCqcAttachmentDataService;

    }

 

    public void settCdCqcAttachmentDataService(ITCdCqcAttachmentDataService tCdCqcAttachmentDataService) {

        this.tCdCqcAttachmentDataService = tCdCqcAttachmentDataService;

    }

 

    public TCdBusiExtendMapper gettCdBusiExtendMapper() {

        return tCdBusiExtendMapper;

    }

 

    public void settCdBusiExtendMapper(TCdBusiExtendMapper tCdBusiExtendMapper) {

        this.tCdBusiExtendMapper = tCdBusiExtendMapper;

    }

}

 

==========================================================

Part Four:  ThreadPoolTaskExecutor的使用

 

<!--核心线程池-->
<beanid="taskExecutor" name="taskExecutor_passive"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <propertyname="corePoolSize" value="40"/>
    <propertyname="maxPoolSize" value="100"/>
    <propertyname="queueCapacity" value="3072"/>
    <propertyname="rejectedExecutionHandler">
        <beanclass="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /><!--调用者运行-->
</property>
</bean>

/**
 * 线程池
 */
@Resource(name = "taskExecutor")
private ThreadPoolTaskExecutor cartPropertyDataGetterTaskExecutor;

public ThreadPoolTaskExecutor getTaskExecutor() {
   return cartPropertyDataGetterTaskExecutor;
}

public void setTaskExecutor(ThreadPoolTaskExecutor taskExecutor) {
   this.cartPropertyDataGetterTaskExecutor = taskExecutor;
}

// 命令立减查询
if (result.isActivateKeyt() && currentCart.getKeyt() != null && StringUtil.isNotBlank(currentCart.getKeyt().getKeytStr())) {
   keytFuture = this.getTaskExecutor().submit(new KeytGetter(keytBS, user, siteId, currentCart.getKeyt().getKeytStr(), currentCart, commerceItemList, priceInfo, shippingGroups));
}

if (blueCouponsFuture != null) {
   final BlueCouponSummary blueCouponSummary = buildBlueCouponSummary(blueCouponsFuture.get());
   result.setBlueCouponsAmount(blueCouponSummary.getUsedAmount()); // 蓝券的使用金额
result.setBlueCouponsNum(blueCouponSummary.getUsedNum()); // 蓝券的使用张数
result.setBlueCoupons(blueCouponSummary.getList());
}
if (shopCouponsFuture != null) {
   final ShopCouponSummary shopCouponSummary = buildShopCouponSummary(shopCouponsFuture.get());
   result.setShopCouponsAmount(shopCouponSummary.getUsedAmount()); // 店铺券使用金额
result.setShopCouponsNum(shopCouponSummary.getUsedNum()); // 店铺券使用张数
result.setQueryShopCoupons(shopCouponSummary.getList());
}
if (redCouponsFuture != null) {
   final RedCouponSummary redCouponSummary = buildRedCouponSummary(redCouponsFuture.get());
   result.setRedCouponsAmount(redCouponSummary.getUsedAmount()); // 红券使用的金额
result.setRedCouponsNum(redCouponSummary.getUsedNum()); // 红券使用的张数
result.setRedCoupons(redCouponSummary.getList());
}
if (prepaidCardFuture != null) {
   final PrepaidCardSummary prepaidCardSummary = buildPrepaidCardSummary(prepaidCardFuture.get());
   result.setPrepaidCardsAmount(prepaidCardSummary.getUsedAmount()); // 预付卡使用的金额
result.setPrepaidCardsNum(prepaidCardSummary.getUsedNum()); // 预付卡使用的张数
result.setPrepaidCards(prepaidCardSummary.getList());
}
VirtualAccountResult virtualAccountResult = null;
if (virtualAccountResultFuture != null) {
   virtualAccountResult = virtualAccountResultFuture.get();
   result.setVirtualAccountResult(virtualAccountResult);
}
if ((containsHaiWaiGou || !shouldQueryVirtualAccount(siteId, commerceItemList)) && virtualAccountResult != null) {
   virtualAccountResult.setPayAmount(DOUBLE_ZERO); // 可用金额为0
}

if (blueCouponsFuture != null) {
   final BlueCouponSummary blueCouponSummary = buildBlueCouponSummary(blueCouponsFuture.get());
   result.setBlueCouponsAmount(blueCouponSummary.getUsedAmount()); // 蓝券的使用金额
result.setBlueCouponsNum(blueCouponSummary.getUsedNum()); // 蓝券的使用张数
result.setBlueCoupons(blueCouponSummary.getList());
}
if (shopCouponsFuture != null) {
   final ShopCouponSummary shopCouponSummary = buildShopCouponSummary(shopCouponsFuture.get());
   result.setShopCouponsAmount(shopCouponSummary.getUsedAmount()); // 店铺券使用金额
result.setShopCouponsNum(shopCouponSummary.getUsedNum()); // 店铺券使用张数
result.setQueryShopCoupons(shopCouponSummary.getList());
}
if (redCouponsFuture != null) {
   final RedCouponSummary redCouponSummary = buildRedCouponSummary(redCouponsFuture.get());
   result.setRedCouponsAmount(redCouponSummary.getUsedAmount()); // 红券使用的金额
result.setRedCouponsNum(redCouponSummary.getUsedNum()); // 红券使用的张数
result.setRedCoupons(redCouponSummary.getList());
}
if (prepaidCardFuture != null) {
   final PrepaidCardSummary prepaidCardSummary = buildPrepaidCardSummary(prepaidCardFuture.get());
   result.setPrepaidCardsAmount(prepaidCardSummary.getUsedAmount()); // 预付卡使用的金额
result.setPrepaidCardsNum(prepaidCardSummary.getUsedNum()); // 预付卡使用的张数
result.setPrepaidCards(prepaidCardSummary.getList());
}
VirtualAccountResult virtualAccountResult = null;
if (virtualAccountResultFuture != null) {
   virtualAccountResult = virtualAccountResultFuture.get();
   result.setVirtualAccountResult(virtualAccountResult);
}
if ((containsHaiWaiGou || !shouldQueryVirtualAccount(siteId, commerceItemList)) && virtualAccountResult != null) {
   virtualAccountResult.setPayAmount(DOUBLE_ZERO); // 可用金额为0
}
if (storePointFuture != null) {
   result.setStorePoint(storePointFuture.get());
}
if (gomedoFuture != null) {
   result.setGomedo(gomedoFuture.get());
}
if (keytFuture != null) {
   result.setKeyt(keytFuture.get());
}

 

package com.gome.pangu.trading.cart.business.taskflow.listcart.task;

import java.util.List;
import java.util.concurrent.Callable;

import com.gome.framework.bleach.Bleacher;
import com.gome.framework.logging.Logger;
import com.gome.pangu.pricing.client.dto.result.PriceInfoResultRDTO;
import com.gome.pangu.trading.bo.Cart;
import com.gome.pangu.trading.bo.CommerceItem;
import com.gome.pangu.trading.bo.ShippingGroup;
import com.gome.pangu.trading.bo.User;
import com.gome.pangu.trading.keyt.business.KeytBS;
import com.gome.pangu.trading.keyt.client.dto.result.Keyt;

/**
 * 命令立减查询服务Callable实现类
 * Created by huangwenfeng on 2016/9/28.
 */
public class KeytGetter implements Callable<Keyt> {

   private static final Logger LOGGER = Bleacher.getLogger(KeytGetter.class);
   private final KeytBS keytBS;
   private final User user;
   private final String siteId;
   private final String keytStr;
   private final Cart currentCart;
   private final List<CommerceItem> commerceItemList;
   private final PriceInfoResultRDTO priceInfoResultRDTO;
   private final List<ShippingGroup> shippingGroups;

   public KeytGetter(KeytBS keytBS, User user, String siteId, String keytStr, Cart currentCart, List<CommerceItem> commerceItemList, PriceInfoResultRDTO priceInfoResultRDTO,
         List<ShippingGroup> shippingGroups) {
      this.keytBS = keytBS;
      this.user = user;
      this.siteId = siteId;
      this.keytStr = keytStr;
      this.currentCart = currentCart;
      this.commerceItemList = commerceItemList;
      this.priceInfoResultRDTO = priceInfoResultRDTO;
      this.shippingGroups = shippingGroups;
   }

   @Override
public Keyt call() throws Exception {

      try {
         Keyt keyt = keytBS.queryKeyt(user, siteId, keytStr, currentCart, commerceItemList, priceInfoResultRDTO, shippingGroups);
         return keyt;
      } catch (Exception e) {
         LOGGER.error("调用命令立减查询服务失败,错误信息:", e);
         LOGGER.error("调用命令立减查询服务失败,用户ID:{},站点ID:{},口令串:{},Cart:{},商品列表:{},价格信息:{},配送单:{}", user.getUserId(), siteId, keytStr, currentCart, commerceItemList, priceInfoResultRDTO, shippingGroups);
         return null;
      }
   }
}


/**
 * 口令立减
 * Created by huangwenfeng on 2016/9/26.
 */
public class Keyt extends BaseModel {

   private static final long serialVersionUID = 4824030816608236547L;

   private String keytStr; // 命令串
private double remainingAmount; // 剩余支付金额(OMS备用)
private double amount = 0; // 命令立减兑换到的金额
private int status = 0; // 向前端返回状态错误码 0-初始状态(没有用命令立减) 1-正常(可用命令立减)
public double getAmount() {
      return amount;
   }

   public void setAmount(double amount) {
      this.amount = amount;
   }

   public int getStatus() {
      return status;
   }

   public void setStatus(int status) {
      this.status = status;
   }

   public String getKeytStr() {
      return keytStr;
   }

   public void setKeytStr(String keytStr) {
      this.keytStr = keytStr;
   }

   public double getRemainingAmount() {
      return remainingAmount;
   }

   public void setRemainingAmount(double remainingAmount) {
      this.remainingAmount = remainingAmount;
   }
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics