(一)
new Thread(new SolrDataHandleThread()).start();
new Thread(new ElasticSearchDataHandler()).start();
new Thread(new RedisDataHandler()).start();
private class SolrDataHandleThread implements Runnable {
public void run() {
log.info("in SolrDataHandleThread run()--->begin");
log.info("pageSize is--->" + pageSize);
//调saf接口 根据商家id查询商家管理系统oracle数据 如商家的所属运营人员及部门的数据
List<VenderAuthResultData> resultDataAllList = venderAuthResultDataDao.getAllVenderAuthResultData();
if (resultDataAllList != null && resultDataAllList.size() > 0) {
int totalCount = resultDataAllList.size();
int totalPages = totalCount % pageSize > 0 ? (totalCount / pageSize + 1) : totalCount / pageSize;
List<VenderAuthResultData> resultDataTempList = new ArrayList<VenderAuthResultData>();
for (int page = 1; page <= totalPages; page++) {
int startIndex = (page - 1) * pageSize;
int endIndex = startIndex + pageSize;
if (totalCount > endIndex) {
resultDataTempList = resultDataAllList.subList(startIndex, endIndex);
} else {
resultDataTempList = resultDataAllList.subList(startIndex, totalCount);
}
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
if (resultDataTempList != null && resultDataTempList.size() > 0) {
for (VenderAuthResultData vdl : resultDataTempList) {
VenderInfoDTO venderInfoDTO = penaltyBlackWhiteClient.getVenderInfoDtoByVenderId(vdl.getVenderId());
if (venderInfoDTO != null) {
//构建刷solr数据
SolrInputDocument doc = buildSolrInputDocument(vdl, venderInfoDTO);
docs.add(doc);
}
}
}
try {
solrServer92.add(docs);
solrServer92.commit();
} catch (Exception e) {
log.error("批量同步所有mysql数据到92solr服务器-刷新全部solr数据遇到错误", e);
}
try {
solrServer96.add(docs);
solrServer96.commit();
} catch (Exception e) {
log.error("批量同步所有mysql数据到96solr服务器-刷新全部solr数据遇到错误", e);
}
log.info("deal with data end, page number is--->" + page);
}
}
log.info("in SolrDataHandleThread run()--->end");
}
}
private class ElasticSearchDataHandler implements Runnable {
public void run() {
log.info("Elastic search thread begin to process data !");
boolean switchEs = ConfigCenterUtil.getSwitchConfig(SellerAuthStrategy.ES_SWITCH);
log.info("The switch of writing to ElasticSearch is opened :" + switchEs);
if(switchEs) {
List<VenderAuthResultData> resultDataAllList = venderAuthResultDataDao.getAllVenderAuthResultData();
if (resultDataAllList != null && resultDataAllList.size() > 0) {
log.info("Size of data is " + resultDataAllList.size());
for (VenderAuthResultData vdl : resultDataAllList) {
VenderInfoDTO venderInfoDTO = penaltyBlackWhiteClient.getVenderInfoDtoByVenderId(vdl.getVenderId());
if (venderInfoDTO != null) {
SellerAuthDocument sellerAuthDocument = buildSellerAuthDocument(vdl, venderInfoDTO);
indexElasticSearchDoc(sellerAuthDocument);
}
}
}
}
log.info("Elastic search thread process data successfully !");
}
}
private class RedisDataHandler implements Runnable {
public void run() {
log.info("begin to synchronize the redis data in thread : RedisDataHandler, with page size: " + pageSize);
Calendar calendar = getLegalCalendar();
String[] monthArray = getMonths(calendar);
for(String m : monthArray) {
log.info("Begin to process sync data to redis server.with month : " + m);
List<VenderAuthResultData> resultDataAllList = venderAuthResultDataDao.getCurrentYearMonthVenderAuthResultDataByOpTime(m);
if(null != resultDataAllList && resultDataAllList.size() > 0) {
syncRedisDataProcess(resultDataAllList);
}else{
log.info("The synchronous data is empty. the month is " + m);
}
log.info("Finish to process sync data to redis server.with month : " + m);
}
log.info("finish to synchronize the redis data in thread : RedisDataHandler, with page size: " + pageSize);
}
}
===============================================================================
(二)
注入线程池:
private ThreadPoolUtil threadPoolUtil;
public void setThreadPoolUtil(ThreadPoolUtil threadPoolUtil) {
this.threadPoolUtil = threadPoolUtil;
}
bean配置:
<!-- 线程池util -->
<bean id="threadPoolUtil" class="com.jd.util.ThreadPoolUtil"/>
调用:
private void writeCustomsOpenStatusToRedis(Long customsId, int openStatus) {
try {
final int openStatusTmp = openStatus;
final Long customsIdTmp = customsId;
threadPoolUtil.getCachedThreadPool().execute(new Runnable() {
public void run() {
if(customsIdTmp == null) {
return;
}
if (openStatusTmp == 1) {
redisUtils.set(RedisConstants.CUSTOMS_OPEN_STATUS_SURFIX + customsIdTmp, String.valueOf(openStatusTmp));
} else {
redisUtils.del(RedisConstants.CUSTOMS_OPEN_STATUS_SURFIX + customsIdTmp);
}
}
});
}catch (Exception e) {
log.error("构建海关启用停用redis数据失败", e);
}
}
private void writeCustomsVenderToRedis(Long customsId, Long venderId, int customsVenderStatus) {
try {
final Long venderIdTemp = venderId;
final Long customsIdTmp = customsId;
final int customsVenderStatusTmp = customsVenderStatus;
threadPoolUtil.getCachedThreadPool().execute(new Runnable() {
public void run() {
if (customsVenderStatusTmp == 1) {
redisUtils.hset(RedisConstants.CUSTOMS_VENDER_ID_SURFIX + venderIdTemp, String.valueOf(customsIdTmp), "1");
} else {
redisUtils.hdel(RedisConstants.CUSTOMS_VENDER_ID_SURFIX + venderIdTemp, String.valueOf(customsIdTmp));
}
}
});
}catch (Exception e) {
log.error("构建海关店铺添加或移除redis数据失败", e);
}
}
线程池类ThreadPoolUtil.java:
package com.jd.util;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 创建获取线程池的util
* User: shaodong
* Date: 13-1-5
* Time: 下午12:46
* To change this template use File | Settings | File Templates.
*/
public class ThreadPoolUtil {
/**
* 创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,
* 这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。
# 如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
*/
private final ExecutorService CACHED_THREAD_POOL = Executors.newCachedThreadPool();
/**
* 创建一个固定大小的线程池,最大50个,超过50个的时候,会阻塞等待
*/
private final ExecutorService FIXED_THREAD_POOL = Executors.newFixedThreadPool(50);
/**
* 获得固定大小的线程池
* @return
*/
public ExecutorService getCachedThreadPool() {
return CACHED_THREAD_POOL;
}
// public static void main(String args[]){
// new ThreadPoolUtil().m();
// }
//
// void m() {
// //创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
// ExecutorService pool = Executors.newFixedThreadPool(50);
// //创建实现了runnable接口的对象
// for(int i=1; i<=100; i++) {
// pool.execute(new MyThread(i));
// }
// pool.shutdown();
// }
//
// class MyThread extends Thread{
// int i = 0;
// public MyThread(int _i){
// System.out.println("create thread:"+_i);
// i = _i;
// }
//
// @Override
// public void run(){
// try {
// Thread.sleep(2000);
// } catch (InterruptedException e) {
// e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
// }
// System.out.println(Thread.currentThread().getName()+" is running... thread:"+i);
// }
// }
}
相关推荐
java多线程,对多线程,线程池进行封装,方便使用
Java 多线程与并发(17_26)-JUC线程池_ FutureTask详解
线程与多线程 1.线程 在操作系统中,线程是比进程更小的能够独立运行的基本单位。同时,它也是 CPU 调度的基本单位。线程本身基本上不拥有系统资源,只是拥有一些在运行时 需要用到的系统资源,例如程序计数器,...
* 使用多线程的好处: 1.降低消耗,减少了创建和销毁线程的次数,每个线程都可以重复利用,可执行多个任务 2.提高响应速度,任务可以不需要等到线程创建就可以立即执行 3.提高线程的可管理性,根据系统的承受能力,...
JAVA使用线程池查询大批量数据
计算机后端-Java-Java核心基础-第20章 多线程 19. 使用线程池的好处.avi
目标:Java中多线程技术是一个难点,但是也是一个核心技术。因为Java本身就是一个多线程语言。本人目前在给46班讲授Swing的网络编程--使用Swing来模拟真实的QQ实时聊天软件。因为涉及到Socket编程,所以一定会使用多...
Java线程池及观察者模式解决多线程意外死亡重启问题,附件含两个要运行代码!
计算机后端-Java-Java核心基础-第20章 多线程 20. 创建多线程的方式四:使用线程池.avi
java线程池Executors实现数据批量操作。 批量异步Executors处理数据,实现限流操作,QPS限流。 线程池调用第三方接口限流实现逻辑。 案例适合: 1.批量处理大数据。 2.数据批量导出。 3任务数据异步执行。 4.多线程...
首先希望大家喜欢我制作的文档,如果文档中有什么误解的地方,望告诉一下,5分是也不多,是系统默认的,那么就5分咯,java多线程详解,线程池原理,8种锁,java内存模型......
在多线程大师Doug Lea的贡献下,在JDK1.5中加入了许多对并发特性的支持,例如:线程池。.......................................JAVA线程、线程池资料----下载不扣分,回帖加1分,欢迎下载,童叟无欺JAVA线程、...
详细的讲述了多线程的各种用法 Java线程:概念与原理 Java线程:创建与启动 Java线程:线程栈模型与线程的变量 Java线程:线程状态的转换 Java线程:线程的同步与锁 Java线程:线程的交互 Java线程:线程的调度-休眠...
Java线程:新特征-线程池 Java线程:新特征-有返回值的线程 Java线程:新特征-锁(上) Java线程:新特征-锁(下) Java线程:新特征-信号量 Java线程:新特征-阻塞队列 Java线程:新特征-阻塞栈 Java线程:...
volatile关键字的非原子性、volatile关键字的使用、AtomicInteger原子性操作、线程安全小例子:多个线程竞争问题、多个线程多个锁问题、创建一个缓存的线程池、多线程使用Vector或者HashTable的示例(简单线程同步...
java线程、线程池、xml解析 适合入门的例子或文档 上手使用
java多线程并发查询数据库,使用线程池控制分页,并发查询。
java线程池的原理和实现,挺全面的,分享给大家!