Skip to content

IBM db2.py latest file

Platipus edited this page Sep 13, 2024 · 3 revisions

This is the file for table aliases. If expanding the DB connection details with Schema Name, the line:

    result = 'SELECT %s FROM %s' % (start, end)

should be modified with, ie:

    result = 'SELECT %s FROM %s.%s' % (start, schema, end)
import ibm_db
import ibm_db_dbi

from werkzeug._compat import iteritems, to_unicode

DATABASE = 'DB2'
NEED_DATABASE_NAME = True
NEED_LOGIN = True
NEED_PASSWORD = True
NEED_ENCODING = False
NEED_HOST = True
NEED_PORT = True
CAN_CHANGE_TYPE = False
CAN_CHANGE_SIZE = False
DDL_ROLLBACK = False
NEED_GENERATOR = False

#FROM = '"%s"'
FROM = '"%s" AS %s'

LEFT_OUTER_JOIN = 'LEFT OUTER JOIN "%s" AS %s'
FIELD_AS = 'AS'
LIKE = 'LIKE'

JAM_TYPES = TEXT, INTEGER, FLOAT, CURRENCY, DATE, DATETIME, BOOLEAN, LONGTEXT, KEYS, FILE, IMAGE = range(1, 12)
FIELD_TYPES = {
    INTEGER: 'INT',
    TEXT: 'VARCHAR',
    FLOAT: 'DOUBLE',
    CURRENCY: 'DOUBLE',
    DATE: 'DATE',
    DATETIME: 'DATETIME',
    BOOLEAN: 'INT',
    LONGTEXT: 'LONGTEXT',
    KEYS: 'LONGTEXT',
    FILE: 'LONGTEXT',
    IMAGE: 'LONGTEXT'
}

def connect(database, user, password, host, port, encoding, server):
    charset = None
    use_unicode = None
    if encoding:
        charset = encoding
        use_unicode = True
    if port:
        connection_str = "DATABASE=%s;HOSTNAME=%s;PORT=%s;PROTOCOL=TCPIP;UID=%s;PWD=%s;" % (database, host, port, user, password)
        ibm_db_conn = ibm_db.connect(connection_str, "", "")
        connection = ibm_db_dbi.Connection(ibm_db_conn)

    else:
        connection_str = "DATABASE=%s;HOSTNAME=%s;PORT=%s;PROTOCOL=TCPIP;UID=%s;PWD=%s;" % (database, host, port, user, password)
#    connection.autocommit(False)
    cursor = connection.cursor()
#    cursor.execute("SET SESSION SQL_MODE=ANSI_QUOTES")
    return connection

def get_lastrowid(cursor):
    return cursor.lastrowid

def get_select(query, fields_clause, from_clause, where_clause, group_clause, order_clause, fields):
    start = fields_clause
    end = ''.join([from_clause, where_clause, group_clause, order_clause])
    offset = query['__offset']
    limit = query['__limit']
    result = 'SELECT %s FROM %s' % (start, end)
    if limit:
        result += ' LIMIT %d, %d' % (offset, limit)
    return result

def process_sql_params(params, cursor):
    result = []
    for p in params:
        if type(p) == tuple:
            value, data_type = p
        else:
            value = p
        result.append(value)
    return result

def process_sql_result(rows):
    result = []
    for row in rows:
        new_row = []
        for r in row:
            if isinstance(r, bytes):
                r = to_unicode(r, 'utf-8')
            new_row.append(r)
        result.append(new_row)
    return result


def cast_date(date_str):
    return "'" + date_str + "'"

def cast_datetime(datetime_str):
    return "'" + datetime_str + "'"

def value_literal(index):
    return '%s'

def convert_like(field_name, val, data_type):
    return field_name, val

def create_table_sql(table_name, fields, gen_name=None, foreign_fields=None):
    result = []
    primary_key = ''
    sql = 'CREATE TABLE "%s"\n(\n' % table_name
    lines = []
    for field in fields:
        line = '"%s" %s' % (field['field_name'], FIELD_TYPES[field['data_type']])
        if field['size'] != 0 and field['data_type'] == TEXT:
            line += '(%d)' % field['size']
        if field['default_value'] and not field['primary_key']:
            if field['data_type'] == TEXT:
                line += " DEFAULT '%s'" % field['default_value']
            else:
                line += ' DEFAULT %s' % field['default_value']
        if field['primary_key']:
            line += ' NOT NULL AUTO_INCREMENT'
            primary_key = field['field_name']
        lines.append(line)
    if primary_key:
        lines.append('PRIMARY KEY("%s")' % primary_key)
    sql += ',\n'.join(lines)
    sql += ')\n'
    result.append(sql)
    return result


