You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Mooi-Kickstart/pyrecoy/pyrecoy/pyrecoy2/financial.py

679 lines
22 KiB
Python

from pathlib import Path
from datetime import timedelta
import numpy as np
import numpy_financial as npf
import pandas as pd
import warnings
def npv(discount_rate, cashflows):
cashflows = np.array(cashflows)
return (cashflows / (1 + discount_rate) ** np.arange(1, len(cashflows) + 1)).sum(
axis=0
)
def calc_electr_market_results(model, nom_col=None, real_col=None):
"""Function to calculate the financial result on Day-Ahead and Imbalance market for the input model.
Parameters:
-----------
model : df
DataFrame containing at least 'DAM', 'POS' and 'NEG' columns.
nom_col : str
Name of the column containing the Day-Ahead nominations in MWh
Negative values = Buy, positive values = Sell
imb_col : str
Name of the column containing the Imbalance volumes in MWh
Negative values = Buy, positive values = Sell
Returns:
--------
Original df with added columns showing the financial results per timeunit.
"""
if nom_col is None:
nom_col = "Nom. vol."
model[nom_col] = 0
producing = model[real_col] > 0
model["Prod. vol."] = model[real_col].where(producing, other=0)
model["Cons. vol."] = model[real_col].where(~producing, other=0)
model["Imb. vol."] = model[real_col] - model[nom_col]
model["Day-Ahead Result"] = model[nom_col] * model["DAM"]
model["POS Result"] = 0
model["NEG Result"] = 0
posimb = model["Imb. vol."] > 0
model["POS Result"] = model["POS"] * model["Imb. vol."].where(posimb, other=0)
model["NEG Result"] = model["NEG"] * model["Imb. vol."].where(~posimb, other=0)
model["Imbalance Result"] = model["POS Result"] + model["NEG Result"]
model["Combined Result"] = model["Day-Ahead Result"] + model["Imbalance Result"]
return model
def calc_co2_costs(co2_prices, volumes, fuel):
"""Calculates gas market results
Parameters:
-----------
co2_prices : numeric or array
CO2 prices in €/ton
volumes : list
List of arrays containing volumes
fuel : list
List of arrays containing gas volumes
Returns:
--------
Returns a single negative value (=costs) in €
"""
if not isinstance(volumes, list):
volumes = [volumes]
emission_factors = {
"gas": 1.884 / 9.769
} # in ton/MWh (based on 1.884 kg CO2/Nm3, 9.769 kWh/Nm3)
if fuel not in emission_factors.keys():
raise NotImplementedError(
f"Emission factor for chosen fuel '{fuel}' is not implemented."
f"Implement it by adding emission factor to the 'emission_factors' table."
)
emission_factor = emission_factors[fuel]
return -round(
abs(sum((array * emission_factor * co2_prices).sum() for array in volumes)), 2
)
def calculate_eb_ode(
cons,
electr=True,
year=2020,
tax_bracket=None,
base_cons=None,
horti=False,
m3=False,
):
"""Calculates energy tax and ODE for consumption of electricity or natural gas in given year.
Function calculates total tax to be payed for electricity or natural gas consumption,
consisting of energy tax ('Energiebelasting') and sustainable energy surcharge ('Opslag Duurzame Energie').
Tax bracket that applies is based on consumption level, with a different tax
rate for each bracket.
For Gas
1: 0 - 170.000 m3
3: 170.000 - 1 mln. m3
4: 1 mln. - 10 mln. m3
5: > 10 mln. m3
For Electricity
1: 0 - 10 MWh
2: 10 - 50 MWh
3: 50 - 10.000 MWh
4: > 10.000 MWh
Parameters:
-----------
cons : numeric
Total consumption in given year for which to calculate taxes.
Electricity consumption in MWh
Gas consumption in MWh (or m3 and use m3=True)
electr : bool
Set to False for natural gas rates. Default is True.
year : int
Year for which tax rates should be used. Tax rates are updated
annually and can differ significantly.
tax_bracket : int
Tax bracket (1-4) to assume.
Parameter can not be used in conjunction ith 'base_cons'.
base_cons : numeric
Baseline consumption to assume, in same unit as 'cons'.
Specified value is used to decide what tax bracket to start in.
Taxes from baseline consumption are not included in calculation of the tax amount.
Parameter can not be used in conjunction with 'tax_bracket'.
horti : bool
The horticulture sector gets a discount on gas taxes.
m3 : bool
Set to True if you want to enter gas consumption in m3.
Default is to enter consumption in MWh.
Returns:
--------
Total tax amount as negative number (costs).
Note:
-----
This function is rather complicated, due to all its optionalities.
Should probably be simplified or split into different functions.
"""
if tax_bracket is not None and base_cons is not None:
raise ValueError(
"Parameters 'tax_bracket' and 'base_cons' can not be used at the same time."
)
if tax_bracket is None and base_cons is None:
raise ValueError(
"Function requires input for either 'tax_bracket' or 'base_cons'."
)
cons = abs(cons)
commodity = "electricity" if electr else "gas"
if commodity == "gas":
if not m3:
cons /= 9.769 / 1000 # Conversion factor for gas: 1 m3 = 9.769 kWh
base_cons = base_cons / (9.769 / 1000) if base_cons is not None else None
else:
cons *= 1000 # conversion MWh to kWh
base_cons = base_cons * 1000 if base_cons is not None else None
tax_brackets = {
"gas": [0, 170_000, 1_000_000, 10_000_000],
"electricity": [0, 10_000, 50_000, 10_000_000],
}
tax_brackets = tax_brackets[commodity]
base_cons = tax_brackets[tax_bracket - 1] if tax_bracket else base_cons
if commodity == "gas" and horti:
commodity += "_horticulture"
eb_rates, ode_rates = get_tax_tables(commodity)
eb = 0
ode = 0
for bracket in range(4):
if bracket < 3:
br_lower_limit = tax_brackets[bracket]
br_upper_limit = tax_brackets[bracket + 1]
if base_cons > br_upper_limit:
continue
bracket_size = br_upper_limit - max(br_lower_limit, base_cons)
cons_in_bracket = min(cons, bracket_size)
else:
cons_in_bracket = cons
# print(eb_rates.columns[bracket], cons_in_bracket, round(eb_rates.loc[year, eb_rates.columns[bracket]], 6), round(eb_rates.loc[year, eb_rates.columns[bracket]] * cons_in_bracket,2))
eb += eb_rates.loc[year, eb_rates.columns[bracket]] * cons_in_bracket
ode += ode_rates.loc[year, ode_rates.columns[bracket]] * cons_in_bracket
cons -= cons_in_bracket
if cons == 0:
break
return -round(eb, 2), -round(ode, 2)
def get_tax_tables(commodity):
"""Get EB and ODE tax rate tables from json files.
Returns two tax rate tables as DataFrame.
If table is not up-to-date, try use update_tax_tables() function.
"""
folder = Path(__file__).resolve().parent / "data" / "tax_tariffs"
eb_table = pd.read_json(folder / f"{commodity}_eb.json")
ode_table = pd.read_json(folder / f"{commodity}_ode.json")
if commodity == "electricity":
ode_table.drop(columns=ode_table.columns[3], inplace=True)
else:
eb_table.drop(columns=eb_table.columns[0], inplace=True)
if commodity != "gas_horticulture":
eb_table.drop(columns=eb_table.columns[3], inplace=True)
return eb_table, ode_table
def get_tax_rate(commodity, year, tax_bracket, perMWh=True):
"""Get tax rate for specific year and tax bracket.
Parameters:
-----------
commodity : str
{'gas' or 'electricity'}
year : int
{2013 - current year}
tax_bracket : int
{1 - 4}
For Gas:
1: 0 - 170.000 m3
3: 170.000 - 1 mln. m3
4: 1 mln. - 10 mln. m3
5: > 10 mln. m3
For Electricity:
1: 0 - 10 MWh
2: 10 - 50 MWh
3: 50 - 10.000 MWh
4: > 10.000 MWh
perMWh : bool
Defaults to True. Will return rates (for gas) in €/MWh instead of €/m3.
Returns:
--------
Dictionary with EB, ODE and combined rates (in €/MWh for electricity and €/m3 for gas)
{'EB' : float
'ODE' : float,
'EB+ODE' : float}
"""
eb_table, ode_table = get_tax_tables(commodity)
eb_rate = eb_table.loc[year, :].iloc[tax_bracket - 1].astype(float).round(5) * 1000
ode_rate = (
ode_table.loc[year, :].iloc[tax_bracket - 1].astype(float).round(5) * 1000
)
if commodity == "gas" and perMWh == True:
eb_rate /= 9.769
ode_rate /= 9.769
comb_rate = (eb_rate + ode_rate).round(5)
return {"EB": eb_rate, "ODE": ode_rate, "EB+ODE": comb_rate}
def update_tax_tables():
"""Function to get EB and ODE tax rate tables from belastingdienst.nl and save as json file."""
url = (
"https://www.belastingdienst.nl/wps/wcm/connect/bldcontentnl/belastingdienst/"
"zakelijk/overige_belastingen/belastingen_op_milieugrondslag/tarieven_milieubelastingen/"
"tabellen_tarieven_milieubelastingen?projectid=6750bae7-383b-4c97-bc7a-802790bd1110"
)
tables = pd.read_html(url)
table_index = {
3: "gas_eb",
4: "gas_horticulture_eb",
6: "electricity_eb",
8: "gas_ode",
9: "gas_horticulture_ode",
10: "electricity_ode",
}
for key, val in table_index.items():
table = tables[key].astype(str)
table = table.applymap(lambda x: x.strip("*"))
table = table.applymap(lambda x: x.strip(""))
table = table.applymap(lambda x: x.replace(",", "."))
table = table.astype(float)
table["Jaar"] = table["Jaar"].astype(int)
table.set_index("Jaar", inplace=True)
path = Path(__file__).resolve().parent / "data" / "tax_tariffs" / f"{val}.json"
table.to_json(path)
def calc_grid_costs(
peakload_kW,
grid_operator,
year,
connection_type,
totalcons_kWh=0,
kw_contract_kW=None,
path=None,
modelled_time_period_years = 1
):
"""Calculate grid connection costs for one full year
Parameters:
-----------
peakload_kW : numeric or list
Peak load in kW. Can be single value (for entire year) or value per month (list).
grid_operator : str
{'tennet', 'liander', 'enexis', 'stedin'}
year : int
Year to get tariffs for, e.g. 2020
connection_type : str
Type of grid connection, e.g. 'TS' or 'HS'.
Definitions are different for each grid operator.
totalcons_kWh : numeric
Total yearly consumption in kWh
kw_contract_kW : numeric
in kW. If provided, function will assume fixed value kW contract
path : str
Path to directory with grid tariff files.
Default is None; function will to look for default folder on SharePoint.
Returns:
--------
Total variable grid connection costs in €/year (fixed costs 'vastrecht' nog included)
"""
totalcons_kWh /= modelled_time_period_years
tariffs = get_grid_tariffs_electricity(grid_operator, year, connection_type, path)
kw_max_kW = np.mean(peakload_kW)
max_peakload_kW = np.max(peakload_kW)
if kw_contract_kW is None:
kw_contract_kW = False
if bool(kw_contract_kW) & (kw_contract_kW < max_peakload_kW):
warnings.warn(
"Maximum peak consumption is higher than provided 'kw_contract' value."
"Will continue to assume max peak consumption as kW contract."
)
kw_contract_kW = max_peakload_kW
if not bool(kw_contract_kW):
kw_contract_kW = max_peakload_kW
if (tariffs["kWh tarief"] != 0) and (totalcons_kWh is None):
raise ValueError(
"For this grid connection type a tariff for kWh has to be paid. "
"Therefore 'totalcons_kWh' can not be None."
)
# kw_contract = 1000
# print("tarieven")
# print(abs(totalcons_kWh))
# print(modelled_time_period_years)
# print(-round(tariffs["kWh tarief"]))
# print({
# "Variable": -round(tariffs["kWh tarief"] * abs(totalcons_kWh) * modelled_time_period_years, 2),
# "kW contract": -round(tariffs["kW contract per jaar"] * kw_contract_kW * modelled_time_period_years, 2),
# "kW max": -round(tariffs["kW max per jaar"] * max_peakload_kW * modelled_time_period_years, 2),
# })
return {
"Variable": -round(tariffs["kWh tarief"] * abs(totalcons_kWh) * modelled_time_period_years, 2),
"kW contract": -round(tariffs["kW contract per jaar"] * kw_contract_kW * modelled_time_period_years, 2),
"kW max": -round(tariffs["kW max per jaar"] * max_peakload_kW * modelled_time_period_years, 2),
}
def get_grid_tariffs_electricity(grid_operator, year, connection_type, path=None):
"""Get grid tranposrt tariffs
Parameters:
-----------
grid_operator : str
{'tennet', 'liander', 'enexis', 'stedin'}
year : int
Year to get tariffs for, e.g. 2020
connection_type : str
Type of grid connection, e.g. 'TS' or 'HS'.
Definitions are different for each grid operator.
path : str
Path to directory with grid tariff files.
Default is None; function will to look for default folder on SharePoint.
Returns:
--------
Dictionary containing grid tariffs in €/kW/year and €/kWh
"""
if path is None:
path = Path(__file__).resolve().parent / "data" / "grid_tariffs"
else:
path = Path(path)
if not path.exists():
raise SystemError(
f"Path '{path}' not found. Specify different path and try again."
)
filename = f"{grid_operator.lower()}_{year}.csv"
filepath = path / filename
if not filepath.exists():
raise NotImplementedError(
f"File '{filename}' does not exist. Files available: {[file.name for file in path.glob('*.csv')]}"
)
rates_table = pd.read_csv(
path / filename, sep=";", decimal=",", index_col="Aansluiting"
)
if connection_type not in rates_table.index:
raise ValueError(
f"The chosen connection type '{connection_type}' is not available "
f"for grid operator '{grid_operator}'. Please choose one of {list(rates_table.index)}."
)
return rates_table.loc[connection_type, :].to_dict()
def income_tax(ebit, fixed_tax_rate):
"""
Calculates income tax based on EBIT.
2021 tax rates
"""
if fixed_tax_rate:
return round(ebit * -0.25, 0)
if ebit > 245_000:
return round(245_000 * -0.15 + (ebit - 200_000) * -0.25, 2)
if ebit < 0:
return 0
else:
return -round(ebit * 0.15, 2)
def calc_business_case(
capex,
discount_rate,
project_duration,
depreciation,
residual_value,
regular_earnings,
irregular_cashflows=0,
eia=False,
vamil=False,
fixed_income_tax=False
):
"""Calculate NPV and IRR for business case.
All input paremeters are either absolute or relative to a baseline.
Parameters:
-----------
capex : numeric
Total CAPEX or extra CAPEX compared to baseline
discount_rate : numeric
% as decimal value
project_duration : numeric
in years
depreciation : numeric of list
Yearly depreciation costs
residual_value : numeric
Residual value at end of project in €, total or compared to baseline.
regular_earnings : numeric
Regular earnings, usually EBITDA
irregular_cashflows : list
Pass list with value for each year.
eia : bool
Apply EIA ("Energie Investerings Aftrek") tax discounts.
Defaults to False.
vamil : bool
Apply VAMIL ("Willekeurige afschrijving milieu-investeringen") tax discounts.
Defaults to False.
Returns:
--------
DataFrame showing complete calculation resulting in NPV and IRR
"""
years = [f"Year {y}" for y in range(project_duration + 1)]
years_o = years[1:]
bc_calc = pd.DataFrame(columns=years)
bc_calc.loc["CAPEX (€)", "Year 0"] = -capex
bc_calc.loc["Regular Earnings (€)", years_o] = regular_earnings
bc_calc.loc["Irregular Cashflows (€)", years_o] = irregular_cashflows
bc_calc.loc["EBITDA (€)", years_o] = (
bc_calc.loc["Regular Earnings (€)", years_o]
+ bc_calc.loc["Irregular Cashflows (€)", years_o]
)
depreciations = [depreciation] * project_duration
if vamil:
ebitdas = bc_calc.loc["EBITDA (€)", years_o].to_list()
depreciations = _apply_vamil(depreciations, project_duration, ebitdas)
bc_calc.loc["Depreciations (€) -/-", years_o] = np.array(depreciations) * -1
bc_calc.loc["EBIT (€)", years_o] = (
bc_calc.loc["EBITDA (€)", years_o]
+ bc_calc.loc["Depreciations (€) -/-", years_o]
)
if eia:
bc_calc = _apply_eia(bc_calc, project_duration, capex, years_o)
bc_calc.loc["Income tax (Vpb.) (€)", years_o] = bc_calc.loc["EBIT (€)", :].apply(
income_tax, args=[fixed_income_tax]
)
if eia:
bc_calc.loc["NOPLAT (€)", years_o] = (
bc_calc.loc["EBIT before EIA (€)", :]
+ bc_calc.loc["Income tax (Vpb.) (€)", years_o]
)
else:
bc_calc.loc["NOPLAT (€)", years_o] = (
bc_calc.loc["EBIT (€)", :] + bc_calc.loc["Income tax (Vpb.) (€)", years_o]
)
bc_calc.loc["Depreciations (€) +/+", years_o] = depreciations
bc_calc.loc["Free Cash Flow (€)", years] = (
bc_calc.loc["CAPEX (€)", years].fillna(0)
+ bc_calc.loc["NOPLAT (€)", years].fillna(0)
+ bc_calc.loc["Depreciations (€) +/+", years].fillna(0)
)
spp = calc_simple_payback_time(
capex=capex,
free_cashflows=bc_calc.loc["Free Cash Flow (€)", years_o].values,
)
bc_calc.loc["Simple Payback Period", "Year 0"] = spp
try:
bc_calc.loc["IRR (%)", "Year 0"] = (
npf.irr(bc_calc.loc["Free Cash Flow (€)", years].values) * 100
)
except:
bc_calc.loc["IRR (%)", "Year 0"] = np.nan
bc_calc.loc["WACC (%)", "Year 0"] = discount_rate * 100
bc_calc.loc["NPV of explicit period (€)", "Year 0"] = npv(
discount_rate, bc_calc.loc["Free Cash Flow (€)"].values
)
bc_calc.loc["Discounted residual value (€)", "Year 0"] = (
residual_value / (1 + discount_rate) ** project_duration
)
bc_calc.loc["NPV (€)", "Year 0"] = (
bc_calc.loc["NPV of explicit period (€)", "Year 0"]
# + bc_calc.loc["Discounted residual value (€)", "Year 0"]
)
return bc_calc.round(2)
def calc_simple_payback_time(capex, free_cashflows):
if free_cashflows.sum() < capex:
return np.nan
year = 0
spp = 0
while capex > 0:
cashflow = free_cashflows[year]
spp += min(capex, cashflow) / cashflow
capex -= cashflow
year += 1
return spp
return round(spp, 1)
def _apply_vamil(depreciations, project_duration, ebitdas):
remaining_depr = sum(depreciations)
remaining_vamil = 0.75 * remaining_depr
for i in range(project_duration):
vamil_depr = min(ebitdas[i], remaining_vamil) if remaining_vamil > 0 else 0
if remaining_depr > 0:
lin_depr = remaining_depr / (project_duration - i)
depr = max(vamil_depr, lin_depr)
depreciations[i] = max(vamil_depr, lin_depr)
remaining_vamil -= vamil_depr
remaining_depr -= depr
else:
depreciations[i] = 0
return depreciations
def _apply_eia(bc_calc, project_duration, capex, years_o):
remaining_eia = 0.45 * capex
eia_per_year = [0] * project_duration
bc_calc = bc_calc.rename(index={"EBIT (€)": "EBIT before EIA (€)"})
ebits = bc_calc.loc["EBIT before EIA (€)", years_o].to_list()
eia_duration = min(10, project_duration)
for i in range(eia_duration):
if remaining_eia > 0:
eia_curr_year = max(min(remaining_eia, ebits[i]), 0)
eia_per_year[i] = eia_curr_year
remaining_eia -= eia_curr_year
else:
break
bc_calc.loc["EIA (€)", years_o] = np.array(eia_per_year) * -1
bc_calc.loc["EBIT (€)", :] = (
bc_calc.loc["EBIT before EIA (€)", :] + bc_calc.loc["EIA (€)", :]
)
return bc_calc
def calc_irf_value(
data, irf_volume, nomination_col=None, realisation_col=None, reco_col="reco"
):
"""Calculate IRF value
Takes a DataFrame [data] and returns the same DataFrame with a new column "IRF Value"
Parameters
----------
data : DataFrame
DataFrame that contains data. Should include price data (DAM, POS and NEG).
irf_volume : int
Volume on IRF in MW.
nomination_col : str
Name of the column containing nomination data in MWh.
realisation_col : str
Name of the column containing realisation data in MWh.
reco_col : str
Name of the column contaning recommendations.
"""
if not nomination_col:
nomination_col = "zero_nom"
data[nomination_col] = 0
if not realisation_col:
realisation_col = "zero_nom"
data[realisation_col] = 0
conversion_factor = pd.to_timedelta(data.index.freq) / timedelta(hours=1)
imb_pre_irf = data[realisation_col] - data[nomination_col]
result_pre_irf = (
data[nomination_col] * data["DAM"]
+ imb_pre_irf.where(imb_pre_irf > 0, other=0) * data["POS"]
+ imb_pre_irf.where(imb_pre_irf < 0, other=0) * data["NEG"]
)
data["IRF Nom"] = (
data[nomination_col] - data[reco_col] * irf_volume * conversion_factor
)
data["IRF Imb"] = data[realisation_col] - data["IRF Nom"]
result_post_irf = (
data["IRF Nom"] * data["DAM"]
+ data["IRF Imb"].where(data["IRF Imb"] > 0, other=0) * data["POS"]
+ data["IRF Imb"].where(data["IRF Imb"] < 0, other=0) * data["NEG"]
)
data["IRF Value"] = result_post_irf - result_pre_irf
return data