1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
| import calendar
import os
import pandas as pd
from coinbase_fetch_available_products import coinbase_fetch_available_products
from coinbase_fetch_full_history import coinbase_fetch_full_history
from datetime import datetime, timedelta
from settings import config
# Root data directory, read from the project settings under the "DATA_DIR" key.
# The script entry point at the bottom of this file passes it as `base_directory`.
DATA_DIR = config("DATA_DIR")
def _granularity_label(granularity: int):
    """Map a candle size in seconds to its storage folder name (None if unsupported)."""
    return {60: "Minute", 3600: "Hourly", 86400: "Daily"}.get(granularity)


def _export_history(history_df, directory, product, excel_export, pickle_export):
    """Create ``directory`` if needed and write ``history_df`` as Excel and/or pickle."""
    os.makedirs(directory, exist_ok=True)
    if excel_export:
        history_df.to_excel(f"{directory}/{product}.xlsx", sheet_name="data")
    if pickle_export:
        history_df.to_pickle(f"{directory}/{product}.pkl")


def _merge_history(existing_df, new_df):
    """Append rows of ``new_df`` whose 'Date' is not yet in ``existing_df``; return it sorted and Date-indexed."""
    unseen_rows = new_df[~new_df['Date'].isin(existing_df['Date'])]
    combined = pd.concat([existing_df, unseen_rows]).sort_values(by='Date')
    combined['Date'] = combined['Date'].dt.tz_localize(None)
    return combined.set_index('Date')


def _trim_to_first_close_at_or_above(history_df, threshold):
    """Return the history from the first row with ``close >= threshold``, or None if no row qualifies."""
    if 'Date' in history_df.columns:
        history_df = history_df.set_index('Date')
        history_df.index = history_df.index.tz_localize(None)

    reached = (history_df['close'] >= threshold).to_numpy()
    if not reached.any():
        return None

    first_pos = int(reached.argmax())  # position of the first True
    first_label = history_df.index[first_pos]
    print(f"First occurrence: {first_label}, close={history_df['close'].iloc[first_pos]}")
    return history_df.loc[first_label:]


def _fetch_initial_history(product, granularity, starting_year, threshold):
    """Download a product's history, advancing the start year on KeyError.

    Returns ``(history_df, outcome)`` where outcome is ``"ok"``, ``"missing"``
    (every year up to the current one raised KeyError) or ``"omitted"`` (the
    close never reached ``threshold``). ``history_df`` is None unless "ok".
    """
    year = starting_year
    while True:
        try:
            history_df = coinbase_fetch_full_history(
                product,
                datetime(year, 1, 1),
                datetime.now() - timedelta(days=1),  # data is pulled through 1 day ago
                granularity,
            )
            history_df = history_df.rename(columns={'time': 'Date'}).sort_values(by='Date')
            trimmed = _trim_to_first_close_at_or_above(history_df, threshold)
        except KeyError:
            # Raised when the fetched frame lacks the expected columns,
            # which the original code treats as "no data for that year".
            print(f"KeyError: No data available for {product} in {year}. Trying next year...")
            year += 1
            if year > datetime.now().year:
                print("No more data available for any future years.")
                return None, "missing"
            continue

        if trimmed is None:
            print(f"Share price never exceeds {threshold} USD.")
            return None, "omitted"
        return trimmed, "ok"


def _drop_products(products, unwanted, reason=None):
    """Remove each of ``unwanted`` from ``products`` in place; narrate only when ``reason`` is given."""
    for asset in unwanted:
        if reason is not None:
            print(f"Removing {asset} from the list because {reason}.")
        try:
            products.remove(asset)
        except ValueError:
            if reason is not None:
                print(f"{asset} not in list.")


