Source code for wcraas_storage.config
import logging
from collections import namedtuple
from environs import Env
from wcraas_common import AMQPConfig
class MongoConfig(namedtuple("MongoConfig", ("host", "port", "db", "user", "password"))):
    """Immutable record of MongoDB connection settings.

    Fields, in order: ``host``, ``port``, ``db``, ``user``, ``password``.
    """

    # No per-instance __dict__: instances stay plain tuples.
    __slots__ = ()

    @classmethod
    def fromenv(cls):
        """
        Build a `wcraas_storage.MongoConfig` from ``MONGO_``-prefixed variables.

        ``Env.read_env()`` is called first, then the following variables are
        read (default in parentheses): ``MONGO_HOST`` ("localhost"),
        ``MONGO_PORT`` (27017, parsed as int), ``MONGO_DB`` ("wcraas"),
        ``MONGO_USER`` (None) and ``MONGO_PASS`` (None).

        >>> conf = MongoConfig.fromenv()
        >>> type(conf)
        <class 'config.MongoConfig'>
        >>> conf._fields
        ('host', 'port', 'db', 'user', 'password')
        >>> conf.host
        'localhost'
        >>> conf.port
        27017
        """
        reader = Env()
        reader.read_env()
        # Every lookup inside this context has "MONGO_" prepended to its name.
        with reader.prefixed('MONGO_'):
            mongo_host = reader.str("HOST", "localhost")
            mongo_port = reader.int("PORT", 27017)
            mongo_db = reader.str("DB", "wcraas")
            mongo_user = reader.str("USER", None)
            # The variable is MONGO_PASS, although the field is named "password".
            mongo_password = reader.str("PASS", None)
        return cls(
            host=mongo_host,
            port=mongo_port,
            db=mongo_db,
            user=mongo_user,
            password=mongo_password,
        )
class Config(namedtuple("Config", ("amqp", "mongo", "mapping", "loglevel"))):
    """Immutable bundle of the service settings.

    Fields, in order: ``amqp``, ``mongo``, ``mapping``, ``loglevel``.
    """

    # No per-instance __dict__: instances stay plain tuples.
    __slots__ = ()

    @staticmethod
    def _resolve_loglevel(name):
        """Translate a level name such as ``"info"`` into its numeric `logging` value.

        The name is stripped and upper-cased before the lookup, so ``"info"``
        and ``"INFO"`` both give ``logging.INFO``. Without that normalisation a
        lower-case name would resolve to the *function* ``logging.info``
        instead of a level.

        :param name: Level name to look up on the `logging` module.
        :returns: The integer level.
        :raises AttributeError: If `logging` has no attribute with that name.
        :raises ValueError: If the attribute exists but is not an integer
            level (for example ``BASIC_FORMAT``).
        """
        level = getattr(logging, name.strip().upper())
        # bool is an int subclass; reject it explicitly so only real levels pass.
        if isinstance(level, bool) or not isinstance(level, int):
            raise ValueError(f"{name!r} is not a valid logging level name")
        return level

    @classmethod
    def fromenv(cls):
        """
        Create a `wcraas_storage.Config` from Environment Variables.

        ``QUEUE_COLLECTION_MAP`` is parsed as JSON (default ``"{}"``; a falsy
        result is replaced by an empty dict) and ``LOGLEVEL`` (default
        ``"INFO"``) is resolved, case-insensitively, to its numeric `logging`
        level. The ``amqp`` and ``mongo`` fields come from the respective
        ``fromenv`` constructors.

        >>> conf = Config.fromenv()
        >>> type(conf)
        <class 'config.Config'>
        >>> conf._fields
        ('amqp', 'mongo', 'mapping', 'loglevel')
        >>> conf.amqp
        AMQPConfig(host='localhost', port=5672, user='guest', password='guest')
        >>> conf.mongo
        MongoConfig(host='localhost', port=27017, db='wcraas', user=None, password=None)
        >>> conf.mapping
        {}
        >>> conf.loglevel
        20
        """
        env = Env()
        env.read_env()
        return cls(
            amqp=AMQPConfig.fromenv(),
            mongo=MongoConfig.fromenv(),
            mapping=env.json("QUEUE_COLLECTION_MAP", "{}") or {},
            loglevel=cls._resolve_loglevel(env.str("LOGLEVEL", "INFO")),
        )