diff --git a/atst/models/mixins/auditable.py b/atst/models/mixins/auditable.py index d8133afe..f0487dad 100644 --- a/atst/models/mixins/auditable.py +++ b/atst/models/mixins/auditable.py @@ -11,14 +11,15 @@ ACTION_DELETE = "delete" class AuditableMixin(object): @staticmethod - def create_audit_event(connection, resource, action): + def create_audit_event(connection, resource, action, changed_state=None): user_id = getattr_path(g, "current_user.id") portfolio_id = resource.portfolio_id resource_type = resource.resource_type display_name = resource.displayname event_details = resource.event_details - changed_state = resource.history if action == ACTION_UPDATE else None + if changed_state is None: + changed_state = resource.history if action == ACTION_UPDATE else None audit_event = AuditEvent( user_id=user_id, diff --git a/atst/models/portfolio_role.py b/atst/models/portfolio_role.py index 73949ce4..c7eb7f2d 100644 --- a/atst/models/portfolio_role.py +++ b/atst/models/portfolio_role.py @@ -1,5 +1,5 @@ from enum import Enum -from sqlalchemy import Index, ForeignKey, Column, Enum as SQLAEnum, Table +from sqlalchemy import Index, ForeignKey, Column, Enum as SQLAEnum, Table, event from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.orm import relationship @@ -11,6 +11,7 @@ from atst.utils import first_or_none from atst.models.environment_role import EnvironmentRole from atst.models.application import Application from atst.models.environment import Environment +from atst.models.mixins.auditable import ACTION_UPDATE as AUDIT_ACTION_UPDATE MEMBER_STATUSES = { @@ -68,7 +69,6 @@ class PortfolioRole( def history(self): previous_state = self.get_changes() change_set = {} - # TODO: need to update to include permission_sets if "status" in previous_state: from_status = previous_state["status"][0].value to_status = self.status.value @@ -166,3 +166,19 @@ Index( PortfolioRole.portfolio_id, unique=True, ) + + +@event.listens_for(PortfolioRole.permission_sets, "bulk_replace", raw=True) +def record_permission_sets_updates(instance_state, permission_sets, initiator): + old_perm_sets = instance_state.attrs.get("permission_sets").value + if instance_state.persistent and old_perm_sets != permission_sets: + connection = instance_state.session.connection() + old_state = [p.name for p in old_perm_sets] + new_state = [p.name for p in permission_sets] + changed_state = {"permission_sets": (old_state, new_state)} + instance_state.object.create_audit_event( + connection, + instance_state.object, + AUDIT_ACTION_UPDATE, + changed_state=changed_state, + ) diff --git a/tests/models/test_portfolio_role.py b/tests/models/test_portfolio_role.py index 7747a5c6..3d9ac0ec 100644 --- a/tests/models/test_portfolio_role.py +++ b/tests/models/test_portfolio_role.py @@ -3,6 +3,7 @@ import datetime from atst.domain.environments import Environments from atst.domain.portfolios import Portfolios +from atst.domain.portfolio_roles import PortfolioRoles from atst.domain.applications import Applications from atst.domain.permission_sets import PermissionSets from atst.models.portfolio_role import Status @@ -38,28 +39,31 @@ def test_has_no_portfolio_role_history(session): assert not create_event.changed_state -@pytest.mark.skip(reason="need to update audit log permission set handling") def test_has_portfolio_role_history(session): owner = UserFactory.create() user = UserFactory.create() portfolio = PortfolioFactory.create(owner=owner) - role = session.query(Role).filter(Role.name == "developer").one() # in order to get the history, we don't want the PortfolioRoleFactory # to commit after create() - PortfolioRoleFactory._meta.sqlalchemy_session_persistence = "flush" - portfolio_role = PortfolioRoleFactory.create(portfolio=portfolio, user=user) - PortfolioRoles.update_role(portfolio_role, "admin") - changed_events = ( + # PortfolioRoleFactory._meta.sqlalchemy_session_persistence = "flush" + portfolio_role = PortfolioRoleFactory.create( + portfolio=portfolio, user=user, permission_sets=[] + ) + PortfolioRoles.update( + portfolio_role, PortfolioRoles.DEFAULT_PORTFOLIO_PERMISSION_SETS + ) + + changed_event = ( session.query(AuditEvent) .filter( AuditEvent.resource_id == portfolio_role.id, AuditEvent.action == "update" ) - .all() + .one() ) - # changed_state["role"] returns a list [previous role, current role] - assert changed_events[0].changed_state["role"][0] == "developer" - assert changed_events[0].changed_state["role"][1] == "admin" + old_state, new_state = changed_event.changed_state["permission_sets"] + assert old_state == [] + assert set(new_state) == PortfolioRoles.DEFAULT_PORTFOLIO_PERMISSION_SETS def test_has_portfolio_status_history(session):