diff --git a/atst/app.py b/atst/app.py index ae207b01..ddf66a40 100644 --- a/atst/app.py +++ b/atst/app.py @@ -16,7 +16,7 @@ from atst.routes.workspaces import bp as workspace_routes from atst.routes.requests import requests_bp from atst.routes.dev import bp as dev_routes from atst.routes.errors import make_error_pages -from atst.domain.authnid.crl import Validator +from atst.domain.authnid.crl import CRLCache from atst.domain.auth import apply_authentication @@ -141,7 +141,5 @@ def make_crl_validator(app): crl_locations = [] for filename in pathlib.Path(app.config["CRL_DIRECTORY"]).glob("*"): crl_locations.append(filename.absolute()) - app.crl_validator = Validator( - roots=[app.config["CA_CHAIN"]], crl_locations=crl_locations, logger=app.logger - ) + app.crl_cache = CRLCache(app.config["CA_CHAIN"], crl_locations) diff --git a/atst/domain/authnid/__init__.py b/atst/domain/authnid/__init__.py index 80d645b8..b7f31717 100644 --- a/atst/domain/authnid/__init__.py +++ b/atst/domain/authnid/__init__.py @@ -1,17 +1,18 @@ from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.users import Users from .utils import parse_sdn, email_from_certificate +from .crl import CRLRevocationException class AuthenticationContext(): - def __init__(self, crl_validator, auth_status, sdn, cert): + def __init__(self, crl_cache, auth_status, sdn, cert): if None in locals().values(): raise UnauthenticatedError( "Missing required authentication context components" ) - self.crl_validator = crl_validator + self.crl_cache = crl_cache self.auth_status = auth_status self.sdn = sdn self.cert = cert.encode() @@ -21,8 +22,7 @@ class AuthenticationContext(): if not self.auth_status == "SUCCESS": raise UnauthenticatedError("SSL/TLS client authentication failed") - elif not self._crl_check(): - raise UnauthenticatedError("Client certificate failed CRL check") + self._crl_check() return True @@ -44,12 +44,10 @@ class AuthenticationContext(): return None def _crl_check(self): - if self.cert: - result = self.crl_validator.validate(self.cert) - return result - - else: - return False + try: + self.crl_cache.crl_check(self.cert) + except CRLRevocationException as exc: + raise UnauthenticatedError("CRL check failed. " + str(exc)) @property def parsed_sdn(self): diff --git a/atst/domain/authnid/crl/__init__.py b/atst/domain/authnid/crl/__init__.py index 68358e37..cad27b79 100644 --- a/atst/domain/authnid/crl/__init__.py +++ b/atst/domain/authnid/crl/__init__.py @@ -5,124 +5,88 @@ import hashlib from OpenSSL import crypto, SSL -def sha256_checksum(filename, block_size=65536): - sha256 = hashlib.sha256() - with open(filename, "rb") as f: - for block in iter(lambda: f.read(block_size), b""): - sha256.update(block) - return sha256.hexdigest() +class CRLRevocationException(Exception): + pass -class Validator: +class CRLCache(): _PEM_RE = re.compile( b"-----BEGIN CERTIFICATE-----\r?.+?\r?-----END CERTIFICATE-----\r?\n?", re.DOTALL, ) - def __init__(self, crl_locations=[], roots=[], base_store=crypto.X509Store, logger=None): - self.crl_locations = crl_locations - self.roots = roots - self.base_store = base_store - self.logger = logger - self._reset() + def __init__(self, root_location, crl_locations=[], store_class=crypto.X509Store): + self.store_class = store_class + self.certificate_authorities = {} + self._load_roots(root_location) + self._build_crl_cache(crl_locations) - def _reset(self): - self.cache = {} - self.store = self.base_store() - self._add_crls(self.crl_locations) - self._add_roots(self.roots) - self.store.set_flags(crypto.X509StoreFlags.CRL_CHECK) + def _get_store(self, cert): + return self._build_store(cert.get_issuer().der()) - def log_error(self, message): - if self.logger: - self.logger.error(message) - - def _add_crls(self, locations): - for filename in locations: - try: - self._add_crl(filename) - except crypto.Error as err: - self.log_error( - "CRL could not be parsed. Filename: {}, Error: {}, args: {}".format( - filename, type(err), err.args - ) - ) - - # This caches the CRL issuer with the CRL filepath and a checksum, in addition to adding the CRL to the store. - - def _add_crl(self, filename): - with open(filename, "rb") as crl_file: - crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read()) - self.cache[crl.get_issuer().der()] = (filename, sha256_checksum(filename)) - self._add_carefully("add_crl", crl) + def _load_roots(self, root_location): + with open(root_location, "rb") as f: + for raw_ca in self._parse_roots(f.read()): + ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca) + self.certificate_authorities[ca.get_subject().der()] = ca def _parse_roots(self, root_str): return [match.group(0) for match in self._PEM_RE.finditer(root_str)] - def _add_roots(self, roots): - for filename in roots: - with open(filename, "rb") as f: - for raw_ca in self._parse_roots(f.read()): - ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca) - self._add_carefully("add_cert", ca) + def _build_crl_cache(self, crl_locations): + self.crl_cache = {} + for crl_location in crl_locations: + crl = self._load_crl(crl_location) + self.crl_cache[crl.get_issuer().der()] = crl_location - # in testing, it seems that openssl is maintaining a local cache of certs - # in a hash table and throws errors if you try to add redundant certs or - # CRLs. For now, we catch and ignore that error with great specificity. + def _load_crl(self, crl_location): + with open(crl_location, "rb") as crl_file: + return crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read()) - def _add_carefully(self, method_name, obj): - try: - getattr(self.store, method_name)(obj) - except crypto.Error as error: - if self._is_preloaded_error(error): - pass - else: - raise error + def _build_store(self, issuer): + store = self.store_class() + store.set_flags(crypto.X509StoreFlags.CRL_CHECK) + crl_location = self._get_crl_location(issuer) + with open(crl_location, "rb") as crl_file: + crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read()) + store.add_crl(crl) + store = self._add_certificate_chain_to_store(store, crl.get_issuer()) + return store - PRELOADED_CRL = ( - [ - ( - "x509 certificate routines", - "X509_STORE_add_crl", - "cert already in hash table", - ) - ], - ) - PRELOADED_CERT = ( - [ - ( - "x509 certificate routines", - "X509_STORE_add_cert", - "cert already in hash table", - ) - ], - ) + def _get_crl_location(self, issuer): + crl_location = self.crl_cache.get(issuer) - def _is_preloaded_error(self, error): - return error.args == self.PRELOADED_CRL or error.args == self.PRELOADED_CERT + if not crl_location: + raise CRLRevocationException("Could not find matching CRL for issuer") - # Checks that the CRL currently in-memory is up-to-date via the checksum. + return crl_location - def refresh_cache(self, cert): - der = cert.get_issuer().der() - if der in self.cache: - filename, checksum = self.cache[der] - if sha256_checksum(filename) != checksum: - self._reset() + # this _should_ happen just twice for the DoD PKI (intermediary, root) but + # theoretically it can build a longer certificate chain - def validate(self, cert): + def _add_certificate_chain_to_store(self, store, issuer): + ca = self.certificate_authorities.get(issuer.der()) + store.add_cert(ca) + + if issuer == ca.get_subject(): + # i.e., it is the root CA and we are at the end of the chain + return store + + else: + return self._add_certificate_chain_to_store(store, ca.get_issuer()) + + def crl_check(self, cert): parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert) - self.refresh_cache(parsed) - context = crypto.X509StoreContext(self.store, parsed) + store = self._get_store(parsed) + context = crypto.X509StoreContext(store, parsed) try: context.verify_certificate() return True except crypto.X509StoreContextError as err: - self.log_error( + raise CRLRevocationException( "Certificate revoked or errored. Error: {}. Args: {}".format( type(err), err.args ) ) - return False diff --git a/atst/routes/__init__.py b/atst/routes/__init__.py index f8c4199c..68c83437 100644 --- a/atst/routes/__init__.py +++ b/atst/routes/__init__.py @@ -32,7 +32,7 @@ def catch_all(path): def _make_authentication_context(): return AuthenticationContext( - crl_validator=app.crl_validator, + crl_cache=app.crl_cache, auth_status=request.environ.get("HTTP_X_SSL_CLIENT_VERIFY"), sdn=request.environ.get("HTTP_X_SSL_CLIENT_S_DN"), cert=request.environ.get("HTTP_X_SSL_CLIENT_CERT") diff --git a/tests/domain/authnid/test_authentication_context.py b/tests/domain/authnid/test_authentication_context.py index f2a359af..81a9b5d3 100644 --- a/tests/domain/authnid/test_authentication_context.py +++ b/tests/domain/authnid/test_authentication_context.py @@ -1,6 +1,7 @@ import pytest from atst.domain.authnid import AuthenticationContext +from atst.domain.authnid.crl import CRLCache, CRLRevocationException from atst.domain.exceptions import UnauthenticatedError, NotFoundError from atst.domain.users import Users @@ -10,25 +11,27 @@ from tests.factories import UserFactory CERT = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS)).read() -class MockCRLValidator(): +class MockCRLCache(): + def __init__(self, valid=True): + self.valid = valid - def __init__(self, value): - self.value = value + def crl_check(self, cert): + if self.valid: + return True - def validate(self, cert): - return self.value + raise CRLRevocationException() def test_can_authenticate(): auth_context = AuthenticationContext( - MockCRLValidator(True), "SUCCESS", DOD_SDN, CERT + MockCRLCache(), "SUCCESS", DOD_SDN, CERT ) assert auth_context.authenticate() def test_unsuccessful_status(): auth_context = AuthenticationContext( - MockCRLValidator(True), "FAILURE", DOD_SDN, CERT + MockCRLCache(), "FAILURE", DOD_SDN, CERT ) with pytest.raises(UnauthenticatedError) as excinfo: assert auth_context.authenticate() @@ -39,7 +42,7 @@ def test_unsuccessful_status(): def test_crl_check_fails(): auth_context = AuthenticationContext( - MockCRLValidator(False), "SUCCESS", DOD_SDN, CERT + MockCRLCache(False), "SUCCESS", DOD_SDN, CERT ) with pytest.raises(UnauthenticatedError) as excinfo: assert auth_context.authenticate() @@ -50,7 +53,7 @@ def test_crl_check_fails(): def test_bad_sdn(): auth_context = AuthenticationContext( - MockCRLValidator(True), "SUCCESS", "abc123", CERT + MockCRLCache(), "SUCCESS", "abc123", CERT ) with pytest.raises(UnauthenticatedError) as excinfo: auth_context.get_user() @@ -62,7 +65,7 @@ def test_bad_sdn(): def test_user_exists(): user = UserFactory.create(**DOD_SDN_INFO) auth_context = AuthenticationContext( - MockCRLValidator(True), "SUCCESS", DOD_SDN, CERT + MockCRLCache(), "SUCCESS", DOD_SDN, CERT ) auth_user = auth_context.get_user() @@ -75,7 +78,7 @@ def test_creates_user(): Users.get_by_dod_id(DOD_SDN_INFO["dod_id"]) auth_context = AuthenticationContext( - MockCRLValidator(True), "SUCCESS", DOD_SDN, CERT + MockCRLCache(), "SUCCESS", DOD_SDN, CERT ) user = auth_context.get_user() assert user.dod_id == DOD_SDN_INFO["dod_id"] @@ -85,7 +88,7 @@ def test_creates_user(): def test_user_cert_has_no_email(): cert = open("ssl/client-certs/atat.mil.crt").read() auth_context = AuthenticationContext( - MockCRLValidator(True), "SUCCESS", DOD_SDN, cert + MockCRLCache(), "SUCCESS", DOD_SDN, cert ) user = auth_context.get_user() diff --git a/tests/domain/authnid/test_crl.py b/tests/domain/authnid/test_crl.py index 1b9fa2ec..7ba3a213 100644 --- a/tests/domain/authnid/test_crl.py +++ b/tests/domain/authnid/test_crl.py @@ -4,11 +4,15 @@ import re import os import shutil from OpenSSL import crypto, SSL -from atst.domain.authnid.crl import Validator + +from atst.domain.authnid.crl import CRLCache, CRLRevocationException import atst.domain.authnid.crl.util as util +from tests.mocks import FIXTURE_EMAIL_ADDRESS + class MockX509Store(): + def __init__(self): self.crls = [] self.certs = [] @@ -22,50 +26,71 @@ class MockX509Store(): def set_flags(self, flag): pass + def test_can_build_crl_list(monkeypatch): - location = 'ssl/client-certs/client-ca.der.crl' - validator = Validator(crl_locations=[location], base_store=MockX509Store) - assert len(validator.store.crls) == 1 + location = "ssl/client-certs/client-ca.der.crl" + cache = CRLCache( + "ssl/client-certs/client-ca.crt", + crl_locations=[location], + store_class=MockX509Store, + ) + assert len(cache.crl_cache.keys()) == 1 + def test_can_build_trusted_root_list(): - location = 'ssl/server-certs/ca-chain.pem' - validator = Validator(roots=[location], base_store=MockX509Store) + location = "ssl/server-certs/ca-chain.pem" + cache = CRLCache( + root_location=location, crl_locations=[], store_class=MockX509Store + ) with open(location) as f: content = f.read() - assert len(validator.store.certs) == content.count('BEGIN CERT') + assert len(cache.certificate_authorities.keys()) == content.count("BEGIN CERT") + def test_can_validate_certificate(): - validator = Validator( - roots=['ssl/server-certs/ca-chain.pem'], - crl_locations=['ssl/client-certs/client-ca.der.crl'] - ) - good_cert = open('ssl/client-certs/atat.mil.crt', 'rb').read() - bad_cert = open('ssl/client-certs/bad-atat.mil.crt', 'rb').read() - assert validator.validate(good_cert) - assert validator.validate(bad_cert) == False + cache = CRLCache( + "ssl/server-certs/ca-chain.pem", + crl_locations=["ssl/client-certs/client-ca.der.crl"], + ) + good_cert = open("ssl/client-certs/atat.mil.crt", "rb").read() + bad_cert = open("ssl/client-certs/bad-atat.mil.crt", "rb").read() + assert cache.crl_check(good_cert) + with pytest.raises(CRLRevocationException): + cache.crl_check(bad_cert) + def test_can_dynamically_update_crls(tmpdir): - crl_file = tmpdir.join('test.crl') - shutil.copyfile('ssl/client-certs/client-ca.der.crl', crl_file) - validator = Validator( - roots=['ssl/server-certs/ca-chain.pem'], - crl_locations=[crl_file] - ) - cert = open('ssl/client-certs/atat.mil.crt', 'rb').read() - assert validator.validate(cert) + crl_file = tmpdir.join("test.crl") + shutil.copyfile("ssl/client-certs/client-ca.der.crl", crl_file) + cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[crl_file]) + cert = open("ssl/client-certs/atat.mil.crt", "rb").read() + assert cache.crl_check(cert) # override the original CRL with one that revokes atat.mil.crt - shutil.copyfile('tests/fixtures/test.der.crl', crl_file) - assert validator.validate(cert) == False + shutil.copyfile("tests/fixtures/test.der.crl", crl_file) + with pytest.raises(CRLRevocationException): + assert cache.crl_check(cert) + + +def test_throws_error_for_missing_issuer(): + cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[]) + cert = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read() + with pytest.raises(CRLRevocationException) as exc: + assert cache.crl_check(cert) + (message,) = exc.value.args + assert "issuer" in message + def test_parse_disa_pki_list(): - with open('tests/fixtures/disa-pki.html') as disa: + with open("tests/fixtures/disa-pki.html") as disa: disa_html = disa.read() crl_list = util.crl_list_from_disa_html(disa_html) - href_matches = re.findall('DOD(ROOT|EMAIL|ID)?CA', disa_html) + href_matches = re.findall("DOD(ROOT|EMAIL|ID)?CA", disa_html) assert len(crl_list) > 0 assert len(crl_list) == len(href_matches) + class MockStreamingResponse(): + def __init__(self, content_chunks, code=200): self.content_chunks = content_chunks self.status_code = code @@ -79,13 +104,19 @@ class MockStreamingResponse(): def __exit__(self, *args): pass + def test_write_crl(tmpdir, monkeypatch): - monkeypatch.setattr('requests.get', lambda u, **kwargs: MockStreamingResponse([b'it worked'])) - crl = 'crl_1' + monkeypatch.setattr( + "requests.get", lambda u, **kwargs: MockStreamingResponse([b"it worked"]) + ) + crl = "crl_1" assert util.write_crl(tmpdir, "random_target_dir", crl) assert [p.basename for p in tmpdir.listdir()] == [crl] - assert [p.read() for p in tmpdir.listdir()] == ['it worked'] + assert [p.read() for p in tmpdir.listdir()] == ["it worked"] + def test_skips_crl_if_it_has_not_been_modified(tmpdir, monkeypatch): - monkeypatch.setattr('requests.get', lambda u, **kwargs: MockStreamingResponse([b'it worked'], 304)) - assert not util.write_crl(tmpdir, "random_target_dir", 'crl_file_name') + monkeypatch.setattr( + "requests.get", lambda u, **kwargs: MockStreamingResponse([b"it worked"], 304) + ) + assert not util.write_crl(tmpdir, "random_target_dir", "crl_file_name") diff --git a/tests/fixtures/test.der.crl b/tests/fixtures/test.der.crl index dc8310f2..24708b0b 100644 Binary files a/tests/fixtures/test.der.crl and b/tests/fixtures/test.der.crl differ