import os import re from configparser import ConfigParser import pendulum from flask import Flask, request, g, session, url_for as flask_url_for from flask_session import Session import redis from unipath import Path from flask_wtf.csrf import CSRFProtect from urllib.parse import urljoin from atat.database import db from atat.assets import environment as assets_environment from atat.filters import register_filters from atat.routes import bp from atat.routes.portfolios import portfolios_bp as portfolio_routes from atat.routes.task_orders import task_orders_bp from atat.routes.applications import applications_bp from atat.routes.dev import bp as dev_routes from atat.routes.users import bp as user_routes from atat.routes.errors import make_error_pages from atat.routes.ccpo import bp as ccpo_routes from atat.domain.authnid.crl import CRLCache, NoOpCRLCache from atat.domain.auth import apply_authentication from atat.domain.authz import Authorization from atat.domain.csp import make_csp_provider from atat.domain.portfolios import Portfolios from atat.models.permissions import Permissions from atat.queue import celery, update_celery from atat.utils import mailer from atat.utils.form_cache import FormCache from atat.utils.json import CustomJSONEncoder, sqlalchemy_dumps from atat.utils.notification_sender import NotificationSender from atat.utils.session_limiter import SessionLimiter from logging.config import dictConfig from atat.utils.logging import JsonFormatter, RequestContextFilter from atat.utils.context_processors import assign_resources ENV = os.getenv("FLASK_ENV", "dev") def make_app(config): if ENV == "prod" or config.get("LOG_JSON"): apply_json_logger() parent_dir = Path().parent app = Flask( __name__, template_folder=str(object=parent_dir.child("templates").absolute()), static_folder=str(object=parent_dir.child("static").absolute()), ) app.json_encoder = CustomJSONEncoder make_redis(app, config) csrf = CSRFProtect() app.config.update(config) app.config.update({"SESSION_REDIS": app.redis}) update_celery(celery, app) make_flask_callbacks(app) register_filters(app) register_jinja_globals(app) make_csp_provider(app, config.get("CSP", "mock")) make_crl_validator(app) make_mailer(app) make_notification_sender(app) db.init_app(app) csrf.init_app(app) Session(app) make_session_limiter(app, session, config) assets_environment.init_app(app) make_error_pages(app) app.register_blueprint(bp) app.register_blueprint(portfolio_routes) app.register_blueprint(task_orders_bp) app.register_blueprint(applications_bp) app.register_blueprint(user_routes) app.register_blueprint(ccpo_routes) if ENV != "prod": app.register_blueprint(dev_routes) app.form_cache = FormCache(app.redis) apply_authentication(app) set_default_headers(app) @app.before_request def _set_resources(): assign_resources(request.view_args) return app def make_flask_callbacks(app): @app.before_request def _set_globals(): g.current_user = None g.dev = os.getenv("FLASK_ENV", "dev") == "dev" g.matchesPath = lambda href: re.search(href, request.full_path) g.modal = request.args.get("modal", None) g.Authorization = Authorization g.Permissions = Permissions @app.context_processor def _portfolios(): if not g.current_user: return {} portfolios = Portfolios.for_user(g.current_user) return {"portfolios": portfolios} @app.after_request def _cleanup(response): g.current_user = None g.portfolio = None g.application = None g.task_order = None return response def set_default_headers(app): # pragma: no cover static_url = app.config.get("STATIC_URL") blob_storage_url = app.config.get("BLOB_STORAGE_URL") @app.after_request def _set_security_headers(response): response.headers[ "Strict-Transport-Security" ] = "max-age=31536000; includeSubDomains" response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "SAMEORIGIN" response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Access-Control-Allow-Origin"] = app.config.get("CDN_ORIGIN") if ENV == "dev": response.headers[ "Content-Security-Policy" ] = "default-src 'self' 'unsafe-eval' 'unsafe-inline'; connect-src *" else: response.headers[ "Content-Security-Policy" ] = f"default-src 'self' 'unsafe-eval' 'unsafe-inline' {blob_storage_url} {static_url}" return response def map_config(config): return { **config["default"], "USE_AUDIT_LOG": config["default"].getboolean("USE_AUDIT_LOG"), "ENV": config["default"]["ENVIRONMENT"], "DEBUG": config["default"].getboolean("DEBUG"), "DEBUG_MAILER": config["default"].getboolean("DEBUG_MAILER"), "SQLALCHEMY_ECHO": config["default"].getboolean("SQLALCHEMY_ECHO"), "PORT": int(config["default"]["PORT"]), "SQLALCHEMY_DATABASE_URI": config["default"]["DATABASE_URI"], "SQLALCHEMY_TRACK_MODIFICATIONS": False, "SQLALCHEMY_ENGINE_OPTIONS": { "json_serializer": sqlalchemy_dumps, "connect_args": { "sslmode": config["default"]["PGSSLMODE"], "sslrootcert": config["default"]["PGSSLROOTCERT"], }, }, "WTF_CSRF_ENABLED": config.getboolean("default", "WTF_CSRF_ENABLED"), "PERMANENT_SESSION_LIFETIME": config.getint( "default", "PERMANENT_SESSION_LIFETIME" ), "DISABLE_CRL_CHECK": config.getboolean("default", "DISABLE_CRL_CHECK"), "CRL_FAIL_OPEN": config.getboolean("default", "CRL_FAIL_OPEN"), "LOG_JSON": config.getboolean("default", "LOG_JSON"), "LIMIT_CONCURRENT_SESSIONS": config.getboolean( "default", "LIMIT_CONCURRENT_SESSIONS" ), # Store the celery task results in a database table (celery_taskmeta) "CELERY_RESULT_BACKEND": "db+{}".format(config.get("default", "DATABASE_URI")), # Do not automatically delete results (by default, Celery will do this # with a Beat job once a day) "CELERY_RESULT_EXPIRES": 0, "CELERY_RESULT_EXTENDED": True, "OFFICE_365_DOMAIN": "onmicrosoft.com", "CONTRACT_START_DATE": pendulum.from_format( config.get("default", "CONTRACT_START_DATE"), "YYYY-MM-DD" ).date(), "CONTRACT_END_DATE": pendulum.from_format( config.get("default", "CONTRACT_END_DATE"), "YYYY-MM-DD" ).date(), "SESSION_COOKIE_SECURE": config.getboolean("default", "SESSION_COOKIE_SECURE"), } def make_config(direct_config=None): BASE_CONFIG_FILENAME = os.path.join(os.path.dirname(__file__), "../config/base.ini") ENV_CONFIG_FILENAME = os.path.join( os.path.dirname(__file__), "../config/", "{}.ini".format(ENV.lower()) ) OVERRIDE_CONFIG_DIRECTORY = os.getenv("OVERRIDE_CONFIG_DIRECTORY") config = ConfigParser(allow_no_value=True) config.optionxform = str config_files = [BASE_CONFIG_FILENAME, ENV_CONFIG_FILENAME] # ENV_CONFIG will override values in BASE_CONFIG. config.read(config_files) if OVERRIDE_CONFIG_DIRECTORY: apply_config_from_directory(OVERRIDE_CONFIG_DIRECTORY, config) # Check for ENV variables as a final source of overrides apply_config_from_environment(config) # override if a dictionary of options has been given if direct_config: config.read_dict({"default": direct_config}) # Assemble DATABASE_URI value database_uri = "postgresql://{}:{}@{}:{}/{}".format( # pragma: allowlist secret config.get("default", "PGUSER"), config.get("default", "PGPASSWORD"), config.get("default", "PGHOST"), config.get("default", "PGPORT"), config.get("default", "PGDATABASE"), ) config.set("default", "DATABASE_URI", database_uri) # Assemble REDIS_URI value redis_use_tls = config["default"].getboolean("REDIS_TLS") redis_uri = "redis{}://{}:{}@{}".format( # pragma: allowlist secret ("s" if redis_use_tls else ""), (config.get("default", "REDIS_USER") or ""), (config.get("default", "REDIS_PASSWORD") or ""), config.get("default", "REDIS_HOST"), ) celery_uri = redis_uri if redis_use_tls: tls_mode = config.get("default", "REDIS_SSLMODE") tls_mode_str = tls_mode.lower() if tls_mode else "none" redis_uri = f"{redis_uri}/?ssl_cert_reqs={tls_mode_str}" # TODO: Kombu, one of Celery's dependencies, still requires # that ssl_cert_reqs be passed as the string version of an # option on the ssl module. We can clean this up and use # the REDIS_URI for both when this PR to Kombu is released: # https://github.com/celery/kombu/pull/1139 kombu_modes = { "none": "CERT_NONE", "required": "CERT_REQUIRED", "optional": "CERT_OPTIONAL", } celery_tls_mode_str = kombu_modes[tls_mode_str] celery_uri = f"{celery_uri}/?ssl_cert_reqs={celery_tls_mode_str}" config.set("default", "REDIS_URI", redis_uri) config.set("default", "BROKER_URL", celery_uri) return map_config(config) def apply_config_from_directory(config_dir, config, section="default"): """ Loop files in a directory, check if the names correspond to known config values, and apply the file contents as the value for that setting if they do. """ for confsetting in os.listdir(config_dir): if confsetting in config.options(section): full_path = os.path.join(config_dir, confsetting) with open(full_path, "r") as conf_file: config.set(section, confsetting, conf_file.read().strip()) return config def apply_config_from_environment(config, section="default"): """ Loops all the configuration settins in a given section of a config object and checks whether those settings also exist as environment variables. If so, it applies the environment variables value as the new configuration setting value. """ for confsetting in config.options(section): env_override = os.getenv(confsetting.upper()) if env_override: config.set(section, confsetting, env_override) return config def make_redis(app, config): r = redis.Redis.from_url(config["REDIS_URI"]) app.redis = r def make_crl_validator(app): if app.config.get("DISABLE_CRL_CHECK"): app.crl_cache = NoOpCRLCache(logger=app.logger) else: crl_dir = app.config["CRL_STORAGE_CONTAINER"] if not os.path.isdir(crl_dir): os.makedirs(crl_dir, exist_ok=True) app.crl_cache = CRLCache(app.config["CA_CHAIN"], crl_dir, logger=app.logger,) def make_mailer(app): if app.config["DEBUG"] or app.config["DEBUG_MAILER"]: mailer_connection = mailer.RedisConnection(app.redis) else: mailer_connection = mailer.SMTPConnection( server=app.config.get("MAIL_SERVER"), port=app.config.get("MAIL_PORT"), username=app.config.get("MAIL_SENDER"), password=app.config.get("MAIL_PASSWORD"), use_tls=app.config.get("MAIL_TLS"), ) sender = app.config.get("MAIL_SENDER") app.mailer = mailer.Mailer(mailer_connection, sender) def make_notification_sender(app): app.notification_sender = NotificationSender() def make_session_limiter(app, session, config): app.session_limiter = SessionLimiter(config, session, app.redis) def apply_json_logger(): dictConfig( { "version": 1, "formatters": {"default": {"()": lambda *a, **k: JsonFormatter()}}, "filters": {"requests": {"()": lambda *a, **k: RequestContextFilter()}}, "handlers": { "wsgi": { "class": "logging.StreamHandler", "stream": "ext://flask.logging.wsgi_errors_stream", "formatter": "default", "filters": ["requests"], } }, "root": {"level": "INFO", "handlers": ["wsgi"]}, } ) def register_jinja_globals(app): static_url = app.config.get("STATIC_URL", "/static/") def _url_for(endpoint, **values): if endpoint == "static": filename = values["filename"] return urljoin(static_url, filename) else: return flask_url_for(endpoint, **values) app.jinja_env.globals["url_for"] = _url_for