How to use listen with PostgreSQL on_conflict_do? #11387
-
Before we start, here is the minimal example (MVP) that the rest of this post refers to.
from sqlalchemy.dialects.postgresql import Insert as Pg_Insert
# A class that implements upsert, in a form convenient for me.
class Upsert(Pg_Insert):
    """PostgreSQL ``INSERT`` helper that builds the ``ON CONFLICT`` variants.

    The rows to insert are normalised once in ``__init__``. Each ``with_*``
    method then builds a fresh statement for ``self.table`` when called.
    """

    def __init__(self, table: BaseTable, values: ValidDT) -> None:
        """Normalise ``values`` and initialise the statement for ``table``.

        Args:
            table: Target table, passed unchanged to the parent constructor.
            values: A dict, a list of dicts, or a ``DataFrame``.

        Raises:
            TypeError: If ``values`` is of any other type.
            IndexError: If ``values`` contains no rows.
        """
        # NOTE(review): this instance attribute presumably shadows the
        # ``values()`` method inherited from the parent statement class on
        # this object -- confirm nothing calls ``Upsert(...).values(...)``.
        self.values: List[Dict] = self._validate(values)
        self.columns, self.vl = self._separete_columns()
        super().__init__(table)

    def with_update(
        self,
        columns: 'List[str] | None' = None,
        constraint: str = 'pk'
    ) -> Pg_Insert:
        """Return an ``ON CONFLICT ... DO UPDATE`` insert of the stored rows.

        Args:
            columns: Columns to overwrite with the incoming ("excluded")
                values on conflict. Falls back to ``self.columns`` when
                omitted or empty.
            constraint: ``'pk'`` for the table's primary key, or the name of
                one of the table's constraints.

        Returns:
            The statement produced by ``on_conflict_do_update``.

        Raises:
            ValueError: If ``constraint`` does not name a table constraint.
        """
        LOG.info(f'Begin insert into {self.table.name}')
        target_columns = columns or self.columns
        conflict_target = self._get_constraint_by_name(constraint)
        stmt = self._pg_insert()
        return stmt.on_conflict_do_update(
            set_={name: stmt.excluded.get(name) for name in target_columns},
            constraint=conflict_target
        )

    def with_do_nothing(self, **kwargs) -> Insert:
        """Return an ``ON CONFLICT DO NOTHING`` insert of the stored rows.

        Args:
            **kwargs: Forwarded to ``on_conflict_do_nothing``. A truthy
                ``constraint`` entry is first resolved through
                ``_get_constraint_by_name``.

        Raises:
            ValueError: If ``constraint`` is given but does not name a table
                constraint.
        """
        if constraint := kwargs.get('constraint'):
            kwargs['constraint'] = self._get_constraint_by_name(constraint)
        return self._pg_insert().on_conflict_do_nothing(**kwargs)

    def with_raise(self) -> Insert:
        """Return a plain ``insert(self.table)`` with no conflict handling.

        NOTE(review): unlike the other ``with_*`` methods, the stored rows
        are not attached to the returned statement, so the caller has to
        supply them. Confirm this is intended.
        """
        return insert(self.table)

    def _separete_columns(self) -> Tuple[List[str], List[Dict]]:
        """Split ``self.values`` into column names and a list of row dicts.

        Returns:
            ``(columns, rows)``. For a list of rows, ``columns`` are the keys
            of the last row.

        Raises:
            TypeError: If ``self.values`` is not a dict, list or DataFrame.
            IndexError: If ``self.values`` is an empty list.
        """
        rows = self.values
        if isinstance(rows, Dict):
            return list(rows.keys()), [rows]
        if isinstance(rows, List):
            # Only the last row's keys are used, so read that row directly
            # instead of building a key list for every row first.
            return list(rows[-1].keys()), rows
        if isinstance(rows, DataFrame):
            return (
                rows.columns.values.tolist(),
                rows.to_dict(orient='records'),
            )
        raise TypeError('Invalid type for values')

    def _get_constraint_by_name(
        self,
        constraint: str = 'pk'
    ) -> 'Constraint | str':
        """Resolve the conflict target handed to the ``on_conflict_*`` calls.

        Args:
            constraint: ``'pk'``, the table's primary-key object itself, or
                the name of one of the table's constraints.

        Returns:
            The table's primary-key constraint for ``'pk'`` (or when that
            object is passed in); otherwise the constraint name unchanged,
            once it is confirmed to exist on the table.

        Raises:
            ValueError: If no constraint of the table has that name.
        """
        primary_key = self.table.primary_key
        if constraint == 'pk' or constraint == primary_key:
            return primary_key
        if any(constraint == item.name for item in self.table.constraints):
            return constraint
        raise ValueError(
            f'No such constraint ({constraint}) in table: {self.table}')

    def _validate(self, values: ValidDT) -> List[Dict]:
        """Normalise the supported input types to a list of row dicts.

        Raises:
            TypeError: If ``values`` is not a list, dict or DataFrame.
        """
        if isinstance(values, List):
            return values
        if isinstance(values, Dict):
            return [values]
        if isinstance(values, DataFrame):
            return values.to_dict(orient='records')
        raise TypeError(f'No support for type {type(values)}')

    def _pg_insert(self) -> Pg_Insert:
        """Build a fresh PostgreSQL insert for ``self.table`` with the rows."""
        return pg_insert(self.table).values(self.values)
