Check for membership

This commit is contained in:
Mats Rauhala 2021-10-27 23:41:05 +03:00
parent faa8eb0a94
commit e61eeaf61f
3 changed files with 10 additions and 7 deletions

View File

@ -10,6 +10,7 @@ in { amqp = config.AMQP::{
, fetchers = , fetchers =
[ config.Fetcher::{ subreddit = "haskell" } [ config.Fetcher::{ subreddit = "haskell" }
, config.Fetcher::{ subreddit = "scala" } , config.Fetcher::{ subreddit = "scala" }
, config.Fetcher::{ subreddit = "all" }
, config.Fetcher::{ subreddit = "pics", entries = 150 } , config.Fetcher::{ subreddit = "pics", entries = 150 }
] ]
, sqlite = "reddit.sqlite" , sqlite = "reddit.sqlite"

View File

@ -15,6 +15,6 @@ recordSeen conn rid = SQL.execute conn [sql|insert into membership (reddit_id) v
isSeen :: Connection -> RedditId -> IO Bool isSeen :: Connection -> RedditId -> IO Bool
isSeen conn rid = isSeen conn rid =
unwrap <$> SQL.query conn [sql|select from membership (reddit_id) where reddit_id = ?|] (Only rid) unwrap <$> SQL.query conn [sql|select reddit_id from membership where reddit_id = ?|] (Only rid)
where where
unwrap = getAny . F.foldMap' (Any . const @_ @Text True . fromOnly) unwrap = getAny . F.foldMap' (Any . const @_ @Text True . fromOnly)

View File

@ -11,7 +11,6 @@ import Control.Lens
import Control.Monad (void) import Control.Monad (void)
import Data.Aeson (FromJSON, ToJSON, Value) import Data.Aeson (FromJSON, ToJSON, Value)
import qualified Data.Aeson as A import qualified Data.Aeson as A
import qualified Data.ByteString.Lazy as LB
import Data.Config import Data.Config
import Data.Deriving.Aeson import Data.Deriving.Aeson
import Data.Foldable (for_) import Data.Foldable (for_)
@ -35,6 +34,7 @@ import Network.Reddit (RedditId (RedditId), publishEntries)
import Network.Wreq.Session (newSession) import Network.Wreq.Session (newSession)
import Publish (Publish(..)) import Publish (Publish(..))
import Data.Aeson.Lens (key, _String) import Data.Aeson.Lens (key, _String)
import Data.Bool (bool)
data MessageType = Create | Update data MessageType = Create | Update
deriving stock (Show, Eq, Generic) deriving stock (Show, Eq, Generic)
@ -55,9 +55,11 @@ toMessage entry = Message Create (RedditId (entry ^. key "id" . _String)) entry
sqlRecorder :: SQL.Connection -> Publish IO RedditId sqlRecorder :: SQL.Connection -> Publish IO RedditId
sqlRecorder conn = Publish $ Membership.recordSeen conn sqlRecorder conn = Publish $ Membership.recordSeen conn
amqpPublisher :: Channel -> Text -> Publish IO LB.ByteString amqpPublisher :: SQL.Connection -> Channel -> Text -> Publish IO Message
amqpPublisher channel exchange = Publish $ \lbs -> amqpPublisher sqlConn channel exchange = Publish $ \msg -> do
void $ publishMsg channel exchange routingKey (message lbs) seen <- Membership.isSeen sqlConn (messageIdentifier msg)
let msg' = msg{messageType = bool Create Update seen}
void $ publishMsg channel exchange routingKey (message (A.encode msg'))
where where
routingKey = "doesn't matter on fanout" routingKey = "doesn't matter on fanout"
message lbs = newMsg message lbs = newMsg
@ -79,9 +81,9 @@ defaultMain = do
chan <- openChannel conn chan <- openChannel conn
declareExchange chan newExchange { exchangeName = "reddit_posts", exchangeType = "fanout" } declareExchange chan newExchange { exchangeName = "reddit_posts", exchangeType = "fanout" }
sess <- newSession sess <- newSession
let encoder = amqpPublisher chan "reddit_posts" let encoder = amqpPublisher sqlConn chan "reddit_posts"
recorder = sqlRecorder sqlConn recorder = sqlRecorder sqlConn
publisher = (A.encode >$< encoder) <> (messageIdentifier >$< recorder) publisher = encoder <> (messageIdentifier >$< recorder)
for_ (conf ^. fetchers) $ \fetcher -> do for_ (conf ^. fetchers) $ \fetcher -> do
print fetcher print fetcher
publishEntries (toMessage >$< publisher) sess fetcher publishEntries (toMessage >$< publisher) sess fetcher