diff --git a/examples/user.py b/examples/user.py new file mode 100644 index 0000000..4052167 --- /dev/null +++ b/examples/user.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python3 +"""Example usage of the Users API. + +This example demonstrates how to read a user by ID using the Python TFE SDK. +""" + +import os +import sys + +# Add the src directory to the Python path so we can import the local package. +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from pytfe import TFEClient, TFEConfig + + +def main() -> None: + """Read and print user details from Terraform Cloud.""" + user_id = os.getenv("TFE_USER_ID") + + try: + client = TFEClient(TFEConfig.from_env()) + + current_user = client.users.read_current() + print("=== Current Terraform Cloud User ===") + print(f"User ID: {current_user.id}") + print(f"Username: {current_user.username}") + print(f"Email: {current_user.email or 'N/A'}") + print(f"Auth Method: {current_user.auth_method or 'N/A'}") + + if not user_id: + print("\nTFE_USER_ID not set. Skipping client.users.read(user_id).") + return + + user = client.users.read(user_id) + + print("\n=== Terraform Cloud User By ID ===") + print(f"User ID: {user.id}") + print(f"Username: {user.username}") + print(f"Email: {user.email or 'N/A'}") + print(f"Auth Method: {user.auth_method or 'N/A'}") + except Exception as e: + print(f"Error running user example: {e}") + + +if __name__ == "__main__": + main() diff --git a/src/pytfe/client.py b/src/pytfe/client.py index d1c8337..df04eaf 100644 --- a/src/pytfe/client.py +++ b/src/pytfe/client.py @@ -32,6 +32,7 @@ from .resources.ssh_keys import SSHKeys from .resources.state_version_outputs import StateVersionOutputs from .resources.state_versions import StateVersions +from .resources.user import Users from .resources.variable import Variables from .resources.variable_sets import VariableSets, VariableSetVariables from .resources.workspace_resources import WorkspaceResourcesService @@ -69,6 +70,7 @@ def __init__(self, config: TFEConfig | None = None): self.plans = Plans(self._transport) self.organizations = Organizations(self._transport) self.organization_memberships = OrganizationMemberships(self._transport) + self.users = Users(self._transport) self.projects = Projects(self._transport) self.variables = Variables(self._transport) self.variable_sets = VariableSets(self._transport) diff --git a/src/pytfe/models/user.py b/src/pytfe/models/user.py index 26b902e..bf8a019 100644 --- a/src/pytfe/models/user.py +++ b/src/pytfe/models/user.py @@ -3,21 +3,51 @@ from pydantic import BaseModel, ConfigDict, Field +class TwoFactor(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + enabled: bool = Field(default=False, alias="enabled") + verified: bool = Field(default=False, alias="verified") + + +class UserPermissions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + can_create_organizations: bool = Field( + default=False, alias="can-create-organizations" + ) + can_change_email: bool = Field(default=False, alias="can-change-email") + can_change_username: bool = Field(default=False, alias="can-change-username") + can_manage_user_tokens: bool = Field(default=False, alias="can-manage-user-tokens") + can_view_2fa_settings: bool = Field(default=False, alias="can-view2fa-settings") + can_manage_hcp_account: bool = Field(default=False, alias="can-manage-hcp-account") + + class User(BaseModel): model_config = ConfigDict(populate_by_name=True, validate_by_name=True) id: str = Field(..., alias="id") - avatar_url: str = Field(default="", alias="avatar-url") - email: str = Field(default="", alias="email") + auth_method: str | None = Field(default=None, alias="auth-method") + avatar_url: str | None = Field(default=None, alias="avatar-url") + email: str | None = Field(default=None, alias="email") is_service_account: bool = Field(default=False, alias="is-service-account") - two_factor: dict = Field(default_factory=dict, alias="two-factor") - unconfirmed_email: str = Field(default="", alias="unconfirmed-email") + two_factor: TwoFactor | None = Field(default=None, alias="two-factor") + unconfirmed_email: str | None = Field(default=None, alias="unconfirmed-email") username: str = Field(default="", alias="username") v2_only: bool = Field(default=False, alias="v2-only") - is_site_admin: bool = Field(default=False, alias="is-site-admin") # Deprecated - is_admin: bool = Field(default=False, alias="is-admin") - is_sso_login: bool = Field(default=False, alias="is-sso-login") - permissions: dict = Field(default_factory=dict, alias="permissions") + is_site_admin: bool | None = Field( + default=None, alias="is-site-admin" + ) # Deprecated + is_admin: bool | None = Field(default=None, alias="is-admin") + is_sso_login: bool | None = Field(default=None, alias="is-sso-login") + permissions: UserPermissions | None = Field(default=None, alias="permissions") # Relations # authentication_tokens: AuthenticationTokens = Field(..., alias="authentication-tokens") + + +class UserUpdateCurrentOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + username: str | None = Field(default=None, alias="username") + email: str | None = Field(default=None, alias="email") diff --git a/src/pytfe/resources/user.py b/src/pytfe/resources/user.py new file mode 100644 index 0000000..ab5a7e3 --- /dev/null +++ b/src/pytfe/resources/user.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from ..models.user import User, UserUpdateCurrentOptions +from ..utils import valid_string_id +from ._base import _Service + + +class Users(_Service): + def read(self, user_id: str) -> User: + if not valid_string_id(user_id): + raise ValueError("invalid user id") + + r = self.t.request("GET", f"/api/v2/users/{user_id}") + d = r.json()["data"] + attr = d.get("attributes", {}) or {} + user_data = dict(attr) + user_data["id"] = d.get("id") + return User(**user_data) + + def read_current(self) -> User: + r = self.t.request("GET", "/api/v2/account/details") + d = r.json()["data"] + attr = d.get("attributes", {}) or {} + user_data = dict(attr) + user_data["id"] = d.get("id") + return User(**user_data) + + def update_current(self, options: UserUpdateCurrentOptions) -> User: + body = { + "data": { + "type": "users", + "attributes": options.model_dump(exclude_none=True), + } + } + r = self.t.request("PATCH", "/api/v2/account/update", json_body=body) + d = r.json()["data"] + attr = d.get("attributes", {}) or {} + user_data = dict(attr) + user_data["id"] = d.get("id") + return User(**user_data) diff --git a/tests/units/test_user.py b/tests/units/test_user.py new file mode 100644 index 0000000..2be9565 --- /dev/null +++ b/tests/units/test_user.py @@ -0,0 +1,167 @@ +"""Unit tests for the Users resource.""" + +import copy +from unittest.mock import Mock + +import pytest + +from pytfe.models.user import User, UserPermissions, UserUpdateCurrentOptions +from pytfe.resources.user import Users + + +class TestUsers: + """Test suite for user resource operations.""" + + @pytest.fixture + def mock_transport(self): + """Mock HTTP transport.""" + return Mock() + + @pytest.fixture + def users_service(self, mock_transport): + """Create users service with mocked transport.""" + return Users(mock_transport) + + @pytest.fixture + def sample_user_response(self): + """Sample JSON:API response for a user.""" + return { + "data": { + "id": "user-MA4GL63FmYRpSFxa", + "type": "users", + "attributes": { + "username": "admin", + "email": "admin@example.com", + "is-service-account": False, + "auth-method": "hcp_sso", + "avatar-url": "https://example.com/avatar.png", + "v2-only": True, + "permissions": { + "can-create-organizations": False, + "can-change-email": True, + "can-change-username": True, + }, + }, + } + } + + def test_read_user(self, users_service, mock_transport, sample_user_response): + """Test reading a specific user by ID.""" + mock_transport.request.return_value.json.return_value = sample_user_response + + user_id = "user-MA4GL63FmYRpSFxa" + user = users_service.read(user_id) + + mock_transport.request.assert_called_once_with( + "GET", f"/api/v2/users/{user_id}" + ) + assert isinstance(user, User) + assert user.id == user_id + assert user.username == "admin" + assert user.email == "admin@example.com" + assert user.is_service_account is False + assert user.auth_method == "hcp_sso" + assert user.avatar_url == "https://example.com/avatar.png" + assert user.v2_only is True + assert isinstance(user.permissions, UserPermissions) + assert user.permissions is not None + assert user.permissions.can_create_organizations is False + assert user.permissions.can_change_email is True + assert user.permissions.can_change_username is True + assert user.permissions.can_manage_user_tokens is False + assert user.permissions.can_view_2fa_settings is False + assert user.permissions.can_manage_hcp_account is False + + def test_read_user_invalid_id(self, users_service): + """Test reading a user with an invalid user ID.""" + with pytest.raises(ValueError, match="invalid user id"): + users_service.read("") + + def test_read_user_with_null_unconfirmed_email( + self, users_service, mock_transport, sample_user_response + ): + """Test reading a user when unconfirmed-email is null.""" + sample_user_response["data"]["attributes"]["unconfirmed-email"] = None + mock_transport.request.return_value.json.return_value = sample_user_response + + user = users_service.read("user-MA4GL63FmYRpSFxa") + + assert isinstance(user, User) + assert user.unconfirmed_email is None + + def test_read_user_two_factor_parsing( + self, users_service, mock_transport, sample_user_response + ): + """Test reading a user with two-factor data.""" + modified_response = copy.deepcopy(sample_user_response) + modified_response["data"]["attributes"]["two-factor"] = { + "enabled": True, + "verified": False, + } + mock_transport.request.return_value.json.return_value = modified_response + + user_id = "user-MA4GL63FmYRpSFxa" + user = users_service.read(user_id) + + assert user.two_factor is not None + assert user.two_factor.enabled is True + assert user.two_factor.verified is False + + def test_read_user_nullable_bools( + self, users_service, mock_transport, sample_user_response + ): + """Test reading a user when pointer-style boolean fields are null.""" + modified_response = copy.deepcopy(sample_user_response) + modified_response["data"]["attributes"]["is-site-admin"] = None + modified_response["data"]["attributes"]["is-admin"] = None + modified_response["data"]["attributes"]["is-sso-login"] = None + mock_transport.request.return_value.json.return_value = modified_response + + user_id = "user-MA4GL63FmYRpSFxa" + user = users_service.read(user_id) + + assert user.is_site_admin is None + assert user.is_admin is None + assert user.is_sso_login is None + + def test_read_current_user( + self, users_service, mock_transport, sample_user_response + ): + """Test reading the currently authenticated user.""" + mock_transport.request.return_value.json.return_value = sample_user_response + + user = users_service.read_current() + + mock_transport.request.assert_called_once_with("GET", "/api/v2/account/details") + assert isinstance(user, User) + assert user.id == "user-MA4GL63FmYRpSFxa" + assert user.username == "admin" + assert user.email == "admin@example.com" + + def test_update_current_user( + self, users_service, mock_transport, sample_user_response + ): + """Test updating the currently authenticated user.""" + mock_transport.request.return_value.json.return_value = sample_user_response + options = UserUpdateCurrentOptions( + username="new-admin", + email="new-admin@example.com", + ) + + user = users_service.update_current(options) + + mock_transport.request.assert_called_once_with( + "PATCH", + "/api/v2/account/update", + json_body={ + "data": { + "type": "users", + "attributes": { + "username": "new-admin", + "email": "new-admin@example.com", + }, + } + }, + ) + assert isinstance(user, User) + assert user.id == "user-MA4GL63FmYRpSFxa"