Skip to content

Commit a413b41

Browse files
Merge pull request #3 from Contrast-Security-OSS/feature/app-server-metadata
Feature/app server metadata
2 parents cdc5fa4 + 17b90c0 commit a413b41

2 files changed

Lines changed: 270 additions & 1 deletion

File tree

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ output.json
44
*.logs
55
.DS_Store
66
output.csv
7-
apps_and_servers.csv
7+
apps_and_servers.csv
8+
server_app_metadata.csv
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
import requests
2+
import base64
3+
import getpass
4+
import json
5+
import csv
6+
7+
class Application:
8+
def __init__(self, app_id, name):
9+
self.app_id = app_id
10+
self.name = name
11+
12+
def getApplications(headers, params, org_id):
13+
url = f"{contrast_url}/api/ng/{org_id}/applications"
14+
response = requests.get(url, headers=headers, params=params)
15+
return response
16+
17+
def getServers(headers, servers_href):
18+
url = f"{contrast_url}/api{servers_href}"
19+
response = requests.get(url, headers=headers)
20+
return response
21+
22+
def read_creds_file(filename="../.creds"):
23+
"""Read credentials from a .creds file"""
24+
creds = {}
25+
try:
26+
with open(filename, "r") as f:
27+
for line in f:
28+
line = line.strip()
29+
if line and not line.startswith("#"):
30+
key, value = line.split("=", 1)
31+
creds[key] = value
32+
except FileNotFoundError:
33+
print(f"Warning: {filename} file not found. Please input values.")
34+
return creds
35+
36+
# Read credentials from .creds file
37+
creds = read_creds_file()
38+
39+
# Set the default for re-using within your organization.
40+
contrast_url = creds.get("CONTRAST_URL", "")
41+
org_id = creds.get("ORG_ID", "")
42+
username = creds.get("USERNAME", "")
43+
api_key = creds.get("API_KEY", "")
44+
service_key = creds.get("SERVICE_KEY", "")
45+
app_id = creds.get("APP_ID", "")
46+
47+
headers = {
48+
"Accept": "application/json"
49+
}
50+
params = {
51+
"expand": ["apps", "vulns", "metadata"]
52+
}
53+
54+
def main():
55+
global contrast_url, org_id, username, api_key, service_key
56+
57+
msg = f"Enter your Contrast URL (blank will use default '{contrast_url}'): "
58+
contrast_url_input = input(msg)
59+
if contrast_url_input.strip():
60+
contrast_url = contrast_url_input
61+
else:
62+
while not contrast_url_input.strip() and not contrast_url.strip():
63+
print("Contrast URL cannot be blank.")
64+
contrast_url_input = input(msg)
65+
contrast_url = contrast_url_input
66+
67+
msg = f"Enter your Organization ID (blank will use default '{org_id}'): "
68+
org_id_input = input(msg)
69+
if org_id_input.strip():
70+
org_id = org_id_input
71+
else:
72+
while not org_id_input.strip() and not org_id.strip():
73+
print("Organization ID cannot be blank.")
74+
org_id_input = input(msg)
75+
org_id = org_id_input
76+
77+
msg = f"Enter your username (blank will use default '{username}'): "
78+
username_input = input(msg)
79+
if username_input.strip():
80+
username = username_input
81+
else:
82+
while not username_input.strip() and not username.strip():
83+
print("Username cannot be blank.")
84+
username_input = input(msg)
85+
username = username_input
86+
87+
msg = f"Enter your API key (blank will use default '****************************'): "
88+
api_key_input = getpass.getpass(msg)
89+
if api_key_input.strip():
90+
api_key = api_key_input
91+
else:
92+
while not api_key_input.strip() and not api_key.strip():
93+
print("API key cannot be blank.")
94+
api_key_input = getpass.getpass(msg)
95+
api_key = api_key_input
96+
97+
msg = f"Enter your service key (blank will use default '************'): "
98+
service_key_input = getpass.getpass(msg)
99+
if service_key_input.strip():
100+
service_key = service_key_input
101+
else:
102+
while not service_key_input.strip() and not service_key.strip():
103+
print("Service key cannot be blank.")
104+
service_key_input = getpass.getpass(msg)
105+
service_key = service_key_input
106+
107+
auth_str = f"{username}:{service_key}"
108+
auth_b64 = base64.b64encode(auth_str.encode()).decode()
109+
headers["Authorization"] = f"Basic {auth_b64}"
110+
headers["API-Key"] = api_key
111+
112+
response = getApplications(headers, params, org_id)
113+
if response.status_code == 200:
114+
data = response.json()
115+
with open("output.json", "w") as f:
116+
json.dump(data, f, indent=2)
117+
print("Applications response saved to output.json")
118+
119+
# List to collect CSV data
120+
csv_data = []
121+
122+
if data.get("applications"):
123+
application_list = data["applications"]
124+
print(f"Total Applications: {len(application_list)}")
125+
126+
for app in application_list:
127+
app_name = app.get("name", "N/A")
128+
app_id = app.get("app_id", "N/A")
129+
metadata_entities = app.get("metadataEntities", [])
130+
131+
print(f"\nApplication: {app_name} (ID: {app_id})")
132+
133+
# Output metadata entities
134+
if metadata_entities:
135+
print(f" Metadata Entities: {len(metadata_entities)} found")
136+
else:
137+
print(f" Metadata Entities: None")
138+
139+
# Get the links array from the application
140+
links = app.get("links", [])
141+
142+
# Find the servers link
143+
servers_href = None
144+
for link in links:
145+
if link.get("rel") == "servers":
146+
servers_href = link.get("href")
147+
break
148+
149+
if servers_href:
150+
servers_response = getServers(headers, servers_href)
151+
152+
if servers_response.status_code == 200:
153+
servers_data = servers_response.json()
154+
servers = servers_data.get("servers", [])
155+
print(f" Total Servers: {len(servers)}")
156+
157+
if servers:
158+
for server in servers:
159+
server_name = server.get("name", "N/A")
160+
server_id = server.get("server_id", "N/A")
161+
print(f" - Server: {server_name} (ID: {server_id})")
162+
163+
# Add rows to CSV for each metadata entity
164+
if metadata_entities:
165+
for entity in metadata_entities:
166+
field_name = entity.get("fieldName", "N/A")
167+
field_value = entity.get("fieldValue", "N/A")
168+
csv_data.append({
169+
"server_name": server_name,
170+
"server_id": server_id,
171+
"application_name": app_name,
172+
"application_id": app_id,
173+
"metadata_field_name": field_name,
174+
"metadata_field_value": field_value
175+
})
176+
else:
177+
# If no metadata, still add a row with the server and app
178+
csv_data.append({
179+
"server_name": server_name,
180+
"server_id": server_id,
181+
"application_name": app_name,
182+
"application_id": app_id,
183+
"metadata_field_name": "N/A",
184+
"metadata_field_value": "N/A"
185+
})
186+
else:
187+
# No servers found, but add application with metadata if exists
188+
if metadata_entities:
189+
for entity in metadata_entities:
190+
field_name = entity.get("fieldName", "N/A")
191+
field_value = entity.get("fieldValue", "N/A")
192+
csv_data.append({
193+
"server_name": "N/A",
194+
"server_id": "N/A",
195+
"application_name": app_name,
196+
"application_id": app_id,
197+
"metadata_field_name": field_name,
198+
"metadata_field_value": field_value
199+
})
200+
else:
201+
# No servers and no metadata
202+
csv_data.append({
203+
"server_name": "N/A",
204+
"server_id": "N/A",
205+
"application_name": app_name,
206+
"application_id": app_id,
207+
"metadata_field_name": "N/A",
208+
"metadata_field_value": "N/A"
209+
})
210+
else:
211+
print(f" Error fetching servers: {servers_response.status_code}")
212+
# Add application with metadata even if server fetch fails
213+
if metadata_entities:
214+
for entity in metadata_entities:
215+
field_name = entity.get("fieldName", "N/A")
216+
field_value = entity.get("fieldValue", "N/A")
217+
csv_data.append({
218+
"server_name": "N/A",
219+
"server_id": "N/A",
220+
"application_name": app_name,
221+
"application_id": app_id,
222+
"metadata_field_name": field_name,
223+
"metadata_field_value": field_value
224+
})
225+
else:
226+
print(" No servers link found for this application")
227+
# Add application with metadata even if no servers link
228+
if metadata_entities:
229+
for entity in metadata_entities:
230+
field_name = entity.get("fieldName", "N/A")
231+
field_value = entity.get("fieldValue", "N/A")
232+
csv_data.append({
233+
"server_name": "N/A",
234+
"server_id": "N/A",
235+
"application_name": app_name,
236+
"application_id": app_id,
237+
"metadata_field_name": field_name,
238+
"metadata_field_value": field_value
239+
})
240+
else:
241+
# No servers link and no metadata
242+
csv_data.append({
243+
"server_name": "N/A",
244+
"server_id": "N/A",
245+
"application_name": app_name,
246+
"application_id": app_id,
247+
"metadata_field_name": "N/A",
248+
"metadata_field_value": "N/A"
249+
})
250+
251+
# Write CSV file
252+
if csv_data:
253+
with open("server_app_metadata.csv", "w", newline="") as csvfile:
254+
fieldnames = ["server_name", "server_id", "application_name", "application_id", "metadata_field_name", "metadata_field_value"]
255+
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
256+
writer.writeheader()
257+
writer.writerows(csv_data)
258+
print(f"\nCSV file created: server_app_metadata.csv ({len(csv_data)} rows)")
259+
else:
260+
print("\nNo data to write to CSV file.")
261+
else:
262+
print("No applications found in response.")
263+
return
264+
else:
265+
print(f"Error: {response.status_code} - {response.text}")
266+
267+
if __name__ == "__main__":
268+
main()

0 commit comments

Comments
 (0)