Merge pull request #865 from dod-ccpo/new-invitation-flow

New application invitation flow
This commit is contained in:
dandds 2019-06-05 15:13:59 -04:00 committed by GitHub
commit f2f0adde47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 148 additions and 197 deletions

View File

@ -1,13 +1,19 @@
from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.orm.exc import NoResultFound
from atst.database import db
from . import BaseDomainClass from . import BaseDomainClass
from atst.database import db
from atst.domain.application_roles import ApplicationRoles from atst.domain.application_roles import ApplicationRoles
from atst.domain.environment_roles import EnvironmentRoles from atst.domain.environment_roles import EnvironmentRoles
from atst.domain.environments import Environments from atst.domain.environments import Environments
from atst.domain.exceptions import NotFoundError from atst.domain.exceptions import NotFoundError
from atst.domain.users import Users from atst.domain.invitations import ApplicationInvitations
from atst.models import Application, ApplicationRole, ApplicationRoleStatus from atst.models import (
Application,
ApplicationRole,
ApplicationRoleStatus,
EnvironmentRole,
)
from atst.utils import first_or_none
class Applications(BaseDomainClass): class Applications(BaseDomainClass):
@ -76,33 +82,50 @@ class Applications(BaseDomainClass):
db.session.commit() db.session.commit()
@classmethod @classmethod
def create_member( def invite(
cls, application, user_data, permission_sets=None, environment_roles_data=None cls,
application,
inviter,
user_data,
permission_sets_names=None,
environment_roles_data=None,
): ):
permission_sets = [] if permission_sets is None else permission_sets permission_sets_names = permission_sets_names or []
environment_roles_data = ( permission_sets = ApplicationRoles._permission_sets_for_names(
[] if environment_roles_data is None else environment_roles_data permission_sets_names
)
app_role = ApplicationRole(
application=application, permission_sets=permission_sets
) )
user = Users.get_or_create_by_dod_id( db.session.add(app_role)
user_data["dod_id"],
first_name=user_data["first_name"],
last_name=user_data["last_name"],
phone_number=user_data.get("phone_number"),
email=user_data["email"],
)
application_role = ApplicationRoles.create(user, application, permission_sets)
for env_role_data in environment_roles_data: for env_role_data in environment_roles_data:
role = env_role_data.get("role") env_role_name = env_role_data.get("role")
if role: environment_id = env_role_data.get("environment_id")
environment = Environments.get(env_role_data.get("environment_id")) if env_role_name is not None:
Environments.add_member( # pylint: disable=cell-var-from-loop
environment, application_role, env_role_data.get("role") environment = first_or_none(
lambda e: str(e.id) == str(environment_id), application.environments
) )
if environment is None:
raise NotFoundError("environment")
else:
env_role = EnvironmentRole(
application_role=app_role,
environment=environment,
role=env_role_name,
)
db.session.add(env_role)
return application_role invitation = ApplicationInvitations.create(
inviter=inviter, role=app_role, member_data=user_data
)
db.session.add(invitation)
db.session.commit()
return invitation
@classmethod @classmethod
def remove_member(cls, application, user_id): def remove_member(cls, application, user_id):

View File

