Source code for omegalpes.general.optimisation.model

#! usr/bin/env python3
#  -*- coding: utf-8 -*-

"""
**This module enables to fill the optimization model and formulate it in
LP or MILP based on the package PuLP (LpProblem)**

..
    Copyright 2018 G2Elab / MAGE

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
"""

import os
import warnings
import glob
import time as pytime

from pulp import LpProblem, LpStatus, LpVariable, lpSum, LpVariableDict
from pulp.solvers import LpSolver

from ..optimisation.core import OptObject
from ..time import TimeUnit
from ...energy.energy_nodes import EnergyNode
from ...actor.actor import Actor
from ...energy.units.energy_units import AssemblyUnit
from ..optimisation.elements import Constraint, DynamicConstraint
from ..optimisation.elements import Objective
from ..optimisation.elements import Quantity

__docformat__ = "restructuredtext en"


[docs]class OptimisationModel(LpProblem): """ **Description** This class includes the optimization model formulated in LP or MILP based on the package PuLP (LpProblem) """ def __init__(self, time, name='optimisation_model'): """ :param name: Name of your optimisation model (str) """ LpProblem.__init__(self, name) self.verbose = 1 self.noOverlap = False self.time = time self._model_units_list = [] self._model_quantities_list = [] self._model_constraints_list = [] self._model_objectives_list = []
[docs] def add_nodes(self, *nodes): """ Add nodes and all connected units to the model Check that the time is the same for the model and all the units :param nodes: EnergyNode """ for node in nodes: if not isinstance(node, EnergyNode): raise TypeError('You have to add nodes from type "EnergyNode".') if node not in self._model_units_list: self._model_units_list.append(node) node.set_power_balance() for unit in node.get_connected_energy_units: if unit not in self._model_units_list: self._model_units_list.append(unit) for unit in self._model_units_list: if hasattr(unit, 'time'): if len(self.time.DATES) == len(unit.time.DATES): for i in range(len(self.time.DATES)): if not self.time.DATES[i] == unit.time.DATES[i]: raise ValueError( 'The unit {} does not operate on the ' 'same time of the model'.format( unit.name)) else: raise ValueError('The unit {} does not operate on the ' 'same time of the model'.format( unit.name)) self._add_unit_parent(unit) self._add_unit_attributes(unit) self._add_quantities() self._add_objectives(nodes[0].time) self._add_constraints(nodes[0].time)
[docs] def add_nodes_and_actors(self, *nodes_or_actors): """ Add nodes, actors and all connected units to the model Check that the time is the same for the model and all the units :param nodes_or_actors: EnergyNode or Actor type """ for node_or_actor in nodes_or_actors: if isinstance(node_or_actor, Actor): if node_or_actor not in self._model_units_list: self._model_units_list.append(node_or_actor) elif isinstance(node_or_actor, EnergyNode): if node_or_actor not in self._model_units_list: self._model_units_list.append(node_or_actor) node_or_actor.set_power_balance() for unit in node_or_actor.get_connected_energy_units: if unit not in self._model_units_list: self._model_units_list.append(unit) else: raise TypeError('You have to add actors or nodes from type ' '"Actor" or "EnergyNode".') for unit in self._model_units_list: if hasattr(unit, 'time'): if len(self.time.DATES) == len(unit.time.DATES): for i in range(len(self.time.DATES)): if not self.time.DATES[i] == unit.time.DATES[i]: raise ValueError( 'The unit {} does not operate on the ' 'same time of the model'.format( unit.name)) else: raise ValueError('The unit {} does not operate on the ' 'same time of the model'.format( unit.name)) self._add_unit_parent(unit) self._add_unit_attributes(unit) self._add_quantities() self._add_objectives(self.time) self._add_constraints(self.time)
def _add_unit_parent(self, unit: OptObject) -> None: """ If the parent of the unit is a OptObject, the method adds it to the list of units : self._model_units_list Else : nothing happens :param unit: OptObject whose parent will be added to the list of units if also a OptObject """ try: parent = getattr(unit, 'parent') if isinstance(parent, OptObject) and parent not in self._model_units_list: self._model_units_list.append(parent) except AttributeError: check_if_unit_could_have_parent(unit) def _add_unit_attributes(self, unit: OptObject) -> None: """ Adds : - The OptObject elements contained in the unit to the list of units self._model_units_list - The Quantity elements contained in the unit to the list of quantities self._model_quantities_list - The Constraint elements contained in the unit to the list of constraints self._model_constraints_list - The Objective elements contained in the unit to the list of objectives self._model_objectives_list :param unit: OptObject whose attributes (OptObject, Quantity, Constraint and Objective) will be added to the respective lists """ try: for key in list(unit.__dict__.keys()): child = getattr(unit, key) if isinstance(child, OptObject) and child not in \ self._model_units_list: self._model_units_list.append(child) elif isinstance(child, Quantity) and child not in \ self._model_quantities_list: child.parent = unit self._model_quantities_list.append(child) elif isinstance(child, Constraint) and child.active and child \ not in self._model_constraints_list: child.parent = unit self._model_constraints_list.append(child) elif isinstance(child, Objective) and child not in \ self._model_objectives_list: child.parent = unit self._model_objectives_list.append(child) except AttributeError: pass def _add_quantities(self) -> None: """ Adds all quantities as LpVariable to the list self.variables_list """ print('\n--- Adding all variables to the model ---') for quantity in self._model_quantities_list: q_name = getattr(quantity, 'name') val = getattr(quantity, 'value') opt = getattr(quantity, 'opt') vtyp = getattr(quantity, 'vtype') lb, ub = getattr(quantity, 'lb'), getattr(quantity, 'ub') try: parent = getattr(quantity, 'parent') p_name = parent.name except AttributeError: raise ValueError('Each Quantity object should have a parent') new_name = p_name + '_' + q_name self._add_quantity(q_name=new_name, q_val=val, q_type=vtyp, q_lb=lb, q_ub=ub, q_opt=opt, parent=quantity.parent) def _add_quantity(self, q_name, q_val, q_type, q_lb, q_ub, q_opt, parent=None): """ Adds a quantity as LpVariable to the list self.variables_list """ if self.verbose: print('Adding variable : {0}'.format(q_name)) # Static or dynamic bounds if isinstance(q_lb, list): lb_cst_exp = q_name + '[t] >= ' + str(q_lb) + '[t]' setattr(parent, 'set_lb', DynamicConstraint(exp_t=lb_cst_exp, name='set_lb', parent=parent)) q_lb = min(q_lb) if isinstance(q_ub, list): ub_cst_exp = q_name + '[t] <= ' + str(q_ub) + '[t]' setattr(parent, 'set_ub', DynamicConstraint(exp_t=ub_cst_exp, name='set_ub', parent=parent)) q_ub = max(q_ub) # If the values are stored in a dictionary if isinstance(q_val, dict): if any(i for i in q_opt.values()): globals()[q_name] = LpVariable.dict(name=q_name, indexs=q_val.keys(), lowBound=q_lb, upBound=q_ub, cat=q_type) else: globals()[q_name] = q_val # If the values are stored in a list elif isinstance(q_val, list): for ind, opt in enumerate(q_opt): if opt: var_name = q_name + '_{0}'.format(ind) globals()[var_name] = LpVariable(name=var_name, lowBound=q_lb, upBound=q_ub, cat=q_type) else: if ind == 0: globals()[q_name] = q_val # If the values are stored in a int/float elif isinstance(q_val, (int, float)): if q_opt: globals()[q_name] = LpVariable(name=q_name, lowBound=q_lb, upBound=q_ub, cat=q_type) else: globals()[q_name] = q_val else: raise TypeError('Value type of the quantity {0} of unit {1} ' 'is not taken into account'.format(q_name, parent.name)) def _add_constraints(self, time: TimeUnit) -> None: """ Add all constraints to the model :param time: TimeUnit """ print('\n--- Adding all constraints to the model ---') for cst in self._model_constraints_list: cst_name = cst.parent.name + '_' + cst.name cst_exp = cst.exp # Print the constraint expression if self.verbose: print('Adding constraint : {0} , exp = {1}'.format(cst_name, cst_exp)) if isinstance(cst, DynamicConstraint): loop_exp = "".join([cst.t_range, ':\n' '\ttry:\n' '\t\tself += {0}, ' ''.format(cst.exp_t), '"{0}_{1}".format(cst_name, t)\n' '\texcept TypeError:\n' '\t\twarnings.warn("Possible error")']) exec(loop_exp) else: try: # Add the constraint to the optimization model self += eval(cst_exp), cst_name # self.addConstraint(eval(cst_exp), cst_name) except TypeError: warnings.warn('Possible error') def _add_objectives(self, time: TimeUnit) -> None: """ Adds all objectives to the model :param time: TimeUnit """ print('\n--- Adding all objectives to the model ---') objective = 0 for obj in self._model_objectives_list: obj_name = obj.parent.name + '_' + obj.name obj_exp = obj.exp obj_weight = str(obj.weight) if self.verbose: print('Adding objective : {0}'.format(obj_name)) objective += eval(obj_weight + ' * (' + obj_exp + ')') self += objective, "obj_tot"
[docs] def update_units(self): """ Updates all units values with optimization results """ var_dict = self.variablesDict() for unit in self._model_units_list: print("Updating unit : {0}".format(unit.name)) for key in list(unit.__dict__.keys()): quantity = getattr(unit, key) if isinstance(quantity, Quantity): q_name = getattr(quantity, 'name') q_opt = getattr(quantity, 'opt') q_value = getattr(quantity, 'value') print("\tQuantity : {0}".format(q_name)) if isinstance(q_value, (float, int)) and q_opt: quantity.value = var_dict[ "{0}_{1}".format(unit.name, q_name)].varValue elif isinstance(q_value, list): for i, _ in enumerate(q_opt): if quantity.opt[i]: quantity.value[i] = var_dict[ "{0}_{1}_{2}".format(unit.name, q_name, i)].varValue elif isinstance(q_value, dict): for i in list(q_value.keys()): try: quantity.value[i] = var_dict[ "{0}_{1}_{2}".format(unit.name, q_name, i)].varValue except KeyError: pass
[docs] def solve_and_update(self, solver: LpSolver = None) -> None: """ Solves the optimization model and updates all variables values. :param solver: Optimization solver :type solver: LpSolver """ print("\n - - - - - RUN OPTIMIZATION - - - - - ") if solver is None: start_time = pytime.time() self.solve(solver=solver, use_mps=False) print('Resolution duration =', pytime.time() - start_time, 'seconds.') else: self.solve(solver=solver) if LpStatus[self.status] == 'Optimal': print("\n - - - - - UPDATE RESULTS - - - - - ") self.update_units() else: warnings.warn("Your optimization failed with status : {}.".format( LpStatus[self.status])) if LpStatus[self.status] == 'Infeasible': print('If you are a Gurobi user and you want to catch the ' 'source of infeasibility, please refer to the method ' '"compute_gurobi_IIS()" in general\optimation\model.') if LpStatus[self.status] == 'Not Solved': print('You can maybe try with another solver')
[docs] def get_model_constraints_list(self): """ Gets constraints of the model """ return self._model_constraints_list
[docs] def get_model_constraints_name_list(self): """ Gets the names of the constraints of the model """ constraints_name_list = [] for constraint in self._model_constraints_list: new_objective_name = constraint.parent.name + '_' + constraint.name constraints_name_list.append(new_objective_name) return constraints_name_list
[docs] def get_model_quantities_list(self): """ Gets quantities of the model """ return self._model_quantities_list
[docs] def get_model_quantities_name_list(self): """ Gets the names of the quantities of the model """ quantities_name_list = [] for quantity in self._model_quantities_list: new_objective_name = quantity.parent.name + '_' + quantity.name quantities_name_list.append(new_objective_name) return quantities_name_list
[docs] def get_model_objectives_list(self): """ Gets objectives of the model """ return self._model_objectives_list
[docs] def get_model_objectives_name_list(self): """ Gets the names of the objectives of the model """ objectives_name_list = [] for objective in self._model_objectives_list: new_objective_name = objective.parent.name + '_' + objective.name objectives_name_list.append(new_objective_name) return objectives_name_list
[docs]def check_if_unit_could_have_parent(unit): """ Checks if the unit has an associated parent :param unit: unit which parents will be checked """ import gc ref = gc.get_referrers(unit) attributes = gc.get_referents(unit)[0] for parent in ref: try: parent_name = parent['name'] if parent_name not in attributes: if not isinstance(unit, (AssemblyUnit, Actor)): warnings.warn('The unit {} seems to have as parent {} ' 'which was not declared as ' 'parent.'.format(unit.name, parent_name)) elif isinstance(unit, AssemblyUnit): prod_names = [prod.name for prod in unit.prod_units] cons_name = [cons.name for cons in unit.cons_units] rev_name = [rev.name for rev in unit.rev_units] if (parent_name in prod_names) or (parent_name in cons_name) or \ parent_name in rev_name: pass else: warnings.warn('The unit {} seems to have as ' 'parent {} ' 'which was not declared as ' 'parent.'.format(unit.name, parent_name)) except (TypeError, KeyError): pass
[docs]def compute_gurobi_IIS(gurobi_exe_path=r'C:\gurobi800\win64\bin', opt_model=None, MPS_model=None): """ Identifies the constraints in a .ilp file :param gurobi_exe_path: Path to the gurobi solver "gurobi_cl.exe" :param opt_model: OptimisationModel to whom compute IIS :param MPS_model: name of the mps model """ try: # Remove all the existing .mps in your directory os.system('"del ' + str('*.mps') + '"') except: pass if MPS_model is None: # Create the .mps in order to get the .mps if opt_model is None: raise ValueError('You should provide either the OptimisationModel ' 'or the MPS model') opt_model.solve() opt_model.writeMPS('{}.mps'.format(opt_model.name)) MPS_file = glob.glob("*.mps") else: if MPS_model[-4:] == '.mps': MPS_file = [MPS_model] else: MPS_file = [MPS_model + '.mps'] # Transform the .mps into a .ilp' cmd = '"' + gurobi_exe_path + '\gurobi_cl.exe ResultFile=IISresults.ilp ' \ + MPS_file[0] + '"' os.system(cmd)