Make the analysis parallel (#2)
Co-authored-by: Mats Rauhala <mats.rauhala@iki.fi> Reviewed-on: #2 Co-authored-by: Mats Rauhala <masse@rauhala.info> Co-committed-by: Mats Rauhala <masse@rauhala.info>
This commit is contained in:
@ -1,23 +1,18 @@
|
||||
module Control.Addressbook.Streaming where
|
||||
|
||||
import qualified Data.Text as T
|
||||
|
||||
import Conduit
|
||||
import qualified Data.Conduit.Binary as CB
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
import qualified Data.Conduit.List as CL
|
||||
import qualified Data.Conduit.Text as CT
|
||||
|
||||
import Data.Email
|
||||
import Data.Email.Header
|
||||
(Header(..))
|
||||
|
||||
import System.IO
|
||||
(stdin)
|
||||
|
||||
import qualified Data.Foldable as F
|
||||
|
||||
import qualified Data.Map.Strict as Map
|
||||
|
||||
import Data.Maybe
|
||||
(fromMaybe)
|
||||
@ -25,28 +20,47 @@ import System.Environment
|
||||
(lookupEnv)
|
||||
import System.FilePath
|
||||
((</>))
|
||||
import Data.Set (Set)
|
||||
import Data.ByteString (ByteString)
|
||||
import qualified Data.Set as Set
|
||||
import qualified Data.ByteString.Lazy as LBS
|
||||
import Data.Char (ord)
|
||||
import qualified Data.ByteString.Lazy.Char8 as LBC
|
||||
import System.IO.Unsafe (unsafeInterleaveIO)
|
||||
import Control.Parallel.Strategies (rseq, parMap)
|
||||
import qualified Data.List as L
|
||||
|
||||
combine :: (MonadUnliftIO m, MonadResource m, MonadThrow m, MonadIO m) => ConduitM FilePath Header m ()
|
||||
combine = await >>= \case
|
||||
Nothing -> pure ()
|
||||
Just path -> (CB.sourceFile path .| parseEmail) >> combine
|
||||
|
||||
chunks :: Int -> [a] -> [[a]]
|
||||
chunks n = L.unfoldr $ \case
|
||||
[] -> Nothing
|
||||
xs -> Just (splitAt n xs)
|
||||
|
||||
run :: IO ()
|
||||
run = do
|
||||
datDir <- fromMaybe "./" <$> lookupEnv "HOME"
|
||||
runResourceT $ do
|
||||
x <- runConduit stream
|
||||
runConduit (CL.sourceList (Map.keys x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir </> ".addressbook.dat"))
|
||||
xs <- LBS.getContents >>= stream
|
||||
let set = F.fold (parMap rseq F.fold (chunks 20 xs))
|
||||
runResourceT $
|
||||
runConduit $
|
||||
CL.sourceList (Set.elems set)
|
||||
.| C.map (<> "\n")
|
||||
.| CB.sinkFileCautious (datDir </> ".addressbook.dat")
|
||||
where
|
||||
separate = \case
|
||||
From x -> [x]
|
||||
To xs -> F.toList xs
|
||||
-- A set of (locally) unique addresses. Composes with parMap
|
||||
stream :: LBS.ByteString -> IO [Set ByteString]
|
||||
stream =
|
||||
CB.sourceHandle stdin
|
||||
.| CT.decode CT.utf8
|
||||
.| CT.lines
|
||||
.| C.map T.unpack
|
||||
.| combine
|
||||
.| C.concatMap separate
|
||||
.| CT.encode CT.utf8
|
||||
.| C.foldMap (`Map.singleton` ())
|
||||
traverse (unsafeInterleaveIO . parse . LBC.unpack)
|
||||
. filter (not . LBS.null)
|
||||
. LBS.split (fromIntegral $ ord '\n')
|
||||
parse path =
|
||||
runResourceT $
|
||||
runConduit $
|
||||
CB.sourceFile path .| parseEmail .| C.concatMap separate .| C.foldMap Set.singleton
|
||||
|
@ -5,12 +5,9 @@ import Data.Email.Header
|
||||
|
||||
import Conduit
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
import qualified Data.Conduit.Text as CT
|
||||
|
||||
import Data.ByteString
|
||||
(ByteString)
|
||||
|
||||
parseEmail :: (MonadUnliftIO m, MonadThrow m, Monad m) => ConduitM ByteString Header m ()
|
||||
parseEmail = catchC (CT.decode CT.utf8) err .| CT.lines .| C.concatMap decode
|
||||
where
|
||||
err e = liftIO (print @CT.TextException e) >> yield ""
|
||||
parseEmail = C.linesUnboundedAscii .| C.concatMap decode
|
||||
|
@ -1,29 +1,25 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
module Data.Email.Header where
|
||||
|
||||
import Data.Text
|
||||
(Text)
|
||||
import qualified Data.Text as T
|
||||
|
||||
import qualified Data.Foldable as F
|
||||
|
||||
import Data.Attoparsec.Text
|
||||
import Data.Attoparsec.ByteString.Char8
|
||||
|
||||
import Data.Vector
|
||||
(Vector)
|
||||
import qualified Data.Vector as V
|
||||
|
||||
import Data.Char
|
||||
(isSpace)
|
||||
|
||||
import Control.Applicative
|
||||
((<|>))
|
||||
import Data.ByteString (ByteString)
|
||||
import qualified Data.ByteString.Char8 as BC
|
||||
|
||||
data Header
|
||||
= From !Text
|
||||
| To !(Vector Text)
|
||||
= From !ByteString
|
||||
| To !(Vector ByteString)
|
||||
deriving (Show, Eq)
|
||||
|
||||
decode :: Text -> Either String Header
|
||||
decode :: ByteString -> Either String Header
|
||||
decode = parseOnly parseHeader
|
||||
where
|
||||
parseHeader :: Parser Header
|
||||
@ -33,23 +29,23 @@ decode = parseOnly parseHeader
|
||||
parseTo :: Parser Header
|
||||
parseTo = To <$> (string "To:" *> emptySpace *> emails)
|
||||
emptySpace = many' space
|
||||
emails :: Parser (Vector Text)
|
||||
emails :: Parser (Vector ByteString)
|
||||
emails = V.fromList <$> (bracketEmail <|> email) `sepBy` char ','
|
||||
bracketEmail :: Parser Text
|
||||
bracketEmail :: Parser ByteString
|
||||
bracketEmail = do
|
||||
_ <- manyTill anyChar (char '<')
|
||||
email
|
||||
email :: Parser Text
|
||||
email :: Parser ByteString
|
||||
email = do
|
||||
_ <- many' space
|
||||
name <- T.pack <$> many' (satisfy (\c -> not (isSpace c) && c /= '@'))
|
||||
name <- BC.pack <$> many' (satisfy (\c -> not (isSpace c) && c /= '@'))
|
||||
_ <- char '@'
|
||||
rest <- T.pack <$> many' (satisfy (\c -> not (isSpace c) && c /= ',' && c /= '>'))
|
||||
rest <- BC.pack <$> many' (satisfy (\c -> not (isSpace c) && c /= ',' && c /= '>'))
|
||||
_ <- many' (notChar ',')
|
||||
pure (name <> "@" <> rest)
|
||||
|
||||
|
||||
encode :: Header -> Text
|
||||
encode :: Header -> ByteString
|
||||
encode = \case
|
||||
From addr -> "From: " <> addr
|
||||
To addrs -> "To: " <> T.intercalate ", " (F.toList addrs)
|
||||
To addrs -> "To: " <> BC.intercalate ", " (F.toList addrs)
|
||||
|
Reference in New Issue
Block a user