build individual x509 stores for each CRL
This commit is contained in:
@@ -10,25 +10,23 @@ from tests.factories import UserFactory
|
||||
CERT = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS)).read()
|
||||
|
||||
|
||||
class MockCRLValidator():
|
||||
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
def validate(self, cert):
|
||||
return self.value
|
||||
class MockCRLCache():
|
||||
def get_store(self, cert):
|
||||
pass
|
||||
|
||||
|
||||
def test_can_authenticate():
|
||||
def test_can_authenticate(monkeypatch):
|
||||
monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True)
|
||||
auth_context = AuthenticationContext(
|
||||
MockCRLValidator(True), "SUCCESS", DOD_SDN, CERT
|
||||
MockCRLCache(), "SUCCESS", DOD_SDN, CERT
|
||||
)
|
||||
assert auth_context.authenticate()
|
||||
|
||||
|
||||
def test_unsuccessful_status():
|
||||
def test_unsuccessful_status(monkeypatch):
|
||||
monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True)
|
||||
auth_context = AuthenticationContext(
|
||||
MockCRLValidator(True), "FAILURE", DOD_SDN, CERT
|
||||
MockCRLCache(), "FAILURE", DOD_SDN, CERT
|
||||
)
|
||||
with pytest.raises(UnauthenticatedError) as excinfo:
|
||||
assert auth_context.authenticate()
|
||||
@@ -37,9 +35,10 @@ def test_unsuccessful_status():
|
||||
assert "client authentication" in message
|
||||
|
||||
|
||||
def test_crl_check_fails():
|
||||
def test_crl_check_fails(monkeypatch):
|
||||
monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: False)
|
||||
auth_context = AuthenticationContext(
|
||||
MockCRLValidator(False), "SUCCESS", DOD_SDN, CERT
|
||||
MockCRLCache(), "SUCCESS", DOD_SDN, CERT
|
||||
)
|
||||
with pytest.raises(UnauthenticatedError) as excinfo:
|
||||
assert auth_context.authenticate()
|
||||
@@ -48,9 +47,10 @@ def test_crl_check_fails():
|
||||
assert "CRL check" in message
|
||||
|
||||
|
||||
def test_bad_sdn():
|
||||
def test_bad_sdn(monkeypatch):
|
||||
monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True)
|
||||
auth_context = AuthenticationContext(
|
||||
MockCRLValidator(True), "SUCCESS", "abc123", CERT
|
||||
MockCRLCache(), "SUCCESS", "abc123", CERT
|
||||
)
|
||||
with pytest.raises(UnauthenticatedError) as excinfo:
|
||||
auth_context.get_user()
|
||||
@@ -59,33 +59,36 @@ def test_bad_sdn():
|
||||
assert "SDN" in message
|
||||
|
||||
|
||||
def test_user_exists():
|
||||
def test_user_exists(monkeypatch):
|
||||
monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True)
|
||||
user = UserFactory.create(**DOD_SDN_INFO)
|
||||
auth_context = AuthenticationContext(
|
||||
MockCRLValidator(True), "SUCCESS", DOD_SDN, CERT
|
||||
MockCRLCache(), "SUCCESS", DOD_SDN, CERT
|
||||
)
|
||||
auth_user = auth_context.get_user()
|
||||
|
||||
assert auth_user == user
|
||||
|
||||
|
||||
def test_creates_user():
|
||||
def test_creates_user(monkeypatch):
|
||||
monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True)
|
||||
# check user does not exist
|
||||
with pytest.raises(NotFoundError):
|
||||
Users.get_by_dod_id(DOD_SDN_INFO["dod_id"])
|
||||
|
||||
auth_context = AuthenticationContext(
|
||||
MockCRLValidator(True), "SUCCESS", DOD_SDN, CERT
|
||||
MockCRLCache(), "SUCCESS", DOD_SDN, CERT
|
||||
)
|
||||
user = auth_context.get_user()
|
||||
assert user.dod_id == DOD_SDN_INFO["dod_id"]
|
||||
assert user.email == FIXTURE_EMAIL_ADDRESS
|
||||
|
||||
|
||||
def test_user_cert_has_no_email():
|
||||
def test_user_cert_has_no_email(monkeypatch):
|
||||
monkeypatch.setattr("atst.domain.authnid.Validator.validate", lambda s: True)
|
||||
cert = open("ssl/client-certs/atat.mil.crt").read()
|
||||
auth_context = AuthenticationContext(
|
||||
MockCRLValidator(True), "SUCCESS", DOD_SDN, cert
|
||||
MockCRLCache(), "SUCCESS", DOD_SDN, cert
|
||||
)
|
||||
user = auth_context.get_user()
|
||||
|
||||
|
@@ -4,7 +4,7 @@ import re
|
||||
import os
|
||||
import shutil
|
||||
from OpenSSL import crypto, SSL
|
||||
from atst.domain.authnid.crl import Validator
|
||||
from atst.domain.authnid.crl import Validator, CRLCache
|
||||
import atst.domain.authnid.crl.util as util
|
||||
|
||||
|
||||
@@ -24,38 +24,33 @@ class MockX509Store():
|
||||
|
||||
def test_can_build_crl_list(monkeypatch):
|
||||
location = 'ssl/client-certs/client-ca.der.crl'
|
||||
validator = Validator(crl_locations=[location], base_store=MockX509Store)
|
||||
assert len(validator.store.crls) == 1
|
||||
cache = CRLCache('ssl/client-certs/client-ca.crt', crl_locations=[location], store_class=MockX509Store)
|
||||
for store in cache.x509_stores.values():
|
||||
assert len(store.crls) == 1
|
||||
|
||||
def test_can_build_trusted_root_list():
|
||||
location = 'ssl/server-certs/ca-chain.pem'
|
||||
validator = Validator(roots=[location], base_store=MockX509Store)
|
||||
cache = CRLCache(root_location=location, crl_locations=[], store_class=MockX509Store)
|
||||
with open(location) as f:
|
||||
content = f.read()
|
||||
assert len(validator.store.certs) == content.count('BEGIN CERT')
|
||||
assert len(cache.certificate_authorities.keys()) == content.count('BEGIN CERT')
|
||||
|
||||
def test_can_validate_certificate():
|
||||
validator = Validator(
|
||||
roots=['ssl/server-certs/ca-chain.pem'],
|
||||
crl_locations=['ssl/client-certs/client-ca.der.crl']
|
||||
)
|
||||
cache = CRLCache('ssl/server-certs/ca-chain.pem', crl_locations=['ssl/client-certs/client-ca.der.crl'])
|
||||
good_cert = open('ssl/client-certs/atat.mil.crt', 'rb').read()
|
||||
bad_cert = open('ssl/client-certs/bad-atat.mil.crt', 'rb').read()
|
||||
assert validator.validate(good_cert)
|
||||
assert validator.validate(bad_cert) == False
|
||||
assert Validator(cache, good_cert).validate()
|
||||
assert Validator(cache, bad_cert).validate() == False
|
||||
|
||||
def test_can_dynamically_update_crls(tmpdir):
|
||||
crl_file = tmpdir.join('test.crl')
|
||||
shutil.copyfile('ssl/client-certs/client-ca.der.crl', crl_file)
|
||||
validator = Validator(
|
||||
roots=['ssl/server-certs/ca-chain.pem'],
|
||||
crl_locations=[crl_file]
|
||||
)
|
||||
cache = CRLCache('ssl/server-certs/ca-chain.pem', crl_locations=[crl_file])
|
||||
cert = open('ssl/client-certs/atat.mil.crt', 'rb').read()
|
||||
assert validator.validate(cert)
|
||||
assert Validator(cache, cert).validate()
|
||||
# override the original CRL with one that revokes atat.mil.crt
|
||||
shutil.copyfile('tests/fixtures/test.der.crl', crl_file)
|
||||
assert validator.validate(cert) == False
|
||||
assert Validator(cache, cert).validate() == False
|
||||
|
||||
def test_parse_disa_pki_list():
|
||||
with open('tests/fixtures/disa-pki.html') as disa:
|
||||
|
BIN
tests/fixtures/test.der.crl
vendored
BIN
tests/fixtures/test.der.crl
vendored
Binary file not shown.
Reference in New Issue
Block a user