Format project
This commit is contained in:
parent
e9fa4d9ecb
commit
daa8634cb4
@ -85,7 +85,9 @@ def map_config(config):
|
||||
"SQLALCHEMY_DATABASE_URI": config["default"]["DATABASE_URI"],
|
||||
"SQLALCHEMY_TRACK_MODIFICATIONS": False,
|
||||
"WTF_CSRF_ENABLED": config.getboolean("default", "WTF_CSRF_ENABLED"),
|
||||
"PERMANENT_SESSION_LIFETIME": config.getint("default", "PERMANENT_SESSION_LIFETIME"),
|
||||
"PERMANENT_SESSION_LIFETIME": config.getint(
|
||||
"default", "PERMANENT_SESSION_LIFETIME"
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
@ -127,8 +129,10 @@ def make_config():
|
||||
|
||||
return map_config(config)
|
||||
|
||||
|
||||
def make_redis(config):
|
||||
return redis.Redis.from_url(config['REDIS_URI'])
|
||||
return redis.Redis.from_url(config["REDIS_URI"])
|
||||
|
||||
|
||||
def make_crl_validator(app):
|
||||
crl_locations = []
|
||||
@ -136,5 +140,6 @@ def make_crl_validator(app):
|
||||
crl_locations.append(filename.absolute())
|
||||
app.crl_cache = CRLCache(app.config["CA_CHAIN"], crl_locations, logger=app.logger)
|
||||
|
||||
|
||||
def make_eda_client(app):
|
||||
app.eda_client = MockEDAClient()
|
||||
|
@ -3,14 +3,10 @@ from flask_assets import Environment, Bundle
|
||||
environment = Environment()
|
||||
|
||||
css = Bundle(
|
||||
"../static/assets/index.css",
|
||||
output="../static/assets/index.%(version)s.css",
|
||||
"../static/assets/index.css", output="../static/assets/index.%(version)s.css"
|
||||
)
|
||||
|
||||
environment.register("css", css)
|
||||
|
||||
js = Bundle(
|
||||
'../static/assets/index.js',
|
||||
output='../static/assets/index.%(version)s.js'
|
||||
)
|
||||
environment.register('js_all', js)
|
||||
js = Bundle("../static/assets/index.js", output="../static/assets/index.%(version)s.js")
|
||||
environment.register("js_all", js)
|
||||
|
@ -3,7 +3,14 @@ from flask import g, redirect, url_for, session, request
|
||||
from atst.domain.users import Users
|
||||
|
||||
|
||||
UNPROTECTED_ROUTES = ["atst.root", "dev.login_dev", "atst.login_redirect", "atst.unauthorized", "static"]
|
||||
UNPROTECTED_ROUTES = [
|
||||
"atst.root",
|
||||
"dev.login_dev",
|
||||
"atst.login_redirect",
|
||||
"atst.unauthorized",
|
||||
"static",
|
||||
]
|
||||
|
||||
|
||||
def apply_authentication(app):
|
||||
@app.before_request
|
||||
@ -26,7 +33,7 @@ def get_current_user():
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def _unprotected_route(request):
|
||||
if request.endpoint in UNPROTECTED_ROUTES:
|
||||
return True
|
||||
|
||||
|
@ -4,8 +4,7 @@ from .utils import parse_sdn, email_from_certificate
|
||||
from .crl import CRLRevocationException
|
||||
|
||||
|
||||
class AuthenticationContext():
|
||||
|
||||
class AuthenticationContext:
|
||||
def __init__(self, crl_cache, auth_status, sdn, cert):
|
||||
if None in locals().values():
|
||||
raise UnauthenticatedError(
|
||||
|
@ -9,14 +9,16 @@ class CRLRevocationException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class CRLCache():
|
||||
class CRLCache:
|
||||
|
||||
_PEM_RE = re.compile(
|
||||
b"-----BEGIN CERTIFICATE-----\r?.+?\r?-----END CERTIFICATE-----\r?\n?",
|
||||
re.DOTALL,
|
||||
)
|
||||
|
||||
def __init__(self, root_location, crl_locations=[], store_class=crypto.X509Store, logger=None):
|
||||
def __init__(
|
||||
self, root_location, crl_locations=[], store_class=crypto.X509Store, logger=None
|
||||
):
|
||||
self.store_class = store_class
|
||||
self.certificate_authorities = {}
|
||||
self._load_roots(root_location)
|
||||
@ -57,7 +59,11 @@ class CRLCache():
|
||||
with open(crl_location, "rb") as crl_file:
|
||||
crl = crypto.load_crl(crypto.FILETYPE_ASN1, crl_file.read())
|
||||
store.add_crl(crl)
|
||||
self.log_info("STORE ID: {}. Adding CRL with issuer {}".format(id(store), crl.get_issuer()))
|
||||
self.log_info(
|
||||
"STORE ID: {}. Adding CRL with issuer {}".format(
|
||||
id(store), crl.get_issuer()
|
||||
)
|
||||
)
|
||||
store = self._add_certificate_chain_to_store(store, crl.get_issuer())
|
||||
return store
|
||||
|
||||
@ -75,7 +81,11 @@ class CRLCache():
|
||||
def _add_certificate_chain_to_store(self, store, issuer):
|
||||
ca = self.certificate_authorities.get(issuer.der())
|
||||
store.add_cert(ca)
|
||||
self.log_info("STORE ID: {}. Adding CA with subject {}".format(id(store), ca.get_subject()))
|
||||
self.log_info(
|
||||
"STORE ID: {}. Adding CA with subject {}".format(
|
||||
id(store), ca.get_subject()
|
||||
)
|
||||
)
|
||||
|
||||
if issuer == ca.get_issuer():
|
||||
# i.e., it is the root CA and we are at the end of the chain
|
||||
|
@ -25,7 +25,15 @@ def email_from_certificate(cert_file):
|
||||
return email[0]
|
||||
|
||||
else:
|
||||
raise ValueError("No email available for certificate with serial {}".format(cert.serial_number))
|
||||
raise ValueError(
|
||||
"No email available for certificate with serial {}".format(
|
||||
cert.serial_number
|
||||
)
|
||||
)
|
||||
|
||||
except x509.extensions.ExtensionNotFound:
|
||||
raise ValueError("No subjectAltName available for certificate with serial {}".format(cert.serial_number))
|
||||
raise ValueError(
|
||||
"No subjectAltName available for certificate with serial {}".format(
|
||||
cert.serial_number
|
||||
)
|
||||
)
|
||||
|
@ -6,7 +6,6 @@ from .exceptions import NotFoundError
|
||||
|
||||
|
||||
class PENumbers(object):
|
||||
|
||||
@classmethod
|
||||
def get(cls, number):
|
||||
pe_number = db.session.query(PENumber).get(number)
|
||||
|
@ -73,9 +73,10 @@ class Requests(object):
|
||||
filters.append(Request.creator == creator)
|
||||
|
||||
requests = (
|
||||
db.session.query(Request).filter(*filters).order_by(
|
||||
Request.time_created.desc()
|
||||
).all()
|
||||
db.session.query(Request)
|
||||
.filter(*filters)
|
||||
.order_by(Request.time_created.desc())
|
||||
.all()
|
||||
)
|
||||
return requests
|
||||
|
||||
@ -113,9 +114,10 @@ class Requests(object):
|
||||
# Query for request matching id, acquiring a row-level write lock.
|
||||
# https://www.postgresql.org/docs/10/static/sql-select.html#SQL-FOR-UPDATE-SHARE
|
||||
return (
|
||||
db.session.query(Request).filter_by(id=request_id).with_for_update(
|
||||
of=Request
|
||||
).one()
|
||||
db.session.query(Request)
|
||||
.filter_by(id=request_id)
|
||||
.with_for_update(of=Request)
|
||||
.one()
|
||||
)
|
||||
|
||||
except NoResultFound:
|
||||
@ -153,9 +155,7 @@ class Requests(object):
|
||||
RequestStatus.STARTED: "mission_owner",
|
||||
RequestStatus.PENDING_FINANCIAL_VERIFICATION: "mission_owner",
|
||||
RequestStatus.PENDING_CCPO_APPROVAL: "ccpo",
|
||||
}.get(
|
||||
request.status
|
||||
)
|
||||
}.get(request.status)
|
||||
|
||||
@classmethod
|
||||
def should_auto_approve(cls, request):
|
||||
@ -167,13 +167,16 @@ class Requests(object):
|
||||
return dollar_value < cls.AUTO_APPROVE_THRESHOLD
|
||||
|
||||
_VALID_SUBMISSION_STATUSES = [
|
||||
RequestStatus.STARTED, RequestStatus.CHANGES_REQUESTED
|
||||
RequestStatus.STARTED,
|
||||
RequestStatus.CHANGES_REQUESTED,
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def should_allow_submission(cls, request):
|
||||
all_request_sections = [
|
||||
"details_of_use", "information_about_you", "primary_poc"
|
||||
"details_of_use",
|
||||
"information_about_you",
|
||||
"primary_poc",
|
||||
]
|
||||
existing_request_sections = request.body.keys()
|
||||
return request.status in Requests._VALID_SUBMISSION_STATUSES and all(
|
||||
|
@ -6,7 +6,6 @@ from .exceptions import NotFoundError
|
||||
|
||||
|
||||
class Roles(object):
|
||||
|
||||
@classmethod
|
||||
def get(cls, role_name):
|
||||
try:
|
||||
|
@ -9,7 +9,6 @@ from .exceptions import NotFoundError, AlreadyExistsError
|
||||
|
||||
|
||||
class Users(object):
|
||||
|
||||
@classmethod
|
||||
def get(cls, user_id):
|
||||
try:
|
||||
|
@ -11,7 +11,6 @@ from .exceptions import NotFoundError
|
||||
|
||||
|
||||
class WorkspaceUsers(object):
|
||||
|
||||
@classmethod
|
||||
def get(cls, workspace_id, user_id):
|
||||
try:
|
||||
|
@ -64,8 +64,6 @@ class Workspaces(object):
|
||||
@classmethod
|
||||
def _create_workspace_role(cls, user, workspace, role_name):
|
||||
role = Roles.get(role_name)
|
||||
workspace_role = WorkspaceRole(
|
||||
user=user, role=role, workspace=workspace
|
||||
)
|
||||
workspace_role = WorkspaceRole(user=user, role=role, workspace=workspace)
|
||||
db.session.add(workspace_role)
|
||||
return workspace_role
|
||||
|
@ -1,7 +1,8 @@
|
||||
import re
|
||||
|
||||
|
||||
def iconSvg(name):
|
||||
with open('static/icons/'+name+'.svg') as contents:
|
||||
with open("static/icons/" + name + ".svg") as contents:
|
||||
return contents.read()
|
||||
|
||||
|
||||
@ -14,8 +15,8 @@ def dollars(value):
|
||||
|
||||
|
||||
def usPhone(number):
|
||||
phone = re.sub(r'\D', '', number)
|
||||
return '+1 ({}) {} - {}'.format(phone[0:3], phone[3:6], phone[6:])
|
||||
phone = re.sub(r"\D", "", number)
|
||||
return "+1 ({}) {} - {}".format(phone[0:3], phone[3:6], phone[6:])
|
||||
|
||||
|
||||
def readableInteger(value):
|
||||
@ -31,9 +32,8 @@ def getOptionLabel(value, options):
|
||||
|
||||
|
||||
def register_filters(app):
|
||||
app.jinja_env.filters['iconSvg'] = iconSvg
|
||||
app.jinja_env.filters['dollars'] = dollars
|
||||
app.jinja_env.filters['usPhone'] = usPhone
|
||||
app.jinja_env.filters['readableInteger'] = readableInteger
|
||||
app.jinja_env.filters['getOptionLabel'] = getOptionLabel
|
||||
|
||||
app.jinja_env.filters["iconSvg"] = iconSvg
|
||||
app.jinja_env.filters["dollars"] = dollars
|
||||
app.jinja_env.filters["usPhone"] = usPhone
|
||||
app.jinja_env.filters["readableInteger"] = readableInteger
|
||||
app.jinja_env.filters["getOptionLabel"] = getOptionLabel
|
||||
|
@ -3,7 +3,10 @@ SERVICE_BRANCHES = [
|
||||
("Air Force, Department of the", "Air Force, Department of the"),
|
||||
("Army and Air Force Exchange Service", "Army and Air Force Exchange Service"),
|
||||
("Army, Department of the", "Army, Department of the"),
|
||||
("Defense Advanced Research Projects Agency", "Defense Advanced Research Projects Agency"),
|
||||
(
|
||||
"Defense Advanced Research Projects Agency",
|
||||
"Defense Advanced Research Projects Agency",
|
||||
),
|
||||
("Defense Commissary Agency", "Defense Commissary Agency"),
|
||||
("Defense Contract Audit Agency", "Defense Contract Audit Agency"),
|
||||
("Defense Contract Management Agency", "Defense Contract Management Agency"),
|
||||
@ -19,31 +22,55 @@ SERVICE_BRANCHES = [
|
||||
("Defense Security Cooperation Agency", "Defense Security Cooperation Agency"),
|
||||
("Defense Security Service", "Defense Security Service"),
|
||||
("Defense Technical Information Center", "Defense Technical Information Center"),
|
||||
("Defense Technology Security Administration", "Defense Technology Security Administration"),
|
||||
(
|
||||
"Defense Technology Security Administration",
|
||||
"Defense Technology Security Administration",
|
||||
),
|
||||
("Defense Threat Reduction Agency", "Defense Threat Reduction Agency"),
|
||||
("DoD Education Activity", "DoD Education Activity"),
|
||||
("DoD Human Recourses Activity", "DoD Human Recourses Activity"),
|
||||
("DoD Inspector General", "DoD Inspector General"),
|
||||
("DoD Test Resource Management Center", "DoD Test Resource Management Center"),
|
||||
("Headquarters Defense Human Resource Activity ", "Headquarters Defense Human Resource Activity "),
|
||||
(
|
||||
"Headquarters Defense Human Resource Activity ",
|
||||
"Headquarters Defense Human Resource Activity ",
|
||||
),
|
||||
("Joint Staff", "Joint Staff"),
|
||||
("Missile Defense Agency", "Missile Defense Agency"),
|
||||
("National Defense University", "National Defense University"),
|
||||
("National Geospatial Intelligence Agency (NGA)", "National Geospatial Intelligence Agency (NGA)"),
|
||||
("National Oceanic and Atmospheric Administration (NOAA)", "National Oceanic and Atmospheric Administration (NOAA)"),
|
||||
(
|
||||
"National Geospatial Intelligence Agency (NGA)",
|
||||
"National Geospatial Intelligence Agency (NGA)",
|
||||
),
|
||||
(
|
||||
"National Oceanic and Atmospheric Administration (NOAA)",
|
||||
"National Oceanic and Atmospheric Administration (NOAA)",
|
||||
),
|
||||
("National Reconnaissance Office", "National Reconnaissance Office"),
|
||||
("National Reconnaissance Office (NRO)", "National Reconnaissance Office (NRO)"),
|
||||
("National Security Agency (NSA)", "National Security Agency (NSA)"),
|
||||
("National Security Agency-Central Security Service", "National Security Agency-Central Security Service"),
|
||||
(
|
||||
"National Security Agency-Central Security Service",
|
||||
"National Security Agency-Central Security Service",
|
||||
),
|
||||
("Navy, Department of the", "Navy, Department of the"),
|
||||
("Office of Economic Adjustment", "Office of Economic Adjustment"),
|
||||
("Office of the Secretary of Defense", "Office of the Secretary of Defense"),
|
||||
("Pentagon Force Protection Agency", "Pentagon Force Protection Agency"),
|
||||
("Uniform Services University of the Health Sciences", "Uniform Services University of the Health Sciences"),
|
||||
(
|
||||
"Uniform Services University of the Health Sciences",
|
||||
"Uniform Services University of the Health Sciences",
|
||||
),
|
||||
("US Cyber Command (USCYBERCOM)", "US Cyber Command (USCYBERCOM)"),
|
||||
("US Special Operations Command (USSOCOM)", "US Special Operations Command (USSOCOM)"),
|
||||
(
|
||||
"US Special Operations Command (USSOCOM)",
|
||||
"US Special Operations Command (USSOCOM)",
|
||||
),
|
||||
("US Strategic Command (USSTRATCOM)", "US Strategic Command (USSTRATCOM)"),
|
||||
("US Transportation Command (USTRANSCOM)", "US Transportation Command (USTRANSCOM)"),
|
||||
(
|
||||
"US Transportation Command (USTRANSCOM)",
|
||||
"US Transportation Command (USTRANSCOM)",
|
||||
),
|
||||
("Washington Headquarters Services", "Washington Headquarters Services"),
|
||||
]
|
||||
|
||||
|
@ -24,7 +24,7 @@ class NewlineListField(Field):
|
||||
|
||||
def _value(self):
|
||||
if isinstance(self.data, list):
|
||||
return '\n'.join(self.data)
|
||||
return "\n".join(self.data)
|
||||
elif self.data:
|
||||
return self.data
|
||||
else:
|
||||
@ -46,8 +46,5 @@ class NewlineListField(Field):
|
||||
class SelectField(SelectField_):
|
||||
def __init__(self, *args, **kwargs):
|
||||
render_kw = kwargs.get("render_kw", {})
|
||||
kwargs["render_kw"] = {
|
||||
**render_kw,
|
||||
"required": False
|
||||
}
|
||||
kwargs["render_kw"] = {**render_kw, "required": False}
|
||||
super().__init__(*args, **kwargs)
|
||||
|
@ -26,6 +26,7 @@ TREASURY_CODE_REGEX = re.compile(r"^0*([1-9]{4}|[1-9]{6})$")
|
||||
|
||||
BA_CODE_REGEX = re.compile(r"^0*[1-9]{2}\w?$")
|
||||
|
||||
|
||||
def suggest_pe_id(pe_id):
|
||||
suggestion = pe_id
|
||||
match = PE_REGEX.match(pe_id)
|
||||
@ -94,19 +95,26 @@ class BaseFinancialForm(ValidatedForm):
|
||||
task_order_number = StringField(
|
||||
"Task Order Number associated with this request",
|
||||
description="Include the original Task Order number (including the 000X at the end). Do not include any modification numbers. Note that there may be a lag between approving a task order and when it becomes available in our system.",
|
||||
validators=[Required()]
|
||||
validators=[Required()],
|
||||
)
|
||||
|
||||
uii_ids = NewlineListField(
|
||||
"Unique Item Identifier (UII)s related to your application(s) if you already have them",
|
||||
validators=[Required()]
|
||||
validators=[Required()],
|
||||
)
|
||||
|
||||
pe_id = StringField("Program Element (PE) Number related to your request", validators=[Required()])
|
||||
pe_id = StringField(
|
||||
"Program Element (PE) Number related to your request", validators=[Required()]
|
||||
)
|
||||
|
||||
treasury_code = StringField("Program Treasury Code", validators=[Required(), Regexp(TREASURY_CODE_REGEX)])
|
||||
treasury_code = StringField(
|
||||
"Program Treasury Code", validators=[Required(), Regexp(TREASURY_CODE_REGEX)]
|
||||
)
|
||||
|
||||
ba_code = StringField("Program Budget Activity (BA) Code", validators=[Required(), Regexp(BA_CODE_REGEX)])
|
||||
ba_code = StringField(
|
||||
"Program Budget Activity (BA) Code",
|
||||
validators=[Required(), Regexp(BA_CODE_REGEX)],
|
||||
)
|
||||
|
||||
fname_co = StringField("Contracting Officer First Name", validators=[Required()])
|
||||
lname_co = StringField("Contracting Officer Last Name", validators=[Required()])
|
||||
@ -160,7 +168,7 @@ class ExtendedFinancialForm(BaseFinancialForm):
|
||||
("OTHER", "Other"),
|
||||
],
|
||||
validators=[Required()],
|
||||
render_kw={"required": False}
|
||||
render_kw={"required": False},
|
||||
)
|
||||
|
||||
funding_type_other = StringField("If other, please specify")
|
||||
@ -169,40 +177,40 @@ class ExtendedFinancialForm(BaseFinancialForm):
|
||||
"<dl><dt>CLIN 0001</dt> - <dd>Unclassified IaaS and PaaS Amount</dd></dl>",
|
||||
validators=[Required()],
|
||||
description="Review your task order document, the amounts for each CLIN must match exactly here",
|
||||
filters=[number_to_int]
|
||||
filters=[number_to_int],
|
||||
)
|
||||
|
||||
clin_0003 = StringField(
|
||||
"<dl><dt>CLIN 0003</dt> - <dd>Unclassified Cloud Support Package</dd></dl>",
|
||||
validators=[Required()],
|
||||
description="Review your task order document, the amounts for each CLIN must match exactly here",
|
||||
filters=[number_to_int]
|
||||
filters=[number_to_int],
|
||||
)
|
||||
|
||||
clin_1001 = StringField(
|
||||
"<dl><dt>CLIN 1001</dt> - <dd>Unclassified IaaS and PaaS Amount <br> OPTION PERIOD 1</dd></dl>",
|
||||
validators=[Required()],
|
||||
description="Review your task order document, the amounts for each CLIN must match exactly here",
|
||||
filters=[number_to_int]
|
||||
filters=[number_to_int],
|
||||
)
|
||||
|
||||
clin_1003 = StringField(
|
||||
"<dl><dt>CLIN 1003</dt> - <dd>Unclassified Cloud Support Package <br> OPTION PERIOD 1</dd></dl>",
|
||||
validators=[Required()],
|
||||
description="Review your task order document, the amounts for each CLIN must match exactly here",
|
||||
filters=[number_to_int]
|
||||
filters=[number_to_int],
|
||||
)
|
||||
|
||||
clin_2001 = StringField(
|
||||
"<dl><dt>CLIN 2001</dt> - <dd>Unclassified IaaS and PaaS Amount <br> OPTION PERIOD 2</dd></dl>",
|
||||
validators=[Required()],
|
||||
description="Review your task order document, the amounts for each CLIN must match exactly here",
|
||||
filters=[number_to_int]
|
||||
filters=[number_to_int],
|
||||
)
|
||||
|
||||
clin_2003 = StringField(
|
||||
"<dl><dt>CLIN 2003</dt> - <dd>Unclassified Cloud Support Package <br> OPTION PERIOD 2</dd></dl>",
|
||||
validators=[Required()],
|
||||
description="Review your task order document, the amounts for each CLIN must match exactly here",
|
||||
filters=[number_to_int]
|
||||
filters=[number_to_int],
|
||||
)
|
||||
|
@ -16,9 +16,11 @@ class OrgForm(ValidatedForm):
|
||||
|
||||
email_request = EmailField("E-mail Address", validators=[Required(), Email()])
|
||||
|
||||
phone_number = TelField("Phone Number",
|
||||
description='Enter a 10-digit phone number',
|
||||
validators=[Required(), PhoneNumber()])
|
||||
phone_number = TelField(
|
||||
"Phone Number",
|
||||
description="Enter a 10-digit phone number",
|
||||
validators=[Required(), PhoneNumber()],
|
||||
)
|
||||
|
||||
service_branch = SelectField(
|
||||
"Service Branch or Agency",
|
||||
@ -49,7 +51,7 @@ class OrgForm(ValidatedForm):
|
||||
|
||||
date_latest_training = DateField(
|
||||
"Latest Information Assurance (IA) Training Completion Date",
|
||||
description="To complete the training, you can find it in <a class=\"icon-link\" href=\"https://iatraining.disa.mil/eta/disa_cac2018/launchPage.htm\" target=\"_blank\">Information Assurance Cyber Awareness Challange</a> website.",
|
||||
description='To complete the training, you can find it in <a class="icon-link" href="https://iatraining.disa.mil/eta/disa_cac2018/launchPage.htm" target="_blank">Information Assurance Cyber Awareness Challange</a> website.',
|
||||
validators=[
|
||||
Required(),
|
||||
DateRange(
|
||||
|
@ -6,7 +6,6 @@ from .validators import IsNumber
|
||||
|
||||
|
||||
class POCForm(ValidatedForm):
|
||||
|
||||
def validate(self, *args, **kwargs):
|
||||
if self.am_poc.data:
|
||||
# Prepend Optional validators so that the validation chain
|
||||
@ -18,11 +17,10 @@ class POCForm(ValidatedForm):
|
||||
|
||||
return super().validate(*args, **kwargs)
|
||||
|
||||
|
||||
am_poc = BooleanField(
|
||||
"I am the Workspace Owner",
|
||||
default=False,
|
||||
false_values=(False, "false", "False", "no", "")
|
||||
false_values=(False, "false", "False", "no", ""),
|
||||
)
|
||||
|
||||
fname_poc = StringField("First Name", validators=[Required()])
|
||||
|
@ -4,22 +4,26 @@ from wtforms.validators import Optional, Required
|
||||
|
||||
from .fields import DateField, SelectField
|
||||
from .forms import ValidatedForm
|
||||
from .data import SERVICE_BRANCHES, ASSISTANCE_ORG_TYPES, DATA_TRANSFER_AMOUNTS, COMPLETION_DATE_RANGES
|
||||
from .data import (
|
||||
SERVICE_BRANCHES,
|
||||
ASSISTANCE_ORG_TYPES,
|
||||
DATA_TRANSFER_AMOUNTS,
|
||||
COMPLETION_DATE_RANGES,
|
||||
)
|
||||
from atst.domain.requests import Requests
|
||||
|
||||
|
||||
class RequestForm(ValidatedForm):
|
||||
|
||||
def validate(self, *args, **kwargs):
|
||||
if self.jedi_migration.data == 'no':
|
||||
if self.jedi_migration.data == "no":
|
||||
self.rationalization_software_systems.validators.append(Optional())
|
||||
self.technical_support_team.validators.append(Optional())
|
||||
self.organization_providing_assistance.validators.append(Optional())
|
||||
self.engineering_assessment.validators.append(Optional())
|
||||
self.data_transfers.validators.append(Optional())
|
||||
self.expected_completion_date.validators.append(Optional())
|
||||
elif self.jedi_migration.data == 'yes':
|
||||
if self.technical_support_team.data == 'no':
|
||||
elif self.jedi_migration.data == "yes":
|
||||
if self.technical_support_team.data == "no":
|
||||
self.organization_providing_assistance.validators.append(Optional())
|
||||
self.cloud_native.validators.append(Optional())
|
||||
|
||||
@ -39,16 +43,15 @@ class RequestForm(ValidatedForm):
|
||||
"DoD Component",
|
||||
description="Identify the DoD component that is requesting access to the JEDI Cloud",
|
||||
choices=SERVICE_BRANCHES,
|
||||
validators=[Required()]
|
||||
validators=[Required()],
|
||||
)
|
||||
|
||||
jedi_usage = TextAreaField(
|
||||
"JEDI Usage",
|
||||
description="Your answer will help us provide tangible examples to DoD leadership how and why commercial cloud resources are accelerating the Department's missions",
|
||||
validators=[Required()]
|
||||
validators=[Required()],
|
||||
)
|
||||
|
||||
|
||||
# Details of Use: Cloud Readiness
|
||||
num_software_systems = IntegerField(
|
||||
"Number of Software Systems",
|
||||
@ -121,16 +124,15 @@ class RequestForm(ValidatedForm):
|
||||
|
||||
average_daily_traffic = IntegerField(
|
||||
"Average Daily Traffic (Number of Requests)",
|
||||
description="What is the average daily traffic you expect the systems under this cloud contract to use?"
|
||||
description="What is the average daily traffic you expect the systems under this cloud contract to use?",
|
||||
)
|
||||
|
||||
average_daily_traffic_gb = IntegerField(
|
||||
"Average Daily Traffic (GB)",
|
||||
description="What is the average daily traffic you expect the systems under this cloud contract to use?"
|
||||
description="What is the average daily traffic you expect the systems under this cloud contract to use?",
|
||||
)
|
||||
|
||||
start_date = DateField(
|
||||
description="When do you expect to start using the JEDI Cloud (not for billing purposes)?",
|
||||
validators=[
|
||||
Required()]
|
||||
validators=[Required()],
|
||||
)
|
||||
|
@ -60,5 +60,3 @@ def ListItemRequired(message="Please provide at least one.", empty_values=("", N
|
||||
raise ValidationError(message)
|
||||
|
||||
return _list_item_required
|
||||
|
||||
|
||||
|
@ -2,6 +2,12 @@ from sqlalchemy import Column, func, TIMESTAMP
|
||||
|
||||
|
||||
class TimestampsMixin(object):
|
||||
time_created = Column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now())
|
||||
time_updated = Column(TIMESTAMP(timezone=True), nullable=False, server_default=func.now(), onupdate=func.current_timestamp())
|
||||
|
||||
time_created = Column(
|
||||
TIMESTAMP(timezone=True), nullable=False, server_default=func.now()
|
||||
)
|
||||
time_updated = Column(
|
||||
TIMESTAMP(timezone=True),
|
||||
nullable=False,
|
||||
server_default=func.now(),
|
||||
onupdate=func.current_timestamp(),
|
||||
)
|
||||
|
@ -4,6 +4,7 @@ from sqlalchemy import Column, Integer, String, Enum as SQLAEnum
|
||||
|
||||
from atst.models import Base
|
||||
|
||||
|
||||
class Source(Enum):
|
||||
MANUAL = "Manual"
|
||||
EDA = "eda"
|
||||
|
@ -25,21 +25,21 @@ def styleguide():
|
||||
return render_template("styleguide.html")
|
||||
|
||||
|
||||
@bp.route('/<path:path>')
|
||||
@bp.route("/<path:path>")
|
||||
def catch_all(path):
|
||||
return render_template("{}.html".format(path))
|
||||
|
||||
|
||||
def _make_authentication_context():
|
||||
return AuthenticationContext(
|
||||
crl_cache=app.crl_cache,
|
||||
auth_status=request.environ.get("HTTP_X_SSL_CLIENT_VERIFY"),
|
||||
sdn=request.environ.get("HTTP_X_SSL_CLIENT_S_DN"),
|
||||
cert=request.environ.get("HTTP_X_SSL_CLIENT_CERT")
|
||||
crl_cache=app.crl_cache,
|
||||
auth_status=request.environ.get("HTTP_X_SSL_CLIENT_VERIFY"),
|
||||
sdn=request.environ.get("HTTP_X_SSL_CLIENT_S_DN"),
|
||||
cert=request.environ.get("HTTP_X_SSL_CLIENT_CERT"),
|
||||
)
|
||||
|
||||
|
||||
@bp.route('/login-redirect')
|
||||
@bp.route("/login-redirect")
|
||||
def login_redirect():
|
||||
auth_context = _make_authentication_context()
|
||||
auth_context.authenticate()
|
||||
@ -53,7 +53,7 @@ def login_redirect():
|
||||
|
||||
|
||||
def _is_valid_certificate(request):
|
||||
cert = request.environ.get('HTTP_X_SSL_CLIENT_CERT')
|
||||
cert = request.environ.get("HTTP_X_SSL_CLIENT_CERT")
|
||||
if cert:
|
||||
result = app.crl_validator.validate(cert.encode())
|
||||
return result
|
||||
|
@ -10,45 +10,46 @@ _DEV_USERS = {
|
||||
"first_name": "Sam",
|
||||
"last_name": "Seeceepio",
|
||||
"atat_role_name": "ccpo",
|
||||
"email": "sam@test.com"
|
||||
"email": "sam@test.com",
|
||||
},
|
||||
"amanda": {
|
||||
"dod_id": "2345678901",
|
||||
"first_name": "Amanda",
|
||||
"last_name": "Adamson",
|
||||
"atat_role_name": "default",
|
||||
"email": "amanda@test.com"
|
||||
"email": "amanda@test.com",
|
||||
},
|
||||
"brandon": {
|
||||
"dod_id": "3456789012",
|
||||
"first_name": "Brandon",
|
||||
"last_name": "Buchannan",
|
||||
"atat_role_name": "default",
|
||||
"email": "brandon@test.com"
|
||||
"email": "brandon@test.com",
|
||||
},
|
||||
"christina": {
|
||||
"dod_id": "4567890123",
|
||||
"first_name": "Christina",
|
||||
"last_name": "Collins",
|
||||
"atat_role_name": "default",
|
||||
"email": "christina@test.com"
|
||||
"email": "christina@test.com",
|
||||
},
|
||||
"dominick": {
|
||||
"dod_id": "5678901234",
|
||||
"first_name": "Dominick",
|
||||
"last_name": "Domingo",
|
||||
"atat_role_name": "default",
|
||||
"email": "dominick@test.com"
|
||||
"email": "dominick@test.com",
|
||||
},
|
||||
"erica": {
|
||||
"dod_id": "6789012345",
|
||||
"first_name": "Erica",
|
||||
"last_name": "Eichner",
|
||||
"atat_role_name": "default",
|
||||
"email": "erica@test.com"
|
||||
"email": "erica@test.com",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@bp.route("/login-dev")
|
||||
def login_dev():
|
||||
role = request.args.get("username", "amanda")
|
||||
@ -58,7 +59,7 @@ def login_dev():
|
||||
atat_role_name=user_data["atat_role_name"],
|
||||
first_name=user_data["first_name"],
|
||||
last_name=user_data["last_name"],
|
||||
email=user_data["email"]
|
||||
email=user_data["email"],
|
||||
)
|
||||
session["user_id"] = user.id
|
||||
|
||||
|
@ -11,11 +11,10 @@ def make_error_pages(app):
|
||||
app.logger.error(e.message)
|
||||
return render_template("not_found.html"), 404
|
||||
|
||||
|
||||
@app.errorhandler(exceptions.UnauthenticatedError)
|
||||
# pylint: disable=unused-variable
|
||||
def unauthorized(e):
|
||||
app.logger.error(e.message)
|
||||
return render_template('unauthenticated.html'), 401
|
||||
return render_template("unauthenticated.html"), 401
|
||||
|
||||
return app
|
||||
|
@ -8,6 +8,7 @@ from . import index
|
||||
from . import requests_form
|
||||
from . import financial_verification
|
||||
|
||||
|
||||
@requests_bp.context_processor
|
||||
def annual_spend_threshold():
|
||||
return { "annual_spend_threshold": Requests.ANNUAL_SPEND_THRESHOLD }
|
||||
return {"annual_spend_threshold": Requests.ANNUAL_SPEND_THRESHOLD}
|
||||
|
@ -43,7 +43,13 @@ def update_financial_verification(request_id):
|
||||
if valid:
|
||||
Requests.submit_financial_verification(request_id)
|
||||
new_workspace = Requests.approve_and_create_workspace(updated_request)
|
||||
return redirect(url_for("workspaces.workspace_projects", workspace_id=new_workspace.id, newWorkspace=True))
|
||||
return redirect(
|
||||
url_for(
|
||||
"workspaces.workspace_projects",
|
||||
workspace_id=new_workspace.id,
|
||||
newWorkspace=True,
|
||||
)
|
||||
)
|
||||
|
||||
else:
|
||||
form.reset()
|
||||
|
@ -24,15 +24,18 @@ def map_request(request):
|
||||
"date": time_created.format("M/DD/YYYY"),
|
||||
"full_name": request.creator.full_name,
|
||||
"annual_usage": annual_usage,
|
||||
"edit_link": verify_url if Requests.is_pending_financial_verification(
|
||||
request
|
||||
) else update_url,
|
||||
"edit_link": verify_url
|
||||
if Requests.is_pending_financial_verification(request)
|
||||
else update_url,
|
||||
}
|
||||
|
||||
|
||||
@requests_bp.route("/requests", methods=["GET"])
|
||||
def requests_index():
|
||||
if Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST in g.current_user.atat_permissions:
|
||||
if (
|
||||
Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST
|
||||
in g.current_user.atat_permissions
|
||||
):
|
||||
return _ccpo_view()
|
||||
|
||||
else:
|
||||
|
@ -129,7 +129,9 @@ class JEDIRequestFlow(object):
|
||||
if section == "primary_poc":
|
||||
if data.get("am_poc", False):
|
||||
try:
|
||||
request_user_info = self.existing_request.body.get("information_about_you", {})
|
||||
request_user_info = self.existing_request.body.get(
|
||||
"information_about_you", {}
|
||||
)
|
||||
except AttributeError:
|
||||
request_user_info = {}
|
||||
|
||||
|
@ -6,7 +6,12 @@ from atst.routes.requests.jedi_request_flow import JEDIRequestFlow
|
||||
from atst.models.permissions import Permissions
|
||||
from atst.models.request_status_event import RequestStatus
|
||||
from atst.domain.exceptions import UnauthorizedError
|
||||
from atst.forms.data import SERVICE_BRANCHES, ASSISTANCE_ORG_TYPES, DATA_TRANSFER_AMOUNTS, COMPLETION_DATE_RANGES
|
||||
from atst.forms.data import (
|
||||
SERVICE_BRANCHES,
|
||||
ASSISTANCE_ORG_TYPES,
|
||||
DATA_TRANSFER_AMOUNTS,
|
||||
COMPLETION_DATE_RANGES,
|
||||
)
|
||||
|
||||
|
||||
@requests_bp.route("/requests/new/<int:screen>", methods=["GET"])
|
||||
@ -27,6 +32,7 @@ def requests_form_new(screen):
|
||||
completion_date_ranges=COMPLETION_DATE_RANGES,
|
||||
)
|
||||
|
||||
|
||||
@requests_bp.route(
|
||||
"/requests/new/<int:screen>", methods=["GET"], defaults={"request_id": None}
|
||||
)
|
||||
@ -36,7 +42,9 @@ def requests_form_update(screen=1, request_id=None):
|
||||
_check_can_view_request(request_id)
|
||||
|
||||
request = Requests.get(request_id) if request_id is not None else None
|
||||
jedi_flow = JEDIRequestFlow(screen, request=request, request_id=request_id, current_user=g.current_user)
|
||||
jedi_flow = JEDIRequestFlow(
|
||||
screen, request=request, request_id=request_id, current_user=g.current_user
|
||||
)
|
||||
|
||||
return render_template(
|
||||
"requests/screen-%d.html" % int(screen),
|
||||
@ -114,10 +122,12 @@ def requests_submit(request_id=None):
|
||||
# TODO: generalize this, along with other authorizations, into a policy-pattern
|
||||
# for authorization in the application
|
||||
def _check_can_view_request(request_id):
|
||||
if Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST in g.current_user.atat_permissions:
|
||||
if (
|
||||
Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST
|
||||
in g.current_user.atat_permissions
|
||||
):
|
||||
pass
|
||||
elif Requests.exists(request_id, g.current_user):
|
||||
pass
|
||||
else:
|
||||
raise UnauthorizedError(g.current_user, "view request {}".format(request_id))
|
||||
|
||||
|
@ -8,12 +8,7 @@ from atst.app import make_app, make_config
|
||||
from atst.database import db as _db
|
||||
import tests.factories as factories
|
||||
|
||||
dictConfig({
|
||||
'version': 1,
|
||||
'handlers': {'wsgi': {
|
||||
'class': 'logging.NullHandler',
|
||||
}}
|
||||
})
|
||||
dictConfig({"version": 1, "handlers": {"wsgi": {"class": "logging.NullHandler"}}})
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
|
@ -11,7 +11,7 @@ from tests.factories import UserFactory
|
||||
CERT = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS)).read()
|
||||
|
||||
|
||||
class MockCRLCache():
|
||||
class MockCRLCache:
|
||||
def __init__(self, valid=True):
|
||||
self.valid = valid
|
||||
|
||||
@ -23,16 +23,12 @@ class MockCRLCache():
|
||||
|
||||
|
||||
def test_can_authenticate():
|
||||
auth_context = AuthenticationContext(
|
||||
MockCRLCache(), "SUCCESS", DOD_SDN, CERT
|
||||
)
|
||||
auth_context = AuthenticationContext(MockCRLCache(), "SUCCESS", DOD_SDN, CERT)
|
||||
assert auth_context.authenticate()
|
||||
|
||||
|
||||
def test_unsuccessful_status():
|
||||
auth_context = AuthenticationContext(
|
||||
MockCRLCache(), "FAILURE", DOD_SDN, CERT
|
||||
)
|
||||
auth_context = AuthenticationContext(MockCRLCache(), "FAILURE", DOD_SDN, CERT)
|
||||
with pytest.raises(UnauthenticatedError) as excinfo:
|
||||
assert auth_context.authenticate()
|
||||
|
||||
@ -41,9 +37,7 @@ def test_unsuccessful_status():
|
||||
|
||||
|
||||
def test_crl_check_fails():
|
||||
auth_context = AuthenticationContext(
|
||||
MockCRLCache(False), "SUCCESS", DOD_SDN, CERT
|
||||
)
|
||||
auth_context = AuthenticationContext(MockCRLCache(False), "SUCCESS", DOD_SDN, CERT)
|
||||
with pytest.raises(UnauthenticatedError) as excinfo:
|
||||
assert auth_context.authenticate()
|
||||
|
||||
@ -52,9 +46,7 @@ def test_crl_check_fails():
|
||||
|
||||
|
||||
def test_bad_sdn():
|
||||
auth_context = AuthenticationContext(
|
||||
MockCRLCache(), "SUCCESS", "abc123", CERT
|
||||
)
|
||||
auth_context = AuthenticationContext(MockCRLCache(), "SUCCESS", "abc123", CERT)
|
||||
with pytest.raises(UnauthenticatedError) as excinfo:
|
||||
auth_context.get_user()
|
||||
|
||||
@ -64,9 +56,7 @@ def test_bad_sdn():
|
||||
|
||||
def test_user_exists():
|
||||
user = UserFactory.create(**DOD_SDN_INFO)
|
||||
auth_context = AuthenticationContext(
|
||||
MockCRLCache(), "SUCCESS", DOD_SDN, CERT
|
||||
)
|
||||
auth_context = AuthenticationContext(MockCRLCache(), "SUCCESS", DOD_SDN, CERT)
|
||||
auth_user = auth_context.get_user()
|
||||
|
||||
assert auth_user == user
|
||||
@ -77,9 +67,7 @@ def test_creates_user():
|
||||
with pytest.raises(NotFoundError):
|
||||
Users.get_by_dod_id(DOD_SDN_INFO["dod_id"])
|
||||
|
||||
auth_context = AuthenticationContext(
|
||||
MockCRLCache(), "SUCCESS", DOD_SDN, CERT
|
||||
)
|
||||
auth_context = AuthenticationContext(MockCRLCache(), "SUCCESS", DOD_SDN, CERT)
|
||||
user = auth_context.get_user()
|
||||
assert user.dod_id == DOD_SDN_INFO["dod_id"]
|
||||
assert user.email == FIXTURE_EMAIL_ADDRESS
|
||||
@ -87,9 +75,7 @@ def test_creates_user():
|
||||
|
||||
def test_user_cert_has_no_email():
|
||||
cert = open("ssl/client-certs/atat.mil.crt").read()
|
||||
auth_context = AuthenticationContext(
|
||||
MockCRLCache(), "SUCCESS", DOD_SDN, cert
|
||||
)
|
||||
auth_context = AuthenticationContext(MockCRLCache(), "SUCCESS", DOD_SDN, cert)
|
||||
user = auth_context.get_user()
|
||||
|
||||
assert user.email == None
|
||||
|
@ -11,8 +11,7 @@ import atst.domain.authnid.crl.util as util
|
||||
from tests.mocks import FIXTURE_EMAIL_ADDRESS
|
||||
|
||||
|
||||
class MockX509Store():
|
||||
|
||||
class MockX509Store:
|
||||
def __init__(self):
|
||||
self.crls = []
|
||||
self.certs = []
|
||||
@ -98,8 +97,7 @@ def test_parse_disa_pki_list():
|
||||
assert len(crl_list) == len(href_matches)
|
||||
|
||||
|
||||
class MockStreamingResponse():
|
||||
|
||||
class MockStreamingResponse:
|
||||
def __init__(self, content_chunks, code=200):
|
||||
self.content_chunks = content_chunks
|
||||
self.status_code = code
|
||||
|
@ -18,4 +18,3 @@ def test_invalid_date():
|
||||
date_str = "This is not a valid data"
|
||||
with pytest.raises(ValueError):
|
||||
parse_date(date_str)
|
||||
|
||||
|
@ -7,7 +7,9 @@ from tests.factories import PENumberFactory
|
||||
|
||||
|
||||
def test_can_get_pe_number():
|
||||
new_pen = PENumberFactory.create(number="0701367F", description="Combat Support - Offensive")
|
||||
new_pen = PENumberFactory.create(
|
||||
number="0701367F", description="Combat Support - Offensive"
|
||||
)
|
||||
pen = PENumbers.get(new_pen.number)
|
||||
|
||||
assert pen.number == new_pen.number
|
||||
@ -17,8 +19,9 @@ def test_nonexistent_pe_number_raises():
|
||||
with pytest.raises(NotFoundError):
|
||||
PENumbers.get("some fake number")
|
||||
|
||||
|
||||
def test_create_many():
|
||||
pen_list = [['123456', 'Land Speeder'], ['7891011', 'Lightsaber']]
|
||||
pen_list = [["123456", "Land Speeder"], ["7891011", "Lightsaber"]]
|
||||
PENumbers.create_many(pen_list)
|
||||
|
||||
assert PENumbers.get(pen_list[0][0])
|
||||
|
@ -7,7 +7,12 @@ from atst.models.request import Request
|
||||
from atst.models.request_status_event import RequestStatus
|
||||
from atst.models.task_order import Source as TaskOrderSource
|
||||
|
||||
from tests.factories import RequestFactory, UserFactory, RequestStatusEventFactory, TaskOrderFactory
|
||||
from tests.factories import (
|
||||
RequestFactory,
|
||||
UserFactory,
|
||||
RequestStatusEventFactory,
|
||||
TaskOrderFactory,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
@ -55,10 +60,12 @@ def test_dont_auto_approve_if_no_dollar_value_specified(new_request):
|
||||
def test_should_allow_submission(new_request):
|
||||
assert Requests.should_allow_submission(new_request)
|
||||
|
||||
RequestStatusEventFactory.create(request=new_request, new_status=RequestStatus.CHANGES_REQUESTED)
|
||||
RequestStatusEventFactory.create(
|
||||
request=new_request, new_status=RequestStatus.CHANGES_REQUESTED
|
||||
)
|
||||
assert Requests.should_allow_submission(new_request)
|
||||
|
||||
del new_request.body['details_of_use']
|
||||
del new_request.body["details_of_use"]
|
||||
assert not Requests.should_allow_submission(new_request)
|
||||
|
||||
|
||||
@ -76,12 +83,17 @@ def test_status_count(session):
|
||||
|
||||
request1 = RequestFactory.create()
|
||||
request2 = RequestFactory.create()
|
||||
RequestStatusEventFactory.create(sequence=2, request_id=request2.id, new_status=RequestStatus.PENDING_FINANCIAL_VERIFICATION)
|
||||
RequestStatusEventFactory.create(
|
||||
sequence=2,
|
||||
request_id=request2.id,
|
||||
new_status=RequestStatus.PENDING_FINANCIAL_VERIFICATION,
|
||||
)
|
||||
|
||||
assert Requests.status_count(RequestStatus.PENDING_FINANCIAL_VERIFICATION) == 1
|
||||
assert Requests.status_count(RequestStatus.STARTED) == 1
|
||||
assert Requests.in_progress_count() == 2
|
||||
|
||||
|
||||
def test_status_count_scoped_to_creator(session):
|
||||
# make sure table is empty
|
||||
session.query(Request).delete()
|
||||
@ -123,7 +135,7 @@ task_order_financial_data = {
|
||||
|
||||
def test_update_financial_verification_without_task_order():
|
||||
request = RequestFactory.create()
|
||||
financial_data = { **request_financial_data, **task_order_financial_data }
|
||||
financial_data = {**request_financial_data, **task_order_financial_data}
|
||||
Requests.update_financial_verification(request.id, financial_data)
|
||||
assert request.task_order
|
||||
assert request.task_order.clin_0001 == task_order_financial_data["clin_0001"]
|
||||
@ -132,7 +144,7 @@ def test_update_financial_verification_without_task_order():
|
||||
|
||||
def test_update_financial_verification_with_task_order():
|
||||
task_order = TaskOrderFactory.create(source=TaskOrderSource.EDA)
|
||||
financial_data = { **request_financial_data, "task_order_number": task_order.number }
|
||||
financial_data = {**request_financial_data, "task_order_number": task_order.number}
|
||||
request = RequestFactory.create()
|
||||
Requests.update_financial_verification(request.id, financial_data)
|
||||
assert request.task_order == task_order
|
||||
@ -142,4 +154,3 @@ def test_update_financial_verification_with_invalid_task_order():
|
||||
request = RequestFactory.create()
|
||||
Requests.update_financial_verification(request.id, request_financial_data)
|
||||
assert not request.task_order
|
||||
|
||||
|
@ -16,7 +16,9 @@ def test_can_get_task_order():
|
||||
|
||||
|
||||
def test_can_get_task_order_from_eda(monkeypatch):
|
||||
monkeypatch.setattr("atst.domain.task_orders.TaskOrders._client", lambda: MockEDAClient())
|
||||
monkeypatch.setattr(
|
||||
"atst.domain.task_orders.TaskOrders._client", lambda: MockEDAClient()
|
||||
)
|
||||
to = TaskOrders.get(MockEDAClient.MOCK_CONTRACT_NUMBER)
|
||||
|
||||
assert to.number == MockEDAClient.MOCK_CONTRACT_NUMBER
|
||||
@ -29,6 +31,8 @@ def test_nonexistent_task_order_raises_without_client():
|
||||
|
||||
|
||||
def test_nonexistent_task_order_raises_with_client(monkeypatch):
|
||||
monkeypatch.setattr("atst.domain.task_orders.TaskOrders._client", lambda: MockEDAClient())
|
||||
monkeypatch.setattr(
|
||||
"atst.domain.task_orders.TaskOrders._client", lambda: MockEDAClient()
|
||||
)
|
||||
with pytest.raises(NotFoundError):
|
||||
TaskOrders.get("some other fake numer")
|
||||
|
@ -30,13 +30,16 @@ def test_date_insane_format():
|
||||
form.date._value()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("input_,expected", [
|
||||
("", []),
|
||||
("hello", ["hello"]),
|
||||
("hello\n", ["hello"]),
|
||||
("hello\nworld", ["hello", "world"]),
|
||||
("hello\nworld\n", ["hello", "world"])
|
||||
])
|
||||
@pytest.mark.parametrize(
|
||||
"input_,expected",
|
||||
[
|
||||
("", []),
|
||||
("hello", ["hello"]),
|
||||
("hello\n", ["hello"]),
|
||||
("hello\nworld", ["hello", "world"]),
|
||||
("hello\nworld\n", ["hello", "world"]),
|
||||
],
|
||||
)
|
||||
def test_newline_list_process(input_, expected):
|
||||
form_data = ImmutableMultiDict({"newline_list": input_})
|
||||
form = NewlineListForm(form_data)
|
||||
@ -45,11 +48,10 @@ def test_newline_list_process(input_, expected):
|
||||
assert form.data == {"newline_list": expected}
|
||||
|
||||
|
||||
@pytest.mark.parametrize("input_,expected", [
|
||||
([], ""),
|
||||
(["hello"], "hello"),
|
||||
(["hello", "world"], "hello\nworld")
|
||||
])
|
||||
@pytest.mark.parametrize(
|
||||
"input_,expected",
|
||||
[([], ""), (["hello"], "hello"), (["hello", "world"], "hello\nworld")],
|
||||
)
|
||||
def test_newline_list_value(input_, expected):
|
||||
form_data = {"newline_list": input_}
|
||||
form = NewlineListForm(data=form_data)
|
||||
|
@ -4,45 +4,47 @@ from atst.forms.financial import suggest_pe_id, FinancialForm, ExtendedFinancial
|
||||
from atst.eda_client import MockEDAClient
|
||||
|
||||
|
||||
@pytest.mark.parametrize("input_,expected", [
|
||||
('0603502N', None),
|
||||
('0603502NZ', None),
|
||||
('603502N', '0603502N'),
|
||||
('063502N', '0603502N'),
|
||||
('63502N', '0603502N'),
|
||||
])
|
||||
@pytest.mark.parametrize(
|
||||
"input_,expected",
|
||||
[
|
||||
("0603502N", None),
|
||||
("0603502NZ", None),
|
||||
("603502N", "0603502N"),
|
||||
("063502N", "0603502N"),
|
||||
("63502N", "0603502N"),
|
||||
],
|
||||
)
|
||||
def test_suggest_pe_id(input_, expected):
|
||||
assert suggest_pe_id(input_) == expected
|
||||
|
||||
|
||||
def test_funding_type_other_not_required_if_funding_type_is_not_other():
|
||||
form_data = {
|
||||
"funding_type": "PROC"
|
||||
}
|
||||
form_data = {"funding_type": "PROC"}
|
||||
form = ExtendedFinancialForm(data=form_data)
|
||||
form.validate()
|
||||
assert "funding_type_other" not in form.errors
|
||||
|
||||
|
||||
def test_funding_type_other_required_if_funding_type_is_other():
|
||||
form_data = {
|
||||
"funding_type": "OTHER"
|
||||
}
|
||||
form_data = {"funding_type": "OTHER"}
|
||||
form = ExtendedFinancialForm(data=form_data)
|
||||
form.validate()
|
||||
assert "funding_type_other" in form.errors
|
||||
|
||||
|
||||
@pytest.mark.parametrize("input_,expected", [
|
||||
("1234", True),
|
||||
("123456", True),
|
||||
("0001234", True),
|
||||
("000123456", True),
|
||||
("12345", False),
|
||||
("00012345", False),
|
||||
("0001234567", False),
|
||||
("000000", False),
|
||||
])
|
||||
@pytest.mark.parametrize(
|
||||
"input_,expected",
|
||||
[
|
||||
("1234", True),
|
||||
("123456", True),
|
||||
("0001234", True),
|
||||
("000123456", True),
|
||||
("12345", False),
|
||||
("00012345", False),
|
||||
("0001234567", False),
|
||||
("000000", False),
|
||||
],
|
||||
)
|
||||
def test_treasury_code_validation(input_, expected):
|
||||
form_data = {"treasury_code": input_}
|
||||
form = FinancialForm(data=form_data)
|
||||
@ -52,15 +54,18 @@ def test_treasury_code_validation(input_, expected):
|
||||
assert is_valid == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize("input_,expected", [
|
||||
("12", True),
|
||||
("00012", True),
|
||||
("12A", True),
|
||||
("000123", True),
|
||||
("00012A", True),
|
||||
("0001", False),
|
||||
("00012AB", False),
|
||||
])
|
||||
@pytest.mark.parametrize(
|
||||
"input_,expected",
|
||||
[
|
||||
("12", True),
|
||||
("00012", True),
|
||||
("12A", True),
|
||||
("000123", True),
|
||||
("00012A", True),
|
||||
("0001", False),
|
||||
("00012AB", False),
|
||||
],
|
||||
)
|
||||
def test_ba_code_validation(input_, expected):
|
||||
form_data = {"ba_code": input_}
|
||||
form = FinancialForm(data=form_data)
|
||||
@ -69,16 +74,21 @@ def test_ba_code_validation(input_, expected):
|
||||
|
||||
assert is_valid == expected
|
||||
|
||||
|
||||
def test_task_order_number_validation(monkeypatch):
|
||||
monkeypatch.setattr("atst.domain.task_orders.TaskOrders._client", lambda: MockEDAClient())
|
||||
monkeypatch.setattr(
|
||||
"atst.domain.task_orders.TaskOrders._client", lambda: MockEDAClient()
|
||||
)
|
||||
monkeypatch.setattr("atst.forms.financial.validate_pe_id", lambda *args: True)
|
||||
form_invalid = FinancialForm(data={"task_order_number": "1234"})
|
||||
form_invalid.perform_extra_validation({})
|
||||
|
||||
assert "task_order_number" in form_invalid.errors
|
||||
|
||||
form_valid = FinancialForm(data={"task_order_number": MockEDAClient.MOCK_CONTRACT_NUMBER}, eda_client=MockEDAClient())
|
||||
form_valid = FinancialForm(
|
||||
data={"task_order_number": MockEDAClient.MOCK_CONTRACT_NUMBER},
|
||||
eda_client=MockEDAClient(),
|
||||
)
|
||||
form_valid.perform_extra_validation({})
|
||||
|
||||
assert "task_order_number" not in form_valid.errors
|
||||
|
||||
|
@ -5,7 +5,6 @@ from atst.forms.validators import Alphabet, IsNumber, PhoneNumber
|
||||
|
||||
|
||||
class TestIsNumber:
|
||||
|
||||
@pytest.mark.parametrize("valid", ["0", "12", "-12"])
|
||||
def test_IsNumber_accepts_integers(self, valid, dummy_form, dummy_field):
|
||||
validator = IsNumber()
|
||||
@ -21,24 +20,18 @@ class TestIsNumber:
|
||||
|
||||
|
||||
class TestPhoneNumber:
|
||||
|
||||
@pytest.mark.parametrize("valid", [
|
||||
"12345",
|
||||
"1234567890",
|
||||
"(123) 456-7890",
|
||||
])
|
||||
@pytest.mark.parametrize("valid", ["12345", "1234567890", "(123) 456-7890"])
|
||||
def test_PhoneNumber_accepts_valid_numbers(self, valid, dummy_form, dummy_field):
|
||||
validator = PhoneNumber()
|
||||
dummy_field.data = valid
|
||||
validator(dummy_form, dummy_field)
|
||||
|
||||
@pytest.mark.parametrize("invalid", [
|
||||
"1234",
|
||||
"123456",
|
||||
"1234567abc",
|
||||
"(123) 456-789012",
|
||||
])
|
||||
def test_PhoneNumber_rejects_invalid_numbers(self, invalid, dummy_form, dummy_field):
|
||||
@pytest.mark.parametrize(
|
||||
"invalid", ["1234", "123456", "1234567abc", "(123) 456-789012"]
|
||||
)
|
||||
def test_PhoneNumber_rejects_invalid_numbers(
|
||||
self, invalid, dummy_form, dummy_field
|
||||
):
|
||||
validator = PhoneNumber()
|
||||
dummy_field.data = invalid
|
||||
with pytest.raises(ValidationError):
|
||||
@ -46,7 +39,6 @@ class TestPhoneNumber:
|
||||
|
||||
|
||||
class TestAlphabet:
|
||||
|
||||
@pytest.mark.parametrize("valid", ["a", "abcde"])
|
||||
def test_Alphabet_accepts_letters(self, valid, dummy_form, dummy_field):
|
||||
validator = Alphabet()
|
||||
|
@ -69,6 +69,7 @@ def test_request_status_pending_deleted_displayname():
|
||||
|
||||
assert request.status_displayname == "Canceled"
|
||||
|
||||
|
||||
def test_annual_spend():
|
||||
request = RequestFactory.create()
|
||||
monthly = request.body.get("details_of_use").get("estimated_monthly_spend")
|
||||
|
@ -22,7 +22,7 @@ class TestPENumberInForm:
|
||||
"office_cor": "WHS",
|
||||
"uii_ids": "1234",
|
||||
"treasury_code": "00123456",
|
||||
"ba_code": "024A"
|
||||
"ba_code": "024A",
|
||||
}
|
||||
extended_data = {
|
||||
"funding_type": "RDTE",
|
||||
@ -36,8 +36,12 @@ class TestPENumberInForm:
|
||||
}
|
||||
|
||||
def _set_monkeypatches(self, monkeypatch):
|
||||
monkeypatch.setattr("atst.forms.financial.FinancialForm.validate", lambda s: True)
|
||||
monkeypatch.setattr("atst.domain.auth.get_current_user", lambda *args: MOCK_USER)
|
||||
monkeypatch.setattr(
|
||||
"atst.forms.financial.FinancialForm.validate", lambda s: True
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"atst.domain.auth.get_current_user", lambda *args: MOCK_USER
|
||||
)
|
||||
|
||||
def submit_data(self, client, data, extended=False):
|
||||
request = RequestFactory.create(body=MOCK_REQUEST.body)
|
||||
@ -64,7 +68,7 @@ class TestPENumberInForm:
|
||||
self._set_monkeypatches(monkeypatch)
|
||||
|
||||
data = dict(self.required_data)
|
||||
data['pe_id'] = MOCK_REQUEST.body['financial_verification']['pe_id']
|
||||
data["pe_id"] = MOCK_REQUEST.body["financial_verification"]["pe_id"]
|
||||
|
||||
response = self.submit_data(client, data)
|
||||
|
||||
@ -76,7 +80,7 @@ class TestPENumberInForm:
|
||||
pe = PENumberFactory.create(number="8675309U", description="sample PE number")
|
||||
|
||||
data = dict(self.required_data)
|
||||
data['pe_id'] = pe.number
|
||||
data["pe_id"] = pe.number
|
||||
|
||||
response = self.submit_data(client, data)
|
||||
|
||||
@ -87,32 +91,36 @@ class TestPENumberInForm:
|
||||
self._set_monkeypatches(monkeypatch)
|
||||
|
||||
data = dict(self.required_data)
|
||||
data['pe_id'] = ''
|
||||
data["pe_id"] = ""
|
||||
|
||||
response = self.submit_data(client, data)
|
||||
|
||||
assert "There were some errors" in response.data.decode()
|
||||
assert response.status_code == 200
|
||||
|
||||
def test_submit_financial_form_with_invalid_task_order(self, monkeypatch, user_session, client):
|
||||
def test_submit_financial_form_with_invalid_task_order(
|
||||
self, monkeypatch, user_session, client
|
||||
):
|
||||
monkeypatch.setattr("atst.domain.requests.Requests.get", lambda i: MOCK_REQUEST)
|
||||
user_session()
|
||||
|
||||
data = dict(self.required_data)
|
||||
data['pe_id'] = MOCK_REQUEST.body['financial_verification']['pe_id']
|
||||
data['task_order_number'] = '1234'
|
||||
data["pe_id"] = MOCK_REQUEST.body["financial_verification"]["pe_id"]
|
||||
data["task_order_number"] = "1234"
|
||||
|
||||
response = self.submit_data(client, data)
|
||||
|
||||
assert "enter TO information manually" in response.data.decode()
|
||||
|
||||
def test_submit_financial_form_with_valid_task_order(self, monkeypatch, user_session, client):
|
||||
def test_submit_financial_form_with_valid_task_order(
|
||||
self, monkeypatch, user_session, client
|
||||
):
|
||||
monkeypatch.setattr("atst.domain.requests.Requests.get", lambda i: MOCK_REQUEST)
|
||||
user_session()
|
||||
|
||||
data = dict(self.required_data)
|
||||
data['pe_id'] = MOCK_REQUEST.body['financial_verification']['pe_id']
|
||||
data['task_order_number'] = MockEDAClient.MOCK_CONTRACT_NUMBER
|
||||
data["pe_id"] = MOCK_REQUEST.body["financial_verification"]["pe_id"]
|
||||
data["task_order_number"] = MockEDAClient.MOCK_CONTRACT_NUMBER
|
||||
|
||||
response = self.submit_data(client, data)
|
||||
|
||||
@ -122,9 +130,9 @@ class TestPENumberInForm:
|
||||
monkeypatch.setattr("atst.domain.requests.Requests.get", lambda i: MOCK_REQUEST)
|
||||
user_session()
|
||||
|
||||
data = { **self.required_data, **self.extended_data }
|
||||
data['pe_id'] = MOCK_REQUEST.body['financial_verification']['pe_id']
|
||||
data['task_order_number'] = "1234567"
|
||||
data = {**self.required_data, **self.extended_data}
|
||||
data["pe_id"] = MOCK_REQUEST.body["financial_verification"]["pe_id"]
|
||||
data["task_order_number"] = "1234567"
|
||||
|
||||
response = self.submit_data(client, data, extended=True)
|
||||
|
||||
|
@ -8,6 +8,7 @@ from tests.assert_util import dict_contains
|
||||
|
||||
ERROR_CLASS = "alert--error"
|
||||
|
||||
|
||||
def test_submit_invalid_request_form(monkeypatch, client, user_session):
|
||||
user_session()
|
||||
response = client.post(
|
||||
@ -35,7 +36,9 @@ def test_owner_can_view_request(client, user_session):
|
||||
user_session(user)
|
||||
request = RequestFactory.create(creator=user)
|
||||
|
||||
response = client.get("/requests/new/1/{}".format(request.id), follow_redirects=True)
|
||||
response = client.get(
|
||||
"/requests/new/1/{}".format(request.id), follow_redirects=True
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
@ -45,7 +48,9 @@ def test_non_owner_cannot_view_request(client, user_session):
|
||||
user_session(user)
|
||||
request = RequestFactory.create()
|
||||
|
||||
response = client.get("/requests/new/1/{}".format(request.id), follow_redirects=True)
|
||||
response = client.get(
|
||||
"/requests/new/1/{}".format(request.id), follow_redirects=True
|
||||
)
|
||||
|
||||
assert response.status_code == 404
|
||||
|
||||
@ -56,7 +61,9 @@ def test_ccpo_can_view_request(client, user_session):
|
||||
user_session(user)
|
||||
request = RequestFactory.create()
|
||||
|
||||
response = client.get("/requests/new/1/{}".format(request.id), follow_redirects=True)
|
||||
response = client.get(
|
||||
"/requests/new/1/{}".format(request.id), follow_redirects=True
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
@ -80,7 +87,9 @@ def test_creator_info_is_autopopulated(monkeypatch, client, user_session):
|
||||
assert "initial-value='{}'".format(user.email) in body
|
||||
|
||||
|
||||
def test_creator_info_is_autopopulated_for_new_request(monkeypatch, client, user_session):
|
||||
def test_creator_info_is_autopopulated_for_new_request(
|
||||
monkeypatch, client, user_session
|
||||
):
|
||||
user = UserFactory.create()
|
||||
user_session(user)
|
||||
|
||||
@ -103,6 +112,7 @@ def test_non_creator_info_is_not_autopopulated(monkeypatch, client, user_session
|
||||
assert not user.last_name in body
|
||||
assert not user.email in body
|
||||
|
||||
|
||||
def test_am_poc_causes_poc_to_be_autopopulated(client, user_session):
|
||||
creator = UserFactory.create()
|
||||
user_session(creator)
|
||||
@ -124,7 +134,7 @@ def test_not_am_poc_requires_poc_info_to_be_completed(client, user_session):
|
||||
"/requests/new/3/{}".format(request.id),
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
data="am_poc=no",
|
||||
follow_redirects=True
|
||||
follow_redirects=True,
|
||||
)
|
||||
assert ERROR_CLASS in response.data.decode()
|
||||
|
||||
@ -156,7 +166,7 @@ def test_poc_details_can_be_autopopulated_on_new_request(client, user_session):
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
data="am_poc=yes",
|
||||
)
|
||||
request_id = response.headers["Location"].split('/')[-1]
|
||||
request_id = response.headers["Location"].split("/")[-1]
|
||||
request = Requests.get(request_id)
|
||||
|
||||
assert request.body["primary_poc"]["dodid_poc"] == creator.dod_id
|
||||
@ -165,27 +175,31 @@ def test_poc_details_can_be_autopopulated_on_new_request(client, user_session):
|
||||
def test_poc_autofill_checks_information_about_you_form_first(client, user_session):
|
||||
creator = UserFactory.create()
|
||||
user_session(creator)
|
||||
request = RequestFactory.create(creator=creator, body={
|
||||
"information_about_you": {
|
||||
"fname_request": "Alice",
|
||||
"lname_request": "Adams",
|
||||
"email_request": "alice.adams@mail.mil"
|
||||
}
|
||||
})
|
||||
poc_input = {
|
||||
"am_poc": "yes",
|
||||
}
|
||||
request = RequestFactory.create(
|
||||
creator=creator,
|
||||
body={
|
||||
"information_about_you": {
|
||||
"fname_request": "Alice",
|
||||
"lname_request": "Adams",
|
||||
"email_request": "alice.adams@mail.mil",
|
||||
}
|
||||
},
|
||||
)
|
||||
poc_input = {"am_poc": "yes"}
|
||||
client.post(
|
||||
"/requests/new/3/{}".format(request.id),
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
data=urlencode(poc_input),
|
||||
)
|
||||
request = Requests.get(request.id)
|
||||
assert dict_contains(request.body["primary_poc"], {
|
||||
"fname_poc": "Alice",
|
||||
"lname_poc": "Adams",
|
||||
"email_poc": "alice.adams@mail.mil"
|
||||
})
|
||||
assert dict_contains(
|
||||
request.body["primary_poc"],
|
||||
{
|
||||
"fname_poc": "Alice",
|
||||
"lname_poc": "Adams",
|
||||
"email_poc": "alice.adams@mail.mil",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def test_can_review_data(user_session, client):
|
||||
|
@ -28,11 +28,16 @@ def test_submit_autoapproved_reviewed_request(monkeypatch, client, user_session)
|
||||
user_session()
|
||||
monkeypatch.setattr("atst.domain.requests.Requests.get", _mock_func)
|
||||
monkeypatch.setattr("atst.domain.requests.Requests.submit", _mock_func)
|
||||
monkeypatch.setattr("atst.models.request.Request.status", RequestStatus.PENDING_FINANCIAL_VERIFICATION)
|
||||
monkeypatch.setattr(
|
||||
"atst.models.request.Request.status",
|
||||
RequestStatus.PENDING_FINANCIAL_VERIFICATION,
|
||||
)
|
||||
response = client.post(
|
||||
"/requests/submit/1",
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
data="",
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert "/requests?modal=pendingFinancialVerification" in response.headers["Location"]
|
||||
assert (
|
||||
"/requests?modal=pendingFinancialVerification" in response.headers["Location"]
|
||||
)
|
||||
|
@ -15,8 +15,13 @@ def _fetch_user_info(c, t):
|
||||
|
||||
|
||||
def test_successful_login_redirect_non_ccpo(client, monkeypatch):
|
||||
monkeypatch.setattr("atst.domain.authnid.AuthenticationContext.authenticate", lambda *args: True)
|
||||
monkeypatch.setattr("atst.domain.authnid.AuthenticationContext.get_user", lambda *args: UserFactory.create())
|
||||
monkeypatch.setattr(
|
||||
"atst.domain.authnid.AuthenticationContext.authenticate", lambda *args: True
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"atst.domain.authnid.AuthenticationContext.get_user",
|
||||
lambda *args: UserFactory.create(),
|
||||
)
|
||||
|
||||
resp = client.get(
|
||||
"/login-redirect",
|
||||
@ -31,10 +36,16 @@ def test_successful_login_redirect_non_ccpo(client, monkeypatch):
|
||||
assert "requests" in resp.headers["Location"]
|
||||
assert session["user_id"]
|
||||
|
||||
|
||||
def test_successful_login_redirect_ccpo(client, monkeypatch):
|
||||
monkeypatch.setattr("atst.domain.authnid.AuthenticationContext.authenticate", lambda *args: True)
|
||||
monkeypatch.setattr(
|
||||
"atst.domain.authnid.AuthenticationContext.authenticate", lambda *args: True
|
||||
)
|
||||
role = Roles.get("ccpo")
|
||||
monkeypatch.setattr("atst.domain.authnid.AuthenticationContext.get_user", lambda *args: UserFactory.create(atat_role=role))
|
||||
monkeypatch.setattr(
|
||||
"atst.domain.authnid.AuthenticationContext.get_user",
|
||||
lambda *args: UserFactory.create(atat_role=role),
|
||||
)
|
||||
|
||||
resp = client.get(
|
||||
"/login-redirect",
|
||||
@ -114,7 +125,9 @@ def test_crl_validation_on_login(client):
|
||||
|
||||
|
||||
def test_creates_new_user_on_login(monkeypatch, client):
|
||||
monkeypatch.setattr("atst.domain.authnid.AuthenticationContext.authenticate", lambda *args: True)
|
||||
monkeypatch.setattr(
|
||||
"atst.domain.authnid.AuthenticationContext.authenticate", lambda *args: True
|
||||
)
|
||||
cert_file = open("tests/fixtures/{}.crt".format(FIXTURE_EMAIL_ADDRESS)).read()
|
||||
|
||||
# ensure user does not exist
|
||||
|
@ -3,15 +3,18 @@ from atst.eda_client import MockEDAClient
|
||||
|
||||
client = MockEDAClient()
|
||||
|
||||
|
||||
def test_list_contracts():
|
||||
results = client.list_contracts()
|
||||
assert len(results) == 3
|
||||
|
||||
|
||||
def test_get_contract():
|
||||
result = client.get_contract("DCA10096D0052", "y")
|
||||
assert result["contract_no"] == "DCA10096D0052"
|
||||
assert result["amount"] == 2000000
|
||||
|
||||
|
||||
def test_contract_not_found():
|
||||
result = client.get_contract("abc", "y")
|
||||
assert result is None
|
||||
|
@ -3,13 +3,15 @@ import pytest
|
||||
from atst.filters import dollars
|
||||
|
||||
|
||||
@pytest.mark.parametrize("input,expected", [
|
||||
('0', '$0'),
|
||||
('123.00', '$123'),
|
||||
('1234567', '$1,234,567'),
|
||||
('-1234', '$-1,234'),
|
||||
('one', '$0'),
|
||||
])
|
||||
@pytest.mark.parametrize(
|
||||
"input,expected",
|
||||
[
|
||||
("0", "$0"),
|
||||
("123.00", "$123"),
|
||||
("1234567", "$1,234,567"),
|
||||
("-1234", "$-1,234"),
|
||||
("one", "$0"),
|
||||
],
|
||||
)
|
||||
def test_dollar_fomatter(input, expected):
|
||||
assert dollars(input) == expected
|
||||
|
||||
|
@ -1,6 +1,9 @@
|
||||
import pytest
|
||||
|
||||
@pytest.mark.parametrize("path", (
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"path",
|
||||
(
|
||||
"/",
|
||||
"/home",
|
||||
"/workspaces",
|
||||
@ -9,7 +12,8 @@ import pytest
|
||||
"/users",
|
||||
"/reports",
|
||||
"/calculator",
|
||||
))
|
||||
),
|
||||
)
|
||||
def test_routes(path, client, user_session):
|
||||
user_session()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user