高性能大容量SOCKET并发(四):粘包、分包、解包
本帖最后由 SQLDebug_Fan 于 2012-09-05 23:11:17 编辑 粘包
使用TCP长连接就会引入粘包的问题,粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。粘包可能由发送方造成,也可能由接收方造成。TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据,造成多个数据包的粘连。如果接收进程不及时接收数据,已收到的数据就放在系统接收缓冲区,用户进程读取数据时就可能同时读到多个数据包。
粘包一般的解决办法是制定通讯协议,由协议来规定如何分包解包。
分包
在IOCPDemo例子程序中,我们分包的逻辑是先发一个长度,然后紧接着是数据包内容,这样就可以把每个包分开。
应用层数据包格式如下:
应用层数据包格式
数据包长度Len:Cardinal(4字节无符号整数) 数据包内容,长度为Len
IOCPSocket分包处理主要代码,我们收到的数据都是在TSocketHandle.ProcessIOComplete方法中处理:
procedure TSocketHandle.ProcessIOComplete(AIocpRecord: PIocpRecord;
const ACount: Cardinal);
begin
case AIocpRecord.IocpOperate of
ioNone: Exit;
ioRead: //收到数据
begin
FActiveTime := Now;
ReceiveData(AIocpRecord.WsaBuf.buf, ACount);
if FConnected then
PreRecv(AIocpRecord); //投递请求
end;
ioWrite: //发送数据完成,需要释放AIocpRecord的指针
begin
FActiveTime := Now;
FSendOverlapped.Release(AIocpRecord);
end;
ioStream:
begin
FActiveTime := Now;
FSendOverlapped.Release(AIocpRecord);
WriteStream; //继续发送流
end;
end;
end;
procedure TSocketHandle.ReceiveData(AData: PAnsiChar; const ALen: Cardinal);
begin
FInputBuf.Write(AData^, ALen);
Process;
end;
procedure TSocketHandle.Process;
var
AData, ALast, NewBuf: PByte;
iLenOffset, iOffset, iReserveLen: Integer;
function ReadLen: Integer;
var
wLen: Word;
cLen: Cardinal;
begin
FInputBuf.Position := iOffset;
if FLenType = ltWord then
begin
FInputBuf.Read(wLen, SizeOf(wLen));
//wLen := ntohs(wLen);
Result := wLen;
end
else
begin
FInputBuf.Read(cLen, SizeOf(cLen));
//cLen := ntohl(cLen);
Result := cLen;
end;
end;
begin
case FLenType of
ltWord, ltCardinal:
begin
if FLenType = ltWord then
iLenOffset := 2
else
iLenOffset := 4;
iReserveLen := 0;
FPacketLen := 0;
iOffset := 0;
if FPacketLen <= 0 then
begin
if FInputBuf.Size < iLenOffset then Exit;
FInputBuf.Position := 0; //移动到最前面
FPacketLen := ReadLen;
iOffset := iLenOffset;
iReserveLen := FInputBuf.Size - iOffset;
if FPacketLen > iReserveLen then //不够一个包的长度
begin
FInputBuf.Position := FInputBuf.Size; //移动到最后,以便接收后续数据
FPacketLen := 0;
Exit;
end;
end;
while (FPacketLen > 0) and (iReserveLen >= FPacketLen) do //如果数据够长,则处理
begin //多个包循环处理
AData := Pointer(Longint(FInputBuf.Memory) + iOffset); //取得当前的指针
Execute(AData, FPacketLen);
iOffset := iOffset + FPacketLen; //移到下一个点
FPacketLen := 0;
iReserveLen := FInputBuf.Size - iOffset;
if iReserveLen > iLenOffset then //剩下的数据
begin
FPacketLen := ReadLen;
iOffset := iOffset + iLenOffset;
iReserveLen := FInputBuf.Size - iOffset;
if FPacketLen > iReserveLen then //不够一个包的长度,需要把长度回退
begin
iOffset := iOffset - iLenOffset;
iReserveLen := FInputBuf.Size - iOffset;
FPacketLen := 0;
end;
end
else //不够长度字节数
FPacketLen := 0;
end;
if iReserveLen > 0 then //把剩下的自己缓存起来
begin
ALast := Pointer(Longint(FInputBuf.Memory) + iOffset);
GetMem(NewBuf, iReserveLen);
try
CopyMemory(NewBuf, ALast, iReserveLen);
FInputBuf.Clear;
FInputBuf.Write(NewBuf^, iReserveLen);
finally
FreeMemory(NewBuf);
end;
end
else
begin
FInputBuf.Clear;
end;
end;
else
begin
FInputBuf.Position := 0;
AData := Pointer(Longint(FInputBuf.Memory)); //取得当前的指针
Execute(AData, FInputBuf.Size);
FInputBuf.Clear;
end;
end;
end;
function TBaseSocket.DecodePacket(APacketData: PByte;
const ALen: Integer): Boolean;
var
CommandLen: Integer;
UTF8Command: UTF8String;
begin
if ALen > 4 then //命令长度为4字节,因而长度必须大于4
begin
CopyMemory(@CommandLen, APacketData, SizeOf(Cardinal)); //获取命令长度
Inc(APacketData, SizeOf(Cardinal));
SetLength(UTF8Command, CommandLen);
CopyMemory(PUTF8String(UTF8Command), APacketData, CommandLen); //读取命令
Inc(APacketData, CommandLen);
FRequestData := APacketData; //数据
FRequestDataLen := ALen - SizeOf(Cardinal) - CommandLen; //数据长度
FRequest.Text := Utf8ToAnsi(UTF8Command); //把UTF8转为Ansi
Result := True;
end
else
Result := False;
end;
procedure TSQLSocket.Execute(AData: PByte; const ALen: Cardinal);
var
sErr: string;
begin
inherited;
FRequest.Clear;
FResponse.Clear;
try
AddResponseHeader;
if ALen = 0 then
begin
DoFailure(CIPackLenError);
DoSendResult;
Exit;
end;
if DecodePacket(AData, ALen) then
begin
FResponse.Clear;
AddResponseHeader;
case StrToSQLCommand(Command) of
scLogin:
begin
DoCmdLogin;
DoSendResult;
end;
scActive:
begin
DoSuccess;
DoSendResult;
end;
scSQLOpen:
begin
DoCmdSQLOpen;
end;
scSQLExec:
begin
DoCmdSQLExec;
DoSendResult;
end;
scBeginTrans:
begin
DoCmdBeginTrans;
DoSendResult;
end;
scCommitTrans:
begin
DoCmdCommitTrans;
DoSendResult;
end;
scRollbackTrans:
begin
DoCmdRollbackTrans;
DoSendResult;
end;
else
DoFailure(CINoExistCommand, 'Unknow Command');
DoSendResult;
end;
end
else
begin
DoFailure(CIPackFormatError, 'Packet Must Include \r\n\r\n');
DoSendResult;
end;
except
on E: Exception do //发生未知错误,断开连接
begin
sErr := RemoteAddress + ':' + IntToStr(RemotePort) + CSComma + 'Unknow Error: ' + E.Message;
WriteLogMsg(ltError, sErr);
Disconnect;
end;
end;
end;