From b420ff2e5b79947970cb0eafbcf9fffbae8d0531 Mon Sep 17 00:00:00 2001 From: dandds Date: Mon, 25 Feb 2019 13:16:53 -0500 Subject: [PATCH] record CRL expiration in CRL cache --- atst/domain/authnid/crl/__init__.py | 15 +++- tests/domain/authnid/test_crl.py | 116 ++++++++++++++++++++++++++-- 2 files changed, 120 insertions(+), 11 deletions(-) diff --git a/atst/domain/authnid/crl/__init__.py b/atst/domain/authnid/crl/__init__.py index fb81d54d..038f32f1 100644 --- a/atst/domain/authnid/crl/__init__.py +++ b/atst/domain/authnid/crl/__init__.py @@ -81,7 +81,12 @@ class CRLCache(CRLInterface): for crl_location in crl_locations: crl = self._load_crl(crl_location) if crl: - self.crl_cache[crl.get_issuer().der()] = crl_location + issuer_der = crl.get_issuer().der() + expires = crl.to_cryptography().next_update + self.crl_cache[issuer_der] = { + "location": crl_location, + "expires": expires, + } def _load_crl(self, crl_location): with open(crl_location, "rb") as crl_file: @@ -94,23 +99,25 @@ class CRLCache(CRLInterface): store = self.store_class() self._log_info("STORE ID: {}. Building store.".format(id(store))) store.set_flags(crypto.X509StoreFlags.CRL_CHECK) - crl_location = self.crl_cache.get(issuer.der()) + crl_info = self.crl_cache.get(issuer.der(), {}) issuer_name = get_common_name(issuer) - if not crl_location: + if not crl_info: raise CRLRevocationException( "Could not find matching CRL for issuer with Common Name {}".format( issuer_name ) ) - crl = self._load_crl(crl_location) + crl = self._load_crl(crl_info["location"]) store.add_crl(crl) + self._log_info( "STORE ID: {}. Adding CRL with issuer Common Name {}".format( id(store), issuer_name ) ) + store = self._add_certificate_chain_to_store(store, crl.get_issuer()) return store diff --git a/tests/domain/authnid/test_crl.py b/tests/domain/authnid/test_crl.py index 691f66bc..92c6bd73 100644 --- a/tests/domain/authnid/test_crl.py +++ b/tests/domain/authnid/test_crl.py @@ -3,7 +3,14 @@ import pytest import re import os import shutil +from datetime import datetime, timezone, timedelta from OpenSSL import crypto, SSL +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.serialization import Encoding +from cryptography.x509.oid import NameOID from atst.domain.authnid.crl import CRLCache, CRLRevocationException, NoOpCRLCache @@ -25,14 +32,21 @@ class MockX509Store: pass -def test_can_build_crl_list(monkeypatch): - location = "ssl/client-certs/client-ca.der.crl" - cache = CRLCache( - "ssl/client-certs/client-ca.crt", - crl_locations=[location], - store_class=MockX509Store, - ) +def test_can_build_crl_list(ca_key, make_crl, make_x509, tmpdir): + ca = make_x509(ca_key) + ca_out = tmpdir.join("atat.crt") + serialize_pki_object_to_disk(ca, ca_out) + + crl = make_crl(ca_key) + crl_out = tmpdir.join("atat.crl") + serialize_pki_object_to_disk(crl, crl_out, encoding=Encoding.DER) + + cache = CRLCache(ca_out, crl_locations=[crl_out], store_class=MockX509Store) + issuer_der = crl.issuer.public_bytes(default_backend()) assert len(cache.crl_cache.keys()) == 1 + assert issuer_der in cache.crl_cache + assert cache.crl_cache[issuer_der]["location"] == crl_out + assert cache.crl_cache[issuer_der]["expires"] == crl.next_update def test_can_build_trusted_root_list(): @@ -123,3 +137,91 @@ def test_no_op_crl_cache_logs_common_name(): cache = NoOpCRLCache(logger=logger) assert cache.crl_check(cert) assert "ART.GARFUNKEL.1234567890" in logger.messages[-1] + + +def rsa_key(): + return rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend() + ) + + +@pytest.fixture +def ca_key(): + return rsa_key() + + +@pytest.fixture +def make_x509(): + def _make_x509(private_key, signer_key=None, cn="ATAT", signer_cn="ATAT"): + if signer_key is None: + signer_key = private_key + + one_day = timedelta(1, 0, 0) + public_key = private_key.public_key() + builder = x509.CertificateBuilder() + builder = builder.subject_name( + x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)]) + ) + builder = builder.issuer_name( + x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, signer_cn)]) + ) + if signer_key == private_key: + builder = builder.add_extension( + x509.BasicConstraints(ca=True, path_length=None), critical=True + ) + builder = builder.not_valid_before(datetime.today() - (one_day * 2)) + builder = builder.not_valid_after(datetime.today() + (one_day * 30)) + builder = builder.serial_number(x509.random_serial_number()) + builder = builder.public_key(public_key) + certificate = builder.sign( + private_key=signer_key, algorithm=hashes.SHA256(), backend=default_backend() + ) + + return certificate + + return _make_x509 + + +@pytest.fixture +def make_crl(): + def _make_crl(private_key, last_update_days=-1, next_update_days=30, cn="ATAT"): + one_day = timedelta(1, 0, 0) + builder = x509.CertificateRevocationListBuilder() + builder = builder.issuer_name( + x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)]) + ) + builder = builder.last_update(datetime.today() + (one_day * last_update_days)) + builder = builder.next_update(datetime.today() + (one_day * next_update_days)) + crl = builder.sign( + private_key=private_key, + algorithm=hashes.SHA256(), + backend=default_backend(), + ) + + return crl + + return _make_crl + + +def serialize_pki_object_to_disk(obj, name, encoding=Encoding.PEM): + with open(name, "wb") as file_: + file_.write(obj.public_bytes(encoding)) + + return name + + +@pytest.mark.skip(reason="not implemented yet") +def test_updates_expired_certs(ca_key, make_crl, make_x509, tmpdir): + ca = make_x509(ca_key) + ca_out = tmpdir.join("atat.crt") + serialize_pki_object_to_disk(ca, ca_out) + + crl = make_crl(ca_key, last_update_days=-7, next_update_days=-1) + crl_out = tmpdir.join("atat.crl") + serialize_pki_object_to_disk(crl, crl_out, encoding=Encoding.DER) + + client_cert = make_x509(rsa_key(), signer_key=ca_key, cn="chewbacca") + client_pem = client_cert.public_bytes(Encoding.PEM) + + crl_cache = CRLCache(ca_out, crl_locations=[crl_out]) + crl_cache.crl_check(client_pem)