Adds an ORM permission set listener for application roles.
Application role changes will be recorded in the audit log. Generalizes pre-existing listener that was in user for portfolio roles.
This commit is contained in:
parent
9c10a14827
commit
b17741acd1
@ -2,8 +2,10 @@ from enum import Enum
|
||||
from sqlalchemy import Index, ForeignKey, Column, Enum as SQLAEnum, Table
|
||||
from sqlalchemy.dialects.postgresql import UUID
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.event import listen
|
||||
|
||||
from atst.models import Base, mixins
|
||||
from atst.models.mixins.auditable import record_permission_sets_updates
|
||||
from .types import Id
|
||||
|
||||
|
||||
@ -56,3 +58,11 @@ Index(
|
||||
ApplicationRole.application_id,
|
||||
unique=True,
|
||||
)
|
||||
|
||||
|
||||
listen(
|
||||
ApplicationRole.permission_sets,
|
||||
"bulk_replace",
|
||||
record_permission_sets_updates,
|
||||
raw=True,
|
||||
)
|
||||
|
@ -98,3 +98,18 @@ class AuditableMixin(object):
|
||||
@property
|
||||
def displayname(self):
|
||||
return None
|
||||
|
||||
|
||||
def record_permission_sets_updates(instance_state, permission_sets, initiator):
|
||||
old_perm_sets = instance_state.attrs.get("permission_sets").value
|
||||
if instance_state.persistent and old_perm_sets != permission_sets:
|
||||
connection = instance_state.session.connection()
|
||||
old_state = [p.name for p in old_perm_sets]
|
||||
new_state = [p.name for p in permission_sets]
|
||||
changed_state = {"permission_sets": (old_state, new_state)}
|
||||
instance_state.object.create_audit_event(
|
||||
connection,
|
||||
instance_state.object,
|
||||
ACTION_UPDATE,
|
||||
changed_state=changed_state,
|
||||
)
|
||||
|
@ -1,7 +1,8 @@
|
||||
from enum import Enum
|
||||
from sqlalchemy import Index, ForeignKey, Column, Enum as SQLAEnum, Table, event
|
||||
from sqlalchemy import Index, ForeignKey, Column, Enum as SQLAEnum, Table
|
||||
from sqlalchemy.dialects.postgresql import UUID
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.event import listen
|
||||
|
||||
from atst.models import Base, mixins
|
||||
from .types import Id
|
||||
@ -11,7 +12,7 @@ from atst.utils import first_or_none
|
||||
from atst.models.environment_role import EnvironmentRole
|
||||
from atst.models.application import Application
|
||||
from atst.models.environment import Environment
|
||||
from atst.models.mixins.auditable import ACTION_UPDATE as AUDIT_ACTION_UPDATE
|
||||
from atst.models.mixins.auditable import record_permission_sets_updates
|
||||
|
||||
|
||||
MEMBER_STATUSES = {
|
||||
@ -168,17 +169,9 @@ Index(
|
||||
)
|
||||
|
||||
|
||||
@event.listens_for(PortfolioRole.permission_sets, "bulk_replace", raw=True)
|
||||
def record_permission_sets_updates(instance_state, permission_sets, initiator):
|
||||
old_perm_sets = instance_state.attrs.get("permission_sets").value
|
||||
if instance_state.persistent and old_perm_sets != permission_sets:
|
||||
connection = instance_state.session.connection()
|
||||
old_state = [p.name for p in old_perm_sets]
|
||||
new_state = [p.name for p in permission_sets]
|
||||
changed_state = {"permission_sets": (old_state, new_state)}
|
||||
instance_state.object.create_audit_event(
|
||||
connection,
|
||||
instance_state.object,
|
||||
AUDIT_ACTION_UPDATE,
|
||||
changed_state=changed_state,
|
||||
)
|
||||
listen(
|
||||
PortfolioRole.permission_sets,
|
||||
"bulk_replace",
|
||||
record_permission_sets_updates,
|
||||
raw=True,
|
||||
)
|
||||
|
40
tests/models/test_application_role.py
Normal file
40
tests/models/test_application_role.py
Normal file
@ -0,0 +1,40 @@
|
||||
from atst.domain.permission_sets import PermissionSets
|
||||
from atst.models.audit_event import AuditEvent
|
||||
|
||||
from tests.factories import PortfolioFactory, UserFactory
|
||||
|
||||
|
||||
def test_has_application_role_history(session):
|
||||
owner = UserFactory.create()
|
||||
user = UserFactory.create()
|
||||
|
||||
PortfolioFactory.create(
|
||||
owner=owner,
|
||||
applications=[
|
||||
{
|
||||
"name": "starkiller",
|
||||
"environments": [
|
||||
{
|
||||
"name": "bridge",
|
||||
"members": [{"user": user, "role_name": "developer"}],
|
||||
}
|
||||
],
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
app_role = user.application_roles[0]
|
||||
app_role.permission_sets = [
|
||||
PermissionSets.get(PermissionSets.EDIT_APPLICATION_TEAM)
|
||||
]
|
||||
session.add(app_role)
|
||||
session.commit()
|
||||
|
||||
changed_event = (
|
||||
session.query(AuditEvent)
|
||||
.filter(AuditEvent.resource_id == app_role.id, AuditEvent.action == "update")
|
||||
.one()
|
||||
)
|
||||
old_state, new_state = changed_event.changed_state["permission_sets"]
|
||||
assert old_state == [PermissionSets.VIEW_APPLICATION]
|
||||
assert new_state == [PermissionSets.EDIT_APPLICATION_TEAM]
|
Loading…
x
Reference in New Issue
Block a user