handle exceptions when loading CRLs

This commit is contained in:
dandds 2019-01-17 14:20:04 -05:00
parent 9f71b8313c
commit 0457b0a508
2 changed files with 41 additions and 23 deletions

View File

@ -5,6 +5,12 @@ import hashlib
from OpenSSL import crypto, SSL from OpenSSL import crypto, SSL
def get_common_name(x509_name_object):
for comp in x509_name_object.get_components():
if comp[0] == b"CN":
return comp[1].decode()
class CRLRevocationException(Exception): class CRLRevocationException(Exception):
pass pass
@ -25,9 +31,7 @@ class NoOpCRLCache(CRLInterface):
def _get_cn(self, cert): def _get_cn(self, cert):
try: try:
parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert) parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
for comp in parsed.get_subject().get_components(): return get_common_name(parsed.get_subject())
if comp[0] == b"CN":
return comp[1].decode()
except crypto.Error: except crypto.Error:
pass pass
@ -54,14 +58,14 @@ class CRLCache(CRLInterface):
def __init__( def __init__(
self, root_location, crl_locations=[], store_class=crypto.X509Store, logger=None self, root_location, crl_locations=[], store_class=crypto.X509Store, logger=None
): ):
self.logger = logger
self.store_class = store_class self.store_class = store_class
self.certificate_authorities = {} self.certificate_authorities = {}
self._load_roots(root_location) self._load_roots(root_location)
self._build_crl_cache(crl_locations) self._build_crl_cache(crl_locations)
self.logger = logger
def _get_store(self, cert): def _get_store(self, cert):
return self._build_store(cert.get_issuer().der()) return self._build_store(cert.get_issuer())
def _load_roots(self, root_location): def _load_roots(self, root_location):
with open(root_location, "rb") as f: with open(root_location, "rb") as f:
@ -76,36 +80,40 @@ class CRLCache(CRLInterface):
self.crl_cache = {} self.crl_cache = {}
for crl_location in crl_locations: for crl_location in crl_locations:
crl = self._load_crl(crl_location) crl = self._load_crl(crl_location)
if crl:
self.crl_cache[crl.get_issuer().der()] = crl_location self.crl_cache[crl.get_issuer().der()] = crl_location
def _load_crl(self, crl_location): def _load_crl(self, crl_location):
with open(crl_location, "rb") as crl_file: with open(crl_location, "rb") as crl_file:
try:
return crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read()) return crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
except crypto.Error:
self._log_info("Could not load CRL at location {}".format(crl_location))
def _build_store(self, issuer): def _build_store(self, issuer):
store = self.store_class() store = self.store_class()
self._log_info("STORE ID: {}. Building store.".format(id(store))) self._log_info("STORE ID: {}. Building store.".format(id(store)))
store.set_flags(crypto.X509StoreFlags.CRL_CHECK) store.set_flags(crypto.X509StoreFlags.CRL_CHECK)
crl_location = self._get_crl_location(issuer) crl_location = self.crl_cache.get(issuer.der())
with open(crl_location, "rb") as crl_file: issuer_name = get_common_name(issuer)
crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
if not crl_location:
raise CRLRevocationException(
"Could not find matching CRL for issuer with Common Name {}".format(
issuer_name
)
)
crl = self._load_crl(crl_location)
store.add_crl(crl) store.add_crl(crl)
self._log_info( self._log_info(
"STORE ID: {}. Adding CRL with issuer {}".format( "STORE ID: {}. Adding CRL with issuer Common Name {}".format(
id(store), crl.get_issuer() id(store), issuer_name
) )
) )
store = self._add_certificate_chain_to_store(store, crl.get_issuer()) store = self._add_certificate_chain_to_store(store, crl.get_issuer())
return store return store
def _get_crl_location(self, issuer):
crl_location = self.crl_cache.get(issuer)
if not crl_location:
raise CRLRevocationException("Could not find matching CRL for issuer")
return crl_location
# this _should_ happen just twice for the DoD PKI (intermediary, root) but # this _should_ happen just twice for the DoD PKI (intermediary, root) but
# theoretically it can build a longer certificate chain # theoretically it can build a longer certificate chain

View File

@ -46,6 +46,16 @@ def test_can_build_trusted_root_list():
assert len(cache.certificate_authorities.keys()) == content.count("BEGIN CERT") assert len(cache.certificate_authorities.keys()) == content.count("BEGIN CERT")
def test_can_build_crl_list_with_missing_crls():
location = "ssl/client-certs/client-ca.der.crl"
cache = CRLCache(
"ssl/client-certs/client-ca.crt",
crl_locations=["tests/fixtures/sample.pdf"],
store_class=MockX509Store,
)
assert len(cache.crl_cache.keys()) == 0
def test_can_validate_certificate(): def test_can_validate_certificate():
cache = CRLCache( cache = CRLCache(
"ssl/server-certs/ca-chain.pem", "ssl/server-certs/ca-chain.pem",