Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 107 additions & 17 deletions s3tests/functional/test_sts.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@
get_iam_path_prefix,
make_iam_name,
get_client,
get_main_user_id,
get_alt_client,
get_alt_user_id,
get_config_endpoint,
get_new_bucket_name,
get_new_bucket,
get_parameter_name,
get_main_aws_access_key,
get_main_aws_secret_key,
Expand All @@ -55,6 +58,7 @@
get_azp,
get_user_token
)
from .utils import (assert_raises, _get_status_and_error_code)

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -394,6 +398,97 @@ def test_assume_role_allow_head_nonexistent():
status = e.response['ResponseMetadata']['HTTPStatusCode']
assert status == 404

@pytest.mark.test_of_sts
@pytest.mark.fails_on_dbstore
def test_assume_role_owner_allow():
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
role_name=get_parameter_name()
role_session_name=get_parameter_name()

policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}'
role_response = iam_client.create_role(Path='/', RoleName=role_name, AssumeRolePolicyDocument=policy_document)

resp = sts_client.assume_role(RoleArn=role_response['Role']['Arn'], RoleSessionName=role_session_name)

s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='')

# create a bucket with the alt user
bucket_name = get_new_bucket(get_alt_client())

# access allowed from role assumed by alt user
s3_client.get_bucket_location(Bucket=bucket_name)

@pytest.mark.test_of_sts
@pytest.mark.fails_on_dbstore
def test_assume_role_nonowner_deny():
iam_client=get_iam_client()
sts_client=get_sts_client()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
role_name=get_parameter_name()
role_session_name=get_parameter_name()

policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}'
role_response = iam_client.create_role(Path='/', RoleName=role_name, AssumeRolePolicyDocument=policy_document)

resp = sts_client.assume_role(RoleArn=role_response['Role']['Arn'], RoleSessionName=role_session_name)

s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='')

# create a bucket with the main user
main_client = get_client()
bucket_name = get_new_bucket(main_client)

# access denied from role assumed by alt user
e = assert_raises(ClientError, s3_client.get_bucket_location, Bucket=bucket_name)
assert (403, 'AccessDenied') == _get_status_and_error_code(e.response)

@pytest.mark.test_of_sts
@pytest.mark.fails_on_dbstore
def test_assume_role_acl_allow():
iam_client=get_iam_client()
sts_client=get_sts_client()
main_user_id=get_main_user_id()
sts_user_id=get_alt_user_id()
default_endpoint=get_config_endpoint()
role_name=get_parameter_name()
role_session_name=get_parameter_name()

policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}'
role_response = iam_client.create_role(Path='/', RoleName=role_name, AssumeRolePolicyDocument=policy_document)

resp = sts_client.assume_role(RoleArn=role_response['Role']['Arn'], RoleSessionName=role_session_name)

s3_client = boto3.client('s3',
aws_access_key_id = resp['Credentials']['AccessKeyId'],
aws_secret_access_key = resp['Credentials']['SecretAccessKey'],
aws_session_token = resp['Credentials']['SessionToken'],
endpoint_url=default_endpoint,
region_name='')

# create a bucket with the main user and grant read acl to alt user
main_client = get_client()
bucket_name = get_new_bucket(main_client)
main_client.put_bucket_acl(Bucket=bucket_name,
GrantFullControl=f'id={main_user_id}',
GrantReadACP=f'id={sts_user_id}')

# access allowed from role assumed by alt user
s3_client.get_bucket_location(Bucket=bucket_name)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cbodley : should we add a test that has no role/session policy and no acl grant also for a non-owner and check if access is actually denied

Copy link
Contributor Author

@cbodley cbodley Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_assume_role_owner_deny() above tests the non-owner case with no policy

edit: renamed to test_assume_role_nonowner_deny


@pytest.mark.webidentity_test
@pytest.mark.token_claims_trust_policy_test
Expand Down Expand Up @@ -965,15 +1060,15 @@ def test_session_policy_bucket_policy_role_arn():
"Statement": [{
"Effect": "Allow",
"Principal": {"AWS": "{}".format(rolearn)},
"Action": ["s3:GetObject","s3:PutObject"],
"Action": ["s3:GetBucketLocation","s3:ListBucket"],
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
]
}]
})
s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy)
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:ListBucket\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"

resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
Expand All @@ -985,15 +1080,12 @@ def test_session_policy_bucket_policy_role_arn():
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200

try:
obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt")
except ClientError as e:
s3object_error = e.response.get("Error", {}).get("Code")
assert s3object_error == 'AccessDenied'
s3_client.head_bucket(Bucket=bucket_name_1)

# s3:GetBucketLocation not allowed by session policy
e = assert_raises(ClientError, s3_client.get_bucket_location, Bucket=bucket_name_1)
assert (403, 'AccessDenied') == _get_status_and_error_code(e.response)

oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_arn
Expand Down Expand Up @@ -1041,15 +1133,15 @@ def test_session_policy_bucket_policy_session_arn():
"Statement": [{
"Effect": "Allow",
"Principal": {"AWS": "{}".format(rolesessionarn)},
"Action": ["s3:GetObject","s3:PutObject"],
"Action": ["s3:GetBucketLocation","s3:ListBucket"],
"Resource": [
"{}".format(resource1),
"{}".format(resource2)
]
}]
})
s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy)
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"
session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:ListBucket\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}"

resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy)
assert resp['ResponseMetadata']['HTTPStatusCode'] == 200
Expand All @@ -1061,13 +1153,11 @@ def test_session_policy_bucket_policy_session_arn():
endpoint_url=default_endpoint,
region_name='',
)
bucket_body = 'this is a test file'
s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt")
assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200

s3_client.head_bucket(Bucket=bucket_name_1)

s3_get_obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt")
assert s3_get_obj['ResponseMetadata']['HTTPStatusCode'] == 200
# s3:GetBucketLocation allowed by bucket policy
s3_client.get_bucket_location(Bucket=bucket_name_1)

oidc_remove=iam_client.delete_open_id_connect_provider(
OpenIDConnectProviderArn=oidc_arn
Expand Down