Network - Fixed receiving parts of packages.

This commit is contained in:
Pontus Fransson 2014-01-30 23:22:46 +01:00
parent 4167b6a93b
commit 3b8a8e9c09
3 changed files with 117 additions and 7 deletions

View File

@ -16,11 +16,15 @@
#include "../../Misc/Utilities.h"
#include "../../Misc/Thread/IThreadObject.h"
#include "../../Misc/Thread/OysterThread.h"
#include "../../Misc/Packing/Packing.h"
#include <queue>
using namespace Oyster::Network;
using namespace Oyster::Thread;
using namespace Utility::DynamicMemory;
using namespace Utility::Container;
using namespace std;
/*************************************
PrivateData
@ -35,11 +39,12 @@ struct NetworkClient::PrivateData : public IThreadObject
Translator translator;
OysterThread thread;
OysterByte recieveBuffer;
//Message queue for sending and recieving
ThreadSafeQueue<CustomNetProtocol> sendQueue;
ThreadSafeQueue<NetEvent<NetworkClient*, NetworkClient::ClientEventArgs>> recieveQueue;
//ID
static unsigned int currID;
const unsigned int ID;
@ -66,7 +71,7 @@ struct NetworkClient::PrivateData : public IThreadObject
bool DoWork() override
{
if(!this->connection.IsConnected()) return false;
Send();
Recv();
@ -82,6 +87,7 @@ struct NetworkClient::PrivateData : public IThreadObject
CustomNetProtocol p = this->sendQueue.Pop();
this->translator.Pack(temp, p);
errorCode = this->connection.Send(temp);
if(errorCode != 0)
{
CEA parg;
@ -103,9 +109,13 @@ struct NetworkClient::PrivateData : public IThreadObject
if(errorCode == 0 && temp.GetSize())
{
HandleRecievedData(temp);
/* Replaced with EmptyOutbufferedQueue() and HandleRecievedData(OysterByte)
CustomNetProtocol protocol;
bool ok = this->translator.Unpack(protocol, temp);
//Check if the protocol was unpacked correctly
if(ok)
{
@ -114,7 +124,8 @@ struct NetworkClient::PrivateData : public IThreadObject
parg.data.protocol = protocol;
NetEvent<NetworkClient*, NetworkClient::ClientEventArgs> e = { this->parent, parg };
this->recieveQueue.Push(e);
}
}*/
}
//else
//{
@ -127,6 +138,76 @@ struct NetworkClient::PrivateData : public IThreadObject
return errorCode;
}
void HandleRecievedData(OysterByte& data)
{
//Loop through all packages that was recieved and add them to the queue.
unsigned int size = 0;
Oyster::Network::OysterByte msg;
//If there is part of a message in the buffer.
if(recieveBuffer.GetSize() > 0)
{
//cout << "the buffer size: " << recvBuffer.GetSize() <<endl;
unsigned int temp = recieveBuffer.GetSize();
size = Oyster::Packing::Unpacki(recieveBuffer);
if(temp + data.GetSize() > size)
{
msg = recieveBuffer;
recieveBuffer.Clear();
size -= msg.GetSize();
msg.AppendPartOfArray(data, 0, size);
UnpackMessageAndAddToQueue(msg);
}
else if(temp + data.GetSize() == size)
{
msg = recieveBuffer;
recieveBuffer.Clear();
size -= msg.GetSize();
msg += data;
UnpackMessageAndAddToQueue(msg);
return;
}
else
{
recieveBuffer += data;
size = data.GetSize();
return;
}
}
for(unsigned int i = size; i < data.GetSize(); i += size)
{
size = Oyster::Packing::Unpacki(&data.GetByteArray()[i]);
if(i+size > data.GetSize())
{
//Add it to the recvBuffer instead.
recieveBuffer.AppendPartOfArray(data, i, data.GetSize());
break;
}
msg.Clear();
msg.AppendPartOfArray(data, i, i+size);
UnpackMessageAndAddToQueue(msg);
}
}
void UnpackMessageAndAddToQueue(OysterByte& msg)
{
CustomNetProtocol protocol;
bool ok = this->translator.Unpack(protocol, msg);
//Check if the protocol was unpacked correctly
if(ok)
{
CEA parg;
parg.type = CEA::EventType_ProtocolRecieved;
parg.data.protocol = protocol;
NetEvent<NetworkClient*, NetworkClient::ClientEventArgs> e = { this->parent, parg };
this->recieveQueue.Push(e);
}
}
};
unsigned int NetworkClient::PrivateData::currID = 0;

View File

@ -48,7 +48,7 @@ void OysterByte::Resize(unsigned int cap)
}
}
int OysterByte::GetSize()
unsigned int OysterByte::GetSize()
{
return size;
}
@ -60,7 +60,7 @@ unsigned char* OysterByte::GetByteArray()
void OysterByte::AddSize(unsigned int size)
{
int newCapacity = this->size + size;
unsigned int newCapacity = this->size + size;
if(newCapacity >= capacity)
{
@ -81,6 +81,32 @@ void OysterByte::SetSize(unsigned int size)
this->size = size;
}
void OysterByte::AppendPartOfArray(OysterByte& source, unsigned int startIndex, unsigned int endIndex)
{
if(startIndex < 0 && startIndex >= endIndex)
return;
if(endIndex > source.size)
return;
unsigned int totalSize = endIndex - startIndex;
totalSize += size;
//Make sure the new data can fit in the array.
if(totalSize > capacity)
{
IncreaseCapacity(totalSize);
}
//Copy over new data.
for(unsigned int i = size; i < totalSize; i++)
{
byteArray[i] = source.byteArray[startIndex++];
}
//Set the new size
size = totalSize;
}
OysterByte& OysterByte::operator =(const OysterByte& obj)
{
delete[] this->byteArray;

View File

@ -25,7 +25,7 @@ namespace Oyster
//Resizes the array with, it does not keep anything in it.
void Resize(unsigned int cap);
int GetSize();
unsigned int GetSize();
unsigned char* GetByteArray();
void AddSize(unsigned int size);
@ -34,6 +34,9 @@ namespace Oyster
//Only sets the private variable 'size'
void SetSize(unsigned int size);
//Copies over a part of the addFrom array and adds it to the end of this array.
void AppendPartOfArray(OysterByte& source, unsigned int startIndex, unsigned int endIndex);
OysterByte& operator =(const OysterByte& obj);
operator char*();