Make the analysis parallel #2

Merged
MasseR merged 4 commits from parallel into master 2021-10-29 22:41:47 +03:00
Showing only changes of commit 75aa615263 - Show all commits

View File

@@ -1,23 +1,18 @@
module Control.Addressbook.Streaming where
import qualified Data.Text as T
import Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Text as CT
import Data.Email
import Data.Email.Header
(Header(..))
import System.IO
(stdin)
import qualified Data.Foldable as F
import qualified Data.Map.Strict as Map
import Data.Maybe
(fromMaybe)
@@ -32,10 +27,8 @@ import qualified Data.ByteString.Lazy as LBS
import Data.Char (ord)
import qualified Data.ByteString.Lazy.Char8 as LBC
import System.IO.Unsafe (unsafeInterleaveIO)
import Control.Parallel.Strategies (using, parList, rseq, parBuffer, rdeepseq, parMap) import Control.Parallel.Strategies (rseq, parMap)
import qualified Data.List as L
import Control.Concurrent (getNumCapabilities)
import Debug.Trace (traceShow)
combine :: (MonadUnliftIO m, MonadResource m, MonadThrow m, MonadIO m) => ConduitM FilePath Header m ()
combine = await >>= \case
@@ -51,35 +44,23 @@ run :: IO ()
run = do
datDir <- fromMaybe "./" <$> lookupEnv "HOME"
xs <- LBS.getContents >>= stream
let x = F.fold (parMap rseq F.fold (chunks 200 xs)) let set = F.fold (parMap rseq F.fold (chunks 20 xs))
runResourceT $
runConduit (CL.sourceList (Set.elems x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir </> ".addressbook.dat")) runConduit $
CL.sourceList (Set.elems set)
.| C.map (<> "\n")
.| CB.sinkFileCautious (datDir </> ".addressbook.dat")
where
separate = \case
From x -> [x]
To xs -> F.toList xs
stream :: LBS.ByteString -> IO ([Set ByteString]) -- A set of (locally) unique addresses. Composes with parMap
stream = traverse (unsafeInterleaveIO . parse . LBC.unpack) . filter (not . LBS.null) . LBS.split (fromIntegral $ ord '\n') stream :: LBS.ByteString -> IO [Set ByteString]
stream =
traverse (unsafeInterleaveIO . parse . LBC.unpack)
. filter (not . LBS.null)
. LBS.split (fromIntegral $ ord '\n')
parse path =
runResourceT $
runConduit $
CB.sourceFile path .| parseEmail .| C.concatMap separate .| C.foldMap Set.singleton
run_ :: IO ()
run_ = do
datDir <- fromMaybe "./" <$> lookupEnv "HOME"
runResourceT $ do
x <- runConduit stream
runConduit (CL.sourceList (Set.elems x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir </> ".addressbook.dat"))
where
separate = \case
From x -> [x]
To xs -> F.toList xs
stream =
CB.sourceHandle stdin
.| CT.decode CT.utf8
.| CT.lines
.| C.map T.unpack
.| combine
.| C.concatMap separate
.| C.foldMap (Set.singleton)