Commit a87f10c8 authored by Jalin's avatar Jalin

增加动态加载配置

parent da469f3e
...@@ -99,6 +99,8 @@ docker run -d -v $(pwd):/config -v py12306:/data pjialin/py12306 ...@@ -99,6 +99,8 @@ docker run -d -v $(pwd):/config -v py12306:/data pjialin/py12306
## 更新 ## 更新
### 19-01-10 ### 19-01-10
* 支持分布式集群 * 支持分布式集群
### 19-01-11
* 配置文件支持动态修改
## 下单成功截图 ## 下单成功截图
![下单成功图片](./data/images/order_success.png) ![下单成功图片](./data/images/order_success.png)
......
...@@ -11,6 +11,8 @@ from py12306.log.order_log import OrderLog ...@@ -11,6 +11,8 @@ from py12306.log.order_log import OrderLog
def app_available_check(): def app_available_check():
# return True # Debug # return True # Debug
if Config().IS_DEBUG:
return True
now = time_now() now = time_now()
if now.hour >= 23 or now.hour < 6: if now.hour >= 23 or now.hour < 6:
CommonLog.add_quick_log(CommonLog.MESSAGE_12306_IS_CLOSED.format(time_now())).flush() CommonLog.add_quick_log(CommonLog.MESSAGE_12306_IS_CLOSED.format(time_now())).flush()
......
...@@ -45,7 +45,7 @@ class Config: ...@@ -45,7 +45,7 @@ class Config:
NOTIFICATION_API_APP_CODE = '' NOTIFICATION_API_APP_CODE = ''
# 集群配置 # 集群配置
CLUSTER_ENABLED = 1 CLUSTER_ENABLED = 0
NODE_SLAVE_CAN_BE_MASTER = 1 NODE_SLAVE_CAN_BE_MASTER = 1
NODE_IS_MASTER = 1 NODE_IS_MASTER = 1
NODE_NAME = '' NODE_NAME = ''
...@@ -55,6 +55,7 @@ class Config: ...@@ -55,6 +55,7 @@ class Config:
envs = [] envs = []
retry_time = 5 retry_time = 5
last_modify_time = 0
disallow_update_cofigs = [ disallow_update_cofigs = [
'CLUSTER_ENABLED', 'CLUSTER_ENABLED',
...@@ -67,8 +68,11 @@ class Config: ...@@ -67,8 +68,11 @@ class Config:
def __init__(self): def __init__(self):
self.init_envs() self.init_envs()
self.last_modify_time = get_file_modify_time(self.CONFIG_FILE)
if Config().is_slave(): if Config().is_slave():
self.refresh_configs(True) self.refresh_configs(True)
else:
create_thread_and_run(self, 'watch_file_change', False)
@classmethod @classmethod
def run(cls): def run(cls):
...@@ -109,6 +113,24 @@ class Config: ...@@ -109,6 +113,24 @@ class Config:
for key, value in envs: for key, value in envs:
setattr(self, key, value) setattr(self, key, value)
def watch_file_change(self):
"""
监听配置文件修改
:return:
"""
if Config().is_slave(): return
from py12306.log.common_log import CommonLog
while True:
value = get_file_modify_time(self.CONFIG_FILE)
if value > self.last_modify_time:
self.last_modify_time = value
CommonLog.add_quick_log(CommonLog.MESSAGE_CONFIG_FILE_DID_CHANGED).flush()
envs = EnvLoader.load_with_file(self.CONFIG_FILE)
self.update_configs_from_remote(envs)
if Config().is_master(): # 保存配置
self.save_to_remote()
stay_second(self.retry_time)
def update_configs_from_remote(self, envs, first=False): def update_configs_from_remote(self, envs, first=False):
if envs == self.envs: return if envs == self.envs: return
from py12306.query.query import Query from py12306.query.query import Query
...@@ -148,9 +170,12 @@ class Config: ...@@ -148,9 +170,12 @@ class Config:
# return members # return members
class EnvLoader(): class EnvLoader:
envs = [] envs = []
def __init__(self):
self.envs = [] # 不是单例不初始化怎么还会有值
@classmethod @classmethod
def load_with_file(cls, file): def load_with_file(cls, file):
self = cls() self = cls()
...@@ -161,4 +186,6 @@ class EnvLoader(): ...@@ -161,4 +186,6 @@ class EnvLoader():
return self.envs return self.envs
def __setattr__(self, key, value): def __setattr__(self, key, value):
self.envs.append(([key, value])) super().__setattr__(key, value)
if re.search(r'^[A-Z]+_', key):
self.envs.append(([key, value]))
import datetime import datetime
import hashlib
import json
import os
import random import random
import threading import threading
import functools import functools
...@@ -87,6 +90,16 @@ def time_now(): ...@@ -87,6 +90,16 @@ def time_now():
return datetime.datetime.now() return datetime.datetime.now()
def timestamp_to_time(timestamp):
time_struct = time.localtime(timestamp)
return time.strftime('%Y-%m-%d %H:%M:%S', time_struct)
def get_file_modify_time(filePath):
timestamp = os.path.getmtime(filePath)
return timestamp_to_time(timestamp)
def str_to_time(str): def str_to_time(str):
return datetime.datetime.strptime(str, '%Y-%m-%d %H:%M:%S.%f') return datetime.datetime.strptime(str, '%Y-%m-%d %H:%M:%S.%f')
...@@ -160,6 +173,10 @@ def available_value(value): ...@@ -160,6 +173,10 @@ def available_value(value):
return str(value) return str(value)
def md5(value):
return hashlib.md5(json.dumps(value).encode()).hexdigest()
@singleton @singleton
class Const: class Const:
IS_TEST = False IS_TEST = False
......
...@@ -30,7 +30,7 @@ class BaseLog: ...@@ -30,7 +30,7 @@ class BaseLog:
self = cls() self = cls()
logs = self.get_logs() logs = self.get_logs()
# 输出到文件 # 输出到文件
if file == None and Config().OUT_PUT_LOG_TO_FILE_ENABLED: # TODO 文件无法写入友好提示 if file == None and Config().OUT_PUT_LOG_TO_FILE_ENABLED and not Const.IS_TEST: # TODO 文件无法写入友好提示
file = open(Config().OUT_PUT_LOG_TO_FILE_PATH, 'a', encoding='utf-8') file = open(Config().OUT_PUT_LOG_TO_FILE_PATH, 'a', encoding='utf-8')
if not file: file = None if not file: file = None
# 输出日志到各个节点 # 输出日志到各个节点
...@@ -56,7 +56,7 @@ class BaseLog: ...@@ -56,7 +56,7 @@ class BaseLog:
logs = self.thread_logs.get(current_thread_id()) logs = self.thread_logs.get(current_thread_id())
return logs return logs
def empty_logs(self, logs): def empty_logs(self, logs=None):
if self.quick_log: if self.quick_log:
self.quick_log = [] self.quick_log = []
else: else:
......
...@@ -23,6 +23,8 @@ class CommonLog(BaseLog): ...@@ -23,6 +23,8 @@ class CommonLog(BaseLog):
MESSAGE_TEST_SEND_VOICE_CODE = '正在测试发送语音验证码...' MESSAGE_TEST_SEND_VOICE_CODE = '正在测试发送语音验证码...'
MESSAGE_CONFIG_FILE_DID_CHANGED = '配置文件已修改,正在重新加载中'
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.init_data() self.init_data()
......
...@@ -52,7 +52,7 @@ class QueryLog(BaseLog): ...@@ -52,7 +52,7 @@ class QueryLog(BaseLog):
def init_data(cls): def init_data(cls):
self = cls() self = cls()
# 获取上次记录 # 获取上次记录
if Const.IS_TEST: return # if Const.IS_TEST: return
result = False result = False
if not Config.is_cluster_enabled() and path.exists(self.data_path): if not Config.is_cluster_enabled() and path.exists(self.data_path):
with open(self.data_path, encoding='utf-8') as f: with open(self.data_path, encoding='utf-8') as f:
...@@ -152,10 +152,10 @@ class QueryLog(BaseLog): ...@@ -152,10 +152,10 @@ class QueryLog(BaseLog):
@classmethod @classmethod
def print_job_start(cls, job_name): def print_job_start(cls, job_name):
self = cls() self = cls()
self.refresh_data()
self.add_log( self.add_log(
'=== 正在进行第 {query_count} 次查询 {job_name} === {time}'.format(query_count=self.data.get('query_count'), '=== 正在进行第 {query_count} 次查询 {job_name} === {time}'.format(query_count=self.data.get('query_count') + 1,
job_name=job_name, time=datetime.datetime.now())) job_name=job_name, time=datetime.datetime.now()))
self.refresh_data()
if is_main_thread(): if is_main_thread():
self.flush(publish=False) self.flush(publish=False)
return self return self
......
...@@ -18,6 +18,7 @@ class Job: ...@@ -18,6 +18,7 @@ class Job:
""" """
查询任务 查询任务
""" """
id = 0
is_alive = True is_alive = True
job_name = None job_name = None
left_dates = [] left_dates = []
...@@ -56,15 +57,18 @@ class Job: ...@@ -56,15 +57,18 @@ class Job:
def __init__(self, info, query): def __init__(self, info, query):
self.cluster = Cluster() self.cluster = Cluster()
self.query = query
self.init_data(info)
self.update_interval()
def init_data(self, info):
self.id = md5(info)
self.left_dates = info.get('left_dates') self.left_dates = info.get('left_dates')
# 多车站已放在下面处理
# self.left_station = info.get('stations').get('left')
# self.arrive_station = info.get('stations').get('arrive')
# self.left_station_code = Station.get_station_key_by_name(self.left_station)
# self.arrive_station_code = Station.get_station_key_by_name(self.arrive_station)
self.stations = info.get('stations') self.stations = info.get('stations')
self.stations = [self.stations] if isinstance(self.stations, dict) else self.stations self.stations = [self.stations] if isinstance(self.stations, dict) else self.stations
self.job_name = info.get('job_name', '{} -> {}'.format(self.stations[0]['left'], self.stations[0]['arrive'])) if not self.job_name: # name 不能被修改
self.job_name = info.get('job_name',
'{} -> {}'.format(self.stations[0]['left'], self.stations[0]['arrive']))
self.account_key = str(info.get('account_key')) self.account_key = str(info.get('account_key'))
self.allow_seats = info.get('seats') self.allow_seats = info.get('seats')
...@@ -74,8 +78,8 @@ class Job: ...@@ -74,8 +78,8 @@ class Job:
self.member_num_take = self.member_num self.member_num_take = self.member_num
self.allow_less_member = bool(info.get('allow_less_member')) self.allow_less_member = bool(info.get('allow_less_member'))
self.interval = query.interval def update_interval(self):
self.query = query self.interval = self.query.interval
def run(self): def run(self):
self.start() self.start()
...@@ -100,7 +104,7 @@ class Job: ...@@ -100,7 +104,7 @@ class Job:
self.safe_stay() self.safe_stay()
if is_main_thread(): if is_main_thread():
QueryLog.flush(sep='\t\t', publish=False) QueryLog.flush(sep='\t\t', publish=False)
if is_main_thread(): if not Config().QUERY_JOB_THREAD_ENABLED:
QueryLog.add_quick_log('').flush(publish=False) QueryLog.add_quick_log('').flush(publish=False)
break break
else: else:
...@@ -233,9 +237,9 @@ class Job: ...@@ -233,9 +237,9 @@ class Job:
:return: :return:
""" """
from py12306.query.query import Query from py12306.query.query import Query
self.is_alive = False
QueryLog.add_quick_log(QueryLog.MESSAGE_QUERY_JOB_BEING_DESTROY.format(self.job_name)).flush() QueryLog.add_quick_log(QueryLog.MESSAGE_QUERY_JOB_BEING_DESTROY.format(self.job_name)).flush()
# sys.exit(1) # 无法退出线程... # sys.exit(1) # 无法退出线程...
self.is_alive = False
# 手动移出jobs 防止单线程死循环 # 手动移出jobs 防止单线程死循环
index = Query().jobs.index(self) index = Query().jobs.index(self)
Query().jobs.pop(index) Query().jobs.pop(index)
......
...@@ -21,6 +21,9 @@ class Query: ...@@ -21,6 +21,9 @@ class Query:
interval = {} interval = {}
cluster = None cluster = None
is_in_thread = False
retry_time = 3
def __init__(self): def __init__(self):
self.session = Request() self.session = Request()
self.cluster = Cluster() self.cluster = Cluster()
...@@ -29,13 +32,14 @@ class Query: ...@@ -29,13 +32,14 @@ class Query:
def update_query_interval(self, auto=False): def update_query_interval(self, auto=False):
self.interval = init_interval_by_number(Config().QUERY_INTERVAL) self.interval = init_interval_by_number(Config().QUERY_INTERVAL)
if auto:
jobs_do(self.jobs, 'update_interval')
def update_query_jobs(self, auto=False): def update_query_jobs(self, auto=False):
self.query_jobs = Config().QUERY_JOBS self.query_jobs = Config().QUERY_JOBS
if auto: if auto:
self.jobs = []
QueryLog.add_quick_log(QueryLog.MESSAGE_JOBS_DID_CHANGED).flush() QueryLog.add_quick_log(QueryLog.MESSAGE_JOBS_DID_CHANGED).flush()
self.init_jobs() self.refresh_jobs()
@classmethod @classmethod
def run(cls): def run(cls):
...@@ -50,11 +54,15 @@ class Query: ...@@ -50,11 +54,15 @@ class Query:
QueryLog.init_data() QueryLog.init_data()
stay_second(3) stay_second(3)
# 多线程 # 多线程
if Config().QUERY_JOB_THREAD_ENABLED: # 多线程 while True:
create_thread_and_run(jobs=self.jobs, callback_name='run', wait=Const.IS_TEST) if Config().QUERY_JOB_THREAD_ENABLED: # 多线程
else: if not self.is_in_thread:
while True: self.is_in_thread = True
create_thread_and_run(jobs=self.jobs, callback_name='run', wait=Const.IS_TEST)
stay_second(self.retry_time)
else:
if not self.jobs: break if not self.jobs: break
self.is_in_thread = False
jobs_do(self.jobs, 'run') jobs_do(self.jobs, 'run')
if Const.IS_TEST: return if Const.IS_TEST: return
...@@ -67,12 +75,36 @@ class Query: ...@@ -67,12 +75,36 @@ class Query:
# if Const.IS_TEST: return # if Const.IS_TEST: return
# self.refresh_jobs() # 刷新任务 # self.refresh_jobs() # 刷新任务
def refresh_jobs(self):
"""
更新任务
:return:
"""
allow_jobs = []
for job in self.query_jobs:
id = md5(job)
job_ins = objects_find_object_by_key_value(self.jobs, 'id', id) # [1 ,2]
if not job_ins:
job_ins = self.init_job(job)
if Config().QUERY_JOB_THREAD_ENABLED: # 多线程重新添加
create_thread_and_run(jobs=job_ins, callback_name='run', wait=Const.IS_TEST)
allow_jobs.append(job_ins)
for job in self.jobs: # 退出已删除 Job
if job not in allow_jobs: job.destroy()
QueryLog.print_init_jobs(jobs=self.jobs)
def init_jobs(self): def init_jobs(self):
for job in self.query_jobs: for job in self.query_jobs:
job = Job(info=job, query=self) self.init_job(job)
self.jobs.append(job)
QueryLog.print_init_jobs(jobs=self.jobs) QueryLog.print_init_jobs(jobs=self.jobs)
def init_job(self, job):
job = Job(info=job, query=self)
self.jobs.append(job)
return job
@classmethod @classmethod
def job_by_name(cls, name) -> Job: def job_by_name(cls, name) -> Job:
self = cls() self = cls()
......
...@@ -18,7 +18,6 @@ from py12306.log.user_log import UserLog ...@@ -18,7 +18,6 @@ from py12306.log.user_log import UserLog
class UserJob: class UserJob:
# heartbeat = 60 * 2 # 心跳保持时长 # heartbeat = 60 * 2 # 心跳保持时长
is_alive = True is_alive = True
heartbeat_interval = 60 * 2
check_interval = 5 check_interval = 5
key = None key = None
user_name = '' user_name = ''
...@@ -53,7 +52,6 @@ class UserJob: ...@@ -53,7 +52,6 @@ class UserJob:
def update_user(self): def update_user(self):
from py12306.user.user import User from py12306.user.user import User
self.user = User() self.user = User()
self.heartbeat_interval = self.user.heartbeat
# if not Const.IS_TEST: 测试模块下也可以从文件中加载用户 # if not Const.IS_TEST: 测试模块下也可以从文件中加载用户
self.load_user() self.load_user()
...@@ -79,7 +77,7 @@ class UserJob: ...@@ -79,7 +77,7 @@ class UserJob:
def check_heartbeat(self): def check_heartbeat(self):
# 心跳检测 # 心跳检测
if self.get_last_heartbeat() and (time_int() - self.get_last_heartbeat()) < self.heartbeat_interval: if self.get_last_heartbeat() and (time_int() - self.get_last_heartbeat()) < Config().USER_HEARTBEAT_INTERVAL:
return True return True
# 只有主节点才能走到这 # 只有主节点才能走到这
if self.is_first_time() or not self.check_user_is_login(): if self.is_first_time() or not self.check_user_is_login():
...@@ -87,7 +85,7 @@ class UserJob: ...@@ -87,7 +85,7 @@ class UserJob:
if not self.handle_login(): return if not self.handle_login(): return
self.is_ready = True self.is_ready = True
message = UserLog.MESSAGE_USER_HEARTBEAT_NORMAL.format(self.get_name(), self.heartbeat_interval) message = UserLog.MESSAGE_USER_HEARTBEAT_NORMAL.format(self.get_name(), Config().USER_HEARTBEAT_INTERVAL)
if not Config.is_cluster_enabled(): if not Config.is_cluster_enabled():
UserLog.add_quick_log(message).flush() UserLog.add_quick_log(message).flush()
else: else:
......
...@@ -8,7 +8,6 @@ from py12306.user.job import UserJob ...@@ -8,7 +8,6 @@ from py12306.user.job import UserJob
@singleton @singleton
class User: class User:
heartbeat = 60 * 2
users = [] users = []
user_accounts = [] user_accounts = []
...@@ -17,7 +16,6 @@ class User: ...@@ -17,7 +16,6 @@ class User:
def __init__(self): def __init__(self):
self.cluster = Cluster() self.cluster = Cluster()
self.heartbeat = Config().USER_HEARTBEAT_INTERVAL
self.update_interval() self.update_interval()
self.update_user_accounts() self.update_user_accounts()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment