Move message to MyLib

This commit is contained in:
Mats Rauhala 2021-10-27 23:30:48 +03:00
parent 774eea8d70
commit 1b0b4c2ab8
2 changed files with 43 additions and 26 deletions

View File

@ -1,22 +1,56 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeOperators #-}
module MyLib (defaultMain) where module MyLib (defaultMain) where
import Control.Exception (bracket) import Control.Exception (bracket)
import Control.Lens import Control.Lens
import Control.Monad (void) import Control.Monad (void)
import Data.Aeson (FromJSON, ToJSON, Value)
import qualified Data.Aeson as A import qualified Data.Aeson as A
import qualified Data.ByteString.Lazy as LB import qualified Data.ByteString.Lazy as LB
import Data.Config import Data.Config
import Data.Deriving.Aeson
import Data.Foldable (for_) import Data.Foldable (for_)
import Data.Functor.Contravariant ((>$<)) import Data.Functor.Contravariant ((>$<))
import Data.Text (Text) import Data.Text (Text)
import qualified Data.Text.Strict.Lens as T import qualified Data.Text.Strict.Lens as T
import qualified Database.SQLite.Simple as SQL
import GHC.Generics (Generic)
import qualified Membership
import Network.AMQP import Network.AMQP
import Network.Reddit (publishEntries, RedditId, messageIdentifier) ( Channel
, DeliveryMode(Persistent)
, exchangeName
, exchangeType
, msgBody
, msgDeliveryMode
, newMsg
, publishMsg, openConnection, closeConnection, openChannel, newExchange, declareExchange
)
import Network.Reddit (RedditId (RedditId), publishEntries)
import Network.Wreq.Session (newSession) import Network.Wreq.Session (newSession)
import Publish (Publish(..)) import Publish (Publish(..))
import qualified Database.SQLite.Simple as SQL import Data.Aeson.Lens (key, _String)
import qualified Membership
data MessageType = Create | Update
deriving stock (Show, Eq, Generic)
deriving anyclass (ToJSON, FromJSON)
data Message = Message
{ messageType :: MessageType
, messageIdentifier :: RedditId
, messageContent :: Value
}
deriving stock (Show, Eq, Generic)
deriving (ToJSON, FromJSON)
via AesonCodec (Field (CamelCase <<< DropPrefix "message")) Message
toMessage :: Value -> Message
toMessage entry = Message Create (RedditId (entry ^. key "id" . _String)) entry
sqlRecorder :: SQL.Connection -> Publish IO RedditId sqlRecorder :: SQL.Connection -> Publish IO RedditId
sqlRecorder conn = Publish $ Membership.recordSeen conn sqlRecorder conn = Publish $ Membership.recordSeen conn
@ -50,4 +84,4 @@ defaultMain = do
publisher = (A.encode >$< encoder) <> (messageIdentifier >$< recorder) publisher = (A.encode >$< encoder) <> (messageIdentifier >$< recorder)
for_ (conf ^. fetchers) $ \fetcher -> do for_ (conf ^. fetchers) $ \fetcher -> do
print fetcher print fetcher
publishEntries publisher sess fetcher publishEntries (toMessage >$< publisher) sess fetcher

View File

@ -1,20 +1,15 @@
{-# LANGUAGE DataKinds #-} {-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingVia #-} {-# LANGUAGE DerivingVia #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-} {-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeOperators #-}
module Network.Reddit where module Network.Reddit where
import Control.Lens import Control.Lens
import Data.Aeson (FromJSON, ToJSON, Value) import Data.Aeson (FromJSON, ToJSON, Value)
import Data.Aeson.Lens import Data.Aeson.Lens
import Data.Config import Data.Config
import Data.Deriving.Aeson
import Data.SubReddit import Data.SubReddit
import Data.Text (Text) import Data.Text (Text)
import GHC.Generics (Generic)
import Network.Wreq hiding (getWith) import Network.Wreq hiding (getWith)
import Network.Wreq.Session (Session, getWith) import Network.Wreq.Session (Session, getWith)
import Pipes (Producer, (>->), for, runEffect) import Pipes (Producer, (>->), for, runEffect)
@ -25,37 +20,25 @@ import Control.Monad.Trans (liftIO)
import Database.SQLite.Simple.ToField (ToField) import Database.SQLite.Simple.ToField (ToField)
import Database.SQLite.Simple.FromField (FromField) import Database.SQLite.Simple.FromField (FromField)
data MessageType = Create | Update
deriving stock (Show, Eq, Generic)
deriving anyclass (ToJSON, FromJSON)
newtype RedditId = RedditId Text newtype RedditId = RedditId Text
deriving stock (Show, Eq) deriving stock (Show, Eq)
deriving (ToJSON, FromJSON, ToField, FromField) via Text deriving (ToJSON, FromJSON, ToField, FromField) via Text
data Message = Message
{ messageType :: MessageType
, messageIdentifier :: RedditId
, messageContent :: Value
}
deriving stock (Show, Eq, Generic)
deriving (ToJSON, FromJSON)
via AesonCodec (Field (CamelCase <<< DropPrefix "message")) Message
messages :: Session -> SubReddit -> Producer Message IO () messages :: Session -> SubReddit -> Producer Value IO ()
messages sess sre = P.unfoldr go Nothing >-> P.concat messages sess sre = P.unfoldr go Nothing >-> P.concat
where where
go :: Maybe Text -> IO (Either () ([Message], Maybe Text)) go :: Maybe Text -> IO (Either () ([Value], Maybe Text))
go after = do go after = do
let opts = defaults & header "User-Agent" .~ ["reddit-pubsub"] & param "after" .~ (maybeToList after) let opts = defaults & header "User-Agent" .~ ["reddit-pubsub"] & param "after" .~ maybeToList after
r <- getWith opts sess ("https://www.reddit.com/r/" <> getSubReddit sre <> ".json") r <- getWith opts sess ("https://www.reddit.com/r/" <> getSubReddit sre <> ".json")
let xs = r ^.. responseBody . key "data" . key "children" . _Array . traversed . key "data" let msgs = r ^.. responseBody . key "data" . key "children" . _Array . traversed . key "data"
next = r ^? responseBody . key "data" . key "after" . _String next = r ^? responseBody . key "data" . key "after" . _String
msgs = [Message Create (RedditId (entry ^. key "id" . _String)) entry | entry <- xs]
print next print next
pure $ Right (msgs, next) pure $ Right (msgs, next)
publishEntries :: Publish IO Message -> Session -> Fetcher -> IO () publishEntries :: Publish IO Value -> Session -> Fetcher -> IO ()
publishEntries publisher sess fetcher = publishEntries publisher sess fetcher =
runEffect $ runEffect $
for for