119 lines
4.3 KiB
Haskell
119 lines
4.3 KiB
Haskell
{-# LANGUAGE TypeOperators #-}
|
|
{-# LANGUAGE NumericUnderscores #-}
|
|
{-# LANGUAGE OverloadedStrings #-}
|
|
{-# LANGUAGE TypeApplications #-}
|
|
{-# LANGUAGE DeriveGeneric #-}
|
|
{-# LANGUAGE DataKinds #-}
|
|
{-# LANGUAGE OverloadedLabels #-}
|
|
{-# LANGUAGE RankNTypes #-}
|
|
{-# LANGUAGE TupleSections #-}
|
|
module MyLib where
|
|
|
|
import Network.Wai.Metrics (metrics, registerWaiMetrics)
|
|
import System.Metrics
|
|
(Store, newStore, registerGcMetrics)
|
|
import Servant.Metrics.Prometheus
|
|
import Network.Wai.Handler.Warp (runSettings, defaultSettings, setPort)
|
|
import Servant
|
|
import GHC.Generics (Generic)
|
|
import Conduit (runConduit, (.|), MonadUnliftIO, liftIO, ConduitT, MonadResource)
|
|
import System.FilePath (isExtensionOf, takeBaseName)
|
|
import qualified Data.Conduit.Combinators as C
|
|
import Crypto.Hash (HashAlgorithm, Digest, SHA256)
|
|
import qualified Crypto.Hash as Crypto
|
|
import Amazonka (newEnv, Region(..), Env, Env'(..), setEndpoint, runResourceT, send, chunkedFile, defaultChunkSize)
|
|
import Amazonka.Auth (discover)
|
|
import qualified Amazonka.S3 as S3
|
|
import qualified Data.Text as T
|
|
import Control.Lens (view, set, (&), re)
|
|
import Data.Generics.Labels ()
|
|
import Control.Monad (void, forever)
|
|
import Data.HashMap.Strict (HashMap)
|
|
import Data.Text (Text)
|
|
import qualified Data.HashMap.Strict as HM
|
|
import System.Environment (getEnv)
|
|
import qualified Dhall
|
|
import Config (Config)
|
|
import Data.Text.Strict.Lens (utf8)
|
|
import Config.Watcher (Seconds)
|
|
import Control.Concurrent (threadDelay)
|
|
import UnliftIO (forConcurrently_)
|
|
import Data.Word (Word64)
|
|
import Image.Fingerprint (dhash)
|
|
import Data.Maybe (catMaybes)
|
|
|
|
-- | Record of named routes exposed by the HTTP server.
--
-- It currently holds a single endpoint, '_getMetrics', whose shape is given
-- by 'MetricsRoute'.
newtype Routes route = Routes
  { -- | The metrics endpoint.
    _getMetrics :: route :- MetricsRoute
  }
  deriving (Generic)
|
|
|
|
-- | The servant API type: the 'Routes' record wrapped in 'NamedRoutes'.
type API = NamedRoutes Routes
|
|
|
|
-- | Build the WAI 'Application' that serves 'API'.
--
-- The only handler is @metricsServer store@, wired to the '_getMetrics' route.
app :: Store -> Application
app store =
  serve (Proxy :: Proxy API) $
    Routes
      { _getMetrics = metricsServer store
      }
|
|
|
|
-- | Compute the digest of the file at the given path.
--
-- The file is opened with 'C.withSourceFile' and its chunks are folded into a
-- hashing context, which is finalized once the source is exhausted. The hash
-- algorithm is selected by the caller through the result type.
hash :: (HashAlgorithm a, MonadUnliftIO m) => FilePath -> m (Digest a)
hash path =
  C.withSourceFile path $ \chunks -> do
    ctx <- runConduit $ chunks .| C.foldl Crypto.hashUpdate Crypto.hashInit
    pure (Crypto.hashFinalize ctx)
|
|
|
|
-- | An image file found on disk together with the values derived from it.
data Image = Image
  { -- | Location of the file.
    imagePath :: !FilePath
  , -- | SHA-256 digest associated with the file.
    imageHash :: !(Digest SHA256)
  , -- | Optional 64-bit fingerprint; 'Nothing' when none is available.
    imageFingerprint :: !(Maybe Word64)
  }
  deriving (Show)
|
|
|
|
-- | Render an 'Image' as a key/value metadata map.
--
-- The map always contains @"sha256"@ and contains @"fingerprint"@ only when
-- the image has one. Both values are produced with 'show'.
imageToMetadata :: Image -> HashMap Text Text
imageToMetadata img = HM.fromList (catMaybes [digestEntry, fingerprintEntry])
  where
    render :: Show a => a -> Text
    render = T.pack . show

    digestEntry, fingerprintEntry :: Maybe (Text, Text)
    digestEntry = Just ("sha256", render (imageHash img))
    fingerprintEntry =
      fmap (\fp -> ("fingerprint", render fp)) (imageFingerprint img)
|
|
|
|
-- | Upload one image file to the given S3 bucket with a @PutObject@ request.
--
-- The object key is the file's base name (directory and extension removed),
-- the object metadata comes from 'imageToMetadata', and the content type is
-- always @image/jpeg@.
--
-- NOTE(review): because only the base name is used as the key, two files with
-- the same base name in different directories map to the same object.
uploadImage :: Env -> S3.BucketName -> Image -> IO S3.PutObjectResponse
uploadImage env bucket img = do
  body <- chunkedFile defaultChunkSize localPath
  let request =
        S3.newPutObject bucket objectKey body
          & set #metadata (imageToMetadata img)
          & set #contentType (Just "image/jpeg")
  runResourceT (send env request)
  where
    localPath = imagePath img
    objectKey = S3.ObjectKey (T.pack (takeBaseName localPath))
|
|
|
|
-- | Stream every file path found under @root@ (recursively) that satisfies
-- @predicate@.
scan :: (MonadResource m) => FilePath -> (FilePath -> Bool) -> ConduitT () FilePath m ()
scan root predicate =
  C.sourceDirectoryDeep followSymlinks root .| C.filter predicate
  where
    -- First argument of 'C.sourceDirectoryDeep': do not follow directory
    -- symlinks while traversing.
    followSymlinks = False
|
|
|
|
-- | Turn each incoming file path into an 'Image'.
--
-- For every path the SHA-256 digest is computed with 'hash' and 'dhash' is
-- run afterwards. A 'Left' result from 'dhash' is discarded and recorded as
-- 'Nothing' in 'imageFingerprint'.
analyze :: (MonadUnliftIO m) => ConduitT FilePath Image m ()
analyze = C.mapM describe
  where
    describe path = do
      digest <- hash path
      outcome <- liftIO (dhash path)
      pure (Image path digest (either (const Nothing) Just outcome))
|
|
|
|
|
|
-- | Periodically scan a directory tree and upload every @.jpg@ file found.
--
-- Each iteration walks @path@ with 'scan', analyzes every match with
-- 'analyze', prints the resulting 'Image', and uploads it to @bucket@ via
-- 'uploadImage'. The thread then sleeps for @delay@ before the next
-- iteration. The loop never terminates on its own; any exception raised by
-- an iteration propagates to the caller.
watcher :: Env -> S3.BucketName -> FilePath -> Seconds -> IO ()
watcher env bucket path delay = forever $ do
  runResourceT . runConduit $
    scan path (isExtensionOf ".jpg")
      .| analyze
      .| C.iterM (liftIO . print)
      .| C.mapM_ (void . liftIO . uploadImage env bucket)
  -- 'threadDelay' takes microseconds, so the configured number of seconds
  -- must be scaled by one million.
  threadDelay (fromIntegral (view #seconds delay) * microsPerSecond)
  where
    microsPerSecond :: Int
    microsPerSecond = 1_000_000
|
|
|
|
-- | Program entry point.
--
-- Reads the Dhall configuration file named by the @CONFIG_PATH@ environment
-- variable, builds an AWS environment from discovered credentials with the
-- configured S3 region and endpoint (TLS, port 443), registers WAI and GC
-- metrics in a fresh store, and then runs concurrently:
--
-- * the metrics HTTP server on port 8099, and
-- * one 'watcher' per configured directory.
someFunc :: IO ()
someFunc = do
  configFile <- getEnv "CONFIG_PATH"
  conf <- Dhall.inputFile (Dhall.auto @Config) configFile
  baseEnv <- newEnv discover
  let s3Endpoint = setEndpoint True (view (#s3 . #endpoint . re utf8) conf) 443
      env =
        baseEnv
          { region = Region' (view (#s3 . #region) conf)
          , overrides = s3Endpoint
          }
      bucket = S3.BucketName (view (#s3 . #bucket) conf)
  store <- newStore
  waiMetrics <- registerWaiMetrics store
  registerGcMetrics store
  let period = view (#watcher . #period) conf
      directories = view (#watcher . #directories) conf
      watchers = map (\dir -> watcher env bucket dir period) directories
      metricsEndpoint = runSettings settings (metrics waiMetrics (app store))
  -- Run the HTTP server and all watchers side by side.
  forConcurrently_ (metricsEndpoint : watchers) id
  where
    settings = setPort 8099 defaultSettings
    -- setOnException onException
|