Draft implementations of remote admin creation and root management group ownership.

This commit is contained in:
tomdds 2020-01-28 20:14:42 -05:00
parent 7bf6b9addc
commit 144312863c
3 changed files with 261 additions and 19 deletions

View File

@ -1,4 +1,5 @@
import re
import time
from secrets import token_urlsafe
from typing import Dict
from uuid import uuid4
@ -76,6 +77,8 @@ class AzureCloudProvider(CloudProviderInterface):
self.secret_key = config["AZURE_SECRET_KEY"]
self.tenant_id = config["AZURE_TENANT_ID"]
self.vault_url = config["AZURE_VAULT_URL"]
# self.ps_client_id = config["POWERSHELL_CLIENT_ID"]
self.ps_client_id = "1950a258-227b-4e31-a9cf-717495945fc2"
if azure_sdk_provider is None:
self.sdk = AzureSDKProvider()
@ -479,21 +482,188 @@ class AzureCloudProvider(CloudProviderInterface):
else:
return self._error(result.json())
def create_remote_admin(self, creds, tenant_details):
# create app/service principal within tenant, with name constructed from tenant details
# assign principal global admin
# needs to call out to CLI with tenant owner username/password, prototyping for that underway
# return identifier and creds to consumer for storage
response = {"clientId": "string", "secretKey": "string", "tenantId": "string"}
return self._ok(
{
"client_id": response["clientId"],
"secret_key": response["secret_key"],
"tenant_id": response["tenantId"],
}
def assign_root_mg_ownership(self, payload):
import ipdb; ipdb.set_trace()
# elevate
mgmt_token = self.get_tenant_admin_token(
payload.tenant_id, self.sdk.cloud.endpoints.resource_manager
)
if mgmt_token is None:
raise AuthenticationException(
"Could not resolve management token for tenant admin"
)
auth_header = {
"Authorization": f"Bearer {mgmt_token}",
}
url = f"{self.sdk.cloud.endpoints.resource_manager}/providers/Microsoft.Authorization/elevateAccess?api-version=2016-07-01"
result = self.sdk.requests.post(url, headers=auth_header)
if not result.ok:
return False
# ----------- NEXT STEP: Root MGMT Group Ownership (tenant admin) -------------
time.sleep(20)
# HARD CODED, MOVE TO CONFIG
ownerRoleId = '8e3af657-a8ff-443c-a75c-2fe8c4bcb635'
role_definition_id = f"/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleDefinitions/{ownerRoleId}"
request_body = {
"properties": {
"roleDefinitionId": role_definition_id,
"principalId": payload.user_object_id
}
}
auth_header = {
"Authorization": f"Bearer {mgmt_token}",
}
assignment_guid = str(uuid4())
url = f"{self.sdk.cloud.endpoints.resource_manager}/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleAssignments/{assignment_guid}?api-version=2015-07-01"
response = self.sdk.requests.put(url, headers=auth_header, json=request_body)
if not response.ok:
return False
# ----------- NEXT STEP: Root MGMT Group Ownership (remote admin SP) -------------
time.sleep(20)
# HARD CODED, MOVE TO CONFIG
ownerRoleId = '8e3af657-a8ff-443c-a75c-2fe8c4bcb635'
# NOTE: the tenant_id is also the id of the root management group, once it is created
role_definition_id = f"/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleDefinitions/{ownerRoleId}"
request_body = {
"properties": {
"roleDefinitionId": role_definition_id,
"principalId": payload.admin_principal_id
}
}
auth_header = {
"Authorization": f"Bearer {mgmt_token}",
}
assignment_guid = str(uuid4())
url = f"{self.sdk.cloud.endpoints.resource_manager}/providers/Microsoft.Management/managementGroups/{payload.tenant_id}/providers/Microsoft.Authorization/roleAssignments/{assignment_guid}?api-version=2015-07-01"
response = self.sdk.requests.put(url, headers=auth_header, json=request_body)
if not response.ok:
return False
def create_remote_admin(self, payload):
import ipdb; ipdb.set_trace()
GRAPH_RESOURCE = "https://graph.microsoft.com"
graph_token = self.get_tenant_admin_token(payload.tenant_id, GRAPH_RESOURCE)
if graph_token is None:
raise AuthenticationException(
"Could not resolve graph token for tenant admin"
)
request_body = {"displayName": "ATAT Remote Admin"}
auth_header = {
"Authorization": f"Bearer {graph_token}",
}
url = f"{GRAPH_RESOURCE}/v1.0/applications"
response = self.sdk.requests.post(url, json=request_body, headers=auth_header)
res = response.json()
result1 = {}
if response.ok:
result1 = {"app_id": res.get("appId"), "object_id": res.get("id")}
# ---- SEPARATE STEP (Create associated Service Principal) ----------
time.sleep(20)
request_body = {"appId": result1.get("app_id")}
auth_header = {
"Authorization": f"Bearer {graph_token}",
}
url = f"{GRAPH_RESOURCE}/beta/servicePrincipals"
response = self.sdk.requests.post(url, json=request_body, headers=auth_header)
res = response.json()
result2 = {}
if response.ok:
result2 = {"sp_id": res.get("id")}
# ---- SEPARATE STEP - Generate Creds (Client Secret)----------
time.sleep(20)
request_body = {
"passwordCredentials": [{"displayName": "ATAT Generated Password"}]
}
auth_header = {
"Authorization": f"Bearer {graph_token}",
}
# Uses OBJECT_ID of App Registration
url = (
f"{GRAPH_RESOURCE}/v1.0/applications/{result1.get('object_id')}/addPassword"
)
response = self.sdk.requests.post(url, json=request_body, headers=auth_header)
result3 = {}
res = response.json()
if response.ok:
result3 = {"client_secret": res.get("secretText")}
# ---- SEPARATE STEP - Source Global Admin Role----------
auth_header = {
"Authorization": f"Bearer {graph_token}",
}
# Uses OBJECT_ID of App Registration
url = f"{GRAPH_RESOURCE}/beta/roleManagement/directory/roleDefinitions"
response = self.sdk.requests.get(url, headers=auth_header)
result = response.json()
roleList = result.get("value")
admin_role_id = "794bb258-3e31-42ff-9ee4-731a72f62851" # May be hard coded? use for fall back
for role in roleList:
if role.get("displayName") == "Company Administrator":
admin_role_id = role.get("id")
break
# ---- SEPARATE STEP - Source Global Admin Role----------
time.sleep(20)
request_body = {
"principalId": result2.get("sp_id"),
"roleDefinitionId": admin_role_id,
"resourceScope": "/",
}
auth_header = {
"Authorization": f"Bearer {graph_token}",
}
url = f"{GRAPH_RESOURCE}/beta/roleManagement/directory/roleAssignments"
response = self.sdk.requests.post(url, headers=auth_header, json=request_body)
if response.ok:
return (result1, result2, result3)
return False
def force_tenant_admin_pw_update(self, creds, tenant_owner_id):
# use creds to update to force password recovery?
@ -563,9 +733,21 @@ class AzureCloudProvider(CloudProviderInterface):
if sub_id_match:
return sub_id_match.group(1)
def get_tenant_principal_token(self, tenant_id):
def get_tenant_admin_token(self, tenant_id, resource):
creds = self.get_secret(tenant_id)
return self._get_sp_token(creds)
return self._get_up_token_for_resource(
creds.get("admin_username"), creds.get("admin_password"), tenant_id, resource
)
def get_tenant_principal_token(self, tenant_id, resource):
# creds = self.get_secret(tenant_id)
# return self._get_up_token_for_resource(
# creds.get("admin_username"),
# creds.get("admin_password"),
# tenat_id,
# resource
# )
pass
def get_root_provisioning_token(self):
return self._get_sp_token(self._root_creds)
@ -586,6 +768,19 @@ class AzureCloudProvider(CloudProviderInterface):
return token_response.get("accessToken", None)
def _get_up_token_for_resource(self, username, password, tenant_id, resource):
context = self.sdk.adal.AuthenticationContext(
f"{self.sdk.cloud.endpoints.active_directory}/{tenant_id}"
)
# TODO: handle failure states here
token_response = context.acquire_token_with_username_password(
resource, username, password, self.ps_client_id
)
return token_response.get("accessToken", None)
def _get_credential_obj(self, creds, resource=None):
return self.sdk.credentials.ServicePrincipalCredentials(
client_id=creds.get("client_id"),

View File

@ -1,4 +1,4 @@
from unittest.mock import Mock
from unittest.mock import Mock, patch
from uuid import uuid4
from tests.factories import ApplicationFactory, EnvironmentFactory
@ -6,6 +6,7 @@ from tests.mock_azure import AUTH_CREDENTIALS, mock_azure
from atst.domain.csp.cloud import AzureCloudProvider
from atst.domain.csp.cloud.models import (
BaseCSPPayload,
BillingInstructionCSPPayload,
BillingInstructionCSPResult,
BillingProfileCreationCSPPayload,
@ -407,3 +408,47 @@ def test_create_billing_instruction(mock_azure: AzureCloudProvider):
body: BillingInstructionCSPResult = result.get("body")
assert body.reported_clin_name == "TO1:CLIN001"
def test_admin_principal_creation(mock_azure: AzureCloudProvider):
# Auth As Tenant Admin
# Create App Registration
# Create Service Principal
# Create App Registration Password Credential
# Lookup global admin role
# Assign global admin role to Service Principal
with patch.object(
AzureCloudProvider, "get_secret", wraps=mock_azure.get_secret
) as mock_get_secret:
mock_get_secret.return_value = {
"admin_username": "",
"admin_password": "",
}
payload = BaseCSPPayload(
**{"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4"}
)
result = mock_azure.create_remote_admin(payload)
print(result)
def test_admin_mg_ownership(mock_azure: AzureCloudProvider):
with patch.object(
AzureCloudProvider, "get_secret", wraps=mock_azure.get_secret
) as mock_get_secret:
mock_get_secret.return_value = {
"admin_username": "",
"admin_password": "",
}
payload = TenantCSPResult(
**{
"user_id": "blach",
"tenant_id": "6d2d2d6c-a6d6-41e1-8bb1-73d11475f8f4",
"user_object_id": "971efe4d-1e80-4e39-b3b9-4e5c63ad446d",
}
)
result = mock_azure.assign_root_mg_ownership(payload)
print(result)

View File

@ -63,13 +63,15 @@ def mock_policy():
def mock_adal():
import adal
return Mock(spec=adal)
return adal
# return Mock(spec=adal)
def mock_requests():
import requests
return Mock(spec=requests)
# return Mock(spec=requests)
return requests
def mock_secrets():