Skip to content

Commit

Permalink
new: procedure for merging/spliting entities
Browse files Browse the repository at this point in the history
see #58
  • Loading branch information
semio committed Apr 24, 2017
1 parent 07a7a74 commit 0250875
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 0 deletions.
29 changes: 29 additions & 0 deletions ddf_utils/chef/procedure.py
Original file line number Diff line number Diff line change
Expand Up @@ -976,3 +976,32 @@ def trend_bridge(ingredient: BaseIngredient, bridge_start, bridge_end, bridge_le
return ProcedureResult(result, start.key, merged)
else:
return ProcedureResult(result, start.key, {target_column: result_data})


@debuggable
def merge_entity(ingredient: BaseIngredient, dictionary: dict, result, merged='drop'):
"""merge entities"""
from ..transformer import merge_keys

data = ingredient.get_data()

res_data = dict()
for k, df in data:
res_data[k] = merge_keys(df, dictionary, merged)

return ProcedureResult(result, ingredient.key, res_data)


@debuggable
def split_entity(ingredient: BaseIngredient, dictionary: dict,
target_column, result, splited='drop'):
"""split entities"""
from ..transformer import split_keys

data = ingredient.get_data()

res_data = dict()
for k, df in data:
res_data[k] = split_keys(df, target_column, dictionary, splited)

return ProcedureResult(result, ingredient.key, res_data)
71 changes: 71 additions & 0 deletions ddf_utils/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,74 @@ def extract_concepts(dfs, base=None, join='full_outer'):
# ingredients_outer join: only keep concepts appears in ingredients
concepts = concepts.ix[new_concepts]
return concepts.reset_index()


def merge_keys(df, dictionary, merged='drop'):
rename_dict = dict()
for new_key, val in dictionary.items():
for old_key in val:
rename_dict[old_key] = new_key
df_new = df.rename(index=rename_dict).groupby(level=list(range(len(df.index.levels)))).sum()

if merged == 'drop':
return df_new
elif merged == 'keep':
df_ = df.copy()
df_ = pd.concat([df_, df_new])
return df_[~df_.index.duplicated()] # remove all duplicated indies
else:
raise ValueError('only "drop", "keep" is allowed')


def split_keys(df, target_column, dictionary, splited='drop'):
"""split entities"""
keys = df.index.names
df_ = df.reset_index()

ratio = dict()

for k, v in dictionary.items():
masks = [df_[target_column].isin(dictionary[k]['split'])]
[masks.append(df_[c] == x) for c, x in v['at'].items()]
before_spl = df_[np.all(masks, axis=0)].set_index(target_column)
for key in keys:
if key != target_column:
before_spl = before_spl.drop(key, axis=1)
total = before_spl.sum()
ptc = before_spl / total
ratio[k] = ptc.to_dict()

# the ratio format will be:
# ratio = {
# 'entity_to_split': {
# 'concept_1': {
# 'sub_entity_1': r11,
# 'sub_entity_2': r12,
# ...
# },
# 'concept_2': {
# 'sub_entity_1': r21,
# 'sub_entity_2': r22,
# ...
# }
# }
# }

to_concat = []
for k, v in ratio.items():
t = df_[df_[target_column] == k].copy()
for x, y in v.items():
for g, r in y.items():
t_ = t.copy()
t_[x] = t_[x] * r
t_[target_column] = g
to_concat.append(t_.set_index(keys))

final = pd.concat([df, *to_concat])
if splited == 'drop':
final = final[~final.index.get_level_values(target_column).isin(dictionary.keys())]
return final
elif splited == 'keep':
return final
else:
raise ValueError('only support drop == "drop" and "keep".')

0 comments on commit 0250875

Please sign in to comment.