`
hougechuanqi
  • 浏览: 72106 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Mina2.0控制读取、写出流量代码(出自red5)

    博客分类:
  • MINA
 
阅读更多

public class TrafficShapingFilter extends IoFilterAdapter {

 protected static Logger log = LoggerFactory.getLogger(TrafficShapingFilter.class);

 private final AttributeKey STATE = new AttributeKey(getClass(), "state");

 //定时器服务,用来创建定时任务的定时器服务类
 private final ScheduledExecutorService scheduledExecutor;

 //用来估算对象占据JVM内存的大小的类,计算出一个对象实际占用JVM内存的大小
 private final MessageSizeEstimator messageSizeEstimator;

 private volatile int maxReadThroughput;

 private volatile int maxWriteThroughput;

 private volatile int poolSize = 1;

 /**
  * 构造方法
  * @param maxReadThroughput 最大读取字节大小(单位:秒)
  * @param maxWriteThroughput 最大写出的字节大小(单位:秒)
  */
 public TrafficShapingFilter(int maxReadThroughput, int maxWriteThroughput) {
  this(null, null, maxReadThroughput, maxWriteThroughput);
 }

 public TrafficShapingFilter(ScheduledExecutorService scheduledExecutor, int maxReadThroughput,
   int maxWriteThroughput) {
  this(scheduledExecutor, null, maxReadThroughput, maxWriteThroughput);
 }

 public TrafficShapingFilter(ScheduledExecutorService scheduledExecutor, MessageSizeEstimator messageSizeEstimator,
   int maxReadThroughput, int maxWriteThroughput) {

  log.debug("ctor - executor: {} estimator: {} max read: {} max write: {}", new Object[] { scheduledExecutor,
    messageSizeEstimator, maxReadThroughput, maxWriteThroughput });

  if (scheduledExecutor == null) {
   //实例化一个定时器对象,线程池的默认数量是1.
   scheduledExecutor = new ScheduledThreadPoolExecutor(poolSize);
   //throw new NullPointerException("scheduledExecutor");
  }

  if (messageSizeEstimator == null) {
   //实例化默认的估算消息大小的类
   messageSizeEstimator = new DefaultMessageSizeEstimator() {
    @Override
    public int estimateSize(Object message) {
     if (message instanceof IoBuffer) {
      return ((IoBuffer) message).remaining();
     }
     return super.estimateSize(message);
    }
   };
  }

  this.scheduledExecutor = scheduledExecutor;
  this.messageSizeEstimator = messageSizeEstimator;
  //设置最大读取字节长度(单位:秒)
  setMaxReadThroughput(maxReadThroughput);
  //设置最大写出字节长度(单位:秒)
  setMaxWriteThroughput(maxWriteThroughput);
 }

 public ScheduledExecutorService getScheduledExecutor() {
  return scheduledExecutor;
 }

 public MessageSizeEstimator getMessageSizeEstimator() {
  return messageSizeEstimator;
 }

 public int getMaxReadThroughput() {
  return maxReadThroughput;
 }

 public void setMaxReadThroughput(int maxReadThroughput) {
  if (maxReadThroughput < 0) {
   maxReadThroughput = 0;
  }
  this.maxReadThroughput = maxReadThroughput;
 }

 public int getMaxWriteThroughput() {
  return maxWriteThroughput;
 }

 public void setMaxWriteThroughput(int maxWriteThroughput) {
  if (maxWriteThroughput < 0) {
   maxWriteThroughput = 0;
  }
  this.maxWriteThroughput = maxWriteThroughput;
 }

 public int getPoolSize() {
  return poolSize;
 }

 public void setPoolSize(int poolSize) {
  if (poolSize < 1) {
   poolSize = 1;
  }
  this.poolSize = poolSize;
 }

 @Override
 public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
  if (parent.contains(this)) {
   throw new IllegalArgumentException(
     "You can't add the same filter instance more than once.  Create another instance and add it.");
  }
  //给每一个session添加一个属性 STATE 属性,关联一个State对象。
  parent.getSession().setAttribute(STATE, new State());
  //调节会话sessiion读取buffer大小
  adjustReadBufferSize(parent.getSession());
 }

 @Override
 public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
  //写完毕,关闭会话移除关联对象State
  parent.getSession().removeAttribute(STATE);
 }

 @Override
 public void messageReceived(NextFilter nextFilter, final IoSession session, Object message) throws Exception {

  int maxReadThroughput = this.maxReadThroughput;
  //process the request if our max is greater than zero
  if (maxReadThroughput > 0) {
   final State state = (State) session.getAttribute(STATE);
   long currentTime = System.currentTimeMillis();

   long suspendTime = 0;
   boolean firstRead = false;
   synchronized (state) {
    //估算当前已经读取字节数组总量
    state.readBytes += messageSizeEstimator.estimateSize(message);
                 //如果读取回话挂起,suspendedRead=true
    if (!state.suspendedRead) {
     if (state.readStartTime == 0) {//表示第一次读取会话的数据
      firstRead = true;
      //设置会话开始读取数据的时间
      state.readStartTime = currentTime - 1000;
     }
                    //估算当前平均每秒读取字节流量大小
     long throughput = (state.readBytes * 1000 / (currentTime - state.readStartTime));
     if (throughput >= maxReadThroughput) {//如果平均读取流量大于设置的数值,
      //计算需要挂起的时间,((state.readBytes/maxReadThroughput)*1000)计算如果按照规定流量读取数据需要多少秒,然后再减去已经读取的时间差,计算出需要挂起的时间
      suspendTime = Math.max(0, (state.readBytes * 1000 / maxReadThroughput)
        - (firstRead ? 0 : currentTime - state.readStartTime));

      state.readBytes = 0;
      state.readStartTime = 0;
      state.suspendedRead = suspendTime != 0;
                        //调整会话状态,设置会话不在读取数据,利用定时器,挂起会话,
      adjustReadBufferSize(session);
     }
    }
   }

   if (suspendTime != 0) {
    session.suspendRead();
    scheduledExecutor.schedule(new Runnable() {
     public void run() {
      synchronized (state) {
       state.suspendedRead = false;
      }
      session.resumeRead();
     }
    }, suspendTime, TimeUnit.MILLISECONDS);
   }
  }

  nextFilter.messageReceived(session, message);

 }

 /**
  *
  * 调整session默认设置最大读取字节数组长度
  * @param session
  */
 private void adjustReadBufferSize(IoSession session) {
  int maxReadThroughput = this.maxReadThroughput;
  if (maxReadThroughput == 0) {//如果不限制读取流量返回
   return;
  }
  IoSessionConfig config = session.getConfig();
  if (config.getReadBufferSize() > maxReadThroughput) {
   config.setReadBufferSize(maxReadThroughput);
  }
  if (config.getMaxReadBufferSize() > maxReadThroughput) {
   config.setMaxReadBufferSize(maxReadThroughput);
  }
 }

 @Override
 public void messageSent(NextFilter nextFilter, final IoSession session, WriteRequest writeRequest) throws Exception {
          //得到当前系统设置最大写出字节
  int maxWriteThroughput = this.maxWriteThroughput;
  //process the request if our max is greater than zero
  if (maxWriteThroughput > 0) {
   final State state = (State) session.getAttribute(STATE);
   //得到系统时间
   long currentTime = System.currentTimeMillis();
            //挂起时间长度0
   long suspendTime = 0;
   boolean firstWrite = false;
   synchronized (state) {
    state.writtenBytes += messageSizeEstimator.estimateSize(writeRequest.getMessage());
    if (!state.suspendedWrite) {
     if (state.writeStartTime == 0) {
      firstWrite = true;
      //初始化写时间
      state.writeStartTime = currentTime - 1000;
     }
                    //计算平均写字节数组流量
     long throughput = (state.writtenBytes * 1000 / (currentTime - state.writeStartTime));
     if (throughput >= maxWriteThroughput) {//写流量超出系统设置,会话挂起操作
      //计算会话需要挂起的时间
      suspendTime = Math.max(0, state.writtenBytes * 1000 / maxWriteThroughput
        - (firstWrite ? 0 : currentTime - state.writeStartTime));
      state.writtenBytes = 0;
      state.writeStartTime = 0;
      state.suspendedWrite = suspendTime != 0;
     }
    }
   }

   if (suspendTime != 0) {
    log.trace("Suspending write");
    //挂起会话
    session.suspendWrite();
    //定时器执行定时挂起操作
    scheduledExecutor.schedule(new Runnable() {
     public void run() {
      synchronized (state) {
       state.suspendedWrite = false;
      }
      //挂起唤醒会话
      session.resumeWrite();
      log.trace("Resuming write");
     }
    }, suspendTime, TimeUnit.MILLISECONDS);
   }
  }
        //串模式,执行下一个过滤器
  nextFilter.messageSent(session, writeRequest);

 }

 /**
  * 状态标志
  * @author Administrator
  *
  */
 private static class State {
  /**
   * 开始读取数据的时间
   */
  private long readStartTime;

  /**
   * 开始写数据的时间
   */
  private long writeStartTime;

  /**
   * 读数据是否已经被挂起,true:挂起,false:未挂起
   */
  private boolean suspendedRead;

  /**
   * 是否写操作被挂起,true:被挂起,false:未被挂起
   */
  private boolean suspendedWrite;

  /**
   * 该会话总共被读取数据字节长度
   */
  private long readBytes;

  /**
   * 总共被写的数据的长度
   */
  private long writtenBytes;
 }
}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics