CRL Provider for syncing CRLs from cached source

This commit is contained in:
dandds
2019-02-24 13:47:38 -05:00
parent beea7d11da
commit 9aa15d57e8
11 changed files with 66 additions and 210 deletions

View File

@@ -6,7 +6,6 @@ import shutil
from OpenSSL import crypto, SSL
from atst.domain.authnid.crl import CRLCache, CRLRevocationException, NoOpCRLCache
import atst.domain.authnid.crl.util as util
from tests.mocks import FIXTURE_EMAIL_ADDRESS, DOD_CN
@@ -104,47 +103,6 @@ def test_multistep_certificate_chain():
assert cache.crl_check(cert)
def test_parse_disa_pki_list():
with open("tests/fixtures/disa-pki.html") as disa:
disa_html = disa.read()
crl_list = util.crl_list_from_disa_html(disa_html)
href_matches = re.findall("DOD(ROOT|EMAIL|ID)?CA", disa_html)
assert len(crl_list) > 0
assert len(crl_list) == len(href_matches)
class MockStreamingResponse:
def __init__(self, content_chunks, code=200):
self.content_chunks = content_chunks
self.status_code = code
def iter_content(self, chunk_size=0):
return self.content_chunks
def __enter__(self):
return self
def __exit__(self, *args):
pass
def test_write_crl(tmpdir, monkeypatch):
monkeypatch.setattr(
"requests.get", lambda u, **kwargs: MockStreamingResponse([b"it worked"])
)
crl = "crl_1"
assert util.write_crl(tmpdir, "random_target_dir", crl)
assert [p.basename for p in tmpdir.listdir()] == [crl]
assert [p.read() for p in tmpdir.listdir()] == ["it worked"]
def test_skips_crl_if_it_has_not_been_modified(tmpdir, monkeypatch):
monkeypatch.setattr(
"requests.get", lambda u, **kwargs: MockStreamingResponse([b"it worked"], 304)
)
assert not util.write_crl(tmpdir, "random_target_dir", "crl_file_name")
class FakeLogger:
def __init__(self):
self.messages = []
@@ -159,26 +117,6 @@ class FakeLogger:
self.messages.append(msg)
def test_refresh_crls_with_error(tmpdir, monkeypatch):
def _mock_create_connection(*args, **kwargs):
raise TimeoutError
fake_crl = "https://fakecrl.com/fake.crl"
monkeypatch.setattr(
"urllib3.util.connection.create_connection", _mock_create_connection
)
monkeypatch.setattr("atst.domain.authnid.crl.util.fetch_disa", lambda *args: None)
monkeypatch.setattr(
"atst.domain.authnid.crl.util.crl_list_from_disa_html", lambda *args: [fake_crl]
)
logger = FakeLogger()
util.refresh_crls(tmpdir, tmpdir, logger)
assert "Error downloading {}".format(fake_crl) in logger.messages[-1]
def test_no_op_crl_cache_logs_common_name():
logger = FakeLogger()
cert = open("ssl/client-certs/atat.mil.crt", "rb").read()