Skip to content

Commit c1a60c0

Browse files
committed
[DCO01A-121] Make snapshot zip smaller by using symlinks to identical entities archives
1 parent dd5bb40 commit c1a60c0

2 files changed

Lines changed: 114 additions & 8 deletions

File tree

mgmtworker/cloudify_system_workflows/snapshots/snapshot_create.py

Lines changed: 97 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
import pathlib
44
import queue
55
import shutil
6+
import hashlib
7+
import tarfile
68
import tempfile
79
import zipfile
810
from collections import defaultdict
11+
from contextlib import contextmanager
912
from pathlib import Path
1013
from typing import Any
1114

@@ -84,6 +87,11 @@ def __init__(
8487
self._auditlog_queue = queue.Queue()
8588
self._auditlog_listener = AuditLogListener(self._client,
8689
self._auditlog_queue)
90+
self._written_archives: dict[str, dict[tuple[str, ...], str]]
91+
self._written_archives = { # track created entities archives
92+
'plugins': {},
93+
'blueprints': {}, # will do for both blueprints and blueprint_revisions
94+
}
8795

8896
def create(self, timeout: float | None = None):
8997
"""Dumps manager's data and some metadata into a single zip file"""
@@ -263,6 +271,7 @@ def _write_files(
263271
if _should_append_entity(dump_type, entity):
264272
self._auditlog_listener.append_entity(
265273
tenant_name, dump_type, entity)
274+
self._update_written_archives(entity_id, dump_type, output_dir)
266275
# Dump the data as JSON files
267276
filenum = _get_max_filenum_in_dir(output_dir) or 0
268277
for (source, source_id), items in data_buckets.items():
@@ -308,17 +317,24 @@ def _create_archive(self):
308317
) as zf:
309318
base_dir = os.path.join(root_dir, os.curdir)
310319
base_dir = os.path.normpath(base_dir)
311-
for dirpath, dirnames, filenames in os.walk(base_dir):
320+
for dirpath, dirnames, filenames in os.walk(base_dir, followlinks=False):
321+
root_path = Path(dirpath)
312322
arcdirpath = os.path.relpath(dirpath, root_dir)
313323
for name in sorted(dirnames):
314324
path = os.path.join(dirpath, name)
315325
arcname = os.path.join(arcdirpath, name)
316326
zf.write(path, arcname)
317327
for name in filenames:
318-
path = os.path.join(dirpath, name)
319-
path = os.path.normpath(path)
320-
if os.path.isfile(path):
321-
arcname = os.path.join(arcdirpath, name)
328+
path = root_path / name
329+
arcname = path.relative_to(root_dir)
330+
if path.is_symlink():
331+
zip_info = zipfile.ZipInfo(str(arcname))
332+
zip_info.create_system = 3 # Unix
333+
st = os.lstat(path)
334+
zip_info.external_attr = st.st_mode << 16
335+
link_target = os.readlink(path)
336+
zf.writestr(zip_info, link_target)
337+
elif os.path.isfile(path):
322338
zf.write(path, arcname)
323339

324340
def _upload_archive(self):
@@ -392,6 +408,26 @@ def _update_snapshot_status(self, status, error=None):
392408
error=error
393409
)
394410

411+
def _update_written_archives(self, entity_id, dump_type, output_dir):
    """Deduplicate the entity archive that was just written to disk.

    If an archive with identical contents was already produced earlier in
    this snapshot run, replace the freshly written file with a *relative*
    symlink to the existing one (so the final snapshot zip stays small);
    otherwise record this archive for future comparisons.

    :param entity_id: id of the dumped entity (file stem of the archive)
    :param dump_type: dump category, e.g. 'plugins' or 'blueprints'
    :param output_dir: directory the entity archive was written under
    """
    dest_dir = (output_dir / f'{dump_type}').resolve()
    # Only these dump types produce per-entity archive files.
    suffix = {
        'plugins': '.zip',
        'blueprints': '.tar.gz',
    }.get(dump_type)
    if not suffix:
        return
    entity_archive = dest_dir / f'{entity_id}{suffix}'
    content_hashes = _get_archive_content_hashes(entity_archive)
    if existing_path := self._written_archives[dump_type].get(content_hashes):
        entity_archive.unlink(missing_ok=False)
        os.symlink(
            # Target is relative to the archive's own directory, so the
            # link stays valid when the snapshot tree is moved/extracted.
            # (Computing relpath against the parent directly replaces the
            # previous relpath-against-the-file + split('/') hack, which
            # relied on POSIX separators.)
            os.path.relpath(existing_path, entity_archive.parent),
            entity_archive,
        )
        ctx.logger.debug(
            "Created symlink: %s to %s", entity_archive, existing_path)
        return
    self._written_archives[dump_type][content_hashes] = entity_archive
430+
395431

396432
def _prepare_temp_dir() -> Path:
397433
"""Prepare temporary (working) directory structure"""
@@ -516,3 +552,59 @@ def get_all(method, kwargs=None):
516552
kwargs['_offset'] = len(data)
517553

