Source code for omegalpes.general.utils.plots

#!/usr/bin/env python3
#  -*- coding: utf-8 -*-

"""
**This module includes the following display utils:**

    - plot_node_energy_flows() : enables one to plot the energy flows
      through an EnergyNode
    - plot_energy_mix() : enables one to plot the share of each production
      unit connected to a node as a pie chart
    - plot_pareto2D() : enables one to plot a pareto front based on two
      quantities
    - plot_quantity() : enables one to plot easily a Quantity
    - plot_quantity_bar() : enables one to plot easily a Quantity as a bar
    - sum_quantities_in_quantity() : enables one to plot several quantities
      as a single one once the optimisation is done

..
    Copyright 2018 G2Elab / MAGE

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
"""

from __future__ import annotations

import logging
import os
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import List, Optional, Sequence, Tuple, Union

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import plotly.graph_objects as go

from ...energy.energy_nodes import EnergyNode
from ...energy.units.consumption_units import ConsumptionUnit
from ...energy.units.production_units import ProductionUnit
from ...energy.units.storage_units import StorageUnit
from ..optimisation.elements import Quantity

# Markup format used by the docstrings of this module.
__docformat__ = "restructuredtext en"
# Public names exported by a star-import of this module.
__all__ = [
    "Backend",
    "FigureOptions",
    "plot_node_energy_flows",
    "plot_energy_mix",
    "plot_pareto2D",
    "plot_quantity",
    "plot_quantity_bar",
    "sum_quantities_in_quantity",
]

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Types & constants
# ---------------------------------------------------------------------------

# Short aliases for the Matplotlib classes used in type annotations below.
_MplFigure = matplotlib.figure.Figure
_MplAxes = matplotlib.axes.Axes

# A figure handle from either supported backend (Matplotlib or Plotly).
FigureType = Union[_MplFigure, go.Figure]

# Layout template set on Plotly figures by ``_apply_plotly_layout``.
_PLOTLY_TEMPLATE = "plotly_white"
_PLOTLY_REF_DPI = 96  # Plotly's internal reference DPI for px ↔ inch conversion

@dataclass
class FigureOptions:
    """
    Portable figure size, font size and PNG export settings accepted by every
    plotting function in this module.

    :param figsize:
        ``(width, height)`` in **inches**.

        * **Matplotlib** – passed directly to ``plt.subplots(figsize=...)``.
        * **Plotly** – converted to pixels at *dpi* and applied via
          ``fig.update_layout(width=..., height=...)``.

        ``None`` keeps each backend's own default size.
    :param save_png:
        When ``True`` the finished figure is written to disk as a PNG.
        Requires *save_path* to be provided.
    :param save_path:
        Destination file path for the PNG.  ``.png`` is **appended** when the
        path does not already end with it (case-insensitive), so a name that
        contains dots is kept intact (``"flows_dt0.5"`` becomes
        ``"flows_dt0.5.png"``).  Parent directories are created on demand by
        the save helpers.
    :param fontsize:
        Base font size applied to titles, axis labels, ticks and legends.
        ``None`` keeps each backend's defaults.
    :param dpi:
        Dots-per-inch used when **saving** the figure (both backends) and
        when converting *figsize* inches → Plotly pixels.  Defaults to ``96``.

    :raises ValueError: If *save_png* is set without *save_path*, or if
        *figsize*, *dpi* or *fontsize* is not strictly positive / well formed.

    .. note::
        Plotly PNG export requires the ``kaleido`` package::

            pip install kaleido

    **Example:**

    .. code-block:: python

        opts = FigureOptions(figsize=(12, 4), save_png=True,
                             save_path="out/flows.png", dpi=200)
        plot_node_energy_flows(node, fig_opts=opts)
    """

    figsize: Optional[Tuple[float, float]] = None
    save_png: bool = False
    save_path: Optional[Union[str, Path]] = None
    fontsize: Optional[int] = None
    dpi: int = 96

    def __post_init__(self) -> None:
        """Validate the option values right after construction."""
        if self.save_png and self.save_path is None:
            raise ValueError(
                "FigureOptions: 'save_path' must be provided when save_png = True ."
            )
        if self.figsize is not None and not self._is_valid_figsize(self.figsize):
            raise ValueError(
                "FigureOptions: 'figsize' must be a (width, height) tuple "
                "of positive numbers."
            )
        if self.dpi <= 0:
            raise ValueError(
                "FigureOptions: 'dpi' must be a positive integer."
            )
        if self.fontsize is not None and self.fontsize <= 0:
            raise ValueError("FigureOptions: 'fontsize' must be a positive integer.")

    @staticmethod
    def _is_valid_figsize(figsize) -> bool:
        """Return ``True`` when *figsize* is a pair of positive numbers."""
        # Unpacking (rather than len()) makes a non-sequence such as a bare
        # number fail validation cleanly instead of raising TypeError.
        try:
            width, height = figsize
        except (TypeError, ValueError):
            return False
        return all(
            isinstance(v, (int, float)) and v > 0 for v in (width, height)
        )

    @property
    def mpl_figsize(self) -> Optional[Tuple[float, float]]:
        """``figsize`` ready for ``plt.subplots(figsize=...)``, or ``None``."""
        return self.figsize

    @property
    def plotly_wh(self) -> Tuple[Optional[int], Optional[int]]:
        """``(width_px, height_px)`` for Plotly, or ``(None, None)``."""
        if self.figsize is None:
            return None, None
        # NOTE(review): _save_plotly additionally passes
        # scale = dpi / _PLOTLY_REF_DPI to write_image, so for dpi != 96 an
        # exported Plotly PNG ends up larger than figsize * dpi pixels.
        # Confirm whether the conversion here should use the reference DPI.
        width_in, height_in = self.figsize
        return int(width_in * self.dpi), int(height_in * self.dpi)

    @property
    def resolved_save_path(self) -> Optional[Path]:
        """:class:`Path` ending in ``.png``, or ``None`` when not saving."""
        if not self.save_png or self.save_path is None:
            return None
        path = Path(self.save_path)
        if path.suffix.lower() == ".png":
            return path
        # Append instead of with_suffix(): with_suffix would replace whatever
        # follows the last dot and silently truncate names like "run_dt0.5".
        return path.with_name(path.name + ".png")
