users have permission sets for site-wide perms

This commit is contained in:
dandds 2019-03-18 16:42:53 -04:00
parent 27314b8120
commit bec5d11bfe
16 changed files with 114 additions and 103 deletions

View File

@ -0,0 +1,37 @@
"""users to permission_sets join table, remove role rel
Revision ID: fc08d99bb7f7
Revises: a19138e386c4
Create Date: 2019-03-18 06:13:43.128905
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = 'fc08d99bb7f7'
down_revision = 'a19138e386c4'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('users_permission_sets',
sa.Column('user_id', postgresql.UUID(as_uuid=True), nullable=True),
sa.Column('permission_set_id', postgresql.UUID(as_uuid=True), nullable=True),
sa.ForeignKeyConstraint(['permission_set_id'], ['permission_sets.id'], ),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], )
)
op.drop_constraint('users_atat_role_id_fkey', 'users', type_='foreignkey')
op.drop_column('users', 'atat_role_id')
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('users', sa.Column('atat_role_id', postgresql.UUID(), autoincrement=False, nullable=True))
op.create_foreign_key('users_atat_role_id_fkey', 'users', 'permission_sets', ['atat_role_id'], ['id'])
op.drop_table('users_permission_sets')
# ### end Alembic commands ###

View File

@ -31,9 +31,7 @@ class AuthenticationContext:
except NotFoundError: except NotFoundError:
email = self._get_user_email() email = self._get_user_email()
return Users.create( return Users.create(permission_sets=[], email=email, **self.parsed_sdn)
atat_role_name="default", email=email, **self.parsed_sdn
)
def _get_user_email(self): def _get_user_email(self):
try: try:

View File

@ -16,7 +16,7 @@ class Authorization(object):
@classmethod @classmethod
def has_atat_permission(cls, user, permission): def has_atat_permission(cls, user, permission):
return permission in user.atat_role.permissions return permission in user.permissions
@classmethod @classmethod
def is_in_portfolio(cls, user, portfolio): def is_in_portfolio(cls, user, portfolio):
@ -36,10 +36,6 @@ class Authorization(object):
def can_view_audit_log(cls, user): def can_view_audit_log(cls, user):
return Authorization.has_atat_permission(user, Permissions.VIEW_AUDIT_LOG) return Authorization.has_atat_permission(user, Permissions.VIEW_AUDIT_LOG)
@classmethod
def is_ccpo(cls, user):
return user.atat_role.name == "ccpo"
@classmethod @classmethod
def is_ko(cls, user, task_order): def is_ko(cls, user, task_order):
return user == task_order.contracting_officer return user == task_order.contracting_officer

View File

@ -100,7 +100,6 @@ class Portfolios(object):
first_name=data["first_name"], first_name=data["first_name"],
last_name=data["last_name"], last_name=data["last_name"],
email=data["email"], email=data["email"],
atat_role_name="default",
provisional=True, provisional=True,
) )
permission_sets = data.get("permission_sets", []) permission_sets = data.get("permission_sets", [])

View File

@ -28,11 +28,14 @@ class Users(object):
return user return user
@classmethod @classmethod
def create(cls, dod_id, atat_role_name=None, **kwargs): def create(cls, dod_id, permission_sets=None, **kwargs):
atat_role = PermissionSets.get(atat_role_name) if permission_sets:
permission_sets = PermissionSets.get_many(permission_sets)
else:
permission_sets = []
try: try:
user = User(dod_id=dod_id, atat_role=atat_role, **kwargs) user = User(dod_id=dod_id, permission_sets=permission_sets, **kwargs)
db.session.add(user) db.session.add(user)
db.session.commit() db.session.commit()
except IntegrityError: except IntegrityError:
@ -52,18 +55,6 @@ class Users(object):
return user return user
@classmethod
def update_role(cls, user_id, atat_role_name):
user = Users.get(user_id)
atat_role = PermissionSets.get(atat_role_name)
user.atat_role = atat_role
db.session.add(user)
db.session.commit()
return user
_UPDATEABLE_ATTRS = { _UPDATEABLE_ATTRS = {
"first_name", "first_name",
"last_name", "last_name",

View File

@ -1,2 +1,3 @@
from .timestamps import TimestampsMixin from .timestamps import TimestampsMixin
from .auditable import AuditableMixin from .auditable import AuditableMixin
from .permissions import PermissionsMixin

View File

@ -0,0 +1,6 @@
class PermissionsMixin(object):
@property
def permissions(self):
return [
perm for permset in self.permission_sets for perm in permset.permissions
]

View File

@ -37,7 +37,9 @@ portfolio_roles_permission_sets = Table(
) )
class PortfolioRole(Base, mixins.TimestampsMixin, mixins.AuditableMixin): class PortfolioRole(
Base, mixins.TimestampsMixin, mixins.AuditableMixin, mixins.PermissionsMixin
):
__tablename__ = "portfolio_roles" __tablename__ = "portfolio_roles"
id = Id() id = Id()
@ -56,12 +58,6 @@ class PortfolioRole(Base, mixins.TimestampsMixin, mixins.AuditableMixin):
"PermissionSet", secondary=portfolio_roles_permission_sets "PermissionSet", secondary=portfolio_roles_permission_sets
) )
@property
def permissions(self):
return [
perm for permset in self.permission_sets for perm in permset.permissions
]
def __repr__(self): def __repr__(self):
return "<PortfolioRole(portfolio='{}', user_id='{}', id='{}', permissions={})>".format( return "<PortfolioRole(portfolio='{}', user_id='{}', id='{}', permissions={})>".format(
self.portfolio.name, self.user_id, self.id, self.permissions self.portfolio.name, self.user_id, self.id, self.permissions

View File

@ -1,4 +1,4 @@
from sqlalchemy import String, ForeignKey, Column, Date, Boolean from sqlalchemy import String, ForeignKey, Column, Date, Boolean, Table
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.dialects.postgresql import UUID
@ -6,14 +6,24 @@ from atst.models import Base, types, mixins
from atst.models.permissions import Permissions from atst.models.permissions import Permissions
class User(Base, mixins.TimestampsMixin, mixins.AuditableMixin): users_permission_sets = Table(
"users_permission_sets",
Base.metadata,
Column("user_id", UUID(as_uuid=True), ForeignKey("users.id")),
Column("permission_set_id", UUID(as_uuid=True), ForeignKey("permission_sets.id")),
)
class User(
Base, mixins.TimestampsMixin, mixins.AuditableMixin, mixins.PermissionsMixin
):
__tablename__ = "users" __tablename__ = "users"
id = types.Id() id = types.Id()
username = Column(String) username = Column(String)
atat_role_id = Column(UUID(as_uuid=True), ForeignKey("permission_sets.id"))
atat_role = relationship("PermissionSet") permission_sets = relationship("PermissionSet", secondary=users_permission_sets)
portfolio_roles = relationship("PortfolioRole", backref="user") portfolio_roles = relationship("PortfolioRole", backref="user")
email = Column(String, unique=True) email = Column(String, unique=True)
@ -52,36 +62,21 @@ class User(Base, mixins.TimestampsMixin, mixins.AuditableMixin):
] ]
) )
@property
def atat_permissions(self):
return self.atat_role.permissions
@property
def atat_role_name(self):
return self.atat_role.name
@property @property
def full_name(self): def full_name(self):
return "{} {}".format(self.first_name, self.last_name) return "{} {}".format(self.first_name, self.last_name)
@property @property
def has_portfolios(self): def has_portfolios(self):
return ( return (Permissions.VIEW_PORTFOLIO in self.permissions) or self.portfolio_roles
Permissions.VIEW_PORTFOLIO in self.atat_role.permissions
) or self.portfolio_roles
@property @property
def displayname(self): def displayname(self):
return self.full_name return self.full_name
def __repr__(self): def __repr__(self):
return "<User(name='{}', dod_id='{}', email='{}', role='{}', has_portfolios='{}', id='{}')>".format( return "<User(name='{}', dod_id='{}', email='{}', has_portfolios='{}', id='{}')>".format(
self.full_name, self.full_name, self.dod_id, self.email, self.has_portfolios, self.id
self.dod_id,
self.email,
self.atat_role_name,
self.has_portfolios,
self.id,
) )
def to_dictionary(self): def to_dictionary(self):

View File

@ -11,18 +11,33 @@ import pendulum
from . import redirect_after_login_url from . import redirect_after_login_url
from atst.domain.users import Users from atst.domain.users import Users
from atst.domain.permission_sets import PermissionSets
from atst.queue import queue from atst.queue import queue
from tests.factories import random_service_branch from tests.factories import random_service_branch
from atst.utils import pick from atst.utils import pick
bp = Blueprint("dev", __name__) bp = Blueprint("dev", __name__)
_ALL_PERMS = [
"ccpo",
PermissionSets.VIEW_PORTFOLIO,
PermissionSets.VIEW_PORTFOLIO_APPLICATION_MANAGEMENT,
PermissionSets.VIEW_PORTFOLIO_FUNDING,
PermissionSets.VIEW_PORTFOLIO_REPORTS,
PermissionSets.VIEW_PORTFOLIO_ADMIN,
PermissionSets.EDIT_PORTFOLIO_APPLICATION_MANAGEMENT,
PermissionSets.EDIT_PORTFOLIO_FUNDING,
PermissionSets.EDIT_PORTFOLIO_REPORTS,
PermissionSets.EDIT_PORTFOLIO_ADMIN,
PermissionSets.PORTFOLIO_POC,
]
_DEV_USERS = { _DEV_USERS = {
"sam": { "sam": {
"dod_id": "6346349876", "dod_id": "6346349876",
"first_name": "Sam", "first_name": "Sam",
"last_name": "Stevenson", "last_name": "Stevenson",
"atat_role_name": "ccpo", "permission_sets": _ALL_PERMS,
"email": "sam@example.com", "email": "sam@example.com",
"service_branch": random_service_branch(), "service_branch": random_service_branch(),
"phone_number": "1234567890", "phone_number": "1234567890",
@ -34,7 +49,6 @@ _DEV_USERS = {
"dod_id": "2345678901", "dod_id": "2345678901",
"first_name": "Amanda", "first_name": "Amanda",
"last_name": "Adamson", "last_name": "Adamson",
"atat_role_name": "default",
"email": "amanda@example.com", "email": "amanda@example.com",
"service_branch": random_service_branch(), "service_branch": random_service_branch(),
"phone_number": "1234567890", "phone_number": "1234567890",
@ -46,7 +60,6 @@ _DEV_USERS = {
"dod_id": "3456789012", "dod_id": "3456789012",
"first_name": "Brandon", "first_name": "Brandon",
"last_name": "Buchannan", "last_name": "Buchannan",
"atat_role_name": "default",
"email": "brandon@example.com", "email": "brandon@example.com",
"service_branch": random_service_branch(), "service_branch": random_service_branch(),
"phone_number": "1234567890", "phone_number": "1234567890",
@ -58,7 +71,6 @@ _DEV_USERS = {
"dod_id": "4567890123", "dod_id": "4567890123",
"first_name": "Christina", "first_name": "Christina",
"last_name": "Collins", "last_name": "Collins",
"atat_role_name": "default",
"email": "christina@example.com", "email": "christina@example.com",
"service_branch": random_service_branch(), "service_branch": random_service_branch(),
"phone_number": "1234567890", "phone_number": "1234567890",
@ -70,7 +82,6 @@ _DEV_USERS = {
"dod_id": "5678901234", "dod_id": "5678901234",
"first_name": "Dominick", "first_name": "Dominick",
"last_name": "Domingo", "last_name": "Domingo",
"atat_role_name": "default",
"email": "dominick@example.com", "email": "dominick@example.com",
"service_branch": random_service_branch(), "service_branch": random_service_branch(),
"phone_number": "1234567890", "phone_number": "1234567890",
@ -82,7 +93,6 @@ _DEV_USERS = {
"dod_id": "6789012345", "dod_id": "6789012345",
"first_name": "Erica", "first_name": "Erica",
"last_name": "Eichner", "last_name": "Eichner",
"atat_role_name": "default",
"email": "erica@example.com", "email": "erica@example.com",
"service_branch": random_service_branch(), "service_branch": random_service_branch(),
"phone_number": "1234567890", "phone_number": "1234567890",
@ -101,7 +111,7 @@ def login_dev():
user_data["dod_id"], user_data["dod_id"],
**pick( **pick(
[ [
"atat_role_name", "permission_sets",
"first_name", "first_name",
"last_name", "last_name",
"email", "email",

View File

@ -14,7 +14,7 @@ from tests.factories import (
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def ccpo(): def ccpo():
return UserFactory.from_atat_role("ccpo") return UserFactory.create_ccpo()
@pytest.fixture(scope="function") @pytest.fixture(scope="function")

View File

@ -151,7 +151,7 @@ def test_owner_can_view_portfolio_members(portfolio, portfolio_owner):
@pytest.mark.skip(reason="no ccpo access yet") @pytest.mark.skip(reason="no ccpo access yet")
def test_ccpo_can_view_portfolio_members(portfolio, portfolio_owner): def test_ccpo_can_view_portfolio_members(portfolio, portfolio_owner):
ccpo = UserFactory.from_atat_role("ccpo") ccpo = UserFactory.create_ccpo()
assert Portfolios.get_with_members(ccpo, portfolio.id) assert Portfolios.get_with_members(ccpo, portfolio.id)
@ -277,7 +277,7 @@ def test_for_user_does_not_return_inactive_portfolios(portfolio, portfolio_owner
@pytest.mark.skip(reason="CCPO status not fully implemented") @pytest.mark.skip(reason="CCPO status not fully implemented")
def test_for_user_returns_all_portfolios_for_ccpo(portfolio, portfolio_owner): def test_for_user_returns_all_portfolios_for_ccpo(portfolio, portfolio_owner):
sam = UserFactory.from_atat_role("ccpo") sam = UserFactory.create_ccpo()
PortfolioFactory.create() PortfolioFactory.create()
sams_portfolios = Portfolios.for_user(sam) sams_portfolios = Portfolios.for_user(sam)
@ -297,7 +297,7 @@ def test_get_for_update_information(portfolio, portfolio_owner):
assert portfolio == admin_ws assert portfolio == admin_ws
# TODO: implement ccpo roles # TODO: implement ccpo roles
# ccpo = UserFactory.from_atat_role("ccpo") # ccpo = UserFactory.create_ccpo()
# assert Portfolios.get_for_update_information(ccpo, portfolio.id) # assert Portfolios.get_for_update_information(ccpo, portfolio.id)
developer = UserFactory.create() developer = UserFactory.create()

View File

@ -4,81 +4,64 @@ from uuid import uuid4
from atst.domain.users import Users from atst.domain.users import Users
from atst.domain.exceptions import NotFoundError, AlreadyExistsError, UnauthorizedError from atst.domain.exceptions import NotFoundError, AlreadyExistsError, UnauthorizedError
from tests.factories import UserFactory
DOD_ID = "my_dod_id" DOD_ID = "my_dod_id"
def test_create_user(): def test_create_user():
user = Users.create(DOD_ID, "default") user = Users.create(DOD_ID)
assert user.atat_role.name == "default" assert user.dod_id == DOD_ID
def test_create_user_with_existing_email(): def test_create_user_with_existing_email():
Users.create(DOD_ID, "default", email="thisusersemail@usersRus.com") Users.create(DOD_ID, email="thisusersemail@usersRus.com")
with pytest.raises(AlreadyExistsError): with pytest.raises(AlreadyExistsError):
Users.create(DOD_ID, "ccpo", email="thisusersemail@usersRus.com") Users.create(DOD_ID, email="thisusersemail@usersRus.com")
@pytest.mark.skip(reason="no more roles")
def test_create_user_with_nonexistent_role(): def test_create_user_with_nonexistent_role():
with pytest.raises(NotFoundError): with pytest.raises(NotFoundError):
Users.create(DOD_ID, "nonexistent") Users.create(DOD_ID, "nonexistent")
def test_get_or_create_nonexistent_user(): def test_get_or_create_nonexistent_user():
user = Users.get_or_create_by_dod_id(DOD_ID, atat_role_name="default") user = Users.get_or_create_by_dod_id(DOD_ID)
assert user.dod_id == DOD_ID assert user.dod_id == DOD_ID
def test_get_or_create_existing_user(): def test_get_or_create_existing_user():
Users.get_or_create_by_dod_id(DOD_ID, atat_role_name="default") fact_user = UserFactory.create()
user = Users.get_or_create_by_dod_id(DOD_ID, atat_role_name="default") user = Users.get_or_create_by_dod_id(fact_user.dod_id)
assert user assert user == fact_user
def test_get_user(): def test_get_user():
new_user = Users.create(DOD_ID, "default") new_user = UserFactory.create()
user = Users.get(new_user.id) user = Users.get(new_user.id)
assert user.id == new_user.id assert user.id == new_user.id
def test_get_nonexistent_user(): def test_get_nonexistent_user():
Users.create(DOD_ID, "default")
with pytest.raises(NotFoundError): with pytest.raises(NotFoundError):
Users.get(uuid4()) Users.get(uuid4())
def test_get_user_by_dod_id(): def test_get_user_by_dod_id():
new_user = Users.create(DOD_ID, "default") new_user = UserFactory.create()
user = Users.get_by_dod_id(DOD_ID) user = Users.get_by_dod_id(new_user.dod_id)
assert user == new_user assert user == new_user
def test_update_role():
new_user = Users.create(DOD_ID, "default")
updated_user = Users.update_role(new_user.id, "ccpo")
assert updated_user.atat_role.name == "ccpo"
def test_update_role_with_nonexistent_user():
Users.create(DOD_ID, "default")
with pytest.raises(NotFoundError):
Users.update_role(uuid4(), "ccpo")
def test_update_existing_user_with_nonexistent_role():
new_user = Users.create(DOD_ID, "default")
with pytest.raises(NotFoundError):
Users.update_role(new_user.id, "nonexistent")
def test_update_user(): def test_update_user():
new_user = Users.create(DOD_ID, "default") new_user = UserFactory.create()
updated_user = Users.update(new_user, {"first_name": "Jabba"}) updated_user = Users.update(new_user, {"first_name": "Jabba"})
assert updated_user.first_name == "Jabba" assert updated_user.first_name == "Jabba"
def test_update_user_with_dod_id(): def test_update_user_with_dod_id():
new_user = Users.create(DOD_ID, "default") new_user = UserFactory.create()
with pytest.raises(UnauthorizedError) as excinfo: with pytest.raises(UnauthorizedError) as excinfo:
Users.update(new_user, {"dod_id": "1234567890"}) Users.update(new_user, {"dod_id": "1234567890"})

View File

@ -89,7 +89,7 @@ class UserFactory(Base):
email = factory.Faker("email") email = factory.Faker("email")
first_name = factory.Faker("first_name") first_name = factory.Faker("first_name")
last_name = factory.Faker("last_name") last_name = factory.Faker("last_name")
atat_role = factory.LazyFunction(lambda: PermissionSets.get("default")) permission_sets = []
dod_id = factory.LazyFunction(random_dod_id) dod_id = factory.LazyFunction(random_dod_id)
phone_number = factory.LazyFunction(random_phone_number) phone_number = factory.LazyFunction(random_phone_number)
service_branch = factory.LazyFunction(random_service_branch) service_branch = factory.LazyFunction(random_service_branch)
@ -101,9 +101,8 @@ class UserFactory(Base):
) )
@classmethod @classmethod
def from_atat_role(cls, atat_role_name, **kwargs): def create_ccpo(cls, **kwargs):
role = PermissionSets.get(atat_role_name) return cls.create(permission_sets=PermissionSets.get_all(), **kwargs)
return cls.create(atat_role=role, **kwargs)
class PortfolioFactory(Base): class PortfolioFactory(Base):

View File

@ -42,7 +42,7 @@ def test_user_without_permission_has_no_budget_report_link(client, user_session)
@pytest.mark.skip(reason="Temporarily no add activity log link") @pytest.mark.skip(reason="Temporarily no add activity log link")
def test_user_with_permission_has_activity_log_link(client, user_session): def test_user_with_permission_has_activity_log_link(client, user_session):
portfolio = PortfolioFactory.create() portfolio = PortfolioFactory.create()
ccpo = UserFactory.from_atat_role("ccpo") ccpo = UserFactory.create_ccpo()
admin = UserFactory.create() admin = UserFactory.create()
PortfolioRoleFactory.create( PortfolioRoleFactory.create(
portfolio=portfolio, user=admin, status=PortfolioRoleStatus.ACTIVE portfolio=portfolio, user=admin, status=PortfolioRoleStatus.ACTIVE

View File

@ -52,7 +52,7 @@ def test_successful_login_redirect_ccpo(client, monkeypatch):
role = PermissionSets.get("ccpo") role = PermissionSets.get("ccpo")
monkeypatch.setattr( monkeypatch.setattr(
"atst.domain.authnid.AuthenticationContext.get_user", "atst.domain.authnid.AuthenticationContext.get_user",
lambda *args: UserFactory.create(atat_role=role), lambda *args: UserFactory.create(),
) )
resp = _login(client) resp = _login(client)