def delete_table_sql(table_name, gen_name):
    result = []
    result.append('DROP TABLE "%s"' % table_name)
    return result

def create_index_sql(index_name, table_name, unique, fields, desc):
    return 'CREATE %s INDEX "%s" ON "%s" (%s)' % (unique, index_name, table_name, fields)

def create_foreign_index_sql(table_name, index_name, key, ref, primary_key):
    return 'ALTER TABLE "%s" ADD CONSTRAINT "%s" FOREIGN KEY ("%s") REFERENCES "%s"("%s")' % \
        (table_name, index_name, key, ref, primary_key)

def delete_index(table_name, index_name):
    return 'DROP INDEX "%s" ON "%s"' % (index_name, table_name)

def delete_foreign_index(table_name, index_name):
    return 'ALTER TABLE "%s" DROP FOREIGN KEY "%s"' % (table_name, index_name)

def add_field_sql(table_name, field):
    result = 'ALTER TABLE "%s" ADD "%s" %s' % \
        (table_name, field['field_name'], FIELD_TYPES[field['data_type']])
    if field['size']:
        result += '(%d)' % field['size']
    if field['default_value']:
        if field['data_type'] == TEXT:
            result += " DEFAULT '%s'" % field['default_value']
        else:
            result += ' DEFAULT %s' % field['default_value']
    return result

def del_field_sql(table_name, field):
    return 'ALTER TABLE "%s" DROP "%s"' % (table_name, field['field_name'])

def change_field_sql(table_name, old_field, new_field):
    result = []
    if old_field['field_name'] != new_field['field_name']:
        sql = 'ALTER TABLE "%s" CHANGE  "%s" "%s" %s' % (table_name, old_field['field_name'],
            new_field['field_name'], FIELD_TYPES[new_field['data_type']])
        if old_field['size']:
            sql += '(%d)' % old_field['size']
        #~ if old_field['data_type'] != new_field['data_type'] or \
            #~ old_field['size'] != new_field['size']:
            #~ sql += ' %s' % FIELD_TYPES[field['data_type']]
            #~ if new_field['size'] and old_field['size'] != new_field['size']:
                #~ sql += '(%d)' % new_field['size']
        result.append(sql)
    if old_field['default_value'] != new_field['default_value']:
        if new_field['default_value']:
            if new_field['data_type'] == TEXT:
                sql = 'ALTER TABLE "%s" ALTER "%s" SET DEFAULT' % \
                    (table_name, new_field['field_name'])
                sql +=  " '%s'" % new_field['default_value']
            else:
                sql = 'ALTER TABLE "%s" ALTER "%s" SET DEFAULT %s' % \
                    (table_name, new_field['field_name'], new_field['default_value'])
        else:
            sql = 'ALTER TABLE "%s" ALTER "%s" DROP DEFAULT' % \
                (table_name, new_field['field_name'])
        result.append(sql)
    return result

def param_literal():
    return '%s'

def quotes():
    return '`'

def next_sequence_value_sql(table_name):
    return None

def restart_sequence_sql(table_name, value):
    pass

def identifier_case(name):
    return name.lower()

def get_table_names(connection):
    cursor = connection.cursor()
    cursor.execute("select tabname from syscat.tables where type = 'T'")
    result = cursor.fetchall()
    return [r[0] for r in result]

def get_table_info(connection, table_name, db_name):
    cursor = connection.cursor()

    sql = "Select \
                   c.colname as field_name, \
                   c.typename as data_type, \
                   c.length as size, \
                   default as default_value \
            from syscat.columns c \
            inner join syscat.tables t on \
                  t.tabschema = c.tabschema and t.tabname = c.tabname \
            where t.type = 'T' and c.tabname='%s'" % table_name
    cursor.execute(sql)
    result = cursor.fetchall()
    print(result)
    fields = []
    for (field_name, data_type, size, default_value) in result:
#        try:
#            pk = False
#            if autoinc and key == 'PRI':
#                pk = True
#            data_type = type_size.split('(')[0].upper()
#            size = type_size.split('(')[1].split(')')[0]
#            if not data_type in ['VARCHAR', 'CHAR']:
#                size = 0
#        except:
#            data_type = type_size
#            size = 0
        fields.append({
            'field_name': field_name,
            'data_type': data_type,
            'size': size,
            'default_value': default_value,
            'pk': False
        })
#    for r in result:
#        fields.append({
#            'field_name': r[1],
#            'data_type': r[2],
#            'size': 0,
#            'default_value': r[4],
#            'pk': r[5]==1
#        })

    return {'fields': fields, 'field_types': FIELD_TYPES}
Clone this wiki locally