Merge branch 'master' into ui/tooltips
This commit is contained in:
@@ -16,7 +16,7 @@ from atst.routes.workspaces import bp as workspace_routes
|
||||
from atst.routes.requests import requests_bp
|
||||
from atst.routes.dev import bp as dev_routes
|
||||
from atst.routes.errors import make_error_pages
|
||||
from atst.domain.authnid.crl.validator import Validator
|
||||
from atst.domain.authnid.crl import Validator
|
||||
from atst.domain.auth import apply_authentication
|
||||
|
||||
|
||||
@@ -68,7 +68,7 @@ def make_flask_callbacks(app):
|
||||
)
|
||||
g.dev = os.getenv("FLASK_ENV", "dev") == "dev"
|
||||
g.matchesPath = lambda href: re.match("^" + href, request.path)
|
||||
g.modalOpen = request.args.get("modal", False)
|
||||
g.modal = request.args.get("modal", None)
|
||||
g.current_user = {
|
||||
"id": "cce17030-4109-4719-b958-ed109dbb87c8",
|
||||
"first_name": "Amanda",
|
||||
@@ -142,8 +142,6 @@ def make_crl_validator(app):
|
||||
for filename in pathlib.Path(app.config["CRL_DIRECTORY"]).glob("*"):
|
||||
crl_locations.append(filename.absolute())
|
||||
app.crl_validator = Validator(
|
||||
roots=[app.config["CA_CHAIN"]], crl_locations=crl_locations
|
||||
roots=[app.config["CA_CHAIN"]], crl_locations=crl_locations, logger=app.logger
|
||||
)
|
||||
for e in app.crl_validator.errors:
|
||||
app.logger.error(e)
|
||||
|
||||
|
@@ -0,0 +1,62 @@
|
||||
from atst.domain.exceptions import UnauthenticatedError, NotFoundError
|
||||
from atst.domain.users import Users
|
||||
from .utils import parse_sdn, email_from_certificate
|
||||
|
||||
|
||||
class AuthenticationContext():
|
||||
|
||||
def __init__(self, crl_validator, auth_status, sdn, cert):
|
||||
if None in locals().values():
|
||||
raise UnauthenticatedError(
|
||||
"Missing required authentication context components"
|
||||
)
|
||||
|
||||
self.crl_validator = crl_validator
|
||||
self.auth_status = auth_status
|
||||
self.sdn = sdn
|
||||
self.cert = cert.encode()
|
||||
self._parsed_sdn = None
|
||||
|
||||
def authenticate(self):
|
||||
if not self.auth_status == "SUCCESS":
|
||||
raise UnauthenticatedError("SSL/TLS client authentication failed")
|
||||
|
||||
elif not self._crl_check():
|
||||
raise UnauthenticatedError("Client certificate failed CRL check")
|
||||
|
||||
return True
|
||||
|
||||
def get_user(self):
|
||||
try:
|
||||
return Users.get_by_dod_id(self.parsed_sdn["dod_id"])
|
||||
|
||||
except NotFoundError:
|
||||
email = self._get_user_email()
|
||||
return Users.create(**{"email": email, **self.parsed_sdn})
|
||||
|
||||
def _get_user_email(self):
|
||||
try:
|
||||
return email_from_certificate(self.cert)
|
||||
|
||||
# this just means it is not an email certificate; we might choose to
|
||||
# log in that case
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def _crl_check(self):
|
||||
if self.cert:
|
||||
result = self.crl_validator.validate(self.cert)
|
||||
return result
|
||||
|
||||
else:
|
||||
return False
|
||||
|
||||
@property
|
||||
def parsed_sdn(self):
|
||||
if not self._parsed_sdn:
|
||||
try:
|
||||
self._parsed_sdn = parse_sdn(self.sdn)
|
||||
except ValueError as exc:
|
||||
raise UnauthenticatedError(str(exc))
|
||||
|
||||
return self._parsed_sdn
|
||||
|
@@ -20,11 +20,11 @@ class Validator:
|
||||
re.DOTALL,
|
||||
)
|
||||
|
||||
def __init__(self, crl_locations=[], roots=[], base_store=crypto.X509Store):
|
||||
self.errors = []
|
||||
def __init__(self, crl_locations=[], roots=[], base_store=crypto.X509Store, logger=None):
|
||||
self.crl_locations = crl_locations
|
||||
self.roots = roots
|
||||
self.base_store = base_store
|
||||
self.logger = logger
|
||||
self._reset()
|
||||
|
||||
def _reset(self):
|
||||
@@ -34,12 +34,16 @@ class Validator:
|
||||
self._add_roots(self.roots)
|
||||
self.store.set_flags(crypto.X509StoreFlags.CRL_CHECK)
|
||||
|
||||
def log_error(self, message):
|
||||
if self.logger:
|
||||
self.logger.error(message)
|
||||
|
||||
def _add_crls(self, locations):
|
||||
for filename in locations:
|
||||
try:
|
||||
self._add_crl(filename)
|
||||
except crypto.Error as err:
|
||||
self.errors.append(
|
||||
self.log_error(
|
||||
"CRL could not be parsed. Filename: {}, Error: {}, args: {}".format(
|
||||
filename, type(err), err.args
|
||||
)
|
||||
@@ -116,7 +120,7 @@ class Validator:
|
||||
return True
|
||||
|
||||
except crypto.X509StoreContextError as err:
|
||||
self.errors.append(
|
||||
self.log_error(
|
||||
"Certificate revoked or errored. Error: {}. Args: {}".format(
|
||||
type(err), err.args
|
||||
)
|
@@ -56,7 +56,6 @@ def refresh_crls(out_dir, logger=None):
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
import datetime
|
||||
import logging
|
||||
|
||||
logging.basicConfig(
|
||||
|
@@ -1,7 +1,9 @@
|
||||
import re
|
||||
|
||||
import cryptography.x509 as x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
|
||||
# TODO: our sample SDN does not have an email address
|
||||
def parse_sdn(sdn):
|
||||
try:
|
||||
parts = sdn.split(",")
|
||||
@@ -9,5 +11,21 @@ def parse_sdn(sdn):
|
||||
cn = cn_string.split("=")[-1]
|
||||
info = cn.split(".")
|
||||
return {"last_name": info[0], "first_name": info[1], "dod_id": info[-1]}
|
||||
|
||||
except (IndexError, AttributeError):
|
||||
raise ValueError("'{}' is not a valid SDN".format(sdn))
|
||||
|
||||
|
||||
def email_from_certificate(cert_file):
|
||||
cert = x509.load_pem_x509_certificate(cert_file, default_backend())
|
||||
try:
|
||||
ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
|
||||
email = ext.value.get_values_for_type(x509.RFC822Name)
|
||||
if email:
|
||||
return email[0]
|
||||
|
||||
else:
|
||||
raise ValueError("No email available for certificate with serial {}".format(cert.serial_number))
|
||||
|
||||
except x509.extensions.ExtensionNotFound:
|
||||
raise ValueError("No subjectAltName available for certificate with serial {}".format(cert.serial_number))
|
||||
|
12
atst/domain/date.py
Normal file
12
atst/domain/date.py
Normal file
@@ -0,0 +1,12 @@
|
||||
import pendulum
|
||||
|
||||
|
||||
def parse_date(data):
|
||||
date_formats = ["YYYY-MM-DD", "MM/DD/YYYY"]
|
||||
for _format in date_formats:
|
||||
try:
|
||||
return pendulum.from_format(data, _format).date()
|
||||
except (ValueError, pendulum.parsing.exceptions.ParserError):
|
||||
pass
|
||||
|
||||
raise ValueError("Unable to parse string {}".format(data))
|
@@ -150,3 +150,7 @@ class Requests(object):
|
||||
@classmethod
|
||||
def is_pending_financial_verification(cls, request):
|
||||
return request.status == RequestStatus.PENDING_FINANCIAL_VERIFICATION
|
||||
|
||||
@classmethod
|
||||
def is_pending_ccpo_approval(cls, request):
|
||||
return request.status == RequestStatus.PENDING_CCPO_APPROVAL
|
||||
|
@@ -1,20 +1,14 @@
|
||||
from wtforms.fields.html5 import DateField
|
||||
from wtforms.fields import Field
|
||||
from wtforms.widgets import TextArea
|
||||
import pendulum
|
||||
|
||||
from atst.domain.date import parse_date
|
||||
|
||||
|
||||
class DateField(DateField):
|
||||
def _value(self):
|
||||
if self.data:
|
||||
date_formats = ["YYYY-MM-DD", "MM/DD/YYYY"]
|
||||
for _format in date_formats:
|
||||
try:
|
||||
return pendulum.from_format(self.data, _format).date()
|
||||
except (ValueError, pendulum.parsing.exceptions.ParserError):
|
||||
pass
|
||||
|
||||
raise ValueError("Unable to parse string {}".format(self.data))
|
||||
return parse_date(self.data)
|
||||
else:
|
||||
return None
|
||||
|
||||
|
@@ -1,7 +1,6 @@
|
||||
import re
|
||||
from wtforms.fields.html5 import EmailField
|
||||
from wtforms.fields import StringField, SelectField
|
||||
from wtforms.form import Form
|
||||
from wtforms.validators import Required, Email
|
||||
|
||||
from atst.domain.exceptions import NotFoundError
|
||||
@@ -41,7 +40,7 @@ def suggest_pe_id(pe_id):
|
||||
|
||||
def validate_pe_id(field, existing_request):
|
||||
try:
|
||||
pe_number = PENumbers.get(field.data)
|
||||
PENumbers.get(field.data)
|
||||
except NotFoundError:
|
||||
suggestion = suggest_pe_id(field.data)
|
||||
error_str = (
|
||||
|
@@ -12,9 +12,11 @@ class OrgForm(ValidatedForm):
|
||||
|
||||
lname_request = StringField("Last Name", validators=[Required(), Alphabet()])
|
||||
|
||||
email_request = EmailField("Email Address", validators=[Required(), Email()])
|
||||
email_request = EmailField("E-mail Address", validators=[Required(), Email()])
|
||||
|
||||
phone_number = TelField("Phone Number", validators=[Required(), PhoneNumber()])
|
||||
phone_number = TelField("Phone Number",
|
||||
description='Enter a 10-digit phone number',
|
||||
validators=[Required(), PhoneNumber()])
|
||||
|
||||
service_branch = StringField(
|
||||
"Service Branch or Agency",
|
||||
|
@@ -2,7 +2,7 @@ from wtforms.fields import StringField
|
||||
from wtforms.fields.html5 import EmailField
|
||||
from wtforms.validators import Required, Email, Length
|
||||
from .forms import ValidatedForm
|
||||
from .validators import IsNumber, Alphabet
|
||||
from .validators import IsNumber
|
||||
|
||||
|
||||
class POCForm(ValidatedForm):
|
||||
|
@@ -1,10 +1,7 @@
|
||||
from wtforms.fields.html5 import IntegerField
|
||||
from wtforms.fields import RadioField, StringField, TextAreaField, SelectField
|
||||
from wtforms.validators import NumberRange, InputRequired
|
||||
from wtforms.fields import RadioField, TextAreaField, SelectField
|
||||
from .fields import DateField
|
||||
from .forms import ValidatedForm
|
||||
from .validators import DateRange
|
||||
import pendulum
|
||||
|
||||
|
||||
class RequestForm(ValidatedForm):
|
||||
|
@@ -2,18 +2,19 @@ import re
|
||||
from wtforms.validators import ValidationError
|
||||
import pendulum
|
||||
|
||||
from atst.domain.date import parse_date
|
||||
|
||||
|
||||
def DateRange(lower_bound=None, upper_bound=None, message=None):
|
||||
def _date_range(form, field):
|
||||
now = pendulum.now().date()
|
||||
date = parse_date(field.data)
|
||||
|
||||
if lower_bound is not None:
|
||||
date = pendulum.parse(field.data).date()
|
||||
if (now - lower_bound) > date:
|
||||
raise ValidationError(message)
|
||||
|
||||
if upper_bound is not None:
|
||||
date = pendulum.parse(field.data).date()
|
||||
if (now + upper_bound) < date:
|
||||
raise ValidationError(message)
|
||||
|
||||
|
@@ -4,8 +4,8 @@ import pendulum
|
||||
|
||||
from atst.domain.requests import Requests
|
||||
from atst.domain.users import Users
|
||||
from atst.domain.authnid.utils import parse_sdn
|
||||
from atst.domain.exceptions import UnauthenticatedError
|
||||
from atst.domain.authnid import AuthenticationContext
|
||||
|
||||
|
||||
bp = Blueprint("atst", __name__)
|
||||
|
||||
@@ -30,28 +30,29 @@ def catch_all(path):
|
||||
return render_template("{}.html".format(path))
|
||||
|
||||
|
||||
# TODO: this should be partly consolidated into a domain function that takes
|
||||
# all the necessary UWSGI environment values as args and either returns a user
|
||||
# or raises the UnauthenticatedError
|
||||
def _make_authentication_context():
|
||||
return AuthenticationContext(
|
||||
crl_validator=app.crl_validator,
|
||||
auth_status=request.environ.get("HTTP_X_SSL_CLIENT_VERIFY"),
|
||||
sdn=request.environ.get("HTTP_X_SSL_CLIENT_S_DN"),
|
||||
cert=request.environ.get("HTTP_X_SSL_CLIENT_CERT")
|
||||
)
|
||||
|
||||
|
||||
@bp.route('/login-redirect')
|
||||
def login_redirect():
|
||||
if request.environ.get('HTTP_X_SSL_CLIENT_VERIFY') == 'SUCCESS' and _is_valid_certificate(request):
|
||||
sdn = request.environ.get('HTTP_X_SSL_CLIENT_S_DN')
|
||||
sdn_parts = parse_sdn(sdn)
|
||||
user = Users.get_or_create_by_dod_id(**sdn_parts)
|
||||
session["user_id"] = user.id
|
||||
auth_context = _make_authentication_context()
|
||||
auth_context.authenticate()
|
||||
user = auth_context.get_user()
|
||||
session["user_id"] = user.id
|
||||
|
||||
return redirect(url_for("atst.home"))
|
||||
else:
|
||||
raise UnauthenticatedError()
|
||||
return redirect(url_for("atst.home"))
|
||||
|
||||
|
||||
def _is_valid_certificate(request):
|
||||
cert = request.environ.get('HTTP_X_SSL_CLIENT_CERT')
|
||||
if cert:
|
||||
result = app.crl_validator.validate(cert.encode())
|
||||
if not result:
|
||||
app.logger.info(app.crl_validator.errors[-1])
|
||||
return result
|
||||
else:
|
||||
return False
|
||||
|
@@ -9,8 +9,10 @@ def map_request(request):
|
||||
time_created = pendulum.instance(request.time_created)
|
||||
is_new = time_created.add(days=1) > pendulum.now()
|
||||
app_count = request.body.get("details_of_use", {}).get("num_software_systems", 0)
|
||||
update_url = url_for('requests.requests_form_update', screen=1, request_id=request.id)
|
||||
verify_url = url_for('requests.financial_verification', request_id=request.id)
|
||||
update_url = url_for(
|
||||
"requests.requests_form_update", screen=1, request_id=request.id
|
||||
)
|
||||
verify_url = url_for("requests.financial_verification", request_id=request.id)
|
||||
|
||||
return {
|
||||
"order_id": request.id,
|
||||
@@ -19,7 +21,9 @@ def map_request(request):
|
||||
"app_count": app_count,
|
||||
"date": time_created.format("M/DD/YYYY"),
|
||||
"full_name": request.creator.full_name,
|
||||
"edit_link": verify_url if Requests.is_pending_financial_verification(request) else update_url
|
||||
"edit_link": verify_url if Requests.is_pending_financial_verification(
|
||||
request
|
||||
) else update_url,
|
||||
}
|
||||
|
||||
|
||||
@@ -33,4 +37,12 @@ def requests_index():
|
||||
|
||||
mapped_requests = [map_request(r) for r in requests]
|
||||
|
||||
return render_template("requests.html", requests=mapped_requests)
|
||||
pending_fv = any(Requests.is_pending_financial_verification(r) for r in requests)
|
||||
pending_ccpo = any(Requests.is_pending_ccpo_approval(r) for r in requests)
|
||||
|
||||
return render_template(
|
||||
"requests.html",
|
||||
requests=mapped_requests,
|
||||
pending_financial_verification=pending_fv,
|
||||
pending_ccpo_approval=pending_ccpo,
|
||||
)
|
||||
|
@@ -11,13 +11,15 @@ class JEDIRequestFlow(object):
|
||||
def __init__(
|
||||
self,
|
||||
current_step,
|
||||
current_user=None,
|
||||
request=None,
|
||||
post_data=None,
|
||||
request_id=None,
|
||||
current_user=None,
|
||||
existing_request=None,
|
||||
):
|
||||
self.current_step = current_step
|
||||
|
||||
self.current_user = current_user
|
||||
self.request = request
|
||||
|
||||
self.post_data = post_data
|
||||
@@ -26,16 +28,13 @@ class JEDIRequestFlow(object):
|
||||
self.request_id = request_id
|
||||
self.form = self._form()
|
||||
|
||||
self.current_user = current_user
|
||||
self.existing_request = existing_request
|
||||
|
||||
def _form(self):
|
||||
if self.is_post:
|
||||
return self.form_class()(self.post_data)
|
||||
elif self.request:
|
||||
return self.form_class()(data=self.current_step_data)
|
||||
else:
|
||||
return self.form_class()()
|
||||
return self.form_class()(data=self.current_step_data)
|
||||
|
||||
def validate(self):
|
||||
return self.form.validate()
|
||||
@@ -59,6 +58,16 @@ class JEDIRequestFlow(object):
|
||||
def form_class(self):
|
||||
return self.current_screen["form"]
|
||||
|
||||
# maps user data to fields in OrgForm; this should be moved into the
|
||||
# request initialization process when we have a request schema, or we just
|
||||
# shouldn't record this data on the request
|
||||
def map_user_data(self, user):
|
||||
return {
|
||||
"fname_request": user.first_name,
|
||||
"lname_request": user.last_name,
|
||||
"email_request": user.email
|
||||
}
|
||||
|
||||
@property
|
||||
def current_step_data(self):
|
||||
data = {}
|
||||
@@ -69,8 +78,13 @@ class JEDIRequestFlow(object):
|
||||
if self.request:
|
||||
if self.form_section == "review_submit":
|
||||
data = self.request.body
|
||||
elif self.form_section == "information_about_you":
|
||||
form_data = self.request.body.get(self.form_section, {})
|
||||
data = { **self.map_user_data(self.request.creator), **form_data }
|
||||
else:
|
||||
data = self.request.body.get(self.form_section, {})
|
||||
elif self.form_section == "information_about_you":
|
||||
data = self.map_user_data(self.current_user)
|
||||
|
||||
return defaultdict(lambda: defaultdict(lambda: "Input required"), data)
|
||||
|
||||
|
@@ -4,12 +4,13 @@ from . import requests_bp
|
||||
from atst.domain.requests import Requests
|
||||
from atst.routes.requests.jedi_request_flow import JEDIRequestFlow
|
||||
from atst.models.permissions import Permissions
|
||||
from atst.models.request_status_event import RequestStatus
|
||||
from atst.domain.exceptions import UnauthorizedError
|
||||
|
||||
|
||||
@requests_bp.route("/requests/new/<int:screen>", methods=["GET"])
|
||||
def requests_form_new(screen):
|
||||
jedi_flow = JEDIRequestFlow(screen, request=None)
|
||||
jedi_flow = JEDIRequestFlow(screen, request=None, current_user=g.current_user)
|
||||
|
||||
return render_template(
|
||||
"requests/screen-%d.html" % int(screen),
|
||||
@@ -31,7 +32,7 @@ def requests_form_update(screen=1, request_id=None):
|
||||
_check_can_view_request(request_id)
|
||||
|
||||
request = Requests.get(request_id) if request_id is not None else None
|
||||
jedi_flow = JEDIRequestFlow(screen, request, request_id=request_id)
|
||||
jedi_flow = JEDIRequestFlow(screen, request=request, request_id=request_id, current_user=g.current_user)
|
||||
|
||||
return render_template(
|
||||
"requests/screen-%d.html" % int(screen),
|
||||
@@ -41,6 +42,7 @@ def requests_form_update(screen=1, request_id=None):
|
||||
current=screen,
|
||||
next_screen=screen + 1,
|
||||
request_id=request_id,
|
||||
jedi_request=jedi_flow.request,
|
||||
can_submit=jedi_flow.can_submit,
|
||||
)
|
||||
|
||||
@@ -99,11 +101,11 @@ def requests_submit(request_id=None):
|
||||
request = Requests.get(request_id)
|
||||
Requests.submit(request)
|
||||
|
||||
if request.status == "approved":
|
||||
return redirect("/requests?modal=True")
|
||||
if request.status == RequestStatus.PENDING_FINANCIAL_VERIFICATION:
|
||||
return redirect("/requests?modal=pendingFinancialVerification")
|
||||
|
||||
else:
|
||||
return redirect("/requests")
|
||||
return redirect("/requests?modal=pendingCCPOApproval")
|
||||
|
||||
|
||||
# TODO: generalize this, along with other authorizations, into a policy-pattern
|
||||
|
Reference in New Issue
Block a user