record CRL expiration in CRL cache

This commit is contained in:
dandds 2019-02-25 13:16:53 -05:00
parent 725042ab76
commit b420ff2e5b
2 changed files with 120 additions and 11 deletions

View File

@ -81,7 +81,12 @@ class CRLCache(CRLInterface):
for crl_location in crl_locations: for crl_location in crl_locations:
crl = self._load_crl(crl_location) crl = self._load_crl(crl_location)
if crl: if crl:
self.crl_cache[crl.get_issuer().der()] = crl_location issuer_der = crl.get_issuer().der()
expires = crl.to_cryptography().next_update
self.crl_cache[issuer_der] = {
"location": crl_location,
"expires": expires,
}
def _load_crl(self, crl_location): def _load_crl(self, crl_location):
with open(crl_location, "rb") as crl_file: with open(crl_location, "rb") as crl_file:
@ -94,23 +99,25 @@ class CRLCache(CRLInterface):
store = self.store_class() store = self.store_class()
self._log_info("STORE ID: {}. Building store.".format(id(store))) self._log_info("STORE ID: {}. Building store.".format(id(store)))
store.set_flags(crypto.X509StoreFlags.CRL_CHECK) store.set_flags(crypto.X509StoreFlags.CRL_CHECK)
crl_location = self.crl_cache.get(issuer.der()) crl_info = self.crl_cache.get(issuer.der(), {})
issuer_name = get_common_name(issuer) issuer_name = get_common_name(issuer)
if not crl_location: if not crl_info:
raise CRLRevocationException( raise CRLRevocationException(
"Could not find matching CRL for issuer with Common Name {}".format( "Could not find matching CRL for issuer with Common Name {}".format(
issuer_name issuer_name
) )
) )
crl = self._load_crl(crl_location) crl = self._load_crl(crl_info["location"])
store.add_crl(crl) store.add_crl(crl)
self._log_info( self._log_info(
"STORE ID: {}. Adding CRL with issuer Common Name {}".format( "STORE ID: {}. Adding CRL with issuer Common Name {}".format(
id(store), issuer_name id(store), issuer_name
) )
) )
store = self._add_certificate_chain_to_store(store, crl.get_issuer()) store = self._add_certificate_chain_to_store(store, crl.get_issuer())
return store return store

View File

@ -3,7 +3,14 @@ import pytest
import re import re
import os import os
import shutil import shutil
from datetime import datetime, timezone, timedelta
from OpenSSL import crypto, SSL from OpenSSL import crypto, SSL
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509.oid import NameOID
from atst.domain.authnid.crl import CRLCache, CRLRevocationException, NoOpCRLCache from atst.domain.authnid.crl import CRLCache, CRLRevocationException, NoOpCRLCache
@ -25,14 +32,21 @@ class MockX509Store:
pass pass
def test_can_build_crl_list(monkeypatch): def test_can_build_crl_list(ca_key, make_crl, make_x509, tmpdir):
location = "ssl/client-certs/client-ca.der.crl" ca = make_x509(ca_key)
cache = CRLCache( ca_out = tmpdir.join("atat.crt")
"ssl/client-certs/client-ca.crt", serialize_pki_object_to_disk(ca, ca_out)
crl_locations=[location],
store_class=MockX509Store, crl = make_crl(ca_key)
) crl_out = tmpdir.join("atat.crl")
serialize_pki_object_to_disk(crl, crl_out, encoding=Encoding.DER)
cache = CRLCache(ca_out, crl_locations=[crl_out], store_class=MockX509Store)
issuer_der = crl.issuer.public_bytes(default_backend())
assert len(cache.crl_cache.keys()) == 1 assert len(cache.crl_cache.keys()) == 1
assert issuer_der in cache.crl_cache
assert cache.crl_cache[issuer_der]["location"] == crl_out
assert cache.crl_cache[issuer_der]["expires"] == crl.next_update
def test_can_build_trusted_root_list(): def test_can_build_trusted_root_list():
@ -123,3 +137,91 @@ def test_no_op_crl_cache_logs_common_name():
cache = NoOpCRLCache(logger=logger) cache = NoOpCRLCache(logger=logger)
assert cache.crl_check(cert) assert cache.crl_check(cert)
assert "ART.GARFUNKEL.1234567890" in logger.messages[-1] assert "ART.GARFUNKEL.1234567890" in logger.messages[-1]
def rsa_key():
return rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
)
@pytest.fixture
def ca_key():
return rsa_key()
@pytest.fixture
def make_x509():
def _make_x509(private_key, signer_key=None, cn="ATAT", signer_cn="ATAT"):
if signer_key is None:
signer_key = private_key
one_day = timedelta(1, 0, 0)
public_key = private_key.public_key()
builder = x509.CertificateBuilder()
builder = builder.subject_name(
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)])
)
builder = builder.issuer_name(
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, signer_cn)])
)
if signer_key == private_key:
builder = builder.add_extension(
x509.BasicConstraints(ca=True, path_length=None), critical=True
)
builder = builder.not_valid_before(datetime.today() - (one_day * 2))
builder = builder.not_valid_after(datetime.today() + (one_day * 30))
builder = builder.serial_number(x509.random_serial_number())
builder = builder.public_key(public_key)
certificate = builder.sign(
private_key=signer_key, algorithm=hashes.SHA256(), backend=default_backend()
)
return certificate
return _make_x509
@pytest.fixture
def make_crl():
def _make_crl(private_key, last_update_days=-1, next_update_days=30, cn="ATAT"):
one_day = timedelta(1, 0, 0)
builder = x509.CertificateRevocationListBuilder()
builder = builder.issuer_name(
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)])
)
builder = builder.last_update(datetime.today() + (one_day * last_update_days))
builder = builder.next_update(datetime.today() + (one_day * next_update_days))
crl = builder.sign(
private_key=private_key,
algorithm=hashes.SHA256(),
backend=default_backend(),
)
return crl
return _make_crl
def serialize_pki_object_to_disk(obj, name, encoding=Encoding.PEM):
with open(name, "wb") as file_:
file_.write(obj.public_bytes(encoding))
return name
@pytest.mark.skip(reason="not implemented yet")
def test_updates_expired_certs(ca_key, make_crl, make_x509, tmpdir):
ca = make_x509(ca_key)
ca_out = tmpdir.join("atat.crt")
serialize_pki_object_to_disk(ca, ca_out)
crl = make_crl(ca_key, last_update_days=-7, next_update_days=-1)
crl_out = tmpdir.join("atat.crl")
serialize_pki_object_to_disk(crl, crl_out, encoding=Encoding.DER)
client_cert = make_x509(rsa_key(), signer_key=ca_key, cn="chewbacca")
client_pem = client_cert.public_bytes(Encoding.PEM)
crl_cache = CRLCache(ca_out, crl_locations=[crl_out])
crl_cache.crl_check(client_pem)