diff --git a/atst/domain/authnid/crl/__init__.py b/atst/domain/authnid/crl/__init__.py index 5c53ffe9..da1d0b4d 100644 --- a/atst/domain/authnid/crl/__init__.py +++ b/atst/domain/authnid/crl/__init__.py @@ -4,9 +4,11 @@ import re import hashlib from OpenSSL import crypto, SSL + class CRLException(Exception): pass + def sha256_checksum(filename, block_size=65536): sha256 = hashlib.sha256() with open(filename, "rb") as f: @@ -39,13 +41,15 @@ class CRLCache(): ) def __init__(self, root_location, crl_locations=[], store_class=crypto.X509Store): - self.crl_cache = {} self.store_class = store_class + self.certificate_authorities = {} self._load_roots(root_location) - self._build_x509_stores(crl_locations) + self._build_crl_cache(crl_locations) + + def get_store(self, cert): + return self._build_store(cert.get_issuer().der()) def _load_roots(self, root_location): - self.certificate_authorities = {} with open(root_location, "rb") as f: for raw_ca in self._parse_roots(f.read()): ca = crypto.load_certificate(crypto.FILETYPE_PEM, raw_ca) @@ -54,21 +58,25 @@ class CRLCache(): def _parse_roots(self, root_str): return [match.group(0) for match in self._PEM_RE.finditer(root_str)] - def _build_x509_stores(self, crl_locations): - self.x509_stores = {} - for crl_path in crl_locations: - issuer, store = self._build_store(crl_path) - self.x509_stores[issuer] = store + def _build_crl_cache(self, crl_locations): + self.crl_cache = {} + for crl_location in crl_locations: + crl = self._load_crl(crl_location) + self.crl_cache[crl.get_issuer().der()] = crl_location - def _build_store(self, crl_location): + def _load_crl(self, crl_location): + with open(crl_location, "rb") as crl_file: + return crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read()) + + def _build_store(self, issuer): store = self.store_class() store.set_flags(crypto.X509StoreFlags.CRL_CHECK) + crl_location = self.crl_cache[issuer] with open(crl_location, "rb") as crl_file: crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read()) - self.crl_cache[crl.get_issuer().der()] = (crl_location, sha256_checksum(crl_location)) store.add_crl(crl) store = self._add_certificate_chain_to_store(store, crl.get_issuer()) - return (crl.get_issuer().der(), store) + return store # this _should_ happen just twice for the DoD PKI (intermediary, root) but # theoretically it can build a longer certificate chain @@ -82,16 +90,3 @@ class CRLCache(): else: return self._add_certificate_chain_to_store(store, ca.get_issuer()) - def get_store(self, cert): - return self._check_cache(cert.get_issuer().der()) - - def _check_cache(self, issuer): - if issuer in self.crl_cache: - filename, checksum = self.crl_cache[issuer] - if sha256_checksum(filename) != checksum: - issuer, store = self._build_store(filename) - self.x509_stores[issuer] = store - return store - else: - return self.x509_stores[issuer] - diff --git a/tests/domain/authnid/test_crl.py b/tests/domain/authnid/test_crl.py index 34828567..8fdc74a4 100644 --- a/tests/domain/authnid/test_crl.py +++ b/tests/domain/authnid/test_crl.py @@ -22,11 +22,12 @@ class MockX509Store(): def set_flags(self, flag): pass + def test_can_build_crl_list(monkeypatch): location = 'ssl/client-certs/client-ca.der.crl' cache = CRLCache('ssl/client-certs/client-ca.crt', crl_locations=[location], store_class=MockX509Store) - for store in cache.x509_stores.values(): - assert len(store.crls) == 1 + assert len(cache.crl_cache.keys()) == 1 + def test_can_build_trusted_root_list(): location = 'ssl/server-certs/ca-chain.pem'