Source code for mdtraj.formats.mdcrd

##############################################################################
# MDTraj: A Python Library for Loading, Saving, and Manipulating
#         Molecular Dynamics Trajectories.
# Copyright 2012-2013 Stanford University and the Authors
#
# Authors: Robert McGibbon
# Contributors:
#
# MDTraj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 2.1
# of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with MDTraj. If not, see <http://www.gnu.org/licenses/>.
##############################################################################


import itertools
import os

import numpy as np

from mdtraj.formats.registry import FormatRegistry
from mdtraj.utils.unit import in_units_of
from mdtraj.utils.validation import cast_indices, ensure_type

__all__ = ["MDCRDTrajectoryFile", "load_mdcrd"]


class _EOF(IOError):
    pass


@FormatRegistry.register_loader(".mdcrd")
@FormatRegistry.register_loader(".crd")
def load_mdcrd(filename, top=None, stride=None, atom_indices=None, frame=None):
    """Load an AMBER mdcrd file.

    Parameters
    ----------
    filename : path-like
        Path of AMBER mdcrd file.
    top : {str, Trajectory, Topology}
        The mdcrd format does not contain topology information. Pass in either
        the path to a pdb file, a trajectory, or a topology to supply this
        information.
    stride : int, default=None
        Only read every stride-th frame
    atom_indices : array_like, optional
        If not none, then read only a subset of the atoms coordinates from the
        file.
    frame : int, optional
        Use this option to load only a single frame from a trajectory on disk.
        If frame is None, the default, the entire trajectory will be loaded.
        If supplied, ``stride`` will be ignored.

    Returns
    -------
    trajectory : md.Trajectory
        The resulting trajectory, as an md.Trajectory object.

    See Also
    --------
    mdtraj.MDCRDTrajectoryFile :  Low level interface to MDCRD files
    """
    from mdtraj.core.trajectory import _parse_topology

    # we make it not required in the signature, but required here. although this
    # is a little wierd, its good because this function is usually called by a
    # dispatch from load(), where top comes from **kwargs. So if its not supplied
    # we want to give the user an informative error message
    if top is None:
        raise ValueError('"top" argument is required for load_mdcrd')

    if not isinstance(filename, (str, os.PathLike)):
        raise TypeError(
            "filename must be of type path-like for load_mdcrd. " "you supplied %s" % type(filename),
        )

    topology = _parse_topology(top)
    atom_indices = cast_indices(atom_indices)

    with MDCRDTrajectoryFile(filename, topology.n_atoms) as f:
        if frame is not None:
            f.seek(frame)
            n_frames = 1
        else:
            n_frames = None
        return f.read_as_traj(
            topology,
            n_frames=n_frames,
            stride=stride,
            atom_indices=atom_indices,
        )