# Shared default options: backend default size, no PNG export.  FigureOptions
# is a mutable dataclass and this single instance is the default ``fig_opts``
# of the plotting functions below, so it must never be modified in place.
_DEFAULT_OPTS = FigureOptions()
class Backend(str, Enum):
    """
    Rendering backends supported by the plotting functions.

    The ``str`` mixin makes every member compare equal to its plain string
    value (``Backend.PLOTLY == "plotly"``).
    """

    MATPLOTLIB = "matplotlib"
    PLOTLY = "plotly"
def _use_seaborn_style() -> None:
    """Apply seaborn styling with a graceful fallback for older Matplotlib."""
    # Try the candidate style names in order and stop at the first one that
    # this Matplotlib installation knows about.
    for style in ("seaborn-v0_8-whitegrid", "seaborn"):
        try:
            matplotlib.style.use(style)
        except OSError:
            continue
        return
    log.debug("No seaborn style available; using Matplotlib defaults.")


def _flow_to_array(flow_value) -> np.ndarray:
    """
    Normalise a flow value to a 1-D float NumPy array.

    :param flow_value: a list, tuple or ndarray of values, or a dict whose
        values are taken in the dict's iteration order.
    :raises TypeError: for any other container type.
    """
    if isinstance(flow_value, dict):
        return np.asarray(list(flow_value.values()), dtype=float)
    # ndarray/tuple are accepted as well, in line with _quantity_to_xy.
    if isinstance(flow_value, (list, tuple, np.ndarray)):
        return np.asarray(flow_value, dtype=float)
    raise TypeError(
        f"Unsupported flow value type: {type(flow_value).__name__!r}. "
        "Expected list, tuple, ndarray, or dict."
    )


def _quantity_to_xy(time, quantity: Quantity) -> Tuple[np.ndarray, np.ndarray]:
    """
    Extract ``(x, y)`` arrays from a :class:`Quantity`.

    ``x`` is expressed in time units: indices multiplied by ``time.DT``.

    :param time: object exposing ``I`` (the time indices) and ``DT``.
    :param quantity: quantity whose ``value`` is a list, ndarray or dict
        mapping an index to a value.
    :raises TypeError: if ``quantity.value`` has another type.
    """
    v = quantity.value
    if isinstance(v, (list, np.ndarray)):
        y = np.asarray(v, dtype=float)
        x = np.asarray(time.I, dtype=float) * time.DT
        if x.shape != y.shape:
            # The quantity does not span the full horizon: fall back to a
            # plain 0..n-1 index scaled by the time step.
            x = np.arange(len(y), dtype=float) * time.DT
        return x, y
    if isinstance(v, dict):
        if not v:
            # An empty dict would yield a 1-D array below and break the
            # column slicing; return two empty series instead.
            return np.empty(0, dtype=float), np.empty(0, dtype=float)
        pairs = np.array(sorted(v.items()), dtype=float)
        return pairs[:, 0] * time.DT, pairs[:, 1]
    raise TypeError(
        f"Unsupported Quantity value type: {type(v).__name__!r}. "
        "Expected list, ndarray, or dict."
    )


def _resolve_backend(interactive: Optional[bool],
                     fig: Optional[FigureType]) -> Backend:
    """
    Determine the rendering backend.

    Priority: explicit *fig* type > *interactive* flag > Matplotlib.
    """
    if isinstance(fig, go.Figure):
        return Backend.PLOTLY
    if isinstance(fig, matplotlib.figure.Figure):
        return Backend.MATPLOTLIB
    if interactive is True:
        return Backend.PLOTLY
    return Backend.MATPLOTLIB


def _axis_labels_from_node(node: EnergyNode) -> Tuple[str, str]:
    """Return ``(x_label, y_label)`` matching *node*'s time resolution."""
    dt = node.time.DT
    if dt == 1:
        return "Time (hours)", "Hourly mean power (kW)"
    if dt == 24:
        return "Time (days)", "Daily total energy (kWh)"
    return f"Time (hours), time step: {dt} hours", "Mean power (kW)"


def _scalar_or_sum(quantity: Quantity, dt: float) -> float:
    """
    Return a scalar from *quantity*.

    A quantity longer than one value is summed and multiplied by *dt* (a
    warning is logged); otherwise its single value is returned.
    """
    vals = quantity.get_value()
    if isinstance(vals, dict):
        # Defensive: iterating a dict would sum its keys, not its values.
        vals = list(vals.values())
    if quantity.vlen > 1:
        log.warning(
            "Quantity %r has length %d and was summed for the Pareto plot.",
            quantity.name, quantity.vlen,
        )
        return float(sum(vals)) * dt
    return float(vals) if np.isscalar(vals) else float(vals[0])


def _quantity_title(quantity: Quantity, title: Optional[str]) -> str:
    """Return *title*, or a default title built from the quantity name.

    :raises TypeError: if *title* is neither ``None`` nor a string.
    """
    if title is None:
        return f"Evolution of {quantity.name} over the studied period"
    if isinstance(title, str):
        return title
    raise TypeError(f"title must be str or None, got {type(title).__name__!r}.")


# helper functions for saving a plot as png-------------------------------------

def _save_mpl(fig: _MplFigure, opts: FigureOptions) -> None:
    """Save a Matplotlib figure to PNG when requested by *opts*."""
    path = opts.resolved_save_path
    if path is None:
        return
    path.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(path, dpi=opts.dpi, bbox_inches="tight")
    log.info("Figure saved → %s (dpi=%d).", path, opts.dpi)


def _save_plotly(fig: go.Figure, opts: FigureOptions) -> None:
    """Save a Plotly figure to PNG when requested by *opts*.

    Requires the ``kaleido`` package (``pip install kaleido``).  Export is
    best effort: a failure is logged and does not interrupt plotting.
    """
    path = opts.resolved_save_path
    if path is None:
        return
    path.parent.mkdir(parents=True, exist_ok=True)
    try:
        # scale rescales the image relative to Plotly's reference DPI.
        fig.write_image(str(path), scale=opts.dpi / _PLOTLY_REF_DPI)
        log.info("Figure saved → %s.", path)
    except Exception as exc:  # deliberate: never let export break the plot
        log.error(
            "Could not save Plotly figure to %s: %s. "
            "Install 'kaleido' to enable PNG export.",
            path, exc,
        )


# helper functions for figure layout----------------

