Skip to content

Commit 0f75bc5

Browse files
feat(run-tasks-integration): add run tasks integration callback support
- Add RunTasksIntegration resource with callback method - Add RunTaskRequest model for webhook payload parsing - Add TaskResultCallbackOptions, TaskResultOutcome, TaskResultStatus models - Add example Flask server for run tasks webhooks - Add 15 unit tests for run tasks integration - Update client to include run_tasks_integration property - Export RunTaskRequest model
1 parent 0b23635 commit 0f75bc5

6 files changed

Lines changed: 833 additions & 0 deletions

File tree

examples/run_tasks_integration.py

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
"""
2+
Terraform Cloud/Enterprise Run Tasks Integration Example
3+
4+
This example demonstrates how to use the python-tfe SDK to build a run task server
5+
that receives task requests from TFC/TFE and sends results back via the callback API.
6+
7+
IMPORTANT: This example uses Flask as a simple HTTP server for demonstration purposes.
8+
You can use any web framework (FastAPI, Django, etc.) or even the built-in http.server.
9+
The key components are:
10+
1. Receiving POST requests with run task payloads
11+
2. Using TFEClient.run_tasks_integration.callback() to send results back
12+
13+
Prerequisites:
14+
- Install Flask (for this example only): pip install flask
15+
- Expose your server publicly using ngrok, cloudflare tunnel, or similar
16+
- Create a run task in TFC/TFE pointing to your public URL endpoint
17+
- Attach the run task to a workspace
18+
19+
Usage:
20+
python examples/run_tasks_integration.py
21+
22+
Then expose with ngrok:
23+
ngrok http 5000
24+
25+
API Documentation:
26+
https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration
27+
"""
28+
29+
from __future__ import annotations
30+
31+
import os
32+
33+
try:
34+
from flask import Flask, request, jsonify
35+
except ImportError:
36+
print("Error: Flask is required for this example")
37+
print("Install it with: pip install flask")
38+
exit(1)
39+
40+
from pytfe import TFEClient, TFEConfig
41+
from pytfe.models import RunTaskRequest, RunTaskRequestCapabilities
42+
from pytfe.resources.run_tasks_integration import (
43+
RunTasksIntegration,
44+
TaskResultCallbackOptions,
45+
TaskResultOutcome,
46+
TaskResultStatus,
47+
TaskResultTag,
48+
)
49+
50+
app = Flask(__name__)
51+
52+
# Initialize TFE client for callback functionality
53+
# Note: The callback uses the access_token from the run task request,
54+
# NOT your regular TFE API token
55+
config = TFEConfig()
56+
client = TFEClient(config)
57+
58+
59+
@app.route('/run-task', methods=['POST'])
60+
def handle_run_task():
61+
"""Handle incoming run task request from TFC/TFE."""
62+
try:
63+
# Parse the incoming request
64+
run_task_request = RunTaskRequest(**request.json)
65+
66+
print(f"Received run task request:")
67+
print(f" Organization: {run_task_request.organization_name}")
68+
print(f" Workspace: {run_task_request.workspace_name}")
69+
print(f" Run ID: {run_task_request.run_id}")
70+
print(f" Stage: {run_task_request.stage}")
71+
print(f" Enforcement Level: {run_task_request.task_result_enforcement_level}")
72+
73+
# Extract the callback information
74+
callback_url = run_task_request.task_result_callback_url
75+
access_token = run_task_request.access_token
76+
77+
# YOUR CUSTOM LOGIC HERE
78+
# This is where you would perform your actual run task checks
79+
# For example:
80+
# - Download and analyze the plan JSON
81+
# - Check for policy violations
82+
# - Validate resource configurations
83+
# - Run security scans
84+
# - Check cost estimates
85+
86+
# Example: Simple check based on workspace name
87+
if "prod" in run_task_request.workspace_name.lower():
88+
# Production workspace - run strict checks
89+
result = perform_strict_checks(run_task_request)
90+
else:
91+
# Non-production - run basic checks
92+
result = perform_basic_checks(run_task_request)
93+
94+
# Send the callback to TFC/TFE
95+
callback_options = TaskResultCallbackOptions(
96+
status=result["status"],
97+
message=result["message"],
98+
url=result.get("url"),
99+
outcomes=result.get("outcomes", []),
100+
)
101+
102+
client.run_tasks_integration.callback(
103+
callback_url=callback_url,
104+
access_token=access_token,
105+
options=callback_options,
106+
)
107+
108+
print(f"Successfully sent callback with status: {result['status']}")
109+
110+
# Return 200 OK to TFC/TFE
111+
return jsonify({"status": "accepted"}), 200
112+
113+
except Exception as e:
114+
print(f"Error processing run task: {e}")
115+
116+
# Even if processing fails, try to send a failure callback
117+
try:
118+
if 'callback_url' in locals() and 'access_token' in locals():
119+
error_options = TaskResultCallbackOptions(
120+
status=TaskResultStatus.FAILED,
121+
message=f"Run task processing error: {str(e)}",
122+
)
123+
client.run_tasks_integration.callback(
124+
callback_url=callback_url,
125+
access_token=access_token,
126+
options=error_options,
127+
)
128+
except Exception as callback_error:
129+
print(f"Failed to send error callback: {callback_error}")
130+
131+
return jsonify({"error": str(e)}), 500
132+
133+
134+
def perform_strict_checks(run_task_request: RunTaskRequest) -> dict:
135+
"""Perform strict checks for production workspaces.
136+
137+
This is a placeholder for your actual check logic.
138+
"""
139+
# Example: Always pass for demo purposes
140+
# In real implementation, you would:
141+
# - Download the configuration or plan
142+
# - Analyze it for compliance/security
143+
# - Generate detailed outcomes
144+
145+
outcomes = [
146+
TaskResultOutcome(
147+
outcome_id="SECURITY-001",
148+
description="Security check passed",
149+
body="All security requirements met for production deployment.",
150+
tags={
151+
"Category": [TaskResultTag(label="Security")],
152+
"Severity": [TaskResultTag(label="Info", level="info")],
153+
},
154+
),
155+
TaskResultOutcome(
156+
outcome_id="COMPLIANCE-001",
157+
description="Compliance check passed",
158+
body="Configuration meets all compliance requirements.",
159+
tags={
160+
"Category": [TaskResultTag(label="Compliance")],
161+
"Severity": [TaskResultTag(label="Info", level="info")],
162+
},
163+
),
164+
]
165+
166+
return {
167+
"status": TaskResultStatus.PASSED,
168+
"message": "All production checks passed",
169+
"url": "https://your-dashboard.example.com/results/123",
170+
"outcomes": outcomes,
171+
}
172+
173+
174+
def perform_basic_checks(run_task_request: RunTaskRequest) -> dict:
175+
"""Perform basic checks for non-production workspaces.
176+
177+
This is a placeholder for your actual check logic.
178+
"""
179+
# Example: Simple validation
180+
outcomes = [
181+
TaskResultOutcome(
182+
outcome_id="BASIC-001",
183+
description="Basic validation passed",
184+
body="Configuration syntax is valid.",
185+
tags={
186+
"Category": [TaskResultTag(label="Validation")],
187+
},
188+
),
189+
]
190+
191+
return {
192+
"status": TaskResultStatus.PASSED,
193+
"message": "Basic checks completed successfully",
194+
"outcomes": outcomes,
195+
}
196+
197+
198+
@app.route('/health', methods=['GET'])
199+
def health_check():
200+
"""Health check endpoint."""
201+
return jsonify({"status": "healthy"}), 200
202+
203+
204+
if __name__ == '__main__':
205+
print("Starting Run Task server on http://localhost:5000")
206+
print("Make sure to expose this with ngrok or similar for TFC/TFE to reach it")
207+
print("Example: ngrok http 5000")
208+
app.run(host='0.0.0.0', port=5000, debug=True)

src/pytfe/client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from .resources.run import Runs
2626
from .resources.run_event import RunEvents
2727
from .resources.run_task import RunTasks
28+
from .resources.run_tasks_integration import RunTasksIntegration
2829
from .resources.run_trigger import RunTriggers
2930
from .resources.ssh_keys import SSHKeys
3031
from .resources.state_version_outputs import StateVersionOutputs
@@ -76,6 +77,7 @@ def __init__(self, config: TFEConfig | None = None):
7677
self.state_versions = StateVersions(self._transport)
7778
self.state_version_outputs = StateVersionOutputs(self._transport)
7879
self.run_tasks = RunTasks(self._transport)
80+
self.run_tasks_integration = RunTasksIntegration(self._transport)
7981
self.run_triggers = RunTriggers(self._transport)
8082
self.runs = Runs(self._transport)
8183
self.query_runs = QueryRuns(self._transport)

src/pytfe/models/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,10 @@
256256
Stage,
257257
TaskEnforcementLevel,
258258
)
259+
from .run_task_request import (
260+
RunTaskRequest,
261+
RunTaskRequestCapabilities,
262+
)
259263
from .run_trigger import (
260264
RunTrigger,
261265
RunTriggerCreateOptions,
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
"""Run Task Request models for python-tfe.
2+
3+
This module contains the RunTaskRequest model which represents the payload
4+
that TFC/TFE sends to external run task servers.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
from datetime import datetime
10+
11+
from pydantic import BaseModel, ConfigDict, Field
12+
13+
14+
class RunTaskRequestCapabilities(BaseModel):
15+
"""Capabilities that the caller supports."""
16+
17+
outcomes: bool = Field(
18+
default=False,
19+
description="Whether the run task server supports outcomes"
20+
)
21+
22+
23+
class RunTaskRequest(BaseModel):
24+
"""Represents the payload that TFC/TFE sends to a run task's URL.
25+
26+
This is the incoming request that your external run task server receives
27+
from Terraform Cloud/Enterprise when a run task is triggered.
28+
29+
API Documentation:
30+
https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration#common-properties
31+
"""
32+
33+
access_token: str = Field(
34+
description="Token to use for authentication when sending callback"
35+
)
36+
capabilities: RunTaskRequestCapabilities | None = Field(
37+
default=None,
38+
description="Capabilities that the caller supports"
39+
)
40+
configuration_version_download_url: str | None = Field(
41+
default=None,
42+
description="URL to download the configuration version"
43+
)
44+
configuration_version_id: str | None = Field(
45+
default=None,
46+
description="ID of the configuration version"
47+
)
48+
is_speculative: bool = Field(
49+
description="Whether this is a speculative run"
50+
)
51+
organization_name: str = Field(
52+
description="Name of the organization"
53+
)
54+
payload_version: int = Field(
55+
description="Version of the payload format"
56+
)
57+
plan_json_api_url: str | None = Field(
58+
default=None,
59+
description="URL to access the plan JSON via API (post_plan, pre_apply, post_apply stages)"
60+
)
61+
run_app_url: str = Field(
62+
description="URL to view the run in TFC/TFE UI"
63+
)
64+
run_created_at: datetime = Field(
65+
description="Timestamp when the run was created"
66+
)
67+
run_created_by: str = Field(
68+
description="Username of the user who created the run"
69+
)
70+
run_id: str = Field(
71+
description="ID of the run"
72+
)
73+
run_message: str = Field(
74+
description="Message associated with the run"
75+
)
76+
stage: str = Field(
77+
description="Stage when the run task is executed (pre_plan, post_plan, pre_apply, post_apply)"
78+
)
79+
task_result_callback_url: str = Field(
80+
description="URL to send the task result callback to"
81+
)
82+
task_result_enforcement_level: str = Field(
83+
description="Enforcement level for the task result (advisory, mandatory)"
84+
)
85+
task_result_id: str = Field(
86+
description="ID of the task result"
87+
)
88+
vcs_branch: str | None = Field(
89+
default=None,
90+
description="VCS branch name"
91+
)
92+
vcs_commit_url: str | None = Field(
93+
default=None,
94+
description="URL to the VCS commit"
95+
)
96+
vcs_pull_request_url: str | None = Field(
97+
default=None,
98+
description="URL to the VCS pull request"
99+
)
100+
vcs_repo_url: str | None = Field(
101+
default=None,
102+
description="URL to the VCS repository"
103+
)
104+
workspace_app_url: str = Field(
105+
description="URL to view the workspace in TFC/TFE UI"
106+
)
107+
workspace_id: str = Field(
108+
description="ID of the workspace"
109+
)
110+
workspace_name: str = Field(
111+
description="Name of the workspace"
112+
)
113+
workspace_working_directory: str | None = Field(
114+
default=None,
115+
description="Working directory for the workspace"
116+
)
117+
118+
model_config = ConfigDict(populate_by_name=True)

0 commit comments

Comments
 (0)