package com.soft.transform.service.thread;
import com.alibaba.dubbo.common.utils.CollectionUtils;
import com.sunline.common.config.DefaultConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
/**
* Created by CaiJianbo on 2016/6/18 13:58.
* sunline
*/
@Component
public class MarketJobDaemon extends Thread {
private static Logger logger = LoggerFactory.getLogger(MarketJobDaemon.class);
private Vector<MarketJobInfo> workers = new Vector<>();
private Vector<MarketJobInfo> updatedWorkers = new Vector<>();
@Autowired
private PhTransCodingThread phTransCodingThread;
@Autowired
private PhTsPersistenceThread phTsPersistenceThread;
@Autowired
private PhTsHisPersistenceThread phTsHisPersistenceThread;
@Autowired
private TsMinutePersistenceThread tsMinutePersistenceThread;
@Autowired
private TsFiveMinutePersistenceThread tsFiveMinutePersistenceThread;
@Resource
DefaultConfig defaultConfig;
private long lastLogTime;
@PostConstruct
private void initJobs() {
workers.add(new MarketJobInfo().setJob(phTransCodingThread)
.setJobName(phTransCodingThread.getClass().getSimpleName()));
workers.add(new MarketJobInfo().setJob(phTsPersistenceThread)
.setJobName(phTsPersistenceThread.getClass().getSimpleName()));
workers.add(new MarketJobInfo().setJob(phTsHisPersistenceThread)
.setJobName(phTsHisPersistenceThread.getClass().getSimpleName()));
workers.add(new MarketJobInfo().setJob(tsMinutePersistenceThread)
.setJobName(tsMinutePersistenceThread.getClass().getSimpleName()));
workers.add(new MarketJobInfo().setJob(tsFiveMinutePersistenceThread)
.setJobName(tsFiveMinutePersistenceThread.getClass().getSimpleName()));
}
@Override
public void run() {
if (CollectionUtils.isNotEmpty(workers)) {
while (true) {
try {
long dateTime = new Date().getTime();
Iterator<MarketJobInfo> iterator = workers.iterator();
MarketJobInfo jobInfo;
BaseMarketThread job;
Thread workThread;
while (iterator.hasNext()) {
jobInfo = iterator.next();
workThread = jobInfo.getWorkThread();
job = jobInfo.getJob();
synchronized (jobInfo) {
if (job.isRunnable(dateTime)) {
if (!workThread.isAlive()) {
if (workThread.getState() == State.NEW) {
jobInfo.startJob(Thread.currentThread());
logger.info("启动线程:" + jobInfo.getJobName());
} else if (workThread.getState() == State.WAITING) {
synchronized (workThread) {
workThread.notify();
}
logger.info("启动线程:" + jobInfo.getJobName());
} else if (workThread.getState() == State.TERMINATED) {
workThread = jobInfo.updateWorkThread();
jobInfo.startJob(Thread.currentThread());
iterator.remove();
updatedWorkers.add(jobInfo);
logger.info("启动线程:" + jobInfo.getJobName());
}
}
} else {
if (workThread.isAlive()) {
job.isInterrupt = true;
synchronized (workThread) {
workThread.interrupt();
}
logger.info("关闭线程:" + jobInfo.getJobName());
}
}
}
}
if (Math.abs(dateTime - lastLogTime) / (1000 * 60) >= 1) {
new Thread(new Runnable() {
@Override
public void run() {
logWorkState();
}
}).start();
}
if (CollectionUtils.isNotEmpty(updatedWorkers)) {
workers.addAll(updatedWorkers);
updatedWorkers.clear();
}
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
logger.error("线程异常。", e);
}
}
}
}
public void logWorkState() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("线程状态:{");
for (MarketJobInfo jobInfo : workers) {
stringBuilder.append("\"").append(jobInfo.getJobName()).append("\"").append(":").append("\"")
.append(jobInfo.getWorkThread().getState()).append("\"").append(",");
}
stringBuilder.append("}");
logger.info(stringBuilder.toString());
lastLogTime = new Date().getTime();
}
public synchronized void StopOtherThread(Set<String> exceptJob) {
if (exceptJob == null) {
exceptJob = new HashSet<>();
}
Thread thread;
for (MarketJobInfo worker : workers) {
if (!exceptJob.contains(worker.getJobName())) {
thread = worker.getWorkThread();
if (!thread.isInterrupted()) {
worker.getJob().isInterrupt = true;
thread.interrupt();
}
}
}
}
}
相关推荐
自己写的一些多线程的小Demo,在这里与大家分享一下。
一个简单的C++多线程demo
C++11多线程demo,包含在Ubuntu下编译C++11的方法,写了两三个C++11多线程的demo,欢迎下载!
这个压缩包中是一个简单的多线程实例,在线程执行时,每隔一秒刷新一次当前时间!演示了如何在不影响主进程的情况下如何执行其他长时间等待任务。
给大家分享一个多线程学习的demo哈,此demo源自于雨松MOMO,很清晰,很详细,很实用。建议students先学习简单的理论,然后再分析这个demo。
.net下,C#语言的多线程示例,使用Parallel类
手写多线程demo,模拟数据库练习。感谢技术分享感谢技术分享感谢技术分享..感谢技术分享感谢技术分享感谢技术分享..
Qt 多线程及简单实例 demo。 多线程的几大特点: 1.多线程的执行顺序无法保证,与操作系统的调度策略和线程优先级等因素有关。 2.多线程的切换可能发生在任何时刻、任何地点。 3.多线程对代码的敏感度高,因此对...
通过网络的资料,自己编写的一个简单的多线程DEMO,供参考。
MFC c++ vs2005 多线程 demo 实现两个线程 ,两个小球的运动
原理,一个子程序被线程每次启动时代入参数不同计算不同的值。
linux多线程demo 使用cmake编译
多线程Demo多线程Demo多线程Demo多线程Demo多线程Demo多线程Demo
MFCACE多线程demo,基于ace 但是自己需要配置 C/C++属性里配置即可
简单、使用的一个多线程开发的例子,里面包含了创建线程、开启线程、线程函数处理、关闭线程等基本的代码。
Delphi经典的多线程DEMO