-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathutil.py
More file actions
151 lines (126 loc) · 5.05 KB
/
util.py
File metadata and controls
151 lines (126 loc) · 5.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from django.conf import settings
import hashlib
import hmac
import logging
import ldap
import base64
from datetime import datetime
import uuid
def calc_url_mac(url, position, recipient, instance_id):
    """Return the hexadecimal HMAC-MD5 digest for a URL/position/recipient/instance.

    The message is the ``str()`` of each argument, concatenated in the
    order given with no separator, and encoded to bytes. The key is
    ``settings.SECRET_KEY`` encoded to bytes.
    """
    fields = (url, position, recipient, instance_id)
    message = ''.join(map(str, fields)).encode()
    key = settings.SECRET_KEY.encode()
    signer = hmac.new(key, message, hashlib.md5)
    return signer.hexdigest()
def calc_open_mac(recipient, instance_id):
    """Return the hexadecimal HMAC-MD5 digest for a recipient/instance pair.

    The message is ``str(recipient)`` immediately followed by
    ``str(instance_id)`` (no separator), encoded to bytes and keyed with
    ``settings.SECRET_KEY``.
    """
    message = (str(recipient) + str(instance_id)).encode()
    key = settings.SECRET_KEY.encode()
    signer = hmac.new(key, message, hashlib.md5)
    return signer.hexdigest()
def calc_unsubscribe_mac(recipient_id):
    """Return the hexadecimal HMAC-MD5 digest of ``str(recipient_id)``.

    The digest is keyed with ``settings.SECRET_KEY`` encoded to bytes.
    """
    message = str(recipient_id).encode()
    key = settings.SECRET_KEY.encode()
    signer = hmac.new(key, message, hashlib.md5)
    return signer.hexdigest()
def calc_unsubscribe_mac_old(recipient_id, email_id):
    """Return the hexadecimal HMAC-MD5 digest for a recipient/email pair.

    The message is ``str(recipient_id)`` immediately followed by
    ``str(email_id)`` (no separator), encoded to bytes and keyed with
    ``settings.SECRET_KEY``.
    """
    message = (str(recipient_id) + str(email_id)).encode()
    key = settings.SECRET_KEY.encode()
    signer = hmac.new(key, message, hashlib.md5)
    return signer.hexdigest()
def create_hash():
    """Return a newly generated random (version 4) ``uuid.UUID`` object."""
    fresh_id = uuid.uuid4()
    return fresh_id
