Utility to convert JSON/objects to SQLAlchemy queryset, which is used later to generate SQL queries to fetch data from RDBMS.
pip install sqlalchemy-json-querybuilder
-
Support for multiple operators.
- Support for filter operators.
- Support for relationship operators, i.e. `any`, `has`.
-
Filter in relationship as well as in collections.
-
Pagination using windowing & slicing. Pagination can be disabled if needed.
-
Ordering/Sorting in `ASC` & `DESC` order.
-
Supports `AND` & `OR`, so multiple query criteria can be glued and bundled using `AND` or `OR` as follows -
criteria = {
    'and': [and_criterion_dict_1, and_criterion_dict_2, ... and_criterion_dict_n],
    'or': [or_criterion_dict_1, or_criterion_dict_2, ... or_criterion_dict_n]
}
which is equivalent to -
SELECT field_1, field_2..field_n FROM some_table WHERE (and_criterion_dict_1 AND and_criterion_dict_2 AND....AND and_criterion_dict_n) AND (or_criterion_dict_1 OR or_criterion_dict_2 OR....OR or_criterion_dict_n);
-
# Each criterion has 3 attributes: field_name, operator, field_value
criterion_1 = {
    'field_name': 'MyModel1.some_field',
    'operator': 'some_operator',  # Supported operators are listed below
    'field_value': 'some_value'
}

# Once all the criteria are defined in the form of dictionary/object, bundle them as follows -
filter_by = {
    'and': [criterion_1, criterion_2,....criterion_n],
    'or': [other_criterion_1, other_criterion_2,....other_criterion_n]
}

# If there are `and` criteria only, then they can be bundled in following 2 ways -
filter_by = [criterion_1, criterion_2,....criterion_n]

# Alternative way to bundle `and` criteria
filter_by = {
    'and': [criterion_1, criterion_2,....criterion_n]
}

# If there are `or` criteria only, then they can be bundled as -
filter_by = {
    'or': [criterion_1, criterion_2,....criterion_n]
}
-
ordering = ['MyModel1.some_field', '-MyModel1.other_field'] # `-` sign indicates DESC order.
-
Following 3 attributes are used to control pagination:
- `page`: Current page number.
- `per_page`: Number of records to be displayed on a page.
- `all`: Defaults to `False`; make it `True` in order to disable the pagination and fetch all records at once.
-
from sqlalchemy_json_querybuilder.querybuilder.search import Search

# session - SqlAlchemy session
# 'some_module.models' - Package/module where all the models are placed.
search_obj = Search(session, 'some_module.models', (MyModel1,), filter_by=criteria, order_by=ordering, page=1, per_page=10, all=False)

# `results` property will query the DB and fetch the results. Results contain `data` & `count`.
results = search_obj.results

# SQLAlchemy `queryset` can also be obtained; all the functions supported by SQLAlchemy on a queryset
# can be invoked on the underlying queryset and the records can be fetched later -
queryset = search_obj.query
queryset = queryset.join(Address, User.id==Address.user_id).join(UserProfile)

# Fetching records
results = queryset.all()
Following operators are supported -
- `equals`, `eq`, `==`, `=`
- `not_equals`, `ne`, `!=`, `~=`
- `less_than`, `lt`, `<`
- `less_than_equals`, `lte`, `<=`
- `greater_than`, `gt`, `>`
- `greater_than_equals`, `gte`, `>=`
- `like`, `ilike`
- `startswith`, `istartswith`, `endswith`, `iendswith`
- `contains`, `icontains`
- `match`
- `in`, `notin`
- `isnull`, `isnotnull`
- `any`, `has`
Note - the `i` prefix stands for case insensitive.
-
filter_by = [{ 'field_name': 'User.addresses', 'operator': 'any', 'field_value': { 'field_name': 'Address.email_address', 'operator': 'equals', 'field_value': 'bar' } }]
is translated to
query.filter(User.addresses.any(Address.email_address == 'bar'))
# also takes keyword arguments:
query.filter(User.addresses.any(email_address='bar'))
-
filter_by = [{ 'field_name': 'Address.user', 'operator': 'has', 'field_value': { 'field_name': 'User.name', 'operator': 'equals', 'field_value': 'bar' } }]
is translated to
query.filter(Address.user.has(name='bar'))
-
filter_by = [dict(field_name='User.name', field_value='ed', operator='equals')]
is translated to
query.filter(User.name == 'ed')
-
filter_by = [dict(field_name='User.name', field_value='ed', operator='not_equals')]
is translated to
query.filter(User.name != 'ed')
-
filter_by = [dict(field_name='User.age', field_value=18, operator='lt')]
is translated to
query.filter(User.age < 18)
-
filter_by = [dict(field_name='User.age', field_value=18, operator='lte')]
is translated to
query.filter(User.age <= 18)
-
filter_by = [dict(field_name='User.age', field_value=18, operator='gt')]
is translated to
query.filter(User.age > 18)
-
filter_by = [dict(field_name='User.age', field_value=18, operator='gte')]
is translated to
query.filter(User.age >= 18)
-
filter_by = [dict(field_name='User.name', field_value=['ed', 'wendy', 'jack'], operator='in')]
is translated to
query.filter(User.name.in_(['ed', 'wendy', 'jack']))
-
filter_by = [dict(field_name='User.name', field_value=['ed', 'wendy', 'jack'], operator='notin')]
is translated to
query.filter(~User.name.in_(['ed', 'wendy', 'jack']))
-
filter_by = [dict(field_name='User.name', field_value=None, operator='isnull')]
is translated to
query.filter(User.name == None)
# alternatively, if pep8/linters are a concern
query.filter(User.name.is_(None))
-
filter_by = [dict(field_name='User.name', field_value=None, operator='isnotnull')]
is translated to
query.filter(User.name != None)
# alternatively, if pep8/linters are a concern
query.filter(User.name.isnot(None))
-
filter_by = [dict(field_name='User.name', field_value='ed', operator='contains')]
is translated to
query.filter(User.name.like('%ed%'))
-
filter_by = [dict(field_name='User.name', field_value='ed', operator='startswith')]
is translated to
query.filter(User.name.like('ed%'))
-
filter_by = [dict(field_name='User.name', field_value='ed', operator='endswith')]
is translated to
query.filter(User.name.like('%ed'))
-
filter_by = [dict(field_name='User.name', field_value='wendy', operator='match')]
is translated to
query.filter(User.name.match('wendy'))
Some examples are given below. More examples can be found here.
#-------------- Creating connection & session ---------------#
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
# Declarative base class that every model defined in this example inherits from.
Base = declarative_base()

# Connection URL for a local MySQL database accessed through the PyMySQL driver,
# assembled from its individual parts.
con_url = 'mysql+pymysql://{username}:{password}@{host}:{port}/{database}'.format(
    username='root',
    password='',
    host='localhost',
    port=3306,
    database='test',
)

# Engine for the URL above; `pool_recycle=3600` makes the pool replace
# connections that are older than 3600 seconds.
engine = create_engine(con_url, pool_recycle=3600)

# Set up the session: a session factory bound to the engine, wrapped in a
# `scoped_session` registry that is used as `session` in the rest of the example.
session_maker = sessionmaker(
    bind=engine,
    autoflush=True,
    autocommit=False,
    expire_on_commit=True,
)
session = scoped_session(session_maker)
#-------------- Models ---------------#
from uuid import uuid4
from sqlalchemy import Column, Integer, String, Text, ForeignKey
from sqlalchemy.orm import relationship
def generate_uuid():
    """Return a freshly generated random (version 4) UUID as a string.

    Used as the ``default`` callable of the string primary-key columns in
    this example, so a new row gets an id without the caller supplying one.
    """
    return str(uuid4())
class NotificationGroup(Base):
    """ORM model mapped to the ``notification_group`` table."""

    __tablename__ = "notification_group"

    # String primary key; `generate_uuid` supplies the value when none is given.
    id = Column("id", String(75), primary_key=True, default=generate_uuid)
    client_id = Column('client_id', Integer, nullable=False)
    denotation = Column('denotation', String(250), nullable=False)
    description = Column('description', String(500))
    customers_sites = Column('customers_sites', Text, nullable=False)

    # Mapping rows that point at this group via their `notification_group_id`
    # foreign key. The backref adds a `notification_group_mapping` attribute on
    # each mapping that leads back to its group.
    group_mappings = relationship("NotificationGroupMapping", backref="notification_group_mapping", lazy='dynamic')
class NotificationGroupMapping(Base):
    """ORM model mapped to the ``notification_group_mapping`` table.

    Each row links a notification group to an event id and to a recipient.
    """

    __tablename__ = "notification_group_mapping"

    # String primary key; `generate_uuid` supplies the value when none is given.
    id = Column("id", String(75), primary_key=True, default=generate_uuid)
    notification_group_id = Column(String(75), ForeignKey('notification_group.id'))
    event_id = Column(String(75), nullable=False)
    # The foreign key has to reference the table `Recipient` is mapped to
    # (`recipients`); otherwise the relationship below has no join condition.
    recipient_id = Column(String(75), ForeignKey('recipients.id'))
    recipient = relationship("Recipient")
    is_used = Column(String(75), nullable=False)


class Recipient(Base):
    """ORM model mapped to the ``recipients`` table."""

    __tablename__ = 'recipients'

    # SQLAlchemy cannot map a class without a primary key; this column is also
    # the target of `NotificationGroupMapping.recipient_id`.
    id = Column("id", String(75), primary_key=True, default=generate_uuid)
    client_id = Column('client_id', Integer, nullable=False)
    user_id = Column('user_id', Integer, nullable=False)
    email = Column('email', String(256), nullable=False)
#-------------- Query -------------#
from sqlalchemy_json_querybuilder.querybuilder.search import Search
# `filter_by` can have multiple criteria objects bundled as a list.
# The single criterion below is nested: notification groups having ANY mapping
# whose recipient HAS the given e-mail address.
filter_by = [{
    "field_name": "NotificationGroup.group_mappings",
    "field_value": {
        "field_name": "NotificationGroupMapping.recipient",
        "field_value": {
            "field_name": "Recipient.email",
            "field_value": "user@example.com",
            "operator": "equals"
        },
        "operator": "has"
    },
    "operator": "any"
}]

# `order_by` can have multiple column names. `-` indicates arranging the results in `DESC` order.
order_by = ['-NotificationGroup.client_id']

# returns `results` dict containing `data` & `count`
results = Search(session, "models.notification_group", (NotificationGroup,),
                 filter_by=filter_by, order_by=order_by, page=1, per_page=5).results

# The records fetched above correspond to the following plain SQLAlchemy query:
# same filter, `DESC` ordering on `client_id`, and only the first page of 5 records.
# Note that this returns a plain list of records rather than a `data`/`count` dict.
results = session.query(NotificationGroup).filter(
    NotificationGroup.group_mappings.any(
        NotificationGroupMapping.recipient.has(
            Recipient.email == 'user@example.com'
        )
    )
).order_by(NotificationGroup.client_id.desc()).limit(5).all()
Pull requests are welcome! Please create new pull requests from the `dev` branch.
- Support for `JSON` columns.