Commit c804de2e authored by Jalin's avatar Jalin

修复并发锁问题

parent 5de41150
...@@ -40,6 +40,8 @@ class App: ...@@ -40,6 +40,8 @@ class App:
@classmethod @classmethod
def did_start(cls): def did_start(cls):
self = cls() self = cls()
from py12306.helpers.station import Station
Station() # 防止多线程时初始化出现问题
# if Config.is_cluster_enabled(): # if Config.is_cluster_enabled():
# from py12306.cluster.cluster import Cluster # from py12306.cluster.cluster import Cluster
# Cluster().run() # Cluster().run()
...@@ -73,8 +75,8 @@ class App: ...@@ -73,8 +75,8 @@ class App:
if Config().USER_ACCOUNTS: if Config().USER_ACCOUNTS:
for account in Config().USER_ACCOUNTS: for account in Config().USER_ACCOUNTS:
if account: if account:
return True return False
return False return True
@classmethod @classmethod
def test_send_notifications(cls): def test_send_notifications(cls):
...@@ -90,10 +92,10 @@ class App: ...@@ -90,10 +92,10 @@ class App:
待优化 待优化
:return: :return:
""" """
if not cls.check_auto_code():
CommonLog.add_quick_log(CommonLog.MESSAGE_CHECK_AUTO_CODE_FAIL).flush(exit=True)
if not cls.check_user_account_is_empty(): if not cls.check_user_account_is_empty():
CommonLog.add_quick_log(CommonLog.MESSAGE_CHECK_EMPTY_USER_ACCOUNT).flush(exit=True) # CommonLog.add_quick_log(CommonLog.MESSAGE_CHECK_EMPTY_USER_ACCOUNT).flush(exit=True, publish=False) # 不填写用户则不自动下单
if not cls.check_auto_code():
CommonLog.add_quick_log(CommonLog.MESSAGE_CHECK_AUTO_CODE_FAIL).flush(exit=True, publish=False)
if Const.IS_TEST_NOTIFICATION: cls.test_send_notifications() if Const.IS_TEST_NOTIFICATION: cls.test_send_notifications()
......
...@@ -30,6 +30,9 @@ class Cluster(): ...@@ -30,6 +30,9 @@ class Cluster():
KEY_LOCK_DO_ORDER = 'lock_do_order' # 订单锁 KEY_LOCK_DO_ORDER = 'lock_do_order' # 订单锁
lock_do_order_time = 60 * 1 # 订单锁超时时间 lock_do_order_time = 60 * 1 # 订单锁超时时间
lock_prefix = 'lock_' # 锁键前缀
lock_info_prefix = 'info_'
KEY_MASTER = 1 KEY_MASTER = 1
KEY_SLAVE = 0 KEY_SLAVE = 0
...@@ -40,7 +43,6 @@ class Cluster(): ...@@ -40,7 +43,6 @@ class Cluster():
keep_alive_time = 3 # 报告存活间隔 keep_alive_time = 3 # 报告存活间隔
lost_alive_time = keep_alive_time * 2 lost_alive_time = keep_alive_time * 2
locks = []
nodes = {} nodes = {}
node_name = None node_name = None
is_ready = False is_ready = False
...@@ -222,25 +224,23 @@ class Cluster(): ...@@ -222,25 +224,23 @@ class Cluster():
timeout = int(time.time()) + timeout timeout = int(time.time()) + timeout
res = self.session.setnx(key, timeout) res = self.session.setnx(key, timeout)
if res: if res:
self.locks.append((key, timeout)) if info: self.session.set_dict(self.lock_info_prefix + key, info) # 存储额外信息
if info: self.session.set_dict(key + '_info', info) # 存储额外信息
return True return True
return False return False
def get_lock_info(self, key, default={}): def get_lock_info(self, key, default={}):
return self.session.get_dict(key + '_info', default=default) return self.session.get_dict(self.lock_info_prefix + key, default=default)
def release_lock(self, key): def release_lock(self, key):
self.session.delete(key) self.session.delete(key)
self.session.delete(key + '_info') self.session.delete(self.lock_info_prefix + key)
def check_locks(self): def check_locks(self):
index = 0 locks = self.session.keys(self.lock_prefix + '*')
for key, timeout in self.locks: for key in locks:
if timeout >= int(time.time()): val = self.session.get(key)
del self.locks[index] if val and int(val) <= time_int():
self.release_lock(key) self.release_lock(key)
index += 1
@classmethod @classmethod
def get_user_cookie(cls, key, default=None): def get_user_cookie(cls, key, default=None):
......
...@@ -8,6 +8,8 @@ from py12306.helpers.func import * ...@@ -8,6 +8,8 @@ from py12306.helpers.func import *
@singleton @singleton
class Config: class Config:
IS_DEBUG = False
USER_ACCOUNTS = [] USER_ACCOUNTS = []
# 查询任务 # 查询任务
QUERY_JOBS = [] QUERY_JOBS = []
......
...@@ -44,16 +44,11 @@ class Notification(): ...@@ -44,16 +44,11 @@ class Notification():
method='GET', headers={ method='GET', headers={
'Authorization': 'APPCODE {}'.format(appcode) 'Authorization': 'APPCODE {}'.format(appcode)
}) })
response_message = '-' result = response.json()
result = {} response_message = result.get('showapi_res_body.remark')
try: if response.status_code in [400, 401, 403]:
result = response.json()
response_message = result['showapi_res_body']['remark']
except:
pass
if response.status_code == 401 or response.status_code == 403:
return CommonLog.add_quick_log(CommonLog.MESSAGE_VOICE_API_FORBID).flush() return CommonLog.add_quick_log(CommonLog.MESSAGE_VOICE_API_FORBID).flush()
if response.status_code == 200 and 'showapi_res_body' in result and result['showapi_res_body'].get('flag'): if response.status_code == 200 and result.get('showapi_res_body.flag'):
CommonLog.add_quick_log(CommonLog.MESSAGE_VOICE_API_SEND_SUCCESS.format(response_message)).flush() CommonLog.add_quick_log(CommonLog.MESSAGE_VOICE_API_SEND_SUCCESS.format(response_message)).flush()
return True return True
else: else:
......
...@@ -7,6 +7,7 @@ from py12306.helpers.func import * ...@@ -7,6 +7,7 @@ from py12306.helpers.func import *
@singleton @singleton
class Station: class Station:
stations = [] stations = []
station_kvs = {}
def __init__(self): def __init__(self):
if path.exists(Config().STATION_FILE): if path.exists(Config().STATION_FILE):
...@@ -20,6 +21,7 @@ class Station: ...@@ -20,6 +21,7 @@ class Station:
'pinyin': tmp_info[3], 'pinyin': tmp_info[3],
'id': tmp_info[5] 'id': tmp_info[5]
}) })
self.station_kvs[tmp_info[1]] = tmp_info[2]
@classmethod @classmethod
def get_station_by_name(cls, name): def get_station_by_name(cls, name):
...@@ -35,7 +37,8 @@ class Station: ...@@ -35,7 +37,8 @@ class Station:
@classmethod @classmethod
def get_station_key_by_name(cls, name): def get_station_key_by_name(cls, name):
return cls.get_station_by_name(name).get('key') self = cls()
return self.station_kvs[name]
@classmethod @classmethod
def get_station_name_by_key(cls, key): def get_station_name_by_key(cls, key):
......
...@@ -33,11 +33,14 @@ class QueryLog(BaseLog): ...@@ -33,11 +33,14 @@ class QueryLog(BaseLog):
MESSAGE_SKIP_ORDER = '跳过本次请求,节点 {} 用户 {} 正在处理该订单\n' MESSAGE_SKIP_ORDER = '跳过本次请求,节点 {} 用户 {} 正在处理该订单\n'
MESSAGE_QUERY_JOB_BEING_DESTROY = '当前查询任务 {} 已结束' MESSAGE_QUERY_JOB_BEING_DESTROY = '当前查询任务 {} 已结束\n'
MESSAGE_INIT_PASSENGERS_SUCCESS = '初始化乘客成功' MESSAGE_INIT_PASSENGERS_SUCCESS = '初始化乘客成功'
MESSAGE_CHECK_PASSENGERS = '正在验证乘客信息' MESSAGE_CHECK_PASSENGERS = '正在验证乘客信息'
MESSAGE_USER_IS_EMPTY_WHEN_DO_ORDER = '未配置自动下单账号,{} 秒后继续查询\n'
MESSAGE_ORDER_USER_IS_EMPTY = '未找到下单账号,{} 秒后继续查询'
cluster = None cluster = None
def __init__(self): def __init__(self):
......
...@@ -9,8 +9,6 @@ from py12306.helpers.type import UserType ...@@ -9,8 +9,6 @@ from py12306.helpers.type import UserType
from py12306.log.order_log import OrderLog from py12306.log.order_log import OrderLog
class Order: class Order:
""" """
处理下单 处理下单
...@@ -51,6 +49,9 @@ class Order: ...@@ -51,6 +49,9 @@ class Order:
下单模式 暂时不清楚,使用正常步骤下单 下单模式 暂时不清楚,使用正常步骤下单
:return: :return:
""" """
# Debug
if Config().IS_DEBUG:
return random.randint(0, 10) > 7
return self.normal_order() return self.normal_order()
def normal_order(self): def normal_order(self):
......
...@@ -96,6 +96,7 @@ class Job: ...@@ -96,6 +96,7 @@ class Job:
self.left_date = date self.left_date = date
response = self.query_by_date(date) response = self.query_by_date(date)
self.handle_response(response) self.handle_response(response)
if not self.is_alive: return
self.safe_stay() self.safe_stay()
if is_main_thread(): if is_main_thread():
QueryLog.flush(sep='\t\t', publish=False) QueryLog.flush(sep='\t\t', publish=False)
...@@ -143,6 +144,7 @@ class Job: ...@@ -143,6 +144,7 @@ class Job:
allow_seats = self.allow_seats if self.allow_seats else list( allow_seats = self.allow_seats if self.allow_seats else list(
Config.SEAT_TYPES.values()) # 未设置 则所有可用 TODO 合法检测 Config.SEAT_TYPES.values()) # 未设置 则所有可用 TODO 合法检测
self.handle_seats(allow_seats, ticket_info) self.handle_seats(allow_seats, ticket_info)
if not self.is_alive: return
def handle_seats(self, allow_seats, ticket_info): def handle_seats(self, allow_seats, ticket_info):
for seat in allow_seats: # 检查座位是否有票 for seat in allow_seats: # 检查座位是否有票
...@@ -166,12 +168,21 @@ class Job: ...@@ -166,12 +168,21 @@ class Job:
QueryLog.print_ticket_available(left_date=self.get_info_of_left_date(), QueryLog.print_ticket_available(left_date=self.get_info_of_left_date(),
train_number=self.get_info_of_train_number(), train_number=self.get_info_of_train_number(),
rest_num=ticket_of_seat) rest_num=ticket_of_seat)
if User.is_empty():
QueryLog.add_quick_log(QueryLog.MESSAGE_USER_IS_EMPTY_WHEN_DO_ORDER.format(self.retry_time))
return stay_second(self.retry_time)
order_result = False order_result = False
user = self.get_user() user = self.get_user()
if not user:
QueryLog.add_quick_log(QueryLog.MESSAGE_ORDER_USER_IS_EMPTY.format(self.retry_time))
return stay_second(self.retry_time)
lock_id = Cluster.KEY_LOCK_DO_ORDER + '_' + user.key lock_id = Cluster.KEY_LOCK_DO_ORDER + '_' + user.key
if Config().is_cluster_enabled(): if Config().is_cluster_enabled():
if self.cluster.get_lock(lock_id, Cluster.lock_do_order_time, if self.cluster.get_lock(lock_id, Cluster.lock_do_order_time,
{'node': self.cluster.node_name}): # 获得下单锁 {'node': self.cluster.node_name}): # 获得下单锁
QueryLog.add_quick_log('拿到锁' + lock_id).flush()
order_result = self.do_order(user) order_result = self.do_order(user)
if not order_result: # 下单失败,解锁 if not order_result: # 下单失败,解锁
self.cluster.release_lock(lock_id) self.cluster.release_lock(lock_id)
...@@ -222,9 +233,13 @@ class Job: ...@@ -222,9 +233,13 @@ class Job:
退出任务 退出任务
:return: :return:
""" """
from py12306.query.query import Query
QueryLog.add_quick_log(QueryLog.MESSAGE_QUERY_JOB_BEING_DESTROY.format(self.job_name)).flush() QueryLog.add_quick_log(QueryLog.MESSAGE_QUERY_JOB_BEING_DESTROY.format(self.job_name)).flush()
# sys.exit(1) # 无法退出线程... # sys.exit(1) # 无法退出线程...
self.is_alive = False self.is_alive = False
# 手动移出jobs 防止单线程死循环
index = Query().jobs.index(self)
Query().jobs.pop(index)
def safe_stay(self): def safe_stay(self):
interval = get_interval_num(self.interval) interval = get_interval_num(self.interval)
...@@ -241,9 +256,9 @@ class Job: ...@@ -241,9 +256,9 @@ class Job:
def get_user(self): def get_user(self):
user = User.get_user(self.account_key) user = User.get_user(self.account_key)
if not user.check_is_ready(): # if not user.check_is_ready(): # 这里不需要检测了,后面获取乘客时已经检测过
# TODO user is not ready # #
pass # pass
return user return user
def check_passengers(self): def check_passengers(self):
......
...@@ -48,13 +48,13 @@ class Query: ...@@ -48,13 +48,13 @@ class Query:
# return # DEBUG # return # DEBUG
self.init_jobs() self.init_jobs()
QueryLog.init_data() QueryLog.init_data()
app_available_check()
stay_second(3) stay_second(3)
# 多线程 # 多线程
if Config().QUERY_JOB_THREAD_ENABLED: # 多线程 if Config().QUERY_JOB_THREAD_ENABLED: # 多线程
create_thread_and_run(jobs=self.jobs, callback_name='run', wait=Const.IS_TEST) create_thread_and_run(jobs=self.jobs, callback_name='run', wait=Const.IS_TEST)
else: else:
while True: while True:
if not self.jobs: break
jobs_do(self.jobs, 'run') jobs_do(self.jobs, 'run')
if Const.IS_TEST: return if Const.IS_TEST: return
...@@ -90,7 +90,6 @@ class Query: ...@@ -90,7 +90,6 @@ class Query:
self = cls() self = cls()
return objects_find_object_by_key_value(self.jobs, 'account_key', account_key) return objects_find_object_by_key_value(self.jobs, 'account_key', account_key)
# def get_jobs_from_cluster(self): # def get_jobs_from_cluster(self):
# jobs = self.cluster.session.get_dict(Cluster.KEY_JOBS) # jobs = self.cluster.session.get_dict(Cluster.KEY_JOBS)
# return jobs # return jobs
......
...@@ -83,6 +83,7 @@ class UserJob: ...@@ -83,6 +83,7 @@ class UserJob:
return True return True
# 只有主节点才能走到这 # 只有主节点才能走到这
if self.is_first_time() or not self.check_user_is_login(): if self.is_first_time() or not self.check_user_is_login():
a = 1
self.is_ready = False self.is_ready = False
if not self.handle_login(): return if not self.handle_login(): return
...@@ -103,7 +104,7 @@ class UserJob: ...@@ -103,7 +104,7 @@ class UserJob:
def set_last_heartbeat(self): def set_last_heartbeat(self):
if Config().is_cluster_enabled(): if Config().is_cluster_enabled():
return self.cluster.session.set(Cluster.KEY_USER_LAST_HEARTBEAT, time_int()) return self.cluster.session.set(Cluster.KEY_USER_LAST_HEARTBEAT, time_int())
self.last_heartbeat = time_now() self.last_heartbeat = time_int()
# def init_cookies # def init_cookies
def is_first_time(self): def is_first_time(self):
...@@ -227,6 +228,7 @@ class UserJob: ...@@ -227,6 +228,7 @@ class UserJob:
# 子节点访问会导致主节点登录失效 TODO 可快考虑实时同步 cookie # 子节点访问会导致主节点登录失效 TODO 可快考虑实时同步 cookie
if user_data: if user_data:
self.update_user_info({**user_data, **{'user_name': user_data.get('name')}}) self.update_user_info({**user_data, **{'user_name': user_data.get('name')}})
self.save_user()
return True return True
return None return None
......
...@@ -68,6 +68,11 @@ class User: ...@@ -68,6 +68,11 @@ class User:
if not array_dict_find_by_key_value(self.user_accounts, 'key', account.get('key')): if not array_dict_find_by_key_value(self.user_accounts, 'key', account.get('key')):
Event().user_job_destroy({'key': account.get('key')}) Event().user_job_destroy({'key': account.get('key')})
@classmethod
def is_empty(cls):
self = cls()
return not bool(self.users)
@classmethod @classmethod
def get_user(cls, key) -> UserJob: def get_user(cls, key) -> UserJob:
self = cls() self = cls()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment