#! usr/bin/env python3
# -*- coding: utf-8 -*-
"""
**This module enables to fill the optimization model and formulate it in
LP or MILP based on the package PuLP (LpProblem)**
..
Copyright 2018 G2Elab / MAGE
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import warnings
import glob
import collections
import time as pytime
import numpy as np
import copy
from pulp import LpProblem, LpStatus, LpVariable, lpSum
from pulp.apis import LpSolver
from ..optimisation.core import OptObject
from ..optimisation.elements import *
from ..time import TimeUnit
from ...energy.energy_nodes import EnergyNode
from ...actor.actor import Actor
from ...energy.units.energy_units import AssemblyUnit
from lpfics.lpfics import find_infeasible_constraint_set
__docformat__ = "restructuredtext en"
[docs]class OptimisationModel(LpProblem):
"""
**Description**
This class includes the optimization model formulated in LP or MILP
based on the package PuLP (LpProblem)
"""
def __init__(self, time, name='optimisation_model'):
"""
:param time:
:param name:
"""
LpProblem.__init__(self, name)
self.verbose = 1
self.noOverlap = False
self.time = time
self.quantities = collections.OrderedDict()
self._model_units_list = []
self._model_quantities_list = []
self._model_constraints_list = []
self._model_objectives_list = []
self._model_pareto_objectives_list = []
[docs] def add_nodes(self, *nodes):
"""
Add nodes and all connected units to the model
Check that the time is the same for the model and all the units
:param nodes: EnergyNode
"""
for node in nodes:
if not isinstance(node, EnergyNode):
raise TypeError(
'You have to add nodes from type "EnergyNode".')
if node not in self._model_units_list:
self._model_units_list.append(node)
node.set_power_balance()
for unit in node.get_connected_energy_units:
if unit not in self._model_units_list:
self._model_units_list.append(unit)
for unit in self._model_units_list:
if hasattr(unit, 'time'):
if len(self.time.DATES) == len(unit.time.DATES):
for i in range(len(self.time.DATES)):
if not self.time.DATES[i] == unit.time.DATES[i]:
raise ValueError(
'The unit {} does not operate on the '
'same time of the model'.format(
unit.name))
else:
raise ValueError('The unit {} does not operate on the '
'same time of the model'.format(
unit.name))
self._add_unit_parent(unit)
self._add_unit_attributes(unit)
self._add_quantities()
self._add_objectives(nodes[0].time)
self._add_constraints(nodes[0].time)
[docs] def add_nodes_and_actors(self, *nodes_or_actors):
"""
Add nodes, actors and all connected units to the model
Check that the time is the same for the model and all the units
:param nodes_or_actors: EnergyNode or Actor type
"""
init_time = pytime.time()
for node_or_actor in nodes_or_actors:
if isinstance(node_or_actor, Actor):
if node_or_actor not in self._model_units_list:
self._model_units_list.append(node_or_actor)
elif isinstance(node_or_actor, EnergyNode):
if node_or_actor not in self._model_units_list:
self._model_units_list.append(node_or_actor)
node_or_actor.set_power_balance()
for unit in node_or_actor.get_connected_energy_units:
if unit not in self._model_units_list:
self._model_units_list.append(unit)
else:
raise TypeError('You have to add actors or nodes from type '
'"Actor" or "EnergyNode".')
for unit in self._model_units_list:
if hasattr(unit, 'time'):
if len(self.time.DATES) == len(unit.time.DATES):
for i in range(len(self.time.DATES)):
if not self.time.DATES[i] == unit.time.DATES[i]:
raise ValueError(
'The unit {} does not operate on the '
'same time of the model'.format(
unit.name))
else:
raise ValueError('The unit {} does not operate on the '
'same time of the model'.format(
unit.name))
self._add_unit_parent(unit)
self._add_unit_attributes(unit)
self._add_quantities()
self._add_objectives(self.time)
self._add_constraints(self.time)
print('Generation duration =', pytime.time() - init_time,
'seconds.')
def _add_unit_parent(self, unit: OptObject) -> None:
"""
If the parent of the unit is a OptObject, the method adds it to the
list of units : self._model_units_list
Else : nothing happens
:param unit: OptObject whose parent will be added to the list of units
if also a OptObject
"""
try:
parent = getattr(unit, 'parent')
if isinstance(parent,
OptObject) and parent not in self._model_units_list:
self._model_units_list.append(parent)
except AttributeError:
check_if_unit_could_have_parent(unit)
def _add_unit_attributes(self, unit: OptObject) -> None:
"""
Adds :
- The OptObject elements contained in the unit to the list of units
self._model_units_list
- The Quantity elements contained in the unit to the list of
quantities self._model_quantities_list
- The Constraint elements contained in the unit to the list of
constraints self._model_constraints_list
- The Objective elements contained in the unit to the list of
objectives self._model_objectives_list
:param unit: OptObject whose attributes (OptObject, Quantity,
Constraint
and Objective) will be added to the respective lists
"""
try:
for key in list(unit.__dict__.keys()):
child = getattr(unit, key)
if isinstance(child,
OptObject) and child not in \
self._model_units_list:
self._model_units_list.append(child)
elif isinstance(child, Quantity) and child not in \
self._model_quantities_list:
child.parent = unit
self._model_quantities_list.append(child)
elif isinstance(child, Constraint) and child.active and child \
not in self._model_constraints_list:
child.parent = unit
self._model_constraints_list.append(child)
elif isinstance(child, Objective) and child not in \
self._model_objectives_list:
child.parent = unit
self._model_objectives_list.append(child)
except AttributeError:
pass
def _add_quantities(self) -> None:
"""
Adds all quantities as LpVariable to the list self.variables_list
"""
print('\n--- Adding all variables to the model ---')
for quantity in self._model_quantities_list:
q_name = getattr(quantity, 'name')
val = getattr(quantity, 'value')
opt = getattr(quantity, 'opt')
vtyp = getattr(quantity, 'vtype')
lb, ub = getattr(quantity, 'lb'), getattr(quantity, 'ub')
try:
parent = getattr(quantity, 'parent')
p_name = parent.name
except AttributeError:
raise ValueError('Each Quantity object should have a parent')
new_name = p_name + '_' + q_name
self._add_quantity(q_name=new_name, q_val=val, q_type=vtyp,
q_lb=lb, q_ub=ub, q_opt=opt,
parent=quantity.parent)
# Add the whole variable as an attribute of Variable
# added in the optimization model
self.quantities[new_name] = quantity
def _add_quantity(self, q_name, q_val, q_type, q_lb, q_ub, q_opt,
parent=None):
"""
Adds a quantity as LpVariable to the list self.variables_list
"""
if self.verbose:
print('Adding variable : {0}'.format(q_name))
# Static or dynamic bounds
if isinstance(q_lb, list):
lb_cst_exp = q_name + '[t] >= ' + str(q_lb) + '[t]'
setattr(parent, 'set_lb', DefinitionDynamicConstraint(
exp_t=lb_cst_exp,
name='set_lb',
parent=parent))
q_lb = min(q_lb)
if isinstance(q_ub, list):
ub_cst_exp = q_name + '[t] <= ' + str(q_ub) + '[t]'
setattr(parent, 'set_ub', DefinitionDynamicConstraint(
exp_t=ub_cst_exp,
name='set_ub',
parent=parent))
q_ub = max(q_ub)
# If the values are stored in a dictionary
if isinstance(q_val, dict):
if any(i for i in q_opt.values()):
globals()[q_name] = LpVariable.dict(name=q_name,
indexs=q_val.keys(),
lowBound=q_lb,
upBound=q_ub,
cat=q_type)
else:
globals()[q_name] = q_val
# If the values are stored in a list
elif isinstance(q_val, list):
for ind, opt in enumerate(q_opt):
if opt:
var_name = q_name + '_{0}'.format(ind)
globals()[var_name] = LpVariable(name=var_name,
lowBound=q_lb,
upBound=q_ub,
cat=q_type)
else:
if ind == 0:
globals()[q_name] = q_val
# If the values are stored in a int/float
elif isinstance(q_val, (int, float)):
if q_opt:
globals()[q_name] = LpVariable(name=q_name,
lowBound=q_lb,
upBound=q_ub,
cat=q_type)
else:
globals()[q_name] = q_val
else:
raise TypeError('Value type of the quantity {0} of unit {1} '
'is not taken into account'.format(q_name,
parent.name))
def _add_constraints(self, time: TimeUnit) -> None:
"""
Add all constraints to the model
:param time: TimeUnit
"""
print('\n--- Adding all constraints to the model ---')
for cst in self._model_constraints_list:
cst_name = cst.parent.name + '_' + cst.name
cst_exp = cst.exp
# Print the constraint expression
if self.verbose:
print('Adding constraint : {0} , exp = {1}'.format(cst_name,
cst_exp))
if isinstance(cst, DynamicConstraint):
loop_exp = "".join([cst.t_range, ':\n'
'\ttry:\n'
'\t\tself += {0}, '
''.format(cst.exp_t),
'"{0}_{1}".format(cst_name, t)\n'
'\t\tself.constraints[ "{0}_{1}".format('
'cst_name, t)].cst = cst\n'
'\texcept TypeError:\n'
'\t\twarnings.warn("Possible error")'])
exec(loop_exp)
else:
try:
# Add the constraint to the optimization model
self += eval(cst_exp), cst_name
# self.addConstraint(eval(cst_exp), cst_name)
# Add the whole constraint as an attribute of constraint
# added in the optimization model
self.constraints[cst_name].cst = cst
except TypeError:
warnings.warn('Possible error')
def _add_objectives(self, time: TimeUnit) -> None:
"""
Adds all objectives to the model
:param time: TimeUnit
"""
print('\n--- Adding all objectives to the model ---')
objective = 0
for obj in self._model_objectives_list:
obj_name = obj.parent.name + '_' + obj.name
obj_exp = obj.exp
obj_weight = str(obj.weight)
if self.verbose:
print('Adding objective : {0}'.format(obj_name))
if obj.pareto:
self._model_pareto_objectives_list.append(obj)
else:
objective += eval(obj_weight + ' * (' + obj_exp + ')')
self += objective, "obj_tot"
def _pareto_and_solve(self, solver=None, pareto_step=0.1):
"""
Solves the optimization model to produce a pareto front.
"""
pareto_models = []
time = self.time
i = 0
if pareto_step > 1 or pareto_step <= 0:
raise ValueError('The pareto step should be positive and its'
'value should be less than 1')
if len(self._model_pareto_objectives_list) == 2:
for alpha in np.arange(0.0, 1.00001, pareto_step):
i = i + 1
print("\n - - - - RUN OPTIMIZATION {0} - - - - ".format(i))
pareto_model = None
pareto_model = copy.deepcopy(self)
pareto_model.objective += \
eval(str(alpha) + ' * (' +
pareto_model._model_pareto_objectives_list[
0].exp + ')')
pareto_model.objective += \
eval(str(1 - alpha) + ' * (' +
pareto_model._model_pareto_objectives_list[
1].exp + ')')
print('{0} * {1} + {2} * {3}'.format(
alpha, pareto_model._model_pareto_objectives_list[0].exp,
1 - alpha,
pareto_model._model_pareto_objectives_list[1].exp))
if solver is None:
pareto_model.solve(solver=solver, use_mps=False)
else:
pareto_model.solve(solver=solver)
if LpStatus[pareto_model.status] != 'Optimal':
print('Your problem should have solution before launching'
'the method pareto_solve()')
else:
print("\n- - UPDATE RESULTS IN PARETO MODEL {}"
" - - ".format(i))
pareto_model.update_units()
pareto_models.append(copy.deepcopy(pareto_model))
self.pareto_models = pareto_models
else:
print('Your problem should have exactly 2 pareto objectives and '
'not {}'.format(len(self._model_pareto_objectives_list)))
[docs] def update_units(self):
"""
Updates all units values with optimization results
"""
var_dict = self.variablesDict()
for unit in self._model_units_list:
print("Updating unit : {0}".format(unit.name))
for key in list(unit.__dict__.keys()):
quantity = getattr(unit, key)
if isinstance(quantity, Quantity):
q_name = getattr(quantity, 'name')
q_opt = getattr(quantity, 'opt')
q_value = getattr(quantity, 'value')
print("\tQuantity : {0}".format(q_name))
if isinstance(q_value, (float, int)) and q_opt:
quantity.value = var_dict[
"{0}_{1}".format(unit.name, q_name)].varValue
elif isinstance(q_value, list):
for i, _ in enumerate(q_opt):
if quantity.opt[i]:
quantity.value[i] = var_dict[
"{0}_{1}_{2}".format(unit.name,
q_name, i)].varValue
elif isinstance(q_value, dict):
for i in list(q_value.keys()):
try:
quantity.value[i] = var_dict[
"{0}_{1}_{2}".format(unit.name,
q_name, i)].varValue
except KeyError:
pass
[docs] def solve_and_update(self, solver: LpSolver = None,
find_infeasible_cst_set=False, pareto_step=0.1) -> \
None:
"""
Solves the optimization model and updates all variables values.
:param solver: Optimization solver
:type solver: LpSolver
:param pareto_step: if there are pareto objectives, you can change
the step to calculate the pareto front
"""
if not self._model_pareto_objectives_list:
print("\n - - - - - RUN OPTIMIZATION - - - - - ")
if solver is None:
start_time = pytime.time()
self.solve(solver=solver, use_mps=False)
print('Resolution duration =', pytime.time() - start_time,
'seconds.')
else:
self.solve(solver=solver)
if LpStatus[self.status] == 'Optimal':
print("\n - - - - - UPDATE RESULTS - - - - - ")
self.update_units()
else:
# warnings.warn("Your optimization failed with status : {
# }.".format(
# LpStatus[self.status]))
print("\n\n/!\ Your optimization FAILED with status : {} "
"/!\.".format(
LpStatus[self.status]))
if LpStatus[self.status] == 'Infeasible':
print(
'\nIf you want to catch the source of infeasibility:\n'
'* Please download LPFICS and use the method '
'find_infeasible_constraint_set(your_model)\n'
'You can also use according to your needs:\n'
'- '
'find_definition_and_actor_infeasible_constraints_set('
'your_model)\n'
'- find_definition_and_technical_'
'infeasible_constraints_set(your_model)')
print(
'* If you are a Gurobi user, you can also refer to '
'the '
'method "compute_gurobi_IIS()" in '
'general\optimation\model.')
if find_infeasible_cst_set:
find_infeasible_constraint_set(self)
if LpStatus[self.status] == 'Not Solved':
print('You can maybe try with another solver')
else:
self._pareto_and_solve(solver=solver,
pareto_step=pareto_step)
[docs] def get_model_constraints_list(self):
""" Gets constraints of the model """
return self._model_constraints_list
[docs] def get_model_constraints_name_list(self):
""" Gets the names of the constraints of the model """
constraints_name_list = []
for constraint in self._model_constraints_list:
new_objective_name = constraint.parent.name + '_' + constraint.name
constraints_name_list.append(new_objective_name)
return constraints_name_list
[docs] def get_model_quantities_list(self):
""" Gets quantities of the model """
return self._model_quantities_list
[docs] def get_model_quantities_name_list(self):
""" Gets the names of the quantities of the model """
quantities_name_list = []
for quantity in self._model_quantities_list:
new_objective_name = quantity.parent.name + '_' + quantity.name
quantities_name_list.append(new_objective_name)
return quantities_name_list
[docs] def get_model_objectives_list(self):
""" Gets objectives of the model """
return self._model_objectives_list
[docs] def get_model_objectives_name_list(self):
""" Gets the names of the objectives of the model """
objectives_name_list = []
for objective in self._model_objectives_list:
new_objective_name = objective.parent.name + '_' + objective.name
objectives_name_list.append(new_objective_name)
return objectives_name_list
[docs]def check_if_unit_could_have_parent(unit):
""" Checks if the unit has an associated parent
:param unit: unit which parents will be checked
"""
import gc
ref = gc.get_referrers(unit)
attributes = gc.get_referents(unit)[0]
for parent in ref:
try:
parent_name = parent['name']
if parent_name not in attributes:
if not isinstance(unit, (AssemblyUnit, Actor)):
warnings.warn('The unit {} seems to have as parent {} '
'which was not declared as '
'parent.'.format(unit.name, parent_name))
elif isinstance(unit, AssemblyUnit):
prod_names = [prod.name for prod in unit.prod_units]
cons_name = [cons.name for cons in unit.cons_units]
rev_name = [rev.name for rev in unit.rev_units]
if (parent_name in prod_names) or (parent_name in
cons_name) or \
parent_name in rev_name:
pass
else:
warnings.warn('The unit {} seems to have as '
'parent {} '
'which was not declared as '
'parent.'.format(unit.name, parent_name))
except (TypeError, KeyError):
pass
[docs]def compute_gurobi_IIS(gurobi_exe_path=r'C:\gurobi810\win64\bin',
opt_model=None, MPS_model=None):
"""
Identifies the constraints in a .ilp file
:param gurobi_exe_path: Path to the gurobi solver "gurobi_cl.exe"
:param opt_model: OptimisationModel to whom compute IIS
:param MPS_model: name of the mps model
"""
try:
# Remove all the existing .mps in your directory
os.system('"del ' + str('*.mps') + '"')
except:
pass
if MPS_model is None:
# Create the .mps in order to get the .mps
if opt_model is None:
raise ValueError('You should provide either the OptimisationModel '
'or the MPS model')
opt_model.solve()
opt_model.writeMPS('{}.mps'.format(opt_model.name))
MPS_file = glob.glob("*.mps")
else:
if MPS_model[-4:] == '.mps':
MPS_file = [MPS_model]
else:
MPS_file = [MPS_model + '.mps']
# Transform the .mps into a .ilp'
cmd = '"' + gurobi_exe_path + '\gurobi_cl.exe ResultFile=IISresults.ilp ' \
+ MPS_file[0] + '"'
os.system(cmd)