74 lines
2.1 KiB
Haskell
74 lines
2.1 KiB
Haskell
module Control.Addressbook.Streaming where
|
|
|
|
|
|
import Conduit
|
|
import qualified Data.Conduit.Binary as CB
|
|
import qualified Data.Conduit.Combinators as C
|
|
import qualified Data.Conduit.List as CL
|
|
|
|
import Data.Email
|
|
import Data.Email.Header
|
|
(Header(..))
|
|
|
|
|
|
import qualified Data.Foldable as F
|
|
|
|
|
|
import Data.Maybe
|
|
(fromMaybe)
|
|
import System.Environment
|
|
(lookupEnv)
|
|
import System.FilePath
|
|
((</>))
|
|
import Data.Set (Set)
|
|
import Data.ByteString (ByteString)
|
|
import qualified Data.Set as Set
|
|
import qualified Data.ByteString.Lazy as LBS
|
|
import Data.Char (ord)
|
|
import qualified Data.ByteString.Lazy.Char8 as LBC
|
|
import System.IO.Unsafe (unsafeInterleaveIO)
|
|
import Control.Parallel.Strategies (rseq, parMap)
|
|
import qualified Data.List as L
|
|
import Control.Monad (unless)
|
|
import System.Posix (touchFile)
|
|
|
|
-- | Turn a stream of file paths into a stream of parsed headers.
--
-- Every path received from upstream is opened with 'CB.sourceFile' and its
-- bytes are piped through 'parseEmail'; whatever that produces is passed
-- downstream. The conduit finishes once upstream has no more paths.
combine :: (MonadUnliftIO m, MonadResource m, MonadThrow m, MonadIO m) => ConduitM FilePath Header m ()
combine = awaitForever $ \path -> CB.sourceFile path .| parseEmail
|
|
|
|
-- | Split a list into consecutive pieces of at most @n@ elements.
--
-- Every piece except possibly the last has exactly @n@ elements, and
-- concatenating the pieces gives back the input. An empty input yields no
-- pieces.
--
-- A chunk size below 1 is treated as 1: 'splitAt' with a non-positive count
-- consumes nothing, so without the clamp the unfold would emit empty chunks
-- forever on any non-empty input.
--
-- >>> chunks 2 [1, 2, 3, 4, 5]
-- [[1,2],[3,4],[5]]
chunks :: Int -> [a] -> [[a]]
chunks n = L.unfoldr step
  where
    -- Guarantees that every step removes at least one element.
    size = max 1 n

    step [] = Nothing
    step xs = Just (splitAt size xs)
|
|
|
|
-- | Merge the addresses found in a batch of e-mail files into the address book.
--
-- Reads newline-separated file paths from standard input, parses each file
-- with 'parseEmail', collects the values carried by its @From@ and @To@
-- headers, and unions them with the entries already stored in
-- @$HOME/.addressbook.dat@ (or @./.addressbook.dat@ when @HOME@ is unset).
-- The file is rewritten, one entry per line in ascending 'Set' order, only
-- when the union differs from what was read.
run :: IO ()
run = do
  datDir <- fromMaybe "./" <$> lookupEnv "HOME"
  let datFile = datDir </> ".addressbook.dat"
  -- 'touchFile' only updates timestamps and fails on a path that does not
  -- exist, so make sure the file is there first: appending nothing creates a
  -- missing file and leaves the contents of an existing one alone.
  appendFile datFile ""
  touchFile datFile
  -- Blank lines are dropped here as well; otherwise the trailing newline of
  -- the stored file would come back as an empty entry and be written out
  -- again as a blank line.
  original <- Set.fromList . map LBS.toStrict . nonEmptyLines <$> LBS.readFile datFile
  xs <- LBS.getContents >>= stream
  -- Fold the per-file sets in batches of 200, one spark per batch, then
  -- combine the batch results with what was already on disk.
  let set = original `Set.union` F.fold (parMap rseq F.fold (chunks 200 xs))
  unless (original == set) $
    runResourceT $
      runConduit $
        CL.sourceList (Set.elems set)
          .| C.map (<> "\n")
          .| CB.sinkFileCautious datFile
  where
    -- The values contributed by a single header.
    separate = \case
      From x -> [x]
      To xs -> F.toList xs

    lbsLines = LBS.split (fromIntegral $ ord '\n')

    nonEmptyLines = filter (not . LBS.null) . lbsLines

    -- One set per input path. 'unsafeInterleaveIO' postpones reading and
    -- parsing a file until its set is demanded, so the actual work happens
    -- when the folds above force the list elements (presumably inside the
    -- 'parMap' sparks -- that is what makes the parsing parallel).
    stream :: LBS.ByteString -> IO [Set ByteString]
    stream = traverse (unsafeInterleaveIO . parse . LBC.unpack) . nonEmptyLines

    -- A set of (locally) unique addresses for one file. Composes with parMap.
    parse path =
      runResourceT $
        runConduit $
          CB.sourceFile path
            .| parseEmail
            .| C.concatMap separate
            .| C.foldMap Set.singleton
|