{-# LANGUAGE DataKinds #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE Trustworthy #-}
module BroadcastChan.Internal where
import Control.Concurrent.MVar
import Control.Exception (mask_)
import Control.Monad ((>=>))
import Control.Monad.IO.Unlift (MonadIO(..))
import System.IO.Unsafe (unsafeInterleaveIO)
-- | Kind whose promoted constructors (via @DataKinds@) are used as the phantom
-- @dir@ parameter of 'BroadcastChan' to distinguish the two kinds of channel
-- end.
data Direction
    = In  -- ^ Tags the write end: the end accepted by 'writeBChan' and 'closeBChan'.
    | Out -- ^ Tags a read end: the end accepted by 'readBChan'.
-- | Type-level synonym for the promoted @In@ constructor of 'Direction', so
-- that signatures can be written as @BroadcastChan In a@ without the
-- promotion tick.
type In = 'In
-- | Type-level synonym for the promoted @Out@ constructor of 'Direction', so
-- that signatures can be written as @BroadcastChan Out a@ without the
-- promotion tick.
type Out = 'Out
-- | One end of a broadcast channel carrying values of type @a@.
--
-- The phantom @dir@ parameter records whether this is the write end ('In') or
-- a read end ('Out'); both are represented the same way, as an 'MVar' holding
-- the end's current position in the shared 'Stream'.
--
-- The derived 'Eq' instance delegates to the wrapped 'MVar', so two ends
-- compare equal exactly when they wrap the same position variable.
newtype BroadcastChan (dir :: Direction) a = BChan (MVar (Stream a))
    deriving (Eq)
-- | A position in the channel's stream: an 'MVar' that stays empty until the
-- next 'ChItem' (a value or the 'Closed' marker) is put into it.
type Stream a = MVar (ChItem a)
-- | One cell of the stream: either a value together with the (strict,
-- unpacked) 'Stream' position of the cell that follows it, or the 'Closed'
-- marker, after which no further cells are linked.
data ChItem a = ChItem a {-# UNPACK #-} !(Stream a) | Closed
-- | Create the write end of a fresh broadcast channel.
--
-- The channel starts with a single empty stream position ("hole"); the
-- returned write end points at that hole.
newBroadcastChan :: MonadIO m => m (BroadcastChan In a)
newBroadcastChan = liftIO $ do
    hole <- newEmptyMVar
    writeVar <- newMVar hole
    return (BChan writeVar)
-- | Close the channel by writing the 'Closed' marker into the current hole.
--
-- Returns 'True' if this call performed the close, and 'False' if the hole
-- was already full. A successful 'writeBChan' always advances the write end
-- to a fresh empty hole, so the hole can only be full when the channel has
-- been closed before.
--
-- The whole update runs under 'mask_' so that an asynchronous exception
-- cannot arrive between taking the write position and putting it back.
closeBChan :: MonadIO m => BroadcastChan In a -> m Bool
closeBChan (BChan writeVar) = liftIO . mask_ $ do
    old_hole <- takeMVar writeVar
    -- The write position is restored unchanged either way; only the result of
    -- 'tryPutMVar' is reported to the caller.
    tryPutMVar old_hole Closed <* putMVar writeVar old_hole
-- | Check, without blocking on the stream, whether the given end currently
-- sits on the 'Closed' marker.
--
-- For a write end this is 'True' once the channel has been closed. For a read
-- end it is 'True' only when that end's own position has reached the 'Closed'
-- marker; while unread values remain in front of it (or nothing has been
-- written yet) the result is 'False'.
isClosedBChan :: MonadIO m => BroadcastChan dir a -> m Bool
isClosedBChan (BChan mvar) = liftIO $ do
    old_hole <- readMVar mvar
    -- 'tryReadMVar' inspects the cell without taking it, so the stream is
    -- left untouched for every other end that shares it.
    val <- tryReadMVar old_hole
    case val of
        Just Closed -> return True
        _ -> return False
-- | Append a value to the channel.
--
-- Returns 'True' if the value was written and 'False' if the current hole was
-- already full, which only happens after 'closeBChan' has stored the 'Closed'
-- marker there. In the latter case the value is dropped and the write
-- position is left where it was.
writeBChan :: MonadIO m => BroadcastChan In a -> a -> m Bool
writeBChan (BChan writeVar) val = liftIO $ do
    -- The next hole is allocated outside the masked section.
    new_hole <- newEmptyMVar
    mask_ $ do
        old_hole <- takeMVar writeVar
        empty <- tryPutMVar old_hole (ChItem val new_hole)
        -- Advance to the new hole only when the item was actually stored;
        -- otherwise restore the previous position.
        if empty
            then putMVar writeVar new_hole
            else putMVar writeVar old_hole
        return empty
{-# INLINE writeBChan #-}
-- | Read the next value from a read end.
--
-- Blocks until the cell at this end's current position has been filled.
-- Returns @'Just' val@ and advances the read position when the cell holds a
-- value, or 'Nothing' (leaving the position on the marker) when the cell is
-- 'Closed'.
readBChan :: MonadIO m => BroadcastChan Out a -> m (Maybe a)
readBChan (BChan readVar) = liftIO $
    modifyMVarMasked readVar $ \read_end -> do
        -- 'readMVar' rather than 'takeMVar': the cell must stay filled so
        -- that other read ends pointing at the same stream still see it.
        result <- readMVar read_end
        case result of
            ChItem val new_read_end -> return (new_read_end, Just val)
            Closed -> return (read_end, Nothing)
{-# INLINE readBChan #-}
-- | Create a new read end that starts at the current position of the given
-- end.
--
-- Given a write end, the new listener starts at the current hole and
-- therefore only observes values written after this call. Given a read end,
-- the new listener starts at that end's current read position. The new end
-- has its own position variable, so reading from it does not advance the
-- original.
newBChanListener :: MonadIO m => BroadcastChan dir a -> m (BroadcastChan Out a)
newBChanListener (BChan mvar) = liftIO $ do
    hole <- readMVar mvar
    newReadVar <- newMVar hole
    return (BChan newReadVar)
-- | Return the channel's values as a lazily produced list.
--
-- A fresh listener is created with 'newBChanListener', so the argument's own
-- position is not advanced. Each list cell is produced with
-- 'unsafeInterleaveIO': the corresponding 'readBChan' (which may block) runs
-- only when that part of the list is demanded. The list ends when the
-- listener reaches the 'Closed' marker.
getBChanContents :: BroadcastChan dir a -> IO [a]
getBChanContents = newBChanListener >=> go
  where
    go :: BroadcastChan Out a -> IO [a]
    go ch = unsafeInterleaveIO $ do
        result <- readBChan ch
        case result of
            Nothing -> return []
            Just x -> do
                -- The recursive call is itself deferred, so the tail is only
                -- read when forced.
                xs <- go ch
                return (x:xs)
-- | Strict left fold over the values of a channel.
--
-- Running the outer @n@ action creates a listener on @chan@ immediately and
-- returns an @m@ action. Running that @m@ action reads from the listener until
-- the 'Closed' marker is reached, combining each value into the accumulator
-- with @step@ (starting from @begin@), and finally returns @done@ applied to
-- the accumulator. Both the accumulator and the final result are forced to
-- weak head normal form via '$!'.
--
-- The listener is created once, in the outer action, so every run of the
-- returned @m@ action reads from that same listener.
foldBChan
    :: (MonadIO m, MonadIO n)
    => (x -> a -> x)
    -> x
    -> (x -> b)
    -> BroadcastChan d a
    -> n (m b)
foldBChan step begin done chan = do
    listen <- newBChanListener chan
    return $ go listen begin
  where
    -- Deliberately no local type signature: unless ScopedTypeVariables is in
    -- effect, a signature mentioning @m@, @x@ or @b@ would introduce fresh
    -- type variables and fail to typecheck.
    go listen x = do
        x' <- readBChan listen
        case x' of
            Just x'' -> go listen $! step x x''
            Nothing -> return $! done x
{-# INLINABLE foldBChan #-}
-- | Monadic left fold over the values of a channel.
--
-- Running the outer @n@ action creates a listener on @chan@ immediately and
-- returns an @m@ action. Running that @m@ action first runs @begin@ to obtain
-- the initial accumulator, then reads from the listener until the 'Closed'
-- marker is reached, threading the accumulator through @step@ for each value,
-- and finally runs @done@ on the accumulator.
--
-- The listener is created once, in the outer action, so every run of the
-- returned @m@ action reads from that same listener.
foldBChanM
    :: (MonadIO m, MonadIO n)
    => (x -> a -> m x)
    -> m x
    -> (x -> m b)
    -> BroadcastChan d a
    -> n (m b)
foldBChanM step begin done chan = do
    listen <- newBChanListener chan
    return $ do
        x0 <- begin
        go listen x0
  where
    -- Deliberately no local type signature: unless ScopedTypeVariables is in
    -- effect, a signature mentioning @m@, @x@ or @b@ would introduce fresh
    -- type variables and fail to typecheck.
    go listen x = do
        x' <- readBChan listen
        case x' of
            Just x'' -> step x x'' >>= go listen
            Nothing -> done x
{-# INLINABLE foldBChanM #-}