Ensure data sent via SingleApplication::sendMessage is received completely

Whilst the initial "connect" message is framed with a length header,
this was missing for the user data.

Thus there was no guarantee that the message frame was really received
completely on emitting the "receivedMessage" signal.

This change splits the previous "StageConnected" state into
"StageConnectedHeader" and "StageConnectedBody" and does some
refactoring to allow using the same write and read functions as the
"init" messages.
This commit is contained in:
Nils Jeisecke 2021-05-31 16:14:39 +02:00
parent 0c458ec8c9
commit 25a7b60982
2 changed files with 65 additions and 37 deletions

View File

@ -263,22 +263,7 @@ bool SingleApplicationPrivate::connectToPrimary( int msecs, ConnectionType conne
#endif #endif
writeStream << checksum; writeStream << checksum;
// The header indicates the message length that follows return writeConfirmedMessage( static_cast<int>(msecs - time.elapsed()), initMsg );
QByteArray header;
QDataStream headerStream(&header, QIODevice::WriteOnly);
#if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0))
headerStream.setVersion(QDataStream::Qt_5_6);
#endif
headerStream << static_cast <quint64>( initMsg.length() );
if( !writeConfirmedMessage( static_cast<int>(msecs - time.elapsed()), header ) )
return false;
if( !writeConfirmedMessage( static_cast<int>(msecs - time.elapsed()), initMsg ) )
return false;
return true;
} }
void SingleApplicationPrivate::writeAck( QLocalSocket *sock ) { void SingleApplicationPrivate::writeAck( QLocalSocket *sock ) {
@ -286,6 +271,27 @@ void SingleApplicationPrivate::writeAck( QLocalSocket *sock ) {
} }
bool SingleApplicationPrivate::writeConfirmedMessage (int msecs, const QByteArray &msg) bool SingleApplicationPrivate::writeConfirmedMessage (int msecs, const QByteArray &msg)
{
QElapsedTimer time;
time.start();
// Frame 1: The header indicates the message length that follows
QByteArray header;
QDataStream headerStream(&header, QIODevice::WriteOnly);
#if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0))
headerStream.setVersion(QDataStream::Qt_5_6);
#endif
headerStream << static_cast <quint64>( msg.length() );
if( ! writeConfirmedFrame( static_cast<int>(msecs - time.elapsed()), header ))
return false;
// Frame 2: The message
return writeConfirmedFrame( static_cast<int>(msecs - time.elapsed()), msg );
}
bool SingleApplicationPrivate::writeConfirmedFrame( int msecs, const QByteArray &msg )
{ {
socket->write( msg ); socket->write( msg );
socket->flush(); socket->flush();
@ -360,13 +366,16 @@ void SingleApplicationPrivate::slotConnectionEstablished()
[nextConnSocket, this](){ [nextConnSocket, this](){
auto &info = connectionMap[nextConnSocket]; auto &info = connectionMap[nextConnSocket];
switch(info.stage){ switch(info.stage){
case StageHeader: case StageInitHeader:
readInitMessageHeader(nextConnSocket); readMessageHeader( nextConnSocket, StageInitBody );
break; break;
case StageBody: case StageInitBody:
readInitMessageBody(nextConnSocket); readInitMessageBody(nextConnSocket);
break; break;
case StageConnected: case StageConnectedHeader:
readMessageHeader( nextConnSocket, StageConnectedBody );
break;
case StageConnectedBody:
this->slotDataAvailable( nextConnSocket, info.instanceId ); this->slotDataAvailable( nextConnSocket, info.instanceId );
break; break;
default: default:
@ -376,7 +385,7 @@ void SingleApplicationPrivate::slotConnectionEstablished()
); );
} }
void SingleApplicationPrivate::readInitMessageHeader( QLocalSocket *sock ) void SingleApplicationPrivate::readMessageHeader( QLocalSocket *sock, SingleApplicationPrivate::ConnectionStage nextStage )
{ {
if (!connectionMap.contains( sock )){ if (!connectionMap.contains( sock )){
return; return;
@ -396,27 +405,35 @@ void SingleApplicationPrivate::readInitMessageHeader( QLocalSocket *sock )
quint64 msgLen = 0; quint64 msgLen = 0;
headerStream >> msgLen; headerStream >> msgLen;
ConnectionInfo &info = connectionMap[sock]; ConnectionInfo &info = connectionMap[sock];
info.stage = StageBody; info.stage = nextStage;
info.msgLen = msgLen; info.msgLen = msgLen;
writeAck( sock ); writeAck( sock );
} }
bool SingleApplicationPrivate::isFrameComplete( QLocalSocket *sock )
{
if (!connectionMap.contains( sock )){
return false;
}
ConnectionInfo &info = connectionMap[sock];
if( sock->bytesAvailable() < ( qint64 )info.msgLen ){
return false;
}
return true;
}
void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock ) void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock )
{ {
Q_Q(SingleApplication); Q_Q(SingleApplication);
if (!connectionMap.contains( sock )){ if( !isFrameComplete( sock ) )
return; return;
}
ConnectionInfo &info = connectionMap[sock];
if( sock->bytesAvailable() < ( qint64 )info.msgLen ){
return;
}
// Read the message body // Read the message body
QByteArray msgBytes = sock->read(info.msgLen); QByteArray msgBytes = sock->readAll();
QDataStream readStream(msgBytes); QDataStream readStream(msgBytes);
#if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0)) #if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0))
@ -456,8 +473,9 @@ void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock )
return; return;
} }
ConnectionInfo &info = connectionMap[sock];
info.instanceId = instanceId; info.instanceId = instanceId;
info.stage = StageConnected; info.stage = StageConnectedHeader;
if( connectionType == NewInstance || if( connectionType == NewInstance ||
( connectionType == SecondaryInstance && ( connectionType == SecondaryInstance &&
@ -472,9 +490,16 @@ void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock )
void SingleApplicationPrivate::slotDataAvailable( QLocalSocket *dataSocket, quint32 instanceId ) void SingleApplicationPrivate::slotDataAvailable( QLocalSocket *dataSocket, quint32 instanceId )
{ {
Q_Q(SingleApplication); Q_Q(SingleApplication);
if ( !isFrameComplete( dataSocket ) )
return;
Q_EMIT q->receivedMessage( instanceId, dataSocket->readAll() ); Q_EMIT q->receivedMessage( instanceId, dataSocket->readAll() );
writeAck( dataSocket ); writeAck( dataSocket );
ConnectionInfo &info = connectionMap[dataSocket];
info.stage = StageConnectedHeader;
} }
void SingleApplicationPrivate::slotClientConnectionClosed( QLocalSocket *closedSocket, quint32 instanceId ) void SingleApplicationPrivate::slotClientConnectionClosed( QLocalSocket *closedSocket, quint32 instanceId )

View File

@ -61,9 +61,10 @@ public:
Reconnect = 3 Reconnect = 3
}; };
enum ConnectionStage : quint8 { enum ConnectionStage : quint8 {
StageHeader = 0, StageInitHeader = 0,
StageBody = 1, StageInitBody = 1,
StageConnected = 2, StageConnectedHeader = 2,
StageConnectedBody = 3,
}; };
Q_DECLARE_PUBLIC(SingleApplication) Q_DECLARE_PUBLIC(SingleApplication)
@ -79,10 +80,12 @@ public:
quint16 blockChecksum() const; quint16 blockChecksum() const;
qint64 primaryPid() const; qint64 primaryPid() const;
QString primaryUser() const; QString primaryUser() const;
void readInitMessageHeader(QLocalSocket *socket); bool isFrameComplete(QLocalSocket *sock);
void readMessageHeader(QLocalSocket *socket, ConnectionStage nextStage);
void readInitMessageBody(QLocalSocket *socket); void readInitMessageBody(QLocalSocket *socket);
bool writeConfirmedMessage(int msecs, const QByteArray &msg);
void writeAck(QLocalSocket *sock); void writeAck(QLocalSocket *sock);
bool writeConfirmedFrame(int msecs, const QByteArray &msg);
bool writeConfirmedMessage(int msecs, const QByteArray &msg);
static void randomSleep(); static void randomSleep();
void addAppData(const QString &data); void addAppData(const QString &data);
QStringList appData() const; QStringList appData() const;