Source code for wetting_angle_kit.parser.parser_dump

from __future__ import annotations

import logging
from typing import List, Optional, Sequence, Tuple

import numpy as np

from .base_parser import BaseParser

logger = logging.getLogger(__name__)


[docs] class DumpParser(BaseParser): """LAMMPS dump trajectory parser. Parameters ---------- filepath : str Path to LAMMPS dump file. """ def __init__(self, filepath: str): """Initialize LAMMPS dump parser via OVITO pipeline.""" try: from ovito.io import import_file from ovito.modifiers import ComputePropertyModifier except ImportError as e: raise ImportError( "The 'ovito' package is required for DumpParser. Install with: " "pip install wetting_angle_kit[ovito]" ) from e self.filepath = filepath self.pipeline = import_file(self.filepath) self.pipeline.modifiers.append( ComputePropertyModifier(expressions=["1"], output_property="Unity") ) self.num_frames = self.pipeline.source.num_frames
[docs] def parse(self, frame_index: int, indices: np.ndarray | None = None) -> np.ndarray: """Compute and return particle positions for a single frame, with optional filtering by particle indices. Parameters ---------- frame_index : int Frame index. indices : ndarray, optional Atom indices to select; if None return all atoms. Returns ------- ndarray, shape (M, 3) Particle coordinates. """ data = self.pipeline.compute(frame_index) x_par = np.asarray(data.particles["Position"]) particle_ids = np.asarray(data.particles["Particle Identifier"]) if indices is not None: mask = np.isin(particle_ids, indices) x_par = x_par[mask] return x_par
[docs] def get_profile_coordinates( self, frame_indices: Sequence[int], droplet_geometry: str = "cylinder_y", atom_indices: Optional[Sequence[int]] = None, ) -> Tuple[np.ndarray, np.ndarray, int]: """ Compute 2D projection coordinates (r, z) for contact angle analysis. Projects 3D atomic positions onto a 2D plane based on the assumed droplet geometry and simulation box boundaries. Parameters ---------- frame_indices : Sequence[int] List of frames to process. droplet_geometry : str, default 'cylinder_y' The physical shape of the water droplet in the simulation box: * 'cylinder_y': A hemi-cylindrical droplet aligned along the Y-axis. (Returns x as the radial coordinate). * 'cylinder_x': A hemi-cylindrical droplet aligned along the X-axis. (Returns y as the radial coordinate). * 'spherical': A spherical cap droplet. (Returns sqrt(x^2 + y^2) as the radial coordinate). atom_indices : Sequence[int], optional Subset of atom indices to include (e.g., only liquid atoms). Returns ------- r_values : np.ndarray The lateral/radial distances from the droplet center/axis. z_values : np.ndarray The vertical coordinates (height) of the atoms. n_frames : int Number of frames processed. """ r_values = np.array([]) z_values = np.array([]) for frame_idx in frame_indices: x_par = self.parse(frame_idx, atom_indices) # dim = x_par.shape[1] x_cm = np.mean(x_par, axis=0) x_0 = x_par - x_cm x_0[:, 2] = x_par[:, 2] if droplet_geometry == "cylinder_y": r_frame = np.abs(x_0[:, 0] + 0.01) elif droplet_geometry == "cylinder_x": r_frame = np.abs(x_0[:, 1] + 0.01) else: # spherical r_frame = np.sqrt(x_0[:, 0] ** 2 + x_0[:, 1] ** 2) z_frame = x_0[:, 2] r_values = np.concatenate((r_values, r_frame)) z_values = np.concatenate((z_values, z_frame)) if frame_idx % 10 == 0: print(f"Frame: {frame_idx}\nCenter of Mass: {x_cm}") print(f"\nr range:\t({np.min(r_values)},{np.max(r_values)})") print(f"z range:\t({np.min(z_values)},{np.max(z_values)})") return r_values, z_values, len(frame_indices)
[docs] def box_size_y(self, frame_index: int) -> float: """Return y-dimension of simulation box.""" data = self.pipeline.compute(frame_index) y_vector = data.cell.matrix[1, :3] return float(np.linalg.norm(y_vector))
[docs] def box_size_x(self, frame_index: int) -> float: """Return x-dimension of simulation box.""" data = self.pipeline.compute(frame_index) x_vector = data.cell.matrix[0, :3] return float(np.linalg.norm(x_vector))
[docs] def box_length_max(self, frame_index: int) -> float: """Return the maximum dimension of the simulation box. Parameters ---------- frame_index : int Frame index. Returns ------- float Maximum box length. """ data = self.pipeline.compute(frame_index) y_vector = np.linalg.norm(data.cell.matrix[1, :3]) x_vector = np.linalg.norm(data.cell.matrix[0, :3]) z_vector = np.linalg.norm(data.cell.matrix[2, :3]) return float(np.max(np.array([y_vector, x_vector, z_vector])))
[docs] def frame_count(self) -> int: """Return the total number of frames in the trajectory. Returns ------- int Number of frames. """ return self.num_frames
[docs] class DumpWallParser: """LAMMPS dump file parser for extracting wall particle coordinates. Parameters ---------- filepath : str Path to LAMMPS dump file. liquid_particle_types : List[int] Type IDs of particles to exclude as liquid. """ def __init__(self, filepath: str, liquid_particle_types: List[int]): self.filepath = filepath self.liquid_particle_types = liquid_particle_types self.pipeline = self.load_dump_ovito()
[docs] def load_dump_ovito(self): try: from ovito.io import import_file from ovito.modifiers import ( ComputePropertyModifier, DeleteSelectedModifier, SelectTypeModifier, ) except ImportError as e: raise ImportError( "OVITO required. Install with: pip install wetting_angle_kit[ovito]" ) from e pipeline = import_file(self.filepath) pipeline.modifiers.append( SelectTypeModifier( property="Particle Type", types=self.liquid_particle_types ) ) pipeline.modifiers.append(DeleteSelectedModifier()) pipeline.modifiers.append( ComputePropertyModifier(expressions=["1"], output_property="Unity") ) return pipeline
[docs] def parse(self, frame_index): """Compute and return wall particle positions for a single frame. Parameters ---------- frame_index : int Frame index. Returns ------- np.ndarray Array of wall particle positions. """ data = self.pipeline.compute(frame_index) return np.asarray(data.particles["Position"])
[docs] def find_highest_wall_particle(self, frame_index: int) -> float: """Return the maximum z-coordinate among wall particles for a frame. Parameters ---------- frame_index : int Frame index. Returns ------- float Maximum z-coordinate. """ data = self.pipeline.compute(frame_index) x_wall = np.asarray(data.particles["Position"]) return float(np.max(x_wall[:, 2]))
[docs] def get_profile_coordinates( self, frame_indices: Sequence[int], droplet_geometry: str = "cylinder_y", atom_indices: Optional[Sequence[int]] = None, ) -> Tuple[np.ndarray, np.ndarray, int]: """ Compute 2D projection coordinates (r, z) for contact angle analysis. Projects 3D atomic positions onto a 2D plane based on the assumed droplet geometry and simulation box boundaries. Parameters ---------- frame_indices : Sequence[int] List of frames to process. droplet_geometry : str, default 'cylinder_y' The physical shape of the water droplet in the simulation box: * 'cylinder_y': A hemi-cylindrical droplet aligned along the Y-axis. (Returns x as the radial coordinate). * 'cylinder_x': A hemi-cylindrical droplet aligned along the X-axis. (Returns y as the radial coordinate). * 'spherical': A spherical cap droplet. (Returns sqrt(x^2 + y^2) as the radial coordinate). atom_indices : Sequence[int], optional Subset of atom indices to include (e.g., only liquid atoms). Returns ------- r_values : np.ndarray The lateral/radial distances from the droplet center/axis. z_values : np.ndarray The vertical coordinates (height) of the atoms. n_frames : int Number of frames processed. """ r_values = np.array([]) z_values = np.array([]) for frame_idx in frame_indices: data = self.pipeline.compute(frame_idx) x_par = np.asarray(data.particles["Position"]) if atom_indices is not None: # In DumpWallParser, we use Particle Identifier to filter # if indices are provided # But DumpWallParser seems to be designed to # exclude liquid types already. # However, for consistency with the interface: particle_ids = np.asarray(data.particles["Particle Identifier"]) mask = np.isin(particle_ids, atom_indices) x_par = x_par[mask] x_cm = np.mean(x_par, axis=0) x_0 = x_par - x_cm x_0[:, 2] = x_par[:, 2] if droplet_geometry == "cylinder_y": r_frame = np.abs(x_0[:, 0] + 0.01) elif droplet_geometry == "cylinder_x": r_frame = np.abs(x_0[:, 1] + 0.01) else: # spherical r_frame = np.sqrt(x_0[:, 0] ** 2 + x_0[:, 1] ** 2) z_frame = x_0[:, 2] r_values = np.concatenate((r_values, r_frame)) z_values = np.concatenate((z_values, z_frame)) if frame_idx % 10 == 0: print(f"Frame: {frame_idx}\nCenter of Mass: {x_cm}") print(f"\nr range:\t({np.min(r_values)},{np.max(r_values)})") print(f"z range:\t({np.min(z_values)},{np.max(z_values)})") return r_values, z_values, len(frame_indices)
[docs] def box_size_y(self, frame_index: int) -> float: """Return the y-dimension of the simulation box.""" data = self.pipeline.compute(frame_index) y_vector = data.cell.matrix[1, :3] return float(np.linalg.norm(y_vector))
[docs] def box_length_max(self, frame_index): # legacy name kept data = self.pipeline.compute(frame_index) y_vector = np.linalg.norm(data.cell.matrix[1, :3]) x_vector = np.linalg.norm(data.cell.matrix[0, :3]) z_vector = np.linalg.norm(data.cell.matrix[2, :3]) return np.max(np.array([y_vector, x_vector, z_vector]))
[docs] def frame_count(self) -> int: """Return total number of frames.""" return self.pipeline.source.num_frames
[docs] class DumpWaterMoleculeFinder: """Identify water oxygen atoms in a parsed LAMMPS trajectory.""" def __init__( self, filepath: str, particle_type_wall: set, oxygen_type: int = 3, hydrogen_type: int = 2, oh_cutoff: float = 1.2, ): """Initialize water molecule finder with OVITO pipeline.""" self.filepath = filepath self.particle_type_wall = particle_type_wall self.oxygen_type = oxygen_type self.hydrogen_type = hydrogen_type self.oh_cutoff = oh_cutoff self.pipeline = self._setup_pipeline() def _setup_pipeline(self): """Setup OVITO pipeline for water molecule detection.""" try: from ovito.io import import_file from ovito.modifiers import ( ComputePropertyModifier, CoordinationAnalysisModifier, ) except ImportError as e: raise ImportError( "OVITO required for water detection. Install: pip install " "wetting_angle_kit[ovito]" ) from e pipeline = import_file(self.filepath) pipeline.modifiers.append( CoordinationAnalysisModifier(cutoff=self.oh_cutoff, number_of_bins=200) ) expr = f"(ParticleType == {self.oxygen_type}) && (Coordination == 2)" pipeline.modifiers.append( ComputePropertyModifier(expressions=[expr], output_property="IsWaterOxygen") ) return pipeline
[docs] def get_water_oxygen_ids(self, frame_index: int) -> np.ndarray: """Return IDs of oxygen atoms belonging to water molecules.""" data = self.pipeline.compute(frame_index) if "IsWaterOxygen" in data.particles: mask = np.array(data.particles["IsWaterOxygen"].array) == 1 oxygen_indices = np.where(mask)[0] oxygen_ids = data.particles["Particle Identifier"][oxygen_indices] return oxygen_ids return self._manual_identification(data)
Dump_WaterMoleculeFinder = DumpWaterMoleculeFinder DumpParse_wall = DumpWallParser