forked from wshon/AutoApiSecret
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: refresh.py
More file actions
111 lines (100 loc) · 3.56 KB
/
refresh.py
File metadata and controls
111 lines (100 loc) · 3.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# -*- coding: UTF-8 -*-
import json
import os
import sys
import urllib.request
from api_list import API_LIST
# Directory that contains this script; the token cache file sits next to it.
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))

# Upper bound of the request-round counter.
REQUEST_COUNT_MAX = 3

# Cache: file used to persist the refresh token.
REFRESH_TOKEN_PATH = f'{CURRENT_PATH}/refresh_token.txt'

# Env: credentials taken from environment variables (None when unset).
CLIENT_ID = os.environ.get('CLIENT_ID')
CLIENT_SECRET = os.environ.get('CLIENT_SECRET')
REFRESH_TOKEN = os.environ.get('REFRESH_TOKEN')
def load_refresh_token():
    """Return the refresh token to use for the next token request.

    The token cached in ``REFRESH_TOKEN_PATH`` is preferred. When that file
    is missing, or holds nothing but whitespace, the ``REFRESH_TOKEN`` value
    read from the environment is returned instead (``None`` when unset).

    The token value itself is never printed: it is a credential, and the
    output of this script may end up in logs other people can read.

    Returns:
        The refresh token string, or ``None`` when no token is available.
    """
    print(f'read refresh_token from file: {REFRESH_TOKEN_PATH}')
    try:
        # Read-only access is sufficient; 'r+' would needlessly fail on a
        # cache file that is not writable.
        with open(REFRESH_TOKEN_PATH, 'r') as f:
            # Strip the trailing newline/whitespace an editor may have added,
            # which would otherwise be sent as part of the token.
            refresh_token = f.read().strip()
    except FileNotFoundError:
        print(f'file {REFRESH_TOKEN_PATH} not found, load from env')
        return REFRESH_TOKEN
    if not refresh_token:
        # An empty cache is as useless as a missing one.
        print(f'file {REFRESH_TOKEN_PATH} is empty, load from env')
        return REFRESH_TOKEN
    print(f'refresh_token loaded ({len(refresh_token)} chars)')
    return refresh_token
def save_refresh_token(refresh_token):
    """Write *refresh_token* to ``REFRESH_TOKEN_PATH``, replacing its content.

    Args:
        refresh_token: Token string to cache.

    Raises:
        TypeError: If *refresh_token* is not a string. The check runs before
            the file is opened, because opening it for writing truncates it
            and a failed write would otherwise wipe the previously cached
            token.
    """
    if not isinstance(refresh_token, str):
        raise TypeError(
            f'refresh_token must be str, not {type(refresh_token).__name__}'
        )
    print(f'save refresh_token to file: {REFRESH_TOKEN_PATH}')
    with open(REFRESH_TOKEN_PATH, 'w') as f:
        f.write(refresh_token)
