# External imports
from scipy.interpolate import interp1d
from scipy.optimize import root_scalar
import CoolProp.CoolProp as CP
# Internal imports
from component.base_component import BaseComponent
from connector.mass_connector import MassConnector
from connector.work_connector import WorkConnector
[docs]
class PumpCurveSimilarity(BaseComponent):
"""
**Component**: Pump with a characteristic curve associated
**Model**: Model using characteristic curves for head and power and similarity laws
to calculate the pump performance at different speeds.
**Description**:
Simulates a pump using head, efficiency and NPSHr performance curves (vs flow and speed).
Uses the similarity laws to calculate the pump performance at different speeds.
**Assumptions**:
- Steady-state
- Incompressible fluid approximation for head calculation
**Connectors**:
su (MassConnector): Supply (inlet) side of the turbine.
ex (MassConnector): Exhaust (outlet) side of the turbine.
W (WorkConnector): Shaft power output from the turbine.
**Parameters**:
V_dot_curve: Array of volumetric flowrate curve [m³/h]
Delta_H_curve: Corresponding array from head rise curve [m]
eta_is_curve: Corresponding array from isentropic efficiency curve [-]
NPSH_r_curve: Corresponding array from NPSH required curve [m]
N_rot_rated: Rated pump speed [rpm]
mode : Operation mode ("P_N", "P_M", or "M_N")
- 'P_N': Given suction and exhaust pressures and rotational speed, calculates flow and power
- 'M_N': Given suction pressure, temperature, rotational speed, and mass flow rate, calculates exhaust pressure and power
- 'P_M': Given suction and exhaust pressures and mass flow rate, calculates rotational speed
**Inputs**:
*mode = P_N:*
P_su: Suction pressure [Pa]
T_su: Suction temperature [K]
P_ex: Exhaust pressure [Pa]
N_rot: Rotational speed [RPM]
fluid: Actual working fluid
*mode = P_M:*
P_su: Suction pressure [Pa]
T_su: Suction temperature [K]
P_ex: Exhaust pressure [Pa]
m_dot: Mass flow rate [kg/s]
fluid: Actual working fluid
*mode = M_N:*
P_su: Suction pressure [Pa]
T_su: Suction temperature [K]
N_rot: Rotational speed [RPM]
m_dot: Mass flow rate [kg/s]
fluid: Actual working fluid
**Outputs**:
V_dot: Volumetric flow rate [m³/h]
W_dot: Shaft power required by the pump [W]
m_dot: Mass flow rate [kg/s] or P_ex: Exhaust pressure [Pa] or N_rot: Rotational speed [RPM], depending on the mode
"""
def __init__(self):
# Initialize the base component class
super().__init__()
self.su = MassConnector() # Suction connector
self.ex = MassConnector() # Exhaust connector
self.W = WorkConnector() # Mechanical work connector
def get_required_inputs(self):
# Returns a list of required input variable names. Used to check if the model has enough data to run.
if self.params["mode"] == "P_N": # If the mode is 'P_N', the rotational speed and exhaust pressure are required while the mass flow rate of the expander is calculated
return ["P_su", "T_su", "P_ex", "N_rot", "fluid"]
elif self.params["mode"] == "P_M": # If the mode is 'P_M', the mass flow rate and exhaust pressure are required while the rotational speed of the expander is calculated
return ["P_su", "T_su", "P_ex", "m_dot", "fluid"]
elif self.params["mode"] == "M_N": # If the mode is 'M_N', the rotational speed and mass flow rate are required while the exhaust pressure of the expander is calculated
return ["P_su", "T_su", "N_rot", "m_dot", "fluid"]
else:
raise ValueError("Specify a mode for the pump : 'P_N', 'P_M', 'M_N'")
def get_required_parameters(self):
# Returns a list of required parameters needed for model execution.
return ["V_dot_curve", "Delta_H_curve", "eta_is_curve", "NPSH_r_curve", "N_rot_rated", "mode"]
def solve(self):
"""
Main solving routine that extrapolates pump curves, computes thermodynamic and performance outputs.
"""
self.check_parametrized()
self.check_calculable()
if not self.calculable or not self.parametrized:
print("Component is not calculable or not parametrized")
return
if self.params["mode"] == "P_N": # Given P_su, T_su, P_ex, N_rot
if self.su.p > self.ex.p:
print("Supply pressure is higher than exhaust pressure")
return
self.solve_PN()
elif self.params["mode"] == "P_M": # Given P_su, T_su, P_ex, m_dot
if self.su.p > self.ex.p:
print("Supply pressure is higher than exhaust pressure")
return
self.solve_PM()
elif self.params["mode"] == "M_N": # Given P_su, T_su, N_rot, m_dot
self.solve_MN()
self.solved = True
def solve_PN(self):
self.AS = CP.AbstractState("HEOS", self.su.fluid)
g = 9.81 # Gravity [m/s²]
self.V_dot_flag = 0 # Flag if flow is outside curve range
# Scale curves with respect to current speed
self.V_dot_curve = self.params["V_dot_curve"] * (self.W.N_rot / self.params["N_rot_rated"])
self.Delta_H_curve = (self.params["Delta_H_curve"] * (self.W.N_rot / self.params["N_rot_rated"]) ** 2)
self.eta_is_curve = self.params["eta_is_curve"]
self.NPSH_r_curve = (self.params["NPSH_r_curve"] * (self.W.N_rot / self.params["N_rot_rated"]) ** 2)
# Create interpolation functions for extrapolated performance
Delta_H_V = interp1d(self.Delta_H_curve, self.V_dot_curve, kind="linear", fill_value="extrapolate")
V_eta_is = interp1d(self.V_dot_curve, self.eta_is_curve, kind="linear", fill_value="extrapolate")
V_NPSH_r = interp1d(self.V_dot_curve, self.NPSH_r_curve, kind="linear", fill_value="extrapolate")
# Compute head difference [m]
DP = self.ex.p - self.su.p
self.Delta_H = DP / (g * self.su.D)
# Compute volumetric flowrate [m³/h]
self.V_dot = Delta_H_V(self.Delta_H)
# Flag if extrapolation is outside defined curves
if self.V_dot > self.V_dot_curve[-1] or self.V_dot < self.V_dot_curve[0]:
self.V_dot_flag = 1
# Mass flow rate [kg/s]
self.m_dot = self.su.D * self.V_dot / 3600
self.su.set_m_dot(self.m_dot)
self.ex.set_m_dot(self.m_dot)
# Isentropic efficiency
self.eta_is = V_eta_is(self.V_dot)
# NPSH required
self.NPSH_r = V_NPSH_r(self.V_dot)
# Isentropic outlet enthalpy
self.AS.update(CP.PSmass_INPUTS, self.ex.p, self.su.s)
h_ex_s = self.AS.hmass()
# Actual outlet enthalpy
h_ex = self.su.h + (h_ex_s - self.su.h) / self.eta_is
self.ex.set_h(h_ex)
# Ideal hydraulic power
self.W_dot_hyd = g * self.Delta_H * self.m_dot # W
# Shaft power
self.W_dot_shaft = self.su.m_dot * (self.ex.h - self.su.h) # W
self.W.set_W_dot(self.W_dot_shaft)
# Check for impossible operation
if self.V_dot_flag:
print("Flowrate outside possible range. Impossible operation.")
return
def solve_PM(self):
self.AS = CP.AbstractState("HEOS", self.su.fluid)
g = 9.81 # Gravity [m/s²]
# Known inputs
self.m_dot = self.su.m_dot
# Convert mass flow → volumetric flow in m³/h
self.V_dot = self.m_dot / self.su.D * 3600
# Required head from pressure rise
DP = self.ex.p - self.su.p
H_required = DP / (g * self.su.D)
# Rated curves
V_dot_rated = self.params["V_dot_curve"]
H_rated = self.params["Delta_H_curve"]
N_rated = self.params["N_rot_rated"]
NPSH_r_rated = self.params["NPSH_r_curve"]
# Interpolator for the head curve at rated speed
H_V_interp = interp1d(V_dot_rated, H_rated,
kind="linear", fill_value="extrapolate")
# Define the equation for N that must be zero:
# H_curve(Q,N) - H_required = 0
def head_error(N):
if N <= 0:
return 1e9 # avoid non-physical speeds
# Scale flow relative to rated speed using similarity laws
V_dot_equiv = self.V_dot * (N_rated / N)
# Get rated head at this equivalent flow
H_at_rated = H_V_interp(V_dot_equiv)
# Apply scaling law
H_at_speed = H_at_rated * (N / N_rated)**2
return H_at_speed - H_required
# Solve for N
sol = root_scalar(head_error, bracket=[1, 5*N_rated], method="bisect")
N_rot = sol.root
self.W.set_N_rot(N_rot)
# Now compute pump performance at this speed
# Rebuild scaled curves
self.V_dot_curve = V_dot_rated * (N_rot / N_rated)
self.Delta_H_curve = H_rated * (N_rot / N_rated)**2
self.NPSH_r_curve = NPSH_r_rated * (N_rot / N_rated)**2
self.eta_is_curve = self.params["eta_is_curve"]
# Interpolators
eta_interp = interp1d(self.V_dot_curve, self.eta_is_curve,
kind="linear", fill_value="extrapolate")
NPSH_r_interp = interp1d(self.V_dot_curve, self.NPSH_r_curve,
kind="linear", fill_value="extrapolate")
# NPSH
self.NPSH_r = NPSH_r_interp(self.V_dot)
# Efficiency
self.eta_is = eta_interp(self.V_dot)
# Update mass flow
self.V_dot = self.V_dot
self.ex.set_m_dot(self.m_dot)
# Thermodynamics
self.AS.update(CP.PSmass_INPUTS, self.ex.p, self.su.s)
h_ex_s = self.AS.hmass()
h_ex = self.su.h + (h_ex_s - self.su.h) / self.eta_is
self.ex.set_h(h_ex)
# Power
# Ideal hydraulic power
self.W_dot_hyd = g * H_required * self.m_dot # W
# Actual shaft power
self.W_dot_shaft = self.m_dot * (h_ex - self.su.h)
self.W.set_W_dot(self.W_dot_shaft)
def solve_MN(self):
self.AS = CP.AbstractState("HEOS", self.su.fluid)
g = 9.81 # Gravity [m/s²]
self.V_dot_flag = 0 # Flag if flow is outside curve range
# Known inputs
self.m_dot = self.su.m_dot
rho = self.su.D
N = self.W.N_rot
N_rated = self.params["N_rot_rated"]
# Convert mass flow to volumetric flow (m³/h)
V_dot = self.m_dot / rho * 3600
self.V_dot = V_dot
# Rated curves
V_dot_rated = self.params["V_dot_curve"]
H_rated = self.params["Delta_H_curve"]
eta_rated = self.params["eta_is_curve"]
NPSH_r_rated = self.params["NPSH_r_curve"]
# Interpolators for rated curves
H_interp = interp1d(V_dot_rated, H_rated, kind="linear", fill_value="extrapolate")
eta_interp = interp1d(V_dot_rated, eta_rated, kind="linear", fill_value="extrapolate")
NPSH_r_interp = interp1d(V_dot_rated, NPSH_r_rated, kind="linear", fill_value="extrapolate")
# 1. Map actual Q to equivalent Q at rated speed
V_dot_eq = V_dot * (N_rated / N)
# 2. Rated head at Q_eq
H_at_rated = H_interp(V_dot_eq)
# 3. Head at actual speed
H = H_at_rated * (N / N_rated)**2
self.Delta_H = H
# 4. Compute exhaust pressure
P_ex = self.su.p + rho * g * H
self.ex.set_p(P_ex)
# Flag out-of-range flows
if V_dot < V_dot_rated[0] or V_dot > V_dot_rated[-1]:
self.V_dot_flag = 1
# Efficiency at this speed (same scaling method)
eta = eta_interp(V_dot)
self.eta_is = eta
# NPSH required
self.NPSH_r = NPSH_r_interp(V_dot)
# 5. Thermodynamic outlet enthalpy
# Compute isentropic enthalpy
self.AS.update(CP.PSmass_INPUTS, self.ex.p, self.su.s)
h_ex_s = self.AS.hmass()
# Actual enthalpy rise
h_ex = self.su.h + (h_ex_s - self.su.h)/eta
self.ex.set_h(h_ex)
# 6. Power
self.W_dot_hyd = g * H * self.m_dot # Ideal hydraulic power [W]
self.W_dot_shaft = self.m_dot * (h_ex - self.su.h)
self.W.set_W_dot(self.W_dot_shaft)
def print_results(self):
"""
Prints the results of the pump simulation.
"""
print("Pump Results:")
print(f" Volumetric Flow Rate (V_dot): {self.V_dot:.3f} m³/h")
print(f" Mass Flow Rate (m_dot): {self.m_dot:.3f} kg/s")
print(f" Outlet Pressure (P_ex): {self.ex.p:.3f} Pa")
print(f" Rotational Speed (N_rot): {self.W.N_rot:.3f} RPM")
print(f" Shaft Power (W_dot): {self.W_dot_shaft:.3f} W")
print(f" Ideal Hydraulic Power (W_dot_hyd): {self.W_dot_hyd:.3f} W")
print(f" Isentropic Efficiency (eta_is): {self.eta_is:.3f}")
print(f" NPSH Required: {self.NPSH_r:.3f} m")
def plot_characteristic_curves(self, speeds_to_plot=None, rho_curve=None, n_points=200):
"""
Plot characteristic curves (H, Power, Efficiency, NPSH_r) vs Flow (Q)
for different rotational speeds using pump similarity laws and
density correction according to the current working fluid.
Uses the same nomenclature and curve definitions as the solver:
- V_dot_curve
- Delta_H_curve
- eta_is_curve
- NPSH_curve
- N_rot_rated
- density_curve
"""
import matplotlib.pyplot as plt
import numpy as np
from scipy.interpolate import interp1d
# ---------------------------------------------------------
# 1. Default speeds: use current operating speed
# ---------------------------------------------------------
if speeds_to_plot is None:
speeds_to_plot = [self.inputs["N_rot"]]
# ---------------------------------------------------------
# 2. Load manufacturer curves (reference curves)
# ---------------------------------------------------------
Q_base = np.array(self.params["V_dot_curve"]) # m³/h
H_base = np.array(self.params["Delta_H_curve"]) # m
eta_base = np.array(self.params["eta_is_curve"]) # -
NPSH_base = np.array(self.params["NPSH_r_curve"]) # m
# Interpolators for plotting on uniform flow grid
H_of_Q = interp1d(Q_base, H_base, fill_value="extrapolate")
eta_of_Q = interp1d(Q_base, eta_base, fill_value="extrapolate")
NPSH_of_Q = interp1d(Q_base, NPSH_base, fill_value="extrapolate")
# ---------------------------------------------------------
# 3. Density correction (power only)
# ---------------------------------------------------------
rho_actual = self.su.D # actual fluid density
rho_curve = rho_curve # reference fluid density
density_ratio = rho_actual / rho_curve
# ---------------------------------------------------------
# 4. Flow range for uniform plotting
# ---------------------------------------------------------
Q_min = np.min(Q_base)
Q_max = np.max(Q_base)
Q_grid = np.linspace(Q_min, Q_max, n_points)
# ---------------------------------------------------------
# 5. Create 4 plots
# ---------------------------------------------------------
fig, axes = plt.subplots(2, 2, figsize=(16, 10))
axH, axP, axE, axN = axes.flatten()
# ---------------------------------------------------------
# 6. Loop over speeds and scale curves
# ---------------------------------------------------------
for N in speeds_to_plot:
N_ref = self.params["N_rot_rated"]
s = N / N_ref # speed ratio
# Similarity laws:
# Q ~ N
# H ~ N²
# P ~ ρ N³
Q_scaled = Q_grid * s
H_scaled = H_of_Q(Q_grid) * s**2
eta_scaled = eta_of_Q(Q_grid)
NPSH_scaled = NPSH_of_Q(Q_grid) * s**2
# Hydraulic power at reference density
P_h = rho_curve * 9.81 * (Q_grid / 3600) * H_of_Q(Q_grid)
# Shaft power with similarity + density correction
P_scaled = (P_h / eta_scaled) * s**3 * density_ratio
# --- Plot Head ---
axH.plot(Q_scaled, H_scaled, label=f"{N:.0f} RPM")
# --- Plot Power ---
axP.plot(Q_scaled, P_scaled, label=f"{N:.0f} RPM")
# --- Plot Efficiency ---
axE.plot(Q_scaled, eta_scaled, label=f"{N:.0f} RPM")
# --- Plot NPSH_r ---
axN.plot(Q_scaled, NPSH_scaled, label=f"{N:.0f} RPM")
# ---------------------------------------------------------
# 7. Format axes
# ---------------------------------------------------------
axH.set_title("Head vs Flow Rate")
axH.set_xlabel("Flow Rate [m³/h]")
axH.set_ylabel("Head [m]")
axH.grid(True)
axH.legend()
axP.set_title("Power vs Flow Rate")
axP.set_xlabel("Flow Rate [m³/h]")
axP.set_ylabel("Shaft Power [W]")
axP.grid(True)
axP.legend()
axE.set_title("Isentropic Efficiency vs Flow")
axE.set_xlabel("Flow Rate [m³/h]")
axE.set_ylabel("η_is [-]")
axE.grid(True)
axE.legend()
axN.set_title("NPSH Required vs Flow")
axN.set_xlabel("Flow Rate [m³/h]")
axN.set_ylabel("NPSH_r [m]")
axN.grid(True)
axN.legend()
plt.tight_layout()
plt.show()