Test and contract test the s3 interface
This commit is contained in:
23
src/MyLib.hs
23
src/MyLib.hs
@@ -21,11 +21,10 @@ import System.FilePath (isExtensionOf, takeBaseName)
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
import Crypto.Hash (HashAlgorithm, Digest, SHA256)
|
||||
import qualified Crypto.Hash as Crypto
|
||||
import Amazonka (newEnv, Region(..), Env, Env'(..), setEndpoint, runResourceT, send, chunkedFile, defaultChunkSize)
|
||||
import Amazonka.Auth (discover)
|
||||
import Amazonka (Env, runResourceT, send, chunkedFile, defaultChunkSize)
|
||||
import qualified Amazonka.S3 as S3
|
||||
import qualified Data.Text as T
|
||||
import Control.Lens (view, set, (&), re)
|
||||
import Control.Lens (view, set, (&), (^.))
|
||||
import Data.Generics.Labels ()
|
||||
import Control.Monad (void, forever)
|
||||
import Data.HashMap.Strict (HashMap)
|
||||
@@ -34,13 +33,13 @@ import qualified Data.HashMap.Strict as HM
|
||||
import System.Environment (getEnv)
|
||||
import qualified Dhall
|
||||
import Config (Config)
|
||||
import Data.Text.Strict.Lens (utf8)
|
||||
import Config.Watcher (Seconds)
|
||||
import Control.Concurrent (threadDelay)
|
||||
import UnliftIO (forConcurrently_)
|
||||
import Data.Word (Word64)
|
||||
import Image.Fingerprint (dhash)
|
||||
import Data.Maybe (catMaybes)
|
||||
import qualified S3.Interface
|
||||
|
||||
newtype Routes route
|
||||
= Routes { _getMetrics :: route :- MetricsRoute }
|
||||
@@ -86,28 +85,24 @@ analyze = C.mapM (\p -> Image p <$> hash p <*> fingerprint p)
|
||||
fingerprint = fmap (either (const Nothing) Just) . liftIO . dhash
|
||||
|
||||
|
||||
watcher :: Env -> S3.BucketName -> FilePath -> Seconds -> IO ()
|
||||
watcher env bucket path delay = forever $ do
|
||||
watcher :: S3.Interface.S3Interface -> FilePath -> Seconds -> IO ()
|
||||
watcher int path delay = forever $ do
|
||||
runResourceT $ runConduit $ scan path (isExtensionOf ".jpg")
|
||||
.| analyze
|
||||
.| C.iterM (liftIO . print)
|
||||
.| C.mapM_ (void . liftIO . uploadImage env bucket)
|
||||
.| C.map (\img -> (imagePath img, imageToMetadata img))
|
||||
.| C.mapM_ (void . liftIO . uncurry (S3.Interface.putFile int))
|
||||
threadDelay (fromIntegral (view #seconds delay) * 100_000)
|
||||
|
||||
someFunc :: IO ()
|
||||
someFunc = do
|
||||
confPath <- getEnv "CONFIG_PATH"
|
||||
conf <- Dhall.inputFile (Dhall.auto @Config) confPath
|
||||
discoveredEnv <- newEnv discover
|
||||
let env = discoveredEnv
|
||||
{ region = Region' $ view (#s3 . #region) conf
|
||||
, overrides = setEndpoint True (view (#s3 . #endpoint . re utf8) conf) 443
|
||||
}
|
||||
bucket = S3.BucketName (view (#s3 . #bucket) conf)
|
||||
s3Interface <- S3.Interface.buildInterface (conf ^. #s3)
|
||||
store <- newStore
|
||||
waiMetrics <- registerWaiMetrics store
|
||||
registerGcMetrics store
|
||||
let watchers = [watcher env bucket path (view (#watcher . #period) conf) | path <- view (#watcher . #directories) conf]
|
||||
let watchers = [watcher s3Interface path (view (#watcher . #period) conf) | path <- view (#watcher . #directories) conf]
|
||||
actions = ( runSettings settings (metrics waiMetrics $ app store) ) : watchers
|
||||
forConcurrently_ actions id
|
||||
-- runSettings settings (metrics waiMetrics $ app store)
|
||||
|
||||
65
src/S3/Interface.hs
Normal file
65
src/S3/Interface.hs
Normal file
@@ -0,0 +1,65 @@
|
||||
{-# LANGUAGE OverloadedLabels #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE RecordWildCards #-}
|
||||
|
||||
module S3.Interface where
|
||||
|
||||
import Data.HashMap.Strict (HashMap)
|
||||
import Data.Text (Text)
|
||||
import Config.S3 (ConfigS3)
|
||||
import Amazonka (newEnv, Region(..), Env, Env'(..), setEndpoint, runResourceT, send, chunkedFile, defaultChunkSize, discover, sinkBody, hashedFile, RequestBody (..))
|
||||
import qualified Amazonka.S3 as S3
|
||||
import Control.Lens (view, re, (&), set)
|
||||
import Data.Text.Strict.Lens (utf8)
|
||||
import Data.Generics.Labels ()
|
||||
import qualified Data.Text as T
|
||||
import System.FilePath (takeBaseName, (</>))
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
|
||||
type Metadata = HashMap Text Text
|
||||
type Key = Text
|
||||
|
||||
data S3Interface = S3Interface
|
||||
{ putFile :: FilePath -> Metadata -> IO Key
|
||||
-- ^ Upload a file from path
|
||||
, getFile :: Key -> FilePath -> IO FilePath
|
||||
-- ^ Download a file to a given root
|
||||
, getMetadata :: Key -> IO Metadata
|
||||
-- ^ Fetch the metadata
|
||||
}
|
||||
|
||||
buildInterface :: ConfigS3 -> IO S3Interface
|
||||
buildInterface s3Conf = do
|
||||
discoveredEnv <- newEnv discover
|
||||
let env = discoveredEnv
|
||||
{ region = Region' $ view #region s3Conf
|
||||
, overrides = setEndpoint True (view (#endpoint . re utf8) s3Conf) 443
|
||||
}
|
||||
bucket = S3.BucketName (view #bucket s3Conf)
|
||||
pure S3Interface
|
||||
{ putFile = uploadFile env bucket
|
||||
, getFile = downloadFile env bucket
|
||||
, getMetadata = fetchMeta env bucket
|
||||
}
|
||||
where
|
||||
uploadFile :: Env -> S3.BucketName -> FilePath -> Metadata -> IO Key
|
||||
uploadFile env bucket path metadata = do
|
||||
obj <- Hashed <$> hashedFile path
|
||||
runResourceT $ do
|
||||
let putObj = S3.newPutObject bucket (S3.ObjectKey key) obj & set #metadata metadata
|
||||
key = T.pack . takeBaseName $ path
|
||||
_ <- send env putObj
|
||||
pure key
|
||||
fetchMeta :: Env -> S3.BucketName -> Key -> IO Metadata
|
||||
fetchMeta env bucket key = do
|
||||
runResourceT $ do
|
||||
let getObjHead = S3.newHeadObject bucket (S3.ObjectKey key)
|
||||
view #metadata <$> send env getObjHead
|
||||
downloadFile :: Env -> S3.BucketName -> Key -> FilePath -> IO FilePath
|
||||
downloadFile env bucket key root = do
|
||||
let getObj = S3.newGetObject bucket (S3.ObjectKey key)
|
||||
runResourceT $ do
|
||||
x <- view #body <$> send env getObj
|
||||
let path = root </> T.unpack key
|
||||
_ <- sinkBody x (C.sinkFileBS path)
|
||||
pure path
|
||||
Reference in New Issue
Block a user