Skip to content

Commit ce2b49a

Browse files
committed
feat: Logging and sim changes to make the simulator more HPC friendly
1 parent f56baf8 commit ce2b49a

3 files changed

Lines changed: 226 additions & 101 deletions

File tree

configs/simulator.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ instprm_file: "/app/configs/instruments/11_id.prm.instprm"
1212
worker_base_dir: "/data/workers"
1313

1414
# --- Execution Control ---
15-
parallel_jobs: 10
16-
sims_per_file: 2
15+
parallel_jobs: 256
16+
sims_per_file: 100
1717
master_seed: 42 # Reproducible parameter sampling
1818
cleanup_worker_dirs: true
19+
progress_step_pct: 0.1 # Progress logging interval in percent
20+
log_to_console: false # If false, log to a file instead of stdout (useful in HPC settings)
1921

2022
# If true, read SG from "# _original_symmetry_space_group_name_H-M" comment in CIF
2123
# If using custom CIFs without this comment, set to false to use standard tags

src/simulator/diffraction_generator.py

Lines changed: 185 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
from multiprocessing import Pool
66
import time
77
import shutil
8-
from tqdm import tqdm
98
import argparse
109
import yaml
1110
from typing import Dict, Any
11+
import logging
1212

1313
# Container: insert parent GSAS-II directory and import package
1414
GSAS_II_PARENT = os.environ.get("GSAS_II_PATH", "/opt/conda/envs/sim/GSAS-II")
@@ -17,6 +17,90 @@
1717

1818
from . import simulation_worker
1919

