Merge pull request #746 from dod-ccpo/application_roles
Application roles
This commit is contained in:
commit
d0bf5745e6
59
alembic/versions/32438a35cfb5_add_application_roles.py
Normal file
59
alembic/versions/32438a35cfb5_add_application_roles.py
Normal file
@ -0,0 +1,59 @@
|
||||
"""add application_roles
|
||||
|
||||
Revision ID: 32438a35cfb5
|
||||
Revises: 49e12ae7c9ca
|
||||
Create Date: 2019-04-04 06:49:57.693753
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '32438a35cfb5'
|
||||
down_revision = '49e12ae7c9ca'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('application_roles',
|
||||
sa.Column('time_created', sa.TIMESTAMP(timezone=True), server_default=sa.text('now()'), nullable=False),
|
||||
sa.Column('time_updated', sa.TIMESTAMP(timezone=True), server_default=sa.text('now()'), nullable=False),
|
||||
sa.Column('id', postgresql.UUID(as_uuid=True), server_default=sa.text('uuid_generate_v4()'), nullable=False),
|
||||
sa.Column('application_id', postgresql.UUID(as_uuid=True), nullable=False),
|
||||
sa.Column('user_id', postgresql.UUID(as_uuid=True), nullable=False),
|
||||
sa.Column('status', sa.Enum('ACTIVE', 'DISABLED', 'PENDING', name='status', native_enum=False), nullable=True),
|
||||
sa.ForeignKeyConstraint(['application_id'], ['applications.id'], ),
|
||||
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
|
||||
sa.PrimaryKeyConstraint('id')
|
||||
)
|
||||
op.create_index('application_role_user_application', 'application_roles', ['user_id', 'application_id'], unique=True)
|
||||
op.create_index(op.f('ix_application_roles_application_id'), 'application_roles', ['application_id'], unique=False)
|
||||
op.create_index(op.f('ix_application_roles_user_id'), 'application_roles', ['user_id'], unique=False)
|
||||
op.create_table('application_roles_permission_sets',
|
||||
sa.Column('application_role_id', postgresql.UUID(as_uuid=True), nullable=True),
|
||||
sa.Column('permission_set_id', postgresql.UUID(as_uuid=True), nullable=True),
|
||||
sa.ForeignKeyConstraint(['application_role_id'], ['application_roles.id'], ),
|
||||
sa.ForeignKeyConstraint(['permission_set_id'], ['permission_sets.id'], )
|
||||
)
|
||||
op.create_index(op.f('ix_permission_sets_name'), 'permission_sets', ['name'], unique=True)
|
||||
op.create_index(op.f('ix_permission_sets_permissions'), 'permission_sets', ['permissions'], unique=False)
|
||||
op.drop_index('ix_roles_name', table_name='permission_sets')
|
||||
op.drop_index('ix_roles_permissions', table_name='permission_sets')
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_index('ix_roles_permissions', 'permission_sets', ['permissions'], unique=False)
|
||||
op.create_index('ix_roles_name', 'permission_sets', ['name'], unique=True)
|
||||
op.drop_index(op.f('ix_permission_sets_permissions'), table_name='permission_sets')
|
||||
op.drop_index(op.f('ix_permission_sets_name'), table_name='permission_sets')
|
||||
op.drop_table('application_roles_permission_sets')
|
||||
op.drop_index(op.f('ix_application_roles_user_id'), table_name='application_roles')
|
||||
op.drop_index(op.f('ix_application_roles_application_id'), table_name='application_roles')
|
||||
op.drop_index('application_role_user_application', table_name='application_roles')
|
||||
op.drop_table('application_roles')
|
||||
# ### end Alembic commands ###
|
@ -2,11 +2,19 @@ from atst.utils import first_or_none
|
||||
from atst.models.permissions import Permissions
|
||||
from atst.domain.exceptions import UnauthorizedError
|
||||
from atst.models.portfolio_role import Status as PortfolioRoleStatus
|
||||
from atst.models.application_role import Status as ApplicationRoleStatus
|
||||
|
||||
|
||||
class Authorization(object):
|
||||
@classmethod
|
||||
def has_atat_permission(cls, user, permission):
|
||||
return permission in user.permissions
|
||||
|
||||
@classmethod
|
||||
def has_portfolio_permission(cls, user, portfolio, permission):
|
||||
if Authorization.has_atat_permission(user, permission):
|
||||
return True
|
||||
|
||||
port_role = first_or_none(
|
||||
lambda pr: pr.portfolio == portfolio, user.portfolio_roles
|
||||
)
|
||||
@ -16,22 +24,37 @@ class Authorization(object):
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def has_atat_permission(cls, user, permission):
|
||||
return permission in user.permissions
|
||||
def has_application_permission(cls, user, application, permission):
|
||||
if Authorization.has_portfolio_permission(
|
||||
user, application.portfolio, permission
|
||||
):
|
||||
return True
|
||||
|
||||
app_role = first_or_none(
|
||||
lambda app_role: app_role.application == application, user.application_roles
|
||||
)
|
||||
if app_role and app_role.status is not ApplicationRoleStatus.DISABLED:
|
||||
return permission in app_role.permissions
|
||||
else:
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def check_portfolio_permission(cls, user, portfolio, permission, message):
|
||||
if not (
|
||||
Authorization.has_atat_permission(user, permission)
|
||||
or Authorization.has_portfolio_permission(user, portfolio, permission)
|
||||
):
|
||||
def check_atat_permission(cls, user, permission, message):
|
||||
if not Authorization.has_atat_permission(user, permission):
|
||||
raise UnauthorizedError(user, message)
|
||||
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def check_atat_permission(cls, user, permission, message):
|
||||
if not Authorization.has_atat_permission(user, permission):
|
||||
def check_portfolio_permission(cls, user, portfolio, permission, message):
|
||||
if not Authorization.has_portfolio_permission(user, portfolio, permission):
|
||||
raise UnauthorizedError(user, message)
|
||||
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def check_application_permission(cls, user, portfolio, permission, message):
|
||||
if not Authorization.has_application_permission(user, portfolio, permission):
|
||||
raise UnauthorizedError(user, message)
|
||||
|
||||
return True
|
||||
@ -70,8 +93,12 @@ class Authorization(object):
|
||||
raise UnauthorizedError(user, message)
|
||||
|
||||
|
||||
def user_can_access(user, permission, portfolio=None, message=None):
|
||||
if portfolio:
|
||||
def user_can_access(user, permission, portfolio=None, application=None, message=None):
|
||||
if application:
|
||||
Authorization.check_application_permission(
|
||||
user, application, permission, message
|
||||
)
|
||||
elif portfolio:
|
||||
Authorization.check_portfolio_permission(user, portfolio, permission, message)
|
||||
else:
|
||||
Authorization.check_atat_permission(user, permission, message)
|
||||
|
@ -15,6 +15,7 @@ def check_access(permission, message, override, *args, **kwargs):
|
||||
|
||||
if "application_id" in kwargs:
|
||||
application = Applications.get(kwargs["application_id"])
|
||||
access_args["application"] = application
|
||||
access_args["portfolio"] = application.portfolio
|
||||
|
||||
elif "task_order_id" in kwargs:
|
||||
|
@ -18,6 +18,11 @@ class PermissionSets(object):
|
||||
PORTFOLIO_POC = "portfolio_poc"
|
||||
VIEW_AUDIT_LOG = "view_audit_log"
|
||||
|
||||
VIEW_APPLICATION = "view_application"
|
||||
EDIT_APPLICATION_ENVIRONMENTS = "edit_application_environments"
|
||||
EDIT_APPLICATION_TEAM = "edit_application_team"
|
||||
DELETE_APPLICATION_ENVIRONMENTS = "delete_application_environments"
|
||||
|
||||
@classmethod
|
||||
def get(cls, perms_set_name):
|
||||
try:
|
||||
@ -85,6 +90,8 @@ _PORTFOLIO_APP_MGMT_PERMISSION_SETS = [
|
||||
Permissions.CREATE_APPLICATION_MEMBER,
|
||||
Permissions.EDIT_ENVIRONMENT,
|
||||
Permissions.CREATE_ENVIRONMENT,
|
||||
Permissions.DELETE_ENVIRONMENT,
|
||||
Permissions.ASSIGN_ENVIRONMENT_MEMBER,
|
||||
],
|
||||
},
|
||||
]
|
||||
@ -167,3 +174,51 @@ PORTFOLIO_PERMISSION_SETS = (
|
||||
+ _PORTFOLIO_ADMIN_PERMISSION_SETS
|
||||
+ _PORTFOLIO_POC_PERMISSION_SETS
|
||||
)
|
||||
|
||||
_APPLICATION_BASIC_PERMISSION_SET = {
|
||||
"name": PermissionSets.VIEW_APPLICATION,
|
||||
"description": "View application data",
|
||||
"display_name": "View applications",
|
||||
"permissions": [
|
||||
Permissions.VIEW_APPLICATION,
|
||||
Permissions.VIEW_APPLICATION_MEMBER,
|
||||
Permissions.VIEW_ENVIRONMENT,
|
||||
],
|
||||
}
|
||||
|
||||
# need perm to assign and unassign users to environments
|
||||
_APPLICATION_ENVIRONMENTS_PERMISSION_SET = {
|
||||
"name": PermissionSets.EDIT_APPLICATION_ENVIRONMENTS,
|
||||
"description": "Manage environments for an application",
|
||||
"display_name": "Manage environments",
|
||||
"permissions": [
|
||||
Permissions.EDIT_ENVIRONMENT,
|
||||
Permissions.CREATE_ENVIRONMENT,
|
||||
Permissions.ASSIGN_ENVIRONMENT_MEMBER,
|
||||
],
|
||||
}
|
||||
|
||||
_APPLICATION_TEAM_PERMISSION_SET = {
|
||||
"name": PermissionSets.EDIT_APPLICATION_TEAM,
|
||||
"description": "Manage team members for an application",
|
||||
"display_name": "Manage team",
|
||||
"permissions": [
|
||||
Permissions.EDIT_APPLICATION_MEMBER,
|
||||
Permissions.CREATE_APPLICATION_MEMBER,
|
||||
Permissions.ASSIGN_ENVIRONMENT_MEMBER,
|
||||
],
|
||||
}
|
||||
|
||||
_APPLICATION_ENVIRONMENT_DELETE_PERMISSION_SET = {
|
||||
"name": PermissionSets.DELETE_APPLICATION_ENVIRONMENTS,
|
||||
"description": "Delete environments within an application",
|
||||
"display_name": "Delete environments",
|
||||
"permissions": [Permissions.DELETE_ENVIRONMENT],
|
||||
}
|
||||
|
||||
APPLICATION_PERMISSION_SETS = [
|
||||
_APPLICATION_BASIC_PERMISSION_SET,
|
||||
_APPLICATION_TEAM_PERMISSION_SET,
|
||||
_APPLICATION_ENVIRONMENTS_PERMISSION_SET,
|
||||
_APPLICATION_ENVIRONMENT_DELETE_PERMISSION_SET,
|
||||
]
|
||||
|
@ -6,6 +6,8 @@ from .permissions import Permissions
|
||||
from .permission_set import PermissionSet
|
||||
from .user import User
|
||||
from .portfolio_role import PortfolioRole
|
||||
from .application_role import ApplicationRole
|
||||
from .environment_role import EnvironmentRole
|
||||
from .portfolio import Portfolio
|
||||
from .application import Application
|
||||
from .environment import Environment
|
||||
|
@ -16,10 +16,11 @@ class Application(Base, mixins.TimestampsMixin, mixins.AuditableMixin):
|
||||
portfolio_id = Column(ForeignKey("portfolios.id"), nullable=False)
|
||||
portfolio = relationship("Portfolio")
|
||||
environments = relationship("Environment", back_populates="application")
|
||||
roles = relationship("ApplicationRole")
|
||||
|
||||
@property
|
||||
def users(self):
|
||||
return set([user for env in self.environments for user in env.users])
|
||||
return set(role.user for role in self.roles)
|
||||
|
||||
@property
|
||||
def num_users(self):
|
||||
|
68
atst/models/application_role.py
Normal file
68
atst/models/application_role.py
Normal file
@ -0,0 +1,68 @@
|
||||
from enum import Enum
|
||||
from sqlalchemy import Index, ForeignKey, Column, Enum as SQLAEnum, Table
|
||||
from sqlalchemy.dialects.postgresql import UUID
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.event import listen
|
||||
|
||||
from atst.models import Base, mixins
|
||||
from atst.models.mixins.auditable import record_permission_sets_updates
|
||||
from .types import Id
|
||||
|
||||
|
||||
class Status(Enum):
|
||||
ACTIVE = "active"
|
||||
DISABLED = "disabled"
|
||||
PENDING = "pending"
|
||||
|
||||
|
||||
application_roles_permission_sets = Table(
|
||||
"application_roles_permission_sets",
|
||||
Base.metadata,
|
||||
Column(
|
||||
"application_role_id", UUID(as_uuid=True), ForeignKey("application_roles.id")
|
||||
),
|
||||
Column("permission_set_id", UUID(as_uuid=True), ForeignKey("permission_sets.id")),
|
||||
)
|
||||
|
||||
|
||||
class ApplicationRole(
|
||||
Base, mixins.TimestampsMixin, mixins.AuditableMixin, mixins.PermissionsMixin
|
||||
):
|
||||
__tablename__ = "application_roles"
|
||||
|
||||
id = Id()
|
||||
application_id = Column(
|
||||
UUID(as_uuid=True), ForeignKey("applications.id"), index=True, nullable=False
|
||||
)
|
||||
application = relationship("Application", back_populates="roles")
|
||||
|
||||
user_id = Column(
|
||||
UUID(as_uuid=True), ForeignKey("users.id"), index=True, nullable=False
|
||||
)
|
||||
|
||||
status = Column(SQLAEnum(Status, native_enum=False), default=Status.PENDING)
|
||||
|
||||
permission_sets = relationship(
|
||||
"PermissionSet", secondary=application_roles_permission_sets
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return "<ApplicationRole(application='{}', user_id='{}', id='{}', permissions={})>".format(
|
||||
self.application.name, self.user_id, self.id, self.permissions
|
||||
)
|
||||
|
||||
|
||||
Index(
|
||||
"application_role_user_application",
|
||||
ApplicationRole.user_id,
|
||||
ApplicationRole.application_id,
|
||||
unique=True,
|
||||
)
|
||||
|
||||
|
||||
listen(
|
||||
ApplicationRole.permission_sets,
|
||||
"bulk_replace",
|
||||
record_permission_sets_updates,
|
||||
raw=True,
|
||||
)
|
@ -98,3 +98,18 @@ class AuditableMixin(object):
|
||||
@property
|
||||
def displayname(self):
|
||||
return None
|
||||
|
||||
|
||||
def record_permission_sets_updates(instance_state, permission_sets, initiator):
|
||||
old_perm_sets = instance_state.attrs.get("permission_sets").value
|
||||
if instance_state.persistent and old_perm_sets != permission_sets:
|
||||
connection = instance_state.session.connection()
|
||||
old_state = [p.name for p in old_perm_sets]
|
||||
new_state = [p.name for p in permission_sets]
|
||||
changed_state = {"permission_sets": (old_state, new_state)}
|
||||
instance_state.object.create_audit_event(
|
||||
connection,
|
||||
instance_state.object,
|
||||
ACTION_UPDATE,
|
||||
changed_state=changed_state,
|
||||
)
|
||||
|
@ -14,6 +14,8 @@ class Permissions(object):
|
||||
VIEW_ENVIRONMENT = "view_environment"
|
||||
EDIT_ENVIRONMENT = "edit_environment"
|
||||
CREATE_ENVIRONMENT = "create_environment"
|
||||
DELETE_ENVIRONMENT = "delete_environment"
|
||||
ASSIGN_ENVIRONMENT_MEMBER = "assign_environment_member"
|
||||
|
||||
# funding
|
||||
VIEW_PORTFOLIO_FUNDING = "view_portfolio_funding" # TO summary page
|
||||
|
@ -1,7 +1,8 @@
|
||||
from enum import Enum
|
||||
from sqlalchemy import Index, ForeignKey, Column, Enum as SQLAEnum, Table, event
|
||||
from sqlalchemy import Index, ForeignKey, Column, Enum as SQLAEnum, Table
|
||||
from sqlalchemy.dialects.postgresql import UUID
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.event import listen
|
||||
|
||||
from atst.models import Base, mixins
|
||||
from .types import Id
|
||||
@ -11,7 +12,7 @@ from atst.utils import first_or_none
|
||||
from atst.models.environment_role import EnvironmentRole
|
||||
from atst.models.application import Application
|
||||
from atst.models.environment import Environment
|
||||
from atst.models.mixins.auditable import ACTION_UPDATE as AUDIT_ACTION_UPDATE
|
||||
from atst.models.mixins.auditable import record_permission_sets_updates
|
||||
|
||||
|
||||
MEMBER_STATUSES = {
|
||||
@ -168,17 +169,9 @@ Index(
|
||||
)
|
||||
|
||||
|
||||
@event.listens_for(PortfolioRole.permission_sets, "bulk_replace", raw=True)
|
||||
def record_permission_sets_updates(instance_state, permission_sets, initiator):
|
||||
old_perm_sets = instance_state.attrs.get("permission_sets").value
|
||||
if instance_state.persistent and old_perm_sets != permission_sets:
|
||||
connection = instance_state.session.connection()
|
||||
old_state = [p.name for p in old_perm_sets]
|
||||
new_state = [p.name for p in permission_sets]
|
||||
changed_state = {"permission_sets": (old_state, new_state)}
|
||||
instance_state.object.create_audit_event(
|
||||
connection,
|
||||
instance_state.object,
|
||||
AUDIT_ACTION_UPDATE,
|
||||
changed_state=changed_state,
|
||||
)
|
||||
listen(
|
||||
PortfolioRole.permission_sets,
|
||||
"bulk_replace",
|
||||
record_permission_sets_updates,
|
||||
raw=True,
|
||||
)
|
||||
|
@ -25,6 +25,7 @@ class User(
|
||||
permission_sets = relationship("PermissionSet", secondary=users_permission_sets)
|
||||
|
||||
portfolio_roles = relationship("PortfolioRole", backref="user")
|
||||
application_roles = relationship("ApplicationRole", backref="user")
|
||||
|
||||
email = Column(String, unique=True)
|
||||
dod_id = Column(String, unique=True, nullable=False)
|
||||
|
@ -9,12 +9,18 @@ sys.path.append(parent_dir)
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
from atst.app import make_config, make_app
|
||||
from atst.database import db
|
||||
from atst.models import PermissionSet, Permissions
|
||||
from atst.domain.permission_sets import ATAT_PERMISSION_SETS, PORTFOLIO_PERMISSION_SETS
|
||||
from atst.models import PermissionSet
|
||||
from atst.domain.permission_sets import (
|
||||
ATAT_PERMISSION_SETS,
|
||||
PORTFOLIO_PERMISSION_SETS,
|
||||
APPLICATION_PERMISSION_SETS,
|
||||
)
|
||||
|
||||
|
||||
def seed_roles():
|
||||
for permission_set_info in ATAT_PERMISSION_SETS + PORTFOLIO_PERMISSION_SETS:
|
||||
for permission_set_info in (
|
||||
ATAT_PERMISSION_SETS + PORTFOLIO_PERMISSION_SETS + APPLICATION_PERMISSION_SETS
|
||||
):
|
||||
permission_set = PermissionSet(**permission_set_info)
|
||||
try:
|
||||
existing_permission_set = (
|
||||
|
@ -1,6 +1,7 @@
|
||||
import pytest
|
||||
|
||||
from tests.factories import (
|
||||
ApplicationRoleFactory,
|
||||
TaskOrderFactory,
|
||||
UserFactory,
|
||||
PortfolioFactory,
|
||||
@ -69,6 +70,22 @@ def test_has_portfolio_permission():
|
||||
)
|
||||
|
||||
|
||||
def test_has_application_permission():
|
||||
role_one = PermissionSets.get(PermissionSets.EDIT_APPLICATION_TEAM)
|
||||
role_two = PermissionSets.get(PermissionSets.EDIT_APPLICATION_ENVIRONMENTS)
|
||||
app_role = ApplicationRoleFactory.create(permission_sets=[role_one, role_two])
|
||||
different_user = UserFactory.create()
|
||||
assert Authorization.has_application_permission(
|
||||
app_role.user, app_role.application, Permissions.EDIT_ENVIRONMENT
|
||||
)
|
||||
assert not Authorization.has_portfolio_permission(
|
||||
app_role.user, app_role.application, Permissions.DELETE_ENVIRONMENT
|
||||
)
|
||||
assert not Authorization.has_portfolio_permission(
|
||||
different_user, app_role.application, Permissions.DELETE_ENVIRONMENT
|
||||
)
|
||||
|
||||
|
||||
def test_user_can_access():
|
||||
ccpo = UserFactory.create_ccpo()
|
||||
edit_admin = UserFactory.create()
|
||||
@ -120,7 +137,23 @@ def set_current_user(request_ctx):
|
||||
request_ctx.g.current_user = None
|
||||
|
||||
|
||||
def test_user_can_access_decorator(set_current_user):
|
||||
def test_user_can_access_decorator_atat_level(set_current_user):
|
||||
ccpo = UserFactory.create_ccpo()
|
||||
rando = UserFactory.create()
|
||||
|
||||
@user_can_access_decorator(Permissions.VIEW_AUDIT_LOG)
|
||||
def _access_activity_log(*args, **kwargs):
|
||||
return True
|
||||
|
||||
set_current_user(ccpo)
|
||||
assert _access_activity_log()
|
||||
|
||||
set_current_user(rando)
|
||||
with pytest.raises(UnauthorizedError):
|
||||
_access_activity_log()
|
||||
|
||||
|
||||
def test_user_can_access_decorator_portfolio_level(set_current_user):
|
||||
ccpo = UserFactory.create_ccpo()
|
||||
edit_admin = UserFactory.create()
|
||||
view_admin = UserFactory.create()
|
||||
@ -144,6 +177,36 @@ def test_user_can_access_decorator(set_current_user):
|
||||
_edit_portfolio_name(portfolio_id=portfolio.id)
|
||||
|
||||
|
||||
def test_user_can_access_decorator_application_level(set_current_user):
|
||||
ccpo = UserFactory.create_ccpo()
|
||||
port_admin = UserFactory.create()
|
||||
app_user = UserFactory.create()
|
||||
rando = UserFactory.create()
|
||||
|
||||
portfolio = PortfolioFactory.create(
|
||||
owner=port_admin, applications=[{"name": "Mos Eisley"}]
|
||||
)
|
||||
app = portfolio.applications[0]
|
||||
ApplicationRoleFactory.create(application=app, user=app_user)
|
||||
|
||||
@user_can_access_decorator(Permissions.VIEW_APPLICATION)
|
||||
def _stroll_into_mos_eisley(*args, **kwargs):
|
||||
return True
|
||||
|
||||
set_current_user(ccpo)
|
||||
assert _stroll_into_mos_eisley(application_id=app.id)
|
||||
|
||||
set_current_user(port_admin)
|
||||
assert _stroll_into_mos_eisley(application_id=app.id)
|
||||
|
||||
set_current_user(app_user)
|
||||
assert _stroll_into_mos_eisley(application_id=app.id)
|
||||
|
||||
set_current_user(rando)
|
||||
with pytest.raises(UnauthorizedError):
|
||||
_stroll_into_mos_eisley(application_id=app.id)
|
||||
|
||||
|
||||
def test_user_can_access_decorator_override(set_current_user):
|
||||
rando_calrissian = UserFactory.create()
|
||||
darth_vader = UserFactory.create()
|
||||
|
@ -4,22 +4,14 @@ import string
|
||||
import factory
|
||||
from uuid import uuid4
|
||||
import datetime
|
||||
from faker import Faker as _Faker
|
||||
|
||||
from atst.forms import data
|
||||
from atst.models.attachment import Attachment
|
||||
from atst.models.environment import Environment
|
||||
from atst.models.application import Application
|
||||
from atst.models.task_order import TaskOrder
|
||||
from atst.models.user import User
|
||||
from atst.models.permission_set import PermissionSet
|
||||
from atst.models.portfolio import Portfolio
|
||||
from atst.domain.permission_sets import PermissionSets, PORTFOLIO_PERMISSION_SETS
|
||||
from atst.models.portfolio_role import PortfolioRole, Status as PortfolioRoleStatus
|
||||
from atst.models.environment_role import EnvironmentRole
|
||||
from atst.models.invitation import Invitation, Status as InvitationStatus
|
||||
from atst.models.dd_254 import DD254
|
||||
from atst.models import *
|
||||
from atst.models.portfolio_role import Status as PortfolioRoleStatus
|
||||
from atst.models.application_role import Status as ApplicationRoleStatus
|
||||
from atst.models.invitation import Status as InvitationStatus
|
||||
from atst.domain.invitations import Invitations
|
||||
from atst.domain.permission_sets import PermissionSets
|
||||
from atst.domain.portfolio_roles import PortfolioRoles
|
||||
|
||||
|
||||
@ -71,6 +63,10 @@ def base_portfolio_permission_sets():
|
||||
]
|
||||
|
||||
|
||||
def base_application_permission_sets():
|
||||
return [PermissionSets.get(PermissionSets.VIEW_APPLICATION)]
|
||||
|
||||
|
||||
def get_all_portfolio_permission_sets():
|
||||
return PermissionSets.get_many(PortfolioRoles.PORTFOLIO_PERMISSION_SETS)
|
||||
|
||||
@ -169,6 +165,20 @@ class ApplicationFactory(Base):
|
||||
with_environments = kwargs.pop("environments", [])
|
||||
application = super()._create(model_class, *args, **kwargs)
|
||||
|
||||
# need to create application roles for environment users
|
||||
app_members_from_envs = set()
|
||||
for env in with_environments:
|
||||
with_members = env.get("members", [])
|
||||
for member_data in with_members:
|
||||
member = member_data.get("user", UserFactory.create())
|
||||
app_members_from_envs.add(member)
|
||||
# set for environments in case we just created the
|
||||
# user for the application
|
||||
member_data["user"] = member
|
||||
|
||||
for member in app_members_from_envs:
|
||||
ApplicationRoleFactory.create(application=application, user=member)
|
||||
|
||||
environments = [
|
||||
EnvironmentFactory.create(application=application, **e)
|
||||
for e in with_environments
|
||||
@ -207,6 +217,16 @@ class PortfolioRoleFactory(Base):
|
||||
permission_sets = factory.LazyFunction(base_portfolio_permission_sets)
|
||||
|
||||
|
||||
class ApplicationRoleFactory(Base):
|
||||
class Meta:
|
||||
model = ApplicationRole
|
||||
|
||||
application = factory.SubFactory(ApplicationFactory)
|
||||
user = factory.SubFactory(UserFactory)
|
||||
status = ApplicationRoleStatus.PENDING
|
||||
permission_sets = factory.LazyFunction(base_application_permission_sets)
|
||||
|
||||
|
||||
class EnvironmentRoleFactory(Base):
|
||||
class Meta:
|
||||
model = EnvironmentRole
|
||||
|
@ -1,5 +1,4 @@
|
||||
from atst.domain.environments import Environments
|
||||
from tests.factories import ApplicationFactory, UserFactory
|
||||
from tests.factories import ApplicationFactory, ApplicationRoleFactory
|
||||
|
||||
|
||||
def test_application_num_users():
|
||||
@ -8,15 +7,5 @@ def test_application_num_users():
|
||||
)
|
||||
assert application.num_users == 0
|
||||
|
||||
first_env = application.environments[0]
|
||||
user1 = UserFactory()
|
||||
Environments.add_member(first_env, user1, "developer")
|
||||
ApplicationRoleFactory.create(application=application)
|
||||
assert application.num_users == 1
|
||||
|
||||
second_env = application.environments[-1]
|
||||
Environments.add_member(second_env, user1, "developer")
|
||||
assert application.num_users == 1
|
||||
|
||||
user2 = UserFactory()
|
||||
Environments.add_member(second_env, user2, "developer")
|
||||
assert application.num_users == 2
|
||||
|
40
tests/models/test_application_role.py
Normal file
40
tests/models/test_application_role.py
Normal file
@ -0,0 +1,40 @@
|
||||
from atst.domain.permission_sets import PermissionSets
|
||||
from atst.models.audit_event import AuditEvent
|
||||
|
||||
from tests.factories import PortfolioFactory, UserFactory
|
||||
|
||||
|
||||
def test_has_application_role_history(session):
|
||||
owner = UserFactory.create()
|
||||
user = UserFactory.create()
|
||||
|
||||
PortfolioFactory.create(
|
||||
owner=owner,
|
||||
applications=[
|
||||
{
|
||||
"name": "starkiller",
|
||||
"environments": [
|
||||
{
|
||||
"name": "bridge",
|
||||
"members": [{"user": user, "role_name": "developer"}],
|
||||
}
|
||||
],
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
app_role = user.application_roles[0]
|
||||
app_role.permission_sets = [
|
||||
PermissionSets.get(PermissionSets.EDIT_APPLICATION_TEAM)
|
||||
]
|
||||
session.add(app_role)
|
||||
session.commit()
|
||||
|
||||
changed_event = (
|
||||
session.query(AuditEvent)
|
||||
.filter(AuditEvent.resource_id == app_role.id, AuditEvent.action == "update")
|
||||
.one()
|
||||
)
|
||||
old_state, new_state = changed_event.changed_state["permission_sets"]
|
||||
assert old_state == [PermissionSets.VIEW_APPLICATION]
|
||||
assert new_state == [PermissionSets.EDIT_APPLICATION_TEAM]
|
@ -12,6 +12,7 @@ from atst.models.portfolio_role import Status as PortfolioRoleStatus
|
||||
|
||||
from tests.factories import (
|
||||
AttachmentFactory,
|
||||
ApplicationRoleFactory,
|
||||
InvitationFactory,
|
||||
PortfolioFactory,
|
||||
PortfolioRoleFactory,
|
||||
@ -156,12 +157,14 @@ def test_portfolios_access_environment_access(get_url_assert_status):
|
||||
def test_portfolios_application_members_access(get_url_assert_status):
|
||||
ccpo = user_with(PermissionSets.VIEW_PORTFOLIO_APPLICATION_MANAGEMENT)
|
||||
owner = user_with()
|
||||
app_dev = user_with()
|
||||
rando = user_with()
|
||||
portfolio = PortfolioFactory.create(
|
||||
owner=owner,
|
||||
applications=[{"name": "Mos Eisley", "description": "Where Han shot first"}],
|
||||
)
|
||||
app = portfolio.applications[0]
|
||||
ApplicationRoleFactory.create(application=app, user=app_dev)
|
||||
|
||||
url = url_for(
|
||||
"portfolios.application_members",
|
||||
@ -170,6 +173,7 @@ def test_portfolios_application_members_access(get_url_assert_status):
|
||||
)
|
||||
get_url_assert_status(ccpo, url, 200)
|
||||
get_url_assert_status(owner, url, 200)
|
||||
get_url_assert_status(app_dev, url, 200)
|
||||
get_url_assert_status(rando, url, 404)
|
||||
|
||||
|
||||
@ -570,7 +574,7 @@ def test_portfolios_update_member_access(post_url_assert_status):
|
||||
prt_member = user_with()
|
||||
|
||||
portfolio = PortfolioFactory.create(owner=owner)
|
||||
prr = PortfolioRoleFactory.create(user=prt_member, portfolio=portfolio)
|
||||
PortfolioRoleFactory.create(user=prt_member, portfolio=portfolio)
|
||||
|
||||
url = url_for(
|
||||
"portfolios.update_member", portfolio_id=portfolio.id, member_id=prt_member.id
|
||||
@ -588,7 +592,7 @@ def test_portfolios_view_member_access(get_url_assert_status):
|
||||
prt_member = user_with()
|
||||
|
||||
portfolio = PortfolioFactory.create(owner=owner)
|
||||
prr = PortfolioRoleFactory.create(user=prt_member, portfolio=portfolio)
|
||||
PortfolioRoleFactory.create(user=prt_member, portfolio=portfolio)
|
||||
|
||||
url = url_for(
|
||||
"portfolios.view_member", portfolio_id=portfolio.id, member_id=prt_member.id
|
||||
|
Loading…
x
Reference in New Issue
Block a user