commit
79feb16072
@ -142,6 +142,7 @@ def map_config(config):
|
|||||||
"RQ_REDIS_URL": config["default"]["REDIS_URI"],
|
"RQ_REDIS_URL": config["default"]["REDIS_URI"],
|
||||||
"RQ_QUEUES": [config["default"]["RQ_QUEUES"]],
|
"RQ_QUEUES": [config["default"]["RQ_QUEUES"]],
|
||||||
"DISABLE_CRL_CHECK": config.getboolean("default", "DISABLE_CRL_CHECK"),
|
"DISABLE_CRL_CHECK": config.getboolean("default", "DISABLE_CRL_CHECK"),
|
||||||
|
"CRL_FAIL_OPEN": config.getboolean("default", "CRL_FAIL_OPEN"),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
from atst.domain.exceptions import UnauthenticatedError, NotFoundError
|
from atst.domain.exceptions import UnauthenticatedError, NotFoundError
|
||||||
from atst.domain.users import Users
|
from atst.domain.users import Users
|
||||||
from .utils import parse_sdn, email_from_certificate
|
from .utils import parse_sdn, email_from_certificate
|
||||||
from .crl import CRLRevocationException
|
from .crl import CRLRevocationException, CRLInvalidException
|
||||||
|
|
||||||
|
|
||||||
class AuthenticationContext:
|
class AuthenticationContext:
|
||||||
|
@ -2,9 +2,13 @@ import sys
|
|||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import hashlib
|
import hashlib
|
||||||
|
from flask import current_app as app
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from OpenSSL import crypto, SSL
|
from OpenSSL import crypto, SSL
|
||||||
|
|
||||||
|
# error codes from OpenSSL: https://github.com/openssl/openssl/blob/2c75f03b39de2fa7d006bc0f0d7c58235a54d9bb/include/openssl/x509_vfy.h#L111
|
||||||
|
CRL_EXPIRED_ERROR_CODE = 12
|
||||||
|
|
||||||
|
|
||||||
def get_common_name(x509_name_object):
|
def get_common_name(x509_name_object):
|
||||||
for comp in x509_name_object.get_components():
|
for comp in x509_name_object.get_components():
|
||||||
@ -16,6 +20,12 @@ class CRLRevocationException(Exception):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class CRLInvalidException(Exception):
|
||||||
|
# CRL expired
|
||||||
|
# CRL missing
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class CRLInterface:
|
class CRLInterface:
|
||||||
def __init__(self, *args, logger=None, **kwargs):
|
def __init__(self, *args, logger=None, **kwargs):
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
@ -111,7 +121,7 @@ class CRLCache(CRLInterface):
|
|||||||
issuer_name = get_common_name(issuer)
|
issuer_name = get_common_name(issuer)
|
||||||
|
|
||||||
if not crl_info:
|
if not crl_info:
|
||||||
raise CRLRevocationException(
|
raise CRLInvalidException(
|
||||||
"Could not find matching CRL for issuer with Common Name {}".format(
|
"Could not find matching CRL for issuer with Common Name {}".format(
|
||||||
issuer_name
|
issuer_name
|
||||||
)
|
)
|
||||||
@ -170,6 +180,16 @@ class CRLCache(CRLInterface):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
except crypto.X509StoreContextError as err:
|
except crypto.X509StoreContextError as err:
|
||||||
|
if err.args[0][0] == CRL_EXPIRED_ERROR_CODE:
|
||||||
|
if app.config.get("CRL_FAIL_OPEN"):
|
||||||
|
self._log_info(
|
||||||
|
"Encountered expired CRL for certificate with CN {} and issuer CN {}, failing open.".format(
|
||||||
|
parsed.get_subject().CN, parsed.get_issuer().CN
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
raise CRLInvalidException("CRL expired. Args: {}".format(err.args))
|
||||||
raise CRLRevocationException(
|
raise CRLRevocationException(
|
||||||
"Certificate revoked or errored. Error: {}. Args: {}".format(
|
"Certificate revoked or errored. Error: {}. Args: {}".format(
|
||||||
type(err), err.args
|
type(err), err.args
|
||||||
|
@ -8,6 +8,7 @@ from atst.domain.invitations import (
|
|||||||
ExpiredError as InvitationExpiredError,
|
ExpiredError as InvitationExpiredError,
|
||||||
WrongUserError as InvitationWrongUserError,
|
WrongUserError as InvitationWrongUserError,
|
||||||
)
|
)
|
||||||
|
from atst.domain.authnid.crl import CRLInvalidException
|
||||||
from atst.domain.portfolios import PortfolioError
|
from atst.domain.portfolios import PortfolioError
|
||||||
from atst.utils.flash import formatted_flash as flash
|
from atst.utils.flash import formatted_flash as flash
|
||||||
|
|
||||||
@ -32,6 +33,11 @@ def make_error_pages(app):
|
|||||||
def not_found(e):
|
def not_found(e):
|
||||||
return handle_error(e)
|
return handle_error(e)
|
||||||
|
|
||||||
|
@app.errorhandler(CRLInvalidException)
|
||||||
|
# pylint: disable=unused-variable
|
||||||
|
def missing_crl(e):
|
||||||
|
return handle_error(e, message="Error Code 008", code=401)
|
||||||
|
|
||||||
@app.errorhandler(exceptions.UnauthenticatedError)
|
@app.errorhandler(exceptions.UnauthenticatedError)
|
||||||
# pylint: disable=unused-variable
|
# pylint: disable=unused-variable
|
||||||
def unauthorized(e):
|
def unauthorized(e):
|
||||||
|
@ -7,6 +7,7 @@ CRL_STORAGE_CONTAINER = crls
|
|||||||
CRL_STORAGE_PROVIDER = LOCAL
|
CRL_STORAGE_PROVIDER = LOCAL
|
||||||
CRL_STORAGE_REGION = iad
|
CRL_STORAGE_REGION = iad
|
||||||
DISABLE_CRL_CHECK = false
|
DISABLE_CRL_CHECK = false
|
||||||
|
CRL_FAIL_OPEN = false
|
||||||
DEBUG = true
|
DEBUG = true
|
||||||
ENVIRONMENT = dev
|
ENVIRONMENT = dev
|
||||||
PERMANENT_SESSION_LIFETIME = 600
|
PERMANENT_SESSION_LIFETIME = 600
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
import os
|
import os
|
||||||
import datetime
|
|
||||||
import pytest
|
import pytest
|
||||||
import alembic.config
|
import alembic.config
|
||||||
import alembic.command
|
import alembic.command
|
||||||
@ -14,6 +13,15 @@ from atst.queue import queue as atst_queue
|
|||||||
import tests.factories as factories
|
import tests.factories as factories
|
||||||
from tests.mocks import PDF_FILENAME, PDF_FILENAME2
|
from tests.mocks import PDF_FILENAME, PDF_FILENAME2
|
||||||
|
|
||||||
|
from datetime import datetime, timezone, timedelta
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||||
|
from cryptography import x509
|
||||||
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
|
from cryptography.hazmat.primitives.serialization import Encoding
|
||||||
|
from cryptography.x509.oid import NameOID
|
||||||
|
|
||||||
|
|
||||||
dictConfig({"version": 1, "handlers": {"wsgi": {"class": "logging.NullHandler"}}})
|
dictConfig({"version": 1, "handlers": {"wsgi": {"class": "logging.NullHandler"}}})
|
||||||
|
|
||||||
|
|
||||||
@ -153,3 +161,138 @@ def extended_financial_verification_data(pdf_upload):
|
|||||||
def queue():
|
def queue():
|
||||||
yield atst_queue
|
yield atst_queue
|
||||||
atst_queue.get_queue().empty()
|
atst_queue.get_queue().empty()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def crl_failover_open_app(app):
|
||||||
|
app.config.update({"CRL_FAIL_OPEN": True})
|
||||||
|
yield app
|
||||||
|
app.config.update({"CRL_FAIL_OPEN": False})
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def rsa_key():
|
||||||
|
def _rsa_key():
|
||||||
|
return rsa.generate_private_key(
|
||||||
|
public_exponent=65537, key_size=2048, backend=default_backend()
|
||||||
|
)
|
||||||
|
|
||||||
|
return _rsa_key
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def ca_key(rsa_key):
|
||||||
|
return rsa_key()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def make_x509():
|
||||||
|
def _make_x509(private_key, signer_key=None, cn="ATAT", signer_cn="ATAT"):
|
||||||
|
if signer_key is None:
|
||||||
|
signer_key = private_key
|
||||||
|
|
||||||
|
one_day = timedelta(1, 0, 0)
|
||||||
|
public_key = private_key.public_key()
|
||||||
|
builder = x509.CertificateBuilder()
|
||||||
|
builder = builder.subject_name(
|
||||||
|
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)])
|
||||||
|
)
|
||||||
|
builder = builder.issuer_name(
|
||||||
|
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, signer_cn)])
|
||||||
|
)
|
||||||
|
if signer_key == private_key:
|
||||||
|
builder = builder.add_extension(
|
||||||
|
x509.BasicConstraints(ca=True, path_length=None), critical=True
|
||||||
|
)
|
||||||
|
builder = builder.not_valid_before(datetime.today() - (one_day * 2))
|
||||||
|
builder = builder.not_valid_after(datetime.today() + (one_day * 30))
|
||||||
|
builder = builder.serial_number(x509.random_serial_number())
|
||||||
|
builder = builder.public_key(public_key)
|
||||||
|
certificate = builder.sign(
|
||||||
|
private_key=signer_key, algorithm=hashes.SHA256(), backend=default_backend()
|
||||||
|
)
|
||||||
|
|
||||||
|
return certificate
|
||||||
|
|
||||||
|
return _make_x509
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def make_crl():
|
||||||
|
def _make_crl(
|
||||||
|
private_key,
|
||||||
|
last_update_days=-1,
|
||||||
|
next_update_days=30,
|
||||||
|
cn="ATAT",
|
||||||
|
expired_serials=None,
|
||||||
|
):
|
||||||
|
one_day = timedelta(1, 0, 0)
|
||||||
|
builder = x509.CertificateRevocationListBuilder()
|
||||||
|
builder = builder.issuer_name(
|
||||||
|
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)])
|
||||||
|
)
|
||||||
|
last_update = datetime.today() + (one_day * last_update_days)
|
||||||
|
next_update = datetime.today() + (one_day * next_update_days)
|
||||||
|
builder = builder.last_update(last_update)
|
||||||
|
builder = builder.next_update(next_update)
|
||||||
|
if expired_serials:
|
||||||
|
for serial in expired_serials:
|
||||||
|
builder = add_revoked_cert(builder, serial, last_update)
|
||||||
|
|
||||||
|
crl = builder.sign(
|
||||||
|
private_key=private_key,
|
||||||
|
algorithm=hashes.SHA256(),
|
||||||
|
backend=default_backend(),
|
||||||
|
)
|
||||||
|
|
||||||
|
return crl
|
||||||
|
|
||||||
|
return _make_crl
|
||||||
|
|
||||||
|
|
||||||
|
def add_revoked_cert(crl_builder, serial, revocation_date):
|
||||||
|
revoked_cert = (
|
||||||
|
x509.RevokedCertificateBuilder()
|
||||||
|
.serial_number(serial)
|
||||||
|
.revocation_date(revocation_date)
|
||||||
|
.build(default_backend())
|
||||||
|
)
|
||||||
|
return crl_builder.add_revoked_certificate(revoked_cert)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def serialize_pki_object_to_disk():
|
||||||
|
def _serialize_pki_object_to_disk(obj, name, encoding=Encoding.PEM):
|
||||||
|
with open(name, "wb") as file_:
|
||||||
|
file_.write(obj.public_bytes(encoding))
|
||||||
|
|
||||||
|
return name
|
||||||
|
|
||||||
|
return _serialize_pki_object_to_disk
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def ca_file(make_x509, ca_key, tmpdir, serialize_pki_object_to_disk):
|
||||||
|
ca = make_x509(ca_key)
|
||||||
|
ca_out = tmpdir.join("atat-ca.crt")
|
||||||
|
serialize_pki_object_to_disk(ca, ca_out)
|
||||||
|
|
||||||
|
return ca_out
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def expired_crl_file(make_crl, ca_key, tmpdir, serialize_pki_object_to_disk):
|
||||||
|
crl = make_crl(ca_key, last_update_days=-7, next_update_days=-1)
|
||||||
|
crl_out = tmpdir.join("atat-expired.crl")
|
||||||
|
serialize_pki_object_to_disk(crl, crl_out, encoding=Encoding.DER)
|
||||||
|
|
||||||
|
return crl_out
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def crl_file(make_crl, ca_key, tmpdir, serialize_pki_object_to_disk):
|
||||||
|
crl = make_crl(ca_key)
|
||||||
|
crl_out = tmpdir.join("atat-valid.crl")
|
||||||
|
serialize_pki_object_to_disk(crl, crl_out, encoding=Encoding.DER)
|
||||||
|
|
||||||
|
return crl_out
|
||||||
|
@ -1,7 +1,11 @@
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from atst.domain.authnid import AuthenticationContext
|
from atst.domain.authnid import AuthenticationContext
|
||||||
from atst.domain.authnid.crl import CRLCache, CRLRevocationException
|
from atst.domain.authnid.crl import (
|
||||||
|
CRLCache,
|
||||||
|
CRLRevocationException,
|
||||||
|
CRLInvalidException,
|
||||||
|
)
|
||||||
from atst.domain.exceptions import UnauthenticatedError, NotFoundError
|
from atst.domain.exceptions import UnauthenticatedError, NotFoundError
|
||||||
from atst.domain.users import Users
|
from atst.domain.users import Users
|
||||||
|
|
||||||
@ -12,12 +16,15 @@ CERT = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS)).read()
|
|||||||
|
|
||||||
|
|
||||||
class MockCRLCache:
|
class MockCRLCache:
|
||||||
def __init__(self, valid=True):
|
def __init__(self, valid=True, expired=False):
|
||||||
self.valid = valid
|
self.valid = valid
|
||||||
|
self.expired = expired
|
||||||
|
|
||||||
def crl_check(self, cert):
|
def crl_check(self, cert):
|
||||||
if self.valid:
|
if self.valid:
|
||||||
return True
|
return True
|
||||||
|
elif self.expired == True:
|
||||||
|
raise CRLInvalidException()
|
||||||
|
|
||||||
raise CRLRevocationException()
|
raise CRLRevocationException()
|
||||||
|
|
||||||
@ -45,6 +52,14 @@ def test_crl_check_fails():
|
|||||||
assert "CRL check" in message
|
assert "CRL check" in message
|
||||||
|
|
||||||
|
|
||||||
|
def test_expired_crl_check_fails():
|
||||||
|
auth_context = AuthenticationContext(
|
||||||
|
MockCRLCache(valid=False, expired=True), "SUCCESS", DOD_SDN, CERT
|
||||||
|
)
|
||||||
|
with pytest.raises(CRLInvalidException) as excinfo:
|
||||||
|
assert auth_context.authenticate()
|
||||||
|
|
||||||
|
|
||||||
def test_bad_sdn():
|
def test_bad_sdn():
|
||||||
auth_context = AuthenticationContext(MockCRLCache(), "SUCCESS", "abc123", CERT)
|
auth_context = AuthenticationContext(MockCRLCache(), "SUCCESS", "abc123", CERT)
|
||||||
with pytest.raises(UnauthenticatedError) as excinfo:
|
with pytest.raises(UnauthenticatedError) as excinfo:
|
||||||
|
@ -3,118 +3,19 @@ import pytest
|
|||||||
import re
|
import re
|
||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
from datetime import datetime, timezone, timedelta
|
|
||||||
from OpenSSL import crypto, SSL
|
|
||||||
from cryptography import x509
|
|
||||||
from cryptography.hazmat.backends import default_backend
|
from cryptography.hazmat.backends import default_backend
|
||||||
from cryptography.hazmat.primitives import hashes
|
|
||||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
||||||
from cryptography.hazmat.primitives.serialization import Encoding
|
from cryptography.hazmat.primitives.serialization import Encoding
|
||||||
from cryptography.x509.oid import NameOID
|
|
||||||
|
|
||||||
from atst.domain.authnid.crl import CRLCache, CRLRevocationException, NoOpCRLCache
|
from atst.domain.authnid.crl import (
|
||||||
|
CRLCache,
|
||||||
|
CRLRevocationException,
|
||||||
|
CRLInvalidException,
|
||||||
|
NoOpCRLCache,
|
||||||
|
)
|
||||||
|
|
||||||
from tests.mocks import FIXTURE_EMAIL_ADDRESS, DOD_CN
|
from tests.mocks import FIXTURE_EMAIL_ADDRESS, DOD_CN
|
||||||
|
|
||||||
|
|
||||||
def rsa_key():
|
|
||||||
return rsa.generate_private_key(
|
|
||||||
public_exponent=65537, key_size=2048, backend=default_backend()
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def ca_key():
|
|
||||||
return rsa_key()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def make_x509():
|
|
||||||
def _make_x509(private_key, signer_key=None, cn="ATAT", signer_cn="ATAT"):
|
|
||||||
if signer_key is None:
|
|
||||||
signer_key = private_key
|
|
||||||
|
|
||||||
one_day = timedelta(1, 0, 0)
|
|
||||||
public_key = private_key.public_key()
|
|
||||||
builder = x509.CertificateBuilder()
|
|
||||||
builder = builder.subject_name(
|
|
||||||
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)])
|
|
||||||
)
|
|
||||||
builder = builder.issuer_name(
|
|
||||||
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, signer_cn)])
|
|
||||||
)
|
|
||||||
if signer_key == private_key:
|
|
||||||
builder = builder.add_extension(
|
|
||||||
x509.BasicConstraints(ca=True, path_length=None), critical=True
|
|
||||||
)
|
|
||||||
builder = builder.not_valid_before(datetime.today() - (one_day * 2))
|
|
||||||
builder = builder.not_valid_after(datetime.today() + (one_day * 30))
|
|
||||||
builder = builder.serial_number(x509.random_serial_number())
|
|
||||||
builder = builder.public_key(public_key)
|
|
||||||
certificate = builder.sign(
|
|
||||||
private_key=signer_key, algorithm=hashes.SHA256(), backend=default_backend()
|
|
||||||
)
|
|
||||||
|
|
||||||
return certificate
|
|
||||||
|
|
||||||
return _make_x509
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def make_crl():
|
|
||||||
def _make_crl(private_key, last_update_days=-1, next_update_days=30, cn="ATAT"):
|
|
||||||
one_day = timedelta(1, 0, 0)
|
|
||||||
builder = x509.CertificateRevocationListBuilder()
|
|
||||||
builder = builder.issuer_name(
|
|
||||||
x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)])
|
|
||||||
)
|
|
||||||
builder = builder.last_update(datetime.today() + (one_day * last_update_days))
|
|
||||||
builder = builder.next_update(datetime.today() + (one_day * next_update_days))
|
|
||||||
crl = builder.sign(
|
|
||||||
private_key=private_key,
|
|
||||||
algorithm=hashes.SHA256(),
|
|
||||||
backend=default_backend(),
|
|
||||||
)
|
|
||||||
|
|
||||||
return crl
|
|
||||||
|
|
||||||
return _make_crl
|
|
||||||
|
|
||||||
|
|
||||||
def serialize_pki_object_to_disk(obj, name, encoding=Encoding.PEM):
|
|
||||||
with open(name, "wb") as file_:
|
|
||||||
file_.write(obj.public_bytes(encoding))
|
|
||||||
|
|
||||||
return name
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def ca_file(make_x509, ca_key, tmpdir):
|
|
||||||
ca = make_x509(ca_key)
|
|
||||||
ca_out = tmpdir.join("atat-ca.crt")
|
|
||||||
serialize_pki_object_to_disk(ca, ca_out)
|
|
||||||
|
|
||||||
return ca_out
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def expired_crl_file(make_crl, ca_key, tmpdir):
|
|
||||||
crl = make_crl(ca_key, last_update_days=-7, next_update_days=-1)
|
|
||||||
crl_out = tmpdir.join("atat-expired.crl")
|
|
||||||
serialize_pki_object_to_disk(crl, crl_out, encoding=Encoding.DER)
|
|
||||||
|
|
||||||
return crl_out
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def crl_file(make_crl, ca_key, tmpdir):
|
|
||||||
crl = make_crl(ca_key)
|
|
||||||
crl_out = tmpdir.join("atat-valid.crl")
|
|
||||||
serialize_pki_object_to_disk(crl, crl_out, encoding=Encoding.DER)
|
|
||||||
|
|
||||||
return crl_out
|
|
||||||
|
|
||||||
|
|
||||||
class MockX509Store:
|
class MockX509Store:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.crls = []
|
self.crls = []
|
||||||
@ -130,11 +31,8 @@ class MockX509Store:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def test_can_build_crl_list(ca_file, ca_key, make_crl, tmpdir):
|
def test_can_build_crl_list(crl_file, ca_key, ca_file, make_crl, tmpdir):
|
||||||
crl = make_crl(ca_key)
|
crl = make_crl(ca_key)
|
||||||
crl_file = tmpdir.join("atat.crl")
|
|
||||||
serialize_pki_object_to_disk(crl, crl_file, encoding=Encoding.DER)
|
|
||||||
|
|
||||||
cache = CRLCache(ca_file, crl_locations=[crl_file], store_class=MockX509Store)
|
cache = CRLCache(ca_file, crl_locations=[crl_file], store_class=MockX509Store)
|
||||||
issuer_der = crl.issuer.public_bytes(default_backend())
|
issuer_der = crl.issuer.public_bytes(default_backend())
|
||||||
assert len(cache.crl_cache.keys()) == 1
|
assert len(cache.crl_cache.keys()) == 1
|
||||||
@ -175,16 +73,26 @@ def test_can_validate_certificate():
|
|||||||
cache.crl_check(bad_cert)
|
cache.crl_check(bad_cert)
|
||||||
|
|
||||||
|
|
||||||
def test_can_dynamically_update_crls(tmpdir):
|
def test_can_dynamically_update_crls(
|
||||||
crl_file = tmpdir.join("test.crl")
|
ca_key,
|
||||||
shutil.copyfile("ssl/client-certs/client-ca.der.crl", crl_file)
|
ca_file,
|
||||||
cache = CRLCache("ssl/server-certs/ca-chain.pem", crl_locations=[crl_file])
|
crl_file,
|
||||||
cert = open("ssl/client-certs/atat.mil.crt", "rb").read()
|
rsa_key,
|
||||||
assert cache.crl_check(cert)
|
make_x509,
|
||||||
# override the original CRL with one that revokes atat.mil.crt
|
make_crl,
|
||||||
shutil.copyfile("tests/fixtures/test.der.crl", crl_file)
|
serialize_pki_object_to_disk,
|
||||||
|
):
|
||||||
|
cache = CRLCache(ca_file, crl_locations=[crl_file])
|
||||||
|
client_cert = make_x509(rsa_key(), signer_key=ca_key, cn="chewbacca")
|
||||||
|
client_pem = client_cert.public_bytes(Encoding.PEM)
|
||||||
|
assert cache.crl_check(client_pem)
|
||||||
|
|
||||||
|
revoked_crl = make_crl(ca_key, expired_serials=[client_cert.serial_number])
|
||||||
|
# override the original CRL with one that revokes client_cert
|
||||||
|
serialize_pki_object_to_disk(revoked_crl, crl_file, encoding=Encoding.DER)
|
||||||
|
|
||||||
with pytest.raises(CRLRevocationException):
|
with pytest.raises(CRLRevocationException):
|
||||||
assert cache.crl_check(cert)
|
assert cache.crl_check(client_pem)
|
||||||
|
|
||||||
|
|
||||||
def test_throws_error_for_missing_issuer():
|
def test_throws_error_for_missing_issuer():
|
||||||
@ -192,7 +100,7 @@ def test_throws_error_for_missing_issuer():
|
|||||||
# this cert is self-signed, and so the application does not have a
|
# this cert is self-signed, and so the application does not have a
|
||||||
# corresponding CRL for it
|
# corresponding CRL for it
|
||||||
cert = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read()
|
cert = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS), "rb").read()
|
||||||
with pytest.raises(CRLRevocationException) as exc:
|
with pytest.raises(CRLInvalidException) as exc:
|
||||||
assert cache.crl_check(cert)
|
assert cache.crl_check(cert)
|
||||||
(message,) = exc.value.args
|
(message,) = exc.value.args
|
||||||
# objects that the issuer is missing
|
# objects that the issuer is missing
|
||||||
@ -233,7 +141,29 @@ def test_no_op_crl_cache_logs_common_name():
|
|||||||
assert "ART.GARFUNKEL.1234567890" in logger.messages[-1]
|
assert "ART.GARFUNKEL.1234567890" in logger.messages[-1]
|
||||||
|
|
||||||
|
|
||||||
def test_updates_expired_certs(ca_file, expired_crl_file, crl_file, ca_key, make_x509):
|
def test_expired_crl_raises_CRLInvalidException_with_failover_config_false(
|
||||||
|
app, ca_file, expired_crl_file, ca_key, make_x509, rsa_key
|
||||||
|
):
|
||||||
|
client_cert = make_x509(rsa_key(), signer_key=ca_key, cn="chewbacca")
|
||||||
|
client_pem = client_cert.public_bytes(Encoding.PEM)
|
||||||
|
crl_cache = CRLCache(ca_file, crl_locations=[expired_crl_file])
|
||||||
|
with pytest.raises(CRLInvalidException):
|
||||||
|
crl_cache.crl_check(client_pem)
|
||||||
|
|
||||||
|
|
||||||
|
def test_expired_crl_passes_with_failover_config_true(
|
||||||
|
ca_file, expired_crl_file, ca_key, make_x509, rsa_key, crl_failover_open_app
|
||||||
|
):
|
||||||
|
client_cert = make_x509(rsa_key(), signer_key=ca_key, cn="chewbacca")
|
||||||
|
client_pem = client_cert.public_bytes(Encoding.PEM)
|
||||||
|
crl_cache = CRLCache(ca_file, crl_locations=[expired_crl_file])
|
||||||
|
|
||||||
|
assert crl_cache.crl_check(client_pem)
|
||||||
|
|
||||||
|
|
||||||
|
def test_updates_expired_certs(
|
||||||
|
rsa_key, ca_file, expired_crl_file, crl_file, ca_key, make_x509
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Given a CRLCache object with an expired CRL and a function for updating the
|
Given a CRLCache object with an expired CRL and a function for updating the
|
||||||
CRLs, the CRLCache should run the update function before checking a
|
CRLs, the CRLCache should run the update function before checking a
|
||||||
|
@ -6,6 +6,7 @@ from .mocks import DOD_SDN_INFO, DOD_SDN, FIXTURE_EMAIL_ADDRESS
|
|||||||
from atst.domain.users import Users
|
from atst.domain.users import Users
|
||||||
from atst.domain.roles import Roles
|
from atst.domain.roles import Roles
|
||||||
from atst.domain.exceptions import NotFoundError
|
from atst.domain.exceptions import NotFoundError
|
||||||
|
from atst.domain.authnid.crl import CRLInvalidException
|
||||||
from atst.domain.auth import UNPROTECTED_ROUTES
|
from atst.domain.auth import UNPROTECTED_ROUTES
|
||||||
from .factories import UserFactory
|
from .factories import UserFactory
|
||||||
|
|
||||||
@ -211,3 +212,15 @@ def test_redirected_on_login(client, monkeypatch):
|
|||||||
target_route = url_for("users.user")
|
target_route = url_for("users.user")
|
||||||
response = _login(client, next=target_route)
|
response = _login(client, next=target_route)
|
||||||
assert target_route in response.headers.get("Location")
|
assert target_route in response.headers.get("Location")
|
||||||
|
|
||||||
|
|
||||||
|
def test_error_on_invalid_crl(client, monkeypatch):
|
||||||
|
def _raise_crl_error(*args):
|
||||||
|
raise CRLInvalidException()
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"atst.domain.authnid.AuthenticationContext.authenticate", _raise_crl_error
|
||||||
|
)
|
||||||
|
response = _login(client)
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert "Error Code 008" in response.data.decode()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user