Source code for omegalpes.energy.units.energy_units

#!/usr/bin/env python3
#  -*- coding: utf-8 -*-

"""
**This module defines the energy units of OMEGAlpes. The production,
consumption and storage unit will inherit from it.**

 The energy_units module defines the basic attributes and methods of an
 energy unit in OMEGAlpes.

 The class EnergyUnit includes the following attributes and quantities:
    - p : instantaneous power of the energy unit (kW)
    - p_min : minimal power (kW)
    - p_max : maximal power (kW)
    - e_tot : total energy during the time period (kWh)
    - e_min : minimal energy of the unit (kWh)
    - e_max : maximal energy of the unit (kWh)
    - u : binary describing if the unit is operating or not at t (delivering or
      consuming P)

 This module also includes the classes:
 - FixedEnergyUnit: energy unit with a fixed power profile

 - VariableEnergyUnit: energy unit with a variable power profile

 - SquareEnergyUnit: energy unit with a defined square power profile,
 inheriting from VariableEnergyUnit.

 - ShiftableEnergyUnit: energy unit with a power profile that can be
 time-shifted, inheriting from VariableEnergyUnit.

 - TriangleEnergyUnit: energy unit with a defined triangular power profile,
 inheriting from VariableEnergyUnit.

 - SawtoothEnergyUnit: energy unit with a defined sawtooth power profile,
 inheriting from VariableEnergyUnit.

 - SeveralEnergyUnit: Energy unit based on a fixed power curve enabling to
  multiply several times (nb_unit) the same power curve.

 - AssemblyUnit: assembly units have at least a production unit
 and a consumption unit and they are using one or several energy types. It
 can also integrate reversible energy units. It inherits from OptObject and it
 is the parent class of ConversionUnit and ReversibleUnit.

..
    Copyright 2018 G2Elab / MAGE

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
"""

import warnings
import datetime

import numpy as np
import pandas as pd
from pulp import LpBinary, LpInteger, LpContinuous

from ..io.poles import Epole
from ...general.optimisation.elements import Quantity, Constraint, \
    DynamicConstraint, HourlyDynamicConstraint, DailyDynamicConstraint, \
    ExtDynConstraint, Objective
from ...general.optimisation.core import OptObject

__docformat__ = "restructuredtext en"


