消息读
netty使用了相关的算法计算出比较合适缓冲区大小,整个流程图如下
ReceiveBufferSizePredictor可以根据实际读取的字节大小数设置下次读写叫合适的缓冲区大小。类结构如下
AdaptiveReceiveBufferSizePredictor 提供了一种自适应的计算方式,如下代码所述,当改类初始化的时候,会填充SIZE_TABLE数组。
private static final int[] SIZE_TABLE; static { List<Integer> sizeTable = new ArrayList<Integer>(); for (int i = 1; i <= 8; i ++) { sizeTable.add(i); } for (int i = 4; i < 32; i ++) { long v = 1L << i; long inc = v >>> 4; v -= inc << 3; for (int j = 0; j < 8; j ++) { v += inc; if (v > Integer.MAX_VALUE) { sizeTable.add(Integer.MAX_VALUE); } else { sizeTable.add((int) v); } } } SIZE_TABLE = new int[sizeTable.size()];// for (int i = 0; i < SIZE_TABLE.length; i ++) {// SIZE_TABLE[i] = sizeTable.get(i);// System.out.println(SIZE_TABLE[i]);// } }123456789101112131415161820222426283032364044485256606472808896104112120128144160176192208224240256288320352384416448480512576640704768832896960102411521280140815361664179219202048230425602816307233283584384040964608512056326144665671687680819292161024011264122881331214336153601638418432204802252824576266242867230720327683686440960450564915253248573446144065536737288192090112983041064961146881228801310721474561638401802241966082129922293762457602621442949123276803604483932164259844587524915205242885898246553607208967864328519689175049830401048576117964813107201441792157286417039361835008196608020971522359296262144028835843145728340787236700163932160419430447185925242880576716862914566815744734003278643208388608943718410485760115343361258291213631488146800641572864016777216188743682097152023068672251658242726297629360128314572803355443237748736419430404613734450331648545259525872025662914560671088647549747283886080922746881006632961090519041174405121258291201342177281509949441677721601845493762013265922181038082348810242516582402684354563019898883355443203690987524026531844362076164697620485033164805368709126039797766710886407381975048053063688724152329395240961006632960107374182412079595521342177280147639500816106127361744830464187904819220132659202147483647
Buffer = null 分配DirectBufferBuffer.capacity < size 分配DirectBufferBuffer.capacity * percent > size && exceedCount=maxExceedCount 分配DirectBufferBuffer.capacity * percent > size && exceedCount <maxExceedCount buffer重复使用Buffer.capacity * percent <= size exceedCount =0;buffer重复使用分配DirectBuffer 释放已申请的Buffer占用的系统内存;计算size进1024计算size进1024 // Normalize to multiple of 1024 int q = capacity >>> 10; int r = capacity & 1023; if (r != 0) { q ++; } return q << 10; DirectByteBuffer(int cap) {// package-privatesuper(-1, 0, cap, cap, false);Bits.reserveMemory(cap);int ps = Bits.pageSize();long base = 0;try { base = unsafe.allocateMemory(cap + ps);} catch (OutOfMemoryError x) { Bits.unreserveMemory(cap); throw x;}unsafe.setMemory(base, cap + ps, (byte) 0);if (base % ps != 0) { // Round up to page boundary address = base + ps - (base & (ps - 1));} else { address = base;} //关键代码cleaner = Cleaner.create(this, new Deallocator(base, cap)); private Cleaner(Object obj, Runnable runnable) { super(obj, dummyQueue); next = null; prev = null; thunk = runnable; } public static Cleaner create(Object obj, Runnable runnable) { if(runnable == null) return null; else return add(new Cleaner(obj, runnable)); } public void clean() { if(!remove(this)) return; try { thunk.run();public void run() { if (address == 0) {// Paranoiareturn; } unsafe.freeMemory(address); address = 0; Bits.unreserveMemory(capacity);}/* List of References waiting to be enqueued. The collector adds * References to this list, while the Reference-handler thread removes * them. This list is protected by the above lock object. */ private static Reference pending = null;
static {ThreadGroup tg = Thread.currentThread().getThreadGroup();for (ThreadGroup tgn = tg; tgn != null; tg = tgn, tgn = tg.getParent());Thread handler = new ReferenceHandler(tg, "Reference Handler");/* If there were a special system-only priority greater than * MAX_PRIORITY, it would be used here */handler.setPriority(Thread.MAX_PRIORITY);handler.setDaemon(true);handler.start(); }public void run() { for (;;) {Reference r;synchronized (lock) { if (pending != null) {r = pending;Reference rn = r.next;pending = (rn == r) ? null : rn;r.next = r; } else {try { lock.wait();} catch (InterruptedException x) { }continue; }}// 原来clean是在这处理的if (r instanceof Cleaner) { ((Cleaner)r).clean(); continue;}ReferenceQueue q = r.queue;if (q != ReferenceQueue.NULL) q.enqueue(r); }} }/* * Copyright 2012 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. */package org.jboss.netty.util.internal;import java.lang.reflect.Method;import java.nio.ByteBuffer;/** * This is fork of ElasticSearch's ByteBufferAllocator.Cleaner class */public final class ByteBufferUtil { private static final boolean CLEAN_SUPPORTED; private static final Method directBufferCleaner; private static final Method directBufferCleanerClean; static { Method directBufferCleanerX = null; Method directBufferCleanerCleanX = null; boolean v; try { directBufferCleanerX = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner"); directBufferCleanerX.setAccessible(true); directBufferCleanerCleanX = Class.forName("sun.misc.Cleaner").getMethod("clean"); directBufferCleanerCleanX.setAccessible(true); v = true; } catch (Exception e) { v = false; } CLEAN_SUPPORTED = v; directBufferCleaner = directBufferCleanerX; directBufferCleanerClean = directBufferCleanerCleanX; } /** * Destroy the given {@link ByteBuffer} if possible */ public static void destroy(ByteBuffer buffer) { if (CLEAN_SUPPORTED && buffer.isDirect()) { try { Object cleaner = directBufferCleaner.invoke(buffer); directBufferCleanerClean.invoke(cleaner); } catch (Exception e) { // silently ignore exception } } } private ByteBufferUtil() { // Utility class }}