Skip to content

Commit

Permalink
Merge pull request #8 from well-typed/jdral/keepalive
Browse files Browse the repository at this point in the history
Use `MutableByteArray` as buffers, add manual `keepAlive`
  • Loading branch information
dcoutts authored Mar 15, 2024
2 parents d38d85e + bb4b65c commit a4b63e0
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 35 deletions.
58 changes: 41 additions & 17 deletions System/IO/BlockIO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ import Data.Array.IArray
import Data.Array.IO
import Data.Array.Unboxed
import Data.Coerce
import Data.Primitive.ByteArray

import Control.Monad
import Control.Monad.Primitive
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Concurrent.QSemN
import Control.Concurrent.Chan
import Control.Exception (mask_, throw, ArrayException(UndefinedElement),
finally, assert, throwIO)
import System.IO.Error
import GHC.IO.Exception (IOErrorType(ResourceVanished))
import GHC.IO.Exception (IOErrorType(ResourceVanished, InvalidArgument))

import Foreign.Ptr
import Foreign.C.Error (Errno(..))
Expand Down Expand Up @@ -141,9 +143,11 @@ closeIOCtx IOCtx {ioctxURing, ioctxCloseSync} = do
URing.closeURing uring
putMVar ioctxURing Nothing

data IOOp = IOOpRead !Fd !FileOffset !(Ptr Word8) !ByteCount
| IOOpWrite !Fd !FileOffset !(Ptr Word8) !ByteCount
deriving Show
-- | The 'MutableByteArray' buffers within __must__ be pinned. Addresses into
-- these buffers are passed to @io_uring@, and the buffers must therefore not be
-- moved around.
data IOOp m = IOOpRead !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !ByteCount
| IOOpWrite !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !ByteCount

newtype IOResult = IOResult_ URing.IOResult

Expand Down Expand Up @@ -206,7 +210,7 @@ viewIOError (IOResult_ e)
-- the target depth, fill it up to double again. This way there is always
-- at least the target number in flight at once.
--
submitIO :: IOCtx -> [IOOp] -> IO [IOResult]
submitIO :: IOCtx -> [IOOp IO] -> IO [IOResult]
submitIO IOCtx {
ioctxQSemN,
ioctxURing,
Expand All @@ -219,11 +223,13 @@ submitIO IOCtx {
waitQSemN ioctxQSemN (fromIntegral iobatchOpCount)
iobatchIx <- readChan ioctxChanIOBatchIx
iobatchCompletion <- newEmptyMVar
let iobatchKeepAlives = ioops
writeChan ioctxChanIOBatch
IOBatch {
iobatchIx,
iobatchOpCount,
iobatchCompletion
iobatchCompletion,
iobatchKeepAlives
}
withMVar ioctxURing $ \case
Nothing -> throwIO closed
Expand All @@ -232,22 +238,35 @@ submitIO IOCtx {
sequence_
[ --print ioop >>
case ioop of
IOOpRead fd off buf cnt ->
URing.prepareRead uring fd off buf cnt ioopid

IOOpWrite fd off buf cnt ->
URing.prepareWrite uring fd off buf cnt ioopid
IOOpRead fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareRead uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
IOOpWrite fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareWrite uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
| (ioop, ioopix) <- zip ioops [IOOpIx 0 ..]
, let !ioopid = packIOOpId iobatchIx ioopix ]
URing.submitIO uring
-- print ("submitIO", "submitting done")
map (IOResult_ . coerce) . elems <$> takeMVar iobatchCompletion
where closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing
where
closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing
guardPinned mba = do
unless (isMutableByteArrayPinned mba) $ throwIO notPinned
notPinned = mkIOError InvalidArgument "MutableByteArray is unpinned" Nothing Nothing

data IOBatch = IOBatch {
iobatchIx :: !IOBatchIx,
iobatchOpCount :: !Word32,
iobatchCompletion :: MVar (UArray IOOpIx Int32)
iobatchCompletion :: MVar (UArray IOOpIx Int32),
-- | The list of I\/O operations is sent to the completion
-- thread so that the buffers are kept alive while the kernel
-- is using them.
iobatchKeepAlives :: [IOOp IO]
}

newtype IOBatchIx = IOBatchIx Word32
Expand Down Expand Up @@ -286,14 +305,16 @@ completionThread uring done maxc qsem chaniobatch chaniobatchix = do
counts <- newArray iobatchixBounds (-1)
results <- newArray iobatchixBounds invalidEntry
completions <- newArray iobatchixBounds invalidEntry
collectCompletion counts results completions
keepAlives <- newArray iobatchixBounds invalidEntry
collectCompletion counts results completions keepAlives
`finally` putMVar done ()
where
collectCompletion :: IOUArray IOBatchIx Int
-> IOArray IOBatchIx (IOUArray IOOpIx Int32)
-> IOArray IOBatchIx (MVar (UArray IOOpIx Int32))
-> IOArray IOBatchIx [IOOp IO]
-> IO ()
collectCompletion counts results completions = do
collectCompletion counts results completions keepAlives = do
iocompletion <- URing.awaitIO uring
let (URing.IOCompletion ioopid iores) = iocompletion
unless (ioopid == URing.IOOpId maxBound) $ do
Expand All @@ -311,12 +332,13 @@ completionThread uring done maxc qsem chaniobatch chaniobatchix = do
writeArray counts iobatchix (-1)
writeArray results iobatchix invalidEntry
writeArray completions iobatchix invalidEntry
writeArray keepAlives iobatchix invalidEntry
result' <- freeze result
putMVar completion (result' :: UArray IOOpIx Int32)
writeChan chaniobatchix iobatchix
let !qrelease = rangeSize (bounds result')
signalQSemN qsem qrelease
collectCompletion counts results completions
collectCompletion counts results completions keepAlives

-- wait for single IO result
-- if the count is positive, decrement and update result array
Expand All @@ -330,14 +352,16 @@ completionThread uring done maxc qsem chaniobatch chaniobatchix = do
IOBatch{
iobatchIx,
iobatchOpCount,
iobatchCompletion
iobatchCompletion,
iobatchKeepAlives
} <- readChan chaniobatch
oldcount <- readArray counts iobatchIx
assert (oldcount == (-1)) (return ())
writeArray counts iobatchIx (fromIntegral iobatchOpCount)
result <- newArray (IOOpIx 0, IOOpIx (iobatchOpCount-1)) (-1)
writeArray results iobatchIx result
writeArray completions iobatchIx iobatchCompletion
writeArray keepAlives iobatchIx iobatchKeepAlives
if iobatchIx == iobatchixNeeded
then return $! fromIntegral iobatchOpCount
else collectIOBatches iobatchixNeeded
Expand Down
31 changes: 16 additions & 15 deletions benchmark/Bench.hs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE BangPatterns #-}
{- HLINT ignore "Use camelCase" -}

module Main (main) where

import Data.Primitive
import qualified Data.Set as Set
import Control.Monad
import Control.Exception
Expand Down Expand Up @@ -100,20 +101,20 @@ main_highlevel filename = do
ioctxConcurrencyLimit = 64 * 4
}
blocks = zip [0..] (randomPermute rng [0..lastBlock])
bracket (initIOCtx params) closeIOCtx $ \ioctx ->
allocaBytes (4096 * nbufs) $ \bufptr -> do

before <- getCurrentTime
forConcurrently_ (groupsOfN 32 blocks) $ \batch ->
submitIO ioctx
[ IOOpRead fd blockoff bufptr' 4096
| (i, block) <- batch
, let bufptr' = bufptr `plusPtr` ((i `mod` nbufs) * 4096)
blockoff = fromIntegral (block * 4096)
]
after <- getCurrentTime
let total = lastBlock + 1
report before after total
bracket (initIOCtx params) closeIOCtx $ \ioctx -> do
buf <- newPinnedByteArray (4096 * nbufs)

before <- getCurrentTime
forConcurrently_ (groupsOfN 32 blocks) $ \batch ->
submitIO ioctx
[ IOOpRead fd blockoff buf bufOff 4096
| (i, block) <- batch
, let bufOff = (i `mod` nbufs) * 4096
blockoff = fromIntegral (block * 4096)
]
after <- getCurrentTime
let total = lastBlock + 1
report before after total

report :: UTCTime -> UTCTime -> Int -> IO ()
report before after total = do
Expand Down
8 changes: 5 additions & 3 deletions blockio-uring.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ library
System.IO.BlockIO.URingFFI

build-depends:
, array ^>=0.5
, base >=4.16 && <4.20
, unix ^>=2.8
, array ^>=0.5
, base >=4.16 && <4.20
, primitive ^>=0.9
, unix ^>=2.8

pkgconfig-depends: liburing
default-language: Haskell2010
Expand All @@ -58,6 +59,7 @@ benchmark bench
, async
, base
, containers
, primitive
, random
, time
, unix
Expand Down

0 comments on commit a4b63e0

Please sign in to comment.