record portfolio_role permission_set changes with bulk_replace event
This commit is contained in:
		| @@ -11,13 +11,14 @@ ACTION_DELETE = "delete" | ||||
|  | ||||
| class AuditableMixin(object): | ||||
|     @staticmethod | ||||
|     def create_audit_event(connection, resource, action): | ||||
|     def create_audit_event(connection, resource, action, changed_state=None): | ||||
|         user_id = getattr_path(g, "current_user.id") | ||||
|         portfolio_id = resource.portfolio_id | ||||
|         resource_type = resource.resource_type | ||||
|         display_name = resource.displayname | ||||
|         event_details = resource.event_details | ||||
|  | ||||
|         if changed_state is None: | ||||
|             changed_state = resource.history if action == ACTION_UPDATE else None | ||||
|  | ||||
|         audit_event = AuditEvent( | ||||
|   | ||||
| @@ -1,5 +1,5 @@ | ||||
| from enum import Enum | ||||
| from sqlalchemy import Index, ForeignKey, Column, Enum as SQLAEnum, Table | ||||
| from sqlalchemy import Index, ForeignKey, Column, Enum as SQLAEnum, Table, event | ||||
| from sqlalchemy.dialects.postgresql import UUID | ||||
| from sqlalchemy.orm import relationship | ||||
|  | ||||
| @@ -11,6 +11,7 @@ from atst.utils import first_or_none | ||||
| from atst.models.environment_role import EnvironmentRole | ||||
| from atst.models.application import Application | ||||
| from atst.models.environment import Environment | ||||
| from atst.models.mixins.auditable import ACTION_UPDATE as AUDIT_ACTION_UPDATE | ||||
|  | ||||
|  | ||||
| MEMBER_STATUSES = { | ||||
| @@ -68,7 +69,6 @@ class PortfolioRole( | ||||
|     def history(self): | ||||
|         previous_state = self.get_changes() | ||||
|         change_set = {} | ||||
|         # TODO: need to update to include permission_sets | ||||
|         if "status" in previous_state: | ||||
|             from_status = previous_state["status"][0].value | ||||
|             to_status = self.status.value | ||||
| @@ -166,3 +166,19 @@ Index( | ||||
|     PortfolioRole.portfolio_id, | ||||
|     unique=True, | ||||
| ) | ||||
|  | ||||
|  | ||||
| @event.listens_for(PortfolioRole.permission_sets, "bulk_replace", raw=True) | ||||
| def record_permission_sets_updates(instance_state, permission_sets, initiator): | ||||
|     old_perm_sets = instance_state.attrs.get("permission_sets").value | ||||
|     if instance_state.persistent and old_perm_sets != permission_sets: | ||||
|         connection = instance_state.session.connection() | ||||
|         old_state = [p.name for p in old_perm_sets] | ||||
|         new_state = [p.name for p in permission_sets] | ||||
|         changed_state = {"permission_sets": (old_state, new_state)} | ||||
|         instance_state.object.create_audit_event( | ||||
|             connection, | ||||
|             instance_state.object, | ||||
|             AUDIT_ACTION_UPDATE, | ||||
|             changed_state=changed_state, | ||||
|         ) | ||||
|   | ||||
| @@ -3,6 +3,7 @@ import datetime | ||||
|  | ||||
| from atst.domain.environments import Environments | ||||
| from atst.domain.portfolios import Portfolios | ||||
| from atst.domain.portfolio_roles import PortfolioRoles | ||||
| from atst.domain.applications import Applications | ||||
| from atst.domain.permission_sets import PermissionSets | ||||
| from atst.models.portfolio_role import Status | ||||
| @@ -38,28 +39,31 @@ def test_has_no_portfolio_role_history(session): | ||||
|     assert not create_event.changed_state | ||||
|  | ||||
|  | ||||
| @pytest.mark.skip(reason="need to update audit log permission set handling") | ||||
| def test_has_portfolio_role_history(session): | ||||
|     owner = UserFactory.create() | ||||
|     user = UserFactory.create() | ||||
|  | ||||
|     portfolio = PortfolioFactory.create(owner=owner) | ||||
|     role = session.query(Role).filter(Role.name == "developer").one() | ||||
|     # in order to get the history, we don't want the PortfolioRoleFactory | ||||
|     #  to commit after create() | ||||
|     PortfolioRoleFactory._meta.sqlalchemy_session_persistence = "flush" | ||||
|     portfolio_role = PortfolioRoleFactory.create(portfolio=portfolio, user=user) | ||||
|     PortfolioRoles.update_role(portfolio_role, "admin") | ||||
|     changed_events = ( | ||||
|     # PortfolioRoleFactory._meta.sqlalchemy_session_persistence = "flush" | ||||
|     portfolio_role = PortfolioRoleFactory.create( | ||||
|         portfolio=portfolio, user=user, permission_sets=[] | ||||
|     ) | ||||
|     PortfolioRoles.update( | ||||
|         portfolio_role, PortfolioRoles.DEFAULT_PORTFOLIO_PERMISSION_SETS | ||||
|     ) | ||||
|  | ||||
|     changed_event = ( | ||||
|         session.query(AuditEvent) | ||||
|         .filter( | ||||
|             AuditEvent.resource_id == portfolio_role.id, AuditEvent.action == "update" | ||||
|         ) | ||||
|         .all() | ||||
|         .one() | ||||
|     ) | ||||
|     # changed_state["role"] returns a list [previous role, current role] | ||||
|     assert changed_events[0].changed_state["role"][0] == "developer" | ||||
|     assert changed_events[0].changed_state["role"][1] == "admin" | ||||
|     old_state, new_state = changed_event.changed_state["permission_sets"] | ||||
|     assert old_state == [] | ||||
|     assert set(new_state) == PortfolioRoles.DEFAULT_PORTFOLIO_PERMISSION_SETS | ||||
|  | ||||
|  | ||||
| def test_has_portfolio_status_history(session): | ||||
|   | ||||
		Reference in New Issue
	
	Block a user