Reworked NetworkClient's thread.
Cpu usage is decreased and recv/send rate should be increased.
This commit is contained in:
parent
6eb0a282c2
commit
8e9d1a4999
|
@ -29,10 +29,10 @@ using namespace DanBias;
|
||||||
float dt = (float)this->logicTimer.getElapsedSeconds();
|
float dt = (float)this->logicTimer.getElapsedSeconds();
|
||||||
if( dt >= this->logicFrameTime )
|
if( dt >= this->logicFrameTime )
|
||||||
{
|
{
|
||||||
|
this->logicTimer.reset();
|
||||||
|
|
||||||
this->ProcessClients();
|
this->ProcessClients();
|
||||||
this->gameInstance.NewFrame();
|
this->gameInstance.NewFrame();
|
||||||
|
|
||||||
this->logicTimer.reset();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -56,6 +56,17 @@ struct NetworkClient::PrivateData : public IThreadObject
|
||||||
ThreadSafeQueue<CustomNetProtocol> sendQueue;
|
ThreadSafeQueue<CustomNetProtocol> sendQueue;
|
||||||
ThreadSafeQueue<NetEvent<NetworkClient*, NetworkClient::ClientEventArgs>> recieveQueue;
|
ThreadSafeQueue<NetEvent<NetworkClient*, NetworkClient::ClientEventArgs>> recieveQueue;
|
||||||
|
|
||||||
|
//Testing for eventSelect.
|
||||||
|
HANDLE socketEvents[2];
|
||||||
|
HANDLE shutdownEvent;
|
||||||
|
|
||||||
|
//The OysterByte each message is packed in.
|
||||||
|
OysterByte tempMessage;
|
||||||
|
|
||||||
|
//Used to buffer messages
|
||||||
|
OysterByte bufferedSend;
|
||||||
|
int numPackages;
|
||||||
|
|
||||||
//ID
|
//ID
|
||||||
static unsigned int currID;
|
static unsigned int currID;
|
||||||
const unsigned int ID;
|
const unsigned int ID;
|
||||||
|
@ -66,13 +77,17 @@ struct NetworkClient::PrivateData : public IThreadObject
|
||||||
, owner(0)
|
, owner(0)
|
||||||
, outputEvent(0)
|
, outputEvent(0)
|
||||||
{
|
{
|
||||||
|
numPackages = 0;
|
||||||
|
bufferedSend.Resize(MAX_NETWORK_MESSAGE_SIZE);
|
||||||
|
tempMessage.Resize(MAX_NETWORK_MESSAGE_SIZE);
|
||||||
InitWinSock();
|
InitWinSock();
|
||||||
this->thread.Create(this, false);
|
this->thread.Create(this, false);
|
||||||
this->thread.SetPriority(Oyster::Thread::OYSTER_THREAD_PRIORITY_1);
|
this->thread.SetPriority(Oyster::Thread::OYSTER_THREAD_PRIORITY_1);
|
||||||
}
|
}
|
||||||
~PrivateData()
|
~PrivateData()
|
||||||
{
|
{
|
||||||
this->thread.Terminate();
|
SetEvent(shutdownEvent);
|
||||||
|
this->thread.Wait();
|
||||||
|
|
||||||
ShutdownWinSock();
|
ShutdownWinSock();
|
||||||
this->connection.Disconnect();
|
this->connection.Disconnect();
|
||||||
|
@ -80,15 +95,150 @@ struct NetworkClient::PrivateData : public IThreadObject
|
||||||
this->parent = 0;
|
this->parent = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ThreadEntry()
|
||||||
|
{
|
||||||
|
//Create alla events used in the thread
|
||||||
|
shutdownEvent = CreateEvent(NULL, true, false, NULL);
|
||||||
|
socketEvents[0] = WSACreateEvent();
|
||||||
|
socketEvents[1] = WSACreateEvent();
|
||||||
|
|
||||||
|
if(socketEvents[0] == WSA_INVALID_EVENT)
|
||||||
|
{
|
||||||
|
//Error
|
||||||
|
}
|
||||||
|
|
||||||
|
if(WSAEventSelect(this->connection.GetSocket(), socketEvents[0], FD_READ) == SOCKET_ERROR)
|
||||||
|
{
|
||||||
|
//Error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadExit()
|
||||||
|
{
|
||||||
|
WSACloseEvent(socketEvents[0]);
|
||||||
|
WSACloseEvent(socketEvents[1]);
|
||||||
|
CloseHandle(shutdownEvent);
|
||||||
|
}
|
||||||
|
|
||||||
bool DoWork() override
|
bool DoWork() override
|
||||||
|
{
|
||||||
|
WSANETWORKEVENTS wsaEvents;
|
||||||
|
|
||||||
|
while(WaitForSingleObject(shutdownEvent, 0) != WAIT_OBJECT_0)
|
||||||
{
|
{
|
||||||
if(!this->connection.IsConnected()) return false;
|
if(!this->connection.IsConnected()) return false;
|
||||||
|
|
||||||
Send();
|
int result = WSAWaitForMultipleEvents(2, socketEvents, FALSE, 100, FALSE) - WSA_WAIT_EVENT_0;
|
||||||
|
if(result == 0)
|
||||||
|
{
|
||||||
|
WSAEnumNetworkEvents(this->connection.GetSocket(), socketEvents[0], &wsaEvents);
|
||||||
|
if((wsaEvents.lNetworkEvents & FD_READ) && (wsaEvents.iErrorCode[FD_READ_BIT] == 0))
|
||||||
|
{
|
||||||
|
//Recieve a message
|
||||||
Recv();
|
Recv();
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
else if(result == 1)
|
||||||
|
{
|
||||||
|
//Send all messages in the sendQueue
|
||||||
|
int i = this->sendQueue.Size();
|
||||||
|
WSAResetEvent(socketEvents[1]);
|
||||||
|
|
||||||
|
if(i == 1)
|
||||||
|
{
|
||||||
|
Send();
|
||||||
|
}
|
||||||
|
else if(i > 1)
|
||||||
|
{
|
||||||
|
for(int j = 0; j < i; j++)
|
||||||
|
BufferMessage();
|
||||||
|
|
||||||
|
SendBuffer();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SendBuffer()
|
||||||
|
{
|
||||||
|
if(bufferedSend.GetSize() > 0)
|
||||||
|
{
|
||||||
|
this->connection.Send(bufferedSend);
|
||||||
|
//printf("2. %d, %d\n", numPackages, bufferedSend.GetSize());
|
||||||
|
bufferedSend.Clear();
|
||||||
|
|
||||||
|
//Debug
|
||||||
|
numPackages = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void BufferMessage()
|
||||||
|
{
|
||||||
|
int errorCode = 0;
|
||||||
|
|
||||||
|
if(!this->sendQueue.IsEmpty())
|
||||||
|
{
|
||||||
|
CustomNetProtocol p = this->sendQueue.Pop();
|
||||||
|
|
||||||
|
this->translator.Pack(tempMessage, p);
|
||||||
|
|
||||||
|
|
||||||
|
if(tempMessage.GetSize() > MAX_NETWORK_MESSAGE_SIZE - bufferedSend.GetSize())
|
||||||
|
{
|
||||||
|
//Send buffered message
|
||||||
|
errorCode = this->connection.Send(bufferedSend);
|
||||||
|
//printf("2. %d, %d\n", numPackages, bufferedSend.GetSize());
|
||||||
|
bufferedSend.Clear();
|
||||||
|
|
||||||
|
//Debug
|
||||||
|
numPackages = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
bufferedSend += tempMessage;
|
||||||
|
tempMessage.Clear();
|
||||||
|
|
||||||
|
//Debug
|
||||||
|
numPackages++;
|
||||||
|
|
||||||
|
if(errorCode != 0 && errorCode != WSAEWOULDBLOCK)
|
||||||
|
{
|
||||||
|
if( errorCode == WSAECONNABORTED || errorCode == WSAENOTCONN)
|
||||||
|
{
|
||||||
|
CEA parg;
|
||||||
|
parg.type = CEA::EventType_Disconnect;
|
||||||
|
parg.data.protocol = p;
|
||||||
|
NetEvent<NetworkClient*, CEA> e = { this->parent, parg };
|
||||||
|
this->recieveQueue.Push(e);
|
||||||
|
|
||||||
|
if(this->outputEvent)
|
||||||
|
{
|
||||||
|
printf("\t(ID: %i | IP: %s | Protocol: %i) - EventType_Disconnect && EventType_ProtocolFailedToSend \n", this->ID, this->connection.GetIpAddress().c_str(), p[0].value.netShort);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
CEA parg;
|
||||||
|
parg.type = CEA::EventType_ProtocolFailedToSend;
|
||||||
|
parg.data.protocol = p;
|
||||||
|
NetEvent<NetworkClient*, CEA> e = { this->parent, parg };
|
||||||
|
this->recieveQueue.Push(e);
|
||||||
|
|
||||||
|
if(this->outputEvent)
|
||||||
|
{
|
||||||
|
printf("\t(ID: %i | IP: %s | Protocol: %i) - EventType_ProtocolFailedToSend\n", this->ID, this->connection.GetIpAddress().c_str(), p[0].value.netShort);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(this->outputEvent)
|
||||||
|
{
|
||||||
|
printf("\t(ID: %i | IP: %s | Protocol: %i) Message sent!\n", this->ID, this->connection.GetIpAddress().c_str(), p[0].value.netShort);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int Send()
|
int Send()
|
||||||
{
|
{
|
||||||
int errorCode = 0;
|
int errorCode = 0;
|
||||||
|
@ -96,11 +246,11 @@ struct NetworkClient::PrivateData : public IThreadObject
|
||||||
if(!this->sendQueue.IsEmpty())
|
if(!this->sendQueue.IsEmpty())
|
||||||
{
|
{
|
||||||
//printf("\t(%i)\n", this->sendQueue.Size());
|
//printf("\t(%i)\n", this->sendQueue.Size());
|
||||||
OysterByte temp;
|
|
||||||
CustomNetProtocol p = this->sendQueue.Pop();
|
CustomNetProtocol p = this->sendQueue.Pop();
|
||||||
|
|
||||||
this->translator.Pack(temp, p);
|
this->translator.Pack(tempMessage, p);
|
||||||
errorCode = this->connection.Send(temp);
|
errorCode = this->connection.Send(tempMessage);
|
||||||
|
tempMessage.Clear();
|
||||||
|
|
||||||
if(errorCode != 0 && errorCode != WSAEWOULDBLOCK)
|
if(errorCode != 0 && errorCode != WSAEWOULDBLOCK)
|
||||||
{
|
{
|
||||||
|
@ -144,12 +294,12 @@ struct NetworkClient::PrivateData : public IThreadObject
|
||||||
{
|
{
|
||||||
int errorCode = -1;
|
int errorCode = -1;
|
||||||
|
|
||||||
OysterByte temp = OysterByte();
|
errorCode = this->connection.Recieve(tempMessage);
|
||||||
errorCode = this->connection.Recieve(temp);
|
|
||||||
|
|
||||||
if(errorCode == 0 && temp.GetSize())
|
if(errorCode == 0 && tempMessage.GetSize())
|
||||||
{
|
{
|
||||||
HandleRecievedData(temp);
|
HandleRecievedData(tempMessage);
|
||||||
|
tempMessage.Clear();
|
||||||
|
|
||||||
|
|
||||||
/* Replaced with EmptyOutbufferedQueue() and HandleRecievedData(OysterByte)
|
/* Replaced with EmptyOutbufferedQueue() and HandleRecievedData(OysterByte)
|
||||||
|
@ -312,7 +462,7 @@ bool NetworkClient::Connect(ConnectionInfo& socket)
|
||||||
if(this->privateData) return false;
|
if(this->privateData) return false;
|
||||||
if(!this->privateData) this->privateData = new PrivateData();
|
if(!this->privateData) this->privateData = new PrivateData();
|
||||||
|
|
||||||
int result = this->privateData->connection.Connect(socket, false);
|
int result = this->privateData->connection.Connect(socket, true);
|
||||||
|
|
||||||
//Connect has succeeded
|
//Connect has succeeded
|
||||||
if(result != 0) return false;
|
if(result != 0) return false;
|
||||||
|
@ -333,7 +483,7 @@ bool NetworkClient::Connect(unsigned short port, const char serverIP[])
|
||||||
if(!this->privateData)
|
if(!this->privateData)
|
||||||
this->privateData = new PrivateData();
|
this->privateData = new PrivateData();
|
||||||
|
|
||||||
int result = this->privateData->connection.Connect(port, serverIP, false);
|
int result = this->privateData->connection.Connect(port, serverIP, true);
|
||||||
|
|
||||||
//Connect has succeeded
|
//Connect has succeeded
|
||||||
if(result != 0) return false;
|
if(result != 0) return false;
|
||||||
|
@ -381,7 +531,9 @@ void NetworkClient::Disconnect()
|
||||||
{
|
{
|
||||||
if(!privateData) return;
|
if(!privateData) return;
|
||||||
|
|
||||||
privateData->thread.Stop();
|
SetEvent(privateData->shutdownEvent);
|
||||||
|
privateData->thread.Wait();
|
||||||
|
|
||||||
privateData->connection.Disconnect();
|
privateData->connection.Disconnect();
|
||||||
this->privateData->sendQueue.Clear();
|
this->privateData->sendQueue.Clear();
|
||||||
this->privateData->recieveQueue.Clear();
|
this->privateData->recieveQueue.Clear();
|
||||||
|
@ -390,11 +542,13 @@ void NetworkClient::Disconnect()
|
||||||
void NetworkClient::Send(CustomProtocolObject& protocol)
|
void NetworkClient::Send(CustomProtocolObject& protocol)
|
||||||
{
|
{
|
||||||
this->privateData->sendQueue.Push(protocol.GetProtocol());
|
this->privateData->sendQueue.Push(protocol.GetProtocol());
|
||||||
|
WSASetEvent(this->privateData->socketEvents[1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
void NetworkClient::Send(CustomNetProtocol& protocol)
|
void NetworkClient::Send(CustomNetProtocol& protocol)
|
||||||
{
|
{
|
||||||
this->privateData->sendQueue.Push(protocol);
|
this->privateData->sendQueue.Push(protocol);
|
||||||
|
WSASetEvent(this->privateData->socketEvents[1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
void NetworkClient::SetOwner(NetworkSession* owner)
|
void NetworkClient::SetOwner(NetworkSession* owner)
|
||||||
|
|
|
@ -188,8 +188,8 @@ int Connection::Recieve(OysterByte &bytes)
|
||||||
if(this->closed) return -1;
|
if(this->closed) return -1;
|
||||||
int nBytes;
|
int nBytes;
|
||||||
|
|
||||||
bytes.Resize(1000);
|
bytes.Resize(MAX_NETWORK_MESSAGE_SIZE);
|
||||||
nBytes = recv(this->socket, bytes, 1000, 0);
|
nBytes = recv(this->socket, bytes, MAX_NETWORK_MESSAGE_SIZE, 0);
|
||||||
if(nBytes == SOCKET_ERROR)
|
if(nBytes == SOCKET_ERROR)
|
||||||
{
|
{
|
||||||
bytes.SetSize(0);
|
bytes.SetSize(0);
|
||||||
|
@ -263,6 +263,11 @@ std::string Connection::GetIpAddress()
|
||||||
return this->addr;
|
return this->addr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int Connection::GetSocket()
|
||||||
|
{
|
||||||
|
return socket;
|
||||||
|
}
|
||||||
|
|
||||||
///////////////////////////////////////
|
///////////////////////////////////////
|
||||||
//Private functions
|
//Private functions
|
||||||
///////////////////////////////////////
|
///////////////////////////////////////
|
||||||
|
|
|
@ -39,6 +39,8 @@ namespace Oyster
|
||||||
int SetBlockingMode( bool blocking );
|
int SetBlockingMode( bool blocking );
|
||||||
|
|
||||||
std::string GetIpAddress();
|
std::string GetIpAddress();
|
||||||
|
int GetSocket();
|
||||||
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int InitiateSocket();
|
int InitiateSocket();
|
||||||
|
|
|
@ -135,10 +135,10 @@ int ConnectionUDP::Recieve(OysterByte &bytes)
|
||||||
sockaddr_in from;
|
sockaddr_in from;
|
||||||
int fromLength = sizeof( from );
|
int fromLength = sizeof( from );
|
||||||
|
|
||||||
bytes.Resize(1000);
|
bytes.Resize(MAX_NETWORK_MESSAGE_SIZE);
|
||||||
nBytes = recvfrom(this->socket,
|
nBytes = recvfrom(this->socket,
|
||||||
bytes,
|
bytes,
|
||||||
1000,
|
MAX_NETWORK_MESSAGE_SIZE,
|
||||||
0,
|
0,
|
||||||
(sockaddr*)&from,
|
(sockaddr*)&from,
|
||||||
&fromLength
|
&fromLength
|
||||||
|
|
|
@ -11,6 +11,8 @@ namespace Oyster
|
||||||
{
|
{
|
||||||
namespace Network
|
namespace Network
|
||||||
{
|
{
|
||||||
|
const int MAX_NETWORK_MESSAGE_SIZE = 1400;
|
||||||
|
|
||||||
struct ConnectionInfo
|
struct ConnectionInfo
|
||||||
{
|
{
|
||||||
int socket;
|
int socket;
|
||||||
|
@ -44,6 +46,8 @@ namespace Oyster
|
||||||
|
|
||||||
//Disconnects the client or server TODO: optimize!
|
//Disconnects the client or server TODO: optimize!
|
||||||
virtual int Disconnect() = 0;
|
virtual int Disconnect() = 0;
|
||||||
|
|
||||||
|
virtual int GetSocket() { return -1; };
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue