"""
Module for the construction of all the groups necessary for the proper interaction of the
power train module with the aircraft sizing modules from FAST-OAD-GA based on the power train file.
"""
# This file is part of FAST-OAD_CS23-HE : A framework for rapid Overall Aircraft Design of Hybrid
# Electric Aircraft.
# Copyright (C) 2025 ISAE-SUPAERO
import copy
import json
import logging
import sys
import os.path as pth
import pathlib
from abc import ABC
from importlib.resources import open_text
from typing import Tuple, List, Dict
import re
import numpy as np
from jsonschema import validate
from ruamel.yaml import YAML
import networkx as nx
from .exceptions import (
FASTGAHEUnknownComponentID,
FASTGAHEUnknownOption,
FASTGAHEInvalidOptionDefinition,
FASTGAHESingleSSPCAtEndOfLine,
FASTGAHEImpossiblePair,
FASTGAHEIncoherentVoltage,
FASTGAHEComponentConnectionError,
FASTGAHECriticalComponentMissingError,
FASTGAHEInputCountError,
FASTGAHEOutputCountError,
FASTGAHEComponentsNotIdentified,
)
from . import resources
_LOGGER = logging.getLogger(__name__) # Logger for this module
JSON_SCHEMA_NAME = "power_train.json"
KEY_TITLE = "title"
KEY_PT_COMPONENTS = "power_train_components"
KEY_PT_CONNECTIONS = "component_connections"
KEY_PT_WATCHER = "watcher_file_path"
PT_DATA_PREFIX = "data:propulsion:he_power_train:"
PROMOTION_FROM_MISSION = {
"thrust": "N",
"altitude": "m",
"time_step": "s",
"true_airspeed": "m/s",
"exterior_temperature": "degK",
}
# TODO: Find a more generic way to do that, as an attributes in registered_components.py maybe ?
TYPE_TO_FUEL = {
"turboshaft": "jet_fuel",
"ICE": "avgas",
"high_rpm_ICE": "avgas",
"PEMFC_stack": "hydrogen",
}
ELECTRICITY_STORAGE_TYPES = ["battery_pack"]
DEFAULT_VOLTAGE_VALUE = 737.800
[docs]
class FASTGAHEPowerTrainConfigurator:
"""
Class for the configuration of the components necessary for the performances and sizing of the
power train.
:param power_train_file_path: if provided, power train will be read directly from it
"""
_connection_check_cache = {}
def __init__(self, power_train_file_path=None):
self._power_train_file = None
self._serializer = _YAMLSerializer()
# Contains the id of the components
self._components_id = None
# Contains the position of the components
self._components_position = None
# Contains the name of the component as it will be found in the input/output file to
# contain the data. Will also be used as subsystem name
self._components_name = None
# Contains the name of the options used to provide the names in self._components_name
self._components_name_id = None
# Contains the suffix of the component added to Performances and Sizing, will be used to
# instantiate the subsystems
self._components_om_type = None
# Contains the type of the component as it will be found in the input/output file to
# contain the data
self._components_type = None
# Contains a special tag on the class of element as some may need specific assemblers to
# work such as propulsor
self._components_type_class = None
# Contains the options of the component which will be given during object instantiation
self._components_options = None
# Contains the list of aircraft inputs that are necessary to promote in the performances
# modules for the code to work
self._components_promotes = None
# Contains the list of aircraft inputs that are necessary to promote in the slipstream
# modules for the code to work
self._components_slipstream_promotes = None
# Contains the list of variables that needs to be promoted from the performances
# computations to slipstream computation
self._components_performances_to_slipstream = None
# Contains a list with, for each component, a boolean telling whether the component
# needs the flaps position for the computation of the slipstream effects
self._components_slipstream_flaps = None
# Contains a list with, for each component, a boolean telling whether the component
# lift increase is added to the wing. Will be used for the increase in induced drag
self._components_slipstream_wing_lift = None
# Contains a basic list of the connections in the power train, with no processing whatsoever
self._connection_list = None
# Contains the list of all outputs (in the OpenMDAO sense of the term) needed to make the
# connections between components
self._components_connection_outputs = None
# Contains the list of all inputs (in the OpenMDAO sense of the term) needed to make the
# connections between components
self._components_connection_inputs = None
# Contains a list, for each component, of all the variables that will be monitored in the
# performances watcher of the power train, meaning this should be a list of list
self._components_perf_watchers = None
# Contains a list, for each component, of all the variables in the slipstream computation
# that will be monitored in the performances watcher of the power train, meaning this
# should be a list of list
self._components_slipstream_perf_watchers = None
# Contains a list of all pair of components which are symmetrical on the y axis with
# respect to the fuselage center line. This is for now intended for the computation of the
# loads on the wing to avoid accounting twice for the components as the wing mass will be
# computed as twice the weight of a half-wing
self._components_symmetrical_pairs = None
# Contains the list of all boolean telling whether the components will make the
# aircraft weight vary during flight
self._components_makes_mass_vary = None
# Contains the list of all boolean telling whether the components are energy
# sources that do not make the aircraft vary (ergo they will have a non-nil unconsumable
# energy)
self._source_does_not_make_mass_vary = None
# Contains the list of an initial guess of the component's efficiency. Is used to compute
# the initial of the currents and power of each component
self._components_efficiency = None
# Contains the list of control parameters name for each component. Is used to detect
# them in cas we want to give them a different name during the mission
self._components_control_parameters = None
# Because of their very peculiar role, we will scan the architecture for any SSPC defined
# by the user and whether they are at the output of a bus, because a specific
# option needs to be turned on in this was
self._sspc_list = {}
# Contains the default state of the SSPC, will be used if other states are not specified
# as an option of the performances group
self._sspc_default_state = {}
# After construction contains a graph (graph theory) with all components and their
# connection. It will for instance allow to check if a cable has SSPC's at both its end
# or check if a propulsor is not connected to a power source, in which case, we should not
# be able to require thrust from him (will come later)
self._connection_graph = None
# Contains the results of the function that sets the power in the graphs, is declared as
# an attribute to avoid having to recompute everything
self._power_at_each_node = None
# Contains the results of the function that sets the voltage in the graphs, is declared as
# an attribute to avoid having to recompute everything
self._voltage_at_each_node = None
if power_train_file_path:
self.load(power_train_file_path)
[docs]
def load(self, power_train_file):
"""
Reads the power train definition
:param power_train_file: Path to the file to open.
"""
self._power_train_file = pth.abspath(power_train_file)
self._serializer = _YAMLSerializer()
self._serializer.read(self._power_train_file)
# Syntax validation
with open_text(resources, JSON_SCHEMA_NAME) as json_file:
json_schema = json.loads(json_file.read())
validate(self._serializer.data, json_schema)
for key in self._serializer.data:
if key not in json_schema["properties"].keys():
_LOGGER.warning('Power train file: "%s" is not a FAST-OAD-GA-HE key.', key)
[docs]
def get_watcher_file_path(self):
"""
Returns the path to where the performance watch file will be. If name is not absolute
we complete it.
"""
watcher_file_path = self._serializer.data.get(KEY_PT_WATCHER)
if watcher_file_path:
if not pth.isabs(watcher_file_path):
return pth.join(pth.dirname(self._power_train_file), watcher_file_path)
else:
return watcher_file_path
else:
return None
def _get_components(self):
# We will work under the assumption that is one list is empty, all are hence only one if
# statement. This allows us to know whether re-triggering the identification of
# components is necessary
if self._components_id is None:
self._generate_components_list()
def _generate_components_list(self):
components_list = self._serializer.data.get(KEY_PT_COMPONENTS)
components_id = []
components_position = []
components_name_id_list = []
components_type_list = []
components_om_type_list = []
components_options_list = []
components_promote_list = []
components_slip_promote_list = []
components_perf_to_slip_list = []
components_type_class_list = []
components_perf_watchers_list = []
components_slipstream_perf_watchers_list = []
components_slipstream_needs_flaps = []
components_slipstream_wing_lift = []
components_symmetrical_pairs = []
components_makes_mass_vary = []
source_does_not_make_mass_vary = []
components_efficiency = []
components_control_parameter = []
# Doing it like that allows us to have the names of the components before we start the
# loop, which I'm gonna use to check if the pairs are valid
components_name_list = list(components_list.keys())
for component_name in components_list:
component = copy.deepcopy(components_list[component_name])
component_id = component["id"]
components_id.append(component_id)
if component_id not in resources.KNOWN_ID:
raise FASTGAHEUnknownComponentID(
component_id + " is not a known ID of a power train component"
)
if "position" in component:
component_position = component["position"]
components_position.append(component_position)
else:
components_position.append("")
if "symmetrical" in component:
component_symmetrical = component["symmetrical"]
if component_symmetrical not in components_name_list:
raise FASTGAHEImpossiblePair(
"Cannot pair "
+ component_name
+ " with "
+ component_symmetrical
+ " because "
+ component_symmetrical
+ " does not exist. Valid pair choice are among the following list: "
+ ", ".join(components_name_list)
+ ". \nBest regards."
)
# We sort the pair to ensure that if the pair is already there because the
# symmetrical tag is defined twice (propeller1 is symmetrical to propeller2 and
# propeller2 is symmetrical to propeller1) it will have the same name, and we
# don't have to register it twice.
sorted_pair = sorted([component_name, component_symmetrical])
if sorted_pair not in components_symmetrical_pairs:
components_symmetrical_pairs.append(sorted_pair)
# We don't put an else because as opposed to options, we don't expected all
# components to have symmetrical tag
if component_id == "fastga_he.pt_component.dc_sspc":
# Create a dictionary with SSPC name and a tag to see if they are at bus output
# or not, it will be set at False by default but be changed later on
self._sspc_list[component_name] = False
if "options" in component.keys():
if "closed_by_default" in component["options"]:
self._sspc_default_state[component_name] = component["options"][
"closed_by_default"
]
else:
self._sspc_default_state[component_name] = True
else:
self._sspc_default_state[component_name] = True
components_name_id_list.append(resources.DICTIONARY_CN_ID[component_id])
components_type_list.append(resources.DICTIONARY_CT[component_id])
components_om_type_list.append(resources.DICTIONARY_CN[component_id])
components_promote_list.append(resources.DICTIONARY_PT[component_id])
components_slip_promote_list.append(resources.DICTIONARY_SPT[component_id])
components_perf_to_slip_list.append(resources.DICTIONARY_PTS[component_id])
components_type_class_list.append(resources.DICTIONARY_CTC[component_id])
components_perf_watchers_list.append(resources.DICTIONARY_MP[component_id])
components_slipstream_perf_watchers_list.append(resources.DICTIONARY_SMP[component_id])
components_slipstream_needs_flaps.append(resources.DICTIONARY_SFR[component_id])
components_slipstream_wing_lift.append(resources.DICTIONARY_SWL[component_id])
components_makes_mass_vary.append(resources.DICTIONARY_VARIES_MASS[component_id])
source_does_not_make_mass_vary.append(resources.DICTIONARY_VARIESN_T_MASS[component_id])
components_efficiency.append(resources.DICTIONARY_ETA[component_id])
components_control_parameter.append(resources.DICTIONARY_CTRL_PARAM[component_id])
if "options" in component.keys():
# SSPC is treated above, this way of doing things however makes no other option
# for SSPC can be set, may need to be changed
if component_id != "fastga_he.pt_component.dc_sspc":
components_options_list.append(component["options"])
# While we are at it, we also check that we have the right options and with the
# right names
if set(component["options"].keys()) != set(
resources.DICTIONARY_ATT[component_id]
):
raise FASTGAHEUnknownOption(
"Component "
+ component_id
+ " does not have all options declare or they "
"have an erroneous name. The following options should be declared: "
+ ", ".join(resources.DICTIONARY_ATT[component_id])
)
else:
components_options_list.append(None)
else:
components_options_list.append(None)
self._components_id = components_id
self._components_position = components_position
self._components_name = components_name_list
self._components_name_id = components_name_id_list
self._components_type = components_type_list
self._components_om_type = components_om_type_list
self._components_options = components_options_list
self._components_promotes = components_promote_list
self._components_slipstream_promotes = components_slip_promote_list
self._components_performances_to_slipstream = components_perf_to_slip_list
self._components_type_class = components_type_class_list
self._components_perf_watchers = components_perf_watchers_list
self._components_slipstream_perf_watchers = components_slipstream_perf_watchers_list
self._components_slipstream_flaps = components_slipstream_needs_flaps
self._components_slipstream_wing_lift = components_slipstream_wing_lift
self._components_symmetrical_pairs = components_symmetrical_pairs
self._components_makes_mass_vary = components_makes_mass_vary
self._source_does_not_make_mass_vary = source_does_not_make_mass_vary
self._components_efficiency = components_efficiency
self._components_control_parameters = components_control_parameter
def _get_connections(self):
"""
This function inspects all the connections detected in the power train file and prepare
the list necessary to do the connections in the performance file.
The _get_components method must be run beforehand.
"""
# This should do nothing if it has already been run.
self._get_components()
connections_list = self._serializer.data.get(KEY_PT_CONNECTIONS)
if not self._check_existing_instance(self._power_train_file):
self._check_connection(connections_list)
self._add_connection_check_cache_instance(self._power_train_file)
_LOGGER.info("Powertrain components' connections checked.")
self._connection_list = connections_list
# Create a dictionary to translate component name back to component_id to identify
# outputs and inputs in each case
translator = dict(zip(self._components_name, self._components_id))
openmdao_output_list = []
openmdao_input_list = []
for connection in connections_list:
# Check in case the source or target is not a string but an array, meaning we are
# dealing with a component which might have multiple inputs/outputs (bus, gearbox,
# splitter, ...)
if type(connection["source"]) is str:
source_name = connection["source"]
source_id = translator[source_name]
source_number = ""
source_inputs = resources.DICTIONARY_IN[source_id]
else:
source_name = connection["source"][0]
source_id = translator[source_name]
source_number = str(connection["source"][1])
source_inputs = resources.DICTIONARY_IN[source_id]
if type(connection["target"]) is str:
target_name = connection["target"]
target_id = translator[target_name]
target_number = ""
target_outputs = resources.DICTIONARY_OUT[target_id]
else:
target_name = connection["target"][0]
target_id = translator[target_name]
target_number = str(connection["target"][1])
target_outputs = resources.DICTIONARY_OUT[target_id]
# First we check if we are dealing with an SSPC, because of their nature explained
# more in depth in the perf_voltage_out module, they will get a special treatment.
# They will always be connected to a bus and even more, their 'input' side will
# always be connected to a bus.
# If SSPC is source and connected to a bus there should be no worries, else we need a
# special treatment since the "input" side of the SSPC should be connected to the
# bus. Same reasoning apply for splitter since they are a type of bus.
if source_id == "fastga_he.pt_component.dc_sspc" and not (
target_id == "fastga_he.pt_component.dc_bus"
or target_id == "fastga_he.pt_component.dc_splitter"
):
# We reverse the SSPC inputs and outputs
source_inputs = resources.DICTIONARY_OUT[source_id]
# Same reasoning here, we just have to reverse the SSPC inputs and outputs
elif target_id == "fastga_he.pt_component.dc_sspc" and (
source_id == "fastga_he.pt_component.dc_bus"
or source_id == "fastga_he.pt_component.dc_splitter"
):
# We reverse the SSPC outputs and input
target_outputs = resources.DICTIONARY_IN[target_id]
# Because we need to know if the SSPC is at a bus output for the model to work,
# this check is necessary
if source_id == "fastga_he.pt_component.dc_sspc" and (
target_id == "fastga_he.pt_component.dc_bus"
or target_id == "fastga_he.pt_component.dc_splitter"
):
self._sspc_list[source_name] = True
# The possibility to connect a battery or a PEMFC stack directly to a bus has been
# added. However, to make it backward compatible (whatever it means today because I
# have no users) and to impart less burden during the writing of the pt file,
# we won't ask the user to set the option accordingly, rather, we will do it here.
if (
target_id == "fastga_he.pt_component.battery_pack"
or target_id == "fastga_he.pt_component.pemfc_stack"
) and (
source_id == "fastga_he.pt_component.dc_bus"
or source_id == "fastga_he.pt_component.dc_splitter"
or source_id == "fastga_he.pt_component.dc_sspc"
):
# First we'll check if the option has already been set or no, just to avoid
# losing time
target_index = self._components_name.index(target_name)
target_option = self._components_options[target_index]
if not target_option:
self._components_options[target_index] = {"direct_bus_connection": True}
current_outputs = resources.DICTIONARY_OUT[target_id]
target_outputs = []
for current_output in current_outputs:
target_outputs.append(tuple(reversed(current_output)))
# Compressor connection for the PEMFC stack. This if condition won't be activated until
# the implementation of the compressor component.
# if (
# target_id == "fastga_he.pt_component.pemfc_stack"
# and source_id == "fastga_he.pt_component.compressor"
# ):
# # First we'll check if the option has already been set or no, just to avoid
# # losing time
#
# target_index = self._components_name.index(target_name)
# target_option = self._components_options[target_index]
#
# if not target_option:
# self._components_options[target_index] = {"compressor_connection": True}
#
# current_outputs = resources.DICTIONARY_OUT[target_id]
#
# target_outputs = []
# for current_output in current_outputs:
# target_outputs.append(tuple(reversed(current_output)))
for system_input, system_output in zip(source_inputs, target_outputs):
if system_input[0]:
if system_input[0][-1] == "_":
system_input_str = system_input[0] + source_number
else:
system_input_str = system_input[0]
if system_output[1][-1] == "_":
system_output_str = system_output[1] + target_number
else:
system_output_str = system_output[1]
openmdao_input_list.append(source_name + "." + system_input_str)
openmdao_output_list.append(target_name + "." + system_output_str)
else:
if system_input[1][-1] == "_":
system_input_str = system_input[1] + source_number
else:
system_input_str = system_input[1]
if system_output[0][-1] == "_":
system_output_str = system_output[0] + target_number
else:
system_output_str = system_output[0]
openmdao_input_list.append(target_name + "." + system_output_str)
openmdao_output_list.append(source_name + "." + system_input_str)
self._components_connection_outputs = openmdao_output_list
self._components_connection_inputs = openmdao_input_list
def _check_connection(self, connections_list):
"""
This function ensures that all the connections defined in the powertrain respect the
components connection limit. It also checks that no components are left unconnected and
ensure at least one propulsor and one energy storing device is defined.
The _get_components method must be run beforehand.
"""
# This should do nothing if it has already been run.
self._get_components()
propulsor_component = []
aux_load_component = []
energy_storage_component = []
one_to_one_component = []
connector_component = []
connector_option = []
connector_type = []
for components_name, components_options, components_type_class, components_type in zip(
self._components_name,
self._components_options,
self._components_type_class,
self._components_type,
):
if components_type_class == "propulsor":
propulsor_component.append(components_name)
elif components_type_class == "tank" or components_type_class == "source":
energy_storage_component.append(components_name)
elif "propulsive_load" in components_type_class:
one_to_one_component.append(components_name)
elif components_type_class == "load":
aux_load_component.append(components_name)
elif components_type_class == "connector":
connector_component.append(components_name)
connector_option.append(components_options)
connector_type.append(components_type)
if not propulsor_component:
raise FASTGAHECriticalComponentMissingError("Propulsor missing!")
if not energy_storage_component:
raise FASTGAHECriticalComponentMissingError("Storage tank or battery missing!")
(
one_to_one_component,
one_to_many_component,
many_to_one_component,
many_to_many_component,
many_to_many_input_count,
many_to_many_output_count,
many_to_one_input_count,
one_to_many_output_count,
option_defined_many_to_one,
option_defined_one_to_many,
) = self._categorize_connector_type_component(
one_to_one_component, connector_component, connector_option, connector_type
)
# Check component existence
if many_to_one_component:
for components_name, input_count_defined, option_defined in zip(
many_to_one_component, many_to_one_input_count, option_defined_many_to_one
):
# counter reset
input_count = 0
option_defined_string = " from the option definition" if option_defined else ""
for connection in connections_list:
if components_name in connection.get("source"):
input_count += 1
if int(input_count_defined) != input_count:
raise FASTGAHEInputCountError(
f"Component {components_name} defines {input_count_defined} inputs"
+ option_defined_string
+ f", but "
f"{input_count} input(s) is/are listed in the connection section"
)
# Check component existence
if one_to_many_component:
for components_name, output_count_defined, option_defined in zip(
one_to_many_component, one_to_many_output_count, option_defined_one_to_many
):
# counter reset
output_count = 0
option_defined_string = " from the option definition" if option_defined else ""
for connection in connections_list:
if components_name in connection.get("target"):
output_count += 1
if int(output_count_defined) != output_count:
raise FASTGAHEOutputCountError(
f"Component {components_name} defines {output_count_defined} outputs"
+ option_defined_string
+ f", but "
f"{output_count} output(s) is/are listed in the connection section"
)
# Check component existence
if many_to_many_component:
for components_name, input_count_defined, output_count_defined in zip(
many_to_many_component, many_to_many_input_count, many_to_many_output_count
):
input_count = 0
output_count = 0
for connection in connections_list:
if components_name in connection.get("source"):
input_count += 1
elif components_name in connection.get("target"):
output_count += 1
if int(input_count_defined) != input_count:
raise FASTGAHEInputCountError(
f"Component {components_name} defines {input_count_defined} inputs from "
f"the option definition, but {output_count} input(s) is/are listed in the "
f"connection section"
)
if int(output_count_defined) != output_count:
raise FASTGAHEOutputCountError(
f"Component {components_name} defines {output_count_defined} outputs from "
f"the option definition, but {output_count} output(s) is/are listed in the "
f"connection section"
)
# Check typo or undefined component
for connection in connections_list:
source_name = (
connection["source"][0]
if type(connection["source"]) is list
else connection.get("source")
)
target_name = (
connection["target"][0]
if type(connection["target"]) is list
else connection.get("target")
)
if source_name not in self._components_name or target_name not in self._components_name:
if source_name not in self._components_name:
raise FASTGAHEComponentsNotIdentified(
f"{source_name} is not defined as a component!"
)
else:
raise FASTGAHEComponentsNotIdentified(
f"{target_name} is not defined as a component!"
)
# Check if there is any component missing in connection
for components_name in self._components_name:
if components_name not in propulsor_component + aux_load_component and not any(
components_name in connection.get("target") for connection in connections_list
):
raise FASTGAHEComponentConnectionError(f"{components_name} is missing as output!")
if components_name not in energy_storage_component and not any(
components_name in connection.get("source") for connection in connections_list
):
raise FASTGAHEComponentConnectionError(f"{components_name} is missing as input!")
def _categorize_connector_type_component(
self, one_to_one_component, connector_names, connector_options, connector_type
):
"""
This function categorizes the components in the connector component type class according
to their number of input and output connections, the generator and turbo_generator are
exceptions that are energy source component but categorized as connector for the
powertrain component registry. This only applies in the _check_connection function.
"""
one_to_many_component = []
many_to_one_component = []
many_to_many_component = []
many_to_one_input_count = []
one_to_many_output_count = []
many_to_many_input_count = []
many_to_many_output_count = []
option_defined_one_to_many = []
option_defined_many_to_one = []
for name, options, type in zip(connector_names, connector_options, connector_type):
defined_multi_connection_exists = options is not None and any(
key.startswith("number_of_") for key in options.keys()
)
if defined_multi_connection_exists:
# This is for the connectors having given input and output numbers from pt file
# TODO: A concise way to define the input/output number option list
input_options = ["number_of_inputs", "number_of_tanks"]
output_options = [
"number_of_outputs",
"number_of_engines",
"number_of_power_sources",
]
for option, num in zip(options.keys(), options.values()):
if option in input_options:
num_input = int(num)
if num_input < num or num_input <= 0:
raise FASTGAHEInvalidOptionDefinition(
f"{num} is invalid as input option value, only positive integers "
f"are allowed"
)
elif option in output_options:
num_output = int(num)
if num_output < num or num_output <= 0:
raise FASTGAHEInvalidOptionDefinition(
f"{num} is invalid as output option value, only positive integers "
f"are allowed"
)
# First check if there is any side having multiple connection
if num_input > 1 or num_output > 1:
# This is to identify many-to-many component
if num_input > 1 and num_output > 1:
many_to_many_component.append(name)
many_to_many_input_count.append(num_input)
many_to_many_output_count.append(num_output)
# This is to identify many-to-one component
elif num_input > 1:
many_to_one_component.append(name)
many_to_one_input_count.append(num_input)
option_defined_many_to_one.append(True)
# This is to identify one-to-many component
else:
one_to_many_component.append(name)
one_to_many_output_count.append(num_output)
option_defined_one_to_many.append(True)
else:
one_to_one_component.append(name)
else:
if type == "DC_splitter" or type == "planetary_gear":
many_to_one_component.append(name)
many_to_one_input_count.append(2)
option_defined_many_to_one.append(False)
elif type == "gearbox":
one_to_many_component.append(name)
one_to_many_output_count.append(2)
option_defined_one_to_many.append(False)
else:
one_to_one_component.append(name)
return (
one_to_one_component,
one_to_many_component,
many_to_one_component,
many_to_many_component,
many_to_many_input_count,
many_to_many_output_count,
many_to_one_input_count,
one_to_many_output_count,
option_defined_many_to_one,
option_defined_one_to_many,
)
def _construct_connection_graph(self):
graph = nx.Graph()
for component in self._components_name:
graph.add_node(component)
for connection in self._connection_list:
# When the component is connected to a bus, the output number is also specified but it
# isn't meaningful when drawing a graph, so we will just filter it
if type(connection["source"]) is list:
source = connection["source"][0]
else:
source = connection["source"]
if type(connection["target"]) is list:
target = connection["target"][0]
else:
target = connection["target"]
graph.add_edge(source, target)
self._connection_graph = graph
[docs]
def get_distance_from_propulsive_load(self):
propulsor_name = []
propulsive_load_names = []
# First and for reason that will appear clear later, we get a list of propulsor
for component_type_class, component_name in zip(
self._components_type_class, self._components_name
):
if "propulsor" in component_type_class:
propulsor_name.append(component_name)
self._construct_connection_graph()
graph = self._connection_graph
# We now get a list of propulsive loads. Because of the use envisioned for this function (
# mostly post-processing), the ICE won't be considered a propulsive load if no propulsor
# is attached to it
for component_type_class, component_name in zip(
self._components_type_class, self._components_name
):
if "propulsive_load" == [component_type_class]:
propulsive_load_names.append(component_name)
# This case will correspond to ICE/turbomachinery
elif "propulsive_load" in component_type_class:
# Check whether or not at least one neighbor is a propulsor
neighbors = list(graph.neighbors(component_name))
if set(neighbors).intersection(propulsor_name):
propulsive_load_names.append(component_name)
# If there are gearboxes among neighbor, we also check for the neighbor of the
# gearbox. Not very generic way to do things :/
else:
name_to_id = dict(zip(self._components_name, self._components_id))
for neighbor in set(neighbors):
if (
name_to_id[neighbor] == "fastga_he.pt_component.speed_reducer"
or name_to_id[neighbor] == "fastga_he.pt_component.planetary_gear"
or name_to_id[neighbor] == "fastga_he.pt_component.gearbox"
):
neighbors_gb = list(graph.neighbors(neighbor))
if set(neighbors_gb).intersection(propulsor_name):
propulsive_load_names.append(component_name)
distance_from_propulsive_load = {}
connections_length_between_nodes = dict(nx.all_pairs_shortest_path_length(graph))
for component_name in self._components_name:
# When there are two separate sub propulsion chain in the same propulsion file,
# these line will cause an issue because, as it will browse all propulsive load he
# will attempt to reach loads he is not connected to and therefore not in
# connections_length_between_nodes. So first we must make sure to only browse
# connected loads.
connected_components = list(connections_length_between_nodes[component_name].keys())
connected_propulsive_loads = list(
set(propulsive_load_names) & set(connected_components)
)
min_distance = np.inf
for prop_load in connected_propulsive_loads:
distance_to_load = connections_length_between_nodes[component_name][prop_load]
if distance_to_load < min_distance:
min_distance = distance_to_load
distance_from_propulsive_load[component_name] = min_distance
return distance_from_propulsive_load, propulsive_load_names
[docs]
def get_distance_from_propulsor(self):
propulsor_names = []
# First and for reason that will appear clear later, we get a list of propulsor
for component_type_class, component_name in zip(
self._components_type_class, self._components_name
):
if "propulsor" in component_type_class:
propulsor_names.append(component_name)
self._construct_connection_graph()
graph = self._connection_graph
distance_from_propulsor = {}
connections_length_between_nodes = dict(nx.all_pairs_shortest_path_length(graph))
for component_name in self._components_name:
connected_components = list(connections_length_between_nodes[component_name].keys())
connected_propulsors = list(set(propulsor_names) & set(connected_components))
min_distance = np.inf
for prop in connected_propulsors:
distance_to_load = connections_length_between_nodes[component_name][prop]
if distance_to_load < min_distance:
min_distance = distance_to_load
distance_from_propulsor[component_name] = min_distance
return distance_from_propulsor
[docs]
def get_distance_from_energy_storage(self):
energy_storage_names = []
# First and for reason that will appear clear later, we get a list of energy storage
# component
for component_type, component_name in zip(self._components_type, self._components_name):
if (
"gaseous_hydrogen_tank" in component_type
or "fuel_tank" in component_type
or "battery_pack" in component_type
):
energy_storage_names.append(component_name)
self._construct_connection_graph()
graph = self._connection_graph
distance_from_energy_storage = {}
connections_length_between_nodes = dict(nx.all_pairs_shortest_path_length(graph))
for component_name in self._components_name:
connected_components = list(connections_length_between_nodes[component_name].keys())
connected_energy_storages = list(set(energy_storage_names) & set(connected_components))
min_distance = np.inf
for storage in connected_energy_storages:
distance_to_storage = connections_length_between_nodes[component_name][storage]
if distance_to_storage < min_distance:
min_distance = distance_to_storage
distance_from_energy_storage[component_name] = min_distance
return distance_from_energy_storage
[docs]
def reorder_components(self, *lists):
"""
Reorders components by their distance from the nearest propeller and assigns proper
sequential indices. Takes multiple property lists where the first list contains component
names/keys, and reorders all lists according to the distance-based mapping. This improves
robustness by ensuring that variables are updated in a correct order for each run.
:param *lists: Variable number of property lists to be reordered. The first list should
contain component names/keys that correspond to keys in the distance_from_propulsor
dictionary. All subsequent lists will be reordered according to the same mapping.
:return: tuple: All input lists reordered according to distance from propulsor, maintaining
the same order and count as input lists.
"""
# Sort items by value first, then by original key order to maintain consistency
distance_from_prop = self.get_distance_from_propulsor()
sorted_items = sorted(distance_from_prop.items(), key=lambda x: (x[1], x[0]))
# Create new dictionary with proper sequential indices
reindexed_dict = {}
for index, (key, original_value) in enumerate(sorted_items):
reindexed_dict[key] = index
# Reorder other property lists using the same mapping
reordered_lists = []
for lst in lists:
reordered = [None] * len(lst)
for old_pos, key in enumerate(lists[0]):
new_pos = reindexed_dict[key]
reordered[new_pos] = lst[old_pos]
reordered_lists.append(reordered)
return tuple(reordered_lists)
[docs]
def check_sspc_states(self, declared_state):
self._construct_connection_graph()
graph = self._connection_graph
components_to_check = {}
name_to_id = dict(zip(self._components_name, self._components_id))
# For now we will only check cable that have SSPC on both ends
for component_id, component_name in zip(self._components_id, self._components_name):
if component_id == "fastga_he.pt_component.dc_line":
neighbors = graph.adj[component_name]
# If component is a dc line, check that it has neighbors, then check if one at
# least one of those is an sspc
if neighbors:
sspc_neighbors = []
for neighbor in neighbors:
if name_to_id[neighbor] == "fastga_he.pt_component.dc_sspc":
sspc_neighbors.append(neighbor)
if len(sspc_neighbors) == 1:
raise FASTGAHESingleSSPCAtEndOfLine(
"Line " + component_name + " is connected to a single SSPC, this will "
"work as long as the SSPC is closed, but won't allow to open it"
)
# If there are no SSPC neighbor, no need to check the harness
elif len(sspc_neighbors) == 2:
components_to_check[component_name] = sspc_neighbors
# For all case to check, see the default state that was given to them and change it if
# need be
actual_state = copy.deepcopy(declared_state)
if components_to_check:
for component_to_check in components_to_check:
front_end, back_end = components_to_check[component_to_check]
# If both are in a different state raise a warning and change both of them to
# False (circuit open)
if declared_state[front_end] ^ declared_state[back_end]:
_LOGGER.warning(
"SSPCs " + front_end + " and " + back_end + " should be in "
"the same state, they are thus forced at open"
)
actual_state[front_end] = False
actual_state[back_end] = False
return actual_state
[docs]
def get_sizing_element_lists(self) -> tuple:
"""
Returns the list of parameters necessary to create the sizing group based on what is
inside the power train file.
"""
self._get_components()
return (
self._components_name,
self._components_name_id,
self._components_type,
self._components_om_type,
self._components_options,
self._components_position,
)
[docs]
def get_slipstream_element_lists(self) -> tuple:
"""
Returns the list of parameters necessary to create the slipstream group based on what is
inside the power train file.
"""
self._get_components()
return (
self._components_name,
self._components_name_id,
self._components_type,
self._components_om_type,
self._components_slipstream_promotes,
self._components_slipstream_flaps,
self._components_slipstream_wing_lift,
)
[docs]
def get_control_parameter_list(self) -> List[str]:
"""
Returns the list of control parameters of the components inside the powertrain.
"""
self._get_components()
# Because we might want different thrust distribution for mission and landing. As the
# default is always an array of one for that variable it shouldn't cause any problem.
ctrl_param_list = ["data:propulsion:he_power_train:thrust_distribution"]
for comp_name, comp_type, comp_ctrl_params in zip(
self._components_name, self._components_type, self._components_control_parameters
):
for comp_ctrl_param in comp_ctrl_params:
ctrl_param_name = (
PT_DATA_PREFIX + comp_type + ":" + comp_name + ":" + comp_ctrl_param
)
ctrl_param_list.append(ctrl_param_name)
return ctrl_param_list
[docs]
@staticmethod
def enforce_sspc_last(
components_name: list,
components_name_id: list,
components_om_type: list,
components_options: list,
components_promotes: list,
) -> Tuple[list, list, list, list, list]:
"""
It turns out that the SSPC can cause a bit of a mess when connected to cable, because,
as one side is computed and the other not, this might create huge current which will more
often than not prevent the code from converging. A solution found was to make it so that
the SSPC are always computed last in the performances. So far it works.
:param components_name: list that contains the name of the component as it will be found
in the input/output file
:param components_name_id: list that contains the name of the options used to provide the
names in self._components_name
:param components_om_type: list that contains the suffix of the component added to
Performances and Sizing
:param components_options: list that contains the options of the components
:param components_promotes: list that contains the list of aircraft inputs that are
necessary to promote in the performances modules for the code to work
"""
sspc_list_components_name = []
sspc_list_components_name_id = []
sspc_list_components_om_type = []
sspc_list_components_options = []
sspc_list_components_promotes = []
other_components_name = []
other_components_name_id = []
other_components_om_type = []
other_components_options = []
other_components_promotes = []
for (
component_name,
component_name_id,
component_om_type,
component_options,
component_promotes,
) in zip(
components_name,
components_name_id,
components_om_type,
components_options,
components_promotes,
):
if resources.DICTIONARY_CN_ID["fastga_he.pt_component.dc_sspc"] in component_name_id:
sspc_list_components_name.append(component_name)
sspc_list_components_name_id.append(component_name_id)
sspc_list_components_om_type.append(component_om_type)
sspc_list_components_options.append(component_options)
sspc_list_components_promotes.append(component_promotes)
else:
other_components_name.append(component_name)
other_components_name_id.append(component_name_id)
other_components_om_type.append(component_om_type)
other_components_options.append(component_options)
other_components_promotes.append(component_promotes)
components_name = other_components_name + sspc_list_components_name
components_name_id = other_components_name_id + sspc_list_components_name_id
components_om_type = other_components_om_type + sspc_list_components_om_type
components_options = other_components_options + sspc_list_components_options
components_promotes = other_components_promotes + sspc_list_components_promotes
return (
components_name,
components_name_id,
components_om_type,
components_options,
components_promotes,
)
[docs]
def get_mass_element_lists(self) -> list:
"""
Returns the list of OpenMDAO variables necessary to create the component which computes
the mass of the power train.
"""
self._get_components()
variable_names = []
for component_type, component_name in zip(self._components_type, self._components_name):
variable_names.append(PT_DATA_PREFIX + component_type + ":" + component_name + ":mass")
return variable_names
[docs]
def get_cg_element_lists(self) -> list:
"""
Returns the list of OpenMDAO variables necessary to create the component which computes
the center of gravity of the power train.
"""
self._get_components()
variable_names_cg = []
for component_type, component_name in zip(self._components_type, self._components_name):
variable_names_cg.append(
PT_DATA_PREFIX + component_type + ":" + component_name + ":CG:x"
)
return variable_names_cg
[docs]
def get_drag_element_lists(self) -> Tuple[list, list]:
"""
Returns the list of OpenMDAO variables necessary to create the component which computes
the drag of the power train. Will return both the name of high speed and low speed drag
as they are meant to be used once in the same component
"""
self._get_components()
variable_names_drag_ls = []
variable_names_drag_cruise = []
for component_type, component_name in zip(self._components_type, self._components_name):
variable_names_drag_ls.append(
PT_DATA_PREFIX + component_type + ":" + component_name + ":low_speed:CD0"
)
variable_names_drag_cruise.append(
PT_DATA_PREFIX + component_type + ":" + component_name + ":cruise:CD0"
)
return variable_names_drag_ls, variable_names_drag_cruise
[docs]
def get_thrust_element_list(self) -> list:
"""
Returns the list of OpenMDAO variables necessary to create the component which computes
the repartition of thrust among propellers.
"""
self._get_components()
components_names = []
for component_type_class, component_name in zip(
self._components_type_class, self._components_name
):
if "propulsor" in component_type_class:
components_names.append(component_name)
return components_names
[docs]
def get_propulsive_element_list(self) -> tuple:
"""
Returns the list of OpenMDAO variables necessary to create the component which computes
the ratio between the required power and the max available power on propulsive loads.
"""
self._get_components()
components_names = []
components_types = []
for component_type_class, component_name, component_type in zip(
self._components_type_class, self._components_name, self._components_type
):
if "propulsive_load" in component_type_class:
components_names.append(component_name)
components_types.append(component_type)
return components_names, components_types
[docs]
def get_energy_consumption_list(self) -> list:
"""
Returns the list of OpenMDAO variables necessary to create the component which sum the
contribution of each source to the global energy consumption.
"""
self._get_components()
components_names = []
for component_type_class, component_name in zip(
self._components_type_class, self._components_name
):
if "source" in component_type_class:
components_names.append(component_name)
return components_names
[docs]
def get_fuel_tank_list(self) -> Tuple[list, list]:
"""
Returns the list of components inside the power train which may cause the CG to shift
during flight because of a varying mass (but a constant position)
"""
self._get_components()
components_names = []
components_types = []
for component_type_class, component_name, component_type in zip(
self._components_type_class, self._components_name, self._components_type
):
if "tank" in component_type_class:
components_names.append(component_name)
components_types.append(component_type)
return components_names, components_types
[docs]
def get_fuel_tank_list_and_fuel(self) -> Tuple[list, list, list]:
"""
Returns the list of components inside the power train which contain fuel and what type of
fuel they contain. To do so we'll analyse the source they are connected to.
"""
fuel_tanks_names, fuel_tanks_types = self.get_fuel_tank_list()
source_names = self.get_energy_consumption_list()
name_to_type = dict(zip(self._components_name, self._components_type))
fuel_types = []
connected_graphs = self.get_connection_graph()
for fuel_tank_name in fuel_tanks_names:
for connected_graph in connected_graphs:
if fuel_tank_name not in list(connected_graph.nodes):
continue
else:
# We check which sources is the closest neighbor
distance_closest_source = np.inf
distances_in_graph = dict(nx.all_pairs_shortest_path_length(connected_graph))
distances_to_tank = distances_in_graph[fuel_tank_name]
for source_name in source_names:
if source_name in distances_to_tank:
if distances_to_tank[source_name] < distance_closest_source:
closest_source = source_name
distance_closest_source = distances_to_tank[source_name]
# I trust that there will always be at least one source connected to tank.
# I shouldn't
# But I do
fuel_types.append(TYPE_TO_FUEL[name_to_type[closest_source]])
return fuel_tanks_names, fuel_tanks_types, fuel_types
[docs]
def get_electricity_storage_list(self) -> Tuple[list, list]:
"""
Returns the list of electricity storage components inside the power train.
"""
self._get_components()
components_names = []
components_types = []
for component_id, component_name, component_type in zip(
self._components_id, self._components_name, self._components_type
):
if component_id in ELECTRICITY_STORAGE_TYPES:
components_names.append(component_name)
components_types.append(component_type)
return components_names, components_types
[docs]
def get_residuals_watcher_elements_list(self) -> tuple:
"""
Returns the list of OpenMDAO variables that are interesting to monitor in the residuals
watcher.
"""
self._get_components()
components_residuals_watchers_name_organised_list = []
components_name_organised_list = []
for component_name, component_id in zip(self._components_name, self._components_id):
component_res_list = resources.DICTIONARY_RSD[component_id]
for components_res_watcher in component_res_list:
components_name_organised_list.append(component_name)
components_residuals_watchers_name_organised_list.append(components_res_watcher)
return components_name_organised_list, components_residuals_watchers_name_organised_list
[docs]
def get_wing_punctual_mass_element_list(self) -> Tuple[list, list, list]:
"""
This function returns a list of the components that are to be considered as punctual
masses acting on the wing due to their positions as defined in the powertrain file
"""
self._get_components()
punctual_mass_names = []
punctual_mass_types = []
component_pairs = copy.deepcopy(self._components_symmetrical_pairs)
for component_id, component_name, component_position, component_type in zip(
self._components_id,
self._components_name,
self._components_position,
self._components_type,
):
if component_position in resources.DICTIONARY_PCT_W[component_id]:
punctual_mass_names.append(component_name)
punctual_mass_types.append(component_type)
# TODO: improve the way this is done, as I'm not satisfied with it
for component_pair in self._components_symmetrical_pairs:
if component_pair[0] in punctual_mass_names:
continue
elif component_pair[1] in punctual_mass_names:
continue
else:
component_pairs.remove(component_pair)
return punctual_mass_names, punctual_mass_types, component_pairs
[docs]
def get_wing_punctual_fuel_element_list(self) -> Tuple[list, list, list]:
"""
This function returns a list of the components that are to be considered as punctual
fuel tanks acting on the wing due to their positions as defined in the powertrain file
"""
self._get_components()
punctual_tank_names = []
punctual_tank_types = []
component_pairs = copy.deepcopy(self._components_symmetrical_pairs)
for component_id, component_name, component_position, component_type in zip(
self._components_id,
self._components_name,
self._components_position,
self._components_type,
):
if component_position in resources.DICTIONARY_PCT_W_F[component_id]:
punctual_tank_names.append(component_name)
punctual_tank_types.append(component_type)
# TODO: improve the way this is done, as I'm not satisfied with it
for component_pair in self._components_symmetrical_pairs:
if component_pair[0] in punctual_tank_names:
continue
elif component_pair[1] in punctual_tank_names:
continue
else:
component_pairs.remove(component_pair)
return punctual_tank_names, punctual_tank_types, component_pairs
[docs]
def get_wing_distributed_mass_element_list(self) -> Tuple[list, list, list]:
"""
This function returns a list of the components that are to be considered as distributed
masses acting on the wing due to their positions as defined in the powertrain file
"""
self._get_components()
distributed_mass_names = []
distributed_mass_types = []
component_pairs = copy.deepcopy(self._components_symmetrical_pairs)
for component_id, component_name, component_position, component_type in zip(
self._components_id,
self._components_name,
self._components_position,
self._components_type,
):
if component_position in resources.DICTIONARY_DST_W[component_id]:
distributed_mass_names.append(component_name)
distributed_mass_types.append(component_type)
# TODO: improve the way this is done, as I'm not satisfied with it
for component_pair in self._components_symmetrical_pairs:
if component_pair[0] in distributed_mass_names:
continue
elif component_pair[1] in distributed_mass_names:
continue
else:
component_pairs.remove(component_pair)
return distributed_mass_names, distributed_mass_types, component_pairs
[docs]
def get_wing_distributed_fuel_element_list(self) -> Tuple[list, list, list]:
"""
This function returns a list of the components that are to be considered as distributed
fuel tanks acting on the wing due to their positions as defined in the powertrain file
"""
self._get_components()
distributed_tanks_names = []
distributed_tanks_types = []
component_pairs = copy.deepcopy(self._components_symmetrical_pairs)
for component_id, component_name, component_position, component_type in zip(
self._components_id,
self._components_name,
self._components_position,
self._components_type,
):
if component_position in resources.DICTIONARY_DST_W_F[component_id]:
distributed_tanks_names.append(component_name)
distributed_tanks_types.append(component_type)
# TODO: improve the way this is done, as I'm not satisfied with it
for component_pair in self._components_symmetrical_pairs:
if component_pair[0] in distributed_tanks_names:
continue
elif component_pair[1] in distributed_tanks_names:
continue
else:
component_pairs.remove(component_pair)
return distributed_tanks_names, distributed_tanks_types, component_pairs
[docs]
def will_aircraft_mass_vary(self):
"""
This function returns a boolean telling whether or not there are components in the
powertrain that will make the aircraft mass vary during the flight (like burning fuel or
certain types of batteries). For now, will only be used in the initial guess.
"""
self._get_components()
return any(self._components_makes_mass_vary)
[docs]
def has_fuel_non_consumable_energy_source(self):
"""
This function returns a boolean telling whether or not there are energy sources in the
powertrain that will not make the aircraft vary (like batteries). For now is only used to
provide smart initial guess.
"""
self._get_components()
return any(self._source_does_not_make_mass_vary)
[docs]
def get_connection_graph(self) -> list:
"""
This function returns a graph of connection inside the powertrain without doubling the
components like what get_graphs_connected_voltage() does
"""
self._get_connections()
graph = nx.Graph()
for component_name in self._components_name:
graph.add_node(component_name)
for connection in self._connection_list:
# For bus and splitter, we don't really care about what number of input it is
# connected to, so we do the following
if type(connection["source"]) is list:
source = connection["source"][0]
else:
source = connection["source"]
if type(connection["target"]) is list:
target = connection["target"][0]
else:
target = connection["target"]
graph.add_edge(source, target)
sub_graphs = [graph.subgraph(c).copy() for c in nx.connected_components(graph)]
return sub_graphs
[docs]
def get_graphs_connected_voltage(self) -> list:
"""
This function returns a list of graphs of connected PT components that have more or less
the same imposed voltage. What is meant by that is that since some component impose the
voltage on the circuit while other have independent I/O in terms of voltage e.g the DC/DC
converter this will make it so that there might some subgraph of the architecture with
different connected voltage.
"""
self._get_connections()
graph = nx.Graph()
for component_name, component_id in zip(self._components_name, self._components_id):
graph.add_node(
component_name + "_out",
)
graph.add_node(
component_name + "_in",
)
if not resources.DICTIONARY_IO_INDEP_V[component_id]:
graph.add_edge(
component_name + "_out",
component_name + "_in",
)
for connection in self._connection_list:
# For bus and splitter, we don't really care about what number of input it is
# connected to, so we do the following
if type(connection["source"]) is list:
source = connection["source"][0]
else:
source = connection["source"]
if type(connection["target"]) is list:
target = connection["target"][0]
else:
target = connection["target"]
graph.add_edge(
source + "_in",
target + "_out",
)
sub_graphs = [graph.subgraph(c).copy() for c in nx.connected_components(graph)]
return sub_graphs
def _list_voltage_coherence_to_check(self) -> Tuple[list, list]:
"""
Makes a list, for all sub graphs, of the components that sets the voltage inside of them,
the check on the coherency of the value will be ade later.
"""
# This line prompts the identification of the power train from the file
sub_graphs = self.get_graphs_connected_voltage()
# We create a dictionary to associate name to id
name_to_id_dict = dict(zip(self._components_name, self._components_id))
sub_graphs_voltage_setter = []
for sub_graph in sub_graphs:
# First we make a list of all components in the sub graph that sets the voltage.
nodes_list = list(sub_graph.nodes)
# Then we turns those nodes (components name) in the corresponding component id and
# check if this id sets the voltage and count how many of them do.
node_that_sets_voltage = []
for node in nodes_list:
clean_node_name = node.replace("_in", "").replace("_out", "")
node_id = name_to_id_dict[clean_node_name]
# Since only the output of the components set the voltage, we will oly include
# them in the list
if resources.DICTIONARY_SETS_V[node_id] and "_out" in node:
node_that_sets_voltage.append(node)
sub_graphs_voltage_setter.append(node_that_sets_voltage)
return sub_graphs, sub_graphs_voltage_setter
[docs]
def check_voltage_coherence(self, inputs, number_of_points: int):
"""
Check that all the sub graphs of independent voltage are compatible, meaning that if
there is more than one component that sets the voltage, they have the same target voltage.
:param inputs: inputs vector, in the OpenMDAO format, which contains the value of the
voltages to check
:param number_of_points: number of points in the data to check
"""
sub_graphs_voltage_setters = self._list_voltage_coherence_to_check()[1]
name_to_type = dict(zip(self._components_name, self._components_type))
for sub_graph_voltage_setters in sub_graphs_voltage_setters:
ref_voltage = None
# If zero or one voltage setters nothing to check
if len(sub_graph_voltage_setters) < 2:
pass
else:
# Now for all those setter, we put them in the same format (if it was given as a
# float we transform it in array)
will_work = True
variables_to_check = []
for voltage_setter in sub_graph_voltage_setters:
clean_setter_name = voltage_setter.replace("_in", "").replace("_out", "")
setter_type = name_to_type[clean_setter_name]
input_name = (
PT_DATA_PREFIX
+ setter_type
+ ":"
+ clean_setter_name
+ ":voltage_out_target_mission"
)
variables_to_check.append(input_name)
data_value = inputs[input_name]
# We initiate the test with the first value we find
if ref_voltage is None:
# If not a float, it is an array !
if len(data_value) == 1:
ref_voltage = np.full(number_of_points, data_value)
else:
ref_voltage = data_value
# We check if coherent with other value
else:
if len(data_value) == 1:
data_value = np.full(number_of_points, data_value)
if not np.array_equal(ref_voltage, data_value):
will_work = False
if not will_work:
raise FASTGAHEIncoherentVoltage(
"The target voltage chosen for the following input: "
+ ", ".join(variables_to_check)
+ " is incoherent. Ensure that they have the same value and/or units"
)
[docs]
def get_voltage_to_set(self, inputs, number_of_points: int) -> List[dict]:
"""
Returns a list of the dict of voltage variable names and the value they should be set at
for each of the subgraph. Dict will be empty if there is no voltage to set. The voltage
to set are defined in the registered_components.py file. Note that this function was
coded based on the assumption that the voltage coherence checker was ran before hand. It
also assumes that all the voltage setter are RMS value when the voltage to set is an AC
voltage.
:param inputs: inputs vector, in the OpenMDAO format, which contains the value of the
voltages to check
:param number_of_points: number of points in the data to check
"""
# This line prompts the identification of the power train from the file
sub_graphs, sub_graphs_voltage_setters = self._list_voltage_coherence_to_check()
name_to_type = dict(zip(self._components_name, self._components_type))
name_to_id = dict(zip(self._components_name, self._components_id))
name_to_ct = dict(zip(self._components_name, self._components_type))
name_to_option = dict(zip(self._components_name, self._components_options))
final_list = []
voltage_at_each_node = {}
for sub_graph, sub_graph_voltage_setters in zip(sub_graphs, sub_graphs_voltage_setters):
# First and foremost, we get the value that will serve as the for the setting of the
# voltage in this subgraph. If there are not setters in this subgraph we just pass along
voltage_dict_subgraph = {}
if sub_graph_voltage_setters:
voltage_setter = sub_graph_voltage_setters[0]
clean_setter_name = voltage_setter.replace("_in", "").replace("_out", "")
setter_type = name_to_type[clean_setter_name]
input_name = (
PT_DATA_PREFIX
+ setter_type
+ ":"
+ clean_setter_name
+ ":voltage_out_target_mission"
)
reference_voltage = inputs[input_name]
else:
# We need to use a fake value here, but, a priori, since there are no voltage
# setter there won't be any voltage to set so we can put anything there. Expect,
# again, for the battery which is a particular case. It doesn't appear as a
# setter in the condition above but actually is when it is directly connected to
# a bus.
reference_voltage = None
# The way we'll do it is a bit horrible but it is quick and works
for node in sub_graph.nodes:
component_name = node.replace("_in", "").replace("_out", "")
component_id = name_to_id[component_name]
if (
component_id == "fastga_he.pt_component.battery_pack"
and "_out" in node
and name_to_option[component_name]
):
number_of_cell_in_series = self.get_number_of_cell_in_series(
component_name=component_name,
component_type=name_to_ct[component_name],
inputs=inputs,
)
reference_voltage = (
np.linspace(4.2, 2.65, number_of_points) * number_of_cell_in_series
)
if reference_voltage is None:
reference_voltage = np.array([DEFAULT_VOLTAGE_VALUE])
# We now transform it in the proper array, if it already has the right shape,
# this line does nothing
if len(reference_voltage):
reference_voltage = np.full(number_of_points, reference_voltage)
# Now that we have the voltage to set, we can go through the node in the
# architecture, check if they have a voltage to set, how far from the setter they are
# and then compute what the voltage to set is.
nodes_list = list(sub_graph.nodes)
for node in nodes_list:
component_name = node.replace("_in", "").replace("_out", "")
component_id = name_to_id[component_name]
# Now that we have the id of the node, we can check if there are some voltage to set
voltages_to_set = resources.DICTIONARY_V_TO_SET[component_id]
for voltage_to_set in voltages_to_set:
variable_name = component_name + "." + voltage_to_set
voltage_dict_subgraph[variable_name] = reference_voltage
# If the node in question is the output of a battery in "normal" mode,
# we can guesstimate the voltage but since it is so peculiar (not constant during
# mission) we won't make it appear in the registered_components.py. Yet another
# point of the code where the battery is a exception ^^'
if (
component_id == "fastga_he.pt_component.battery_pack"
and "_out" in node
and not name_to_option[component_name]
):
number_of_cell_in_series = self.get_number_of_cell_in_series(
component_name=component_name,
component_type=name_to_ct[component_name],
inputs=inputs,
)
voltage_to_set = (
np.linspace(4.2, 2.65, number_of_points) * number_of_cell_in_series
)
voltage_dict_subgraph[component_name + ".voltage_out"] = voltage_to_set
voltage_at_each_node[node] = reference_voltage
final_list.append(voltage_dict_subgraph)
self._voltage_at_each_node = voltage_at_each_node
return final_list
[docs]
@staticmethod
def get_number_of_cell_in_series(component_name: str, component_type: str, inputs) -> float:
"""
This function returns the number of cell in series inside a battery module. Was put there
because there is quite a process to extract the value and we will need it twice at least.
:param component_name: name of the battery pack
:param component_type: type of battery pack, for now there is only one but who knows
:param inputs: inputs vector, in the OpenMDAO format
"""
# We only know the promoted name of the variable so we can't access it
# directly, but in the error message that will pop up when we try to use it,
# we have all the info we need
try:
number_of_cell_in_series = inputs[
PT_DATA_PREFIX + component_type + ":" + component_name + ":module:number_cells"
].item()
except RuntimeError as e:
error_message = e.args[0]
abs_names = re.findall(r"\[.*?\]", error_message)[0][1:-1].replace(" ", "").split(",")
abs_name = abs_names[0]
split_abs_name = abs_name.split(".")
# Sometimes the number of cells is not one deep but two deep so we try both
try:
proper_abs_name = ".".join(split_abs_name[1:])
number_of_cell_in_series = inputs[proper_abs_name].item()
except KeyError:
proper_abs_name = ".".join(split_abs_name[2:])
number_of_cell_in_series = inputs[proper_abs_name].item()
return number_of_cell_in_series
[docs]
def get_directed_graph_sub_propulsion_chain(self):
"""
This function returns a list of directed graphs of connected PT sub propulsion chain. As a
prevision for next step all component will be split between their inputs and outputs to
allow to include efficiency.
"""
self._get_connections()
graph = nx.DiGraph()
for component_name, component_id in zip(self._components_name, self._components_id):
graph.add_node(
component_name + "_out",
)
graph.add_node(
component_name + "_in",
)
graph.add_edge(
component_name + "_out",
component_name + "_in",
)
for connection in self._connection_list:
# For bus and splitter, we don't really care about what number of input it is
# connected to so we do the following
if type(connection["source"]) is list:
source = connection["source"][0]
else:
source = connection["source"]
if type(connection["target"]) is list:
target = connection["target"][0]
else:
target = connection["target"]
graph.add_edge(
source + "_in",
target + "_out",
)
return graph
[docs]
def are_propulsor_connected_to_source(self):
"""
This function returns a dictionary which contains, for each propulsor, a boolean which
tells whether the propulsor can be actuated (meaning its connected to a source).
"""
propulsor_list = self.get_thrust_element_list()
source_list = self.get_energy_consumption_list()
is_propulsor_connected_dict = {}
source_output_name_list = []
for source in source_list:
source_output_name_list.append(source + "_out")
powertrain_graph = self.get_directed_graph_sub_propulsion_chain()
undirected_graph = powertrain_graph.to_undirected()
# We will iterate through the default state of the sspc and remover the connections
# between sspc in and sspc out if they are open (meaning no current goes through it so it
# doesn't carry power).
for sspc_name, sspc_default_state in self._sspc_default_state.items():
# If not closed by default
if not sspc_default_state:
undirected_graph.remove_edge(sspc_name + "_in", sspc_name + "_out")
for propulsor in propulsor_list:
# For each propulsor we check that its input is connected to a source output. The
# should only be on input for each propulsor and one output for each source which allows
# to do like this
propulsor_input_name = propulsor + "_in"
connected_nodes = nx.node_connected_component(undirected_graph, propulsor_input_name)
is_propulsor_connected = bool(
connected_nodes.intersection(set(source_output_name_list))
)
is_propulsor_connected_dict[propulsor] = is_propulsor_connected
return is_propulsor_connected_dict
[docs]
def get_power_to_set(self, inputs, propulsive_power_dict: dict) -> Tuple[dict, dict]:
"""
Returns a list of the power at each nodes of each subgraph. Also returns a list of the
dict of current variable names and the value they should be set at for each of the
subgraph. Dict will be empty if there is no power to set. The power to set are defined in
the registered_components.py file.
:param inputs: inputs vector, in the OpenMDAO format, which contains the value of the
voltages to check
:param propulsive_power_dict: dictionary with the propulsive power of each propulsor
"""
# We rewrite the propulsive power dict to match the name of the nodes
propulsive_loads_name = list(propulsive_power_dict.keys())
propulsive_loads_proper_name = []
proper_propulsive_power_dict = {}
for propulsive_load_name in propulsive_loads_name:
propulsive_load_proper_name = propulsive_load_name + "_out"
propulsive_loads_proper_name.append(propulsive_load_name)
proper_propulsive_power_dict[propulsive_load_proper_name] = propulsive_power_dict[
propulsive_load_name
]
graph = self.get_directed_graph_sub_propulsion_chain()
# Need to be put here else the _get_component hasn't triggered yet
name_to_id = dict(zip(self._components_name, self._components_id))
name_to_eta = dict(zip(self._components_name, self._components_efficiency))
# Get a list of nodes who hasn't been treated, will serve as way to check that good
# progress is made.
untreated_nodes = list(graph.nodes)
power_at_each_node = {}
# Initialize the dict with the power at each node with an array full of zeros except for
# propulsors.
template_power = list(proper_propulsive_power_dict.values())[0]
treated_nodes = []
for untreated_node in untreated_nodes:
if untreated_node in list(proper_propulsive_power_dict.keys()):
power_at_each_node[untreated_node] = proper_propulsive_power_dict[untreated_node]
treated_nodes.append(untreated_node)
else:
power_at_each_node[untreated_node] = np.zeros_like(template_power)
node_to_remove_at_the_end = []
# Remove treated nodes
untreated_nodes = [x for x in untreated_nodes if x not in treated_nodes]
# Now we can start treating the nodes. We'll just keep going over the graph until all
# nodes have been treated and we'll check that progress is being made by monitoring the
# number of nodes treated each loop.
previous_treated_node_number = 1
while len(untreated_nodes) != 0 and previous_treated_node_number != 0:
previous_treated_node_number = 0
for node in untreated_nodes:
# First check that we can treat the node. If we can't we move to the next node.
# A node can be treated if all its predecessor were treated
can_be_treated = True
for predecessor in graph.pred[node]:
if predecessor not in treated_nodes:
can_be_treated = False
if not can_be_treated:
continue
component_name = copy.deepcopy(node)
component_end = component_name[-1]
if component_end.isdigit():
str_to_replace = "_" + component_end
component_name = component_name.replace(str_to_replace, "")
component_name = component_name.replace("_out", "").replace("_in", "")
# Here we are sure the node can be treated. But we'll add it to the list of
# treated nodes only after we compute its value. Several case can however happen.
# 1) If it only has one predecessor and that predecessor only has one successor it
# means its either the connection between the input and output of a component so
# we include the efficiency or its the output of a component connected to the
# input of another one.
if len(list(graph.pred[node])) == 1:
predecessor = list(graph.pred[node])[0]
if predecessor.endswith("_1"):
predecessor = predecessor.replace("_1", "")
predecessor_name = predecessor.replace("_out", "").replace("_in", "")
if len(list(graph.succ[predecessor])) == 1:
# To get the component name, we remove the "_in" the "_out" and the "_1".
# We should only have to remove the "_1" as we are ensure that there is
# only one predecessor it must be a "_1"
# if name is the same, we compute the power below base on the efficiency,
# else its the same power, to refactor, we'll just say the efficiency is 1
if component_name == predecessor_name:
eta = name_to_eta[component_name]
else:
eta = 1.0
power_at_each_node[node] = power_at_each_node[predecessor] / eta
treated_nodes.append(node)
previous_treated_node_number += 1
continue
# 2) If it only has one predecessor and that predecessor only more than successor it
# means its a component that splits_power (either planetary gear, splitter, ...).
if len(list(graph.pred[node])) == 1:
predecessor = list(graph.pred[node])[0]
if predecessor.endswith("_1"):
predecessor = predecessor.replace("_1", "")
predecessor_name = predecessor.replace("_out", "").replace("_in", "")
if len(list(graph.succ[predecessor])) > 1:
# Check the predecessor type
predecessor_type = name_to_id[predecessor_name]
if predecessor_type == "fastga_he.pt_component.dc_splitter":
(
primary_input_power,
secondary_power_output,
) = self.splitter_power_inputs(
inputs=inputs,
components_name=predecessor_name,
power_output=power_at_each_node[predecessor],
)
# Then identify which current output it correspond to. Here if the
# component in question is not an sspc, the following should work.
# Else we will try both way
output_name = component_name + ".dc_current_out"
if name_to_id[component_name] == "fastga_he.pt_component.dc_sspc":
if output_name not in self._components_connection_outputs:
output_name = component_name + ".dc_current_in"
if name_to_id[component_name] == "fastga_he.pt_component.dc_line":
output_name = component_name + ".dc_current"
# We look at the number of the corresponding splitter input
index = self._components_connection_outputs.index(output_name)
splitter_input_name = self._components_connection_inputs[index]
if splitter_input_name.endswith("1"):
power_at_each_node[node] = primary_input_power
power_at_each_node[predecessor + "_1"] = primary_input_power
else:
power_at_each_node[node] = secondary_power_output
power_at_each_node[predecessor + "_2"] = secondary_power_output
if predecessor not in node_to_remove_at_the_end:
node_to_remove_at_the_end.append(predecessor)
elif predecessor_type == "fastga_he.pt_component.planetary_gear":
(
primary_input_power,
secondary_power_output,
) = self.gearbox_power_inputs(
inputs=inputs,
components_name=predecessor_name,
power_output=power_at_each_node[predecessor],
)
# We look at the number of the corresponding gearbox input. Will be a
# bit farfetched as we need to find using the current neighbor. For
# gearboxes, the two connexion are rpm and shaft power and they are
# both output of the input side of the gearbox, so we'll have to use
# the input list
# We look at the number of the corresponding splitter input
input_name = component_name + ".shaft_power_out"
index = self._components_connection_inputs.index(input_name)
gearbox_input_name = self._components_connection_outputs[index]
if gearbox_input_name.endswith("1"):
power_at_each_node[node] = primary_input_power
power_at_each_node[predecessor + "_1"] = primary_input_power
else:
power_at_each_node[node] = secondary_power_output
power_at_each_node[predecessor + "_2"] = primary_input_power
if predecessor not in node_to_remove_at_the_end:
node_to_remove_at_the_end.append(predecessor)
elif predecessor_type == "fastga_he.pt_component.fuel_system":
input_power_dict = self.fuel_system_power_inputs(
inputs=inputs,
components_name=predecessor_name,
power_output=power_at_each_node[predecessor],
)
output_name = component_name + ".fuel_consumed_t"
index = self._components_connection_inputs.index(output_name)
fuel_system_input_name = self._components_connection_outputs[index]
input_number = fuel_system_input_name[-1]
power_at_each_node[node] = input_power_dict[
"fuel_consumed_in_t_" + input_number
]
power_at_each_node[predecessor + "_" + input_number] = input_power_dict[
"fuel_consumed_in_t_" + input_number
]
if predecessor not in node_to_remove_at_the_end:
node_to_remove_at_the_end.append(predecessor)
treated_nodes.append(node)
previous_treated_node_number += 1
continue
# 2) If it has more than one predecessor it is the output of a bus/fuel
# system/gear. In this case we simply sum all the predecessor. We should be able
# to do so since we ensured to get there that all predecessor were already treated
if len(list(graph.pred[node])) > 1:
power = np.zeros_like(template_power)
for current_predecessor in graph.pred[node]:
power += power_at_each_node[current_predecessor]
power_at_each_node[node] = power
treated_nodes.append(node)
previous_treated_node_number += 1
continue
# Now we remove all nodes we treated in this while loop
untreated_nodes = [x for x in untreated_nodes if x not in treated_nodes]
for node_to_remove in node_to_remove_at_the_end:
power_at_each_node.pop(node_to_remove)
self._power_at_each_node = power_at_each_node
final_list = {}
for node in list(graph.nodes):
component_name = node.replace("_in", "").replace("_out", "")
component_id = name_to_id[component_name]
power_to_set = resources.DICTIONARY_P_TO_SET[component_id]
for power in power_to_set:
# These are tuple which contains the "in" or "out" tag plus the name of the
# variable
if power[1] == "in" and node.endswith("_in"):
variable_name = component_name + "." + power[0]
# If we are to set the power of a component with multiple inputs,
# the node name will not match and will need to be modified. We will
# check that we are in this case if the variable name endswith a number
if variable_name[-1].isdigit():
final_list[variable_name] = power_at_each_node[
node + "_" + variable_name[-1]
]
else:
final_list[variable_name] = power_at_each_node[node]
elif power[1] == "out" and node.endswith("_out"):
variable_name = component_name + "." + power[0]
final_list[variable_name] = power_at_each_node[node]
return power_at_each_node, final_list
[docs]
def get_network_elements_list(self) -> tuple:
"""
Returns the name of the components and their connections for the visualisation of the
power train as a network graph.
"""
self._get_connections()
icons_name = []
icons_size = []
for component_id in self._components_id:
icons_name.append(resources.DICTIONARY_ICON[component_id])
icons_size.append(resources.DICTIONARY_ICON_SIZE[component_id])
# If the connection is between a bus and an sspc, we shorten the length
curated_connection_list = []
for connections in self._connection_list:
curated_connection_list.append((connections["source"], connections["target"]))
return (
self._components_name,
curated_connection_list,
self._components_type_class,
self._components_om_type,
icons_name,
icons_size,
)
[docs]
def produce_simplified_pt_file_copy(self):
"""
This function was created after the observation that the more components there are in the
powertrain, the longer it takes to run (duh). It is even more striking when running the
optimization to find a new wing area (it can takes minutes). However, for that particular
observation the whole propulsion chain is not needed. We indeed only need the propulsors
to compute the slipstream effect and the propulsive load to check that the power rate is
below 1. Consequently, and only for that particular application, we will produce a
simplified powertrain file which contains only the required elements.
"""
simplified_serializer = copy.deepcopy(self._serializer)
self._get_components()
retained_components = []
# First, we pop all the components that we don't need
for component_name, component_type_class, component_id in zip(
self._components_name, self._components_type_class, self._components_id
):
if (
"propulsor" not in component_type_class
and "propulsive_load" not in component_type_class
):
simplified_serializer.data[KEY_PT_COMPONENTS].pop(component_name)
else:
retained_components.append(component_name)
# Then we pop all the connections that don't involve the components we have
self._get_connections()
cured_connection_list = copy.deepcopy(self._connection_list)
for connection in self._connection_list:
if type(connection["source"]) is str:
if connection["source"] not in retained_components:
cured_connection_list.remove(connection)
else:
if type(connection["target"]) is str:
if connection["target"] not in retained_components:
cured_connection_list.remove(connection)
else:
if connection["target"][0] not in retained_components:
cured_connection_list.remove(connection)
else:
if connection["source"][0] not in retained_components:
cured_connection_list.remove(connection)
else:
if type(connection["target"]) is str:
if connection["target"] not in retained_components:
cured_connection_list.remove(connection)
else:
if connection["target"][0] not in retained_components:
cured_connection_list.remove(connection)
simplified_serializer.data[KEY_PT_CONNECTIONS] = cured_connection_list
pt_file_copy_path = self._power_train_file.replace(".yml", "_temp_copy.yml")
simplified_serializer.write(pt_file_copy_path)
return pt_file_copy_path
[docs]
def get_current_to_set(
self, inputs, propulsive_power_dict: dict, number_of_points: int
) -> dict:
"""
Returns a list of the dict of current variable names and the value they should be set at
for each of the subgraph. Dict will be empty if there is no current to set. The current to
set are defined in the registered_components.py file.
:param inputs: inputs vector, in the OpenMDAO format, which contains the value of the
voltages to check
:param propulsive_power_dict: dictionary with the propulsive power of each propulsor
:param number_of_points: number of points in the data to check
"""
# First we get voltage and power at each point but we first check that they are already
# registered to avoid redoing unnecessary operations
if not self._voltage_at_each_node:
_ = self.get_voltage_to_set(inputs, number_of_points)
if not self._power_at_each_node:
_, _ = self.get_power_to_set(inputs, propulsive_power_dict)
name_to_id = dict(zip(self._components_name, self._components_id))
name_to_option = dict(zip(self._components_name, self._components_options))
all_voltage_dict = copy.deepcopy(self._voltage_at_each_node)
all_power_dict = copy.deepcopy(self._power_at_each_node)
# First step is to remove all the sources inputs from the voltage setter since they won't
# appear in the power setter
for source in self.get_energy_consumption_list():
source_input_name = source + "_in"
if source_input_name in all_voltage_dict:
all_voltage_dict.pop(source_input_name)
# Something worth mentioning here. Due to the way the power_at_each_node dict was
# constructed, the splitter inputs are doubled, whereas their current aren't, meaning the
# power_at_each_node dict will always be longer or at worst the same size. This is the
# one we will use to iterate on.
all_current_dict = {}
for node in all_power_dict:
# first a quick check on whether the component is a splitter or not. Since we are
# iterating on the nodes in the power dictionary, if a components ends with either
# "_in_1" or "_in_2" it is a splitter
component_name = (
node.replace("_in_1", "")
.replace("_in_2", "")
.replace("_in", "")
.replace("_out", "")
)
component_id = name_to_id[component_name]
current_to_set = resources.DICTIONARY_I_TO_SET[component_id]
for current in current_to_set:
# These are tuple which contains the "in" or "out" tag plus the name of the variable
# Some value have been filled with a default value for voltage because they are
# not set, by the code. Turns out some current might be computed base on them so,
# instead of using this default value we will use the value on the other side of
# the component. E.g for the input of a dc/dc converter we will take its outputs
# rather than an arbitrary value.
voltage_node = all_voltage_dict[node]
if all(voltage_node == DEFAULT_VOLTAGE_VALUE):
if "_in" in node:
other_side_component = node.replace("_in", "_out")
else:
other_side_component = node.replace("_out", "_in")
voltage_node = all_voltage_dict[other_side_component]
# Some current correspond correspond to the current in one phase, in which case
# we need to divide by three the obtained current
if "one_phase" in current[0]:
factor = 3.0
else:
factor = 1.0
if current[1] == "in" and node.endswith("_in"):
variable_name = component_name + "." + current[0]
current = all_power_dict[node] / voltage_node / factor
all_current_dict[variable_name] = current
elif current[1] == "out" and node.endswith("_out"):
variable_name = component_name + "." + current[0]
current = all_power_dict[node] / voltage_node / factor
all_current_dict[variable_name] = current
# If the component is a battery, there will be no "official" current to set,
# but as seen empirically if the battery is directly connected to a bus and the
# current is not set properly, it might prevent the code from converging.
if (
name_to_id[component_name] == "fastga_he.pt_component.battery_pack"
and name_to_option[component_name]
):
# We just don't deal with battery inputs
if node.endswith("_out"):
voltage_node = all_voltage_dict[node]
current = all_power_dict[node] / voltage_node
variable_name = component_name + ".dc_current_out"
all_current_dict[variable_name] = current
return all_current_dict
[docs]
def get_battery_list(self) -> Tuple[list, list]:
"""
Returns the list of components inside the power train that are batteries. This function is
used to see where the electricity is stored in the powertrain for the LCA. For now, it tests
the id of the component, but it should be more generic in the future for components like
super-capacitors and others.
"""
self._get_components()
components_names = []
components_types = []
for component_id, component_name, component_type in zip(
self._components_id, self._components_name, self._components_type
):
if "battery_pack" in component_id:
components_names.append(component_name)
components_types.append(component_type)
return components_names, components_types
[docs]
def get_lca_production_element_list(self) -> Dict:
# I hate doing that here, but it prevents a circular import
import fastga_he.models.propulsion.components as he_comp
variables_names_mass = self.get_mass_element_lists()
# one possible way to get the path to the template LCA modules is to trace them back to
# one class we know is ner them such as the Sizing, Performances, ... The downside is
# that it will only work for package located with the component in the default delivery,
# so it won't work like fast-oad plugins. But for now it works
clean_dict = {}
for component_name, component_om_type, variable_name_mass in zip(
self._components_name, self._components_om_type, variables_names_mass
):
sizing_group = he_comp.__dict__["Sizing" + component_om_type]
path_to_sizing_file = pathlib.Path(sys.modules[sizing_group.__module__].__file__)
# The sizing class is defined inside components/sizing...py and the lca template is in
# components/lca_resources/lca_conf.yml so:
path_to_lca_prod_conf_template = (
path_to_sizing_file.parents[0] / "lca_resources/lca_conf_prod.yml"
)
if pth.exists(path_to_lca_prod_conf_template):
# Now we open the file, convert each line to an element of a list and replace the
# anchors by whatever is required
clean_lines = []
with open(path_to_lca_prod_conf_template, "r") as template_file:
for line in template_file.readlines():
# Important to add in the definition of the custom attribute, the name of
# the phase as the code writes it in the lca conf file.
# Also need to make sure we ask for the mass per functional unit and not
# the total mass
clean_lines.append(
line.replace(
"value: ANCHOR_COMPONENT_NAME",
"value: " + component_name + "_production",
)
.replace("ANCHOR_COMPONENT_NAME", component_name)
.replace(
"ANCHOR_COMPONENT_MASS",
variable_name_mass.replace("mass", "mass_per_fu").replace(
":", "__"
),
)
.replace(
"ANCHOR_COMPONENT_LENGTH",
variable_name_mass.replace("mass", "length_per_fu").replace(
":", "__"
),
)
.replace(
"ANCHOR_COMPONENT_MATERIAL",
variable_name_mass.replace("mass", "material").replace(":", "__"),
)
)
clean_dict[component_name] = clean_lines
return clean_dict
[docs]
def get_lca_use_phase_element_list(self) -> Tuple[Dict, List]:
# I still hate doing that here, but it prevents a circular import
import fastga_he.models.propulsion.components as he_comp
# We will start with the assumption that if a component of the powertrain has an impact
# in the use phase, it will have a computation of the emissions, even if they can be nil.
clean_dict = {}
species_list = []
for component_name, component_om_type, component_type in zip(
self._components_name, self._components_om_type, self._components_type
):
sizing_group = he_comp.__dict__["Sizing" + component_om_type]
path_to_sizing_file = pathlib.Path(sys.modules[sizing_group.__module__].__file__)
# The sizing class is defined inside components/sizing...py and the lca template is in
# components/lca_resources/lca_conf.yml so:
path_to_lca_use_conf_template = (
path_to_sizing_file.parents[0] / "lca_resources/lca_conf_use.yml"
)
if pth.exists(path_to_lca_use_conf_template):
# If the component has an impact on the use phase, it must release species in the
# air, which means it must have a species list. We intersect those list to have the
# names of the species release by the power train and update the NAME_TO_UNIT
# dict in the lca_core script.
pre_lca_group = he_comp.__dict__["PreLCA" + component_om_type]()
species_list = species_list + pre_lca_group.species_list
clean_lines = []
with open(path_to_lca_use_conf_template, "r") as template_file:
for line in template_file.readlines():
# Important to add in the definition of the custom attribute, the name of
# the phase as the code writes it in the lca conf file.
# If an anchor for an emission is added, we put the right variable name
if "ANCHOR_EMISSION" in line:
line_to_add = line.replace(
"ANCHOR_EMISSION_",
"data__LCA__operation__he_power_train__"
+ component_type
+ "__"
+ component_name
+ "__",
)
line_to_add = line_to_add.replace("\n", "")
line_to_add = line_to_add + "_per_fu\n"
else:
line_to_add = line.replace(
"value: ANCHOR_COMPONENT_NAME",
"value: " + component_name + "_operation",
).replace("ANCHOR_COMPONENT_NAME", component_name)
clean_lines.append(line_to_add)
clean_dict[component_name] = clean_lines
return clean_dict, list(set(species_list))
[docs]
def get_lca_manufacturing_phase_element_list(self) -> Tuple[Dict, List]:
"""
Get a dict with all the lines to add to the LCA configuration file for the manufacturing
phase. Theoretically, the manufacturing contains the assembly of the airframe plus tests
plus the construction of the assembly plant. In our case, the assembly plant will be
discarded and because we lack data, the assembly of the airframe has been aggregated in the
production. So all that remains are the line tests, which will be very similar to the use
phase. Except we won't attribute emission to each component (which means not adding the
custom attributes), however we'll need to differentiate each CO2, NOx emissions ... so we'll
tag them with component name.
"""
# I still hate doing that here, but it prevents a circular import
import fastga_he.models.propulsion.components as he_comp
# We will start with the assumption that if a component of the powertrain has an impact
# in the use phase, it will have a computation of the emissions, even if they can be nil.
clean_dict = {}
species_list = []
for component_name, component_om_type, component_type in zip(
self._components_name, self._components_om_type, self._components_type
):
sizing_group = he_comp.__dict__["Sizing" + component_om_type]
path_to_sizing_file = pathlib.Path(sys.modules[sizing_group.__module__].__file__)
# The sizing class is defined inside components/sizing...py and the lca template is in
# components/lca_resources/lca_conf.yml so:
path_to_lca_use_conf_template = (
path_to_sizing_file.parents[0] / "lca_resources/lca_conf_use.yml"
)
if pth.exists(path_to_lca_use_conf_template):
# If the component has an impact on the use phase, it must release species in the
# air, which means it must have a species list. We intersect those list to have the
# names of the species release by the power train and update the NAME_TO_UNIT
# dict in the lca_core script.
pre_lca_group = he_comp.__dict__["PreLCA" + component_om_type]()
species_list = species_list + pre_lca_group.species_list
clean_lines = []
with open(path_to_lca_use_conf_template, "r") as template_file:
lines = template_file.readlines()
for idx, line in enumerate(lines):
# Important to add in the definition of the custom attribute, the name of
# the phase as the code writes it in the lca conf file.
# Since we are using the same template as the use phase we remove the lines
# with the custom attributes. Or rather, we simply don't add them which
# means continuing to the next loop of the for
if self.belongs_to_custom_attribute_definition(line, idx, lines):
continue
# If an anchor for an emission is added, we put the right variable name
if "ANCHOR_EMISSION" in line:
line_to_add = line.replace(
"ANCHOR_EMISSION_",
"data__LCA__manufacturing__he_power_train__"
+ component_type
+ "__"
+ component_name
+ "__",
)
line_to_add = line_to_add.replace("\n", "")
line_to_add = line_to_add + "_per_fu\n"
else:
line_to_add = line.replace("ANCHOR_COMPONENT_NAME", component_name)
clean_lines.append(line_to_add)
clean_dict[component_name] = clean_lines
return clean_dict, list(set(species_list))
[docs]
def get_lca_distribution_phase_element_list(self) -> Tuple[Dict, List]:
"""
Get a dict with all the lines to add to the LCA configuration file for the distribution
phase. Will be computed all the time but won't be used if the delivery method is the train.
Lots of commonality with get_lca_manufacturing_phase_element_list
# TODO: Refactor ? Refactor !
"""
# I still hate doing that here, but it prevents a circular import
import fastga_he.models.propulsion.components as he_comp
clean_dict = {}
species_list = []
for component_name, component_om_type, component_type in zip(
self._components_name, self._components_om_type, self._components_type
):
sizing_group = he_comp.__dict__["Sizing" + component_om_type]
path_to_sizing_file = pathlib.Path(sys.modules[sizing_group.__module__].__file__)
# The sizing class is defined inside components/sizing...py and the lca template is in
# components/lca_resources/lca_conf.yml so:
path_to_lca_use_conf_template = (
path_to_sizing_file.parents[0] / "lca_resources/lca_conf_use.yml"
)
if pth.exists(path_to_lca_use_conf_template):
# If the component has an impact on the use phase, it must release species in the
# air, which means it must have a species list. We intersect those list to have the
# names of the species release by the power train and update the NAME_TO_UNIT
# dict in the lca_core script.
pre_lca_group = he_comp.__dict__["PreLCA" + component_om_type]()
species_list = species_list + pre_lca_group.species_list
clean_lines = []
with open(path_to_lca_use_conf_template, "r") as template_file:
lines = template_file.readlines()
for idx, line in enumerate(lines):
# Important to add in the definition of the custom attribute, the name of
# the phase as the code writes it in the lca conf file.
# Since we are using the same template as the use phase we remove the lines
# with the custom attributes. Or rather, we simply don't add them which
# means continuing to the next loop of the for
if self.belongs_to_custom_attribute_definition(line, idx, lines):
continue
# If an anchor for an emission is added, we put the right variable name
if "ANCHOR_EMISSION" in line:
line_to_add = line.replace(
"ANCHOR_EMISSION_",
"data__LCA__distribution__he_power_train__"
+ component_type
+ "__"
+ component_name
+ "__",
)
line_to_add = line_to_add.replace("\n", "")
line_to_add = line_to_add + "_per_fu\n"
else:
line_to_add = line.replace("ANCHOR_COMPONENT_NAME", component_name)
clean_lines.append(line_to_add)
clean_dict[component_name] = clean_lines
return clean_dict, list(set(species_list))
@staticmethod
def _check_existing_instance(power_train_file):
"""
Checks the cache to see if an instance of the cache already exists and is usable. Usable
means there was no modification to the powertrain configuration file.
"""
# If cache is empty, no instance is usable
if not FASTGAHEPowerTrainConfigurator._connection_check_cache:
return False
key = str(power_train_file)
# If the powertrain configuration file is a temporary copy or dedicated for a test,
# the connection test will be omitted
if key.endswith("_temp_copy.yml"):
return True
# If cache is not empty but there is no instance of that particular configuration file, no
# instance is usable.
if key not in FASTGAHEPowerTrainConfigurator._connection_check_cache:
return False
if FASTGAHEPowerTrainConfigurator._connection_check_cache[key]["skip_test"]:
return True
# Finally, if an instance exists, but it has been modified since, no instance is usable.
if (
FASTGAHEPowerTrainConfigurator._connection_check_cache[key]["last_mod_time"]
< pathlib.Path(power_train_file).lstat().st_mtime
):
return False
return True
@staticmethod
def _add_connection_check_cache_instance(power_train_file):
"""
In the case where no instance were usable and the compilation needed to be redone, we add
said compilation to the cache.
"""
key = str(power_train_file)
cache_instance = {
"last_mod_time": pathlib.Path(power_train_file).lstat().st_mtime,
"skip_test": False,
}
FASTGAHEPowerTrainConfigurator._connection_check_cache[key] = cache_instance
[docs]
@staticmethod
def belongs_to_custom_attribute_definition(line, line_idx, lines_to_inspect) -> bool:
"""
Utility function to detect if the line is part of the definition of a custom attribute.
"""
if (
"custom_attributes" in line
and 'attribute: "component"' in lines_to_inspect[line_idx + 1]
):
return True
# Trying to foolproof but if the format is respected it should not cause issues
if line_idx >= 1:
if (
'attribute: "component"' in line
and "custom_attributes" in lines_to_inspect[line_idx - 1]
):
return True
# Trying to foolproof but if the format is respected it should not cause issues
if line_idx >= 2:
if (
'attribute: "component"' in lines_to_inspect[line_idx - 1]
and "custom_attributes" in lines_to_inspect[line_idx - 2]
):
return True
return False
class _YAMLSerializer(ABC):
"""YAML-format serializer."""
def __init__(self):
self._data = None
@property
def data(self):
return self._data
def read(self, file_path: str):
yaml = YAML(typ="safe")
with open(file_path) as yaml_file:
self._data = yaml.load(yaml_file)
def write(self, file_path: str):
yaml = YAML()
yaml.default_flow_style = False
with open(file_path, "w") as file:
yaml.dump(self._data, file)