Store membership
This commit is contained in:
parent
d646fc9095
commit
774eea8d70
@ -12,5 +12,6 @@ in { amqp = config.AMQP::{
|
|||||||
, config.Fetcher::{ subreddit = "scala" }
|
, config.Fetcher::{ subreddit = "scala" }
|
||||||
, config.Fetcher::{ subreddit = "pics", entries = 150 }
|
, config.Fetcher::{ subreddit = "pics", entries = 150 }
|
||||||
]
|
]
|
||||||
|
, sqlite = "reddit.sqlite"
|
||||||
}
|
}
|
||||||
: config.Type
|
: config.Type
|
||||||
|
12
default.nix
12
default.nix
@ -1,6 +1,6 @@
|
|||||||
{ mkDerivation, acid-state, aeson, amqp, base, bytestring, dhall
|
{ mkDerivation, aeson, amqp, base, bytestring, containers, dhall
|
||||||
, hedgehog, hspec, hspec-hedgehog, lens, lens-aeson, lib, mtl
|
, hedgehog, hspec, hspec-hedgehog, lens, lens-aeson, lib, mtl
|
||||||
, pipes, safecopy, text, wreq
|
, pipes, sqlite-simple, text, wreq
|
||||||
}:
|
}:
|
||||||
mkDerivation {
|
mkDerivation {
|
||||||
pname = "reddit-pub";
|
pname = "reddit-pub";
|
||||||
@ -9,11 +9,13 @@ mkDerivation {
|
|||||||
isLibrary = true;
|
isLibrary = true;
|
||||||
isExecutable = true;
|
isExecutable = true;
|
||||||
libraryHaskellDepends = [
|
libraryHaskellDepends = [
|
||||||
acid-state aeson amqp base bytestring dhall lens lens-aeson mtl
|
aeson amqp base bytestring containers dhall lens lens-aeson mtl
|
||||||
pipes safecopy text wreq
|
pipes sqlite-simple text wreq
|
||||||
];
|
];
|
||||||
executableHaskellDepends = [ base ];
|
executableHaskellDepends = [ base ];
|
||||||
testHaskellDepends = [ base hedgehog hspec hspec-hedgehog ];
|
testHaskellDepends = [
|
||||||
|
base bytestring containers hedgehog hspec hspec-hedgehog mtl
|
||||||
|
];
|
||||||
license = "unknown";
|
license = "unknown";
|
||||||
hydraPlatforms = lib.platforms.none;
|
hydraPlatforms = lib.platforms.none;
|
||||||
}
|
}
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
{ amqp : ./AMQP/Type.dhall
|
{ amqp : ./AMQP/Type.dhall
|
||||||
, fetchers : List ./Fetcher/Type.dhall
|
, fetchers : List ./Fetcher/Type.dhall
|
||||||
|
, sqlite : Text
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,7 @@ library
|
|||||||
Data.SubReddit
|
Data.SubReddit
|
||||||
Publish
|
Publish
|
||||||
Data.Trie
|
Data.Trie
|
||||||
|
Membership
|
||||||
|
|
||||||
-- Modules included in this library but not exported.
|
-- Modules included in this library but not exported.
|
||||||
-- other-modules:
|
-- other-modules:
|
||||||
@ -47,9 +48,8 @@ library
|
|||||||
, dhall
|
, dhall
|
||||||
, wreq
|
, wreq
|
||||||
, pipes
|
, pipes
|
||||||
, safecopy
|
|
||||||
, acid-state
|
|
||||||
, containers
|
, containers
|
||||||
|
, sqlite-simple
|
||||||
hs-source-dirs: src
|
hs-source-dirs: src
|
||||||
default-language: Haskell2010
|
default-language: Haskell2010
|
||||||
|
|
||||||
|
@ -54,6 +54,7 @@ data Qualifier = Top | Controversial
|
|||||||
data Config = Config
|
data Config = Config
|
||||||
{ configAmqp :: AMQP
|
{ configAmqp :: AMQP
|
||||||
, configFetchers :: [Fetcher]
|
, configFetchers :: [Fetcher]
|
||||||
|
, configSqlite :: FilePath
|
||||||
}
|
}
|
||||||
deriving stock (Generic, Show)
|
deriving stock (Generic, Show)
|
||||||
deriving (FromDhall, ToDhall)
|
deriving (FromDhall, ToDhall)
|
||||||
@ -65,5 +66,8 @@ amqp = lens configAmqp (\ con am -> con{configAmqp=am})
|
|||||||
fetchers :: Lens' Config [Fetcher]
|
fetchers :: Lens' Config [Fetcher]
|
||||||
fetchers = lens configFetchers (\ con fes -> con{configFetchers=fes})
|
fetchers = lens configFetchers (\ con fes -> con{configFetchers=fes})
|
||||||
|
|
||||||
|
sqlite :: Lens' Config FilePath
|
||||||
|
sqlite = lens configSqlite (\ con s -> con{configSqlite=s})
|
||||||
|
|
||||||
readConfig :: FilePath -> IO Config
|
readConfig :: FilePath -> IO Config
|
||||||
readConfig = inputFile auto
|
readConfig = inputFile auto
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
{-# LANGUAGE DeriveFunctor #-}
|
{-# LANGUAGE DeriveFunctor #-}
|
||||||
{-# LANGUAGE LambdaCase #-}
|
{-# LANGUAGE LambdaCase #-}
|
||||||
{-# LANGUAGE ScopedTypeVariables #-}
|
{-# LANGUAGE ScopedTypeVariables #-}
|
||||||
|
{-# LANGUAGE TypeSynonymInstances #-}
|
||||||
-- I know theres the bytestring-trie package, but it's been kind of
|
-- I know theres the bytestring-trie package, but it's been kind of
|
||||||
-- unmaintained lately, I don't have the time to start pinning and stuff.
|
-- unmaintained lately, I don't have the time to start pinning and stuff.
|
||||||
module Data.Trie where
|
module Data.Trie where
|
||||||
@ -14,10 +15,15 @@ import qualified Data.ByteString as B
|
|||||||
import Data.Word (Word8)
|
import Data.Word (Word8)
|
||||||
import Data.Maybe (isJust)
|
import Data.Maybe (isJust)
|
||||||
|
|
||||||
|
data Pair a b = Pair !a !b
|
||||||
|
|
||||||
|
instance (Semigroup a, Semigroup b) => Semigroup (Pair a b) where
|
||||||
|
(Pair a b) <> (Pair a' b') = Pair (a <> a') (b <> b')
|
||||||
|
|
||||||
-- XXX: This is as lazy as it gets, strictify where it's needed
|
-- XXX: This is as lazy as it gets, strictify where it's needed
|
||||||
data TrieF a f
|
data TrieF a f
|
||||||
= Empty
|
= Empty
|
||||||
| ElementNode (Maybe (ByteString, a)) (Map Word8 f)
|
| ElementNode (Maybe (Pair ByteString a)) (Map Word8 f)
|
||||||
deriving Functor
|
deriving Functor
|
||||||
|
|
||||||
newtype Fix f = Fix { getFix :: f (Fix f) }
|
newtype Fix f = Fix { getFix :: f (Fix f) }
|
||||||
@ -38,7 +44,7 @@ fromList = F.foldl' (\acc (k,v) -> insert k v acc) empty
|
|||||||
toList :: Trie a -> [(ByteString, a)]
|
toList :: Trie a -> [(ByteString, a)]
|
||||||
toList = cata $ \case
|
toList = cata $ \case
|
||||||
Empty -> []
|
Empty -> []
|
||||||
ElementNode (Just a) children -> a : concat (M.elems children)
|
ElementNode (Just (Pair a b)) children -> (a,b) : concat (M.elems children)
|
||||||
ElementNode Nothing children -> concat (M.elems children)
|
ElementNode Nothing children -> concat (M.elems children)
|
||||||
|
|
||||||
insert :: Semigroup a => ByteString -> a -> Trie a -> Trie a
|
insert :: Semigroup a => ByteString -> a -> Trie a -> Trie a
|
||||||
@ -47,13 +53,15 @@ insert k v = union (singleton k v)
|
|||||||
singleton :: ByteString -> a -> Trie a
|
singleton :: ByteString -> a -> Trie a
|
||||||
singleton bs a = ana go (B.uncons bs)
|
singleton bs a = ana go (B.uncons bs)
|
||||||
where
|
where
|
||||||
go Nothing = ElementNode (Just (bs, a)) M.empty
|
go Nothing = let x = Pair bs a in x `seq` ElementNode (Just x) M.empty
|
||||||
go (Just (w8,children)) = ElementNode Nothing (M.singleton w8 (B.uncons children))
|
go (Just (w8,children)) = ElementNode Nothing (M.singleton w8 (B.uncons children))
|
||||||
|
|
||||||
union :: Semigroup a => Trie a -> Trie a -> Trie a
|
union :: Semigroup a => Trie a -> Trie a -> Trie a
|
||||||
union a (Fix Empty) = a
|
union a (Fix Empty) = a
|
||||||
union (Fix Empty) b = b
|
union (Fix Empty) b = b
|
||||||
union (Fix (ElementNode ma' x1)) (Fix (ElementNode ma x0)) = Fix (ElementNode (ma' <> ma) (M.unionWith union x1 x0))
|
union (Fix (ElementNode ma' x1)) (Fix (ElementNode ma x0)) =
|
||||||
|
let x = ma' <> ma
|
||||||
|
in x `seq` Fix (ElementNode x (M.unionWith union x1 x0))
|
||||||
|
|
||||||
empty :: Trie a
|
empty :: Trie a
|
||||||
empty = Fix Empty
|
empty = Fix Empty
|
||||||
@ -63,7 +71,7 @@ lookup bs t = cata go t $ B.uncons bs
|
|||||||
where
|
where
|
||||||
go :: TrieF a (Maybe (Word8, ByteString) -> Maybe a) -> Maybe (Word8, ByteString) -> Maybe a
|
go :: TrieF a (Maybe (Word8, ByteString) -> Maybe a) -> Maybe (Word8, ByteString) -> Maybe a
|
||||||
go Empty _ = Nothing
|
go Empty _ = Nothing
|
||||||
go (ElementNode ma' _) Nothing = fmap snd ma'
|
go (ElementNode ma' _) Nothing = fmap (\(Pair _ b) -> b) ma'
|
||||||
go (ElementNode _ m) (Just (w8,xs)) = M.lookup w8 m >>= \next -> next (B.uncons xs)
|
go (ElementNode _ m) (Just (w8,xs)) = M.lookup w8 m >>= \next -> next (B.uncons xs)
|
||||||
|
|
||||||
member :: ByteString -> Trie a -> Bool
|
member :: ByteString -> Trie a -> Bool
|
||||||
|
20
src/Membership.hs
Normal file
20
src/Membership.hs
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
{-# LANGUAGE QuasiQuotes #-}
|
||||||
|
{-# LANGUAGE TypeApplications #-}
|
||||||
|
module Membership where
|
||||||
|
|
||||||
|
import Database.SQLite.Simple (Connection, Only (..))
|
||||||
|
import Network.Reddit (RedditId)
|
||||||
|
import qualified Database.SQLite.Simple as SQL
|
||||||
|
import Database.SQLite.Simple.QQ (sql)
|
||||||
|
import qualified Data.Foldable as F
|
||||||
|
import Data.Monoid (Any(..))
|
||||||
|
import Data.Text (Text)
|
||||||
|
|
||||||
|
recordSeen :: Connection -> RedditId -> IO ()
|
||||||
|
recordSeen conn rid = SQL.execute conn [sql|insert into membership (reddit_id) values (?) on conflict do nothing|] (Only rid)
|
||||||
|
|
||||||
|
isSeen :: Connection -> RedditId -> IO Bool
|
||||||
|
isSeen conn rid =
|
||||||
|
unwrap <$> SQL.query conn [sql|select from membership (reddit_id) where reddit_id = ?|] (Only rid)
|
||||||
|
where
|
||||||
|
unwrap = getAny . F.foldMap' (Any . const @_ @Text True . fromOnly)
|
29
src/MyLib.hs
29
src/MyLib.hs
@ -12,10 +12,14 @@ import Data.Functor.Contravariant ((>$<))
|
|||||||
import Data.Text (Text)
|
import Data.Text (Text)
|
||||||
import qualified Data.Text.Strict.Lens as T
|
import qualified Data.Text.Strict.Lens as T
|
||||||
import Network.AMQP
|
import Network.AMQP
|
||||||
import Network.Reddit (publishEntries)
|
import Network.Reddit (publishEntries, RedditId, messageIdentifier)
|
||||||
import Network.Wreq.Session (newSession)
|
import Network.Wreq.Session (newSession)
|
||||||
import Publish (Publish(..))
|
import Publish (Publish(..))
|
||||||
|
import qualified Database.SQLite.Simple as SQL
|
||||||
|
import qualified Membership
|
||||||
|
|
||||||
|
sqlRecorder :: SQL.Connection -> Publish IO RedditId
|
||||||
|
sqlRecorder conn = Publish $ Membership.recordSeen conn
|
||||||
|
|
||||||
amqpPublisher :: Channel -> Text -> Publish IO LB.ByteString
|
amqpPublisher :: Channel -> Text -> Publish IO LB.ByteString
|
||||||
amqpPublisher channel exchange = Publish $ \lbs ->
|
amqpPublisher channel exchange = Publish $ \lbs ->
|
||||||
@ -30,17 +34,20 @@ amqpPublisher channel exchange = Publish $ \lbs ->
|
|||||||
defaultMain :: IO ()
|
defaultMain :: IO ()
|
||||||
defaultMain = do
|
defaultMain = do
|
||||||
conf <- readConfig "./config.dhall"
|
conf <- readConfig "./config.dhall"
|
||||||
let connect = openConnection
|
let rabbitConnect = openConnection
|
||||||
(conf ^. amqp . host . T.unpacked)
|
(conf ^. amqp . host . T.unpacked)
|
||||||
(conf ^. amqp . vhost)
|
(conf ^. amqp . vhost)
|
||||||
(conf ^. amqp . username)
|
(conf ^. amqp . username)
|
||||||
(conf ^. amqp . password)
|
(conf ^. amqp . password)
|
||||||
bracket connect closeConnection $ \conn -> do
|
bracket rabbitConnect closeConnection $ \conn -> do
|
||||||
putStrLn "Hello"
|
SQL.withConnection (conf ^. sqlite) $ \sqlConn -> do
|
||||||
chan <- openChannel conn
|
SQL.execute_ sqlConn "create table if not exists membership (reddit_id primary key)"
|
||||||
declareExchange chan newExchange { exchangeName = "reddit_posts", exchangeType = "fanout" }
|
chan <- openChannel conn
|
||||||
sess <- newSession
|
declareExchange chan newExchange { exchangeName = "reddit_posts", exchangeType = "fanout" }
|
||||||
let encoder = amqpPublisher chan "reddit_posts"
|
sess <- newSession
|
||||||
for_ (conf ^. fetchers) $ \fetcher -> do
|
let encoder = amqpPublisher chan "reddit_posts"
|
||||||
print fetcher
|
recorder = sqlRecorder sqlConn
|
||||||
publishEntries (A.encode >$< encoder) sess fetcher
|
publisher = (A.encode >$< encoder) <> (messageIdentifier >$< recorder)
|
||||||
|
for_ (conf ^. fetchers) $ \fetcher -> do
|
||||||
|
print fetcher
|
||||||
|
publishEntries publisher sess fetcher
|
||||||
|
@ -22,6 +22,8 @@ import qualified Pipes.Prelude as P
|
|||||||
import Publish
|
import Publish
|
||||||
import Data.Maybe (maybeToList)
|
import Data.Maybe (maybeToList)
|
||||||
import Control.Monad.Trans (liftIO)
|
import Control.Monad.Trans (liftIO)
|
||||||
|
import Database.SQLite.Simple.ToField (ToField)
|
||||||
|
import Database.SQLite.Simple.FromField (FromField)
|
||||||
|
|
||||||
data MessageType = Create | Update
|
data MessageType = Create | Update
|
||||||
deriving stock (Show, Eq, Generic)
|
deriving stock (Show, Eq, Generic)
|
||||||
@ -29,7 +31,7 @@ data MessageType = Create | Update
|
|||||||
|
|
||||||
newtype RedditId = RedditId Text
|
newtype RedditId = RedditId Text
|
||||||
deriving stock (Show, Eq)
|
deriving stock (Show, Eq)
|
||||||
deriving (ToJSON, FromJSON) via Text
|
deriving (ToJSON, FromJSON, ToField, FromField) via Text
|
||||||
|
|
||||||
data Message = Message
|
data Message = Message
|
||||||
{ messageType :: MessageType
|
{ messageType :: MessageType
|
||||||
|
@ -2,6 +2,8 @@
|
|||||||
module Publish where
|
module Publish where
|
||||||
|
|
||||||
import Data.Functor.Contravariant
|
import Data.Functor.Contravariant
|
||||||
|
import Data.Monoid (Ap(..))
|
||||||
|
|
||||||
newtype Publish m a = Publish { publish :: a -> m () }
|
newtype Publish m a = Publish { publish :: a -> m () }
|
||||||
deriving Contravariant via (Op (m ()))
|
deriving Contravariant via (Op (m ()))
|
||||||
|
deriving (Monoid, Semigroup) via (Op (Ap m ()) a)
|
||||||
|
Loading…
Reference in New Issue
Block a user