Make the analysis parallel #2

Merged
MasseR merged 4 commits from parallel into master 2021-10-29 22:41:47 +03:00
4 changed files with 33 additions and 30 deletions
Showing only changes of commit a300c88cfb - Show all commits

View File

@ -56,7 +56,7 @@ executable addressbook
, text , text
hs-source-dirs: app hs-source-dirs: app
default-language: Haskell2010 default-language: Haskell2010
ghc-options: -Wall -threaded ghc-options: -Wall -threaded -eventlog
test-suite addressbook-test test-suite addressbook-test
import: deps import: deps

View File

@ -31,35 +31,46 @@ import qualified Data.Set as Set
import qualified Data.ByteString.Lazy as LBS import qualified Data.ByteString.Lazy as LBS
import Data.Char (ord) import Data.Char (ord)
import qualified Data.ByteString.Lazy.Char8 as LBC import qualified Data.ByteString.Lazy.Char8 as LBC
import System.IO.Unsafe (unsafeInterleaveIO)
import Control.Parallel.Strategies (using, parList, rseq, parBuffer, rdeepseq, parMap)
import qualified Data.List as L
import Control.Concurrent (getNumCapabilities)
import Debug.Trace (traceShow)
combine :: (MonadUnliftIO m, MonadResource m, MonadThrow m, MonadIO m) => ConduitM FilePath Header m () combine :: (MonadUnliftIO m, MonadResource m, MonadThrow m, MonadIO m) => ConduitM FilePath Header m ()
combine = await >>= \case combine = await >>= \case
Nothing -> pure () Nothing -> pure ()
Just path -> (CB.sourceFile path .| parseEmail) >> combine Just path -> (CB.sourceFile path .| parseEmail) >> combine
chunks :: Int -> [a] -> [[a]]
chunks n = L.unfoldr $ \case
[] -> Nothing
xs -> Just (splitAt n xs)
run :: IO () run :: IO ()
run = do run = do
datDir <- fromMaybe "./" <$> lookupEnv "HOME" datDir <- fromMaybe "./" <$> lookupEnv "HOME"
x <- LBS.getContents >>= stream xs <- LBS.getContents >>= stream
let x = F.fold (parMap rseq F.fold (chunks 200 xs))
runResourceT $ runResourceT $
runConduit (CL.sourceList (Set.elems x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir </> ".addressbook.dat")) runConduit (CL.sourceList (Set.elems x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir </> ".addressbook.dat"))
where where
separate = \case separate = \case
From x -> [x] From x -> [x]
To xs -> F.toList xs To xs -> F.toList xs
stream :: LBS.ByteString -> IO (Set ByteString) stream :: LBS.ByteString -> IO ([Set ByteString])
stream xs = F.fold <$> traverse (parse . LBC.unpack) (LBS.split (fromIntegral $ ord '\n') xs) stream = traverse (unsafeInterleaveIO . parse . LBC.unpack) . filter (not . LBS.null) . LBS.split (fromIntegral $ ord '\n')
parse path = parse path =
runResourceT $ runResourceT $
runConduit $ runConduit $
CB.sourceFile path .| parseEmail .| C.concatMap separate .| CT.encode CT.utf8 .| C.foldMap Set.singleton CB.sourceFile path .| parseEmail .| C.concatMap separate .| C.foldMap Set.singleton
run_ :: IO () run_ :: IO ()
run_ = do run_ = do
datDir <- fromMaybe "./" <$> lookupEnv "HOME" datDir <- fromMaybe "./" <$> lookupEnv "HOME"
runResourceT $ do runResourceT $ do
x <- runConduit stream x <- runConduit stream
runConduit (CL.sourceList (Map.keys x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir </> ".addressbook.dat")) runConduit (CL.sourceList (Set.elems x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir </> ".addressbook.dat"))
where where
separate = \case separate = \case
From x -> [x] From x -> [x]
@ -71,5 +82,4 @@ run_ = do
.| C.map T.unpack .| C.map T.unpack
.| combine .| combine
.| C.concatMap separate .| C.concatMap separate
.| CT.encode CT.utf8 .| C.foldMap (Set.singleton)
.| C.foldMap (`Map.singleton` ())

View File

@ -5,12 +5,9 @@ import Data.Email.Header
import Conduit import Conduit
import qualified Data.Conduit.Combinators as C import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.Text as CT
import Data.ByteString import Data.ByteString
(ByteString) (ByteString)
parseEmail :: (MonadUnliftIO m, MonadThrow m, Monad m) => ConduitM ByteString Header m () parseEmail :: (MonadUnliftIO m, MonadThrow m, Monad m) => ConduitM ByteString Header m ()
parseEmail = catchC (CT.decode CT.utf8) err .| CT.lines .| C.concatMap decode parseEmail = C.linesUnboundedAscii .| C.concatMap decode
where
err e = liftIO (print @CT.TextException e) >> yield ""

View File

@ -1,29 +1,25 @@
{-# LANGUAGE OverloadedStrings #-}
module Data.Email.Header where module Data.Email.Header where
import Data.Text
(Text)
import qualified Data.Text as T
import qualified Data.Foldable as F import qualified Data.Foldable as F
import Data.Attoparsec.Text import Data.Attoparsec.ByteString.Char8
import Data.Vector import Data.Vector
(Vector) (Vector)
import qualified Data.Vector as V import qualified Data.Vector as V
import Data.Char
(isSpace)
import Control.Applicative import Control.Applicative
((<|>)) ((<|>))
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BC
data Header data Header
= From !Text = From !ByteString
| To !(Vector Text) | To !(Vector ByteString)
deriving (Show, Eq) deriving (Show, Eq)
decode :: Text -> Either String Header decode :: ByteString -> Either String Header
decode = parseOnly parseHeader decode = parseOnly parseHeader
where where
parseHeader :: Parser Header parseHeader :: Parser Header
@ -33,23 +29,23 @@ decode = parseOnly parseHeader
parseTo :: Parser Header parseTo :: Parser Header
parseTo = To <$> (string "To:" *> emptySpace *> emails) parseTo = To <$> (string "To:" *> emptySpace *> emails)
emptySpace = many' space emptySpace = many' space
emails :: Parser (Vector Text) emails :: Parser (Vector ByteString)
emails = V.fromList <$> (bracketEmail <|> email) `sepBy` char ',' emails = V.fromList <$> (bracketEmail <|> email) `sepBy` char ','
bracketEmail :: Parser Text bracketEmail :: Parser ByteString
bracketEmail = do bracketEmail = do
_ <- manyTill anyChar (char '<') _ <- manyTill anyChar (char '<')
email email
email :: Parser Text email :: Parser ByteString
email = do email = do
_ <- many' space _ <- many' space
name <- T.pack <$> many' (satisfy (\c -> not (isSpace c) && c /= '@')) name <- BC.pack <$> many' (satisfy (\c -> not (isSpace c) && c /= '@'))
_ <- char '@' _ <- char '@'
rest <- T.pack <$> many' (satisfy (\c -> not (isSpace c) && c /= ',' && c /= '>')) rest <- BC.pack <$> many' (satisfy (\c -> not (isSpace c) && c /= ',' && c /= '>'))
_ <- many' (notChar ',') _ <- many' (notChar ',')
pure (name <> "@" <> rest) pure (name <> "@" <> rest)
encode :: Header -> Text encode :: Header -> ByteString
encode = \case encode = \case
From addr -> "From: " <> addr From addr -> "From: " <> addr
To addrs -> "To: " <> T.intercalate ", " (F.toList addrs) To addrs -> "To: " <> BC.intercalate ", " (F.toList addrs)