Python多线程数据同步程序:从RIS系统获取Study信息并写入数据库
#!/usr/bin/env python
# encoding=utf-8
import Queue import threading import datetime import sys import time import pyodbc import argparse from utils import log, DBConnManager, HDBConnManager,is_valid_date from config import app_inst_id from config import db_config, hdb_config, hdb_driver from utils import dbPoolManager, create_pool import cx_Oracle as DBManager from utils import create_pool, DatabaseException
# Number of ThreadWriteStudy workers started by main(); main() also stores
# this value as the 'dbpool_size' of the local database pool configuration.
TOTAL_PULL_QUEUE = 40
class ThreadScanRis(threading.Thread):
    """Producer thread that reads study rows from the RIS database.

    Each pass runs a SELECT against ``v_study_bal`` and puts every returned
    row, converted to a list, on ``pull_queue``.

    Args:
        pull_queue: Queue that receives one list per study row.
        rmt_pool: Connection pool; ``get_connect()`` is used as a context
            manager whose result exposes a ``cur`` cursor.
        interval: Minutes to sleep between passes. The same value (plus 32)
            is the look-back window, in minutes, of the polling query.
        spec_date: Optional date string compared against
            ``CONVERT(varchar(100),Study_time,112)``. When given, a single
            pass is made and the thread exits.
    """

    def __init__(self, pull_queue, rmt_pool, interval, spec_date=None):
        threading.Thread.__init__(self)
        self.pull_queue = pull_queue
        self.rmt_pool = rmt_pool
        self.retry_interval = interval
        self.spec_date = spec_date

    def _build_sql(self):
        """Return the SELECT statement for one scan pass."""
        str_select = (
            " select '" + app_inst_id + "', "
            "Accession_number,Patient_id,Study_UUID, "
            "CONVERT(varchar(100),Study_time,20),Modality "
            "from v_study_bal "
        )
        if self.spec_date is None:
            str_where = (
                " where DATEDIFF(minute,Study_time,GETDATE()) < ("
                + str(self.retry_interval) + "+32) "
            )
        else:
            # NOTE(review): spec_date is concatenated into the SQL text.
            # get_args() only returns dates accepted by is_valid_date();
            # any other caller must validate the value itself.
            str_where = (
                " where CONVERT(varchar(100),Study_time,112) = '"
                + self.spec_date + "' "
            )
        return str_select + str_where

    def run(self):
        """Scan the RIS view and enqueue the rows, once or periodically."""
        log.info('Get Study list from Ris threads are working....')
        while True:
            sql = self._build_sql()
            rows = 0
            with self.rmt_pool.get_connect() as db:
                try:
                    log.info('sql = {}'.format(sql))
                    db.cur.execute(sql)
                    record_list = []
                    for record in db.cur:
                        log.info('get ris study record: {}'.format(record))
                        record_list.append(list(record))
                        rows = rows + 1

                    # Logged before and after enqueuing so the queue growth
                    # caused by this pass is visible.
                    log.info('total get [{}] records, now queue size [{}]'.format(
                        rows, self.pull_queue.qsize()))
                    for item in record_list:
                        self.pull_queue.put(item)
                    log.info('total get [{}] records, now queue size [{}]'.format(
                        rows, self.pull_queue.qsize()))
                except DatabaseException.DatabaseError as exc:
                    log.error('Get dcm study list process request from RisDB error!')
                    log.error(exc)
                except Exception as exc:
                    # Keep the scanner alive; the next pass retries.
                    log.error('unexpected error[ {}]'.format(exc))

            if self.spec_date:
                # A specific date was requested: one pass only.
                break
            else:
                time.sleep(60 * self.retry_interval)
class ThreadWriteStudy(threading.Thread):
    """Consumer thread that stores queued study rows in the local database.

    Each queue item is a list whose first element selects the stored
    procedure (``STORE_AUTOGET_<item[0]>``); the whole list, with an output
    variable appended, is passed to that procedure.

    Args:
        pull_queue: Queue of study rows filled by the scanning thread.
        loc_pool: Connection pool; ``get_connect()`` is used as a context
            manager whose result exposes a ``cur`` cursor.
    """

    def __init__(self, pull_queue, loc_pool):
        threading.Thread.__init__(self)
        self.pull_queue = pull_queue
        self.loc_pool = loc_pool

    def _store(self, item):
        """Call the stored procedure for one row and log the outcome."""
        # format() tolerates non-string fields (e.g. None) where '+' raises.
        label = '{}-{}-{}'.format(item[1], item[2], item[3])
        proc_name = 'STORE_AUTOGET_' + item[0]
        with self.loc_pool.get_connect() as db:
            try:
                # Output parameter receiving the procedure's return code.
                sp_ret = db.cur.var(DBManager.NUMBER)
                item.append(sp_ret)
                dbret = db.cur.callproc(proc_name, item)
                result = dbret[-1]
                log.info('call proc[' + proc_name + ' ' + label + ']:' + str(result))
            except DatabaseException.DatabaseError as exc:
                log.error('insert ' + label + ' task failed')
                log.error(exc)
            else:
                if result == 0:
                    log.info('insert ' + label + ' task OK')
                else:
                    log.error('insert ' + label + ' task failed, db return is ' + str(result))

    def run(self):
        """Take rows off the queue forever and store them one at a time."""
        log.info('Get write Studys threads are working....')
        while True:
            # Blocking get() replaces the empty()/sleep polling loop.
            item = self.pull_queue.get()
            try:
                self._store(item)
            except Exception as exc:
                # Keep the worker alive on unexpected failures.
                log.error('unexpected error[ {}]'.format(exc))
            finally:
                # Always balance get() so Queue.join() can never hang.
                self.pull_queue.task_done()
            # NOTE(review): the original indentation was lost; the one-second
            # pause is applied after every item - confirm this was intended.
            time.sleep(1)
def get_args():
    """Parse the command line.

    Returns:
        A dict of the parsed arguments (key ``'spec_date'``) when a date was
        given and ``is_valid_date`` accepts it; ``None`` when no date was
        given or the date is rejected.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-t', '--spec_date',
        help='specify date(insert time) (format:YYYYMMDD eg. 20150501)',
        type=str, dest='spec_date')
    args = parser.parse_args()

    # No date supplied: skip validation so is_valid_date never sees None.
    if args.spec_date is None:
        return None

    if is_valid_date(args.spec_date):
        return vars(args)

    # Still falls back to None, but no longer silently.
    log.error('invalid spec_date [{}] ignored'.format(args.spec_date))
    return None
def main():
    """Create the connection pools, start the worker threads, then return.

    Starts TOTAL_PULL_QUEUE ThreadWriteStudy threads and one ThreadScanRis
    thread sharing a single queue. The main thread itself only sleeps for
    30 seconds and logs that it has stopped.
    """
    db_config['dbpool_size'] = TOTAL_PULL_QUEUE
    hdb_config['dbpool_size'] = 1
    loc_pool = create_pool(db_config=db_config, db_type='oracle', max_overflow=4)
    rmt_pool = create_pool(db_config=hdb_config, db_type=hdb_driver, max_overflow=4)

    cli_args = get_args()
    spec_date = None if cli_args is None else cli_args['spec_date']

    pull_queue = Queue.Queue(0)

    # Writers are started before the scanner.
    for _ in range(TOTAL_PULL_QUEUE):
        writer = ThreadWriteStudy(pull_queue, loc_pool)
        writer.start()

    scanner = ThreadScanRis(pull_queue, rmt_pool, interval=30, spec_date=spec_date)
    scanner.start()

    time.sleep(30)
    log.info('main thread stopped!')
# Script entry point. The dunder names are restored: the bare `name` / 'main'
# comparison raised NameError.
if __name__ == '__main__':
    # Python 2 only: set utf-8 as the process-wide default string encoding
    # before running.
    reload(sys)
    sys.setdefaultencoding('utf-8')
    main()
中文注释说明内容:#encoding=utf-8:文件编码格式为utf-8 import Queue:导入Queue模块,用于实现多线程之间的数据传输 import threading:导入threading模块,用于实现多线程 import datetime:导入datetime模块,用于处理日期时间相关的操作 import sys:导入sys模块,用于系统相关的操作 import time:导入time模块,用于处理时间相关的操作 import pyodbc:导入pyodbc模块,用于Python操作ODBC import argparse:导入argparse模块,用于解析命令行参数 from utils import log, DBConnManager, HDBConnManager,is_valid_date:从utils模块中导入log、DBConnManager、HDBConnManager和is_valid_date函数 from config import app_inst_id:从config模块中导入app_inst_id变量 from config import db_config, hdb_config, hdb_driver:从config模块中导入db_config、hdb_config和hdb_driver变量 from utils import dbPoolManager, create_pool:从utils模块中导入dbPoolManager和create_pool函数 import cx_Oracle as DBManager:导入cx_Oracle模块,并将其重命名为DBManager from utils import create_pool, DatabaseException:从utils模块中导入create_pool和DatabaseException函数
TOTAL_PULL_QUEUE = 40:定义常量TOTAL_PULL_QUEUE,值为40,表示写库线程(ThreadWriteStudy)的数量,同时用作本地数据库连接池的大小(队列本身的长度不受此值限制)
class ThreadScanRis(threading.Thread):定义名为ThreadScanRis的类,继承自threading.Thread类
def __init__(self, pull_queue, rmt_pool, interval,spec_date=None):定义构造函数,初始化pull_queue、rmt_pool、retry_interval、spec_date属性
def run(self):定义run方法,用于执行线程的任务
class ThreadWriteStudy(threading.Thread):定义名为ThreadWriteStudy的类,继承自threading.Thread类
def __init__(self, pull_queue, loc_pool):定义构造函数,初始化pull_queue和loc_pool属性
def run(self):定义run方法,用于执行线程的任务
def get_args():定义get_args函数,用于解析命令行参数
def main():定义main函数,用于主程序的执行
db_config['dbpool_size'] = TOTAL_PULL_QUEUE:将db_config字典中的dbpool_size键对应的值设置为TOTAL_PULL_QUEUE
hdb_config['dbpool_size'] = 1:将hdb_config字典中的dbpool_size键对应的值设置为1
loc_pool = create_pool(db_config = db_config, db_type = 'oracle',max_overflow=4 ):创建Oracle数据库连接池loc_pool
rmt_pool = create_pool(db_config = hdb_config, db_type = hdb_driver,max_overflow=4):创建HDB数据库连接池rmt_pool
args = get_args():调用get_args函数,获取命令行参数
if args == None::如果args为空
spec_date = None:将spec_date设置为None
else::否则
spec_date = args['spec_date']:将spec_date设置为命令行参数中的spec_date值
pull_queue = Queue.Queue(0):创建队列pull_queue,maxsize为0表示队列长度不受限制
for i in range(TOTAL_PULL_QUEUE):循环40次
work_pull = ThreadWriteStudy(pull_queue, loc_pool):创建ThreadWriteStudy线程对象work_pull,并将pull_queue和loc_pool作为参数传入
work_pull.start():启动work_pull线程
dbscanget = ThreadScanRis(pull_queue, rmt_pool,interval=30,spec_date=spec_date):创建ThreadScanRis线程对象dbscanget,并将pull_queue、rmt_pool、interval和spec_date作为参数传入
dbscanget.start():启动dbscanget线程
time.sleep(30):暂停程序执行30秒
log.info('main thread stopped!'):输出日志信息,表示主线程已停止执行
if __name__ == '__main__'::如果当前脚本作为主程序运行
reload(sys):重新加载sys模块
sys.setdefaultencoding('utf-8'):设置默认编码为utf-8
main():调用main函数,开始执行程序
原文地址: https://www.cveoy.top/t/topic/nwi3 著作权归作者所有。请勿转载和采集!