{-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DerivingVia #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TypeOperators #-} module MyLib (defaultMain) where import Control.Exception (bracket) import Control.Lens import Control.Monad (void) import Data.Aeson (FromJSON, ToJSON, Value) import qualified Data.Aeson as A import Data.Config import Data.Deriving.Aeson import Data.Foldable (for_) import Data.Functor.Contravariant ((>$<)) import Data.Text (Text) import qualified Data.Text.Strict.Lens as T import qualified Database.SQLite.Simple as SQL import GHC.Generics (Generic) import qualified Membership import Network.AMQP ( Channel , DeliveryMode(Persistent) , exchangeName , exchangeType , msgBody , msgDeliveryMode , newMsg , publishMsg, openConnection, closeConnection, openChannel, newExchange, declareExchange ) import Network.Reddit (RedditId (RedditId), publishEntries) import Network.Wreq.Session (newSession) import Publish (Publish(..)) import Data.Aeson.Lens (key, _String) import Data.Bool (bool) data MessageType = Create | Update deriving stock (Show, Eq, Generic) deriving anyclass (ToJSON, FromJSON) data Message = Message { messageType :: MessageType , messageIdentifier :: RedditId , messageContent :: Value } deriving stock (Show, Eq, Generic) deriving (ToJSON, FromJSON) via AesonCodec (Field (CamelCase <<< DropPrefix "message")) Message toMessage :: Value -> Message toMessage entry = Message Create (RedditId (entry ^. key "id" . _String)) entry sqlRecorder :: SQL.Connection -> Publish IO RedditId sqlRecorder conn = Publish $ Membership.recordSeen conn amqpPublisher :: SQL.Connection -> Channel -> Text -> Publish IO Message amqpPublisher sqlConn channel exchange = Publish $ \msg -> do seen <- Membership.isSeen sqlConn (messageIdentifier msg) let msg' = msg{messageType = bool Create Update seen} void $ publishMsg channel exchange routingKey (message (A.encode msg')) where routingKey = "doesn't matter on fanout" message lbs = newMsg { msgBody = lbs , msgDeliveryMode = Just Persistent } defaultMain :: FilePath -> IO () defaultMain path = do conf <- readConfig path let rabbitConnect = openConnection (conf ^. amqp . host . T.unpacked) (conf ^. amqp . vhost) (conf ^. amqp . username) (conf ^. amqp . password) bracket rabbitConnect closeConnection $ \conn -> do SQL.withConnection (conf ^. sqlite) $ \sqlConn -> do SQL.execute_ sqlConn "create table if not exists membership (reddit_id primary key)" chan <- openChannel conn declareExchange chan newExchange { exchangeName = "reddit_posts", exchangeType = "fanout" } sess <- newSession let encoder = amqpPublisher sqlConn chan "reddit_posts" recorder = sqlRecorder sqlConn publisher = encoder <> (messageIdentifier >$< recorder) for_ (conf ^. fetchers) $ \fetcher -> do print fetcher publishEntries (toMessage >$< publisher) sess fetcher