`
gaozzsoft
  • 浏览: 413276 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类

多线程-java线程池使用

 
阅读更多

(一)

new Thread(new SolrDataHandleThread()).start();

new Thread(new ElasticSearchDataHandler()).start();

new Thread(new RedisDataHandler()).start();

 

private class SolrDataHandleThread implements Runnable {

        public void run() {

            log.info("in SolrDataHandleThread  run()--->begin");

            log.info("pageSize is--->" + pageSize);

            //调saf接口 根据商家id查询商家管理系统oracle数据 如商家的所属运营人员及部门的数据

            List<VenderAuthResultData> resultDataAllList = venderAuthResultDataDao.getAllVenderAuthResultData();

 

            if (resultDataAllList != null && resultDataAllList.size() > 0) {

                int totalCount = resultDataAllList.size();

                int totalPages = totalCount % pageSize > 0 ? (totalCount / pageSize + 1) : totalCount / pageSize;

                List<VenderAuthResultData> resultDataTempList = new ArrayList<VenderAuthResultData>();

 

                for (int page = 1; page <= totalPages; page++) {

                    int startIndex = (page - 1) * pageSize;

                    int endIndex = startIndex + pageSize;

                    if (totalCount > endIndex) {

                        resultDataTempList = resultDataAllList.subList(startIndex, endIndex);

                    } else {

                        resultDataTempList = resultDataAllList.subList(startIndex, totalCount);

                    }

 

                    Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();

                    if (resultDataTempList != null && resultDataTempList.size() > 0) {

                        for (VenderAuthResultData vdl : resultDataTempList) {

                            VenderInfoDTO venderInfoDTO = penaltyBlackWhiteClient.getVenderInfoDtoByVenderId(vdl.getVenderId());

                            if (venderInfoDTO != null) {

                                //构建刷solr数据

                                SolrInputDocument doc = buildSolrInputDocument(vdl, venderInfoDTO);

                                docs.add(doc);

                            }

                        }

                    }

 

                    try {

                        solrServer92.add(docs);

                        solrServer92.commit();

                    } catch (Exception e) {

                        log.error("批量同步所有mysql数据到92solr服务器-刷新全部solr数据遇到错误", e);

                    }

 

                    try {

                        solrServer96.add(docs);

                        solrServer96.commit();

                    } catch (Exception e) {

                        log.error("批量同步所有mysql数据到96solr服务器-刷新全部solr数据遇到错误", e);

                    }

                    log.info("deal with data end, page number is--->" + page);

                }

            }

            log.info("in SolrDataHandleThread  run()--->end");

        }

    }

 

 

 

private class ElasticSearchDataHandler implements Runnable {

        public void run() {

            log.info("Elastic search thread begin to process data !");

            boolean switchEs = ConfigCenterUtil.getSwitchConfig(SellerAuthStrategy.ES_SWITCH);

            log.info("The switch of writing to ElasticSearch is opened :" + switchEs);

            if(switchEs) {

                List<VenderAuthResultData> resultDataAllList = venderAuthResultDataDao.getAllVenderAuthResultData();

                if (resultDataAllList != null && resultDataAllList.size() > 0) {

                    log.info("Size of data is " + resultDataAllList.size());

                    for (VenderAuthResultData vdl : resultDataAllList) {

                        VenderInfoDTO venderInfoDTO = penaltyBlackWhiteClient.getVenderInfoDtoByVenderId(vdl.getVenderId());

                        if (venderInfoDTO != null) {

                            SellerAuthDocument sellerAuthDocument = buildSellerAuthDocument(vdl, venderInfoDTO);

                            indexElasticSearchDoc(sellerAuthDocument);

                        }

                    }

                }

            }

            log.info("Elastic search thread process data successfully !");

        }

    }

 

 

private class RedisDataHandler implements Runnable {

        public void run() {

            log.info("begin to synchronize the redis data in thread : RedisDataHandler, with page size: " + pageSize);

            Calendar calendar = getLegalCalendar();

            String[] monthArray = getMonths(calendar);

 

            for(String m : monthArray) {

                log.info("Begin to process sync data to redis server.with month : " + m);

                List<VenderAuthResultData> resultDataAllList = venderAuthResultDataDao.getCurrentYearMonthVenderAuthResultDataByOpTime(m);

                if(null != resultDataAllList && resultDataAllList.size() > 0) {

                    syncRedisDataProcess(resultDataAllList);

                }else{

                    log.info("The synchronous data is empty. the month is " + m);

                }

                log.info("Finish to process sync data to redis server.with month : " + m);

            }

            log.info("finish to synchronize the redis data in thread : RedisDataHandler, with page size: " + pageSize);

        }

    }

 

===============================================================================

(二)

注入线程池:

private ThreadPoolUtil threadPoolUtil;

public void setThreadPoolUtil(ThreadPoolUtil threadPoolUtil) {

        this.threadPoolUtil = threadPoolUtil;

}

 

bean配置:

<!-- 线程池util -->

<bean id="threadPoolUtil" class="com.jd.util.ThreadPoolUtil"/>

 

 

调用:

private void writeCustomsOpenStatusToRedis(Long customsId, int openStatus) {

        try {

            final int openStatusTmp = openStatus;

            final Long customsIdTmp = customsId;

            threadPoolUtil.getCachedThreadPool().execute(new Runnable() {

                public void run() {

                    if(customsIdTmp == null) {

                        return;

                    }

 

                    if (openStatusTmp == 1) {

                        redisUtils.set(RedisConstants.CUSTOMS_OPEN_STATUS_SURFIX + customsIdTmp, String.valueOf(openStatusTmp));

                    } else {

                        redisUtils.del(RedisConstants.CUSTOMS_OPEN_STATUS_SURFIX + customsIdTmp);

                    }

                }

            });

        }catch (Exception e) {

            log.error("构建海关启用停用redis数据失败",  e);

        }

    }

 

    private void writeCustomsVenderToRedis(Long customsId, Long venderId, int customsVenderStatus) {

        try {

            final Long venderIdTemp = venderId;

            final Long customsIdTmp = customsId;

            final int customsVenderStatusTmp = customsVenderStatus;

            threadPoolUtil.getCachedThreadPool().execute(new Runnable() {

                public void run() {

                    if (customsVenderStatusTmp == 1) {

                        redisUtils.hset(RedisConstants.CUSTOMS_VENDER_ID_SURFIX + venderIdTemp, String.valueOf(customsIdTmp), "1");

                    } else {

                        redisUtils.hdel(RedisConstants.CUSTOMS_VENDER_ID_SURFIX + venderIdTemp, String.valueOf(customsIdTmp));

                    }

                }

            });

        }catch (Exception e) {

            log.error("构建海关店铺添加或移除redis数据失败",  e);

        }

    }

 

 

 

线程池类ThreadPoolUtil.java:

 

package com.jd.util;

 

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

/**

 * 创建获取线程池的util

 * User: shaodong

 * Date: 13-1-5

 * Time: 下午12:46

 * To change this template use File | Settings | File Templates.

 */

public class ThreadPoolUtil {

    /**

     * 创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,

     * 这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。

     # 如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

     */

    private final ExecutorService CACHED_THREAD_POOL = Executors.newCachedThreadPool();

 

    /**

     * 创建一个固定大小的线程池,最大50个,超过50个的时候,会阻塞等待

     */

    private final ExecutorService FIXED_THREAD_POOL = Executors.newFixedThreadPool(50);

 

    /**

     * 获得固定大小的线程池

     * @return

     */

    public ExecutorService getCachedThreadPool() {

        return CACHED_THREAD_POOL;

    }

 

//    public static void main(String args[]){

//        new ThreadPoolUtil().m();

//    }

//

//    void m() {

//       //创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。

//         ExecutorService pool = Executors.newFixedThreadPool(50);

//         //创建实现了runnable接口的对象

//        for(int i=1; i<=100; i++) {

//            pool.execute(new MyThread(i));

//        }

//        pool.shutdown();

//    }

//

//    class MyThread extends Thread{

//      int i = 0;

 

//      public MyThread(int _i){

//          System.out.println("create thread:"+_i);

//          i = _i;

//      }

//

//      @Override

//      public void run(){

//          try {

//              Thread.sleep(2000);

//          } catch (InterruptedException e) {

//              e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

//          }

//          System.out.println(Thread.currentThread().getName()+" is running... thread:"+i);

//      }

//    }

}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics