123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 | // Copyright (C) 2016 The Qt Company Ltd.// Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only#include"qwindowspipewriter_p.h"#include <qcoreapplication.h>#include <QMutexLocker>#include <QPointer> QT_BEGIN_NAMESPACE QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent):QObject(parent),handle(pipeWriteEnd),eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)),waitObject(NULL),pendingBytesWrittenValue(0),lastError(ERROR_SUCCESS),completionState(NoError),stopped(true),writeSequenceStarted(false),bytesWrittenPending(false),winEventActPosted(false){ZeroMemory(&overlapped,sizeof(OVERLAPPED)); overlapped.hEvent = eventHandle; waitObject =CreateThreadpoolWait(waitCallback,this, NULL);if(waitObject == NULL)qErrnoWarning("QWindowsPipeWriter: CreateThreadpollWait failed.");}QWindowsPipeWriter::~QWindowsPipeWriter(){stop();CloseThreadpoolWait(waitObject);CloseHandle(eventHandle);CloseHandle(syncHandle);}/*! Assigns the handle to this writer. The handle must be valid. Call this function if data was buffered before getting the handle. */voidQWindowsPipeWriter::setHandle(HANDLE hPipeWriteEnd){Q_ASSERT(!stopped); handle = hPipeWriteEnd; QMutexLocker locker(&mutex);startAsyncWriteHelper(&locker);}/*! Stops the asynchronous write sequence. If the write sequence is running then the I/O operation is canceled. */voidQWindowsPipeWriter::stop(){if(stopped)return; mutex.lock(); stopped =true;if(writeSequenceStarted) {// Trying to disable callback before canceling the operation.// Callback invocation is unnecessary here.SetThreadpoolWait(waitObject, NULL, NULL);if(!CancelIoEx(handle, &overlapped)) {const DWORD dwError =GetLastError();if(dwError != ERROR_NOT_FOUND) {qErrnoWarning(dwError,"QWindowsPipeWriter: CancelIoEx on handle %p failed.", handle);}} writeSequenceStarted =false;} mutex.unlock();WaitForThreadpoolWaitCallbacks(waitObject, TRUE);}/*! Returns the number of bytes that are waiting to be written. */ qint64 QWindowsPipeWriter::bytesToWrite()const{ QMutexLocker locker(&mutex);return writeBuffer.size() + pendingBytesWrittenValue;}/*! Returns \c true if async operation is in progress.*/boolQWindowsPipeWriter::isWriteOperationActive()const{return completionState == NoError &&bytesToWrite() !=0;}/*! Writes a shallow copy of \a ba to the internal buffer. */voidQWindowsPipeWriter::write(const QByteArray &ba){if(completionState != WriteDisabled)writeImpl(ba);}/*! Writes data to the internal buffer. */voidQWindowsPipeWriter::write(const char*data, qint64 size){if(completionState != WriteDisabled)writeImpl(data, size);}template<typename... Args>inlinevoidQWindowsPipeWriter::writeImpl(Args... args){ QMutexLocker locker(&mutex); writeBuffer.append(args...);if(writeSequenceStarted || (lastError != ERROR_SUCCESS))return; stopped =false;// If we don't have an assigned handle yet, defer writing until// setHandle() is called.if(handle != INVALID_HANDLE_VALUE)startAsyncWriteHelper(&locker);}voidQWindowsPipeWriter::startAsyncWriteHelper(QMutexLocker<QMutex> *locker){startAsyncWriteLocked();// Do not post the event, if the write operation will be completed asynchronously.if(!bytesWrittenPending && lastError == ERROR_SUCCESS)return;notifyCompleted(locker);}/*! Starts a new write sequence. */voidQWindowsPipeWriter::startAsyncWriteLocked(){while(!writeBuffer.isEmpty()) {// WriteFile() returns true, if the write operation completes synchronously.// We don't need to call GetOverlappedResult() additionally, because// 'numberOfBytesWritten' is valid in this case. DWORD numberOfBytesWritten; DWORD errorCode = ERROR_SUCCESS;if(!WriteFile(handle, writeBuffer.readPointer(), writeBuffer.nextDataBlockSize(),&numberOfBytesWritten, &overlapped)) { errorCode =GetLastError();if(errorCode == ERROR_IO_PENDING) {// Operation has been queued and will complete in the future. writeSequenceStarted =true;SetThreadpoolWait(waitObject, eventHandle, NULL);break;}}if(!writeCompleted(errorCode, numberOfBytesWritten))break;}}/*! \internal Thread pool callback procedure. */voidQWindowsPipeWriter::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WAIT wait, TP_WAIT_RESULT waitResult){Q_UNUSED(instance);Q_UNUSED(wait);Q_UNUSED(waitResult); QWindowsPipeWriter *pipeWriter =reinterpret_cast<QWindowsPipeWriter *>(context);// Get the result of the asynchronous operation. DWORD numberOfBytesTransfered =0; DWORD errorCode = ERROR_SUCCESS;if(!GetOverlappedResult(pipeWriter->handle, &pipeWriter->overlapped,&numberOfBytesTransfered, FALSE)) errorCode =GetLastError(); QMutexLocker locker(&pipeWriter->mutex);// After the writer was stopped, the only reason why this function can be called is the// completion of a cancellation. No signals should be emitted, and no new write sequence// should be started in this case.if(pipeWriter->stopped)return; pipeWriter->writeSequenceStarted =false;if(pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered)) pipeWriter->startAsyncWriteLocked();// We post the notification even if the write operation failed,// to unblock the main thread, in case it is waiting for the event. pipeWriter->notifyCompleted(&locker);}/*! Will be called whenever the write operation completes. Returns \c true if no error occurred; otherwise returns \c false. */boolQWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten){switch(errorCode) {case ERROR_SUCCESS: bytesWrittenPending =true; pendingBytesWrittenValue += numberOfBytesWritten; writeBuffer.free(numberOfBytesWritten);return true;case ERROR_PIPE_NOT_CONNECTED:// the other end has closed the pipecase ERROR_OPERATION_ABORTED:// the operation was canceledcase ERROR_NO_DATA:// the pipe is being closedbreak;default:qErrnoWarning(errorCode,"QWindowsPipeWriter: write failed.");break;}// The buffer is not cleared here, because the write progress// should appear on the main thread synchronously. lastError = errorCode;return false;}/*! Posts a notification event to the main thread. */voidQWindowsPipeWriter::notifyCompleted(QMutexLocker<QMutex> *locker){if(!winEventActPosted) { winEventActPosted =true; locker->unlock();QCoreApplication::postEvent(this,newQEvent(QEvent::WinEventAct));}else{ locker->unlock();}// We set the event only after unlocking to avoid additional context// switches due to the released thread immediately running into the lock.SetEvent(syncHandle);}/*! Receives notification that the write operation has completed. */boolQWindowsPipeWriter::event(QEvent *e){if(e->type() ==QEvent::WinEventAct) {consumePendingAndEmit(true);return true;}returnQObject::event(e);}/*! Updates the state and emits pending signals in the main thread. Returns \c true, if bytesWritten() was emitted. */boolQWindowsPipeWriter::consumePendingAndEmit(bool allowWinActPosting){ResetEvent(syncHandle); QMutexLocker locker(&mutex);// Enable QEvent::WinEventAct posting.if(allowWinActPosting) winEventActPosted =false;const qint64 numberOfBytesWritten = pendingBytesWrittenValue;const bool emitBytesWritten = bytesWrittenPending;if(emitBytesWritten) { bytesWrittenPending =false; pendingBytesWrittenValue =0;}const DWORD dwError = lastError; locker.unlock();// Disable any further processing, if the pipe was stopped.if(stopped)return false;// Trigger 'ErrorDetected' state only once. This state must be set before// emitting the bytesWritten() signal. Otherwise, the write sequence will// be considered not finished, and we may hang if a slot connected// to bytesWritten() calls waitForBytesWritten().if(dwError != ERROR_SUCCESS && completionState == NoError) { QPointer<QWindowsPipeWriter>alive(this); completionState = ErrorDetected;if(emitBytesWritten) emit bytesWritten(numberOfBytesWritten);if(alive) { writeBuffer.clear(); completionState = WriteDisabled; emit writeFailed();}}else if(emitBytesWritten) { emit bytesWritten(numberOfBytesWritten);}return emitBytesWritten;} QT_END_NAMESPACE #include"moc_qwindowspipewriter_p.cpp"
|