Commit 8e946840 authored by Jalin's avatar Jalin

优化乘客检测

parent ed049578
...@@ -15,6 +15,9 @@ def main(): ...@@ -15,6 +15,9 @@ def main():
App.did_start() App.did_start()
App.run_check() App.run_check()
Query.check_before_fun()
####### 运行任务
User.run() User.run()
Query.run() Query.run()
if not Const.IS_TEST: if not Const.IS_TEST:
......
...@@ -27,13 +27,15 @@ class Event(): ...@@ -27,13 +27,15 @@ class Event():
job.destroy() job.destroy()
def user_loaded(self, data={}, callback=False): # 用户初始化完成 def user_loaded(self, data={}, callback=False): # 用户初始化完成
from py12306.query.query import Query
if Config().is_cluster_enabled() and not callback: if Config().is_cluster_enabled() and not callback:
return self.cluster.publish_event(self.KEY_USER_LOADED, data) # 通知其它节点退出 return self.cluster.publish_event(self.KEY_USER_LOADED, data) # 通知其它节点退出
from py12306.query.query import Query
query_job = Query.job_by_account_key(data.get('key')) if not Config().is_cluster_enabled() or Config().is_master():
if query_job and Config().is_master(): query = Query.wait_for_ready()
create_thread_and_run(query_job, 'check_passengers', Const.IS_TEST) # 检查乘客信息 防止提交订单时才检查 for job in query.jobs:
if job.account_key == data.get('key'):
create_thread_and_run(job, 'check_passengers', Const.IS_TEST) # 检查乘客信息 防止提交订单时才检查
def user_job_destroy(self, data={}, callback=False): def user_job_destroy(self, data={}, callback=False):
from py12306.user.user import User from py12306.user.user import User
......
...@@ -29,7 +29,7 @@ class QueryLog(BaseLog): ...@@ -29,7 +29,7 @@ class QueryLog(BaseLog):
MESSAGE_QUERY_LOG_OF_TRAIN_INFO = '{} {}' MESSAGE_QUERY_LOG_OF_TRAIN_INFO = '{} {}'
MESSAGE_QUERY_START_BY_DATE = '出发日期 {}: {} - {}' MESSAGE_QUERY_START_BY_DATE = '出发日期 {}: {} - {}'
MESSAGE_JOBS_DID_CHANGED = '\n任务已更新,正在重新加载...' MESSAGE_JOBS_DID_CHANGED = '任务已更新,正在重新加载...\n'
MESSAGE_SKIP_ORDER = '跳过本次请求,节点 {} 用户 {} 正在处理该订单\n' MESSAGE_SKIP_ORDER = '跳过本次请求,节点 {} 用户 {} 正在处理该订单\n'
......
...@@ -20,7 +20,7 @@ class UserLog(BaseLog): ...@@ -20,7 +20,7 @@ class UserLog(BaseLog):
MESSAGE_USER_HEARTBEAT_NORMAL = '用户 {} 心跳正常,下次检测 {} 秒后' MESSAGE_USER_HEARTBEAT_NORMAL = '用户 {} 心跳正常,下次检测 {} 秒后'
MESSAGE_GET_USER_PASSENGERS_FAIL = '获取用户乘客列表失败,错误原因: {} {} 秒后重试' MESSAGE_GET_USER_PASSENGERS_FAIL = '获取用户乘客列表失败,错误原因: {} {} 秒后重试'
MESSAGE_USER_PASSENGERS_IS_INVALID = '乘客信息校验失败,在账号 {} 中未找到该乘客: {}' MESSAGE_USER_PASSENGERS_IS_INVALID = '乘客信息校验失败,在账号 {} 中未找到该乘客: {}\n'
# MESSAGE_WAIT_USER_INIT_COMPLETE = '未找到可用账号或用户正在初始化,{} 秒后重试' # MESSAGE_WAIT_USER_INIT_COMPLETE = '未找到可用账号或用户正在初始化,{} 秒后重试'
...@@ -45,14 +45,14 @@ class UserLog(BaseLog): ...@@ -45,14 +45,14 @@ class UserLog(BaseLog):
:return: :return:
""" """
self = cls() self = cls()
self.add_quick_log('# 发现 {} 个用户 #'.format(len(users))) self.add_quick_log('# 发现 {} 个用户 #\n'.format(len(users)))
self.flush() self.flush()
return self return self
@classmethod @classmethod
def print_welcome_user(cls, user): def print_welcome_user(cls, user):
self = cls() self = cls()
self.add_quick_log('# 欢迎回来,{} #'.format(user.get_name())) self.add_quick_log('# 欢迎回来,{} #\n'.format(user.get_name()))
self.flush() self.flush()
return self return self
...@@ -67,6 +67,6 @@ class UserLog(BaseLog): ...@@ -67,6 +67,6 @@ class UserLog(BaseLog):
def print_user_passenger_init_success(cls, passengers): def print_user_passenger_init_success(cls, passengers):
self = cls() self = cls()
result = [passenger.get('name') + '(' + passenger.get('type_text') + ')' for passenger in passengers] result = [passenger.get('name') + '(' + passenger.get('type_text') + ')' for passenger in passengers]
self.add_quick_log('# 乘客验证成功 {} #'.format(', '.join(result))) self.add_quick_log('# 乘客验证成功 {} #\n'.format(', '.join(result)))
self.flush() self.flush()
return self return self
...@@ -23,6 +23,7 @@ class Query: ...@@ -23,6 +23,7 @@ class Query:
is_in_thread = False is_in_thread = False
retry_time = 3 retry_time = 3
is_ready = False
def __init__(self): def __init__(self):
self.session = Request() self.session = Request()
...@@ -48,9 +49,14 @@ class Query: ...@@ -48,9 +49,14 @@ class Query:
self.start() self.start()
pass pass
@classmethod
def check_before_fun(cls):
self = cls()
self.init_jobs()
self.is_ready = True
def start(self): def start(self):
# return # DEBUG # return # DEBUG
self.init_jobs()
QueryLog.init_data() QueryLog.init_data()
stay_second(3) stay_second(3)
# 多线程 # 多线程
...@@ -59,6 +65,7 @@ class Query: ...@@ -59,6 +65,7 @@ class Query:
if not self.is_in_thread: if not self.is_in_thread:
self.is_in_thread = True self.is_in_thread = True
create_thread_and_run(jobs=self.jobs, callback_name='run', wait=Const.IS_TEST) create_thread_and_run(jobs=self.jobs, callback_name='run', wait=Const.IS_TEST)
if Const.IS_TEST: return
stay_second(self.retry_time) stay_second(self.retry_time)
else: else:
if not self.jobs: break if not self.jobs: break
...@@ -105,6 +112,13 @@ class Query: ...@@ -105,6 +112,13 @@ class Query:
self.jobs.append(job) self.jobs.append(job)
return job return job
@classmethod
def wait_for_ready(cls):
self = cls()
if self.is_ready: return self
stay_second(self.retry_time)
return self.wait_for_ready()
@classmethod @classmethod
def job_by_name(cls, name) -> Job: def job_by_name(cls, name) -> Job:
self = cls() self = cls()
......
...@@ -26,6 +26,7 @@ class UserJob: ...@@ -26,6 +26,7 @@ class UserJob:
info = {} # 用户信息 info = {} # 用户信息
last_heartbeat = None last_heartbeat = None
is_ready = False is_ready = False
user_loaded = False # 用户是否已加载成功
passengers = [] passengers = []
retry_time = 3 retry_time = 3
...@@ -47,7 +48,6 @@ class UserJob: ...@@ -47,7 +48,6 @@ class UserJob:
self.key = str(info.get('key')) self.key = str(info.get('key'))
self.user_name = info.get('user_name') self.user_name = info.get('user_name')
self.password = info.get('password') self.password = info.get('password')
self.update_user()
def update_user(self): def update_user(self):
from py12306.user.user import User from py12306.user.user import User
...@@ -57,6 +57,7 @@ class UserJob: ...@@ -57,6 +57,7 @@ class UserJob:
def run(self): def run(self):
# load user # load user
self.update_user()
self.start() self.start()
def start(self): def start(self):
...@@ -85,6 +86,7 @@ class UserJob: ...@@ -85,6 +86,7 @@ class UserJob:
if not self.handle_login(): return if not self.handle_login(): return
self.is_ready = True self.is_ready = True
self.user_did_load()
message = UserLog.MESSAGE_USER_HEARTBEAT_NORMAL.format(self.get_name(), Config().USER_HEARTBEAT_INTERVAL) message = UserLog.MESSAGE_USER_HEARTBEAT_NORMAL.format(self.get_name(), Config().USER_HEARTBEAT_INTERVAL)
if not Config.is_cluster_enabled(): if not Config.is_cluster_enabled():
UserLog.add_quick_log(message).flush() UserLog.add_quick_log(message).flush()
...@@ -212,12 +214,21 @@ class UserJob: ...@@ -212,12 +214,21 @@ class UserJob:
UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER.format(self.user_name)).flush() UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER.format(self.user_name)).flush()
if self.check_user_is_login() and self.get_user_info(): if self.check_user_is_login() and self.get_user_info():
UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_SUCCESS.format(self.user_name)).flush() UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_SUCCESS.format(self.user_name)).flush()
Event().user_loaded({'key': self.key}) # 发布通知
self.is_ready = True self.is_ready = True
UserLog.print_welcome_user(self) UserLog.print_welcome_user(self)
self.user_did_load()
else: else:
UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_BUT_EXPIRED).flush() UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_BUT_EXPIRED).flush()
def user_did_load(self):
"""
用户已经加载成功
:return:
"""
if self.user_loaded: return
self.user_loaded = True
Event().user_loaded({'key': self.key}) # 发布通知
def get_user_info(self): def get_user_info(self):
response = self.session.get(API_USER_INFO.get('url')) response = self.session.get(API_USER_INFO.get('url'))
result = response.json() result = response.json()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment