188 lines
		
	
	
		
			5.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			188 lines
		
	
	
		
			5.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Import installed packages
 | |
| import pytest
 | |
| import re
 | |
| import os
 | |
| import shutil
 | |
| from OpenSSL import crypto, SSL
 | |
| 
 | |
| from atst.domain.authnid.crl import CRLCache, CRLRevocationException, NoOpCRLCache
 | |
| import atst.domain.authnid.crl.util as util
 | |
| 
 | |
| from tests.mocks import FIXTURE_EMAIL_ADDRESS, DOD_CN
 | |
| 
 | |
| 
 | |
class MockX509Store:
    """In-memory stand-in for an X509 store that only records what it is given.

    Tests in this file pass it as ``store_class`` so that no real store is
    built; added CRLs and certificates are kept in plain lists for inspection.
    """

    def __init__(self):
        # Separate lists so tests can inspect CRLs and certs independently.
        self.crls, self.certs = [], []

    def add_crl(self, crl):
        """Record ``crl`` in ``self.crls``."""
        self.crls += [crl]

    def add_cert(self, cert):
        """Record ``cert`` in ``self.certs``."""
        self.certs += [cert]

    def set_flags(self, flag):
        """Accept and ignore ``flag``; the mock keeps no flag state."""
        return None
 | |
| 
 | |
| 
 | |
def test_can_build_crl_list(monkeypatch):
    """Building a cache from one CRL location leaves one entry in ``crl_cache``."""
    crl_paths = ["ssl/client-certs/client-ca.der.crl"]
    built = CRLCache(
        "ssl/client-certs/client-ca.crt",
        crl_locations=crl_paths,
        store_class=MockX509Store,
    )
    cached_keys = built.crl_cache.keys()
    assert len(cached_keys) == 1
 | |
| 
 | |
| 
 | |
def test_can_build_trusted_root_list():
    """The cache holds one certificate authority per cert in the chain file.

    The expected count is the number of ``BEGIN CERT`` markers found in the
    chain file's text.
    """
    chain_path = "ssl/server-certs/ca-chain.pem"
    built = CRLCache(
        root_location=chain_path, crl_locations=[], store_class=MockX509Store
    )
    with open(chain_path) as chain_file:
        pem_text = chain_file.read()
    expected_total = pem_text.count("BEGIN CERT")
    assert len(built.certificate_authorities.keys()) == expected_total
 | |
| 
 | |
| 
 | |
def test_can_build_crl_list_with_missing_crls():
    """A location that does not yield a CRL leaves ``crl_cache`` empty.

    The only location supplied is a PDF fixture (presumably not parseable as a
    CRL), and the cache is expected to contain no entries afterwards.
    """
    cache = CRLCache(
        "ssl/client-certs/client-ca.crt",
        crl_locations=["tests/fixtures/sample.pdf"],
        store_class=MockX509Store,
    )
    assert len(cache.crl_cache.keys()) == 0
 | |
| 
 | |
| 
 | |
def test_can_validate_certificate():
    """``crl_check`` passes the good cert and raises for the bad one.

    ``atat.mil.crt`` must produce a truthy result; ``bad-atat.mil.crt`` must
    raise ``CRLRevocationException``.
    """
    cache = CRLCache(
        "ssl/server-certs/ca-chain.pem",
        crl_locations=["ssl/client-certs/client-ca.der.crl"],
    )
    # Read through context managers so the file handles are closed promptly.
    with open("ssl/client-certs/atat.mil.crt", "rb") as cert_file:
        good_cert = cert_file.read()
    with open("ssl/client-certs/bad-atat.mil.crt", "rb") as cert_file:
        bad_cert = cert_file.read()
    assert cache.crl_check(good_cert)
    with pytest.raises(CRLRevocationException):
        cache.crl_check(bad_cert)
 | |
| 
 | |
| 
 | |
def test_can_dynamically_update_crls(tmpdir):
    """Replacing a CRL file on disk changes the outcome of ``crl_check``.

    The same cache instance first accepts the certificate, then raises
    ``CRLRevocationException`` for it once the CRL file at the same path has
    been overwritten with one that revokes it.
    """
    crl_file = tmpdir.join("test.crl")
    shutil.copyfile("ssl/client-certs/client-ca.der.crl", crl_file)
    cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[crl_file])
    # Read through a context manager so the file handle is closed promptly.
    with open("ssl/client-certs/atat.mil.crt", "rb") as cert_file:
        cert = cert_file.read()
    assert cache.crl_check(cert)
    # override the original CRL with one that revokes atat.mil.crt
    shutil.copyfile("tests/fixtures/test.der.crl", crl_file)
    # No `assert` here: the call is expected to raise, so its return value is
    # never evaluated.
    with pytest.raises(CRLRevocationException):
        cache.crl_check(cert)
 | |
| 
 | |
| 
 | |
def test_throws_error_for_missing_issuer():
    """``crl_check`` raises with a message naming the issuer it could not find.

    The exception is expected to carry exactly one argument: a message that
    contains the word "issuer" and the ``DOD_CN`` value.
    """
    cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[])
    # this cert is self-signed, and so the application does not have a
    # corresponding CRL for it
    cert_path = "tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS)
    # Read through a context manager so the file handle is closed promptly.
    with open(cert_path, "rb") as cert_file:
        cert = cert_file.read()
    # No `assert` here: the call is expected to raise, so its return value is
    # never evaluated.
    with pytest.raises(CRLRevocationException) as exc:
        cache.crl_check(cert)
    (message,) = exc.value.args
    # complains that the issuer is missing
    assert "issuer" in message
    # names the issuer we were expecting to find a CRL for; same as the
    # certificate subject in this case because the cert is self-signed
    assert DOD_CN in message
 | |
