SyncServer.cpp   SyncServer.cpp 
skipping to change at line 29 skipping to change at line 29
#include "SyncServer.hpp" #include "SyncServer.hpp"
#include "SyncMessages.hpp" #include "SyncMessages.hpp"
#include "SyncServerHandlers.hpp" #include "SyncServerHandlers.hpp"
#include "SyncServerEventSenders.hpp" #include "SyncServerEventSenders.hpp"
#include <QTcpServer> #include <QTcpServer>
#include <QTcpSocket> #include <QTcpSocket>
#include <QTimerEvent> #include <QTimerEvent>
Q_LOGGING_CATEGORY(syncServer,"stel.plugin.remoteSync.server")
using namespace SyncProtocol; using namespace SyncProtocol;
SyncServer::SyncServer(QObject* parent) SyncServer::SyncServer(QObject* parent)
: QObject(parent) : QObject(parent), stopping(false), timeoutTimerId(-1)
{ {
qserver = new QTcpServer(this); qserver = new QTcpServer(this);
connect(qserver,SIGNAL(newConnection()), this, SLOT(handleNewConnect ion())); connect(qserver,SIGNAL(newConnection()), this, SLOT(handleNewConnect ion()));
connect(qserver,SIGNAL(acceptError(QAbstractSocket::SocketError)),th is,SLOT(connectionError(QAbstractSocket::SocketError))); connect(qserver,SIGNAL(acceptError(QAbstractSocket::SocketError)),th is,SLOT(connectionError(QAbstractSocket::SocketError)));
//create message handlers
handlerList.resize(MSGTYPE_SIZE);
handlerList[ERROR] = new ServerErrorHandler();
handlerList[CLIENT_CHALLENGE_RESPONSE] = new ServerAuthHandler(this,
false);
handlerList[ALIVE] = new ServerAliveHandler();
} }
SyncServer::~SyncServer() SyncServer::~SyncServer()
{ {
stop(); stop();
//delete handlers
foreach(SyncMessageHandler* h, handlerList)
{
if(h)
delete h;
}
handlerList.clear();
qCDebug(syncServer)<<"Destroyed";
} }
bool SyncServer::start(int port) bool SyncServer::start(int port)
{ {
if(qserver->isListening()) if(qserver->isListening())
stop(); stop();
bool ok = qserver->listen(QHostAddress::Any, port); bool ok = qserver->listen(QHostAddress::Any, port);
if(ok) if(ok)
{ {
qDebug()<<"[SyncServer] Started on port"<<port; qCDebug(syncServer)<<"Started on port"<<port;
//create message handlers timeoutTimerId = startTimer(5000,Qt::VeryCoarseTimer);
handlerList.resize(MSGTYPE_SIZE);
handlerList[ERROR] = new ServerErrorHandler();
handlerList[CLIENT_CHALLENGE_RESPONSE] = new ServerAuthHandl
er(this, true);
handlerList[ALIVE] = new ServerAliveHandler();
//create senders
addSender(new TimeEventSender()); addSender(new TimeEventSender());
addSender(new LocationEventSender()); addSender(new LocationEventSender());
addSender(new SelectionEventSender()); addSender(new SelectionEventSender());
addSender(new StelPropertyEventSender());
timeoutTimerId = startTimer(5000,Qt::VeryCoarseTimer); addSender(new ViewEventSender());
addSender(new FovEventSender());
} }
else else
qDebug()<<"[SyncServer] Error while starting:"<<errorString( ); qCCritical(syncServer)<<"Error while starting:"<<qserver->er rorString();
return ok; return ok;
} }
void SyncServer::addSender(SyncServerEventSender *snd) void SyncServer::addSender(SyncServerEventSender *snd)
{ {
snd->server = this; snd->server = this;
senderList.append(snd); senderList.append(snd);
} }
void SyncServer::broadcastMessage(const SyncMessage &msg) void SyncServer::broadcastMessage(const SyncMessage &msg)
{ {
qDebug()<<"[SyncServer] Broadcast message"<<msg.getMessageType(); qCDebug(syncServer)<<"Broadcast message"<<msg;
qint64 size = msg.createFullMessage(broadcastBuffer); qint64 size = msg.createFullMessage(broadcastBuffer);
if(!size) if(!size)
{ {
//crash here when message is too large in debugging //crash here when message is too large in debugging
Q_ASSERT(true); Q_ASSERT(true);
qCritical()<<"[SyncServer] A message is too large for broadc qCCritical(syncServer)<<"A message is too large for broadcas
ast! Message buffer contents follow..."; t! Message buffer contents follow...";
qCritical()<<broadcastBuffer.toHex(); qCCritical(syncServer)<<broadcastBuffer.toHex();
//stop server //stop server
stop(); stop();
return; return;
} }
for(tClientMap::iterator it = clients.begin();it!=clients.end();++it ) for(tClientList::iterator it = clients.begin();it!=clients.end();++i t)
{ {
SyncRemotePeer& client = it.value(); SyncRemotePeer* client = *it;
if(client.isAuthenticated) if(client->isAuthenticated())
{ {
client.writeData(broadcastBuffer,size); client->writeData(broadcastBuffer,size);
} }
} }
} }
void SyncServer::stop() void SyncServer::stop()
{ {
if(qserver->isListening()) if(qserver->isListening())
{ {
stopping = true;
killTimer(timeoutTimerId); killTimer(timeoutTimerId);
qserver->close(); qserver->close();
for(tClientMap::iterator it = clients.begin();it!=clients.en
d(); )
{
//this may cause disconnected signal, which will rem
ove the client
QAbstractSocket* sock = it.key();
sock->disconnectFromHost();
if(sock->state() != QAbstractSocket::UnconnectedStat
e)
{
if(!sock->waitForDisconnected(500))
{
sock->abort();
sock->deleteLater();
++it;
}
else
{
//restart iterator because it is mos
t likely invalid
it = clients.begin();
}
}
else
{
//restart iterator because it is most likely
invalid
it = clients.begin();
}
}
clients.clear();
//delete handlers
foreach(SyncMessageHandler* h, handlerList)
{
if(h)
delete h;
}
handlerList.clear();
//delete senders //delete senders
foreach(SyncServerEventSender* s, senderList) foreach(SyncServerEventSender* s, senderList)
{ {
if(s) if(s)
delete s; delete s;
} }
senderList.clear(); senderList.clear();
qDebug()<<"[SyncServer] Stopped"; for(tClientList::iterator it = clients.begin();it!=clients.e
nd(); )
{
//this may cause disconnected signal, which will rem
ove the client
SyncRemotePeer* peer = *it;
peer->disconnectPeer();
}
qCDebug(syncServer)<<"Stopped listening";
checkStopState();
} }
} }
void SyncServer::update() void SyncServer::update()
{ {
foreach(SyncServerEventSender* s, senderList) foreach(SyncServerEventSender* s, senderList)
{ {
s->update(); s->update();
} }
} }
skipping to change at line 177 skipping to change at line 168
{ {
if(evt->timerId() == timeoutTimerId) if(evt->timerId() == timeoutTimerId)
{ {
checkTimeouts(); checkTimeouts();
evt->accept(); evt->accept();
} }
} }
void SyncServer::checkTimeouts() void SyncServer::checkTimeouts()
{ {
qint64 currentTime = QDateTime::currentMSecsSinceEpoch();
//iterate over the connected clients //iterate over the connected clients
for(tClientMap::iterator it = clients.begin(); it!=clients.end(); ) for(tClientList::iterator it = clients.begin(); it!=clients.end(); + +it)
{ {
qint64 writeDiff = currentTime - it.value().lastSendTime; (*it)->checkTimeout();
qint64 readDiff = currentTime - it.value().lastReceiveTime; }
}
if(writeDiff > 5000)
{
//no data sent to this client for some time, send a
ALIVE
Alive msg;
it.value().writeMessage(msg);
}
if(readDiff > 15000) void SyncServer::checkStopState()
{
if(stopping)
{
if(clients.isEmpty())
{ {
if(it.key()->state() == QAbstractSocket::ConnectedSt qCDebug(syncServer)<<"All clients disconnected";
ate) stopping = false;
{ emit serverStopped();
//no data received for some time, assume cli
ent timed out
clientLog(it.key(),QString("No data received
for %1ms, timing out").arg(readDiff));
it.key()->disconnectFromHost();
//restart iterator, disconnect may have modi
fied client list
it = clients.begin();
}
} }
else
++it;
} }
} }
QString SyncServer::errorString() const QString SyncServer::errorString() const
{ {
return qserver->errorString(); return qserver->errorString();
} }
void SyncServer::handleNewConnection() void SyncServer::handleNewConnection()
{ {
QTcpSocket* newConn = qserver->nextPendingConnection(); QTcpSocket* newConn = qserver->nextPendingConnection();
clientLog(newConn,"New Connection");
SyncRemotePeer* newClient = new SyncRemotePeer(newConn,false,handler
List);
newClient->peerLog("New client connection");
//add to client list //add to client list
clients.insert(newConn,SyncRemotePeer(newConn,false,handlerList)); clients.append(newClient);
SyncRemotePeer& peer = clients.last();
//assign an ID to the client
peer.id = QUuid::createUuid();
qDebug()<<"[SyncServer] "<<clients.size()<<" current connections";
//hook up disconnect, error and data signals
connect(newConn, SIGNAL(disconnected()), this, SLOT(clientDisconnect
ed()));
connect(newConn, SIGNAL(error(QAbstractSocket::SocketError)), this,
SLOT(clientError(QAbstractSocket::SocketError)));
connect(newConn, SIGNAL(readyRead()), this, SLOT(clientDataReceived(
)));
//set low delay option qCDebug(syncServer)<<clients.size()<<"current connections";
newConn->setSocketOption(QAbstractSocket::LowDelayOption,1);
//hook up disconnect signal
connect(newClient, SIGNAL(disconnected(bool)), this, SLOT(clientDisc
onnected(bool)));
//write challenge //write challenge
ServerChallenge msg; ServerChallenge msg;
msg.clientId = peer.id; msg.clientId = newClient->getID();
peer.writeMessage(msg); newClient->writeMessage(msg);
}
void SyncServer::clientDataReceived()
{
qDebug()<<"[SyncServer] client data received";
QAbstractSocket* sock = qobject_cast<QAbstractSocket*>(sender());
tClientMap::iterator it = clients.find(sock);
if(it!=clients.end())
(*it).receiveMessage();
else
{
Q_ASSERT(false);
qCritical()<<"Received data from socket without client";
sock->disconnectFromHost();
sock->deleteLater();
}
}
void SyncServer::clientError(QAbstractSocket::SocketError)
{
//Note: we also get an error if the client has disconnected, handle
it differently?
QAbstractSocket* sock = qobject_cast<QAbstractSocket*>(sender());
clientLog(sock, "Socket error: " + sock->errorString());
} }
void SyncServer::clientAuthenticated(SyncRemotePeer &peer) void SyncServer::clientAuthenticated(SyncRemotePeer &peer)
{ {
//we have to send the client the current app state //we have to send the client the current app state
foreach(SyncServerEventSender* s, senderList) foreach(SyncServerEventSender* s, senderList)
{ {
s->newClientConnected(peer); s->newClientConnected(peer);
} }
} }
void SyncServer::clientDisconnected() void SyncServer::clientDisconnected(bool clean)
{ {
QAbstractSocket* sock = qobject_cast<QAbstractSocket*>(sender()); SyncRemotePeer* peer = qobject_cast<SyncRemotePeer*>(sender());
clientLog(sock, "Socket disconnected");
clients.remove(sock);
sock->deleteLater();
qDebug()<<"[SyncServer] "<<clients.size()<<" current connections";
}
void SyncServer::connectionError(QAbstractSocket::SocketError err) if(!clean)
{ {
qWarning()<<"[SyncServer] Could not accept an incoming connection, s qCWarning(syncServer)<<"Client disconnected with error"<<pee
ocket error is: "<<err; r->getError();
}
clients.removeAll(peer);
peer->deleteLater();
qCDebug(syncServer)<<clients.size()<<"current connections";
checkStopState();
} }
void SyncServer::clientLog(QAbstractSocket *cl, const QString &msg) void SyncServer::connectionError(QAbstractSocket::SocketError err)
{ {
qDebug()<<"[SyncServer][Client"<<(cl->peerAddress().toString() + ":" Q_UNUSED(err);
+ QString::number(cl->peerPort()))<<"]:"<<msg; qCWarning(syncServer)<<"Could not accept an incoming connection, soc
ket error is: "<<qserver->errorString();
} }
 End of changes. 33 change blocks. 
146 lines changed or deleted 86 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/