# NOTE: reconstructed from a flattened git diff that touched two modules.
# The hunk context lines (the tails of `trend_bridge` and `extract_concepts`)
# are not reproduced here because those definitions are only partially
# visible in the diff.

# ---------------------------------------------------------------------------
# diff --git a/ddf_utils/chef/procedure.py b/ddf_utils/chef/procedure.py
# index bc38664..8c05430 100644
# hunk @@ -976,3 +976,32 @@ (appended after `trend_bridge`)
#
# `debuggable`, `BaseIngredient` and `ProcedureResult` are not defined in this
# hunk; they are expected to come from procedure.py's module scope.
# ---------------------------------------------------------------------------


@debuggable
def merge_entity(ingredient: BaseIngredient, dictionary: dict, result, merged='drop'):
    """Merge entities in every dataframe of an ingredient.

    Each dataframe returned by ``ingredient.get_data()`` is passed through
    :func:`ddf_utils.transformer.merge_keys`.

    :param ingredient: object exposing ``get_data()`` and ``key``.
    :param dictionary: ``{new_key: [old_key, ...]}`` mapping, forwarded to
        ``merge_keys``.
    :param result: forwarded unchanged as the first argument of
        ``ProcedureResult``.
    :param merged: ``'drop'`` or ``'keep'``, forwarded to ``merge_keys``.
    :return: ``ProcedureResult(result, ingredient.key, <dict of dataframes>)``.
    """
    from ..transformer import merge_keys

    data = ingredient.get_data()
    # Iterating a dict directly yields only its keys, so ask for the items
    # explicitly; any other iterable is assumed to already yield
    # (name, dataframe) pairs.
    # NOTE(review): get_data() presumably returns a dict of name -> DataFrame;
    # confirm against BaseIngredient.
    pairs = data.items() if isinstance(data, dict) else data

    res_data = {k: merge_keys(df, dictionary, merged) for k, df in pairs}

    return ProcedureResult(result, ingredient.key, res_data)


@debuggable
def split_entity(ingredient: BaseIngredient, dictionary: dict,
                 target_column, result, splited='drop'):
    """Split entities in every dataframe of an ingredient.

    Each dataframe returned by ``ingredient.get_data()`` is passed through
    :func:`ddf_utils.transformer.split_keys`.

    :param ingredient: object exposing ``get_data()`` and ``key``.
    :param dictionary: split specification, forwarded to ``split_keys``.
    :param target_column: name of the index level holding the entities,
        forwarded to ``split_keys``.
    :param result: forwarded unchanged as the first argument of
        ``ProcedureResult``.
    :param splited: ``'drop'`` or ``'keep'``, forwarded to ``split_keys``.
    :return: ``ProcedureResult(result, ingredient.key, <dict of dataframes>)``.
    """
    from ..transformer import split_keys

    data = ingredient.get_data()
    # See merge_entity: a dict must be iterated through .items() to get
    # (name, dataframe) pairs.
    pairs = data.items() if isinstance(data, dict) else data

    res_data = {k: split_keys(df, target_column, dictionary, splited)
                for k, df in pairs}

    return ProcedureResult(result, ingredient.key, res_data)


# ---------------------------------------------------------------------------
# diff --git a/ddf_utils/transformer.py b/ddf_utils/transformer.py
# index e6b5ed3..4fd2326 100644
# hunk @@ -319,3 +319,74 @@ (appended after `extract_concepts`)
#
# `pd` is not imported in this hunk; it is expected to come from
# transformer.py's module scope.
# ---------------------------------------------------------------------------


def merge_keys(df, dictionary, merged='drop'):
    """Merge index keys of ``df`` by renaming them and summing the groups.

    :param df: dataframe whose index holds the keys to merge.
    :param dictionary: ``{new_key: [old_key, ...]}``; every ``old_key`` found
        in the index is renamed to ``new_key``.
    :param merged: ``'drop'`` returns only the renamed and summed frame;
        ``'keep'`` returns the original rows followed by the merged rows whose
        index is not already present in ``df``.
    :raises ValueError: if ``merged`` is neither ``'drop'`` nor ``'keep'``.
    """
    # Validate before doing any work so a bad option fails fast.
    if merged not in ('drop', 'keep'):
        raise ValueError('only "drop", "keep" is allowed')

    rename_dict = {old_key: new_key
                   for new_key, old_keys in dictionary.items()
                   for old_key in old_keys}

    # `nlevels` exists on both Index and MultiIndex, whereas `.levels` is
    # MultiIndex-only and would fail on a single-level index.
    all_levels = list(range(df.index.nlevels))
    df_new = df.rename(index=rename_dict).groupby(level=all_levels).sum()

    if merged == 'drop':
        return df_new

    # 'keep': original rows come first, so when an index entry exists in both
    # frames the original row is the one retained.
    combined = pd.concat([df, df_new])
    return combined[~combined.index.duplicated()]  # drop repeated index entries


def split_keys(df, target_column, dictionary, splited='drop'):
    """Split entities into sub-entities, proportionally to reference rows.

    ``dictionary`` has the form::

        {
            'entity_to_split': {
                'split': ['sub_entity_1', 'sub_entity_2', ...],
                'at': {column: value, ...},
            },
            ...
        }

    For each entry, the rows whose ``target_column`` is one of the ``split``
    values and that match every ``column == value`` pair in ``at`` are used to
    compute, per data column, each sub-entity's share of the column total.
    Every row of the entity to split is then copied once per sub-entity with
    all data columns multiplied by that sub-entity's shares.

    :param df: dataframe with named index levels, one of them being
        ``target_column``.
    :param target_column: name of the index level holding the entities.
    :param dictionary: split specification, see above.
    :param splited: ``'drop'`` removes the rows of the entities that were
        split from the output; ``'keep'`` leaves them in.
    :raises ValueError: if ``splited`` is neither ``'drop'`` nor ``'keep'``.
    """
    # Validate before doing any work so a bad option fails fast.
    if splited not in ('drop', 'keep'):
        raise ValueError('only support splited == "drop" and "keep".')

    keys = list(df.index.names)
    other_keys = [key for key in keys if key != target_column]
    df_ = df.reset_index()

    to_concat = []
    for entity, spec in dictionary.items():
        # Select the reference rows: the sub-entities, restricted by `at`.
        mask = df_[target_column].isin(spec['split'])
        for column, value in spec['at'].items():
            mask = mask & (df_[column] == value)
        before_spl = df_[mask].set_index(target_column).drop(other_keys, axis=1)

        # ratio = {
        #     'concept_1': {'sub_entity_1': r11, 'sub_entity_2': r12, ...},
        #     'concept_2': {'sub_entity_1': r21, 'sub_entity_2': r22, ...},
        # }
        ratio = (before_spl / before_spl.sum()).to_dict()

        rows_to_split = df_[df_[target_column] == entity]
        for sub_entity in before_spl.index.unique():
            part = rows_to_split.copy()
            # Scale every data column in the same copy, so each sub-entity
            # yields exactly one fully scaled set of rows.
            for concept, shares in ratio.items():
                part[concept] = part[concept] * shares[sub_entity]
            part[target_column] = sub_entity
            to_concat.append(part.set_index(keys))

    final = pd.concat([df, *to_concat])
    if splited == 'drop':
        is_split_entity = final.index.get_level_values(target_column).isin(list(dictionary))
        final = final[~is_split_entity]
    return final