-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathmutt-ldap.py
executable file
·189 lines (159 loc) · 6.1 KB
/
mutt-ldap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#!/usr/bin/env python
#
# Copyright (C) 2008-2011 W. Trevor King
# Copyright (C) 2012 Wade Berrier
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""LDAP address searches for Mutt.
Add :file:`mutt-ldap.py` to your ``PATH`` and add the following line
to your :file:`.muttrc`::
set query_command = "mutt-ldap.py '%s'"
Search for addresses with `^t`, optionally after typing part of the
name. Configure your connection by creating :file:`~/.mutt-ldap.rc`
contaning something like::
[connection]
server = myserver.example.net
basedn = ou=people,dc=example,dc=net
See the `CONFIG` options for other available settings.
"""
import email.utils
import itertools
import os.path
import ConfigParser
import pickle
import ldap
import ldap.sasl
CONFIG = ConfigParser.SafeConfigParser()
CONFIG.add_section('connection')
CONFIG.set('connection', 'server', 'domaincontroller.yourdomain.com')
CONFIG.set('connection', 'port', '389') # set to 636 for default over SSL
CONFIG.set('connection', 'ssl', 'no')
CONFIG.set('connection', 'starttls', 'no')
CONFIG.set('connection', 'basedn', 'ou=x co.,dc=example,dc=net')
CONFIG.add_section('auth')
CONFIG.set('auth', 'user', '')
CONFIG.set('auth', 'password', '')
CONFIG.set('auth', 'gssapi', 'no')
CONFIG.add_section('query')
CONFIG.set('query', 'filter', '') # only match entries according to this filter
CONFIG.set('query', 'search_fields', 'cn uid mail') # fields to wildcard search
CONFIG.add_section('results')
CONFIG.set('results', 'optional_column', '') # mutt can display one optional column
CONFIG.add_section('cache')
CONFIG.set('cache', 'enable', 'yes') # enable caching by default
CONFIG.set('cache', 'path', '~/.mutt-ldap.cache') # cache results here
#CONFIG.set('cache', 'longevity_days', '14') # TODO: cache results for 14 days by default
CONFIG.read(os.path.expanduser('~/.mutt-ldap.rc'))
def connect():
protocol = 'ldap'
if CONFIG.getboolean('connection', 'ssl'):
protocol = 'ldaps'
url = '%s://%s:%s' % (
protocol,
CONFIG.get('connection', 'server'),
CONFIG.get('connection', 'port'))
connection = ldap.initialize(url)
if CONFIG.getboolean('connection', 'starttls') and protocol == 'ldap':
connection.start_tls_s()
if CONFIG.getboolean('auth', 'gssapi'):
sasl = ldap.sasl.gssapi()
connection.sasl_interactive_bind_s('', sasl)
else:
connection.bind(
CONFIG.get('auth', 'user'),
CONFIG.get('auth', 'password'),
ldap.AUTH_SIMPLE)
return connection
def search(query, connection):
post = ''
if query:
post = '*'
filterstr = '(|%s)' % (
u' '.join([u'(%s=*%s%s)' % (field, query, post)
for field in CONFIG.get('query', 'search_fields').split()]))
filter = CONFIG.get('query', 'filter')
if filter:
filterstr = u'(&(%s)%s)' % (filter, filterstr)
msg_id = connection.search(
CONFIG.get('connection', 'basedn'),
ldap.SCOPE_SUBTREE,
filterstr.encode('utf-8'))
return msg_id
def format_columns(address, data):
yield address
for c in ['cn'] + [CONFIG.get('results', 'optional_column')]:
if c in data:
yield data[c][-1]
def format_entry(entry):
cn,data = entry
if 'mail' in data:
for m in data['mail']:
# Format: tab separated columns
# http://www.mutt.org/doc/manual/manual.html#toc4.5
yield "\t".join(format_columns(m, data))
def cache_filename(query):
# TODO: is the query filename safe?
return os.path.expanduser(CONFIG.get('cache', 'path')) + os.sep + query
def settings_match(serialized_settings):
"""Check to make sure the settings are the same for this cache"""
return pickle.dumps(CONFIG) == serialized_settings
def cache_lookup(query):
hit = False
addresses = []
if CONFIG.get('cache', 'enable') == 'yes':
cache_file = cache_filename(query)
cache_dir = os.path.dirname(cache_file)
if not os.path.exists(cache_dir): os.mkdir(cache_dir)
# TODO: validate longevity setting
if os.path.exists(cache_file):
cache_info = pickle.loads(open(cache_file).read())
if settings_match(cache_info['settings']):
hit = True
addresses = cache_info['addresses']
# DEBUG
#print "Cache hit?: " + str(hit)
return hit, addresses
def cache_persist(query, addresses):
cache_info = {
'settings': pickle.dumps(CONFIG),
'addresses': addresses
}
fd = open(cache_filename(query), 'w')
pickle.dump(cache_info, fd)
fd.close()
if __name__ == '__main__':
import sys
if len(sys.argv) < 2:
sys.stderr.write('%s: no search string given\n' % sys.argv[0])
sys.exit(1)
query = unicode(' '.join(sys.argv[1:]), 'utf-8')
(cache_hit, addresses) = cache_lookup(query)
if not cache_hit:
connection = connect()
msg_id = search(query, connection)
# wacky, but allows partial results
while True:
try:
res_type, res_data = connection.result(msg_id, 0)
except ldap.ADMINLIMIT_EXCEEDED:
#print "Partial results"
break
# last result will have this set
if res_type == ldap.RES_SEARCH_RESULT:
break
addresses += [entry for entry in format_entry(res_data[-1])]
# Cache results for next lookup
cache_persist(query, addresses)
print '%d addresses found:' % len(addresses)
print '\n'.join(addresses)