class EnergyUnit(OptObject):
    """
    **Description**

        Module dedicated to the parent class (EnergyUnit) of :

         - production units
         - consumption units
         - storage units

    **Main quantities created by the constructor**

        * p : instantaneous power of the energy unit (kW)
        * e_tot : total energy during the time period (kWh)
        * u : binary indicating if the unit is operating at t
    """

    def __init__(self, time, name, flow_direction='in', p=None, p_min=-1e+4,
                 p_max=1e+4, e_min=-1e6, e_max=1e6, starting_cost=None,
                 operating_cost=None, min_time_on=None, min_time_off=None,
                 max_ramp_up=None, max_ramp_down=None, co2_out=None,
                 availability_hours=None, energy_type=None,
                 no_warn=True, verbose=True):
        """
        Build the power, energy and on/off quantities of the unit and the
        constraints linking them.

        :param time: time unit of the study (its DT, LEN and I attributes
            are used here)
        :param name: name of the energy unit
        :param flow_direction: flow direction given to the energy pole
        :param p: value given to the power quantity (None lets it free)
        :param p_min: minimal power, a number or a list with one value per
            time step (kW)
        :param p_max: maximal power, a number or a list with one value per
            time step (kW)
        :param e_min: lower bound of e_tot (kWh)
        :param e_max: upper bound of e_tot (kWh)
        :param starting_cost: if not None, passed to add_starting_cost
        :param operating_cost: if not None, passed to add_operating_cost
        :param min_time_on: if not None, passed to add_min_time_on
        :param min_time_off: if not None, passed to add_min_time_off
        :param max_ramp_up: if not None, passed to add_max_ramp_up
        :param max_ramp_down: if not None, passed to add_max_ramp_down
        :param co2_out: if not None, passed to add_co2_emissions
        :param availability_hours: if not None, passed to add_availability
        :param energy_type: type of energy given to the energy pole
        :param no_warn: if False, a UserWarning is emitted when a
            VariableEnergyUnit cannot run at p_max on every time step
        :param verbose: forwarded to OptObject
        :raises TypeError: if p_min or p_max is neither a number nor a list
        """
        OptObject.__init__(self, name=name, description='Energy unit',
                           verbose=verbose)

        self.parent = None
        self.time = time  # Time unit
        self.energy_type = energy_type
        self.set_e_min = None
        self.set_e_max = None

        # The bounds of p always include 0 so that the unit can be off
        if isinstance(p_max, (int, float)):
            p_ub = max(0, p_max)
            max_e = p_ub * time.DT * time.LEN
        elif isinstance(p_max, list):
            p_ub = [max(0, value) for value in p_max]
            max_e = max(p_ub) * time.DT * time.LEN
        else:
            # Without this branch p_ub and max_e would be left unbound
            raise TypeError('p_max should be an int, a float or a list, '
                            'but is a {0}'.format(type(p_max)))

        if isinstance(p_min, (int, float)):
            p_lb = min(0, p_min)
        elif isinstance(p_min, list):
            p_lb = [min(0, value) for value in p_min]
        else:
            # Without this branch p_lb would be left unbound
            raise TypeError('p_min should be an int, a float or a list, '
                            'but is a {0}'.format(type(p_min)))

        self.p = Quantity(name='p',
                          description='instantaneous power of the energy unit',
                          value=p, lb=p_lb, ub=p_ub, vlen=time.LEN, unit='kW',
                          parent=self)

        self.e_tot = Quantity(name='e_tot',
                              description='total energy during the time '
                                          'period',
                              lb=e_min, ub=e_max, vlen=1, unit='kWh',
                              parent=self)

        self.calc_e_tot = Constraint(name='calc_e_tot', parent=self,
                                     exp='{0}_e_tot == time.DT * '
                                         'lpSum({0}_p[t] for t in time.I)'
                                     .format(self.name))

        if isinstance(self, VariableEnergyUnit):
            if max_e > e_max and not isinstance(self, ShiftableEnergyUnit):
                if not no_warn:
                    warnings.warn("Your EnergyUnit {0} won't be able to "
                                  "operate every time steps at its maximal "
                                  "power {1}.".format(
                                      name, max_e / (time.DT * time.LEN)),
                                  UserWarning)

        # CONSTRAINTS
        self.u = Quantity(name='u',
                          description='indicates if the unit is operating '
                                      'at t',
                          vtype=LpBinary, vlen=time.LEN, parent=self)

        # p[t] is forced to 0 when u[t] = 0 and bounded by p_min / p_max
        # when u[t] = 1
        if isinstance(p_max, (int, float)):
            self.on_off_max = DynamicConstraint(
                exp_t='{0}_p[t] <= {0}_u[t] * {p_M}'.format(self.name,
                                                           p_M=p_max),
                t_range='for t in time.I', name='on_off_max', parent=self)
        elif isinstance(p_max, list):
            self.on_off_max = DynamicConstraint(
                exp_t='{0}_p[t] <= {0}_u[t] * {p_M}[t]'.format(self.name,
                                                              p_M=p_max),
                t_range='for t in time.I', name='on_off_max', parent=self)

        if isinstance(p_min, (int, float)):
            self.on_off_min = DynamicConstraint(
                exp_t='{0}_p[t] >= {0}_u[t] * {p_m}'.format(self.name,
                                                           p_m=p_min),
                t_range='for t in time.I', name='on_off_min', parent=self)
        elif isinstance(p_min, list):
            self.on_off_min = DynamicConstraint(
                exp_t='{0}_p[t] >= {0}_u[t] * {p_m}[t]'.format(self.name,
                                                              p_m=p_min),
                t_range='for t in time.I', name='on_off_min', parent=self)

        # Poles of the energy unit
        self.poles = {1: Epole(self.p, flow_direction, energy_type)}

        self._add_opt_parameters(starting_cost, operating_cost, co2_out)
        self._add_ext_constraints(min_time_on, min_time_off, max_ramp_up,
                                  max_ramp_down, availability_hours)

    def _set_optobject_as_attr(self, optobject: OptObject,
                               attribute_name=None):
        """
        Store an OptObject as an attribute of the energy unit.

        :param optobject: object to store
        :param attribute_name: name of the attribute; the name of the
            optobject is used when it is not given
        :raises TypeError: if optobject is not an OptObject
        """
        if isinstance(optobject, OptObject):
            if attribute_name:
                self.__setattr__(attribute_name, optobject)
            else:
                self.__setattr__(optobject.name, optobject)
        else:
            raise TypeError('In the add_optobject method, the object should '
                            'be an OptObject class and {0} is a '
                            '{1}'.format(optobject, type(optobject)))

    # Optional parameters
    def _add_opt_parameters(self, starting_cost, operating_cost, co2_out):
        """
        Initialise the optional attributes to None, then add the starting
        cost, the operating cost and the CO2 emissions that are not None.
        """
        self.start_up = None
        self.switch_off = None
        self.starting_cost = None
        self.operating_cost = None
        self.co2_emissions = None

        # Starting cost
        if starting_cost is not None:
            self.add_starting_cost(starting_cost)

        # Operating cost
        if operating_cost is not None:
            self.add_operating_cost(operating_cost)

        # CO2 emissions
        if co2_out is not None:
            self.add_co2_emissions(co2_out)

    def _add_start_up(self):
        """
        Add the binary quantity start_up and the constraints defining it
        from the operating binary u.
        """
        time = self.time

        # Add a variable for start up
        self.start_up = Quantity(name='start_up',
                                 description='The EnergyUnit is '
                                             'starting :1 or not :0',
                                 vtype=LpBinary, vlen=time.LEN, parent=self)

        # When u[t] = 0 and u[t+1] = 1, start_up[t+1] = 1
        self.def_start_up = DynamicConstraint(
            exp_t='{0}_u[t+1] - {0}_u[t] <= '
                  '{0}_start_up[t+1]'.format(self.name),
            t_range='for t in time.I[:-1]',
            name='def_start_up', parent=self)

        # Else start_up[t+1] = 0
        self.def_no_start_up = DynamicConstraint(
            exp_t='{0}_start_up[t+1] <= ({0}_u[t+1] - {0}_u[t]'
                  ' + 1)/2'.format(self.name),
            t_range='for t in time.I[:-1]',
            name='def_no_start_up', parent=self)

        # Def initial start_up
        self.def_init_start_up = Constraint(
            exp='{0}_start_up[0] == {0}_u[0]'.format(self.name),
            name='def_init_start_up', parent=self)

    def _add_switch_off(self):
        """
        Add the binary quantity switch_off and the constraints defining it.

        The start_up quantity is created first when it does not exist yet,
        since the definition of switch_off relies on it.
        """
        time = self.time

        if self.start_up is None:
            self._add_start_up()

        # Add a variable for switch off
        self.switch_off = Quantity(name='switch_off',
                                   description='The EnergyUnit is '
                                               'switching off :1 or not :0',
                                   vtype=LpBinary, vlen=time.LEN,
                                   parent=self)

        # When u[t] = 1 and u[t+1] = 0, switch_off[t+1] = 1
        self.def_switch_off = DynamicConstraint(
            exp_t='{0}_switch_off[t+1] == {0}_start_up[t+1] '
                  '+ {0}_u[t] - {0}_u[t+1]'.format(self.name),
            t_range='for t in time.I[:-1]',
            name='def_switch_off', parent=self)

        # Set initial switch_off to 0
        self.def_init_switch_off = Constraint(
            exp='{0}_switch_off[0] == 0'.format(self.name),
            name='def_init_switch_off', parent=self)

    def add_starting_cost(self, start_cost: float):
        """
        Add a starting cost associated to the energy unit based on the value
        start_cost.

        Each time the energy unit is starting (or restarting), i.e. not
        functioning at time t and functioning at time t+1 (start_up[t+1] = 1
        corresponding to u[t] = 0 and u[t+1] = 1), the start_cost value is
        added to the starting_cost.

        :param start_cost: float: cost corresponding to the start-up of the
            energy unit
        :raises ValueError: if a starting cost is already defined
        """
        if self.starting_cost is None:
            if self.start_up is None:
                self._add_start_up()

            # Adding starting cost
            self.starting_cost = Quantity(name='starting_cost',
                                          description='Dynamic cost for the '
                                                      'start'
                                                      ' of EnergyUnit',
                                          lb=0, vlen=self.time.LEN,
                                          parent=self)

            # Defining how the starting cost is calculated
            self.calc_start_cost = DynamicConstraint(
                exp_t='{0}_starting_cost[t] == {1} * {0}_start_up[t]'.format(
                    self.name, start_cost),
                t_range='for t in time.I[:-1]',
                name='calc_start_cost', parent=self)
        else:
            raise ValueError("The EnergyUnit {} already has a "
                             "starting cost defined.".format(self.name))

    def add_operating_cost(self, operating_cost: float):
        """
        Add an operating cost associated to the energy unit based on the
        value operating_cost.

        For each time step, the operating_cost value is multiplied by the
        power and by the time step duration.

        :param operating_cost: int, float or list with one value per time
            step: cost corresponding to the operation of the energy unit
        :raises IndexError: if operating_cost is a list whose length differs
            from the length of the time period
        :raises TypeError: if operating_cost is not an int, a float or a
            list
        :raises ValueError: if an operating cost is already defined
        """
        if self.operating_cost is None:
            # Adding operating cost
            self.operating_cost = Quantity(name='operating_cost',
                                           description='Dynamic cost for the '
                                                       'operation '
                                                       'of the EnergyUnit',
                                           lb=0, vlen=self.time.LEN,
                                           parent=self)

            if isinstance(operating_cost, (int, float)):
                self.calc_operating_cost = DynamicConstraint(
                    name='calc_operating_cost',
                    exp_t='{0}_operating_cost[t] == {1} * '
                          '{0}_p[t] * time.DT'.format(self.name,
                                                      operating_cost),
                    t_range='for t in time.I', parent=self)

            elif isinstance(operating_cost, list):
                if len(operating_cost) != self.time.LEN:
                    raise IndexError(
                        "Your operating cost should be the size of the time "
                        "period. The time period is of {0} and your operating "
                        "cost have a size of {1}".format(
                            self.time.LEN, len(operating_cost)))
                else:
                    self.calc_operating_cost = DynamicConstraint(
                        name='calc_operating_cost',
                        exp_t='{0}_operating_cost[t] == {1}[t] * '
                              '{0}_p[t] * time.DT'.format(self.name,
                                                          operating_cost),
                        t_range='for t in time.I', parent=self)
            else:
                raise TypeError('The operating_cost should be an int, a float '
                                'or a list.')
        else:
            raise ValueError("The EnergyUnit {} already has an "
                             "operating cost defined.".format(self.name))

    def add_co2_emissions(self, co2_out: float):
        """
        Add CO2 emissions associated to the energy unit based on the value
        co2_out.

        For each time step, the co2_out value is multiplied by the power and
        by the time step duration.

        :param co2_out: int, float or list with one value per time step:
            CO2 emissions corresponding to the operation of the energy unit
        :raises IndexError: if co2_out is a list whose length differs from
            the length of the time period
        :raises TypeError: if co2_out is not an int, a float or a list
        :raises ValueError: if CO2 emissions are already defined
        """
        if self.co2_emissions is None:
            # Adding CO2 emissions from production/consumption
            self.co2_emissions = Quantity(
                name='co2_emissions',
                description='Dynamic CO2 emissions '
                            'generated by the '
                            'EnergyUnit',
                lb=0, vlen=self.time.LEN, parent=self)

            if isinstance(co2_out, (int, float)):
                self.calc_co2_emissions = DynamicConstraint(
                    exp_t='{0}_co2_emissions[t] == {1} * '
                          '{0}_p[t] * time.DT'.format(self.name, co2_out),
                    name='calc_co2_emissions', parent=self)

            elif isinstance(co2_out, list):
                if len(co2_out) != self.time.LEN:
                    raise IndexError(
                        "Your CO2 emissions (CO2_out) should be the size of "
                        "the time period. The time period is of {0} and your "
                        "CO2 emissions have a size of {1}".format(
                            self.time.LEN, len(co2_out)))
                else:
                    self.calc_co2_emissions = DynamicConstraint(
                        exp_t='{0}_co2_emissions[t] == {1}[t] * '
                              '{0}_p[t] * time.DT'.format(self.name, co2_out),
                        name='calc_co2_emissions', parent=self)
            else:
                raise TypeError('co2_out should be an int, a float or a list.')
        else:
            raise ValueError("The EnergyUnit {} already has CO2 "
                             "emissions defined.".format(self.name))

    # External constraints
    def _add_ext_constraints(self, min_time_on, min_time_off, max_ramp_up,
                             max_ramp_down, availability_hours):
        """
        Initialise the external constraint attributes to None, then add
        every external constraint whose parameter is not None.
        """
        self.set_max_ramp_up = None
        self.set_max_ramp_down = None
        self.set_min_up_time = None
        self.set_min_down_time = None
        self.set_availability = None

        # Adding a maximal ramp up
        if max_ramp_up is not None:
            self.add_max_ramp_up(max_ramp_up)

        # Adding a maximal ramp down
        if max_ramp_down is not None:
            self.add_max_ramp_down(max_ramp_down)

        # Adding a minimum time on
        if min_time_on is not None:
            self.add_min_time_on(min_time_on)

        # Adding a minimum time off
        if min_time_off is not None:
            self.add_min_time_off(min_time_off)

        # Adding a number of available hours of operation
        if availability_hours is not None:
            self.add_availability(availability_hours)

    def add_max_ramp_up(self, max_ramp_up: float):
        """
        Add a maximal ramp value between two consecutive power values
        increasing

        :param max_ramp_up: float: maximal ramp value between two
            consecutive power values increasing
        :raises ValueError: if a maximal ramp up is already defined
        """
        if self.set_max_ramp_up is None:
            self.set_max_ramp_up = ExtDynConstraint(
                exp_t='{0}_p[t+1] - {0}_p[t] <= {1}'.format(self.name,
                                                            max_ramp_up),
                t_range='for t in time.I[:-1]', name='set_max_ramp_up',
                parent=self)
        else:
            raise ValueError("The EnergyUnit {} already has a maximal "
                             "ramp up defined.".format(self.name))

    def add_max_ramp_down(self, max_ramp_down: float):
        """
        Add a maximal ramp value between two consecutive power values
        decreasing

        :param max_ramp_down: float: maximal ramp value between two
            consecutive power values decreasing
        :raises ValueError: if a maximal ramp down is already defined
        """
        if self.set_max_ramp_down is None:
            self.set_max_ramp_down = ExtDynConstraint(
                exp_t='{0}_p[t] - {0}_p[t+1] <= {1}'.format(self.name,
                                                            max_ramp_down),
                t_range='for t in time.I[:-1]', name='set_max_ramp_down',
                parent=self)
        else:
            raise ValueError("The EnergyUnit {} already has a maximal "
                             "ramp down".format(self.name))

    def add_min_time_on(self, min_time_on: float):
        """
        Add a minimal time during which the energy unit should function once
        it is started-up

        :param min_time_on: float: minimal time during which the energy unit
            should function once it is started-up
        :raises ValueError: if a minimum time on is already defined
        """
        if self.set_min_up_time is None:
            if self.start_up is None:
                self._add_start_up()

            # When the unit starts, it should be on during min_time_on
            self.set_min_up_time = ExtDynConstraint(
                exp_t='{0}_u[t] >= lpSum({0}_start_up[i] for i in range('
                      'max(t - {1} + 1, 0), t))'.format(self.name,
                                                        min_time_on),
                t_range='for t in time.I', name='set_min_up_time',
                parent=self)
        else:
            raise ValueError("The EnergyUnit {} already has a "
                             "minimum time on.".format(self.name))

    def add_min_time_off(self, min_time_off: float):
        """
        Add a minimal time during which the energy unit has to remain off
        once it is switched off

        :param min_time_off: float: minimal time during which the energy
            unit has to remain off once it is switched off
        :raises ValueError: if a minimum time off is already defined
        """
        if self.set_min_down_time is None:
            if self.switch_off is None:
                self._add_switch_off()

            # When the unit switches off, it should be off during
            # min_time_off
            self.set_min_down_time = ExtDynConstraint(
                exp_t='1 - {0}_u[t] >= lpSum({0}_switch_off[i] for i in '
                      'range(max(t - {1} + 1, 0), t))'.format(self.name,
                                                              min_time_off),
                t_range='for t in time.I', name='set_min_down_time',
                parent=self)
        else:
            raise ValueError("The EnergyUnit {} already has a "
                             "minimum time down.".format(self.name))

    def add_availability(self, av_hours: int):
        """
        Add a number of hours of availability of the energy unit during the
        study period

        :param av_hours: int: number of hours of availability of the energy
            unit during the study period
        :raises ValueError: if hours of availability are already defined
        """
        if self.set_availability is None:
            self.set_availability = Constraint(
                exp='lpSum({dt} * {name}_u[t] for t in time.I) <= '
                    '{av_h}'.format(dt=self.time.DT, name=self.name,
                                    av_h=av_hours),
                name='set_availability', parent=self)
        else:
            raise ValueError("The EnergyUnit {} already has hours of "
                             "availability defined.".format(self.name))

    def set_operating_time_range(self, operating_time_range=[[str, str]]):
        """
        Add a range of hours during which the energy unit can be operated.
        The final time should be greater than the initial time within a
        time range, except when the final time is '00:00'.

        The list given by the caller is left untouched: the time indexes
        are stored in a separate list.

        example: set_operating_time_range([['10:00', '12:00'],
        ['14:00', '17:00']])

        :param operating_time_range: list of lists of strings in the format
            HH:MM [[first hour operating: str, hour to stop (not
            operating): str], [second hour operating: str, hour to stop
            (not operating): str], etc]
        """
        first_day = self.time.get_days[0]

        # Get the index of each hour for each range
        index_ranges = []
        for range_nb, time_range in enumerate(operating_time_range, start=1):
            init_h_str, init_min_str = time_range[0].split(":")
            final_h_str, final_min_str = time_range[1].split(":")
            init_h = int(init_h_str)
            init_min = int(init_min_str)
            final_h = int(final_h_str)
            final_min = int(final_min_str)

            start_time = datetime.datetime(year=first_day.year,
                                           month=first_day.month,
                                           day=first_day.day,
                                           hour=init_h, minute=init_min)
            start_index = self.time.get_index_for_date(date=start_time)

            if final_h != 0 or final_min != 0:
                end_time = datetime.datetime(year=first_day.year,
                                             month=first_day.month,
                                             day=first_day.day,
                                             hour=final_h, minute=final_min)
                end_index = self.time.get_index_for_date(date=end_time)
            else:
                # A final time of '00:00' stands for the end of the day
                end_index = 24 * 1 / self.time.DT

            index_ranges.append([start_index, end_index])
            print('operating time range index {0}: '.format(range_nb),
                  index_ranges[-1])

        # First non-operating period before the first operating time range
        if index_ranges[0][0] != 0:
            final_time = self.time.get_date_for_index(
                index_ranges[0][0] - 1)
            set_start_time_range = DailyDynamicConstraint(
                exp_t='{name}_u[t] == 0'.format(name=self.name),
                time=self.time, init_time='00:00',
                final_time="{0}:{1}".format(final_time.hour,
                                            final_time.minute),
                name='set_operating_init_time_range_{}'.format(
                    index_ranges[0][0]),
                parent=self)
            setattr(self, 'set_start_time_range_{}'.format(
                index_ranges[0][0]), set_start_time_range)

        # Following non-operating periods between the given time range(s)
        if len(index_ranges) != 1:
            set_time_range = []
            for i in range(1, len(index_ranges)):
                init_time = self.time.get_date_for_index(
                    index_ranges[i - 1][1])
                end_time = self.time.get_date_for_index(
                    index_ranges[i][0] - 1)
                set_time_range.append(DailyDynamicConstraint(
                    exp_t='{name}_u[t] == 0'.format(name=self.name),
                    time=self.time,
                    init_time="{0}:{1}".format(init_time.hour,
                                               init_time.minute),
                    final_time="{0}:{1}".format(end_time.hour,
                                                end_time.minute),
                    name='set_time_range_{}_{}'.format(
                        index_ranges[i - 1][1], index_ranges[i][0]),
                    parent=self))
                setattr(self, 'set_time_range_{}_{}'.format(
                    index_ranges[i - 1][1], index_ranges[i][0]),
                        set_time_range[i - 1])

        # Last non-operating period after the last operating time range
        if index_ranges[-1][1] != int(24 * 1 / self.time.DT):
            init_time = self.time.get_date_for_index(index_ranges[-1][1])
            end_time = self.time.get_date_for_index(
                int(24 * 1 / self.time.DT - 1))
            set_end_time_range = DailyDynamicConstraint(
                exp_t='{name}_u[t] == 0'.format(name=self.name),
                time=self.time,
                init_time="{0}:{1}".format(init_time.hour, init_time.minute),
                final_time="{0}:{1}".format(end_time.hour, end_time.minute),
                name='set_operating_final_time_range_{}'.format(
                    index_ranges[-1][1]),
                parent=self)
            setattr(self, 'set_end_time_range_{}'.format(
                index_ranges[-1][1]), set_end_time_range)

    def add_operating_time_range(self, operating_time_range=[[int, int]]):
        """
        DEPRECATED : please use set_operating_time_range

        Add a range of hours during which the energy unit can be operated

        example: [[10, 12], [14, 17]]

        :param operating_time_range: [[first hour of functioning : int,
            hour to stop (not in functioning): int]]
        """
        deprecated_msg = "DEPRECATED: add_operating_time_range is deprecated" \
                         " and will be removed in omegalpes version 0.4 " \
                         "Please use set_operating_time_range that allows to" \
                         " use hour and minutes in the operating_time_range " \
                         "definition"
        warnings.warn(deprecated_msg, DeprecationWarning)

        # Non-operating period before the first operating time range
        if operating_time_range[0][0] != 0:
            set_start_time_range = HourlyDynamicConstraint(
                exp_t='{name}_u[t] == 0'.format(name=self.name),
                time=self.time, init_h=0,
                final_h=operating_time_range[0][0] - 1,
                name='set_operating_init_time_range_{}'.format(
                    operating_time_range[0][0]),
                parent=self)
            setattr(self, 'set_start_time_range_{}'.format(
                operating_time_range[0][0]), set_start_time_range)

        # Non-operating periods between the given time ranges
        if len(operating_time_range) != 1:
            set_time_range = []
            for i in range(1, len(operating_time_range)):
                set_time_range.append(HourlyDynamicConstraint(
                    exp_t='{name}_u[t] == 0'.format(name=self.name),
                    time=self.time,
                    init_h=operating_time_range[i - 1][1],
                    final_h=operating_time_range[i][0] - 1,
                    name='set_time_range_{}_{}'.format(
                        operating_time_range[i - 1][1],
                        operating_time_range[i][0]),
                    parent=self))
                setattr(self, 'set_time_range_{}_{}'.format(
                    operating_time_range[i - 1][1],
                    operating_time_range[i][0]), set_time_range[i - 1])

        # Non-operating period after the last operating time range. The
        # constraint is named after the end of the LAST range, as in
        # set_operating_time_range.
        if operating_time_range[-1][1] != 23:
            set_end_time_range = HourlyDynamicConstraint(
                exp_t='{name}_u[t] == 0'.format(name=self.name),
                time=self.time,
                init_h=operating_time_range[-1][1], final_h=23,
                name='set_operating_final_time_range_{}'.format(
                    operating_time_range[-1][1]),
                parent=self)
            setattr(self, 'set_end_time_range_{}'.format(
                operating_time_range[-1][1]), set_end_time_range)

    def set_energy_limits_on_time_period(self, e_min=0, e_max=None,
                                         start='YYYY-MM-DD HH:MM:SS',
                                         end='YYYY-MM-DD HH:MM:SS',
                                         period_index=None):
        """
        Add an energy limit during a defined time period

        :param e_min: Minimal energy set during the time period (int or
            float); no constraint is added when it equals 0
        :param e_max: Maximal energy set during the time period (int or
            float); no constraint is added when it is None
        :param start: Date of start of the time period
            YYYY-MM-DD HH:MM:SS (str); the period starts at the beginning
            of the study when the default value is kept
        :param end: Date of end of the time period YYYY-MM-DD HH:MM:SS
            (str); the period ends at the end of the study when the default
            value is kept
        :param period_index: expression of the time indexes to sum on; when
            given, start and end are ignored
        """
        date_placeholder = 'YYYY-MM-DD HH:MM:SS'

        if period_index is None:
            # An empty bound leaves the corresponding side of the slice open
            if start == date_placeholder:
                index_start = ''
            else:
                index_start = self.time.get_index_for_date(start)

            if end == date_placeholder:
                index_end = ''
            else:
                index_end = self.time.get_index_for_date(end)

            period_index = 'time.I[{start}:{end}]'.format(start=index_start,
                                                          end=index_end)

        if e_min != 0:
            self.set_e_min = Constraint(
                exp='time.DT * lpSum({0}_p[t] for t in {1}) '
                    '>= {2}'.format(self.name, period_index, e_min),
                name='set_e_min', parent=self)

        if e_max is not None:
            # The upper bound of e_tot is raised so that it does not
            # contradict the new limit
            if e_max > self.e_tot.ub:
                self.e_tot.ub = e_max
            self.set_e_max = Constraint(
                exp='time.DT * lpSum({0}_p[t] for t in {1}) '
                    '<= {2}'.format(self.name, period_index, e_max),
                name='set_e_max', parent=self)

    # OBJECTIVES #
    def minimize_starting_cost(self, weight=1):
        """
        Objective to minimize the starting costs

        :param weight: Weight coefficient for the objective
        :raises ValueError: if no starting cost has been added
        """
        if self.starting_cost is not None:
            self.min_start_cost = Objective(name='min_start_cost',
                                            exp='lpSum({0}_starting_cost[t] '
                                                'for t '
                                                'in time.I)'
                                            .format(self.name),
                                            weight=weight,
                                            parent=self)
        else:
            raise ValueError("You should add a starting cost before trying "
                             "to minimize_starting_cost.")

    def minimize_operating_cost(self, weight=1):
        """
        Objective to minimize the operating costs

        :param weight: Weight coefficient for the objective
        :raises ValueError: if no operating cost has been added
        """
        if self.operating_cost is not None:
            self.min_operating_cost = Objective(
                name='min_operating_cost',
                exp='lpSum({}_operating_cost[t] for t in time.I)'.format(
                    self.name),
                weight=weight, parent=self)
        else:
            raise ValueError("You should add an operating cost before trying "
                             "to minimize_operating_cost.")

    def minimize_costs(self, weight=1):
        """
        Objective to minimize the costs (starting and operating costs)

        Only the costs which have been added to the unit are minimized.

        :param weight: Weight coefficient for the objective
        """
        if self.starting_cost is not None:
            self.minimize_starting_cost(weight)

        if self.operating_cost is not None:
            self.minimize_operating_cost(weight)

    def minimize_energy(self, weight=1):
        """
        Objective to minimize the energy of the energy unit

        :param weight: Weight coefficient for the objective
        """
        self.min_energy = Objective(name='min_energy',
                                    exp='lpSum({0}_p[t] for t in time.I)'
                                    .format(self.name), weight=weight,
                                    parent=self)

    def minimize_time_of_use(self, weight=1):
        """
        Objective to minimize the time of running of the energy unit

        :param weight: Weight coefficient for the objective
        """
        self.min_time_of_use = Objective(name='min_time_of_use',
                                         exp='lpSum({0}_u[t] for t in time.I)'
                                         .format(self.name),
                                         weight=weight,
                                         parent=self)

    def minimize_co2_emissions(self, weight=1):
        """
        Objective to minimize the co2 emissions of the energy unit

        :param weight: Weight coefficient for the objective
        """
        self.min_co2_emissions = Objective(name='min_CO2_emissions',
                                           exp='lpSum({0}_co2_emissions[t] '
                                               'for t in time.I)'.format(
                                               self.name),
                                           weight=weight,
                                           parent=self)

    def minimize_exergy_destruction(self, weight=1):
        """
        This is the main objective of any exergetic optimization.

        :param weight: Weight coefficient for the objective
        :raises ValueError: if the unit has no exergy_dest attribute
        """
        if hasattr(self, 'exergy_dest'):
            self.min_exergy_dest = Objective(name='min_exergy_destruction',
                                             exp='lpSum({0}_exergy_dest[t] '
                                                 'for t in time.I)'.
                                             format(self.name),
                                             weight=weight,
                                             parent=self)
        else:
            raise ValueError("You should initialize exergy calculations "
                             "on the unit named {0} before trying to "
                             "minimize its exergy destruction."
                             .format(self.name))

    def minimize_exergy(self, energy_unit=None, weight=1):
        """
        Alternate objective of exergy optimization that may be interesting
        in some cases.

        :param energy_unit: unit whose exergy is minimized and which is the
            parent of the objective; the unit itself when None
        :param weight: Weight coefficient for the objective
        :raises ValueError: if the unit has no exergy attribute
        """
        # The default value None used to fail on energy_unit.name
        if energy_unit is None:
            energy_unit = self

        if hasattr(self, 'exergy'):
            self.min_exergy = Objective(name='min_exergy',
                                        exp='lpSum({0}_exergy[t] '
                                            'for t in time.I)'.
                                        format(energy_unit.name),
                                        weight=weight,
                                        parent=energy_unit)
        else:
            raise ValueError("You should initialize exergy calculations "
                             "on the unit named {0} before trying to "
                             "minimize its exergy."
                             .format(energy_unit.name))
class FixedEnergyUnit(EnergyUnit):
    """
    **Description**

        Energy unit with a fixed power profile.

    **Attributes**

     * p : instantaneous power known by advance (kW)
     * energy_type : type of energy ('Electrical', 'Heat', ...)
    """

    def __init__(self, time, name: str, p: 'list | dict | pd.DataFrame',
                 flow_direction='in', starting_cost=None, operating_cost=None,
                 co2_out=None, energy_type=None, verbose=True):
        """
        :param time: time unit of the study (its DT and DATES attributes
            are used here)
        :param name: name of the energy unit
        :param p: power profile: a list, a dictionary or a dataframe with
            a single column indexed by the dates of the study
        :param flow_direction: forwarded to EnergyUnit
        :param starting_cost: forwarded to EnergyUnit
        :param operating_cost: forwarded to EnergyUnit
        :param co2_out: forwarded to EnergyUnit
        :param energy_type: forwarded to EnergyUnit
        :param verbose: forwarded to EnergyUnit
        :raises TypeError: if p is None or is not a list, a dictionary or
            a dataframe
        :raises ValueError: if the dictionary or the dataframe does not
            match the dates of the time unit, or if the dataframe has more
            than one column
        """
        if p is None:
            # FixedEnergyUnits should have data for the power values
            raise TypeError(
                "You have to define the power profile (p) for the "
                "FixedEnergyUnit !")

        # p may be a list, a dict or a dataframe (1 column only)
        if isinstance(p, list):
            power_values = p

        elif isinstance(p, dict):
            # Checking the length of the dictionary corresponds to the
            # length of the time unit. A mismatch used to leave e_tot,
            # p_min and p_max unbound.
            if len(time.DATES) != len(p):
                raise ValueError(
                    'The FixedEnergyUnit {0} does not operate on the '
                    'same time of the model: the model time has {1} dates '
                    'and the power values have {2} '
                    'entries'.format(name, len(time.DATES), len(p)))
            power_values = list(p.values())

        elif isinstance(p, pd.DataFrame):
            # Checking the dataframe is composed of only one column :
            # - dates for the indexes
            # - power values for the column
            if len(p.columns) != 1:
                raise ValueError('You should have only one columns for the '
                                 'dataframe of the power values of the '
                                 'FixedEnergyUnit {0} and you have {1} '
                                 'columns'.format(name, len(p.columns)))

            # Checking that the dates of the values correspond to the date
            # of the study based on the time unit
            same_dates = len(time.DATES) == len(p.index) and all(
                model_date == data_date
                for model_date, data_date in zip(time.DATES, p.index))
            if not same_dates:
                raise ValueError(
                    'The FixedEnergyUnit {0} does not operate on the '
                    'same time of the model: \n'
                    'dates for the model time: {1} \n'
                    'dates for the power values: {2}'.format(name,
                                                             time.DATES,
                                                             p.index))

            # Convert the dataframe into a list of power values, which is
            # also what is given to EnergyUnit
            label = p.columns.values
            p = p[label[0]].tolist()
            power_values = p

        else:
            raise TypeError(
                "The power profile (p) for the FixedEnergyUnit should be a "
                "list, a dictionary or a dataframe")

        # The profile being fixed, the total energy is known by advance
        e_tot = sum(power_values) * time.DT
        p_min = min(power_values)
        p_max = max(power_values)

        EnergyUnit.__init__(self, time=time, name=name, p=p, p_min=p_min,
                            p_max=p_max, e_min=e_tot, e_max=e_tot,
                            flow_direction=flow_direction,
                            starting_cost=starting_cost,
                            operating_cost=operating_cost, min_time_on=None,
                            min_time_off=None, max_ramp_up=None,
                            max_ramp_down=None, co2_out=co2_out,
                            availability_hours=None, energy_type=energy_type,
                            verbose=verbose, no_warn=True)
class VariableEnergyUnit(EnergyUnit):
    """
    **Description**

        Energy unit with a variable power profile: the power value given
        to EnergyUnit is None, every other parameter is forwarded as is.
    """

    def __init__(self, time, name, flow_direction='in', p_min=-1e+4,
                 p_max=1e+4, e_min=-1e6, e_max=1e6, starting_cost=None,
                 operating_cost=None, min_time_on=None, min_time_off=None,
                 max_ramp_up=None, max_ramp_down=None, co2_out=None,
                 availability_hours=None, energy_type=None,
                 verbose=True, no_warn=True):
        # Power and energy bounds; p stays None as the profile is variable
        bounds = {'p': None, 'p_min': p_min, 'p_max': p_max,
                  'e_min': e_min, 'e_max': e_max}

        # Optional costs, emissions and external constraints
        options = {'starting_cost': starting_cost,
                   'operating_cost': operating_cost,
                   'min_time_on': min_time_on,
                   'min_time_off': min_time_off,
                   'max_ramp_up': max_ramp_up,
                   'max_ramp_down': max_ramp_down,
                   'co2_out': co2_out,
                   'availability_hours': availability_hours}

        # EnergyUnit.__init__ is called explicitly, as in the other classes
        # of the module
        EnergyUnit.__init__(self, time, name, flow_direction=flow_direction,
                            energy_type=energy_type, verbose=verbose,
                            no_warn=no_warn, **bounds, **options)
