-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsql_handler.py
More file actions
142 lines (128 loc) · 5.08 KB
/
sql_handler.py
File metadata and controls
142 lines (128 loc) · 5.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import sqlite3
# Personal-information table: one row per account, keyed by the unique
# user_account column. The statement uses CREATE TABLE IF NOT EXISTS, so
# running it against a database that already has the table changes nothing.
# drop_num and vac_status are declared INTEGER (previously misspelled
# "INTEGERL"; SQLite assigns INTEGER affinity to any declared type that
# contains "INT", so stored values behave the same under either spelling).
user_info_table = '''
CREATE TABLE IF NOT EXISTS user_info(
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_account TEXT NOT NULL UNIQUE,
user_password TEXT,
drop_time TEXT,
drop_item TEXT,
drop_num INTEGER,
vac_status INTEGER,
is_this_week_drop INTEGER,
rank INTEGER,
exp INTEGER,
session TEXT,
shared_secret TEXT,
map_info INTEGER DEFAULT 0,
country TEXT,
balance INTEGER DEFAULT 0
);
'''
# Names of the ordinary (non-rare) boxes. SQLHandler.get_rare_drop_accounts
# treats a drop whose item name is NOT in this list as a rare drop.
normal_boxes = [
    "梦魇武器箱",
    "千瓦武器箱",
    "反冲武器箱",
    "变革武器箱",
    "裂空武器箱",
]
import sys
import os
def resource_path(relative_path):
    """Return the absolute path of a resource file.

    The path is resolved against ``sys._MEIPASS`` when that attribute
    exists (PyInstaller sets it in frozen bundles); otherwise against the
    directory that contains this module.

    Args:
        relative_path: Path of the resource relative to the base directory.

    Returns:
        The base directory joined with ``relative_path``.
    """
    # Directory of this source file, used when no bundle directory is set.
    module_dir = os.path.dirname(os.path.abspath(__file__))
    bundle_dir = getattr(sys, '_MEIPASS', module_dir)
    return os.path.join(bundle_dir, relative_path)
