Compare commits
No commits in common. "main" and "44d1541503e2a50148844796b2813d6ff3b0472f" have entirely different histories.
main
...
44d1541503
@ -1,8 +1,8 @@
|
||||
module Main where
|
||||
|
||||
import qualified MyLib (someFunc)
|
||||
import qualified MyLib (defaultMain)
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
putStrLn "Hello, Haskell!"
|
||||
MyLib.someFunc
|
||||
MyLib.defaultMain
|
@ -1 +0,0 @@
|
||||
packages: */*
|
@ -3,7 +3,7 @@ let config = ./dhall/package.dhall
|
||||
in { amqp = config.AMQP::{
|
||||
, vhost = "reddit"
|
||||
, username = env:AMQP_USER as Text ? "reddit_pub"
|
||||
, password = config.Password.Type.Password (env:AMQP_PASS as Text ? "tester")
|
||||
, password = env:AMQP_PASS as Text ? "tester"
|
||||
, host = env:AMQP_HOST as Text ? "127.0.0.1"
|
||||
-- , host = "10.233.5.2"
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
{ mkDerivation, aeson, amqp, base, bytestring, containers, dhall
|
||||
, hedgehog, hspec, hspec-hedgehog, lens, lens-aeson, lib, mtl
|
||||
, pipes, reddit-lib, sqlite-simple, text, wreq
|
||||
, pipes, sqlite-simple, text, wreq
|
||||
}:
|
||||
mkDerivation {
|
||||
pname = "reddit-pub";
|
||||
@ -8,10 +8,9 @@ mkDerivation {
|
||||
src = ./.;
|
||||
isLibrary = true;
|
||||
isExecutable = true;
|
||||
enableSeparateDataOutput = true;
|
||||
libraryHaskellDepends = [
|
||||
aeson amqp base bytestring containers dhall lens lens-aeson mtl
|
||||
pipes reddit-lib sqlite-simple text wreq
|
||||
pipes sqlite-simple text wreq
|
||||
];
|
||||
executableHaskellDepends = [ base ];
|
||||
testHaskellDepends = [
|
@ -1,5 +1,5 @@
|
||||
{ vhost : Text
|
||||
, username : Text
|
||||
, password : ../Password/Type.dhall
|
||||
, password : Text
|
||||
, host : Text
|
||||
}
|
@ -2,5 +2,4 @@
|
||||
, default = ./default.dhall
|
||||
, AMQP = ./AMQP/package.dhall
|
||||
, Fetcher = ./Fetcher/package.dhall
|
||||
, Password = ./Password/package.dhall
|
||||
}
|
48
flake.lock
48
flake.lock
@ -1,12 +1,30 @@
|
||||
{
|
||||
"nodes": {
|
||||
"easy-hls": {
|
||||
"inputs": {
|
||||
"nixpkgs": "nixpkgs"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1636606878,
|
||||
"narHash": "sha256-rLxYl7iYP9vQhSvVlV2uRCdgrqKDz/vN1Z8ZmA8itkM=",
|
||||
"owner": "jkachmar",
|
||||
"repo": "easy-hls-nix",
|
||||
"rev": "edd5710946d46ea40810ef9a708b084d7e05a118",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "jkachmar",
|
||||
"repo": "easy-hls-nix",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"flake-utils": {
|
||||
"locked": {
|
||||
"lastModified": 1649676176,
|
||||
"narHash": "sha256-OWKJratjt2RW151VUlJPRALb7OU2S5s+f0vLj4o1bHM=",
|
||||
"lastModified": 1637014545,
|
||||
"narHash": "sha256-26IZAc5yzlD9FlDT54io1oqG/bBoyka+FJk5guaX4x4=",
|
||||
"owner": "numtide",
|
||||
"repo": "flake-utils",
|
||||
"rev": "a4b154ebbdc88c8498a5c7b01589addc9e9cb678",
|
||||
"rev": "bba5dcc8e0b20ab664967ad83d24d64cb64ec4f4",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
@ -17,11 +35,26 @@
|
||||
},
|
||||
"nixpkgs": {
|
||||
"locked": {
|
||||
"lastModified": 1650440614,
|
||||
"narHash": "sha256-7mF7gyS5P3UmZmQuo9jbikP2wMyRUnimI7HcKLJ9OZQ=",
|
||||
"lastModified": 1615055905,
|
||||
"narHash": "sha256-Ig7CXgE5C3mwtTVBl8nSTessa1oMAm6Pm/p+T6iAJD8=",
|
||||
"owner": "nixos",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "f3fc074642a25ef5a1423412f09946149237e338",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "nixos",
|
||||
"repo": "nixpkgs",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"nixpkgs_2": {
|
||||
"locked": {
|
||||
"lastModified": 1637238590,
|
||||
"narHash": "sha256-zGI/dR9WZvfZE0Eyk7zC4Y8ukxC/oSbfApqIvSCtB+o=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "0bbb65673c0ba31047c9ba6c4cd211556b534a4e",
|
||||
"rev": "57407fed58509a8d731d525f39842ea5ab696237",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
@ -31,8 +64,9 @@
|
||||
},
|
||||
"root": {
|
||||
"inputs": {
|
||||
"easy-hls": "easy-hls",
|
||||
"flake-utils": "flake-utils",
|
||||
"nixpkgs": "nixpkgs"
|
||||
"nixpkgs": "nixpkgs_2"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
22
flake.nix
22
flake.nix
@ -2,20 +2,19 @@
|
||||
description = "A very basic flake";
|
||||
|
||||
inputs = {
|
||||
easy-hls = { url = "github:jkachmar/easy-hls-nix"; };
|
||||
flake-utils = {
|
||||
url = "github:numtide/flake-utils";
|
||||
inputs.nixpkgs.follows = "nixpkgs";
|
||||
};
|
||||
};
|
||||
|
||||
outputs = { self, nixpkgs, flake-utils }:
|
||||
outputs = { self, nixpkgs, flake-utils, easy-hls }:
|
||||
{
|
||||
overlay = final: prev: {
|
||||
haskellPackages = prev.haskellPackages.override ( old: {
|
||||
overrides = final.lib.composeExtensions ( old.overrides or (_: _: {})) (f: p: rec {
|
||||
reddit-pub = f.callPackage ./reddit_pub {};
|
||||
reddit-lib = f.callPackage ./reddit_lib {};
|
||||
reddit-tags = f.callPackage ./reddit_tags {};
|
||||
overrides = final.lib.composeExtensions ( old.overrides or (_: _: {})) (f: p: {
|
||||
reddit-pub = f.callPackage ./. {};
|
||||
});
|
||||
} );
|
||||
};
|
||||
@ -25,18 +24,16 @@
|
||||
let
|
||||
pkgs = import nixpkgs { inherit system; overlays = [ self.overlay ]; };
|
||||
hp = pkgs.haskellPackages;
|
||||
hls = (easy-hls.withGhcs [ hp.ghc.version ] ).${system};
|
||||
in
|
||||
rec {
|
||||
|
||||
packages.reddit-pub = pkgs.haskell.lib.justStaticExecutables hp.reddit-pub;
|
||||
packages.reddit-pub-dhall = pkgs.dhallPackages.callPackage ./dhall.nix {};
|
||||
packages.reddit-tags = pkgs.haskell.lib.justStaticExecutables hp.reddit-tags;
|
||||
packages = { inherit (hp) reddit-pub; };
|
||||
|
||||
defaultPackage = packages.reddit-pub;
|
||||
|
||||
devShell =
|
||||
hp.shellFor {
|
||||
packages = h: [h.reddit-pub h.reddit-tags];
|
||||
packages = h: [h.reddit-pub];
|
||||
withHoogle = true;
|
||||
buildInputs = with pkgs; [
|
||||
dhall-lsp-server
|
||||
@ -46,6 +43,7 @@
|
||||
hp.hlint
|
||||
stylish-haskell
|
||||
ghcid
|
||||
hls
|
||||
rrdtool
|
||||
jq
|
||||
sqlite-interactive
|
||||
@ -53,10 +51,6 @@
|
||||
sqlite-interactive
|
||||
|
||||
hp.graphmod
|
||||
|
||||
hp.dhall-nixpkgs
|
||||
|
||||
hp.haskell-language-server
|
||||
];
|
||||
};
|
||||
}
|
||||
|
16
rabbitmq.nix
16
rabbitmq.nix
@ -1,27 +1,11 @@
|
||||
{ lib, config, pkgs, ... }:
|
||||
|
||||
let
|
||||
cookie = "dontusethisinprod";
|
||||
setup = pkgs.writeScriptBin "setup-rabbitmq.sh" ''
|
||||
rabbitmqctl --erlang-cookie "${cookie}" add_vhost reddit
|
||||
rabbitmqctl --erlang-cookie "${cookie}" add_user admin_user password
|
||||
rabbitmqctl --erlang-cookie "${cookie}" set_user_tags admin_user administrator
|
||||
rabbitmqctl --erlang-cookie "${cookie}" set_permissions -p / admin_user ".*" ".*" ".*"
|
||||
rabbitmqctl --erlang-cookie "${cookie}" set_permissions -p reddit admin_user ".*" ".*" ".*"
|
||||
|
||||
rabbitmqctl --erlang-cookie "${cookie}" add_user reddit_user password
|
||||
rabbitmqctl --erlang-cookie "${cookie}" set_permissions -p reddit reddit_user ".*" ".*" ".*"
|
||||
'';
|
||||
in
|
||||
{
|
||||
services.rabbitmq = {
|
||||
enable = true;
|
||||
listenAddress = "0.0.0.0";
|
||||
managementPlugin.enable = true;
|
||||
cookie = "dontusethisinprod";
|
||||
};
|
||||
networking.firewall.allowedTCPPorts = [ 5672 15672 ];
|
||||
|
||||
environment.systemPackages = [ setup ];
|
||||
}
|
||||
|
||||
|
@ -32,6 +32,7 @@ library
|
||||
Data.Deriving.Aeson
|
||||
Network.Reddit
|
||||
Data.SubReddit
|
||||
Publish
|
||||
Membership
|
||||
|
||||
-- Modules included in this library but not exported.
|
||||
@ -39,7 +40,7 @@ library
|
||||
|
||||
-- LANGUAGE extensions used by modules in this package.
|
||||
-- other-extensions:
|
||||
build-depends: base ^>=4.15.1.0
|
||||
build-depends: base ^>=4.14.1.0
|
||||
, amqp
|
||||
, aeson
|
||||
, lens
|
||||
@ -52,7 +53,6 @@ library
|
||||
, pipes
|
||||
, containers
|
||||
, sqlite-simple
|
||||
, reddit-lib
|
||||
hs-source-dirs: src
|
||||
default-language: Haskell2010
|
||||
|
||||
@ -65,7 +65,7 @@ executable reddit-pub
|
||||
-- LANGUAGE extensions used by modules in this package.
|
||||
-- other-extensions:
|
||||
build-depends:
|
||||
base ^>=4.15.1.0,
|
||||
base ^>=4.14.1.0,
|
||||
reddit-pub
|
||||
|
||||
hs-source-dirs: app
|
||||
@ -82,7 +82,7 @@ test-suite reddit-tests
|
||||
-- LANGUAGE extensions used by modules in this package.
|
||||
-- other-extensions:
|
||||
build-depends:
|
||||
base ^>=4.15.1.0,
|
||||
base ^>=4.14.1.0,
|
||||
mtl,
|
||||
containers,
|
||||
bytestring,
|
@ -1,5 +0,0 @@
|
||||
# Revision history for reddit-lib
|
||||
|
||||
## 0.1.0.0 -- YYYY-mm-dd
|
||||
|
||||
* First version. Released on an unsuspecting world.
|
@ -1,10 +0,0 @@
|
||||
{ mkDerivation, base, lib }:
|
||||
mkDerivation {
|
||||
pname = "reddit-lib";
|
||||
version = "0.1.0.0";
|
||||
src = ./.;
|
||||
libraryHaskellDepends = [ base ];
|
||||
testHaskellDepends = [ base ];
|
||||
license = "unknown";
|
||||
hydraPlatforms = lib.platforms.none;
|
||||
}
|
@ -1,38 +0,0 @@
|
||||
cabal-version: 3.0
|
||||
name: reddit-lib
|
||||
version: 0.1.0.0
|
||||
synopsis:
|
||||
|
||||
-- A longer description of the package.
|
||||
-- description:
|
||||
homepage:
|
||||
|
||||
-- A URL where users can report bugs.
|
||||
-- bug-reports:
|
||||
license: NONE
|
||||
author: Mats Rauhala
|
||||
maintainer: mats.rauhala@iki.fi
|
||||
|
||||
-- A copyright notice.
|
||||
-- copyright:
|
||||
category: Web
|
||||
extra-source-files: CHANGELOG.md
|
||||
|
||||
library
|
||||
exposed-modules: Reddit.Publish
|
||||
|
||||
-- Modules included in this library but not exported.
|
||||
-- other-modules:
|
||||
|
||||
-- LANGUAGE extensions used by modules in this package.
|
||||
-- other-extensions:
|
||||
build-depends: base ^>=4.15.1.0
|
||||
hs-source-dirs: src
|
||||
default-language: Haskell2010
|
||||
|
||||
test-suite reddit-lib-test
|
||||
default-language: Haskell2010
|
||||
type: exitcode-stdio-1.0
|
||||
hs-source-dirs: test
|
||||
main-is: MyLibTest.hs
|
||||
build-depends: base ^>=4.15.1.0
|
@ -1,4 +0,0 @@
|
||||
module Main (main) where
|
||||
|
||||
main :: IO ()
|
||||
main = putStrLn "Test suite not yet implemented."
|
@ -1,9 +0,0 @@
|
||||
module Main where
|
||||
|
||||
import qualified MyLib (defaultMain)
|
||||
import System.Environment (getArgs)
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
[path] <- getArgs
|
||||
MyLib.defaultMain path
|
@ -1,9 +0,0 @@
|
||||
{ buildDhallDirectoryPackage }:
|
||||
buildDhallDirectoryPackage {
|
||||
name = "";
|
||||
src = ./dhall;
|
||||
file = "package.dhall";
|
||||
source = false;
|
||||
document = false;
|
||||
dependencies = [];
|
||||
}
|
@ -1 +0,0 @@
|
||||
< Password : Text | File : Text >
|
@ -1 +0,0 @@
|
||||
{ Type = ./Type.dhall }
|
@ -1,131 +0,0 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DeriveAnyClass #-}
|
||||
{-# LANGUAGE DeriveGeneric #-}
|
||||
{-# LANGUAGE DerivingVia #-}
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE TypeOperators #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
module MyLib (defaultMain) where
|
||||
|
||||
import Control.Concurrent (threadDelay)
|
||||
import Control.Exception (bracket)
|
||||
import Control.Lens
|
||||
import Control.Monad (forever, void)
|
||||
import Data.Aeson (FromJSON, ToJSON, Value)
|
||||
import qualified Data.Aeson as A
|
||||
import Data.Aeson.Lens (_String, key)
|
||||
import Data.Bool (bool)
|
||||
import Data.Config
|
||||
import Data.Deriving.Aeson
|
||||
import Data.Foldable (for_)
|
||||
import Data.Functor.Contravariant ((>$<))
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text.IO as TI
|
||||
import qualified Data.Text.Strict.Lens as T
|
||||
import qualified Database.SQLite.Simple as SQL
|
||||
import GHC.Generics (Generic)
|
||||
import qualified Membership
|
||||
import Network.AMQP
|
||||
( Channel
|
||||
, DeliveryMode(Persistent)
|
||||
, closeConnection
|
||||
, declareExchange
|
||||
, exchangeName
|
||||
, exchangeType
|
||||
, msgBody
|
||||
, msgDeliveryMode
|
||||
, newExchange
|
||||
, newMsg
|
||||
, openChannel
|
||||
, openConnection
|
||||
, publishMsg
|
||||
)
|
||||
import Network.Reddit (RedditId(RedditId), publishEntries)
|
||||
import Network.Wreq.Session (newSession)
|
||||
import Reddit.Publish (Publish(..))
|
||||
import Text.Printf (printf)
|
||||
import Data.ByteString.Lazy (ByteString)
|
||||
|
||||
data MessageType = Create | Update
|
||||
deriving stock (Show, Eq, Generic)
|
||||
deriving anyclass (ToJSON, FromJSON)
|
||||
|
||||
data Message = Message
|
||||
{ messageType :: MessageType
|
||||
, messageIdentifier :: RedditId
|
||||
, messageContent :: Value
|
||||
}
|
||||
deriving stock (Show, Eq, Generic)
|
||||
deriving (ToJSON, FromJSON)
|
||||
via AesonCodec (Field (CamelCase <<< DropPrefix "message")) Message
|
||||
|
||||
|
||||
toMessage :: SQL.Connection -> Publish IO (Maybe Message) -> Publish IO Value
|
||||
toMessage sqlConn (Publish p) = Publish $ \entry -> do
|
||||
case RedditId <$> (entry ^? key "id" . _String) of
|
||||
Nothing -> p Nothing
|
||||
Just redditId -> do
|
||||
event <- bool Create Update <$> Membership.isSeen sqlConn redditId
|
||||
p $ Just $ Message event redditId entry
|
||||
|
||||
sqlRecorder :: SQL.Connection -> Publish IO (Maybe RedditId)
|
||||
sqlRecorder conn = Publish $ maybe (pure ()) (Membership.recordSeen conn)
|
||||
|
||||
amqpPublisher :: Channel -> Text -> Publish IO (Maybe ByteString)
|
||||
amqpPublisher channel exchange = Publish $ \case
|
||||
Nothing -> pure ()
|
||||
Just lbs ->
|
||||
void $ publishMsg channel exchange routingKey (message lbs)
|
||||
where
|
||||
routingKey = "doesn't matter on fanout"
|
||||
message lbs = newMsg
|
||||
{ msgBody = lbs
|
||||
, msgDeliveryMode = Just Persistent
|
||||
}
|
||||
|
||||
stdoutPublisher :: Publish IO String
|
||||
stdoutPublisher = Publish putStrLn
|
||||
|
||||
data Fetch
|
||||
= Fetch Fetcher
|
||||
| PublishMessage Message
|
||||
| ParseFailed
|
||||
|
||||
fetchToLog :: Fetch -> String
|
||||
fetchToLog (Fetch fetcher) = printf "Refreshing %s" (show $ fetcherSubreddit fetcher)
|
||||
fetchToLog ParseFailed = printf "Failed parsing"
|
||||
fetchToLog (PublishMessage msg) = messageToLog msg
|
||||
where
|
||||
messageToLog :: Message -> String
|
||||
messageToLog m = printf "Publishing %s as type %s" (show $ messageIdentifier m) (show $ messageType m)
|
||||
|
||||
|
||||
defaultMain :: FilePath -> IO ()
|
||||
defaultMain path = do
|
||||
conf <- readConfig path
|
||||
pass <- getPassword (conf ^. amqp . password)
|
||||
let rabbitConnect = openConnection
|
||||
(conf ^. amqp . host . T.unpacked)
|
||||
(conf ^. amqp . vhost)
|
||||
(conf ^. amqp . username)
|
||||
pass
|
||||
bracket rabbitConnect closeConnection $ \conn -> do
|
||||
SQL.withConnection (conf ^. sqlite) $ \sqlConn -> do
|
||||
SQL.execute_ sqlConn "create table if not exists membership (reddit_id primary key)"
|
||||
chan <- openChannel conn
|
||||
declareExchange chan newExchange { exchangeName = "reddit_posts", exchangeType = "fanout" }
|
||||
sess <- newSession
|
||||
let encoder = amqpPublisher chan "reddit_posts"
|
||||
recorder = sqlRecorder sqlConn
|
||||
publisher = (fmap A.encode >$< encoder) <> (fmap messageIdentifier >$< recorder) <> (maybe ParseFailed PublishMessage >$< logger)
|
||||
logger = fetchToLog >$< stdoutPublisher
|
||||
forever $ do
|
||||
for_ (conf ^. fetchers) $ \fetcher -> do
|
||||
publish logger (Fetch fetcher)
|
||||
publishEntries (toMessage sqlConn publisher) sess fetcher
|
||||
threadDelay (15 * 60_000_000)
|
||||
|
||||
getPassword :: Password -> IO Text
|
||||
getPassword (Password p) = pure p
|
||||
getPassword (File path) = TI.readFile path
|
@ -1,5 +0,0 @@
|
||||
# Revision history for reddit-tags
|
||||
|
||||
## 0.1.0.0 -- YYYY-mm-dd
|
||||
|
||||
* First version. Released on an unsuspecting world.
|
@ -1,18 +0,0 @@
|
||||
{ mkDerivation, aeson, amqp, attoparsec, base, hspec, lens
|
||||
, lens-aeson, lib, mtl, reddit-lib, text, transformers
|
||||
}:
|
||||
mkDerivation {
|
||||
pname = "reddit-tags";
|
||||
version = "0.1.0.0";
|
||||
src = ./.;
|
||||
isLibrary = true;
|
||||
isExecutable = true;
|
||||
libraryHaskellDepends = [
|
||||
aeson amqp attoparsec base lens lens-aeson mtl reddit-lib text
|
||||
transformers
|
||||
];
|
||||
executableHaskellDepends = [ base ];
|
||||
testHaskellDepends = [ base hspec ];
|
||||
license = "unknown";
|
||||
hydraPlatforms = lib.platforms.none;
|
||||
}
|
@ -1,67 +0,0 @@
|
||||
cabal-version: 3.0
|
||||
name: reddit-tags
|
||||
version: 0.1.0.0
|
||||
synopsis:
|
||||
|
||||
-- A longer description of the package.
|
||||
-- description:
|
||||
homepage:
|
||||
|
||||
-- A URL where users can report bugs.
|
||||
-- bug-reports:
|
||||
license: NONE
|
||||
author: Mats Rauhala
|
||||
maintainer: mats.rauhala@iki.fi
|
||||
|
||||
-- A copyright notice.
|
||||
-- copyright:
|
||||
category: Web
|
||||
extra-source-files: CHANGELOG.md
|
||||
|
||||
library
|
||||
exposed-modules: MyLib
|
||||
Tags
|
||||
Transformer
|
||||
|
||||
-- Modules included in this library but not exported.
|
||||
-- other-modules:
|
||||
|
||||
-- LANGUAGE extensions used by modules in this package.
|
||||
-- other-extensions:
|
||||
build-depends: base ^>=4.15.1.0
|
||||
, amqp
|
||||
, reddit-lib
|
||||
, mtl
|
||||
, text
|
||||
, aeson
|
||||
, lens
|
||||
, lens-aeson
|
||||
, transformers
|
||||
, attoparsec
|
||||
, bytestring
|
||||
hs-source-dirs: src
|
||||
default-language: Haskell2010
|
||||
|
||||
executable reddit-tags
|
||||
main-is: Main.hs
|
||||
|
||||
-- Modules included in this executable, other than Main.
|
||||
-- other-modules:
|
||||
|
||||
-- LANGUAGE extensions used by modules in this package.
|
||||
-- other-extensions:
|
||||
build-depends:
|
||||
base ^>=4.15.1.0,
|
||||
reddit-tags
|
||||
|
||||
hs-source-dirs: app
|
||||
default-language: Haskell2010
|
||||
|
||||
test-suite reddit-tags-test
|
||||
default-language: Haskell2010
|
||||
type: exitcode-stdio-1.0
|
||||
hs-source-dirs: test
|
||||
main-is: MyLibTest.hs
|
||||
build-depends: base ^>=4.15.1.0
|
||||
, hspec
|
||||
, reddit-tags
|
@ -1,57 +0,0 @@
|
||||
{-# LANGUAGE RecordWildCards #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
module MyLib (someFunc) where
|
||||
import System.Environment (lookupEnv)
|
||||
import Control.Monad.Trans.Maybe (MaybeT(..))
|
||||
import Network.AMQP
|
||||
import qualified Data.Text as T
|
||||
import Data.Text (Text)
|
||||
import Control.Exception (bracket)
|
||||
import Control.Concurrent (newEmptyMVar, readMVar)
|
||||
import Control.Monad (void)
|
||||
|
||||
import Control.Lens
|
||||
import Data.Aeson.Lens
|
||||
import Transformer
|
||||
import Data.ByteString.Lazy (ByteString)
|
||||
import Control.Category ((>>>))
|
||||
import Control.Arrow (arr, (&&&))
|
||||
import Tags (parseTags)
|
||||
|
||||
data AMQP = AMQP
|
||||
{ host :: String
|
||||
, vhost :: Text
|
||||
, username :: Text
|
||||
, password :: Text
|
||||
}
|
||||
|
||||
getAMQP :: IO (Maybe AMQP)
|
||||
getAMQP = runMaybeT $
|
||||
AMQP <$> lookupEnvM "AMQP_HOST" <*> lookupEnvMText "AMQP_VHOST" <*> lookupEnvMText "AMQP_USER" <*> lookupEnvMText "AMQP_PASS"
|
||||
where
|
||||
lookupEnvM = MaybeT . lookupEnv
|
||||
lookupEnvMText = fmap T.pack . lookupEnvM
|
||||
|
||||
tagTransformer :: Transformer IO ByteString ()
|
||||
tagTransformer =
|
||||
arrOpt (preview (key "content" . key "title" . _String))
|
||||
>>> arr id &&& arr parseTags
|
||||
>>> liftTransformer print
|
||||
|
||||
someFunc :: IO ()
|
||||
someFunc = do
|
||||
Just AMQP{..} <- getAMQP
|
||||
let rabbitConnect = openConnection host vhost username password
|
||||
bracket rabbitConnect closeConnection $ \conn -> do
|
||||
chan <- openChannel conn
|
||||
qos chan 0 1 False
|
||||
declareQueue chan newQueue {queueName="reddit_tags"}
|
||||
bindQueue chan "reddit_tags" "reddit_posts" "key"
|
||||
consumeMsgs chan "reddit_tags" Ack $ \(msg, env) -> do
|
||||
void $ runTransformer tagTransformer (msgBody msg)
|
||||
-- let body = msgBody msg
|
||||
-- let title = body ^? key "content" . key "title"
|
||||
-- print title
|
||||
-- print $ parseTags title
|
||||
ackEnv env
|
||||
void getLine
|
@ -1,18 +0,0 @@
|
||||
{-# LANGUAGE ApplicativeDo #-}
|
||||
module Tags (parseTags) where
|
||||
|
||||
import qualified Data.Attoparsec.Text as A
|
||||
import Data.Attoparsec.Text (Parser)
|
||||
import Data.Text (Text)
|
||||
import Data.Either (fromRight)
|
||||
|
||||
tag :: Parser Text
|
||||
tag = do
|
||||
A.skipWhile (/= '[')
|
||||
A.char '[' *> A.takeWhile (/= ']') <* A.char ']'
|
||||
|
||||
tags :: Parser [Text]
|
||||
tags = A.many1 tag
|
||||
|
||||
parseTags :: Text -> [Text]
|
||||
parseTags = fromRight [] . A.parseOnly tags
|
@ -1,18 +0,0 @@
|
||||
{-# LANGUAGE DerivingVia #-}
|
||||
module Transformer where
|
||||
|
||||
import Control.Category ( Category )
|
||||
import Control.Arrow ( Arrow, ArrowChoice, Kleisli(Kleisli) )
|
||||
import Control.Monad.Trans.Maybe ( MaybeT(MaybeT) )
|
||||
|
||||
newtype Transformer m a b = Transformer (a -> m (Maybe b))
|
||||
deriving (Category, Arrow, ArrowChoice) via Kleisli (MaybeT m)
|
||||
|
||||
arrOpt :: Applicative m => (a -> Maybe b) -> Transformer m a b
|
||||
arrOpt f = Transformer (pure . f)
|
||||
|
||||
liftTransformer :: Monad m => (a -> m b) -> Transformer m a b
|
||||
liftTransformer f = Transformer (fmap Just . f)
|
||||
|
||||
runTransformer :: Monad m => Transformer m a b -> a -> m (Maybe b)
|
||||
runTransformer (Transformer tr) = tr
|
@ -1,17 +0,0 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
module Main (main) where
|
||||
|
||||
import Tags (parseTags)
|
||||
import Test.Hspec
|
||||
|
||||
main :: IO ()
|
||||
main = hspec $
|
||||
describe "Parser" $ do
|
||||
it "Returns no results if there are no tags" $
|
||||
parseTags "foo bar" `shouldBe` []
|
||||
it "Returns a single result" $
|
||||
parseTags "[foo]" `shouldBe` ["foo"]
|
||||
it "Finds multiple results" $
|
||||
parseTags "[foo][bar]" `shouldBe` ["foo", "bar"]
|
||||
it "Finds multiple results with other text interleaved" $
|
||||
parseTags "prefix [foo] infix [bar] suffix" `shouldBe` ["foo", "bar"]
|
@ -11,17 +11,10 @@ import Dhall.Deriving
|
||||
import Numeric.Natural (Natural)
|
||||
import Data.SubReddit (SubReddit)
|
||||
|
||||
data Password
|
||||
= Password Text
|
||||
| File FilePath
|
||||
deriving stock (Generic, Show)
|
||||
deriving (FromDhall, ToDhall)
|
||||
via (Codec AsIs Password)
|
||||
|
||||
data AMQP = AMQP
|
||||
{ amqpVhost :: Text
|
||||
, amqpUsername :: Text
|
||||
, amqpPassword :: Password
|
||||
, amqpPassword :: Text
|
||||
, amqpHost :: Text
|
||||
}
|
||||
deriving stock (Generic, Show)
|
||||
@ -37,7 +30,7 @@ vhost = lens amqpVhost (\ am txt -> am{amqpVhost=txt})
|
||||
username :: Lens' AMQP Text
|
||||
username = lens amqpUsername (\ am txt -> am{amqpUsername=txt})
|
||||
|
||||
password :: Lens' AMQP Password
|
||||
password :: Lens' AMQP Text
|
||||
password = lens amqpPassword (\ am txt -> am{amqpPassword=txt})
|
||||
|
||||
data Fetcher = Fetcher
|
89
src/MyLib.hs
Normal file
89
src/MyLib.hs
Normal file
@ -0,0 +1,89 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DeriveAnyClass #-}
|
||||
{-# LANGUAGE DeriveGeneric #-}
|
||||
{-# LANGUAGE DerivingVia #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE TypeOperators #-}
|
||||
module MyLib (defaultMain) where
|
||||
|
||||
import Control.Exception (bracket)
|
||||
import Control.Lens
|
||||
import Control.Monad (void)
|
||||
import Data.Aeson (FromJSON, ToJSON, Value)
|
||||
import qualified Data.Aeson as A
|
||||
import Data.Config
|
||||
import Data.Deriving.Aeson
|
||||
import Data.Foldable (for_)
|
||||
import Data.Functor.Contravariant ((>$<))
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text.Strict.Lens as T
|
||||
import qualified Database.SQLite.Simple as SQL
|
||||
import GHC.Generics (Generic)
|
||||
import qualified Membership
|
||||
import Network.AMQP
|
||||
( Channel
|
||||
, DeliveryMode(Persistent)
|
||||
, exchangeName
|
||||
, exchangeType
|
||||
, msgBody
|
||||
, msgDeliveryMode
|
||||
, newMsg
|
||||
, publishMsg, openConnection, closeConnection, openChannel, newExchange, declareExchange
|
||||
)
|
||||
import Network.Reddit (RedditId (RedditId), publishEntries)
|
||||
import Network.Wreq.Session (newSession)
|
||||
import Publish (Publish(..))
|
||||
import Data.Aeson.Lens (key, _String)
|
||||
import Data.Bool (bool)
|
||||
|
||||
data MessageType = Create | Update
|
||||
deriving stock (Show, Eq, Generic)
|
||||
deriving anyclass (ToJSON, FromJSON)
|
||||
|
||||
data Message = Message
|
||||
{ messageType :: MessageType
|
||||
, messageIdentifier :: RedditId
|
||||
, messageContent :: Value
|
||||
}
|
||||
deriving stock (Show, Eq, Generic)
|
||||
deriving (ToJSON, FromJSON)
|
||||
via AesonCodec (Field (CamelCase <<< DropPrefix "message")) Message
|
||||
|
||||
toMessage :: Value -> Message
|
||||
toMessage entry = Message Create (RedditId (entry ^. key "id" . _String)) entry
|
||||
|
||||
sqlRecorder :: SQL.Connection -> Publish IO RedditId
|
||||
sqlRecorder conn = Publish $ Membership.recordSeen conn
|
||||
|
||||
amqpPublisher :: SQL.Connection -> Channel -> Text -> Publish IO Message
|
||||
amqpPublisher sqlConn channel exchange = Publish $ \msg -> do
|
||||
seen <- Membership.isSeen sqlConn (messageIdentifier msg)
|
||||
let msg' = msg{messageType = bool Create Update seen}
|
||||
void $ publishMsg channel exchange routingKey (message (A.encode msg'))
|
||||
where
|
||||
routingKey = "doesn't matter on fanout"
|
||||
message lbs = newMsg
|
||||
{ msgBody = lbs
|
||||
, msgDeliveryMode = Just Persistent
|
||||
}
|
||||
|
||||
defaultMain :: IO ()
|
||||
defaultMain = do
|
||||
conf <- readConfig "./config.dhall"
|
||||
let rabbitConnect = openConnection
|
||||
(conf ^. amqp . host . T.unpacked)
|
||||
(conf ^. amqp . vhost)
|
||||
(conf ^. amqp . username)
|
||||
(conf ^. amqp . password)
|
||||
bracket rabbitConnect closeConnection $ \conn -> do
|
||||
SQL.withConnection (conf ^. sqlite) $ \sqlConn -> do
|
||||
SQL.execute_ sqlConn "create table if not exists membership (reddit_id primary key)"
|
||||
chan <- openChannel conn
|
||||
declareExchange chan newExchange { exchangeName = "reddit_posts", exchangeType = "fanout" }
|
||||
sess <- newSession
|
||||
let encoder = amqpPublisher sqlConn chan "reddit_posts"
|
||||
recorder = sqlRecorder sqlConn
|
||||
publisher = encoder <> (messageIdentifier >$< recorder)
|
||||
for_ (conf ^. fetchers) $ \fetcher -> do
|
||||
print fetcher
|
||||
publishEntries (toMessage >$< publisher) sess fetcher
|
@ -14,7 +14,7 @@ import Network.Wreq hiding (getWith)
|
||||
import Network.Wreq.Session (Session, getWith)
|
||||
import Pipes (Producer, (>->), for, runEffect)
|
||||
import qualified Pipes.Prelude as P
|
||||
import Reddit.Publish
|
||||
import Publish
|
||||
import Data.Maybe (maybeToList)
|
||||
import Control.Monad.Trans (liftIO)
|
||||
import Database.SQLite.Simple.ToField (ToField)
|
@ -1,5 +1,5 @@
|
||||
{-# LANGUAGE DerivingVia #-}
|
||||
module Reddit.Publish where
|
||||
module Publish where
|
||||
|
||||
import Data.Functor.Contravariant
|
||||
import Data.Monoid (Ap(..))
|
Loading…
Reference in New Issue
Block a user