Merge branch 'master' into request-creator-name

This commit is contained in:
richard-dds 2018-08-08 15:28:36 -04:00 committed by GitHub
commit 47957384fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 145 additions and 29 deletions

View File

@ -40,7 +40,7 @@ def upgrade():
WHERE id = :id"""
)
db.execute(query, id=status_event["id"], status=status.name)
except ValueError:
except KeyError:
query = sa.text("DELETE FROM request_status_events WHERE id = :id")
db.execute(query, id=status_event["id"])

View File

@ -15,6 +15,7 @@ from atst.routes import bp
from atst.routes.workspaces import bp as workspace_routes
from atst.routes.requests import requests_bp
from atst.routes.dev import bp as dev_routes
from atst.routes.errors import make_error_pages
from atst.domain.authnid.crl.validator import Validator
from atst.domain.auth import apply_authentication
@ -45,6 +46,7 @@ def make_app(config):
Session(app)
assets_environment.init_app(app)
make_error_pages(app)
app.register_blueprint(bp)
app.register_blueprint(workspace_routes)
app.register_blueprint(requests_bp)

View File

@ -14,3 +14,17 @@ class AlreadyExistsError(Exception):
@property
def message(self):
return "{} already exists".format(self.resource_name)
class UnauthorizedError(Exception):
def __init__(self, user, action):
self.user = user
self.action = action
@property
def message(self):
return "User {} not authorized to {}".format(self.user.id, self.action)
class UnauthenticatedError(Exception):
pass

View File

@ -1,4 +1,4 @@
from sqlalchemy import exists, and_
from sqlalchemy import exists, and_, exc
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm.attributes import flag_modified
@ -42,11 +42,14 @@ class Requests(object):
@classmethod
def exists(cls, request_id, creator_id):
try:
return db.session.query(
exists().where(
and_(Request.id == request_id, Request.creator == creator_id)
)
).scalar()
except exc.DataError:
return False
@classmethod
def get(cls, request_id):

View File

@ -5,6 +5,7 @@ import pendulum
from atst.domain.requests import Requests
from atst.domain.users import Users
from atst.domain.authnid.utils import parse_sdn
from atst.domain.exceptions import UnauthenticatedError
bp = Blueprint("atst", __name__)
@ -29,6 +30,9 @@ def catch_all(path):
return render_template("{}.html".format(path))
# TODO: this should be partly consolidated into a domain function that takes
# all the necessary UWSGI environment values as args and either returns a user
# or raises the UnauthenticatedError
@bp.route('/login-redirect')
def login_redirect():
if request.environ.get('HTTP_X_SSL_CLIENT_VERIFY') == 'SUCCESS' and _is_valid_certificate(request):
@ -39,15 +43,7 @@ def login_redirect():
return redirect(url_for("atst.home"))
else:
return redirect(url_for("atst.unauthorized"))
@bp.route("/unauthorized")
def unauthorized():
template = render_template('unauthorized.html')
response = app.make_response(template)
response.status_code = 401
return response
raise UnauthenticatedError()
def _is_valid_certificate(request):

19
atst/routes/errors.py Normal file
View File

@ -0,0 +1,19 @@
from flask import render_template
import atst.domain.exceptions as exceptions
def make_error_pages(app):
@app.errorhandler(exceptions.NotFoundError)
@app.errorhandler(exceptions.UnauthorizedError)
# pylint: disable=unused-variable
def not_found(e):
return render_template("not_found.html"), 404
@app.errorhandler(exceptions.UnauthenticatedError)
# pylint: disable=unused-variable
def unauthorized(e):
return render_template('unauthorized.html'), 401
return app

View File

@ -3,6 +3,8 @@ from flask import g, redirect, render_template, url_for, request as http_request
from . import requests_bp
from atst.domain.requests import Requests
from atst.routes.requests.jedi_request_flow import JEDIRequestFlow
from atst.models.permissions import Permissions
from atst.domain.exceptions import UnauthorizedError
@requests_bp.route("/requests/new/<int:screen>", methods=["GET"])
@ -25,6 +27,9 @@ def requests_form_new(screen):
)
@requests_bp.route("/requests/new/<int:screen>/<string:request_id>", methods=["GET"])
def requests_form_update(screen=1, request_id=None):
if request_id:
_check_can_view_request(request_id)
request = Requests.get(request_id) if request_id is not None else None
jedi_flow = JEDIRequestFlow(screen, request, request_id=request_id)
@ -79,10 +84,12 @@ def requests_update(screen=1, request_id=None):
request_id=jedi_flow.request_id,
)
return redirect(where)
else:
return render_template(
"requests/screen-%d.html" % int(screen), **rerender_args
)
else:
return render_template("requests/screen-%d.html" % int(screen), **rerender_args)
@ -94,5 +101,18 @@ def requests_submit(request_id=None):
if request.status == "approved":
return redirect("/requests?modal=True")
else:
return redirect("/requests")
# TODO: generalize this, along with other authorizations, into a policy-pattern
# for authorization in the application
def _check_can_view_request(request_id):
if Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST in g.current_user.atat_permissions:
pass
elif Requests.exists(request_id, g.current_user.id):
pass
else:
raise UnauthorizedError(g.current_user, "view request {}".format(request_id))

12
templates/not_found.html Normal file
View File

@ -0,0 +1,12 @@
{% extends "error_base.html" %}
{% block content %}
<main class="usa-section usa-content">
<h1>Not Found</h1>
</main>
{% endblock %}

View File

@ -48,3 +48,11 @@ def test_dont_auto_approve_if_no_dollar_value_specified(new_request):
request = Requests.submit(new_request)
assert request.status == RequestStatus.PENDING_CCPO_APPROVAL
def test_exists(session):
user_allowed = UserFactory.create()
user_denied = UserFactory.create()
request = RequestFactory.create(creator=user_allowed.id)
assert Requests.exists(request.id, user_allowed.id)
assert not Requests.exists(request.id, user_denied.id)

View File

@ -1,3 +1,5 @@
import random
import string
import factory
from uuid import uuid4
@ -7,6 +9,8 @@ from atst.models.pe_number import PENumber
from atst.models.task_order import TaskOrder
from atst.models.user import User
from atst.models.role import Role
from atst.models.request_status_event import RequestStatusEvent
from atst.domain.roles import Roles
@ -22,16 +26,20 @@ class UserFactory(factory.alchemy.SQLAlchemyModelFactory):
model = User
id = factory.Sequence(lambda x: uuid4())
email = "fake.user@mail.com"
first_name = "Fake"
last_name = "User"
email = factory.Faker("email")
first_name = factory.Faker("first_name")
last_name = factory.Faker("last_name")
atat_role = factory.SubFactory(RoleFactory)
dod_id = factory.LazyFunction(lambda: "".join(random.choices(string.digits, k=10)))
class RequestStatusFactory(factory.alchemy.SQLAlchemyModelFactory):
class RequestStatusEventFactory(factory.alchemy.SQLAlchemyModelFactory):
class Meta:
model = RequestStatusEvent
id = factory.Sequence(lambda x: uuid4())
class RequestFactory(factory.alchemy.SQLAlchemyModelFactory):
class Meta:
@ -84,7 +92,6 @@ class RequestFactory(factory.alchemy.SQLAlchemyModelFactory):
}
class PENumberFactory(factory.alchemy.SQLAlchemyModelFactory):
class Meta:
model = PENumber
@ -93,4 +100,3 @@ class PENumberFactory(factory.alchemy.SQLAlchemyModelFactory):
class TaskOrderFactory(factory.alchemy.SQLAlchemyModelFactory):
class Meta:
model = TaskOrder

View File

@ -2,7 +2,8 @@ import re
import pytest
import urllib
from tests.mocks import MOCK_USER, MOCK_REQUEST
from tests.factories import RequestFactory
from tests.factories import RequestFactory, UserFactory
from atst.domain.roles import Roles
ERROR_CLASS = "alert--error"
@ -27,3 +28,41 @@ def test_submit_valid_request_form(monkeypatch, client, user_session):
data="meaning=42",
)
assert "/requests/new/2" in response.headers.get("Location")
def test_owner_can_view_request(client, user_session):
user = UserFactory.create()
user_session(user)
request = RequestFactory.create(creator=user.id)
response = client.get("/requests/new/1/{}".format(request.id), follow_redirects=True)
assert response.status_code == 200
def test_non_owner_cannot_view_request(client, user_session):
user = UserFactory.create()
user_session(user)
request = RequestFactory.create()
response = client.get("/requests/new/1/{}".format(request.id), follow_redirects=True)
assert response.status_code == 404
def test_ccpo_can_view_request(client, user_session):
ccpo = Roles.get("ccpo")
user = UserFactory.create(atat_role=ccpo)
user_session(user)
request = RequestFactory.create()
response = client.get("/requests/new/1/{}".format(request.id), follow_redirects=True)
assert response.status_code == 200
def test_nonexistent_request(client, user_session):
user_session()
response = client.get("/requests/new/1/foo", follow_redirects=True)
assert response.status_code == 404

View File

@ -27,8 +27,7 @@ def test_successful_login_redirect(client, monkeypatch):
def test_unsuccessful_login_redirect(client, monkeypatch):
resp = client.get("/login-redirect")
assert resp.status_code == 302
assert "unauthorized" in resp.headers["Location"]
assert resp.status_code == 401
assert "user_id" not in session
@ -55,7 +54,6 @@ def test_routes_are_protected(client, app):
UNPROTECTED_ROUTES = ["/", "/login-dev", "/login-redirect", "/unauthorized"]
# this implicitly relies on the test config and test CRL in tests/fixtures/crl
@ -72,8 +70,7 @@ def test_crl_validation_on_login(client):
"HTTP_X_SSL_CLIENT_CERT": bad_cert.decode(),
},
)
assert resp.status_code == 302
assert "unauthorized" in resp.headers["Location"]
assert resp.status_code == 401
assert "user_id" not in session
# good cert is not on the test CRL, passes