首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > .NET > .NET >

高性能大容量SOCKET并发(4):粘包、分包、解包

2012-11-13 
高性能大容量SOCKET并发(四):粘包、分包、解包粘包使用TCP长连接就会引入粘包的问题,粘包是指发送方发送的若

高性能大容量SOCKET并发(四):粘包、分包、解包
粘包

  使用TCP长连接就会引入粘包的问题,粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。粘包可能由发送方造成,也可能由接收方造成。TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据,造成多个数据包的粘连。如果接收进程不及时接收数据,已收到的数据就放在系统接收缓冲区,用户进程读取数据时就可能同时读到多个数据包。

  粘包一般的解决办法是制定通讯协议,由协议来规定如何分包解包。

分包

  在IOCPDemo例子程序中,我们分包的逻辑是先发一个长度,然后紧接着是数据包内容,这样就可以把每个包分开。

  应用层数据包格式如下:

  应用层数据包格式  
  数据包长度Len:Cardinal(4字节无符号整数) 数据包内容,长度为Len
  IOCPSocket分包处理主要代码,我们收到的数据都是在TSocketHandle.ProcessIOComplete方法中处理:

Delphi(Pascal) code
    procedure TSocketHandle.ProcessIOComplete(AIocpRecord: PIocpRecord;        const ACount: Cardinal);      begin        case AIocpRecord.IocpOperate of          ioNone: Exit;          ioRead: //收到数据          begin            FActiveTime := Now;            ReceiveData(AIocpRecord.WsaBuf.buf, ACount);            if FConnected then              PreRecv(AIocpRecord); //投递请求          end;          ioWrite: //发送数据完成,需要释放AIocpRecord的指针          begin            FActiveTime := Now;            FSendOverlapped.Release(AIocpRecord);          end;          ioStream:          begin            FActiveTime := Now;            FSendOverlapped.Release(AIocpRecord);            WriteStream; //继续发送流          end;        end;      end;  

  如果是收到数据,则调用ReceiveData函数,ReceiveData主要功能是把数据的写入流中,然后调用Process分包。FInputBuf是一个内存流(FInputBuf: TMemoryStream),内存流的每次写入会造成一次内存分配,如果要获得更高的效率,可以替换为内存池等更好的内存管理方式。还有一种更好的解决方案是规定每次发包的大小,如每个包最大不超过64K,哪么缓冲区的最大大小可以设置为128K(缓存两个数据包),这样就可以每次创建对象时一次分配好,减少内存分配次数,提高效率。(内存的分配和释放比内存的读写效率要低)
Delphi(Pascal) code
    procedure TSocketHandle.ReceiveData(AData: PAnsiChar; const ALen: Cardinal);      begin        FInputBuf.Write(AData^, ALen);        Process;      end;  

  Process则根据收到的数据进行分包逻辑,如果不够一个包,则继续等待接收数据,如果够一个或多个包,则循环调用Execute函数进行处理,代码如下:
Delphi(Pascal) code
    procedure TSocketHandle.Process;      var        AData, ALast, NewBuf: PByte;        iLenOffset, iOffset, iReserveLen: Integer;              function ReadLen: Integer;        var          wLen: Word;          cLen: Cardinal;        begin          FInputBuf.Position := iOffset;          if FLenType = ltWord then          begin            FInputBuf.Read(wLen, SizeOf(wLen));            //wLen := ntohs(wLen);            Result := wLen;          end          else          begin            FInputBuf.Read(cLen, SizeOf(cLen));            //cLen := ntohl(cLen);            Result := cLen;          end;        end;      begin        case FLenType of          ltWord, ltCardinal:          begin            if FLenType = ltWord then              iLenOffset := 2            else              iLenOffset := 4;            iReserveLen := 0;            FPacketLen := 0;            iOffset := 0;            if FPacketLen <= 0 then            begin              if FInputBuf.Size < iLenOffset then Exit;              FInputBuf.Position := 0; //移动到最前面              FPacketLen := ReadLen;              iOffset := iLenOffset;              iReserveLen := FInputBuf.Size - iOffset;              if FPacketLen > iReserveLen then //不够一个包的长度              begin                FInputBuf.Position := FInputBuf.Size; //移动到最后,以便接收后续数据                FPacketLen := 0;                Exit;              end;            end;            while (FPacketLen > 0) and (iReserveLen >= FPacketLen) do //如果数据够长,则处理            begin //多个包循环处理              AData := Pointer(Longint(FInputBuf.Memory) + iOffset); //取得当前的指针              Execute(AData, FPacketLen);              iOffset := iOffset + FPacketLen; //移到下一个点              FPacketLen := 0;              iReserveLen := FInputBuf.Size - iOffset;              if iReserveLen > iLenOffset then //剩下的数据              begin                FPacketLen := ReadLen;                iOffset := iOffset + iLenOffset;                iReserveLen := FInputBuf.Size - iOffset;                if FPacketLen > iReserveLen then //不够一个包的长度,需要把长度回退                begin                  iOffset := iOffset - iLenOffset;                  iReserveLen := FInputBuf.Size - iOffset;                  FPacketLen := 0;                end;              end              else //不够长度字节数                FPacketLen := 0;            end;            if iReserveLen > 0 then //把剩下的自己缓存起来            begin              ALast := Pointer(Longint(FInputBuf.Memory) + iOffset);              GetMem(NewBuf, iReserveLen);              try                CopyMemory(NewBuf, ALast, iReserveLen);                FInputBuf.Clear;                FInputBuf.Write(NewBuf^, iReserveLen);              finally                FreeMemory(NewBuf);              end;            end            else            begin              FInputBuf.Clear;            end;          end;        else          begin            FInputBuf.Position := 0;            AData := Pointer(Longint(FInputBuf.Memory)); //取得当前的指针            Execute(AData, FInputBuf.Size);            FInputBuf.Clear;          end;        end;      end;   


