Merge pull request #744 from dod-ccpo/record-portfolio-permission-changes

record portfolio_role permission_set changes with bulk_replace event
This commit is contained in:
dandds 2019-04-08 11:03:36 -04:00 committed by GitHub
commit 80254c6b8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 38 additions and 14 deletions

View File

@ -11,14 +11,15 @@ ACTION_DELETE = "delete"
class AuditableMixin(object):
@staticmethod
def create_audit_event(connection, resource, action):
def create_audit_event(connection, resource, action, changed_state=None):
user_id = getattr_path(g, "current_user.id")
portfolio_id = resource.portfolio_id
resource_type = resource.resource_type
display_name = resource.displayname
event_details = resource.event_details
changed_state = resource.history if action == ACTION_UPDATE else None
if changed_state is None:
changed_state = resource.history if action == ACTION_UPDATE else None
audit_event = AuditEvent(
user_id=user_id,

View File

@ -1,5 +1,5 @@
from enum import Enum
from sqlalchemy import Index, ForeignKey, Column, Enum as SQLAEnum, Table
from sqlalchemy import Index, ForeignKey, Column, Enum as SQLAEnum, Table, event
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
@ -11,6 +11,7 @@ from atst.utils import first_or_none
from atst.models.environment_role import EnvironmentRole
from atst.models.application import Application
from atst.models.environment import Environment
from atst.models.mixins.auditable import ACTION_UPDATE as AUDIT_ACTION_UPDATE
MEMBER_STATUSES = {
@ -68,7 +69,6 @@ class PortfolioRole(
def history(self):
previous_state = self.get_changes()
change_set = {}
# TODO: need to update to include permission_sets
if "status" in previous_state:
from_status = previous_state["status"][0].value
to_status = self.status.value
@ -166,3 +166,19 @@ Index(
PortfolioRole.portfolio_id,
unique=True,
)
@event.listens_for(PortfolioRole.permission_sets, "bulk_replace", raw=True)
def record_permission_sets_updates(instance_state, permission_sets, initiator):
old_perm_sets = instance_state.attrs.get("permission_sets").value
if instance_state.persistent and old_perm_sets != permission_sets:
connection = instance_state.session.connection()
old_state = [p.name for p in old_perm_sets]
new_state = [p.name for p in permission_sets]
changed_state = {"permission_sets": (old_state, new_state)}
instance_state.object.create_audit_event(
connection,
instance_state.object,
AUDIT_ACTION_UPDATE,
changed_state=changed_state,
)

View File

@ -36,6 +36,7 @@ PORTFOLIO_USERS = [
"email": "knight@mil.gov",
"portfolio_role": "developer",
"dod_id": "0000000001",
"permission_sets": PortfolioRoles.DEFAULT_PORTFOLIO_PERMISSION_SETS,
},
{
"first_name": "Mario",
@ -43,6 +44,7 @@ PORTFOLIO_USERS = [
"email": "hudson@mil.gov",
"portfolio_role": "billing_auditor",
"dod_id": "0000000002",
"permission_sets": PortfolioRoles.DEFAULT_PORTFOLIO_PERMISSION_SETS,
},
{
"first_name": "Louise",
@ -50,6 +52,7 @@ PORTFOLIO_USERS = [
"email": "greer@mil.gov",
"portfolio_role": "admin",
"dod_id": "0000000003",
"permission_sets": PortfolioRoles.DEFAULT_PORTFOLIO_PERMISSION_SETS,
},
]

View File

@ -3,6 +3,7 @@ import datetime
from atst.domain.environments import Environments
from atst.domain.portfolios import Portfolios
from atst.domain.portfolio_roles import PortfolioRoles
from atst.domain.applications import Applications
from atst.domain.permission_sets import PermissionSets
from atst.models.portfolio_role import Status
@ -38,28 +39,31 @@ def test_has_no_portfolio_role_history(session):
assert not create_event.changed_state
@pytest.mark.skip(reason="need to update audit log permission set handling")
def test_has_portfolio_role_history(session):
owner = UserFactory.create()
user = UserFactory.create()
portfolio = PortfolioFactory.create(owner=owner)
role = session.query(Role).filter(Role.name == "developer").one()
# in order to get the history, we don't want the PortfolioRoleFactory
# to commit after create()
PortfolioRoleFactory._meta.sqlalchemy_session_persistence = "flush"
portfolio_role = PortfolioRoleFactory.create(portfolio=portfolio, user=user)
PortfolioRoles.update_role(portfolio_role, "admin")
changed_events = (
# PortfolioRoleFactory._meta.sqlalchemy_session_persistence = "flush"
portfolio_role = PortfolioRoleFactory.create(
portfolio=portfolio, user=user, permission_sets=[]
)
PortfolioRoles.update(
portfolio_role, PortfolioRoles.DEFAULT_PORTFOLIO_PERMISSION_SETS
)
changed_event = (
session.query(AuditEvent)
.filter(
AuditEvent.resource_id == portfolio_role.id, AuditEvent.action == "update"
)
.all()
.one()
)
# changed_state["role"] returns a list [previous role, current role]
assert changed_events[0].changed_state["role"][0] == "developer"
assert changed_events[0].changed_state["role"][1] == "admin"
old_state, new_state = changed_event.changed_state["permission_sets"]
assert old_state == []
assert set(new_state) == PortfolioRoles.DEFAULT_PORTFOLIO_PERMISSION_SETS
def test_has_portfolio_status_history(session):