Compare commits

..

12 Commits

Author SHA1 Message Date
d253cbaa82 Can parse the tags 2022-05-16 22:49:54 +03:00
c0dbc5f0d2 Listen for messages 2022-05-16 22:13:58 +03:00
ce3773b3ee Boilerplate for the tags 2022-05-16 21:49:01 +03:00
03b4cfb3bf Split to multiproject 2022-05-16 21:42:11 +03:00
0266d4b06b Setup vhost 2022-05-16 21:01:16 +03:00
9f3196cd1f Script for setting up rabbitmq locally 2022-05-16 20:50:26 +03:00
c9a7d79bcf Refactor the publishing and introduce logging 2022-04-20 21:01:01 +03:00
21013d7e40 Run forever 2022-04-20 20:29:04 +03:00
9002d3424c Update packages 2022-04-20 20:27:30 +03:00
94f4593fdc Pass from file 2022-02-10 22:34:55 +02:00
63411532c4 Configurable path 2022-02-10 21:47:09 +02:00
1ccf953c57 Package dhall 2022-02-10 18:07:49 +02:00
46 changed files with 471 additions and 152 deletions

1
cabal.project Normal file
View File

@ -0,0 +1 @@
packages: */*

View File

@ -1,30 +1,12 @@
{ {
"nodes": { "nodes": {
"easy-hls": {
"inputs": {
"nixpkgs": "nixpkgs"
},
"locked": {
"lastModified": 1636606878,
"narHash": "sha256-rLxYl7iYP9vQhSvVlV2uRCdgrqKDz/vN1Z8ZmA8itkM=",
"owner": "jkachmar",
"repo": "easy-hls-nix",
"rev": "edd5710946d46ea40810ef9a708b084d7e05a118",
"type": "github"
},
"original": {
"owner": "jkachmar",
"repo": "easy-hls-nix",
"type": "github"
}
},
"flake-utils": { "flake-utils": {
"locked": { "locked": {
"lastModified": 1637014545, "lastModified": 1649676176,
"narHash": "sha256-26IZAc5yzlD9FlDT54io1oqG/bBoyka+FJk5guaX4x4=", "narHash": "sha256-OWKJratjt2RW151VUlJPRALb7OU2S5s+f0vLj4o1bHM=",
"owner": "numtide", "owner": "numtide",
"repo": "flake-utils", "repo": "flake-utils",
"rev": "bba5dcc8e0b20ab664967ad83d24d64cb64ec4f4", "rev": "a4b154ebbdc88c8498a5c7b01589addc9e9cb678",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -35,26 +17,11 @@
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1615055905, "lastModified": 1650440614,
"narHash": "sha256-Ig7CXgE5C3mwtTVBl8nSTessa1oMAm6Pm/p+T6iAJD8=", "narHash": "sha256-7mF7gyS5P3UmZmQuo9jbikP2wMyRUnimI7HcKLJ9OZQ=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "f3fc074642a25ef5a1423412f09946149237e338",
"type": "github"
},
"original": {
"owner": "nixos",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1637238590,
"narHash": "sha256-zGI/dR9WZvfZE0Eyk7zC4Y8ukxC/oSbfApqIvSCtB+o=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "57407fed58509a8d731d525f39842ea5ab696237", "rev": "0bbb65673c0ba31047c9ba6c4cd211556b534a4e",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -64,9 +31,8 @@
}, },
"root": { "root": {
"inputs": { "inputs": {
"easy-hls": "easy-hls",
"flake-utils": "flake-utils", "flake-utils": "flake-utils",
"nixpkgs": "nixpkgs_2" "nixpkgs": "nixpkgs"
} }
} }
}, },

View File

@ -2,19 +2,20 @@
description = "A very basic flake"; description = "A very basic flake";
inputs = { inputs = {
easy-hls = { url = "github:jkachmar/easy-hls-nix"; };
flake-utils = { flake-utils = {
url = "github:numtide/flake-utils"; url = "github:numtide/flake-utils";
inputs.nixpkgs.follows = "nixpkgs"; inputs.nixpkgs.follows = "nixpkgs";
}; };
}; };
outputs = { self, nixpkgs, flake-utils, easy-hls }: outputs = { self, nixpkgs, flake-utils }:
{ {
overlay = final: prev: { overlay = final: prev: {
haskellPackages = prev.haskellPackages.override ( old: { haskellPackages = prev.haskellPackages.override ( old: {
overrides = final.lib.composeExtensions ( old.overrides or (_: _: {})) (f: p: { overrides = final.lib.composeExtensions ( old.overrides or (_: _: {})) (f: p: rec {
reddit-pub = f.callPackage ./. {}; reddit-pub = f.callPackage ./reddit_pub {};
reddit-lib = f.callPackage ./reddit_lib {};
reddit-tags = f.callPackage ./reddit_tags {};
}); });
} ); } );
}; };
@ -24,16 +25,18 @@
let let
pkgs = import nixpkgs { inherit system; overlays = [ self.overlay ]; }; pkgs = import nixpkgs { inherit system; overlays = [ self.overlay ]; };
hp = pkgs.haskellPackages; hp = pkgs.haskellPackages;
hls = (easy-hls.withGhcs [ hp.ghc.version ] ).${system};
in in
rec { rec {
packages = { inherit (hp) reddit-pub; }; packages.reddit-pub = pkgs.haskell.lib.justStaticExecutables hp.reddit-pub;
packages.reddit-pub-dhall = pkgs.dhallPackages.callPackage ./dhall.nix {};
packages.reddit-tags = pkgs.haskell.lib.justStaticExecutables hp.reddit-tags;
defaultPackage = packages.reddit-pub; defaultPackage = packages.reddit-pub;
devShell = devShell =
hp.shellFor { hp.shellFor {
packages = h: [h.reddit-pub]; packages = h: [h.reddit-pub h.reddit-tags];
withHoogle = true; withHoogle = true;
buildInputs = with pkgs; [ buildInputs = with pkgs; [
dhall-lsp-server dhall-lsp-server
@ -43,7 +46,6 @@
hp.hlint hp.hlint
stylish-haskell stylish-haskell
ghcid ghcid
hls
rrdtool rrdtool
jq jq
sqlite-interactive sqlite-interactive
@ -51,6 +53,10 @@
sqlite-interactive sqlite-interactive
hp.graphmod hp.graphmod
hp.dhall-nixpkgs
hp.haskell-language-server
]; ];
}; };
} }

2
hie.yaml Normal file
View File

@ -0,0 +1,2 @@
cradle:
cabal:

View File

@ -1,11 +1,27 @@
{ lib, config, pkgs, ... }: { lib, config, pkgs, ... }:
let
cookie = "dontusethisinprod";
setup = pkgs.writeScriptBin "setup-rabbitmq.sh" ''
rabbitmqctl --erlang-cookie "${cookie}" add_vhost reddit
rabbitmqctl --erlang-cookie "${cookie}" add_user admin_user password
rabbitmqctl --erlang-cookie "${cookie}" set_user_tags admin_user administrator
rabbitmqctl --erlang-cookie "${cookie}" set_permissions -p / admin_user ".*" ".*" ".*"
rabbitmqctl --erlang-cookie "${cookie}" set_permissions -p reddit admin_user ".*" ".*" ".*"
rabbitmqctl --erlang-cookie "${cookie}" add_user reddit_user password
rabbitmqctl --erlang-cookie "${cookie}" set_permissions -p reddit reddit_user ".*" ".*" ".*"
'';
in
{ {
services.rabbitmq = { services.rabbitmq = {
enable = true; enable = true;
listenAddress = "0.0.0.0"; listenAddress = "0.0.0.0";
managementPlugin.enable = true; managementPlugin.enable = true;
cookie = "dontusethisinprod";
}; };
networking.firewall.allowedTCPPorts = [ 5672 15672 ]; networking.firewall.allowedTCPPorts = [ 5672 15672 ];
environment.systemPackages = [ setup ];
} }

5
reddit_lib/CHANGELOG.md Normal file
View File

@ -0,0 +1,5 @@
# Revision history for reddit-lib
## 0.1.0.0 -- YYYY-mm-dd
* First version. Released on an unsuspecting world.

10
reddit_lib/default.nix Normal file
View File

@ -0,0 +1,10 @@
# Nix derivation for the reddit-lib Haskell package.
# Builds the sources in this directory; the library and its test suite
# only depend on `base`.
{ mkDerivation, base, lib }:
mkDerivation {
pname = "reddit-lib";
version = "0.1.0.0";
src = ./.;
libraryHaskellDepends = [ base ];
testHaskellDepends = [ base ];
license = "unknown";
# Not built on Hydra for any platform.
hydraPlatforms = lib.platforms.none;
}

View File

@ -0,0 +1,38 @@
cabal-version: 3.0
name: reddit-lib
version: 0.1.0.0
synopsis:
-- A longer description of the package.
-- description:
homepage:
-- A URL where users can report bugs.
-- bug-reports:
license: NONE
author: Mats Rauhala
maintainer: mats.rauhala@iki.fi
-- A copyright notice.
-- copyright:
category: Web
extra-source-files: CHANGELOG.md
library
exposed-modules: Reddit.Publish
-- Modules included in this library but not exported.
-- other-modules:
-- LANGUAGE extensions used by modules in this package.
-- other-extensions:
build-depends: base ^>=4.15.1.0
hs-source-dirs: src
default-language: Haskell2010
test-suite reddit-lib-test
default-language: Haskell2010
type: exitcode-stdio-1.0
hs-source-dirs: test
main-is: MyLibTest.hs
build-depends: base ^>=4.15.1.0

View File

@ -1,5 +1,5 @@
{-# LANGUAGE DerivingVia #-} {-# LANGUAGE DerivingVia #-}
module Publish where module Reddit.Publish where
import Data.Functor.Contravariant import Data.Functor.Contravariant
import Data.Monoid (Ap(..)) import Data.Monoid (Ap(..))

View File

@ -0,0 +1,4 @@
module Main (main) where
-- | Placeholder test entry point: prints a notice and exits successfully,
-- so the test suite currently verifies nothing.
main :: IO ()
main = putStrLn "Test suite not yet implemented."

9
reddit_pub/app/Main.hs Normal file
View File

@ -0,0 +1,9 @@
module Main where
import qualified MyLib (defaultMain)
import System.Environment (getArgs)
-- | Entry point of the @reddit-pub@ executable.
--
-- Expects exactly one command line argument, the path handed on to
-- 'MyLib.defaultMain'. Any other number of arguments aborts with a usage
-- message (raised as a user 'IOError') instead of the opaque pattern-match
-- failure a partial @[path] <- getArgs@ bind would produce.
main :: IO ()
main = do
  args <- getArgs
  case args of
    [path] -> MyLib.defaultMain path
    _ -> ioError (userError "usage: reddit-pub CONFIG_PATH")

View File

@ -3,7 +3,7 @@ let config = ./dhall/package.dhall
in { amqp = config.AMQP::{ in { amqp = config.AMQP::{
, vhost = "reddit" , vhost = "reddit"
, username = env:AMQP_USER as Text ? "reddit_pub" , username = env:AMQP_USER as Text ? "reddit_pub"
, password = env:AMQP_PASS as Text ? "tester" , password = config.Password.Type.Password (env:AMQP_PASS as Text ? "tester")
, host = env:AMQP_HOST as Text ? "127.0.0.1" , host = env:AMQP_HOST as Text ? "127.0.0.1"
-- , host = "10.233.5.2" -- , host = "10.233.5.2"
} }

View File

@ -1,6 +1,6 @@
{ mkDerivation, aeson, amqp, base, bytestring, containers, dhall { mkDerivation, aeson, amqp, base, bytestring, containers, dhall
, hedgehog, hspec, hspec-hedgehog, lens, lens-aeson, lib, mtl , hedgehog, hspec, hspec-hedgehog, lens, lens-aeson, lib, mtl
, pipes, sqlite-simple, text, wreq , pipes, reddit-lib, sqlite-simple, text, wreq
}: }:
mkDerivation { mkDerivation {
pname = "reddit-pub"; pname = "reddit-pub";
@ -8,9 +8,10 @@ mkDerivation {
src = ./.; src = ./.;
isLibrary = true; isLibrary = true;
isExecutable = true; isExecutable = true;
enableSeparateDataOutput = true;
libraryHaskellDepends = [ libraryHaskellDepends = [
aeson amqp base bytestring containers dhall lens lens-aeson mtl aeson amqp base bytestring containers dhall lens lens-aeson mtl
pipes sqlite-simple text wreq pipes reddit-lib sqlite-simple text wreq
]; ];
executableHaskellDepends = [ base ]; executableHaskellDepends = [ base ];
testHaskellDepends = [ testHaskellDepends = [

9
reddit_pub/dhall.nix Normal file
View File

@ -0,0 +1,9 @@
# Packages the Dhall sources under ./dhall, with package.dhall as the
# entry file.
{ buildDhallDirectoryPackage }:
buildDhallDirectoryPackage {
  # Give the derivation a real name: an empty name yields an unidentifiable
  # store path and may be rejected by Nix.
  name = "reddit-pub-dhall";
  src = ./dhall;
  file = "package.dhall";
  source = false;
  document = false;
  dependencies = [];
}

View File

@ -1,5 +1,5 @@
{ vhost : Text { vhost : Text
, username : Text , username : Text
, password : Text , password : ../Password/Type.dhall
, host : Text , host : Text
} }

View File

@ -0,0 +1 @@
< Password : Text | File : Text >

View File

@ -0,0 +1 @@
{ Type = ./Type.dhall }

View File

@ -2,4 +2,5 @@
, default = ./default.dhall , default = ./default.dhall
, AMQP = ./AMQP/package.dhall , AMQP = ./AMQP/package.dhall
, Fetcher = ./Fetcher/package.dhall , Fetcher = ./Fetcher/package.dhall
, Password = ./Password/package.dhall
} }

View File

@ -32,7 +32,6 @@ library
Data.Deriving.Aeson Data.Deriving.Aeson
Network.Reddit Network.Reddit
Data.SubReddit Data.SubReddit
Publish
Membership Membership
-- Modules included in this library but not exported. -- Modules included in this library but not exported.
@ -40,7 +39,7 @@ library
-- LANGUAGE extensions used by modules in this package. -- LANGUAGE extensions used by modules in this package.
-- other-extensions: -- other-extensions:
build-depends: base ^>=4.14.1.0 build-depends: base ^>=4.15.1.0
, amqp , amqp
, aeson , aeson
, lens , lens
@ -53,6 +52,7 @@ library
, pipes , pipes
, containers , containers
, sqlite-simple , sqlite-simple
, reddit-lib
hs-source-dirs: src hs-source-dirs: src
default-language: Haskell2010 default-language: Haskell2010
@ -65,7 +65,7 @@ executable reddit-pub
-- LANGUAGE extensions used by modules in this package. -- LANGUAGE extensions used by modules in this package.
-- other-extensions: -- other-extensions:
build-depends: build-depends:
base ^>=4.14.1.0, base ^>=4.15.1.0,
reddit-pub reddit-pub
hs-source-dirs: app hs-source-dirs: app
@ -82,7 +82,7 @@ test-suite reddit-tests
-- LANGUAGE extensions used by modules in this package. -- LANGUAGE extensions used by modules in this package.
-- other-extensions: -- other-extensions:
build-depends: build-depends:
base ^>=4.14.1.0, base ^>=4.15.1.0,
mtl, mtl,
containers, containers,
bytestring, bytestring,

View File

@ -11,10 +11,17 @@ import Dhall.Deriving
import Numeric.Natural (Natural) import Numeric.Natural (Natural)
import Data.SubReddit (SubReddit) import Data.SubReddit (SubReddit)
-- | How the AMQP password is supplied in the configuration: either inline
-- as text, or as the path of a file to read it from.
data Password
  = Password Text
    -- ^ The password itself.
  | File FilePath
    -- ^ Path of a file holding the password.
  deriving stock (Generic, Show)
  deriving (FromDhall, ToDhall) via (Codec AsIs Password)
data AMQP = AMQP data AMQP = AMQP
{ amqpVhost :: Text { amqpVhost :: Text
, amqpUsername :: Text , amqpUsername :: Text
, amqpPassword :: Text , amqpPassword :: Password
, amqpHost :: Text , amqpHost :: Text
} }
deriving stock (Generic, Show) deriving stock (Generic, Show)
@ -30,7 +37,7 @@ vhost = lens amqpVhost (\ am txt -> am{amqpVhost=txt})
username :: Lens' AMQP Text username :: Lens' AMQP Text
username = lens amqpUsername (\ am txt -> am{amqpUsername=txt}) username = lens amqpUsername (\ am txt -> am{amqpUsername=txt})
password :: Lens' AMQP Text password :: Lens' AMQP Password
password = lens amqpPassword (\ am txt -> am{amqpPassword=txt}) password = lens amqpPassword (\ am txt -> am{amqpPassword=txt})
data Fetcher = Fetcher data Fetcher = Fetcher

131
reddit_pub/src/MyLib.hs Normal file
View File

@ -0,0 +1,131 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE LambdaCase #-}
module MyLib (defaultMain) where
import Control.Concurrent (threadDelay)
import Control.Exception (bracket)
import Control.Lens
import Control.Monad (forever, void)
import Data.Aeson (FromJSON, ToJSON, Value)
import qualified Data.Aeson as A
import Data.Aeson.Lens (_String, key)
import Data.Bool (bool)
import Data.Config
import Data.Deriving.Aeson
import Data.Foldable (for_)
import Data.Functor.Contravariant ((>$<))
import Data.Text (Text)
import qualified Data.Text.IO as TI
import qualified Data.Text.Strict.Lens as T
import qualified Database.SQLite.Simple as SQL
import GHC.Generics (Generic)
import qualified Membership
import Network.AMQP
( Channel
, DeliveryMode(Persistent)
, closeConnection
, declareExchange
, exchangeName
, exchangeType
, msgBody
, msgDeliveryMode
, newExchange
, newMsg
, openChannel
, openConnection
, publishMsg
)
import Network.Reddit (RedditId(RedditId), publishEntries)
import Network.Wreq.Session (newSession)
import Reddit.Publish (Publish(..))
import Text.Printf (printf)
import Data.ByteString.Lazy (ByteString)
-- | Kind of event carried by a 'Message'.
data MessageType
  = Create
  | Update
  deriving stock (Show, Eq, Generic)
  deriving anyclass (ToJSON, FromJSON)
-- | Envelope published for every entry: the event type, the identifier of
-- the entry and the raw JSON value it was built from.
--
-- The JSON instances are derived through 'AesonCodec' using the field
-- modifier @CamelCase <<< DropPrefix "message"@.
data Message = Message
  { messageType :: MessageType
  , messageIdentifier :: RedditId
  , messageContent :: Value
  }
  deriving stock (Show, Eq, Generic)
  deriving (ToJSON, FromJSON)
    via AesonCodec (Field (CamelCase <<< DropPrefix "message")) Message
-- | Adapt a publisher of optional 'Message's into a publisher of raw JSON
-- entries.
--
-- The entry's @"id"@ string is used as its 'RedditId'. When it is missing
-- the downstream publisher receives 'Nothing'; otherwise the entry is
-- wrapped in a 'Message' typed 'Update' if 'Membership.isSeen' reports the
-- id as seen and 'Create' if not.
toMessage :: SQL.Connection -> Publish IO (Maybe Message) -> Publish IO Value
toMessage sqlConn (Publish downstream) = Publish $ \entry ->
  maybe
    (downstream Nothing)
    (wrap entry)
    (RedditId <$> entry ^? key "id" . _String)
  where
    wrap entry redditId = do
      seen <- Membership.isSeen sqlConn redditId
      downstream . Just $ Message (bool Create Update seen) redditId entry
-- | Publisher that records an id through 'Membership.recordSeen' on the
-- given connection; a 'Nothing' input is a no-op.
sqlRecorder :: SQL.Connection -> Publish IO (Maybe RedditId)
sqlRecorder conn = Publish $ \case
  Nothing -> pure ()
  Just redditId -> Membership.recordSeen conn redditId
-- | Publisher that sends an already encoded payload to the given exchange
-- as a persistent message. A 'Nothing' input publishes nothing.
amqpPublisher :: Channel -> Text -> Publish IO (Maybe ByteString)
amqpPublisher channel exchange = Publish (maybe (pure ()) send)
  where
    -- The result of 'publishMsg' is deliberately discarded.
    send payload =
      void (publishMsg channel exchange "doesn't matter on fanout" (envelope payload))
    envelope payload =
      newMsg
        { msgBody = payload
        , msgDeliveryMode = Just Persistent
        }
-- | Publisher that writes every 'String' it receives to stdout using
-- 'putStrLn'.
stdoutPublisher :: Publish IO String
stdoutPublisher = Publish putStrLn
-- | Events that get rendered to the log by 'fetchToLog'.
data Fetch
  = Fetch Fetcher
    -- ^ A fetcher is about to be refreshed.
  | PublishMessage Message
    -- ^ A message is being published.
  | ParseFailed
    -- ^ An entry could not be turned into a 'Message'.
-- | Render a 'Fetch' event as a single human readable log line.
fetchToLog :: Fetch -> String
fetchToLog = \case
  Fetch fetcher ->
    printf "Refreshing %s" (show (fetcherSubreddit fetcher))
  ParseFailed ->
    printf "Failed parsing"
  PublishMessage msg ->
    printf
      "Publishing %s as type %s"
      (show (messageIdentifier msg))
      (show (messageType msg))
-- | Run the publisher using the configuration read from the given path.
--
-- Connects to RabbitMQ and SQLite, makes sure the @membership@ table and
-- the @reddit_posts@ fanout exchange exist, then polls the configured
-- fetchers in an endless loop.
defaultMain :: FilePath -> IO ()
defaultMain path = do
conf <- readConfig path
-- Resolve the configured password (inline or file based) before connecting.
pass <- getPassword (conf ^. amqp . password)
let rabbitConnect = openConnection
(conf ^. amqp . host . T.unpacked)
(conf ^. amqp . vhost)
(conf ^. amqp . username)
pass
-- 'bracket' closes the AMQP connection again when the body exits.
bracket rabbitConnect closeConnection $ \conn -> do
SQL.withConnection (conf ^. sqlite) $ \sqlConn -> do
SQL.execute_ sqlConn "create table if not exists membership (reddit_id primary key)"
chan <- openChannel conn
declareExchange chan newExchange { exchangeName = "reddit_posts", exchangeType = "fanout" }
sess <- newSession
-- Every message is fanned out to three sinks: the AMQP exchange (JSON
-- encoded), the SQLite recorder (identifier only) and the stdout logger.
let encoder = amqpPublisher chan "reddit_posts"
recorder = sqlRecorder sqlConn
publisher = (fmap A.encode >$< encoder) <> (fmap messageIdentifier >$< recorder) <> (maybe ParseFailed PublishMessage >$< logger)
logger = fetchToLog >$< stdoutPublisher
forever $ do
for_ (conf ^. fetchers) $ \fetcher -> do
publish logger (Fetch fetcher)
publishEntries (toMessage sqlConn publisher) sess fetcher
-- 'threadDelay' takes microseconds, so this pauses for 15 minutes.
-- NOTE(review): presumably this runs once per pass of the 'forever' loop
-- rather than once per fetcher; the original indentation is not visible
-- here, confirm against the real file.
threadDelay (15 * 60_000_000)
-- | Resolve a configured 'Password' to the text sent to the broker.
--
-- An inline 'Password' is returned unchanged. For 'File' the file is read
-- and trailing line terminators are removed, since files written by
-- editors, @echo@ or secret managers usually end in a newline that is not
-- part of the password. Other whitespace is left untouched.
getPassword :: Password -> IO Text
getPassword (Password p) = pure p
getPassword (File path) = over T.unpacked stripLineEnd <$> TI.readFile path
  where
    -- Drop every trailing '\n' / '\r'; passwords are short, so going
    -- through 'String' is fine here.
    stripLineEnd = reverse . dropWhile isLineEnd . reverse
    isLineEnd c = c == '\n' || c == '\r'

View File

@ -14,7 +14,7 @@ import Network.Wreq hiding (getWith)
import Network.Wreq.Session (Session, getWith) import Network.Wreq.Session (Session, getWith)
import Pipes (Producer, (>->), for, runEffect) import Pipes (Producer, (>->), for, runEffect)
import qualified Pipes.Prelude as P import qualified Pipes.Prelude as P
import Publish import Reddit.Publish
import Data.Maybe (maybeToList) import Data.Maybe (maybeToList)
import Control.Monad.Trans (liftIO) import Control.Monad.Trans (liftIO)
import Database.SQLite.Simple.ToField (ToField) import Database.SQLite.Simple.ToField (ToField)

View File

5
reddit_tags/CHANGELOG.md Normal file
View File

@ -0,0 +1,5 @@
# Revision history for reddit-tags
## 0.1.0.0 -- YYYY-mm-dd
* First version. Released on an unsuspecting world.

View File

@ -1,8 +1,8 @@
module Main where module Main where
import qualified MyLib (defaultMain) import qualified MyLib (someFunc)
main :: IO () main :: IO ()
main = do main = do
putStrLn "Hello, Haskell!" putStrLn "Hello, Haskell!"
MyLib.defaultMain MyLib.someFunc

18
reddit_tags/default.nix Normal file
View File

@ -0,0 +1,18 @@
# Nix derivation for the reddit-tags Haskell package (library, executable
# and test suite built from the sources in this directory).
#
# The dependency lists mirror the build-depends of the cabal file;
# `bytestring` is listed because the library's build-depends include it.
{ mkDerivation, aeson, amqp, attoparsec, base, bytestring, hspec, lens
, lens-aeson, lib, mtl, reddit-lib, text, transformers
}:
mkDerivation {
  pname = "reddit-tags";
  version = "0.1.0.0";
  src = ./.;
  isLibrary = true;
  isExecutable = true;
  libraryHaskellDepends = [
    aeson amqp attoparsec base bytestring lens lens-aeson mtl reddit-lib
    text transformers
  ];
  executableHaskellDepends = [ base ];
  testHaskellDepends = [ base hspec ];
  license = "unknown";
  # Not built on Hydra for any platform.
  hydraPlatforms = lib.platforms.none;
}

View File

@ -0,0 +1,67 @@
cabal-version: 3.0
name: reddit-tags
version: 0.1.0.0
synopsis:
-- A longer description of the package.
-- description:
homepage:
-- A URL where users can report bugs.
-- bug-reports:
license: NONE
author: Mats Rauhala
maintainer: mats.rauhala@iki.fi
-- A copyright notice.
-- copyright:
category: Web
extra-source-files: CHANGELOG.md
library
exposed-modules: MyLib
Tags
Transformer
-- Modules included in this library but not exported.
-- other-modules:
-- LANGUAGE extensions used by modules in this package.
-- other-extensions:
build-depends: base ^>=4.15.1.0
, amqp
, reddit-lib
, mtl
, text
, aeson
, lens
, lens-aeson
, transformers
, attoparsec
, bytestring
hs-source-dirs: src
default-language: Haskell2010
executable reddit-tags
main-is: Main.hs
-- Modules included in this executable, other than Main.
-- other-modules:
-- LANGUAGE extensions used by modules in this package.
-- other-extensions:
build-depends:
base ^>=4.15.1.0,
reddit-tags
hs-source-dirs: app
default-language: Haskell2010
test-suite reddit-tags-test
default-language: Haskell2010
type: exitcode-stdio-1.0
hs-source-dirs: test
main-is: MyLibTest.hs
build-depends: base ^>=4.15.1.0
, hspec
, reddit-tags

57
reddit_tags/src/MyLib.hs Normal file
View File

@ -0,0 +1,57 @@
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE OverloadedStrings #-}
module MyLib (someFunc) where
import System.Environment (lookupEnv)
import Control.Monad.Trans.Maybe (MaybeT(..))
import Network.AMQP
import qualified Data.Text as T
import Data.Text (Text)
import Control.Exception (bracket)
import Control.Concurrent (newEmptyMVar, readMVar)
import Control.Monad (void)
import Control.Lens
import Data.Aeson.Lens
import Transformer
import Data.ByteString.Lazy (ByteString)
import Control.Category ((>>>))
import Control.Arrow (arr, (&&&))
import Tags (parseTags)
-- | Connection parameters for the AMQP broker, passed to 'openConnection'
-- in this order.
data AMQP = AMQP
  { host :: String
  , vhost :: Text
  , username :: Text
  , password :: Text
  }
-- | Read the AMQP settings from the environment variables @AMQP_HOST@,
-- @AMQP_VHOST@, @AMQP_USER@ and @AMQP_PASS@ (looked up in that order).
-- Yields 'Nothing' as soon as one of them is unset.
getAMQP :: IO (Maybe AMQP)
getAMQP = runMaybeT $ do
  amqpHost <- envString "AMQP_HOST"
  amqpVhost <- envText "AMQP_VHOST"
  amqpUser <- envText "AMQP_USER"
  amqpPass <- envText "AMQP_PASS"
  pure (AMQP amqpHost amqpVhost amqpUser amqpPass)
  where
    envString = MaybeT . lookupEnv
    envText = fmap T.pack . envString
-- | Pipeline applied to every consumed message body: pick the string at
-- @content.title@, pair it with the tags 'parseTags' finds in it, and
-- print that pair. Stops early when the title cannot be extracted.
tagTransformer :: Transformer IO ByteString ()
tagTransformer =
  arrOpt (preview (key "content" . key "title" . _String))
    >>> arr (\title -> (title, parseTags title))
    >>> liftTransformer print
-- | Connect to the broker described by the @AMQP_*@ environment variables
-- and consume messages from the @reddit_tags@ queue, which is bound to the
-- @reddit_posts@ exchange. Every message body is run through
-- 'tagTransformer' and then acknowledged.
--
-- Fails with a descriptive user 'IOError' when the environment is
-- incomplete. The consumer keeps running until a line is read from stdin,
-- after which the connection is closed.
someFunc :: IO ()
someFunc = do
  settings <- getAMQP
  case settings of
    Nothing ->
      ioError $ userError
        "reddit-tags: AMQP_HOST, AMQP_VHOST, AMQP_USER and AMQP_PASS must all be set"
    Just AMQP{..} -> do
      let rabbitConnect = openConnection host vhost username password
      bracket rabbitConnect closeConnection $ \conn -> do
        chan <- openChannel conn
        qos chan 0 1 False
        -- The values returned by the declaration are not needed.
        void $ declareQueue chan newQueue {queueName = "reddit_tags"}
        bindQueue chan "reddit_tags" "reddit_posts" "key"
        -- The consumer tag is discarded: the consumer is never cancelled
        -- explicitly, it ends when the connection is closed.
        void $ consumeMsgs chan "reddit_tags" Ack $ \(msg, env) -> do
          void $ runTransformer tagTransformer (msgBody msg)
          ackEnv env
        -- Block until a line arrives on stdin before the bracket closes
        -- the connection.
        void getLine

18
reddit_tags/src/Tags.hs Normal file
View File

@ -0,0 +1,18 @@
{-# LANGUAGE ApplicativeDo #-}
module Tags (parseTags) where
import qualified Data.Attoparsec.Text as A
import Data.Attoparsec.Text (Parser)
import Data.Text (Text)
import Data.Either (fromRight)
-- | Parse the next bracketed tag: skip everything up to the next @[@ and
-- return the text between it and the following @]@.
tag :: Parser Text
tag =
  A.skipWhile (/= '[')
    *> A.char '['
    *> A.takeWhile (/= ']')
    <* A.char ']'
-- | One or more occurrences of 'tag'; fails when not even one tag is found.
tags :: Parser [Text]
tags = A.many1 tag
-- | Extract every @[tag]@ from the input, in order of appearance.
-- Returns the empty list when the 'tags' parser fails, i.e. when the input
-- contains no complete tag.
parseTags :: Text -> [Text]
parseTags input = either (const []) id (A.parseOnly tags input)

View File

@ -0,0 +1,18 @@
{-# LANGUAGE DerivingVia #-}
module Transformer where
import Control.Category ( Category )
import Control.Arrow ( Arrow, ArrowChoice, Kleisli(Kleisli) )
import Control.Monad.Trans.Maybe ( MaybeT(MaybeT) )
-- | An effectful step from @a@ to @b@ that may yield no result.
--
-- The 'Category', 'Arrow' and 'ArrowChoice' instances are borrowed from
-- @'Kleisli' ('MaybeT' m)@ through @DerivingVia@.
newtype Transformer m a b
  = Transformer (a -> m (Maybe b))
  deriving (Category, Arrow, ArrowChoice) via Kleisli (MaybeT m)
-- | Lift a pure, partial function into a 'Transformer': a 'Nothing' from
-- the function becomes the transformer's 'Nothing' result.
arrOpt :: Applicative m => (a -> Maybe b) -> Transformer m a b
arrOpt f = Transformer $ \input -> pure (f input)
-- | Lift an effectful function into a 'Transformer' that always yields a
-- result.
--
-- Only 'fmap' is used, so 'Functor' suffices; every caller that supplied a
-- 'Monad' still type checks.
liftTransformer :: Functor m => (a -> m b) -> Transformer m a b
liftTransformer f = Transformer (fmap Just . f)
-- | Unwrap a 'Transformer' into the function it wraps.
--
-- This is plain unwrapping, so no constraint on @m@ is required.
runTransformer :: Transformer m a b -> a -> m (Maybe b)
runTransformer (Transformer tr) = tr

View File

@ -0,0 +1,17 @@
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where
import Tags (parseTags)
import Test.Hspec
-- | Spec for 'parseTags', written as a table of
-- (description, input, expected tags) rows checked in order.
main :: IO ()
main = hspec $ describe "Parser" $ mapM_ check cases
  where
    check (title, input, expected) =
      it title (parseTags input `shouldBe` expected)
    cases =
      [ ("Returns no results if there are no tags", "foo bar", [])
      , ("Returns a single result", "[foo]", ["foo"])
      , ("Finds multiple results", "[foo][bar]", ["foo", "bar"])
      , ( "Finds multiple results with other text interleaved"
        , "prefix [foo] infix [bar] suffix"
        , ["foo", "bar"]
        )
      ]

View File

@ -1,89 +0,0 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeOperators #-}
module MyLib (defaultMain) where
import Control.Exception (bracket)
import Control.Lens
import Control.Monad (void)
import Data.Aeson (FromJSON, ToJSON, Value)
import qualified Data.Aeson as A
import Data.Config
import Data.Deriving.Aeson
import Data.Foldable (for_)
import Data.Functor.Contravariant ((>$<))
import Data.Text (Text)
import qualified Data.Text.Strict.Lens as T
import qualified Database.SQLite.Simple as SQL
import GHC.Generics (Generic)
import qualified Membership
import Network.AMQP
( Channel
, DeliveryMode(Persistent)
, exchangeName
, exchangeType
, msgBody
, msgDeliveryMode
, newMsg
, publishMsg, openConnection, closeConnection, openChannel, newExchange, declareExchange
)
import Network.Reddit (RedditId (RedditId), publishEntries)
import Network.Wreq.Session (newSession)
import Publish (Publish(..))
import Data.Aeson.Lens (key, _String)
import Data.Bool (bool)
data MessageType = Create | Update
deriving stock (Show, Eq, Generic)
deriving anyclass (ToJSON, FromJSON)
data Message = Message
{ messageType :: MessageType
, messageIdentifier :: RedditId
, messageContent :: Value
}
deriving stock (Show, Eq, Generic)
deriving (ToJSON, FromJSON)
via AesonCodec (Field (CamelCase <<< DropPrefix "message")) Message
toMessage :: Value -> Message
toMessage entry = Message Create (RedditId (entry ^. key "id" . _String)) entry
sqlRecorder :: SQL.Connection -> Publish IO RedditId
sqlRecorder conn = Publish $ Membership.recordSeen conn
amqpPublisher :: SQL.Connection -> Channel -> Text -> Publish IO Message
amqpPublisher sqlConn channel exchange = Publish $ \msg -> do
seen <- Membership.isSeen sqlConn (messageIdentifier msg)
let msg' = msg{messageType = bool Create Update seen}
void $ publishMsg channel exchange routingKey (message (A.encode msg'))
where
routingKey = "doesn't matter on fanout"
message lbs = newMsg
{ msgBody = lbs
, msgDeliveryMode = Just Persistent
}
defaultMain :: IO ()
defaultMain = do
conf <- readConfig "./config.dhall"
let rabbitConnect = openConnection
(conf ^. amqp . host . T.unpacked)
(conf ^. amqp . vhost)
(conf ^. amqp . username)
(conf ^. amqp . password)
bracket rabbitConnect closeConnection $ \conn -> do
SQL.withConnection (conf ^. sqlite) $ \sqlConn -> do
SQL.execute_ sqlConn "create table if not exists membership (reddit_id primary key)"
chan <- openChannel conn
declareExchange chan newExchange { exchangeName = "reddit_posts", exchangeType = "fanout" }
sess <- newSession
let encoder = amqpPublisher sqlConn chan "reddit_posts"
recorder = sqlRecorder sqlConn
publisher = encoder <> (messageIdentifier >$< recorder)
for_ (conf ^. fetchers) $ \fetcher -> do
print fetcher
publishEntries (toMessage >$< publisher) sess fetcher