From 7a8b96d2a8df8687b0c20fd526d5f9483511f52c Mon Sep 17 00:00:00 2001 From: richard-dds Date: Wed, 2 Oct 2019 11:44:44 -0400 Subject: [PATCH] Use _get_client everywhere --- atst/domain/csp/cloud.py | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/atst/domain/csp/cloud.py b/atst/domain/csp/cloud.py index 06b9b430..cbcbf893 100644 --- a/atst/domain/csp/cloud.py +++ b/atst/domain/csp/cloud.py @@ -594,12 +594,7 @@ class AWSCloudProvider(CloudProviderInterface): credentials = assumed_role_object["Credentials"] # Use the temporary credentials that AssumeRole returns to make a new connection to IAM - iam_client = self.boto3.client( - "iam", - aws_access_key_id=credentials["AccessKeyId"], - aws_secret_access_key=credentials["SecretAccessKey"], - aws_session_token=credentials["SessionToken"], - ) + iam_client = self._get_client("iam", credentials=credentials) # Create the user with a PermissionBoundary permission_boundary_arn = "arn:aws:iam::aws:policy/AlexaForBusinessDeviceSetup" @@ -671,16 +666,29 @@ class AWSCloudProvider(CloudProviderInterface): def _get_client(self, service: str, credentials=None): """ A helper for creating a client of a given AWS service. - """ - credentials = credentials or { - "AccessKeyId": self.access_key_id, - "SecretAccessKey": self.secret_key, + + If `credentials` aren't provided, the configured root credentials will be used. + + `credentials` format: + { + "AccessKeyId": "access-key-id", + "SecretAccessKey": "secret-access-key", + "SessionToken": "session-token" # optional } + """ + + credentials = credentials or {} + credential_kwargs = { + "aws_access_key_id": credentials.get("AccessKeyId", self.access_key_id), + "aws_secret_access_key": credentials.get( + "SecretAccessKey", self.secret_key + ), + } + if "SessionToken" in credentials: + credential_kwargs["aws_session_token"] = credentials["SessionToken"] + return self.boto3.client( - service, - aws_access_key_id=credentials["AccessKeyId"], - aws_secret_access_key=credentials["SecretAccessKey"], - region_name=self.region_name, + service, region_name=self.region_name, **credential_kwargs ) def _inline_org_management_policy(self, account_id: str) -> Dict: