no op version of CRL cache for disabling crl check

This commit is contained in:
dandds 2019-01-17 10:22:00 -05:00
parent 1ee0c11a63
commit becaec8d42
4 changed files with 62 additions and 14 deletions

View File

@ -18,7 +18,7 @@ from atst.routes.task_orders import task_orders_bp
from atst.routes.dev import bp as dev_routes
from atst.routes.users import bp as user_routes
from atst.routes.errors import make_error_pages
from atst.domain.authnid.crl import CRLCache
from atst.domain.authnid.crl import CRLCache, NoOpCRLCache
from atst.domain.auth import apply_authentication
from atst.domain.authz import Authorization
from atst.domain.csp import make_csp_provider
@ -48,6 +48,8 @@ def make_app(config):
app.config.update({"SESSION_REDIS": app.redis})
make_flask_callbacks(app)
# TODO: deprecate the REQUIRE_CRLs setting in favor of the
# DISABLE_CRL_CHECK; both have the effect of never loading CRLs
if app.config.get("REQUIRE_CRLS"):
make_crl_validator(app)
register_filters(app)
@ -133,6 +135,7 @@ def map_config(config):
"REQUIRE_CRLS": config.getboolean("default", "REQUIRE_CRLS"),
"RQ_REDIS_URL": config["default"]["REDIS_URI"],
"RQ_QUEUES": [config["default"]["RQ_QUEUES"]],
"DISABLE_CRL_CHECK": config.getboolean("default", "DISABLE_CRL_CHECK"),
}
@ -183,10 +186,15 @@ def make_redis(app, config):
def make_crl_validator(app):
if app.config.get("DISABLE_CRL_CHECK"):
app.crl_cache = NoOpCRLCache(logger=app.logger)
else:
crl_locations = []
for filename in pathlib.Path(app.config["CRL_DIRECTORY"]).glob("*.crl"):
crl_locations.append(filename.absolute())
app.crl_cache = CRLCache(app.config["CA_CHAIN"], crl_locations, logger=app.logger)
app.crl_cache = CRLCache(
app.config["CA_CHAIN"], crl_locations, logger=app.logger
)
def make_eda_client(app):

View File

@ -9,7 +9,42 @@ class CRLRevocationException(Exception):
pass
class CRLCache:
class CRLInterface:
def __init__(self, *args, logger=None, **kwargs):
self.logger = logger
def _log_info(self, message):
if self.logger:
self.logger.info(message)
def crl_check(self, cert):
raise NotImplementedError()
class NoOpCRLCache(CRLInterface):
def _get_cn(self, cert):
try:
parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
for comp in parsed.get_subject().get_components():
if comp[0] == b"CN":
return comp[1].decode()
except crypto.Error:
pass
return "unknown"
def crl_check(self, cert):
cn = self._get_cn(cert)
self._log_info(
"Did not perform CRL validation for certificate with Common Name '{}'".format(
cn
)
)
return True
class CRLCache(CRLInterface):
_PEM_RE = re.compile(
b"-----BEGIN CERTIFICATE-----\r?.+?\r?-----END CERTIFICATE-----\r?\n?",
@ -25,10 +60,6 @@ class CRLCache:
self._build_crl_cache(crl_locations)
self.logger = logger
def log_info(self, message):
if self.logger:
self.logger.info(message)
def _get_store(self, cert):
return self._build_store(cert.get_issuer().der())
@ -53,13 +84,13 @@ class CRLCache:
def _build_store(self, issuer):
store = self.store_class()
self.log_info("STORE ID: {}. Building store.".format(id(store)))
self._log_info("STORE ID: {}. Building store.".format(id(store)))
store.set_flags(crypto.X509StoreFlags.CRL_CHECK)
crl_location = self._get_crl_location(issuer)
with open(crl_location, "rb") as crl_file:
crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
store.add_crl(crl)
self.log_info(
self._log_info(
"STORE ID: {}. Adding CRL with issuer {}".format(
id(store), crl.get_issuer()
)
@ -81,7 +112,7 @@ class CRLCache:
def _add_certificate_chain_to_store(self, store, issuer):
ca = self.certificate_authorities.get(issuer.der())
store.add_cert(ca)
self.log_info(
self._log_info(
"STORE ID: {}. Adding CA with subject {}".format(
id(store), ca.get_subject()
)

View File

@ -4,6 +4,7 @@ CA_CHAIN = ssl/server-certs/ca-chain.pem
CLASSIFIED = false
COOKIE_SECRET = some-secret-please-replace
CRL_DIRECTORY = crl
DISABLE_CRL_CHECK = false
DEBUG = true
ENVIRONMENT = dev
PERMANENT_SESSION_LIFETIME = 600

View File

@ -5,7 +5,7 @@ import os
import shutil
from OpenSSL import crypto, SSL
from atst.domain.authnid.crl import CRLCache, CRLRevocationException
from atst.domain.authnid.crl import CRLCache, CRLRevocationException, NoOpCRLCache
import atst.domain.authnid.crl.util as util
from tests.mocks import FIXTURE_EMAIL_ADDRESS
@ -161,3 +161,11 @@ def test_refresh_crls_with_error(tmpdir, monkeypatch):
util.refresh_crls(tmpdir, tmpdir, logger)
assert "Error downloading {}".format(fake_crl) in logger.messages[-1]
def test_no_op_crl_cache_logs_common_name():
logger = FakeLogger()
cert = open("ssl/client-certs/atat.mil.crt", "rb").read()
cache = NoOpCRLCache(logger=logger)
assert cache.crl_check(cert)
assert "ART.GARFUNKEL.1234567890" in logger.messages[-1]