class LDAPHelper(object):
    """Helpers for connecting to, binding against and searching an LDAP server.

    All operations are classmethods that take an explicit connection;
    instantiating the class merely opens a connection and stores it on
    ``self.connection``. Failures are reported through the nested
    ``LDAPHelperException`` hierarchy.
    """

    class LDAPHelperException(Exception):
        """LDAP helper error"""

        def __init__(self, error='No additional information'):
            """Record ``error`` on the exception and log it.

            The log line is "<class docstring>: <error>", so the one-line
            docstrings of the subclasses below double as log prefixes.
            """
            Exception.__init__(self, error)
            # Collapse whitespace so multi-line docstrings log as one line.
            summary = ' '.join(str(self.__doc__).split())
            logging.error(': '.join([summary, str(error)]))

    class UnableToConnect(LDAPHelperException):
        """Unable to Connect"""
        pass

    class UnableToBind(LDAPHelperException):
        """Unable to Bind"""
        pass

    class UnableToSearch(LDAPHelperException):
        """Unable to Search"""
        pass

    class MultipleUsersFound(LDAPHelperException):
        """Single search returned more than one"""
        pass

    class NoUsersFound(LDAPHelperException):
        """Search did not find any users"""
        pass

    class UnexceptedResultForm(LDAPHelperException):
        """Search result was in an unexpected form"""
        pass

    # The original (misspelled) name is kept for existing callers; the
    # correctly spelled alias is the name ``search`` historically tried to raise.
    UnexpectedResultForm = UnexceptedResultForm

    class MissingAttribute(LDAPHelperException):
        """A requested attribute is missing"""
        pass

    def __init__(self):
        """Open a connection via ``connect()`` and keep it on ``self.connection``."""
        self.connection = self.connect()

    @classmethod
    def connect(cls):
        """Return an LDAP connection object for ``settings.LDAP_NET_HOST``.

        Raises:
            LDAPHelper.UnableToConnect: if python-ldap raises ``LDAPError``.
        """
        try:
            # TODO - Figure out why we have to ignore the cert check here
            # SECURITY NOTE(review): ldap.set_option() is module-level, so this
            # turns off TLS certificate verification for every connection
            # created afterwards, not only this one.
            ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
            ldap.set_option(ldap.OPT_REFERRALS, 0)
            return ldap.initialize(settings.LDAP_NET_HOST)
        except ldap.LDAPError as e:
            raise cls.UnableToConnect(e)

    @classmethod
    def bind(cls, connection, username, password):
        """Simple-bind ``connection`` as ``username`` + the configured suffix.

        Raises:
            LDAPHelper.UnableToBind: if the password is empty or the server
                rejects the bind.
        """
        # A simple bind with a name but an empty password is an
        # "unauthenticated bind" that many servers accept; never treat that
        # as a successful login.
        if not password:
            raise cls.UnableToBind('Empty password')
        try:
            connection.simple_bind_s(username + settings.LDAP_NET_USER_SUFFIX, password)
        except ldap.LDAPError as e:
            raise cls.UnableToBind(e)

    @classmethod
    def search_single(cls, *args, **kwargs):
        """Run ``search`` with the given arguments and return its only result.

        Raises:
            LDAPHelper.NoUsersFound: if the search returned nothing.
            LDAPHelper.MultipleUsersFound: if it returned more than one entry.
        """
        results = cls.search(*args, **kwargs)
        if not results:
            raise cls.NoUsersFound()
        if len(results) > 1:
            raise cls.MultipleUsersFound()
        return results[0]

    @classmethod
    def search(cls, connection, filter_params, filter_string='cn=%s',
               sizelimit=settings.LDAP_NET_SEARCH_SIZELIMIT):
        """Search the subtree under ``settings.LDAP_NET_BASE_DN``.

        Args:
            connection: LDAP connection object to search with.
            filter_params: value(s) interpolated into ``filter_string`` with ``%``.
            filter_string: ``%``-style LDAP filter template.
            sizelimit: maximum number of entries requested from the server.

        Returns:
            A list with the attribute mapping (second item of the first
            tuple) of every search entry received.

        Raises:
            LDAPHelper.UnableToSearch: if starting the search fails.
            LDAPHelper.UnexceptedResultForm: if an entry is not shaped as expected.
        """
        try:
            # NOTE(review): filter_params is interpolated verbatim. If callers
            # pass user-controlled text it should be escaped first (e.g. with
            # ldap.filter.escape_filter_chars) - confirm against callers.
            ldap_filter = filter_string % filter_params
            result_id = connection.search_ext(settings.LDAP_NET_BASE_DN,
                                              ldap.SCOPE_SUBTREE,
                                              ldap_filter,
                                              None,
                                              sizelimit=sizelimit)
        except ldap.LDAPError as e:
            raise cls.UnableToSearch(e)

        results = []
        while True:
            try:
                # all=0: fetch the results one message at a time.
                ldap_type, data = connection.result(result_id, 0)
            except ldap.SIZELIMIT_EXCEEDED:
                # Results sizelimit has been reached
                break
            if not data:
                break
            if ldap_type == ldap.RES_SEARCH_ENTRY:
                results.append(data)

        try:
            return [entry[0][1] for entry in results]
        except (IndexError, TypeError, ValueError) as e:
            raise cls.UnexceptedResultForm(e)

    @classmethod
    def _extract_attribute(cls, ldap_user, attribute):
        """Return the first value of ``attribute`` from ``ldap_user``.

        Raises:
            LDAPHelper.MissingAttribute: if the attribute is absent or has
                no values.
        """
        try:
            return ldap_user[attribute][0]
        except (KeyError, IndexError, ValueError) as e:
            # IndexError: the attribute is present but its value list is empty.
            raise cls.MissingAttribute(e)

    @classmethod
    def extract_guid(cls, ldap_user):
        """Return the base64 encoding of the first ``objectGUID`` value.

        The result is whatever ``base64.b64encode`` returns; unlike the
        other extractors it is not decoded to text.
        """
        return base64.b64encode(cls._extract_attribute(ldap_user, 'objectGUID'))

    @classmethod
    def extract_firstname(cls, ldap_user):
        """Return the first ``givenName`` value decoded as UTF-8."""
        return cls._extract_attribute(ldap_user, 'givenName').decode('utf-8')

    @classmethod
    def extract_lastname(cls, ldap_user):
        """Return the first ``sn`` value decoded as UTF-8."""
        return cls._extract_attribute(ldap_user, 'sn').decode('utf-8')

    @classmethod
    def extract_email(cls, ldap_user):
        """Return the first ``mail`` value decoded as UTF-8."""
        return cls._extract_attribute(ldap_user, 'mail').decode('utf-8')

    @classmethod
    def extract_username(cls, ldap_user):
        """Return the first ``cn`` value decoded as UTF-8."""
        return cls._extract_attribute(ldap_user, 'cn').decode('utf-8')