class SQLHandler:
    """Small data-access wrapper around the SQLite ``user_info`` table.

    One connection and one cursor are opened in ``__init__`` and reused by
    every method. Write methods commit immediately. Call ``close()`` (or
    use the instance as a context manager) to release the connection.
    """

    def __init__(self, db_name='user_info.db'):
        """Open the database and create ``user_info`` if it is missing.

        Args:
            db_name: Path of the SQLite database file. On macOS
                (``sys.platform`` starting with ``'darwin'``) it is first
                resolved through ``resource_path``.
        """
        if sys.platform.startswith('darwin'):
            db_name = resource_path(db_name)
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()
        # Create the table if it does not exist yet.
        self.cursor.execute(user_info_table)
        self.conn.commit()

    def close(self):
        """Close the database connection.

        Closing an already-closed sqlite3 connection is a no-op, so this
        may be called more than once. Other methods raise
        ``sqlite3.ProgrammingError`` once the connection is closed.
        """
        self.conn.close()

    def __enter__(self):
        """Return the handler itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving the ``with`` block.

        Exceptions raised inside the block are not suppressed.
        """
        self.close()

    # Fetch the account's drop time; callers use it to decide whether the
    # drop happened in the current week.
    def get_drop_time(self, user_account):
        """Return the stored ``drop_time`` of an account.

        Args:
            user_account: Value of the ``user_account`` column to look up.

        Returns:
            The ``drop_time`` value, or ``None`` when the account has no
            row (a row whose ``drop_time`` is NULL also yields ``None``).
        """
        self.cursor.execute(
            'SELECT drop_time FROM user_info WHERE user_account = ?',
            (user_account,)
        )
        row = self.cursor.fetchone()
        return row[0] if row else None

    def get_user_info(self, user_account):
        """Return the main profile columns of an account.

        Args:
            user_account: Value of the ``user_account`` column to look up.

        Returns:
            A tuple ``(user_account, drop_item, drop_time, vac_status,
            is_this_week_drop, rank, exp, map_info, country, balance)``,
            or ``None`` when the account has no row.
        """
        self.cursor.execute(
            'SELECT user_account, drop_item, drop_time, vac_status, '
            'is_this_week_drop, rank, exp, map_info,country,balance '
            'FROM user_info WHERE user_account = ?',
            (user_account,)
        )
        # fetchone() already returns None when there is no matching row.
        return self.cursor.fetchone()

    def insert_or_update(self, user_account, user_password, drop_time, drop_item, drop_num, vac_status,
                         is_this_week_drop, rank,
                         exp, map_info, country, balance):
        """Insert a new account row or overwrite the existing one.

        If a row for ``user_account`` exists, every column passed here is
        updated; otherwise a new row is inserted. The change is committed
        before returning, and a short progress message is printed.

        Args:
            user_account: Account name identifying the row.
            user_password, drop_time, drop_item, drop_num, vac_status,
            is_this_week_drop, rank, exp, map_info, country, balance:
                Values written to the columns of the same name.
        """
        # Check whether the account already has a row.
        if self.get_user_info(user_account):
            print(f'{user_account} update')
            self.cursor.execute(
                'UPDATE user_info SET user_password = ?, drop_time = ?, drop_item = ?, drop_num = ?, vac_status = ?,'
                'is_this_week_drop = ?, rank = ?, exp = ?, map_info = ?,country = ?,balance = ? WHERE user_account = ?',
                (user_password, drop_time, drop_item, drop_num, vac_status, is_this_week_drop, rank, exp,
                 map_info, country, balance, user_account)
            )
        else:
            print(f'{user_account} insert')
            self.cursor.execute(
                'INSERT INTO user_info(user_account, user_password, drop_time, drop_item, drop_num, vac_status,'
                'is_this_week_drop, rank, exp, map_info,country,balance) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,?)',
                (user_account, user_password, drop_time, drop_item, drop_num, vac_status, is_this_week_drop,
                 rank, exp, map_info, country, balance)
            )
        self.conn.commit()

    def get_all_accounts(self):
        """Return ``(user_account, user_password)`` for every row.

        Returns:
            A list of 2-tuples, or ``None`` (not an empty list) when the
            table has no rows.
        """
        self.cursor.execute('SELECT user_account,user_password FROM user_info')
        rows = self.cursor.fetchall()
        return rows if rows else None

    def set_account_session(self, user_account, session):
        """Store the session value of an account, creating the row if needed.

        When the account has no row yet, a row holding only
        ``user_account`` and ``session`` is inserted. The change is
        committed before returning, and a short progress message is printed.

        Args:
            user_account: Account name identifying the row.
            session: Value written to the ``session`` column.
        """
        if self.get_user_info(user_account):
            print(f'{user_account} session update')
            self.cursor.execute(
                'UPDATE user_info SET session = ? WHERE user_account = ?',
                (session, user_account)
            )
        else:
            print(f'{user_account} session insert')
            self.cursor.execute(
                'INSERT INTO user_info(user_account, session) VALUES(?, ?)',
                (user_account, session)
            )
        self.conn.commit()

    def get_account_session(self, user_account):
        """Return the stored ``session`` value of an account.

        Args:
            user_account: Value of the ``user_account`` column to look up.

        Returns:
            The ``session`` value, or ``None`` when the account has no row
            (a row whose ``session`` is NULL also yields ``None``).
        """
        self.cursor.execute(
            'SELECT session FROM user_info WHERE user_account = ?',
            (user_account,)
        )
        row = self.cursor.fetchone()
        return row[0] if row else None

    def get_rare_drop_accounts(self):
        """Return this week's drops whose item is not an ordinary box.

        Looks at rows with ``is_this_week_drop = 1`` and keeps those whose
        ``drop_item`` text, up to the first comma, is not listed in
        ``normal_boxes``. Rows with no recorded ``drop_item`` (NULL or
        empty) are skipped instead of raising on ``None.split``.

        Returns:
            A list of ``(user_account, user_password, drop_item)`` tuples;
            empty when nothing matches.
        """
        self.cursor.execute('SELECT user_account,user_password,drop_item FROM user_info WHERE is_this_week_drop = 1')
        rare_drop_accounts = []
        for row in self.cursor.fetchall():
            drop_item = row[2]
            if not drop_item:
                # Nothing recorded for this row, so it cannot be a rare drop.
                continue
            if drop_item.split(',')[0] not in normal_boxes:
                rare_drop_accounts.append(row)
        return rare_drop_accounts

    def get_week_drop_eq_0(self):
        """Return the rows whose ``is_this_week_drop`` equals 0.

        Returns:
            A list of ``(user_account, user_password, rank, exp)`` tuples;
            empty when nothing matches.
        """
        self.cursor.execute('SELECT user_account,user_password,rank,exp FROM user_info WHERE is_this_week_drop = 0')
        return self.cursor.fetchall()

    def get_vac_accounts(self):
        """Return the rows whose ``vac_status`` equals 1.

        Returns:
            A list of ``(user_account, user_password)`` tuples; empty when
            nothing matches.
        """
        self.cursor.execute('SELECT user_account,user_password FROM user_info WHERE vac_status = 1')
        return self.cursor.fetchall()