reddit-pub/src/MyLib.hs

105 lines
3.4 KiB
Haskell

{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeOperators #-}
module MyLib (defaultMain) where
import Control.Concurrent (threadDelay)
import Control.Exception (bracket)
import Control.Lens
import Control.Monad (forever, void)
import Data.Aeson (FromJSON, ToJSON, Value)
import qualified Data.Aeson as A
import Data.Aeson.Lens (_String, key)
import Data.Bool (bool)
import Data.Config
import Data.Deriving.Aeson
import Data.Foldable (for_)
import Data.Functor.Contravariant ((>$<))
import Data.Text (Text)
import qualified Data.Text.IO as TI
import qualified Data.Text.Strict.Lens as T
import qualified Database.SQLite.Simple as SQL
import GHC.Generics (Generic)
import qualified Membership
import Network.AMQP
( Channel
, DeliveryMode(Persistent)
, closeConnection
, declareExchange
, exchangeName
, exchangeType
, msgBody
, msgDeliveryMode
, newExchange
, newMsg
, openChannel
, openConnection
, publishMsg
)
import Network.Reddit (RedditId(RedditId), publishEntries)
import Network.Wreq.Session (newSession)
import Publish (Publish(..))
data MessageType = Create | Update
deriving stock (Show, Eq, Generic)
deriving anyclass (ToJSON, FromJSON)
data Message = Message
{ messageType :: MessageType
, messageIdentifier :: RedditId
, messageContent :: Value
}
deriving stock (Show, Eq, Generic)
deriving (ToJSON, FromJSON)
via AesonCodec (Field (CamelCase <<< DropPrefix "message")) Message
toMessage :: Value -> Message
toMessage entry = Message Create (RedditId (entry ^. key "id" . _String)) entry
sqlRecorder :: SQL.Connection -> Publish IO RedditId
sqlRecorder conn = Publish $ Membership.recordSeen conn
amqpPublisher :: SQL.Connection -> Channel -> Text -> Publish IO Message
amqpPublisher sqlConn channel exchange = Publish $ \msg -> do
seen <- Membership.isSeen sqlConn (messageIdentifier msg)
let msg' = msg{messageType = bool Create Update seen}
void $ publishMsg channel exchange routingKey (message (A.encode msg'))
where
routingKey = "doesn't matter on fanout"
message lbs = newMsg
{ msgBody = lbs
, msgDeliveryMode = Just Persistent
}
defaultMain :: FilePath -> IO ()
defaultMain path = do
conf <- readConfig path
pass <- getPassword (conf ^. amqp . password)
let rabbitConnect = openConnection
(conf ^. amqp . host . T.unpacked)
(conf ^. amqp . vhost)
(conf ^. amqp . username)
pass
bracket rabbitConnect closeConnection $ \conn -> do
SQL.withConnection (conf ^. sqlite) $ \sqlConn -> do
SQL.execute_ sqlConn "create table if not exists membership (reddit_id primary key)"
chan <- openChannel conn
declareExchange chan newExchange { exchangeName = "reddit_posts", exchangeType = "fanout" }
sess <- newSession
let encoder = amqpPublisher sqlConn chan "reddit_posts"
recorder = sqlRecorder sqlConn
publisher = encoder <> (messageIdentifier >$< recorder)
forever $ do
for_ (conf ^. fetchers) $ \fetcher -> do
print fetcher
publishEntries (toMessage >$< publisher) sess fetcher
threadDelay (15 * 60_000_000)
getPassword :: Password -> IO Text
getPassword (Password p) = pure p
getPassword (File path) = TI.readFile path