This repository was archived by the owner on Feb 28, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdb.py
More file actions
217 lines (184 loc) · 5.78 KB
/
db.py
File metadata and controls
217 lines (184 loc) · 5.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import psycopg2
import psycopg2.extras
import Analysis_3 as A
# Connection settings for the PostgreSQL database (no password is set here)
DATABASE = "seads"  # database name placed in the connection string
USER = "seadapi"  # user name placed in the connection string
# Table that the sensor data queries select from
TABLE = "data_raw"
def query(parsed_url):
    """
    Handle parsed URL data and query the database as appropriate

    Recognised keys are ``device_id`` (required), ``type``, ``start_time``,
    ``end_time``, ``subset``, ``limit``, ``json``, ``reverse`` and
    ``classify``. Missing optional keys fall back to ``None`` (filters) or
    ``False`` (flags).

    :param parsed_url: Mapping of URL parameter names to their values
    :return: Generator of result strings, or the value returned by
             ``A.run`` when ``classify`` is set
    :raises Exception: If ``device_id`` is missing, or if ``classify`` is
                       requested without a device id, start time and end time
    """
    if 'device_id' not in parsed_url:
        raise Exception("Received malformed URL data")

    device_id = parsed_url['device_id']
    data_type = parsed_url.get('type')
    start_time = parsed_url.get('start_time')
    end_time = parsed_url.get('end_time')
    subset = parsed_url.get('subset')
    limit = parsed_url.get('limit')
    as_json = parsed_url.get('json', False)
    reverse = parsed_url.get('reverse', False)
    classify = parsed_url.get('classify', False)

    # A single requested data type narrows the header to that one column.
    if 'type' in parsed_url:
        header = ['time', data_type]
    else:
        header = ['time', 'I', 'W', 'V', 'T']

    # Classification needs a bounded time range for a device; reject bad
    # requests before spending a database round trip on them.
    if classify and not (device_id and start_time and end_time):
        raise Exception("Received malformed URL data")

    results = retrieve_within_filters(
        device_id,
        start_time,
        end_time,
        data_type,
        subset,
        limit,
        reverse,
    )

    if classify:
        return A.run(results)
    return format_data(header, results, as_json)
def retrieve_within_filters(device_id, start_time, end_time, data_type, subset, limit, reverse):
    """
    Build and run the SELECT that fetches data for one device

    The WHERE clause always filters on ``serial`` and optionally on a time
    range and a data type. With a data type the query selects ``time, data``
    directly; without one it goes through ``write_crosstab`` so every data
    type becomes its own column. A truthy ``subset`` wraps the query with
    ``write_subsample``.

    :param device_id: The serial number of the device in question
    :param start_time: Lower bound of the time range, passed to to_timestamp()
    :param end_time: Upper bound of the time range, passed to to_timestamp()
    :param data_type: The type of data to query for; falsy selects all types
    :param subset: The size of the subset; falsy disables subsampling
    :param limit: Truncate result to this many rows
    :param reverse: If truthy sort by time ASC, otherwise DESC
    :return: Whatever ``perform_query`` returns for the built query
    """
    # The subsampling placeholder sits in the text that write_subsample puts
    # in front of the inner query, so its value has to lead the parameters.
    values = [float(subset) + 1.0] if subset else []
    values.append(device_id)

    conditions = ["WHERE serial = %s"]
    if start_time and end_time:
        conditions.append(" AND time BETWEEN to_timestamp(%s) AND to_timestamp(%s)")
        values.extend([start_time, end_time])
    elif start_time:
        conditions.append(" AND time >= to_timestamp(%s)")
        values.append(start_time)
    elif end_time:
        conditions.append(" AND time <= to_timestamp(%s)")
        values.append(end_time)

    if data_type:
        conditions.append(" AND type = %s")
        values.append(data_type)
        sql = "SELECT time, data FROM " + TABLE + " as raw " + "".join(conditions)
        is_crosstab = False
    else:
        # If no data type is set we return all data types
        sql = write_crosstab("".join(conditions), TABLE)
        is_crosstab = True

    if subset:
        sql = write_subsample(sql, is_crosstab)

    # Required for LIMIT, analysis code assumes sorted data
    sql += " ORDER BY time" + (" ASC" if reverse else " DESC")

    if limit:
        sql += " LIMIT %s"
        values.append(limit)
    sql += ";"

    return perform_query(sql, tuple(values))