| 
 | |
| 
 | |
def test_multistep_certificate_chain():
    """``crl_check`` accepts the client cert from the ``chain`` fixtures.

    The cache is built from the fixture CA chain with only the intermediate
    CRL supplied.
    """
    cache = CRLCache(
        "tests/fixtures/chain/ca-chain.pem",
        crl_locations=["tests/fixtures/chain/intermediate.crl"],
    )
    # Read through a context manager so the file handle is closed promptly.
    with open("tests/fixtures/chain/client.crt", "rb") as cert_file:
        cert = cert_file.read()
    assert cache.crl_check(cert)
 | |
| 
 | |
| 
 | |
def test_parse_disa_pki_list():
    """The parsed CRL list is non-empty and matches the CA names in the page.

    The expected length is the number of ``DOD...CA`` regex matches in the
    fixture HTML.
    """
    with open("tests/fixtures/disa-pki.html") as fixture:
        page = fixture.read()
    parsed = util.crl_list_from_disa_html(page)
    expected_total = len(re.findall("DOD(ROOT|EMAIL|ID)?CA", page))
    parsed_total = len(parsed)
    assert parsed_total > 0
    assert parsed_total == expected_total
 | |
| 
 | |
| 
 | |
class MockStreamingResponse:
    """Canned response object usable as a context manager.

    It exposes ``status_code`` and ``iter_content`` and hands back exactly the
    chunks it was constructed with.
    """

    def __init__(self, content_chunks, code=200):
        self.status_code = code
        self.content_chunks = content_chunks

    def __enter__(self):
        # Entering the context yields the response itself.
        return self

    def __exit__(self, *args):
        # Nothing to release; returning None lets exceptions propagate.
        return None

    def iter_content(self, chunk_size=0):
        """Return the stored chunks unchanged; ``chunk_size`` is ignored."""
        return self.content_chunks
 | |
| 
 | |
| 
 | |
def test_write_crl(tmpdir, monkeypatch):
    """``write_crl`` returns truthy and writes the body to a file named after the CRL.

    ``requests.get`` is replaced with a stub returning a single-chunk response.
    """

    def _fake_get(u, **kwargs):
        return MockStreamingResponse([b"it worked"])

    monkeypatch.setattr("requests.get", _fake_get)
    crl = "crl_1"
    written = util.write_crl(tmpdir, "random_target_dir", crl)
    assert written
    names = [entry.basename for entry in tmpdir.listdir()]
    assert names == [crl]
    contents = [entry.read() for entry in tmpdir.listdir()]
    assert contents == ["it worked"]
 | |
| 
 | |
| 
 | |
def test_skips_crl_if_it_has_not_been_modified(tmpdir, monkeypatch):
    """``write_crl`` returns a falsy value when the stubbed response is a 304."""

    def _fake_get(u, **kwargs):
        return MockStreamingResponse([b"it worked"], 304)

    monkeypatch.setattr("requests.get", _fake_get)
    outcome = util.write_crl(tmpdir, "random_target_dir", "crl_file_name")
    assert not outcome
 | |
| 
 | |
| 
 | |
class FakeLogger:
    """Logger double that collects every message, whatever its level.

    ``info``, ``warning`` and ``error`` all append to the same ``messages``
    list, in call order.
    """

    def __init__(self):
        self.messages = []

    def _record(self, msg):
        # Single sink shared by all level methods.
        self.messages.append(msg)

    def info(self, msg):
        """Store ``msg``."""
        self._record(msg)

    def warning(self, msg):
        """Store ``msg``."""
        self._record(msg)

    def error(self, msg):
        """Store ``msg``."""
        self._record(msg)
 | |
| 
 | |
| 
 | |
def test_refresh_crls_with_error(tmpdir, monkeypatch):
    """``refresh_crls`` logs an "Error downloading <url>" message on timeout.

    The urllib3 connection factory is patched to raise ``TimeoutError``, and
    the DISA fetch/parse helpers are stubbed so that one fake CRL URL is
    returned.
    """
    fake_crl = "https://fakecrl.com/fake.crl"

    def _mock_create_connection(*args, **kwargs):
        raise TimeoutError()

    def _no_disa_page(*args):
        return None

    def _single_fake_crl(*args):
        return [fake_crl]

    # Applied in the same order as listed.
    patches = (
        ("urllib3.util.connection.create_connection", _mock_create_connection),
        ("atst.domain.authnid.crl.util.fetch_disa", _no_disa_page),
        ("atst.domain.authnid.crl.util.crl_list_from_disa_html", _single_fake_crl),
    )
    for target, replacement in patches:
        monkeypatch.setattr(target, replacement)

    logger = FakeLogger()
    util.refresh_crls(tmpdir, tmpdir, logger)

    expected_fragment = "Error downloading {}".format(fake_crl)
    last_message = logger.messages[-1]
    assert expected_fragment in last_message
 | |
| 
 | |
| 
 | |
def test_no_op_crl_cache_logs_common_name():
    """``NoOpCRLCache.crl_check`` succeeds and logs the expected name.

    The check must return a truthy value, and the last message recorded by
    the logger must contain ``ART.GARFUNKEL.1234567890``.
    """
    logger = FakeLogger()
    # Read through a context manager so the file handle is closed promptly.
    with open("ssl/client-certs/atat.mil.crt", "rb") as cert_file:
        cert = cert_file.read()
    cache = NoOpCRLCache(logger=logger)
    assert cache.crl_check(cert)
    assert "ART.GARFUNKEL.1234567890" in logger.messages[-1]
 |