{-|
Module : Orc
Description : Distributed computation Orchestration language
Maintainer : gatlin@niltag.net

Defines a simple but expressive EDSL for orchestrating parallel and concurrent
computations. Based on the Orc language from UT Austin.

Re-implementation of a library from Galois. The 'Orc' type has been re-defined
in terms of a delimited continuation monad.
-}

{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

module Orc
  ( -- * A language for distributed Orchestration
    Orc
  , runOrc
  , collect
    -- * Combinators
  , par
  , (<|>)
  , stop
  , signal
  , (<+>)
  , (<?>)
  , cut
  , val
  , eagerly
  , putStrLine
  , echo
  , onlyUntil
  , butAfter
  , notBefore
  , delay
  , publish
  , repeating
  , sync
  -- * List-like utilities
  , takeOrc
  , dropOrc
  , zipOrc
  , liftList
  , syncList
  , runChan
  -- * Re-exports & convenience
  , printOrc
  , prompt
  , (#)
  , shift
  , reset
  )
where

import HIO (HIO, runHIO, newGroup, local, finished, close)
import LCPS (CPS(..), (#), shift, reset)
import Control.Monad (guard, join, MonadPlus(..))
import Control.Concurrent.MonadIO
  ( MonadIO(..)
  , Chan(..)
  , MVar(..)
  , putMVar
  , takeMVar
  , newEmptyMVar
  , readMVar
  , fork
  , threadDelay
  , newMVar
  , tryTakeMVar
  , tryPutMVar
  , writeChan )
import Control.Applicative ((<|>))
import Control.DeepSeq (NFData(..), deepseq)
import Control.Concurrent.STM.MonadIO
  ( readTVarSTM
  , writeTVarSTM
  , modifyTVar
  , atomically
  , readTVar
  , newTVar )
import qualified Control.Concurrent.StdInOut as S

import System.IO.Unsafe (unsafePerformIO)

-- | A monad for orchestrating distributed computations via 'HIO'.
type Orc = CPS () HIO

-- | Runs an Orc computation, discarding the (many) results of the computation.
-- See @collect@ on a mechanism for collecting the results of a computation
-- into a list.
runOrc :: Orc a -> IO ()
runOrc :: forall a. Orc a -> IO ()
runOrc Orc a
p = forall b. HIO b -> IO ()
runHIO (Orc a
p forall {k} (result :: k) (m :: k -> *) answer.
CPS result m answer -> (answer -> m result) -> m result
# \a
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ())

-- | Terminates an 'Orc' computation.
stop :: Orc a
stop :: forall a. Orc a
stop = forall {k} answer (m :: k -> *) (result :: k).
((answer -> m result) -> m result) -> CPS result m answer
CPS forall a b. (a -> b) -> a -> b
$ \a -> HIO ()
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Alternate phrasing of @return ()@, which can be placed at the end of an
-- Orc computation to signal that it has no more values to produce.
signal :: Orc ()
signal :: Orc ()
signal = forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Parallel choice operator that performs the actions of @p@ and @q@ and
-- returns their results as they become available. Also written as @<|>@.
-- There is no left-right bias: the ordering between @p@ and @q@ is
-- unspecified.
par :: Orc a -> Orc a -> Orc a
par :: forall a. Orc a -> Orc a -> Orc a
par = forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
(<|>)

-- | Immediately fires up a thread for @p@, and then returns a handle to the
-- first result of that thread which is also of type @Orc a@.
-- An invocation of @eagerly@ is non-blocking, while an invocation of the
-- resulting handle is blocking.
eagerly :: Orc a -> Orc (Orc a)
eagerly :: forall a. Orc a -> Orc (Orc a)
eagerly Orc a
p = forall {k} answer (m :: k -> *) (result :: k).
((answer -> m result) -> m result) -> CPS result m answer
CPS forall a b. (a -> b) -> a -> b
$ \Orc a -> HIO ()
k -> do
  MVar a
res <- forall (io :: * -> *) a. MonadIO io => io (MVar a)
newEmptyMVar
  Group
w <- HIO Group
newGroup
  ThreadId
threadId <- forall (io :: * -> *). HasFork io => io () -> io ThreadId
fork forall a b. (a -> b) -> a -> b
$ Orc a
p forall {m :: * -> *} {a}.
MonadIO m =>
CPS () m a -> (MVar a, Group) -> m ()
`saveOnce` (MVar a
res, Group
w)
  ThreadId
_ <- forall a. Group -> HIO a -> HIO a
local Group
w forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return ThreadId
threadId
  Orc a -> HIO ()
k (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (io :: * -> *) a. MonadIO io => MVar a -> io a
readMVar MVar a
res)

-- | Cut executes an orc expression, waits for the first result, and then
-- suppresses the rest, including killing any threads involved in computing the
-- remainder.
cut :: Orc a -> Orc a
cut :: forall a. Orc a -> Orc a
cut = forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Orc a -> Orc (Orc a)
eagerly

-- | Convenience function to print to stdout in an 'Orc' computation.
putStrLine :: String -> Orc ()
putStrLine :: String -> Orc ()
putStrLine String
str = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (io :: * -> *). MonadIO io => String -> io ()
S.putStrLine String
str

-- | Provided that the result type can be 'Show'n, this prints the result of an
-- 'Orc' computation.
printOrc :: Show a => Orc a -> IO ()
printOrc :: forall a. Show a => Orc a -> IO ()
printOrc Orc a
p = forall a. IO a -> IO a
S.setupStdInOut forall a b. (a -> b) -> a -> b
$ forall a. Orc a -> IO ()
runOrc forall a b. (a -> b) -> a -> b
$ do
  a
a <- Orc a
p
  String -> Orc ()
putStrLine (String
"Ans = " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> String
show a
a)

-- | Biased choice operator ("and-then") that performs the action
-- (and returns all the results) of p first, and then once done performs
-- the actions of q.
(<+>) :: Orc a -> Orc a -> Orc a
Orc a
p <+> :: forall a. Orc a -> Orc a -> Orc a
<+> Orc a
q = forall {k} answer (m :: k -> *) (result :: k).
((answer -> m result) -> m result) -> CPS result m answer
CPS forall a b. (a -> b) -> a -> b
$ \a -> HIO ()
k -> do
  Group
w <- HIO Group
newGroup
  ThreadId
threadId <- forall (io :: * -> *). HasFork io => io () -> io ThreadId
fork (Orc a
p forall {k} (result :: k) (m :: k -> *) answer.
CPS result m answer -> (answer -> m result) -> m result
# a -> HIO ()
k)
  ThreadId
_ <- forall a. Group -> HIO a -> HIO a
local Group
w forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return ThreadId
threadId
  Group -> HIO ()
finished Group
w
  Orc a
q forall {k} (result :: k) (m :: k -> *) answer.
CPS result m answer -> (answer -> m result) -> m result
# a -> HIO ()
k

-- | A variant of '<+>' ("or-else") which performs and returns the results of
-- @p@, and if @p@ produced no answers continues to perform and return the
-- results of @q@.
(<?>) :: Orc a -> Orc a -> Orc a
Orc a
p <?> :: forall a. Orc a -> Orc a -> Orc a
<?> Orc a
q = do
  MVar ()
tripwire <- forall (io :: * -> *) a. MonadIO io => io (MVar a)
newEmptyMVar
  do a
x <- Orc a
p
     Bool
_ <- forall (io :: * -> *) a. MonadIO io => MVar a -> a -> io Bool
tryPutMVar MVar ()
tripwire ()
     forall (m :: * -> *) a. Monad m => a -> m a
return a
x
   forall a. Orc a -> Orc a -> Orc a
<+>
   do Maybe ()
triggered <- forall (io :: * -> *) a. MonadIO io => MVar a -> io (Maybe a)
tryTakeMVar MVar ()
tripwire
      case Maybe ()
triggered of
        Maybe ()
Nothing -> Orc a
q
        Just ()
_  -> forall a. Orc a
stop

CPS () m a
p saveOnce :: CPS () m a -> (MVar a, Group) -> m ()
`saveOnce` (MVar a
r,Group
w) = do
  MVar ()
ticket <- forall (io :: * -> *) a. MonadIO io => a -> io (MVar a)
newMVar ()
  CPS () m a
p forall {k} (result :: k) (m :: k -> *) answer.
CPS result m answer -> (answer -> m result) -> m result
# \a
x -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ (forall (io :: * -> *) a. MonadIO io => MVar a -> io a
takeMVar MVar ()
ticket forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (io :: * -> *) a. MonadIO io => MVar a -> a -> io ()
putMVar MVar a
r a
x forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Group -> IO ()
close Group
w)

-- | Repeatedly reads values from the @vals@ 'MVar' until @j@ values have been
-- read or the @vals@ 'MVar' is exhausted (a 'Nothing' is passed). When there
-- are no more values to be returned, fills the @end@ 'MVar'.
echo :: Int -> MVar (Maybe a) -> MVar () -> Orc a
echo :: forall a. Int -> MVar (Maybe a) -> MVar () -> Orc a
echo Int
0 MVar (Maybe a)
_ MVar ()
end = forall a b. Orc a -> Orc b
silent (forall (io :: * -> *) a. MonadIO io => MVar a -> a -> io ()
putMVar MVar ()
end ())
echo Int
j MVar (Maybe a)
vals MVar ()
end = do
  Maybe a
mx <- forall (io :: * -> *) a. MonadIO io => MVar a -> io a
takeMVar MVar (Maybe a)
vals
  case Maybe a
mx of
    Maybe a
Nothing -> forall a b. Orc a -> Orc b
silent (forall (io :: * -> *) a. MonadIO io => MVar a -> a -> io ()
putMVar MVar ()
end ())
    Just a
x -> forall (m :: * -> *) a. Monad m => a -> m a
return a
x forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> forall a. Int -> MVar (Maybe a) -> MVar () -> Orc a
echo (Int
jforall a. Num a => a -> a -> a
-Int
1) MVar (Maybe a)
vals MVar ()
end

-- | Executes the computation @p@ but suppresses its results.
silent :: Orc a -> Orc b
silent :: forall a b. Orc a -> Orc b
silent Orc a
p  = Orc a
p forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. Orc a
stop

-- | Executes the computation @p@ and @done@. Once @done@ returns its first
-- result, kill both computations and return that result.
-- This discards the results of @p@.
onlyUntil :: Orc a -> Orc b -> Orc b
Orc a
p onlyUntil :: forall a b. Orc a -> Orc b -> Orc b
`onlyUntil` Orc b
done = forall a. Orc a -> Orc a
cut (forall a b. Orc a -> Orc b
silent Orc a
p forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Orc b
done)

-- | Immediately executes the computation @p@, but if it hasn't returned a
-- result in @t@ seconds, executes the computation @q@ and returns whichever
-- computations returns a result first (kill the other thread).
butAfter :: (RealFrac n, Show n) => Orc a -> (n, Orc a) -> Orc a
Orc a
p butAfter :: forall n a. (RealFrac n, Show n) => Orc a -> (n, Orc a) -> Orc a
`butAfter` (n
t,Orc a
def) = forall a. Orc a -> Orc a
cut (Orc a
p forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (forall a. (RealFrac a, Show a) => a -> Orc ()
delay n
t forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Orc a
def))

-- | Runs the computation @p@ and returns its first result, but doesn't return
-- before @w@ seconds have elapsed.
notBefore :: Orc a -> Float -> Orc a
Orc a
p notBefore :: forall a. Orc a -> Float -> Orc a
`notBefore` Float
w = forall a b c. (a -> b -> c) -> Orc a -> Orc b -> Orc c
sync forall a b. a -> b -> a
const Orc a
p (forall a. (RealFrac a, Show a) => a -> Orc ()
delay Float
w)

-- | Wait for a period of @w@ seconds before continuing.
delay :: (RealFrac a, Show a) => a -> Orc ()
delay :: forall a. (RealFrac a, Show a) => a -> Orc ()
delay a
w = (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (io :: * -> *). HasFork io => Int -> io ()
threadDelay (forall a b. (RealFrac a, Integral b) => a -> b
round (a
w forall a. Num a => a -> a -> a
* a
1000000)))
       forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (forall a b. Orc a -> Orc b
silent forall a b. (a -> b) -> a -> b
$ do
             forall (f :: * -> *). Alternative f => Bool -> f ()
guard (a
w forall a. Ord a => a -> a -> Bool
> a
100)
             String -> Orc ()
putStrLine (String
"Just checking you meant to wait " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> String
show a
w forall a. [a] -> [a] -> [a]
++ String
" seconds"))

-- | Takes the first result of @p@, the first result of @q@, and applies them
-- to @f@.
-- The computations for @p@ and @q@ are run in parallel.
sync :: (a -> b -> c) -> Orc a -> Orc b -> Orc c
sync :: forall a b c. (a -> b -> c) -> Orc a -> Orc b -> Orc c
sync a -> b -> c
f Orc a
p Orc b
q = do
  Orc a
po <- forall a. Orc a -> Orc (Orc a)
eagerly Orc a
p
  Orc b
qo <- forall a. Orc a -> Orc (Orc a)
eagerly Orc b
q
  forall (f :: * -> *) a. Applicative f => a -> f a
pure a -> b -> c
f forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Orc a
po forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Orc b
qo

-- | An alternate mechanism for 'eagerly', it fires up a thread for @p@
-- and returns a lazy thunk that contains the single (trimmed) result
-- of the computation.  Be careful to use this function with 'publish'
-- when these lazy values need to be fully evaluated before proceeding
-- further.
val :: Orc a -> Orc a
val :: forall a. Orc a -> Orc a
val Orc a
p = forall {k} answer (m :: k -> *) (result :: k).
((answer -> m result) -> m result) -> CPS result m answer
CPS forall a b. (a -> b) -> a -> b
$ \a -> HIO ()
k -> do
  MVar a
res <- forall (io :: * -> *) a. MonadIO io => io (MVar a)
newEmptyMVar
  Group
w <- HIO Group
newGroup
  ThreadId
threadId <- forall (io :: * -> *). HasFork io => io () -> io ThreadId
fork forall a b. (a -> b) -> a -> b
$ Orc a
p forall {m :: * -> *} {a}.
MonadIO m =>
CPS () m a -> (MVar a, Group) -> m ()
`saveOnce` (MVar a
res, Group
w)
  ThreadId
_ <- forall a. Group -> HIO a -> HIO a
local Group
w forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return ThreadId
threadId
  a -> HIO ()
k (forall a. IO a -> a
unsafePerformIO forall a b. (a -> b) -> a -> b
$ forall (io :: * -> *) a. MonadIO io => MVar a -> io a
readMVar MVar a
res)

-- | Convenience to solicit user input from stdin, via stdout.
prompt :: String -> Orc String
prompt :: String -> Orc String
prompt String
str = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (io :: * -> *). HasFork io => String -> io String
S.prompt String
str

-- | Publish is a hyperstrict form of 'return'.
-- It is usfeul for combining results from multiple 'val' computations,
-- providing a synchronization point.
publish :: NFData a => a -> Orc a
publish :: forall a. NFData a => a -> Orc a
publish a
x = forall a b. NFData a => a -> b -> b
deepseq a
x forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return a
x

-- | Repeatedly executes the computation @p@ and returns its results.
-- 'repeating' works best when @p@ is single-valued:
-- if @p@ is multi-valued Orc will spawn a repeating thread for every result
-- returned, resulting in an exponential blow-up of threads.
-- NB: this behavior may not be intentional
repeating :: Orc a -> Orc a
repeating :: forall a. Orc a -> Orc a
repeating Orc a
p = do
  a
x <- Orc a
p
  forall (m :: * -> *) a. Monad m => a -> m a
return a
x forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> forall a. Orc a -> Orc a
repeating Orc a
p

-- | Runs the computation @p@ and repeatedly puts its results (tagged with
-- 'Just' into the @vals@ 'MVar'. Puts 'Nothing' if there are no results left.
-- Stops executing when the @end@ MVar is filled.
sandbox :: Orc a -> MVar (Maybe a) -> MVar () -> Orc ()
sandbox :: forall a. Orc a -> MVar (Maybe a) -> MVar () -> Orc ()
sandbox Orc a
p MVar (Maybe a)
vals MVar ()
end =
  ((Orc a
p forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (forall (io :: * -> *) a. MonadIO io => MVar a -> a -> io ()
putMVar MVar (Maybe a)
vals forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. a -> Maybe a
Just)) forall a. Orc a -> Orc a -> Orc a
<+> forall (io :: * -> *) a. MonadIO io => MVar a -> a -> io ()
putMVar MVar (Maybe a)
vals forall a. Maybe a
Nothing)
  forall a b. Orc a -> Orc b -> Orc b
`onlyUntil` forall (io :: * -> *) a. MonadIO io => MVar a -> io a
takeMVar MVar ()
end

-- | Runs the computation @p@ and returns the first @n@ results.
takeOrc :: Int -> Orc a -> Orc a
takeOrc :: forall a. Int -> Orc a -> Orc a
takeOrc Int
n Orc a
p = do
  MVar (Maybe a)
vals <- forall (io :: * -> *) a. MonadIO io => io (MVar a)
newEmptyMVar
  MVar ()
end <- forall (io :: * -> *) a. MonadIO io => io (MVar a)
newEmptyMVar
  forall a. Int -> MVar (Maybe a) -> MVar () -> Orc a
echo Int
n MVar (Maybe a)
vals MVar ()
end forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> forall a b. Orc a -> Orc b
silent (forall a. Orc a -> MVar (Maybe a) -> MVar () -> Orc ()
sandbox Orc a
p MVar (Maybe a)
vals MVar ()
end)

-- | Drops the first @n@ results of the computation @p@ and then returns the
-- rest of the results.
dropOrc :: Int -> Orc a -> Orc a
dropOrc :: forall a. Int -> Orc a -> Orc a
dropOrc Int
n Orc a
p = do
  TVar Int
countdown <- forall (io :: * -> *) a. MonadIO io => a -> io (TVar a)
newTVar Int
n
  a
x <- Orc a
p
  forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall a b. (a -> b) -> a -> b
$ forall (io :: * -> *) a. MonadIO io => STM a -> io a
atomically forall a b. (a -> b) -> a -> b
$ do
    Int
w <- forall a. TVar a -> STM a
readTVarSTM TVar Int
countdown
    if Int
w forall a. Eq a => a -> a -> Bool
== Int
0
      then forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return a
x
      else do
        forall a. TVar a -> a -> STM ()
writeTVarSTM TVar Int
countdown (Int
wforall a. Num a => a -> a -> a
-Int
1)
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Orc a
stop

-- | Zips the results of two computations @p@ and @q@. When one computation
-- finishes, kill the other.
zipOrc :: Orc a -> Orc b -> Orc (a, b)
zipOrc :: forall a b. Orc a -> Orc b -> Orc (a, b)
zipOrc Orc a
p Orc b
q = do
  MVar (Maybe a)
pvals <- forall (io :: * -> *) a. MonadIO io => io (MVar a)
newEmptyMVar
  MVar (Maybe b)
qvals <- forall (io :: * -> *) a. MonadIO io => io (MVar a)
newEmptyMVar
  MVar ()
end   <- forall (io :: * -> *) a. MonadIO io => io (MVar a)
newEmptyMVar
  forall a b.
MVar (Maybe a) -> MVar (Maybe b) -> MVar () -> Orc (a, b)
zipp MVar (Maybe a)
pvals MVar (Maybe b)
qvals MVar ()
end
    forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> forall a b. Orc a -> Orc b
silent (forall a. Orc a -> MVar (Maybe a) -> MVar () -> Orc ()
sandbox Orc a
p MVar (Maybe a)
pvals MVar ()
end)
    forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> forall a b. Orc a -> Orc b
silent (forall a. Orc a -> MVar (Maybe a) -> MVar () -> Orc ()
sandbox Orc b
q MVar (Maybe b)
qvals MVar ()
end)


-- | Like 'echo', repeatedly reads values from the @pvals@ and @qvals@ 'MVar',
-- returning tuples of the values until one 'MVar' is exhausted.
-- When there are no more values to be returned, fills the @end@ MVar.
zipp :: MVar (Maybe a) -> MVar (Maybe b) -> MVar () -> Orc (a, b)
zipp :: forall a b.
MVar (Maybe a) -> MVar (Maybe b) -> MVar () -> Orc (a, b)
zipp MVar (Maybe a)
pvals MVar (Maybe b)
qvals MVar ()
end = do
  Maybe a
mx <- forall (io :: * -> *) a. MonadIO io => MVar a -> io a
takeMVar MVar (Maybe a)
pvals
  Maybe b
my <- forall (io :: * -> *) a. MonadIO io => MVar a -> io a
takeMVar MVar (Maybe b)
qvals
  case Maybe a
mx of
    Maybe a
Nothing -> forall a b. Orc a -> Orc b
silent (forall (io :: * -> *) a. MonadIO io => MVar a -> a -> io ()
putMVar MVar ()
end () forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (io :: * -> *) a. MonadIO io => MVar a -> a -> io ()
putMVar MVar ()
end ())
    Just a
x -> case Maybe b
my of
      Maybe b
Nothing -> forall a b. Orc a -> Orc b
silent (forall (io :: * -> *) a. MonadIO io => MVar a -> a -> io ()
putMVar MVar ()
end () forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (io :: * -> *) a. MonadIO io => MVar a -> a -> io ()
putMVar MVar ()
end ())
      Just b
y -> forall (m :: * -> *) a. Monad m => a -> m a
return (a
x, b
y) forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> forall a b.
MVar (Maybe a) -> MVar (Maybe b) -> MVar () -> Orc (a, b)
zipp MVar (Maybe a)
pvals MVar (Maybe b)
qvals MVar ()
end

-- | Collects all of the values of the computation @p@ and delivers them as a
-- list when @p@ is completed.
collect :: Orc a -> Orc [a]
collect :: forall a. Orc a -> Orc [a]
collect Orc a
p = do
  TVar [a]
accum <- forall (io :: * -> *) a. MonadIO io => a -> io (TVar a)
newTVar []
  forall a b. Orc a -> Orc b
silent (Orc a
p forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \a
v -> forall (io :: * -> *) a.
MonadIO io =>
TVar a -> (a -> a) -> io (a, a)
modifyTVar TVar [a]
accum (a
vforall a. a -> [a] -> [a]
:)) forall a. Orc a -> Orc a -> Orc a
<+> forall (io :: * -> *) a. MonadIO io => TVar a -> io a
readTVar TVar [a]
accum

-- | Runs a computation @p@ and writes its results to the channel @ch@.
runChan :: Chan a -> Orc a -> IO ()
runChan :: forall a. Chan a -> Orc a -> IO ()
runChan Chan a
ch Orc a
p = forall a. Orc a -> IO ()
runOrc forall a b. (a -> b) -> a -> b
$ (Orc a
p forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (io :: * -> *) a. MonadIO io => Chan a -> a -> io ()
writeChan Chan a
ch)

-- | Runs a list of Orc computations @ps@ in parallel until they produce their
-- first result, and returns a list of all these results.
syncList :: [Orc a] -> CPS () HIO [a]
syncList :: forall a. [Orc a] -> CPS () HIO [a]
syncList [Orc a]
ps = forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
sequence (forall a b. (a -> b) -> [a] -> [b]
map forall a. Orc a -> Orc (Orc a)
eagerly [Orc a]
ps) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
sequence

-- | Analogous to the list scan function.
-- The order in which the combining function is applied to the results produced
-- by @p@ is nondeterministic.
scan :: (a -> s -> s) -> s -> Orc a -> Orc s
scan :: forall a s. (a -> s -> s) -> s -> Orc a -> Orc s
scan a -> s -> s
f s
s Orc a
p = do
  TVar s
accum <- forall (io :: * -> *) a. MonadIO io => a -> io (TVar a)
newTVar s
s
  a
x <- Orc a
p
  (s
_w, s
w') <- forall (io :: * -> *) a.
MonadIO io =>
TVar a -> (a -> a) -> io (a, a)
modifyTVar TVar s
accum (a -> s -> s
f a
x)
  forall (m :: * -> *) a. Monad m => a -> m a
return s
w'

liftList :: (MonadPlus list) => [a] -> list a
liftList :: forall (list :: * -> *) a. MonadPlus list => [a] -> list a
liftList [a]
ps = forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr forall (m :: * -> *) a. MonadPlus m => m a -> m a -> m a
mplus forall (m :: * -> *) a. MonadPlus m => m a
mzero forall a b. (a -> b) -> a -> b
$ forall a b. (a -> b) -> [a] -> [b]
map forall (m :: * -> *) a. Monad m => a -> m a
return [a]
ps