首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > .NET > .NET >

高性能大容量SOCKET并发(四):粘包、分包、解包解决方法

2013-01-25 
高性能大容量SOCKET并发(四):粘包、分包、解包本帖最后由 SQLDebug_Fan 于 2012-09-05 23:11:17 编辑粘包使

高性能大容量SOCKET并发(四):粘包、分包、解包
本帖最后由 SQLDebug_Fan 于 2012-09-05 23:11:17 编辑 粘包

    使用TCP长连接就会引入粘包的问题,粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。粘包可能由发送方造成,也可能由接收方造成。TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据,造成多个数据包的粘连。如果接收进程不及时接收数据,已收到的数据就放在系统接收缓冲区,用户进程读取数据时就可能同时读到多个数据包。

    粘包一般的解决办法是制定通讯协议,由协议来规定如何分包解包。

分包

    在IOCPDemo例子程序中,我们分包的逻辑是先发一个长度,然后紧接着是数据包内容,这样就可以把每个包分开。

    应用层数据包格式如下:

    应用层数据包格式  
    数据包长度Len:Cardinal(4字节无符号整数) 数据包内容,长度为Len
    IOCPSocket分包处理主要代码,我们收到的数据都是在TSocketHandle.ProcessIOComplete方法中处理:


    procedure TSocketHandle.ProcessIOComplete(AIocpRecord: PIocpRecord;  
      const ACount: Cardinal);  
    begin  
      case AIocpRecord.IocpOperate of  
        ioNone: Exit;  
        ioRead: //收到数据  
        begin  
          FActiveTime := Now;  
          ReceiveData(AIocpRecord.WsaBuf.buf, ACount);  
          if FConnected then  
            PreRecv(AIocpRecord); //投递请求  
        end;  
        ioWrite: //发送数据完成,需要释放AIocpRecord的指针  
        begin  
          FActiveTime := Now;  
          FSendOverlapped.Release(AIocpRecord);  
        end;  
        ioStream:  
        begin  
          FActiveTime := Now;  
          FSendOverlapped.Release(AIocpRecord);  
          WriteStream; //继续发送流  
        end;  
      end;  
    end;  

    如果是收到数据,则调用ReceiveData函数,ReceiveData主要功能是把数据的写入流中,然后调用Process分包。FInputBuf是一个内存流(FInputBuf: TMemoryStream),内存流的每次写入会造成一次内存分配,如果要获得更高的效率,可以替换为内存池等更好的内存管理方式。还有一种更好的解决方案是规定每次发包的大小,如每个包最大不超过64K,哪么缓冲区的最大大小可以设置为128K(缓存两个数据包),这样就可以每次创建对象时一次分配好,减少内存分配次数,提高效率。(内存的分配和释放比内存的读写效率要低)

    procedure TSocketHandle.ReceiveData(AData: PAnsiChar; const ALen: Cardinal);  
    begin  
      FInputBuf.Write(AData^, ALen);  
      Process;  
    end;  

    Process则根据收到的数据进行分包逻辑,如果不够一个包,则继续等待接收数据,如果够一个或多个包,则循环调用Execute函数进行处理,代码如下:

    procedure TSocketHandle.Process;  


    var  
      AData, ALast, NewBuf: PByte;  
      iLenOffset, iOffset, iReserveLen: Integer;  
      
      function ReadLen: Integer;  
      var  
        wLen: Word;  
        cLen: Cardinal;  
      begin  
        FInputBuf.Position := iOffset;  
        if FLenType = ltWord then  
        begin  
          FInputBuf.Read(wLen, SizeOf(wLen));  
          //wLen := ntohs(wLen);  
          Result := wLen;  
        end  
        else  
        begin  
          FInputBuf.Read(cLen, SizeOf(cLen));  
          //cLen := ntohl(cLen);  
          Result := cLen;  
        end;  
      end;  
    begin  
      case FLenType of  
        ltWord, ltCardinal:  
        begin  
          if FLenType = ltWord then  
            iLenOffset := 2  
          else  
            iLenOffset := 4;  
          iReserveLen := 0;  
          FPacketLen := 0;  
          iOffset := 0;  
          if FPacketLen <= 0 then  
          begin  
            if FInputBuf.Size < iLenOffset then Exit;  
            FInputBuf.Position := 0; //移动到最前面  
            FPacketLen := ReadLen;  
            iOffset := iLenOffset;  
            iReserveLen := FInputBuf.Size - iOffset;  
            if FPacketLen > iReserveLen then //不够一个包的长度  
            begin  


              FInputBuf.Position := FInputBuf.Size; //移动到最后,以便接收后续数据  
              FPacketLen := 0;  
              Exit;  
            end;  
          end;  
          while (FPacketLen > 0) and (iReserveLen >= FPacketLen) do //如果数据够长,则处理  
          begin //多个包循环处理  
            AData := Pointer(Longint(FInputBuf.Memory) + iOffset); //取得当前的指针  
            Execute(AData, FPacketLen);  
            iOffset := iOffset + FPacketLen; //移到下一个点  
            FPacketLen := 0;  
            iReserveLen := FInputBuf.Size - iOffset;  
            if iReserveLen > iLenOffset then //剩下的数据  
            begin  
              FPacketLen := ReadLen;  
              iOffset := iOffset + iLenOffset;  
              iReserveLen := FInputBuf.Size - iOffset;  
              if FPacketLen > iReserveLen then //不够一个包的长度,需要把长度回退  
              begin  
                iOffset := iOffset - iLenOffset;  
                iReserveLen := FInputBuf.Size - iOffset;  
                FPacketLen := 0;  
              end;  
            end  
            else //不够长度字节数  
              FPacketLen := 0;  
          end;  
          if iReserveLen > 0 then //把剩下的自己缓存起来  
          begin  
            ALast := Pointer(Longint(FInputBuf.Memory) + iOffset);  
            GetMem(NewBuf, iReserveLen);  


            try  
              CopyMemory(NewBuf, ALast, iReserveLen);  
              FInputBuf.Clear;  
              FInputBuf.Write(NewBuf^, iReserveLen);  
            finally  
              FreeMemory(NewBuf);  
            end;  
          end  
          else  
          begin  
            FInputBuf.Clear;  
          end;  
        end;  
      else  
        begin  
          FInputBuf.Position := 0;  
          AData := Pointer(Longint(FInputBuf.Memory)); //取得当前的指针  
          Execute(AData, FInputBuf.Size);  
          FInputBuf.Clear;  
        end;  
      end;  
    end;  