[docs] @FormatRegistry.register_fileobject(".mdcrd") @FormatRegistry.register_fileobject(".crd") class MDCRDTrajectoryFile: """Interface for reading and writing to an AMBER mdcrd files. This is a file-like object, that both reading or writing depending on the `mode` flag. It implements the context manager protocol, so you can also use it with the python 'with' statement. The conventional units in the mdcrd file are angstroms. The format only supports storing the cartesian coordinates and box lengths. Parameters ---------- filename : path-like The filename to open. A path to a file on disk. n_atoms : int The number of atoms in the system. This is _required_ when mode == 'r' and irrelevant when mode == 'w'. mode : {'r', 'w'} The mode in which to open the file, either 'r' for read or 'w' for write. has_box = 'detect' Does the mdcrd file contain box length information? This is optional when mode == 'r' (and irrelevant when mode == 'w'). The presence or absence of box information can generally be inferred from the file, but there might be corner cases in which this is not possible, because of limitations in the mdcrd format. force_overwrite : bool If opened in write mode, and a file by the name of `filename` already exists on disk, should we overwrite it? """ distance_unit = "angstroms"
[docs] def __init__( self, filename, n_atoms=None, mode="r", has_box="detect", force_overwrite=True, ): """Open an AMBER mdcrd file for reading/writing.""" self._is_open = False self._filename = filename self._n_atoms = n_atoms self._mode = mode self._w_has_box = None self._frame_index = 0 self._has_box = has_box # track which line we're on. this is not essential, but its useful # when reporting errors to the user to say what line it occured on. self._line_counter = 0 if has_box not in [True, False, "detect"]: raise ValueError('has_box must be one of [True, False, "detect"]') if mode == "r": if n_atoms is None: raise ValueError( 'To open a mdcrd file in mode="r", you must ' 'supply the number of atoms, "n_atoms"', ) if not os.path.exists(filename): raise OSError("The file '%s' doesn't exist" % filename) self._fh = open(filename, "rb") self._is_open = True self._fh.readline() # read comment self._line_counter += 1 elif mode == "w": if os.path.exists(filename) and not force_overwrite: raise OSError('"%s" already exists' % filename) self._fh = open(filename, "wb") self._is_open = True else: raise ValueError( 'mode must be one of "r" or "w". ' 'you supplied "%s"' % mode, )
def close(self): """Close the mdcrd file""" if self._is_open: self._fh.close() self._is_open = False def __del__(self): self.close() def __enter__(self): "Support the context manager protocol" return self def __exit__(self, *exc_info): "Support the context manager protocol" self.close() def read_as_traj(self, topology, n_frames=None, stride=None, atom_indices=None): """Read a trajectory from a mdcrd file Parameters ---------- topology : Topology The system topology n_frames : int, optional If positive, then read only the next `n_frames` frames. Otherwise read all of the frames in the file. stride : np.ndarray, optional Read only every stride-th frame. atom_indices : array_like, optional If not none, then read only a subset of the atoms coordinates from the file. This may be slightly slower than the standard read because it required an extra copy, but will save memory. Returns ------- trajectory : Trajectory A trajectory object containing the loaded portion of the file. """ from mdtraj.core.trajectory import Trajectory if atom_indices is not None: topology = topology.subset(atom_indices) initial = int(self._frame_index) xyz, cell_lengths = self.read( n_frames=n_frames, stride=stride, atom_indices=atom_indices, ) if len(xyz) == 0: return Trajectory(xyz=np.zeros((0, topology.n_atoms, 3)), topology=topology) in_units_of(xyz, self.distance_unit, Trajectory._distance_unit, inplace=True) in_units_of( cell_lengths, self.distance_unit, Trajectory._distance_unit, inplace=True, ) if cell_lengths is None: cell_angles = None else: # Assume that its a rectilinear box cell_angles = 90.0 * np.ones_like(cell_lengths) if stride is None: stride = 1 time = (stride * np.arange(len(xyz))) + initial t = Trajectory(xyz=xyz, topology=topology, time=time) t.unitcell_lengths = cell_lengths t.unitcell_angles = cell_angles return t def read(self, n_frames=None, stride=None, atom_indices=None): """Read data from a mdcrd file Parameters ---------- n_frames : int, None The number of frames you would like to read from the file. If None, all of the remaining frames will be loaded. stride : np.ndarray, optional Read only every stride-th frame. atom_indices : array_like, optional If not none, then read only a subset of the atoms coordinates from the file. Returns ------- xyz : np.ndarray, shape=(n_frames, n_atoms, 3), dtype=np.float32 The cartesian coordinates, in angstroms cell_lengths : {np.ndarray, None} If the file contains unitcell lengths, they will be returned as an array of shape=(n_frames, 3). Otherwise, unitcell_angles will be None. """ if not self._mode == "r": raise ValueError( "read() is only available when file is opened " 'in mode="r"', ) if n_frames is None: frame_counter = itertools.count() else: frame_counter = range(n_frames) if stride is None: stride = 1 coords, boxes = [], [] for i in frame_counter: try: coord, box = self._read() if atom_indices is not None: coord = coord[atom_indices, :] except _EOF: break coords.append(coord) boxes.append(box) for j in range(stride - 1): # throw away these frames try: self._read() except _EOF: break coords = np.array(coords) if all(b is None for b in boxes): # if there was no box information in any frame, that's cool return coords, None if not all(b is not None for b in boxes): # but if some of them had box information and others didn't # that probably means there was a bug in the parsing. raise OSError( "Inconsistent box information. Try manually " "setting has_box? Your mdcrd file might be " "corrupt.", ) return coords, np.array(boxes, dtype=np.float32) def _read(self): "Read a single frame" i = 0 coords = np.empty(self._n_atoms * 3, dtype=np.float32) box = None while i < self._n_atoms * 3: line = self._fh.readline() self._line_counter += 1 if line == b"": raise _EOF() try: items = [float(line[j : j + 8]) for j in range(0, len(line.rstrip()), 8)] assert 0 < len(items) <= 10 except Exception: raise OSError( 'mdcrd parse error on line %d of "%s". This file ' "does not appear to be a valid mdcrd file." % (self._line_counter, self._filename), ) length = len(items) if i + length > len(coords): raise OSError( "mdcrd parse error: specified n_atoms (%d) is likely incorrect. " "Incorrect buffer size encountered on line=%d" % ( self._n_atoms, self._line_counter, ), ) coords[i : i + length] = items i += length if i == self._n_atoms * 3: if self._has_box is False: break # peek ahead for box here = self._fh.tell() line = self._fh.readline() peek = [float(elem) for elem in line.strip().split()] if len(peek) == 3: box = peek else: if self._has_box is True: raise OSError("Box information not found in file.") self._fh.seek(-len(line), 1) self._fh.seek(here) break self._frame_index += 1 return coords.reshape(self._n_atoms, 3), box def write(self, xyz, cell_lengths=None): """Write one or more frames of data to a mdcrd file Parameters ---------- xyz : np.ndarray, shape=(n_frames, n_atoms, 3) The cartesian coordinates of the atoms to write. By convention, the lengths should be in units of angstroms. cell_lengths : np.ndarray, shape=(n_frames, 3), dtype=float32, optional The length of the periodic box in each frame, in each direction, `a`, `b`, `c`. By convention the lengths should be in units of angstroms. """ if not self._mode == "w": raise ValueError( "write() is only available when file is opened " 'in mode="w"', ) xyz = ensure_type( xyz, np.float32, 3, "xyz", can_be_none=False, shape=(None, None, 3), warn_on_cast=False, add_newaxis_on_deficient_ndim=True, ) cell_lengths = ensure_type( cell_lengths, np.float32, 2, "cell_lengths", can_be_none=True, shape=(len(xyz), 3), warn_on_cast=False, add_newaxis_on_deficient_ndim=True, ) if self._w_has_box is None: # this is the first write() self._n_atoms = xyz.shape[1] comment = "TITLE : Created by MDTraj with %d atoms\n" % self._n_atoms comment = comment.encode("ascii") self._fh.write(comment) if cell_lengths is None: self._w_has_box = False else: self._w_has_box = True elif self._w_has_box is True: if cell_lengths is None: raise ValueError( "This mdcrd file must contain unitcell " "information", ) elif self._w_has_box is False: if cell_lengths is not None: raise ValueError( "This mdcrd file must not contain unitcell " "information", ) else: raise RuntimeError() for i in range(xyz.shape[0]): for j, coord in enumerate(xyz[i].reshape(-1)): lfdone = False out = "%8.3f" % coord if len(out) > 8: raise ValueError("Overflow error") out = out.encode("ascii") self._fh.write(out) if (j + 1) % 10 == 0: self._fh.write(b"\n") lfdone = True if not lfdone: self._fh.write(b"\n") if cell_lengths is not None: line = "{:8.3f} {:8.3f} {:8.3f}\n".format(*tuple(cell_lengths[i])) line = line.encode("ascii") self._fh.write(line) def seek(self, offset, whence=0): """Move to a new file position Parameters ---------- offset : int A number of frames. whence : {0, 1, 2} 0: offset from start of file, offset should be >=0. 1: move relative to the current position, positive or negative 2: move relative to the end of file, offset should be <= 0. Seeking beyond the end of a file is not supported """ if self._mode == "r": advance, absolute = None, None if whence == 0 and offset >= 0: if offset >= self._frame_index: advance = offset - self._frame_index else: absolute = offset elif whence == 1 and offset >= 0: advance = offset elif whence == 1 and offset < 0: absolute = offset + self._frame_index elif whence == 2 and offset <= 0: raise NotImplementedError("offsets from the end are not supported yet") else: raise OSError("Invalid argument") if advance is not None: for i in range(advance): self._read() # advance and throw away these frames elif absolute is not None: self._fh.close() self._fh = open(self._filename, "rb") self._fh.readline() # read comment self._frame_index = 0 self._line_counter = 1 for i in range(absolute): self._read() else: raise RuntimeError() else: raise NotImplementedError("offsets in write mode are not supported yet") def tell(self): """Current file position Returns ------- offset : int The current frame in the file. """ return int(self._frame_index) def __len__(self): "Number of frames in the file" raise NotImplementedError()