def upsert(table, values) -> Upsert:
    """Create the ``Upsert`` helper for ``table`` loaded with ``values``."""
    helper = Upsert(table, values)
    return helper
####################
# Class with sqlalchemy crud operations
# Stores the connection settings, creates the async engine and the session
# factory, and exposes helpers that return insert/update statement objects.
class OrmWorker:
# Instantiated once, when the class body is evaluated; it supplies the
# default values for the __init__ parameters below.
_default_settings = _DefaultSettings()
def __init__(
self,
driver: str = _default_settings.drivername,
user: str = _default_settings.username,
password: str = _default_settings.password,
host: str = _default_settings.host,
port: int = _default_settings.port,
database: DatabaseT = _default_settings.database,
echo: bool = False,
expire_on_commit: bool = False
) -> None:
"""Doc."""
# Keep the individual connection parts; the `url` property reads them.
self.driver: str = driver
self.user: str = user
self.password: str = password
self.host: str = host
self.port: int = port
self.database: str = database
# `self.url` is evaluated here, so the fields above must already be set.
self.engine = create_async_engine(self.url, echo=echo)
# Session factory bound to the engine; `expire_on_commit` is forwarded as given.
self.session_maker = async_sessionmaker(
self.engine,
class_=AsyncSession,
expire_on_commit=expire_on_commit
)
# Rebuilt from the stored fields on every access via URL.create.
@property
def url(self) -> URL:
return URL.create(
self.driver,
self.user,
self.password,
self.host,
self.port,
self.database
)
# NOTE(review): `to_dict_` is defined elsewhere; what it does with the
# returned statement is not visible here.
@to_dict_
def insert(self, table: BaseTable, values: Dict) -> Insert: # Just return basic insert function
return insert(table).values(values)
@to_dict_
def update(self, table: BaseTable, values) -> Update: # Just return basic update function
return update(table).values(values)
# Chooses the statement to build from `on_conflict`:
#   'do_update'  -> upsert(...).with_update(upd_columns, constraint)
#   'do_nothing' -> upsert(...).with_do_nothing()
#   'raise'      -> self.insert(table, values)
# Any other value raises AttributeError.
@to_dict_
def pg_insert(
self,
table: BaseTable,
values: ValidDT,
upd_columns: List[str] = None,
constraint: str = 'pk',
on_conflict: Literal['do_update', 'do_nothing', 'raise'] = 'raise'
) -> Pg_Insert | Insert: # Return Pg_insert which sqlalchemy.dialects.postgresql.Insert object
"""Doc."""
if on_conflict == 'do_update':
return upsert(table, values).with_update(upd_columns, constraint)
if on_conflict == 'do_nothing':
# NOTE(review): `constraint` is not forwarded on this branch.
return upsert(table, values).with_do_nothing()
if on_conflict == 'raise':
return self.insert(table, values)
# NOTE(review): the two adjacent string literals below are concatenated
# without a separator, so the message reads "...{on_conflict}Value should be...".
raise AttributeError(
f'on_conflict got incorrect value: {on_conflict}'
'Value should be one of ["do_update", "do_nothing", "raise"]'
) At first I tried to do add "listen" for insert, update, delete operations. def insert(self, table: BaseTable, values: Dict) -> Insert:
listen(table, 'before_insert', b_insert) # just print("this is insert")
return insert(table).values(values)
But after reading the documentation more carefully, I saw that this is only suitable for add and delete operations. For statements that are executed through session.execute() you need @listens_for(AsyncSession.sync_session_class, 'do_orm_execute'). Here is my example:
@listens_for(AsyncSession.sync_session_class, 'do_orm_execute')
# Handler for the 'do_orm_execute' event (registered by the decorator on the
# preceding line): prints which kind of DML statement is being executed.
# The three checks are independent `if` statements, not an elif chain.
def orm_execute_log(orm_execute_state: ORMExecuteState):
# NOTE: per the reply later in this thread, INSERT ... ON CONFLICT statements
# are reported as inserts; `is_update` refers only to UPDATE statements.
if orm_execute_state.is_insert:
print('THIS IS INSERT')
if orm_execute_state.is_update:
print('THIS IS UPDATE')
if orm_execute_state.is_delete:
print('THIS IS DELETE') But when I do pg_insert it's always insert, but never update. How can I separate the behavior if insert.on_conflict_do_nothing and insert.on_conflict_do_update are executed? For example, I would like to display how many rows were updated, which columns, or which constraint were used. |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 2 replies
-
Hi, The is_update flag refers to an update statement. All insert statements, regardless of on_conflict qualification, are identified as inserts. At the moment I don't think there are public accessors to get what value was used, but you can take a look at the source and likely use _post_values_clause. @zzzeek are there better options? |
Beta Was this translation helpful? Give feedback.
Hi,
The is_update flag refers to an update statement. All insert statements, regardless of on_conflict qualification, are identified as inserts.
At the moment I don't think there are public accessors to get what value was used, but you can take a look at the source and likely use _post_values_clause: https://github.com/sqlalchemy/sqlalchemy/blob/main/lib/sqlalchemy/dialects/postgresql/dml.py
@zzzeek are there better options?