BufferedInputStream学习笔记
BufferedInputStream是一个带有缓冲区域的InputStream,它的继承体系如下:
InputStream
|__FilterInputStream
|__BufferedInputStream
首先了解一下FilterInputStream:
FilterInputStream通过装饰器模式将InputStream封装至内部的一个成员变量:
protected volatile InputStream in;
private static int defaultBufferSize = 8192 //该变量定义了默认的缓冲大小protected volatile byte buf[]; //缓冲数组,注意该成员变量同样使用了volatile关键字进行修饰,作用为在多线程环境中,当对该变量引用进行修改时保证了内存的可见性。private static final AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater = AtomicReferenceFieldUpdater.newUpdater(BufferedInputStream.class, byte[].class, "buf")//缓存数组的原子更新器,该成员变量与buf数组的volatile关键字共同组成了buf数组的原子更新功能实现。protected int count;//该成员变量表示目前缓冲区域中有多少有效的字节。protected int pos;//该成员变量表示了当前缓冲区的读取位置。protected int markpos = -1;/*表示标记位置,该标记位置的作用为:实现流的标记特性,即流的某个位置可以被设置为标记,允许通过设置reset(),将流的读取位置进行重置到该标记位置,但是InputStream注释上明确表示,该流不会无限的保证标记长度可以无限延长,即markpos=15,pos=139734,该保留区间可能已经超过了保留的极限(如下)*/protected int marklimit;/*该成员变量表示了上面提到的标记最大保留区间大小,当pos-markpos> marklimit时,mark标记可能会被清除(根据实现确定)。*/
public BufferedInputStream(InputStream in, int size) {super(in); if (size <= 0) { throw new IllegalArgumentException("Buffer size <= 0"); }buf = new byte[size];}
private void fill() throws IOException { byte[] buffer = getBufIfOpen();if (markpos < 0) { /*如果不存在标记位置(即没有需要进行reset的位置需求) 则可以进行大胆地直接重置pos标识下一可读取位置,但是这样 不是会读取到以前的旧数据吗?不用担心,在后面的代码里☆会实现输入流的新 数据填充*/ pos = 0;}else if (pos >= buffer.length){ /* 位置大于缓冲区长度,这里表示已经没有可用空间了 */ if (markpos > 0) { /* 表示存在mark位置,则要对mark位置到pos位置的数据予以保留, 以确保后面如果调用reset()重新从mark位置读取会取得成功*/int sz = pos - markpos; /*该实现是通过将缓冲区域中markpos至pos部分的移至缓冲区头部实现*/System.arraycopy(buffer, markpos, buffer, 0, sz);pos = sz;markpos = 0; } else if (buffer.length >= marklimit) { /* 如果缓冲区已经足够大,可以容纳marklimit,则直接重置*/ markpos = -1;pos = 0;/* 丢弃所有的缓冲区内容 */ } else { /* 如果缓冲区还能增长的空间,则进行缓冲区扩容*/int nsz = pos * 2; /*新的缓冲区大小设置成满足最大标记极限即可*/if (nsz > marklimit) nsz = marklimit;byte nbuf[] = new byte[nsz]; //将原来的较小的缓冲内容COPY至增容的新缓冲区中System.arraycopy(buffer, 0, nbuf, 0, pos); //这里使用了原子变量引用更新,确保多线程环境下内存的可见性 if (!bufUpdater.compareAndSet(this, buffer, nbuf)) { // Can't replace buf if there was an async close. // Note: This would need to be changed if fill() // is ever made accessible to multiple threads. // But for now, the only way CAS can fail is via close. // assert buf == null; throw new IOException("Stream closed"); } buffer = nbuf; } count = pos; //从原始输入流中读取数据,填充缓冲区int n = getInIfOpen().read(buffer, pos, buffer.length - pos); //根据实际读取的字节数更新缓冲区中可用字节数 if (n > 0) count = n + pos; }
public synchronized int read() throws IOException {if (pos >= count) { /*表示读取位置已经超过了缓冲区可用范围,则对缓冲区进行重新填充*/ fill(); /*当填充后再次读取时发现没有数据可读,证明读到了流末尾*/ if (pos >= count)return -1;} /*这里表示读取位置尚未超过缓冲区有效范围,直接返回缓冲区内容*/return getBufIfOpen()[pos++] & 0xff;}
private int read1(byte[] b, int off, int len) throws IOException {int avail = count - pos;if (avail <= 0) { /*这里使用了一个巧妙的机制,如果读取的长度大于缓冲区的长度 并且没有markpos,则直接从原始输入流中进行读取,从而避免无谓的 COPY(从原始输入流至缓冲区,读取缓冲区全部数据,清空缓冲区, 重新填入原始输入流数据)*/ if (len >= getBufIfOpen().length && markpos < 0) {return getInIfOpen().read(b, off, len); } /*当无数据可读时,从原始流中载入数据到缓冲区中*/ fill(); avail = count - pos; if (avail <= 0) return -1;}int cnt = (avail < len) ? avail : len; /*从缓冲区中读取数据,返回实际读取到的大小*/System.arraycopy(getBufIfOpen(), pos, b, off, cnt);pos += cnt;return cnt; }
public synchronized int read(byte b[], int off, int len)throws IOException { getBufIfOpen(); // Check for closed stream if ((off | len | (off + len) | (b.length - (off + len))) < 0) { throw new IndexOutOfBoundsException();} else if (len == 0) { return 0; }int n = 0; for (;;) { int nread = read1(b, off + n, len - n); if (nread <= 0) return (n == 0) ? nread : n; n += nread; if (n >= len) return n; // if not closed but no bytes available, return InputStream input = in; if (input != null && input.available() <= 0) return n; } }
public synchronized long skip(long n) throws IOException { getBufIfOpen(); // Check for closed streamif (n <= 0) { return 0;}long avail = count - pos; if (avail <= 0) { // If no mark position set then don't keep in buffer //从上面的注释可以知道,这也是一个巧妙的方法,如果没有mark标记, // 则直接从原始输入流中skip if (markpos <0) return getInIfOpen().skip(n); // Fill in buffer to save bytes for reset fill(); avail = count - pos; if (avail <= 0) return 0; } //该方法的实现为尽量原则,不保证一定略过规定的字节数。 long skipped = (avail < n) ? avail : n; pos += skipped; return skipped; }
public synchronized int available() throws IOException {return getInIfOpen().available() + (count - pos); }
public synchronized void mark(int readlimit) {marklimit = readlimit;markpos = pos; }
public synchronized void reset() throws IOException { getBufIfOpen(); // Cause exception if closedif (markpos < 0) throw new IOException("Resetting to invalid mark");pos = markpos; }
public void close() throws IOException { byte[] buffer; while ( (buffer = buf) != null) { if (bufUpdater.compareAndSet(this, buffer, null)) { InputStream input = in; in = null; if (input != null) input.close(); return; } // Else retry in case a new buf was CASed in fill() } }