Commit 5de41150 authored by Jalin's avatar Jalin

优化事件处理

parent 6dc09572
...@@ -20,7 +20,8 @@ def main(): ...@@ -20,7 +20,8 @@ def main():
if not Const.IS_TEST: if not Const.IS_TEST:
while True: while True:
sleep(10000) sleep(10000)
else:
if Config().is_cluster_enabled(): stay_second(5) # 等待接受完通知
CommonLog.print_test_complete() CommonLog.print_test_complete()
......
...@@ -30,10 +30,6 @@ class Cluster(): ...@@ -30,10 +30,6 @@ class Cluster():
KEY_LOCK_DO_ORDER = 'lock_do_order' # 订单锁 KEY_LOCK_DO_ORDER = 'lock_do_order' # 订单锁
lock_do_order_time = 60 * 1 # 订单锁超时时间 lock_do_order_time = 60 * 1 # 订单锁超时时间
# 事件
KEY_EVENT_JOB_DESTROY = 'job_destroy'
KEY_EVENT_USER_LOADED = 'user_loaded'
KEY_MASTER = 1 KEY_MASTER = 1
KEY_SLAVE = 0 KEY_SLAVE = 0
...@@ -104,7 +100,7 @@ class Cluster(): ...@@ -104,7 +100,7 @@ class Cluster():
node_name = node_name if node_name else self.node_name node_name = node_name if node_name else self.node_name
self.session.hdel(self.KEY_NODES, node_name) self.session.hdel(self.KEY_NODES, node_name)
message = ClusterLog.MESSAGE_LEFT_CLUSTER.format(node_name, ClusterLog.get_print_nodes(self.get_nodes())) message = ClusterLog.MESSAGE_LEFT_CLUSTER.format(node_name, ClusterLog.get_print_nodes(self.get_nodes()))
self.publish_log_message(message) self.publish_log_message(message, node_name)
def make_nodes_as_slave(self): def make_nodes_as_slave(self):
""" """
...@@ -114,12 +110,13 @@ class Cluster(): ...@@ -114,12 +110,13 @@ class Cluster():
for node in self.nodes: for node in self.nodes:
self.session.hset(self.KEY_NODES, node, self.KEY_SLAVE) self.session.hset(self.KEY_NODES, node, self.KEY_SLAVE)
def publish_log_message(self, message): def publish_log_message(self, message, node_name=None):
""" """
发布订阅消息 发布订阅消息
:return: :return:
""" """
message = ClusterLog.MESSAGE_SUBSCRIBE_NOTIFICATION.format(self.node_name, message) node_name = node_name if node_name else self.node_name
message = ClusterLog.MESSAGE_SUBSCRIBE_NOTIFICATION.format(node_name, message)
self.session.publish(self.KEY_CHANNEL_LOG, message) self.session.publish(self.KEY_CHANNEL_LOG, message)
def publish_event(self, name, data={}): def publish_event(self, name, data={}):
...@@ -216,16 +213,10 @@ class Cluster(): ...@@ -216,16 +213,10 @@ class Cluster():
result = json.loads(message.get('data', {})) result = json.loads(message.get('data', {}))
event_name = result.get('event') event_name = result.get('event')
data = result.get('data') data = result.get('data')
from py12306.helpers.event import Event
from py12306.query.query import Query method = getattr(Event(), event_name)
from py12306.user.user import User if method:
if event_name == self.KEY_EVENT_JOB_DESTROY: # 停止查询任务 create_thread_and_run(Event(), event_name, Const.IS_TEST, kwargs={'data': data, 'callback': True})
job = Query.job_by_name(data['name'])
if job: job.destroy()
elif event_name == self.KEY_EVENT_USER_LOADED: # 用户初始化完成
query_job = Query.job_by_account_id(data['key'])
if query_job:
create_thread_and_run(query_job, 'check_passengers', Const.IS_TEST) # 检查乘客信息 防止提交订单时才检查
def get_lock(self, key, timeout=1, info={}): def get_lock(self, key, timeout=1, info={}):
timeout = int(time.time()) + timeout timeout = int(time.time()) + timeout
......
...@@ -79,14 +79,14 @@ class Config: ...@@ -79,14 +79,14 @@ class Config:
def start(self): def start(self):
self.save_to_remote() self.save_to_remote()
create_thread_and_run(self, 'refresh_configs', wait=False) create_thread_and_run(self, 'refresh_configs', wait=Const.IS_TEST)
def refresh_configs(self, once=False): def refresh_configs(self, once=False):
if not self.is_cluster_enabled(): return if not self.is_cluster_enabled(): return
while True: while True:
remote_configs = self.get_remote_config() remote_configs = self.get_remote_config()
self.update_configs_from_remote(remote_configs, once) self.update_configs_from_remote(remote_configs, once)
if once: break if once or Const.IS_TEST: return
stay_second(self.retry_time) stay_second(self.retry_time)
def get_remote_config(self): def get_remote_config(self):
......
from py12306.helpers.func import *
from py12306.config import Config
@singleton
class Event():
"""
处理事件
"""
# 事件
KEY_JOB_DESTROY = 'job_destroy'
KEY_USER_JOB_DESTROY = 'user_job_destroy'
KEY_USER_LOADED = 'user_loaded'
cluster = None
def __init__(self):
from py12306.cluster.cluster import Cluster
self.cluster = Cluster()
def job_destroy(self, data={}, callback=False): # 停止查询任务
from py12306.query.query import Query
if Config().is_cluster_enabled() and not callback:
return self.cluster.publish_event(self.KEY_JOB_DESTROY, data) # 通知其它节点退出
job = Query.job_by_name(data.get('name'))
if job:
job.destroy()
def user_loaded(self, data={}, callback=False): # 用户初始化完成
from py12306.query.query import Query
if Config().is_cluster_enabled() and not callback:
return self.cluster.publish_event(self.KEY_USER_LOADED, data) # 通知其它节点退出
query_job = Query.job_by_account_key(data.get('key'))
if query_job and Config().is_master():
create_thread_and_run(query_job, 'check_passengers', Const.IS_TEST) # 检查乘客信息 防止提交订单时才检查
def user_job_destroy(self, data={}, callback=False):
from py12306.user.user import User
if Config().is_cluster_enabled() and not callback:
return self.cluster.publish_event(self.KEY_JOB_DESTROY, data) # 通知其它节点退出
user = User.get_user(data.get('key'))
if user:
user.destroy()
...@@ -95,11 +95,11 @@ def time_int(): ...@@ -95,11 +95,11 @@ def time_int():
return int(time.time()) return int(time.time())
def create_thread_and_run(jobs, callback_name, wait=True, daemon=True, args=()): def create_thread_and_run(jobs, callback_name, wait=True, daemon=True, args=(), kwargs={}):
threads = [] threads = []
if not isinstance(jobs, list): jobs = [jobs] if not isinstance(jobs, list): jobs = [jobs]
for job in jobs: for job in jobs:
thread = threading.Thread(target=getattr(job, callback_name), args=args) thread = threading.Thread(target=getattr(job, callback_name), args=args, kwargs=kwargs)
thread.setDaemon(daemon) thread.setDaemon(daemon)
thread.start() thread.start()
threads.append(thread) threads.append(thread)
......
...@@ -36,6 +36,7 @@ class QueryLog(BaseLog): ...@@ -36,6 +36,7 @@ class QueryLog(BaseLog):
MESSAGE_QUERY_JOB_BEING_DESTROY = '当前查询任务 {} 已结束' MESSAGE_QUERY_JOB_BEING_DESTROY = '当前查询任务 {} 已结束'
MESSAGE_INIT_PASSENGERS_SUCCESS = '初始化乘客成功' MESSAGE_INIT_PASSENGERS_SUCCESS = '初始化乘客成功'
MESSAGE_CHECK_PASSENGERS = '正在验证乘客信息'
cluster = None cluster = None
......
...@@ -11,6 +11,7 @@ from py12306.helpers.func import * ...@@ -11,6 +11,7 @@ from py12306.helpers.func import *
from py12306.log.user_log import UserLog from py12306.log.user_log import UserLog
from py12306.order.order import Order from py12306.order.order import Order
from py12306.user.user import User from py12306.user.user import User
from py12306.helpers.event import Event
class Job: class Job:
...@@ -184,8 +185,7 @@ class Job: ...@@ -184,8 +185,7 @@ class Job:
# 任务已成功 通知集群停止任务 # 任务已成功 通知集群停止任务
if order_result: if order_result:
self.cluster.publish_event(Cluster.KEY_EVENT_JOB_DESTROY, {'name': self.job_name}) Event().job_destroy({'name': self.job_name})
self.destroy()
def do_order(self, user): def do_order(self, user):
self.check_passengers() self.check_passengers()
...@@ -248,8 +248,8 @@ class Job: ...@@ -248,8 +248,8 @@ class Job:
def check_passengers(self): def check_passengers(self):
if not self.passengers: if not self.passengers:
QueryLog.add_quick_log(QueryLog.MESSAGE_CHECK_PASSENGERS).flush()
self.set_passengers(User.get_passenger_for_members(self.members, self.account_key)) self.set_passengers(User.get_passenger_for_members(self.members, self.account_key))
QueryLog.add_quick_log(QueryLog.MESSAGE_INIT_PASSENGERS_SUCCESS)
return True return True
def refresh_station(self, station): def refresh_station(self, station):
......
...@@ -54,7 +54,10 @@ class Query: ...@@ -54,7 +54,10 @@ class Query:
if Config().QUERY_JOB_THREAD_ENABLED: # 多线程 if Config().QUERY_JOB_THREAD_ENABLED: # 多线程
create_thread_and_run(jobs=self.jobs, callback_name='run', wait=Const.IS_TEST) create_thread_and_run(jobs=self.jobs, callback_name='run', wait=Const.IS_TEST)
else: else:
while True: jobs_do(self.jobs, 'run') while True:
jobs_do(self.jobs, 'run')
if Const.IS_TEST: return
# while True: # while True:
# app_available_check() # app_available_check()
# if Config().QUERY_JOB_THREAD_ENABLED: # 多线程 # if Config().QUERY_JOB_THREAD_ENABLED: # 多线程
...@@ -83,9 +86,9 @@ class Query: ...@@ -83,9 +86,9 @@ class Query:
return objects_find_object_by_key_value(self.jobs, 'job_name', name) return objects_find_object_by_key_value(self.jobs, 'job_name', name)
@classmethod @classmethod
def job_by_account_id(cls, account_id) -> Job: def job_by_account_key(cls, account_key) -> Job:
self = cls() self = cls()
return objects_find_object_by_key_value(self.jobs, 'account_id', account_id) return objects_find_object_by_key_value(self.jobs, 'account_key', account_key)
# def get_jobs_from_cluster(self): # def get_jobs_from_cluster(self):
......
...@@ -7,6 +7,7 @@ from py12306.cluster.cluster import Cluster ...@@ -7,6 +7,7 @@ from py12306.cluster.cluster import Cluster
from py12306.helpers.api import * from py12306.helpers.api import *
from py12306.app import * from py12306.app import *
from py12306.helpers.auth_code import AuthCode from py12306.helpers.auth_code import AuthCode
from py12306.helpers.event import Event
from py12306.helpers.func import * from py12306.helpers.func import *
from py12306.helpers.request import Request from py12306.helpers.request import Request
from py12306.helpers.type import UserType from py12306.helpers.type import UserType
...@@ -213,7 +214,8 @@ class UserJob: ...@@ -213,7 +214,8 @@ class UserJob:
UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER.format(self.user_name)).flush() UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER.format(self.user_name)).flush()
if self.check_user_is_login() and self.get_user_info(): if self.check_user_is_login() and self.get_user_info():
UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_SUCCESS.format(self.user_name)).flush() UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_SUCCESS.format(self.user_name)).flush()
self.cluster.publish_event(Cluster.KEY_EVENT_USER_LOADED, {'key': self.key}) # 发布通知 Event().user_loaded({'key': self.key}) # 发布通知
self.is_ready = True
UserLog.print_welcome_user(self) UserLog.print_welcome_user(self)
else: else:
UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_BUT_EXPIRED).flush() UserLog.add_quick_log(UserLog.MESSAGE_LOADED_USER_BUT_EXPIRED).flush()
......
from py12306.app import * from py12306.app import *
from py12306.cluster.cluster import Cluster from py12306.cluster.cluster import Cluster
from py12306.helpers.event import Event
from py12306.helpers.func import * from py12306.helpers.func import *
from py12306.log.user_log import UserLog from py12306.log.user_log import UserLog
from py12306.user.job import UserJob from py12306.user.job import UserJob
...@@ -65,8 +66,7 @@ class User: ...@@ -65,8 +66,7 @@ class User:
for account in old: # 退出已删除的用户 for account in old: # 退出已删除的用户
if not array_dict_find_by_key_value(self.user_accounts, 'key', account.get('key')): if not array_dict_find_by_key_value(self.user_accounts, 'key', account.get('key')):
user = self.get_user(account.get('key')) Event().user_job_destroy({'key': account.get('key')})
user.destroy()
@classmethod @classmethod
def get_user(cls, key) -> UserJob: def get_user(cls, key) -> UserJob:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment