IBM db2.py latest file
This is the latest db2.py file, using table aliases. If you extend the DB connection details with a schema name, the line in get_select:
result = 'SELECT %s FROM %s' % (start, end)
should be modified accordingly, e.g.:
result = 'SELECT %s FROM %s.%s' % (start, schema, end)
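For instance, a minimal sketch of the schema-qualified version (SCHEMA is a hypothetical module-level value filled in from the extended connection details; it is not part of the file below):

SCHEMA = 'MYSCHEMA'  # hypothetical: taken from the extended connection details

def get_select(query, fields_clause, from_clause, where_clause, group_clause, order_clause, fields):
    start = fields_clause
    end = ''.join([from_clause, where_clause, group_clause, order_clause])
    offset = query['__offset']
    limit = query['__limit']
    # from_clause starts with the table name, so this yields e.g. FROM MYSCHEMA."TABLE" AS T ...
    result = 'SELECT %s FROM %s.%s' % (start, SCHEMA, end)
    if limit:
        result += ' OFFSET %d ROWS FETCH FIRST %d ROWS ONLY' % (offset, limit)
    return result

The full file follows.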
import ibm_db
import ibm_db_dbi
from werkzeug._compat import iteritems, to_unicode
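# Note: werkzeug._compat only exists in older Werkzeug releases; on newer
# Werkzeug installations to_unicode has to be replaced (e.g. with bytes.decode).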
DATABASE = 'DB2'
NEED_DATABASE_NAME = True
NEED_LOGIN = True
NEED_PASSWORD = True
NEED_ENCODING = False
NEED_HOST = True
NEED_PORT = True
CAN_CHANGE_TYPE = False
CAN_CHANGE_SIZE = False
DDL_ROLLBACK = False
NEED_GENERATOR = False
#FROM = '"%s"'
FROM = '"%s" AS %s'
LEFT_OUTER_JOIN = 'LEFT OUTER JOIN "%s" AS %s'
FIELD_AS = 'AS'
LIKE = 'LIKE'
JAM_TYPES = TEXT, INTEGER, FLOAT, CURRENCY, DATE, DATETIME, BOOLEAN, LONGTEXT, KEYS, FILE, IMAGE = range(1, 12)
FIELD_TYPES = {
    INTEGER: 'INT',
    TEXT: 'VARCHAR',
    FLOAT: 'DOUBLE',
    CURRENCY: 'DOUBLE',
    DATE: 'DATE',
    # DB2 has no DATETIME or LONGTEXT types; TIMESTAMP and CLOB are the native equivalents.
    DATETIME: 'TIMESTAMP',
    BOOLEAN: 'INT',
    LONGTEXT: 'CLOB',
    KEYS: 'CLOB',
    FILE: 'CLOB',
    IMAGE: 'CLOB'
}
def connect(database, user, password, host, port, encoding, server):
    charset = None
    use_unicode = None
    if encoding:
        charset = encoding
        use_unicode = True
    if not port:
        # Assume the default DB2 instance port when none is supplied.
        port = 50000
    connection_str = "DATABASE=%s;HOSTNAME=%s;PORT=%s;PROTOCOL=TCPIP;UID=%s;PWD=%s;" % \
        (database, host, port, user, password)
    ibm_db_conn = ibm_db.connect(connection_str, "", "")
    connection = ibm_db_dbi.Connection(ibm_db_conn)
    # connection.autocommit(False)
    cursor = connection.cursor()
    # cursor.execute("SET SESSION SQL_MODE=ANSI_QUOTES")
    return connection
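# Note: get_lastrowid relies on the ibm_db_dbi driver populating cursor.lastrowid;
# DB2's IDENTITY_VAL_LOCAL() function is the fallback if it is not available.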
def get_lastrowid(cursor):
return cursor.lastrowid
def get_select(query, fields_clause, from_clause, where_clause, group_clause, order_clause, fields):
    start = fields_clause
    end = ''.join([from_clause, where_clause, group_clause, order_clause])
    offset = query['__offset']
    limit = query['__limit']
    result = 'SELECT %s FROM %s' % (start, end)
    if limit:
        # DB2 paging syntax (DB2 11.1+); the MySQL-style "LIMIT offset, count"
        # only works when DB2_COMPATIBILITY_VECTOR=MYS is enabled.
        result += ' OFFSET %d ROWS FETCH FIRST %d ROWS ONLY' % (offset, limit)
    return result
def process_sql_params(params, cursor):
result = []
for p in params:
if type(p) == tuple:
value, data_type = p
else:
value = p
result.append(value)
return result
def process_sql_result(rows):
result = []
for row in rows:
new_row = []
for r in row:
if isinstance(r, bytes):
r = to_unicode(r, 'utf-8')
new_row.append(r)
result.append(new_row)
return result
def cast_date(date_str):
return "'" + date_str + "'"
def cast_datetime(datetime_str):
return "'" + datetime_str + "'"
def value_literal(index):
    # ibm_db_dbi uses the qmark paramstyle, so parameter markers must be '?'.
    return '?'
def convert_like(field_name, val, data_type):
return field_name, val
def create_table_sql(table_name, fields, gen_name=None, foreign_fields=None):
result = []
primary_key = ''
sql = 'CREATE TABLE "%s"\n(\n' % table_name
lines = []
for field in fields:
line = '"%s" %s' % (field['field_name'], FIELD_TYPES[field['data_type']])
if field['size'] != 0 and field['data_type'] == TEXT:
line += '(%d)' % field['size']
if field['default_value'] and not field['primary_key']:
if field['data_type'] == TEXT:
line += " DEFAULT '%s'" % field['default_value']
else:
line += ' DEFAULT %s' % field['default_value']
        if field['primary_key']:
            # DB2 identity column syntax (AUTO_INCREMENT is MySQL-only).
            line += ' NOT NULL GENERATED BY DEFAULT AS IDENTITY'
            primary_key = field['field_name']
lines.append(line)
if primary_key:
lines.append('PRIMARY KEY("%s")' % primary_key)
sql += ',\n'.join(lines)
sql += ')\n'
result.append(sql)
return result
def delete_table_sql(table_name, gen_name):
result = []
result.append('DROP TABLE "%s"' % table_name)
return result
def create_index_sql(index_name, table_name, unique, fields, desc):
return 'CREATE %s INDEX "%s" ON "%s" (%s)' % (unique, index_name, table_name, fields)
def create_foreign_index_sql(table_name, index_name, key, ref, primary_key):
return 'ALTER TABLE "%s" ADD CONSTRAINT "%s" FOREIGN KEY ("%s") REFERENCES "%s"("%s")' % \
(table_name, index_name, key, ref, primary_key)
def delete_index(table_name, index_name):
    # DB2 drops indexes by name only; the MySQL "DROP INDEX ... ON table" form is not valid.
    return 'DROP INDEX "%s"' % index_name
def delete_foreign_index(table_name, index_name):
return 'ALTER TABLE "%s" DROP FOREIGN KEY "%s"' % (table_name, index_name)
def add_field_sql(table_name, field):
result = 'ALTER TABLE "%s" ADD "%s" %s' % \
(table_name, field['field_name'], FIELD_TYPES[field['data_type']])
if field['size']:
result += '(%d)' % field['size']
if field['default_value']:
if field['data_type'] == TEXT:
result += " DEFAULT '%s'" % field['default_value']
else:
result += ' DEFAULT %s' % field['default_value']
return result
def del_field_sql(table_name, field):
return 'ALTER TABLE "%s" DROP "%s"' % (table_name, field['field_name'])
def change_field_sql(table_name, old_field, new_field):
result = []
    if old_field['field_name'] != new_field['field_name']:
        # DB2 renames columns with RENAME COLUMN; the MySQL-style CHANGE syntax is not supported.
        sql = 'ALTER TABLE "%s" RENAME COLUMN "%s" TO "%s"' % \
            (table_name, old_field['field_name'], new_field['field_name'])
#~ if old_field['data_type'] != new_field['data_type'] or \
#~ old_field['size'] != new_field['size']:
#~ sql += ' %s' % FIELD_TYPES[field['data_type']]
#~ if new_field['size'] and old_field['size'] != new_field['size']:
#~ sql += '(%d)' % new_field['size']
result.append(sql)
if old_field['default_value'] != new_field['default_value']:
if new_field['default_value']:
if new_field['data_type'] == TEXT:
sql = 'ALTER TABLE "%s" ALTER "%s" SET DEFAULT' % \
(table_name, new_field['field_name'])
sql += " '%s'" % new_field['default_value']
else:
sql = 'ALTER TABLE "%s" ALTER "%s" SET DEFAULT %s' % \
(table_name, new_field['field_name'], new_field['default_value'])
else:
sql = 'ALTER TABLE "%s" ALTER "%s" DROP DEFAULT' % \
(table_name, new_field['field_name'])
result.append(sql)
return result
def param_literal():
    # ibm_db_dbi uses the qmark paramstyle.
    return '?'
def quotes():
    # DB2 delimits identifiers with double quotes; the backtick is a MySQL leftover.
    return '"'
def next_sequence_value_sql(table_name):
return None
def restart_sequence_sql(table_name, value):
pass
def identifier_case(name):
return name.lower()
def get_table_names(connection):
cursor = connection.cursor()
cursor.execute("select tabname from syscat.tables where type = 'T'")
result = cursor.fetchall()
return [r[0] for r in result]
def get_table_info(connection, table_name, db_name):
cursor = connection.cursor()
sql = "Select \
c.colname as field_name, \
c.typename as data_type, \
c.length as size, \
default as default_value \
from syscat.columns c \
inner join syscat.tables t on \
t.tabschema = c.tabschema and t.tabname = c.tabname \
where t.type = 'T' and c.tabname='%s'" % table_name
cursor.execute(sql)
result = cursor.fetchall()
    # print(result)
fields = []
for (field_name, data_type, size, default_value) in result:
# try:
# pk = False
# if autoinc and key == 'PRI':
# pk = True
# data_type = type_size.split('(')[0].upper()
# size = type_size.split('(')[1].split(')')[0]
# if not data_type in ['VARCHAR', 'CHAR']:
# size = 0
# except:
# data_type = type_size
# size = 0
fields.append({
'field_name': field_name,
'data_type': data_type,
'size': size,
'default_value': default_value,
'pk': False
})
# for r in result:
# fields.append({
# 'field_name': r[1],
# 'data_type': r[2],
# 'size': 0,
# 'default_value': r[4],
# 'pk': r[5]==1
# })
return {'fields': fields, 'field_types': FIELD_TYPES}
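For reference, a minimal smoke test of the adapter (all connection values below are placeholders for your own DB2 instance; the module is assumed to be saved as db2.py):

import db2  # this module

connection = db2.connect('SAMPLE', 'db2inst1', 'password', 'localhost', '50000', None, None)
print(db2.get_table_names(connection))
info = db2.get_table_info(connection, 'EMPLOYEE', 'SAMPLE')
for field in info['fields']:
    print(field['field_name'], field['data_type'], field['size'])
connection.close()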