def _apply_plotly_layout(
    fig: go.Figure,
    title: str,
    xlabel: Optional[str],
    ylabel: Optional[str],
    opts: FigureOptions,
) -> None:
    """Set title, template, axis titles, size and font on a Plotly figure.

    Only the settings that are actually provided are written to the layout.
    """
    layout: dict = {"title": title, "template": _PLOTLY_TEMPLATE}
    if xlabel:
        layout["xaxis_title"] = xlabel
    if ylabel:
        layout["yaxis_title"] = ylabel
    w, h = opts.plotly_wh
    if w is not None:
        layout["width"] = w
    if h is not None:
        layout["height"] = h
    if opts.fontsize is not None:
        layout["font"] = dict(size=opts.fontsize)
    fig.update_layout(**layout)


def _apply_mpl_labels(
    ax: _MplAxes,
    title: str,
    xlabel: Optional[str],
    ylabel: Optional[str],
    opts: FigureOptions,
) -> None:
    """Set title, axis labels and (optionally) font sizes on *ax*.

    The legend font size is only updated when a legend already exists on
    *ax*, so callers must create the legend before calling this helper.
    """
    ax.set_title(title)
    if xlabel:
        ax.set_xlabel(xlabel)
    if ylabel:
        ax.set_ylabel(ylabel)

    if opts.fontsize is None:
        return

    ax.title.set_fontsize(opts.fontsize + 2)  # Make title slightly larger
    if xlabel:
        ax.xaxis.label.set_fontsize(opts.fontsize)
    if ylabel:
        ax.yaxis.label.set_fontsize(opts.fontsize)
    ax.tick_params(axis='both', which='major', labelsize=opts.fontsize)

    # If the axis has a legend, update its font size too
    legend = ax.get_legend()
    if legend:
        for text in legend.get_texts():
            text.set_fontsize(opts.fontsize)


def _ensure_mpl_fig_ax(
    fig: Optional[_MplFigure],
    ax: Optional[_MplAxes],
    opts: FigureOptions,
) -> Tuple[_MplFigure, _MplAxes]:
    """
    Return a validated ``(fig, ax)`` pair, applying *figsize* from *opts*.

    * neither given – a new figure with a single axes is created;
    * only *ax* given – the figure owning *ax* is used;
    * only *fig* given – a new axes is added to it;
    * both given – they are returned after type validation.

    :raises TypeError: if *fig* / *ax* is not a Matplotlib Figure / Axes.
    """
    _use_seaborn_style()

    if fig is not None and not isinstance(fig, matplotlib.figure.Figure):
        raise TypeError(
            f"fig must be a matplotlib Figure or None, got {type(fig).__name__!r}."
        )
    if ax is not None and not isinstance(ax, matplotlib.axes.Axes):
        raise TypeError(
            f"ax must be a matplotlib Axes or None, got {type(ax).__name__!r}."
        )

    if fig is None and ax is None:
        fig, ax = plt.subplots(figsize=opts.mpl_figsize)
        return fig, ax

    if fig is None:
        # Draw into the caller's axes instead of discarding it and silently
        # opening a brand-new figure.
        fig = ax.figure

    if opts.figsize is not None:
        fig.set_size_inches(opts.figsize)
    if ax is None:
        ax = fig.add_subplot(111)
    return fig, ax
def plot_node_energy_flows(node: EnergyNode, interactive: bool = False,
                           fig_opts: FigureOptions = _DEFAULT_OPTS):
    """
    **Description**

        Plot the energy flows going through an EnergyNode.

        The display is realized :

        - with stacked bars for production, import and storage flows
        - with dotted lines for consumption and export flows

    :param node: The node whose flows are to be visualised.
    :type node: EnergyNode
    :param interactive: When True, uses Plotly; otherwise Matplotlib.
    :type interactive: bool
    :param fig_opts: figure size, font size and PNG export settings.
    :type fig_opts: FigureOptions

    :return: the figure that was drawn (a Plotly figure when *interactive*
        is true, a Matplotlib figure otherwise).  The figure has already
        been shown when the function returns.
    :raises TypeError: If node is not an EnergyNode.

    **Examples:**

    .. code-block:: python

        opts = FigureOptions(figsize=(14, 5), save_png=True,
                             save_path="results/flows.png", dpi=200)
        fig = plot_node_energy_flows(node, interactive=True, fig_opts=opts)
    """
    if not isinstance(node, EnergyNode):
        raise TypeError(
            'The element {0} should be an EnergyNode.'.format(node))

    print(
        "\nPreparing to plot the energy flows through the node {0}.".format(
            node.name))

    xlabel, ylabel = _axis_labels_from_node(node)
    # x positions: time indices scaled by the time step.
    time_axis = np.asarray(node.time.I, dtype=float) * node.time.DT

    if interactive:
        return _node_flows_plotly(node, time_axis, xlabel, ylabel, fig_opts)
    return _node_flows_mpl(node, time_axis, xlabel, ylabel, fig_opts)