20+
21+
def setup_logging(output_dir: Path, level: int = logging.INFO, log_to_console: bool = False) -> Path:
22+
"""Configure root logger to write to a file in the output (data) directory.
23+
24+
Returns the log file path being used.
25+
"""
26+
if not isinstance(output_dir, Path):
27+
output_dir = Path(output_dir)
28+
log_file = output_dir / "simulator.log"
29+
30+
# Clear existing handlers to avoid duplicate logs on re-entry
31+
root_logger = logging.getLogger()
32+
for h in list(root_logger.handlers):
33+
root_logger.removeHandler(h)
34+
35+
root_logger.setLevel(level)
36+
37+
formatter = logging.Formatter(
38+
fmt="%(asctime)s | %(levelname)s | %(processName)s | %(name)s | %(message)s",
39+
datefmt="%Y-%m-%d %H:%M:%S",
40+
)
41+
if not log_to_console:
42+
# File logging
43+
output_dir.mkdir(parents=True, exist_ok=True)
44+
file_handler = logging.FileHandler(log_file, mode="a", encoding="utf-8")
45+
file_handler.setLevel(level)
46+
file_handler.setFormatter(formatter)
47+
root_logger.addHandler(file_handler)
48+
49+
# Console handler is always attached: full level when log_to_console is set, otherwise WARNING+ so the console stays lean while INFO goes to the file
50+
console = logging.StreamHandler(stream=sys.stdout)
51+
console.setLevel(level if log_to_console else logging.WARNING)
52+
console.setFormatter(formatter)
53+
root_logger.addHandler(console)
54+
55+
logging.info("Logging initialized.")
56+
if not log_to_console:
57+
logging.info(f"Log file: {log_file}")
58+
return log_file
59+
else:
60+
logging.info("Logging to console")
61+
return Path("CONSOLE")
62+
63+
64+
class ProgressLogger:
65+
"""Helper to log progress at fixed percentage steps and report rate (ips)."""
66+
67+
def __init__(self, total: int, label: str, step_pct: float = 10.0):
68+
self.total = max(1, int(total))
69+
self.label = label
70+
self.step_pct = float(step_pct)
71+
self.completed = 0
72+
self._tick_idx = 0
73+
# lightweight rate tracking
74+
self._start_time = time.monotonic()
75+
self._last_time = self._start_time
76+
self._last_count = 0
77+
logging.info(f"{self.label}: 0% complete (0/{self.total}) | ips=0.00 avg_ips=0.00")
78+
79+
def update(self, n: int = 1) -> None:
80+
self.completed += n
81+
pct = (self.completed / self.total) * 100.0
82+
# compute next threshold based on tick index, guard at 100%
83+
threshold = min(100.0, (self._tick_idx + 1) * self.step_pct)
84+
while pct >= threshold:
85+
now = time.monotonic()
86+
dt_inst = max(now - self._last_time, 1e-9)
87+
dt_avg = max(now - self._start_time, 1e-9)
88+
inst_ips = (self.completed - self._last_count) / dt_inst
89+
avg_ips = self.completed / dt_avg
90+
# choose percent formatting based on step size
91+
if self.step_pct >= 1.0:
92+
pct_str = f"{int(threshold)}%"
93+
else:
94+
pct_str = f"{threshold:.1f}%"
95+
logging.info(
96+
f"{self.label}: {pct_str} complete ({self.completed}/{self.total}) | ips={inst_ips:.2f} avg_ips={avg_ips:.2f}"
97+
)
98+
# reset window
99+
self._last_time = now
100+
self._last_count = self.completed
101+
self._tick_idx += 1
102+
threshold = min(100.0, (self._tick_idx + 1) * self.step_pct)
103+
20104
class DiffractionGenerator:
21105
"""
22106
Library for large-scale diffraction simulation.
@@ -35,115 +119,113 @@ def __init__(self, input_dir, output_dir, instprm_file, n_parallel_sims=70, erro
35119
self.worker_base_dir = Path(worker_base_dir).resolve() if worker_base_dir else self.output_dir / 'worker_temp_dirs'
36120
for dirname in [self.data_dir, self.error_dir, self.worker_base_dir]:
37121
dirname.mkdir(parents=True, exist_ok=True)
38-
print(f"Using instrument parameter file: {self.instprm_file}")
122+
logging.info(f"Using instrument parameter file: {self.instprm_file}")
39123

40124
def find_files(self):
41-
print("Searching for simulation input files...")
125+
logging.info("Searching for simulation input files...")
42126
start_time = time.time()
43127
self.file_list = sorted(list(set(self.input_dir.glob('*.cif'))))
44128
elapsed = time.time() - start_time
45-
print(f"--> Found {len(self.file_list)} unique files in {elapsed:.2f} seconds.")
129+
logging.info(f"--> Found {len(self.file_list)} unique files in {elapsed:.2f} seconds.")
46130

47131

48132
def _generate_simulation_tasks(self, n_sims_per_file, master_seed, cleanup_worker_dirs, **kwargs):
49133
"""Generator that yields tasks, skips completed jobs, and includes noise parameters."""
50134
if not self.file_list: self.find_files()
51-
52-
param_ranges = { # Defaults
53-
'strain_range': (0.0, 0.0), 'size_range': (0.0, 0.0), 'U_range': (0.0, 0.0),
54-
'V_range': (0.0, 0.0), 'W_range': (0.0, 0.0), 'st_range': (5.0, 20.0),
55-
'en_range': (20.0, 20.0), 'Npoints_range': (8192, 8192), 'scaler_range': (1.0, 1.0),
56-
'wl_range': (0.6199, 0.6199), 'proportional_noise_range': (0.0, 0.0),
57-
'constant_noise_range': (0, 0),
58-
}
135+
136+
param_ranges = {}
59137
param_ranges.update(kwargs)
60138

61139
rng = np.random.default_rng(master_seed)
62140
job_id_counter = 0
63-
tasks_to_run, tasks_to_skip = [], 0
64-
141+
tasks_to_skip = 0
142+
processed_count = 0
143+
65144
total_planned = len(self.file_list) * int(n_sims_per_file)
66-
disable_bar = not sys.stderr.isatty()
67-
with tqdm(
68-
total=total_planned,
69-
desc="Preparing tasks",
70-
unit="task",
71-
mininterval=1.0,
72-
disable=disable_bar,
73-
) as pbar:
74-
for file_path in self.file_list:
75-
stem = Path(file_path).stem
76-
for variation_index in range(1, n_sims_per_file + 1):
77-
final_output_path = self.data_dir / f"{stem}-{variation_index}.npy"
78-
if final_output_path.exists():
79-
tasks_to_skip += 1
80-
job_id_counter += 1
81-
pbar.update(1)
82-
continue
83-
84-
worker_dir = self.worker_base_dir / f'job_{job_id_counter:09d}'
85-
params = {
86-
'job_id': job_id_counter, 'worker_dir': str(worker_dir), 'input_file': str(file_path),
87-
'output_data_dir': str(self.data_dir), 'error_dir': str(self.error_dir),
88-
'instprm_file': str(self.instprm_file), 'noise_seed': rng.integers(1e9),
89-
'strain': rng.uniform(*param_ranges['strain_range']), 'size': rng.uniform(*param_ranges['size_range']),
90-
'U': rng.uniform(*param_ranges['U_range']), 'V': rng.uniform(*param_ranges['V_range']),
91-
'W': rng.uniform(*param_ranges['W_range']), 'st': rng.uniform(*param_ranges['st_range']),
92-
'en': rng.uniform(*param_ranges['en_range']), 'Npoints': int(rng.uniform(*param_ranges['Npoints_range'])),
93-
'scaler': rng.uniform(*param_ranges['scaler_range']), 'wl': rng.uniform(*param_ranges['wl_range']),
145+
step_pct = float(kwargs['progress_step_pct'])
146+
prog_logger = ProgressLogger(total_planned, label="Task preparation", step_pct=step_pct)
147+
for file_path in self.file_list:
148+
stem = Path(file_path).stem
149+
for variation_index in range(1, n_sims_per_file + 1):
150+
worker_dir = self.worker_base_dir / f'job_{job_id_counter:09d}'
151+
params = {
152+
'job_id': job_id_counter,
153+
'worker_dir': str(worker_dir),
154+
'input_file': str(file_path),
155+
'output_data_dir': str(self.data_dir),
156+
'error_dir': str(self.error_dir),
157+
'instprm_file': str(self.instprm_file),
158+
# Ranges for worker-side parameter generation
159+
'param_ranges': {
160+
'strain_range': param_ranges['strain_range'],
161+
'size_range': param_ranges['size_range'],
162+
'U_range': param_ranges['U_range'],
163+
'V_range': param_ranges['V_range'],
164+
'W_range': param_ranges['W_range'],
165+
'st_range': param_ranges['st_range'],
166+
'en_range': param_ranges['en_range'],
167+
'Npoints_range': param_ranges['Npoints_range'],
168+
'scaler_range': param_ranges['scaler_range'],
169+
'wl_range': param_ranges['wl_range'],
94170
'proportional_noise_range': param_ranges['proportional_noise_range'],
95171
'constant_noise_range': param_ranges['constant_noise_range'],
96-
'cleanup_worker_dir': cleanup_worker_dirs,
97-
'output_filename': f"{stem}-{variation_index}.npy",
98-
'parse_from_comment': bool(kwargs.get('parse_from_comment', False)),
99-
}
100-
tasks_to_run.append(params)
101-
job_id_counter += 1
102-
pbar.update(1)
103-
104-
if tasks_to_skip > 0: print(f"--> Found and skipped {tasks_to_skip} previously completed jobs.")
105-
for task in tasks_to_run: yield task
172+
},
173+
'seed': int(rng.integers(1_000_000_000)),
174+
'cleanup_worker_dir': cleanup_worker_dirs,
175+
'output_filename': f"{stem}-{variation_index}.npy",
176+
'parse_from_comment': bool(kwargs['parse_from_comment']),
177+
}
178+
job_id_counter += 1
179+
processed_count += 1
180+
prog_logger.update(1)
181+
yield params
106182

107183
def run(self, n_sims_per_file, master_seed=12345, cleanup_worker_dirs=True, **kwargs):
108-
tasks = list(self._generate_simulation_tasks(n_sims_per_file, master_seed, cleanup_worker_dirs, **kwargs))
109-
if not tasks:
110-
print("All simulation jobs are already complete. Nothing to do.")
184+
# Prepare counts but stream tasks to pool to avoid huge memory/time
185+
if not self.file_list:
186+
self.find_files()
187+
total_jobs_to_run = len(self.file_list) * int(n_sims_per_file)
188+
progress_step_pct = float(kwargs['progress_step_pct'])
189+
tasks_iter = self._generate_simulation_tasks(n_sims_per_file, master_seed, cleanup_worker_dirs, **kwargs)
190+
if total_jobs_to_run == 0:
191+
logging.info("All simulation jobs are already complete. Nothing to do.")
111192
return
112193

113-
total_jobs_to_run = len(tasks)
114-
print(f"Starting {total_jobs_to_run} new simulations with {self.n_parallel_sims} parallel processes...")
115-
194+
logging.info(f"Starting {total_jobs_to_run} new simulations with {self.n_parallel_sims} parallel processes...")
195+
116196
simulation_start_time = time.time()
117197
success_count = 0
118-
198+
prog_logger = ProgressLogger(total_jobs_to_run, label="Simulation progress", step_pct=progress_step_pct)
199+
119200
with Pool(self.n_parallel_sims, maxtasksperchild=1000, initializer=simulation_worker.suppress_worker_stdout) as p:
120-
results = tqdm(p.imap_unordered(simulation_worker.run_single_simulation, tasks), total=total_jobs_to_run)
121-
for result in results:
122-
if result: success_count += 1
201+
for result in p.imap_unordered(simulation_worker.run_single_simulation, tasks_iter, chunksize=1000):
202+
if result:
203+
success_count += 1
204+
prog_logger.update(1)
123205

124206
simulation_end_time = time.time()
125207

126208
cleanup_duration = 0
127209
if cleanup_worker_dirs:
128-
print("Cleaning up temporary worker directories...")
210+
logging.info("Cleaning up temporary worker directories...")
129211
cleanup_start_time = time.time()
130212
shutil.rmtree(self.worker_base_dir)
131213
cleanup_duration = time.time() - cleanup_start_time
132-
print(f"--> Cleanup complete in {cleanup_duration:.2f} seconds.")
214+
logging.info(f"--> Cleanup complete in {cleanup_duration:.2f} seconds.")
133215

134216
simulation_duration = simulation_end_time - simulation_start_time
135217
total_duration = simulation_duration + cleanup_duration
136218
failure_count = total_jobs_to_run - success_count
137219
sims_per_sec = (success_count / simulation_duration) if simulation_duration > 0 else 0
138-
avg_time_per_sim = (success_count / total_jobs_to_run) if total_jobs_to_run > 0 else 0
220+
avg_time_per_sim = (simulation_duration / success_count) if success_count > 0 else 0
139221

140-
print("\n--- Performance Summary ---")
141-
print(f"Jobs to Run: {total_jobs_to_run} | Succeeded: {success_count} | Failed: {failure_count}")
142-
print(f"Total Simulation Time: {time.strftime('%H:%M:%S', time.gmtime(simulation_duration))}")
143-
print(f"Overall Run Time: {time.strftime('%H:%M:%S', time.gmtime(total_duration))}")
144-
print(f"Throughput: {sims_per_sec:.2f} simulations/sec")
145-
print(f"Avg. Time/Simulation: {avg_time_per_sim:.3f} seconds")
146-
print("---------------------------")
222+
logging.info("\n--- Performance Summary ---")
223+
logging.info(f"Jobs to Run: {total_jobs_to_run} | Succeeded: {success_count} | Failed: {failure_count}")
224+
logging.info(f"Total Simulation Time: {time.strftime('%H:%M:%S', time.gmtime(simulation_duration))}")
225+
logging.info(f"Overall Run Time: {time.strftime('%H:%M:%S', time.gmtime(total_duration))}")
226+
logging.info(f"Throughput: {sims_per_sec:.2f} simulations/sec")
227+
logging.info(f"Avg. Time/Simulation: {avg_time_per_sim:.3f} seconds")
228+
logging.info("---------------------------")
147229

148230

149231
def load_config(path: Path) -> Dict[str, Any]:
@@ -171,12 +253,23 @@ def main() -> None:
171253
error_directory = str(Path(cfg["error_directory"]))
172254
worker_base_dir = str(Path(cfg["worker_base_dir"]))
173255

256+
# Initialize logging (default to file in /data mount)
257+
log_to_console = bool(cfg["log_to_console"])
258+
try:
259+
log_path = setup_logging('/data', log_to_console=log_to_console)
260+
except Exception as e:
261+
# Fallback to a writable temp directory if /data cannot be used
262+
fallback = Path('/tmp')
263+
log_path = setup_logging(fallback, log_to_console=log_to_console)
264+
logging.warning(f"Failed to initialize logging in /data: {e}. Using fallback: {fallback}")
265+
174266
# Execution controls
175267
parallel_jobs = int(cfg["parallel_jobs"])
176268
sims_per_file = int(cfg["sims_per_file"])
177269
master_seed = int(cfg["master_seed"])
178270
cleanup_worker_dirs = bool(cfg["cleanup_worker_dirs"])
179-
parse_from_comment = bool(cfg.get("parse_from_comment", False))
271+
parse_from_comment = bool(cfg["parse_from_comment"])
272+
progress_step_pct = float(cfg["progress_step_pct"])
180273

181274
# Parameter ranges
182275
ranges = {
@@ -195,21 +288,24 @@ def main() -> None:
195288
}
196289

197290
# Log planned operation
198-
print("\n--- Simulator configuration ---")
199-
print(f"Input directory: {input_directory}")
200-
print(f"Output directory: {output_directory}")
201-
print(f"Error directory: {error_directory}")
202-
print(f"Worker base dir: {worker_base_dir}")
203-
print(f"Instrument file: {instprm_file}")
204-
print(f"Parallel jobs: {parallel_jobs}")
205-
print(f"Sims per file: {sims_per_file}")
206-
print(f"Master seed: {master_seed}")
207-
print(f"Cleanup worker dirs: {cleanup_worker_dirs}")
208-
print(f"Parse from comment: {parse_from_comment}")
209-
print("Parameter ranges:")
291+
logging.info("\n--- Simulator configuration ---")
292+
logging.info(f"Log file: {log_path}")
293+
logging.info(f"Input directory: {input_directory}")
294+
logging.info(f"Output directory: {output_directory}")
295+
logging.info(f"Error directory: {error_directory}")
296+
logging.info(f"Worker base dir: {worker_base_dir}")
297+
logging.info(f"Instrument file: {instprm_file}")
298+
logging.info(f"Parallel jobs: {parallel_jobs}")
299+
logging.info(f"Sims per file: {sims_per_file}")
300+
logging.info(f"Master seed: {master_seed}")
301+
logging.info(f"Cleanup worker dirs: {cleanup_worker_dirs}")
302+
logging.info(f"Parse from comment: {parse_from_comment}")
303+
logging.info(f"Progress step percent: {progress_step_pct}")
304+
logging.info(f"Log to console: {log_to_console}")
305+
logging.info("Parameter ranges:")
210306
for k, v in ranges.items():
211-
print(f" {k}: {v}")
212-
print("--------------------------------\n")
307+
logging.info(f" {k}: {v}")
308+
logging.info("--------------------------------\n")
213309

214310
# Run simulations
215311
try:
@@ -226,10 +322,11 @@ def main() -> None:
226322
master_seed=master_seed,
227323
cleanup_worker_dirs=cleanup_worker_dirs,
228324
parse_from_comment=parse_from_comment,
325+
progress_step_pct=progress_step_pct,
229326
**ranges,
230327
)
231328
except Exception as e:
232-
print(f"FATAL: Simulator run failed: {e}", file=sys.stderr)
329+
logging.exception(f"FATAL: Simulator run failed: {e}")
233330
sys.exit(1)
234331

235332

0 commit comments

Comments
 (0)