-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathclient.py
More file actions
111 lines (102 loc) · 4.93 KB
/
client.py
File metadata and controls
111 lines (102 loc) · 4.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# Copyright IBM Corp. 2025, 2026
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations
from ._http import HTTPTransport
from .config import TFEConfig
from .resources.agent_pools import AgentPools
from .resources.agents import Agents, AgentTokens
from .resources.apply import Applies
from .resources.configuration_version import ConfigurationVersions
from .resources.notification_configuration import NotificationConfigurations
from .resources.oauth_client import OAuthClients
from .resources.oauth_token import OAuthTokens
from .resources.organization_membership import OrganizationMemberships
from .resources.organizations import Organizations
from .resources.plan import Plans
from .resources.policy import Policies
from .resources.policy_check import PolicyChecks
from .resources.policy_evaluation import PolicyEvaluations
from .resources.policy_set import PolicySets
from .resources.policy_set_outcome import PolicySetOutcomes
from .resources.policy_set_parameter import PolicySetParameters
from .resources.policy_set_version import PolicySetVersions
from .resources.projects import Projects
from .resources.query_run import QueryRuns
from .resources.registry_module import RegistryModules
from .resources.registry_provider import RegistryProviders
from .resources.registry_provider_version import RegistryProviderVersions
from .resources.reserved_tag_key import ReservedTagKeys
from .resources.run import Runs
from .resources.run_event import RunEvents
from .resources.run_task import RunTasks
from .resources.run_trigger import RunTriggers
from .resources.ssh_keys import SSHKeys
from .resources.state_version_outputs import StateVersionOutputs
from .resources.state_versions import StateVersions
from .resources.variable import Variables
from .resources.variable_sets import VariableSets, VariableSetVariables
from .resources.workspace_resources import WorkspaceResourcesService
from .resources.workspaces import Workspaces
class TFEClient:
def __init__(self, config: TFEConfig | None = None):
cfg = config or TFEConfig.from_env()
self._transport = HTTPTransport(
cfg.address,
cfg.token,
timeout=cfg.timeout,
verify_tls=cfg.verify_tls,
user_agent_suffix=cfg.user_agent_suffix,
max_retries=cfg.max_retries,
backoff_base=cfg.backoff_base,
backoff_cap=cfg.backoff_cap,
backoff_jitter=cfg.backoff_jitter,
http2=cfg.http2,
proxies=cfg.proxies,
ca_bundle=cfg.ca_bundle,
)
self.oauth_clients = OAuthClients(self._transport)
self.oauth_tokens = OAuthTokens(self._transport)
# Agent resources
self.agent_pools = AgentPools(self._transport)
self.agents = Agents(self._transport)
self.agent_tokens = AgentTokens(self._transport)
# Core resources
self.configuration_versions = ConfigurationVersions(self._transport)
self.notification_configurations = NotificationConfigurations(self._transport)
self.applies = Applies(self._transport)
self.plans = Plans(self._transport)
self.organizations = Organizations(self._transport)
self.organization_memberships = OrganizationMemberships(self._transport)
self.projects = Projects(self._transport)
self.variables = Variables(self._transport)
self.variable_sets = VariableSets(self._transport)
self.variable_set_variables = VariableSetVariables(self._transport)
self.workspaces = Workspaces(self._transport)
self.workspace_resources = WorkspaceResourcesService(self._transport)
self.registry_modules = RegistryModules(self._transport)
self.registry_providers = RegistryProviders(self._transport)
self.registry_provider_versions = RegistryProviderVersions(self._transport)
# State and execution resources
self.state_versions = StateVersions(self._transport)
self.state_version_outputs = StateVersionOutputs(self._transport)
self.run_tasks = RunTasks(self._transport)
self.run_triggers = RunTriggers(self._transport)
self.runs = Runs(self._transport)
self.query_runs = QueryRuns(self._transport)
self.run_events = RunEvents(self._transport)
self.policies = Policies(self._transport)
self.policy_evaluations = PolicyEvaluations(self._transport)
self.policy_checks = PolicyChecks(self._transport)
self.policy_sets = PolicySets(self._transport)
self.policy_set_parameters = PolicySetParameters(self._transport)
self.policy_set_outcomes = PolicySetOutcomes(self._transport)
self.policy_set_versions = PolicySetVersions(self._transport)
# SSH Keys
self.ssh_keys = SSHKeys(self._transport)
# Reserved Tag Key
self.reserved_tag_key = ReservedTagKeys(self._transport)
def close(self) -> None:
try:
self._transport._sync.close()
except Exception:
pass