-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpython_sample.py
More file actions
116 lines (106 loc) · 4.52 KB
/
python_sample.py
File metadata and controls
116 lines (106 loc) · 4.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import boto3
import botocore
import os
import pandas as pd
from io import BytesIO, StringIO
import argparse
import gzip
import json
from datetime import datetime
import pytz
def get_s3_client():
    """Create a boto3 S3 client configured from environment variables.

    Reads ``AWS_ACCESS_KEY_ID``, ``AWS_SECRET_ACCESS_KEY`` and ``AWS_REGION``
    with ``os.getenv``; a variable that is not set is forwarded to boto3 as
    ``None``.

    Returns:
        The client object returned by ``boto3.client('s3', ...)``.
    """
    # keys can be set as environment variables or hardcoded here
    settings = {
        'aws_access_key_id': os.getenv('AWS_ACCESS_KEY_ID'),
        'aws_secret_access_key': os.getenv('AWS_SECRET_ACCESS_KEY'),
        'region_name': os.getenv('AWS_REGION'),
    }
    return boto3.client('s3', **settings)
def get_last_timestamp(filename):
    """Return the Unix timestamp (seconds, UTC) encoded in an object key.

    The base name is expected to look like
    ``YYYY-MM-DD_HH-MM-SS_<suffix>.<ext>.<ext>``; the directory part and the
    two trailing extensions are ignored. The suffix may itself contain
    underscores.

    Args:
        filename: Object key or file path.

    Returns:
        The encoded date and time, interpreted as UTC, as integer seconds
        since the epoch.

    Raises:
        ValueError: If the base name does not have the expected layout or the
            date/time fields are not valid integers.
    """
    # Local import: the module only imports ``datetime`` from ``datetime``.
    from datetime import timezone

    stem = os.path.basename(filename).rsplit('.', 2)[0]
    # Split on the first two underscores only, so a suffix that contains
    # underscores does not break the unpacking.
    date_part, time_part, _ = stem.split('_', 2)
    year, month, day = map(int, date_part.split('-'))
    hour, minute, second = map(int, time_part.split('-'))
    moment = datetime(year, month, day, hour, minute, second, tzinfo=timezone.utc)
    return int(moment.timestamp())
def get_s3_keys(bucket, prefix, start_time):
    """List object keys under *prefix* that are not older than *start_time*.

    Pages through ``list_objects_v2`` and keeps every key whose name-encoded
    timestamp (see ``get_last_timestamp``) is greater than or equal to
    *start_time*.

    Args:
        bucket: Name of the S3 bucket to list.
        prefix: Key prefix passed to ``list_objects_v2``.
        start_time: Lower bound in Unix seconds.

    Returns:
        The matching keys, in the order the listing returned them. The list
        is empty when nothing matches the prefix.
    """
    client = get_s3_client()
    keys = []
    kwargs = {'Bucket': bucket, 'Prefix': prefix}
    while True:
        resp = client.list_objects_v2(**kwargs)
        # 'Contents' is absent from the response when the page holds no
        # objects, so it must not be indexed directly.
        for obj in resp.get('Contents', []):
            key = obj['Key']
            # not necessary, but speeds up execution: we make use of the key
            # format to avoid downloading too many objects
            # https://ember.deltixlab.com/docs/dw/s3/#batches--objects
            if get_last_timestamp(key) >= start_time:
                keys.append(key)
        # the token is only present while more pages remain
        if 'NextContinuationToken' not in resp:
            break
        kwargs['ContinuationToken'] = resp['NextContinuationToken']
    return keys
def retrieve_objects(bucket, keys, start_time, end_time):
    """Download the given objects and write matching trade events to ``output.csv``.

    Each object is fetched, gunzipped and parsed as line-delimited JSON. Rows
    whose ``Type`` is ``"OrderTradeReportEvent"`` and whose ``Timestamp`` lies
    in ``[start_time, end_time]`` are appended to ``output.csv`` in the
    current working directory. The file is truncated first and the CSV header
    is written once, before the first rows.

    Objects whose last row is before *start_time* are skipped; processing
    stops at the first object whose first row is after *end_time*. A failure
    on one object is printed and the loop moves on to the next key.

    Args:
        bucket: Name of the S3 bucket.
        keys: Iterable of object keys to download, in processing order.
        start_time: Window start in Unix seconds.
        end_time: Window end in Unix seconds.
    """
    start_time = pd.to_datetime(start_time, unit='s')
    end_time = pd.to_datetime(end_time, unit='s')
    client = get_s3_client()
    header_written = False
    # opening with 'w' clears the content of any previous output file
    with open('output.csv', 'w') as f_output:
        for key in keys:
            try:
                response = client.get_object(Bucket=bucket, Key=key)
                compressed = BytesIO(response['Body'].read())
                # the context manager closes the gzip reader after decoding
                with gzip.open(compressed, 'rt') as gz:
                    data = StringIO(gz.read())
                df = pd.read_json(data, lines=True)
                if df.empty:
                    # no rows: the .iloc lookups below would fail
                    continue
                # NOTE(review): assumes read_json yields a 'Timestamp' column
                # comparable with the naive bounds above - confirm with real data
                if df['Timestamp'].iloc[-1] < start_time:
                    continue
                if df['Timestamp'].iloc[0] > end_time:
                    break
                # filtering out TRADES only
                df = df[df['Type'] == "OrderTradeReportEvent"]
                df = df[(df['Timestamp'] >= start_time) & (df['Timestamp'] <= end_time)]
                # additional processing may be applied here using pandas, e.g. drop columns, change date format
                # for example df['Timestamp'] = pd.to_datetime(df['Timestamp'])
                # or df['Timestamp'] = df['Timestamp'].dt.strftime('%d-%m-%Y')
                if not df.empty:
                    df.to_csv(f_output, header=not header_written, index=False)
                    header_written = True
            except botocore.exceptions.ClientError as e:
                print(f"Failed to read data from {key}: {e}")
            except KeyError as e:
                # a required column ('Timestamp' or 'Type') is missing
                print(f"Failed to extract field {e} from {key}")
            except ValueError:
                print(f"Failed to parse JSON from {key}")
def main():
    """Parse the command line and export trades from S3 for the given window.

    Expects ``--bucket``, ``--prefix``, ``--start_time`` and ``--end_time``
    (both times in Unix seconds). If an argument is missing or the window is
    empty, a message is printed and the function returns without touching S3.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--start_time", type=int, help="the start time in Unix time format in seconds")
    parser.add_argument("--end_time", type=int, help="the end time in Unix time format in seconds")
    parser.add_argument("--bucket", type=str, help="the name of the S3 bucket")
    parser.add_argument("--prefix", type=str, help="the prefix of the files in the S3 bucket")
    args = parser.parse_args()

    if not args.bucket:
        print("Please provide the name of the S3 bucket")
        return
    if not args.prefix:
        print("Please provide the prefix of the files")
        return

    # Compare against None rather than truthiness: 0 (the Unix epoch) is a
    # valid timestamp and must not be treated as "not provided".
    if args.start_time is None or args.end_time is None:
        print('Please provide both --start_time and --end_time as Unix timestamps')
        return

    # bucket name and prefix are obtained from command line argument
    bucket = args.bucket
    prefix = args.prefix
    start_time = args.start_time
    end_time = args.end_time
    if start_time >= end_time:
        print('Error: End time must be greater than start time')
        return

    # search for keys matching the specified prefix
    keys = get_s3_keys(bucket, prefix, start_time)
    retrieve_objects(bucket, keys, start_time, end_time)


if __name__ == "__main__":
    main()