Skip to content
83 changes: 59 additions & 24 deletions xfel/command_line/striping.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# dials.combine_experiments (optionally with clustering and selecting clusters).
#
from dials.util import show_mail_on_error
from dxtbx.model import ExperimentList
from libtbx.phil import parse
from libtbx.utils import Sorry
from libtbx import easy_run
Expand All @@ -22,7 +23,11 @@

multiprocessing_override_str = '''
mp {
method = local
use_mpi = False
mpi_command = source
mpi_option = ""
local.include_mp_in_command = False
}
'''

Expand All @@ -47,7 +52,7 @@
.help = "Enable to select results evenly spaced across each rungroup"
"(stripes) as opposed to contiguous chunks."
chunk_size = 1000
.type = float
.type = int(value_min=1)
.help = "Maximum number of images per chunk or stripe."
respect_rungroup_barriers = True
.type = bool
Expand Down Expand Up @@ -243,6 +248,46 @@
for interactive unit cell clustering, use combine_experiments.clustering.dendrogram=True
"""


def chunk_pairs(expt_paths, refl_paths, max_size=1000):
"""Distribute matching expt-refl pairs into chunks with < `max_size` expts"""
expt_lengths = [len(ExperimentList.from_file(expt_path, check_format=False))
for expt_path in expt_paths]
chunk_count = math.ceil(sum(expt_lengths) / max_size)
estimated_fill = sum(expt_lengths) / chunk_count
chunks_indices = [[] for _ in range(chunk_count)]
chunk_lengths = [0] * chunk_count
currently_filled_chunk = 0
for len_index, len_ in enumerate(expt_lengths):
if len_ / 2 + chunk_lengths[currently_filled_chunk] > estimated_fill:
currently_filled_chunk = min(chunk_count - 1, currently_filled_chunk + 1)
chunks_indices[currently_filled_chunk].append(len_index)
chunk_lengths[currently_filled_chunk] += len_
chunked_expts, chunked_refls = [], []
for chunk_indices in chunks_indices:
chunked_expts.append([expt_paths[i] for i in chunk_indices])
chunked_refls.append([refl_paths[i] for i in chunk_indices])
return chunked_expts, chunked_refls, chunk_lengths


def stripe_pairs(expt_paths, refl_paths, max_size=1000):
"""Distribute matching expt-refl pairs into stripes with <`max_size` expts"""
expt_lengths = [len(ExperimentList.from_file(expt_path, check_format=False))
for expt_path in expt_paths]
stripe_count = math.ceil(sum(expt_lengths) / max_size)
stripe_indices = [[] for _ in range(stripe_count)]
stripe_lengths = [0, ] * stripe_count
for len_index, len_ in enumerate(expt_lengths):
currently_filled_stripe = stripe_lengths.index(min(stripe_lengths))
stripe_indices[currently_filled_stripe].append(len_index)
stripe_lengths[currently_filled_stripe] += len_
striped_expts, striped_refls = [], []
for chunk_indices in stripe_indices:
striped_expts.append([expt_paths[i] for i in chunk_indices])
striped_refls.append([refl_paths[i] for i in chunk_indices])
return striped_expts, striped_refls, stripe_lengths


def allocate_chunks(results_dir,
trial_no,
rgs_selected=None,
Expand Down Expand Up @@ -273,7 +318,6 @@ def allocate_chunks(results_dir,
rgs[rg] = [run]
else:
rgs[rg].append(run)
batch_chunk_nums_sizes = {}
batch_contents = {}
if respect_rungroup_barriers:
batchable = {rg:{rg:runs} for rg, runs in six.iteritems(rgs)}
Expand Down Expand Up @@ -312,35 +356,26 @@ def allocate_chunks(results_dir,
print("no images found for %s" % batch)
del batch_contents[batch]
continue
n_chunks = int(math.ceil(n_img/max_size))
chunk_size = int(math.ceil(n_img/n_chunks))
batch_chunk_nums_sizes[batch] = (n_chunks, chunk_size)
if len(batch_contents) == 0:
raise Sorry("no DIALS integration results found.")
refl_ending += extension
batch_chunks = {}
for batch, num_size_tuple in six.iteritems(batch_chunk_nums_sizes):
num, size = num_size_tuple
for batch in batchable:
batch_chunks[batch] = []
contents = batch_contents[batch]
expts = [c for c in contents if c.endswith(expt_ending)]
refls = [c for c in contents if c.endswith(refl_ending)]
expts = sorted([c for c in contents if c.endswith(expt_ending)])
refls = sorted([c for c in contents if c.endswith(refl_ending)])
expts, refls = match_dials_files(expts, refls, expt_ending, refl_ending)
if stripe:
for i in range(num):
expts_stripe = expts[i::num]
refls_stripe = refls[i::num]
batch_chunks[batch].append((expts_stripe, refls_stripe))
print("striped %d experiments in %s with %d experiments per stripe and %d stripes" % \
(len(expts), batch, len(batch_chunks[batch][0][0]), len(batch_chunks[batch])))
else:
for i in range(num):
expts_chunk = expts[i*size:(i+1)*size]
refls_chunk = refls[i*size:(i+1)*size]
batch_chunks[batch].append((expts_chunk, refls_chunk))
print("chunked %d experiments in %s with %d experiments per chunk and %d chunks" % \
(len(expts), batch, len(batch_chunks[batch][0][0]), len(batch_chunks[batch])))
return batch_chunks
pack_func = stripe_pairs if stripe else chunk_pairs
expts_packs, refls_packs, pack_lengths = pack_func(expts, refls, max_size)
for expts_pack, refls_pack in zip(expts_packs, refls_packs):
batch_chunks[batch].append((expts_pack, refls_pack))
r = '{} {} experiments from {} files in {} into {} {} with sizes = {}'
print(r.format("Striped" if stripe else "Chunked", sum(pack_lengths),
len(expts), batch, len(pack_lengths),
"stripes" if stripe else "chunks", pack_lengths))
return batch_chunks


def parse_retaining_scope(args, phil_scope=phil_scope):
if "-c" in args:
Expand Down