class SquareEnergyUnit(VariableEnergyUnit):
    """
    **Description**

        Energy unit with a defined square power profile.
    """

    def __init__(self, time, name, p_square, n_square, t_between_sq,
                 t_square=1, flow_direction='in', starting_cost=None,
                 operating_cost=None, co2_out=None, energy_type=None,
                 verbose=True, no_warn=True):
        """
        :param time: time unit of the study, forwarded to
            VariableEnergyUnit
        :param name: name of the energy unit
        :param p_square: Power of the square, used as both minimal and
            maximal power
        :param n_square: Number of squares
        :param t_between_sq: Duration between squares [h], used as minimal
            time off when there are several squares
        :param t_square: Duration of a square [h] (int), used as minimal
            time on when there are several squares and as availability
            hours when there is a single square
        :param flow_direction: forwarded to VariableEnergyUnit
        :param starting_cost: forwarded to VariableEnergyUnit
        :param operating_cost: forwarded to VariableEnergyUnit
        :param co2_out: forwarded to VariableEnergyUnit
        :param energy_type: forwarded to VariableEnergyUnit
        :param verbose: forwarded to VariableEnergyUnit
        :param no_warn: forwarded to VariableEnergyUnit
        :raises TypeError: if t_square is not an integer
        """
        if not isinstance(t_square, int):
            raise TypeError('t_square should be an integer, but is '
                            'a {}'.format(type(t_square)))

        # Both energy bounds are set to the energy of the n_square squares
        energy = n_square * t_square * p_square

        if n_square == 1:
            # A single square only needs a limit on the operating hours
            min_time_on = None
            min_time_off = None
            av_h = t_square
        else:
            min_time_on = t_square
            min_time_off = t_between_sq
            av_h = None

        VariableEnergyUnit.__init__(self, time, name,
                                    flow_direction=flow_direction,
                                    p_min=p_square, p_max=p_square,
                                    e_min=energy, e_max=energy,
                                    starting_cost=starting_cost,
                                    operating_cost=operating_cost,
                                    min_time_on=min_time_on,
                                    min_time_off=min_time_off,
                                    max_ramp_up=None, max_ramp_down=None,
                                    co2_out=co2_out,
                                    availability_hours=av_h,
                                    energy_type=energy_type,
                                    verbose=verbose, no_warn=no_warn)
class ShiftableEnergyUnit(VariableEnergyUnit):
    """
    **Description**

        EnergyUnit with shiftable power profile.

    **Attributes**

     * power_values : power profile to shift (kW)
     * mandatory : indicates if the power is mandatory (True) or not (False)
     * starting_cost : cost of the starting of the EnergyUnit
     * operating_cost : cost of the operation (€/kW)
     * energy_type : type of energy ('Electrical', 'Heat', ...)
    """

    def __init__(self, time, name: str, flow_direction, power_values: list,
                 mandatory=True, co2_out=None, starting_cost=None,
                 operating_cost=None, energy_type=None, verbose=True):
        """
        :param time: time unit of the study (its DT attribute is used here)
        :param name: name of the energy unit
        :param flow_direction: forwarded to VariableEnergyUnit
        :param power_values: power profile to shift; the leading and
            trailing zeros are removed
        :param mandatory: if True, the minimal energy equals the energy of
            the profile, else it is 0
        :param co2_out: forwarded to VariableEnergyUnit
        :param starting_cost: forwarded to VariableEnergyUnit
        :param operating_cost: forwarded to VariableEnergyUnit
        :param energy_type: forwarded to VariableEnergyUnit
        :param verbose: forwarded to VariableEnergyUnit
        :raises ValueError: if power_values is empty or only contains zeros
        """
        non_zero_indexes = [i for i, power in enumerate(power_values)
                            if power != 0]
        if not non_zero_indexes:
            # Cropping such a profile would run past the end of the list
            raise ValueError(
                'The power profile (power_values) of the '
                'ShiftableEnergyUnit {0} should contain at least one '
                'non-zero value'.format(name))

        # Crop the power profile
        power_values = power_values[non_zero_indexes[0]:
                                    non_zero_indexes[-1] + 1]

        # The values which are not strictly positive are replaced by a
        # small fraction of the smallest strictly positive power. The min
        # has to be taken on the values themselves: on the booleans
        # (p > 0), it is 0 as soon as one value is not positive. Epsilon is
        # 0 when no value is strictly positive.
        epsilon = 0.00001 * min((power for power in power_values
                                 if power > 0), default=0)
        power_profile = [max(epsilon, power) for power in power_values]

        e_max = sum(power_profile) * time.DT
        if mandatory:
            e_min = e_max
        else:
            e_min = 0

        p_min = min(power_profile)
        p_max = max(power_profile)

        VariableEnergyUnit.__init__(self, time, name=name,
                                    flow_direction=flow_direction,
                                    p_min=p_min, p_max=p_max, e_min=e_min,
                                    e_max=e_max, starting_cost=starting_cost,
                                    operating_cost=operating_cost,
                                    min_time_on=None, min_time_off=None,
                                    max_ramp_up=None, max_ramp_down=None,
                                    co2_out=co2_out, availability_hours=None,
                                    energy_type=energy_type,
                                    verbose=verbose, no_warn=True)

        # NOTE(review): when a starting cost is given, start_up has already
        # been created by add_starting_cost and is created again here -
        # confirm this is intended.
        self._add_start_up()

        self.power_values = Quantity(name='power_values', opt=False,
                                     value=power_values, parent=self)

        # The i-th value of the profile is imposed i time steps after a
        # start-up
        for i, _ in enumerate(power_values):
            cst_name = 'def_{}_power_value'.format(i)
            exp_t = "{0}_p[t] >= {0}_power_values[{1}] * " \
                    "{0}_start_up[t-{1}]".format(self.name, i)
            cst = DynamicConstraint(
                name=cst_name, exp_t=exp_t,
                t_range="for t in time.I[{}:-1]".format(i), parent=self)
            setattr(self, cst_name, cst)