def coinbase_pull_data(
    base_directory,
    source: str,
    asset_class: str,
    excel_export: bool,
    pickle_export: bool,
    output_confirmation: bool,
    base_currency: str,
    quote_currency: str,
    granularity: int = 3600,  # 60=minute, 3600=hourly, 86400=daily
    status: str = 'online',  # default status is 'online'
    start_date: datetime = datetime(2025, 1, 1),  # default start date
    end_date: "datetime | None" = None,  # None -> 1 day before the call (lag in data availability)
) -> "pd.DataFrame | None":
    """
    Update the stored record, or pull the initial history, for every matching Coinbase product.

    For each product returned by ``coinbase_fetch_available_products`` the
    function looks for ``{base_directory}/{source}/{asset_class}/{time_length}/{product}.pkl``.
    If the pickle exists, candles between ``start_date`` and ``end_date`` are
    fetched and rows with unseen dates are appended. If it does not exist, the
    history is downloaded starting from 2025 (``start_date``/``end_date`` are
    not used on that path). Failures for one product are printed and do not
    stop the remaining products.

    Parameters:
    -----------
    base_directory
        Root path to store downloaded data.
    source : str
        Name of the data source, used as a folder name (e.g., 'Coinbase').
    asset_class : str
        Asset class name, used as a folder name (e.g., 'Cryptocurrencies').
    excel_export : bool
        If True, export data to Excel format.
    pickle_export : bool
        If True, export data to Pickle format.
    output_confirmation : bool
        If True, print confirmation message.
    base_currency : str
        The base currency (e.g., 'BTC').
    quote_currency : str
        The quote currency (e.g., 'USD').
    granularity : int, optional
        Time slice in seconds; only 60, 3600 and 86400 are supported.
    status : str, optional
        Filter products by status (default is 'online').
    start_date : datetime, optional
        Start of the update window (default 2025-01-01).
    end_date : datetime, optional
        End of the update window. Defaults to one day before the time of the
        call, resolved at call time rather than at import time.

    Returns:
    --------
    pd.DataFrame or None
        The Date-indexed history of the last product processed, or None when
        the granularity is unsupported, no product matched, or nothing could
        be fetched. The filtered product list is only printed, not returned.
    """
    # Validate before any network call; an unsupported value can never succeed.
    time_length = _granularity_label(granularity)
    if time_length is None:
        print("Error - please confirm timeframe.")
        return None

    if end_date is None:
        end_date = datetime.now() - timedelta(days=1)

    # List of crypto assets
    filtered_products = coinbase_fetch_available_products(
        base_currency=base_currency, quote_currency=quote_currency, status=status
    )
    if filtered_products.empty:
        # Check emptiness first: an empty frame may not have an 'id' column.
        filtered_products_list = []
        print("No products found with the specified base and/or quote currencies.")
    else:
        filtered_products_list = sorted(filtered_products['id'].tolist())
        print(filtered_products[['id', 'base_currency', 'quote_currency', 'status']])
        print(filtered_products_list)
        print(len(filtered_products_list))

    starting_year = 2025  # first year tried when no local file exists
    price_threshold = 0  # USD; products whose close never reaches this are omitted
    directory = f"{base_directory}/{source}/{asset_class}/{time_length}"

    missing_data = []
    omitted_data = []
    full_history_df = None  # stays None if no product yields data
    num_products = len(filtered_products_list)

    # Loop for updates
    for counter, product in enumerate(filtered_products_list, start=1):
        print(f"Updating product {counter} of {num_products}.")
        file_location = f"{directory}/{product}.pkl"

        try:
            # Only the read decides between "update" and "initial download".
            # NOTE: pickle files are trusted here because this function wrote them;
            # do not point base_directory at untrusted data.
            try:
                ex_data = pd.read_pickle(file_location)
            except FileNotFoundError:
                ex_data = None

            if ex_data is None:
                print(f"File not found...downloading the {product} data starting with {starting_year}.")
                full_history_df, outcome = _fetch_initial_history(
                    product, granularity, starting_year, price_threshold
                )
                if outcome == "missing":
                    missing_data.append(product)
                elif outcome == "omitted":
                    omitted_data.append(product)

                if full_history_df is None:
                    print("No data could be fetched for the specified range.")
                    continue

                _export_history(full_history_df, directory, product, excel_export, pickle_export)
                if output_confirmation:
                    print(f"Initial data fetching completed successfully for {time_length} {product}.")
                    print("--------------------")
            else:
                ex_data = ex_data.reset_index()
                print(f"File found...updating the {product} data")
                print("Existing data:")
                print(ex_data)

                # Pull recent data
                new_data = coinbase_fetch_full_history(product, start_date, end_date, granularity)
                new_data = new_data.rename(columns={'time': 'Date'})
                new_data['Date'] = new_data['Date'].dt.tz_localize(None)
                print("New data:")
                print(new_data)

                # Combine existing data with recent data
                full_history_df = _merge_history(ex_data, new_data)
                print("Combined data:")
                print(full_history_df)

                _export_history(full_history_df, directory, product, excel_export, pickle_export)
                if output_confirmation:
                    print(f"Data update complete for {time_length} {product}.")
                    print("--------------------")
        except Exception as e:
            # Best effort per product: report and move on to the next one.
            print(f"Failed to process {product}: {e}")

    # Remove the cryptocurrencies with missing data from the final list
    missing_data = sorted(missing_data)
    print(f"Data missing for: {missing_data}")
    _drop_products(filtered_products_list, missing_data, reason="it is missing data")

    # Remove the cryptocurrencies whose price never reaches the threshold
    omitted_data = sorted(omitted_data)
    print(f"Data omitted due to price for: {omitted_data}")
    _drop_products(
        filtered_products_list,
        omitted_data,
        reason=f"the share price never exceeds {price_threshold} USD",
    )

    # Remove stablecoins from the final list
    stablecoins_to_remove = sorted(
        ['USDT-USD', 'USDC-USD', 'PAX-USD', 'DAI-USD', 'PYUSD-USD', 'GUSD-USD']
    )
    print(f"Data for stable coins not to be used: {stablecoins_to_remove}")
    _drop_products(filtered_products_list, stablecoins_to_remove)

    # Remove the wrapped coins from the final list
    wrapped_coins_to_remove = sorted(['WAXL-USD', 'WBTC-USD'])
    print(f"Data for wrapped coins not to be used: {wrapped_coins_to_remove}")
    _drop_products(filtered_products_list, wrapped_coins_to_remove)

    # Print the final list of token and the length of the list
    print(f"Final list of tokens: {filtered_products_list}")
    print(f"Length of final list of tokens: {len(filtered_products_list)}")

    return full_history_df