@ -59,12 +59,17 @@ class ApplicationRole(
), ),
) )
@property
def latest_invitation(self):
if self.invitations:
return self.invitations[-1]
@property @property
def user_name(self): def user_name(self):
if self.user: if self.user:
return self.user.full_name return self.user.full_name
else: elif self.latest_invitation:
return None return self.latest_invitation.user_name
def __repr__(self): def __repr__(self):
return "<ApplicationRole(application='{}', user_id='{}', id='{}', permissions={})>".format( return "<ApplicationRole(application='{}', user_id='{}', id='{}', permissions={})>".format(

View File

@ -13,8 +13,9 @@ from atst.domain.users import Users
from atst.forms.application_member import NewForm as NewMemberForm from atst.forms.application_member import NewForm as NewMemberForm
from atst.forms.team import TeamForm from atst.forms.team import TeamForm
from atst.models import Permissions from atst.models import Permissions
from atst.services.invitation import Invitation as InvitationService
from atst.utils.flash import formatted_flash as flash from atst.utils.flash import formatted_flash as flash
from atst.utils.localization import translate
from atst.queue import queue
def get_form_permission_value(member, edit_perm_set): def get_form_permission_value(member, edit_perm_set):
@ -125,6 +126,17 @@ def update_team(application_id):
return (render_team_page(application), 400) return (render_team_page(application), 400)
def send_application_invitation(invitee_email, inviter_name, token):
body = render_template(
"emails/application/invitation.txt", owner=inviter_name, token=token
)
queue.send_mail(
[invitee_email],
translate("email.application_invite", {"inviter_name": inviter_name}),
body,
)
@applications_bp.route("/application/<application_id>/members/new", methods=["POST"]) @applications_bp.route("/application/<application_id>/members/new", methods=["POST"])
@user_can( @user_can(
Permissions.CREATE_APPLICATION_MEMBER, message="create new application member" Permissions.CREATE_APPLICATION_MEMBER, message="create new application member"
@ -135,19 +147,21 @@ def create_member(application_id):
if form.validate(): if form.validate():
try: try:
member = Applications.create_member( invite = Applications.invite(
application, application=application,
form.user_data.data, inviter=g.current_user,
permission_sets=form.permission_sets.data, user_data=form.user_data.data,
permission_sets_names=form.permission_sets.data,
environment_roles_data=form.environment_roles.data, environment_roles_data=form.environment_roles.data,
) )
invite_service = InvitationService( send_application_invitation(
g.current_user, member, form.user_data.data.get("email") invitee_email=invite.email,
inviter_name=g.current_user.full_name,
token=invite.token,
) )
invite_service.invite()
flash("new_portfolio_member", new_member=member) flash("new_application_member", user_name=invite.user_name)
except AlreadyExistsError: except AlreadyExistsError:
return render_template( return render_template(

View File

@ -8,6 +8,7 @@ from atst.domain.portfolios import Portfolios
from atst.models import Permissions from atst.models import Permissions
from atst.queue import queue from atst.queue import queue
from atst.utils.flash import formatted_flash as flash from atst.utils.flash import formatted_flash as flash
from atst.utils.localization import translate
import atst.forms.portfolio_member as member_forms import atst.forms.portfolio_member as member_forms
@ -17,7 +18,7 @@ def send_portfolio_invitation(invitee_email, inviter_name, token):
) )
queue.send_mail( queue.send_mail(
[invitee_email], [invitee_email],
"{} has invited you to a JEDI cloud portfolio".format(inviter_name), translate("email.portfolio_invite", {"inviter_name": inviter_name}),
body, body,
) )
@ -54,7 +55,11 @@ def revoke_invitation(portfolio_id, portfolio_token):
@user_can(Permissions.EDIT_PORTFOLIO_USERS, message="resend invitation") @user_can(Permissions.EDIT_PORTFOLIO_USERS, message="resend invitation")
def resend_invitation(portfolio_id, portfolio_token): def resend_invitation(portfolio_id, portfolio_token):
invite = PortfolioInvitations.resend(g.current_user, portfolio_token) invite = PortfolioInvitations.resend(g.current_user, portfolio_token)
send_portfolio_invitation(invite.email, g.current_user.full_name, invite.token) send_portfolio_invitation(
invitee_email=invite.email,
inviter_name=g.current_user.full_name,
token=invite.token,
)
flash("resend_portfolio_invitation", user_name=invite.user_name) flash("resend_portfolio_invitation", user_name=invite.user_name)
return redirect( return redirect(
url_for( url_for(
@ -76,7 +81,9 @@ def invite_member(portfolio_id):
try: try:
invite = Portfolios.invite(portfolio, g.current_user, form.update_data) invite = Portfolios.invite(portfolio, g.current_user, form.update_data)
send_portfolio_invitation( send_portfolio_invitation(
invite.email, g.current_user.full_name, invite.token invitee_email=invite.email,
inviter_name=g.current_user.full_name,
token=invite.token,
) )
flash( flash(

View File

@ -1,94 +0,0 @@
from flask import render_template
from atst.domain.invitations import PortfolioInvitations, ApplicationInvitations
from atst.queue import queue
from atst.domain.task_orders import TaskOrders
from atst.domain.portfolio_roles import PortfolioRoles
from atst.models import ApplicationRole, PortfolioRole
OFFICER_INVITATIONS = {
"ko_invite": {
"role": "contracting_officer",
"subject": "Review a task order",
"template": "emails/portfolio/invitation.txt",
},
"cor_invite": {
"role": "contracting_officer_representative",
"subject": "Help with a task order",
"template": "emails/portfolio/invitation.txt",
},
"so_invite": {
"role": "security_officer",
"subject": "Review security for a task order",
"template": "emails/portfolio/invitation.txt",
},
}
def update_officer_invitations(user, task_order):
for invite_type in dict.keys(OFFICER_INVITATIONS):
invite_opts = OFFICER_INVITATIONS[invite_type]
if getattr(task_order, invite_type) and not getattr(
task_order, invite_opts["role"]
):
officer_data = task_order.officer_dictionary(invite_opts["role"])
officer = TaskOrders.add_officer(
task_order, invite_opts["role"], officer_data
)
pf_officer_member = PortfolioRoles.get(task_order.portfolio.id, officer.id)
invite_service = Invitation(
user,
pf_officer_member,
officer_data["email"],
subject=invite_opts["subject"],
email_template=invite_opts["template"],
)
invite_service.invite()
class Invitation:
def __init__(self, inviter, member, email, subject="", email_template=None):
self.inviter = inviter
self.member = member
self.email = email
self.subject = subject
self.email_template = email_template
if isinstance(member, PortfolioRole):
self.email_template = (
self.email_template or "emails/portfolio/invitation.txt"
)
self.subject = (
self.subject or "{} has invited you to a JEDI cloud portfolio"
)
self.domain_class = PortfolioInvitations
elif isinstance(member, ApplicationRole):
self.email_template = (
self.email_template or "emails/application/invitation.txt"
)
self.subject = (
self.subject or "{} has invited you to a JEDI cloud application"
)
self.domain_class = ApplicationInvitations
def invite(self):
invite = self._create_invite()
self._send_invite_email(invite.token)
return invite
def _create_invite(self):
user = self.member.user
return self.domain_class.create(
self.inviter,
self.member,
{"email": self.email, "dod_id": user.dod_id},
commit=True,
)
def _send_invite_email(self, token):
body = render_template(
self.email_template, owner=self.inviter.full_name, token=token
)
queue.send_mail([self.email], self.subject.format(self.inviter.full_name), body)

View File

@ -67,7 +67,7 @@ MESSAGES = {
"new_application_member": { "new_application_member": {
"title_template": translate("flash.success"), "title_template": translate("flash.success"),
"message_template": """ "message_template": """
<p>{{ "flash.new_application_member" | translate({ "user_name": new_member.user_name }) }}</p> <p>{{ "flash.new_application_member" | translate({ "user_name": user_name }) }}</p>
""", """,
"category": "success", "category": "success",
}, },

View File

@ -107,32 +107,6 @@ def test_delete_application(session):
assert not session.dirty assert not session.dirty
def test_create_member():
application = ApplicationFactory.create()
env1 = EnvironmentFactory.create(application=application)
env2 = EnvironmentFactory.create(application=application)
user_data = UserFactory.dictionary()
permission_set_names = [PermissionSets.EDIT_APPLICATION_TEAM]
member_role = Applications.create_member(
application,
user_data,
permission_set_names,
environment_roles_data=[
{"environment_id": env1.id, "role": CSPRole.BASIC_ACCESS.value},
{"environment_id": env2.id, "role": None},
],
)
assert member_role.user.dod_id == user_data["dod_id"]
# view application AND edit application team
assert len(member_role.permission_sets) == 2
env_roles = member_role.environment_roles
assert len(env_roles) == 1
assert env_roles[0].environment == env1
def test_for_user(): def test_for_user():
user = UserFactory.create() user = UserFactory.create()
portfolio = PortfolioFactory.create() portfolio = PortfolioFactory.create()
@ -189,3 +163,48 @@ def test_remove_member():
) )
is None is None
) )
def test_invite():
application = ApplicationFactory.create()
env1 = EnvironmentFactory.create(application=application)
env2 = EnvironmentFactory.create(application=application)
user_data = UserFactory.dictionary()
permission_sets_names = [PermissionSets.EDIT_APPLICATION_TEAM]
invitation = Applications.invite(
application=application,
inviter=application.portfolio.owner,
user_data=user_data,
permission_sets_names=permission_sets_names,
environment_roles_data=[
{"environment_id": env1.id, "role": CSPRole.BASIC_ACCESS.value},
{"environment_id": env2.id, "role": None},
],
)
member_role = invitation.role
assert invitation.dod_id == user_data["dod_id"]
# view application AND edit application team
assert len(member_role.permission_sets) == 2
env_roles = member_role.environment_roles
assert len(env_roles) == 1
assert env_roles[0].environment == env1
def test_invite_to_nonexistent_environment():
application = ApplicationFactory.create()
env1 = EnvironmentFactory.create(application=application)
user_data = UserFactory.dictionary()
with pytest.raises(NotFoundError):
Applications.invite(
application=application,
inviter=application.portfolio.owner,
user_data=user_data,
environment_roles_data=[
{"environment_id": env1.id, "role": CSPRole.BASIC_ACCESS.value},
{"environment_id": uuid4(), "role": CSPRole.BASIC_ACCESS.value},
],
)

View File

@ -1,10 +1,10 @@
import pytest
import uuid import uuid
from flask import url_for from flask import url_for
from atst.domain.permission_sets import PermissionSets from atst.domain.permission_sets import PermissionSets
from atst.models import CSPRole from atst.models import CSPRole
from atst.forms.data import ENV_ROLE_NO_ACCESS as NO_ACCESS from atst.forms.data import ENV_ROLE_NO_ACCESS as NO_ACCESS
from atst.queue import queue
from tests.factories import * from tests.factories import *
@ -145,7 +145,8 @@ def test_update_team_revoke_environment_access(client, user_session, db, session
assert not session.query(env_role_exists).scalar() assert not session.query(env_role_exists).scalar()
def test_create_member(client, user_session): def test_create_member(client, user_session, session):
queue_length = len(queue.get_queue())
user = UserFactory.create() user = UserFactory.create()
application = ApplicationFactory.create( application = ApplicationFactory.create(
environments=[{"name": "Naboo"}, {"name": "Endor"}] environments=[{"name": "Naboo"}, {"name": "Endor"}]
@ -179,14 +180,18 @@ def test_create_member(client, user_session):
_external=True, _external=True,
) )
assert response.location == expected_url assert response.location == expected_url
assert len(user.application_roles) == 1 assert len(application.roles) == 1
assert user.application_roles[0].application == application environment_roles = application.roles[0].environment_roles
environment_roles = [
er for ar in user.application_roles for er in ar.environment_roles
]
assert len(environment_roles) == 1 assert len(environment_roles) == 1
assert environment_roles[0].environment == env assert environment_roles[0].environment == env
invitation = (
session.query(ApplicationInvitation).filter_by(dod_id=user.dod_id).one()
)
assert invitation.role.application == application
assert len(queue.get_queue()) == queue_length + 1
def test_remove_member_success(client, user_session): def test_remove_member_success(client, user_session):
user = UserFactory.create() user = UserFactory.create()

View File

@ -1,31 +0,0 @@
from tests.factories import (
ApplicationFactory,
ApplicationRoleFactory,
UserFactory,
PortfolioFactory,
PortfolioRoleFactory,
)
from atst.services.invitation import Invitation
def test_invite_portfolio_member(queue):
inviter = UserFactory.create()
new_member = UserFactory.create()
portfolio = PortfolioFactory.create(owner=inviter)
ws_member = PortfolioRoleFactory.create(user=new_member, portfolio=portfolio)
invite_service = Invitation(inviter, ws_member, new_member.email)
new_invitation = invite_service.invite()
assert new_invitation == new_member.portfolio_invitations[0]
assert len(queue.get_queue()) == 1
def test_invite_application_member(queue):
inviter = UserFactory.create()
new_member = UserFactory.create()
application = ApplicationFactory.create()
member = ApplicationRoleFactory.create(user=new_member, application=application)
invite_service = Invitation(inviter, member, new_member.email)
new_invitation = invite_service.invite()
assert new_invitation == new_member.application_invitations[0]
assert len(queue.get_queue()) == 1

View File

@ -65,6 +65,9 @@ components:
The <strong>https://</strong> ensures that you are connecting to the official website and that any information you provide is encrypted and transmitted securely. The <strong>https://</strong> ensures that you are connecting to the official website and that any information you provide is encrypted and transmitted securely.
</p> </p>
title: Heres how you know title: Heres how you know
email:
application_invite: "{inviter_name} has invited you to a JEDI cloud application"
portfolio_invite: "{inviter_name} has invited you to a JEDI cloud portfolio"
flash: flash:
application: application:
deleted: 'You have successfully deleted the {application_name} application. To view the retained activity log, visit the portfolio administration page.' deleted: 'You have successfully deleted the {application_name} application. To view the retained activity log, visit the portfolio administration page.'