log access attempts in access decorator
This commit is contained in:
parent
4a0dd2d432
commit
dff72422f0
@ -1,10 +1,11 @@
|
||||
from functools import wraps
|
||||
|
||||
from flask import g
|
||||
from flask import g, current_app as app, request
|
||||
|
||||
from . import user_can_access
|
||||
from atst.domain.portfolios import Portfolios
|
||||
from atst.domain.task_orders import TaskOrders
|
||||
from atst.domain.exceptions import UnauthorizedError
|
||||
|
||||
|
||||
def evaluate_exceptions(user, permission, exceptions, **kwargs):
|
||||
@ -15,28 +16,48 @@ def evaluate_exceptions(user, permission, exceptions, **kwargs):
|
||||
)
|
||||
|
||||
|
||||
def check_access(permission, message, exceptions, *args, **kwargs):
|
||||
access_args = {"message": message}
|
||||
|
||||
if "portfolio_id" in kwargs:
|
||||
access_args["portfolio"] = Portfolios.get(
|
||||
g.current_user, kwargs["portfolio_id"]
|
||||
)
|
||||
elif "task_order_id" in kwargs:
|
||||
task_order = TaskOrders.get(kwargs["task_order_id"])
|
||||
access_args["portfolio"] = task_order.portfolio
|
||||
|
||||
if exceptions and evaluate_exceptions(
|
||||
g.current_user, permission, exceptions, **access_args, **kwargs
|
||||
):
|
||||
return True
|
||||
|
||||
user_can_access(g.current_user, permission, **access_args)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def user_can_access_decorator(permission, message=None, exceptions=None):
|
||||
def decorator(f):
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
access_args = {"message": message}
|
||||
|
||||
if "portfolio_id" in kwargs:
|
||||
access_args["portfolio"] = Portfolios.get(
|
||||
g.current_user, kwargs["portfolio_id"]
|
||||
try:
|
||||
check_access(permission, message, exceptions, *args, **kwargs)
|
||||
app.logger.info(
|
||||
"[access] User {} accessed {}".format(
|
||||
g.current_user.id, g.current_user.dod_id, request.path
|
||||
)
|
||||
)
|
||||
elif "task_order_id" in kwargs:
|
||||
task_order = TaskOrders.get(kwargs["task_order_id"])
|
||||
access_args["portfolio"] = task_order.portfolio
|
||||
|
||||
if exceptions and evaluate_exceptions(
|
||||
g.current_user, permission, exceptions, **access_args, **kwargs
|
||||
):
|
||||
return f(*args, **kwargs)
|
||||
except UnauthorizedError as err:
|
||||
app.logger.warning(
|
||||
"[access] User {} denied access to {}".format(
|
||||
g.current_user.id, g.current_user.dod_id, request.path
|
||||
)
|
||||
)
|
||||
|
||||
user_can_access(g.current_user, permission, **access_args)
|
||||
|
||||
return f(*args, **kwargs)
|
||||
raise (err)
|
||||
|
||||
return decorated_function
|
||||
|
||||
|
@ -14,6 +14,7 @@ from atst.domain.authnid.crl import (
|
||||
)
|
||||
|
||||
from tests.mocks import FIXTURE_EMAIL_ADDRESS, DOD_CN
|
||||
from tests.utils import FakeLogger
|
||||
|
||||
|
||||
class MockX509Store:
|
||||
@ -119,20 +120,6 @@ def test_multistep_certificate_chain():
|
||||
assert cache.crl_check(cert)
|
||||
|
||||
|
||||
class FakeLogger:
|
||||
def __init__(self):
|
||||
self.messages = []
|
||||
|
||||
def info(self, msg):
|
||||
self.messages.append(msg)
|
||||
|
||||
def warning(self, msg):
|
||||
self.messages.append(msg)
|
||||
|
||||
def error(self, msg):
|
||||
self.messages.append(msg)
|
||||
|
||||
|
||||
def test_no_op_crl_cache_logs_common_name():
|
||||
logger = FakeLogger()
|
||||
cert = open("ssl/client-certs/atat.mil.crt", "rb").read()
|
||||
|
@ -12,6 +12,8 @@ from atst.domain.permission_sets import PermissionSets
|
||||
from atst.domain.exceptions import UnauthorizedError
|
||||
from atst.models.permissions import Permissions
|
||||
|
||||
from tests.utils import FakeLogger
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def invalid_user():
|
||||
@ -146,3 +148,41 @@ def test_user_can_access_decorator_exceptions(set_current_user):
|
||||
|
||||
set_current_user(rando_calrissian)
|
||||
assert _edit_portfolio_name(portfolio_id=portfolio.id)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_logger(app):
|
||||
real_logger = app.logger
|
||||
app.logger = FakeLogger()
|
||||
|
||||
yield app.logger
|
||||
|
||||
app.logger = real_logger
|
||||
|
||||
|
||||
def test_user_can_access_decorator_logs_access(
|
||||
set_current_user, monkeypatch, mock_logger
|
||||
):
|
||||
user = UserFactory.create()
|
||||
|
||||
@user_can_access_decorator(Permissions.EDIT_PORTFOLIO_NAME)
|
||||
def _do_something(*args, **kwargs):
|
||||
return True
|
||||
|
||||
set_current_user(user)
|
||||
|
||||
monkeypatch.setattr(
|
||||
"atst.domain.authz.decorator.check_access", lambda *a, **k: True
|
||||
)
|
||||
_do_something()
|
||||
assert len(mock_logger.messages) == 1
|
||||
assert "accessed" in mock_logger.messages[0]
|
||||
|
||||
def _unauthorized(*a, **k):
|
||||
raise UnauthorizedError(user, "do something")
|
||||
|
||||
monkeypatch.setattr("atst.domain.authz.decorator.check_access", _unauthorized)
|
||||
with pytest.raises(UnauthorizedError):
|
||||
_do_something()
|
||||
assert len(mock_logger.messages) == 2
|
||||
assert "denied access" in mock_logger.messages[1]
|
||||
|
@ -14,3 +14,17 @@ def captured_templates(app):
|
||||
yield recorded
|
||||
finally:
|
||||
template_rendered.disconnect(record, app)
|
||||
|
||||
|
||||
class FakeLogger:
|
||||
def __init__(self):
|
||||
self.messages = []
|
||||
|
||||
def info(self, msg):
|
||||
self.messages.append(msg)
|
||||
|
||||
def warning(self, msg):
|
||||
self.messages.append(msg)
|
||||
|
||||
def error(self, msg):
|
||||
self.messages.append(msg)
|
||||
|
Loading…
x
Reference in New Issue
Block a user