Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 39 additions & 39 deletions async_fetch_ohlcv.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

def set_fetch_step(timeframe):
'''
Set the fetch step based on the timeframe.
Sets the fetch step based on the timeframe.
'''
msec = 1000
minute = 60 * msec
Expand Down Expand Up @@ -61,26 +61,26 @@ def set_filepath(exchange,exchange_name,currency,timeframe,start,step):

def save_data_to_csv(exchange,data,DATASET,df_origin,save = 0,save_temp = False):
'''
Save data to our file path.
Saves data to the file path.
input:
exchange:exchange, ccxt.exchange_name()
data:list, the ohlcv data we fetched.
DATASET:string, the file path we want to save the data.
df_origin:dataframe, the origin data we stored.
data:list, the ohlcv data fetched.
DATASET:string, the file path to save the data to.
df_origin:dataframe, the original data stored.
save:int default 0, the storage range.
save_temp:boolean default False, if it's True,it will return save(int).
save_temp:boolean default False, if True,it returns save(int).
output:
save(int) or None
'''
if(len(data) > save*1000):
# update save
# Updates save
save += 1

# transform data to dataframe
# Transforms data to dataframe
df = pd.DataFrame(data=data,columns=['Timestamp','Open','High','Low','Close','Volume'])
df['Timestamp'] = df['Timestamp'].apply(exchange.iso8601)

# split timestamp to date and time to fit the multicharts data form
# Splits timestamp to date and time to fit the multicharts' data form
time_pair = df['Timestamp'].map(lambda x:dateutil.parser.parse(x).strftime("%Y-%m-%d %H:%M:%S").split(' '))
temp = pd.DataFrame(columns = ['Date','Time'])
temp['Date'] = time_pair.map(lambda x:x[0])
Expand All @@ -89,7 +89,7 @@ def save_data_to_csv(exchange,data,DATASET,df_origin,save = 0,save_temp = False)
df_con = pd.concat([temp,df.drop(['Timestamp'],axis = 1)],axis = 1)
df_con['Volume'] = df_con['Volume'].apply(int) # the volume data should be int

# save to file
# Saves data to the file path
df_con = pd.concat([df_origin,df_con],axis = 0).set_index('Date')
df_con.to_csv(DATASET)

Expand All @@ -98,78 +98,78 @@ def save_data_to_csv(exchange,data,DATASET,df_origin,save = 0,save_temp = False)

def save_dataname_to_json(filename,start,end):
    '''
    Saves the time range of a finished dataset to the json file named data_record.
    The record is nested as exchange -> currency -> timeframe -> {'start','end'};
    an existing entry for the same exchange/currency/timeframe is overwritten.
    input:
        filename:string, the last 4 characters are dropped (presumably the '.csv'
                 extension) and the rest is split on '-' into exchange, base
                 currency, quote currency and timeframe (e.g. 'binance-BTC-USDT-1d.csv')
        start:string, records the start time of the data
        end:string, records the end time of the data
    output:
        None
    raises:
        IndexError, if filename has fewer than four '-' separated parts.
        The record file is left untouched in that case.
    '''

    DATASET_PATH = os.environ.get('DATASET_PATH', 'dataset')
    DATASET_RECORD = os.path.join(DATASET_PATH,'data_record.json')

    # Loads the json file, or starts from an empty record
    if os.path.exists(DATASET_RECORD):
        with open(DATASET_RECORD,'r') as fp:
            data_record = json.load(fp)
    else:
        data_record = {}

    # Splits the filename BEFORE the record file is opened for writing:
    # opening with 'w' truncates the file, so a failure here must not be
    # able to wipe the records that already exist.
    name_list = filename[:-4].split('-')
    exchange = name_list[0]
    currency = name_list[1]+'/'+name_list[2]
    timeframe = name_list[3]

    # Creates the missing exchange/currency/timeframe levels on the way down
    entry = data_record.setdefault(exchange, {}).setdefault(currency, {}).setdefault(timeframe, {})

    # Saves the start time and end time
    entry['start'] = start
    entry['end'] = end

    # Saves the dictionary to json only once it is fully built
    with open(DATASET_RECORD, 'w') as fp:
        json.dump(data_record,fp)

async def fetch_ohlcv(exchange_name,currency = 'BTC/USDT',timeframe = '1d',start = '2017-01-01 00:00:00'):
'''
Start to fetch ohlcv data asynchronously into csv file.
Starts to fetch ohlcv data asynchronously into a csv file.
input:
exchange_name:string, the name of the exchange(e.g. binance,bitfinex )(lowercase letters)
currency:string, the currency you want to crawl
timeframe:string, the timeframe you want(e.g. '1d' '30m')
start:string, the time you start to crawl
currency:string, the currency to be crawled
timeframe:string, the timeframe wanted (e.g. '1d' '30m')
start:string, the time at which the crawl starts
output:
None, just print the process for your crawling
None, just prints the crawling process
'''
# set step
# Sets step
step = set_fetch_step(timeframe)
# set exchange
# Sets exchange
exchange = getattr(ccxt,exchange_name)({
'rateLimit': 10000, # set the delay between two http resquest to avoid the ratelimit of exchange
'enableRateLimit': True, # activate the rateLimit function
'rateLimit': 10000, # Sets the delay between two http requests to avoid the ratelimit of the exchange
'enableRateLimit': True, # Activates the rateLimit function
# 'verbose': True,
})
# set filepath
# Sets the filepath
from_timestamp, filename, DATASET, df_origin,start = set_filepath(exchange,exchange_name,currency,timeframe,start,step)

# get ohlcv data
# Gets the ohlcv data
save = 1
hold = 10
data = []
Expand All @@ -185,7 +185,7 @@ async def fetch_ohlcv(exchange_name,currency = 'BTC/USDT',timeframe = '1d',start
print('First candle epoch', first, exchange.iso8601(first))
print('Last candle epoch', last, exchange.iso8601(last))

# update timestamp
# Updates the timestamp
from_timestamp = last + step
data += ohlcvs

Expand All @@ -194,7 +194,7 @@ async def fetch_ohlcv(exchange_name,currency = 'BTC/USDT',timeframe = '1d',start
await asyncio.sleep(hold)

finally:
# save data during fetching
# Saves data during the fetching
save = save_data_to_csv(exchange,data,DATASET,df_origin,save,True)

save_data_to_csv(exchange,data,DATASET,df_origin)
Expand All @@ -204,15 +204,15 @@ async def fetch_ohlcv(exchange_name,currency = 'BTC/USDT',timeframe = '1d',start

def start_loop(loop):
    '''
    Installs the given event loop as the current one and runs it.
    input:
        loop:an asyncio event loop
    output:
        None, returns only after loop.run_forever() returns
    '''
    # Makes the loop the current event loop for the calling context
    asyncio.set_event_loop(loop)
    # Blocks here while the loop keeps running
    loop.run_forever()

# Just use this function to crawl data.
# This function should be used just to crawl data.
def start_fetch_ohlcv(exchange_name,currency,timeframe,start):
'''
Start to fetch the ohlcv.
Starts to fetch the ohlcv.
input:
exchange_name:list, exchange names
currency:list
Expand All @@ -226,4 +226,4 @@ def start_fetch_ohlcv(exchange_name,currency,timeframe,start):

for e,c,t,f in zip(exchange_name,currency,timeframe,start):
asyncio.run_coroutine_threadsafe(fetch_ohlcv(e,c,t,f), new_loop)
new_loop.close()
new_loop.close()