/* $Id: aiomgr.cpp 46246 2013-05-23 19:15:44Z vboxsync $ */ /** @file * IPRT - Async I/O manager. */ /* * Copyright (C) 2013 Oracle Corporation * * This file is part of VirtualBox Open Source Edition (OSE), as * available from http://www.virtualbox.org. This file is free software; * you can redistribute it and/or modify it under the terms of the GNU * General Public License (GPL) as published by the Free Software * Foundation, in version 2 as it comes in the "COPYING" file of the * VirtualBox OSE distribution. VirtualBox OSE is distributed in the * hope that it will be useful, but WITHOUT ANY WARRANTY of any kind. * * The contents of this file may alternatively be used under the terms * of the Common Development and Distribution License Version 1.0 * (CDDL) only, as it comes in the "COPYING.CDDL" file of the * VirtualBox OSE distribution, in which case the provisions of the * CDDL are applicable instead of those of the GPL. * * You may elect to license modified versions of this file under the * terms and conditions of either the GPL or the CDDL or both. */ /******************************************************************************* * Header Files * *******************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include "internal/magics.h" /******************************************************************************* * Structures and Typedefs * *******************************************************************************/ /** Pointer to an internal async I/O file instance. */ typedef struct RTAIOMGRFILEINT *PRTAIOMGRFILEINT; /** * Blocking event types. */ typedef enum RTAIOMGREVENT { /** Invalid tye */ RTAIOMGREVENT_INVALID = 0, /** No event pending. */ RTAIOMGREVENT_NO_EVENT, /** A file is added to the manager. */ RTAIOMGREVENT_FILE_ADD, /** A file is about to be closed. */ RTAIOMGREVENT_FILE_CLOSE, /** The async I/O manager is shut down. */ RTAIOMGREVENT_SHUTDOWN, /** 32bit hack */ RTAIOMGREVENT_32BIT_HACK = 0x7fffffff } RTAIOMGREVENT; /** * Async I/O manager instance data. */ typedef struct RTAIOMGRINT { /** Magic value. */ uint32_t u32Magic; /** Reference count. */ volatile uint32_t cRefs; /** Async I/O context handle. */ RTFILEAIOCTX hAioCtx; /** async I/O thread. */ RTTHREAD hThread; /** List of files assigned to this manager. */ RTLISTANCHOR ListFiles; /** Number of requests active currently. */ unsigned cReqsActive; /** Number of maximum requests active. */ uint32_t cReqsActiveMax; /** Memory cache for requests. */ RTMEMCACHE hMemCacheReqs; /** Critical section protecting the blocking event handling. */ RTCRITSECT CritSectBlockingEvent; /** Event semaphore for blocking external events. * The caller waits on it until the async I/O manager * finished processing the event. */ RTSEMEVENT hEventSemBlock; /** Blocking event type */ volatile RTAIOMGREVENT enmBlockingEvent; /** Event type data */ union { /** The file to be added */ volatile PRTAIOMGRFILEINT pFileAdd; /** The file to be closed */ volatile PRTAIOMGRFILEINT pFileClose; } BlockingEventData; } RTAIOMGRINT; /** Pointer to an internal async I/O manager instance. */ typedef RTAIOMGRINT *PRTAIOMGRINT; /** * Async I/O manager file instance data. */ typedef struct RTAIOMGRFILEINT { /** Magic value. */ uint32_t u32Magic; /** Reference count. */ volatile uint32_t cRefs; /** Flags. */ uint32_t fFlags; /** Opaque user data passed on creation. */ void *pvUser; /** File handle. */ RTFILE hFile; /** async I/O manager this file belongs to. */ PRTAIOMGRINT pAioMgr; /** Work queue for new requests. */ RTQUEUEATOMIC QueueReqs; /** Completion callback for this file. */ PFNRTAIOMGRREQCOMPLETE pfnReqCompleted; /** Data for exclusive use by the assigned async I/O manager. */ struct { /** List node of assigned files for a async I/O manager. */ RTLISTNODE NodeAioMgrFiles; /** List of requests waiting for submission. */ RTLISTANCHOR ListWaitingReqs; /** Number of requests currently being processed for this endpoint * (excluded flush requests). */ unsigned cReqsActive; } AioMgr; } RTAIOMGRFILEINT; /** Flag whether the file is closed. */ #define RTAIOMGRFILE_FLAGS_CLOSING RT_BIT_32(1) /** * Request type. */ typedef enum RTAIOMGRREQTYPE { /** Invalid request type. */ RTAIOMGRREQTYPE_INVALID = 0, /** Read reques type. */ RTAIOMGRREQTYPE_READ, /** Write request. */ RTAIOMGRREQTYPE_WRITE, /** Flush request. */ RTAIOMGRREQTYPE_FLUSH, /** Prefetech request. */ RTAIOMGRREQTYPE_PREFETCH, /** 32bit hack. */ RTAIOMGRREQTYPE_32BIT_HACK = 0x7fffffff } RTAIOMGRREQTYPE; /** Pointer to a reques type. */ typedef RTAIOMGRREQTYPE *PRTAIOMGRREQTYPE; /** * Async I/O manager request. */ typedef struct RTAIOMGRREQ { /** Atomic queue work item. */ RTQUEUEATOMICITEM WorkItem; /** Node for a waiting list. */ RTLISTNODE NodeWaitingList; /** Request flags. */ uint32_t fFlags; /** Transfer type. */ RTAIOMGRREQTYPE enmType; /** Assigned file request. */ RTFILEAIOREQ hReqIo; /** File the request belongs to. */ PRTAIOMGRFILEINT pFile; /** Opaque user data. */ void *pvUser; /** Start offset */ RTFOFF off; /** Data segment. */ RTSGSEG DataSeg; /** When non-zero the segment uses a bounce buffer because the provided buffer * doesn't meet host requirements. */ size_t cbBounceBuffer; /** Pointer to the used bounce buffer if any. */ void *pvBounceBuffer; /** Start offset in the bounce buffer to copy from. */ uint32_t offBounceBuffer; } RTAIOMGRREQ; /** Pointer to a I/O manager request. */ typedef RTAIOMGRREQ *PRTAIOMGRREQ; /** Flag whether the request was prepared already. */ #define RTAIOMGRREQ_FLAGS_PREPARED RT_BIT_32(0) /******************************************************************************* * Defined Constants And Macros * *******************************************************************************/ /** Validates a handle and returns VERR_INVALID_HANDLE if not valid. */ #define RTAIOMGR_VALID_RETURN_RC(a_hAioMgr, a_rc) \ do { \ AssertPtrReturn((a_hAioMgr), (a_rc)); \ AssertReturn((a_hAioMgr)->u32Magic == RTAIOMGR_MAGIC, (a_rc)); \ } while (0) /** Validates a handle and returns VERR_INVALID_HANDLE if not valid. */ #define RTAIOMGR_VALID_RETURN(a_hAioMgr) RTAIOMGR_VALID_RETURN_RC((hAioMgr), VERR_INVALID_HANDLE) /** Validates a handle and returns (void) if not valid. */ #define RTAIOMGR_VALID_RETURN_VOID(a_hAioMgr) \ do { \ AssertPtrReturnVoid(a_hAioMgr); \ AssertReturnVoid((a_hAioMgr)->u32Magic == RTAIOMGR_MAGIC); \ } while (0) /******************************************************************************* * Internal Functions * *******************************************************************************/ static int rtAioMgrReqsEnqueue(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile, PRTFILEAIOREQ pahReqs, unsigned cReqs); /** * Removes an endpoint from the currently assigned manager. * * @returns TRUE if there are still requests pending on the current manager for this endpoint. * FALSE otherwise. * @param pEndpointRemove The endpoint to remove. */ static bool rtAioMgrFileRemove(PRTAIOMGRFILEINT pFile) { /* Make sure that there is no request pending on this manager for the endpoint. */ if (!pFile->AioMgr.cReqsActive) { RTListNodeRemove(&pFile->AioMgr.NodeAioMgrFiles); return false; } return true; } /** * Allocate a new I/O request. * * @returns Pointer to the allocated request or NULL if out of memory. * @param pThis The async I/O manager instance. */ static PRTAIOMGRREQ rtAioMgrReqAlloc(PRTAIOMGRINT pThis) { return (PRTAIOMGRREQ)RTMemCacheAlloc(pThis->hMemCacheReqs); } /** * Frees an I/O request. * * @returns nothing. * @param pThis The async I/O manager instance. * @param pReq The request to free. */ static void rtAioMgrReqFree(PRTAIOMGRINT pThis, PRTAIOMGRREQ pReq) { if (pReq->cbBounceBuffer) { AssertPtr(pReq->pvBounceBuffer); RTMemPageFree(pReq->pvBounceBuffer, pReq->cbBounceBuffer); pReq->pvBounceBuffer = NULL; pReq->cbBounceBuffer = 0; } pReq->fFlags = 0; RTAioMgrFileRelease(pReq->pFile); RTMemCacheFree(pThis->hMemCacheReqs, pReq); } static void rtAioMgrReqCompleteRc(PRTAIOMGRINT pThis, PRTAIOMGRREQ pReq, int rcReq, size_t cbTransfered) { int rc = VINF_SUCCESS; PRTAIOMGRFILEINT pFile; pFile = pReq->pFile; pThis->cReqsActive--; pFile->AioMgr.cReqsActive--; /* * It is possible that the request failed on Linux with kernels < 2.6.23 * if the passed buffer was allocated with remap_pfn_range or if the file * is on an NFS endpoint which does not support async and direct I/O at the same time. * The endpoint will be migrated to a failsafe manager in case a request fails. */ if (RT_FAILURE(rcReq)) { pFile->pfnReqCompleted(pFile, rcReq, pReq->pvUser); rtAioMgrReqFree(pThis, pReq); } else { /* * Restart an incomplete transfer. * This usually means that the request will return an error now * but to get the cause of the error (disk full, file too big, I/O error, ...) * the transfer needs to be continued. */ if (RT_UNLIKELY( cbTransfered < pReq->DataSeg.cbSeg || ( pReq->cbBounceBuffer && cbTransfered < pReq->cbBounceBuffer))) { RTFOFF offStart; size_t cbToTransfer; uint8_t *pbBuf = NULL; Assert(cbTransfered % 512 == 0); if (pReq->cbBounceBuffer) { AssertPtr(pReq->pvBounceBuffer); offStart = (pReq->off & ~((RTFOFF)512-1)) + cbTransfered; cbToTransfer = pReq->cbBounceBuffer - cbTransfered; pbBuf = (uint8_t *)pReq->pvBounceBuffer + cbTransfered; } else { Assert(!pReq->pvBounceBuffer); offStart = pReq->off + cbTransfered; cbToTransfer = pReq->DataSeg.cbSeg - cbTransfered; pbBuf = (uint8_t *)pReq->DataSeg.pvSeg + cbTransfered; } if ( pReq->enmType == RTAIOMGRREQTYPE_PREFETCH || pReq->enmType == RTAIOMGRREQTYPE_READ) { rc = RTFileAioReqPrepareRead(pReq->hReqIo, pFile->hFile, offStart, pbBuf, cbToTransfer, pReq); } else { AssertMsg(pReq->enmType == RTAIOMGRREQTYPE_WRITE, ("Invalid transfer type\n")); rc = RTFileAioReqPrepareWrite(pReq->hReqIo, pFile->hFile, offStart, pbBuf, cbToTransfer, pReq); } AssertRC(rc); rc = rtAioMgrReqsEnqueue(pThis, pFile, &pReq->hReqIo, 1); AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES), ("Unexpected return code rc=%Rrc\n", rc)); } else if (pReq->enmType == RTAIOMGRREQTYPE_PREFETCH) { Assert(pReq->cbBounceBuffer); pReq->enmType = RTAIOMGRREQTYPE_WRITE; memcpy(((uint8_t *)pReq->pvBounceBuffer) + pReq->offBounceBuffer, pReq->DataSeg.pvSeg, pReq->DataSeg.cbSeg); /* Write it now. */ RTFOFF offStart = pReq->off & ~(RTFOFF)(512-1); size_t cbToTransfer = RT_ALIGN_Z(pReq->DataSeg.cbSeg + (pReq->off - offStart), 512); rc = RTFileAioReqPrepareWrite(pReq->hReqIo, pFile->hFile, offStart, pReq->pvBounceBuffer, cbToTransfer, pReq); AssertRC(rc); rc = rtAioMgrReqsEnqueue(pThis, pFile, &pReq->hReqIo, 1); AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES), ("Unexpected return code rc=%Rrc\n", rc)); } else { if (RT_SUCCESS(rc) && pReq->cbBounceBuffer) { if (pReq->enmType == RTAIOMGRREQTYPE_READ) memcpy(pReq->DataSeg.pvSeg, ((uint8_t *)pReq->pvBounceBuffer) + pReq->offBounceBuffer, pReq->DataSeg.cbSeg); } /* Call completion callback */ pFile->pfnReqCompleted(pFile, rcReq, pReq->pvUser); rtAioMgrReqFree(pThis, pReq); } } /* request completed successfully */ } /** * Wrapper around rtAioMgrReqCompleteRc(). */ static void rtAioMgrReqComplete(PRTAIOMGRINT pThis, RTFILEAIOREQ hReq) { size_t cbTransfered = 0; int rcReq = RTFileAioReqGetRC(hReq, &cbTransfered); PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)RTFileAioReqGetUser(hReq); rtAioMgrReqCompleteRc(pThis, pReq, rcReq, cbTransfered); } /** * Wrapper around RTFIleAioCtxSubmit() which is also doing error handling. */ static int rtAioMgrReqsEnqueue(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile, PRTFILEAIOREQ pahReqs, unsigned cReqs) { pThis->cReqsActive += cReqs; pFile->AioMgr.cReqsActive += cReqs; int rc = RTFileAioCtxSubmit(pThis->hAioCtx, pahReqs, cReqs); if (RT_FAILURE(rc)) { if (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES) { /* Append any not submitted task to the waiting list. */ for (size_t i = 0; i < cReqs; i++) { int rcReq = RTFileAioReqGetRC(pahReqs[i], NULL); if (rcReq != VERR_FILE_AIO_IN_PROGRESS) { PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)RTFileAioReqGetUser(pahReqs[i]); Assert(pReq->hReqIo == pahReqs[i]); RTListAppend(&pFile->AioMgr.ListWaitingReqs, &pReq->NodeWaitingList); pThis->cReqsActive--; pFile->AioMgr.cReqsActive--; } } pThis->cReqsActiveMax = pThis->cReqsActive; rc = VINF_SUCCESS; } else /* Another kind of error happened (full disk, ...) */ { /* An error happened. Find out which one caused the error and resubmit all other tasks. */ for (size_t i = 0; i < cReqs; i++) { PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)RTFileAioReqGetUser(pahReqs[i]); int rcReq = RTFileAioReqGetRC(pahReqs[i], NULL); if (rcReq == VERR_FILE_AIO_NOT_SUBMITTED) { /* We call ourself again to do any error handling which might come up now. */ rc = rtAioMgrReqsEnqueue(pThis, pFile, &pahReqs[i], 1); AssertRC(rc); } else if (rcReq != VERR_FILE_AIO_IN_PROGRESS) rtAioMgrReqCompleteRc(pThis, pReq, rcReq, 0); } } } return VINF_SUCCESS; } /** * Adds a list of requests to the waiting list. * * @returns nothing. * @param pFile The file instance to add the requests to. * @param pReqsHead The head of the request list to add. */ static void rtAioMgrFileAddReqsToWaitingList(PRTAIOMGRFILEINT pFile, PRTAIOMGRREQ pReqsHead) { while (pReqsHead) { PRTAIOMGRREQ pReqCur = pReqsHead; pReqsHead = (PRTAIOMGRREQ)pReqsHead->WorkItem.pNext; pReqCur->WorkItem.pNext = NULL; RTListAppend(&pFile->AioMgr.ListWaitingReqs, &pReqCur->NodeWaitingList); } } /** * Prepare the native I/o request ensuring that all alignment prerequisites of * the host are met. * * @returns IPRT statuse code. * @param pFile The file instance data. * @param pReq The request to prepare. */ static int rtAioMgrReqPrepareNonBuffered(PRTAIOMGRFILEINT pFile, PRTAIOMGRREQ pReq) { int rc = VINF_SUCCESS; RTFOFF offStart = pReq->off & ~(RTFOFF)(512-1); size_t cbToTransfer = RT_ALIGN_Z(pReq->DataSeg.cbSeg + (pReq->off - offStart), 512); void *pvBuf = pReq->DataSeg.pvSeg; bool fAlignedReq = cbToTransfer == pReq->DataSeg.cbSeg && offStart == pReq->off; /* * Check if the alignment requirements are met. * Offset, transfer size and buffer address * need to be on a 512 boundary. */ if ( !fAlignedReq /** @todo: || ((pEpClassFile->uBitmaskAlignment & (RTR3UINTPTR)pvBuf) != (RTR3UINTPTR)pvBuf) */) { /* Create bounce buffer. */ pReq->cbBounceBuffer = cbToTransfer; AssertMsg(pReq->off >= offStart, ("Overflow in calculation off=%llu offStart=%llu\n", pReq->off, offStart)); pReq->offBounceBuffer = pReq->off - offStart; /** @todo: I think we need something like a RTMemAllocAligned method here. * Current assumption is that the maximum alignment is 4096byte * (GPT disk on Windows) * so we can use RTMemPageAlloc here. */ pReq->pvBounceBuffer = RTMemPageAlloc(cbToTransfer); if (RT_LIKELY(pReq->pvBounceBuffer)) { pvBuf = pReq->pvBounceBuffer; if (pReq->enmType == RTAIOMGRREQTYPE_WRITE) { if ( RT_UNLIKELY(cbToTransfer != pReq->DataSeg.cbSeg) || RT_UNLIKELY(offStart != pReq->off)) { /* We have to fill the buffer first before we can update the data. */ pReq->enmType = RTAIOMGRREQTYPE_WRITE; } else memcpy(pvBuf, pReq->DataSeg.pvSeg, pReq->DataSeg.cbSeg); } } else rc = VERR_NO_MEMORY; } else pReq->cbBounceBuffer = 0; if (RT_SUCCESS(rc)) { if (pReq->enmType == RTAIOMGRREQTYPE_WRITE) { rc = RTFileAioReqPrepareWrite(pReq->hReqIo, pFile->hFile, offStart, pvBuf, cbToTransfer, pReq); } else /* Read or prefetch request. */ rc = RTFileAioReqPrepareRead(pReq->hReqIo, pFile->hFile, offStart, pvBuf, cbToTransfer, pReq); AssertRC(rc); pReq->fFlags |= RTAIOMGRREQ_FLAGS_PREPARED; } return rc; } /** * Prepare a new request for enqueuing. * * @returns IPRT status code. * @param pReq The request to prepare. * @param phReqIo Where to store the handle to the native I/O request on success. */ static int rtAioMgrPrepareReq(PRTAIOMGRREQ pReq, PRTFILEAIOREQ phReqIo) { int rc = VINF_SUCCESS; PRTAIOMGRFILEINT pFile = pReq->pFile; switch (pReq->enmType) { case RTAIOMGRREQTYPE_FLUSH: { rc = RTFileAioReqPrepareFlush(pReq->hReqIo, pFile->hFile, pReq); break; } case RTAIOMGRREQTYPE_READ: case RTAIOMGRREQTYPE_WRITE: { rc = rtAioMgrReqPrepareNonBuffered(pFile, pReq); break; } default: AssertMsgFailed(("Invalid transfer type %d\n", pReq->enmType)); } /* switch transfer type */ if (RT_SUCCESS(rc)) *phReqIo = pReq->hReqIo; return rc; } /** * Prepare newly submitted requests for processing. * * @returns IPRT status code * @param pThis The async I/O manager instance data. * @param pFile The file instance. * @param pReqsNew The list of new requests to prepare. */ static int rtAioMgrPrepareNewReqs(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile, PRTAIOMGRREQ pReqsNew) { RTFILEAIOREQ apReqs[20]; unsigned cRequests = 0; int rc = VINF_SUCCESS; /* Go through the list and queue the requests. */ while ( pReqsNew && (pThis->cReqsActive + cRequests < pThis->cReqsActiveMax) && RT_SUCCESS(rc)) { PRTAIOMGRREQ pCurr = pReqsNew; pReqsNew = (PRTAIOMGRREQ)pReqsNew->WorkItem.pNext; pCurr->WorkItem.pNext = NULL; AssertMsg(VALID_PTR(pCurr->pFile) && (pCurr->pFile == pFile), ("Files do not match\n")); AssertMsg(!(pCurr->fFlags & RTAIOMGRREQ_FLAGS_PREPARED), ("Request on the new list is already prepared\n")); rc = rtAioMgrPrepareReq(pCurr, &apReqs[cRequests]); if (RT_FAILURE(rc)) rtAioMgrReqCompleteRc(pThis, pCurr, rc, 0); else cRequests++; /* Queue the requests if the array is full. */ if (cRequests == RT_ELEMENTS(apReqs)) { rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests); cRequests = 0; AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES), ("Unexpected return code\n")); } } if (cRequests) { rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests); AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES), ("Unexpected return code rc=%Rrc\n", rc)); } if (pReqsNew) { /* Add the rest of the tasks to the pending list */ rtAioMgrFileAddReqsToWaitingList(pFile, pReqsNew); } /* Insufficient resources are not fatal. */ if (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES) rc = VINF_SUCCESS; return rc; } /** * Queues waiting requests. * * @returns IPRT status code. * @param pThis The async I/O manager instance data. * @param pFile The file to get the requests from. */ static int rtAioMgrQueueWaitingReqs(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile) { RTFILEAIOREQ apReqs[20]; unsigned cRequests = 0; int rc = VINF_SUCCESS; PRTAIOMGRREQ pReqIt; PRTAIOMGRREQ pReqItNext; /* Go through the list and queue the requests. */ RTListForEachSafe(&pFile->AioMgr.ListWaitingReqs, pReqIt, pReqItNext, RTAIOMGRREQ, NodeWaitingList) { RTListNodeRemove(&pReqIt->NodeWaitingList); AssertMsg(VALID_PTR(pReqIt->pFile) && (pReqIt->pFile == pFile), ("Files do not match\n")); if (!(pReqIt->fFlags & RTAIOMGRREQ_FLAGS_PREPARED)) { rc = rtAioMgrPrepareReq(pReqIt, &apReqs[cRequests]); if (RT_FAILURE(rc)) rtAioMgrReqCompleteRc(pThis, pReqIt, rc, 0); else cRequests++; } else { apReqs[cRequests] = pReqIt->hReqIo; cRequests++; } /* Queue the requests if the array is full. */ if (cRequests == RT_ELEMENTS(apReqs)) { rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests); cRequests = 0; AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES), ("Unexpected return code\n")); } } if (cRequests) { rc = rtAioMgrReqsEnqueue(pThis, pFile, apReqs, cRequests); AssertMsg(RT_SUCCESS(rc) || (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES), ("Unexpected return code rc=%Rrc\n", rc)); } /* Insufficient resources are not fatal. */ if (rc == VERR_FILE_AIO_INSUFFICIENT_RESSOURCES) rc = VINF_SUCCESS; return rc; } /** * Adds all pending requests for the given file. * * @returns IPRT status code. * @param pThis The async I/O manager instance data. * @param pFile The file to get the requests from. */ static int rtAioMgrQueueReqs(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile) { int rc = VINF_SUCCESS; PRTAIOMGRFILEINT pReqsHead = NULL; /* Check the pending list first */ if (!RTListIsEmpty(&pFile->AioMgr.ListWaitingReqs)) rc = rtAioMgrQueueWaitingReqs(pThis, pFile); if ( RT_SUCCESS(rc) && RTListIsEmpty(&pFile->AioMgr.ListWaitingReqs)) { PRTAIOMGRREQ pReqsNew = (PRTAIOMGRREQ)RTQueueAtomicRemoveAll(&pFile->QueueReqs); if (pReqsNew) { rc = rtAioMgrPrepareNewReqs(pThis, pFile, pReqsNew); AssertRC(rc); } } return rc; } /** * Checks all files for new requests. * * @returns IPRT status code. * @param pThis The I/O manager instance data. */ static int rtAioMgrCheckFiles(PRTAIOMGRINT pThis) { int rc = VINF_SUCCESS; PRTAIOMGRFILEINT pIt; RTListForEach(&pThis->ListFiles, pIt, RTAIOMGRFILEINT, AioMgr.NodeAioMgrFiles) { rc = rtAioMgrQueueReqs(pThis, pIt); if (RT_FAILURE(rc)) return rc; } return rc; } /** * Process a blocking event from the outside. * * @returns IPRT status code. * @param pThis The async I/O manager instance data. */ static int rtAioMgrProcessBlockingEvent(PRTAIOMGRINT pThis) { int rc = VINF_SUCCESS; bool fNotifyWaiter = false; switch (pThis->enmBlockingEvent) { case RTAIOMGREVENT_NO_EVENT: /* Nothing to do. */ break; case RTAIOMGREVENT_FILE_ADD: { PRTAIOMGRFILEINT pFile = ASMAtomicReadPtrT(&pThis->BlockingEventData.pFileAdd, PRTAIOMGRFILEINT); AssertMsg(VALID_PTR(pFile), ("Adding file event without a file to add\n")); RTListAppend(&pThis->ListFiles, &pFile->AioMgr.NodeAioMgrFiles); fNotifyWaiter = true; break; } case RTAIOMGREVENT_FILE_CLOSE: { PRTAIOMGRFILEINT pFile = ASMAtomicReadPtrT(&pThis->BlockingEventData.pFileClose, PRTAIOMGRFILEINT); AssertMsg(VALID_PTR(pFile), ("Close file event without a file to close\n")); if (!(pFile->fFlags & RTAIOMGRFILE_FLAGS_CLOSING)) { /* Make sure all requests finished. Process the queues a last time first. */ rc = rtAioMgrQueueReqs(pThis, pFile); AssertRC(rc); pFile->fFlags |= RTAIOMGRFILE_FLAGS_CLOSING; fNotifyWaiter = !rtAioMgrFileRemove(pFile); } else if (!pFile->AioMgr.cReqsActive) fNotifyWaiter = true; break; } case RTAIOMGREVENT_SHUTDOWN: { if (!pThis->cReqsActive) fNotifyWaiter = true; break; } default: AssertReleaseMsgFailed(("Invalid event type %d\n", pThis->enmBlockingEvent)); } if (fNotifyWaiter) { /* Release the waiting thread. */ rc = RTSemEventSignal(pThis->hEventSemBlock); AssertRC(rc); } return rc; } /** * async I/O manager worker loop. * * @returns IPRT status code. * @param hThreadSelf The thread handle this worker belongs to. * @param pvUser Opaque user data (Pointer to async I/O manager instance). */ static DECLCALLBACK(int) rtAioMgrWorker(RTTHREAD hThreadSelf, void *pvUser) { PRTAIOMGRINT pThis = (PRTAIOMGRINT)pvUser; bool fRunning = true; int rc = VINF_SUCCESS; do { uint32_t cReqsCompleted = 0; RTFILEAIOREQ ahReqsCompleted[32]; rc = RTFileAioCtxWait(pThis->hAioCtx, 1, RT_INDEFINITE_WAIT, &ahReqsCompleted[0], RT_ELEMENTS(ahReqsCompleted), &cReqsCompleted); if (rc == VERR_INTERRUPTED) { /* Process external event. */ rtAioMgrProcessBlockingEvent(pThis); rc = rtAioMgrCheckFiles(pThis); } else if (RT_FAILURE(rc)) { /* Something bad happened. */ /** @todo: */ } else { /* Requests completed. */ for (uint32_t i = 0; i < cReqsCompleted; i++) rtAioMgrReqComplete(pThis, ahReqsCompleted[i]); /* Check files for new requests and queue waiting requests. */ rc = rtAioMgrCheckFiles(pThis); } } while ( fRunning && RT_SUCCESS(rc)); return rc; } /** * Wakes up the async I/O manager. * * @returns IPRT status code. * @param pThis The async I/O manager. */ static int rtAioMgrWakeup(PRTAIOMGRINT pThis) { return RTFileAioCtxWakeup(pThis->hAioCtx); } /** * Waits until the async I/O manager handled the given event. * * @returns IPRT status code. * @param pThis The async I/O manager. * @param enmEvent The event to pass to the manager. */ static int rtAioMgrWaitForBlockingEvent(PRTAIOMGRINT pThis, RTAIOMGREVENT enmEvent) { Assert(pThis->enmBlockingEvent == RTAIOMGREVENT_NO_EVENT); ASMAtomicWriteU32((volatile uint32_t *)&pThis->enmBlockingEvent, enmEvent); /* Wakeup the async I/O manager */ int rc = rtAioMgrWakeup(pThis); if (RT_FAILURE(rc)) return rc; /* Wait for completion. */ rc = RTSemEventWait(pThis->hEventSemBlock, RT_INDEFINITE_WAIT); AssertRC(rc); ASMAtomicWriteU32((volatile uint32_t *)&pThis->enmBlockingEvent, RTAIOMGREVENT_NO_EVENT); return rc; } /** * Add a given file to the given I/O manager. * * @returns IPRT status code. * @param pThis The async I/O manager. * @param pFile The file to add. */ static int rtAioMgrAddFile(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile) { /* Update the assigned I/O manager. */ ASMAtomicWritePtr(&pFile->pAioMgr, pThis); int rc = RTCritSectEnter(&pThis->CritSectBlockingEvent); AssertRCReturn(rc, rc); ASMAtomicWritePtr(&pThis->BlockingEventData.pFileAdd, pFile); rc = rtAioMgrWaitForBlockingEvent(pThis, RTAIOMGREVENT_FILE_ADD); ASMAtomicWriteNullPtr(&pThis->BlockingEventData.pFileAdd); RTCritSectLeave(&pThis->CritSectBlockingEvent); return rc; } /** * Removes a given file from the given I/O manager. * * @returns IPRT status code. * @param pThis The async I/O manager. * @param pFile The file to remove. */ static int rtAioMgrCloseFile(PRTAIOMGRINT pThis, PRTAIOMGRFILEINT pFile) { int rc = RTCritSectEnter(&pThis->CritSectBlockingEvent); AssertRCReturn(rc, rc); ASMAtomicWritePtr(&pThis->BlockingEventData.pFileClose, pFile); rc = rtAioMgrWaitForBlockingEvent(pThis, RTAIOMGREVENT_FILE_CLOSE); ASMAtomicWriteNullPtr(&pThis->BlockingEventData.pFileClose); RTCritSectLeave(&pThis->CritSectBlockingEvent); return rc; } /** * Process a shutdown event. * * @returns IPRT status code. * @param pThis The async I/O manager to shut down. */ static int rtAioMgrShutdown(PRTAIOMGRINT pThis) { int rc = RTCritSectEnter(&pThis->CritSectBlockingEvent); AssertRCReturn(rc, rc); rc = rtAioMgrWaitForBlockingEvent(pThis, RTAIOMGREVENT_SHUTDOWN); RTCritSectLeave(&pThis->CritSectBlockingEvent); return rc; } /** * Destroys an async I/O manager. * * @returns nothing. * @param pThis The async I/O manager instance to destroy. */ static void rtAioMgrDestroy(PRTAIOMGRINT pThis) { int rc; rc = rtAioMgrShutdown(pThis); AssertRC(rc); rc = RTThreadWait(pThis->hThread, RT_INDEFINITE_WAIT, NULL); AssertRC(rc); rc = RTFileAioCtxDestroy(pThis->hAioCtx); AssertRC(rc); rc = RTMemCacheDestroy(pThis->hMemCacheReqs); AssertRC(rc); pThis->hThread = NIL_RTTHREAD; pThis->hAioCtx = NIL_RTFILEAIOCTX; pThis->hMemCacheReqs = NIL_RTMEMCACHE; pThis->u32Magic = ~RTAIOMGR_MAGIC; RTCritSectDelete(&pThis->CritSectBlockingEvent); RTSemEventDestroy(pThis->hEventSemBlock); RTMemFree(pThis); } /** * Queues a new request for processing. */ static void rtAioMgrFileQueueReq(PRTAIOMGRFILEINT pThis, PRTAIOMGRREQ pReq) { RTAioMgrFileRetain(pThis); RTQueueAtomicInsert(&pThis->QueueReqs, &pReq->WorkItem); rtAioMgrWakeup(pThis->pAioMgr); } /** * Destroys an async I/O manager file. * * @returns nothing. * @param pThis The async I/O manager file. */ static void rtAioMgrFileDestroy(PRTAIOMGRFILEINT pThis) { pThis->u32Magic = ~RTAIOMGRFILE_MAGIC; rtAioMgrCloseFile(pThis->pAioMgr, pThis); RTAioMgrRelease(pThis->pAioMgr); RTMemFree(pThis); } /** * Queues a new I/O request. * * @returns IPRT status code. * @param hAioMgrFile The I/O manager file handle. * @param off Start offset of the I/o request. * @param pSgBuf Data S/G buffer. * @param cbIo How much to transfer. * @param pvUser Opaque user data. * @param enmType I/O direction type (read/write). */ static int rtAioMgrFileIoReqCreate(RTAIOMGRFILE hAioMgrFile, RTFOFF off, PRTSGBUF pSgBuf, size_t cbIo, void *pvUser, RTAIOMGRREQTYPE enmType) { int rc; PRTAIOMGRFILEINT pFile = hAioMgrFile; PRTAIOMGRINT pAioMgr; AssertPtrReturn(pFile, VERR_INVALID_HANDLE); pAioMgr = pFile->pAioMgr; PRTAIOMGRREQ pReq = rtAioMgrReqAlloc(pAioMgr); if (RT_LIKELY(pReq)) { unsigned cSeg = 1; size_t cbSeg = RTSgBufSegArrayCreate(pSgBuf, &pReq->DataSeg, &cSeg, cbIo); if (cbSeg == cbIo) { pReq->enmType = enmType; pReq->pFile = pFile; pReq->pvUser = pvUser; pReq->off = off; rtAioMgrFileQueueReq(pFile, pReq); rc = VERR_FILE_AIO_IN_PROGRESS; } else { /** @todo: Real S/G buffer support. */ rtAioMgrReqFree(pAioMgr, pReq); rc = VERR_NOT_SUPPORTED; } } else rc = VERR_NO_MEMORY; return rc; } /** * Request constructor for the memory cache. * * @returns IPRT status code. * @param hMemCache The cache handle. * @param pvObj The memory object that should be initialized. * @param pvUser The user argument. */ static DECLCALLBACK(int) rtAioMgrReqCtor(RTMEMCACHE hMemCache, void *pvObj, void *pvUser) { PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)pvObj; memset(pReq, 0, sizeof(RTAIOMGRREQ)); return RTFileAioReqCreate(&pReq->hReqIo); } /** * Request destructor for the memory cache. * * @param hMemCache The cache handle. * @param pvObj The memory object that should be destroyed. * @param pvUser The user argument. */ static DECLCALLBACK(void) rtAioMgrReqDtor(RTMEMCACHE hMemCache, void *pvObj, void *pvUser) { PRTAIOMGRREQ pReq = (PRTAIOMGRREQ)pvObj; int rc = RTFileAioReqDestroy(pReq->hReqIo); AssertRC(rc); } RTDECL(int) RTAioMgrCreate(PRTAIOMGR phAioMgr, uint32_t cReqsMax) { int rc = VINF_SUCCESS; PRTAIOMGRINT pThis; AssertPtrReturn(phAioMgr, VERR_INVALID_POINTER); AssertReturn(cReqsMax > 0, VERR_INVALID_PARAMETER); pThis = (PRTAIOMGRINT)RTMemAllocZ(sizeof(RTAIOMGRINT)); if (pThis) { pThis->u32Magic = RTAIOMGR_MAGIC; pThis->cRefs = 1; pThis->enmBlockingEvent = RTAIOMGREVENT_NO_EVENT; RTListInit(&pThis->ListFiles); rc = RTCritSectInit(&pThis->CritSectBlockingEvent); if (RT_SUCCESS(rc)) { rc = RTSemEventCreate(&pThis->hEventSemBlock); if (RT_SUCCESS(rc)) { rc = RTMemCacheCreate(&pThis->hMemCacheReqs, sizeof(RTAIOMGRREQ), 0, UINT32_MAX, rtAioMgrReqCtor, rtAioMgrReqDtor, NULL, 0); if (RT_SUCCESS(rc)) { rc = RTFileAioCtxCreate(&pThis->hAioCtx, cReqsMax == UINT32_MAX ? RTFILEAIO_UNLIMITED_REQS : cReqsMax, RTFILEAIOCTX_FLAGS_WAIT_WITHOUT_PENDING_REQUESTS); if (RT_SUCCESS(rc)) { rc = RTThreadCreateF(&pThis->hThread, rtAioMgrWorker, pThis, 0, RTTHREADTYPE_IO, RTTHREADFLAGS_WAITABLE, "AioMgr-%u", cReqsMax); if (RT_FAILURE(rc)) { rc = RTFileAioCtxDestroy(pThis->hAioCtx); AssertRC(rc); } } if (RT_FAILURE(rc)) RTMemCacheDestroy(pThis->hMemCacheReqs); } if (RT_FAILURE(rc)) RTSemEventDestroy(pThis->hEventSemBlock); } if (RT_FAILURE(rc)) RTCritSectDelete(&pThis->CritSectBlockingEvent); } if (RT_FAILURE(rc)) RTMemFree(pThis); } else rc = VERR_NO_MEMORY; if (RT_SUCCESS(rc)) *phAioMgr = pThis; return rc; } RTDECL(uint32_t) RTAioMgrRetain(RTAIOMGR hAioMgr) { PRTAIOMGRINT pThis = hAioMgr; AssertReturn(hAioMgr != NIL_RTAIOMGR, UINT32_MAX); AssertPtrReturn(pThis, UINT32_MAX); AssertReturn(pThis->u32Magic == RTAIOMGR_MAGIC, UINT32_MAX); uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs); AssertMsg(cRefs > 1 && cRefs < _1G, ("%#x %p\n", cRefs, pThis)); return cRefs; } RTDECL(uint32_t) RTAioMgrRelease(RTAIOMGR hAioMgr) { PRTAIOMGRINT pThis = hAioMgr; if (pThis == NIL_RTAIOMGR) return 0; AssertPtrReturn(pThis, UINT32_MAX); AssertReturn(pThis->u32Magic == RTAIOMGR_MAGIC, UINT32_MAX); uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs); AssertMsg(cRefs < _1G, ("%#x %p\n", cRefs, pThis)); if (cRefs == 0) rtAioMgrDestroy(pThis); return cRefs; } RTDECL(int) RTAioMgrFileCreate(RTAIOMGR hAioMgr, RTFILE hFile, PFNRTAIOMGRREQCOMPLETE pfnReqComplete, void *pvUser, PRTAIOMGRFILE phAioMgrFile) { int rc = VINF_SUCCESS; PRTAIOMGRFILEINT pThis; AssertReturn(hAioMgr != NIL_RTAIOMGR, VERR_INVALID_HANDLE); AssertReturn(hFile != NIL_RTFILE, VERR_INVALID_HANDLE); AssertPtrReturn(pfnReqComplete, VERR_INVALID_POINTER); AssertPtrReturn(phAioMgrFile, VERR_INVALID_POINTER); pThis = (PRTAIOMGRFILEINT)RTMemAllocZ(sizeof(RTAIOMGRFILEINT)); if (pThis) { pThis->u32Magic = RTAIOMGRFILE_MAGIC; pThis->cRefs = 1; pThis->hFile = hFile; pThis->pAioMgr = hAioMgr; pThis->pvUser = pvUser; pThis->pfnReqCompleted = pfnReqComplete; RTQueueAtomicInit(&pThis->QueueReqs); RTListInit(&pThis->AioMgr.ListWaitingReqs); RTAioMgrRetain(hAioMgr); rc = RTFileAioCtxAssociateWithFile(pThis->pAioMgr->hAioCtx, hFile); if (RT_FAILURE(rc)) rtAioMgrFileDestroy(pThis); else rtAioMgrAddFile(pThis->pAioMgr, pThis); } else rc = VERR_NO_MEMORY; if (RT_SUCCESS(rc)) *phAioMgrFile = pThis; return rc; } RTDECL(uint32_t) RTAioMgrFileRetain(RTAIOMGRFILE hAioMgrFile) { PRTAIOMGRFILEINT pThis = hAioMgrFile; AssertReturn(hAioMgrFile != NIL_RTAIOMGRFILE, UINT32_MAX); AssertPtrReturn(pThis, UINT32_MAX); AssertReturn(pThis->u32Magic == RTAIOMGRFILE_MAGIC, UINT32_MAX); uint32_t cRefs = ASMAtomicIncU32(&pThis->cRefs); AssertMsg(cRefs > 1 && cRefs < _1G, ("%#x %p\n", cRefs, pThis)); return cRefs; } RTDECL(uint32_t) RTAioMgrFileRelease(RTAIOMGRFILE hAioMgrFile) { PRTAIOMGRFILEINT pThis = hAioMgrFile; if (pThis == NIL_RTAIOMGRFILE) return 0; AssertPtrReturn(pThis, UINT32_MAX); AssertReturn(pThis->u32Magic == RTAIOMGRFILE_MAGIC, UINT32_MAX); uint32_t cRefs = ASMAtomicDecU32(&pThis->cRefs); AssertMsg(cRefs < _1G, ("%#x %p\n", cRefs, pThis)); if (cRefs == 0) rtAioMgrFileDestroy(pThis); return cRefs; } RTDECL(void *) RTAioMgrFileGetUser(RTAIOMGRFILE hAioMgrFile) { PRTAIOMGRFILEINT pThis = hAioMgrFile; AssertPtrReturn(pThis, NULL); return pThis->pvUser; } RTDECL(int) RTAioMgrFileRead(RTAIOMGRFILE hAioMgrFile, RTFOFF off, PRTSGBUF pSgBuf, size_t cbRead, void *pvUser) { return rtAioMgrFileIoReqCreate(hAioMgrFile, off, pSgBuf, cbRead, pvUser, RTAIOMGRREQTYPE_READ); } RTDECL(int) RTAioMgrFileWrite(RTAIOMGRFILE hAioMgrFile, RTFOFF off, PRTSGBUF pSgBuf, size_t cbWrite, void *pvUser) { return rtAioMgrFileIoReqCreate(hAioMgrFile, off, pSgBuf, cbWrite, pvUser, RTAIOMGRREQTYPE_WRITE); } RTDECL(int) RTAioMgrFileFlush(RTAIOMGRFILE hAioMgrFile, void *pvUser) { PRTAIOMGRFILEINT pFile = hAioMgrFile; PRTAIOMGRINT pAioMgr; AssertPtrReturn(pFile, VERR_INVALID_HANDLE); pAioMgr = pFile->pAioMgr; PRTAIOMGRREQ pReq = rtAioMgrReqAlloc(pAioMgr); if (RT_UNLIKELY(!pReq)) return VERR_NO_MEMORY; pReq->pFile = pFile; pReq->enmType = RTAIOMGRREQTYPE_FLUSH; pReq->pvUser = pvUser; rtAioMgrFileQueueReq(pFile, pReq); return VERR_FILE_AIO_IN_PROGRESS; }