elastics.py
from elasticsearch import Elasticsearch
from snorkel import SnorkelSession
from snorkel.models import Document, Sentence, Span
from snorkel.viewer import SentenceNgramViewer
import os
import json
es = Elasticsearch()
session = SnorkelSession()
class ElasticSession:
#define the index, document type, and field names
def __init__(self,**keyword_parameters):
self.indexName = "corpus"
self.docType = "articles"
self.fieldName = "sentence"
self.elastic_index()
if "cands" in keyword_parameters:
self.generate_tags(keyword_parameters['cands'])
def set_cand(self, Cands):
self.cands = Cands
#get the index mapping
def get_map(self):
mapping = es.indices.get_mapping(self.indexName)
print 'Index Mapping'
print(json.dumps(mapping, indent=2))
#get all index information
def get_index(self):
print 'Index Information: '
print ' '
print es.cat.indices(v='true')
#get a document by its id number
def get_doc(self,iden):
return es.get(index=self.indexName, doc_type=self.docType, id=iden)
#Elasticsearch to SQL mapping
#Index - Database
#Table - Type
#Row - Document
#Values are the data to be added to each document
def elastic_index(self):
#Define our index mapping
request_body = {
'settings' : {
'number_of_shards': 5,
'number_of_replicas': 1,
'analysis':{
'char_filter': {
'quotes': {
#Standardize apostrophes
'type': 'mapping',
'mappings': [
'\u0091=>\u0027',
'\u0092=>\u0027',
'\u2018=>\u0027',
'\u2019=>\u0027',
'\u201B=>\u0027'
]
}
},
'analyzer':{
'my_analyzer':{
'type':'custom',
'tokenizer':'standard',
'char_filter': ['quotes'],
#Remove apostrophes and perform asciifolding
'filter':['apostrophe','asciifolding']
},
#used to drop the stray 'u' (unicode marker) token
'my_stop': {
'type':'stop',
'stopwords': ['u']
}
}
}
},
#define field properties
'mappings': {
self.docType: {
'properties': {
'lineNum':{'type':'integer'},
self.fieldName: {'type': 'text','analyzer':'my_analyzer'},
'tagged':{'type':'text','analyzer':'my_stop'},
'fillCand':{'type':'text','analyzer':'my_stop','search_analyzer':'my_stop'}
}}}}
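#Rough illustration of what my_analyzer above does to a sample string (example text is
#hypothetical): "Renée’s café" -> tokens ['Renee', 'cafe'] (curly quote normalized by the
#quotes char_filter, text after the apostrophe dropped, accents folded by asciifolding)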
#create the index
es.indices.create(index = self.indexName, body = request_body)
print 'Begin indexing'
docCount=0
for p in session.query(Document):
docCount+=1
for i in p.sentences:
#analyze the string and create an array of 'o's of that length
#this will be used for the candidate layer
value=len((es.indices.analyze(index=self.indexName,body={'analyzer':'standard','text':i.text}))['tokens'])
es.index(index=self.indexName, doc_type=self.docType, id=i.id,
body = {
'lineNum': i.id,
self.fieldName: i.text,
'fillCand':['o']*value
})
self.get_index()
print '%d documents indexed'%docCount
print ""
def generate_tags(self,Cands):
self.set_cand(Cands)
print "Begin generating tags"
unique=[]
total=0
#Get all the sentences in our candidate set
for c in session.query(Cands).all():
total+=1
unique.append(c[0].sentence_id)
#Turn it into a set to get only the unique sentences
unique = set(unique)
#Used to keep track of the candidates that could not be tagged
flagNum=0
flagged=[]
for sent in unique:
#Get all candidates that correspond to a particular sentence
q = session.query(Cands)\
.join(Span, getattr(Cands, Cands.__argnames__[0] + '_id') == Span.id)\
.join(Span.sentence).filter(Sentence.id == sent).all()
#Get the term vector of the sentence. We will use this to determine
#where the candidate is in the sentence
vector=es.termvectors(
index=self.indexName,
doc_type=self.docType,
id=sent,
body ={
'fields' : [self.fieldName],
'positions' : 'true'
})
temp = []
for p in q:
for num in range(0,2):
candidate= p[num].get_span()
#Candidates can be more than one word so we asciifold and split the candidates
#on the spaces
value=es.indices.analyze(index=self.indexName,body={'analyzer':'my_analyzer','text':candidate})['tokens']
for vectorized in value:
temp.append(vectorized['token'])
#Get the candidate array that we will modify
hold=es.get(index=self.indexName, doc_type=self.docType, id=sent)['_source']['fillCand']
for tagObj in temp:
try:
#Candidates can appear multiple times in a sentence so we get the
#total number of occurrences
limit = vector['term_vectors'][self.fieldName]['terms'][tagObj]['term_freq']
for i in range(0,limit):
#Find the candidate position and tag that index
index=vector['term_vectors'][self.fieldName]['terms'][tagObj]['tokens'][i]['position']
hold[index]='OBJECT'
#Used to handle candidates that could not be found
except KeyError:
flagNum+=1
flagged.append([sent,tagObj])
#Arrays have an implicit 100-position gap between elements, which
#makes the search queries behave unexpectedly. To compensate we change
#the array to a string and add it to a new field.
turnAr = ' '.join((e).decode('utf-8') for e in hold)
es.update(index=self.indexName, doc_type=self.docType, id=sent,
body={'doc':{'fillCand':hold,'tagged':turnAr}})
#Most candidates that cannot be tagged correspond to punctuation and spaces,
#which are automatically stripped when the string is tokenized
print '%d candidates of %d tagged'%((total-flagNum),(total))
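#Illustrative result of tagging for the hypothetical sentence above, with 'Barack Obama'
#and 'Michelle Obama' as candidate spans:
# fillCand: ['OBJECT', 'OBJECT', 'o', 'OBJECT', 'OBJECT']
# tagged:   'OBJECT OBJECT o OBJECT OBJECT'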
def search_index(self,keyWord,*args,**keyword_parameters):
check = 0
if keyWord == 'match':
for hold,query in enumerate(args):
#Match phrase if there is a slop value
if 'slop' in keyword_parameters:
sQuery={
'match_phrase':{
self.fieldName:{
'query':query,
'slop':keyword_parameters['slop']
}
}
}
else:
#Match query if no slop is defined
sQuery={
'match': {
self.fieldName: {
'query':query
}
}
}
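#Example calls (illustrative; es_session is an ElasticSession instance):
# es_session.search_index('match', 'married')               #plain match query
# es_session.search_index('match', 'got married', slop=2)   #match_phrase with slop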
#Query a specific field where we care about the order:
#position(value1) < position(value2) < position(value3), etc.
elif keyWord=='position':
holdVal=[]
if 'slop' in keyword_parameters:
dist = keyword_parameters['slop']
else:
dist=0
for hold,values in enumerate(args):
holdVal.append({ 'span_term' : { self.fieldName : values } })
sQuery={
'span_near' : {
'clauses' : holdVal,
'slop' : dist,
'in_order' : 'true'
}
}
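#Example call (illustrative): terms must appear in this order, at most 3 positions apart
# es_session.search_index('position', 'was', 'married', 'to', slop=3)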
#Query two fields in parallel while respecting order:
#the mask searches the tagged field for OBJECT, then switches to the fieldName to search for the value,
#before switching back to tagged to search for OBJECT again
elif keyWord=='between_cand':
check=1
if 'slop' in keyword_parameters:
dist = keyword_parameters['slop']
else:
dist=0
for hold,value in enumerate(args):
sQuery={
'span_near': {
'clauses': [
{'span_term': {'tagged': 'object'}},
{'field_masking_span': {
'query': {
'span_term': {
self.fieldName: value
}
},
'field': 'tagged'}
},
{'span_term': {'tagged': 'object'}},
],
'slop': dist,
'in_order': 'true'
}
}
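#Example call (illustrative): the value must fall between two tagged candidates;
#this branch returns candidate objects rather than sentences
# es_session.search_index('between_cand', 'married', slop=5)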
#Query two fields in parallel while respecting order:
#searches the fieldName first for the value, then switches to the tagged
#field for the OBJECT tag
elif keyWord == 'before_cand':
check=1
holdVal=[]
if 'slop' in keyword_parameters:
dist = keyword_parameters['slop']
else:
dist=0
for hold,values in enumerate(args):
sQuery={
'span_near': {
'clauses': [
{'span_term':
{ self.fieldName : values }},
{'field_masking_span': {
'query': {
'span_term': {
'tagged': 'object'
}
},
'field': self.fieldName}
}
],
'slop': dist,
'in_order': 'true'
}
}
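#Example call (illustrative): the value appears before a tagged candidate
# es_session.search_index('before_cand', 'married', slop=5)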
#Query two fields in parallel while respecting order:
#searches the tagged field first for OBJECT, then switches to the fieldName
#for the value
elif keyWord == 'after_cand':
check=1
if 'slop' in keyword_parameters:
dist = keyword_parameters['slop']
else:
dist=0
for hold,values in enumerate(args):
sQuery={
'span_near': {
'clauses': [
{'span_term':
{'tagged': 'object'}},
{'field_masking_span': {
'query': {
'span_term': {
self.fieldName : values
}
},
'field': 'tagged'}
}
],
'slop': dist,
'in_order': 'true'
}
}
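#Example call (illustrative): the value appears after a tagged candidate
# es_session.search_index('after_cand', 'married', slop=5)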
else:
print 'QUERY TYPE NOT FOUND'
return
#Size indicates how many search results to return
if 'size' in keyword_parameters:
numRes = keyword_parameters['size']
else:
numRes=5
#Perform the query
searchResult = es.search(
size =numRes,
index=self.indexName,
doc_type=self.docType,
body={
'query': sQuery
})
temp=[]
print "Number of hits: %d" %searchResult['hits']['total']
#get sentence numbers from the search results
for i in searchResult['hits']['hits']:
temp.append(i['_source']['lineNum'])
holdCands=[]
if check ==1:
for i in temp:
#query the candidate set for all spans with the sentence number
q = session.query(self.cands)\
.join(Span, getattr(self.cands, self.cands.__argnames__[0] + '_id') == Span.id)\
.join(Span.sentence).filter(Sentence.id == i).all()
for span in q:
holdCands.append(span)
else:
for i in temp:
#get sentence using sentence number
q=session.query(Sentence).filter(Sentence.id ==i).all()
holdCands.append(q[0])
#returns candidate object
return holdCands
#deletes an elasticsearch index, taking the index name as a parameter
#passing '_all' as the index name will delete all indices
def delete_index(self,indexName):
print es.indices.delete(index=indexName,ignore=404)
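#Minimal end-to-end usage sketch (illustrative only; assumes a corpus has already been
#parsed into Snorkel and a candidate subclass, hypothetically named Spouse, has been
#extracted into the database):
#
# from snorkel.models import candidate_subclass
# Spouse = candidate_subclass('Spouse', ['person1', 'person2'])
# es_session = ElasticSession(cands=Spouse)   #builds the index and tags candidate spans
# hits = es_session.search_index('between_cand', 'married', slop=5, size=10)
# SentenceNgramViewer(hits, session)          #inspect the returned candidates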