def _node_flows_plotly(node: EnergyNode, time_axis: np.ndarray, xlabel: str,
                       ylabel: str, opts: FigureOptions) -> go.Figure:
    """Draw the flows of *node* with Plotly, show the figure and return it."""
    fig = go.Figure()

    for flow in node.get_flows:
        parent = flow.parent
        label = parent.name
        print("\tAdding power from {0}.".format(label))
        energy_flow = _flow_to_array(flow.value)

        if isinstance(parent, ProductionUnit) or node.is_import_flow(flow):
            fig.add_trace(go.Bar(x=time_axis, y=energy_flow, name=label))
        elif isinstance(parent, ConsumptionUnit) or node.is_export_flow(flow):
            fig.add_trace(
                go.Scatter(
                    x=time_axis, y=energy_flow, name=label,
                    mode='lines+markers',
                    line=dict(width=1.5, dash="dot"),
                    marker=dict(size=5),
                )
            )
        elif isinstance(parent, StorageUnit):
            # The storage flow is negated and split by sign so that the
            # positive part (labelled discharge) and the negative part
            # (labelled charge) become two separate bar series.
            arr = -energy_flow
            fig.add_trace(go.Bar(x=time_axis, y=np.maximum(arr, 0),
                                 name=f"{label} discharge"))
            fig.add_trace(go.Bar(x=time_axis, y=np.minimum(arr, 0),
                                 name=f"{label} charge"))

    _apply_plotly_layout(
        fig,
        title=f"Power flow for the units connected to the node {node.name}",
        xlabel=xlabel, ylabel=ylabel, opts=opts,
    )
    # "relative" stacks positive bars upwards and negative bars downwards.
    fig.update_layout(barmode="relative")
    _save_plotly(fig, opts)
    fig.show()
    # Hand the figure back as the annotation promises (it used to be lost).
    return fig


def _node_flows_mpl(node: EnergyNode, time_axis: np.ndarray, xlabel: str,
                    ylabel: str, opts: FigureOptions) -> _MplFigure:
    """Draw the flows of *node* with Matplotlib, show the figure, return it."""
    fig, ax = _ensure_mpl_fig_ax(None, None, opts)
    # Running top of the stacked (positive) bars.
    cumulative = np.zeros(node.time.LEN)
    bar_width = 0.8 * node.time.DT

    for flow in node.get_flows:
        parent = flow.parent
        label = parent.name
        print("\tAdd power from {0}.".format(label))
        energy_flow = _flow_to_array(flow.value)

        if isinstance(parent, ProductionUnit) or node.is_import_flow(flow):
            ax.bar(time_axis, energy_flow, width=bar_width,
                   bottom=cumulative, label=label)
            cumulative += energy_flow
        elif isinstance(parent, ConsumptionUnit) or node.is_export_flow(flow):
            ax.plot(time_axis, energy_flow, marker=".", linestyle="dotted",
                    linewidth=1.5, markersize=10, label=label)
        elif isinstance(parent, StorageUnit):
            arr = -energy_flow
            discharge, charge = np.maximum(arr, 0), np.minimum(arr, 0)
            # Discharge is stacked on top of the other positive bars;
            # charge is drawn downwards from zero.
            ax.bar(time_axis, discharge, width=bar_width,
                   bottom=cumulative, label=f"{label} discharge")
            cumulative += discharge
            ax.bar(time_axis, charge, width=bar_width,
                   label=f"{label} charge")

    # The legend must exist before _apply_mpl_labels runs, otherwise the
    # requested font size is never applied to the legend entries.
    ax.legend()
    _apply_mpl_labels(
        ax, f"Power flow for the units connected to the node {node.name}",
        xlabel, ylabel, opts=opts)
    _save_mpl(fig, opts)
    fig.show()
    return fig
