-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathtest_ram_usage.py
More file actions
119 lines (96 loc) · 3.42 KB
/
test_ram_usage.py
File metadata and controls
119 lines (96 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from collections.abc import Callable, Iterator
from io import DEFAULT_BUFFER_SIZE
from lzma import compress
from pathlib import Path
from random import randbytes, seed
from typing import BinaryIO, Optional, cast
import pytest
from xz import XZFile
from xz.common import create_xz_index_footer, parse_xz_footer, parse_xz_index
from xz.io import IOCombiner, IOStatic
@pytest.fixture
def ram_usage() -> Iterator[Callable[[], int]]:
    """Yield a callable returning the peak traced memory usage, in bytes."""
    try:
        import tracemalloc  # pylint: disable=import-outside-toplevel
    except ImportError:  # tracemalloc is unavailable on some interpreters, e.g. PyPy
        pytest.skip("tracemalloc module not available")

    def peak() -> int:
        # get_traced_memory() returns (current, peak); report the peak
        return tracemalloc.get_traced_memory()[1]

    try:
        tracemalloc.start()
        yield peak
    finally:
        # always stop tracing, even if the test body raised
        tracemalloc.stop()
# uncompressed size (bytes) of each generated xz block used by the fixtures and tests
BLOCK_SIZE = 1_000_000
@pytest.fixture
def fileobj() -> BinaryIO:
    """Build an in-memory file object holding xz data made of many identical blocks.

    A single block is compressed once, then its compressed payload is repeated
    and a matching index/footer is synthesized, so the fixture is cheap to build
    but decompresses to ``nb_blocks * BLOCK_SIZE`` bytes.
    """
    nb_blocks = 50
    seed(0)
    compressed = compress(randbytes(BLOCK_SIZE))
    header = compressed[:12]
    footer = compressed[-12:]
    check, backward_size = parse_xz_footer(footer)
    index_start = -12 - backward_size
    block_payload = compressed[12:index_start]
    records = parse_xz_index(compressed[index_start:-12])
    # duplicate the single block's index record for every repeated block
    index_footer = create_xz_index_footer(check, records * nb_blocks)
    # one shared IOStatic serves all blocks (list multiplication repeats the
    # same object, exactly as the original layout did)
    block_io = IOStatic(block_payload)
    combined = IOCombiner(
        IOStatic(header),
        *([block_io] * nb_blocks),
        IOStatic(index_footer),
    )
    return cast(BinaryIO, combined)
def test_read_linear(
    # pylint: disable=redefined-outer-name
    fileobj: BinaryIO,
    ram_usage: Callable[[], int],
) -> None:
    """Sequential reads over the whole file must stay near one block of RAM."""
    with XZFile(fileobj) as xz_file:
        # prime the reader with almost a full block to establish a baseline
        xz_file.read(BLOCK_SIZE - 1)
        one_block_memory = ram_usage()
        # stream the remainder in buffer-sized chunks until EOF (b"")
        for _ in iter(lambda: xz_file.read(DEFAULT_BUFFER_SIZE), b""):
            # should not use much more memory, take 2 as error margin
            assert ram_usage() < one_block_memory * 2, (
                f"Consumes too much RAM (at {100 * xz_file.tell() / len(xz_file):.0f}%)"
            )
def test_partial_read_each_block(
    # pylint: disable=redefined-outer-name
    fileobj: BinaryIO,
    ram_usage: Callable[[], int],
) -> None:
    """Random-access reads touching every block must keep the cache bounded."""
    baseline: Optional[int] = None
    with XZFile(fileobj) as xz_file:
        for boundary in xz_file.block_boundaries[1:]:
            # read the second-to-last byte of the block ending at `boundary`
            xz_file.seek(boundary - 2)
            xz_file.read(1)
            if baseline is None:
                # the first block establishes the per-block memory baseline
                baseline = ram_usage()
                continue
            # default strategy is max 8 blocks, take 10 as error margin
            assert ram_usage() < baseline * 10, (
                f"Consumes too much RAM (at {100 * xz_file.tell() / len(xz_file):.0f}%)"
            )
def test_write(
    tmp_path: Path,
    # pylint: disable=redefined-outer-name
    ram_usage: Callable[[], int],
) -> None:
    """Writing many blocks must not accumulate much more than one block of RAM."""
    nb_blocks = 10
    seed(0)
    one_block_memory: Optional[int] = None
    with XZFile(tmp_path / "archive.xz", "w") as xz_file:
        for i in range(nb_blocks):
            xz_file.change_block()
            xz_file.write(randbytes(BLOCK_SIZE))
            if one_block_memory is None:
                # first block establishes the memory baseline
                one_block_memory = ram_usage()
            else:
                assert (
                    # should not use much more memory, take 2 as error margin
                    ram_usage() < one_block_memory * 2
                    # fix: scale the ratio by 100 so the message reports a real
                    # percentage (it previously printed only "0%" or "1%"),
                    # consistent with the other tests in this file
                ), f"Consumes too much RAM (at {100 * i / nb_blocks:.0f}%)"