import argparse import copy import itertools import os import re import warnings from typing import Optional import matplotlib.colors import matplotlib.pyplot as plt import MDAnalysis as mda import numpy as np from database import MoleculeDatabase from scipy.stats import gaussian_kde def find_species_indices( names: np.ndarray | mda.Universe, species_name: str ) -> np.ndarray: """Calculate the index of each particle of species `species_name` in a topology HDF5 file or a MDAnalysis trajectory""" if isinstance(names, np.ndarray): indices = np.where(names == np.bytes_(species_name)) elif isinstance(names, mda.Universe): indices = names.atoms.select_atoms(f"type {species_name}").indices return indices def parse_molecule_species(names: mda.Universe, molecules=None): """Sorts particle species into index sets""" assert isinstance(names, mda.Universe), TypeError( "names must be an MDAnalysis.Universe instance." ) _, idx = np.unique(names.residues.resnames, return_index=True) unique_resnames = names.residues.resnames[np.sort(idx)] db = MoleculeDatabase() # TODO: this parsing is bullshit, we should read the types from the topology # and not simply use the atom name initial from the gro file species = {} for mol in unique_resnames: mol_bk = mol if mol in ("W", "HOH", "SOL", "TIP3"): mol_bk = "W" try: d = getattr(db, mol_bk.lower()) except: raise ValueError( f"The molecule {mol_bk} is not present in the database. Please add it to 'database.py'." ) types_ = {s.decode("UTF-8"): None for s in d[0] if s != b"type:"} names_ = {s: None for s in d[1]} for t in types_: s_ = t.replace(":", " ").replace("lipid", "resname") _, letter = s_.split() species[t] = names.atoms.select_atoms( f"resname {mol} and name {letter}*" ).indices for n in names_: s_ = n.replace(":", " ").replace("lipid", "resname") species[n] = names.atoms.select_atoms(f"{s_}").indices species = {k: np.array(v, dtype=int) for k, v in species.items() if v is not None} return species def calculate_center_of_mass( positions: np.ndarray | mda.Universe, simulation_box: np.ndarray, indices: np.ndarray, axis: int, ) -> np.ndarray: """Calculate the center of mass of the particles in the `positions` array with indices given by the array `indices` along direction `axis` for each time step snapshot. In order to calculate the position of the center of mass under periodic boundary conditions, the dimension in question of the simulation box is mapped onto a circle of circumference equal to the box length. Then the angles corresponding to particle positions are averaged (not weighted by the masses, which are assumed to be equal for all particles), and the *center of mass angle* is back-mapped to the corresponding position in the simulation box. """ def compute_center_of_mass_single(p, n, b, c, t): p_mapped = 2 * np.pi * p / b cos_p_mapped = np.cos(p_mapped) sin_p_mapped = np.sin(p_mapped) cos_average = np.sum(cos_p_mapped) / n sin_average = np.sum(sin_p_mapped) / n theta = np.arctan2(-sin_average, -cos_average) + np.pi c[t] = b * theta / (2 * np.pi) return c if isinstance(positions, np.ndarray): M = positions.shape[0] N = positions.shape[1] center_of_mass = np.empty(shape=(M,), dtype=np.float64) for time_step in range(M): p = positions[time_step, indices, axis] center_of_mass = compute_center_of_mass_single( p, N, simulation_box[time_step, axis, axis], center_of_mass, time_step ) elif isinstance(positions, mda.Universe): M = len(positions.trajectory) N = len(positions.atoms) center_of_mass = np.empty(shape=(M,), dtype=np.float64) for time_step in positions.trajectory: p = positions.atoms.positions[indices, axis] center_of_mass = compute_center_of_mass_single( p, N, time_step.triclinic_dimensions[axis, axis], center_of_mass, time_step.frame, ) else: raise TypeError( "positions must be either type numpy.ndarray or " "MDAnalysis.Universe." ) return center_of_mass def load_gromacs_simulation( topology_file_path: str, trajectory_file_path: str ) -> mda.Universe: """Load a Gromacs trajectory from .gro and .trr files using MDAnalysis""" with warnings.catch_warnings(): warnings.filterwarnings( action="ignore", category=Warning, module="MDAnalysis", message=r"Failed to guess the mass for the following atom types", ) universe = mda.Universe(topology_file_path, trajectory_file_path) return universe def compute_centered_histogram( centers: np.ndarray, positions: np.ndarray | mda.Universe, simulation_box: np.ndarray, species: dict[str, np.ndarray], axis: int, bins: int = 10, density: bool = False, symmetrize: bool = False, skip_first: int = 0, skip: int = 1, frames: Optional[int] = None, time: Optional[np.ndarray] = None, file_names: str = "", silent: bool = False, range_: Optional[float] = None, ) -> tuple[np.ndarray, dict[str, np.ndarray]]: """Calculate the density histogram along a direction of positions at specified indices realative to a given simlulation box center Calculates the average histogram of `positions` along direction `axis` centered at position `center` in the `simulation_box`. Only positions satisfying the condition specified in `indices` are counted. """ if range_ is None: range_ = np.min(simulation_box[:, axis, axis]) if isinstance(positions, mda.Universe): range_ = range_ / 10.0 warnings.warn( f"No --range specified, using min(simulation_box[:, axis]) by " f"default ({range_})" ) histograms = {s: np.zeros(shape=(bins,), dtype=np.float64) for s in species} kde = {s: np.zeros(shape=(bins,)) for s in species} def compute_histogram_single(p, c): p += 0.5 * box_length_axis - c p[p > box_length_axis] -= box_length_axis p[p < 0.0] += box_length_axis p -= 0.5 * box_length_axis for ss, ind in species.items(): pos = p[ind] gauss = gaussian_kde(pos, bw_method=0.5 * bin_size) hist, _ = np.histogram(pos, bins=histogram_bins, density=density) if density: scaling_factor = 1.0 else: box_length[axis, axis] = axis_range scaling_factor = np.prod(np.diag(box_length)) / len(histogram_bins) histograms[ss] += hist / scaling_factor kde[ss] += gauss(bin_centers) / scaling_factor * len(pos) * bin_size return histograms, kde if not silent: print(f" → Computing histograms from {file_names}:") H = 0 L = len(positions.trajectory) M = min(L - 1, skip_first + frames * skip) if frames is not None else L for time_step in range(skip_first, M, skip): p = positions.trajectory[time_step].positions[:, axis] / 10.0 box_length = simulation_box[time_step, ...] / 10.0 box_length_axis = simulation_box[time_step, axis, axis] / 10.0 x_start = -0.5 * range_ x_end = +0.5 * range_ range__ = (x_start, x_end) axis_range = x_end - x_start histogram_bins = np.histogram_bin_edges( np.zeros(shape=(1,)), bins=bins, range=range__, ) bin_size = np.mean(np.diff(histogram_bins)) bin_centers = histogram_bins - 0.5 * bin_size bin_centers = bin_centers[1:] histograms, kde = compute_histogram_single(p, centers[time_step] / 10.0) last_frame = time_step H += 1 if not silent: if positions.trajectory[skip_first].time > 1000.0: t0 = positions.trajectory[skip_first].time / 1000.0 t0_unit = "ns" else: t0 = positions.trajectory[skip_first].time t0_unit = "ps" if positions.trajectory[last_frame].time > 1000.0: t1 = positions.trajectory[last_frame].time / 1000.0 t1_unit = "ns" else: t1 = positions.trajectory[last_frame].time t1_unit = "ps" print( f"Calculated histograms for {H} frames (first frame " f"{skip_first}, last frame {last_frame}, skipping every " f"{skip}) starting at {t0:.3f} {t0_unit}, ending at {t1:.3f} " f"{t1_unit}." ) for s in histograms: histograms[s] /= float(H) kde[s] /= float(H) if symmetrize: for s in histograms: histograms[s] = 0.5 * (histograms[s] + np.flip(histograms[s])) return bin_size, bin_centers, histograms, kde def plot_histogram( bin_size, bin_midpoints, histogram, kde, bw=1.0, figwidth=4.0, figheight=3.0, show=True, ignore=None, one_side=False, vmd_colors=False, remove_legend=False, out_ref=None, out_kde=None, ): """Visualize a histogram""" fig, ax = plt.subplots(1, 1) fig.set_figwidth(figwidth) fig.set_figheight(figheight) colors = matplotlib.colors.TABLEAU_COLORS markers = ( # fmt: off ".", "o", "v", "^", ">", "<", "s", "p", "x", "D", "H", "1", "2", "3", ) colors_cycle = itertools.cycle(colors) markers_cycle = itertools.cycle(markers) ignore = [re.compile(i) for i in ignore] keys = list(histogram.keys()) for k in keys: for i in ignore: if re.fullmatch(i, k) is not None: del histogram[k] del kde[k] # N = bin_midpoints.size # if np.mod(N, 2) == 2: # bin_midpoints -= np.mean(bin_midpoints[N // 2 - 1 : N // 2 + 1]) # else: # bin_midpoints -= bin_midpoints[N // 2] # Save gaussian kde reference and histograms if out_ref is not None: np.save( out_ref, np.insert(np.array(list(histogram.values())), 0, bin_midpoints, axis=0), ) if out_kde is not None: np.save( out_kde, np.insert(np.array(list(kde.values())), 0, bin_midpoints, axis=0), ) for s in histogram.keys(): label = s marker = next(markers_cycle) if len(histogram[s]) < 26 else None if vmd_colors and "type" in s: if s == "type:N": color = (0.0, 0.0, 1.0) # blue label = "N" marker = "o" elif s == "type:C": color = (0.25, 0.75, 0.75) # cyan label = "C" marker = "x" elif s == "type:P": color = (0.5, 0.5, 0.2) # tan label = "P" marker = "^" elif s == "type:G": color = (1.0, 0.6, 0.6) # pink label = "G" marker = "v" elif s == "type:W": color = (0.0, 0.0, 0.0) # black label = "W" marker = "s" elif s == "type:D": color = (0.3, 0.8, 0.8) label = "D" marker = "<" elif s == "type:L": color = (0.3, 0.3, 0.3) label = "L" marker = ">" else: color = next(colors_cycle) ax.plot( bin_midpoints, histogram[s], color=color, label=label, marker=marker, ) fontsize = 15 plt.subplots_adjust(bottom=0.2, left=0.22, right=0.97, top=0.97) if not remove_legend: ax.legend(loc="best", fontsize=fontsize, framealpha=0) ax.tick_params(axis="both", which="major", labelsize=15) ax.tick_params(axis="both", which="minor", labelsize=15) ax.set_xlabel("Position along normal, nm", fontsize=fontsize) ax.set_ylabel("Number density, nm⁻³", fontsize=fontsize) if one_side: _, x_upper = ax.get_xlim() ax.set_xlim((0.0, x_upper)) if show: plt.show() return fig, ax def calculate_area_per_lipid( positions, simulation_box, axis, parser, ref, time=None, molecules=None, names=None, ): """Compute the area per lipid for each simulation frame""" box = copy.deepcopy(simulation_box) box = box / 10.0 # Convert to nm from Å in MDA. t = np.array( [ts.time for ts in positions.trajectory], dtype=np.float64, ) lipid_atoms = positions.select_atoms("not type W CA CL NA") n_lipids = len(np.unique([a.resid for a in lipid_atoms.atoms])) n_frames = len(t) M = simulation_box.shape[0] area_per_lipid = np.zeros(shape=(M,), dtype=np.float64) for i, b in enumerate(box[:, ...]): b_ = copy.deepcopy(b) b_[axis, axis] = 1.0 area_per_lipid[i] = np.prod(np.diag(b_)) area_per_lipid = 2 * area_per_lipid / n_lipids if ref is False: skip_ = parser.skip skip_first_ = parser.skip_first frames_ = parser.frames else: skip_ = parser.skip_ref skip_first_ = parser.skip_first_ref frames_ = parser.frames_ref M = ( min(n_frames - 1, skip_first_ + frames_ * skip_) if frames_ is not None else n_frames ) return np.mean(area_per_lipid[range(skip_first_, M, skip_)]), np.std( area_per_lipid[range(skip_first_, M, skip_)] ) def action_compute_histogram(parser, ref=False): """Compute histogram and area per lipid from H5MD or Gromacs trajectory""" if ref: axis = parser.axis_ref if parser.axis_ref is not None else parser.axis traj_path = parser.ref_traj top_path = parser.ref_top else: axis = parser.axis traj_path = parser.traj top_path = parser.top universe = load_gromacs_simulation(top_path, traj_path) simulation_box = np.zeros( shape=(len(universe.trajectory), 3, 3), dtype=np.float64, ) for i, ts in enumerate(universe.trajectory): simulation_box[i, ...] = ts.triclinic_dimensions # NOTE: Calculate COM only for atom names that start with C (potentially dangerous!) com_indices = find_species_indices(universe, "C") com_center_of_mass = calculate_center_of_mass( universe, simulation_box, com_indices, axis ) species = parse_molecule_species(universe, None) if parser.resolution == "names": total_re = re.compile("name:.*") elif parser.resolution == "types": total_re = re.compile("type:.*") else: raise ValueError("resolution must be either 'names' or 'types'.") keys = list(species.keys()) for k in keys: if re.fullmatch(total_re, k) is None: del species[k] if ref is False: skip_ = parser.skip skip_first_ = parser.skip_first frames_ = parser.frames else: skip_ = parser.skip_ref skip_first_ = parser.skip_first_ref frames_ = parser.frames_ref bin_size, histogram_bins, histograms, kde = compute_centered_histogram( com_center_of_mass, universe, simulation_box, species, axis=axis, bins=parser.bins, symmetrize=parser.symmetrize, density=parser.density, skip=skip_, skip_first=skip_first_, frames=frames_, range_=parser.range, file_names=((os.path.abspath(top_path) + ", " + os.path.abspath(traj_path))), ) area_per_lipid = calculate_area_per_lipid( universe, simulation_box, axis, parser, ref=ref ) if parser.cg_water: if not parser.density: # Density already normalizes by the total number of molecules if parser.resolution == "names": histograms["name:W"] /= 4.0 kde["name:W"] /= 4.0 else: histograms["type:W"] /= 4.0 kde["type:W"] /= 4.0 return bin_size, histogram_bins, histograms, area_per_lipid, kde def action_plot(parser): """Execute the 'plot' action of hymd_optimize""" ( bin_size, histogram_bins, histograms, area_per_lipid, kde, ) = action_compute_histogram(parser) print(f"Average area per lipid = {area_per_lipid[0]} ({area_per_lipid[1]}) nm²") print( "NOTE: MDAnalysis does not recognize residues when using H5MD, " "therefore the number printed above is (2 * average XY area). " "Divide it by the number of lipids in your system to obtain the area per lipid." ) fig, ax = plot_histogram( bin_size, histogram_bins, histograms, kde, bw=parser.bw, ignore=parser.ignore, show=not parser.no_show, one_side=parser.one_side, vmd_colors=parser.vmd_colors, out_ref=parser.out_ref, out_kde=parser.out_kde, ) return fig, ax if __name__ == "__main__": parser = argparse.ArgumentParser( description=( "Tools for calculating and plotting coarse grained membrane density profiles." ) ) parser.add_argument( "--out", type=str, default=None, metavar="file name", help="output file path", ) parser.add_argument( "--out_ref", type=str, default=None, metavar="file name", help="output file path", ) parser.add_argument( "--out_kde", type=str, default=None, metavar="file name", help="output file path", ) parser.add_argument( "--no-show", "--noshow", default=False, action="store_true", dest="no_show", help="do not show the plot", ) parser.add_argument( "--save-plot", action="store_true", default=False, dest="save_plot", help="save the histogram plot to this file path", ) parser.add_argument( "--force", "-f", action="store_true", default=False, dest="force", help="overwrite existing output file path", ) parser.add_argument( "--one-side", "--oneside", default=False, action="store_true", dest="one_side", help="only plot one side of the membrane", ) parser.add_argument( "--traj", type=str, default=None, help="test trajectory file path (.trr or .H5MD)", ) parser.add_argument( "--top", type=str, default=None, help="test topology file path (.gro or HyMD-input .H5)", ) parser.add_argument( "--bins", type=int, default=25, help="number of bins to use in the histograms and histogram plots", ) parser.add_argument( "--density", action="store_true", help="compute density distribution histograms, not the number density", ) parser.add_argument( "--symmetrize", action="store_true", default=False, help=( "symmetrize the histogram(s) by averaging the histogram(s) and " "the flipped histogram." ), ) parser.add_argument( "--range", type=float, default=None, help="histogram x range to consider", ) parser.add_argument( "--axis", type=int, choices=[0, 1, 2], default=2, help="the direction in which to calculate the histogram(s)", ) parser.add_argument( "--ignore", default=[], nargs="+", help=( "species names to ignore in the calculation of the histograms " "(evaluated as list of regex expressions)" ), ) parser.add_argument( "--resolution", type=str, choices=["types", "names"], default="types", help=( "if 'names', the specific Martini particle subspecies names (NC3, " "PO4, etc.) are compared in the fitness. If 'types', particle " "types (N, P, G, etc.) are compared in the fitness" ), ) parser.add_argument( "--skip", type=int, default=1, help="consider only every N frames when calculating the histograms", ) parser.add_argument( "--skip-first", type=int, default=0, help="skip the first N frames when calculating the histograms", ) parser.add_argument( "--frames", type=int, default=None, help=( "Only consider N frames, starting at --skip_first (with step " "size --skip)." ), ) parser.add_argument( "--vmd-colors", "--vmdcolors", "-vmdcolors", action="store_true", default=False, help="use the same colors for beads as default in vmd", ) parser.add_argument( "--cg_water", action="store_true", help="Divide water density profile by 4.0." ) parser.add_argument( "--bw", type=float, default=1.0, help="Ratio for the KDE width (bw * bin_size). Default is 1.0.", ) args = parser.parse_args() traj_name, traj_ext = os.path.splitext(args.traj) def cleanup(): if traj_ext == ".h5": os.rename(args.traj, traj_name + traj_ext) if traj_ext == ".h5": new_name = traj_name + ".h5md" os.rename(args.traj, new_name) args.traj = new_name if args.traj is None or args.top is None: parser.error("Must specify both --traj and --top.") if args.out is None and args.no_show: warnings.warn( "No --out path specified and --no-show specified, not doing anything(!)." ) elif args.out is None and not args.no_show: warnings.warn("No --out path specified, not saving plot.") try: fig, _ = action_plot(args) cleanup() if args.out is not None: if args.force and os.path.exists(args.out): print(f"Saving figure to {args.out} (overwriting existing).") elif os.path.exists(args.out): raise FileExistsError( f"The file {args.out} already exists. To force overwrite, " "use the -f option." ) else: print(f"Saving figure to {args.out}.") fig.savefig(args.out, format="pdf", transparent=True) except KeyboardInterrupt: print("Program interrupted, cleaning up...") cleanup() exit() except Exception as e: cleanup() raise e