def plot_energy_mix(node: EnergyNode, interactive: bool = False,
                    fig_opts: FigureOptions = _DEFAULT_OPTS):
    """
    **Description**

        Plot the energy mix of an EnergyNode as a pie chart: one slice per
        production unit connected to the node, sized by the sum of the
        values of its flow over the studied period.

    :param node: The node whose production mix is to be visualised.
    :type node: EnergyNode
    :param interactive: When True, uses Plotly; otherwise Matplotlib.
    :type interactive: bool
    :param fig_opts: figure size, font size and PNG export settings.
    :type fig_opts: FigureOptions

    :return: the figure that was drawn (Plotly or Matplotlib).  The figure
        has already been shown when the function returns.
    :raises TypeError: If node is not an EnergyNode.
    :raises ValueError: If no production units are connected to *node*.

    **Examples:**

    .. code-block:: python

        opts = FigureOptions(figsize=(6, 6), save_png=True,
                             save_path="results/mix.png", dpi=200)
        fig = plot_energy_mix(node, interactive=True, fig_opts=opts)
    """
    if not isinstance(node, EnergyNode):
        raise TypeError(
            'The element {0} should be an EnergyNode.'.format(node))

    names, totals = [], []
    for flow in node.get_flows:
        if isinstance(flow.parent, ProductionUnit):
            names.append(flow.parent.name)
            # _flow_to_array copes with list- and dict-valued flows alike,
            # whereas calling .values() directly only worked for dicts.
            totals.append(float(_flow_to_array(flow.value).sum()))

    if not totals:
        raise ValueError(
            f"No production units found on node {node.name!r}. "
            "Cannot build an energy mix chart."
        )

    title = f"Energy mix of the node {node.name}"

    if interactive:
        fig = go.Figure(data=[go.Pie(labels=names, values=totals, hole=.3)])
        _apply_plotly_layout(fig, title, None, None, fig_opts)
        _save_plotly(fig, fig_opts)
        fig.show()
        return fig

    fig, ax = plt.subplots(figsize=fig_opts.mpl_figsize)
    ax.axis('equal')  # Should be a circle
    pie_kwargs = {}
    if fig_opts.fontsize is not None:
        # Slice labels and percentages are plain texts of the pie itself.
        pie_kwargs["textprops"] = {"fontsize": fig_opts.fontsize}
    ax.pie(totals, labels=names, autopct='%1.1f%%', **pie_kwargs)
    # Create the legend first so that _apply_mpl_labels can resize it.
    ax.legend()
    _apply_mpl_labels(ax, title, None, None, opts=fig_opts)
    _save_mpl(fig, fig_opts)
    plt.show()
    # Return the figure like the Plotly branch does (it used to be lost).
    return fig
