-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrequestqueue.py
More file actions
135 lines (113 loc) · 3.81 KB
/
requestqueue.py
File metadata and controls
135 lines (113 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import urllib3
import certifi
import abc
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
class ThreadedRequestHandler(metaclass=abc.ABCMeta):
    """
    Abstract base for objects that perform and process one HTTP request.

    Calling a handler with a request function issues `method` against
    `url` and hands the response to `parse`. Subclasses must implement
    `parse` and `callback`.
    """

    def __init__(self, url, method):
        """
        Instantiates a handler for the results of `url`.
        :url: [str] url to request
        :method: [str] HTTP method passed to the request function
        """
        self._url = url
        self.method = method

    def __repr__(self):
        return '<{} at 0x{:x}>'.format(type(self).__name__, id(self))

    def __call__(self, pool_request, **kwargs):
        """
        Run the request and return the parsed response.
        :pool_request: callable invoked as pool_request(method, url, **kwargs)
        :kwargs: forwarded unchanged to `pool_request`
        """
        response = pool_request(self.method, self.url, **kwargs)
        return self.parse(response)

    @property
    def url(self):
        """The url this handler requests (read-only)."""
        return self._url

    @abc.abstractmethod
    def parse(self, res):
        """Convert the object returned by the request function into a result."""

    @abc.abstractmethod
    def callback(self, thread):
        """Receive the finished future that ran this handler."""
class SimpleThreadedRequestHandler(ThreadedRequestHandler):
    """
    A very basic handler that decodes the response body as UTF-8 text.

    When the future running this handler finishes, `callback` stores the
    decoded body in `result`. If the request or the decoding raised,
    `result` stays None and the exception is stored in `error` instead.
    """

    def __init__(self, url, method='GET'):
        """
        :url: [str] url to retrieve
        :method: [str] HTTP method, 'GET' by default
        """
        super().__init__(url, method)
        self.result = None
        # exception raised while running this handler, if any
        self.error = None

    def parse(self, res):
        """Return the response body as unicode text instead of bytes."""
        # res is an urllib3 response object; its `data` attribute holds the
        # raw body bytes.
        return res.data.decode('utf-8')

    def callback(self, thread):
        """Copy the finished future's outcome back onto the handler."""
        # Exceptions raised inside a done-callback are only logged and then
        # discarded by concurrent.futures, so calling thread.result() bare
        # would hide a failed request. Record the failure instead.
        try:
            self.result = thread.result()
        except Exception as exc:
            self.error = exc
class RequestQueue:
    """
    Class for queuing up http requests to be processed concurrently.

    Handlers run on a thread pool and share one urllib3 PoolManager and one
    set of default request headers. Finished requests are collected with
    `retrieve_completed`. The queue owns its thread pool and connection
    pool; call `close` (or use the queue in a `with` statement) to release
    them.
    """

    def __init__(self, pool_size=5):
        """
        :pool_size: [int] number of worker threads; also passed as `maxsize`
            to the urllib3 PoolManager
        """
        self._pool_size = pool_size
        self.thread_pool = ThreadPoolExecutor(pool_size)
        self.http_pool = urllib3.PoolManager(
            maxsize=self.pool_size,
            cert_reqs='CERT_REQUIRED',
            ca_certs=certifi.where()
        )
        # Advertise only the content encodings this urllib3 install can
        # decode ('gzip,deflate', plus e.g. 'br' when the optional decoder
        # package is installed). A hard-coded 'br' lets servers send brotli
        # bodies that urllib3 cannot decompress without that package, which
        # then breaks the UTF-8 decoding done by handlers.
        accept_encoding = urllib3.util.make_headers(
            accept_encoding=True
        )['accept-encoding']
        self.headers = {
            'User-Agent': 'Python/RequestQueue',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': accept_encoding,
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }
        # maps each submitted Future to the handler it is running
        self.futures = {}

    def __repr__(self):
        s = self.status
        f = s.get('FREE')
        c = s.get('FINISHED', 0)
        z = self.pool_size
        i = id(self)
        return f'<RequestQueue {c} results {f}/{z} threads free at 0x{i:x}>'

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    @property
    def pool_size(self):
        return self._pool_size

    def add_request_from_url(self,
                             url,
                             handler=SimpleThreadedRequestHandler,
                             **kwargs):
        """
        Convenience method for adding a request to the queue.
        :url: [str] url to retrieve
        :handler: [ThreadedRequestHandler] the type of handler object to
            instantiate
        :kwargs: additional key-word arguments to pass to the
            ThreadedRequestHandler
        :returns: the handler instance that was queued
        """
        h = handler(url, **kwargs)
        self.add_request(h)
        return h

    def add_request(self, handler):
        """
        Add a request to the queue based on the `handler` object url.

        The handler is called on a worker thread with the shared
        PoolManager's `request` method and the queue's default headers;
        `handler.callback` is registered to receive the finished future.
        """
        f = self.thread_pool.submit(
            handler,
            self.http_pool.request,
            headers=self.headers
        )
        f.add_done_callback(handler.callback)
        self.futures[f] = handler

    def retrieve_completed(self, *, return_exceptions=False):
        """
        Remove finished requests from the queue and return their outcomes.
        :return_exceptions: [bool] if True, a failed request is returned as
            (exception, handler) instead of raising
        :returns: [list] of (result, handler) tuples in submission order
        :raises: the exception of the first failed request when
            return_exceptions is False. In that case only the failed request
            is removed from the queue; the other finished requests stay
            queued and are returned by the next call.
        """
        completed = [f for f in self.futures if f.done()]
        outcomes = []
        # Evaluate every outcome before popping anything, so a failure
        # cannot discard results that were already removed from the queue.
        for future in completed:
            try:
                outcomes.append((future.result(), future))
            except Exception as exc:
                if not return_exceptions:
                    # Drop the failed request so later calls can make progress.
                    del self.futures[future]
                    raise
                outcomes.append((exc, future))
        return [(value, self.futures.pop(future)) for value, future in outcomes]

    @staticmethod
    def _state_of(future):
        """Classify `future` as 'RUNNING', 'CANCELLED', 'FINISHED' or 'PENDING'."""
        # Check running() first: states only move forward, so each answer is
        # a state the future was really in during this call.
        if future.running():
            return 'RUNNING'
        if future.done():
            return 'CANCELLED' if future.cancelled() else 'FINISHED'
        return 'PENDING'

    @property
    def status(self):
        """
        Count the queued requests by state.
        :returns: [dict] counts keyed by 'PENDING', 'RUNNING', 'CANCELLED' and
            'FINISHED' (only states that occur are present), plus 'FREE':
            pool_size minus the number of running requests
        """
        d = dict(Counter(self._state_of(f) for f in self.futures))
        d['FREE'] = self.pool_size - d.get('RUNNING', 0)
        return d

    @property
    def pending(self):
        """Number of queued requests that have not started running yet."""
        return self.status.get('PENDING', 0)

    def close(self):
        """
        Wait for submitted requests to finish, then shut down the worker
        threads and close the pooled HTTP connections.
        """
        self.thread_pool.shutdown(wait=True)
        self.http_pool.clear()