`
lixinye0123
  • 浏览: 323654 次
  • 性别: Icon_minigender_1
  • 来自: 温州
社区版块
存档分类
最新评论

Java 5.0 多线程编程实践

    博客分类:
  • Java
阅读更多
<noscript type="text/javascript"><!----></noscript><noscript src="http://pagead2.googlesyndication.com/pagead/show_ads.js" type="text/javascript"> </noscript>
  Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供了丰富的API多线程编程在Java 5中更加容易,灵活。本文通过一个网络服务器模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列,可重入锁等,还实践了 Callable, Future等接口,并使用了Java 5的另外一个新特性泛型。

  简介

  本文将实现一个网络服务器模型,一旦有客户端连接到该服务器,则启动一个新线程为该连接服务,服务内容为往客户端输送一些字符信息。一个典型的网络服务器模型如下:

  1. 建立监听端口。

  2. 发现有新连接,接受连接,启动线程,执行服务线程。 3. 服务完毕,关闭线程。

   这个模型在大部分情况下运行良好,但是需要频繁的处理用户请求而每次请求需要的服务又是简短的时候,系统会将大量的时间花费在线程的创建销毁。Java 5的线程池克服了这些缺点。通过对重用线程来执行多个任务,避免了频繁线程的创建与销毁开销,使得服务器的性能方面得到很大提高。因此,本文的网络服务器 模型将如下:

  1. 建立监听端口,创建线程池。

  2. 发现有新连接,使用线程池来执行服务任务。

  3. 服务完毕,释放线程到线程池。

  下面详细介绍如何使用Java 5的concurrent包提供的API来实现该服务器。

  初始化

   初始化包括创建线程池以及初始化监听端口。创建线程池可以通过调用java.util.concurrent.Executors类里的静态方法 newChahedThreadPool或是newFixedThreadPool来创建,也可以通过新建一个 java.util.concurrent.ThreadPoolExecutor实例来执行任务。这里我们采用newFixedThreadPool方 法来建立线程池。

ExecutorService pool = Executors.newFixedThreadPool(10);

  表示新建了一个线程池,线程池里面有10个线程为任务队列服务。

  使用ServerSocket对象来初始化监听端口。

private static final int PORT = 19527;
serverListenSocket = new ServerSocket(PORT);
serverListenSocket.setReuseAddress(true);
serverListenSocket.setReuseAddress(true);

  服务新连接

  当有新连接建立时,accept返回时,将服务任务提交给线程池执行。

while(true){
 Socket socket = serverListenSocket.accept();
 pool.execute(new ServiceThread(socket));
}

  这里使用线程池对象来执行线程,减少了每次线程创建和销毁的开销。任务执行完毕,线程释放到线程池。

  服务任务

   服务线程ServiceThread维护一个count来记录服务线程被调用的次数。每当服务任务被调用一次时,count的值自增1,因此 ServiceThread提供一个increaseCount和getCount的方法,分别将count值自增1和取得该count值。由于可能多个 线程存在竞争,同时访问count,因此需要加锁机制,在Java 5之前,我们只能使用synchronized来锁定。Java 5中引入了性能更加粒度更细的重入锁ReentrantLock。我们使用ReentrantLock保证代码线程安全。下面是具体代码:

private static ReentrantLock lock = new ReentrantLock ();
private static int count = 0;
private int getCount(){
 int ret = 0;
 try{
  lock.lock();
  ret = count;
 }finally{
  lock.unlock();
 }
 return ret;
}
private void increaseCount(){
 try{
  lock.lock();
  ++count;
 }finally{
  lock.unlock();
 }
}

  服务线程在开始给客户端打印一个欢迎信息,

increaseCount();
int curCount = getCount();
helloString = "hello, id = " + curCount+"\r\n";
dos = new DataOutputStream(connectedSocket.getOutputStream());
dos.write(helloString.getBytes());

   然后使用ExecutorService的submit方法提交一个Callable的任务,返回一个Future接口的引用。这种做法对费时的任务非 常有效,submit任务之后可以继续执行下面的代码,然后在适当的位置可以使用Future的get方法来获取结果,如果这时候该方法已经执行完毕,则 无需等待即可获得结果,如果还在执行,则等待到运行完毕。

ExecutorService executor = Executors.newSingleThreadExecutor();
Future <String> future = executor.submit(new TimeConsumingTask());
dos.write("let's do soemthing other".getBytes());
String result = future.get();
dos.write(result.getBytes());

  其中TimeConsumingTask实现了Callable接口

class TimeConsumingTask implements Callable <String>{
 public String call() throws Exception {
  System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
  return "ok, here's the result: It takes me lots of time to produce this result";
 }
}

   这里使用了Java 5的另外一个新特性泛型,声明TimeConsumingTask的时候使用了String做为类型参数。必须实现Callable接口的call函数, 其作用类似与Runnable中的run函数,在call函数里写入要执行的代码,其返回值类型等同于在类声明中传入的类型值。在这段程序中,我们提交了 一个Callable的任务,然后程序不会堵塞,而是继续执行dos.write("let's do soemthing other".getBytes());当程序执行到String result = future.get()时如果call函数已经执行完毕,则取得返回值,如果还在执行,则等待其执行完毕。
服务器端的完整实现

  服务器端的完整实现代码如下:

package com.andrew;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Server {
 private static int produceTaskSleepTime = 100;
 private static int consumeTaskSleepTime = 1200;
 private static int produceTaskMaxNumber = 100;
 private static final int CORE_POOL_SIZE = 2;
 private static final int MAX_POOL_SIZE = 100;
 private static final int KEEPALIVE_TIME = 3;
 private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
 private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
 private static final String HOST = "127.0.0.1";
 private static final int PORT = 19527;
 private BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
 //private ThreadPoolExecutor serverThreadPool = null;
 private ExecutorService pool = null;
 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
 private ServerSocket serverListenSocket = null;
 private int times = 5;
 public void start() {
  // You can also init thread pool in this way.
  /*serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
  MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue,
  rejectedExecutionHandler);*/
  pool = Executors.newFixedThreadPool(10);
  try {
   serverListenSocket = new ServerSocket(PORT);
   serverListenSocket.setReuseAddress(true);

   System.out.println("I'm listening");
   while (times-- > 0) {
    Socket socket = serverListenSocket.accept();
    String welcomeString = "hello";
    //serverThreadPool.execute(new ServiceThread(socket, welcomeString));
    pool.execute(new ServiceThread(socket));
   }
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  cleanup();
 }

 public void cleanup() {
  if (null != serverListenSocket) {
   try {
    serverListenSocket.close();
   } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
  }
  //serverThreadPool.shutdown();
  pool.shutdown();
 }

 public static void main(String args[]) {
  Server server = new Server();
  server.start();
 }
}

class ServiceThread implements Runnable, Serializable {
 private static final long serialVersionUID = 0;
 private Socket connectedSocket = null;
 private String helloString = null;
 private static int count = 0;
 private static ReentrantLock lock = new ReentrantLock();

 ServiceThread(Socket socket) {
  connectedSocket = socket;
 }

 public void run() {
  increaseCount();
  int curCount = getCount();
  helloString = "hello, id = " + curCount + "\r\n";

  ExecutorService executor = Executors.newSingleThreadExecutor();
  Future<String> future = executor.submit(new TimeConsumingTask());

  DataOutputStream dos = null;
  try {
   dos = new DataOutputStream(connectedSocket.getOutputStream());
   dos.write(helloString.getBytes());
   try {
    dos.write("let's do soemthing other.\r\n".getBytes());
    String result = future.get();
    dos.write(result.getBytes());
   } catch (InterruptedException e) {
    e.printStackTrace();
   } catch (ExecutionException e) {
    e.printStackTrace();
   }
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  } finally {
   if (null != connectedSocket) {
    try {
     connectedSocket.close();
    } catch (IOException e) {
     // TODO Auto-generated catch block
     e.printStackTrace();
    }
   }
   if (null != dos) {
    try {
     dos.close();
    } catch (IOException e) {
     // TODO Auto-generated catch block
     e.printStackTrace();
    }
   }
   executor.shutdown();
  }
 }

 private int getCount() {
  int ret = 0;
  try {
   lock.lock();
   ret = count;
  } finally {
   lock.unlock();
  }
  return ret;
 }

 private void increaseCount() {
  try {
   lock.lock();
   ++count;
  } finally {
   lock.unlock();
  }
 }
}

class TimeConsumingTask implements Callable<String> {
 public String call() throws Exception {
  System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
  return "ok, here's the result: It takes me lots of time to produce this result";
 }

}

  运行程序

  运行服务端,客户端只需使用telnet 127.0.0.1 19527 即可看到信息如下:

分享到:
评论

相关推荐

    Java5.0多线程编程实践.pdf

    Java5.0多线程编程实践.pdf

    (超赞)JAVA精华之--深入JAVA API

    1.7 Java 5.0多线程编程 1.8 Java Socket编程 1.9 Java的内存泄漏 1.10 抽象类与接口的区别 1.11 Java变量类型间的相互转换 2 JAVA与WEB 2.1 JMX规范 2.1.1 JMX概述 2.1.2 设备层(Instrumentation Level) 2.1.3 ...

    JAVA SE学习精华集锦

    1.7 Java 5.0多线程编程 65 1.8 Java Socket编程 80 1.9 Java的内存泄漏 85 1.10 抽象类与接口的区别 86 1.11 Java变量类型间的相互转换 87 2 JAVA与WEB 87 2.1 JMX规范 87 2.1.1 JMX概述 87 2.1.2 设备层...

    精通Java:JDK、数据库系统开发Web开发(实例代码)

    本书每一节的例子都是精挑细选的,具有很经听针对性,力求让读者通过实践掌握Java编程的基本方法 本书适合没有编程经验的初、中级读者,高等院校计算机相关专业的师生及SCJP认证考试学员学习和使用。 第1篇 认识...

    Java优化编程(第2版)

    1.11 java编程实践 1.11.1 访问实例与类中变量的规则 1.11.2 引用类中的静态变量与方法的 …… 小结 第4章 java核心类与性能优化 4.1 散列表类与性能优化 4.1.1 线程同步散列表类 4.1.2 设置arraylist初始化容量 ...

    Java SE实践教程 pdf格式电子书 下载(四) 更新

    1.2.3 J2SE 5.0新特性实践 26 1.3 小结 35 第2章 对象无处不在——面向对象的基本概念 37 2.1 讲解 38 2.1.1 什么是面向对象 38 2.1.2 面向对象的基本概念 38 2.1.3 Java对面向对象的支持 41 2.2 练习 42 ...

    Java SE实践教程 源代码 下载

    1.2.3 J2SE 5.0新特性实践 26 1.3 小结 35 第2章 对象无处不在——面向对象的基本概念 37 2.1 讲解 38 2.1.1 什么是面向对象 38 2.1.2 面向对象的基本概念 38 2.1.3 Java对面向对象的支持 41 2.2 练习 42 ...

    Java SE实践教程 pdf格式电子书 下载(一) 更新

    1.2.3 J2SE 5.0新特性实践 26 1.3 小结 35 第2章 对象无处不在——面向对象的基本概念 37 2.1 讲解 38 2.1.1 什么是面向对象 38 2.1.2 面向对象的基本概念 38 2.1.3 Java对面向对象的支持 41 2.2 练习 42 ...

    Hands-On-High-Performance-with-Spring-5:Packt发布的Spring 5的动手高性能

    本书涵盖以下激动人心的功能: 掌握Bean Wiring的最佳编程实践和性能改进分析各种AOP实施的性能探索与Spring的数据库交互以优化设计和配置解决Hibernate性能问题和陷阱利用多线程和并发编程来提高应用程序性能如果...

    Hibernate实战(第2版 中文高清版)

     17.1 Java EE 5.0编程模型   17.1.1 JSF详解   17.1.2 EJB 3.0详解   17.1.3 用JSF和EJB 3.0编写Web应用程序   17.1.4 分析应用程序   17.2 用Seam改善应用程序   17.2.1 配置Seam   17.2.2 将页面...

    Android程序设计基础

    1.1.1 Java 5.0+ 3 1.1.2 Eclipse 4 1.1.3 Android 4 1.1.4 Eclipse插件 5 1.2 创建第一个程序 7 1.3 在模拟器上运行程序 8 1.4 在手机上运行程序 9 1.5 快速阅读指南 9 第2章 基本概念 11 2.1 Android的...

    asp.net知识库

    关于能自定义格式的、支持多语言的、支持多数据库的代码生成器的想法 发布Oracle存储过程包c#代码生成工具(CodeRobot) New Folder XCodeFactory3.0完全攻略--序 XCodeFactory3.0完全攻略--基本思想 XCodeFactory...

Global site tag (gtag.js) - Google Analytics