518554
return data
555+
556+
557+
def _hash_it(content) -> str:
558+
if isinstance(content, str):
559+
content = content.encode('utf-8')
560+
elif not isinstance(content, bytes):
561+
content = str(content).encode('utf-8')
562+
return hashlib.md5(content).hexdigest()
563+
564+
565+
@contextmanager
566+
def _open_archive(path: Path):
567+
if path.name.endswith(".zip"):
568+
with zipfile.ZipFile(path, "r") as arc:
569+
yield "zip", arc
570+
elif path.name.endswith(".tar.gz"):
571+
with tarfile.open(path, "r:gz") as arc:
572+
yield "tar.gz", arc
573+
else:
574+
raise RuntimeWarning("not supported archive type '{}'".format(path))
575+
576+
577+
def _iter_archive_members(arc_type: str, archive: zipfile.ZipFile | tarfile.TarFile):
578+
if arc_type == "zip":
579+
for info in archive.infolist():
580+
yield info.filename, info.is_dir(), lambda i=info: archive.open(i)
581+
else:
582+
for member in archive.getmembers():
583+
yield member.name, member.isdir(), lambda m=member: archive.extractfile(m)
584+
585+
586+
def _get_archive_content_hashes(path: Path) -> tuple[str, ...]:
    """Compute a content fingerprint for the archive at *path*.

    The fingerprint is a sorted tuple of hex digests covering every
    regular file's contents, the set of file names, and the set of empty
    directories — so two archives with identical contents compare equal
    even when metadata (timestamps, compression details) differs.

    The sets are sorted before hashing/tupling: set iteration order
    depends on string-hash randomization, so without sorting the same
    archive could fingerprint differently across processes, silently
    defeating deduplication.
    """
    hashes: set[str] = set()
    all_dirs: set[str] = set()
    not_empty_dirs: set[str] = set()
    filenames: set[str] = set()

    with _open_archive(path) as (arc_type, arc):
        for name, is_dir, open_file in _iter_archive_members(arc_type, arc):
            if not is_dir:
                filenames.add(name)
                # Every ancestor of a file is, by definition, non-empty.
                parts = name.split('/')
                for i in range(1, len(parts)):
                    not_empty_dirs.add('/'.join(parts[:i]))
                fileobj = open_file()
                # Guard before entering the context: tarfile.extractfile
                # returns None for special (non-regular) members, and
                # `with None:` would raise AttributeError.
                if fileobj is not None:
                    with fileobj:
                        hashes.add(_hash_it(fileobj.read()))
            else:
                all_dirs.add(name.rstrip("/"))

    if filenames:
        hashes.add(_hash_it(":".join(sorted(filenames))))
    if empty_dirs := all_dirs - not_empty_dirs:
        hashes.add(_hash_it(":".join(sorted(empty_dirs))))
    # Sorted tuple -> canonical, hash-seed-independent dict key.
    return tuple(sorted(hashes))

mgmtworker/cloudify_system_workflows/snapshots/snapshot_restore.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
import uuid
77
import base64
88
import shutil
9+
import stat
910
import zipfile
1011
import tempfile
1112
import threading
1213
import subprocess
1314
from contextlib import contextmanager
1415
from functools import partial
16+
from pathlib import Path
1517
from typing import Any
1618

1719
from cloudify.workflows import ctx
@@ -76,6 +78,11 @@
7678

7779
# Reproduced/modified from patch for https://bugs.python.org/issue15795
7880
class ZipFile(zipfile.ZipFile):
81+
82+
def __init__(self, *args, **kwargs):
    """Open the zip and pre-index all members for symlink resolution."""
    super().__init__(*args, **kwargs)
    # Map every member name (directory trailing '/' stripped) to its
    # ZipInfo so that, during extraction, a symlink member's target can
    # be looked up and its real entry extracted in the link's place.
    self._all_entries = {info.filename.rstrip('/'): info for info in self.infolist()}
85+
7986
def _extract_member(self, member, targetpath, pwd):
8087
"""Extract the ZipInfo object 'member' to a physical
8188
file on the path targetpath.
@@ -112,11 +119,18 @@ def _extract_member(self, member, targetpath, pwd):
112119
os.mkdir(targetpath)
113120
return targetpath
114121

115-
with self.open(member, pwd=pwd) as source, \
116-
open(targetpath, "wb") as target:
122+
_mode = member.external_attr >> 16
123+
if stat.S_ISLNK(_mode):
124+
link = self.read(member.filename).decode('utf-8')
125+
source_path = Path(member.filename).parent / link
126+
member_to_extract = self._all_entries[os.path.normpath(source_path)]
127+
else:
128+
member_to_extract = member
129+
130+
with self.open(member_to_extract, pwd=pwd) as source, open(targetpath, "wb") as target:
117131
shutil.copyfileobj(source, target)
118132

119-
mode = member.external_attr >> 16 & 0xFFF
133+
mode = member_to_extract.external_attr >> 16 & 0xFFF
120134
os.chmod(targetpath, mode)
121135
return targetpath
122136

0 commit comments

Comments
 (0)