def write_crosstab(where, data=TABLE):
    """
    Write a PostgreSQL crosstab() query to create a pivot table and rearrange the data into a more useful form

    The source query handed to crosstab() is dollar-quoted rather than
    single-quoted. The ``%s`` placeholders in ``where`` live inside that
    string, and the driver may substitute values that carry their own single
    quotes; dollar-quoting keeps the statement well-formed in that case.

    :param where: WHERE clause for SQL query (may contain ``%s`` placeholders)
    :param data: Table or subquery from which to get the data
    :return: Complete SQL query
    """
    source_sql = "SELECT time, type, data from " + data + " as raw " + where
    return (
        "SELECT * FROM crosstab("
        + "$crosstab_src$" + source_sql + "$crosstab_src$,"
        + " 'SELECT unnest(ARRAY[''I'', ''W'', ''V'', ''T''])') "
        + "AS ct_result(time TIMESTAMP, I SMALLINT, W SMALLINT, V SMALLINT, T SMALLINT)"
    )
def perform_query(query, params):
    """
    Open a database connection, run one query and return all of its rows

    The connection is always closed before returning. A database error is
    printed and reported to the caller as an empty result, so callers can
    iterate the return value unconditionally.

    :param query: SQL query string
    :param params: Sequence of SQL query parameters
    :return: List of rows from ``cursor.fetchall()``; empty list on database error
    """
    con = None
    try:
        con = psycopg2.connect("dbname='" + DATABASE +
                               "' user='" + USER + "'")
        cursor = con.cursor()
        print("Query:", query)
        print("Parameters:", params)
        cursor.execute(query, params)
        return cursor.fetchall()
    except psycopg2.DatabaseError as e:
        print('Database error: %s' % e)
        # Explicit empty result instead of an implicit None, which callers
        # would fail on as soon as they iterate it.
        return []
    finally:
        if con:
            con.close()
def format_data_row(row):
    """
    Formats result row into result row string

    Every value is converted with ``str()`` and emitted as a double-quoted
    string. Quotes, backslashes and control characters inside a value are
    escaped so the row stays a valid JSON array.

    :param row: Result row (any iterable of values)
    :return: Result row string, e.g. ``["time", "I"]``
    """
    # Imported locally: other functions in this module use ``json`` as the
    # name of a boolean flag.
    import json

    cells = (json.dumps(str(value), ensure_ascii=False) for value in row)
    return '[' + ", ".join(cells) + ']'
def format_data(header, data, json=False):
    """
    Process rows of data returned by the db and format them appropriately

    The output is a bracketed list whose first entry is the header row,
    followed by one entry per data row. With ``json`` set the list is
    wrapped in an object under the ``"data"`` key.

    :param header: The first row of the result
    :param data: Iterable of result rows; ``None`` is treated as no rows
    :param json: Whether or not to use JSON format.
    :return: Generator of result strings
    """
    # A failed lookup may hand over None instead of rows; still emit a
    # well-formed, header-only result rather than failing mid-stream.
    rows = () if data is None else data

    if json:
        yield '{\n"data": '
    yield "[\n" + format_data_row(header)  # No comma before header
    for row in rows:
        yield ',\n' + format_data_row(row)
    yield "\n]\n"
    if json:
        yield "}\n"
def write_subsample(query, crosstab=False):
    """
    Wrap a query so that only a subsample of its rows is returned.

    The inner query's rows are numbered by "time" and only rows whose number
    is divisible by ceil(row count / divisor) are kept. The divisor is the
    single ``%s`` placeholder, which appears before the inner query text, so
    the caller must supply its value as the first query parameter. This
    should be the last step of query building before ordering and limits.

    :param query: The existing query to subsample
    :param crosstab: Whether or not the query is a crosstab
    :return: Query with subsampling enabled.
    """
    # Crosstab queries expose one column per data type; plain queries
    # expose a single data column.
    columns = "time, I, W, V, T" if crosstab else "time, data"

    # "%%" is an escaped literal percent sign (the modulo operator).
    prefix = (
        " FROM (\n"
        'SELECT *, ((row_number() OVER (ORDER BY "time"))\n'
        "%% ceil(count(*) OVER () / %s)::int) AS rn\n"
        "FROM ("
    )
    suffix = ") AS subquery\n) sub\nWHERE sub.rn = 0"

    return "".join(["SELECT ", columns, prefix, query, suffix])