From c9a7d79bcfa1dfe1128737366e0b170aa74b56a7 Mon Sep 17 00:00:00 2001 From: Mats Rauhala Date: Wed, 20 Apr 2022 21:01:01 +0300 Subject: [PATCH] Refactor the publishing and introduce logging --- src/MyLib.hs | 53 +++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/src/MyLib.hs b/src/MyLib.hs index 3c0df08..61fb6c0 100644 --- a/src/MyLib.hs +++ b/src/MyLib.hs @@ -5,6 +5,7 @@ {-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TypeOperators #-} +{-# LANGUAGE LambdaCase #-} module MyLib (defaultMain) where import Control.Concurrent (threadDelay) @@ -43,6 +44,8 @@ import Network.AMQP import Network.Reddit (RedditId(RedditId), publishEntries) import Network.Wreq.Session (newSession) import Publish (Publish(..)) +import Text.Printf (printf) +import Data.ByteString.Lazy (ByteString) data MessageType = Create | Update deriving stock (Show, Eq, Generic) @@ -57,17 +60,23 @@ data Message = Message deriving (ToJSON, FromJSON) via AesonCodec (Field (CamelCase <<< DropPrefix "message")) Message -toMessage :: Value -> Message -toMessage entry = Message Create (RedditId (entry ^. key "id" . _String)) entry -sqlRecorder :: SQL.Connection -> Publish IO RedditId -sqlRecorder conn = Publish $ Membership.recordSeen conn +toMessage :: SQL.Connection -> Publish IO (Maybe Message) -> Publish IO Value +toMessage sqlConn (Publish p) = Publish $ \entry -> do + case RedditId <$> (entry ^? key "id" . _String) of + Nothing -> p Nothing + Just redditId -> do + event <- bool Create Update <$> Membership.isSeen sqlConn redditId + p $ Just $ Message event redditId entry -amqpPublisher :: SQL.Connection -> Channel -> Text -> Publish IO Message -amqpPublisher sqlConn channel exchange = Publish $ \msg -> do - seen <- Membership.isSeen sqlConn (messageIdentifier msg) - let msg' = msg{messageType = bool Create Update seen} - void $ publishMsg channel exchange routingKey (message (A.encode msg')) +sqlRecorder :: SQL.Connection -> Publish IO (Maybe RedditId) +sqlRecorder conn = Publish $ maybe (pure ()) (Membership.recordSeen conn) + +amqpPublisher :: Channel -> Text -> Publish IO (Maybe ByteString) +amqpPublisher channel exchange = Publish $ \case + Nothing -> pure () + Just lbs -> + void $ publishMsg channel exchange routingKey (message lbs) where routingKey = "doesn't matter on fanout" message lbs = newMsg @@ -75,6 +84,23 @@ amqpPublisher sqlConn channel exchange = Publish $ \msg -> do , msgDeliveryMode = Just Persistent } +stdoutPublisher :: Publish IO String +stdoutPublisher = Publish putStrLn + +data Fetch + = Fetch Fetcher + | PublishMessage Message + | ParseFailed + +fetchToLog :: Fetch -> String +fetchToLog (Fetch fetcher) = printf "Refreshing %s" (show $ fetcherSubreddit fetcher) +fetchToLog ParseFailed = printf "Failed parsing" +fetchToLog (PublishMessage msg) = messageToLog msg + where + messageToLog :: Message -> String + messageToLog m = printf "Publishing %s as type %s" (show $ messageIdentifier m) (show $ messageType m) + + defaultMain :: FilePath -> IO () defaultMain path = do conf <- readConfig path @@ -90,13 +116,14 @@ defaultMain path = do chan <- openChannel conn declareExchange chan newExchange { exchangeName = "reddit_posts", exchangeType = "fanout" } sess <- newSession - let encoder = amqpPublisher sqlConn chan "reddit_posts" + let encoder = amqpPublisher chan "reddit_posts" recorder = sqlRecorder sqlConn - publisher = encoder <> (messageIdentifier >$< recorder) + publisher = (fmap A.encode >$< encoder) <> (fmap messageIdentifier >$< recorder) <> (maybe ParseFailed PublishMessage >$< logger) + logger = fetchToLog >$< stdoutPublisher forever $ do for_ (conf ^. fetchers) $ \fetcher -> do - print fetcher - publishEntries (toMessage >$< publisher) sess fetcher + publish logger (Fetch fetcher) + publishEntries (toMessage sqlConn publisher) sess fetcher threadDelay (15 * 60_000_000) getPassword :: Password -> IO Text