From e603c9ce8102c85b1a7b019b13551d29c93cdca6 Mon Sep 17 00:00:00 2001 From: ThomSerg Date: Fri, 30 Jan 2026 14:16:36 +0100 Subject: [PATCH] Add IO module with readers and writers --- cpmpy/tools/io/__init__.py | 24 ++ cpmpy/tools/io/jsplib.py | 152 ++++++++ cpmpy/tools/io/nurserostering.py | 111 ++++++ cpmpy/tools/io/opb.py | 454 +++++++++++++++++++++++ cpmpy/tools/io/rcpsp.py | 175 +++++++++ cpmpy/tools/io/reader.py | 121 +++++++ cpmpy/tools/io/scip.py | 596 +++++++++++++++++++++++++++++++ cpmpy/tools/io/utils.py | 35 ++ cpmpy/tools/io/wcnf.py | 136 +++++++ cpmpy/tools/io/writer.py | 124 +++++++ 10 files changed, 1928 insertions(+) create mode 100644 cpmpy/tools/io/__init__.py create mode 100644 cpmpy/tools/io/jsplib.py create mode 100644 cpmpy/tools/io/nurserostering.py create mode 100644 cpmpy/tools/io/opb.py create mode 100644 cpmpy/tools/io/rcpsp.py create mode 100644 cpmpy/tools/io/reader.py create mode 100644 cpmpy/tools/io/scip.py create mode 100644 cpmpy/tools/io/utils.py create mode 100644 cpmpy/tools/io/wcnf.py create mode 100644 cpmpy/tools/io/writer.py diff --git a/cpmpy/tools/io/__init__.py b/cpmpy/tools/io/__init__.py new file mode 100644 index 000000000..ba8df9b78 --- /dev/null +++ b/cpmpy/tools/io/__init__.py @@ -0,0 +1,24 @@ +""" +IO tools for CPMpy. + +This module provides tools to read and write models in various formats. +Use the generic `read(..., format="...")` and `write(..., format="...")` functions to read and write +models in one of the supported formats. + +Some formats can be auto-detected from the file extension, so only a file path is required as argument. +""" + +from .writer import write, write_formats +from .reader import read, read_formats +from .utils import get_extension, get_format + +# Problem datasets +from .jsplib import read_jsplib +from .nurserostering import read_nurserostering +from .rcpsp import read_rcpsp + +# Model datasets +from .opb import read_opb, write_opb +from .scip import read_scip, write_scip +from .wcnf import read_wcnf +from ..xcsp3 import read_xcsp3 \ No newline at end of file diff --git a/cpmpy/tools/io/jsplib.py b/cpmpy/tools/io/jsplib.py new file mode 100644 index 000000000..7f1c13c1a --- /dev/null +++ b/cpmpy/tools/io/jsplib.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- +## +## jsplib.py +## +""" +Parser for the JSPLib format. + + +================= +List of functions +================= + +.. autosummary:: + :nosignatures: + + read_jsplib +""" + + +import os +import sys +import argparse +import cpmpy as cp +import numpy as np +from io import StringIO +from typing import Union + + +_std_open = open +def read_jsplib(jsp: Union[str, os.PathLike], open=open) -> cp.Model: + """ + Parser for JSPLib format. Reads in an instance and returns its matching CPMpy model. + + Arguments: + jsp (str or os.PathLike): + - A file path to a JSPlib file + - OR a string containing the JSPLib content directly + open: (callable): + If jsp is the path to a file, a callable to "open" that file (default=python standard library's 'open'). + + Returns: + cp.Model: The CPMpy model of the JSPLib instance. + """ + # If rcpsp is a path to a file -> open file + if isinstance(jsp, (str, os.PathLike)) and os.path.exists(jsp): + if open is not None: + f = open(jsp) + else: + f = _std_open(jsp, "rt") + # If rcpsp is a string containing a model -> create a memory-mapped file + else: + f = StringIO(jsp) + + + task_to_machines, task_durations = _parse_jsplib(f) + model, (start, makespan) = _model_jsplib(task_to_machines=task_to_machines, task_durations=task_durations) + return model + + +def _parse_jsplib(f): + """ + Parse a JSPLib instance file + Returns two matrices: + - task to machines indicating on which machine to run which task + - task durations: indicating the duration of each task + """ + + line = f.readline() + while line.startswith("#"): + line = f.readline() + n_jobs, n_tasks = map(int, line.strip().split(" ")) + matrix = np.fromstring(f.read(), sep=" ", dtype=int).reshape((n_jobs, n_tasks*2)) + + task_to_machines = np.empty(dtype=int, shape=(n_jobs, n_tasks)) + task_durations = np.empty(dtype=int, shape=(n_jobs, n_tasks)) + + for t in range(n_tasks): + task_to_machines[:, t] = matrix[:, t*2] + task_durations[:, t] = matrix[:, t*2+1] + + return task_to_machines, task_durations + + + +def _model_jsplib(task_to_machines, task_durations): + + task_to_machines = np.array(task_to_machines) + dur = np.array(task_durations) + + assert task_to_machines.shape == task_durations.shape + + n_jobs, n_tasks = task_to_machines.shape + + start = cp.intvar(0, task_durations.sum(), name="start", shape=(n_jobs,n_tasks)) # extremely bad upperbound... TODO + end = cp.intvar(0, task_durations.sum(), name="end", shape=(n_jobs,n_tasks)) # extremely bad upperbound... TODO + makespan = cp.intvar(0, task_durations.sum(), name="makespan") # extremely bad upperbound... TODO + + model = cp.Model() + model += start + dur == end + model += end[:,:-1] <= start[:,1:] # precedences + + for machine in set(task_to_machines.flat): + model += cp.NoOverlap(start[task_to_machines == machine], + dur[task_to_machines == machine], + end[task_to_machines == machine]) + + model += end <= makespan + model.minimize(makespan) + + return model, (start, makespan) + + + +def main(): + parser = argparse.ArgumentParser(description="Parse and solve a JSPLib model using CPMpy") + parser.add_argument("model", help="Path to a JSPLib file (or raw RCPSP string if --string is given)") + parser.add_argument("-s", "--solver", default=None, help="Solver name to use (default: CPMpy's default)") + parser.add_argument("--string", action="store_true", help="Interpret the first argument (model) as a raw JSPLib string instead of a file path") + parser.add_argument("-t", "--time-limit", type=int, default=None, help="Time limit for the solver in seconds (default: no limit)") + args = parser.parse_args() + + # Build the CPMpy model + try: + if args.string: + model = read_jsplib(args.model) + else: + model = read_jsplib(os.path.expanduser(args.model)) + except Exception as e: + sys.stderr.write(f"Error reading model: {e}\n") + sys.exit(1) + + # Solve the model + try: + if args.solver: + result = model.solve(solver=args.solver, time_limit=args.time_limit) + else: + result = model.solve(time_limit=args.time_limit) + except Exception as e: + sys.stderr.write(f"Error solving model: {e}\n") + sys.exit(1) + + # Print results + print("Status:", model.status()) + if result is not None: + if model.has_objective(): + print("Objective:", model.objective_value()) + else: + print("No solution found.") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/cpmpy/tools/io/nurserostering.py b/cpmpy/tools/io/nurserostering.py new file mode 100644 index 000000000..89e292085 --- /dev/null +++ b/cpmpy/tools/io/nurserostering.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- +## +## nurserostering.py +## +""" +Parser for the Nurse Rostering format. + + +================= +List of functions +================= + +.. autosummary:: + :nosignatures: + + read_nurserostering +""" + + +import os +import sys +import argparse +import tempfile +import cpmpy as cp +from typing import Union + +from cpmpy.tools.dataset.nurserostering import ( + parse_scheduling_period, + nurserostering_model +) + + +_std_open = open +def read_nurserostering(instance: Union[str, os.PathLike], open=open) -> cp.Model: + """ + Parser for Nurse Rostering format. Reads in an instance and returns its matching CPMpy model. + + Arguments: + instance (str or os.PathLike): + - A file path to a Nurse Rostering file + - OR a string containing the Nurse Rostering content directly + open (callable): + If instance is the path to a file, a callable to "open" that file (default=python standard library's 'open'). + + Returns: + cp.Model: The CPMpy model of the Nurse Rostering instance. + """ + # If instance is a path to a file that exists -> use it directly + if isinstance(instance, (str, os.PathLike)) and os.path.exists(instance): + fname = instance + # If instance is a string containing file content -> write to temp file + else: + # Create a temporary file and write the content + with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as tmp: + tmp.write(instance) + fname = tmp.name + + try: + # Use the existing parser from the dataset (expects a file path) + data = parse_scheduling_period(fname) + + # Create the CPMpy model using the existing model builder + model, _ = nurserostering_model(**data) + + return model + finally: + # Clean up temporary file if we created one + if isinstance(instance, str) and not os.path.exists(instance) and os.path.exists(fname): + os.unlink(fname) + + +def main(): + parser = argparse.ArgumentParser(description="Parse and solve a Nurse Rostering model using CPMpy") + parser.add_argument("model", help="Path to a Nurse Rostering file (or raw content string if --string is given)") + parser.add_argument("-s", "--solver", default=None, help="Solver name to use (default: CPMpy's default)") + parser.add_argument("--string", action="store_true", help="Interpret the first argument (model) as a raw Nurse Rostering string instead of a file path") + parser.add_argument("-t", "--time-limit", type=int, default=None, help="Time limit for the solver in seconds (default: no limit)") + args = parser.parse_args() + + # Build the CPMpy model + try: + if args.string: + model = read_nurserostering(args.model) + else: + model = read_nurserostering(os.path.expanduser(args.model)) + except Exception as e: + sys.stderr.write(f"Error reading model: {e}\n") + sys.exit(1) + + # Solve the model + try: + if args.solver: + result = model.solve(solver=args.solver, time_limit=args.time_limit) + else: + result = model.solve(time_limit=args.time_limit) + except Exception as e: + sys.stderr.write(f"Error solving model: {e}\n") + sys.exit(1) + + # Print results + print("Status:", model.status()) + if result is not None: + if model.has_objective(): + print("Objective:", model.objective_value()) + else: + print("No solution found.") + +if __name__ == "__main__": + main() + diff --git a/cpmpy/tools/io/opb.py b/cpmpy/tools/io/opb.py new file mode 100644 index 000000000..949ad4f44 --- /dev/null +++ b/cpmpy/tools/io/opb.py @@ -0,0 +1,454 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- +## +## opb.py +## +""" +OPB parser. + +Currently only the restricted OPB PB24 format is supported (without WBO). + + +================= +List of functions +================= + +.. autosummary:: + :nosignatures: + + read_opb +""" + + +import os +import re +import sys +import argparse +from io import StringIO +from typing import Union +from functools import reduce +from operator import mul + + +import cpmpy as cp +from cpmpy.transformations.normalize import toplevel_list,simplify_boolean +from cpmpy.transformations.safening import no_partial_functions, safen_objective +from cpmpy.transformations.decompose_global import decompose_in_tree, decompose_objective +from cpmpy.transformations.flatten_model import flatten_constraint, flatten_objective +from cpmpy.transformations.reification import only_implies, only_bv_reifies +from cpmpy.transformations.linearize import linearize_constraint, only_positive_bv_wsum +from cpmpy.transformations.int2bool import int2bool, _encode_lin_expr +from cpmpy.transformations.get_variables import get_variables +from cpmpy.expressions.variables import _IntVarImpl, NegBoolView, _BoolVarImpl +from cpmpy.expressions.core import Operator, Comparison +from cpmpy import __version__ + + +# Regular expressions +HEADER_RE = re.compile(r'(.*)\s*#variable=\s*(\d+)\s*#constraint=\s*(\d+).*') +TERM_RE = re.compile(r"([+-]?\d+)((?:\s+~?x\d+)+)") +OBJ_TERM_RE = re.compile(r'^min:') +IND_TERM_RE = re.compile(r'([>=|<=|=]+)\s+([+-]?\d+)') +IND_TERM_RE = re.compile(r'(>=|<=|=)\s*([+-]?\d+)') + + +def _parse_term(line, vars): + """ + Parse a line containing OPB terms into a CPMpy expression. + + Supports: + - Linear terms (e.g., +2 x1) + - Non-linear terms (e.g., -1 x1 x14) + - Negated variables using '~' (e.g., ~x5) + + Arguments: + line (str): A string containing one or more terms. + vars (list[cp.boolvar]): List or array of CPMpy Boolean variables. + + Returns: + cp.Expression: A CPMpy expression representing the sum of all parsed terms. + + Example: + >>> _parse_term("2 x2 x3 +3 x4 ~x5", vars) + sum([2, 3] * [(IV2*IV3), (IV4*~IV5)]) + """ + + terms = [] + for w, vars_str in TERM_RE.findall(line): + factors = [] + + for v in vars_str.split(): + if v.startswith("~x"): + idx = int(v[2:]) - 1 # remove "~x" and opb is 1-based indexing + factors.append(~vars[idx]) + else: + idx = int(v[1:]) - 1 # remove "x" and opb is 1-based indexing + factors.append(vars[idx]) + + term = int(w) * reduce(mul, factors, 1) # create weighted term + terms.append(term) + + return cp.sum(terms) + +def _parse_constraint(line, vars): + """ + Parse a single OPB constraint line into a CPMpy comparison expression. + + Arguments: + line (str): A string representing a single OPB constraint. + vars (list[cp.boolvar]): List or array of CPMpy Boolean variables. Will be index to get the variables for the constraint. + + Returns: + cp.expressions.core.Comparison: A CPMpy comparison expression representing + the constraint. + + Example: + >>> _parse_constraint("-1 x1 x14 -1 x1 ~x17 >= -1", vars) + sum([-1, -1] * [(IV1*IV14), (IV1*~IV17)]) >= -1 + """ + + op, ind_term = IND_TERM_RE.search(line).groups() + lhs = _parse_term(line, vars) + + rhs = int(ind_term) if ind_term.lstrip("+-").isdigit() else vars[int(ind_term)] + + return cp.expressions.core.Comparison( + name="==" if op == "=" else ">=", + left=lhs, + right=rhs + ) + +_std_open = open +def read_opb(opb: Union[str, os.PathLike], open=open) -> cp.Model: + """ + Parser for OPB (Pseudo-Boolean) format. Reads in an instance and returns its matching CPMpy model. + + Based on PyPBLib's example parser: https://hardlog.udl.cat/static/doc/pypblib/html/library/index.html#example-from-opb-to-cnf-file + + Supports: + - Linear and non-linear terms (e.g., -1 x1 x14 +2 x2) + - Negated variables using '~' (e.g., ~x5) + - Minimisation objective + - Comparison operators in constraints: '=', '>=' + + Arguments: + opb (str or os.PathLike): + - A file path to an OPB file (optionally LZMA-compressed with `.xz`) + - OR a string containing the OPB content directly + open: (callable): + If wcnf is the path to a file, a callable to "open" that file (default=python standard library's 'open'). + + Returns: + cp.Model: The CPMpy model of the OPB instance. + + Example: + >>> opb_text = ''' + ... * #variable= 5 #constraint= 2 #equal= 1 intsize= 64 #product= 5 sizeproduct= 13 + ... min: 2 x2 x3 +3 x4 ~x5 +2 ~x1 x2 +3 ~x1 x2 x3 ~x4 ~x5 ; + ... 2 x2 x3 -1 x1 ~x3 = 5 ; + ... ''' + >>> model = read_opb(opb_text) + >>> print(model) + Model(...) + + Notes: + - Comment lines starting with '*' are ignored. + - Only "min:" objectives are supported; "max:" is not recognized. + """ + + + # If opb is a path to a file -> open file + if isinstance(opb, (str, os.PathLike)) and os.path.exists(opb): + if open is not None: + f = open(opb) + else: + f = _std_open(opb, "rt") + # If opb is a string containing a model -> create a memory-mapped file + else: + f = StringIO(opb) + + # Look for header on first line + line = f.readline() + header = HEADER_RE.match(line) + if not header: # If not found on first line, look on second (happens when passing multi line string) + _line = f.readline() + header = HEADER_RE.match(_line) + if not header: + raise ValueError(f"Missing or incorrect header: \n0: {line}1: {_line}2: ...") + nr_vars = int(header.group(2)) + + # Generator without comment lines + reader = (l for l in map(str.strip, f) if l and l[0] != '*') + + # CPMpy objects + vars = cp.boolvar(shape=nr_vars, name="x") + if nr_vars == 1: + vars = cp.cpm_array([vars]) # ensure vars is indexable even for single variable case + model = cp.Model() + + # Special case for first line -> might contain objective function + first_line = next(reader) + if OBJ_TERM_RE.match(first_line): + obj_expr = _parse_term(first_line, vars) + model.minimize(obj_expr) + else: # no objective found, parse as a constraint instead + model.add(_parse_constraint(first_line, vars)) + + # Start parsing line by line + for line in reader: + model.add(_parse_constraint(line, vars)) + + return model + +def write_opb(model, fname=None, encoding="auto"): + """ + Export a CPMpy model to the OPB (Pseudo-Boolean) format. + + This function transforms the given CPMpy model into OPB format, which is a standard textual + format for representing Pseudo-Boolean optimization problems. The OPB file will contain + a header specifying the number of variables and constraints, the objective (optional), and the + list of constraints using integer-weighted Boolean variables. + + Args: + model (cp.Model): The CPMpy model to export. + fname (str, optional): The file name to write the OPB output to. If None, the OPB string is returned. + encoding (str, optional): The encoding used for `int2bool`. Options: ("auto", "direct", "order", "binary"). + + Returns: + str or None: The OPB string if `fname` is None, otherwise nothing (writes to file). + + Format: + * #variable= #constraint= + * OPB file generated by CPMpy version + min/max: ; + ; + ; + ... + + Note: + Some solvers only support variable names of the form x. The OPB writer will remap + all CPMpy variables to such a format internally. + + Example: + >>> from cpmpy import * + >>> x = boolvar(shape=3) + >>> m = Model(x[0] + x[1] + x[2] >= 2) + >>> print(write_opb(m)) + """ + + csemap, ivarmap = dict(), dict() + opb_cons = _transform(model.constraints, csemap, ivarmap, encoding) + + if model.objective_ is not None: + opb_obj, const, extra_cons = _transform_objective(model.objective_, csemap, ivarmap, encoding) + opb_cons += extra_cons + else: + opb_obj = None + + # Form header and variable mapping + # Use all variables occurring in constraints and the objective + all_vars = get_variables(opb_cons + ([opb_obj] if opb_obj is not None else [])) + out = [ + f"* #variable= {len(all_vars)} #constraint= {len(opb_cons)}", + f"* OPB file generated by CPMpy version {__version__}", + ] + # Remap variables to 'x1', 'x2', ..., the standard OPB way + varmap = {v: f"x{i+1}" for i, v in enumerate(all_vars)} + + # Write objective, if present + if model.objective_ is not None: + objective_str = _wsum_to_str(opb_obj, varmap) + out.append(f"{'min' if model.objective_is_min else 'max'}: {objective_str};") + + # Write constraints + for cons in opb_cons: + assert isinstance(cons, Comparison), f"Expected a comparison, but got {cons}" + lhs, rhs = cons.args + constraint_str = f"{_wsum_to_str(lhs, varmap)} {cons.name} {rhs};" + out.append(constraint_str) + + # Output to file or string + contents = "\n".join(out) + if fname is None: + return contents + else: + with open(fname, "w") as f: + f.write(contents) + +def _normalized_comparison(lst_of_expr): + """ + Convert a list of linear CPMpy expressions into OPB-compatible pseudo-Boolean constraints. + + Transforms a list of Boolean-linear CPMpy expressions (as output by `linearize_constraint`) into a list + of OPB-normalized constraints, expressed as comparisons between weighted Boolean sums + (using "wsum") and integer constants. Handles Boolean vars, reifications, implications, + and ensures all equalities are decomposed into two inequalities. + + Args: + lst_of_expr (list): List of CPMpy Boolean-linear expressions. + + Returns: + list: List of normalized CPMpy `Comparison` objects representing pseudo-Boolean constraints. + """ + newlist = [] + for cpm_expr in lst_of_expr: + if isinstance(cpm_expr, cp.BoolVal) and cpm_expr.value() is False: + raise NotImplementedError(f"Cannot transform {cpm_expr} to OPB constraint") + + # single Boolean variable + if isinstance(cpm_expr, _BoolVarImpl): + cpm_expr = Operator("sum", [cpm_expr]) >= 1 + + # implication + if isinstance(cpm_expr, Operator) and cpm_expr.name == "->": + bv, subexpr = cpm_expr.args + assert isinstance(subexpr, _BoolVarImpl), "Only bv -> bv should reach here, but got {subexpr}" + cpm_expr = Operator("wsum", [[-1, 1], [bv, subexpr]]) >= 0 + newlist.append(cpm_expr) + continue + + # Comparison, can be single Boolean variable or (weighted) sum of Boolean variables + if isinstance(cpm_expr, Comparison): + lhs, rhs = cpm_expr.args + + if isinstance(lhs, _BoolVarImpl): + lhs = Operator("sum", [lhs]) + if lhs.name == "sum": + lhs = Operator("wsum", [[1]*len(lhs.args), lhs.args]) + + assert isinstance(lhs, Operator) and lhs.name == "wsum", f"Expected a wsum, but got {lhs}" + + # convert comparisons into >= constraints + if cpm_expr.name == "==": + newlist += _normalized_comparison([lhs <= rhs]) + newlist += _normalized_comparison([lhs >= rhs]) + elif cpm_expr.name == ">=": + newlist.append(lhs >= rhs) + elif cpm_expr.name == "<=": + new_weights = [-w for w in lhs.args[0]] + newlist.append(Operator("wsum", [new_weights, lhs.args[1]]) >= -rhs) + else: + raise ValueError(f"Unknown comparison {cpm_expr.name}") + else: + raise NotImplementedError(f"Expected a comparison, but got {cpm_expr}") + + return newlist + +def _wsum_to_str(cpm_expr, varmap): + """ + Convert a weighted sum CPMpy expression to a string in OPB format. + + args: + cpm_expr (Operator): wsum CPMpy expression + varmap (dict): dictionary mapping CPMpy variables to OPB variable names + """ + assert isinstance(cpm_expr, Operator) and cpm_expr.name == "wsum", f"Expected a wsum, but got {cpm_expr}" + weights, args = cpm_expr.args + + out = [] + for w, var in zip(weights, args): + var = varmap[var] if not isinstance(var, NegBoolView) else f"~{varmap[var._bv]}" + if w < 0: + out.append(f"- {w} {var}") + elif w > 0: + out.append(f"+ {w} {var}") + else: + pass # zero weight, ignore + + str_out = " ".join(out) + return str_out + +def _transform(cpm_expr, csemap, ivarmap, encoding="auto"): + """ + Transform a list of CPMpy expressions into a list of Pseudo-Boolean constraints. + """ + + cpm_cons = toplevel_list(cpm_expr) + cpm_cons = no_partial_functions(cpm_cons, safen_toplevel={"div", "mod", "element"}) + cpm_cons = decompose_in_tree(cpm_cons, + supported={"alldifferent"}, # alldiff has a specialized MIP decomp in linearize + csemap=csemap + ) + cpm_cons = simplify_boolean(cpm_cons) + cpm_cons = flatten_constraint(cpm_cons, csemap=csemap) # flat normal form + cpm_cons = only_bv_reifies(cpm_cons, csemap=csemap) + cpm_cons = only_implies(cpm_cons, csemap=csemap) + cpm_cons = linearize_constraint( + cpm_cons, supported=frozenset({"sum", "wsum"}), csemap=csemap + ) + cpm_cons = int2bool(cpm_cons, ivarmap, encoding=encoding) + + return _normalized_comparison(cpm_cons) + +def _transform_objective(expr, csemap, ivarmap, encoding="auto"): + """ + Transform a CPMpy objective expression into a weighted sum expression + """ + + # transform objective + obj, safe_cons = safen_objective(expr) + obj, decomp_cons = decompose_objective(obj, supported={"alldifferent"}, + csemap=csemap) + obj, flat_cons = flatten_objective(obj, csemap=csemap) + obj = only_positive_bv_wsum(obj) # remove negboolviews + + weights, xs, const = [], [], 0 + # we assume obj is a var, a sum or a wsum (over int and bool vars) + if isinstance(obj, _IntVarImpl) or isinstance(obj, NegBoolView): # includes _BoolVarImpl + weights = [1] + xs = [obj] + elif obj.name == "sum": + xs = obj.args + weights = [1] * len(xs) + elif obj.name == "wsum": + weights, xs = obj.args + else: + raise NotImplementedError(f"OPB: Non supported objective {obj} (yet?)") + + terms, cons, k = _encode_lin_expr(ivarmap, xs, weights, encoding) + + # remove terms with coefficient 0 (`only_positive_coefficients_` may return them and RC2 does not accept them) + terms = [(w, x) for w,x in terms if w != 0] + + obj = Operator("wsum", [[w for w,x in terms], [x for w,x in terms]]) + return obj, const, safe_cons + decomp_cons + flat_cons + + +def main(): + parser = argparse.ArgumentParser(description="Parse and solve an OPB model using CPMpy") + parser.add_argument("model", help="Path to an OPB file (or raw OPB string if --string is given)") + parser.add_argument("-s", "--solver", default=None, help="Solver name to use (default: CPMpy's default)") + parser.add_argument("--string", action="store_true", help="Interpret the first argument (model) as a raw OPB string instead of a file path") + parser.add_argument("-t", "--time-limit", type=int, default=None, help="Time limit for the solver in seconds (default: no limit)") + args = parser.parse_args() + + # Build the CPMpy model + try: + if args.string: + model = read_opb(args.model) + else: + model = read_opb(os.path.expanduser(args.model)) + except Exception as e: + sys.stderr.write(f"Error reading model: {e}\n") + sys.exit(1) + + # Solve the model + try: + if args.solver: + result = model.solve(solver=args.solver, time_limit=args.time_limit) + else: + result = model.solve(time_limit=args.time_limit) + except Exception as e: + sys.stderr.write(f"Error solving model: {e}\n") + sys.exit(1) + + # Print results + print("Status:", model.status()) + if result is not None: + if model.has_objective(): + print("Objective:", model.objective_value()) + else: + print("No solution found.") + +if __name__ == "__main__": + main() diff --git a/cpmpy/tools/io/rcpsp.py b/cpmpy/tools/io/rcpsp.py new file mode 100644 index 000000000..84aa29afa --- /dev/null +++ b/cpmpy/tools/io/rcpsp.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- +## +## rcpsp.py +## +""" +Parser for the PSPLIB RCPSP format. + + +================= +List of functions +================= + +.. autosummary:: + :nosignatures: + + read_rcpsp +""" + + +import os +import sys +import argparse +import cpmpy as cp +from io import StringIO +from typing import Union + + +_std_open = open +def read_rcpsp(rcpsp: Union[str, os.PathLike], open=open) -> cp.Model: + """ + Parser for PSPLIB RCPSP format. Reads in an instance and returns its matching CPMpy model. + + Arguments: + rcpsp (str or os.PathLike): + - A file path to a PSPLIB RCPSP file + - OR a string containing the RCPSP content directly + open: (callable): + If rcpsp is the path to a file, a callable to "open" that file (default=python standard library's 'open'). + + Returns: + cp.Model: The CPMpy model of the PSPLIB RCPSP instance. + """ + # If rcpsp is a path to a file -> open file + if isinstance(rcpsp, (str, os.PathLike)) and os.path.exists(rcpsp): + if open is not None: + f = open(rcpsp) + else: + f = _std_open(rcpsp, "rt") + # If rcpsp is a string containing a model -> create a memory-mapped file + else: + f = StringIO(rcpsp) + + + table, capacities = _parse_rcpsp(f) + model, (start, end, makespan) = _model_rcpsp(job_data=table, capacities=capacities) + return model + +def _parse_rcpsp(f): + + data = dict() + + line = f.readline() + while not line.startswith("PRECEDENCE RELATIONS:"): + line = f.readline() + + f.readline() # skip keyword line + line = f.readline() # first line of table, skip + while not line.startswith("*****"): + jobnr, n_modes, n_succ, *succ = [int(x) for x in line.split(" ") if len(x.strip())] + assert len(succ) == n_succ, "Expected %d successors for job %d, got %d" % (n_succ, jobnr, len(succ)) + data[jobnr] = dict(num_modes=n_modes, successors=succ) + line = f.readline() + + # skip to job info + while not line.startswith("REQUESTS/DURATIONS:"): + line = f.readline() + + line = f.readline() + _j, _m, _d, *_r = [x.strip() for x in line.split(" ") if len(x.strip())] # first line of table + resource_names = [f"{_r[i]}{_r[i+1]}" for i in range(0,len(_r),2)] + line = f.readline() # first line of table + if line.startswith("----") or line.startswith("*****"): # intermediate line in table... + line = f.readline() # skip + + while not line.startswith("*****"): + jobnr, mode, duration, *resources = [int(x) for x in line.split(" ") if len(x.strip())] + assert len(resources) == len(resource_names), "Expected %d resources for job %d, got %d" % (len(resource_names), jobnr, len(resources)) + data[jobnr].update(dict(mode=mode, duration=duration)) + data[jobnr].update({name : req for name, req in zip(resource_names, resources)}) + line = f.readline() + + # read resource availabilities + while not line.startswith("RESOURCEAVAILABILITIES:"): + line = f.readline() + + f.readline() # skip header + capacities = [int(x) for x in f.readline().split(" ") if len(x)] + + import pandas as pd + df =pd.DataFrame([dict(jobnr=k ,**info) for k, info in data.items()], + columns=["jobnr", "mode", "duration", "successors", *resource_names]) + df.set_index("jobnr", inplace=True) + + return df, dict(zip(resource_names, capacities)) + +def _model_rcpsp(job_data, capacities): + + model = cp.Model() + + horizon = job_data.duration.sum() # worst case, all jobs sequential on a machine + makespan = cp.intvar(0, horizon, name="makespan") + + start = cp.intvar(0, horizon, name="start", shape=len(job_data)) + end = cp.intvar(0, horizon, name="end", shape=len(job_data)) + + # ensure capacity is not exceeded + for rescource, capa in capacities.items(): + model += cp.Cumulative( + start = start, + duration = job_data['duration'].tolist(), + end = end, + demand = job_data[rescource].tolist(), + capacity = capa + ) + + # enforce precedences + for idx, (jobnr, info) in enumerate(job_data.iterrows()): + for succ in info['successors']: + model += end[idx] <= start[succ-1] # job ids start at idx 1 + + model += end <= makespan + model.minimize(makespan) + + return model, (start, end, makespan) + + +def main(): + parser = argparse.ArgumentParser(description="Parse and solve a PSPLIB RCPSP model using CPMpy") + parser.add_argument("model", help="Path to a PSPLIB RCPSP file (or raw RCPSP string if --string is given)") + parser.add_argument("-s", "--solver", default=None, help="Solver name to use (default: CPMpy's default)") + parser.add_argument("--string", action="store_true", help="Interpret the first argument (model) as a raw RCPSP string instead of a file path") + parser.add_argument("-t", "--time-limit", type=int, default=None, help="Time limit for the solver in seconds (default: no limit)") + args = parser.parse_args() + + # Build the CPMpy model + try: + if args.string: + model = read_rcpsp(args.model) + else: + model = read_rcpsp(os.path.expanduser(args.model)) + except Exception as e: + sys.stderr.write(f"Error reading model: {e}\n") + sys.exit(1) + + # Solve the model + try: + if args.solver: + result = model.solve(solver=args.solver, time_limit=args.time_limit) + else: + result = model.solve(time_limit=args.time_limit) + except Exception as e: + sys.stderr.write(f"Error solving model: {e}\n") + sys.exit(1) + + # Print results + print("Status:", model.status()) + if result is not None: + if model.has_objective(): + print("Objective:", model.objective_value()) + else: + print("No solution found.") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/cpmpy/tools/io/reader.py b/cpmpy/tools/io/reader.py new file mode 100644 index 000000000..6bc161a2f --- /dev/null +++ b/cpmpy/tools/io/reader.py @@ -0,0 +1,121 @@ +""" +CPMpy tools for reading models from files. + +================= +List of functions +================= + +.. autosummary:: + :nosignatures: + + read + read_formats +""" + +from typing import Callable, List, Optional + +import cpmpy as cp +from cpmpy.tools.dimacs import read_dimacs +from cpmpy.tools.io.scip import read_scip +from cpmpy.tools.io.wcnf import read_wcnf +from cpmpy.tools.io.opb import read_opb +from cpmpy.tools.io.utils import get_format + +# mapping format names to appropriate reader functions +_reader_map = { + "mps": read_scip, + "lp": read_scip, + "cip": read_scip, + "fzn": read_scip, + "gms": read_scip, + "pip": read_scip, + "dimacs": read_dimacs, + "opb": read_opb, + "wcnf": read_wcnf, + # xcsp3 has a generic .xml extension, so it is not included here +} + + +def _get_reader(format: str) -> Callable[[str], cp.Model]: + """ + Get the reader function for a given format. + + Arguments: + format (str): The name of the format to get a reader for. + + Raises: + ValueError: If the format is not supported. + + Returns: + A callable that reads a model from a file. + """ + + if format not in _reader_map: + raise ValueError(f"Unsupported format: {format}") + + return _reader_map[format] + +def read_formats() -> List[str]: + """ + List of supported read formats. + + Each can be used as the `format` argument to the `read` function. + E.g.: + + .. code-block:: python + + from cpmpy.tools.io import read + model = read(file_path, format="mps") + model = read(file_path, format="lp") + """ + return list(_reader_map.keys()) + +def _derive_format(file_path: str) -> str: + """ + Derive the format of a file from its path. + + Arguments: + file_path (str): The path to the file to derive the format from. + + Raises: + ValueError: If the format could not be derived from the file path. + + Returns: + The name of the format. + + Example: + >>> _derive_format("instance.mps") + "mps" + >>> _derive_format("instance.lp.xz") + "lp" + """ + + # Iterate over the file path extensions in reverse order + for ext in file_path.split(".")[::-1]: + try: + return get_format(ext) + except ValueError: + continue + + raise ValueError(f"No file format provided and could not derive format from file path: {file_path}") + +def read(file_path: str, format: Optional[str] = None) -> cp.Model: + """ + Read a model from a file. + + Arguments: + file_path (str): The path to the file to read. + format (Optional[str]): The format of the file to read. If None, the format will be derived from the file path. + + Raises: + ValueError: If the format is not supported. + + Returns: + A CPMpy model. + """ + + if format is None: + format = _derive_format(file_path) + + reader = _get_reader(format) + return reader(file_path) \ No newline at end of file diff --git a/cpmpy/tools/io/scip.py b/cpmpy/tools/io/scip.py new file mode 100644 index 000000000..3ea825c52 --- /dev/null +++ b/cpmpy/tools/io/scip.py @@ -0,0 +1,596 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- +## +## scip.py +## +""" +This file implements helper functions for converting CPMpy models to and from various data +formats supported by the SCIP optimization suite. + +============ +Installation +============ + +The 'pyscipopt' python package must be installed separately through `pip`: + +.. code-block:: console + + $ pip install cpmpy[io.scip] + +================= +List of functions +================= + +.. autosummary:: + :nosignatures: + + read_scip + write_scip + to_scip +""" + + +import argparse +import math +import os +import sys +import tempfile +import numpy as np +import cpmpy as cp +import warnings + +from typing import Union, Optional + +from cpmpy.expressions.core import BoolVal, Comparison, Operator +from cpmpy.expressions.variables import _NumVarImpl, _BoolVarImpl, NegBoolView, _IntVarImpl +from cpmpy.transformations.comparison import only_numexpr_equality +from cpmpy.transformations.decompose_global import decompose_in_tree +from cpmpy.transformations.flatten_model import flatten_constraint, flatten_objective +from cpmpy.transformations.get_variables import get_variables +from cpmpy.transformations.linearize import linearize_constraint, only_positive_bv +from cpmpy.transformations.normalize import toplevel_list +from cpmpy.transformations.reification import only_implies, reify_rewrite +from cpmpy.expressions.utils import is_any_list, is_num +from cpmpy.expressions.globalconstraints import DirectConstraint +from cpmpy.expressions.variables import ignore_variable_name_check + + +_std_open = open +def read_scip(fname: Union[str, os.PathLike], open=open, assume_integer:bool=False) -> cp.Model: + """ + Read a SCIP-compatible model from a file and return a CPMpy model. + + Arguments: + fname: The path to the SCIP-compatible file to read. + open: The function to use to open the file. (SCIP does not require this argument, will be ignored) + assume_integer: Whether to assume that all variables are integer. + + Returns: + A CPMpy model. + """ + if not _SCIPWriter.supported(): + raise Exception("SCIP: Install SCIP IO dependencies: cpmpy[io.scip]") + + with ignore_variable_name_check(): + + from pyscipopt import Model + + # Load file into pyscipopt model + scip = Model() + scip.hideOutput() + scip.readProblem(filename=fname) + scip.hideOutput(quiet=False) + + # 1) translate variables + scip_vars = scip.getVars() + var_map = {} + for var in scip_vars: + name = var.name # name of the variable + vtype = var.vtype() # type of the variable + if vtype == "BINARY": + var_map[name] = cp.boolvar(name=name) + elif vtype == "INTEGER": + lb = int(var.getLbOriginal()) + ub = int(var.getUbOriginal()) + var_map[name] = cp.intvar(lb, ub, name=name) + elif vtype == "CONTINUOUS": + if assume_integer: + lb = int(math.ceil(var.getLbOriginal())) + ub = int(math.floor(var.getUbOriginal())) + if lb != var.getLbOriginal() or ub != var.getUbOriginal(): + warnings.warn(f"Continuous variable {name} has non-integer bounds {var.getLbOriginal()} - {var.getUbOriginal()}. CPMpy will assume it is integer.") + var_map[name] = cp.intvar(lb, ub, name=name) + else: + raise ValueError(f"CPMpy does not support continious variables: {name}") + else: + raise ValueError(f"Unsupported variable type: {vtype}") + + + model = cp.Model() + + # 2) translate constraints + scip_cons = scip.getConss() + for cons in scip_cons: + ctype = cons.getConshdlrName() # type of the constraint + + if ctype == "linear": + cons_vars = scip.getConsVars(cons) # variables in the constraint (x) + cons_coeff = scip.getConsVals(cons) # coefficients of the variables (A) + + cpm_vars = [var_map[v.name] for v in cons_vars] # convert to CPMpy variables + cpm_sum = cp.sum(var*coeff for (var,coeff) in zip(cpm_vars, cons_coeff)) # Ax + + lhs = scip.getLhs(cons) # lhs of the constraint + rhs = scip.getRhs(cons) # rhs of the constraint + + # convert to integer bounds + _lhs = int(math.ceil(lhs)) + _rhs = int(math.floor(rhs)) + if _lhs != int(lhs) or _rhs != int(rhs): + if assume_integer: + warnings.warn(f"Constraint {cons.name} has non-integer bounds. CPMpy will assume it is integer.") + else: + raise ValueError(f"Constraint {cons.name} has non-integer bounds. CPMpy does not support non-integer bounds.") + + # add the constraint to the model + model += _lhs <= cpm_sum + model += cpm_sum <= _rhs + + else: + raise ValueError(f"Unsupported constraint type: {ctype}") + + # 3) translate objective + scip_objective = scip.getObjective() + direction = scip.getObjectiveSense() + + n_terms = len(scip_objective.terms) + obj_vars = cp.cpm_array([None]*n_terms) + obj_coeffs = np.zeros(n_terms, dtype=int) + + for i, (term, coeff) in enumerate(scip_objective.terms.items()): # terms is a dictionary mapping terms to coefficients + if len(term.vartuple) > 1: + raise ValueError(f"Unsupported objective term: {term}") # TODO <- assumes linear, support higher-order terms + cpm_var = var_map[term.vartuple[0].name] # TODO <- assumes linear + obj_vars[i] = cpm_var + + _coeff = int(math.floor(coeff)) + if _coeff != int(coeff): + if assume_integer: + warnings.warn(f"Objective term {term} has non-integer coefficient. CPMpy will assume it is integer.") + else: + raise ValueError(f"Objective term {term} has non-integer coefficient. CPMpy does not support non-integer coefficients.") + obj_coeffs[i] = _coeff + + if direction == "minimize": + model.minimize(cp.sum(obj_vars * obj_coeffs)) + elif direction == "maximize": + model.maximize(cp.sum(obj_vars * obj_coeffs)) + else: + raise ValueError(f"Unsupported objective sense: {direction}") + + return model + + + +class _SCIPWriter: + """ + A helper class aiding in translating CPMpy models to SCIP models. + + Borrows a lot of its implementation from the prototype SCIP solver interface from git branch `scip2`. + + TODO: code should be reused once SCIP has been added as a solver backend. + """ + + @staticmethod + def supported(): + # try to import the package + try: + import pyscipopt as scip + return True + except: + return False + + def __init__(self, problem_name: Optional[str] = None): + if not self.supported(): + raise Exception( + "SCIP: Install SCIP IO dependencies: cpmpy[io.scip]") + import pyscipopt as scip + + self.scip_model = scip.Model(problem_name) + + self.user_vars = set() + self._varmap = dict() # maps cpmpy variables to native solver variables + self._csemap = dict() # maps cpmpy expressions to solver expressions + + self._cons_counter = 0 + + def solver_var(self, cpm_var): + """ + Creates solver variable for cpmpy variable + or returns from cache if previously created + """ + if is_num(cpm_var): # shortcut, eases posting constraints + return cpm_var + + # special case, negative-bool-view + # work directly on var inside the view + if isinstance(cpm_var, NegBoolView): + raise Exception("Negative literals should not be part of any equation. See /transformations/linearize for more details") + + # create if it does not exit + if cpm_var not in self._varmap: + if isinstance(cpm_var, _BoolVarImpl): + revar = self.scip_model.addVar(vtype='B', name=cpm_var.name) + elif isinstance(cpm_var, _IntVarImpl): + revar = self.scip_model.addVar(lb=cpm_var.lb, ub=cpm_var.ub, vtype='I', name=cpm_var.name) + else: + raise NotImplementedError("Not a known var {}".format(cpm_var)) + self._varmap[cpm_var] = revar + + # return from cache + return self._varmap[cpm_var] + + + def solver_vars(self, cpm_vars): + """ + Like `solver_var()` but for arbitrary shaped lists/tensors + """ + if is_any_list(cpm_vars): + return [self.solver_vars(v) for v in cpm_vars] + return self.solver_var(cpm_vars) + + def objective(self, expr, minimize=True): + """ + Post the given expression to the solver as objective to minimize/maximize + + 'objective()' can be called multiple times, only the last one is stored + + (technical side note: any constraints created during conversion of the objective + are premanently posted to the solver) + """ + + # make objective function non-nested + (flat_obj, flat_cons) = (flatten_objective(expr)) + self += flat_cons + get_variables(flat_obj, collect=self.user_vars) # add potentially created constraints + + # make objective function or variable and post + obj = self._make_numexpr(flat_obj) + if minimize: + self.scip_model.setObjective(obj, sense='minimize') + else: + self.scip_model.setObjective(obj, sense='maximize') + + + def _make_numexpr(self, cpm_expr): + """ + Turns a numeric CPMpy 'flat' expression into a solver-specific + numeric expression + + Used especially to post an expression as objective function + """ + import pyscipopt as scip + + if is_num(cpm_expr): + return cpm_expr + + # decision variables, check in varmap + if isinstance(cpm_expr, _NumVarImpl): # cp.boolvar is subclass of _NumVarImpl + return self.solver_var(cpm_expr) + + # sum + if cpm_expr.name == "sum": + return scip.quicksum(self.solver_vars(cpm_expr.args)) + if cpm_expr.name == "sub": + a,b = self.solver_vars(cpm_expr.args) + return a - b + # wsum + if cpm_expr.name == "wsum": + return scip.quicksum(w * self.solver_var(var) for w, var in zip(*cpm_expr.args)) + + raise NotImplementedError("scip: Not a known supported numexpr {}".format(cpm_expr)) + + + def transform(self, cpm_expr): + """ + Transform arbitrary CPMpy expressions to constraints the solver supports + + Implemented through chaining multiple solver-independent **transformation functions** from + the `cpmpy/transformations/` directory. + + See the 'Adding a new solver' docs on readthedocs for more information. + + :param cpm_expr: CPMpy expression, or list thereof + :type cpm_expr: Expression or list of Expression + + :return: list of Expression + """ + # apply transformations, then post internally + # expressions have to be linearized to fit in MIP model. See /transformations/linearize + cpm_cons = toplevel_list(cpm_expr) + supported = {"alldifferent"} # alldiff has a specialized MIP decomp in linearize + cpm_cons = decompose_in_tree(cpm_cons, supported) + cpm_cons = flatten_constraint(cpm_cons) # flat normal form + cpm_cons = reify_rewrite(cpm_cons, supported=frozenset(['sum', 'wsum','sub'])) # constraints that support reification + cpm_cons = only_numexpr_equality(cpm_cons, supported=frozenset(["sum", "wsum", "sub"])) # supports >, <, != + cpm_cons = only_implies(cpm_cons) # anything that can create full reif should go above... + cpm_cons = linearize_constraint(cpm_cons, supported=frozenset({"sum", "wsum","sub", "mul", "div"})) # the core of the MIP-linearization + cpm_cons = only_positive_bv(cpm_cons) # after linearization, rewrite ~bv into 1-bv + return cpm_cons + + def _get_constraint_name(self): + name = f"cons_{self._cons_counter}" + self._cons_counter += 1 + return name + + + def add(self, cpm_expr_orig): + """ + Eagerly add a constraint to the underlying solver. + + Any CPMpy expression given is immediately transformed (through `transform()`) + and then posted to the solver in this function. + + This can raise 'NotImplementedError' for any constraint not supported after transformation + + The variables used in expressions given to add are stored as 'user variables'. Those are the only ones + the user knows and cares about (and will be populated with a value after solve). All other variables + are auxiliary variables created by transformations. + + :param cpm_expr: CPMpy expression, or list thereof + :type cpm_expr: Expression or list of Expression + + :return: self + """ + + # add new user vars to the set + get_variables(cpm_expr_orig, collect=self.user_vars) + + # transform and post the constraints + for cpm_expr in self.transform(cpm_expr_orig): + + # Comparisons: only numeric ones as 'only_bv_implies()' has removed the '==' reification for Boolean expressions + # numexpr `comp` bvar|const + if isinstance(cpm_expr, Comparison): + lhs, rhs = cpm_expr.args + sciprhs = self.solver_var(rhs) + + # Thanks to `only_numexpr_equality()` only supported comparisons should remain + if cpm_expr.name == '<=': + if (isinstance(lhs, Operator) and lhs.name == "sum" and all(a.is_bool() and not isinstance(a, NegBoolView) for a in lhs.args)): + if rhs == 1: # special SOS1 constraint? + self.scip_model.addConsSOS1(self.solver_vars(lhs.args), name=self._get_constraint_name()) + else: # cardinality constraint + self.scip_model.addConsCardinality(self.solver_vars(lhs.args), rhs, name=self._get_constraint_name()) + else: + sciplhs = self._make_numexpr(lhs) + self.scip_model.addCons(sciplhs <= sciprhs, name=self._get_constraint_name()) + + elif cpm_expr.name == '>=': + sciplhs = self._make_numexpr(lhs) + self.scip_model.addCons(sciplhs >= sciprhs, name=self._get_constraint_name()) + elif cpm_expr.name == '==': + if isinstance(lhs, _NumVarImpl) \ + or (isinstance(lhs, Operator) and (lhs.name == 'sum' or lhs.name == 'wsum' or lhs.name == "sub")): + # a BoundedLinearExpression LHS, special case, like in objective + sciplhs = self._make_numexpr(lhs) + self.scip_model.addCons(sciplhs == sciprhs, name=self._get_constraint_name()) + + elif lhs.name == 'mul': + scp_vars = self.solver_vars(lhs.args) + scp_lhs = scp_vars[0] * scp_vars[1] + for v in scp_vars[2:]: + scp_lhs *= v + self.scip_model.addCons(scp_lhs == sciprhs, name=self._get_constraint_name()) + + elif lhs.name == 'div': + a, b = self.solver_vars(lhs.args) + self.scip_model.addCons(a / b == sciprhs, name=self._get_constraint_name()) + + else: + raise NotImplementedError( + "Not a known supported scip comparison '{}' {}".format(lhs.name, cpm_expr)) + + # SCIP does have 'addConsAnd', 'addConsOr', 'addConsXor', 'addConsSOS2' #TODO? + else: + raise NotImplementedError( + "Not a known supported scip comparison '{}' {}".format(lhs.name, cpm_expr)) + + elif isinstance(cpm_expr, Operator) and cpm_expr.name == "->": + # Indicator constraints + # Takes form bvar -> sum(x,y,z) >= rvar + cond, sub_expr = cpm_expr.args + assert isinstance(cond, cp.boolvar), f"Implication constraint {cpm_expr} must have BoolVar as lhs" + assert isinstance(sub_expr, Comparison), "Implication must have linear constraints on right hand side" + + lhs, rhs = sub_expr.args + assert isinstance(lhs, _NumVarImpl) or lhs.name == "sum" or lhs.name == "wsum", f"Unknown linear expression {lhs} on right side of indicator constraint: {cpm_expr}" + assert is_num(rhs), f"linearize should only leave constants on rhs of comparison but got {rhs}" + + if sub_expr.name == ">=": # change sign + if lhs.name == "sum": + lhs = Operator("wsum", [[-1] * len(lhs.args), lhs.args]) + elif lhs.name == "wsum": + lhs = Operator("wsum", [[-w for w in lhs.args[0]], lhs.args[1]]) + else: + lhs = Operator("wsum",[[-1], [lhs]]) + sub_expr = lhs <= -rhs + + if sub_expr.name == "<=": + lhs, rhs = sub_expr.args + lin_expr = self._make_numexpr(lhs) + if isinstance(cond, NegBoolView): + self.scip_model.addConsIndicator(lin_expr <= rhs, name=self._get_constraint_name(), + binvar=self.solver_var(cond._bv), activeone=False) + else: + self.scip_model.addConsIndicator(lin_expr <= rhs, name=self._get_constraint_name(), + binvar=self.solver_var(cond), activeone=True) + + elif sub_expr.name == "==": # split into <= and >= + # TODO: refactor to avoid re-transforming constraints? + self += [cond.implies(lhs <= rhs), cond.implies(lhs >= rhs)] + else: + raise Exception(f"Unknown linear expression {sub_expr} name") + + # True or False + elif isinstance(cpm_expr, BoolVal): + # not sure how else to do it + if cpm_expr.args[0] is False: + bv = self.solver_var(cp.boolvar()) + self.scip_model.addCons(bv <= -1, name=self._get_constraint_name()) + + # a direct constraint, pass to solver + elif isinstance(cpm_expr, DirectConstraint): + cpm_expr.callSolver(self, self.scip_model) + + else: + raise NotImplementedError(cpm_expr) # if you reach this... please report on github + + return self + __add__ = add + + +def _to_writer(model: cp.Model, problem_name: Optional[str] = None) -> _SCIPWriter: + """ + Convert a CPMpy model to a SCIP writer + """ + writer = _SCIPWriter(problem_name=problem_name) + # 1) post constraints + for constraint in model.constraints: + writer += constraint + # 2) post objective + if not model.has_objective(): + raise ValueError("Model has no objective function") + writer.objective(model.objective_, model.objective_is_min) + return writer + + +def to_scip(model: cp.Model) -> "pyscipopt.Model": + """ + Convert a CPMpy model to a SCIP model + + Arguments: + model: CPMpy model + + Returns: + pyscipopt.Model: SCIP model + """ + writer = _to_writer(model) + return writer.scip_model + + +def _add_header(fname: os.PathLike, format: str, header: Optional[str] = None): + """ + Add a header to a file. + + Arguments: + fname: The path to the file to add the header to. + format: The format of the file. + header: The header to add. + """ + + with open(fname, "r") as f: + lines = f.readlines() + + if format == "mps": + header = ["* " + line + "\n" for line in header.splitlines()] + lines = header + lines + + elif format == "lp": + header = ["\\ " + line + "\n" for line in header.splitlines()] + lines = header + lines + + elif format == "cip": + header = ["# " + line + "\n" for line in header.splitlines()] + lines = header + lines + + elif format == "fzn": + header = ["% " + line + "\n" for line in header.splitlines()] + lines = header + lines + + elif format == "gms": + header = ["* " + line + "\n" for line in header.splitlines()] + lines = [lines[0]] + header + lines[1:] # handle first line: $OFFLISTING + + elif format == "pip": + header = ["\\ " + line + "\n" for line in header.splitlines()] + lines = header + lines + + with open(fname, "w") as f: + f.writelines(lines) + + +def write_scip(model: cp.Model, fname: Optional[str] = None, format: str = "mps", header: Optional[str] = None, verbose: bool = False) -> str: + """ + Write a CPMpy model to file using a SCIP provided writer. + Supported formats include: + - "mps" + - "lp" + - "cip" + - "fzn" + - "gms" + - "pip" + + More formats can be supported upon the installation of additional dependencies (like SIMPL). + For more information, see the SCIP documentation: https://pyscipopt.readthedocs.io/en/latest/tutorials/readwrite.html + """ + + writer = _to_writer(model, problem_name="CPMpy Model") + + # Decide where to write + if fname is None: + with tempfile.NamedTemporaryFile(suffix=f".{format}", delete=False) as tmp: + fname = tmp.name + try: + writer.scip_model.writeProblem(fname) + _add_header(fname, format, header) + with open(fname, "r") as f: + return f.read() + finally: + os.remove(fname) + else: + if not verbose: writer.scip_model.hideOutput() + writer.scip_model.writeProblem(fname, verbose=verbose) + if not verbose: writer.scip_model.hideOutput(quiet=False) + _add_header(fname, format, header) + with open(fname, "r") as f: + return f.read() + +def main(): + parser = argparse.ArgumentParser(description="Parse and solve a SCIP compatible model using CPMpy") + parser.add_argument("model", help="Path to a SCIP compatible file (or raw string if --string is given)") + parser.add_argument("-s", "--solver", default=None, help="Solver name to use (default: CPMpy's default)") + parser.add_argument("--string", action="store_true", help="Interpret the first argument (model) as a raw OPB string instead of a file path") + parser.add_argument("-t", "--time-limit", type=int, default=None, help="Time limit for the solver in seconds (default: no limit)") + args = parser.parse_args() + + # Build the CPMpy model + try: + if args.string: + model = read_scip(args.model) + else: + model = read_scip(os.path.expanduser(args.model)) + except Exception as e: + sys.stderr.write(f"Error reading model: {e}\n") + sys.exit(1) + + # Solve the model + try: + if args.solver: + result = model.solve(solver=args.solver, time_limit=args.time_limit) + else: + result = model.solve(time_limit=args.time_limit) + except Exception as e: + sys.stderr.write(f"Error solving model: {e}\n") + sys.exit(1) + + # Print results + print("Status:", model.status()) + if result is not None: + if model.has_objective(): + print("Objective:", model.objective_value()) + else: + print("No solution found.") + +if __name__ == "__main__": + main() diff --git a/cpmpy/tools/io/utils.py b/cpmpy/tools/io/utils.py new file mode 100644 index 000000000..e14f56b02 --- /dev/null +++ b/cpmpy/tools/io/utils.py @@ -0,0 +1,35 @@ +import warnings + + +# mapping file extensions to appropriate format names +_format_map = { + "mps" : "mps", + "lp" : "lp", + "cip" : "cip", + "fzn" : "fzn", + "gms" : "gms", + "pip" : "pip", + "wcnf" : "wcnf", + "cnf" : "dimacs", + "opb" : "opb", + "xcsp3" : "xcsp3", +} + +_extension_map = {} +for extension, format in _format_map.items(): + _extension_map[format] = _extension_map.get(format, []) + [extension] + +def get_extension(format: str) -> str: + """ + Get the file extension for a given format. + """ + if len(_extension_map[format]) > 1: + warnings.warn(f"Multiple extensions found for format {format}: {_extension_map[format]}. Using the first one: {_extension_map[format][0]}") + + return _extension_map[format][0] + +def get_format(extension: str) -> str: + """ + Get the format for a given file extension. + """ + return _format_map[extension] \ No newline at end of file diff --git a/cpmpy/tools/io/wcnf.py b/cpmpy/tools/io/wcnf.py new file mode 100644 index 000000000..5cea77608 --- /dev/null +++ b/cpmpy/tools/io/wcnf.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- +## +## wcnf.py +## +""" +Parser for the WCNF format. + + +================= +List of functions +================= + +.. autosummary:: + :nosignatures: + + read_wcnf +""" + + +import os +import sys +import argparse +import cpmpy as cp +from io import StringIO +from typing import Union + + +def _get_var(i, vars_dict): + """ + Returns CPMpy boolean decision variable matching to index `i` if exists, else creates a new decision variable. + + Arguments: + i: index + vars_dict (dict): dictionary to keep track of previously generated decision variables + """ + if i not in vars_dict: + vars_dict[i] = cp.boolvar(name=f"x{i}") # <- be carefull that name doesn't clash with generated variables during transformations / user variables + return vars_dict[i] + +_std_open = open +def read_wcnf(wcnf: Union[str, os.PathLike], open=open) -> cp.Model: + """ + Parser for WCNF format. Reads in an instance and returns its matching CPMpy model. + + Arguments: + wcnf (str or os.PathLike): + - A file path to an WCNF file (optionally LZMA-compressed with `.xz`) + - OR a string containing the WCNF content directly + open: (callable): + If wcnf is the path to a file, a callable to "open" that file (default=python standard library's 'open'). + + Returns: + cp.Model: The CPMpy model of the WCNF instance. + """ + # If wcnf is a path to a file -> open file + if isinstance(wcnf, (str, os.PathLike)) and os.path.exists(wcnf): + if open is not None: + f = open(wcnf) + else: + f = _std_open(wcnf, "rt") + # If wcnf is a string containing a model -> create a memory-mapped file + else: + f = StringIO(wcnf) + + model = cp.Model() + vars = {} + soft_terms = [] + + for raw in f: + line = raw.strip() + + # Empty line or a comment -> skip + if not line or line.startswith("c"): + continue + + # Hard clause + if line[0] == "h": + literals = map(int, line[1:].split()) + clause = [_get_var(i, vars) if i > 0 else ~_get_var(-i, vars) + for i in literals if i != 0] + model.add(cp.any(clause)) + + # Soft clause (weight first) + else: + parts = line.split() + weight = int(parts[0]) + literals = map(int, parts[1:]) + clause = [_get_var(i, vars) if i > 0 else ~_get_var(-i, vars) + for i in literals if i != 0] + soft_terms.append(weight * cp.any(clause)) + + # Objective = sum of soft clause terms + if soft_terms: + model.maximize(sum(soft_terms)) + + return model + +def main(): + parser = argparse.ArgumentParser(description="Parse and solve a WCNF model using CPMpy") + parser.add_argument("model", help="Path to a WCNF file (or raw WCNF string if --string is given)") + parser.add_argument("-s", "--solver", default=None, help="Solver name to use (default: CPMpy's default)") + parser.add_argument("--string", action="store_true", help="Interpret the first argument (model) as a raw WCNF string instead of a file path") + parser.add_argument("-t", "--time-limit", type=int, default=None, help="Time limit for the solver in seconds (default: no limit)") + args = parser.parse_args() + + # Build the CPMpy model + try: + if args.string: + model = read_wcnf(args.model) + else: + model = read_wcnf(os.path.expanduser(args.model)) + except Exception as e: + sys.stderr.write(f"Error reading model: {e}\n") + sys.exit(1) + + # Solve the model + try: + if args.solver: + result = model.solve(solver=args.solver, time_limit=args.time_limit) + else: + result = model.solve(time_limit=args.time_limit) + except Exception as e: + sys.stderr.write(f"Error solving model: {e}\n") + sys.exit(1) + + # Print results + print("Status:", model.status()) + if result is not None: + if model.has_objective(): + print("Objective:", model.objective_value()) + else: + print("No solution found.") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/cpmpy/tools/io/writer.py b/cpmpy/tools/io/writer.py new file mode 100644 index 000000000..c4074e22d --- /dev/null +++ b/cpmpy/tools/io/writer.py @@ -0,0 +1,124 @@ +""" +CPMpy tools for writing models to files. + +================= +List of functions +================= + +.. autosummary:: + :nosignatures: + + write + write_formats + +============== +Module details +============== +""" + +import inspect +from typing import Callable, Optional, List +from functools import partial + +import cpmpy as cp +from cpmpy.tools.dimacs import write_dimacs +from cpmpy.tools.io.scip import write_scip +from cpmpy.tools.io.opb import write_opb + +# mapping format names to appropriate writer functions +_writer_map = { + "mps": partial(write_scip, format="mps"), + "lp": partial(write_scip, format="lp"), + "cip": partial(write_scip, format="cip"), + # "cnf": partial(write_scip, format="cnf"), # requires SIMPL, not included in pip package + # "diff": partial(write_scip, format="diff"), # requires SIMPL, not included in pip package + "fzn": partial(write_scip, format="fzn"), + "gms": partial(write_scip, format="gms"), + # "opb": partial(write_scip, format="opb"), # requires SIMPL, not included in pip package + # "osil": partial(write_scip, format="osil"), + "pip": partial(write_scip, format="pip"), + # "sol": partial(write_scip, format="sol"), # requires SIMPL, not included in pip package + # "wbo": partial(write_scip, format="wbo"), # requires SIMPL, not included in pip package + # "zpl": partial(write_scip, format="zpl"), # requires SIMPL, not included in pip package + "dimacs": write_dimacs, + "opb": write_opb, + # "wcnf": write_wcnf, # currently not supported +} + +def _get_writer(format: str) -> Callable: + """ + Get the writer function for a given format. + + Arguments: + format (str): The name of the format to get a writer for. + + Raises: + ValueError: If the format is not supported. + + Returns: + A callable that writes a model to a file. + """ + + if format not in _writer_map: + raise ValueError(f"Unsupported format: {format}") + + return _writer_map[format] + +def write_formats() -> List[str]: + """ + List of supported write formats. + + Each can be used as the `format` argument to the `write` function. + E.g.: + + .. code-block:: python + + from cpmpy.tools.io import write, write_formats, get_extension + write(model, format=write_formats()[0]) + write(model, format=write_formats()[1], file_path=f"model.{get_extension(write_formats()[1])}") + """ + return list(_writer_map.keys()) + +def _create_header(format: str) -> str: + """ + Default header for a file. + """ + header = "-"*100 + "\n" + header += "File written by CPMpy\n" + header += f" Format: '{format}'\n" + header += f" CPMpy Version: {cp.__version__}\n" + header += "-"*100 + "\n" + return header + +def write(model: cp.Model, format: str, file_path: Optional[str] = None, verbose: bool = False, header: Optional[str] = None, **kwargs) -> str: + """ + Write a model to a file. + + Arguments: + model (cp.Model): The model to write. + format (str): The format to write the model in. + file_path (Optional[str]): The path to the file to write the model to. If None, only a string containing the model will be returned. + verbose (bool): Whether to print verbose output. + header (Optional[str]): The header to put at the top of the file. If None, a default header will be created. Pass an empty string to skip adding a header. + **kwargs: Additional arguments to pass to the writer. + """ + + writer = _get_writer(format) + + kwargs["verbose"] = verbose + + # keep only kwargs the writer accepts + sig = inspect.signature(writer) + allowed = sig.parameters + filtered_kwargs = { + k: v for k, v in kwargs.items() + if k in allowed + } + + # create header if not provided + if header is None: + header = _create_header(format) + if header == "": + header = None + + return writer(model, fname=file_path, header=header, **filtered_kwargs) \ No newline at end of file