解包

  由于我们应用层数据包既可以传命令也可以传数据,因而针对每个包我们进行解包,分出命令和数据分别处理,因而每个Socket服务对象都需要解包,我们解包的逻辑是放在TBaseSocket.DecodePacket中,命令和数据的包格式为:

  命令长度Len:Cardinal(4字节无符号整数) 命令 数据
  这里和第一版公布的代码不同,这版的代码对命令进行了编码,采用UTF-8编码,代码如下:
Delphi(Pascal) code
    function TBaseSocket.DecodePacket(APacketData: PByte;        const ALen: Integer): Boolean;      var        CommandLen: Integer;        UTF8Command: UTF8String;      begin        if ALen > 4 then //命令长度为4字节,因而长度必须大于4        begin          CopyMemory(@CommandLen, APacketData, SizeOf(Cardinal)); //获取命令长度          Inc(APacketData, SizeOf(Cardinal));          SetLength(UTF8Command, CommandLen);          CopyMemory(PUTF8String(UTF8Command), APacketData, CommandLen); //读取命令          Inc(APacketData, CommandLen);          FRequestData := APacketData; //数据          FRequestDataLen := ALen - SizeOf(Cardinal) - CommandLen; //数据长度          FRequest.Text := Utf8ToAnsi(UTF8Command); //把UTF8转为Ansi          Result := True;        end        else          Result := False;       end;  

  具体每个协议可以继承Execute方法,调用DecodePacket进行解包,然后根据命令进行协议逻辑处理,例如TSQLSocket主要代码如下:
  {* SQL查询SOCKET基类 *}  
  TSQLSocket = class(TBaseSocket)  
  private  
  {* 开始事务创建TADOConnection,关闭事务时释放 *}  
  FBeginTrans: Boolean;  
  FADOConn: TADOConnection;  
  protected  
  {* 处理数据接口 *}  
  procedure Execute(AData: PByte; const ALen: Cardinal); override;  
  {* 返回SQL语句执行结果 *}  
  procedure DoCmdSQLOpen;  
  {* 执行SQL语句 *}  
  procedure DoCmdSQLExec;  
  {* 开始事务 *}  
  procedure DoCmdBeginTrans;  
  {* 提交事务 *}  
  procedure DoCmdCommitTrans;  
  {* 回滚事务 *}  
  procedure DoCmdRollbackTrans;  
  public  
  procedure DoCreate; override;  
  destructor Destroy; override;  
  {* 获取SQL语句 *}  
  function GetSQL: string;  
  property BeginTrans: Boolean read FBeginTrans;  
  end;  
  Exceute是调用DecodePacket进行解包,然后获取命令分别调用不同的命令处理逻辑,代码如下:
Delphi(Pascal) code
    procedure TSQLSocket.Execute(AData: PByte; const ALen: Cardinal);      var        sErr: string;      begin        inherited;        FRequest.Clear;        FResponse.Clear;        try          AddResponseHeader;          if ALen = 0 then          begin            DoFailure(CIPackLenError);            DoSendResult;            Exit;          end;          if DecodePacket(AData, ALen) then          begin            FResponse.Clear;            AddResponseHeader;            case StrToSQLCommand(Command) of              scLogin:              begin                DoCmdLogin;                DoSendResult;              end;              scActive:              begin                DoSuccess;                DoSendResult;              end;              scSQLOpen:              begin                DoCmdSQLOpen;              end;              scSQLExec:              begin                DoCmdSQLExec;                DoSendResult;              end;              scBeginTrans:              begin                DoCmdBeginTrans;                DoSendResult;              end;              scCommitTrans:              begin                DoCmdCommitTrans;                DoSendResult;              end;              scRollbackTrans:              begin                DoCmdRollbackTrans;                DoSendResult;              end;            else              DoFailure(CINoExistCommand, 'Unknow Command');              DoSendResult;            end;          end          else          begin            DoFailure(CIPackFormatError, 'Packet Must Include \r\n\r\n');            DoSendResult;          end;        except          on E: Exception do //发生未知错误,断开连接          begin            sErr := RemoteAddress + ':' + IntToStr(RemotePort) + CSComma + 'Unknow Error: ' + E.Message;            WriteLogMsg(ltError, sErr);            Disconnect;          end;        end;      end;   


  更详细代码见示例代码的IOCPSocket单元。

博文地址:http://blog.csdn.net/sqldebug_fan/article/details/7907765

[解决办法]
居然有沙发,拜读。
[解决办法]
无数404,情何以堪
[解决办法]
回復被刪了。。。留個位置學習一下

热点排行