Raise error when a user attempts to update a disabled env role
This commit is contained in:
parent
e8f21acf5b
commit
06a36f23bc
@ -14,7 +14,7 @@ from atst.models import (
|
||||
)
|
||||
from atst.domain.environment_roles import EnvironmentRoles
|
||||
|
||||
from .exceptions import NotFoundError
|
||||
from .exceptions import NotFoundError, DisabledError
|
||||
|
||||
|
||||
class Environments(object):
|
||||
@ -57,34 +57,32 @@ class Environments(object):
|
||||
|
||||
@classmethod
|
||||
def update_env_role(cls, environment, application_role, new_role):
|
||||
updated = False
|
||||
|
||||
env_role = EnvironmentRoles.get_for_update(application_role.id, environment.id)
|
||||
if env_role and (
|
||||
env_role.status == EnvironmentRole.Status.DISABLED or env_role.deleted
|
||||
):
|
||||
raise DisabledError("environment_role", env_role.id)
|
||||
|
||||
if (
|
||||
env_role
|
||||
and env_role.role != new_role
|
||||
and env_role.status != EnvironmentRole.Status.DISABLED
|
||||
):
|
||||
env_role.role = new_role
|
||||
updated = True
|
||||
db.session.add(env_role)
|
||||
elif not env_role and new_role:
|
||||
env_role = EnvironmentRoles.create(
|
||||
application_role=application_role,
|
||||
environment=environment,
|
||||
role=new_role,
|
||||
)
|
||||
updated = True
|
||||
db.session.add(env_role)
|
||||
|
||||
if env_role and not new_role:
|
||||
EnvironmentRoles.disable(env_role.id)
|
||||
updated = True
|
||||
|
||||
if updated:
|
||||
db.session.add(env_role)
|
||||
db.session.commit()
|
||||
|
||||
return updated
|
||||
|
||||
@classmethod
|
||||
def revoke_access(cls, environment, target_user):
|
||||
EnvironmentRoles.delete(environment.id, target_user.id)
|
||||
|
@ -53,3 +53,13 @@ class ClaimFailedException(Exception):
|
||||
f"Could not acquire claim for {resource.__class__.__name__} {resource.id}."
|
||||
)
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class DisabledError(Exception):
|
||||
def __init__(self, resource_name, resource_id=None):
|
||||
self.resource_name = resource_name
|
||||
self.resource_id = resource_id
|
||||
|
||||
@property
|
||||
def message(self):
|
||||
return f"Cannot update disabled {self.resource_name} {self.resource_id}."
|
||||
|
@ -4,7 +4,7 @@ from uuid import uuid4
|
||||
|
||||
from atst.domain.environments import Environments
|
||||
from atst.domain.environment_roles import EnvironmentRoles
|
||||
from atst.domain.exceptions import NotFoundError
|
||||
from atst.domain.exceptions import NotFoundError, DisabledError
|
||||
from atst.models.environment_role import CSPRole, EnvironmentRole
|
||||
|
||||
from tests.factories import (
|
||||
@ -28,8 +28,7 @@ def test_create_environments():
|
||||
def test_update_env_role():
|
||||
env_role = EnvironmentRoleFactory.create(role=CSPRole.BASIC_ACCESS.value)
|
||||
new_role = CSPRole.TECHNICAL_READ.value
|
||||
|
||||
assert Environments.update_env_role(
|
||||
Environments.update_env_role(
|
||||
env_role.environment, env_role.application_role, new_role
|
||||
)
|
||||
assert env_role.role == new_role
|
||||
@ -37,10 +36,7 @@ def test_update_env_role():
|
||||
|
||||
def test_update_env_role_no_access():
|
||||
env_role = EnvironmentRoleFactory.create(role=CSPRole.BASIC_ACCESS.value)
|
||||
|
||||
assert Environments.update_env_role(
|
||||
env_role.environment, env_role.application_role, None
|
||||
)
|
||||
Environments.update_env_role(env_role.environment, env_role.application_role, None)
|
||||
|
||||
assert not EnvironmentRoles.get(
|
||||
env_role.application_role.id, env_role.environment.id
|
||||
@ -49,20 +45,15 @@ def test_update_env_role_no_access():
|
||||
assert env_role.status == EnvironmentRole.Status.DISABLED
|
||||
|
||||
|
||||
def test_update_env_role_no_change():
|
||||
env_role = EnvironmentRoleFactory.create(role=CSPRole.BASIC_ACCESS.value)
|
||||
new_role = CSPRole.BASIC_ACCESS.value
|
||||
|
||||
assert not Environments.update_env_role(
|
||||
env_role.environment, env_role.application_role, new_role
|
||||
)
|
||||
|
||||
|
||||
def test_update_env_role_deleted_role():
|
||||
def test_update_env_role_disabled_role():
|
||||
env_role = EnvironmentRoleFactory.create(role=CSPRole.BASIC_ACCESS.value)
|
||||
Environments.update_env_role(env_role.environment, env_role.application_role, None)
|
||||
assert not Environments.update_env_role(
|
||||
env_role.environment, env_role.application_role, CSPRole.TECHNICAL_READ.value
|
||||
|
||||
with pytest.raises(DisabledError):
|
||||
Environments.update_env_role(
|
||||
env_role.environment,
|
||||
env_role.application_role,
|
||||
CSPRole.TECHNICAL_READ.value,
|
||||
)
|
||||
assert env_role.role is None
|
||||
assert env_role.status == EnvironmentRole.Status.DISABLED
|
||||
|
Loading…
x
Reference in New Issue
Block a user