diff --git a/atst/domain/exceptions.py b/atst/domain/exceptions.py index 802997d4..b92e64c1 100644 --- a/atst/domain/exceptions.py +++ b/atst/domain/exceptions.py @@ -14,3 +14,17 @@ class AlreadyExistsError(Exception): @property def message(self): return "{} already exists".format(self.resource_name) + + +class UnauthorizedError(Exception): + def __init__(self, user, action): + self.user = user + self.action = action + + @property + def message(self): + return "User {} not authorized to {}".format(self.user.id, self.action) + + +class UnauthenticatedError(Exception): + pass diff --git a/atst/routes/__init__.py b/atst/routes/__init__.py index 4e346fa9..965b4b37 100644 --- a/atst/routes/__init__.py +++ b/atst/routes/__init__.py @@ -1,10 +1,11 @@ -from flask import Blueprint, abort, render_template, g, redirect, session, url_for, request +from flask import Blueprint, render_template, g, redirect, session, url_for, request from flask import current_app as app import pendulum from atst.domain.requests import Requests from atst.domain.users import Users from atst.domain.authnid.utils import parse_sdn +from atst.domain.exceptions import UnauthenticatedError bp = Blueprint("atst", __name__) @@ -29,6 +30,9 @@ def catch_all(path): return render_template("{}.html".format(path)) +# TODO: this should be partly consolidated into a domain function that takes +# all the necessary UWSGI environment values as args and either returns a user +# or raises the UnauthenticatedError @bp.route('/login-redirect') def login_redirect(): if request.environ.get('HTTP_X_SSL_CLIENT_VERIFY') == 'SUCCESS' and _is_valid_certificate(request): @@ -39,7 +43,7 @@ def login_redirect(): return redirect(url_for("atst.home")) else: - return abort(401) + raise UnauthenticatedError() def _is_valid_certificate(request): diff --git a/atst/routes/errors.py b/atst/routes/errors.py index 261c654c..0d0211b8 100644 --- a/atst/routes/errors.py +++ b/atst/routes/errors.py @@ -1,13 +1,19 @@ from flask import render_template +import atst.domain.exceptions as exceptions + def make_error_pages(app): - @app.errorhandler(404) + @app.errorhandler(exceptions.NotFoundError) + @app.errorhandler(exceptions.UnauthorizedError) + # pylint: disable=unused-variable def not_found(e): return render_template("not_found.html"), 404 - @app.errorhandler(401) + @app.errorhandler(exceptions.UnauthenticatedError) + # pylint: disable=unused-variable def unauthorized(e): return render_template('unauthorized.html'), 401 + return app diff --git a/atst/routes/requests/requests_form.py b/atst/routes/requests/requests_form.py index 688a4ce0..ee03c155 100644 --- a/atst/routes/requests/requests_form.py +++ b/atst/routes/requests/requests_form.py @@ -1,9 +1,10 @@ -from flask import abort, g, redirect, render_template, url_for, request as http_request +from flask import g, redirect, render_template, url_for, request as http_request from . import requests_bp from atst.domain.requests import Requests from atst.routes.requests.jedi_request_flow import JEDIRequestFlow from atst.models.permissions import Permissions +from atst.domain.exceptions import UnauthorizedError @requests_bp.route("/requests/new/", methods=["GET"]) @@ -26,8 +27,8 @@ def requests_form_new(screen): ) @requests_bp.route("/requests/new//", methods=["GET"]) def requests_form_update(screen=1, request_id=None): - if request_id and not _can_view_request(request_id): - abort(404) + if request_id: + _check_can_view_request(request_id) request = Requests.get(request_id) if request_id is not None else None jedi_flow = JEDIRequestFlow(screen, request, request_id=request_id) @@ -107,8 +108,11 @@ def requests_submit(request_id=None): # TODO: generalize this, along with other authorizations, into a policy-pattern # for authorization in the application -def _can_view_request(request_id): - return ( - Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST in g.current_user.atat_permissions - or Requests.is_creator(request_id, g.current_user.id) - ) +def _check_can_view_request(request_id): + if Permissions.REVIEW_AND_APPROVE_JEDI_WORKSPACE_REQUEST in g.current_user.atat_permissions: + pass + elif Requests.is_creator(request_id, g.current_user.id): + pass + else: + raise UnauthorizedError(g.current_user, "view request {}".format(request_id)) +