import argparse
import logging
import os
import sys
import warnings
import numpy as np
from intermol.gromacs import gromacs_driver
from intermol.lammps import lammps_driver
import intermol.tests
# Make a global logging object.
from intermol.tests.testing_tools import which
logger = logging.getLogger('InterMolLog')
if __name__ == "__main__":
# Specifies lowest severity log messages to handle.
logger.setLevel(logging.DEBUG)
h = logging.StreamHandler()
h.setLevel(logging.INFO) # Ignores DEBUG level for now.
f = logging.Formatter("%(levelname)s %(asctime)s %(message)s",
"%Y-%m-%d %H:%M:%S")
h.setFormatter(f)
logger.addHandler(h)
# Redirect warnings module messages to logging system.
logging.captureWarnings(True)
warning_logger = logging.getLogger('py.warnings')
warning_logger.addHandler(h)
[docs]def parse_args(args):
parser = argparse.ArgumentParser(description='Perform a file conversion')
# Input arguments.
group_in = parser.add_argument_group('Choose input conversion format')
group_in.add_argument('--des_in', nargs=1, metavar='file',
help='.cms file for conversion from DESMOND file format')
group_in.add_argument('--gro_in', nargs=2, metavar='file',
help='.gro and .top file for conversion from GROMACS file format')
group_in.add_argument('--lmp_in', nargs=1, metavar='file',
help='input file for conversion from LAMMPS file format (expects'
' data file in same directory and a read_data call)')
# Output arguments.
group_out = parser.add_argument_group('Choose output conversion format(s)')
group_out.add_argument('--desmond', action='store_true',
help='convert to DESMOND')
group_out.add_argument('--gromacs', action='store_true',
help='convert to GROMACS')
group_out.add_argument('--lammps', action='store_true',
help='convert to LAMMPS')
# Other options.
group_misc = parser.add_argument_group('Other optional arguments')
group_misc.add_argument('--odir', metavar='directory', default='.',
help='specification of output directory (default: ./)')
group_misc.add_argument('--oname', metavar='prefix', default='',
help='specification of prefix for output filenames '
'(default: inputprefix_converted)')
group_misc.add_argument('-e', '--energy', dest='energy', action='store_true',
help='evaluate energy of input and output files for comparison')
group_misc.add_argument('--efile', default='',
help='optional run file for energy evaluation (e.g. .cfg, .mdp, .input)')
group_misc.add_argument('-dp', '--despath', dest='despath',
metavar='path', default='',
help='path for DESMOND binary, needed for energy evaluation')
group_misc.add_argument('-gp', '--gropath', dest='gropath',
metavar='path', default='',
help='path for GROMACS binary, needed for energy evaluation')
group_misc.add_argument('-lp', '--lmppath', dest='lmppath',
metavar='path', default='lmp_openmpi',
help='path for LAMMPS binary, needed for energy evaluation')
group_misc.add_argument('-f', '--force', dest='force', action='store_true',
help='ignore warnings (NOTE: may lead to partially correct topologies)')
group_misc.add_argument('-v', '--verbose', dest='verbose', action='store_true',
help='high verbosity, includes DEBUG level output')
# Prints help if no arguments given.
if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
return parser.parse_args(args)
[docs]def main(args=None):
logger.info('Beginning InterMol conversion')
if not args:
args = vars(parse_args(args))
if args.get('verbose'):
h.setLevel(logging.DEBUG)
# Print warnings.
warnings.simplefilter("always")
if not args.get('force'):
# Warnings will be treated as exceptions unless force flag is used.
warnings.simplefilter("error")
gropath = args.get('gropath')
if not gropath:
gropath = ''
lmppath = args.get('lmppath')
if not lmppath:
for exe in ['lmp_mpi', 'lmp_openmpi']:
if which(exe):
lmppath = exe
else:
logger.exception('Found no LAMMPS executable.')
# --------------- PROCESS INPUTS ----------------- #
if args.get('gro_in'):
gromacs_files = args['gro_in']
prefix = os.path.splitext(os.path.basename(gromacs_files[0]))[0]
# Find the top file since order of inputs is not enforced.
top_in = [x for x in gromacs_files if x.endswith('.top')]
assert(len(top_in) == 1)
top_in = top_in[0]
# Find the gro file since order of inputs is not enforced.
gro_in = [x for x in gromacs_files if x.endswith('.gro')]
assert(len(gro_in) == 1)
gro_in = gro_in[0]
system = gromacs_driver.read_file(top_in, gro_in, gropath)
elif args.get('lmp_in'):
lammps_file = args['lmp_in']
prefix = os.path.splitext(os.path.basename(lammps_file))[0]
system = lammps_driver.read_file(in_file=lammps_file)
else:
logger.error('No input file')
sys.exit(1)
# --------------- WRITE OUTPUTS ----------------- #
if not args.get('oname'):
oname = '{0}_converted'.format(prefix)
else:
oname = args['oname']
oname = os.path.join(args['odir'], oname) # Prepend output directory to oname.
output_status = dict()
# TODO: factor out exception handling
if args.get('gromacs'):
try:
gromacs_driver.write_file(system,
'{0}.top'.format(oname), '{0}.gro'.format(oname))
except Exception as e:
logger.exception(e)
output_status['gromacs'] = e
else:
output_status['gromacs'] = 0
if args.get('lammps'):
try:
lammps_driver.write_file('{0}.input'.format(oname), system)
except Exception as e:
logger.exception(e)
output_status['lammps'] = e
else:
output_status['lammps'] = 0
# --------------- ENERGY EVALUATION ----------------- #
if args.get('energy'):
# Run control file paths.
tests_path = os.path.dirname(intermol.tests.__file__)
# Required for all gromacs conversions.
mdp_path = os.path.join(tests_path, 'gromacs', 'grompp.mdp')
# Evaluate input energies.
if args.get('gro_in'):
input_type = 'gromacs'
tests_path = os.path.dirname(intermol.tests.__file__)
mdp_path = os.path.join(tests_path, 'gromacs', 'grompp.mdp')
e_in, e_infile = gromacs_driver.gromacs_energies(top_in, gro_in,
mdp_path, gropath, '')
else:
logger.warn('Code should have never made it here!')
# Evaluate output energies.
output_type = []
e_outfile = []
e_out = []
if args.get('gromacs') and output_status['gromacs'] == 0:
output_type.append('gromacs')
try:
out, outfile = gromacs_driver.gromacs_energies(
'{0}.top'.format(oname), '{0}.gro'.format(oname),
mdp_path, gropath, '')
except Exception as e:
output_status['gromacs'] = e
logger.exception(e)
e_out.append(-1)
e_outfile.append(-1)
else:
output_status['gromacs'] = get_diff(e_in, out)
e_out.append(out)
e_outfile.append(outfile)
if args.get('lammps') and output_status['lammps'] == 0:
output_type.append('lammps')
try:
out, outfile = lammps_driver.lammps_energies(
'{0}.input'.format(oname), lmppath)
except Exception as e:
output_status['lammps'] = e
logger.exception(e)
e_out.append(-1)
e_outfile.append(-1)
else:
output_status['lammps'] = get_diff(e_in, out)
e_out.append(out)
e_outfile.append(outfile)
# Display energy comparison results.
out = ['InterMol Conversion Energy Comparison Results','']
out.append('{0} input energy file: {1}'.format(input_type, e_infile))
for out_type, file in zip(output_type, e_outfile):
out.append('{0} output energy file: {1}'.format(out_type, file))
out += summarize_energy_results(e_in, e_out, input_type, output_type)
logger.info('\n'.join(out))
logger.info('Finished!')
return output_status
[docs]def get_diff(e_in, e_out):
"""Returns difference in potential energy.
arguments:
e_in - dictionary of energy groups from input file
e_out - dictionary of energy groups from output file
returns:
potential energy difference in units of the input
"""
type = 'Potential'
input = e_in[type]
diff = e_out[type].in_units_of(input.unit) - input
return diff._value
[docs]def find_match(key, dictionary, unit):
"""Helper function for `summarize_energy_results`. """
if key in dictionary:
return dictionary[key].value_in_unit(unit)
else:
return np.nan
[docs]def summarize_energy_results(energy_input, energy_outputs, input_type, output_types):
"""Creates a table comparing input and output energy groups.
Args:
energy_input (dict): energy groups from input file
energy_output(list): containing dictionary of energy groups or -1 for
each output file
input_type (str): input engine
output_types (list): containing output formats
Returns:
out (list of strings): which forms a summary table using "\n".join(out)
"""
out = []
# remove failed evaluations (-1 in energy_outputs)
failed_i = [i for i, x in enumerate(energy_outputs) if x == -1]
failed = [output_types[i] for i in failed_i]
output_types = [x for i, x in enumerate(output_types) if i not in failed_i]
energy_outputs = [x for x in energy_outputs if x != -1]
# find all distinct labels
labels = set(energy_input.keys())
for e_out in energy_outputs:
for key, value in e_out.iteritems():
labels.add(key)
# set up energy comparison table
labels = list(labels)
unit = energy_input[energy_input.keys()[0]].unit
energy_all = [energy_input] + energy_outputs
data = np.empty((len(labels), len(energy_all)))
for i in range(len(data)):
for j in range(len(energy_all)):
data[i, j] = find_match(labels[i], energy_all[j], unit)
# TODO: sort table
out.append('')
out.append('Energy group summary')
out.append('=======================================================================')
header = '%20s %18s ' % ('type', 'input (%s)' % input_type)
for otype in output_types:
header += '%37s' %('output (%s) diff (%s)' % (otype, otype))
out.append(header)
for i in range(len(data)):
line = '%20s ' % labels[i]
line += '%18.8f ' % data[i][0]
for j in range(1, len(data[i])):
line += '%18.8f %18.8f' % (data[i][j], data[i][j]-data[i][0])
out.append(line)
out.append('')
# get differences in potential energy
i = labels.index('Potential')
diff = data[i, 1::] - data[i, 0]
for d, otype in zip(diff, output_types):
out.append('difference in potential energy from %s=>%s conversion: %18.8f'
% (input_type, otype, d))
for f in failed:
out.append('energy comparison for {0} output failed'.format(f))
out.append('=======================================================================')
return out
if __name__ == '__main__':
main()