From 0c458ec8c96d15f96947f1fe401654a903152a72 Mon Sep 17 00:00:00 2001 From: Nils Jeisecke Date: Mon, 31 May 2021 15:23:18 +0200 Subject: [PATCH 1/2] Add simple acknowledge protocol to ensure the server has received all data The server now acknowledges every received message by sending a \r. By waiting for the acknowledgement, clients should no longer terminate too early, causing bytes getting lost in Qt's internal socket handling. Message handling in the server is simplified because we can now rely on the readyRead signal being raised for every frame. This should finally solve #125 and #121. What remains is to correctly handle data sent using SingleApplication::sendMessage. --- singleapplication.cpp | 5 +---- singleapplication_p.cpp | 38 ++++++++++++++++++++++++++++---------- singleapplication_p.h | 2 ++ 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/singleapplication.cpp b/singleapplication.cpp index 7e153a0..09e264e 100644 --- a/singleapplication.cpp +++ b/singleapplication.cpp @@ -248,10 +248,7 @@ bool SingleApplication::sendMessage( const QByteArray &message, int timeout ) if( ! d->connectToPrimary( timeout, SingleApplicationPrivate::Reconnect ) ) return false; - d->socket->write( message ); - bool dataWritten = d->socket->waitForBytesWritten( timeout ); - d->socket->flush(); - return dataWritten; + return d->writeConfirmedMessage( timeout, message ); } /** diff --git a/singleapplication_p.cpp b/singleapplication_p.cpp index 27b688f..365799e 100644 --- a/singleapplication_p.cpp +++ b/singleapplication_p.cpp @@ -272,11 +272,31 @@ bool SingleApplicationPrivate::connectToPrimary( int msecs, ConnectionType conne #endif headerStream << static_cast ( initMsg.length() ); - socket->write( header ); - socket->write( initMsg ); - bool result = socket->waitForBytesWritten( static_cast(msecs - time.elapsed()) ); + if( !writeConfirmedMessage( static_cast(msecs - time.elapsed()), header ) ) + return false; + + if( !writeConfirmedMessage( static_cast(msecs - time.elapsed()), initMsg ) ) + return false; + + return true; +} + +void SingleApplicationPrivate::writeAck( QLocalSocket *sock ) { + sock->putChar('\n'); +} + +bool SingleApplicationPrivate::writeConfirmedMessage( int msecs, const QByteArray &msg ) +{ + socket->write( msg ); socket->flush(); - return result; + + bool result = socket->waitForReadyRead( msecs ); // await ack byte + if (result) { + socket->read( 1 ); + return true; + } + + return false; } quint16 SingleApplicationPrivate::blockChecksum() const @@ -379,9 +399,7 @@ void SingleApplicationPrivate::readInitMessageHeader( QLocalSocket *sock ) info.stage = StageBody; info.msgLen = msgLen; - if ( sock->bytesAvailable() >= (qint64) msgLen ){ - readInitMessageBody( sock ); - } + writeAck( sock ); } void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock ) @@ -448,15 +466,15 @@ void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock ) Q_EMIT q->instanceStarted(); } - if (sock->bytesAvailable() > 0){ - this->slotDataAvailable( sock, instanceId ); - } + writeAck( sock ); } void SingleApplicationPrivate::slotDataAvailable( QLocalSocket *dataSocket, quint32 instanceId ) { Q_Q(SingleApplication); Q_EMIT q->receivedMessage( instanceId, dataSocket->readAll() ); + + writeAck( dataSocket ); } void SingleApplicationPrivate::slotClientConnectionClosed( QLocalSocket *closedSocket, quint32 instanceId ) diff --git a/singleapplication_p.h b/singleapplication_p.h index c49a46d..0777c33 100644 --- a/singleapplication_p.h +++ b/singleapplication_p.h @@ -81,6 +81,8 @@ public: QString primaryUser() const; void readInitMessageHeader(QLocalSocket *socket); void readInitMessageBody(QLocalSocket *socket); + bool writeConfirmedMessage(int msecs, const QByteArray &msg); + void writeAck(QLocalSocket *sock); static void randomSleep(); void addAppData(const QString &data); QStringList appData() const; From 25a7b609822f6e8ab9ef6a7850f60c33a8d996f3 Mon Sep 17 00:00:00 2001 From: Nils Jeisecke Date: Mon, 31 May 2021 16:14:39 +0200 Subject: [PATCH 2/2] Ensure data sent via SingleApplication::sendMessage is received completely Whilst the initial "connect" message is framed with a length header, this was missing for the user data. Thus there was no guarantee that the message frame was really received completely on emitting the "receivedMessage" signal. This change splits the previous "StageConnected" state into "StageConnectedHeader" and "StageConnectedBody" and does some refactoring to allow using the same write and read functions as the "init" messages. --- singleapplication_p.cpp | 89 ++++++++++++++++++++++++++--------------- singleapplication_p.h | 13 +++--- 2 files changed, 65 insertions(+), 37 deletions(-) diff --git a/singleapplication_p.cpp b/singleapplication_p.cpp index 365799e..1339728 100644 --- a/singleapplication_p.cpp +++ b/singleapplication_p.cpp @@ -263,29 +263,35 @@ bool SingleApplicationPrivate::connectToPrimary( int msecs, ConnectionType conne #endif writeStream << checksum; - // The header indicates the message length that follows - QByteArray header; - QDataStream headerStream(&header, QIODevice::WriteOnly); - -#if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0)) - headerStream.setVersion(QDataStream::Qt_5_6); -#endif - headerStream << static_cast ( initMsg.length() ); - - if( !writeConfirmedMessage( static_cast(msecs - time.elapsed()), header ) ) - return false; - - if( !writeConfirmedMessage( static_cast(msecs - time.elapsed()), initMsg ) ) - return false; - - return true; + return writeConfirmedMessage( static_cast(msecs - time.elapsed()), initMsg ); } void SingleApplicationPrivate::writeAck( QLocalSocket *sock ) { sock->putChar('\n'); } -bool SingleApplicationPrivate::writeConfirmedMessage( int msecs, const QByteArray &msg ) +bool SingleApplicationPrivate::writeConfirmedMessage (int msecs, const QByteArray &msg) +{ + QElapsedTimer time; + time.start(); + + // Frame 1: The header indicates the message length that follows + QByteArray header; + QDataStream headerStream(&header, QIODevice::WriteOnly); + +#if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0)) + headerStream.setVersion(QDataStream::Qt_5_6); +#endif + headerStream << static_cast ( msg.length() ); + + if( ! writeConfirmedFrame( static_cast(msecs - time.elapsed()), header )) + return false; + + // Frame 2: The message + return writeConfirmedFrame( static_cast(msecs - time.elapsed()), msg ); +} + +bool SingleApplicationPrivate::writeConfirmedFrame( int msecs, const QByteArray &msg ) { socket->write( msg ); socket->flush(); @@ -360,13 +366,16 @@ void SingleApplicationPrivate::slotConnectionEstablished() [nextConnSocket, this](){ auto &info = connectionMap[nextConnSocket]; switch(info.stage){ - case StageHeader: - readInitMessageHeader(nextConnSocket); + case StageInitHeader: + readMessageHeader( nextConnSocket, StageInitBody ); break; - case StageBody: + case StageInitBody: readInitMessageBody(nextConnSocket); break; - case StageConnected: + case StageConnectedHeader: + readMessageHeader( nextConnSocket, StageConnectedBody ); + break; + case StageConnectedBody: this->slotDataAvailable( nextConnSocket, info.instanceId ); break; default: @@ -376,7 +385,7 @@ void SingleApplicationPrivate::slotConnectionEstablished() ); } -void SingleApplicationPrivate::readInitMessageHeader( QLocalSocket *sock ) +void SingleApplicationPrivate::readMessageHeader( QLocalSocket *sock, SingleApplicationPrivate::ConnectionStage nextStage ) { if (!connectionMap.contains( sock )){ return; @@ -396,27 +405,35 @@ void SingleApplicationPrivate::readInitMessageHeader( QLocalSocket *sock ) quint64 msgLen = 0; headerStream >> msgLen; ConnectionInfo &info = connectionMap[sock]; - info.stage = StageBody; + info.stage = nextStage; info.msgLen = msgLen; writeAck( sock ); } +bool SingleApplicationPrivate::isFrameComplete( QLocalSocket *sock ) +{ + if (!connectionMap.contains( sock )){ + return false; + } + + ConnectionInfo &info = connectionMap[sock]; + if( sock->bytesAvailable() < ( qint64 )info.msgLen ){ + return false; + } + + return true; +} + void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock ) { Q_Q(SingleApplication); - if (!connectionMap.contains( sock )){ + if( !isFrameComplete( sock ) ) return; - } - - ConnectionInfo &info = connectionMap[sock]; - if( sock->bytesAvailable() < ( qint64 )info.msgLen ){ - return; - } // Read the message body - QByteArray msgBytes = sock->read(info.msgLen); + QByteArray msgBytes = sock->readAll(); QDataStream readStream(msgBytes); #if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0)) @@ -456,8 +473,9 @@ void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock ) return; } + ConnectionInfo &info = connectionMap[sock]; info.instanceId = instanceId; - info.stage = StageConnected; + info.stage = StageConnectedHeader; if( connectionType == NewInstance || ( connectionType == SecondaryInstance && @@ -472,9 +490,16 @@ void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock ) void SingleApplicationPrivate::slotDataAvailable( QLocalSocket *dataSocket, quint32 instanceId ) { Q_Q(SingleApplication); + + if ( !isFrameComplete( dataSocket ) ) + return; + Q_EMIT q->receivedMessage( instanceId, dataSocket->readAll() ); writeAck( dataSocket ); + + ConnectionInfo &info = connectionMap[dataSocket]; + info.stage = StageConnectedHeader; } void SingleApplicationPrivate::slotClientConnectionClosed( QLocalSocket *closedSocket, quint32 instanceId ) diff --git a/singleapplication_p.h b/singleapplication_p.h index 0777c33..58507cf 100644 --- a/singleapplication_p.h +++ b/singleapplication_p.h @@ -61,9 +61,10 @@ public: Reconnect = 3 }; enum ConnectionStage : quint8 { - StageHeader = 0, - StageBody = 1, - StageConnected = 2, + StageInitHeader = 0, + StageInitBody = 1, + StageConnectedHeader = 2, + StageConnectedBody = 3, }; Q_DECLARE_PUBLIC(SingleApplication) @@ -79,10 +80,12 @@ public: quint16 blockChecksum() const; qint64 primaryPid() const; QString primaryUser() const; - void readInitMessageHeader(QLocalSocket *socket); + bool isFrameComplete(QLocalSocket *sock); + void readMessageHeader(QLocalSocket *socket, ConnectionStage nextStage); void readInitMessageBody(QLocalSocket *socket); - bool writeConfirmedMessage(int msecs, const QByteArray &msg); void writeAck(QLocalSocket *sock); + bool writeConfirmedFrame(int msecs, const QByteArray &msg); + bool writeConfirmedMessage(int msecs, const QByteArray &msg); static void randomSleep(); void addAppData(const QString &data); QStringList appData() const;