Reusable And Extensible Python Functions For Financial Data Analysis

Introduction #
This post collects the code for all of the Python functions that I use in my analyses. The goal is that, when writing another post, I can simply link to the functions below instead of repeating the function code in each post.
Function Index #
- bb_clean_data: Takes an Excel export from Bloomberg, removes the miscellaneous headings/rows, and returns a DataFrame.
- build_index: Reads the index_temp.md markdown file, inserts the markdown dependencies where indicated, and then saves the file as index.md.
- calc_fed_cycle_asset_performance: Calculates metrics for an asset based on a specified Fed tightening/loosening cycle.
- calc_vix_trade_pnl: Calculates the profit/loss from VIX options trades.
- coinbase_fetch_available_products: Fetch available products from Coinbase Exchange API.
- coinbase_fetch_full_history: Fetch full historical data for a given product from Coinbase Exchange API.
- coinbase_fetch_historical_candles: Fetch historical candle data for a given product from Coinbase Exchange API.
- coinbase_pull_data: Update existing record or pull full historical data for a given product from Coinbase Exchange API.
- df_info: A simple function to display a DataFrame's summary information along with its first five and last five rows.
- df_info_markdown: Similar to the df_info function above, except that it converts the output to markdown.
- export_track_md_deps: Exports various text outputs to markdown files, which are included in the index.md file created when building the site with Hugo.
- load_api_keys: Load API keys from a .env file.
- load_data: Load data from a CSV, Excel, or Pickle file into a pandas DataFrame.
- pandas_set_decimal_places: Set the number of decimal places displayed for floating-point numbers in pandas.
- plot_timeseries: Plot the price data from a DataFrame for a specified date range and columns.
- plot_bar_returns_ffr_change: Plot the bar chart of the cumulative or annualized returns for the asset class along with the change in the Fed Funds Rate.
- plot_scatter_regression_ffr_vs_returns: Plot a scatter of annualized returns versus the annualized change in the Fed Funds Rate, with an OLS regression line.
- plot_stats: Generate a scatter plot for the mean OHLC prices.
- plot_vix_with_trades: Plot the VIX daily high and low prices, along with the VIX spikes, and trades.
- polygon_fetch_full_history: Fetch full historical data for a given product from Polygon API.
- polygon_pull_data: Read existing data file, download price data from Polygon, and export data.
- strategy_harry_brown_perm_port: Execute the strategy for the Harry Brown permanent portfolio.
- summary_stats: Generate summary statistics for a series of returns.
- yf_pull_data: Download daily price data from Yahoo Finance and export it.
Python Functions #
bb_clean_data #
import os
import pandas as pd
from IPython.display import display
def bb_clean_data(
base_directory: str,
fund_ticker_name: str,
source: str,
asset_class: str,
excel_export: bool,
pickle_export: bool,
output_confirmation: bool,
) -> pd.DataFrame:
"""
    This function takes an Excel export from Bloomberg and removes all
    excess data, leaving the date and close columns.
Parameters
----------
base_directory : str
Root path to store downloaded data.
    fund_ticker_name : str
        Fund ticker of the Bloomberg export to clean.
source : str
Name of the data source (e.g., 'Bloomberg').
asset_class : str
Asset class name (e.g., 'Equities').
excel_export : bool
If True, export data to Excel format.
pickle_export : bool
If True, export data to Pickle format.
output_confirmation : bool
If True, print confirmation message.
Returns
-------
df : pd.DataFrame
DataFrame containing cleaned data prices.
"""
# Set location from where to read existing excel file
location = f"{base_directory}/{source}/{asset_class}/Daily/{fund_ticker_name}.xlsx"
# Read data from excel
try:
df = pd.read_excel(location, sheet_name="Worksheet", engine="calamine")
    except FileNotFoundError:
        print(f"File not found...please download the data for {fund_ticker_name}")
        raise
# Set the column headings from row 5 (which is physically row 6)
df.columns = df.iloc[5]
# Set the column heading for the index to be "None"
df.rename_axis(None, axis=1, inplace=True)
# Drop the first 6 rows, 0 - 5
df.drop(df.index[0:6], inplace=True)
# Set the date column as the index
df.set_index("Date", inplace=True)
# Drop the volume column
try:
df.drop(columns={"PX_VOLUME"}, inplace=True)
except KeyError:
pass
# Rename column
df.rename(columns={"PX_LAST": "Close"}, inplace=True)
# Sort by date
df.sort_values(by=["Date"], inplace=True)
# Create directory
directory = f"{base_directory}/{source}/{asset_class}/Daily"
os.makedirs(directory, exist_ok=True)
# Export to excel
if excel_export == True:
df.to_excel(f"{directory}/{fund_ticker_name}_Clean.xlsx", sheet_name="data")
else:
pass
# Export to pickle
if pickle_export == True:
df.to_pickle(f"{directory}/{fund_ticker_name}_Clean.pkl")
else:
pass
# Output confirmation
if output_confirmation == True:
print(f"The first and last date of data for {fund_ticker_name} is: ")
display(df[:1])
display(df[-1:])
print(f"Bloomberg data cleaning complete for {fund_ticker_name}")
print(f"--------------------")
else:
pass
return df
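A minimal usage sketch, assuming the function above is in scope and following the same settings/DATA_DIR convention used by the other functions in this post; the ticker and folder names are placeholders:
from settings import config

DATA_DIR = config("DATA_DIR")

if __name__ == "__main__":
    # Example usage (hypothetical ticker; the Bloomberg export must already
    # exist at {DATA_DIR}/Bloomberg/Equities/Daily/SPX.xlsx)
    df = bb_clean_data(
        base_directory=DATA_DIR,
        fund_ticker_name="SPX",
        source="Bloomberg",
        asset_class="Equities",
        excel_export=True,
        pickle_export=True,
        output_confirmation=True,
    )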
build_index #
from pathlib import Path
def build_index() -> None:
"""
Build a Hugo-compatible index.md by combining Markdown fragments.
This function reads a template file (`index_temp.md`) and a list of markdown
dependencies from `index_dep.txt`. For each entry in the dependency list, it
replaces a corresponding placeholder in the template
(formatted as <!-- INSERT_<name>_HERE -->) with the content from the markdown
file. If a file is missing, the placeholder is replaced with a warning note.
Output
------
- Writes the final assembled content to `index.md`.
Raises
------
FileNotFoundError:
If either `index_temp.md` or `index_dep.txt` does not exist.
Example
-------
If `index_dep.txt` contains:
01_intro.md
02_analysis.md
And `index_temp.md` contains:
<!-- INSERT_01_intro_HERE -->
<!-- INSERT_02_analysis_HERE -->
The resulting `index.md` will include the contents of the respective markdown files in place
of their placeholders.
"""
temp_index_path = Path("index_temp.md")
final_index_path = Path("index.md")
dependencies_path = Path("index_dep.txt")
# Read the index template
if not temp_index_path.exists():
raise FileNotFoundError("Missing index_temp.md")
temp_index_content = temp_index_path.read_text()
# Read dependency list
if not dependencies_path.exists():
raise FileNotFoundError("Missing index_dep.txt")
with dependencies_path.open("r") as f:
markdown_files = [line.strip() for line in f if line.strip()]
# Replace placeholders for each dependency
final_index_content = temp_index_content
for md_file in markdown_files:
placeholder = f"<!-- INSERT_{Path(md_file).stem}_HERE -->"
if Path(md_file).exists():
content = Path(md_file).read_text()
final_index_content = final_index_content.replace(placeholder, content)
else:
print(
f"⚠️ Warning: {md_file} not found, skipping placeholder {placeholder}"
)
final_index_content = final_index_content.replace(
placeholder, f"*{md_file} not found*"
)
# Write final index.md
final_index_path.write_text(final_index_content)
print("✅ index.md successfully built!")
if __name__ == "__main__":
build_index()
calc_fed_cycle_asset_performance #
import numpy as np
import pandas as pd
def calc_fed_cycle_asset_performance(
fed_cycles: list,
cycle_labels: list,
fed_changes: list,
monthly_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Calculate performance metrics for an asset over specified Fed tightening/loosening cycles.
    Parameters
    ----------
    fed_cycles : list
        List of (start, end) date tuples, one per cycle.
    cycle_labels : list
        Label for each cycle.
    fed_changes : list
        Total change in the Fed Funds Rate over each cycle, in decimal form
        (one entry per cycle that has return data).
    monthly_df : pd.DataFrame
        DataFrame with a 'Monthly_Return' column and a DatetimeIndex.
    Returns
    -------
    cycle_df : pd.DataFrame
        One row per cycle with cumulative, average, and annualized returns,
        annualized volatility, and the corresponding Fed Funds Rate change.
    """
    results = []
for (start, end), label in zip(fed_cycles, cycle_labels):
start = pd.to_datetime(start)
end = pd.to_datetime(end)
# Filter TLT returns for the cycle period
returns = monthly_df.loc[start:end, "Monthly_Return"]
if len(returns) == 0:
continue
cumulative_return = (1 + returns).prod() - 1
average_return = returns.mean()
volatility = returns.std()
results.append({
"Cycle": label,
"Start": start.date(),
"End": end.date(),
"Months": len(returns),
"CumulativeReturn": cumulative_return,
"AverageMonthlyReturn": average_return,
"Volatility": volatility,
})
# Convert to DataFrame
cycle_df = pd.DataFrame(results)
cycle_df["CumulativeReturnPct"] = 100 * cycle_df["CumulativeReturn"]
cycle_df["AverageMonthlyReturnPct"] = 100 * cycle_df["AverageMonthlyReturn"]
cycle_df["AnnualizedReturn"] = (1 + cycle_df["CumulativeReturn"]) ** (12 / cycle_df["Months"]) - 1
cycle_df["AnnualizedReturnPct"] = 100 * cycle_df["AnnualizedReturn"]
# Correct the volatility calculation to annualized volatility
cycle_df["Volatility"] = cycle_df["Volatility"] * np.sqrt(12)
# Re-order columns
cycle_df = cycle_df[[
"Cycle", "Start", "End", "Months", "CumulativeReturn", "CumulativeReturnPct",
"AverageMonthlyReturn", "AverageMonthlyReturnPct", "AnnualizedReturn", "AnnualizedReturnPct", "Volatility",
]]
# Merge Fed changes into cycle_df
cycle_df["FedFundsChange"] = fed_changes
cycle_df["FedFundsChange_bps"] = cycle_df["FedFundsChange"] * 10000 # in basis
# Add annualized change in FFR in basis points
cycle_df["FFR_AnnualizedChange"] = (cycle_df["FedFundsChange"] / cycle_df["Months"]) * 12
cycle_df["FFR_AnnualizedChange_bps"] = cycle_df["FFR_AnnualizedChange"] * 10000 # Convert to basis points
cycle_df["Label"] = cycle_df.apply(
lambda row: f"{row['Cycle']}, {row['Start']} to {row['End']}", axis=1
)
return cycle_df
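A minimal usage sketch with synthetic inputs (the cycle dates, returns, and rate changes below are illustrative, not actual Fed cycles); monthly_df only needs a 'Monthly_Return' column with a DatetimeIndex:
import pandas as pd

if __name__ == "__main__":
    # Synthetic monthly returns indexed by month-end date
    monthly_df = pd.DataFrame(
        {"Monthly_Return": [0.010, -0.020, 0.015, 0.005, -0.010, 0.020]},
        index=pd.date_range("2022-01-31", periods=6, freq="ME"),  # month-end ("M" on older pandas)
    )
    fed_cycles = [("2022-01-01", "2022-03-31"), ("2022-04-01", "2022-06-30")]
    cycle_labels = ["Cycle 1 (Tightening)", "Cycle 2 (Tightening)"]
    fed_changes = [0.0050, 0.0125]  # total change in FFR per cycle, decimal form
    cycle_df = calc_fed_cycle_asset_performance(fed_cycles, cycle_labels, fed_changes, monthly_df)
    print(cycle_df[["Cycle", "Months", "AnnualizedReturnPct", "FedFundsChange_bps"]])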
calc_vix_trade_pnl #
import pandas as pd
def calc_vix_trade_pnl(
transaction_df: pd.DataFrame,
exp_start_date: str,
exp_end_date: str,
trade_start_date: str,
trade_end_date: str,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, str, str, str, str]:
"""
Calculate the profit and loss (PnL) of trades based on transaction data.
Parameters
----------
transaction_df : pd.DataFrame
DataFrame containing transaction data.
exp_start_date : str
Start date for filtering transactions in 'YYYY-MM-DD' format. This is the start of the range for the option expiration date.
exp_end_date : str
End date for filtering transactions in 'YYYY-MM-DD' format. This is the end of the range for the option expiration date.
trade_start_date : str
Start date for filtering transactions in 'YYYY-MM-DD' format. This is the start of the range for the trade date.
trade_end_date : str
End date for filtering transactions in 'YYYY-MM-DD' format. This is the end of the range for the trade date.
Returns
-------
transactions_data : pd.DataFrame
Dataframe containing the transactions for the specified timeframe.
closed_trades : pd.DataFrame
DataFrame containing the closed trades with realized PnL and percent PnL.
open_trades : pd.DataFrame
DataFrame containing the open trades.
net_PnL_percent_str : str
String representation of the net profit percentage.
    net_PnL_str : str
        String representation of the net profit and loss in dollars.
    total_opened_pos_mkt_val_str : str
        String representation of the total market value of opened positions.
    total_closed_pos_mkt_val_str : str
        String representation of the total market value of closed positions.
    """
# If start and end dates for trades and expirations are None, use the entire DataFrame
    if exp_start_date is None and exp_end_date is None and trade_start_date is None and trade_end_date is None:
        transactions_data = transaction_df.copy()
    # If both start and end dates for trades and expirations are provided then filter by both
    else:
        transactions_data = transaction_df[
            (transaction_df['Exp_Date'] >= exp_start_date) & (transaction_df['Exp_Date'] <= exp_end_date) &
            (transaction_df['Trade_Date'] >= trade_start_date) & (transaction_df['Trade_Date'] <= trade_end_date)
        ].copy()  # copy to avoid mutating the caller's DataFrame
# Combine the 'Action' and 'Symbol' columns to create a unique identifier for each transaction
transactions_data['TradeDate_Action_Symbol_VIX'] = (
transactions_data['Trade_Date'].astype(str) +
", " +
transactions_data['Action'] +
", " +
transactions_data['Symbol'] +
", VIX = " +
transactions_data['Approx_VIX_Level'].astype(str)
)
# Split buys and sells and sum the notional amounts
transactions_sells = transactions_data[transactions_data['Action'] == 'Sell to Close']
transactions_sells = transactions_sells.groupby(['Symbol', 'Exp_Date'], as_index=False)[['Amount', 'Quantity']].sum()
transactions_buys = transactions_data[transactions_data['Action'] == 'Buy to Open']
transactions_buys = transactions_buys.groupby(['Symbol', 'Exp_Date'], as_index=False)[['Amount', 'Quantity']].sum()
# Merge buys and sells dataframes back together
merged_transactions = pd.merge(transactions_buys, transactions_sells, on=['Symbol', 'Exp_Date'], how='outer', suffixes=('_Buy', '_Sell'))
merged_transactions = merged_transactions.sort_values(by=['Exp_Date'], ascending=[True])
merged_transactions = merged_transactions.reset_index(drop=True)
# Identify the closed positions
merged_transactions['Closed'] = (~merged_transactions['Amount_Sell'].isna()) & (~merged_transactions['Amount_Buy'].isna()) & (merged_transactions['Quantity_Buy'] == merged_transactions['Quantity_Sell'])
# Create a new dataframe for closed positions
closed_trades = merged_transactions[merged_transactions['Closed']]
closed_trades = closed_trades.reset_index(drop=True)
closed_trades['Realized_PnL'] = closed_trades['Amount_Sell'] - closed_trades['Amount_Buy']
closed_trades['Percent_PnL'] = closed_trades['Realized_PnL'] / closed_trades['Amount_Buy']
closed_trades.drop(columns={'Closed', 'Exp_Date'}, inplace=True)
closed_trades['Quantity_Sell'] = closed_trades['Quantity_Sell'].astype(int)
# Calculate the net % PnL
net_PnL_percent = closed_trades['Realized_PnL'].sum() / closed_trades['Amount_Buy'].sum()
net_PnL_percent_str = f"{round(net_PnL_percent * 100, 2)}%"
# Calculate the net $ PnL
net_PnL = closed_trades['Realized_PnL'].sum()
net_PnL_str = f"${net_PnL:,.2f}"
# Create a new dataframe for open positions
open_trades = merged_transactions[~merged_transactions['Closed']]
open_trades = open_trades.reset_index(drop=True)
open_trades.drop(columns={'Closed', 'Amount_Sell', 'Quantity_Sell', 'Exp_Date'}, inplace=True)
# Calculate the total market value of opened positions
# If start and end dates for trades and expirations are None, use only the closed positions
if exp_start_date is None and exp_end_date is None and trade_start_date is None and trade_end_date is None:
total_opened_pos_mkt_val = closed_trades['Amount_Buy'].sum()
else:
total_opened_pos_mkt_val = closed_trades['Amount_Buy'].sum() + open_trades['Amount_Buy'].sum()
total_opened_pos_mkt_val_str = f"${total_opened_pos_mkt_val:,.2f}"
# Calculate the total market value of closed positions
total_closed_pos_mkt_val = closed_trades['Amount_Sell'].sum()
total_closed_pos_mkt_val_str = f"${total_closed_pos_mkt_val:,.2f}"
return transactions_data, closed_trades, open_trades, net_PnL_percent_str, net_PnL_str, total_opened_pos_mkt_val_str, total_closed_pos_mkt_val_str
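A minimal usage sketch with a hand-built, illustrative transaction log showing the columns the function expects (Trade_Date, Exp_Date, Action, Symbol, Quantity, Amount, Approx_VIX_Level); passing None for all four dates processes the full log:
import pandas as pd

if __name__ == "__main__":
    transactions = pd.DataFrame({
        "Trade_Date": ["2024-08-01", "2024-08-05"],
        "Exp_Date": ["2024-09-18", "2024-09-18"],
        "Action": ["Buy to Open", "Sell to Close"],
        "Symbol": ["VIX 09/18/2024 20.00 C", "VIX 09/18/2024 20.00 C"],
        "Quantity": [1, 1],
        "Amount": [150.00, 275.00],
        "Approx_VIX_Level": [16.5, 23.8],
    })
    (transactions_data, closed_trades, open_trades,
     net_pnl_pct, net_pnl, opened_mkt_val, closed_mkt_val) = calc_vix_trade_pnl(
        transaction_df=transactions,
        exp_start_date=None,
        exp_end_date=None,
        trade_start_date=None,
        trade_end_date=None,
    )
    print(f"Net PnL: {net_pnl} ({net_pnl_pct})")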
coinbase_fetch_available_products #
import pandas as pd
import requests
def coinbase_fetch_available_products(
base_currency: str,
quote_currency: str,
status: str,
) -> pd.DataFrame:
"""
Fetch available products from Coinbase Exchange API.
    Parameters:
    -----------
    base_currency : str or None
        Filter products by base currency (e.g., 'BTC'); pass None to skip this filter.
    quote_currency : str or None
        Filter products by quote currency (e.g., 'USD'); pass None to skip this filter.
    status : str or None
        Filter products by status (e.g., 'online', 'offline'); pass None to skip this filter.
    Returns:
    --------
    pd.DataFrame
        DataFrame containing available products with their details.
        Returns None if the request fails.
"""
url = 'https://api.exchange.coinbase.com/products'
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
products = response.json()
# Convert the list of products into a pandas DataFrame
df = pd.DataFrame(products)
# Filter by base_currency if provided
if base_currency:
df = df[df['base_currency'] == base_currency]
# Filter by quote_currency if provided
if quote_currency:
df = df[df['quote_currency'] == quote_currency]
# Filter by status if provided
if status:
df = df[df['status'] == status]
# Sort by "id"
df = df.sort_values(by='id')
return df
except requests.exceptions.HTTPError as errh:
print(f"HTTP Error: {errh}")
except requests.exceptions.ConnectionError as errc:
print(f"Error Connecting: {errc}")
except requests.exceptions.Timeout as errt:
print(f"Timeout Error: {errt}")
except requests.exceptions.RequestException as err:
print(f"Oops: Something Else {err}")
if __name__ == "__main__":
# Example usage
df = coinbase_fetch_available_products(
base_currency=None,
quote_currency="USD",
status="online",
)
if df is not None:
print(df)
else:
print("No data returned.")
coinbase_fetch_full_history #
import pandas as pd
import time
from coinbase_fetch_historical_candles import coinbase_fetch_historical_candles
from datetime import datetime, timedelta
def coinbase_fetch_full_history(
product_id: str,
start: datetime,
end: datetime,
granularity: int,
) -> pd.DataFrame:
"""
Fetch full historical data for a given product from Coinbase Exchange API.
Parameters:
-----------
product_id : str
The trading pair (e.g., 'BTC-USD').
start : datetime
Start time in UTC.
end : datetime
End time in UTC.
granularity : int
Time slice in seconds (e.g., 3600 for hourly candles).
Returns:
--------
pd.DataFrame
DataFrame containing time, low, high, open, close, volume.
"""
all_data = []
current_start = start
while current_start < end:
current_end = min(current_start + timedelta(seconds=granularity * 300), end) # Fetch max 300 candles per request
df = coinbase_fetch_historical_candles(product_id, current_start, current_end, granularity)
if df.empty:
break
all_data.append(df)
current_start = df['time'].iloc[-1] + timedelta(seconds=granularity)
time.sleep(0.2) # Small delay to respect rate limits
if all_data:
full_df = pd.concat(all_data).reset_index(drop=True)
return full_df
else:
return pd.DataFrame()
if __name__ == "__main__":
# Example usage
df = coinbase_fetch_full_history(
product_id="BTC-USD",
start=datetime(2025, 1, 1),
end=datetime(2025, 1, 31),
granularity=86_400,
)
if df is not None:
print(df)
else:
print("No data returned.")
coinbase_fetch_historical_candles #
import pandas as pd
import requests
import time
from datetime import datetime
def coinbase_fetch_historical_candles(
product_id: str,
start: datetime,
end: datetime,
granularity: int,
) -> pd.DataFrame:
"""
Fetch historical candle data for a given product from Coinbase Exchange API.
Parameters:
-----------
product_id : str
The trading pair (e.g., 'BTC-USD').
    start : datetime
        Start time in UTC.
    end : datetime
        End time in UTC.
granularity : int
Time slice in seconds (e.g., 60 for minute candles, 3600 for hourly candles, 86,400 for daily candles).
Returns:
--------
pd.DataFrame
DataFrame containing time, low, high, open, close, volume.
"""
url = f'https://api.exchange.coinbase.com/products/{product_id}/candles'
params = {
'start': start.isoformat(),
'end': end.isoformat(),
'granularity': granularity
}
max_retries = 5
retry_delay = 1 # initial delay in seconds
for attempt in range(max_retries):
try:
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
# Coinbase Exchange API returns data in reverse chronological order
data = data[::-1]
# Convert to DataFrame
df = pd.DataFrame(data, columns=['time', 'low', 'high', 'open', 'close', 'volume'])
df['time'] = pd.to_datetime(df['time'], unit='s')
return df
except requests.exceptions.HTTPError as errh:
if response.status_code == 429:
print(f"Rate limit exceeded. Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
print(f"HTTP Error: {errh}")
break
except requests.exceptions.ConnectionError as errc:
print(f"Error Connecting: {errc}")
time.sleep(retry_delay)
retry_delay *= 2
except requests.exceptions.Timeout as errt:
print(f"Timeout Error: {errt}")
time.sleep(retry_delay)
retry_delay *= 2
except requests.exceptions.RequestException as err:
print(f"OOps: Something Else {err}")
break
raise Exception("Failed to fetch data after multiple retries.")
if __name__ == "__main__":
# Example usage
df = coinbase_fetch_historical_candles(
product_id="BTC-USD",
start=datetime(2025, 1, 1),
end=datetime(2025, 1, 2),
granularity=3_600,
)
if df is not None:
print(df)
else:
print("No data returned.")
coinbase_pull_data #
import calendar
import os
import pandas as pd
from coinbase_fetch_available_products import coinbase_fetch_available_products
from coinbase_fetch_full_history import coinbase_fetch_full_history
from datetime import datetime, timedelta
from settings import config
# Get the data directory from the configuration
DATA_DIR = config("DATA_DIR")
def coinbase_pull_data(
base_directory,
source: str,
asset_class: str,
excel_export: bool,
pickle_export: bool,
output_confirmation: bool,
base_currency: str,
quote_currency: str,
granularity: int=3600, # 60=minute, 3600=hourly, 86400=daily
status: str='online', # default status is 'online'
start_date: datetime=datetime(2025, 1, 1), # default start date
end_date: datetime=datetime.now() - timedelta(days=1), # updates data through 1 day ago due to lag in data availability
) -> pd.DataFrame:
"""
Update existing record or pull full historical data for a given product from Coinbase Exchange API.
Parameters:
-----------
base_directory
Root path to store downloaded data.
    source : str
        Name of the data source (e.g., 'Coinbase').
asset_class : str
Asset class name (e.g., 'Equities').
excel_export : bool
If True, export data to Excel format.
pickle_export : bool
If True, export data to Pickle format.
output_confirmation : bool
If True, print confirmation message.
base_currency : str
The base currency (e.g., 'BTC').
quote_currency : str
The quote currency (e.g., 'USD').
    granularity : int, optional
        Time slice in seconds (default is 3600; 60=minute, 3600=hourly, 86400=daily).
    status : str, optional
        Filter products by status (default is 'online').
    start_date : datetime, optional
        Start date in UTC (default is 2025-01-01).
    end_date : datetime, optional
        End date in UTC (default is one day ago, due to the lag in data availability).
    Returns:
    --------
    full_history_df : pd.DataFrame
        DataFrame containing the most recently processed product's full history.
"""
# List of crypto assets
filtered_products = coinbase_fetch_available_products(base_currency=base_currency, quote_currency=quote_currency, status=status)
filtered_products_list = filtered_products['id'].tolist()
filtered_products_list = sorted(filtered_products_list)
if not filtered_products.empty:
print(filtered_products[['id', 'base_currency', 'quote_currency', 'status']])
print(filtered_products_list)
print(len(filtered_products_list))
else:
print("No products found with the specified base and/or quote currencies.")
missing_data = []
omitted_data = []
num_products = len(filtered_products_list)
counter = 0
# Loop for updates
for product in filtered_products_list:
counter+=1
print(f"Updating product {counter} of {num_products}.")
if granularity == 60:
time_length = "Minute"
elif granularity == 3600:
time_length = "Hourly"
elif granularity == 86400:
time_length = "Daily"
else:
print("Error - please confirm timeframe.")
break
# Set file location based on parameters
file_location = f"{base_directory}/{source}/{asset_class}/{time_length}/{product}.pkl"
try:
# Attempt to read existing pickle data file
ex_data = pd.read_pickle(file_location)
ex_data = ex_data.reset_index()
print(f"File found...updating the {product} data")
print("Existing data:")
print(ex_data)
# Pull recent data
new_data = coinbase_fetch_full_history(product, start_date, end_date, granularity)
new_data = new_data.rename(columns={'time':'Date'})
new_data['Date'] = new_data['Date'].dt.tz_localize(None)
print("New data:")
print(new_data)
# Combine existing data with recent data
full_history_df = pd.concat([ex_data,new_data[new_data['Date'].isin(ex_data['Date']) == False]])
full_history_df = full_history_df.sort_values(by='Date')
full_history_df['Date'] = full_history_df['Date'].dt.tz_localize(None)
full_history_df = full_history_df.set_index('Date')
print("Combined data:")
print(full_history_df)
# Create directory
directory = f"{base_directory}/{source}/{asset_class}/{time_length}"
os.makedirs(directory, exist_ok=True)
# Export to excel
if excel_export == True:
full_history_df.to_excel(f"{directory}/{product}.xlsx", sheet_name="data")
else:
pass
# Export to pickle
if pickle_export == True:
full_history_df.to_pickle(f"{directory}/{product}.pkl")
else:
pass
# Output confirmation
if output_confirmation == True:
print(f"Data update complete for {time_length} {product}.")
print("--------------------")
else:
pass
except FileNotFoundError:
# Starting year for fetching initial data
starting_year = 2025
# Print error
print(f"File not found...downloading the {product} data starting with {starting_year}.")
def get_full_hist(year):
try:
# Define the start and end dates
start_date = datetime(year, 1, 1) # Default start date
end_date = datetime.now() - timedelta(days = 1) # Updates data through 1 day ago
# Fetch and process the data
full_history_df = coinbase_fetch_full_history(product, start_date, end_date, granularity)
full_history_df = full_history_df.rename(columns={'time': 'Date'})
full_history_df = full_history_df.sort_values(by='Date')
# Iterate through rows to see if the value of the asset ever exceeds a specified threshold
# Default value for the price threshold is 0 USD
# If the price never exceeds this threshold, the asset is omitted from the final list
def find_first_close_above_threshold(full_history_df, threshold=0):
# Ensure 'Date' is the index before proceeding
if 'Date' in full_history_df.columns:
full_history_df.set_index('Date', inplace=True)
full_history_df.index = full_history_df.index.tz_localize(None)
# Iterate through the DataFrame
for index, row in full_history_df.iterrows():
if row['close'] >= threshold:
print(f"First occurrence: {index}, close={row['close']}")
# Return the filtered DataFrame starting from this row
return full_history_df.loc[index:]
# If no value meets the condition, return an empty DataFrame
print(f"Share price never exceeds {threshold} USD.")
omitted_data.append(product)
return None
full_history_above_threshold_df = find_first_close_above_threshold(full_history_df, threshold=0)
return full_history_above_threshold_df
except KeyError:
print(f"KeyError: No data available for {product} in {year}. Trying next year...")
next_year = year + 1
# Base case: Stop if the next year exceeds the current year
if next_year > datetime.now().year:
print("No more data available for any future years.")
missing_data.append(product)
return None
# Recursive call for the next year
return get_full_hist(year=next_year)
# Fetch the full history starting from the given year
full_history_df = get_full_hist(year=starting_year)
if full_history_df is not None:
# Create directory
directory = f"{base_directory}/{source}/{asset_class}/{time_length}"
os.makedirs(directory, exist_ok=True)
# Export to excel
if excel_export == True:
full_history_df.to_excel(f"{directory}/{product}.xlsx", sheet_name="data")
else:
pass
# Export to pickle
if pickle_export == True:
full_history_df.to_pickle(f"{directory}/{product}.pkl")
else:
pass
# Output confirmation
if output_confirmation == True:
print(f"Initial data fetching completed successfully for {time_length} {product}.")
print("--------------------")
else:
pass
else:
print("No data could be fetched for the specified range.")
except Exception as e:
print(str(e))
# Remove the cryptocurrencies with missing data from the final list
missing_data = sorted(missing_data)
print(f"Data missing for: {missing_data}")
for asset in missing_data:
try:
print(f"Removing {asset} from the list because it is missing data.")
filtered_products_list.remove(asset)
except ValueError:
print(f"{asset} not in list.")
pass
    # Remove the cryptocurrencies with share prices that never exceed the price threshold from the final list
omitted_data = sorted(omitted_data)
print(f"Data omitted due to price for: {omitted_data}")
for asset in omitted_data:
try:
print(f"Removing {asset} from the list because the share price never exceeds 1 USD.")
filtered_products_list.remove(asset)
except ValueError:
print(f"{asset} not in list.")
pass
# Remove stablecoins from the final list
stablecoins_to_remove = ['USDT-USD', 'USDC-USD', 'PAX-USD', 'DAI-USD', 'PYUSD-USD', 'GUSD-USD']
stablecoins_to_remove = sorted(stablecoins_to_remove)
print(f"Data for stable coins not to be used: {stablecoins_to_remove}")
for asset in stablecoins_to_remove:
try:
filtered_products_list.remove(asset)
# print(f"Removing {asset} from the list because it is a stablecoin.")
except ValueError:
# print(f"{asset} not in list.")
pass
# Remove the wrapped coins from the final list
wrapped_coins_to_remove = ['WAXL-USD', 'WBTC-USD']
wrapped_coins_to_remove = sorted(wrapped_coins_to_remove)
print(f"Data for wrapped coins not to be used: {wrapped_coins_to_remove}")
for asset in wrapped_coins_to_remove:
try:
filtered_products_list.remove(asset)
# print(f"Removing {asset} from the list because it is a wrapped coin.")
except ValueError:
# print(f"{asset} not in list.")
pass
# Print the final list of token and the length of the list
print(f"Final list of tokens: {filtered_products_list}")
print(f"Length of final list of tokens: {len(filtered_products_list)}")
return full_history_df
if __name__ == "__main__":
# Example usage to pull all data for each month from 2010 to 2024
for granularity in [60, 3600, 86400]:
for year in range(2010, 2025):
for month in range(1, 13):
print(f"Pulling data for {year}-{month:02d}...")
try:
# Get the last day of the month
last_day = calendar.monthrange(year, month)[1]
coinbase_pull_data(
base_directory=DATA_DIR,
source="Coinbase",
asset_class="Cryptocurrencies",
excel_export=False,
pickle_export=True,
output_confirmation=True,
base_currency="BTC",
quote_currency="USD",
granularity=granularity, # 60=minute, 3600=hourly, 86400=daily
status='online',
start_date=datetime(year, month, 1),
end_date=datetime(year, month, last_day),
)
except Exception as e:
print(f"Failed to pull data for {year}-{month:02d}: {e}")
# current_year = datetime.now().year
# current_month = datetime.now().month
# current_day = datetime.now().day
# # Crypto Data
# currencies = ["BTC", "ETH", "SOL", "XRP"]
# # Iterate through each currency
# for cur in currencies:
# # Example usage - minute
# coinbase_pull_data(
# base_directory=DATA_DIR,
# source="Coinbase",
# asset_class="Cryptocurrencies",
# excel_export=False,
# pickle_export=True,
# output_confirmation=True,
# base_currency=cur,
# quote_currency="USD",
# granularity=60, # 60=minute, 3600=hourly, 86400=daily
# status='online', # default status is 'online'
# start_date=datetime(current_year, current_month - 1, 1), # default start date
# end_date=datetime.now() - timedelta(days=1), # updates data through 1 day ago due to lag in data availability
# )
# # Example usage - hourly
# coinbase_pull_data(
# base_directory=DATA_DIR,
# source="Coinbase",
# asset_class="Cryptocurrencies",
# excel_export=True,
# pickle_export=True,
# output_confirmation=True,
# base_currency=cur,
# quote_currency="USD",
# granularity=3600, # 60=minute, 3600=hourly, 86400=daily
# status='online', # default status is 'online'
# start_date=datetime(current_year, current_month - 1, 1), # default start date
# end_date=datetime.now() - timedelta(days=1), # updates data through 1 day ago due to lag in data availability
# )
# # Example usage - daily
# coinbase_pull_data(
# base_directory=DATA_DIR,
# source="Coinbase",
# asset_class="Cryptocurrencies",
# excel_export=True,
# pickle_export=True,
# output_confirmation=True,
# base_currency=cur,
# quote_currency="USD",
# granularity=86400, # 60=minute, 3600=hourly, 86400=daily
# status='online', # default status is 'online'
# start_date=datetime(current_year, current_month - 1, 1), # default start date
# end_date=datetime.now() - timedelta(days=1), # updates data through 1 day ago due to lag in data availability
# )
df_info #
import pandas as pd
from IPython.display import display
def df_info(
df: pd.DataFrame,
) -> None:
"""
Display summary information about a pandas DataFrame.
This function prints:
- The DataFrame's column names, shape, and data types via `df.info()`
- The first 5 rows using `df.head()`
- The last 5 rows using `df.tail()`
It uses `display()` for better output formatting in environments like Jupyter notebooks.
Parameters:
-----------
df : pd.DataFrame
The DataFrame to inspect.
Returns:
--------
None
Example:
--------
>>> df_info(my_dataframe)
"""
print("The columns, shape, and data types are:")
print(df.info())
print("The first 5 rows are:")
display(df.head())
print("The last 5 rows are:")
display(df.tail())
df_info_markdown #
import io
import pandas as pd
def df_info_markdown(
df: pd.DataFrame,
decimal_places: int = 2,
) -> str:
"""
Generate a Markdown-formatted summary of a pandas DataFrame.
This function captures and formats the output of `df.info()`, `df.head()`,
and `df.tail()` in Markdown for easy inclusion in reports, documentation,
or web-based rendering (e.g., Hugo or Jupyter export workflows).
    Parameters:
    -----------
    df : pd.DataFrame
        The DataFrame to summarize.
    decimal_places : int, optional
        Number of decimal places for floats in the head/tail tables (default: 2).
--------
str
A string containing the DataFrame's info, head, and tail
formatted in Markdown.
Example:
--------
>>> print(df_info_markdown(df))
```text
The columns, shape, and data types are:
<output from df.info()>
```
The first 5 rows are:
| | col1 | col2 |
|---|------|------|
| 0 | ... | ... |
The last 5 rows are:
...
"""
buffer = io.StringIO()
# Capture df.info() output
df.info(buf=buffer)
info_str = buffer.getvalue()
# Convert head and tail to Markdown
head_str = df.head().to_markdown(floatfmt=f".{decimal_places}f")
tail_str = df.tail().to_markdown(floatfmt=f".{decimal_places}f")
    markdown = [
        "```text",
        "The columns, shape, and data types are:\n",
        info_str,
        "```",
        "\nThe first 5 rows are:\n",
        head_str,
        "\nThe last 5 rows are:\n",
        tail_str,
    ]
    return "\n".join(markdown)
export_track_md_deps #
from pathlib import Path
def export_track_md_deps(
dep_file: Path,
md_filename: str,
content: str,
output_type: str = "markdown",
) -> None:
"""
Export Markdown content to a file and track it as a dependency.
This function writes the provided content to the specified
Markdown file and appends the filename to the given dependency
file (typically `index_dep.txt`). This is useful in workflows
where Markdown fragments are later assembled into a larger
document (e.g., a Hugo `index.md`).
Parameters
----------
dep_file : Path
Path to the dependency file that tracks Markdown fragment
filenames.
md_filename : str
The name of the Markdown file to export.
content : str
The Markdown-formatted content to write to the file.
    output_type : str, optional
        Indicates whether the content is plain text, Python code,
        or markdown for proper formatting (default: "markdown").
Returns
-------
None
"""
if output_type == "python":
Path(md_filename).write_text(f"```python\n{content}\n```")
elif output_type == "text":
Path(md_filename).write_text(f"```text\n{content}\n```")
elif output_type == "markdown":
Path(md_filename).write_text(content)
else:
raise ValueError("'output_type' must be either 'text', 'python', or 'markdown'.")
with dep_file.open("a") as f:
f.write(md_filename + "\n")
print(f"✅ Exported and tracked: {md_filename}")
load_api_keys #
import os
from dotenv import load_dotenv
from pathlib import Path
from settings import config
# Get the environment variable file path from the configuration
ENV_PATH = config("ENV_PATH")
def load_api_keys(
env_path: Path=ENV_PATH
) -> dict:
"""
Load API keys from a .env file.
Parameters:
-----------
env_path : Path
Path to the .env file. Default is the ENV_PATH from settings.
Returns:
--------
keys : dict
Dictionary of API keys.
"""
load_dotenv(dotenv_path=env_path)
keys = {
"INFURA_KEY": os.getenv("INFURA_KEY"),
"NASDAQ_DATA_LINK_KEY": os.getenv("NASDAQ_DATA_LINK_KEY"),
"COINBASE_KEY": os.getenv("COINBASE_KEY"),
"COINBASE_SECRET": os.getenv("COINBASE_SECRET"),
"SCHWAB_APP_KEY": os.getenv("SCHWAB_APP_KEY"),
"SCHWAB_SECRET": os.getenv("SCHWAB_SECRET"),
"SCHWAB_ACCOUNT_NUMBER_1": os.getenv("SCHWAB_ACCOUNT_NUMBER_1"),
"SCHWAB_ENCRYPTED_ACCOUNT_ID_1": os.getenv("SCHWAB_ENCRYPTED_ACCOUNT_ID_1"),
"SCHWAB_ACCOUNT_NUMBER_2": os.getenv("SCHWAB_ACCOUNT_NUMBER_2"),
"SCHWAB_ENCRYPTED_ACCOUNT_ID_2": os.getenv("SCHWAB_ENCRYPTED_ACCOUNT_ID_2"),
"SCHWAB_ACCOUNT_NUMBER_3": os.getenv("SCHWAB_ACCOUNT_NUMBER_3"),
"SCHWAB_ENCRYPTED_ACCOUNT_ID_3": os.getenv("SCHWAB_ENCRYPTED_ACCOUNT_ID_3"),
"POLYGON_KEY": os.getenv("POLYGON_KEY"),
}
# Raise error if any key is missing
for k, v in keys.items():
if not v:
raise ValueError(f"Missing environment variable: {k}")
return keys
if __name__ == "__main__":
# Example usage
api_keys = load_api_keys()
print("API keys loaded successfully.")
load_data #
import pandas as pd
from pathlib import Path
def load_data(
base_directory,
ticker: str,
source: str,
asset_class: str,
timeframe: str,
file_format: str,
) -> pd.DataFrame:
"""
Load data from a CSV, Excel, or Pickle file into a pandas DataFrame.
    This function reads the file in the format specified by `file_format`:
    CSV, Excel (from a sheet named 'data', using the 'calamine' engine), or
    Pickle.
Parameters:
-----------
base_directory
Root path to read data file.
ticker : str
Ticker symbol to read.
source : str
Name of the data source (e.g., 'Yahoo').
asset_class : str
Asset class name (e.g., 'Equities').
timeframe : str
Timeframe for the data (e.g., 'Daily', 'Month_End').
file_format : str
Format of the file to load ('csv', 'excel', or 'pickle')
Returns:
--------
pd.DataFrame
The loaded data.
Raises:
-------
    ValueError
        If `file_format` is not one of 'csv', 'excel', or 'pickle'.
    Example:
    --------
    >>> df = load_data(DATA_DIR, "^VIX", "Yahoo_Finance", "Indices", "Daily", "pickle")
"""
if file_format == "csv":
csv_path = Path(base_directory) / source / asset_class / timeframe / f"{ticker}.csv"
df = pd.read_csv(csv_path)
return df
elif file_format == "excel":
xlsx_path = Path(base_directory) / source / asset_class / timeframe / f"{ticker}.xlsx"
df = pd.read_excel(xlsx_path, sheet_name="data", engine="calamine")
return df
elif file_format == "pickle":
pickle_path = Path(base_directory) / source / asset_class / timeframe / f"{ticker}.pkl"
df = pd.read_pickle(pickle_path)
return df
else:
raise ValueError(f"❌ Unsupported file format: {file_format}. Please use 'csv', 'excel', or 'pickle'.")
pandas_set_decimal_places #
import pandas as pd
def pandas_set_decimal_places(
decimal_places: int,
) -> None:
"""
Set the number of decimal places displayed for floating-point numbers in pandas.
Parameters:
----------
decimal_places : int
The number of decimal places to display for float values in pandas DataFrames and Series.
Returns:
--------
None
Example:
--------
    >>> pandas_set_decimal_places(3)
>>> pd.DataFrame([1.23456789])
0
0 1.235
"""
pd.set_option('display.float_format', lambda x: f'%.{decimal_places}f' % x)
plot_bar_returns_ffr_change #
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
def plot_bar_returns_ffr_change(
cycle_df: pd.DataFrame,
asset_label: str,
annualized_or_cumulative: str,
index_num: str,
) -> None:
    """
    Plot a bar chart of cumulative or annualized returns by Fed policy cycle,
    annotated with the corresponding change in the Fed Funds Rate.
    Parameters:
    -----------
    cycle_df : pd.DataFrame
        Cycle-level metrics as returned by calc_fed_cycle_asset_performance.
    asset_label : str
        Asset name used in the title, y-axis label, and export file name.
    annualized_or_cumulative : str
        Either 'Annualized' or 'Cumulative'.
    index_num : str
        Index number used in the file name of the plot export.
    Returns:
    --------
    None
    """
    plt.figure(figsize=(10, 8))
if annualized_or_cumulative == "Cumulative":
# Create bar plot for cumulative returns
barplot = sns.barplot(
data=cycle_df,
x="Label",
y="CumulativeReturnPct",
palette="coolwarm"
)
# Annotate each bar with cumulative return and Fed rate change
for i, row in cycle_df.iterrows():
barplot.text(
i,
row["CumulativeReturnPct"] + 1,
f"{row['CumulativeReturnPct']:.1f}%\nΔFFR: {row['FedFundsChange_bps']:.0f}bps",
color="black",
ha="center",
fontsize=10
)
elif annualized_or_cumulative == "Annualized":
# Create bar plot for annualized returns
barplot = sns.barplot(
data=cycle_df,
x="Label", # Date range labels from earlier
y="AnnualizedReturnPct",
palette="coolwarm"
)
# Annotate each bar with annualized return + annualized FFR change
for i, row in cycle_df.iterrows():
barplot.text(
i,
row["AnnualizedReturnPct"] + 1,
f"{row['AnnualizedReturnPct']:.1f}%\nΔFFR: {row['FFR_AnnualizedChange_bps']:.0f}bps/yr",
color="black",
ha="center",
fontsize=10
)
plt.ylabel(f"{asset_label} {annualized_or_cumulative} Return (%)", fontsize=14)
plt.yticks(fontsize=12)
plt.xlabel("Fed Policy Cycle (Date Range)", fontsize=14)
plt.xticks(rotation=45, ha="right", fontsize=12)
plt.title(f"{asset_label} {annualized_or_cumulative} Return by Fed Policy Cycle With {annualized_or_cumulative} Change in Fed Funds Rate", fontsize=16)
plt.tight_layout()
plt.savefig(f"{index_num}_{asset_label}_{annualized_or_cumulative}_Returns_FFR_Change.png", dpi=300, bbox_inches="tight")
plt.show()
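A minimal usage sketch; the frame below carries only the columns the function reads, with illustrative values (in practice, pass the output of calc_fed_cycle_asset_performance):
import pandas as pd

if __name__ == "__main__":
    # Illustrative cycle-level frame with the columns the function reads
    cycle_df = pd.DataFrame({
        "Label": ["Tightening, 2022-03-31 to 2023-07-31", "Loosening, 2024-09-30 to 2024-12-31"],
        "AnnualizedReturnPct": [-12.0, 4.5],
        "CumulativeReturnPct": [-15.0, 1.1],
        "FedFundsChange_bps": [525.0, -100.0],
        "FFR_AnnualizedChange_bps": [394.0, -400.0],
    })
    plot_bar_returns_ffr_change(
        cycle_df=cycle_df,
        asset_label="TLT",
        annualized_or_cumulative="Annualized",  # or "Cumulative"
        index_num="01",
    )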
plot_scatter_regression_ffr_vs_returns #
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import statsmodels.api as sm
def plot_scatter_regression_ffr_vs_returns(
cycle_df: pd.DataFrame,
asset_label: str,
index_num: str,
x_vals: np.ndarray,
y_vals: np.ndarray,
intercept: float,
slope: float,
) -> None:
    """
    Scatter plot of annualized returns versus the annualized change in the
    Fed Funds Rate by policy cycle, with a precomputed OLS regression line.
    Parameters:
    -----------
    cycle_df : pd.DataFrame
        Cycle-level metrics as returned by calc_fed_cycle_asset_performance.
    asset_label : str
        Asset name used in the title, y-axis label, and export file name.
    index_num : str
        Index number used in the file name of the plot export.
    x_vals : np.ndarray
        X values of the fitted regression line.
    y_vals : np.ndarray
        Y values of the fitted regression line.
    intercept : float
        OLS intercept, displayed in the legend.
    slope : float
        OLS slope, displayed in the legend.
    Returns:
    --------
    None
    """
    plt.figure(figsize=(10, 6))
sns.scatterplot(
data=cycle_df,
x="FFR_AnnualizedChange_bps",
y="AnnualizedReturnPct",
s=100,
color="blue"
)
# Annotate each point with the cycle number or date range, annualized returns and FFR
for i, row in cycle_df.iterrows():
plt.text(
row["FFR_AnnualizedChange_bps"] + 5, # small x-offset
row["AnnualizedReturnPct"],
row["Cycle"],
fontsize=10,
color="black"
)
plt.plot(x_vals, y_vals, color="red", linestyle="--", label=f"OLS Fit: y = {intercept:.1f} + {slope:.2f}x")
plt.axhline(0, color="gray", linestyle="--", linewidth=0.8)
plt.axvline(0, color="gray", linestyle="--", linewidth=0.8)
plt.title(f"{asset_label} Annualized Return vs Annualized Change in Fed Funds Rate by Policy Cycle", fontsize=16)
plt.xlabel("Annualized Change In Fed Funds Rate (bps)", fontsize=14)
plt.xticks(fontsize=12)
plt.ylabel(f"{asset_label} Annualized Return (%)", fontsize=14)
plt.yticks(fontsize=12)
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(f"{index_num}_{asset_label}_Regression_FFR_vs_Returns.png", dpi=300, bbox_inches="tight")
plt.show()
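The regression inputs are computed by the caller; a sketch of that step using statsmodels OLS on the columns the function expects (the data values are illustrative):
import numpy as np
import pandas as pd
import statsmodels.api as sm

if __name__ == "__main__":
    # Illustrative cycle-level frame; in practice use calc_fed_cycle_asset_performance output
    cycle_df = pd.DataFrame({
        "Cycle": ["1", "2", "3", "4"],
        "FFR_AnnualizedChange_bps": [394.0, -400.0, 200.0, -150.0],
        "AnnualizedReturnPct": [-12.0, 4.5, -3.0, 2.0],
    })
    # Fit OLS of annualized returns on the annualized change in the FFR
    X = sm.add_constant(cycle_df["FFR_AnnualizedChange_bps"])
    model = sm.OLS(cycle_df["AnnualizedReturnPct"], X).fit()
    intercept, slope = model.params
    x_vals = np.linspace(
        cycle_df["FFR_AnnualizedChange_bps"].min(),
        cycle_df["FFR_AnnualizedChange_bps"].max(),
        100,
    )
    y_vals = intercept + slope * x_vals
    plot_scatter_regression_ffr_vs_returns(
        cycle_df=cycle_df,
        asset_label="TLT",
        index_num="02",
        x_vals=x_vals,
        y_vals=y_vals,
        intercept=intercept,
        slope=slope,
    )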
plot_timeseries #
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.ticker as mtick
import pandas as pd
from matplotlib.ticker import FormatStrFormatter, MultipleLocator, PercentFormatter
def plot_timeseries(
price_df: pd.DataFrame,
plot_start_date: str,
plot_end_date: str,
plot_columns,
title: str,
x_label: str,
x_format: str,
y_label: str,
y_format: str,
y_format_decimal_places: int,
y_tick_spacing: int,
grid: bool,
legend: bool,
export_plot: bool,
plot_file_name: str,
x_tick_rotation: int = 0,
) -> None:
"""
Plot the price data from a DataFrame for a specified date range and columns.
Parameters:
-----------
    price_df : pd.DataFrame
        DataFrame containing the price data to plot.
plot_start_date : str
Start date for the plot in 'YYYY-MM-DD' format.
plot_end_date : str
End date for the plot in 'YYYY-MM-DD' format.
    plot_columns : str OR list
        List of columns to plot from the DataFrame, or "All" to plot every column.
title : str
Title of the plot.
x_label : str
Label for the x-axis.
x_format : str
Format for the x-axis date labels.
x_tick_rotation : int, optional
Rotation angle for the x-axis tick labels (default: 0).
y_label : str
Label for the y-axis.
    y_format : str
        Format for the y-axis labels ('Decimal', 'Percentage', 'Scientific', or 'Log').
y_format_decimal_places : int
Number of decimal places for y-axis labels.
y_tick_spacing : int
Spacing for the y-axis ticks.
grid : bool
Whether to display a grid on the plot.
legend : bool
Whether to display a legend on the plot.
export_plot : bool
Whether to save the figure as a PNG file.
    plot_file_name : str
        File name for saving the figure (if export_plot is True).
Returns:
--------
None
"""
# If start date and end date are None, use the entire DataFrame
if plot_start_date is None and plot_end_date is None:
df_filtered = price_df
# If only end date is specified, filter by end date
elif plot_start_date is None and plot_end_date is not None:
df_filtered = price_df[(price_df.index <= plot_end_date)]
# If only start date is specified, filter by start date
elif plot_start_date is not None and plot_end_date is None:
df_filtered = price_df[(price_df.index >= plot_start_date)]
# If both start date and end date are specified, filter by both
else:
df_filtered = price_df[(price_df.index >= plot_start_date) & (price_df.index <= plot_end_date)]
# Set plot figure size and background color
# plt.figure(figsize=(10, 6), facecolor="#F5F5F5")
# plt.figure(figsize=(10, 6), facecolor=(249/255, 250/255, 251/255))
plt.figure(figsize=(10, 6))
# Plot data
    if plot_columns == "All":
for col in df_filtered.columns:
plt.plot(df_filtered.index, df_filtered[col], label=col, linestyle='-', linewidth=1.5)
else:
for col in plot_columns:
plt.plot(df_filtered.index, df_filtered[col], label=col, linestyle='-', linewidth=1.5)
# Format X axis
if x_format == "Day":
plt.gca().xaxis.set_major_locator(mdates.DayLocator())
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%d %b %Y"))
elif x_format == "Week":
plt.gca().xaxis.set_major_locator(mdates.WeekdayLocator())
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%d %b %Y"))
elif x_format == "Month":
plt.gca().xaxis.set_major_locator(mdates.MonthLocator())
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%b %Y"))
elif x_format == "Year":
plt.gca().xaxis.set_major_locator(mdates.YearLocator())
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y"))
else:
raise ValueError(f"Unrecognized x_format: {x_format}. Use 'Day', 'Week', 'Month', or 'Year'.")
plt.xlabel(x_label, fontsize=14)
plt.xticks(rotation=x_tick_rotation, fontsize=12)
# Format Y axis
if y_format == "Decimal":
plt.gca().yaxis.set_major_formatter(FormatStrFormatter(f"%.{y_format_decimal_places}f"))
elif y_format == "Percentage":
plt.gca().yaxis.set_major_formatter(PercentFormatter(xmax=1, decimals=y_format_decimal_places))
elif y_format == "Scientific":
plt.gca().yaxis.set_major_formatter(FormatStrFormatter(f"%.{y_format_decimal_places}e"))
elif y_format == "Log":
plt.yscale("log")
else:
raise ValueError(f"Unrecognized y_format: {y_format}. Use 'Decimal', 'Percentage', or 'Scientific'.")
plt.gca().yaxis.set_major_locator(MultipleLocator(y_tick_spacing))
plt.ylabel(y_label, fontsize=14)
plt.yticks(fontsize=12)
# Format title, layout, grid, and legend
plt.title(title, fontsize=16)
plt.tight_layout()
if grid == True:
plt.grid(True, linestyle='--', alpha=0.7)
if legend == True:
plt.legend(fontsize=9)
# Save figure and display plot
if export_plot == True:
plt.savefig(f"{plot_file_name}.png", dpi=300, bbox_inches="tight")
# Display the plot
plt.show()
return None
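A minimal usage sketch with a synthetic price series (the values are random, for illustration only):
import numpy as np
import pandas as pd

if __name__ == "__main__":
    # Synthetic daily close prices on a business-day index
    dates = pd.date_range("2024-01-01", "2024-12-31", freq="B")
    rng = np.random.default_rng(42)
    price_df = pd.DataFrame(
        {"Close": 100 + np.cumsum(rng.normal(0, 1, len(dates)))},
        index=dates,
    )
    plot_timeseries(
        price_df=price_df,
        plot_start_date="2024-03-01",
        plot_end_date="2024-09-30",
        plot_columns=["Close"],
        title="Synthetic Close Price",
        x_label="Date",
        x_format="Month",
        y_label="Price (USD)",
        y_format="Decimal",
        y_format_decimal_places=0,
        y_tick_spacing=5,
        grid=True,
        legend=True,
        export_plot=False,
        plot_file_name="synthetic_close",
        x_tick_rotation=45,
    )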
plot_stats #
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.ticker import MultipleLocator
def plot_stats(
stats_df: pd.DataFrame,
plot_columns,
title: str,
x_label: str,
x_rotation: int,
x_tick_spacing: int,
y_label: str,
y_tick_spacing: int,
grid: bool,
legend: bool,
export_plot: bool,
plot_file_name: str,
) -> None:
"""
Plot the price data from a DataFrame for a specified date range and columns.
Parameters:
-----------
stats_df : pd.DataFrame
DataFrame containing the price data to plot.
    plot_columns : str OR list
        List of columns to plot from the DataFrame, or "All" to plot every column.
title : str
Title of the plot.
x_label : str
Label for the x-axis.
x_rotation : int
Rotation angle for the x-axis date labels.
x_tick_spacing : int
Spacing for the x-axis ticks.
y_label : str
Label for the y-axis.
y_tick_spacing : int
Spacing for the y-axis ticks.
grid : bool
Whether to display a grid on the plot.
legend : bool
Whether to display a legend on the plot.
export_plot : bool
Whether to save the figure as a PNG file.
    plot_file_name : str
        File name for saving the figure (if export_plot is True).
Returns:
--------
None
"""
# Set plot figure size and background color
plt.figure(figsize=(12, 6), facecolor="#F5F5F5")
# Plot data
if plot_columns == "All":
for col in stats_df.columns:
plt.scatter(
stats_df.index, stats_df[col], label=col, linestyle="-", linewidth=1.5
)
else:
for col in plot_columns:
plt.scatter(
stats_df.index, stats_df[col], label=col, linestyle="-", linewidth=1.5
)
# Format X axis
plt.gca().xaxis.set_major_locator(MultipleLocator(x_tick_spacing))
plt.xlabel(x_label, fontsize=10)
plt.xticks(rotation=x_rotation, fontsize=8)
# Format Y axis
plt.gca().yaxis.set_major_locator(MultipleLocator(y_tick_spacing))
plt.ylabel(y_label, fontsize=10)
plt.yticks(fontsize=8)
# Format title, layout, grid, and legend
plt.title(title, fontsize=12)
plt.tight_layout()
if grid == True:
plt.grid(True, linestyle="--", alpha=0.7)
if legend == True:
plt.legend(fontsize=9)
# Save figure and display plot
if export_plot == True:
plt.savefig(f"{plot_file_name}.png", dpi=300, bbox_inches="tight")
# Display the plot
plt.show()
return None
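A minimal usage sketch with synthetic statistics, e.g., mean high/low prices keyed by hour of day (the values are random, for illustration only):
import numpy as np
import pandas as pd

if __name__ == "__main__":
    # Synthetic mean high/low prices keyed by hour of day (0-23)
    rng = np.random.default_rng(0)
    stats_df = pd.DataFrame(
        {
            "Mean_High": 20 + rng.normal(0, 0.5, 24),
            "Mean_Low": 18 + rng.normal(0, 0.5, 24),
        },
        index=range(24),
    )
    plot_stats(
        stats_df=stats_df,
        plot_columns="All",
        title="Mean High/Low Price by Hour of Day",
        x_label="Hour of Day",
        x_rotation=0,
        x_tick_spacing=1,
        y_label="Price",
        y_tick_spacing=1,
        grid=True,
        legend=True,
        export_plot=False,
        plot_file_name="mean_price_by_hour",
    )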
plot_vix_with_trades #
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.ticker import MultipleLocator
def plot_vix_with_trades(
vix_price_df: pd.DataFrame,
trades_df: pd.DataFrame,
plot_start_date: str,
plot_end_date: str,
x_tick_spacing: int,
y_tick_spacing: int,
index_number: str,
export_plot: bool,
) -> pd.DataFrame:
"""
Plot the VIX daily high and low prices, along with the VIX spikes, and trades.
Parameters:
-----------
vix_price_df : pd.DataFrame
Dataframe containing the VIX price data to plot.
trades_df : pd.DataFrame
Dataframe containing the trades data.
plot_start_date : str
Start date for the plot in 'YYYY-MM-DD' format.
    plot_end_date : str
        End date for the plot in 'YYYY-MM-DD' format.
    x_tick_spacing : int
        Interval in days between x-axis ticks.
    y_tick_spacing : int
        Spacing for the y-axis ticks.
    index_number : str
        Index number to be used in the file name of the plot export.
export_plot : bool
Whether to save the figure as a PNG file.
Returns:
--------
vix_data : pd.DataFrame
Dataframe containing the VIX price data for the specified timeframe.
"""
# Create temporary dataframe for the specified date range
vix_data = vix_price_df[(vix_price_df.index >= plot_start_date) & (vix_price_df.index <= plot_end_date)]
# Set plot figure size and background color
plt.figure(figsize=(12, 6), facecolor="#F5F5F5")
# Plot VIX high and low price data
plt.plot(vix_data.index, vix_data['High'], label='High', linestyle='-', color='steelblue', linewidth=1)
plt.plot(vix_data.index, vix_data['Low'], label='Low', linestyle='-', color='brown', linewidth=1)
# Plot VIX spikes
plt.scatter(vix_data[vix_data['Spike_SMA'] == True].index, vix_data[vix_data['Spike_SMA'] == True]['High'], label='Spike (High > 1.25 * 10 Day High SMA)', color='black', s=20)
# Plot trades
plt.scatter(trades_df['Trade_Date'], trades_df['Approx_VIX_Level'], label='Trades', color='red', s=20)
# Annotate each point in trades_df with the corresponding Action_Symbol
for _, row in trades_df.iterrows():
plt.text(
row['Trade_Date'] + pd.Timedelta(days=1),
row['Approx_VIX_Level'] + 0.1,
row['TradeDate_Action_Symbol_VIX'],
fontsize=9
)
# Format X axis
plt.gca().xaxis.set_major_locator(mdates.DayLocator(interval=x_tick_spacing))
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
plt.xlabel("Date", fontsize=10)
plt.xticks(rotation=45, fontsize=8)
# Format Y axis
plt.gca().yaxis.set_major_locator(MultipleLocator(y_tick_spacing))
plt.ylabel("VIX", fontsize=10)
plt.yticks(fontsize=8)
# Format title, layout, grid, and legend
plt.title(f"CBOE Volatility Index (VIX), VIX Spikes, Trades, {plot_start_date} - {plot_end_date}", fontsize=12)
plt.tight_layout()
plt.grid(True, linestyle='--', alpha=0.7)
plt.legend(fontsize=9)
# Save figure and display plot
if export_plot == True:
# plt.savefig(f"{index_number}_VIX_Spike_Trades_{plot_start_date}_{plot_end_date}.png", dpi=300, bbox_inches="tight")
plt.savefig(f"{index_number}_VIX_Spike_Trades.png", dpi=300, bbox_inches="tight")
# Display the plot
plt.show()
return vix_data
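A minimal usage sketch with synthetic data. The function assumes vix_price_df carries a boolean 'Spike_SMA' column (flagged below the same way the plot legend describes it: high > 1.25x the 10-day SMA of the high) and that trades_df carries the 'TradeDate_Action_Symbol_VIX' label created in calc_vix_trade_pnl; all values are illustrative:
import numpy as np
import pandas as pd

if __name__ == "__main__":
    # Synthetic VIX highs/lows on a business-day index
    dates = pd.date_range("2024-08-01", "2024-08-30", freq="B")
    rng = np.random.default_rng(1)
    high = 16 + 8 * rng.random(len(dates))
    vix = pd.DataFrame({"High": high, "Low": high - 2.0}, index=dates)
    # Flag spikes: high > 1.25x the 10-day SMA of the high
    vix["Spike_SMA"] = vix["High"] > 1.25 * vix["High"].rolling(10, min_periods=1).mean()
    trades = pd.DataFrame({
        "Trade_Date": pd.to_datetime(["2024-08-05"]),
        "Approx_VIX_Level": [16.5],
        "TradeDate_Action_Symbol_VIX": ["2024-08-05, Buy to Open, VIX Call, VIX = 16.5"],
    })
    vix_window = plot_vix_with_trades(
        vix_price_df=vix,
        trades_df=trades,
        plot_start_date="2024-08-01",
        plot_end_date="2024-08-30",
        x_tick_spacing=2,
        y_tick_spacing=1,
        index_number="03",
        export_plot=False,
    )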
polygon_fetch_full_history #
import pandas as pd
import time
from datetime import datetime, timedelta
from load_api_keys import load_api_keys
from massive import RESTClient
from settings import config
# Load API keys from the environment
api_keys = load_api_keys()
# Get the environment variable for where data is stored
DATA_DIR = config("DATA_DIR")
def polygon_fetch_full_history(
client,
ticker: str,
timespan: str,
multiplier: int,
adjusted: bool,
existing_history_df: pd.DataFrame,
current_start: datetime,
free_tier: bool,
verbose: bool,
) -> pd.DataFrame:
"""
Fetch full historical data for a given product from Polygon API.
Parameters:
-----------
client
Polygon API client instance.
ticker : str
Ticker symbol to download.
timespan : str
Time span for the data (e.g., "minute", "hour", "day", "week", "month", "quarter", "year").
multiplier : int
Multiplier for the time span (e.g., 1 for daily data).
adjusted : bool
If True, return adjusted data; if False, return raw data.
    existing_history_df : pd.DataFrame
        DataFrame containing the existing data, to be extended with newly pulled data.
current_start : datetime
Date for which to start pulling data in datetime format.
free_tier : bool
If True, then pause to avoid API limits.
verbose : bool
If True, print detailed information about the data being processed.
Returns:
--------
full_history_df : pd.DataFrame
DataFrame containing the data.
"""
# Copy DataFrame
full_history_df = existing_history_df.copy()
if timespan == "minute":
time_delta = 15
time_overlap = 1
elif timespan == "hour":
time_delta = 15
time_overlap = 1
elif timespan == "day":
time_delta = 180
time_overlap = 1
else:
raise Exception(f"Invalid {timespan}.")
new_data_last_date = None
new_date_last_date_check = None
while current_start < datetime.now():
# Offset end date by time_delta
current_end = current_start + timedelta(days=time_delta)
if verbose == True:
print(f"Pulling {timespan} data for {current_start} thru {current_end} for {ticker}...\n")
try:
# Pull new data
aggs = client.get_aggs(
ticker=ticker,
timespan=timespan,
multiplier=multiplier,
from_=current_start,
to=current_end,
adjusted=adjusted,
sort="asc",
limit=5000,
)
# if len(aggs) == 0:
# raise Exception(f"No data is available for {ticker} for {current_start} thru {current_end}. Please attempt different dates.")
# Convert to DataFrame
new_data = pd.DataFrame([bar.__dict__ for bar in aggs])
new_data["timestamp"] = pd.to_datetime(new_data["timestamp"], unit="ms")
new_data = new_data.rename(columns = {'timestamp':'Date'})
new_data = new_data[['Date', 'open', 'high', 'low', 'close', 'volume', 'vwap', 'transactions', 'otc']]
new_data = new_data.sort_values(by='Date', ascending=True)
# Enforce dtypes to match full_history_df
new_data = new_data.astype(full_history_df.dtypes.to_dict())
# (Optional) reorder columns to match schema
# new_data = new_data[full_history_df.columns]
# Find last date in new_data
new_data_last_date = new_data['Date'].max()
if verbose == True:
print("New data:")
print(new_data)
# No longer necessary to check for 5000 rows of data
# Check if new data contains 5000 rows
# if len(new_data) == 5000:
# raise Exception(f"New data for {ticker} contains 5000 rows, indicating potential issues with data completeness or API limits.")
# If full_history_df length is not 0, check to confirm that data overlaps to verify that there were not any splits in the data
# if not full_history_df.empty:
# if not full_history_df['Date'].isin(new_data['Date']).any():
# raise Exception(f"New data does not overlap with existing data.")
if not full_history_df.empty:
# Columns present in both frames
common_cols = list(full_history_df.columns.intersection(new_data.columns))
if not common_cols:
raise Exception("No common columns to compare.")
# (Optional) de-duplicate to speed up the merge
# full_dedup = full_history_df[common_cols].drop_duplicates()
# new_dedup = new_data[common_cols].drop_duplicates()
price_cols = ['open', 'high', 'low', 'close']
# Inner join on every shared column = exact row matches
# overlap = full_dedup.merge(new_dedup, on=common_cols, how="inner")
# overlap = full_dedup.merge(new_dedup, on=price_cols, how="inner")
overlap = full_history_df.merge(new_data, on=price_cols, how="inner")
if overlap.empty:
raise Exception(f"New data does not overlap with existing data (full-row check).")
# Combine existing data with recent data, drop duplicates, sort values, reset index
full_history_df = pd.concat([full_history_df, new_data])
full_history_df = full_history_df.drop_duplicates(subset="Date", keep="last")
full_history_df = full_history_df.sort_values(by='Date',ascending=True)
full_history_df = full_history_df.reset_index(drop=True)
if verbose == True:
print("Combined data:")
print(full_history_df)
except KeyError as e:
print(f"No data is available for {ticker} from {current_start} thru {current_end}.")
user_choice = input(
f"Press Enter to continue, or type 'q' to quit: "
)
if user_choice.lower() == "q":
print(f"Aborting operation to update {ticker} {timespan} data.")
break # break out of the while loop
else:
print(f"Trying next timeframe for {ticker} {timespan} data.")
# Set last_data_date to current_end because we know data was not available
# up until current_end
new_data_last_date = current_end
pass
except Exception as e:
print(f"Failed to pull {timespan} data for {current_start} thru {current_end} for {ticker}: {e}")
raise # Re-raise the original exception
        # Break out of the loop if the data is up-to-date (or close to up-to-date,
        # because it is possible that the entire range was not pulled due to the way
        # the API handles hour data vs. minute data)
        if current_end > datetime.now():
            break
        else:
            # Edge case: if the last date in new_data has not advanced since the
            # previous iteration, step forward from current_end instead to avoid
            # re-pulling the same window
            if new_date_last_date_check == new_data_last_date:
                current_start = current_end - timedelta(days=time_overlap)
                new_date_last_date_check = new_data_last_date
            else:
                current_start = new_data_last_date - timedelta(days=time_overlap)
                new_date_last_date_check = new_data_last_date
        # Check for the free tier and, if so, pause for 12 seconds (5 requests/minute) to avoid hitting API rate limits
if free_tier == True:
if verbose == True:
print(f"Sleeping for 12 seconds to avoid hitting API rate limits...\n")
time.sleep(12)
# Return the DataFrame containing the full history
return full_history_df
if __name__ == "__main__":
current_year = datetime.now().year
current_month = datetime.now().month
current_day = datetime.now().day
# Open client connection
client = RESTClient(api_key=api_keys["POLYGON_KEY"])
# Create an empty DataFrame
df = pd.DataFrame({
'Date': pd.Series(dtype="datetime64[ns]"),
'open': pd.Series(dtype="float64"),
'high': pd.Series(dtype="float64"),
'low': pd.Series(dtype="float64"),
'close': pd.Series(dtype="float64"),
'volume': pd.Series(dtype="float64"),
'vwap': pd.Series(dtype="float64"),
'transactions': pd.Series(dtype="int64"),
'otc': pd.Series(dtype="object")
})
    # Example usage - hourly
df = polygon_fetch_full_history(
client=client,
ticker="TQQQ",
timespan="hour",
multiplier=1,
adjusted=True,
existing_history_df=df,
current_start=datetime(current_year - 2, current_month, current_day),
free_tier=True,
verbose=True,
)
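A note on the overlap check inside polygon_fetch_full_history: because each pull intentionally overlaps the existing history, the most recently stored rows should reappear in the new pull. An inner join on the OHLC columns is an exact price-row match, so if a split or adjustment changed the historical prices since the last pull, the join comes back empty and the function raises. A minimal sketch of the idea, with made-up prices:

import pandas as pd

# Existing history and a new pull that re-covers the last stored row
existing = pd.DataFrame({'open': [10.0, 11.0], 'high': [10.5, 11.5], 'low': [9.5, 10.5], 'close': [10.2, 11.2]})
new = pd.DataFrame({'open': [11.0, 12.0], 'high': [11.5, 12.5], 'low': [10.5, 11.5], 'close': [11.2, 12.2]})
price_cols = ['open', 'high', 'low', 'close']
# Inner join on the OHLC columns = exact price-row matches
overlap = existing.merge(new, on=price_cols, how='inner')
print(overlap.empty)  # False: the 11.0/11.5/10.5/11.2 row appears in both frames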
polygon_pull_data #
import os
import pandas as pd
import time
from datetime import datetime, timedelta
from IPython.display import display
from load_api_keys import load_api_keys
from massive import RESTClient
from polygon_fetch_full_history import polygon_fetch_full_history
from settings import config
# Load API keys from the environment
api_keys = load_api_keys()
# Get the environment variable for where data is stored
DATA_DIR = config("DATA_DIR")
def polygon_pull_data(
    base_directory: str,
ticker: str,
source: str,
asset_class: str,
start_date: datetime,
timespan: str,
multiplier: int,
adjusted: bool,
force_existing_check: bool,
free_tier: bool,
verbose: bool,
excel_export: bool,
pickle_export: bool,
output_confirmation: bool,
) -> pd.DataFrame:
"""
Read existing data file, download price data from Polygon, and export data.
Parameters:
-----------
    base_directory : str
Root path to store downloaded data.
ticker : str
Ticker symbol to download.
source : str
Name of the data source (e.g., 'Polygon').
asset_class : str
Asset class name (e.g., 'Equities').
start_date : datetime
Start date for the data in datetime format.
timespan : str
Time span for the data (e.g., "minute", "hour", "day", "week", "month", "quarter", "year").
multiplier : int
Multiplier for the time span (e.g., 1 for daily data).
adjusted : bool
If True, return adjusted data; if False, return raw data.
force_existing_check : bool
If True, force a complete check of the existing data file to verify that there are not any gaps in the data.
free_tier : bool
If True, then pause to avoid API limits.
verbose : bool
If True, print detailed information about the data being processed.
excel_export : bool
If True, export data to Excel format.
pickle_export : bool
If True, export data to Pickle format.
output_confirmation : bool
If True, print confirmation message.
Returns:
--------
pd.DataFrame
DataFrame containing the updated price data.
"""
# Open client connection
client = RESTClient(api_key=api_keys["POLYGON_KEY"])
# Set file location based on parameters
file_location = f"{base_directory}/{source}/{asset_class}/{timespan}/{ticker}.pkl"
# Create list of acceptable timespans
acceptable_timespans = ["minute", "hour", "day"]
if timespan not in acceptable_timespans:
raise Exception(f"Invalid timespan: {timespan}. Acceptable timespans are: {acceptable_timespans}.")
try:
# Attempt to read existing pickle data file
existing_history_df = pd.read_pickle(file_location)
        # Reset index if 'Date' is the index rather than a column
if "Date" not in existing_history_df.columns:
existing_history_df = existing_history_df.reset_index()
print(f"File found...updating the {ticker} {timespan} data.")
if verbose == True:
print("Existing data:")
print(existing_history_df)
# Find last date in existing data
last_data_date = existing_history_df["Date"].max()
print(f"Last date in existing data: {last_data_date}")
# Find the number of starting rows
starting_rows = len(existing_history_df)
print(f"Number of rows in existing data: {starting_rows}")
        # Overlap one day with the existing data to capture all data
        current_start = last_data_date - timedelta(days=1)
except FileNotFoundError:
# Print error
print(f"File not found...downloading the {ticker} {timespan} data.")
# Create an empty DataFrame
existing_history_df = pd.DataFrame(
{
"Date": pd.Series(dtype="datetime64[ns]"),
"open": pd.Series(dtype="float64"),
"high": pd.Series(dtype="float64"),
"low": pd.Series(dtype="float64"),
"close": pd.Series(dtype="float64"),
"volume": pd.Series(dtype="float64"),
"vwap": pd.Series(dtype="float64"),
"transactions": pd.Series(dtype="int64"),
"otc": pd.Series(dtype="object"),
}
)
# Set the number of starting rows to be 0
starting_rows = 0
# Set current date to start date
current_start = start_date
# Check for force_existing_check flag
if force_existing_check == True:
print("Forcing check of existing data...")
current_start = start_date
full_history_df = polygon_fetch_full_history(
client=client,
ticker=ticker,
timespan=timespan,
multiplier=multiplier,
adjusted=adjusted,
existing_history_df=existing_history_df,
current_start=current_start,
free_tier=free_tier,
verbose=verbose,
)
# Create directory
directory = f"{base_directory}/{source}/{asset_class}/{timespan}"
os.makedirs(directory, exist_ok=True)
# Export to Excel
if excel_export == True:
print(f"Exporting {ticker} {timespan} data to Excel...")
full_history_df.to_excel(f"{directory}/{ticker}.xlsx", sheet_name="data")
# Export to Pickle
if pickle_export == True:
print(f"Exporting {ticker} {timespan} data to Pickle...")
full_history_df.to_pickle(f"{directory}/{ticker}.pkl")
total_rows = len(full_history_df)
# Output confirmation
if output_confirmation == True:
print(f"The first and last date of {timespan} data for {ticker} is: ")
display(full_history_df[:1])
display(full_history_df[-1:])
print(f"Number of rows after data update: {total_rows}")
if starting_rows:
print(f"Number of rows added during update: {total_rows - starting_rows}")
print(f"Polygon data complete for {ticker} {timespan} data.")
print(f"--------------------")
return full_history_df
if __name__ == "__main__":
# Get current year, month, day
current_year = datetime.now().year
current_month = datetime.now().month
current_day = datetime.now().day
# Set global variables
GLOBAL_FREE_TIER = True
# Stock Data
equities = ["AMZN", "AAPL"]
# Iterate through each stock
for stock in equities:
# Example usage - minute
polygon_pull_data(
base_directory=DATA_DIR,
ticker=stock,
source="Polygon",
asset_class="Equities",
start_date=datetime(current_year - 2, current_month, current_day),
timespan="minute",
multiplier=1,
adjusted=True,
force_existing_check=False,
free_tier=GLOBAL_FREE_TIER,
verbose=False,
excel_export=True,
pickle_export=True,
output_confirmation=True,
)
if GLOBAL_FREE_TIER == True:
time.sleep(12)
else:
pass
# Example usage - hourly
polygon_pull_data(
base_directory=DATA_DIR,
ticker=stock,
source="Polygon",
asset_class="Equities",
start_date=datetime(current_year - 2, current_month, current_day),
timespan="hour",
multiplier=1,
adjusted=True,
force_existing_check=False,
free_tier=GLOBAL_FREE_TIER,
verbose=False,
excel_export=True,
pickle_export=True,
output_confirmation=True,
)
if GLOBAL_FREE_TIER == True:
time.sleep(12)
else:
pass
# Example usage - daily
polygon_pull_data(
base_directory=DATA_DIR,
ticker=stock,
source="Polygon",
asset_class="Equities",
start_date=datetime(current_year - 2, current_month, current_day),
timespan="day",
multiplier=1,
adjusted=True,
force_existing_check=False,
free_tier=GLOBAL_FREE_TIER,
verbose=False,
excel_export=True,
pickle_export=True,
output_confirmation=True,
)
if GLOBAL_FREE_TIER == True:
time.sleep(12)
else:
pass
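The incremental update pattern in polygon_pull_data, reduced to its core: start the new pull one day before the last stored date, concatenate old and new, and keep the newest copy of any duplicated date. A minimal sketch with made-up values:

import pandas as pd

old = pd.DataFrame({'Date': pd.to_datetime(['2024-01-01', '2024-01-02']), 'close': [100.0, 101.0]})
new = pd.DataFrame({'Date': pd.to_datetime(['2024-01-02', '2024-01-03']), 'close': [101.5, 102.0]})
# Combine, drop duplicate dates (the newest pull wins), sort, and reset the index
combined = pd.concat([old, new])
combined = combined.drop_duplicates(subset='Date', keep='last')
combined = combined.sort_values(by='Date', ascending=True)
combined = combined.reset_index(drop=True)
print(combined)  # three rows; 2024-01-02 carries the re-pulled 101.5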
strategy_harry_brown_perm_port #
import pandas as pd
def strategy_harry_brown_perm_port(
    fund_list: list[str],
starting_cash: int,
cash_contrib: int,
close_prices_df: pd.DataFrame,
rebal_month: int,
rebal_day: int,
rebal_per_high: float,
rebal_per_low: float,
excel_export: bool,
pickle_export: bool,
output_confirmation: bool,
) -> pd.DataFrame:
"""
Execute the re-balance strategy based on specified criteria.
Parameters:
-----------
    fund_list (list[str]):
        List of funds whose data will be combined. Funds are strings in the form "BTC-USD".
starting_cash (int):
Starting investment balance.
cash_contrib (int):
Cash contribution to be made daily.
close_prices_df (pd.DataFrame):
DataFrame containing date and close prices for all funds to be included.
rebal_month (int):
Month for annual rebalance.
rebal_day (int):
Day for annual rebalance.
rebal_per_high (float):
High percentage for rebalance.
rebal_per_low (float):
Low percentage for rebalance.
excel_export : bool
If True, export data to Excel format.
pickle_export : bool
If True, export data to Pickle format.
output_confirmation : bool
If True, print confirmation message.
Returns:
--------
    df (pd.DataFrame):
        DataFrame containing strategy data for all funds included. The DataFrame is also exported to Excel/Pickle for reference later if the corresponding flags are set.
"""
num_funds = len(fund_list)
df = close_prices_df.copy()
df.reset_index(inplace = True)
# Date to be used for annual rebalance
target_month = rebal_month
target_day = rebal_day
    # Create a dataframe with only the dates from the rebalance month
    rebal_date = df[df['Date'].dt.month == target_month]
    # Keep the target day or the next closest trading day after it
    rebal_date = rebal_date[rebal_date['Date'].dt.day >= target_day]
    # Group by year and take the first entry for each year, giving one rebalance date per year
    rebal_dates_by_year = rebal_date.groupby(rebal_date['Date'].dt.year).first().reset_index(drop=True)
'''
Column order for the dataframe:
df[fund + "_BA_Shares"]
df[fund + "_BA_$_Invested"]
df[fund + "_BA_Port_%"]
df['Total_BA_$_Invested']
df['Contribution']
df['Rebalance']
df[fund + "_AA_Shares"]
df[fund + "_AA_$_Invested"]
df[fund + "_AA_Port_%"]
df['Total_AA_$_Invested']
'''
# Calculate the columns and initial values for before action (BA) shares, $ invested, and port %
for fund in fund_list:
df[fund + "_BA_Shares"] = starting_cash / num_funds / df[fund + "_Close"]
df[fund + "_BA_$_Invested"] = df[fund + "_BA_Shares"] * df[fund + "_Close"]
df[fund + "_BA_Port_%"] = 0.25
# Set column values initially
df['Total_BA_$_Invested'] = starting_cash
df['Contribution'] = cash_contrib
df['Rebalance'] = "No"
# Set columns and values initially for after action (AA) shares, $ invested, and port %
for fund in fund_list:
df[fund + "_AA_Shares"] = starting_cash / num_funds / df[fund + "_Close"]
df[fund + "_AA_$_Invested"] = df[fund + "_AA_Shares"] * df[fund + "_Close"]
df[fund + "_AA_Port_%"] = 0.25
# Set column value for after action (AA) total $ invested
df['Total_AA_$_Invested'] = starting_cash
# Iterate through the dataframe and execute the strategy
for index, row in df.iterrows():
# Ensure there's a previous row to reference by checking the index value
if index > 0:
# Initialize variable
Total_BA_Invested = 0
# Calculate before action (BA) shares and $ invested values
for fund in fund_list:
df.at[index, fund + "_BA_Shares"] = df.at[index - 1, fund + "_AA_Shares"]
df.at[index, fund + "_BA_$_Invested"] = df.at[index, fund + "_BA_Shares"] * row[fund + "_Close"]
# Sum the asset values to find the total
Total_BA_Invested = Total_BA_Invested + df.at[index, fund + "_BA_$_Invested"]
# Calculate before action (BA) port % values
for fund in fund_list:
df.at[index, fund + "_BA_Port_%"] = df.at[index, fund + "_BA_$_Invested"] / Total_BA_Invested
# Set column for before action (BA) total $ invested
df.at[index, 'Total_BA_$_Invested'] = Total_BA_Invested
# Initialize variables
rebalance = "No"
date = row['Date']
            # Check whether the current date is one of the annual rebalance dates
            if date in rebal_dates_by_year['Date'].values:
rebalance = "Yes"
else:
pass
            # Check if any asset's portfolio percentage is above rebal_per_high or below rebal_per_low and, if so, flag a rebalance
for fund in fund_list:
if df.at[index, fund + "_BA_Port_%"] > rebal_per_high or df.at[index, fund + "_BA_Port_%"] < rebal_per_low:
rebalance = "Yes"
else:
pass
            # If a rebalance is required, rebalance back to equal weight (1 / num_funds)
            # for each asset; otherwise just divide the contribution evenly across assets
            if rebalance == "Yes":
                df.at[index, 'Rebalance'] = rebalance
                for fund in fund_list:
                    df.at[index, fund + "_AA_$_Invested"] = (Total_BA_Invested + df.at[index, 'Contribution']) / num_funds
            else:
                df.at[index, 'Rebalance'] = rebalance
                for fund in fund_list:
                    df.at[index, fund + "_AA_$_Invested"] = df.at[index, fund + "_BA_$_Invested"] + df.at[index, 'Contribution'] / num_funds
# Initialize variable
Total_AA_Invested = 0
# Set column values for after action (AA) shares and port %
for fund in fund_list:
df.at[index, fund + "_AA_Shares"] = df.at[index, fund + "_AA_$_Invested"] / row[fund + "_Close"]
# Sum the asset values to find the total
Total_AA_Invested = Total_AA_Invested + df.at[index, fund + "_AA_$_Invested"]
# Calculate after action (AA) port % values
for fund in fund_list:
df.at[index, fund + "_AA_Port_%"] = df.at[index, fund + "_AA_$_Invested"] / Total_AA_Invested
# Set column for after action (AA) total $ invested
df.at[index, 'Total_AA_$_Invested'] = Total_AA_Invested
# If this is the first row
else:
pass
df['Return'] = df['Total_AA_$_Invested'].pct_change()
df['Cumulative_Return'] = (1 + df['Return']).cumprod()
plan_name = '_'.join(fund_list)
# Export to excel
if excel_export == True:
df.to_excel(f"{plan_name}_Strategy.xlsx", sheet_name="data")
else:
pass
# Export to pickle
if pickle_export == True:
df.to_pickle(f"{plan_name}_Strategy.pkl")
else:
pass
# Output confirmation
if output_confirmation == True:
print(f"Strategy complete for {plan_name}")
else:
pass
return df
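This listing does not include a __main__ example, so below is a hypothetical one using synthetic prices. The tickers, price paths, and rebalance settings are placeholders (the 0.35/0.15 bands mirror the classic permanent portfolio rule of rebalancing when any sleeve drifts above 35% or below 15%):

import numpy as np
import pandas as pd

if __name__ == "__main__":
    # Hypothetical usage with synthetic prices; fund names are placeholders
    funds = ["VTI", "TLT", "GLD", "BIL"]
    dates = pd.date_range("2020-01-01", periods=500, freq="B")
    rng = np.random.default_rng(42)
    close_prices_df = pd.DataFrame(
        {f"{fund}_Close": 100 * (1 + rng.normal(0, 0.01, len(dates))).cumprod() for fund in funds},
        index=pd.Index(dates, name="Date"),
    )
    df_strategy = strategy_harry_brown_perm_port(
        fund_list=funds,
        starting_cash=10000,
        cash_contrib=0,
        close_prices_df=close_prices_df,
        rebal_month=1,
        rebal_day=2,
        rebal_per_high=0.35,
        rebal_per_low=0.15,
        excel_export=False,
        pickle_export=False,
        output_confirmation=True,
    )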
summary_stats #
import pandas as pd
import numpy as np
def summary_stats(
fund_list: list[str],
df: pd.DataFrame,
period: str,
use_calendar_days: bool,
excel_export: bool,
pickle_export: bool,
output_confirmation: bool,
) -> pd.DataFrame:
"""
Calculate summary statistics for the given fund list and return data.
Parameters:
-----------
    fund_list (list[str]):
        List of funds. This is used below in the excel/pickle export but not in the analysis. Funds are strings in the form "BTC-USD".
df (pd.DataFrame):
Dataframe with return data. Assumes returns are in decimal format (e.g., 0.05 for 5%), and assumes there is only 1 column.
period (str):
Period for which to calculate statistics. Options are "Monthly", "Weekly", "Daily".
use_calendar_days (bool):
If True, use calendar days for calculations. If False, use trading days.
excel_export : bool
If True, export data to Excel format.
pickle_export : bool
If True, export data to Pickle format.
output_confirmation : bool
If True, print confirmation message.
Returns:
--------
df_stats (pd.DataFrame):
pd.DataFrame: DataFrame containing various portfolio statistics.
"""
# Get the period in proper format
period = period.strip().capitalize()
# Map base timeframes
period_to_timeframe = {
"Monthly": 12,
"Weekly": 52,
"Daily": 365 if use_calendar_days else 252,
}
try:
timeframe = period_to_timeframe[period]
except KeyError:
raise ValueError(f"Invalid period: {period}. Must be one of {list(period_to_timeframe.keys())}")
df_stats = pd.DataFrame(df.mean(axis=0) * timeframe) # annualized
df_stats.columns = ['Annualized Mean']
df_stats['Annualized Volatility'] = df.std() * np.sqrt(timeframe) # annualized
df_stats['Annualized Sharpe Ratio'] = df_stats['Annualized Mean'] / df_stats['Annualized Volatility']
    # CAGR: compound the period returns, then annualize over the number of periods
    df_cagr = (1 + df[df.columns[0]]).cumprod()
    cagr = df_cagr.iloc[-1] ** (timeframe / len(df_cagr)) - 1
df_stats['CAGR'] = cagr
df_stats[f'{period} Max Return'] = df.max()
df_stats[f'{period} Max Return (Date)'] = df.idxmax().values[0]
df_stats[f'{period} Min Return'] = df.min()
df_stats[f'{period} Min Return (Date)'] = df.idxmin().values[0]
    # Build a wealth index, track running peaks, and compute drawdowns
    wealth_index = 1000 * (1 + df).cumprod()
    previous_peaks = wealth_index.cummax()
    drawdowns = (wealth_index - previous_peaks) / previous_peaks
    df_stats['Max Drawdown'] = drawdowns.min()
    # Peak = last all-time high before the trough; Trough = date of the max drawdown
    df_stats['Peak'] = [previous_peaks[col][:drawdowns[col].idxmin()].idxmax() for col in previous_peaks.columns]
    df_stats['Trough'] = drawdowns.idxmin()
    # Find the first date after the trough where wealth regains its prior peak
    recovery_date = []
for col in wealth_index.columns:
prev_max = previous_peaks[col][:drawdowns[col].idxmin()].max()
recovery_wealth = pd.DataFrame([wealth_index[col][drawdowns[col].idxmin():]]).T
recovery_date.append(recovery_wealth[recovery_wealth[col] >= prev_max].index.min())
df_stats['Recovery Date'] = recovery_date
df_stats['Days to Recover'] = (df_stats['Recovery Date'] - df_stats['Trough']).dt.days
df_stats['MAR Ratio'] = df_stats['CAGR'] / -df_stats['Max Drawdown']
plan_name = '_'.join(fund_list)
# Export to excel
if excel_export == True:
df_stats.to_excel(f"{plan_name}_Summary_Stats.xlsx", sheet_name="data")
else:
pass
# Export to pickle
if pickle_export == True:
df_stats.to_pickle(f"{plan_name}_Summary_Stats.pkl")
else:
pass
# Output confirmation
if output_confirmation == True:
print(f"Summary stats complete for {plan_name}")
else:
pass
return df_stats
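Likewise, a hypothetical __main__ example for summary_stats with synthetic daily returns (the ticker and return path are placeholders; pandas and numpy are already imported above):

if __name__ == "__main__":
    # Hypothetical usage with synthetic daily returns; the ticker is a placeholder
    dates = pd.date_range("2020-01-01", periods=756, freq="B")
    rng = np.random.default_rng(7)
    returns = pd.DataFrame({"BTC-USD": rng.normal(0.0005, 0.02, len(dates))}, index=dates)
    df_stats = summary_stats(
        fund_list=["BTC-USD"],
        df=returns,
        period="Daily",
        use_calendar_days=False,
        excel_export=False,
        pickle_export=False,
        output_confirmation=True,
    )
    print(df_stats.T)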
yf_pull_data #
import os
import pandas as pd
import yfinance as yf
from IPython.display import display
def yf_pull_data(
    base_directory: str,
ticker: str,
source: str,
asset_class: str,
excel_export: bool,
pickle_export: bool,
output_confirmation: bool,
) -> pd.DataFrame:
"""
Download daily price data from Yahoo Finance and export it.
Parameters:
-----------
    base_directory : str
Root path to store downloaded data.
ticker : str
Ticker symbol to download.
source : str
Name of the data source (e.g., 'Yahoo').
asset_class : str
Asset class name (e.g., 'Equities').
excel_export : bool
If True, export data to Excel format.
pickle_export : bool
If True, export data to Pickle format.
output_confirmation : bool
If True, print confirmation message.
Returns:
--------
df : pd.DataFrame
DataFrame containing the downloaded data.
"""
# Download data from YF
df = yf.download(ticker, start="1900-01-01")
# Drop the column level with the ticker symbol
df.columns = df.columns.droplevel(1)
# Reset index
df = df.reset_index()
# Remove the "Price" header from the index
df.columns.name = None
# Reset date column
df['Date'] = df['Date'].dt.tz_localize(None)
# Set 'Date' column as index
df = df.set_index('Date', drop=True)
    # Drop the last row because the data is not accurate until the end of the day
df = df.drop(df.index[-1])
# Create directory
directory = f"{base_directory}/{source}/{asset_class}/Daily"
os.makedirs(directory, exist_ok=True)
# Export to excel
if excel_export == True:
df.to_excel(f"{directory}/{ticker}.xlsx", sheet_name="data")
else:
pass
# Export to pickle
if pickle_export == True:
df.to_pickle(f"{directory}/{ticker}.pkl")
else:
pass
# Output confirmation
if output_confirmation == True:
print(f"The first and last date of data for {ticker} is: ")
display(df[:1])
display(df[-1:])
print(f"Yahoo Finance data complete for {ticker}")
print(f"--------------------")
else:
pass
return df
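A hypothetical usage example, mirroring the Polygon examples above and assuming the same DATA_DIR configuration from settings:

from settings import config

if __name__ == "__main__":
    # Get the environment variable for where data is stored
    DATA_DIR = config("DATA_DIR")
    # Hypothetical usage; the ticker is a placeholder
    yf_pull_data(
        base_directory=DATA_DIR,
        ticker="SPY",
        source="Yahoo",
        asset_class="Equities",
        excel_export=True,
        pickle_export=True,
        output_confirmation=True,
    )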
References #
None
Code #
The Jupyter notebook with the functions and all other code is available here.
The HTML export of the Jupyter notebook is available here.
The PDF export of the Jupyter notebook is available here.