-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathservice_config.py
More file actions
106 lines (89 loc) · 3.4 KB
/
service_config.py
File metadata and controls
106 lines (89 loc) · 3.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""
Exports ServiceConfig class
"""
from aikido_zen.helpers.ip_matcher import IPMatcher
from aikido_zen.helpers.match_endpoints import match_endpoints
# noinspection PyAttributeOutsideInit
class ServiceConfig:
"""Class holding the config of the connection_manager"""
def __init__(
self,
endpoints,
last_updated_at: int,
blocked_uids,
bypassed_ips,
received_any_stats: bool,
excluded_uids_from_rate_limiting=None,
):
# Init the class using update function :
self.update(
endpoints,
last_updated_at,
blocked_uids,
bypassed_ips,
received_any_stats,
excluded_uids_from_rate_limiting,
)
self.block_new_outgoing_requests = False
self.outbound_domains = {}
def update(
self,
endpoints,
last_updated_at: int,
blocked_uids,
bypassed_ips,
received_any_stats: bool,
excluded_uids_from_rate_limiting=None,
):
self.last_updated_at = last_updated_at
self.received_any_stats = bool(received_any_stats)
self.blocked_uids = set(blocked_uids)
self.excluded_uids_from_rate_limiting = set(
excluded_uids_from_rate_limiting or []
)
self.set_endpoints(endpoints)
self.set_bypassed_ips(bypassed_ips)
def set_endpoints(self, endpoints):
"""Sets non-graphql endpoints"""
self.endpoints = [
endpoint for endpoint in endpoints if not endpoint.get("graphql")
]
# Create an IPMatcher instance for each endpoint
for endpoint in self.endpoints:
if not "allowedIPAddresses" in endpoint:
# This feature is not supported by the current aikido server version
continue
if (
not isinstance(endpoint["allowedIPAddresses"], list)
or len(endpoint["allowedIPAddresses"]) == 0
):
# Skip empty allowlist
continue
endpoint["allowedIPAddresses"] = IPMatcher(endpoint["allowedIPAddresses"])
def get_endpoints(self, route_metadata):
"""
Gets the endpoint that matches the current context
route_metadata object includes route, url and method
"""
return match_endpoints(route_metadata, self.endpoints)
def set_bypassed_ips(self, bypassed_ips):
"""Creates an IPMatcher from the given bypassed ip set"""
self.bypassed_ips = IPMatcher(bypassed_ips)
def is_bypassed_ip(self, ip):
"""Checks if the IP is on the bypass list"""
return self.bypassed_ips.has(ip)
def update_outbound_domains(self, domains):
self.outbound_domains = {
domain["hostname"]: domain["mode"] for domain in domains
}
def set_block_new_outgoing_requests(self, value: bool):
"""Set whether to block new outgoing requests"""
self.block_new_outgoing_requests = bool(value)
def should_block_outgoing_request(self, hostname: str) -> bool:
mode = self.outbound_domains.get(hostname)
if self.block_new_outgoing_requests:
# Only allow outgoing requests if the mode is "allow"
# mode is None for unknown hostnames, so they get blocked
return mode != "allow"
# Only block outgoing requests if the mode is "block"
return mode == "block"