`
dylan0514sina.cn
  • 浏览: 92340 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

消息读

 
阅读更多
netty使用了相关的算法计算出比较合适缓冲区大小,整个流程图如下

ReceiveBufferSizePredictor可以根据实际读取的字节大小数设置下次读写叫合适的缓冲区大小。类结构如下



AdaptiveReceiveBufferSizePredictor   提供了一种自适应的计算方式,如下代码所述,当改类初始化的时候,会填充SIZE_TABLE数组。
  private static final int[] SIZE_TABLE;

    static {
        List<Integer> sizeTable = new ArrayList<Integer>();
        for (int i = 1; i <= 8; i ++) {
            sizeTable.add(i);
        }

        for (int i = 4; i < 32; i ++) {
            long v = 1L << i;
            long inc = v >>> 4;
            v -= inc << 3;

            for (int j = 0; j < 8; j ++) {
                v += inc;
                if (v > Integer.MAX_VALUE) {
                    sizeTable.add(Integer.MAX_VALUE);
                } else {
                    sizeTable.add((int) v);
                }
            }
        }

        SIZE_TABLE = new int[sizeTable.size()];
//        for (int i = 0; i < SIZE_TABLE.length; i ++) {
//            SIZE_TABLE[i] = sizeTable.get(i);
//            System.out.println(SIZE_TABLE[i]);
//        }
    }

SIZE_TABLE数据长度为232,从以下数据可以看出
1-8的数据增量为1
8-16增量为1
16-32增量为2
32-64增量为4
64-128增量为8
128-256增量为16
依次类推如算法所述,1-8直接加入到数据,8以后的分段可以如下表示2^m到2^(m+1)之间的增量是2^(m-3)次方。如上算法的结果如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
18
20
22
24
26
28
30
32
36
40
44
48
52
56
60
64
72
80
88
96
104
112
120
128
144
160
176
192
208
224
240
256
288
320
352
384
416
448
480
512
576
640
704
768
832
896
960
1024
1152
1280
1408
1536
1664
1792
1920
2048
2304
2560
2816
3072
3328
3584
3840
4096
4608
5120
5632
6144
6656
7168
7680
8192
9216
10240
11264
12288
13312
14336
15360
16384
18432
20480
22528
24576
26624
28672
30720
32768
36864
40960
45056
49152
53248
57344
61440
65536
73728
81920
90112
98304
106496
114688
122880
131072
147456
163840
180224
196608
212992
229376
245760
262144
294912
327680
360448
393216
425984
458752
491520
524288
589824
655360
720896
786432
851968
917504
983040
1048576
1179648
1310720
1441792
1572864
1703936
1835008
1966080
2097152
2359296
2621440
2883584
3145728
3407872
3670016
3932160
4194304
4718592
5242880
5767168
6291456
6815744
7340032
7864320
8388608
9437184
10485760
11534336
12582912
13631488
14680064
15728640
16777216
18874368
20971520
23068672
25165824
27262976
29360128
31457280
33554432
37748736
41943040
46137344
50331648
54525952
58720256
62914560
67108864
75497472
83886080
92274688
100663296
109051904
117440512
125829120
134217728
150994944
167772160
184549376
201326592
218103808
234881024
251658240
268435456
301989888
335544320
369098752
402653184
436207616
469762048
503316480
536870912
603979776
671088640
738197504
805306368
872415232
939524096
1006632960
1073741824
1207959552
1342177280
1476395008
1610612736
1744830464
1879048192
2013265920
2147483647

初始缓存大小为1024,可以在SIZE_TABLE中找到对应数组位置。当实际读取字节小于1024数组左偏,否则右偏;下次预计的时候根据本次已设置的索引情况来判断;直到超过最大或最小值时,以最值为主。
SocketReceiveBufferAllocator
  SocketReceiveBufferAllocator.get(size)处理逻辑如下所述,没什么特别的。值得借鉴下段位移代码部分
Buffer = null                分配DirectBuffer
Buffer.capacity < size 分配DirectBuffer
Buffer.capacity * percent > size && exceedCount=maxExceedCount 分配DirectBuffer
Buffer.capacity * percent > size && exceedCount <maxExceedCount buffer重复使用
Buffer.capacity * percent <= size exceedCount =0;buffer重复使用
分配DirectBuffer  释放已申请的Buffer占用的系统内存;计算size进1024
计算size进1024 
   // Normalize to multiple of 1024
        int q = capacity >>> 10;
        int r = capacity & 1023;
        if (r != 0) {
            q ++;
        }
        return q << 10;


java.nio.DirectByteBuffer
   直接向操作系统请求分配一些资源空间,改空间并不是JAVA推内存。之后的所有写入读取操作都不通过堆内存处理,直接操作系统资源空间。通过unsafe.allocateMemory,setMemory操作资源空间。而最后一行代码,Cleaner是一个PhantomReference实现类
  DirectByteBuffer(int cap) {			// package-private

	super(-1, 0, cap, cap, false);
	Bits.reserveMemory(cap);
	int ps = Bits.pageSize();
	long base = 0;
	try {
	    base = unsafe.allocateMemory(cap + ps);
	} catch (OutOfMemoryError x) {
	    Bits.unreserveMemory(cap);
	    throw x;
	}
	unsafe.setMemory(base, cap + ps, (byte) 0);
	if (base % ps != 0) {
	    // Round up to page boundary
	    address = base + ps - (base & (ps - 1));
	} else {
	    address = base;
	}
         //关键代码
	cleaner = Cleaner.create(this, new Deallocator(base, cap));

从如下代码可以看出Cleaner接受一个runnable对象,clean方法中调用runnable对象的run方法。
  private Cleaner(Object obj, Runnable runnable)
    {
        super(obj, dummyQueue);
        next = null;
        prev = null;
        thunk = runnable;
    }

    public static Cleaner create(Object obj, Runnable runnable)
    {
        if(runnable == null)
            return null;
        else
            return add(new Cleaner(obj, runnable));
    }

    public void clean()
    {
        if(!remove(this))
            return;
        try
        {
            thunk.run();

在这里,Deallocator即为runnable对象,可以看到由unsafe.freeMemory
public void run() {
	    if (address == 0) {
		// Paranoia
		return;
	    }
	    unsafe.freeMemory(address);
	    address = 0;
	    Bits.unreserveMemory(capacity);
	}

那clean方法调用时机在哪呢,正如JDK文档中指出当虚拟机确定对象虚可到达时,那么在那时将会被加入pending队列。在Reference中体现,

    /* List of References waiting to be enqueued.  The collector adds
     * References to this list, while the Reference-handler thread removes
     * them.  This list is protected by the above lock object.
     */
    private static Reference pending = null;

Reference的静态语句块中初始化一个后台线程,不断的遍历pending,处理pending中的每个节点
 static {
	ThreadGroup tg = Thread.currentThread().getThreadGroup();
	for (ThreadGroup tgn = tg;
	     tgn != null;
	     tg = tgn, tgn = tg.getParent());
	Thread handler = new ReferenceHandler(tg, "Reference Handler");
	/* If there were a special system-only priority greater than
	 * MAX_PRIORITY, it would be used here
	 */
	handler.setPriority(Thread.MAX_PRIORITY);
	handler.setDaemon(true);
	handler.start();
    }

处理pending代码,注意Cleaner部分
public void run() {
	    for (;;) {

		Reference r;
		synchronized (lock) {
		    if (pending != null) {
			r = pending;
			Reference rn = r.next;
			pending = (rn == r) ? null : rn;
			r.next = r;
		    } else {
			try {
			    lock.wait();
			} catch (InterruptedException x) { }
			continue;
		    }
		}

		// 原来clean是在这处理的
		if (r instanceof Cleaner) {
		    ((Cleaner)r).clean();
		    continue;
		}

		ReferenceQueue q = r.queue;
		if (q != ReferenceQueue.NULL) q.enqueue(r);
	    }
	}
    }

这种由JVM确定的调用时机,其实对开发来讲是被动的,不“靠谱的”,鬼知道它何时释放。所以我们只要争取主动释放。即调用Cleaner的clean即可。
netty中的ByteBufferUtil就干这事。
/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.jboss.netty.util.internal;

import java.lang.reflect.Method;
import java.nio.ByteBuffer;

/**
 * This is fork of ElasticSearch's ByteBufferAllocator.Cleaner class
 */
public final class ByteBufferUtil {
    private static final boolean CLEAN_SUPPORTED;
    private static final Method directBufferCleaner;
    private static final Method directBufferCleanerClean;

    static {
        Method directBufferCleanerX = null;
        Method directBufferCleanerCleanX = null;
        boolean v;
        try {
            directBufferCleanerX = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner");
            directBufferCleanerX.setAccessible(true);
            directBufferCleanerCleanX = Class.forName("sun.misc.Cleaner").getMethod("clean");
            directBufferCleanerCleanX.setAccessible(true);
            v = true;
        } catch (Exception e) {
            v = false;
        }
        CLEAN_SUPPORTED = v;
        directBufferCleaner = directBufferCleanerX;
        directBufferCleanerClean = directBufferCleanerCleanX;
    }

    /**
     * Destroy the given {@link ByteBuffer} if possible
     */
    public static void destroy(ByteBuffer buffer) {
        if (CLEAN_SUPPORTED && buffer.isDirect()) {
            try {
                Object cleaner = directBufferCleaner.invoke(buffer);
                directBufferCleanerClean.invoke(cleaner);
            } catch (Exception e) {
                // silently ignore exception
            }
        }
    }

    private ByteBufferUtil() {
        // Utility class
    }
}

  • 大小: 23.7 KB
  • 大小: 26.4 KB
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics