Source code for implicit_solvent_ddm.runner

import os
import re
from email import message
from importlib.metadata import files
from pathlib import Path
from sre_constants import ANY
from typing import Optional, Type, TypedDict, Union

import pandas as pd
from matplotlib.backend_bases import key_press_handler
from toil.batchSystems import abstractBatchSystem
from toil.job import FileID, Job, JobFunctionWrappingJob, PromisedRequirement

from implicit_solvent_ddm.config import Config
from implicit_solvent_ddm.postTreatment import create_mdout_dataframe
from implicit_solvent_ddm.restraints import RestraintMaker
from implicit_solvent_ddm.simulations import Simulation
from itertools import islice
from numba import cuda


def chunked(iterable, size):
    """Yield successive chunks from iterable of given size."""
    it = iter(iterable)
    return iter(lambda: list(islice(it, size)), [])

def get_gpu_count():
    return len(cuda.gpus)


[docs] class IntermidateRunner(Job): """ Manages and executes MD simulations and post-analysis steps for a given system phase. This runner class handles the full lifecycle for a batch of simulation jobs (e.g., for ligand, receptor, or complex), including: - Launching intermediate MD simulations (if not already run), - Collecting and caching output data (parsed from mdout), - Running post-analysis jobs (e.g., energy decomposition), - Supporting adaptive workflows and optional "post-only" modes. Parameters ---------- simulations : list of Simulation List of simulation objects to run. Each one encapsulates the inputs for a ligand, receptor, or complex phase. restraints : RestraintMaker Object that manages creation of restraint files or logic for the simulations. post_process_no_solv_mdin : FileID Input file for post-processing jobs that should exclude implicit solvent (e.g., for igb=6). post_process_mdin : FileID Standard input file for post-analysis energy evaluation (e.g., sander). post_process_distruct : str Directory structure key or identifier used to organize post-processing job outputs. post_only : bool If True, skips MD simulations and runs post-analysis only. config : Config Global configuration object for the workflow. adaptive : bool, optional Enables adaptive lambda window or restraint scheduling if True. loaded_dataframe : list, optional Tracks previously parsed output directories to avoid reprocessing. post_output : list or list of pd.DataFrame, optional Stores collected energy analysis output dataframes from post-processing. memory : int or str, optional Memory allocation for the job (used by the underlying workflow engine). cores : int or float or str, optional Number of CPU cores to request for each job. disk : int or str, optional Disk allocation for the job (used by the underlying workflow engine). preemptable : bool or int or str, optional Flag to mark the job as preemptable (depending on scheduler). unitName : str, optional Name for job unit (optional; used in workflow diagnostics). checkpoint : bool, optional If True, enables checkpointing of the job state. displayName : str, optional Custom name for the job (for logs/monitoring). descriptionClass : str, optional Optional tag or label for job type. Attributes ---------- post_output : list Contains parsed pandas DataFrames of energy terms from post-analysis jobs. ligand_output : list Placeholder list for ligand-specific outputs. receptor_output : list Placeholder list for receptor-specific outputs. complex_output : list Placeholder list for complex-specific outputs. _loaded_dataframe : list Tracks directories already parsed to prevent duplicate processing. Notes ----- - Each Simulation object is checked for output; if missing, MD is run first. - If `post_only` is True, only analysis is performed using available trajectories. - Uses `sander.MPI` for post-processing and `create_mdout_dataframe` for parsing outputs. - Designed for use in generalized workflows involving restraint-free energies or DDM. Returns ------- self : IntermidateRunner Returns itself to support chaining or retrieval in a workflow graph. """ # simulations: dict[Simulation, int]
[docs] def __init__( self, simulations: list[Simulation], restraints: RestraintMaker, post_process_no_solv_mdin: FileID, post_process_mdin: FileID, post_process_distruct: str, post_only: bool, config: Config, adaptive: bool = False, loaded_dataframe: list = [], post_output: Union[list, list[pd.DataFrame]] = [], memory: Optional[Union[int, str]] = None, cores: Optional[Union[int, float, str]] = None, disk: Optional[Union[int, str]] = None, preemptable: Optional[Union[bool, int, str]] = None, unitName: Optional[str] = "", checkpoint: Optional[bool] = False, displayName: Optional[str] = "", descriptionClass: Optional[str] = None, ) -> None: super().__init__( memory, cores, disk, accelerators=None, preemptible="false", unitName=unitName, checkpoint=checkpoint, displayName=displayName, ) self.simulations = simulations self.restraints = restraints self.no_solvent_mdin = post_process_no_solv_mdin self.mdin = post_process_mdin self.post_only = post_only self.config = config self.adaptive = adaptive self.post_output = post_output self.ligand_output = [] self.receptor_output = [] self.complex_output = [] self._loaded_dataframe = loaded_dataframe self.post_process_distruct = post_process_distruct
def run(self, fileStore): """ Submits molecular dynamics (MD) or post-processing jobs based on configuration. This method checks whether to run MD simulations (`post_only=False`) or to perform post-analysis on previously completed MD runs (`post_only=True`). In MD mode, all simulations in `self.simulations` are submitted as Toil child jobs. In post-only mode, it checks if the corresponding MD outputs exist, then launches energy analysis jobs using `sander.MPI` and appends the results. Parameters ---------- fileStore : toil.job.FileStore A Toil file store object for handling file access, logging, and output exporting within the job store environment. Returns ------- self : IntermidateRunner The current job instance, after scheduling MD or post-analysis jobs. Notes ----- - When `post_only` is True, this function will skip simulations for which MD output is missing or incomplete. - Trajectories from MD runs are imported into the job store before being passed to post-processing steps. - Simulations with `state_label == "no_flat_bottom"` are ignored entirely. - This function should be run twice in a complete workflow: first to schedule MD, then again with `post_only=True` to schedule analysis jobs after MD has completed. """ fileStore.logToMaster(f"IntermidateRunner: Running a total of {len(self.simulations)} simulations") fileStore.logToMaster(f"post only is {self.post_only}") gpu_jobs = [] cpu_jobs = [] # Separate GPU and non-GPU simulations for simulation in self.simulations: if simulation.directory_args.get("state_label") == "no_flat_bottom": continue if self.post_only: # Post-analysis logic if self._check_mdout(simulation) or simulation.inptraj is not None: if simulation.inptraj is None: fileStore.logToMaster(f"Importing MD traj from: {simulation.output_dir}") simulation.inptraj = [ fileStore.import_file( "file://" + self._get_md_traj(simulation, fileStore) ) ] self.only_post_analysis( completed_sim=simulation, md_traj=simulation.inptraj, fileStore=fileStore ) else: fileStore.logToMaster( f"[WARNING] Expected MD output missing for {simulation.output_dir}, skipping post-analysis." ) else: # MD execution logic if self._check_mdout(simulation) or simulation.inptraj is not None: fileStore.logToMaster(f"[SKIP] MD already complete for: {simulation.output_dir}") continue fileStore.logToMaster(f"Running MD for: {simulation.output_dir}") # Separate GPU and CPU jobs if simulation.CUDA: gpu_jobs.append(simulation) else: cpu_jobs.append(simulation) # Distribute GPU jobs across available devices if gpu_jobs: num_gpus = get_gpu_count() fileStore.logToMaster(f"Detected {num_gpus} GPUs") fileStore.logToMaster(f"Total GPU jobs: {len(gpu_jobs)}") # Group jobs by GPU ID for sequential execution per GPU gpu_batches = {} for i, sim in enumerate(gpu_jobs): gpu_id = i % num_gpus sim.env = os.environ.copy() sim.env["CUDA_VISIBLE_DEVICES"] = str(gpu_id) if gpu_id not in gpu_batches: gpu_batches[gpu_id] = [] gpu_batches[gpu_id].append(sim) # Start one job per GPU, then chain the rest sequentially per GPU for gpu_id, jobs in gpu_batches.items(): if jobs: fileStore.logToMaster(f"GPU {gpu_id}: {len(jobs)} jobs") # Start first job in this GPU batch current_job = jobs[0] self.addChild(current_job) # Chain remaining jobs for this GPU (sequential execution) for next_job in jobs[1:]: current_job.addFollowOn(next_job) current_job = next_job # Submit CPU-only jobs in parallel for sim in cpu_jobs: self.addChild(sim) return self def only_post_analysis(self, completed_sim: Simulation, md_traj, fileStore): """ Run post-analysis calculations on a completed MD simulation. This function schedules post-processing jobs that analyze energy terms using existing trajectory and restart files. Parameters ---------- completed_sim : Simulation A `Simulation` object representing the completed MD run to be analyzed. This provides context such as working directories and system parameters. md_traj : str Path to the input trajectory file (`.nc`, `.dcd`, etc.) generated by the completed MD simulation. fileStore : FileStore-like An object used for logging and managing output and job submission context (e.g., within a workflow engine). Returns ------- None All results are appended to `self.post_output` and tracked via `self._loaded_dataframe`. Notes ----- - If analysis output (`simulation_mdout.parquet.gzip`) already exists and is cached, the job is skipped. - Post-analysis jobs are scheduled using `sander.MPI`, and energy data is parsed into pandas DataFrames. - This method does not perform MD; it assumes all dynamics are already complete. """ fileStore.logToMaster("RUNNING POST only\n") fileStore.logToMaster(f"loaded dataframe: {self._loaded_dataframe}") for post_simulation in self.simulations: directory_args = post_simulation.directory_args.copy() #fileStore.logToMaster(f"directory args before update: {directory_args}\n") # fileStore.logToMaster(f"args {completed_sim.directory_args} & {md_traj}") directory_args.update(self.update_postprocess_dirstruct(completed_sim.directory_args)) # type: ignore #fileStore.logToMaster(f"directory args after update: {directory_args}\n") mdin = self.mdin if post_simulation.directory_args["igb_value"] == 6: mdin = self.no_solvent_mdin # run simulation if its not endstate with endstate post_dirstruct = self.get_system_dirs(post_simulation.system_type) fileStore.logToMaster(f"post dirstruct {post_dirstruct}\n") post_process_job = Simulation( executable="sander.MPI", mpi_command=post_simulation.mpi_command, #mpi_command=post_simulation.mpi_command, num_cores=1, CUDA=False, prmtop=post_simulation.prmtop, incrd=post_simulation.incrd, input_file=mdin, restraint_file=post_simulation.restraint_file, working_directory=post_simulation.working_directory, directory_args=directory_args, dirstruct=post_dirstruct, inptraj=md_traj, post_analysis=True, restraint_key=post_simulation.restraint_key, sim_debug=True, ) if completed_sim.directory_args["runtype"] == "lambda_window": fileStore.logToMaster(f"COMPLETED MD simulation of lambda window") fileStore.logToMaster( f"Using trajectory from {completed_sim.output_dir}\n" ) if not "simulation_mdout.parquet.gzip" in os.listdir( post_process_job.output_dir ): fileStore.logToMaster( f"simulations_mdout.parquet is not found in {post_process_job.output_dir}\n" ) fileStore.logToMaster( f"RUNNING post analysis with inptraj trajecory: {md_traj}" ) # fileStore.logToMaster( # f"output postProces: {post_process_job.output_dir}" # ) fileStore.logToMaster( f"State potential energy {post_simulation.directory_args['state_label']}" ) # fileStore.logToMaster(f"state args: {post_simulation.directory_args}") self.addChild(post_process_job) data_frame = post_process_job.addFollowOnJobFn( create_mdout_dataframe, post_process_job.directory_args, post_process_job.dirstruct, post_process_job.output_dir, ) self.post_output.append(data_frame.rv()) elif post_process_job.output_dir in self._loaded_dataframe: fileStore.logToMaster(f"Energy post-analysis already completed and already loaded the results") fileStore.logToMaster( f"Already loaded the Energy post-analysis results in the directory {post_process_job.output_dir}\n" ) continue else: if post_simulation.directory_args["state_label"] == "lambda_window": fileStore.logToMaster(f"Energy post-analysis already completed and loading the results") fileStore.logToMaster( f"Loading the Energy post-analysis results in the directory {post_process_job.output_dir}\n" ) self.post_output.append( pd.read_parquet( os.path.join( post_process_job.output_dir, "simulation_mdout.parquet.gzip" ), ) ) self._loaded_dataframe.append(post_process_job.output_dir) def _add_complex_simulation( self, conformational, orientational, mdin, restraint_file, charge=1.0, charge_parm=None, gb_extdiel=None, ): con_force = float(round(conformational, 3)) orient_force = float(round(orientational, 3)) dirs_args = ( { "topology": self.config.endstate_files.complex_parameter_filename, "state_label": "lambda_window", "extdiel": 78.5, "charge": charge, "igb": f"igb_{self.config.intermediate_args.igb_solvent}", "igb_value": self.config.intermediate_args.igb_solvent, "conformational_restraint": con_force, "orientational_restraints": orient_force, "filename": f"state_8_{con_force}_{orient_force}_prod", "runtype": f"Running restraint window. Conformational restraint: {con_force} and orientational restraint: {orient_force}", "topdir": self.config.system_settings.top_directory_path, }, ) parm_file = self.config.endstate_files.complex_parameter_filename # scaling GB external dielectric if gb_extdiel is not None: # prmtop was create with charge = 0 parm_file = charge_parm.rv() # type: ignore dirs_args[0].update({"state_label": "gb_dielectric"}) dirs_args[0].update({"extdiel": gb_extdiel}) dirs_args[0].update({"charge": charge}) # scaling ligand charge windows elif charge_parm is not None: parm_file = charge_parm.rv() dirs_args[0].update({"state_label": "electrostatics"}) # type: ignore # scaling restraint windows else: restraint_file = restraint_file.rv() new_job = Simulation( executable=self.config.system_settings.executable, mpi_command=self.config.system_settings.mpi_command, num_cores=self.config.num_cores_per_system.complex_ncores, CUDA=False, prmtop=parm_file, incrd=self.config.inputs["endstate_complex_lastframe"], input_file=mdin, restraint_file=restraint_file, working_directory=self.config.system_settings.working_directory, system_type="complex", directory_args=dirs_args[0], dirstruct="dirstruct_halo", ) self.simulations.append(new_job) def _add_ligand_simulation( self, conformational, mdin, restraint_file, charge=1.0, charge_parm=None, ): con_force = float(round(conformational, 3)) dirs_args = ( { "topology": self.config.endstate_files.ligand_parameter_filename, "state_label": "lambda_window", "conformational_restraint": con_force, "igb": f"igb_{self.config.intermediate_args.igb_solvent}", "extdiel": 78.5, "charge": charge, "igb_value": self.config.intermediate_args.igb_solvent, "filename": f"state_2_{con_force}_prod", "runtype": f"Running restraint window, Conformational restraint: {con_force}", "topdir": self.config.system_settings.top_directory_path, }, ) parm_file = self.config.endstate_files.ligand_parameter_filename if charge_parm is not None: parm_file = charge_parm.rv() dirs_args[0].update({"state_label": "electrostatics"}) # type: ignore dirs_args[0].update({"igb": "igb_6"}) dirs_args[0].update({"filename": "state_4_prod"}) dirs_args[0].update({"extdiel": 0.0}) dirs_args[0].update( { "runtype": f"Scailing ligand charges: {charge}", } ) else: restraint_file = restraint_file.rv() new_job = Simulation( executable=self.config.system_settings.executable, mpi_command=self.config.system_settings.mpi_command, num_cores=self.config.num_cores_per_system.ligand_ncores, CUDA=self.config.system_settings.CUDA, prmtop=parm_file, incrd=self.config.inputs["ligand_endstate_frame"], input_file=mdin, restraint_file=restraint_file, working_directory=self.config.system_settings.working_directory, system_type="ligand", directory_args=dirs_args[0], dirstruct="dirstruct_apo", ) self.simulations.append(new_job) def _add_receptor_simulation(self, conformational, mdin, restraint_file): con_force = float(round(conformational, 3)) new_job = Simulation( executable=self.config.system_settings.executable, mpi_command=self.config.system_settings.mpi_command, num_cores=self.config.num_cores_per_system.receptor_ncores, CUDA=self.config.system_settings.CUDA, prmtop=self.config.endstate_files.receptor_parameter_filename, incrd=self.config.inputs["receptor_endstate_frame"], input_file=mdin, restraint_file=restraint_file.rv(), working_directory=self.config.system_settings.working_directory, system_type="receptor", directory_args={ "topology": self.config.endstate_files.receptor_parameter_filename, "state_label": "lambda_window", "extdiel": 78.5, "charge": 1.0, "igb": f"igb_{self.config.intermediate_args.igb_solvent}", "igb_value": self.config.intermediate_args.igb_solvent, "conformational_restraint": con_force, "filename": f"state_2_{con_force}_prod", "runtype": f"Running restraint window, Conformational restraint: {con_force}", "topdir": self.config.system_settings.top_directory_path, }, dirstruct="dirstruct_apo", ) self.simulations.append(new_job) @classmethod def new_runner( cls: Type["IntermidateRunner"], config: Config, obj: dict, ): return cls( simulations=obj["simulations"], restraints=obj["restraints"], config=config, post_process_distruct=obj["post_process_distruct"], post_process_no_solv_mdin=config.inputs["post_nosolv_mdin"], post_process_mdin=config.inputs["post_mdin"], adaptive=True, post_only=True, post_output=obj["post_output"], loaded_dataframe=obj["_loaded_dataframe"], ) @staticmethod def _get_md_traj(simulation: Simulation, fileStore): """Return an absolute path to completed AMBER (.nc) trajectory filename. Parameters ---------- simulation: Simulation Simulation class object which contains all required MD input arguments. fileStore: job.fileStore Toil interface to read and write files. Returns ------- Filepath to AMBER trajectory (.nc) file. """ return os.path.join( simulation.output_dir, list( filter( lambda file: re.match(r"^.*\.nc$", file), os.listdir(simulation.output_dir), ) )[0], ) @staticmethod def _check_mdout(simulation: Simulation) -> bool: if "mdout" in os.listdir(simulation.output_dir): for line in reversed( open(os.path.join(simulation.output_dir, "mdout")).readlines() ): if "Final Performance Info" in line: return True return False @staticmethod def update_postprocess_dirstruct( run_time_args: dict, ) -> dict[str, Union[str, object]]: if "orientational_restraints" in run_time_args.keys(): return { "traj_state_label": run_time_args["state_label"], "trajectory_restraint_conrest": run_time_args[ "conformational_restraint" ], "trajectory_restraint_orenrest": run_time_args[ "orientational_restraints" ], "traj_extdiel": run_time_args["extdiel"], "traj_igb": run_time_args["igb"], "traj_charge": run_time_args["charge"], "filename": f"{run_time_args['filename']}_postprocess", } return { "traj_state_label": run_time_args["state_label"], "trajectory_restraint_conrest": run_time_args["conformational_restraint"], "traj_igb": run_time_args["igb"], "traj_extdiel": run_time_args["extdiel"], "traj_charge": run_time_args["charge"], "filename": f"{run_time_args['filename']}_postprocess", } @staticmethod def get_system_dirs(system_type): if system_type == "ligand" or system_type == "receptor": return "post_process_apo" return "post_process_halo"
# /nas0/ayoub/sampl9_runs/sampl9_extend_windows_diel/WP6_G2_Hmass/lambda_window/1.0/78.5/-0.2857142857142864/3.7142857142857135/WP6_G2_Hmass_state_8_0.8203353560076375_13.1253656961222_prod_traj.nc