{-|
Module : Orc
Description : Distributed computation Orchestration language
Maintainer : gatlin@niltag.net

Defines a simple but expressive EDSL for orchestrating parallel and concurrent
computations. Based on the Orc language from UT Austin.

Re-implementation of a library from Galois. The 'Orc' type has been re-defined
in terms of a delimited continuation monad.
-}

{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

module Orc
  ( -- * A language for distributed Orchestration
    Orc
  , runOrc
  , collect
    -- * Combinators
  , par
  , (<|>)
  , stop
  , signal
  , (<+>)
  , (<?>)
  , cut
  , val
  , eagerly
  , putStrLine
  , echo
  , onlyUntil
  , butAfter
  , notBefore
  , delay
  , publish
  , repeating
  , sync
  -- * List-like utilities
  , takeOrc
  , dropOrc
  , zipOrc
  , liftList
  , syncList
  , runChan
  -- * Re-exports & convenience
  , printOrc
  , prompt
  , (#)
  , shift
  , reset
  )
where

import HIO
import CPS
import Control.Monad
import Control.Concurrent.MonadIO
import Control.Applicative
import Control.DeepSeq (NFData(..), deepseq)
import Control.Concurrent.STM.MonadIO
import qualified Control.Concurrent.StdInOut as S

import Data.Profunctor
import System.IO.Unsafe

-- | A monad for orchestrating distributed computations via 'HIO'.
--
-- 'Orc' is a synonym for the continuation-passing type 'CPS' specialised to a
-- unit result in 'HIO': a value of type @Orc a@ wraps a function of type
-- @(a -> HIO ()) -> HIO ()@ which is handed the continuation that consumes
-- its results.
type Orc a = CPS () HIO a

-- | Lifts an 'IO' action into 'Orc' (that is, @CPS () HIO@): the action is
-- first lifted into 'HIO' and its result is then passed on to the
-- continuation with '>>='.
instance MonadIO (CPS () HIO) where
  liftIO io = CPS (liftIO io >>=)


-- | Runs an Orc computation, discarding the (many) results of the computation.
-- See 'collect' for a mechanism for collecting the results of a computation
-- into a list.
--
-- The computation is run with a continuation that ignores each result and
-- returns @()@; the resulting 'HIO' action is executed with 'runHIO'.
runOrc :: Orc a -> IO ()
runOrc p = runHIO (p # discard)
  where
    -- Final continuation: drop the value, produce the unit answer.
    discard _ = return ()

-- | Terminates an 'Orc' computation.
--
-- The continuation is never invoked, so no result is produced and nothing
-- sequenced after 'stop' is run.
stop :: Orc a
stop = CPS $ \_ -> return ()

-- | Alternate phrasing of @return ()@, which can be placed at the end of an
-- Orc computation to signal that it has no more values to produce.
signal :: Orc ()
signal = return ()

-- | Parallel choice operator that performs the actions of @p@ and @q@ and
-- returns their results as they become available. Also written as '<|>'.
-- There is no left-right bias: the ordering between @p@ and @q@ is
-- unspecified.
--
-- This is exactly the 'Alternative' choice of the underlying 'CPS' type.
par :: Orc a -> Orc a -> Orc a
par = (<|>)

-- | Immediately fires up a thread for @p@, and then returns a handle to the
-- first result of that thread which is also of type @Orc a@.
-- An invocation of @eagerly@ is non-blocking, while an invocation of the
-- resulting handle is blocking.
--
-- The forked thread runs @p@ through 'saveOnce', which stores a result in a
-- fresh 'MVar'; the returned handle reads that 'MVar' with 'readMVar'.
eagerly :: Orc a -> Orc (Orc a)
eagerly p = CPS $ \k -> do
  res <- newEmptyMVar
  w <- newGroup
  threadId <- fork $ p `saveOnce` (res, w)
  -- NOTE(review): the worker is forked before entering @local w@, and
  -- @local w@ only wraps @return threadId@. If 'fork' registers the new
  -- thread with the ambient group, the worker is not a member of @w@ and
  -- @close w@ in 'saveOnce' will not reach it -- confirm against "HIO".
  _ <- local w $ return threadId
  k (liftIO $ readMVar res)

-- | Cut executes an orc expression, waits for the first result, and then
-- suppresses the rest, including killing any threads involved in computing the
-- remainder.
--
-- Implemented as 'eagerly' followed immediately by running the handle it
-- returns.
cut :: Orc a -> Orc a
cut = join . eagerly

-- | Convenience function to print to stdout in an 'Orc' computation.
--
-- Delegates to @putStrLine@ from "Control.Concurrent.StdInOut".
putStrLine :: String -> Orc ()
putStrLine str = liftIO $ S.putStrLine str

-- | Provided that the result type can be 'Show'n, this prints the results of
-- an 'Orc' computation.
--
-- Each result @a@ of @p@ is printed as @"Ans = "@ followed by @show a@. The
-- whole run is wrapped in @setupStdInOut@ from "Control.Concurrent.StdInOut".
printOrc :: Show a => Orc a -> IO ()
printOrc p = S.setupStdInOut $ runOrc $ do
  a <- p
  putStrLine ("Ans = " ++ show a)

-- | Biased choice operator ("and-then") that performs the action
-- (and returns all the results) of @p@ first, and then once done performs
-- the actions of @q@.
--
-- @p@ is run in a forked thread with the current continuation; @q@ is run
-- with the same continuation after @finished w@ returns.
(<+>) :: Orc a -> Orc a -> Orc a
p <+> q = CPS $ \k -> do
  w <- newGroup
  threadId <- fork (p # k)
  -- NOTE(review): the thread is forked before entering @local w@, and
  -- @local w@ only wraps @return threadId@. If 'fork' registers threads with
  -- the ambient group, @p@'s thread is not a member of @w@ and @finished w@
  -- may not actually wait for it -- confirm against "HIO".
  _ <- local w $ return threadId
  finished w
  q # k

-- | A variant of '<+>' ("or-else") which performs and returns the results of
-- @p@, and if @p@ produced no answers continues to perform and return the
-- results of @q@.
--
-- A shared, initially empty 'MVar' acts as a tripwire: every result of @p@
-- tries to fill it, and once @p@ is done the fallback runs @q@ only if the
-- tripwire is still empty.
(<?>) :: Orc a -> Orc a -> Orc a
p <?> q = do
  tripwire <- newEmptyMVar
  let -- Run @p@, recording that at least one result was produced.
      attempt = do
        x <- p
        _ <- tryPutMVar tripwire ()
        return x
      -- Run @q@ only when @p@ never tripped the wire.
      fallback = do
        triggered <- tryTakeMVar tripwire
        case triggered of
          Nothing -> q
          Just _  -> stop
  attempt <+> fallback

-- | Runs @p@ with a continuation that stores a result in the 'MVar' @r@ and
-- then closes the group @w@.
--
-- A one-shot ticket (an 'MVar' that starts full and is never refilled) guards
-- the store: the first result takes the ticket, writes to @r@ and calls
-- 'close' on @w@; any later result blocks on the empty ticket.
saveOnce :: MonadIO m => CPS () m a -> (MVar a, Group) -> m ()
p `saveOnce` (r, w) = do
  ticket <- newMVar ()
  p # \x -> liftIO (takeMVar ticket >> putMVar r x >> close w)

-- | Repeatedly reads values from the @vals@ 'MVar' until @j@ values have been
-- read or the @vals@ 'MVar' is exhausted (a 'Nothing' is passed). When there
-- are no more values to be returned, fills the @end@ 'MVar'.
--
-- A non-positive @j@ yields no values and fills @end@ immediately; this
-- matches the behaviour of 'take' on lists for negative counts instead of
-- counting down past zero.
echo :: Int -> MVar (Maybe a) -> MVar () -> Orc a
echo j vals end
  | j <= 0 = finish
  | otherwise = do
      mx <- takeMVar vals
      case mx of
        Nothing -> finish
        -- Publish this value and, in parallel, keep reading the rest.
        Just x  -> return x <|> echo (j - 1) vals end
  where
    -- Signal completion through @end@ without producing a result.
    finish = silent (putMVar end ())

-- | Executes the computation @p@ but suppresses its results.
--
-- Every result of @p@ is followed by 'stop', so nothing is passed on.
silent :: Orc a -> Orc b
silent p = p >> stop

-- | Executes the computation @p@ and @done@. Once @done@ returns its first
-- result, kill both computations and return that result.
-- This discards the results of @p@.
--
-- Built from 'cut' over the parallel choice of @silent p@ and @done@.
onlyUntil :: Orc a -> Orc b -> Orc b
p `onlyUntil` done = cut (silent p <|> done)

-- | Immediately executes the computation @p@, but if it hasn't returned a
-- result in @t@ seconds, also executes the computation @def@ and returns
-- whichever computation returns a result first (killing the other thread).
--
-- Implemented as 'cut' over @p@ in parallel with @def@ delayed by @t@.
butAfter :: (RealFrac n, Show n) => Orc a -> (n, Orc a) -> Orc a
p `butAfter` (t, def) = cut (p <|> (delay t >> def))

-- | Runs the computation @p@ and returns its first result, but doesn't return
-- before @w@ seconds have elapsed.
--
-- Uses 'sync' to pair the first result of @p@ with the completion of
-- @delay w@, keeping only the former.
notBefore :: Orc a -> Float -> Orc a
p `notBefore` w = sync const p (delay w)

-- | Wait for a period of @w@ seconds before continuing.
--
-- @w@ is converted to microseconds (multiplied by 1000000 and rounded) for
-- 'threadDelay'. In parallel, when @w@ is greater than 100, a message asking
-- whether such a long wait was intended is printed; that branch is 'silent'
-- and contributes no result of its own.
delay :: (RealFrac a, Show a) => a -> Orc ()
delay w =
  liftIO (threadDelay (round (w * 1000000)))
    <|> (silent $ do
          guard (w > 100)
          putStrLine ("Just checking you meant to wait " ++ show w ++ " seconds"))

-- | Takes the first result of @p@, the first result of @q@, and applies them
-- to @f@.
-- The computations for @p@ and @q@ are run in parallel.
--
-- Both computations are started with 'eagerly' before either handle is
-- consulted, so neither waits for the other to begin.
sync :: (a -> b -> c) -> Orc a -> Orc b -> Orc c
sync f p q = do
  po <- eagerly p
  qo <- eagerly q
  pure f <*> po <*> qo

-- | An alternate mechanism for 'eagerly', it fires up a thread for @p@
-- and returns a lazy thunk that contains the single (trimmed) result
-- of the computation.  Be careful to use this function with 'publish'
-- when these lazy values need to be fully evaluated before proceeding
-- further.
val :: Orc a -> Orc a
val p = CPS $ \k -> do
  res <- newEmptyMVar
  w <- newGroup
  threadId <- fork $ p `saveOnce` (res, w)
  -- NOTE(review): the worker is forked before entering @local w@, and
  -- @local w@ only wraps @return threadId@. If 'fork' registers the new
  -- thread with the ambient group, the worker is not a member of @w@ --
  -- confirm against "HIO".
  _ <- local w $ return threadId
  -- 'unsafePerformIO' is deliberate here: the continuation receives an
  -- unevaluated thunk, and only forcing that thunk performs @readMVar res@
  -- (which waits until the worker has stored a result).
  k (unsafePerformIO $ readMVar res)

-- | Convenience to solicit user input from stdin, via stdout.
--
-- Delegates to @prompt@ from "Control.Concurrent.StdInOut", passing @str@
-- through unchanged.
prompt :: String -> Orc String
prompt str = liftIO $ S.prompt str

-- | Publish is a hyperstrict form of 'return'.
-- It is useful for combining results from multiple 'val' computations,
-- providing a synchronization point.
--
-- The argument is fully evaluated with 'deepseq' before being returned.
publish :: NFData a => a -> Orc a
publish x = x `deepseq` return x

-- | Repeatedly executes the computation @p@ and returns its results.
-- 'repeating' works best when @p@ is single-valued:
-- if @p@ is multi-valued Orc will spawn a repeating thread for every result
-- returned, resulting in an exponential blow-up of threads.
-- NB: this behavior may not be intentional
repeating :: Orc a -> Orc a
repeating p = do
  x <- p
  -- Publish this result and, in parallel, start the next round.
  return x <|> repeating p

-- | Runs the computation @p@ and repeatedly puts its results (tagged with
-- 'Just') into the @vals@ 'MVar'. Puts 'Nothing' if there are no results left.
-- Stops executing when the @end@ MVar is filled.
sandbox :: Orc a -> MVar (Maybe a) -> MVar () -> Orc ()
sandbox p vals end = producer `onlyUntil` takeMVar end
  where
    -- Forward every result of @p@, then mark exhaustion with 'Nothing'.
    producer = (p >>= (putMVar vals . Just)) <+> putMVar vals Nothing

-- | Runs the computation @p@ and returns the first @n@ results.
--
-- @p@ runs inside a 'sandbox' that feeds its results through an 'MVar';
-- 'echo' reads them back, counting from @n@, and fills the @end@ 'MVar' when
-- it is done so that the sandbox stops.
takeOrc :: Int -> Orc a -> Orc a
takeOrc n p = do
  vals <- newEmptyMVar
  end <- newEmptyMVar
  echo n vals end <|> silent (sandbox p vals end)

-- | Drops the first @n@ results of the computation @p@ and then returns the
-- rest of the results.
--
-- A shared counter starts at @n@; each result of @p@ atomically either
-- decrements it and is discarded, or (once the counter has run out) is
-- returned. A non-positive @n@ drops nothing, as 'drop' does on lists.
dropOrc :: Int -> Orc a -> Orc a
dropOrc n p = do
  countdown <- newTVar n
  x <- p
  -- The transaction decides what to do with @x@ and returns that decision
  -- as an 'Orc' action, which 'join' then runs.
  join $ atomically $ do
    remaining <- readTVarSTM countdown
    if remaining <= 0
      then return (return x)
      else do
        writeTVarSTM countdown (remaining - 1)
        return stop

-- | Zips the results of two computations @p@ and @q@. When one computation
-- finishes, kill the other.
--
-- Each computation runs in its own 'sandbox' feeding a dedicated 'MVar';
-- 'zipp' pairs the values up. Both sandboxes share the same @end@ 'MVar'.
zipOrc :: Orc a -> Orc b -> Orc (a, b)
zipOrc p q = do
  pvals <- newEmptyMVar
  qvals <- newEmptyMVar
  end   <- newEmptyMVar
  zipp pvals qvals end
    <|> silent (sandbox p pvals end)
    <|> silent (sandbox q qvals end)


-- | Like 'echo', repeatedly reads values from the @pvals@ and @qvals@ 'MVar',
-- returning tuples of the values until one 'MVar' is exhausted.
-- When there are no more values to be returned, fills the @end@ MVar.
--
-- One value is taken from each 'MVar' per round; if either is 'Nothing' the
-- zip is over.
zipp :: MVar (Maybe a) -> MVar (Maybe b) -> MVar () -> Orc (a, b)
zipp pvals qvals end = do
  mx <- takeMVar pvals
  my <- takeMVar qvals
  case (mx, my) of
    -- Publish the pair and, in parallel, keep zipping the rest.
    (Just x, Just y) -> return (x, y) <|> zipp pvals qvals end
    _                -> shutdown
  where
    -- @end@ is filled twice: 'zipOrc' runs two sandboxes on the same @end@
    -- and each one takes it once.
    shutdown = silent (putMVar end () >> putMVar end ())

-- | Collects all of the values of the computation @p@ and delivers them as a
-- list when @p@ is completed.
--
-- Results are consed onto a shared accumulator as they arrive, so the list
-- holds the most recently produced value first. The accumulator is read only
-- in the second operand of '<+>'.
collect :: Orc a -> Orc [a]
collect p = do
  accum <- newTVar []
  silent (p >>= \v -> modifyTVar accum (v :)) <+> readTVar accum

-- | Runs a computation @p@ and writes its results to the channel @ch@.
--
-- Each result is written with 'writeChan' as it is produced.
runChan :: Chan a -> Orc a -> IO ()
runChan ch p = runOrc (p >>= writeChan ch)

-- | Runs a list of Orc computations @ps@ in parallel until they produce their
-- first result, and returns a list of all these results.
--
-- Every computation is first started with 'eagerly'; only then are the
-- resulting handles consulted, in list order.
syncList :: [Orc a] -> CPS () HIO [a]
syncList ps = mapM eagerly ps >>= sequence

-- | Analogous to the list scan function.
-- The order in which the combining function is applied to the results produced
-- by @p@ is nondeterministic.
--
-- The accumulator starts at @s@; for every result @x@ of @p@ it is modified
-- with @f x@, and the second component of the pair returned by 'modifyTVar'
-- (presumably the updated accumulator -- confirm against its documentation)
-- is published.
scan :: (a -> s -> s) -> s -> Orc a -> Orc s
scan f s p = do
  accum <- newTVar s
  x <- p
  (_, updated) <- modifyTVar accum (f x)
  return updated

-- | Lifts a list into any 'MonadPlus': each element is injected with
-- 'return' and the results are combined, right-nested, with 'mplus'; the
-- empty list becomes 'mzero'.
liftList :: (MonadPlus list) => [a] -> list a
liftList = foldr (mplus . return) mzero