class TriangleEnergyUnit(ShiftableEnergyUnit):
    """
    **Description**

        Shiftable energy unit whose power profile is a triangle: the power
        rises linearly from 0 to p_peak and then decreases linearly to 0.
    """

    def __init__(self, time, name, flow_direction, p_peak, alpha_peak,
                 t_triangle: int, mandatory=True, starting_cost=None,
                 operating_cost=None, co2_out=None, energy_type=None,
                 verbose=True):
        """
        :param time: time unit of the study
        :param name: name of the energy unit
        :param flow_direction: forwarded to ShiftableEnergyUnit
        :param p_peak: peak power of the triangle
        :param alpha_peak: position of the peak as a share of t_triangle
            (the peak is reached at alpha_peak * t_triangle); 0 gives a
            purely decreasing profile, 1 a purely increasing one
        :param t_triangle: duration of the triangle in number of time
            steps, must be an integer
        :param mandatory: forwarded to ShiftableEnergyUnit
        :param starting_cost: forwarded to ShiftableEnergyUnit
        :param operating_cost: forwarded to ShiftableEnergyUnit
        :param co2_out: forwarded to ShiftableEnergyUnit
        :param energy_type: forwarded to ShiftableEnergyUnit
        :param verbose: forwarded to ShiftableEnergyUnit
        :raises TypeError: if t_triangle is not an integer
        """
        if not isinstance(t_triangle, int):
            raise TypeError('t_triangle should be an integer, but is '
                            'a {}'.format(type(t_triangle)))

        t_peak = alpha_peak * t_triangle

        # Slopes of the rising and falling edges; an edge of zero duration
        # gets a zero slope to avoid dividing by zero.
        ramp_1 = 0 if alpha_peak == 0 else p_peak / t_peak
        ramp_2 = 0 if alpha_peak == 1 else - p_peak / (t_triangle - t_peak)

        # The time axis has to be float: np.piecewise returns an array of
        # the same dtype as its input, so an integer axis would truncate
        # the computed powers.
        t = np.arange(0, t_triangle, dtype=float)

        # Each value is the mean power of the triangle over [t, t + 1]:
        # rising edge, step containing the peak, falling edge.
        triangle_profile = np.piecewise(
            t,
            [t <= t_peak - 1, (t_peak - 1 < t) & (t < t_peak), t_peak <= t],
            [lambda t: ramp_1 * (2 * t + 1) / 2,
             lambda t: (ramp_1 * t * (t_peak - t)
                        + ramp_2 * (t + 1 - t_triangle) * (t + 1 - t_peak)
                        + p_peak) / 2,
             lambda t: ramp_2 * (2 * t + 1 - 2 * t_triangle) / 2])

        triangle_profile = [float(p) for p in triangle_profile]

        # ShiftableEnergyUnit.__init__ has no no_warn parameter (it always
        # forwards no_warn=True itself), so it must not be passed here.
        ShiftableEnergyUnit.__init__(self, time, name,
                                     flow_direction=flow_direction,
                                     power_values=triangle_profile,
                                     mandatory=mandatory, co2_out=co2_out,
                                     starting_cost=starting_cost,
                                     operating_cost=operating_cost,
                                     energy_type=energy_type,
                                     verbose=verbose)
class SawtoothEnergyUnit(ShiftableEnergyUnit):
    """
    **Description**

        Shiftable energy unit whose power profile is a sawtooth: identical
        triangles of duration t_triangle, evenly separated by steps at the
        power p_low, the whole profile lasting t_sawtooth.
    """

    def __init__(self, time, name, flow_direction, p_peak, p_low,
                 alpha_peak, t_triangle, t_sawtooth, mandatory=True,
                 starting_cost=None, operating_cost=None, co2_out=None,
                 energy_type=None, verbose=True):
        """
        :param time: time unit of the study
        :param name: name of the energy unit
        :param flow_direction: forwarded to ShiftableEnergyUnit
        :param p_peak: peak power of each triangle
        :param p_low: power applied between two triangles
        :param alpha_peak: position of the peak as a share of t_triangle
            (the peak is reached at alpha_peak * t_triangle)
        :param t_triangle: duration of one triangle in number of time
            steps, must be an integer
        :param t_sawtooth: duration of the whole sawtooth in number of
            time steps, must be an integer
        :param mandatory: forwarded to ShiftableEnergyUnit
        :param starting_cost: forwarded to ShiftableEnergyUnit
        :param operating_cost: forwarded to ShiftableEnergyUnit
        :param co2_out: forwarded to ShiftableEnergyUnit
        :param energy_type: forwarded to ShiftableEnergyUnit
        :param verbose: forwarded to ShiftableEnergyUnit
        :raises TypeError: if t_triangle or t_sawtooth is not an integer
        :raises ValueError: if no regular spacing lets at least two
            triangles of t_triangle fit exactly into t_sawtooth
        """
        if not isinstance(t_triangle, int):
            raise TypeError('t_triangle should be an integer, but is '
                            'a {}'.format(type(t_triangle)))
        if not isinstance(t_sawtooth, int):
            raise TypeError('t_sawtooth should be an integer, but is '
                            'a {}'.format(type(t_sawtooth)))

        t_peak = alpha_peak * t_triangle

        # Slopes of the rising and falling edges; an edge of zero duration
        # gets a zero slope to avoid dividing by zero.
        ramp_1 = 0 if alpha_peak == 0 else p_peak / t_peak
        ramp_2 = 0 if alpha_peak == 1 else - p_peak / (t_triangle - t_peak)

        # The time axis has to be float: np.piecewise returns an array of
        # the same dtype as its input, so an integer axis would truncate
        # the computed powers.
        t = np.arange(0, t_triangle, dtype=float)

        # Each value is the mean power of the triangle over [t, t + 1]:
        # rising edge, step containing the peak, falling edge.
        triangle_profile = np.piecewise(
            t,
            [t <= t_peak - 1, (t_peak - 1 < t) & (t < t_peak), t_peak <= t],
            [lambda t: ramp_1 * (2 * t + 1) / 2,
             lambda t: (ramp_1 * t * (t_peak - t)
                        + ramp_2 * (t + 1 - t_triangle) * (t + 1 - t_peak)
                        + p_peak) / 2,
             lambda t: ramp_2 * (2 * t + 1 - 2 * t_triangle) / 2])

        # Smallest spacing between triangles such that a whole number of
        # (triangle + spacing) patterns followed by a last triangle lasts
        # exactly t_sawtooth.
        spacing = None
        for candidate in range(t_sawtooth - 2 * t_triangle + 1):
            if (t_sawtooth - t_triangle) % (t_triangle + candidate) == 0:
                spacing = candidate
                break

        if spacing is None:
            # Previously this case ended in an UnboundLocalError.
            raise ValueError(
                'No regular spacing allows triangles of duration {0} to '
                'fit into a sawtooth of duration {1}'.format(t_triangle,
                                                             t_sawtooth))

        # Exact division: (t_sawtooth + spacing) is a multiple of
        # (t_triangle + spacing) for the spacing selected above.
        nb_triangles = (t_sawtooth + spacing) // (t_triangle + spacing)

        triangle = [float(p) for p in triangle_profile]
        sawtooth_profile = triangle + (nb_triangles - 1) * (
            spacing * [p_low] + triangle)

        # ShiftableEnergyUnit.__init__ has no no_warn parameter (it always
        # forwards no_warn=True itself), so it must not be passed here.
        ShiftableEnergyUnit.__init__(self, time, name,
                                     flow_direction=flow_direction,
                                     power_values=list(sawtooth_profile),
                                     mandatory=mandatory, co2_out=co2_out,
                                     starting_cost=starting_cost,
                                     operating_cost=operating_cost,
                                     energy_type=energy_type,
                                     verbose=verbose)
