Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 44 additions & 12 deletions backend/apps/db/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
import os
import platform
import urllib.parse
from datetime import timedelta
from datetime import datetime, date, time, timedelta
from decimal import Decimal
from typing import Optional

import oracledb
import psycopg2
import pymssql
import re

from apps.db.db_sql import get_table_sql, get_field_sql, get_version_sql
from common.error import ParseSQLResultError
Expand Down Expand Up @@ -324,11 +323,14 @@ def get_schema(ds: CoreDatasource):
with get_session(ds) as session:
sql: str = ''
if equals_ignore_case(ds.type, "sqlServer"):
sql = """select name from sys.schemas"""
sql = """select name
from sys.schemas"""
elif equals_ignore_case(ds.type, "pg", "excel"):
sql = """SELECT nspname FROM pg_namespace"""
sql = """SELECT nspname
FROM pg_namespace"""
elif equals_ignore_case(ds.type, "oracle"):
sql = """select * from all_users"""
sql = """select *
from all_users"""
with session.execute(text(sql)) as result:
res = result.fetchall()
res_list = [item[0] for item in res]
Expand All @@ -338,15 +340,18 @@ def get_schema(ds: CoreDatasource):
if equals_ignore_case(ds.type, 'dm'):
with dmPython.connect(user=conf.username, password=conf.password, server=conf.host,
port=conf.port, **extra_config_dict) as conn, conn.cursor() as cursor:
cursor.execute("""select OBJECT_NAME from dba_objects where object_type='SCH'""", timeout=conf.timeout)
cursor.execute("""select OBJECT_NAME
from dba_objects
where object_type = 'SCH'""", timeout=conf.timeout)
res = cursor.fetchall()
res_list = [item[0] for item in res]
return res_list
elif equals_ignore_case(ds.type, 'redshift'):
with redshift_connector.connect(host=conf.host, port=conf.port, database=conf.database, user=conf.username,
password=conf.password,
timeout=conf.timeout, **extra_config_dict) as conn, conn.cursor() as cursor:
cursor.execute("""SELECT nspname FROM pg_namespace""")
cursor.execute("""SELECT nspname
FROM pg_namespace""")
res = cursor.fetchall()
res_list = [item[0] for item in res]
return res_list
Expand All @@ -355,7 +360,8 @@ def get_schema(ds: CoreDatasource):
password=conf.password,
options=f"-c statement_timeout={conf.timeout * 1000}",
**extra_config_dict) as conn, conn.cursor() as cursor:
cursor.execute("""SELECT nspname FROM pg_namespace""")
cursor.execute("""SELECT nspname
FROM pg_namespace""")
res = cursor.fetchall()
res_list = [item[0] for item in res]
return res_list
Expand Down Expand Up @@ -463,8 +469,16 @@ def get_fields(ds: CoreDatasource, table_name: str = None):
return res_list


def convert_value(value):
"""转换值为JSON可序列化的类型"""
def convert_value(value, datetime_format='space'):
"""
将Python值转换为JSON可序列化的类型

:param value: 要转换的值
:param datetime_format: 日期时间格式
'iso' - 2024-01-15T14:30:45 (ISO标准,带T)
'space' - 2024-01-15 14:30:45 (空格分隔,更常见)
'auto' - 自动选择
"""
if value is None:
return None
# 处理 bytes 类型(包括 BIT 字段)
Expand Down Expand Up @@ -504,8 +518,26 @@ def convert_value(value):
return str(value) # 或 value.total_seconds()
elif isinstance(value, Decimal):
return float(value)
elif hasattr(value, 'isoformat'): # 处理 datetime/date/time
return value.isoformat()
# 4. 处理 datetime
elif isinstance(value, datetime):
if datetime_format == 'iso':
return value.isoformat()
elif datetime_format == 'space':
return value.strftime('%Y-%m-%d %H:%M:%S')
else: # 'auto' 或其他
# 自动判断:没有时间部分只显示日期
if value.hour == 0 and value.minute == 0 and value.second == 0 and value.microsecond == 0:
return value.strftime('%Y-%m-%d')
else:
return value.strftime('%Y-%m-%d %H:%M:%S')

# 5. 处理 date
elif isinstance(value, date):
return value.isoformat() # 总是 YYYY-MM-DD

# 6. 处理 time
elif isinstance(value, time):
return str(value)
else:
return value

Expand Down