Merge pull request #777 from dod-ccpo/edit-team-env-roles

Form for editing app env settings
This commit is contained in:
leigh-mil 2019-04-25 10:54:35 -04:00 committed by GitHub
commit 8eac86835a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 374 additions and 177 deletions

View File

@ -7,6 +7,7 @@ from atst.domain.portfolios import Portfolios
from atst.domain.task_orders import TaskOrders from atst.domain.task_orders import TaskOrders
from atst.domain.applications import Applications from atst.domain.applications import Applications
from atst.domain.invitations import Invitations from atst.domain.invitations import Invitations
from atst.domain.environments import Environments
from atst.domain.exceptions import UnauthorizedError from atst.domain.exceptions import UnauthorizedError
@ -31,6 +32,11 @@ def check_access(permission, message, override, *args, **kwargs):
g.current_user, kwargs["portfolio_id"] g.current_user, kwargs["portfolio_id"]
) )
elif "environment_id" in kwargs:
environment = Environments.get(kwargs["environment_id"])
access_args["application"] = environment.application
access_args["portfolio"] = environment.application.portfolio
if override is not None and override(g.current_user, **access_args, **kwargs): if override is not None and override(g.current_user, **access_args, **kwargs):
return True return True

View File

@ -6,6 +6,7 @@ from atst.models.environment import Environment
from atst.models.environment_role import EnvironmentRole from atst.models.environment_role import EnvironmentRole
from atst.models.application import Application from atst.models.application import Application
from atst.domain.environment_roles import EnvironmentRoles from atst.domain.environment_roles import EnvironmentRoles
from atst.domain.users import Users
from .exceptions import NotFoundError from .exceptions import NotFoundError
@ -62,39 +63,49 @@ class Environments(object):
return env return env
@classmethod @classmethod
def update_environment_roles(cls, portfolio_role, ids_and_roles): def update_env_role(cls, environment, user, new_role):
updated = False updated = False
for id_and_role in ids_and_roles: if new_role is None:
new_role = id_and_role["role"] updated = EnvironmentRoles.delete(user.id, environment.id)
environment = Environments.get(id_and_role["id"]) else:
env_role = EnvironmentRoles.get(user.id, environment.id)
if new_role is None: if env_role and env_role.role != new_role:
role_deleted = EnvironmentRoles.delete( env_role.role = new_role
portfolio_role.user.id, environment.id updated = True
db.session.add(env_role)
elif not env_role:
env_role = EnvironmentRoles.create(
user=user, environment=environment, role=new_role
) )
if role_deleted: updated = True
updated = True db.session.add(env_role)
else:
env_role = EnvironmentRoles.get(
portfolio_role.user.id, id_and_role["id"]
)
if env_role and env_role.role != new_role:
env_role.role = new_role
updated = True
db.session.add(env_role)
elif not env_role:
env_role = EnvironmentRoles.create(
user=portfolio_role.user, environment=environment, role=new_role
)
updated = True
db.session.add(env_role)
if updated: if updated:
db.session.commit() db.session.commit()
return updated return updated
@classmethod
def update_env_roles_by_environment(cls, environment_id, team_roles):
environment = Environments.get(environment_id)
for member in team_roles:
new_role = member["role"]
user = Users.get(member["user_id"])
Environments.update_env_role(
environment=environment, user=user, new_role=new_role
)
@classmethod
def update_env_roles_by_member(cls, member, env_roles):
for env_roles in env_roles:
new_role = env_roles["role"]
environment = Environments.get(env_roles["id"])
Environments.update_env_role(
environment=environment, user=member, new_role=new_role
)
@classmethod @classmethod
def revoke_access(cls, environment, target_user): def revoke_access(cls, environment, target_user):
EnvironmentRoles.delete(environment.id, target_user.id) EnvironmentRoles.delete(environment.id, target_user.id)

View File

@ -0,0 +1,16 @@
from flask_wtf import FlaskForm
from wtforms.fields import StringField, HiddenField, RadioField, FieldList, FormField
from .forms import BaseForm
from .data import ENV_ROLES
class EnvMemberRoleForm(FlaskForm):
name = StringField()
user_id = HiddenField()
role = RadioField(choices=ENV_ROLES, coerce=BaseForm.remove_empty_string)
class EnvironmentRolesForm(BaseForm):
team_roles = FieldList(FormField(EnvMemberRoleForm))
env_id = HiddenField()

View File

@ -1,4 +1,5 @@
from atst.utils.localization import translate, translate_duration from atst.utils.localization import translate, translate_duration
from atst.models.environment_role import CSPRole
SERVICE_BRANCHES = [ SERVICE_BRANCHES = [
@ -215,3 +216,5 @@ REQUIRED_DISTRIBUTIONS = [
("administrative_ko", "Administrative Contracting Officer"), ("administrative_ko", "Administrative Contracting Officer"),
("other", "Other as necessary"), ("other", "Other as necessary"),
] ]
ENV_ROLES = [(role.value, role.value) for role in CSPRole] + [(None, "No access")]

View File

@ -35,3 +35,7 @@ class BaseForm(FlaskForm):
if not valid: if not valid:
flash("form_errors") flash("form_errors")
return valid return valid
@classmethod
def remove_empty_string(cls, value):
return value or None

View File

@ -27,10 +27,6 @@ from .data import (
from atst.utils.localization import translate from atst.utils.localization import translate
def remove_empty_string(value):
return value or None
class AppInfoWithExistingPortfolioForm(BaseForm): class AppInfoWithExistingPortfolioForm(BaseForm):
scope = TextAreaField( scope = TextAreaField(
translate("forms.task_order.scope_label"), translate("forms.task_order.scope_label"),
@ -55,28 +51,28 @@ class AppInfoWithExistingPortfolioForm(BaseForm):
description=translate("forms.task_order.complexity.description"), description=translate("forms.task_order.complexity.description"),
choices=APPLICATION_COMPLEXITY, choices=APPLICATION_COMPLEXITY,
default=None, default=None,
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
widget=ListWidget(prefix_label=False), widget=ListWidget(prefix_label=False),
option_widget=CheckboxInput(), option_widget=CheckboxInput(),
) )
complexity_other = StringField( complexity_other = StringField(
translate("forms.task_order.complexity_other_label"), translate("forms.task_order.complexity_other_label"),
default=None, default=None,
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
) )
dev_team = SelectMultipleField( dev_team = SelectMultipleField(
translate("forms.task_order.dev_team.label"), translate("forms.task_order.dev_team.label"),
description=translate("forms.task_order.dev_team.description"), description=translate("forms.task_order.dev_team.description"),
choices=DEV_TEAM, choices=DEV_TEAM,
default=None, default=None,
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
widget=ListWidget(prefix_label=False), widget=ListWidget(prefix_label=False),
option_widget=CheckboxInput(), option_widget=CheckboxInput(),
) )
dev_team_other = StringField( dev_team_other = StringField(
translate("forms.task_order.dev_team_other_label"), translate("forms.task_order.dev_team_other_label"),
default=None, default=None,
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
) )
team_experience = RadioField( team_experience = RadioField(
translate("forms.task_order.team_experience.label"), translate("forms.task_order.team_experience.label"),
@ -91,7 +87,7 @@ class AppInfoForm(AppInfoWithExistingPortfolioForm):
portfolio_name = StringField( portfolio_name = StringField(
translate("forms.task_order.portfolio_name_label"), translate("forms.task_order.portfolio_name_label"),
description=translate("forms.task_order.portfolio_name_description"), description=translate("forms.task_order.portfolio_name_description"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
validators=[ validators=[
Required(), Required(),
Length( Length(
@ -105,7 +101,7 @@ class AppInfoForm(AppInfoWithExistingPortfolioForm):
translate("forms.task_order.defense_component_label"), translate("forms.task_order.defense_component_label"),
choices=SERVICE_BRANCHES, choices=SERVICE_BRANCHES,
default="", default="",
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
) )
@ -147,36 +143,36 @@ class FundingForm(BaseForm):
class UnclassifiedFundingForm(FundingForm): class UnclassifiedFundingForm(FundingForm):
clin_02 = StringField( clin_02 = StringField(
translate("forms.task_order.unclassified_clin_02_label"), translate("forms.task_order.unclassified_clin_02_label"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
) )
clin_04 = StringField( clin_04 = StringField(
translate("forms.task_order.unclassified_clin_04_label"), translate("forms.task_order.unclassified_clin_04_label"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
) )
class OversightForm(BaseForm): class OversightForm(BaseForm):
ko_first_name = StringField( ko_first_name = StringField(
translate("forms.task_order.oversight_first_name_label"), translate("forms.task_order.oversight_first_name_label"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
) )
ko_last_name = StringField( ko_last_name = StringField(
translate("forms.task_order.oversight_last_name_label"), translate("forms.task_order.oversight_last_name_label"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
) )
ko_email = StringField( ko_email = StringField(
translate("forms.task_order.oversight_email_label"), translate("forms.task_order.oversight_email_label"),
validators=[Optional(), Email()], validators=[Optional(), Email()],
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
) )
ko_phone_number = TelField( ko_phone_number = TelField(
translate("forms.task_order.oversight_phone_label"), translate("forms.task_order.oversight_phone_label"),
validators=[Optional(), PhoneNumber()], validators=[Optional(), PhoneNumber()],
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
) )
ko_dod_id = StringField( ko_dod_id = StringField(
translate("forms.task_order.oversight_dod_id_label"), translate("forms.task_order.oversight_dod_id_label"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
validators=[ validators=[
RequiredIf(lambda form: form._fields.get("ko_invite").data), RequiredIf(lambda form: form._fields.get("ko_invite").data),
Length(min=10), Length(min=10),
@ -187,20 +183,20 @@ class OversightForm(BaseForm):
am_cor = BooleanField(translate("forms.task_order.oversight_am_cor_label")) am_cor = BooleanField(translate("forms.task_order.oversight_am_cor_label"))
cor_first_name = StringField( cor_first_name = StringField(
translate("forms.task_order.oversight_first_name_label"), translate("forms.task_order.oversight_first_name_label"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
) )
cor_last_name = StringField( cor_last_name = StringField(
translate("forms.task_order.oversight_last_name_label"), translate("forms.task_order.oversight_last_name_label"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
) )
cor_email = StringField( cor_email = StringField(
translate("forms.task_order.oversight_email_label"), translate("forms.task_order.oversight_email_label"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
validators=[Optional(), Email()], validators=[Optional(), Email()],
) )
cor_phone_number = TelField( cor_phone_number = TelField(
translate("forms.task_order.oversight_phone_label"), translate("forms.task_order.oversight_phone_label"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
validators=[ validators=[
RequiredIf(lambda form: not form._fields.get("am_cor").data), RequiredIf(lambda form: not form._fields.get("am_cor").data),
Optional(), Optional(),
@ -209,7 +205,7 @@ class OversightForm(BaseForm):
) )
cor_dod_id = StringField( cor_dod_id = StringField(
translate("forms.task_order.oversight_dod_id_label"), translate("forms.task_order.oversight_dod_id_label"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
validators=[ validators=[
RequiredIf( RequiredIf(
lambda form: not form._fields.get("am_cor").data lambda form: not form._fields.get("am_cor").data
@ -222,25 +218,25 @@ class OversightForm(BaseForm):
so_first_name = StringField( so_first_name = StringField(
translate("forms.task_order.oversight_first_name_label"), translate("forms.task_order.oversight_first_name_label"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
) )
so_last_name = StringField( so_last_name = StringField(
translate("forms.task_order.oversight_last_name_label"), translate("forms.task_order.oversight_last_name_label"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
) )
so_email = StringField( so_email = StringField(
translate("forms.task_order.oversight_email_label"), translate("forms.task_order.oversight_email_label"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
validators=[Optional(), Email()], validators=[Optional(), Email()],
) )
so_phone_number = TelField( so_phone_number = TelField(
translate("forms.task_order.oversight_phone_label"), translate("forms.task_order.oversight_phone_label"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
validators=[Optional(), PhoneNumber()], validators=[Optional(), PhoneNumber()],
) )
so_dod_id = StringField( so_dod_id = StringField(
translate("forms.task_order.oversight_dod_id_label"), translate("forms.task_order.oversight_dod_id_label"),
filters=[remove_empty_string], filters=[BaseForm.remove_empty_string],
validators=[ validators=[
RequiredIf(lambda form: form._fields.get("so_invite").data), RequiredIf(lambda form: form._fields.get("so_invite").data),
Length(min=10), Length(min=10),

View File

@ -2,8 +2,10 @@ from flask import redirect, render_template, request as http_request, url_for
from . import applications_bp from . import applications_bp
from atst.domain.environment_roles import EnvironmentRoles from atst.domain.environment_roles import EnvironmentRoles
from atst.domain.environments import Environments
from atst.domain.applications import Applications from atst.domain.applications import Applications
from atst.forms.application import ApplicationForm from atst.forms.application import ApplicationForm
from atst.forms.app_settings import EnvironmentRolesForm
from atst.domain.authz.decorator import user_can_access_decorator as user_can from atst.domain.authz.decorator import user_can_access_decorator as user_can
from atst.models.permissions import Permissions from atst.models.permissions import Permissions
from atst.utils.flash import formatted_flash as flash from atst.utils.flash import formatted_flash as flash
@ -23,17 +25,40 @@ def get_environments_obj_for_app(application):
return environments_obj return environments_obj
def serialize_env_member_form_data(application):
environments_list = []
for env in application.environments:
env_info = {"env_id": env.id, "team_roles": []}
for user in env.users:
env_role = EnvironmentRoles.get(user.id, env.id)
env_info["team_roles"].append(
{
"name": user.full_name,
"user_id": user.id,
"role": env_role.displayname,
}
)
environments_list.append(env_info)
return environments_list
@applications_bp.route("/applications/<application_id>/settings") @applications_bp.route("/applications/<application_id>/settings")
@user_can(Permissions.VIEW_APPLICATION, message="view application edit form") @user_can(Permissions.VIEW_APPLICATION, message="view application edit form")
def settings(application_id): def settings(application_id):
# refactor like portfolio admin render function
application = Applications.get(application_id) application = Applications.get(application_id)
form = ApplicationForm(name=application.name, description=application.description) form = ApplicationForm(name=application.name, description=application.description)
app_envs_data = serialize_env_member_form_data(application)
env_forms = {}
for env_data in app_envs_data:
env_forms[env_data["env_id"]] = EnvironmentRolesForm(data=env_data)
return render_template( return render_template(
"portfolios/applications/edit.html", "portfolios/applications/settings.html",
application=application, application=application,
form=form, form=form,
environments_obj=get_environments_obj_for_app(application=application), environments_obj=get_environments_obj_for_app(application=application),
env_forms=env_forms,
) )
@ -53,11 +78,45 @@ def update(application_id):
) )
) )
else: else:
env_data = serialize_env_member_form_data(application)
env_forms = {}
for data in env_data:
env_forms[data["env_id"]] = EnvironmentRolesForm(data=data)
return render_template( return render_template(
"portfolios/applications/edit.html", "portfolios/applications/settings.html",
application=application, application=application,
form=form, form=form,
environments_obj=get_environments_obj_for_app(application=application), environments_obj=get_environments_obj_for_app(application=application),
env_forms=env_forms,
)
@applications_bp.route("/environments/<environment_id>/roles", methods=["POST"])
@user_can(Permissions.ASSIGN_ENVIRONMENT_MEMBER, message="update environment roles")
def update_env_roles(environment_id):
environment = Environments.get(environment_id)
application = environment.application
env_roles_form = EnvironmentRolesForm(http_request.form)
if env_roles_form.validate():
env_data = env_roles_form.data
Environments.update_env_roles_by_environment(
environment_id=environment_id, team_roles=env_data["team_roles"]
)
return redirect(url_for("applications.settings", application_id=application.id))
else:
# TODO: Create a better pattern to handle when a form doesn't validate
# if a user is submitting the data via the web page then they
# should never have any form validation errors
return render_template(
"portfolios/applications/settings.html",
application=application,
form=ApplicationForm(
name=application.name, description=application.description
),
environments_obj=get_environments_obj_for_app(application=application),
env_forms=env_roles_form,
) )

View File

@ -5,7 +5,7 @@ from sqlalchemy.orm.exc import NoResultFound
from atst.database import db from atst.database import db
from atst.domain.authz import Authorization from atst.domain.authz import Authorization
from atst.models import Application, Portfolio, TaskOrder from atst.models import Application, Environment, Portfolio, TaskOrder
from atst.models.permissions import Permissions from atst.models.permissions import Permissions
from atst.domain.portfolios.scopes import ScopedPortfolio from atst.domain.portfolios.scopes import ScopedPortfolio
@ -25,6 +25,14 @@ def get_portfolio_from_context(view_args):
.filter(Application.id == view_args["application_id"]) .filter(Application.id == view_args["application_id"])
) )
elif "environment_id" in view_args:
query = (
db.session.query(Portfolio)
.join(Application, Application.portfolio_id == Portfolio.id)
.join(Environment, Environment.application_id == Application.id)
.filter(Environment.id == view_args["environment_id"])
)
elif "task_order_id" in view_args: elif "task_order_id" in view_args:
query = ( query = (
db.session.query(Portfolio) db.session.query(Portfolio)

View File

@ -2,8 +2,8 @@ import pytest
from atst.domain.environments import Environments from atst.domain.environments import Environments
from atst.domain.environment_roles import EnvironmentRoles from atst.domain.environment_roles import EnvironmentRoles
from atst.domain.portfolio_roles import PortfolioRoles
from atst.domain.exceptions import NotFoundError from atst.domain.exceptions import NotFoundError
from atst.models.environment_role import CSPRole
from tests.factories import ( from tests.factories import (
ApplicationFactory, ApplicationFactory,
@ -21,143 +21,109 @@ def test_create_environments():
assert env.cloud_id is not None assert env.cloud_id is not None
def test_create_environment_role_creates_cloud_id(session): def test_update_env_role():
owner = UserFactory.create() env_role = EnvironmentRoleFactory.create(role=CSPRole.BASIC_ACCESS.value)
developer = UserFactory.create() new_role = CSPRole.TECHNICAL_READ.value
portfolio = PortfolioFactory.create( assert Environments.update_env_role(env_role.environment, env_role.user, new_role)
owner=owner, assert env_role.role == new_role
members=[{"user": developer, "role_name": "developer"}],
applications=[
{"name": "application1", "environments": [{"name": "application1 prod"}]} def test_update_env_role_no_access():
], env_role = EnvironmentRoleFactory.create(role=CSPRole.BASIC_ACCESS.value)
assert Environments.update_env_role(env_role.environment, env_role.user, None)
assert not EnvironmentRoles.get(env_role.user.id, env_role.environment.id)
def test_update_env_role_no_change():
env_role = EnvironmentRoleFactory.create(role=CSPRole.BASIC_ACCESS.value)
new_role = CSPRole.BASIC_ACCESS.value
assert not Environments.update_env_role(
env_role.environment, env_role.user, new_role
) )
env = portfolio.applications[0].environments[0]
new_role = [{"id": env.id, "role": "developer"}]
portfolio_role = portfolio.members[0] def test_update_env_role_creates_cloud_id_for_new_member(session):
assert not portfolio_role.user.cloud_id user = UserFactory.create()
assert Environments.update_environment_roles(portfolio_role, new_role) env = EnvironmentFactory.create()
assert not user.cloud_id
assert portfolio_role.user.cloud_id is not None assert Environments.update_env_role(env, user, CSPRole.TECHNICAL_READ.value)
assert EnvironmentRoles.get(user.id, env.id)
assert user.cloud_id is not None
def test_update_environment_roles(): def test_update_env_roles_by_environment():
owner = UserFactory.create() environment = EnvironmentFactory.create()
developer = UserFactory.create() env_role_1 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.BASIC_ACCESS.value
portfolio = PortfolioFactory.create( )
owner=owner, env_role_2 = EnvironmentRoleFactory.create(
members=[{"user": developer, "role_name": "developer"}], environment=environment, role=CSPRole.NETWORK_ADMIN.value
applications=[ )
{ env_role_3 = EnvironmentRoleFactory.create(
"name": "application1", environment=environment, role=CSPRole.TECHNICAL_READ.value
"environments": [
{
"name": "application1 dev",
"members": [{"user": developer, "role_name": "devlops"}],
},
{
"name": "application1 staging",
"members": [{"user": developer, "role_name": "developer"}],
},
{"name": "application1 prod"},
],
}
],
) )
dev_env = portfolio.applications[0].environments[0] team_roles = [
staging_env = portfolio.applications[0].environments[1] {
new_ids_and_roles = [ "user_id": env_role_1.user.id,
{"id": dev_env.id, "role": "billing_admin"}, "name": env_role_1.user.full_name,
{"id": staging_env.id, "role": "developer"}, "role": CSPRole.BUSINESS_READ.value,
},
{
"user_id": env_role_2.user.id,
"name": env_role_2.user.full_name,
"role": CSPRole.NETWORK_ADMIN.value,
},
{
"user_id": env_role_3.user.id,
"name": env_role_3.user.full_name,
"role": None,
},
] ]
portfolio_role = portfolio.members[0] Environments.update_env_roles_by_environment(environment.id, team_roles)
assert Environments.update_environment_roles(portfolio_role, new_ids_and_roles) assert env_role_1.role == CSPRole.BUSINESS_READ.value
new_dev_env_role = EnvironmentRoles.get(portfolio_role.user.id, dev_env.id) assert env_role_2.role == CSPRole.NETWORK_ADMIN.value
staging_env_role = EnvironmentRoles.get(portfolio_role.user.id, staging_env.id) assert not EnvironmentRoles.get(env_role_3.user.id, environment.id)
assert new_dev_env_role.role == "billing_admin"
assert staging_env_role.role == "developer"
def test_remove_environment_role(): def test_update_env_roles_by_member():
owner = UserFactory.create() user = UserFactory.create()
developer = UserFactory.create() application = ApplicationFactory.create(
portfolio = PortfolioFactory.create( environments=[
owner=owner,
members=[{"user": developer, "role_name": "developer"}],
applications=[
{ {
"name": "application1", "name": "dev",
"environments": [ "members": [{"user": user, "role_name": CSPRole.BUSINESS_READ.value}],
{ },
"name": "application1 dev", {
"members": [{"user": developer, "role_name": "devops"}], "name": "staging",
}, "members": [{"user": user, "role_name": CSPRole.BUSINESS_READ.value}],
{ },
"name": "application1 staging", {"name": "prod"},
"members": [{"user": developer, "role_name": "developer"}], {
}, "name": "testing",
{ "members": [{"user": user, "role_name": CSPRole.BUSINESS_READ.value}],
"name": "application1 uat", },
"members": [ ]
{"user": developer, "role_name": "financial_auditor"}
],
},
{"name": "application1 prod"},
],
}
],
) )
application = portfolio.applications[0] dev, staging, prod, testing = application.environments
now_ba = application.environments[0].id env_roles = [
now_none = application.environments[1].id {"id": dev.id, "role": CSPRole.NETWORK_ADMIN.value},
still_fa = application.environments[2].id {"id": staging.id, "role": CSPRole.BUSINESS_READ.value},
{"id": prod.id, "role": CSPRole.TECHNICAL_READ.value},
new_environment_roles = [ {"id": testing.id, "role": None},
{"id": now_ba, "role": "billing_auditor"},
{"id": now_none, "role": None},
] ]
portfolio_role = PortfolioRoles.get(portfolio.id, developer.id) Environments.update_env_roles_by_member(user, env_roles)
assert Environments.update_environment_roles(portfolio_role, new_environment_roles)
assert portfolio_role.num_environment_roles == 2 assert EnvironmentRoles.get(user.id, dev.id).role == CSPRole.NETWORK_ADMIN.value
assert EnvironmentRoles.get(developer.id, now_ba).role == "billing_auditor" assert EnvironmentRoles.get(user.id, staging.id).role == CSPRole.BUSINESS_READ.value
assert EnvironmentRoles.get(developer.id, now_none) is None assert EnvironmentRoles.get(user.id, prod.id).role == CSPRole.TECHNICAL_READ.value
assert EnvironmentRoles.get(developer.id, still_fa).role == "financial_auditor" assert not EnvironmentRoles.get(user.id, testing.id)
def test_no_update_to_environment_roles():
owner = UserFactory.create()
developer = UserFactory.create()
portfolio = PortfolioFactory.create(
owner=owner,
members=[{"user": developer, "role_name": "developer"}],
applications=[
{
"name": "application1",
"environments": [
{
"name": "application1 dev",
"members": [{"user": developer, "role_name": "devops"}],
}
],
}
],
)
dev_env = portfolio.applications[0].environments[0]
new_ids_and_roles = [{"id": dev_env.id, "role": "devops"}]
portfolio_role = PortfolioRoles.get(portfolio.id, developer.id)
assert not Environments.update_environment_roles(portfolio_role, new_ids_and_roles)
def test_get_scoped_environments(db): def test_get_scoped_environments(db):

View File

@ -10,6 +10,7 @@ from atst.models import *
from atst.models.portfolio_role import Status as PortfolioRoleStatus from atst.models.portfolio_role import Status as PortfolioRoleStatus
from atst.models.application_role import Status as ApplicationRoleStatus from atst.models.application_role import Status as ApplicationRoleStatus
from atst.models.invitation import Status as InvitationStatus from atst.models.invitation import Status as InvitationStatus
from atst.models.environment_role import CSPRole
from atst.domain.invitations import Invitations from atst.domain.invitations import Invitations
from atst.domain.permission_sets import PermissionSets from atst.domain.permission_sets import PermissionSets
from atst.domain.portfolio_roles import PortfolioRoles from atst.domain.portfolio_roles import PortfolioRoles
@ -193,6 +194,7 @@ class EnvironmentFactory(Base):
model = Environment model = Environment
name = factory.Faker("domain_word") name = factory.Faker("domain_word")
application = factory.SubFactory(ApplicationFactory)
@classmethod @classmethod
def _create(cls, model_class, *args, **kwargs): def _create(cls, model_class, *args, **kwargs):
@ -234,7 +236,7 @@ class EnvironmentRoleFactory(Base):
model = EnvironmentRole model = EnvironmentRole
environment = factory.SubFactory(EnvironmentFactory) environment = factory.SubFactory(EnvironmentFactory)
role = factory.Faker("name") role = random.choice([e.value for e in CSPRole])
user = factory.SubFactory(UserFactory) user = factory.SubFactory(UserFactory)

View File

@ -123,8 +123,8 @@ def test_has_env_role_history(session):
env_role = EnvironmentRoleFactory.create( env_role = EnvironmentRoleFactory.create(
user=user, environment=environment, role="developer" user=user, environment=environment, role="developer"
) )
Environments.update_environment_roles( Environments.update_env_roles_by_member(
portfolio_role, [{"role": "admin", "id": environment.id}] user, [{"role": "admin", "id": environment.id}]
) )
changed_events = ( changed_events = (
session.query(AuditEvent) session.query(AuditEvent)

View File

@ -7,10 +7,15 @@ from tests.factories import (
EnvironmentRoleFactory, EnvironmentRoleFactory,
EnvironmentFactory, EnvironmentFactory,
ApplicationFactory, ApplicationFactory,
ApplicationRoleFactory,
) )
from atst.domain.applications import Applications from atst.domain.applications import Applications
from atst.domain.environment_roles import EnvironmentRoles
from atst.domain.environments import Environments
from atst.domain.permission_sets import PermissionSets
from atst.domain.portfolios import Portfolios from atst.domain.portfolios import Portfolios
from atst.models.environment_role import CSPRole
from atst.models.portfolio_role import Status as PortfolioRoleStatus from atst.models.portfolio_role import Status as PortfolioRoleStatus
from tests.utils import captured_templates from tests.utils import captured_templates
@ -65,6 +70,47 @@ def test_edit_application_environments_obj(app, client, user_session):
} }
def test_edit_app_serialize_env_member_form_data(app, client, user_session):
portfolio = PortfolioFactory.create()
application = Applications.create(
portfolio,
"Snazzy Application",
"A new application for me and my friends",
{"env1", "env2"},
)
user1 = UserFactory.create()
user2 = UserFactory.create()
env1 = application.environments[0]
env2 = application.environments[1]
env_role1 = EnvironmentRoleFactory.create(environment=env1, user=user1)
env_role2 = EnvironmentRoleFactory.create(environment=env1, user=user2)
env_role3 = EnvironmentRoleFactory.create(environment=env2, user=user1)
user_session(portfolio.owner)
with captured_templates(app) as templates:
response = app.test_client().get(
url_for("applications.settings", application_id=application.id)
)
assert response.status_code == 200
_, context = templates[0]
for env_id in context["env_forms"]:
env = Environments.get(environment_id=env_id)
form_data = {"env_id": env_id, "team_roles": []}
for user in env.users:
env_role = EnvironmentRoles.get(user.id, env.id)
form_data["team_roles"].append(
{
"name": user.full_name,
"user_id": user.id,
"role": env_role.displayname,
}
)
assert context["env_forms"][env_id].data == form_data
def test_user_with_permission_can_update_application(client, user_session): def test_user_with_permission_can_update_application(client, user_session):
owner = UserFactory.create() owner = UserFactory.create()
portfolio = PortfolioFactory.create( portfolio = PortfolioFactory.create(
@ -120,6 +166,49 @@ def test_user_without_permission_cannot_update_application(client, user_session)
assert application.description == "Cool stuff happening here!" assert application.description == "Cool stuff happening here!"
def test_update_team_env_roles(client, user_session):
environment = EnvironmentFactory.create()
application = environment.application
env_role_1 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.BASIC_ACCESS.value
)
env_role_2 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.BASIC_ACCESS.value
)
env_role_3 = EnvironmentRoleFactory.create(
environment=environment, role=CSPRole.BASIC_ACCESS.value
)
app_role = ApplicationRoleFactory.create(application=application)
form_data = {
"env_id": environment.id,
"team_roles-0-user_id": env_role_1.user.id,
"team_roles-0-name": env_role_1.user.full_name,
"team_roles-0-role": CSPRole.NETWORK_ADMIN.value,
"team_roles-1-user_id": env_role_2.user.id,
"team_roles-1-name": env_role_2.user.full_name,
"team_roles-1-role": CSPRole.BASIC_ACCESS.value,
"team_roles-2-user_id": env_role_3.user.id,
"team_roles-2-name": env_role_3.user.full_name,
"team_roles-2-role": "",
"team_roles-3-user_id": app_role.user.id,
"team_roles-3-name": app_role.user.full_name,
"team_roles-3-role": CSPRole.TECHNICAL_READ.value,
}
user_session(application.portfolio.owner)
response = client.post(
url_for("applications.update_env_roles", environment_id=environment.id),
data=form_data,
follow_redirects=True,
)
assert response.status_code == 200
assert env_role_1.role == CSPRole.NETWORK_ADMIN.value
assert env_role_2.role == CSPRole.BASIC_ACCESS.value
assert not EnvironmentRoles.get(env_role_3.user.id, environment.id)
assert EnvironmentRoles.get(app_role.user.id, environment.id)
def test_user_can_only_access_apps_in_their_portfolio(client, user_session): def test_user_can_only_access_apps_in_their_portfolio(client, user_session):
portfolio = PortfolioFactory.create() portfolio = PortfolioFactory.create()
other_portfolio = PortfolioFactory.create( other_portfolio = PortfolioFactory.create(

View File

@ -8,12 +8,14 @@ import atst
from atst.app import make_app, make_config from atst.app import make_app, make_config
from atst.domain.auth import UNPROTECTED_ROUTES as _NO_LOGIN_REQUIRED from atst.domain.auth import UNPROTECTED_ROUTES as _NO_LOGIN_REQUIRED
from atst.domain.permission_sets import PermissionSets from atst.domain.permission_sets import PermissionSets
from atst.models.environment_role import CSPRole
from atst.models.portfolio_role import Status as PortfolioRoleStatus from atst.models.portfolio_role import Status as PortfolioRoleStatus
from tests.factories import ( from tests.factories import (
AttachmentFactory, AttachmentFactory,
ApplicationFactory, ApplicationFactory,
ApplicationRoleFactory, ApplicationRoleFactory,
EnvironmentFactory,
InvitationFactory, InvitationFactory,
PortfolioFactory, PortfolioFactory,
PortfolioRoleFactory, PortfolioRoleFactory,
@ -168,6 +170,41 @@ def test_applications_create_access(post_url_assert_status):
post_url_assert_status(rando, url, 404) post_url_assert_status(rando, url, 404)
# applications.update_env_roles
def test_applications_update_team_env_roles(post_url_assert_status):
ccpo = UserFactory.create_ccpo()
owner = user_with()
app_admin = user_with()
rando = user_with()
app_member = UserFactory.create()
portfolio = PortfolioFactory.create(
owner=owner, applications=[{"name": "mos eisley"}]
)
application = portfolio.applications[0]
environment = EnvironmentFactory.create(application=application)
ApplicationRoleFactory.create(
user=app_admin,
application=application,
permission_sets=PermissionSets.get_many(
[
PermissionSets.VIEW_APPLICATION,
PermissionSets.EDIT_APPLICATION_ENVIRONMENTS,
PermissionSets.EDIT_APPLICATION_TEAM,
PermissionSets.DELETE_APPLICATION_ENVIRONMENTS,
]
),
)
ApplicationRoleFactory.create(user=app_member)
url = url_for("applications.update_env_roles", environment_id=environment.id)
post_url_assert_status(ccpo, url, 302)
post_url_assert_status(owner, url, 302)
post_url_assert_status(app_admin, url, 302)
post_url_assert_status(rando, url, 404)
# portfolios.create_member # portfolios.create_member
def test_portfolios_create_member_access(post_url_assert_status): def test_portfolios_create_member_access(post_url_assert_status):
ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN) ccpo = user_with(PermissionSets.EDIT_PORTFOLIO_ADMIN)