diff --git a/src/Control/Addressbook/Streaming.hs b/src/Control/Addressbook/Streaming.hs index efdedfe..5c22554 100644 --- a/src/Control/Addressbook/Streaming.hs +++ b/src/Control/Addressbook/Streaming.hs @@ -1,23 +1,18 @@ module Control.Addressbook.Streaming where -import qualified Data.Text as T import Conduit import qualified Data.Conduit.Binary as CB import qualified Data.Conduit.Combinators as C import qualified Data.Conduit.List as CL -import qualified Data.Conduit.Text as CT import Data.Email import Data.Email.Header (Header(..)) -import System.IO - (stdin) import qualified Data.Foldable as F -import qualified Data.Map.Strict as Map import Data.Maybe (fromMaybe) @@ -32,10 +27,8 @@ import qualified Data.ByteString.Lazy as LBS import Data.Char (ord) import qualified Data.ByteString.Lazy.Char8 as LBC import System.IO.Unsafe (unsafeInterleaveIO) -import Control.Parallel.Strategies (using, parList, rseq, parBuffer, rdeepseq, parMap) +import Control.Parallel.Strategies (rseq, parMap) import qualified Data.List as L -import Control.Concurrent (getNumCapabilities) -import Debug.Trace (traceShow) combine :: (MonadUnliftIO m, MonadResource m, MonadThrow m, MonadIO m) => ConduitM FilePath Header m () combine = await >>= \case @@ -51,35 +44,23 @@ run :: IO () run = do datDir <- fromMaybe "./" <$> lookupEnv "HOME" xs <- LBS.getContents >>= stream - let x = F.fold (parMap rseq F.fold (chunks 200 xs)) + let set = F.fold (parMap rseq F.fold (chunks 20 xs)) runResourceT $ - runConduit (CL.sourceList (Set.elems x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir ".addressbook.dat")) + runConduit $ + CL.sourceList (Set.elems set) + .| C.map (<> "\n") + .| CB.sinkFileCautious (datDir ".addressbook.dat") where separate = \case From x -> [x] To xs -> F.toList xs - stream :: LBS.ByteString -> IO ([Set ByteString]) - stream = traverse (unsafeInterleaveIO . parse . LBC.unpack) . filter (not . LBS.null) . LBS.split (fromIntegral $ ord '\n') + -- A set of (locally) unique addresses. Composes with parMap + stream :: LBS.ByteString -> IO [Set ByteString] + stream = + traverse (unsafeInterleaveIO . parse . LBC.unpack) + . filter (not . LBS.null) + . LBS.split (fromIntegral $ ord '\n') parse path = runResourceT $ runConduit $ CB.sourceFile path .| parseEmail .| C.concatMap separate .| C.foldMap Set.singleton - -run_ :: IO () -run_ = do - datDir <- fromMaybe "./" <$> lookupEnv "HOME" - runResourceT $ do - x <- runConduit stream - runConduit (CL.sourceList (Set.elems x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir ".addressbook.dat")) - where - separate = \case - From x -> [x] - To xs -> F.toList xs - stream = - CB.sourceHandle stdin - .| CT.decode CT.utf8 - .| CT.lines - .| C.map T.unpack - .| combine - .| C.concatMap separate - .| C.foldMap (Set.singleton)