diff --git a/atst/domain/csp/cloud/azure_cloud_provider.py b/atst/domain/csp/cloud/azure_cloud_provider.py index 39c2e83a..5a42b8a0 100644 --- a/atst/domain/csp/cloud/azure_cloud_provider.py +++ b/atst/domain/csp/cloud/azure_cloud_provider.py @@ -311,6 +311,41 @@ class AzureCloudProvider(CloudProviderInterface): management_group_id=management_group_id, ) + def disable_user(self, tenant_id, cloud_id): + sp_token = self._get_tenant_principal_token(tenant_id) + if sp_token is None: + raise AuthenticationException("Could not resolve token in disable user") + headers = { + "Authorization": f"Bearer {sp_token}", + } + + try: + result = self.sdk.requests.delete( + f"{self.sdk.cloud.endpoints.resource_manager}providers/Microsoft.Authorization/roleAssignments/{cloud_id}?api-version=2015-07-01", + headers=headers, + timeout=30, + ) + result.raise_for_status() + return result.json() + + except self.sdk.requests.exceptions.ConnectionError: + app.logger.error( + f"Could not disable user. Connection Error", exc_info=1, + ) + raise ConnectionException("connection error azure disable user") + except self.sdk.requests.exceptions.Timeout: + app.logger.error( + f"Could not disable user. Request timed out.", exc_info=1, + ) + raise ConnectionException("timout error azure disable user") + except self.sdk.requests.exceptions.HTTPError as exc: + app.logger.error( + result.status_code, "azure application error disable user", exc_info=1, + ) + raise UnknownServerException( + result.status_code, f"azure application error disable user. {str(exc)}", + ) + def create_tenant(self, payload: TenantCSPPayload): sp_token = self._get_root_provisioning_token() if sp_token is None: diff --git a/atst/domain/csp/cloud/mock_cloud_provider.py b/atst/domain/csp/cloud/mock_cloud_provider.py index a34f40d5..87f78b3b 100644 --- a/atst/domain/csp/cloud/mock_cloud_provider.py +++ b/atst/domain/csp/cloud/mock_cloud_provider.py @@ -417,7 +417,7 @@ class MockCloudProvider(CloudProviderInterface): self._maybe_raise(self.UNAUTHORIZED_RATE, self.AUTHORIZATION_EXCEPTION) return self._id() - def disable_user(self, auth_credentials, csp_user_id): + def disable_user(self, tenant_id, cloud_id): self._authorize(auth_credentials) self._maybe_raise(self.NETWORK_FAILURE_PCT, self.NETWORK_EXCEPTION) self._maybe_raise(self.SERVER_FAILURE_PCT, self.SERVER_EXCEPTION) diff --git a/tests/domain/cloud/test_azure_csp.py b/tests/domain/cloud/test_azure_csp.py index 5deddc63..049b291b 100644 --- a/tests/domain/cloud/test_azure_csp.py +++ b/tests/domain/cloud/test_azure_csp.py @@ -206,6 +206,48 @@ def test_create_policy_definition_succeeds(mock_azure: AzureCloudProvider): ) +def test_disable_user(mock_azure: AzureCloudProvider): + mock_result = Mock() + mock_result.json.return_value = { + "properties": { + "roleDefinitionId": "/subscriptions/subId/providers/Microsoft.Authorization/roleDefinitions/roledefinitionId", + "principalId": "Pid", + "scope": "/subscriptions/subId/resourcegroups/rgname", + }, + "id": "/subscriptions/subId/resourcegroups/rgname/providers/Microsoft.Authorization/roleAssignments/roleassignmentId", + "type": "Microsoft.Authorization/roleAssignments", + "name": "roleassignmentId", + } + + mock_result.status_code = 200 + mock_http_error_resp = mock_requests_response( + status=500, + raise_for_status=mock_azure.sdk.requests.exceptions.HTTPError( + "500 Server Error" + ), + ) + mock_azure.sdk.requests.delete.side_effect = [ + mock_azure.sdk.requests.exceptions.ConnectionError, + mock_azure.sdk.requests.exceptions.Timeout, + mock_http_error_resp, + mock_result, + ] + mock_azure = mock_get_secret(mock_azure) + + tenant_id = "60ff9d34-82bf-4f21-b565-308ef0533435" + cloud_id = "roleassignmentId" + + with pytest.raises(ConnectionException): + mock_azure.disable_user(tenant_id, cloud_id) + with pytest.raises(ConnectionException): + mock_azure.disable_user(tenant_id, cloud_id) + with pytest.raises(UnknownServerException, match=r".*500 Server Error.*"): + mock_azure.disable_user(tenant_id, cloud_id) + + result = mock_azure.disable_user(tenant_id, cloud_id) + assert result.get("name") == cloud_id + + def test_create_tenant(mock_azure: AzureCloudProvider): mock_result = Mock() mock_result.json.return_value = {