首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

JAVA NIO跟MINA发送数据过程解析

2012-08-31 
JAVA NIO和MINA发送数据过程解析NIO发送数据过程:1 将信道写入操作加锁保证其他线程不对信道写入(文档中称

JAVA NIO和MINA发送数据过程解析

NIO发送数据过程:
1 将信道写入操作加锁保证其他线程不对信道写入(文档中称“是如果另一个线程已经在此通道上发起了一个写入操作,则在该操作完成前此方法的调用被阻塞。”)
2 如果缓冲区为非直接缓冲区,则复制缓冲区内容到直接缓冲区,防止外界对缓冲区内容修改导致发送数据损坏
??? * 复制过程分配的内存将被捆绑在线程上,在线程关闭之前,这部分内存不被回收,等IO操作完成后,可重用这部分内存,减少内存分配过程,其性能消耗更多在复制内容上。
??? * 直接缓冲区与非直接缓冲区参见ByteBuffer类文档
3 通过JNI调用本地方法直接将缓冲区地址和长度传递给操作系统,由系统进行异步IO操作

由此可见jvm与操作系统共用直接缓冲区。

MINA发送数据过程:
?MINA发送数据过程在
??? org.apache.mina.core.polling.AbstractPollingIoProcessor.flushNow(T, long)
??? org.apache.mina.core.polling.AbstractPollingIoProcessor.writeBuffer(T, WriteRequest, boolean, int, long)
??? org.apache.mina.transport.socket.nio.NioProcessor.write(NioSession, IoBuffer, int)
?方法中。
?
??? flashNow方法负责清空IoSession.writeRequestQueue列队。为保证传输效率,flashNow方法尽量将输出长度匹配到接收缓冲区长度的1.5倍。
??? flashNow方法循环调用writeBuffer,直到所调用writeBuffer方法返回值总和大于等于接收缓冲区容量的1.5倍、writeBuffer返回0或全部消息发送成功。
??? writeBuffer方法负责发送一条消息数据,write方法返回0,writeBuffer将连续尝试256次。
??? 如果消息完整发送,将调用IoFilterChain.fireMessageSent(req)方法。如果有数据发送,将返回实际发送数据长度,否则返回0。
??? ??? * 如果连接支持碎片拼接(如TCP连接),则writeBuffer方法可以决定输出消息的一部分,保证输出数据长度不超过输入缓冲区的1.5倍。
??? ??? * 如果连接不支持碎片拼接(如UDP连接),则writterBuffer方法必须将消息完整输出。
??? write方法调用信道的write方法,将IoBuffer包装的ByteBuffer放入操作系统缓冲区,并返回结果。
???
由此可见,在基本NIO中,如果系统底层缓冲区填满,SocketChannel.write方法将返回0,业务逻辑必须处理此情况,否则将丢失发送数据。
在MINA中,使用消息分批或拆分方式将最大效率利用系统缓冲区,并保证由于系统缓冲区装满而未发送的消息将重新发送。

?

?

?

相关代码:

?

sun.nio.ch.SocketChannelImpl.write(ByteBuffer)关键代码:
??? ??? .
??? ??? .
??? ??? .
??? ??? for (;;) {
??? ??? ??? n = IOUtil.write(fd, buf, -1, nd, writeLock);
??? ??? ??? if ((n == IOStatus.INTERRUPTED) && isOpen())
??? ??? ??? continue;
??? ??? ??? return IOStatus.normalize(n);
??? ??? }
??? ??? .
??? ??? .
??? ??? .
??? ???
sun.nio.ch.IOUtil.write(FileDescriptor, ByteBuffer[], int, int, NativeDispatcher) 关键代码:
??? ??? .
??? ??? .
??? ??? .
??? ???
??? IOVecWrapper vec = IOVecWrapper.get(length);
??? .
??? .
??????? ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
??????? hadow.put(buf);
??????? shadow.flip();
??????? vec.setShadow(iov_len, shadow);
??? .
??? .
??? .
??? long bytesWritten = nd.writev(fd, vec.address, iov_len);
???
SocketDispatcher.writev:
??? long writev(FileDescriptor fd, long address, int len) throws IOException {
??????? return writev0(fd, address, len);
??? }

?


特殊返回值定义在:
final class IOStatus {
??? static final int EOF = -1;??? ??? ??? // End of file
??? static final int UNAVAILABLE = -2;??? ??? // Nothing available (non-blocking)
??? static final int INTERRUPTED = -3;??? ??? // System call interrupted
??? static final int UNSUPPORTED = -4;??? ??? // Operation not supported
??? static final int THROWN = -5;??? ??? // Exception thrown in JNI code
??? static final int UNSUPPORTED_CASE = -6; // This case not supported
???
??? static int normalize(int n) {
??? ??? if (n == UNAVAILABLE)
??? ??? ??? return 0;
??? ??? return n;
??? }
}
windows下JNI调用:
Java_sun_nio_ch_SocketDispatcher_writev0(JNIEnv *env, jclass clazz,
???????????????????????????????????????? jobject fdo, jlong address, jint len)
{
??? /* set up */
??? int i = 0;
??? DWORD written = 0;
??? jint fd = fdval(env, fdo);
??? struct iovec *iovp = (struct iovec *)address;
??? WSABUF *bufs = malloc(len * sizeof(WSABUF));

??? if (bufs == 0) {
??????? JNU_ThrowOutOfMemoryError(env, 0);
??????? return IOS_THROWN;
??? }

??? if ((isNT() == JNI_FALSE) && (len > 16)) {
??????? len = 16;
??? }

??? /* copy iovec into WSABUF */
??? for(i=0; i<len; i++) {
??????? bufs[i].buf = (char *)iovp[i].iov_base;
??????? bufs[i].len = (u_long)iovp[i].iov_len;
??? }
???
??? /* read into the buffers */
??? i = WSASend((SOCKET)fd, /* Socket */
??????????? bufs,?????????? /* pointers to the buffers */
??????????? (DWORD)len,???? /* number of buffers to process */
??????????? &written,?????? /* receives number of bytes written */
??????????? 0,????????????? /* no flags */
??????????? 0,????????????? /* no overlapped sockets */
??????????? 0);???????????? /* no completion routine */

??? /* clean up */
??? free(bufs);

??? if (i != 0) {
??????? int theErr = (jint)WSAGetLastError();
??????? if (theErr == WSAEWOULDBLOCK) {
??????????? return IOS_UNAVAILABLE;
??????? }
??????? JNU_ThrowIOExceptionWithLastError(env, "Vector write failed");
??????? return IOS_THROWN;
??? }

??? return convertLongReturnVal(env, (jlong)written, JNI_FALSE);
}
windows本地接口
  int WSASend (
  SOCKET s,
  LPWSABUF lpBuffers
  DWORD dwBufferCount,
  LPDWORD lpNumberOfBytesSent,
  DWORD dwFlags,
  LPWSAOVERLAPPED lpOverlapped,
  LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
  );
返回值WSAEWOULDBLOCK??? Windows NT: Overlapped sockets: There are too many outstanding overlapped I/O requests. Nonoverlapped sockets: The socket is marked as nonblocking and the send operation cannot be completed immediately.

热点排行