`
taiwei.peng
  • 浏览: 229773 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

多线程demo

阅读更多

package com.soft.transform.service.thread;

import com.alibaba.dubbo.common.utils.CollectionUtils;
import com.sunline.common.config.DefaultConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.TimeUnit;

/**
* Created by CaiJianbo on 2016/6/18 13:58.
* sunline
*/
@Component
public class MarketJobDaemon extends Thread {
    private static Logger logger = LoggerFactory.getLogger(MarketJobDaemon.class);
    private Vector<MarketJobInfo> workers = new Vector<>();
    private Vector<MarketJobInfo> updatedWorkers = new Vector<>();

    @Autowired
    private PhTransCodingThread phTransCodingThread;
    @Autowired
    private PhTsPersistenceThread phTsPersistenceThread;
    @Autowired
    private PhTsHisPersistenceThread phTsHisPersistenceThread;
    @Autowired
    private TsMinutePersistenceThread tsMinutePersistenceThread;
    @Autowired
    private TsFiveMinutePersistenceThread tsFiveMinutePersistenceThread;
    @Resource
    DefaultConfig defaultConfig;

    private long lastLogTime;

    @PostConstruct
    private void initJobs() {
        workers.add(new MarketJobInfo().setJob(phTransCodingThread)
                .setJobName(phTransCodingThread.getClass().getSimpleName()));
        workers.add(new MarketJobInfo().setJob(phTsPersistenceThread)
                .setJobName(phTsPersistenceThread.getClass().getSimpleName()));
        workers.add(new MarketJobInfo().setJob(phTsHisPersistenceThread)
                .setJobName(phTsHisPersistenceThread.getClass().getSimpleName()));
        workers.add(new MarketJobInfo().setJob(tsMinutePersistenceThread)
                .setJobName(tsMinutePersistenceThread.getClass().getSimpleName()));
        workers.add(new MarketJobInfo().setJob(tsFiveMinutePersistenceThread)
                .setJobName(tsFiveMinutePersistenceThread.getClass().getSimpleName()));
    }

  
    @Override
    public void run() {
        if (CollectionUtils.isNotEmpty(workers)) {
            while (true) {
                try {
                    long dateTime = new Date().getTime();
                    Iterator<MarketJobInfo> iterator = workers.iterator();
                    MarketJobInfo jobInfo;
                    BaseMarketThread job;
                    Thread workThread;
                    while (iterator.hasNext()) {
                        jobInfo = iterator.next();
                        workThread = jobInfo.getWorkThread();
                        job = jobInfo.getJob();
                        synchronized (jobInfo) {
                            if (job.isRunnable(dateTime)) {
                                if (!workThread.isAlive()) {
                                    if (workThread.getState() == State.NEW) {
                                        jobInfo.startJob(Thread.currentThread());
                                        logger.info("启动线程:" + jobInfo.getJobName());
                                    } else if (workThread.getState() == State.WAITING) {
                                        synchronized (workThread) {
                                            workThread.notify();
                                        }
                                        logger.info("启动线程:" + jobInfo.getJobName());
                                    } else if (workThread.getState() == State.TERMINATED) {
                                        workThread = jobInfo.updateWorkThread();
                                        jobInfo.startJob(Thread.currentThread());
                                        iterator.remove();
                                        updatedWorkers.add(jobInfo);
                                        logger.info("启动线程:" + jobInfo.getJobName());
                                    }
                                }
                            } else {
                                if (workThread.isAlive()) {
                                    job.isInterrupt = true;
                                    synchronized (workThread) {
                                        workThread.interrupt();
                                    }
                                    logger.info("关闭线程:" + jobInfo.getJobName());
                                }
                            }
                        }
                    }
                    if (Math.abs(dateTime - lastLogTime) / (1000 * 60) >= 1) {
                        new Thread(new Runnable() {
                            @Override
                            public void run() {
                                logWorkState();
                            }
                        }).start();
                    }
                    if (CollectionUtils.isNotEmpty(updatedWorkers)) {
                        workers.addAll(updatedWorkers);
                        updatedWorkers.clear();
                    }
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    logger.error("线程异常。", e);
                }
            }
        }
    }

    public void logWorkState() {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("线程状态:{");
        for (MarketJobInfo jobInfo : workers) {
            stringBuilder.append("\"").append(jobInfo.getJobName()).append("\"").append(":").append("\"")
                    .append(jobInfo.getWorkThread().getState()).append("\"").append(",");
        }
        stringBuilder.append("}");
        logger.info(stringBuilder.toString());
        lastLogTime = new Date().getTime();
    }

    public synchronized void StopOtherThread(Set<String> exceptJob) {
        if (exceptJob == null) {
            exceptJob = new HashSet<>();
        }
        Thread thread;
        for (MarketJobInfo worker : workers) {
            if (!exceptJob.contains(worker.getJobName())) {
                thread = worker.getWorkThread();
                if (!thread.isInterrupted()) {
                    worker.getJob().isInterrupt = true;
                    thread.interrupt();
                }
            }
        }
    }

}

 

分享到:
评论

相关推荐

    java多线程Demo

    自己写的一些多线程的小Demo,在这里与大家分享一下。

    C++多线程demo

    一个简单的C++多线程demo

    C++11多线程demo

    C++11多线程demo,包含在Ubuntu下编译C++11的方法,写了两三个C++11多线程的demo,欢迎下载!

    Delphi多线程Demo

    这个压缩包中是一个简单的多线程实例,在线程执行时,每隔一秒刷新一次当前时间!演示了如何在不影响主进程的情况下如何执行其他长时间等待任务。

    android多线程demo(很清晰很详细)

    给大家分享一个多线程学习的demo哈,此demo源自于雨松MOMO,很清晰,很详细,很实用。建议students先学习简单的理论,然后再分析这个demo。

    C#多线程Demo

    .net下,C#语言的多线程示例,使用Parallel类

    多线程demo/java多线程练习

    手写多线程demo,模拟数据库练习。感谢技术分享感谢技术分享感谢技术分享..感谢技术分享感谢技术分享感谢技术分享..

    Qt 多线程及简单实例 demo

    Qt 多线程及简单实例 demo。 多线程的几大特点: 1.多线程的执行顺序无法保证,与操作系统的调度策略和线程优先级等因素有关。 2.多线程的切换可能发生在任何时刻、任何地点。 3.多线程对代码的敏感度高,因此对...

    Delphi 多线程 DEMO

    通过网络的资料,自己编写的一个简单的多线程DEMO,供参考。

    MFC c++ vs2005 多线程 demo

    MFC c++ vs2005 多线程 demo 实现两个线程 ,两个小球的运动

    易语言多线程demo.rar

    原理,一个子程序被线程每次启动时代入参数不同计算不同的值。

    linux多线程demo

    linux多线程demo 使用cmake编译

    多线程Demo

    多线程Demo多线程Demo多线程Demo多线程Demo多线程Demo多线程Demo

    MFCACE多线程demo

    MFCACE多线程demo,基于ace 但是自己需要配置 C/C++属性里配置即可

    多线程demo程序-轻松掌握多线程技术

    简单、使用的一个多线程开发的例子,里面包含了创建线程、开启线程、线程函数处理、关闭线程等基本的代码。

    Delphi经典的多线程DEMO

    Delphi经典的多线程DEMO

Global site tag (gtag.js) - Google Analytics