Merge branch 'master' into ui/input-field-frontend-validation
This commit is contained in:
@@ -0,0 +1,62 @@
|
||||
from atst.domain.exceptions import UnauthenticatedError, NotFoundError
|
||||
from atst.domain.users import Users
|
||||
from .utils import parse_sdn, email_from_certificate
|
||||
|
||||
|
||||
class AuthenticationContext():
|
||||
|
||||
def __init__(self, crl_validator, auth_status, sdn, cert):
|
||||
if None in locals().values():
|
||||
raise UnauthenticatedError(
|
||||
"Missing required authentication context components"
|
||||
)
|
||||
|
||||
self.crl_validator = crl_validator
|
||||
self.auth_status = auth_status
|
||||
self.sdn = sdn
|
||||
self.cert = cert.encode()
|
||||
self._parsed_sdn = None
|
||||
|
||||
def authenticate(self):
|
||||
if not self.auth_status == "SUCCESS":
|
||||
raise UnauthenticatedError("SSL/TLS client authentication failed")
|
||||
|
||||
elif not self._crl_check():
|
||||
raise UnauthenticatedError("Client certificate failed CRL check")
|
||||
|
||||
return True
|
||||
|
||||
def get_user(self):
|
||||
try:
|
||||
return Users.get_by_dod_id(self.parsed_sdn["dod_id"])
|
||||
|
||||
except NotFoundError:
|
||||
email = self._get_user_email()
|
||||
return Users.create(**{"email": email, **self.parsed_sdn})
|
||||
|
||||
def _get_user_email(self):
|
||||
try:
|
||||
return email_from_certificate(self.cert)
|
||||
|
||||
# this just means it is not an email certificate; we might choose to
|
||||
# log in that case
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def _crl_check(self):
|
||||
if self.cert:
|
||||
result = self.crl_validator.validate(self.cert)
|
||||
return result
|
||||
|
||||
else:
|
||||
return False
|
||||
|
||||
@property
|
||||
def parsed_sdn(self):
|
||||
if not self._parsed_sdn:
|
||||
try:
|
||||
self._parsed_sdn = parse_sdn(self.sdn)
|
||||
except ValueError as exc:
|
||||
raise UnauthenticatedError(str(exc))
|
||||
|
||||
return self._parsed_sdn
|
||||
|
@@ -20,11 +20,11 @@ class Validator:
|
||||
re.DOTALL,
|
||||
)
|
||||
|
||||
def __init__(self, crl_locations=[], roots=[], base_store=crypto.X509Store):
|
||||
self.errors = []
|
||||
def __init__(self, crl_locations=[], roots=[], base_store=crypto.X509Store, logger=None):
|
||||
self.crl_locations = crl_locations
|
||||
self.roots = roots
|
||||
self.base_store = base_store
|
||||
self.logger = logger
|
||||
self._reset()
|
||||
|
||||
def _reset(self):
|
||||
@@ -34,12 +34,16 @@ class Validator:
|
||||
self._add_roots(self.roots)
|
||||
self.store.set_flags(crypto.X509StoreFlags.CRL_CHECK)
|
||||
|
||||
def log_error(self, message):
|
||||
if self.logger:
|
||||
self.logger.error(message)
|
||||
|
||||
def _add_crls(self, locations):
|
||||
for filename in locations:
|
||||
try:
|
||||
self._add_crl(filename)
|
||||
except crypto.Error as err:
|
||||
self.errors.append(
|
||||
self.log_error(
|
||||
"CRL could not be parsed. Filename: {}, Error: {}, args: {}".format(
|
||||
filename, type(err), err.args
|
||||
)
|
||||
@@ -116,7 +120,7 @@ class Validator:
|
||||
return True
|
||||
|
||||
except crypto.X509StoreContextError as err:
|
||||
self.errors.append(
|
||||
self.log_error(
|
||||
"Certificate revoked or errored. Error: {}. Args: {}".format(
|
||||
type(err), err.args
|
||||
)
|
@@ -56,7 +56,6 @@ def refresh_crls(out_dir, logger=None):
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
import datetime
|
||||
import logging
|
||||
|
||||
logging.basicConfig(
|
||||
|
@@ -1,7 +1,9 @@
|
||||
import re
|
||||
|
||||
import cryptography.x509 as x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
|
||||
# TODO: our sample SDN does not have an email address
|
||||
def parse_sdn(sdn):
|
||||
try:
|
||||
parts = sdn.split(",")
|
||||
@@ -9,5 +11,21 @@ def parse_sdn(sdn):
|
||||
cn = cn_string.split("=")[-1]
|
||||
info = cn.split(".")
|
||||
return {"last_name": info[0], "first_name": info[1], "dod_id": info[-1]}
|
||||
|
||||
except (IndexError, AttributeError):
|
||||
raise ValueError("'{}' is not a valid SDN".format(sdn))
|
||||
|
||||
|
||||
def email_from_certificate(cert_file):
|
||||
cert = x509.load_pem_x509_certificate(cert_file, default_backend())
|
||||
try:
|
||||
ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
|
||||
email = ext.value.get_values_for_type(x509.RFC822Name)
|
||||
if email:
|
||||
return email[0]
|
||||
|
||||
else:
|
||||
raise ValueError("No email available for certificate with serial {}".format(cert.serial_number))
|
||||
|
||||
except x509.extensions.ExtensionNotFound:
|
||||
raise ValueError("No subjectAltName available for certificate with serial {}".format(cert.serial_number))
|
||||
|
Reference in New Issue
Block a user