diff --git a/config.dhall b/config.dhall index 019c3f2..495d6db 100644 --- a/config.dhall +++ b/config.dhall @@ -12,5 +12,6 @@ in { amqp = config.AMQP::{ , config.Fetcher::{ subreddit = "scala" } , config.Fetcher::{ subreddit = "pics", entries = 150 } ] + , sqlite = "reddit.sqlite" } : config.Type diff --git a/default.nix b/default.nix index 92c9481..ec705ee 100644 --- a/default.nix +++ b/default.nix @@ -1,6 +1,6 @@ -{ mkDerivation, acid-state, aeson, amqp, base, bytestring, dhall +{ mkDerivation, aeson, amqp, base, bytestring, containers, dhall , hedgehog, hspec, hspec-hedgehog, lens, lens-aeson, lib, mtl -, pipes, safecopy, text, wreq +, pipes, sqlite-simple, text, wreq }: mkDerivation { pname = "reddit-pub"; @@ -9,11 +9,13 @@ mkDerivation { isLibrary = true; isExecutable = true; libraryHaskellDepends = [ - acid-state aeson amqp base bytestring dhall lens lens-aeson mtl - pipes safecopy text wreq + aeson amqp base bytestring containers dhall lens lens-aeson mtl + pipes sqlite-simple text wreq ]; executableHaskellDepends = [ base ]; - testHaskellDepends = [ base hedgehog hspec hspec-hedgehog ]; + testHaskellDepends = [ + base bytestring containers hedgehog hspec hspec-hedgehog mtl + ]; license = "unknown"; hydraPlatforms = lib.platforms.none; } diff --git a/dhall/Type.dhall b/dhall/Type.dhall index e30616f..a3560d4 100644 --- a/dhall/Type.dhall +++ b/dhall/Type.dhall @@ -1,3 +1,4 @@ { amqp : ./AMQP/Type.dhall , fetchers : List ./Fetcher/Type.dhall +, sqlite : Text } diff --git a/reddit-pub.cabal b/reddit-pub.cabal index 8f46921..322849d 100644 --- a/reddit-pub.cabal +++ b/reddit-pub.cabal @@ -30,6 +30,7 @@ library Data.SubReddit Publish Data.Trie + Membership -- Modules included in this library but not exported. 
-- other-modules: @@ -47,9 +48,8 @@ library , dhall , wreq , pipes - , safecopy - , acid-state , containers + , sqlite-simple hs-source-dirs: src default-language: Haskell2010 diff --git a/src/Data/Config.hs b/src/Data/Config.hs index ea8e00c..2bd20a3 100644 --- a/src/Data/Config.hs +++ b/src/Data/Config.hs @@ -54,6 +54,7 @@ data Qualifier = Top | Controversial data Config = Config { configAmqp :: AMQP , configFetchers :: [Fetcher] + , configSqlite :: FilePath } deriving stock (Generic, Show) deriving (FromDhall, ToDhall) @@ -65,5 +66,8 @@ amqp = lens configAmqp (\ con am -> con{configAmqp=am}) fetchers :: Lens' Config [Fetcher] fetchers = lens configFetchers (\ con fes -> con{configFetchers=fes}) +sqlite :: Lens' Config FilePath +sqlite = lens configSqlite (\ con s -> con{configSqlite=s}) + readConfig :: FilePath -> IO Config readConfig = inputFile auto diff --git a/src/Data/Trie.hs b/src/Data/Trie.hs index a2b4d14..3d350ae 100644 --- a/src/Data/Trie.hs +++ b/src/Data/Trie.hs @@ -2,6 +2,7 @@ {-# LANGUAGE DeriveFunctor #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeSynonymInstances #-} -- I know theres the bytestring-trie package, but it's been kind of -- unmaintained lately, I don't have the time to start pinning and stuff. 
module Data.Trie where @@ -14,10 +15,15 @@ import qualified Data.ByteString as B import Data.Word (Word8) import Data.Maybe (isJust) +data Pair a b = Pair !a !b -- strict key/value pair stored at a trie node + +instance Semigroup b => Semigroup (Pair a b) where + (Pair a b) <> (Pair _ b') = Pair a (b <> b') -- pairs meeting at one trie node carry the same key: keep the left key, merge only the values + -- XXX: This is as lazy as it gets, strictify where it's needed data TrieF a f = Empty - | ElementNode (Maybe (ByteString, a)) (Map Word8 f) + | ElementNode (Maybe (Pair ByteString a)) (Map Word8 f) deriving Functor newtype Fix f = Fix { getFix :: f (Fix f) } @@ -38,7 +44,7 @@ fromList = F.foldl' (\acc (k,v) -> insert k v acc) empty toList :: Trie a -> [(ByteString, a)] toList = cata $ \case Empty -> [] - ElementNode (Just a) children -> a : concat (M.elems children) + ElementNode (Just (Pair a b)) children -> (a,b) : concat (M.elems children) ElementNode Nothing children -> concat (M.elems children) insert :: Semigroup a => ByteString -> a -> Trie a -> Trie a @@ -47,13 +53,15 @@ insert k v = union (singleton k v) singleton :: ByteString -> a -> Trie a singleton bs a = ana go (B.uncons bs) where - go Nothing = ElementNode (Just (bs, a)) M.empty + go Nothing = let x = Pair bs a in x `seq` ElementNode (Just x) M.empty go (Just (w8,children)) = ElementNode Nothing (M.singleton w8 (B.uncons children)) union :: Semigroup a => Trie a -> Trie a -> Trie a union a (Fix Empty) = a union (Fix Empty) b = b -union (Fix (ElementNode ma' x1)) (Fix (ElementNode ma x0)) = Fix (ElementNode (ma' <> ma) (M.unionWith union x1 x0)) +union (Fix (ElementNode ma' x1)) (Fix (ElementNode ma x0)) = + let x = ma' <> ma + in x `seq` Fix (ElementNode x (M.unionWith union x1 x0)) empty :: Trie a empty = Fix Empty @@ -63,7 +71,7 @@ lookup bs t = cata go t $ B.uncons bs where go :: TrieF a (Maybe (Word8, ByteString) -> Maybe a) -> Maybe (Word8, ByteString) -> Maybe a go Empty _ = Nothing - go (ElementNode ma' _) Nothing = fmap snd ma' + go (ElementNode ma' _) Nothing = fmap (\(Pair _ b) -> b) ma' go (ElementNode _ 
m) (Just (w8,xs)) = M.lookup w8 m >>= \next -> next (B.uncons xs) member :: ByteString -> Trie a -> Bool diff --git a/src/Membership.hs b/src/Membership.hs new file mode 100644 index 0000000..7abd948 --- /dev/null +++ b/src/Membership.hs @@ -0,0 +1,22 @@ +{-# LANGUAGE QuasiQuotes #-} +{-# LANGUAGE TypeApplications #-} +module Membership where + +import Database.SQLite.Simple (Connection, Only (..)) +import Network.Reddit (RedditId) +import qualified Database.SQLite.Simple as SQL +import Database.SQLite.Simple.QQ (sql) +import qualified Data.Foldable as F +import Data.Monoid (Any(..)) +import Data.Text (Text) + +-- | Record the id in the @membership@ table; inserting an id that is already present is a no-op. +recordSeen :: Connection -> RedditId -> IO () +recordSeen conn rid = SQL.execute conn [sql|insert into membership (reddit_id) values (?) on conflict do nothing|] (Only rid) + +-- | Whether the id is already present in the @membership@ table. +isSeen :: Connection -> RedditId -> IO Bool +isSeen conn rid = + unwrap <$> SQL.query conn [sql|select reddit_id from membership where reddit_id = ?|] (Only rid) + where + unwrap = getAny . F.foldMap' (Any . const @_ @Text True . fromOnly) diff --git a/src/MyLib.hs b/src/MyLib.hs index 168348e..0b08348 100644 --- a/src/MyLib.hs +++ b/src/MyLib.hs @@ -12,10 +12,14 @@ import Data.Functor.Contravariant ((>$<)) import Data.Text (Text) import qualified Data.Text.Strict.Lens as T import Network.AMQP -import Network.Reddit (publishEntries) +import Network.Reddit (publishEntries, RedditId, messageIdentifier) import Network.Wreq.Session (newSession) import Publish (Publish(..)) +import qualified Database.SQLite.Simple as SQL +import qualified Membership +sqlRecorder :: SQL.Connection -> Publish IO RedditId +sqlRecorder conn = Publish $ Membership.recordSeen conn amqpPublisher :: Channel -> Text -> Publish IO LB.ByteString amqpPublisher channel exchange = Publish $ \lbs -> @@ -30,17 +34,20 @@ amqpPublisher channel exchange = Publish $ \lbs -> defaultMain :: IO () defaultMain = do conf <- readConfig "./config.dhall" - let connect = openConnection + let rabbitConnect = openConnection (conf ^. 
amqp . host . T.unpacked) (conf ^. amqp . vhost) (conf ^. amqp . username) (conf ^. amqp . password) - bracket connect closeConnection $ \conn -> do - putStrLn "Hello" - chan <- openChannel conn - declareExchange chan newExchange { exchangeName = "reddit_posts", exchangeType = "fanout" } - sess <- newSession - let encoder = amqpPublisher chan "reddit_posts" - for_ (conf ^. fetchers) $ \fetcher -> do - print fetcher - publishEntries (A.encode >$< encoder) sess fetcher + bracket rabbitConnect closeConnection $ \conn -> do + SQL.withConnection (conf ^. sqlite) $ \sqlConn -> do + SQL.execute_ sqlConn "create table if not exists membership (reddit_id primary key)" + chan <- openChannel conn + declareExchange chan newExchange { exchangeName = "reddit_posts", exchangeType = "fanout" } + sess <- newSession + let encoder = amqpPublisher chan "reddit_posts" + recorder = sqlRecorder sqlConn + publisher = (A.encode >$< encoder) <> (messageIdentifier >$< recorder) + for_ (conf ^. fetchers) $ \fetcher -> do + print fetcher + publishEntries publisher sess fetcher diff --git a/src/Network/Reddit.hs b/src/Network/Reddit.hs index a744122..772d690 100644 --- a/src/Network/Reddit.hs +++ b/src/Network/Reddit.hs @@ -22,6 +22,8 @@ import qualified Pipes.Prelude as P import Publish import Data.Maybe (maybeToList) import Control.Monad.Trans (liftIO) +import Database.SQLite.Simple.ToField (ToField) +import Database.SQLite.Simple.FromField (FromField) data MessageType = Create | Update deriving stock (Show, Eq, Generic) @@ -29,7 +31,7 @@ data MessageType = Create | Update newtype RedditId = RedditId Text deriving stock (Show, Eq) - deriving (ToJSON, FromJSON) via Text + deriving (ToJSON, FromJSON, ToField, FromField) via Text data Message = Message { messageType :: MessageType diff --git a/src/Publish.hs b/src/Publish.hs index 3b3e357..d5383cc 100644 --- a/src/Publish.hs +++ b/src/Publish.hs @@ -2,6 +2,8 @@ module Publish where import Data.Functor.Contravariant +import Data.Monoid (Ap(..)) 
newtype Publish m a = Publish { publish :: a -> m () } deriving Contravariant via (Op (m ())) + deriving (Monoid, Semigroup) via (Op (Ap m ()) a) -- (<>) hands the same input to both publishers and sequences their effects; mempty publishes nothing