class SeveralEnergyUnit(VariableEnergyUnit):
    """
    **Description**

        Energy unit based on a fixed power curve enabling to multiply
        several times (nb_unit) the same power curve.

        Be careful, if imaginary == True, the solution may be imaginary as
        nb_unit can be continuous. The accurate number of the power unit
        should be calculated later

    **Attributes**

        * power_curve : fixed power curve (the fixed_power argument)
        * nb_unit : optimized number of units, integer unless imaginary
          is True
        * calc_power_with_nb_unit_cst : constraint setting the power to
          nb_unit times the power curve at each time step

    """

    def __init__(self, time, name, fixed_power, p_min=1e-5, p_max=1e+5,
                 imaginary=False, e_min=0, e_max=1e6, nb_unit_min=0,
                 nb_unit_max=None, flow_direction='in', starting_cost=None,
                 operating_cost=None, max_ramp_up=None, max_ramp_down=None,
                 co2_out=None, energy_type=None,
                 verbose=True, no_warn=True):
        parent_kwargs = dict(
            time=time, name=name, flow_direction=flow_direction,
            p_min=p_min, p_max=p_max, e_min=e_min, e_max=e_max,
            starting_cost=starting_cost, operating_cost=operating_cost,
            min_time_on=None, min_time_off=None,
            max_ramp_up=max_ramp_up, max_ramp_down=max_ramp_down,
            co2_out=co2_out, availability_hours=None,
            energy_type=energy_type, verbose=verbose, no_warn=no_warn)
        VariableEnergyUnit.__init__(self, **parent_kwargs)

        self.power_curve = Quantity(name='power_curve', opt=False,
                                    value=fixed_power, vlen=time.LEN,
                                    parent=self)

        # Only the variable type of nb_unit depends on the imaginary flag.
        nb_unit_vtype = LpContinuous if imaginary else LpInteger
        self.nb_unit = Quantity(name='nb_unit', opt=True,
                                vtype=nb_unit_vtype,
                                lb=nb_unit_min, ub=nb_unit_max,
                                vlen=1, parent=self)
        if imaginary:
            warnings.warn(
                'The solution may be imaginary as nb_unit is continuous')

        # The power of the unit is nb_unit times the fixed power curve.
        power_exp = '{0}_p[t] == {0}_nb_unit * {0}_power_curve[t]'.format(
            self.name)
        self.calc_power_with_nb_unit_cst = DynamicConstraint(
            exp_t=power_exp, name='calc_power_with_nb_unit',
            t_range='for t in time.I', parent=self)
class AssemblyUnit(OptObject):
    """
    **Description**

        Simple Assembly unit: assembly units have at least a production
        unit and a consumption unit and they are using one or several
        energy types. It can also integrate reversible energy units.
        It inherits from OptObject and it is the parent class of
        ConversionUnit and ReversibleUnit.

    **Attributes**

     * time: TimeUnit describing the studied time period
     * prod_units: list of the production units in the assembly unit.
     * cons_units: list of the consumption units in the assembly unit.
     * rev_units: list of the reversible units in the assembly unit.
     * poles: dictionary of the poles of the assembly unit

    """

    def __init__(self, time, name, prod_units=None, cons_units=None,
                 rev_units=None, verbose=True):
        """
        :param time: TimeUnit describing the studied time period
        :param name: name of the assembly unit
        :param prod_units: list of ProductionUnit objects; required when
            no reversible unit is given
        :param cons_units: list of ConsumptionUnit objects; required when
            no reversible unit is given
        :param rev_units: list of ReversibleUnit objects (optional)
        :param verbose: forwarded to OptObject
        :raises TypeError: if one of the arguments is not a list or holds
            an element of the wrong type
        :raises IndexError: if no reversible unit is given and the
            production or the consumption units are missing
        """
        OptObject.__init__(self, name=name, description='Assembly unit',
                           verbose=verbose)

        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import - confirm.
        from .production_units import ProductionUnit
        from .consumption_units import ConsumptionUnit
        from .reversible_units import ReversibleUnit

        self.time = time
        self.prod_units = []  # production units of the assembly
        self.cons_units = []  # consumption units of the assembly
        self.rev_units = []  # reversible units of the assembly
        self.poles = {}  # poles of the assembly, numbered from 1

        def _check_and_add(units, expected_type, add_unit, wrong_type_msg):
            # Units are added one by one: the ones preceding an invalid
            # element stay registered when the TypeError is raised.
            for unit in units:
                if not isinstance(unit, expected_type):
                    raise TypeError(wrong_type_msg)
                add_unit(unit)

        rev_elements_msg = ('The elements in rev_units have to be the'
                            ' type "ReversibleUnit".')
        prod_elements_msg = ('The elements in prod_units have to be the'
                             ' type "ProductionUnit".')
        cons_elements_msg = ('The elements in cons_units have to be the'
                             ' type "ConsumptionUnit".')

        if rev_units:
            # With reversible units, production and consumption units are
            # optional.
            if not isinstance(rev_units, list):
                raise TypeError('rev_units should be a list.')
            _check_and_add(rev_units, ReversibleUnit,
                           self._add_reversible_unit, rev_elements_msg)

            if prod_units is not None:
                if not isinstance(prod_units, list):
                    raise TypeError('prod_units should be a list')
                _check_and_add(prod_units, ProductionUnit,
                               self._add_production_unit, prod_elements_msg)

            if cons_units is not None:
                if not isinstance(cons_units, list):
                    raise TypeError('cons_units should be a list')
                _check_and_add(cons_units, ConsumptionUnit,
                               self._add_consumption_unit, cons_elements_msg)
        else:
            # Without reversible unit, the assembly unit needs at least
            # one production unit and one consumption unit.
            if not prod_units:
                raise IndexError(
                    'You have to fill at least a production unit.')
            if not isinstance(prod_units, list):
                raise TypeError('prod_units should be a list.')
            _check_and_add(prod_units, ProductionUnit,
                           self._add_production_unit, prod_elements_msg)

            if not cons_units:
                raise IndexError(
                    'You have to fill at least a consumption unit.')
            if not isinstance(cons_units, list):
                raise TypeError('cons_units should be a list.')
            _check_and_add(cons_units, ConsumptionUnit,
                           self._add_consumption_unit, cons_elements_msg)

    def _add_production_unit(self, prod_unit):
        """
        Register a production unit: its pole 1 becomes the next pole of
        the assembly unit, which becomes the parent of the unit.

        :param prod_unit: production unit to be added to the
            production_units list
        """
        if prod_unit in self.prod_units:
            print('Production unit {0} already in the production_units '
                  'list'.format(prod_unit.name))
            return

        next_pole = len(self.poles) + 1
        self.poles[next_pole] = prod_unit.poles[1]
        self.prod_units.append(prod_unit)
        prod_unit.parent = self

    def _add_consumption_unit(self, cons_unit):
        """
        Register a consumption unit: its pole 1 becomes the next pole of
        the assembly unit, which becomes the parent of the unit.

        :param cons_unit: consumption unit to be added to the
            consumption_units list
        """
        if cons_unit in self.cons_units:
            print('Consumption unit {0} already in the consumption_units '
                  'list'.format(cons_unit.name))
            return

        next_pole = len(self.poles) + 1
        self.poles[next_pole] = cons_unit.poles[1]
        self.cons_units.append(cons_unit)
        cons_unit.parent = self

    def _add_reversible_unit(self, rev_unit):
        """
        Register a reversible unit: all its poles are appended to the
        poles of the assembly unit, which becomes the parent of the unit.

        :param rev_unit: reversible unit to be added to the
            reversible_units list
        """
        if rev_unit in self.rev_units:
            print('Reversible unit {0} already in the reversible_units list'
                  .format(rev_unit.name))
            return

        # The poles of the reversible unit are numbered after the poles
        # already registered in the assembly unit.
        first_free = len(self.poles)
        for pole_idx in range(1, len(rev_unit.poles) + 1):
            self.poles[first_free + pole_idx] = rev_unit.poles[pole_idx]
        self.rev_units.append(rev_unit)
        rev_unit.parent = self
[docs] def minimize_exergy_destruction(self, weight=1): """ This is the main objective of any exergetic optimization. """ if hasattr(self, 'exergy_dest'): self.min_exergy_dest = Objective(name='min_exergy_destruction', exp='lpSum({0}_exergy_dest[t] ' 'for t in time.I)'. format(self.name), weight=weight, parent=self) else: raise ValueError("You should initialize exergy calculations " "on the unit named {0} before trying to " "minimize its exergy destruction." .format(self.name))