Compare commits
No commits in common. "537c0df198c9e5ff5604b7a23de006503561d83e" and "a300c88cfbb7c261fb24730311ea8979b5638e86" have entirely different histories.
537c0df198
...
a300c88cfb
@ -1,18 +1,23 @@
|
||||
module Control.Addressbook.Streaming where
|
||||
|
||||
import qualified Data.Text as T
|
||||
|
||||
import Conduit
|
||||
import qualified Data.Conduit.Binary as CB
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
import qualified Data.Conduit.List as CL
|
||||
import qualified Data.Conduit.Text as CT
|
||||
|
||||
import Data.Email
|
||||
import Data.Email.Header
|
||||
(Header(..))
|
||||
|
||||
import System.IO
|
||||
(stdin)
|
||||
|
||||
import qualified Data.Foldable as F
|
||||
|
||||
import qualified Data.Map.Strict as Map
|
||||
|
||||
import Data.Maybe
|
||||
(fromMaybe)
|
||||
@ -27,8 +32,10 @@ import qualified Data.ByteString.Lazy as LBS
|
||||
import Data.Char (ord)
|
||||
import qualified Data.ByteString.Lazy.Char8 as LBC
|
||||
import System.IO.Unsafe (unsafeInterleaveIO)
|
||||
import Control.Parallel.Strategies (rseq, parMap)
|
||||
import Control.Parallel.Strategies (using, parList, rseq, parBuffer, rdeepseq, parMap)
|
||||
import qualified Data.List as L
|
||||
import Control.Concurrent (getNumCapabilities)
|
||||
import Debug.Trace (traceShow)
|
||||
|
||||
combine :: (MonadUnliftIO m, MonadResource m, MonadThrow m, MonadIO m) => ConduitM FilePath Header m ()
|
||||
combine = await >>= \case
|
||||
@ -44,23 +51,35 @@ run :: IO ()
|
||||
run = do
|
||||
datDir <- fromMaybe "./" <$> lookupEnv "HOME"
|
||||
xs <- LBS.getContents >>= stream
|
||||
let set = F.fold (parMap rseq F.fold (chunks 20 xs))
|
||||
let x = F.fold (parMap rseq F.fold (chunks 200 xs))
|
||||
runResourceT $
|
||||
runConduit $
|
||||
CL.sourceList (Set.elems set)
|
||||
.| C.map (<> "\n")
|
||||
.| CB.sinkFileCautious (datDir </> ".addressbook.dat")
|
||||
runConduit (CL.sourceList (Set.elems x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir </> ".addressbook.dat"))
|
||||
where
|
||||
separate = \case
|
||||
From x -> [x]
|
||||
To xs -> F.toList xs
|
||||
-- A set of (locally) unique addresses. Composes with parMap
|
||||
stream :: LBS.ByteString -> IO [Set ByteString]
|
||||
stream =
|
||||
traverse (unsafeInterleaveIO . parse . LBC.unpack)
|
||||
. filter (not . LBS.null)
|
||||
. LBS.split (fromIntegral $ ord '\n')
|
||||
stream :: LBS.ByteString -> IO ([Set ByteString])
|
||||
stream = traverse (unsafeInterleaveIO . parse . LBC.unpack) . filter (not . LBS.null) . LBS.split (fromIntegral $ ord '\n')
|
||||
parse path =
|
||||
runResourceT $
|
||||
runConduit $
|
||||
CB.sourceFile path .| parseEmail .| C.concatMap separate .| C.foldMap Set.singleton
|
||||
|
||||
run_ :: IO ()
|
||||
run_ = do
|
||||
datDir <- fromMaybe "./" <$> lookupEnv "HOME"
|
||||
runResourceT $ do
|
||||
x <- runConduit stream
|
||||
runConduit (CL.sourceList (Set.elems x) .| C.map (<> "\n") .| CB.sinkFileCautious (datDir </> ".addressbook.dat"))
|
||||
where
|
||||
separate = \case
|
||||
From x -> [x]
|
||||
To xs -> F.toList xs
|
||||
stream =
|
||||
CB.sourceHandle stdin
|
||||
.| CT.decode CT.utf8
|
||||
.| CT.lines
|
||||
.| C.map T.unpack
|
||||
.| combine
|
||||
.| C.concatMap separate
|
||||
.| C.foldMap (Set.singleton)
|
||||
|
@ -10,12 +10,11 @@ import qualified Hedgehog.Corpus as Corpus
|
||||
import qualified Hedgehog.Gen as Gen
|
||||
import qualified Hedgehog.Range as Range
|
||||
|
||||
import Data.ByteString
|
||||
import qualified Data.ByteString as T
|
||||
import Data.Text
|
||||
import qualified Data.Text as T
|
||||
import qualified Data.Vector as V
|
||||
|
||||
import Data.Email.Header
|
||||
import qualified Data.ByteString.Char8 as BC
|
||||
|
||||
genHeader :: Gen Header
|
||||
genHeader = Gen.choice
|
||||
@ -23,17 +22,17 @@ genHeader = Gen.choice
|
||||
, To . V.fromList <$> Gen.list (Range.linear 0 10) genEmail
|
||||
]
|
||||
|
||||
genEmail :: Gen ByteString
|
||||
genEmail :: Gen Text
|
||||
genEmail = do
|
||||
name <- Gen.element Corpus.simpsons
|
||||
domain <- Gen.element Corpus.cooking
|
||||
tld <- Gen.element ["com","fi","org"]
|
||||
pure $ name <> "@" <> domain <> "." <> tld
|
||||
|
||||
wrapped :: Char -> ByteString -> Char -> ByteString
|
||||
wrapped l x r = BC.singleton l <> x <> BC.singleton r
|
||||
wrapped :: Char -> Text -> Char -> Text
|
||||
wrapped l x r = T.singleton l <> x <> T.singleton r
|
||||
|
||||
genComment :: Gen ByteString
|
||||
genComment :: Gen Text
|
||||
genComment = do
|
||||
x <- Gen.element Corpus.simpsons
|
||||
Gen.element [x, wrapped '"' x '"']
|
||||
|
Loading…
Reference in New Issue
Block a user