summaryrefslogtreecommitdiffstats
path: root/src/corelib/io/qwindowspipereader.cpp
blob: 31d0dc141775ac3ebc049ae7b1a1e864d62c3c23 (plain)
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
// Copyright (C) 2016 The Qt Company Ltd.// Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only#include"qwindowspipereader_p.h"#include <qcoreapplication.h>#include <QMutexLocker>#include <QPointer> QT_BEGIN_NAMESPACE using namespaceQt::StringLiterals;static const DWORD minReadBufferSize =4096;QWindowsPipeReader::QWindowsPipeReader(QObject *parent):QObject(parent),handle(INVALID_HANDLE_VALUE),eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)),waitObject(NULL),readBufferMaxSize(0),actualReadBufferSize(0),pendingReadBytes(0),lastError(ERROR_SUCCESS),state(Stopped),readSequenceStarted(false),pipeBroken(true),readyReadPending(false),winEventActPosted(false){ZeroMemory(&overlapped,sizeof(OVERLAPPED)); overlapped.hEvent = eventHandle; waitObject =CreateThreadpoolWait(waitCallback,this, NULL);if(waitObject == NULL)qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed.");}QWindowsPipeReader::~QWindowsPipeReader(){stop();// Wait for thread pool callback to complete, as it can be still// executing some completion code.WaitForThreadpoolWaitCallbacks(waitObject, FALSE);CloseThreadpoolWait(waitObject);CloseHandle(eventHandle);CloseHandle(syncHandle);}/*! Sets the handle to read from. The handle must be valid. Do not call this function while the pipe is running. */voidQWindowsPipeReader::setHandle(HANDLE hPipeReadEnd){ readBuffer.clear(); actualReadBufferSize =0; readyReadPending =false; pendingReadBytes =0; handle = hPipeReadEnd; pipeBroken =false; lastError = ERROR_SUCCESS;}/*! Stops the asynchronous read sequence. If the read sequence is running then the I/O operation is canceled. */voidQWindowsPipeReader::stop(){cancelAsyncRead(Stopped); pipeBroken =true;}/*! Stops the asynchronous read sequence. Reads all pending bytes into the internal buffer. */voidQWindowsPipeReader::drainAndStop(){cancelAsyncRead(Draining); pipeBroken =true;}/*! Stops the asynchronous read sequence. Clears the internal buffer and discards any pending data. */voidQWindowsPipeReader::stopAndClear(){cancelAsyncRead(Stopped); readBuffer.clear(); actualReadBufferSize =0;// QLocalSocket is supposed to write data in the 'Closing'// state, so we don't set 'pipeBroken' flag here. Also, avoid// setting this flag in checkForReadyRead(). lastError = ERROR_SUCCESS;}/*! Stops the asynchronous read sequence. */voidQWindowsPipeReader::cancelAsyncRead(State newState){if(state != Running)return; mutex.lock(); state = newState;if(readSequenceStarted) {// This can legitimately fail due to the GetOverlappedResult()// in the callback not being locked. We ignore ERROR_NOT_FOUND// in this case.if(!CancelIoEx(handle, &overlapped)) {const DWORD dwError =GetLastError();if(dwError != ERROR_NOT_FOUND) {qErrnoWarning(dwError,"QWindowsPipeReader: CancelIoEx on handle %p failed.", handle);}}// Wait for callback to complete.do{ mutex.unlock();waitForNotification(); mutex.lock();}while(readSequenceStarted);} mutex.unlock();// Finish reading to keep the class state consistent. Note that// signals are not emitted in the call below, as the caller is// expected to do that synchronously.consumePending();}/*! Sets the size of internal read buffer. */voidQWindowsPipeReader::setMaxReadBufferSize(qint64 size){ QMutexLocker locker(&mutex); readBufferMaxSize = size;}/*! Returns \c true if async operation is in progress, there is pending data to read, or a read error is pending.*/boolQWindowsPipeReader::isReadOperationActive()const{ QMutexLocker locker(&mutex);return readSequenceStarted || readyReadPending || (lastError != ERROR_SUCCESS && !pipeBroken);}/*! Returns the number of bytes we've read so far. */ qint64 QWindowsPipeReader::bytesAvailable()const{return actualReadBufferSize;}/*! Copies at most \c{maxlen} bytes from the internal read buffer to \c{data}. */ qint64 QWindowsPipeReader::read(char*data, qint64 maxlen){ QMutexLocker locker(&mutex); qint64 readSoFar;// If startAsyncRead() has read data, copy it to its destination.if(maxlen ==1&& actualReadBufferSize >0) {*data = readBuffer.getChar(); actualReadBufferSize--; readSoFar =1;}else{ readSoFar = readBuffer.read(data,qMin(actualReadBufferSize, maxlen)); actualReadBufferSize -= readSoFar;}if(!pipeBroken) {startAsyncReadHelper(&locker);if(readSoFar ==0)return-2;// signal EWOULDBLOCK}return readSoFar;}/*! Reads a line from the internal buffer, but no more than \c{maxlen} characters. A terminating '\0' byte is always appended to \c{data}, so \c{maxlen} must be larger than 1. */ qint64 QWindowsPipeReader::readLine(char*data, qint64 maxlen){ QMutexLocker locker(&mutex); qint64 readSoFar =0;if(actualReadBufferSize >0) { readSoFar = readBuffer.readLine(data,qMin(actualReadBufferSize +1, maxlen)); actualReadBufferSize -= readSoFar;}if(!pipeBroken) {startAsyncReadHelper(&locker);if(readSoFar ==0)return-2;// signal EWOULDBLOCK}return readSoFar;}/*! Skips up to \c{maxlen} bytes from the internal read buffer. */ qint64 QWindowsPipeReader::skip(qint64 maxlen){ QMutexLocker locker(&mutex);const qint64 skippedSoFar = readBuffer.skip(qMin(actualReadBufferSize, maxlen)); actualReadBufferSize -= skippedSoFar;if(!pipeBroken) {startAsyncReadHelper(&locker);if(skippedSoFar ==0)return-2;// signal EWOULDBLOCK}return skippedSoFar;}/*! Returns \c true if a complete line of data can be read from the buffer. */boolQWindowsPipeReader::canReadLine()const{ QMutexLocker locker(&mutex);return readBuffer.indexOf('\n', actualReadBufferSize) >=0;}/*! Starts an asynchronous read sequence on the pipe. */voidQWindowsPipeReader::startAsyncRead(){ QMutexLocker locker(&mutex);startAsyncReadHelper(&locker);}voidQWindowsPipeReader::startAsyncReadHelper(QMutexLocker<QMutex> *locker){if(readSequenceStarted || lastError != ERROR_SUCCESS)return; state = Running;startAsyncReadLocked();// Do not post the event, if the read operation will be completed asynchronously.if(!readyReadPending && lastError == ERROR_SUCCESS)return;if(!winEventActPosted) { winEventActPosted =true; locker->unlock();QCoreApplication::postEvent(this,newQEvent(QEvent::WinEventAct));}else{ locker->unlock();}SetEvent(syncHandle);}/*! Starts a new read sequence. Thread-safety should be ensured by the caller. */voidQWindowsPipeReader::startAsyncReadLocked(){// Determine the number of bytes to read. qint64 bytesToRead =qMax(checkPipeState(), state == Running ? minReadBufferSize :0);// This can happen only while draining; just do nothing in this case.if(bytesToRead ==0)return;while(lastError == ERROR_SUCCESS) {if(readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) { bytesToRead = readBufferMaxSize - readBuffer.size();if(bytesToRead <=0) {// Buffer is full. User must read data from the buffer// before we can read more from the pipe.return;}}char*ptr = readBuffer.reserve(bytesToRead);// ReadFile() returns true, if the read operation completes synchronously.// We don't need to call GetOverlappedResult() additionally, because// 'numberOfBytesRead' is valid in this case. DWORD numberOfBytesRead; DWORD errorCode = ERROR_SUCCESS;if(!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped)) { errorCode =GetLastError();if(errorCode == ERROR_IO_PENDING) {Q_ASSERT(state == Running);// Operation has been queued and will complete in the future. readSequenceStarted =true;SetThreadpoolWait(waitObject, eventHandle, NULL);return;}}if(!readCompleted(errorCode, numberOfBytesRead))return;// In the 'Draining' state, we have to get all the data with one call// to ReadFile(). Note that message mode pipes are not supported here.if(state == Draining) {Q_ASSERT(bytesToRead ==qint64(numberOfBytesRead));return;}// We need to loop until all pending data has been read and an// operation is queued for asynchronous completion.// If the pipe is configured to work in message mode, we read// the data in chunks. bytesToRead =qMax(checkPipeState(), minReadBufferSize);}}/*! \internal Thread pool callback procedure. */voidQWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WAIT wait, TP_WAIT_RESULT waitResult){Q_UNUSED(instance);Q_UNUSED(wait);Q_UNUSED(waitResult); QWindowsPipeReader *pipeReader =reinterpret_cast<QWindowsPipeReader *>(context);// Get the result of the asynchronous operation. DWORD numberOfBytesTransfered =0; DWORD errorCode = ERROR_SUCCESS;if(!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped,&numberOfBytesTransfered, FALSE)) errorCode =GetLastError(); pipeReader->mutex.lock(); pipeReader->readSequenceStarted =false;// Do not overwrite error code, if error has been detected by// checkPipeState() in waitForPipeClosed(). Also, if the reader was// stopped, the only reason why this function can be called is the// completion of a cancellation. No signals should be emitted, and// no new read sequence should be started in this case.if(pipeReader->lastError == ERROR_SUCCESS && pipeReader->state != Stopped) {// Ignore ERROR_OPERATION_ABORTED. We have canceled the I/O operation// specifically for flushing the pipe.if(pipeReader->state == Draining && errorCode == ERROR_OPERATION_ABORTED) errorCode = ERROR_SUCCESS;if(pipeReader->readCompleted(errorCode, numberOfBytesTransfered)) pipeReader->startAsyncReadLocked();if(pipeReader->state == Running && !pipeReader->winEventActPosted) { pipeReader->winEventActPosted =true; pipeReader->mutex.unlock();QCoreApplication::postEvent(pipeReader,newQEvent(QEvent::WinEventAct));}else{ pipeReader->mutex.unlock();}}else{ pipeReader->mutex.unlock();}// We set the event only after unlocking to avoid additional context// switches due to the released thread immediately running into the lock.SetEvent(pipeReader->syncHandle);}/*! Will be called whenever the read operation completes. Returns \c true if no error occurred; otherwise returns \c false. */boolQWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead){// ERROR_MORE_DATA is not an error. We're connected to a message mode// pipe and the message didn't fit into the pipe's system// buffer. We will read the remaining data in the next call.if(errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) { readyReadPending =true; pendingReadBytes += numberOfBytesRead; readBuffer.truncate(actualReadBufferSize + pendingReadBytes);return true;} lastError = errorCode;return false;}/*! Receives notification that the read operation has completed. */boolQWindowsPipeReader::event(QEvent *e){if(e->type() ==QEvent::WinEventAct) {consumePendingAndEmit(true);return true;}returnQObject::event(e);}/*! Updates the read buffer size and emits pending signals in the main thread. Returns \c true, if readyRead() was emitted. */boolQWindowsPipeReader::consumePendingAndEmit(bool allowWinActPosting){ResetEvent(syncHandle); mutex.lock();// Enable QEvent::WinEventAct posting.if(allowWinActPosting) winEventActPosted =false;const bool emitReadyRead =consumePending();const DWORD dwError = lastError; mutex.unlock();// Trigger 'pipeBroken' only once. This flag must be updated before// emitting the readyRead() signal. Otherwise, the read sequence will// be considered not finished, and we may hang if a slot connected// to readyRead() calls waitForReadyRead().const bool emitPipeClosed = (dwError != ERROR_SUCCESS && !pipeBroken);if(emitPipeClosed) pipeBroken =true;// Disable any further processing, if the pipe was stopped.// We are not allowed to emit signals in either 'Stopped'// or 'Draining' state.if(state != Running)return false;if(!emitPipeClosed) {if(emitReadyRead) emit readyRead();}else{ QPointer<QWindowsPipeReader>alive(this);if(emitReadyRead) emit readyRead();if(alive && dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED) emit winError(dwError,"QWindowsPipeReader::consumePendingAndEmit"_L1);if(alive) emit pipeClosed();}return emitReadyRead;}/*! Updates the read buffer size. Returns \c true, if readyRead() should be emitted. Thread-safety should be ensured by the caller. */boolQWindowsPipeReader::consumePending(){if(readyReadPending) { readyReadPending =false; actualReadBufferSize += pendingReadBytes; pendingReadBytes =0;return true;}return false;}/*! Returns the number of available bytes in the pipe. */ DWORD QWindowsPipeReader::checkPipeState(){ DWORD bytes;if(PeekNamedPipe(handle,nullptr,0,nullptr, &bytes,nullptr))return bytes; lastError =GetLastError();return0;}boolQWindowsPipeReader::waitForNotification(){ DWORD waitRet;do{ waitRet =WaitForSingleObjectEx(syncHandle, INFINITE, TRUE);if(waitRet == WAIT_OBJECT_0)return true;// Some I/O completion routine was called. Wait some more.}while(waitRet == WAIT_IO_COMPLETION);return false;} QT_END_NAMESPACE #include"moc_qwindowspipereader_p.cpp"
close