当客户端和服务器使用TCP协议进行通信的时候,当收到对端数据时,操作系统就会将数据存入到Socket的接收缓冲区中。操作系统层面上的缓冲区完全由操作系统操作,程序并不能直接操作他们,而是要通过Socket.Recive和Socekt.Send方法来间接操作。同时,缓冲区分为发送缓冲区和接收缓冲区两个部分
Socket的Recive方法只是把接收缓冲区的数据提取出来,当系统的接收缓冲区为空的时候,Recive方法会被阻塞直到里面有数据。而Socket的Send方法只是把数据写入到发送缓冲区里面,具体的发送过程则由操作系统控制。同样,当发送缓冲区满了之后,Send方法将会被阻塞。所以,当Send方法被成功调用的时候,并不意味着数据已经被对端所接收到,还有可能是还在缓冲区中待命
正式由于上述数据流的操作方法,因此发送的数据可能会出现粘包和半包的现象。
粘包现象是指发送的两条数据被合并为了一条读取,比如聊天软件中发送“Yzp”和“_is_handsome”,如果读取速度稍微慢了一些,就可能被Receive方法合并成为一条“Yzp_is_handsome”读取出来。
半包现象是指发送端的数据有可能被拆分。比如发送“HelloWorld”,在接收端可能先收到了“Hel”,然后就被Receive方法调用收走了,而之后的字母"loWorld"则被下一次Receive方法接收,所以就形成了半包现象
这是与我们的感知不符的,正确的收发数据应当做到一次发送多少数据,一次也要接收多少数据
长度信息法是指在每个数据包前面加上长度信息。每次接收到数据之后,先读取表示长度的字节,如果缓冲区中的数据大于要取的字节数,则取出相应字节,否则等待下一次的数据接收
这个方法很好理解,就是通过一个长度标识来告知这一条消息的长度,等待其全部接收到之后再读取
每次都以相同长度发送数据
假设规定每条信息的长度都为10个字符,那么“Hello”“Unity”两条信息可以发送成“Hello…”“Unity…”,其中的".“表示填充字符,是为了凑数,没有实际意义,只为了每次发送的长度都是固定长度。接收方每次读取10个字符,作为一条消息去处理。如果读到的字符数大于10,比如第一次读到"Hello…Un”,那它只要把前十个字节“Hello…”抽取出来,再把后面两个字节“Un”存起来,等到再次接收数据,拼接第二条信息
**规定一个结束符号,作为消息间的分割符。**假设规定结束符号为"$",那么接收方就去根据这个符号去分割信息。
登录后复制
using System.Collections;
using System.Collections.Generic;
using System;
using System.Linq;
using UnityEngine;
using System.Net.Sockets;
using UnityEngine.UI;
public class Echo : MonoBehaviour
{
Socket socket;//定义套接字
//UGUI 这里声明界面元素
public InputField inputField;
public Text text;
//接收缓冲区
byte[] readBuff = new byte[1024];
//接收缓冲区的数据长度
int buffCount = 0;
string recvStr = "";
//点击链接按钮
public void Connection()
{
//Socket
socket = new Socket(AddressFamily.InterNetwork,
SocketType.Stream,ProtocolType.Tcp);
// socket.BeginConnect("127.0.0.1",8888,ConnectCallback,socket);//异步连接
socket.Connect("127.0.0.1",8888);//直接连接函数
socket.BeginReceive(readBuff,buffCount,1024-buffCount,0,ReceiveCallback,socket);//和直接连接函数一起用的异步通信
}
//Connect回调函数
public void ConnectCallback(IAsyncResult ar)
{
try{
Socket socket = (Socket) ar.AsyncState;
socket.EndConnect(ar);
Debug.Log("Socket Connect Succ");
socket.BeginReceive(readBuff,0,1024,0,ReceiveCallback,socket);//异步接收
}
catch(SocketException ex){
Debug.Log("Socket Connect fail" + ex.ToString());
}
}
public void ReceiveCallback(IAsyncResult ar)
{
try{
Socket socket = (Socket) ar.AsyncState;
//获取接收数据长度
int count = socket.EndReceive(ar);
buffCount += count;
//处理二进制消息
OnReceiveData();
string s = System.Text.Encoding.Default.GetString(readBuff,0,count);
recvStr = s + "\n" + recvStr;//这里显示历史聊天记录
//继续接收数据
socket.BeginReceive(readBuff,0,1024,0,ReceiveCallback,socket);//这里是一个递归的调用
}
catch(SocketException ex){
Debug.Log("Socket Receive fail" + ex.ToString());
}
}
public void OnReceiveData()
{
Debug.Log("[Recv 1] buffCount = " + buffCount);
Debug.Log("[Recv 2] readbuff = " + BitConverter.ToString(readBuff));//将字节转换为字符串
//消息长度小于二的时候不处理
if(buffCount <= 2)
{
return;
}
Int16 bodyLength = BitConverter.ToInt16(readBuff,0);
Debug.Log("[Recv 3] bodyLength = "+ bodyLength);
//消息体不完整的时候也不处理
if(buffCount<2 + bodyLength)
{
return;
}
//完整收到了整条消息才读出
string s = System.Text.Encoding.UTF8.GetString(readBuff,2,bodyLength);
Debug.Log("[Recv 4] s = " + s);
//更新缓冲区
int start = 2 + bodyLength;
int count = buffCount - start;
Array.Copy(readBuff,start,readBuff,0,count);//使用Copy移动缓冲区,参数见末尾注释
buffCount -= start;
Debug.Log("[Recv 5] buffCount = " + buffCount);
//消息处理
recvStr = s + "\n" + recvStr;
//完成一次读取,开启下一次
OnReceiveData();
}
//点击发送按钮
public void Send()
{
//Send
string sendStr = inputField.text;//得到发送的消息
//组装协议
byte[] bodyBytes = System.Text.Encoding.Default.GetBytes(sendStr);
Int16 len = (Int16)bodyBytes.Length;
byte[] lenBytes = BitConverter.GetBytes(len);
byte[] sendBytes = lenBytes.Concat(bodyBytes).ToArray();//将长度和信息拼接
// socket.BeginSend(sendBytes,0,sendBytes.Length,0,SendCallback,socket);
socket.Send(sendBytes);
}
//Send回调
public void SendCallback(IAsyncResult ar)
{
try{
Socket socket = (Socket) ar.AsyncState;
int count = socket.EndSend(ar);
Debug.Log("Socket Send Succ" + count);
}
catch(SocketException ex){
Debug.Log("Socket Send fail" + ex.ToString());
}
}
//Poll型客户端
private void Update()
{
if(socket == null){
return;
}
if(socket.Poll(0,SelectMode.SelectRead))
{
byte[] readBuff = new byte[1024];
int count = socket.Receive(readBuff);
string recvStr = System.Text.Encoding.Default.GetString(readBuff,0,count);
text.text = recvStr;
}
}
}
// 注释部分
// 1.Socket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);
// 用于创建一个Socket对象,它的三个参数分别代表地址族,套接字类型和协议
// 地址族知名使用的是IPV4还是IPV6,InterNetwork代表IPV4,InterNetworkV6代表IPV6
// SocketType是套接字类型,游戏中最常用的是字节流套接字,即Stream
// ProtocolType指明协议
// 2.客户端通过socket.Connect(远程IP地址,远程端口)连接服务端。
// Connect是一个阻塞方法,程序会卡住直到服务端回应(接受、拒绝或者超时)
// 3.客户端通过socket.send发送数据
// GetBytes(字符串)把字符串转换成byte这也是一个阻塞方法。该方法接受一个byte[]类型的参数指明要发送的内容。Send的返回值指明发送数据的长度
// 程序用System.Text.Encoding.Default.[]数组,然后发送给服务端
// 4.客户端使用socket.Receive接受服务端数据。
// Receive也是阻塞方法,没有收到服务端的数据时,程序将卡在Receive不会往下执行
// Receive带有一个byte[]类型的参数,它存储接收到的数据
// Receive的返回值指明接收到的数据的长度。之后使用System.Text.Encoding.Default.GetString(readBuff,0,count)将byte[]
// 5.通过socket.close()关闭连接
// 6.通过BeginConnect和EndConnect来让客户端代码变成异步进行,防止程序卡死
// IAsyncResult是.NET提供的一种异步操作,通过名为BeginXXX和EndXXX的两个方法来为实现原本同步方法的异步调用
// BeginXXX方法中包含同步方法中所需的参数,此外还包含两个参数:一个AsyncCallback委托和一个用户定义的状态对象
// 委托用来调用回调方法,状态对象用来向回调方法传递状态信息,且BeginXXX方法返回一个实现IAsyncResult接口的对象,EndXXX方法用于结束异步操作并且返回结果
// EndXXX方法含有一个IAsyncResult参数,用于获取异步操作是否完成的信息,它的返回值与同步方法相同
// 7.BeginReceive的参数为(readBuff,0,1024,ReceiveCallback,socket)
// 第一个参数readBuff表示接收缓冲区,第二个参数0表示从readBuff第0位开始接收数据,这个参数和TCP粘包问题有关
// 第三个参数1024代表每次最多接收1024个字节,假如服务端回应一串长长的数据,那一次也只会收到1024个字节
// 8.BeginReceive的调用位置
// 程序在两个地方调用了BeginReceive,一个是ConnectCallback,在连接成功后,就开始接收数据,接收到数据之后,回调函数ReceiveCallback被调用
// 另一个是BeginReceive内部,接受完一串数据之后,等待下一串数据的到来
// 9.Update和recvStr
// 在Unity中,只有主线程可以操作UI组件。由于异步回调是在其他线程执行的,如果在BeginReceive给text.text赋值,Unity会弹出异常信息,所以只能在主线程中
// 10.异步BeginSend参数说明
// buffer Byte类型的数组,包含需要发送的数据
// offset 从Buffer中的offset位置开始发送
// size 将要发送的字节数
// socketFlags SocetFlags值的按位组合,这里设置为0
// callback 回调函数,一个AsyncCallbakc委托
// state 一个用户定义对象,其中包含发送操作的相关信息。当操作完成时,此对象会传递给EndSend委托
//11.数组移动方法Array.Copy参数说明
// public static void Copy{
// Array sourceArray,
// long sourceIndex,
// Array destinationArray,
// long destinationIndex,
// long length
// }
//sourceArray代表源数组 destyinationArray代表目标数据,sourceIndex代表复制起始位置
//destinationIndex代表目标数组的起始位置,length代表要复制的信息的长度
在实际测试中,粘包半包问题出现的频率大致占到了收发数据问题的80%,剩下的20%中,有一个问题是大端小端问题。出现的原因是因为BitConverter.ToInt16(buffer,offset)这个方法针对大端编码和小段编码方式下的计算方式不同,就会导致我们读取的数据长度不一样,所以,还要处理大端小端问题
我们规定缓冲区中的数据都以小端编码存储,所以可以使用BitConverter.IsLittleEndian来判断是否是小端编码,如果不是的话,则使用lenBytes.Reverse()进行转换
由于发送缓冲区也是有大小限制的,当网络状况不佳,缓冲区被逐渐填充满时,有可能会发生数据截断的情况(一个字符串无法完整的放入缓冲区)。当网络重新通畅时,缓冲区中的数据可能被直接全部发送,字符串就这样被截断发送了。从而导致从这里开始之后的所有数据发送都无法解析造成通信失败
我们使用一个缓冲区队列来控制我们发送的数据流。我们所需要发送的每一条信息都可以新建一个缓冲区,并将其加入到写入队列之中。每一个缓冲区元素都有长度和当前已发送字符位置的标识。只有当队列头部的一整条缓冲区字符都被发送完毕时,才会轮到下一个缓冲区。因此,我们需要数据结构对这种做法进行封装
登录后复制
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
//封装byte[],readIdx和length的类
public class ByteArray
{
//缓冲区
public byte[] bytes;
//读写位置
public int readIdx = 0;
public int writeIdx = 0;//指向缓冲区字符末尾
//数据长度
public int length{ get {return writeIdx - readIdx;}}
//构造函数
public ByteArray(byte[] defaultBytes){
bytes = defaultBytes;
readIdx = 0;
writeIdx = defaultBytes.Length;
}
}
之后使用C#自带的Queue< ByteArray >即可实现。但是由于我们使用异步通信的时候,可能会有多个线程去读取此队列,造成线程冲突,从而造成结果不正确,所以需要在操作Queue的地方使用lock加上线程锁,类似下面这种
我们这里可以使用一个指针readIdx去指向当前已发送到了缓冲区的哪个位置,写入的时候则使用另一个指针writeIdx指向字符末尾,继续填充。当缓冲区长度不够时,才做一次Array.Copy,这样可以极大程度上节省空间
由于定义的缓冲区最大长度为1024,如果网络状况不好的情况下,会将其撑爆
登录后复制
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using System.Linq;
using System;
//封装byte[],readIdx和length的类
public class ByteArray
{
//默认大小
const int DEFAULT_SIZE = 1024;
//初始大小
int initSize = 0;
//缓冲区
public byte[] bytes;
//读写位置
public int readIdx = 0;
public int writeIdx = 0;//指向缓冲区字符末尾
//容量
private int capacity = 0;
//剩余空间
public int remain{ get {return capacity-writeIdx; } }
//数据长度
public int length{ get {return writeIdx - readIdx;} }
//构造函数1
public ByteArray(int size = DEFAULT_SIZE)
{
bytes = new byte[size];
capacity = size;
initSize = size;
readIdx = 0;
writeIdx = 0;
}
//构造函数2
public ByteArray(byte[] defaultBytes){
bytes = defaultBytes;
capacity = defaultBytes.Length;
initSize = defaultBytes.Length;
readIdx = 0;
writeIdx = defaultBytes.Length;
}
//重设尺寸,通过该方法控制缓冲区扩展
public void ReSize(int size){
if(size<length) return;
if(size<initSize) return;
int n = 1;
while(n<size) n*=2;//以两倍扩展
capacity = n;
byte[] newBytes = new byte[capacity];
Array.Copy(bytes,readIdx,newBytes,0,writeIdx-readIdx);
bytes = newBytes;
writeIdx = length;
readIdx = 0;
}
//当数据量很少时可以通过移动数据的方式,而不用拓展
public void CheckAndMoveBytes(){
if(length < 8){
MoveBytes();
}
}
public void MoveBytes(){
if(length > 0){
Array.Copy(bytes,readIdx,bytes,0,length);
}
writeIdx = length;
readIdx = 0;
}
//写入数据
public int Write(byte[] bs,int offset,int count){
if(remain < count){
ReSize(length + count);
}
Array.Copy(bs,offset,bytes,writeIdx,count);
writeIdx += count;
return count;
}
//读取数据
public int Read(byte[] bs,int offset,int count){
count = Math.Min(count,length);
Array.Copy(bytes,readIdx,bs,offset,count);
readIdx += count;
CheckAndMoveBytes();
return count;
}
//读取INT16
public Int16 ReadInt16(){
if(length<2) return 0;
Int16 ret = (Int16)((bytes[readIdx + 1]<<8) | bytes[readIdx]);
readIdx += 2;
CheckAndMoveBytes();
return ret;
}
//读取Int32
public Int32 ReadInt32(){
if(length<4) return 0;
Int32 ret = (Int32) ((bytes[readIdx +3]<<24)|
(bytes[readIdx +2]<<16)|
(bytes[readIdx +1]<<8) |
bytes[readIdx +0]);
readIdx += 4;
CheckAndMoveBytes();
return ret;
}
}
免责声明:本文系网络转载或改编,未找到原创作者,版权归原作者所有。如涉及版权,请联系删