Merge pull request #133 from njeisecke/master

Fix protocol to solve #121 and #125
This commit is contained in:
Itay Grudev 2021-06-01 23:59:59 +03:00 committed by GitHub
commit c557da5d0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 79 additions and 34 deletions

View File

@ -248,10 +248,7 @@ bool SingleApplication::sendMessage( const QByteArray &message, int timeout )
if( ! d->connectToPrimary( timeout, SingleApplicationPrivate::Reconnect ) ) if( ! d->connectToPrimary( timeout, SingleApplicationPrivate::Reconnect ) )
return false; return false;
d->socket->write( message ); return d->writeConfirmedMessage( timeout, message );
bool dataWritten = d->socket->waitForBytesWritten( timeout );
d->socket->flush();
return dataWritten;
} }
/** /**

View File

@ -263,20 +263,46 @@ bool SingleApplicationPrivate::connectToPrimary( int msecs, ConnectionType conne
#endif #endif
writeStream << checksum; writeStream << checksum;
// The header indicates the message length that follows return writeConfirmedMessage( static_cast<int>(msecs - time.elapsed()), initMsg );
}
void SingleApplicationPrivate::writeAck( QLocalSocket *sock ) {
sock->putChar('\n');
}
bool SingleApplicationPrivate::writeConfirmedMessage (int msecs, const QByteArray &msg)
{
QElapsedTimer time;
time.start();
// Frame 1: The header indicates the message length that follows
QByteArray header; QByteArray header;
QDataStream headerStream(&header, QIODevice::WriteOnly); QDataStream headerStream(&header, QIODevice::WriteOnly);
#if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0)) #if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0))
headerStream.setVersion(QDataStream::Qt_5_6); headerStream.setVersion(QDataStream::Qt_5_6);
#endif #endif
headerStream << static_cast <quint64>( initMsg.length() ); headerStream << static_cast <quint64>( msg.length() );
socket->write( header ); if( ! writeConfirmedFrame( static_cast<int>(msecs - time.elapsed()), header ))
socket->write( initMsg ); return false;
bool result = socket->waitForBytesWritten( static_cast<int>(msecs - time.elapsed()) );
// Frame 2: The message
return writeConfirmedFrame( static_cast<int>(msecs - time.elapsed()), msg );
}
bool SingleApplicationPrivate::writeConfirmedFrame( int msecs, const QByteArray &msg )
{
socket->write( msg );
socket->flush(); socket->flush();
return result;
bool result = socket->waitForReadyRead( msecs ); // await ack byte
if (result) {
socket->read( 1 );
return true;
}
return false;
} }
quint16 SingleApplicationPrivate::blockChecksum() const quint16 SingleApplicationPrivate::blockChecksum() const
@ -340,13 +366,16 @@ void SingleApplicationPrivate::slotConnectionEstablished()
[nextConnSocket, this](){ [nextConnSocket, this](){
auto &info = connectionMap[nextConnSocket]; auto &info = connectionMap[nextConnSocket];
switch(info.stage){ switch(info.stage){
case StageHeader: case StageInitHeader:
readInitMessageHeader(nextConnSocket); readMessageHeader( nextConnSocket, StageInitBody );
break; break;
case StageBody: case StageInitBody:
readInitMessageBody(nextConnSocket); readInitMessageBody(nextConnSocket);
break; break;
case StageConnected: case StageConnectedHeader:
readMessageHeader( nextConnSocket, StageConnectedBody );
break;
case StageConnectedBody:
this->slotDataAvailable( nextConnSocket, info.instanceId ); this->slotDataAvailable( nextConnSocket, info.instanceId );
break; break;
default: default:
@ -356,7 +385,7 @@ void SingleApplicationPrivate::slotConnectionEstablished()
); );
} }
void SingleApplicationPrivate::readInitMessageHeader( QLocalSocket *sock ) void SingleApplicationPrivate::readMessageHeader( QLocalSocket *sock, SingleApplicationPrivate::ConnectionStage nextStage )
{ {
if (!connectionMap.contains( sock )){ if (!connectionMap.contains( sock )){
return; return;
@ -376,29 +405,35 @@ void SingleApplicationPrivate::readInitMessageHeader( QLocalSocket *sock )
quint64 msgLen = 0; quint64 msgLen = 0;
headerStream >> msgLen; headerStream >> msgLen;
ConnectionInfo &info = connectionMap[sock]; ConnectionInfo &info = connectionMap[sock];
info.stage = StageBody; info.stage = nextStage;
info.msgLen = msgLen; info.msgLen = msgLen;
if ( sock->bytesAvailable() >= (qint64) msgLen ){ writeAck( sock );
readInitMessageBody( sock );
} }
bool SingleApplicationPrivate::isFrameComplete( QLocalSocket *sock )
{
if (!connectionMap.contains( sock )){
return false;
}
ConnectionInfo &info = connectionMap[sock];
if( sock->bytesAvailable() < ( qint64 )info.msgLen ){
return false;
}
return true;
} }
void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock ) void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock )
{ {
Q_Q(SingleApplication); Q_Q(SingleApplication);
if (!connectionMap.contains( sock )){ if( !isFrameComplete( sock ) )
return; return;
}
ConnectionInfo &info = connectionMap[sock];
if( sock->bytesAvailable() < ( qint64 )info.msgLen ){
return;
}
// Read the message body // Read the message body
QByteArray msgBytes = sock->read(info.msgLen); QByteArray msgBytes = sock->readAll();
QDataStream readStream(msgBytes); QDataStream readStream(msgBytes);
#if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0)) #if (QT_VERSION >= QT_VERSION_CHECK(5, 6, 0))
@ -438,8 +473,9 @@ void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock )
return; return;
} }
ConnectionInfo &info = connectionMap[sock];
info.instanceId = instanceId; info.instanceId = instanceId;
info.stage = StageConnected; info.stage = StageConnectedHeader;
if( connectionType == NewInstance || if( connectionType == NewInstance ||
( connectionType == SecondaryInstance && ( connectionType == SecondaryInstance &&
@ -448,15 +484,22 @@ void SingleApplicationPrivate::readInitMessageBody( QLocalSocket *sock )
Q_EMIT q->instanceStarted(); Q_EMIT q->instanceStarted();
} }
if (sock->bytesAvailable() > 0){ writeAck( sock );
this->slotDataAvailable( sock, instanceId );
}
} }
void SingleApplicationPrivate::slotDataAvailable( QLocalSocket *dataSocket, quint32 instanceId ) void SingleApplicationPrivate::slotDataAvailable( QLocalSocket *dataSocket, quint32 instanceId )
{ {
Q_Q(SingleApplication); Q_Q(SingleApplication);
if ( !isFrameComplete( dataSocket ) )
return;
Q_EMIT q->receivedMessage( instanceId, dataSocket->readAll() ); Q_EMIT q->receivedMessage( instanceId, dataSocket->readAll() );
writeAck( dataSocket );
ConnectionInfo &info = connectionMap[dataSocket];
info.stage = StageConnectedHeader;
} }
void SingleApplicationPrivate::slotClientConnectionClosed( QLocalSocket *closedSocket, quint32 instanceId ) void SingleApplicationPrivate::slotClientConnectionClosed( QLocalSocket *closedSocket, quint32 instanceId )

View File

@ -61,9 +61,10 @@ public:
Reconnect = 3 Reconnect = 3
}; };
enum ConnectionStage : quint8 { enum ConnectionStage : quint8 {
StageHeader = 0, StageInitHeader = 0,
StageBody = 1, StageInitBody = 1,
StageConnected = 2, StageConnectedHeader = 2,
StageConnectedBody = 3,
}; };
Q_DECLARE_PUBLIC(SingleApplication) Q_DECLARE_PUBLIC(SingleApplication)
@ -79,8 +80,12 @@ public:
quint16 blockChecksum() const; quint16 blockChecksum() const;
qint64 primaryPid() const; qint64 primaryPid() const;
QString primaryUser() const; QString primaryUser() const;
void readInitMessageHeader(QLocalSocket *socket); bool isFrameComplete(QLocalSocket *sock);
void readMessageHeader(QLocalSocket *socket, ConnectionStage nextStage);
void readInitMessageBody(QLocalSocket *socket); void readInitMessageBody(QLocalSocket *socket);
void writeAck(QLocalSocket *sock);
bool writeConfirmedFrame(int msecs, const QByteArray &msg);
bool writeConfirmedMessage(int msecs, const QByteArray &msg);
static void randomSleep(); static void randomSleep();
void addAppData(const QString &data); void addAppData(const QString &data);
QStringList appData() const; QStringList appData() const;