From 6e77b4e7d2f74da964fd95494dad1ee56d4c4536 Mon Sep 17 00:00:00 2001 From: "Laurent P. René de Cotret" Date: Sun, 30 May 2021 15:00:59 -0400 Subject: Async runtime with graph-based dependency cycle checks (#844) * Async runtime * Activate multi-threading in template repo * Style changes after feedback * Limiting the number of concurrent tasks * Revert "Limiting the number of concurrent tasks" This reverts commit 38984f6f5332632be8c4cab3e29d37e318492d70. --- lib/Hakyll/Core/Runtime.hs | 319 ++++++++++++++++++++++++++------------------- 1 file changed, 183 insertions(+), 136 deletions(-) (limited to 'lib') diff --git a/lib/Hakyll/Core/Runtime.hs b/lib/Hakyll/Core/Runtime.hs index 922b676..e0edf5b 100644 --- a/lib/Hakyll/Core/Runtime.hs +++ b/lib/Hakyll/Core/Runtime.hs @@ -5,19 +5,24 @@ module Hakyll.Core.Runtime -------------------------------------------------------------------------------- -import Control.Monad (unless) -import Control.Monad.Except (ExceptT, runExceptT, throwError) -import Control.Monad.Reader (ask) -import Control.Monad.RWS (RWST, runRWST) -import Control.Monad.State (get, modify) -import Control.Monad.Trans (liftIO) -import Data.List (intercalate) -import Data.Map (Map) -import qualified Data.Map as M -import Data.Set (Set) -import qualified Data.Set as S -import System.Exit (ExitCode (..)) -import System.FilePath (()) +import Control.Concurrent.Async.Lifted (forConcurrently_) +import Control.Concurrent.STM (atomically, modifyTVar', readTVarIO, newTVarIO, TVar) +import Control.Monad (unless) +import Control.Monad.Except (ExceptT, runExceptT, throwError) +import Control.Monad.Reader (ask) +import Control.Monad.RWS (RWST, runRWST) +import Control.Monad.State (get) +import Control.Monad.Trans (liftIO) +import qualified Data.Array as A +import Data.Graph (Graph) +import qualified Data.Graph as G +import Data.List (intercalate) +import Data.Map (Map) +import qualified Data.Map as M +import Data.Set (Set) +import qualified Data.Set as S +import System.Exit (ExitCode (..)) +import System.FilePath (()) -------------------------------------------------------------------------------- @@ -67,11 +72,13 @@ run config logger rules = do , runtimeRoutes = rulesRoutes ruleSet , runtimeUniverse = M.fromList compilers } - state = RuntimeState - { runtimeDone = S.empty - , runtimeSnapshots = S.empty - , runtimeTodo = M.empty - , runtimeFacts = oldFacts + + state <- newTVarIO $ RuntimeState + { runtimeDone = S.empty + , runtimeSnapshots = S.empty + , runtimeTodo = M.empty + , runtimeFacts = oldFacts + , runtimeDependencies = M.empty } -- Run the program and fetch the resulting state @@ -83,7 +90,8 @@ run config logger rules = do return (ExitFailure 1, ruleSet) Right (_, s, _) -> do - Store.set store factsKey $ runtimeFacts s + facts <- fmap runtimeFacts . liftIO . readTVarIO $ s + Store.set store factsKey facts Logger.debug logger "Removing tmp directory..." removeDirectory $ tmpDirectory config @@ -107,15 +115,30 @@ data RuntimeRead = RuntimeRead -------------------------------------------------------------------------------- data RuntimeState = RuntimeState - { runtimeDone :: Set Identifier - , runtimeSnapshots :: Set (Identifier, Snapshot) - , runtimeTodo :: Map Identifier (Compiler SomeItem) - , runtimeFacts :: DependencyFacts + { runtimeDone :: Set Identifier + , runtimeSnapshots :: Set (Identifier, Snapshot) + , runtimeTodo :: Map Identifier (Compiler SomeItem) + , runtimeFacts :: DependencyFacts + , runtimeDependencies :: Map Identifier (Set Identifier) } -------------------------------------------------------------------------------- -type Runtime a = RWST RuntimeRead () RuntimeState (ExceptT String IO) a +type Runtime a = RWST RuntimeRead () (TVar RuntimeState) (ExceptT String IO) a + + +-------------------------------------------------------------------------------- +-- Because compilation of rules often revolves around IO, +-- it is not possible to live in the STM monad and hence benefit from +-- its guarantees. +-- Be very careful when modifying the state +modifyRuntimeState :: (RuntimeState -> RuntimeState) -> Runtime () +modifyRuntimeState f = get >>= \s -> liftIO . atomically $ modifyTVar' s f + + +-------------------------------------------------------------------------------- +getRuntimeState :: Runtime RuntimeState +getRuntimeState = liftIO . readTVarIO =<< get -------------------------------------------------------------------------------- @@ -135,13 +158,15 @@ scheduleOutOfDate = do logger <- runtimeLogger <$> ask provider <- runtimeProvider <$> ask universe <- runtimeUniverse <$> ask - facts <- runtimeFacts <$> get - todo <- runtimeTodo <$> get let identifiers = M.keys universe modified = S.fromList $ flip filter identifiers $ resourceModified provider - + + state <- getRuntimeState + let facts = runtimeFacts state + todo = runtimeTodo state + let (ood, facts', msgs) = outOfDate identifiers modified facts todo' = M.filterWithKey (\id' _ -> id' `S.member` ood) universe @@ -150,7 +175,7 @@ scheduleOutOfDate = do mapM_ (Logger.debug logger) msgs -- Update facts and todo items - modify $ \s -> s + modifyRuntimeState $ \s -> s { runtimeDone = runtimeDone s `S.union` (S.fromList identifiers `S.difference` ood) , runtimeTodo = todo `M.union` todo' @@ -161,116 +186,138 @@ scheduleOutOfDate = do -------------------------------------------------------------------------------- pickAndChase :: Runtime () pickAndChase = do - todo <- runtimeTodo <$> get - case M.minViewWithKey todo of - Nothing -> return () - Just ((id', _), _) -> do - chase [] id' - pickAndChase + todo <- runtimeTodo <$> getRuntimeState + unless (null todo) $ do + checkForDependencyCycle + forConcurrently_ (M.keys todo) chase + pickAndChase -------------------------------------------------------------------------------- -chase :: [Identifier] -> Identifier -> Runtime () -chase trail id' - | id' `elem` trail = throwError $ "Hakyll.Core.Runtime.chase: " ++ - "Dependency cycle detected: " ++ intercalate " depends on " - (map show $ dropWhile (/= id') (reverse trail) ++ [id']) - | otherwise = do - logger <- runtimeLogger <$> ask - todo <- runtimeTodo <$> get - provider <- runtimeProvider <$> ask - universe <- runtimeUniverse <$> ask - routes <- runtimeRoutes <$> ask - store <- runtimeStore <$> ask - config <- runtimeConfiguration <$> ask - Logger.debug logger $ "Processing " ++ show id' - - let compiler = todo M.! id' - read' = CompilerRead - { compilerConfig = config - , compilerUnderlying = id' - , compilerProvider = provider - , compilerUniverse = M.keysSet universe - , compilerRoutes = routes - , compilerStore = store - , compilerLogger = logger +-- | Check for cyclic dependencies in the current state +checkForDependencyCycle :: Runtime () +checkForDependencyCycle = do + deps <- runtimeDependencies <$> getRuntimeState + let (depgraph, nodeFromVertex, _) = G.graphFromEdges [(k, k, S.toList dps) | (k, dps) <- M.toList deps] + dependencyCycles = map ((\(_, k, _) -> k) . nodeFromVertex) $ cycles depgraph + + unless (null dependencyCycles) $ do + throwError $ "Hakyll.Core.Runtime.pickAndChase: " ++ + "Dependency cycle detected: " ++ intercalate ", " (map show dependencyCycles) ++ + " are inter-dependent." + where + cycles :: Graph -> [G.Vertex] + cycles g = map fst . filter (uncurry $ reachableFromAny g) . A.assocs $ g + + reachableFromAny :: Graph -> G.Vertex -> [G.Vertex] -> Bool + reachableFromAny graph node = elem node . concatMap (G.reachable graph) + + +-------------------------------------------------------------------------------- +chase :: Identifier -> Runtime () +chase id' = do + logger <- runtimeLogger <$> ask + provider <- runtimeProvider <$> ask + universe <- runtimeUniverse <$> ask + routes <- runtimeRoutes <$> ask + store <- runtimeStore <$> ask + config <- runtimeConfiguration <$> ask + + state <- getRuntimeState + + Logger.debug logger $ "Processing " ++ show id' + + let compiler = (runtimeTodo state) M.! id' + read' = CompilerRead + { compilerConfig = config + , compilerUnderlying = id' + , compilerProvider = provider + , compilerUniverse = M.keysSet universe + , compilerRoutes = routes + , compilerStore = store + , compilerLogger = logger + } + + result <- liftIO $ runCompiler compiler read' + case result of + -- Rethrow error + CompilerError e -> throwError $ case compilerErrorMessages e of + [] -> "Compiler failed but no info given, try running with -v?" + es -> intercalate "; " es + + -- Signal that a snapshot was saved -> + CompilerSnapshot snapshot c -> do + -- Update info. The next 'chase' will pick us again at some + -- point so we can continue then. + modifyRuntimeState $ \s -> s + { runtimeSnapshots = S.insert (id', snapshot) (runtimeSnapshots s) + , runtimeTodo = M.insert id' c (runtimeTodo s) + } + + + -- Huge success + CompilerDone (SomeItem item) cwrite -> do + -- Print some info + let facts = compilerDependencies cwrite + cacheHits + | compilerCacheHits cwrite <= 0 = "updated" + | otherwise = "cached " + Logger.message logger $ cacheHits ++ " " ++ show id' + + -- Sanity check + unless (itemIdentifier item == id') $ throwError $ + "The compiler yielded an Item with Identifier " ++ + show (itemIdentifier item) ++ ", but we were expecting " ++ + "an Item with Identifier " ++ show id' ++ " " ++ + "(you probably want to call makeItem to solve this problem)" + + -- Write if necessary + (mroute, _) <- liftIO $ runRoutes routes provider id' + case mroute of + Nothing -> return () + Just route -> do + let path = destinationDirectory config route + liftIO $ makeDirectories path + liftIO $ write path item + Logger.debug logger $ "Routed to " ++ path + + -- Save! (For load) + liftIO $ save store item + + modifyRuntimeState $ \s -> s + { runtimeDone = S.insert id' (runtimeDone s) + , runtimeTodo = M.delete id' (runtimeTodo s) + , runtimeFacts = M.insert id' facts (runtimeFacts s) + } + + -- Try something else first + CompilerRequire dep c -> do + let (depId, depSnapshot) = dep + Logger.debug logger $ + "Compiler requirement found for: " ++ show id' ++ + ", requirement: " ++ show depId + + let done = runtimeDone state + snapshots = runtimeSnapshots state + deps = runtimeDependencies state + + -- Done if we either completed the entire item (runtimeDone) or + -- if we previously saved the snapshot (runtimeSnapshots). + let depDone = + depId `S.member` done || + (depId, depSnapshot) `S.member` snapshots + + let deps' = if depDone + then deps + else M.insertWith S.union id' (S.singleton depId) deps + + modifyRuntimeState $ \s -> s + { runtimeTodo = M.insert id' + (if depDone then c else compilerResult result) + (runtimeTodo s) + , runtimeDependencies = deps' } - result <- liftIO $ runCompiler compiler read' - case result of - -- Rethrow error - CompilerError e -> throwError $ case compilerErrorMessages e of - [] -> "Compiler failed but no info given, try running with -v?" - es -> intercalate "; " es - - -- Signal that a snapshot was saved -> - CompilerSnapshot snapshot c -> do - -- Update info. The next 'chase' will pick us again at some - -- point so we can continue then. - modify $ \s -> s - { runtimeSnapshots = - S.insert (id', snapshot) (runtimeSnapshots s) - , runtimeTodo = M.insert id' c (runtimeTodo s) - } - - -- Huge success - CompilerDone (SomeItem item) cwrite -> do - -- Print some info - let facts = compilerDependencies cwrite - cacheHits - | compilerCacheHits cwrite <= 0 = "updated" - | otherwise = "cached " - Logger.message logger $ cacheHits ++ " " ++ show id' - - -- Sanity check - unless (itemIdentifier item == id') $ throwError $ - "The compiler yielded an Item with Identifier " ++ - show (itemIdentifier item) ++ ", but we were expecting " ++ - "an Item with Identifier " ++ show id' ++ " " ++ - "(you probably want to call makeItem to solve this problem)" - - -- Write if necessary - (mroute, _) <- liftIO $ runRoutes routes provider id' - case mroute of - Nothing -> return () - Just route -> do - let path = destinationDirectory config route - liftIO $ makeDirectories path - liftIO $ write path item - Logger.debug logger $ "Routed to " ++ path - - -- Save! (For load) - liftIO $ save store item - - -- Update state - modify $ \s -> s - { runtimeDone = S.insert id' (runtimeDone s) - , runtimeTodo = M.delete id' (runtimeTodo s) - , runtimeFacts = M.insert id' facts (runtimeFacts s) - } - - -- Try something else first - CompilerRequire dep c -> do - -- Update the compiler so we don't execute it twice - let (depId, depSnapshot) = dep - done <- runtimeDone <$> get - snapshots <- runtimeSnapshots <$> get - - -- Done if we either completed the entire item (runtimeDone) or - -- if we previously saved the snapshot (runtimeSnapshots). - let depDone = - depId `S.member` done || - (depId, depSnapshot) `S.member` snapshots - - modify $ \s -> s - { runtimeTodo = M.insert id' - (if depDone then c else compilerResult result) - (runtimeTodo s) - } - - -- If the required item is already compiled, continue, or, start - -- chasing that - Logger.debug logger $ "Require " ++ show depId ++ - " (snapshot " ++ depSnapshot ++ "): " ++ - (if depDone then "OK" else "chasing") - if depDone then chase trail id' else chase (id' : trail) depId + Logger.debug logger $ "Require " ++ show depId ++ + " (snapshot " ++ depSnapshot ++ ") " + \ No newline at end of file -- cgit v1.2.3