def plot_pareto2D(model, quantity_1, quantity_2, title=None, legend_on=True,
                  interactive=False, fig_opts=_DEFAULT_OPTS):
    """
    **Description**

        Plot a Pareto front for two quantities.
        Before using it, you should have added in your model two objectives
        with the pareto parameter activated (pareto=True)

    **Parameters**

    :param model: optimisation model exposing ``pareto_models``, the
        sequence of solved models (one per Pareto point); each of them must
        provide a ``quantities`` mapping and a ``time.DT`` time step.
    :param quantity_1: the first quantity for the pareto front (x axis)
    :param quantity_2: the second quantity for the pareto front (y axis)
    :param title: Optional figure title; auto-generated when ``None``
    :param legend_on: Whether to annotate each Pareto point with its run
        index.
    :param interactive: Selects the Plotly backend when ``True``.
    :param fig_opts: `FigureOptions` controlling figure size and PNG export

    :return: the figure that was drawn (Plotly or Matplotlib).  The figure
        has already been shown when the function returns.
    """
    print("\n Preparing 2D Pareto front ")

    # Quantities are looked up by "<parent name>_<quantity name>".
    q1_key = '{0}_{1}'.format(quantity_1.parent.name, quantity_1.name)
    q2_key = '{0}_{1}'.format(quantity_2.parent.name, quantity_2.name)
    final_title = (
        title if title else f'Pareto front between {q1_key} and {q2_key}'
    )

    x_vals, y_vals, point_labels = [], [], []
    for i, m in enumerate(model.pareto_models, start=1):
        x_vals.append(_scalar_or_sum(m.quantities[q1_key], m.time.DT))
        y_vals.append(_scalar_or_sum(m.quantities[q2_key], m.time.DT))
        point_labels.append(f"Optimisation {i}")

    if interactive:
        fig = go.Figure()
        fig.add_trace(
            go.Scatter(
                x=x_vals, y=y_vals,
                mode='markers+text',
                # Symbol and size belong to the ``marker`` object; passing
                # that dict as ``marker_symbol`` is rejected by Plotly.
                marker=dict(symbol="cross", size=10),
                text=point_labels if legend_on else None,
                textposition="top center",
                name="Pareto front",
            )
        )
        _apply_plotly_layout(fig, final_title, q1_key, q2_key, fig_opts)
        _save_plotly(fig, fig_opts)
        fig.show()
        return fig

    _use_seaborn_style()
    fig, ax = plt.subplots(figsize=fig_opts.mpl_figsize)
    ax.plot(x_vals, y_vals, marker="+", linestyle="none")
    if legend_on:
        for x, y, lbl in zip(x_vals, y_vals, point_labels):
            ax.annotate(lbl, (x, y), textcoords="offset points",
                        xytext=(5, 5))
    _apply_mpl_labels(ax, final_title, q1_key, q2_key, opts=fig_opts)
    _save_mpl(fig, fig_opts)
    plt.show()
    return fig
def plot_quantity(time, quantity, fig=None, ax=None, color=None, label=None,
                  xlabel=None, ylabel=None, title=None, interactive=False,
                  fig_opts=_DEFAULT_OPTS):
    """
    **Description**

        Plot an OMEGAlpes.general.optimisation.elements.Quantity as a line.

    **Parameters**

        - time: TimeUnit for the studied horizon as defined in general.time
        - quantity: OMEGAlpes.general.optimisation.elements.Quantity
        - fig: existing figure to draw into; the backend is inferred from
          its type and takes precedence over *interactive*
        - ax: existing Matplotlib axes (not used by the Plotly backend)
        - color: color of the plot
        - label: label for the quantity (defaults to the quantity name)
        - xlabel: label of the x axis
        - ylabel: label of the y axis
        - title: title of the plot (auto-generated when None)
        - interactive: Boolean for interactive (Plotly) plot
        - fig_opts: `FigureOptions` controlling figure size and PNG export

    **Returns**

        - Plotly backend: the plotly Figure
        - Matplotlib backend, in this order:

            - arg1 the matplotlib Line2D handle object
            - arg2 the matplotlib Axes handle object
            - arg3 the matplotlib Figure handle object
    """
    xs, ys = _quantity_to_xy(time, quantity)
    series_name = label or quantity.name
    heading = _quantity_title(quantity, title)

    if _resolve_backend(interactive, fig) is Backend.PLOTLY:
        # Reuse the caller's Plotly figure when there is one.
        plotly_fig = fig if isinstance(fig, go.Figure) else go.Figure()
        trace = go.Scatter(x=xs, y=ys, mode="lines+markers",
                           name=series_name, line=dict(color=color))
        plotly_fig.add_trace(trace)
        _apply_plotly_layout(plotly_fig, heading, xlabel, ylabel, fig_opts)
        _save_plotly(plotly_fig, fig_opts)
        return plotly_fig

    mpl_fig, mpl_ax = _ensure_mpl_fig_ax(fig, ax, fig_opts)
    [line] = mpl_ax.plot(xs, ys, "x-", color=color, label=series_name)
    _apply_mpl_labels(mpl_ax, heading, xlabel, ylabel, opts=fig_opts)
    _save_mpl(mpl_fig, fig_opts)
    return line, mpl_ax, mpl_fig
def plot_quantity_bar(time, quantity, fig=None, ax=None, color=None,
                      label=None, xlabel=None, ylabel=None, title=None,
                      interactive=False, fig_opts=_DEFAULT_OPTS):
    """
    **Description**

        Plot an OMEGAlpes.general.optimisation.elements.Quantity as bars.

    **Attributes**

        - time: TimeUnit for the studied horizon as defined in general.time
        - quantity: OMEGAlpes.general.optimisation.elements.Quantity
        - fig: Existing figure to draw into; backend is inferred from its
          type.
        - ax: Existing Matplotlib :class:`Axes` (ignored for the Plotly
          backend).
        - color: color of the plot
        - label: label for the quantity (defaults to the quantity name)
        - xlabel: label of the x axis
        - ylabel: label of the y axis
        - title: title of the plot (auto-generated when None)
        - interactive: Boolean for interactive (Plotly) plot
        - fig_opts: `FigureOptions` controlling figure size and PNG export

    **Returns**

        - Plotly backend: the plotly Figure
        - Matplotlib backend, in this order:

            - arg1 the matplotlib BarContainer handle object
            - arg2 the matplotlib Axes handle object
            - arg3 the matplotlib Figure handle object
    """
    backend = _resolve_backend(interactive, fig)
    x, y = _quantity_to_xy(time, quantity)
    eff_label = label or quantity.name
    final_title = _quantity_title(quantity, title)

    if backend is Backend.PLOTLY:
        if not isinstance(fig, go.Figure):
            fig = go.Figure()
        fig.add_trace(go.Bar(x=x, y=y, name=eff_label, marker_color=color))
        _apply_plotly_layout(fig, final_title, xlabel, ylabel, fig_opts)
        _save_plotly(fig, fig_opts)
        return fig

    fig, ax = _ensure_mpl_fig_ax(fig, ax, fig_opts)
    # x is spaced by time.DT, so the bar width has to scale with it; the
    # Matplotlib default (0.8) overlaps bars for DT < 0.8 and leaves
    # hairline bars for large steps.  Same ratio as the node flow plot.
    bar_container = ax.bar(x, y, width=0.8 * time.DT, color=color,
                           label=eff_label)
    _apply_mpl_labels(ax, final_title, xlabel, ylabel, opts=fig_opts)
    _save_mpl(fig, fig_opts)
    return bar_container, ax, fig
def sum_quantities_in_quantity(quantities_list=None,
                               tot_quantity_name='sum_quantity'):
    """
    **Description**

        Create a new quantity holding the element-wise sum of several
        quantities.
        Should be used in order to plot several quantities in one
        once the optimisation is done

    **Attributes**

        * quantities_list: a list of Quantities
          (OMEGALPES.general.optimisation.elements.Quantity); all of them
          must share the unit and length of the first one
        * tot_quantity_name: string : name of the new quantity

    **Returns**

        - tot_quantity: the new quantity created and filled

    **Raises**

        - ValueError: If *quantities_list* is empty, or units / lengths are
          inconsistent.
        - TypeError: If an element is not a :class:`Quantity`.
    """
    # ``None`` replaces the former mutable ``[]`` default; both mean "empty".
    if not quantities_list:
        raise ValueError("quantities_list must contain at least one Quantity.")

    # Validate every element up front so that nothing is summed from a
    # partially valid list.
    for idx, q in enumerate(quantities_list):
        if not isinstance(q, Quantity):
            raise TypeError(
                f"All elements must be Quantity objects; "
                f"got {type(q).__name__!r} at index {idx}."
            )

    reference = quantities_list[0]
    ref_unit = reference.unit
    ref_len = reference.vlen

    accumulated = np.zeros(ref_len, dtype=float)
    for q in quantities_list:
        if q.unit != ref_unit:
            raise ValueError(
                "The unit {0} of the quantity {1} is not the same "
                "as the first quantity {2} which unit is {3}"
                "".format(q.unit, q.name, reference.name, ref_unit))
        if q.vlen != ref_len:
            raise ValueError(
                "The length {0} of the quantity {1} is not the same "
                "as the first quantity {2} which length is {3}"
                "".format(q.vlen, q.name, reference.name, ref_len))
        values = q.value
        # Index-based access works for list values and for dict values
        # keyed 0..n-1 alike.
        accumulated += np.array([values[i] for i in range(ref_len)],
                                dtype=float)

    tot_quantity = Quantity(
        value=accumulated.tolist(),
        name=tot_quantity_name,
        unit=ref_unit,
        description='create a new quantity summing '
                    + ", ".join(q.name for q in quantities_list))

    return tot_quantity