Initial commit
This commit is contained in:
69
src/Data/Config.hs
Normal file
69
src/Data/Config.hs
Normal file
@ -0,0 +1,69 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DeriveGeneric #-}
|
||||
{-# LANGUAGE DerivingVia #-}
|
||||
{-# LANGUAGE TypeOperators #-}
|
||||
module Data.Config where
|
||||
|
||||
import Control.Lens
|
||||
import Data.Text (Text)
|
||||
import Dhall (FromDhall, Generic, ToDhall, auto, inputFile)
|
||||
import Dhall.Deriving
|
||||
import Numeric.Natural (Natural)
|
||||
import Data.SubReddit (SubReddit)
|
||||
|
||||
data AMQP = AMQP
|
||||
{ amqpVhost :: Text
|
||||
, amqpUsername :: Text
|
||||
, amqpPassword :: Text
|
||||
, amqpHost :: Text
|
||||
}
|
||||
deriving stock (Generic, Show)
|
||||
deriving (FromDhall, ToDhall)
|
||||
via (Codec (Field (CamelCase <<< DropPrefix "amqp"))) AMQP
|
||||
|
||||
host :: Lens' AMQP Text
|
||||
host = lens amqpHost (\ am txt -> am{amqpHost=txt})
|
||||
|
||||
vhost :: Lens' AMQP Text
|
||||
vhost = lens amqpVhost (\ am txt -> am{amqpVhost=txt})
|
||||
|
||||
username :: Lens' AMQP Text
|
||||
username = lens amqpUsername (\ am txt -> am{amqpUsername=txt})
|
||||
|
||||
password :: Lens' AMQP Text
|
||||
password = lens amqpPassword (\ am txt -> am{amqpPassword=txt})
|
||||
|
||||
data Fetcher = Fetcher
|
||||
{ fetcherSubreddit :: SubReddit
|
||||
, fetcherEntries :: Natural
|
||||
, fetcherQualifier :: Maybe Qualifier
|
||||
}
|
||||
deriving stock (Show, Generic)
|
||||
deriving (FromDhall, ToDhall) via Codec (Field (CamelCase <<< DropPrefix "fetcher")) Fetcher
|
||||
|
||||
subreddit :: Lens' Fetcher SubReddit
|
||||
subreddit = lens fetcherSubreddit (\ fe sr -> fe{fetcherSubreddit=sr})
|
||||
|
||||
entries :: Lens' Fetcher Natural
|
||||
entries = lens fetcherEntries (\ fe nat -> fe{fetcherEntries=nat})
|
||||
|
||||
data Qualifier = Top | Controversial
|
||||
deriving stock (Show, Generic)
|
||||
deriving (FromDhall, ToDhall) via Codec (Constructor TitleCase) Qualifier
|
||||
|
||||
data Config = Config
|
||||
{ configAmqp :: AMQP
|
||||
, configFetchers :: [Fetcher]
|
||||
}
|
||||
deriving stock (Generic, Show)
|
||||
deriving (FromDhall, ToDhall)
|
||||
via (Codec (Field (CamelCase <<< DropPrefix "config"))) Config
|
||||
|
||||
amqp :: Lens' Config AMQP
|
||||
amqp = lens configAmqp (\ con am -> con{configAmqp=am})
|
||||
|
||||
fetchers :: Lens' Config [Fetcher]
|
||||
fetchers = lens configFetchers (\ con fes -> con{configFetchers=fes})
|
||||
|
||||
readConfig :: FilePath -> IO Config
|
||||
readConfig = inputFile auto
|
41
src/Data/Deriving/Aeson.hs
Normal file
41
src/Data/Deriving/Aeson.hs
Normal file
@ -0,0 +1,41 @@
|
||||
{-# LANGUAGE AllowAmbiguousTypes #-}
|
||||
{-# LANGUAGE FlexibleContexts #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
{-# LANGUAGE TypeOperators #-}
|
||||
{-# LANGUAGE UndecidableInstances #-}
|
||||
module Data.Deriving.Aeson
|
||||
( AesonCodec(..)
|
||||
, type (<<<)
|
||||
, Field
|
||||
, CamelCase
|
||||
, DropPrefix
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Lens
|
||||
import Data.Aeson
|
||||
import qualified Data.Text.Strict.Lens as T
|
||||
import Dhall.Deriving
|
||||
import GHC.Generics (Generic, Rep)
|
||||
|
||||
newtype AesonCodec codec a = AesonCodec a
|
||||
|
||||
class ModifyAesonOptions a where
|
||||
modifyAesonOptions :: Options -> Options
|
||||
|
||||
instance (ModifyAesonOptions f, ModifyAesonOptions g) => ModifyAesonOptions (f <<< g) where
|
||||
modifyAesonOptions = modifyAesonOptions @f . modifyAesonOptions @g
|
||||
|
||||
instance TextFunction f => ModifyAesonOptions (Field f) where
|
||||
modifyAesonOptions opts = opts{fieldLabelModifier = over T.packed (textFunction @f) }
|
||||
|
||||
instance (Generic a, ModifyAesonOptions codec, GToJSON' Value Zero (Rep a))
|
||||
=> ToJSON (AesonCodec codec a) where
|
||||
toJSON (AesonCodec a) =
|
||||
genericToJSON (modifyAesonOptions @codec defaultOptions) a
|
||||
|
||||
instance (ModifyAesonOptions codec, Generic a, GFromJSON Zero (Rep a))
|
||||
=> FromJSON (AesonCodec codec a) where
|
||||
parseJSON va =
|
||||
AesonCodec <$> genericParseJSON (modifyAesonOptions @codec defaultOptions) va
|
9
src/Data/SubReddit.hs
Normal file
9
src/Data/SubReddit.hs
Normal file
@ -0,0 +1,9 @@
|
||||
{-# LANGUAGE DerivingVia #-}
|
||||
module Data.SubReddit where
|
||||
|
||||
import Dhall (FromDhall, ToDhall)
|
||||
|
||||
newtype SubReddit = SubReddit { getSubReddit :: String }
|
||||
deriving Show
|
||||
deriving (FromDhall, ToDhall) via String
|
||||
|
44
src/MyLib.hs
Normal file
44
src/MyLib.hs
Normal file
@ -0,0 +1,44 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
module MyLib (someFunc) where
|
||||
|
||||
import Control.Exception (bracket)
|
||||
import Control.Lens
|
||||
import Data.Config
|
||||
import qualified Data.Text.Strict.Lens as T
|
||||
import Network.AMQP
|
||||
import Network.AMQP.Reddit (publishEntries)
|
||||
import Network.Wreq.Session (newSession)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.ByteString.Lazy as LB
|
||||
import Control.Monad (void)
|
||||
import qualified Data.Aeson as A
|
||||
import Data.Functor.Contravariant ((>$<))
|
||||
import Publish (Publish(..))
|
||||
import Data.Foldable (for_)
|
||||
|
||||
|
||||
amqpPublisher :: Channel -> Text -> Publish IO LB.ByteString
|
||||
amqpPublisher channel exchange = Publish $ \lbs -> void $ publishMsg channel exchange "" (message lbs)
|
||||
where
|
||||
message lbs = newMsg
|
||||
{ msgBody = lbs
|
||||
, msgDeliveryMode = Just Persistent
|
||||
}
|
||||
|
||||
someFunc :: IO ()
|
||||
someFunc = do
|
||||
conf <- readConfig "./config.dhall"
|
||||
let connect = openConnection
|
||||
(conf ^. amqp . host . T.unpacked)
|
||||
(conf ^. amqp . vhost)
|
||||
(conf ^. amqp . username)
|
||||
(conf ^. amqp . password)
|
||||
bracket connect closeConnection $ \conn -> do
|
||||
putStrLn "Hello"
|
||||
chan <- openChannel conn
|
||||
declareExchange chan newExchange { exchangeName = "reddit_posts", exchangeType = "fanout" }
|
||||
sess <- newSession
|
||||
let encoder = amqpPublisher chan "reddit_posts"
|
||||
for_ (conf ^. fetchers) $ \fetcher -> do
|
||||
print fetcher
|
||||
publishEntries (A.encode >$< encoder) sess fetcher
|
61
src/Network/AMQP/Reddit.hs
Normal file
61
src/Network/AMQP/Reddit.hs
Normal file
@ -0,0 +1,61 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DeriveAnyClass #-}
|
||||
{-# LANGUAGE DeriveGeneric #-}
|
||||
{-# LANGUAGE DerivingVia #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE RankNTypes #-}
|
||||
{-# LANGUAGE TypeOperators #-}
|
||||
module Network.AMQP.Reddit where
|
||||
|
||||
import Control.Lens
|
||||
import Data.Aeson (FromJSON, ToJSON, Value)
|
||||
import Data.Aeson.Lens
|
||||
import Data.Config
|
||||
import Data.Deriving.Aeson
|
||||
import Data.SubReddit
|
||||
import Data.Text (Text)
|
||||
import GHC.Generics (Generic)
|
||||
import Network.Wreq hiding (getWith)
|
||||
import Network.Wreq.Session (Session, getWith)
|
||||
import Pipes (Producer, (>->), for, runEffect)
|
||||
import qualified Pipes.Prelude as P
|
||||
import Publish
|
||||
import Data.Maybe (maybeToList)
|
||||
import Control.Monad.Trans (liftIO)
|
||||
|
||||
data MessageType = Create | Update
|
||||
deriving stock (Show, Eq, Generic)
|
||||
deriving anyclass (ToJSON, FromJSON)
|
||||
|
||||
newtype RedditId = RedditId Text
|
||||
deriving stock (Show, Eq)
|
||||
deriving (ToJSON, FromJSON) via Text
|
||||
|
||||
data Message = Message
|
||||
{ messageType :: MessageType
|
||||
, messageIdentifier :: RedditId
|
||||
, messageContent :: Value
|
||||
}
|
||||
deriving stock (Show, Eq, Generic)
|
||||
deriving (ToJSON, FromJSON)
|
||||
via AesonCodec (Field (CamelCase <<< DropPrefix "message")) Message
|
||||
|
||||
messages :: Session -> SubReddit -> Producer Message IO ()
|
||||
messages sess sre = P.unfoldr go Nothing >-> P.concat
|
||||
where
|
||||
go :: Maybe Text -> IO (Either () ([Message], Maybe Text))
|
||||
go after = do
|
||||
let opts = defaults & header "User-Agent" .~ ["reddit-pubsub"] & param "after" .~ (maybeToList after)
|
||||
r <- getWith opts sess ("https://www.reddit.com/r/" <> getSubReddit sre <> ".json")
|
||||
let xs = r ^.. responseBody . key "data" . key "children" . _Array . traversed . key "data"
|
||||
next = r ^? responseBody . key "data" . key "after" . _String
|
||||
msgs = [Message Create (RedditId (entry ^. key "id" . _String)) entry | entry <- xs]
|
||||
print next
|
||||
pure $ Right (msgs, next)
|
||||
|
||||
publishEntries :: Publish IO Message -> Session -> Fetcher -> IO ()
|
||||
publishEntries publisher sess fetcher =
|
||||
runEffect $
|
||||
for
|
||||
(messages sess (fetcher ^. subreddit) >-> P.take (fromIntegral $ fetcher ^. entries))
|
||||
(liftIO . publish publisher)
|
7
src/Publish.hs
Normal file
7
src/Publish.hs
Normal file
@ -0,0 +1,7 @@
|
||||
{-# LANGUAGE DerivingVia #-}
|
||||
module Publish where
|
||||
|
||||
import Data.Functor.Contravariant
|
||||
|
||||
newtype Publish m a = Publish { publish :: a -> m () }
|
||||
deriving Contravariant via (Op (m ()))
|
Reference in New Issue
Block a user