From 560ea238612a360f3d02ab9c668e4f03e064237b Mon Sep 17 00:00:00 2001 From: Mats Rauhala Date: Fri, 29 Oct 2021 20:47:04 +0300 Subject: [PATCH 1/4] parallel --- addressbook.cabal | 1 + default.nix | 7 ++++--- src/Control/Addressbook/Streaming.hs | 23 +++++++++++++++++++++++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/addressbook.cabal b/addressbook.cabal index 6bf76b3..571b0ca 100644 --- a/addressbook.cabal +++ b/addressbook.cabal @@ -41,6 +41,7 @@ library , vector , containers , filepath + , parallel hs-source-dirs: src default-language: Haskell2010 ghc-options: -Wall diff --git a/default.nix b/default.nix index d687fff..1431253 100644 --- a/default.nix +++ b/default.nix @@ -1,7 +1,7 @@ { mkDerivation, attoparsec, base, bytestring, conduit , conduit-extra, containers, criterion, filepath, hedgehog , hedgehog-corpus, HUnit, lens, lib, mtl, optparse-applicative -, tasty, tasty-hedgehog, tasty-hunit, text, vector +, parallel, tasty, tasty-hedgehog, tasty-hunit, text, vector }: mkDerivation { pname = "addressbook"; @@ -11,10 +11,11 @@ mkDerivation { isExecutable = true; libraryHaskellDepends = [ attoparsec base bytestring conduit conduit-extra containers - filepath lens mtl text vector + filepath lens mtl parallel text vector ]; executableHaskellDepends = [ - base criterion hedgehog-corpus optparse-applicative text + base bytestring containers criterion hedgehog-corpus + optparse-applicative text ]; testHaskellDepends = [ base bytestring conduit conduit-extra containers hedgehog diff --git a/src/Control/Addressbook/Streaming.hs b/src/Control/Addressbook/Streaming.hs index a8989fd..cd04bfd 100644 --- a/src/Control/Addressbook/Streaming.hs +++ b/src/Control/Addressbook/Streaming.hs @@ -25,6 +25,12 @@ import System.Environment (lookupEnv) import System.FilePath (()) +import Data.Set (Set) +import Data.ByteString (ByteString) +import qualified Data.Set as Set +import qualified Data.ByteString.Lazy as LBS +import Data.Char (ord) +import qualified Data.ByteString.Lazy.Char8 as LBC combine :: (MonadUnliftIO m, MonadResource m, MonadThrow m, MonadIO m) => ConduitM FilePath Header m () combine = await >>= \case @@ -33,6 +39,23 @@ combine = await >>= \case run :: IO () run = do + datDir <- fromMaybe "./" <$> lookupEnv "HOME" + x <- LBS.getContents >>= stream + runResourceT $ + runConduit (CL.sourceList (Set.elems x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir ".addressbook.dat")) + where + separate = \case + From x -> [x] + To xs -> F.toList xs + stream :: LBS.ByteString -> IO (Set ByteString) + stream xs = F.fold <$> traverse (parse . LBC.unpack) (LBS.split (fromIntegral $ ord '\n') xs) + parse path = + runResourceT $ + runConduit $ + CB.sourceFile path .| parseEmail .| C.concatMap separate .| CT.encode CT.utf8 .| C.foldMap Set.singleton + +run_ :: IO () +run_ = do datDir <- fromMaybe "./" <$> lookupEnv "HOME" runResourceT $ do x <- runConduit stream -- 2.47.0 From a300c88cfbb7c261fb24730311ea8979b5638e86 Mon Sep 17 00:00:00 2001 From: Mats Rauhala Date: Fri, 29 Oct 2021 22:09:43 +0300 Subject: [PATCH 2/4] Try parallel --- addressbook.cabal | 2 +- src/Control/Addressbook/Streaming.hs | 24 +++++++++++++++------ src/Data/Email.hs | 5 +---- src/Data/Email/Header.hs | 32 ++++++++++++---------------- 4 files changed, 33 insertions(+), 30 deletions(-) diff --git a/addressbook.cabal b/addressbook.cabal index 571b0ca..ef0188e 100644 --- a/addressbook.cabal +++ b/addressbook.cabal @@ -56,7 +56,7 @@ executable addressbook , text hs-source-dirs: app default-language: Haskell2010 - ghc-options: -Wall -threaded + ghc-options: -Wall -threaded -eventlog test-suite addressbook-test import: deps diff --git a/src/Control/Addressbook/Streaming.hs b/src/Control/Addressbook/Streaming.hs index cd04bfd..efdedfe 100644 --- a/src/Control/Addressbook/Streaming.hs +++ b/src/Control/Addressbook/Streaming.hs @@ -31,35 +31,46 @@ import qualified Data.Set as Set import qualified Data.ByteString.Lazy as LBS import Data.Char (ord) import qualified Data.ByteString.Lazy.Char8 as LBC +import System.IO.Unsafe (unsafeInterleaveIO) +import Control.Parallel.Strategies (using, parList, rseq, parBuffer, rdeepseq, parMap) +import qualified Data.List as L +import Control.Concurrent (getNumCapabilities) +import Debug.Trace (traceShow) combine :: (MonadUnliftIO m, MonadResource m, MonadThrow m, MonadIO m) => ConduitM FilePath Header m () combine = await >>= \case Nothing -> pure () Just path -> (CB.sourceFile path .| parseEmail) >> combine +chunks :: Int -> [a] -> [[a]] +chunks n = L.unfoldr $ \case + [] -> Nothing + xs -> Just (splitAt n xs) + run :: IO () run = do datDir <- fromMaybe "./" <$> lookupEnv "HOME" - x <- LBS.getContents >>= stream + xs <- LBS.getContents >>= stream + let x = F.fold (parMap rseq F.fold (chunks 200 xs)) runResourceT $ runConduit (CL.sourceList (Set.elems x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir ".addressbook.dat")) where separate = \case From x -> [x] To xs -> F.toList xs - stream :: LBS.ByteString -> IO (Set ByteString) - stream xs = F.fold <$> traverse (parse . LBC.unpack) (LBS.split (fromIntegral $ ord '\n') xs) + stream :: LBS.ByteString -> IO ([Set ByteString]) + stream = traverse (unsafeInterleaveIO . parse . LBC.unpack) . filter (not . LBS.null) . LBS.split (fromIntegral $ ord '\n') parse path = runResourceT $ runConduit $ - CB.sourceFile path .| parseEmail .| C.concatMap separate .| CT.encode CT.utf8 .| C.foldMap Set.singleton + CB.sourceFile path .| parseEmail .| C.concatMap separate .| C.foldMap Set.singleton run_ :: IO () run_ = do datDir <- fromMaybe "./" <$> lookupEnv "HOME" runResourceT $ do x <- runConduit stream - runConduit (CL.sourceList (Map.keys x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir ".addressbook.dat")) + runConduit (CL.sourceList (Set.elems x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir ".addressbook.dat")) where separate = \case From x -> [x] @@ -71,5 +82,4 @@ run_ = do .| C.map T.unpack .| combine .| C.concatMap separate - .| CT.encode CT.utf8 - .| C.foldMap (`Map.singleton` ()) + .| C.foldMap (Set.singleton) diff --git a/src/Data/Email.hs b/src/Data/Email.hs index 72ee8a2..b34e50f 100644 --- a/src/Data/Email.hs +++ b/src/Data/Email.hs @@ -5,12 +5,9 @@ import Data.Email.Header import Conduit import qualified Data.Conduit.Combinators as C -import qualified Data.Conduit.Text as CT import Data.ByteString (ByteString) parseEmail :: (MonadUnliftIO m, MonadThrow m, Monad m) => ConduitM ByteString Header m () -parseEmail = catchC (CT.decode CT.utf8) err .| CT.lines .| C.concatMap decode - where - err e = liftIO (print @CT.TextException e) >> yield "" +parseEmail = C.linesUnboundedAscii .| C.concatMap decode diff --git a/src/Data/Email/Header.hs b/src/Data/Email/Header.hs index 2651757..6c3d918 100644 --- a/src/Data/Email/Header.hs +++ b/src/Data/Email/Header.hs @@ -1,29 +1,25 @@ +{-# LANGUAGE OverloadedStrings #-} module Data.Email.Header where -import Data.Text - (Text) -import qualified Data.Text as T - import qualified Data.Foldable as F -import Data.Attoparsec.Text +import Data.Attoparsec.ByteString.Char8 import Data.Vector (Vector) import qualified Data.Vector as V -import Data.Char - (isSpace) - import Control.Applicative ((<|>)) +import Data.ByteString (ByteString) +import qualified Data.ByteString.Char8 as BC data Header - = From !Text - | To !(Vector Text) + = From !ByteString + | To !(Vector ByteString) deriving (Show, Eq) -decode :: Text -> Either String Header +decode :: ByteString -> Either String Header decode = parseOnly parseHeader where parseHeader :: Parser Header @@ -33,23 +29,23 @@ decode = parseOnly parseHeader parseTo :: Parser Header parseTo = To <$> (string "To:" *> emptySpace *> emails) emptySpace = many' space - emails :: Parser (Vector Text) + emails :: Parser (Vector ByteString) emails = V.fromList <$> (bracketEmail <|> email) `sepBy` char ',' - bracketEmail :: Parser Text + bracketEmail :: Parser ByteString bracketEmail = do _ <- manyTill anyChar (char '<') email - email :: Parser Text + email :: Parser ByteString email = do _ <- many' space - name <- T.pack <$> many' (satisfy (\c -> not (isSpace c) && c /= '@')) + name <- BC.pack <$> many' (satisfy (\c -> not (isSpace c) && c /= '@')) _ <- char '@' - rest <- T.pack <$> many' (satisfy (\c -> not (isSpace c) && c /= ',' && c /= '>')) + rest <- BC.pack <$> many' (satisfy (\c -> not (isSpace c) && c /= ',' && c /= '>')) _ <- many' (notChar ',') pure (name <> "@" <> rest) -encode :: Header -> Text +encode :: Header -> ByteString encode = \case From addr -> "From: " <> addr - To addrs -> "To: " <> T.intercalate ", " (F.toList addrs) + To addrs -> "To: " <> BC.intercalate ", " (F.toList addrs) -- 2.47.0 From 75aa615263b6fe4321518222c79c8cf21c69209c Mon Sep 17 00:00:00 2001 From: Mats Rauhala Date: Fri, 29 Oct 2021 22:37:47 +0300 Subject: [PATCH 3/4] Clean up --- src/Control/Addressbook/Streaming.hs | 43 ++++++++-------------------- 1 file changed, 12 insertions(+), 31 deletions(-) diff --git a/src/Control/Addressbook/Streaming.hs b/src/Control/Addressbook/Streaming.hs index efdedfe..5c22554 100644 --- a/src/Control/Addressbook/Streaming.hs +++ b/src/Control/Addressbook/Streaming.hs @@ -1,23 +1,18 @@ module Control.Addressbook.Streaming where -import qualified Data.Text as T import Conduit import qualified Data.Conduit.Binary as CB import qualified Data.Conduit.Combinators as C import qualified Data.Conduit.List as CL -import qualified Data.Conduit.Text as CT import Data.Email import Data.Email.Header (Header(..)) -import System.IO - (stdin) import qualified Data.Foldable as F -import qualified Data.Map.Strict as Map import Data.Maybe (fromMaybe) @@ -32,10 +27,8 @@ import qualified Data.ByteString.Lazy as LBS import Data.Char (ord) import qualified Data.ByteString.Lazy.Char8 as LBC import System.IO.Unsafe (unsafeInterleaveIO) -import Control.Parallel.Strategies (using, parList, rseq, parBuffer, rdeepseq, parMap) +import Control.Parallel.Strategies (rseq, parMap) import qualified Data.List as L -import Control.Concurrent (getNumCapabilities) -import Debug.Trace (traceShow) combine :: (MonadUnliftIO m, MonadResource m, MonadThrow m, MonadIO m) => ConduitM FilePath Header m () combine = await >>= \case @@ -51,35 +44,23 @@ run :: IO () run = do datDir <- fromMaybe "./" <$> lookupEnv "HOME" xs <- LBS.getContents >>= stream - let x = F.fold (parMap rseq F.fold (chunks 200 xs)) + let set = F.fold (parMap rseq F.fold (chunks 20 xs)) runResourceT $ - runConduit (CL.sourceList (Set.elems x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir ".addressbook.dat")) + runConduit $ + CL.sourceList (Set.elems set) + .| C.map (<> "\n") + .| CB.sinkFileCautious (datDir ".addressbook.dat") where separate = \case From x -> [x] To xs -> F.toList xs - stream :: LBS.ByteString -> IO ([Set ByteString]) - stream = traverse (unsafeInterleaveIO . parse . LBC.unpack) . filter (not . LBS.null) . LBS.split (fromIntegral $ ord '\n') + -- A set of (locally) unique addresses. Composes with parMap + stream :: LBS.ByteString -> IO [Set ByteString] + stream = + traverse (unsafeInterleaveIO . parse . LBC.unpack) + . filter (not . LBS.null) + . LBS.split (fromIntegral $ ord '\n') parse path = runResourceT $ runConduit $ CB.sourceFile path .| parseEmail .| C.concatMap separate .| C.foldMap Set.singleton - -run_ :: IO () -run_ = do - datDir <- fromMaybe "./" <$> lookupEnv "HOME" - runResourceT $ do - x <- runConduit stream - runConduit (CL.sourceList (Set.elems x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir ".addressbook.dat")) - where - separate = \case - From x -> [x] - To xs -> F.toList xs - stream = - CB.sourceHandle stdin - .| CT.decode CT.utf8 - .| CT.lines - .| C.map T.unpack - .| combine - .| C.concatMap separate - .| C.foldMap (Set.singleton) -- 2.47.0 From 537c0df198c9e5ff5604b7a23de006503561d83e Mon Sep 17 00:00:00 2001 From: Mats Rauhala Date: Fri, 29 Oct 2021 22:39:27 +0300 Subject: [PATCH 4/4] Update tests --- test/Test/Data/Email/Header.hs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/test/Test/Data/Email/Header.hs b/test/Test/Data/Email/Header.hs index bc58f71..d6cc750 100644 --- a/test/Test/Data/Email/Header.hs +++ b/test/Test/Data/Email/Header.hs @@ -10,11 +10,12 @@ import qualified Hedgehog.Corpus as Corpus import qualified Hedgehog.Gen as Gen import qualified Hedgehog.Range as Range -import Data.Text -import qualified Data.Text as T +import Data.ByteString +import qualified Data.ByteString as T import qualified Data.Vector as V import Data.Email.Header +import qualified Data.ByteString.Char8 as BC genHeader :: Gen Header genHeader = Gen.choice @@ -22,17 +23,17 @@ genHeader = Gen.choice , To . V.fromList <$> Gen.list (Range.linear 0 10) genEmail ] -genEmail :: Gen Text +genEmail :: Gen ByteString genEmail = do name <- Gen.element Corpus.simpsons domain <- Gen.element Corpus.cooking tld <- Gen.element ["com","fi","org"] pure $ name <> "@" <> domain <> "." <> tld -wrapped :: Char -> Text -> Char -> Text -wrapped l x r = T.singleton l <> x <> T.singleton r +wrapped :: Char -> ByteString -> Char -> ByteString +wrapped l x r = BC.singleton l <> x <> BC.singleton r -genComment :: Gen Text +genComment :: Gen ByteString genComment = do x <- Gen.element Corpus.simpsons Gen.element [x, wrapped '"' x '"'] -- 2.47.0