def request_token(client_id, client_secret, refresh_token):
    """Exchange a refresh token for a new refresh/access token pair.

    Posts a ``refresh_token`` grant to the Microsoft identity platform token
    endpoint and parses the JSON reply.

    Args:
        client_id: Application (client) id sent in the form body.
        client_secret: Client secret sent in the form body.
        refresh_token: Refresh token to redeem.

    Returns:
        ``(refresh_token, access_token)`` on success. ``(None, None)`` when
        the endpoint answers with an HTTP error status, or when the reply is
        not JSON or lacks either token; the reason is reported with
        ``print``.

    Raises:
        urllib.error.URLError: When the endpoint cannot be reached at all
            (only HTTP error statuses are handled here).
    """
    url = 'https://login.microsoftonline.com/common/oauth2/v2.0/token'
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    form = {
        'grant_type': 'refresh_token',
        'client_id': client_id,
        'client_secret': client_secret,
        'refresh_token': refresh_token,
        'redirect_uri': 'http://localhost:53682/',
    }
    print(f'request url: {url}')
    body = urllib.parse.urlencode(form).encode('utf8')
    req = urllib.request.Request(url, body, headers)

    try:
        # The context manager closes the connection once the body is read.
        with urllib.request.urlopen(req) as rsp:
            rsp_c = rsp.read()
    except urllib.error.HTTPError as e:
        err_c = e.read()
        try:
            err_json = json.loads(err_c)
            print(f'request error: [{err_json["error"]}] {err_json["error_description"]}')
        except (ValueError, KeyError, TypeError):
            # Body is not JSON, or is JSON without the expected fields.
            print(f'request error: {e.code} {e.reason}')
        return None, None

    try:
        rsp_json = json.loads(rsp_c)
    except ValueError:
        # Covers JSONDecodeError and undecodable bytes; never raise while
        # reporting, hence errors="replace".
        print(f'rsp json error: {rsp_c.decode(errors="replace")}')
        return None, None

    try:
        return rsp_json['refresh_token'], rsp_json['access_token']
    except (KeyError, TypeError) as e:
        # Valid JSON but not the expected shape. The body is deliberately not
        # echoed here because it may contain one of the tokens.
        print(f'rsp json error: unexpected response ({e!r})')
        return None, None
def request_api(url, access_token):
    """Send one request to *url* and print whether it succeeded.

    HTTP error statuses are reported with ``print`` and swallowed, so one
    failing endpoint does not stop the caller from trying the next one.

    Args:
        url: API endpoint to call.
        access_token: Value placed verbatim in the ``Authorization`` header.

    Raises:
        urllib.error.URLError: When the endpoint cannot be reached at all
            (only HTTP error statuses are handled here).
    """
    headers = {
        # NOTE(review): the token is sent without a "Bearer " prefix —
        # confirm the target API accepts that form.
        'Authorization': access_token,
        'Content-Type': 'application/json',
    }
    print(f'api {url} request...')
    req = urllib.request.Request(url, headers=headers)
    try:
        # The context manager closes the connection after the status is read.
        with urllib.request.urlopen(req) as rsp:
            print(f'api success: {rsp.reason}')
    except urllib.error.HTTPError as e:
        err_c = e.read()
        try:
            err_json = json.loads(err_c)
            print(f'api error: {err_json["error"]["message"]}')
        except (ValueError, KeyError, TypeError):
            # Body is not JSON, or is JSON without error.message.
            print(f'api error: {e.code} {e.reason}')
def main():
    """Refresh the cached token, then call every API in ``API_LIST``.

    Steps:
      1. Load the refresh token and check that all credentials are present.
      2. Redeem it for a new refresh/access token pair; stop if that fails.
      3. Persist the new refresh token.
      4. Call each entry of ``API_LIST`` ``REQUEST_COUNT_MAX`` times over.

    Secret values (client secret, refresh token) are never printed, since
    the output of this script may end up in logs other people can read.
    """
    client_id = CLIENT_ID
    client_secret = CLIENT_SECRET
    refresh_token = load_refresh_token()

    credentials = {
        'client_id': client_id,
        'client_secret': client_secret,
        'refresh_token': refresh_token,
    }
    missing = [name for name, value in credentials.items() if value is None]
    if missing:
        # Without these the token request cannot succeed; skip it.
        print(f'missing credentials: {", ".join(missing)}')
        return

    print(f'client_id {client_id}')
    print('client_secret and refresh_token are set (values hidden)')

    refresh_token, access_token = request_token(client_id, client_secret, refresh_token)
    if access_token is None:
        return
    save_refresh_token(refresh_token)

    for round_no in range(1, REQUEST_COUNT_MAX + 1):
        print(f'# api request round {round_no} start...')
        for api in API_LIST:
            request_api(api, access_token)
        print(f'# api request round {round_no} done.')


if __name__ == '__main__':
    main()