def previous_month_start(today: datetime) -> datetime:
    """Return midnight on the first day of the month preceding ``today``'s month.

    Stepping back one day from the first of the current month handles the
    January -> December rollover, which ``datetime(year, month - 1, 1)`` does
    not (month 0 raises ValueError).
    """
    first_of_this_month = today.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    return (first_of_this_month - timedelta(days=1)).replace(day=1)


if __name__ == "__main__":
    # # Example usage to pull all data for each month from 2010 to 2024
    # for granularity in [60, 3600, 86400]:
    #     for year in range(2010, 2025):
    #         for month in range(1, 13):
    #             print(f"Pulling data for {year}-{month:02d}...")
    #             try:
    #                 # Get the last day of the month
    #                 last_day = calendar.monthrange(year, month)[1]
    #                 coinbase_pull_data(
    #                     base_directory=DATA_DIR,
    #                     source="Coinbase",
    #                     asset_class="Cryptocurrencies",
    #                     excel_export=False,
    #                     pickle_export=True,
    #                     output_confirmation=True,
    #                     base_currency="XRP",
    #                     quote_currency="USD",
    #                     granularity=granularity, # 60=minute, 3600=hourly, 86400=daily
    #                     status='online',
    #                     start_date=datetime(year, month, 1),
    #                     end_date=datetime(year, month, last_day),
    #                 )
    #             except Exception as e:
    #                 print(f"Failed to pull data for {year}-{month:02d}: {e}")

    # Refresh from the start of the previous month so the update window
    # overlaps the data already stored on disk.
    window_start = previous_month_start(datetime.now())

    # Crypto Data
    currencies = ["BTC", "ETH", "SOL", "XRP"]

    # (granularity in seconds, excel_export): minute data is exported to
    # pickle only; hourly and daily data are also exported to Excel.
    runs = [
        (60, False),     # minute
        (3600, True),    # hourly
        (86400, True),   # daily
    ]

    # Iterate through each currency
    for cur in currencies:
        for granularity, excel_export in runs:
            coinbase_pull_data(
                base_directory=DATA_DIR,
                source="Coinbase",
                asset_class="Cryptocurrencies",
                excel_export=excel_export,
                pickle_export=True,
                output_confirmation=True,
                base_currency=cur,
                quote_currency="USD",
                granularity=granularity,
                status='online',  # default status is 'online'
                start_date=window_start,
                end_date=datetime.now() - timedelta(days=1),  # updates data through 1 day ago due to lag in data availability
            )
|