From 0457b0a50875b4b00f24f32324b499775094ae67 Mon Sep 17 00:00:00 2001 From: dandds Date: Thu, 17 Jan 2019 14:20:04 -0500 Subject: [PATCH 1/2] handle exceptions when loading CRLs --- atst/domain/authnid/crl/__init__.py | 54 +++++++++++++++++------------ tests/domain/authnid/test_crl.py | 10 ++++++ 2 files changed, 41 insertions(+), 23 deletions(-) diff --git a/atst/domain/authnid/crl/__init__.py b/atst/domain/authnid/crl/__init__.py index 673f1a5c..fb81d54d 100644 --- a/atst/domain/authnid/crl/__init__.py +++ b/atst/domain/authnid/crl/__init__.py @@ -5,6 +5,12 @@ import hashlib from OpenSSL import crypto, SSL +def get_common_name(x509_name_object): + for comp in x509_name_object.get_components(): + if comp[0] == b"CN": + return comp[1].decode() + + class CRLRevocationException(Exception): pass @@ -25,9 +31,7 @@ class NoOpCRLCache(CRLInterface): def _get_cn(self, cert): try: parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert) - for comp in parsed.get_subject().get_components(): - if comp[0] == b"CN": - return comp[1].decode() + return get_common_name(parsed.get_subject()) except crypto.Error: pass @@ -54,14 +58,14 @@ class CRLCache(CRLInterface): def __init__( self, root_location, crl_locations=[], store_class=crypto.X509Store, logger=None ): + self.logger = logger self.store_class = store_class self.certificate_authorities = {} self._load_roots(root_location) self._build_crl_cache(crl_locations) - self.logger = logger def _get_store(self, cert): - return self._build_store(cert.get_issuer().der()) + return self._build_store(cert.get_issuer()) def _load_roots(self, root_location): with open(root_location, "rb") as f: @@ -76,35 +80,39 @@ class CRLCache(CRLInterface): self.crl_cache = {} for crl_location in crl_locations: crl = self._load_crl(crl_location) - self.crl_cache[crl.get_issuer().der()] = crl_location + if crl: + self.crl_cache[crl.get_issuer().der()] = crl_location def _load_crl(self, crl_location): with open(crl_location, "rb") as crl_file: - return crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read()) + try: + return crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read()) + except crypto.Error: + self._log_info("Could not load CRL at location {}".format(crl_location)) def _build_store(self, issuer): store = self.store_class() self._log_info("STORE ID: {}. Building store.".format(id(store))) store.set_flags(crypto.X509StoreFlags.CRL_CHECK) - crl_location = self._get_crl_location(issuer) - with open(crl_location, "rb") as crl_file: - crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read()) - store.add_crl(crl) - self._log_info( - "STORE ID: {}. Adding CRL with issuer {}".format( - id(store), crl.get_issuer() - ) - ) - store = self._add_certificate_chain_to_store(store, crl.get_issuer()) - return store - - def _get_crl_location(self, issuer): - crl_location = self.crl_cache.get(issuer) + crl_location = self.crl_cache.get(issuer.der()) + issuer_name = get_common_name(issuer) if not crl_location: - raise CRLRevocationException("Could not find matching CRL for issuer") + raise CRLRevocationException( + "Could not find matching CRL for issuer with Common Name {}".format( + issuer_name + ) + ) - return crl_location + crl = self._load_crl(crl_location) + store.add_crl(crl) + self._log_info( + "STORE ID: {}. Adding CRL with issuer Common Name {}".format( + id(store), issuer_name + ) + ) + store = self._add_certificate_chain_to_store(store, crl.get_issuer()) + return store # this _should_ happen just twice for the DoD PKI (intermediary, root) but # theoretically it can build a longer certificate chain diff --git a/tests/domain/authnid/test_crl.py b/tests/domain/authnid/test_crl.py index 48daecd5..8ea7c008 100644 --- a/tests/domain/authnid/test_crl.py +++ b/tests/domain/authnid/test_crl.py @@ -46,6 +46,16 @@ def test_can_build_trusted_root_list(): assert len(cache.certificate_authorities.keys()) == content.count("BEGIN CERT") +def test_can_build_crl_list_with_missing_crls(): + location = "ssl/client-certs/client-ca.der.crl" + cache = CRLCache( + "ssl/client-certs/client-ca.crt", + crl_locations=["tests/fixtures/sample.pdf"], + store_class=MockX509Store, + ) + assert len(cache.crl_cache.keys()) == 0 + + def test_can_validate_certificate(): cache = CRLCache( "ssl/server-certs/ca-chain.pem", From 9d141d1ea4d62f1fe56eb810d1a71c53bdead872 Mon Sep 17 00:00:00 2001 From: dandds Date: Fri, 18 Jan 2019 10:33:43 -0500 Subject: [PATCH 2/2] clarify test of missing CRL --- tests/domain/authnid/test_crl.py | 8 +++++++- tests/mocks.py | 3 ++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/domain/authnid/test_crl.py b/tests/domain/authnid/test_crl.py index 8ea7c008..c03d353b 100644 --- a/tests/domain/authnid/test_crl.py +++ b/tests/domain/authnid/test_crl.py @@ -8,7 +8,7 @@ from OpenSSL import crypto, SSL from atst.domain.authnid.crl import CRLCache, CRLRevocationException, NoOpCRLCache import atst.domain.authnid.crl.util as util -from tests.mocks import FIXTURE_EMAIL_ADDRESS +from tests.mocks import FIXTURE_EMAIL_ADDRESS, DOD_CN class MockX509Store: @@ -82,11 +82,17 @@ def test_can_dynamically_update_crls(tmpdir): def test_throws_error_for_missing_issuer(): cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[]) + # this cert is self-signed, and so the application does not have a + # corresponding CRL for it cert = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read() with pytest.raises(CRLRevocationException) as exc: assert cache.crl_check(cert) (message,) = exc.value.args + # objects that the issuer is missing assert "issuer" in message + # names the issuer we were expecting to find a CRL for; same as the + # certificate subject in this case because the cert is self-signed + assert DOD_CN in message def test_multistep_certificate_chain(): diff --git a/tests/mocks.py b/tests/mocks.py index d5995f1b..f5e9c2d3 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -2,7 +2,8 @@ from tests.factories import RequestFactory, UserFactory DOD_SDN_INFO = {"first_name": "ART", "last_name": "GARFUNKEL", "dod_id": "5892460358"} -DOD_SDN = f"CN={DOD_SDN_INFO['last_name']}.{DOD_SDN_INFO['first_name']}.G.{DOD_SDN_INFO['dod_id']},OU=OTHER,OU=PKI,OU=DoD,O=U.S. Government,C=US" +DOD_CN = f"{DOD_SDN_INFO['last_name']}.{DOD_SDN_INFO['first_name']}.G.{DOD_SDN_INFO['dod_id']}" +DOD_SDN = f"CN={DOD_CN},OU=OTHER,OU=PKI,OU=DoD,O=U.S. Government,C=US" MOCK_VALID_PE_ID = "080675309U"