`
coach
  • 浏览: 382797 次
  • 性别: Icon_minigender_2
  • 来自: 印度
社区版块
存档分类
最新评论

根据cpu情况决定线程运行数量和情况

 
阅读更多
一个线程分配器,根据cpu的负载情况,自动完成对应线程的唤醒或者是等待操作。整个过程是一个平滑的过程,不会因为线程的切换而导致机器负载出线锯齿。

读取Linux系统TOP等指令拿到系统当前负载
package temp.util;

import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
 * @description 节点的cpu 内存 磁盘空间 情况
 * @version 1.0
 * @date 2012-7-11
 */
public class NodeLoadView
{
	/**
	 * 获取cpu使用情况
	 * @return
	 * @throws Exception
	 */
	public double getCpuUsage() throws Exception
	{
		double cpuUsed = 0;
		Runtime rt = Runtime.getRuntime();
		Process p = rt.exec("/usr/bin/uptime");// 调用系统的"top"命令
		String[] strArray = null;
		BufferedReader in = null;
		try
		{
			in = new BufferedReader(new InputStreamReader(p.getInputStream()));
			String str = null;
			while ((str = in.readLine()) != null)
			{
				strArray = str.split("load average: ");
				strArray = strArray[1].split(",");
				cpuUsed = Double.parseDouble(strArray[0]);
			}
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}
		finally
		{
			in.close();
		}
		return cpuUsed;
	}

	/**
	 * 内存监控
	 * @return
	 * @throws Exception
	 */
	public double getMemUsage() throws Exception
	{
		double menUsed = 0;
		Runtime rt = Runtime.getRuntime();
		Process p = rt.exec("top -b -n 1");// 调用系统的"top"命令
		BufferedReader in = null;
		try
		{
			in = new BufferedReader(new InputStreamReader(p.getInputStream()));
			String str = null;
			String[] strArray = null;
			while ((str = in.readLine()) != null)
			{
				int m = 0;
				if (str.indexOf(" R ") != -1)
				{// 只分析正在运行的进程,top进程本身除外 &&
					//
					// System.out.println("——————3—————–");
					strArray = str.split(" ");
					for (String tmp : strArray)
					{
						if (tmp.trim().length() == 0)
							continue;
						if (++m == 10)
						{
							// 9)–第10列为mem的使用百分比(RedHat 9)
							menUsed += Double.parseDouble(tmp);
						}
					}
				}
			}
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}
		finally
		{
			in.close();
		}
		return menUsed;
	}

	/**
	 * 获取磁盘空间大小
	 * @return
	 * @throws Exception
	 */
	public double getDeskUsage() throws Exception
	{
		double totalHD = 0;
		double usedHD = 0;
		Runtime rt = Runtime.getRuntime();
		Process p = rt.exec("df -hl");// df -hl 查看硬盘空间
		BufferedReader in = null;
		try
		{
			in = new BufferedReader(new InputStreamReader(p.getInputStream()));
			String str = null;
			String[] strArray = null;
			while ((str = in.readLine()) != null)
			{
				int m = 0;
				// if (flag > 0) {
				// flag++;
				strArray = str.split(" ");
				for (String tmp : strArray)
				{
					if (tmp.trim().length() == 0)
						continue;
					++m;
					// System.out.println("—-tmp—-" + tmp);
					if (tmp.indexOf("G") != -1)
					{
						if (m == 2)
						{
							// System.out.println("—G—-" + tmp);
							if (!tmp.equals("") && !tmp.equals("0"))
								totalHD += Double.parseDouble(tmp.substring(0, tmp.length() - 1)) * 1024;
						}
						if (m == 3)
						{
							// System.out.println("—G—-" + tmp);
							if (!tmp.equals("none") && !tmp.equals("0"))
								usedHD += Double.parseDouble(tmp.substring(0, tmp.length() - 1)) * 1024;
						}
					}
					if (tmp.indexOf("M") != -1)
					{
						if (m == 2)
						{
							// System.out.println("—M—" + tmp);
							if (!tmp.equals("") && !tmp.equals("0"))
								totalHD += Double.parseDouble(tmp.substring(0, tmp.length() - 1));
						}
						if (m == 3)
						{
							// System.out.println("—M—" + tmp);
							if (!tmp.equals("none") && !tmp.equals("0"))
								usedHD += Double.parseDouble(tmp.substring(0, tmp.length() - 1));
							// System.out.println("—-3—-" + usedHD);
						}
					}
				}
				// }
			}
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}
		finally
		{
			in.close();
		}
		return (usedHD / totalHD) * 100;
	}

	public static void main(String[] args) throws Exception
	{
		NodeLoadView cpu = new NodeLoadView();
		System.out.println("—————cpu used:" + cpu.getCpuUsage() + "%");
		System.out.println("—————mem used:" + cpu.getMemUsage() + "%");
		System.out.println("—————HD used:" + cpu.getDeskUsage() + "%");
		System.out.println("————jvm监控———————-");
		Runtime lRuntime = Runtime.getRuntime();
		System.out.println("————–Free Momery:" + lRuntime.freeMemory() + "K");
		System.out.println("————–Max Momery:" + lRuntime.maxMemory() + "K");
		System.out.println("————–Total Momery:" + lRuntime.totalMemory() + "K");
		System.out.println("—————Available Processors :" + lRuntime.availableProcessors());
	}
}


package temp.util;

import java.util.Map;

import org.apache.log4j.Logger;

/**
 * @description
 * @version 1.0
 * @date 2012-7-11
 */
public class ThreadScheduler
{
	private static Logger logger = Logger.getLogger(ThreadScheduler.class.getName());
	private Map<String, Thread> runningThreadMap;
	private Map<String, Thread> waitingThreadMap;
	private boolean isFinished = false;
	private int runningSize;

	public ThreadScheduler(Map<String, Thread> runningThreadMap, Map<String, Thread> waitingThreadMap)
	{
		this.runningThreadMap = runningThreadMap;
		this.waitingThreadMap = waitingThreadMap;
		this.runningSize = waitingThreadMap.size();
	}

	/**
	 * 开始调度线程
	 * @author zhen.chen
	 * @createTime 2010-1-28 上午11:04:52
	 */
	public void schedule()
	{
		long sleepMilliSecond = 1 * 1000;
		int allowRunThreads = 15;
		// 一次启动的线程数,cpuLoad变大时以此值为参考递减
		int allowRunThreadsRef = 15;
		double cpuLoad = 0;// 0-15
		NodeLoadView load = new NodeLoadView();
		while (true)
		{
			try
			{
				cpuLoad = load.getCpuUsage();
			}
			catch (Exception e1)
			{
				e1.printStackTrace();
			}
			// cpuLoad低 启动的线程多
			allowRunThreads = (int) Math.floor(allowRunThreadsRef - cpuLoad);
			// threads不能为0
			if (allowRunThreads < 1)
			{
				allowRunThreads = 1;
			}
			if (allowRunThreads > allowRunThreadsRef)
			{
				allowRunThreads = allowRunThreadsRef;
			}
			if (logger.isDebugEnabled())
			{
				logger.debug("[ThreadScheduler]running Thread:" + runningThreadMap.size() + "; waiting Thread:" + waitingThreadMap.size() + "; cpu:" + cpuLoad + " allowRunThreads:" + allowRunThreads);
			}
			// 检查等待线程runningSize个线程的情况,满足条件则启动
			for (int x = 0; x < runningSize; x++)
			{
				if (waitingThreadMap.get(x + "") != null)
				{
					if (allowRunThreadsRef <= runningThreadMap.size())
					{
						break;
					}
					synchronized (waitingThreadMap.get(x + ""))
					{
						if (!waitingThreadMap.get(x + "").isAlive())
						{
							waitingThreadMap.get(x + "").start();
						}
						else
						{
							waitingThreadMap.get(x + "").notify();
						}
					}
					runningThreadMap.put(x + "", waitingThreadMap.get(x + ""));
					waitingThreadMap.remove(x + "");
				}
			}
			// 检查runningSize个线程的情况,满足条件则暂停
			for (int x = 0; x < runningSize; x++)
			{
				if (runningThreadMap.size() <= allowRunThreads)
				{
					break;
				}
				if (runningThreadMap.get(x + "") != null)
				{
					synchronized (runningThreadMap.get(x + ""))
					{
						try
						{
							if (runningThreadMap.get(x + "").isAlive())
							{
								runningThreadMap.get(x + "").wait();
							}
							else
							{
								continue;
							}
						}
						catch (InterruptedException e)
						{
							e.printStackTrace();
						}
					}
					waitingThreadMap.put(x + "", runningThreadMap.get(x));
					runningThreadMap.remove(x + "");
				}
			}
			// 全部跑完,返回
			if (waitingThreadMap.size() == 0 && runningThreadMap.size() == 0)
			{
				if (logger.isDebugEnabled())
				{
					logger.debug("[ThreadScheduler] over.total Threads size:" + runningSize);
				}
				this.isFinished = true;
				return;
			}
			// 使主while循环慢一点
			try
			{
				Thread.sleep(sleepMilliSecond);
			}
			catch (InterruptedException e1)
			{
				e1.printStackTrace();
			}
		}
	}

	public boolean isFinished()
	{
		return isFinished;
	}
}


这个类的作用:
  • 1.接收runningThreadMap和waitingThreadMap两个map,里面对应存了运行中的线程实例和等待中的线程实例。
  • 2.读cpu情况,自动判断要notify等待中的线程还是wait运行中的线程。
  • 3.两个map都结束,退出。(必须runningThreadMap内部的Thread自己将runningThreadMap对应的Thread remove掉)


使用:
package temp.util;

import java.util.HashMap;
import java.util.Map;

/**
 * @description
 * @version 1.0
 * @date 2012-7-11
 */
public class TestThread
{
	public static class Runner extends Thread
	{
		public Runner(int j, Map<String, Thread> threadMap)
		{
		}

		public void run()
		{
			// TODO 你的逻辑 完成后需要从threadMap中remove掉
		}
	}

	public static void main(String[] args)
	{
		// 运行中的线程
		Map<String, Thread> threadMap = new HashMap<String, Thread>();
		// 正在等待中的线程
		Map<String, Thread> waitThreadMap = new HashMap<String, Thread>();
		for (int j = 0; j < args.length; j++)
		{
			Thread t = new Runner(j, threadMap);
			waitThreadMap.put(j + "", t);
		}
		ThreadScheduler threadScheduler = new ThreadScheduler(threadMap, waitThreadMap);
		threadScheduler.schedule();
		if (threadScheduler.isFinished() == false)
		{
			// 没能正常结束
		}
	}
}
分享到:
评论

相关推荐

    易语言线程池操作例程(解决内存不断升高的问题)

    解决:软件多线程运行时遇到【内存不断升高】甚至爆表!。 因为本人是个小白,多线程经常用,但是线程池并没有用过,(一听到线程池,总感觉高大上)。但是近期写彩票软件的时候发现,多线程长期操作会导致内容不断...

    C++并发与多线程-运行时选择线程数量

    使用std::thread::hardware_concurrency() 来获得当前系统可以真正并发的线程数量,和cpu的核心数有关。 #include #include #include #include #include #include std::mutex g_lock; template struct accumulate_...

    可同时搜索多个文件的多线程快速搜索源码

    2、扫描速度主要依赖于CPU数量和质量,以及需要同时搜索的目标文件数量,在酷睿双核、640G硬盘的电脑中消耗时间为4秒左右,4核I5、1TB硬盘的电脑中消耗时间为2秒左右(搜索全部本地硬盘,不包括回收站这样的特殊...

    c/c++用一个源文件,一个头文件实现的线程池源码,包含测试代码,可运行

    工作线程的数量建议设置到和cpu的核数一样。 9、编码方式: g++ -std=c++11 -o test test.c cppThreadWorkers.cpp -I./ -lpthread test.c:测试的c文件; cppThreadWorkers.cpp: 线程池源文件 cppThreadWorkers.h: ...

    第7章-JUC多线程v1.1.pdf

    MaximunPoolSize: 线程池能创建最大的线程数量, 如果核心线程池和缓冲队列都已经满了, 新的任务就会进来创建新的线程来执行, 但是数量不能超过maximunPoolSize, 否则采取拒绝接受任务策略 KeepAliveTime: 非核心...

    AnyFo - Janet :通用并发线程池

    但是,如果程序在同一个极短的时间内启动太多线程,一方面操作系统对线程数量有限制,一方面多个并发线程会占用大 量的CPU,并且,每次在要启动线程时临时去创建线程的实例,极大的占用了系统运行的时间,造成系统...

    jstack生成的Thread Dump日志.docx

    jstack生成的Thread Dump日志.docx 系统线程状态 (Native Thread Status) 系统线程有如下状态: ...死锁线程,一般指多个线程调用期间进入了相互资源占用,导致一直等待无法...JVM线程运行状态 (JVM Thread Status)

    Matlab如何控制CPU使用核的数量.docx

    二、Matlab如何设置运行内核数 2 2.1 什么是并行池 2 2.2 命令parpool 3 2.2.1 循环运算 3 2.2.2 天线设计 5 2.3 Processes(线程)与threads(进程)的区别 7 2.3.1 Processes(线程) 7 2.3.2 threads(进程) 7 ...

    FastMM5:FastMM是Embarcadero Delphi应用程序的快速替换内存管理器,可在多个线程和CPU内核之间很好地扩展,不容易出现内存碎片,并且无需使用外部.DLL文件即可支持共享内存。

    它是从头开始设计的,旨在同时保持其优势并解决4.992版的缺点:跨多个CPU内核的多线程扩展得到了极大的改善,而不会出现内存使用中断的情况。对于任意数量的CPU内核,可以将其配置为接近线性扩展。在Fastcode内存...

    好用的CPU双核补丁傻瓜安装版

    本程序可自动检测出CPU的生产厂家、核心数量,并智能安装相应的双核补丁,支持参数静默安装, AMD默认安装: 官方驱动、AMD优化补丁、KB896256、KB931784 并相应的改写注册表PerfEnablePackageIdle为1 Intel默认...

    linux优化笔记

    在linux系统下只有运行SMP内核才能支持超线程,但是安装的CPu数量越多,从超线程获得的性能提升越少。 另外linux内核会将多核的处理器当做多个单独的CPU来识别,例如,两个4核的CPU会被当成8个单个CPU,从性能角度讲...

    轻量级、易用、快速的日志库,仅提供日志写入前端 C++源代码

    轻量级、易用、快速的日志库,仅提供日志写入前端。 仅标头,跨平台,在 C++ 11 中...速度的重要性放在最后,线程增加(不超过CPU数量)的情况下,对速度的影响在30%内,每条日志达到耗时在 1us 左右基本可以满足要求.

    NVIDIA CUDA

    这样的分解保留了语言表达,允许线程在解决各子问题时协作,同时支持透明的可伸缩性,使您可以安排在任何可用处理器内核上处理各子问题:因而,编译后的 CUDA 程序可以在任何数量的处理器内核上执行,只有运行时系统...

    misc-tool-benchmark:用于CPU,RAM和磁盘的基准测试工具

    如果未提供,则基准测试将尝试确定实际cpu内核的数量,而不是依靠操作系统确定的值(即多个cpu线程的数量)(启用类似超线程的功能时可能会有所不同)。 首先,在尝试测试并行处理的性能之前,您可能希望先从线程...

    Java并发编程(学习笔记).xmind

    如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务 threadFactory 创建线程的工厂 handler 拒绝策略 unit 是一个枚举,表示 ...

    CUDA技术入门介绍

    这样的分解保留了语言表达,允许线程在解决各子问题时协作,同时支持透明的可伸缩性,使您可以安排在任何可用处理器内核上处理各子问题:因而,编译后的 CUDA 程序可以在任何数量的处理器内核上执行,只有运行时系统...

    prolasso64_98259.zip

    Windows的设计允许程序在不受足够的限制的情况下垄断您的CPU,从而导致系统停滞和反映滞后。ProBalance根据需求智能调节运行的程序的优先次序,从而使不良进程不会对您的PC的响应产生负面影响。这一目的并非通过提高...

    服务器硬件架构(行业文书).doc

    图 2 英特尔处理器插座 主板上插座的数量决定了最多可支持的处理器数量,最初,服务器都只有一个处理 器插座,但为了提高服务器的性能,市场上已经出现了包含2,4和8个插座的主板。 在处理器体系结构的演变过程中,...

    Process Lasso pro with keygen

    优化进程、内核和电源消耗,防止CPU垄断,让系统运行流畅! Process Lasso并非另一个任务管理器。它是一个进程的优化和自动化工具。Process Lasso最受欢迎的功能之一是它的一种独特的技术,称为ProBalance(进程平衡...

    Process Lasso PRO v6.6.1.6 中文绿色便携注册版

    Windows的设计允许程序在不受足够的限制的情况下垄断您的CPU,从而导致系统停滞和反映滞后。ProBalance根据需求智能调节运行的程序的优先次序,从而使不良进程不会对您的PC的响应产生负面影响。这一目的并非通过提高...

Global site tag (gtag.js) - Google Analytics