Refactor the publishing and introduce logging

This commit is contained in:
Mats Rauhala 2022-04-20 21:01:01 +03:00
parent 21013d7e40
commit c9a7d79bcf

View File

@ -5,6 +5,7 @@
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE LambdaCase #-}
module MyLib (defaultMain) where
import Control.Concurrent (threadDelay)
@ -43,6 +44,8 @@ import Network.AMQP
import Network.Reddit (RedditId(RedditId), publishEntries)
import Network.Wreq.Session (newSession)
import Publish (Publish(..))
import Text.Printf (printf)
import Data.ByteString.Lazy (ByteString)
data MessageType = Create | Update
deriving stock (Show, Eq, Generic)
@ -57,17 +60,23 @@ data Message = Message
deriving (ToJSON, FromJSON)
via AesonCodec (Field (CamelCase <<< DropPrefix "message")) Message
toMessage :: Value -> Message
toMessage entry = Message Create (RedditId (entry ^. key "id" . _String)) entry
sqlRecorder :: SQL.Connection -> Publish IO RedditId
sqlRecorder conn = Publish $ Membership.recordSeen conn
toMessage :: SQL.Connection -> Publish IO (Maybe Message) -> Publish IO Value
toMessage sqlConn (Publish p) = Publish $ \entry -> do
case RedditId <$> (entry ^? key "id" . _String) of
Nothing -> p Nothing
Just redditId -> do
event <- bool Create Update <$> Membership.isSeen sqlConn redditId
p $ Just $ Message event redditId entry
amqpPublisher :: SQL.Connection -> Channel -> Text -> Publish IO Message
amqpPublisher sqlConn channel exchange = Publish $ \msg -> do
seen <- Membership.isSeen sqlConn (messageIdentifier msg)
let msg' = msg{messageType = bool Create Update seen}
void $ publishMsg channel exchange routingKey (message (A.encode msg'))
sqlRecorder :: SQL.Connection -> Publish IO (Maybe RedditId)
sqlRecorder conn = Publish $ maybe (pure ()) (Membership.recordSeen conn)
amqpPublisher :: Channel -> Text -> Publish IO (Maybe ByteString)
amqpPublisher channel exchange = Publish $ \case
Nothing -> pure ()
Just lbs ->
void $ publishMsg channel exchange routingKey (message lbs)
where
routingKey = "doesn't matter on fanout"
message lbs = newMsg
@ -75,6 +84,23 @@ amqpPublisher sqlConn channel exchange = Publish $ \msg -> do
, msgDeliveryMode = Just Persistent
}
stdoutPublisher :: Publish IO String
stdoutPublisher = Publish putStrLn
data Fetch
= Fetch Fetcher
| PublishMessage Message
| ParseFailed
fetchToLog :: Fetch -> String
fetchToLog (Fetch fetcher) = printf "Refreshing %s" (show $ fetcherSubreddit fetcher)
fetchToLog ParseFailed = printf "Failed parsing"
fetchToLog (PublishMessage msg) = messageToLog msg
where
messageToLog :: Message -> String
messageToLog m = printf "Publishing %s as type %s" (show $ messageIdentifier m) (show $ messageType m)
defaultMain :: FilePath -> IO ()
defaultMain path = do
conf <- readConfig path
@ -90,13 +116,14 @@ defaultMain path = do
chan <- openChannel conn
declareExchange chan newExchange { exchangeName = "reddit_posts", exchangeType = "fanout" }
sess <- newSession
let encoder = amqpPublisher sqlConn chan "reddit_posts"
let encoder = amqpPublisher chan "reddit_posts"
recorder = sqlRecorder sqlConn
publisher = encoder <> (messageIdentifier >$< recorder)
publisher = (fmap A.encode >$< encoder) <> (fmap messageIdentifier >$< recorder) <> (maybe ParseFailed PublishMessage >$< logger)
logger = fetchToLog >$< stdoutPublisher
forever $ do
for_ (conf ^. fetchers) $ \fetcher -> do
print fetcher
publishEntries (toMessage >$< publisher) sess fetcher
publish logger (Fetch fetcher)
publishEntries (toMessage sqlConn publisher) sess fetcher
threadDelay (15 * 60_000_000)
getPassword :: Password -> IO Text