解包

    由于我们应用层数据包既可以传命令也可以传数据,因而针对每个包我们进行解包,分出命令和数据分别处理,因而每个Socket服务对象都需要解包,我们解包的逻辑是放在TBaseSocket.DecodePacket中,命令和数据的包格式为:

    命令长度Len:Cardinal(4字节无符号整数) 命令 数据
    这里和第一版公布的代码不同,这版的代码对命令进行了编码,采用UTF-8编码,代码如下:

    function TBaseSocket.DecodePacket(APacketData: PByte;  
      const ALen: Integer): Boolean;  
    var  
      CommandLen: Integer;  
      UTF8Command: UTF8String;  
    begin  
      if ALen > 4 then //命令长度为4字节,因而长度必须大于4  
      begin  
        CopyMemory(@CommandLen, APacketData, SizeOf(Cardinal)); //获取命令长度  
        Inc(APacketData, SizeOf(Cardinal));  
        SetLength(UTF8Command, CommandLen);  
        CopyMemory(PUTF8String(UTF8Command), APacketData, CommandLen); //读取命令  
        Inc(APacketData, CommandLen);  
        FRequestData := APacketData; //数据  
        FRequestDataLen := ALen - SizeOf(Cardinal) - CommandLen; //数据长度  
        FRequest.Text := Utf8ToAnsi(UTF8Command); //把UTF8转为Ansi  


        Result := True;  
      end  
      else  
        Result := False;   
    end;  


    具体每个协议可以继承Execute方法,调用DecodePacket进行解包,然后根据命令进行协议逻辑处理,例如TSQLSocket主要代码如下:
    {* SQL查询SOCKET基类 *}  
    TSQLSocket = class(TBaseSocket)  
    private  
      {* 开始事务创建TADOConnection,关闭事务时释放 *}  
      FBeginTrans: Boolean;  
      FADOConn: TADOConnection;  
    protected  
      {* 处理数据接口 *}  
      procedure Execute(AData: PByte; const ALen: Cardinal); override;  
      {* 返回SQL语句执行结果 *}  
      procedure DoCmdSQLOpen;  
      {* 执行SQL语句 *}  
      procedure DoCmdSQLExec;  
      {* 开始事务 *}  
      procedure DoCmdBeginTrans;  
      {* 提交事务 *}  
      procedure DoCmdCommitTrans;  
      {* 回滚事务 *}  
      procedure DoCmdRollbackTrans;  
    public  
      procedure DoCreate; override;  
      destructor Destroy; override;  
      {* 获取SQL语句 *}  
      function GetSQL: string;  
      property BeginTrans: Boolean read FBeginTrans;  
    end;  
    Exceute是调用DecodePacket进行解包,然后获取命令分别调用不同的命令处理逻辑,代码如下:

    procedure TSQLSocket.Execute(AData: PByte; const ALen: Cardinal);  
    var  
      sErr: string;  
    begin  
      inherited;  
      FRequest.Clear;  
      FResponse.Clear;  
      try  
        AddResponseHeader;  
        if ALen = 0 then  
        begin  
          DoFailure(CIPackLenError);  
          DoSendResult;  
          Exit;  
        end;  
        if DecodePacket(AData, ALen) then  
        begin  
          FResponse.Clear;  


          AddResponseHeader;  
          case StrToSQLCommand(Command) of  
            scLogin:  
            begin  
              DoCmdLogin;  
              DoSendResult;  
            end;  
            scActive:  
            begin  
              DoSuccess;  
              DoSendResult;  
            end;  
            scSQLOpen:  
            begin  
              DoCmdSQLOpen;  
            end;  
            scSQLExec:  
            begin  
              DoCmdSQLExec;  
              DoSendResult;  
            end;  
            scBeginTrans:  
            begin  
              DoCmdBeginTrans;  
              DoSendResult;  
            end;  
            scCommitTrans:  
            begin  
              DoCmdCommitTrans;  
              DoSendResult;  
            end;  
            scRollbackTrans:  
            begin  
              DoCmdRollbackTrans;  
              DoSendResult;  
            end;  
          else  
            DoFailure(CINoExistCommand, 'Unknow Command');  
            DoSendResult;  


          end;  
        end  
        else  
        begin  
          DoFailure(CIPackFormatError, 'Packet Must Include \r\n\r\n');  
          DoSendResult;  
        end;  
      except  
        on E: Exception do //发生未知错误,断开连接  
        begin  
          sErr := RemoteAddress + ':' + IntToStr(RemotePort) + CSComma + 'Unknow Error: ' + E.Message;  
          WriteLogMsg(ltError, sErr);  
          Disconnect;  
        end;  
      end;  
    end;  


    更详细代码见示例代码的IOCPSocket单元。

博文地址:http://blog.csdn.net/sqldebug_fan/article/details/7907765
[解决办法]
居然有沙发,拜读。
[解决办法]
无数404,情何以堪
[解决办法]
回復被刪了。。。留個位置學習一下高性能大容量SOCKET并发(四):粘包、分包、解包解决方法

热点排行