Skip to content

Commit e10118d

Browse files
fix: harden scan resource limits
1 parent 54e79d4 commit e10118d

6 files changed

Lines changed: 54 additions & 8 deletions

File tree

requirements.txt

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
pandas>=1.3.0
2-
tqdm>=4.45.0
3-
colorama>=0.4.4
4-
openpyxl>=3.0.0
5-
pythonping>=1.1.4
6-
argparse>=1.4.0
1+
pandas>=2.0
2+
tqdm>=4.65
3+
openpyxl>=3.1
4+
rich>=13.0

src/ipmg/core/ping.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import ipaddress
22
import platform
33
import re
4-
import subprocess
4+
import subprocess # nosec B404
55
from typing import Optional, Tuple
66

77
from ipmg.exceptions import PingError
@@ -45,7 +45,8 @@ def ping_ip(ip: str, timeout: int, count: int) -> Tuple[str, Optional[float]]:
4545
cmd = ["ping", param, str(count), timeout_param, timeout_val, ip]
4646

4747
try:
48-
result = subprocess.run(
48+
# Fixed argv list, no shell, and ip is validated before this call.
49+
result = subprocess.run( # nosec B603
4950
cmd,
5051
stdout=subprocess.PIPE,
5152
stderr=subprocess.PIPE,

src/ipmg/infrastructure/file_io.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111

1212
SUPPORTED_INPUT_SUFFIXES = {".xlsx", ".xls", ".csv", ".txt", ".list"}
13+
MAX_EXPANDED_TARGETS = 65_536
1314

1415

1516
def _deduplicate(targets: Iterable[str]) -> list[str]:
@@ -31,6 +32,12 @@ def _expand_cidr(target: str) -> list[str]:
3132
if parsed.num_addresses == 1:
3233
return [str(parsed.network_address)]
3334

35+
if parsed.num_addresses - 2 > MAX_EXPANDED_TARGETS:
36+
raise FileIOError(
37+
f"CIDR target '{target}' expands to too many hosts. "
38+
f"Maximum allowed hosts: {MAX_EXPANDED_TARGETS}."
39+
)
40+
3441
return [str(ip) for ip in parsed.hosts()]
3542

3643

src/ipmg/services/scan_service.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232

3333
def run_scan(args) -> None:
3434
args.threads = clamp_int(args.threads, 1, 500)
35+
args.timeout = clamp_int(args.timeout, 1, 60)
36+
args.count = clamp_int(args.count, 1, 10)
3537

3638
if not os.path.exists(args.input) and not args.discover:
3739
if Path(args.input).suffix.lower() in {".xlsx", ".xls", ".csv", ".txt", ".list"}:

tests/test_file_io.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ def test_load_targets_from_literal_cidr():
2424
assert load_targets("10.0.0.0/30") == ["10.0.0.1", "10.0.0.2"]
2525

2626

27+
def test_load_targets_rejects_large_cidr():
28+
with pytest.raises(Exception):
29+
load_targets("10.0.0.0/8")
30+
31+
2732
def test_load_targets_requires_expected_column(tmp_path):
2833
path = tmp_path / "targets.csv"
2934
pd.DataFrame({"Address": ["8.8.8.8"]}).to_csv(path, index=False)

tests/test_scan_service.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,42 @@ def fake_print_summary(df, batch_timestamp, duration_seconds):
4141

4242
run_scan(args)
4343

44+
assert args.timeout == 1
45+
assert args.count == 1
46+
4447
df: pd.DataFrame = captured["df"]
4548
statuses = dict(zip(df["IP Address"], df["Status"]))
4649
assert statuses["8.8.8.8"] == "Active"
4750
assert statuses["1.1.1.1"] == "Error"
4851
assert len(df["Batch Timestamp"].unique()) == 1
4952
assert (df["Scan Duration (s)"] >= 0).all()
53+
54+
55+
def test_run_scan_clamps_resource_limits(monkeypatch):
56+
captured = {}
57+
58+
def fake_ping_ip(_ip, timeout, count):
59+
captured["limits"] = (timeout, count)
60+
return "Active", 1.0
61+
62+
monkeypatch.setattr("ipmg.services.scan_service.load_targets", lambda _source: ["8.8.8.8"])
63+
monkeypatch.setattr("ipmg.services.scan_service.ping_ip", fake_ping_ip)
64+
monkeypatch.setattr("ipmg.services.scan_service.save_results", lambda *_args: None)
65+
monkeypatch.setattr("ipmg.services.scan_service.print_summary", lambda *_args: None)
66+
67+
args = SimpleNamespace(
68+
input="targets.csv",
69+
output="results",
70+
timeout=999,
71+
count=999,
72+
threads=999,
73+
formats=["csv"],
74+
discover=False,
75+
resolve=False,
76+
interval=None,
77+
)
78+
79+
run_scan(args)
80+
81+
assert args.threads == 500
82+
assert captured["limits"] == (60, 10)

0 commit comments

Comments
 (0)