Python 多线程从 RIS 获取 Study 信息并写入数据库
#!/usr/bin/env python
# encoding=utf-8
# Module header: one import per line (the statements had been fused onto a
# single line, which is not valid Python). Every imported name is kept.
import argparse
import datetime
import Queue
import sys
import threading
import time

import cx_Oracle as DBManager
import pyodbc

from config import app_inst_id
from config import db_config, hdb_config, hdb_driver
from utils import (
    log,
    DBConnManager,
    HDBConnManager,
    is_valid_date,
    dbPoolManager,
    create_pool,
    DatabaseException,
)

# Number of ThreadWriteStudy workers started by main(); main() also uses it
# as the 'dbpool_size' of the local database pool.
TOTAL_PULL_QUEUE = 40
class ThreadScanRis(threading.Thread):
    """Producer thread: read study rows from the RIS view and queue them.

    Each pass runs one SELECT against ``v_study_bal`` on a connection taken
    from ``rmt_pool`` and puts every returned row (converted to a list) on
    ``pull_queue``.

    Args:
        pull_queue: queue that receives one list per study row.
        rmt_pool: pool object exposing ``get_connect()`` as a context manager
            whose value has a ``cur`` cursor attribute.
        interval: polling period; the thread sleeps ``60 * interval`` seconds
            between passes and selects studies newer than ``interval + 32``
            minutes.
        spec_date: optional date string compared against
            ``CONVERT(varchar(100),Study_time,112)``. When given, a single
            pass is made and the thread exits.
    """

    def __init__(self, pull_queue, rmt_pool, interval, spec_date=None):
        threading.Thread.__init__(self)
        self.pull_queue = pull_queue
        self.rmt_pool = rmt_pool
        self.retry_interval = interval
        self.spec_date = spec_date

    @staticmethod
    def _quote(value):
        """Return ``value`` as a single-quoted SQL literal, doubling quotes."""
        return "'" + str(value).replace("'", "''") + "'"

    @classmethod
    def _build_sql(cls, inst_id, retry_interval, spec_date=None):
        """Build the SELECT statement for one scan pass.

        ``inst_id`` is emitted as the first (constant) column of every row.
        Without ``spec_date`` the rows are limited to a recent time window;
        with it, to the given day.
        """
        select_part = (
            " select " + cls._quote(inst_id) + ","
            " Accession_number,Patient_id,Study_UUID,"
            " CONVERT(varchar(100),Study_time,20),Modality"
            " from v_study_bal "
        )
        if spec_date is None:
            # int() guards against the interval being passed as an int
            # (main() passes interval=30), which cannot be concatenated
            # to a string directly.
            where_part = (
                " where DATEDIFF(minute,Study_time,GETDATE()) < ("
                + str(int(retry_interval)) + "+32) "
            )
        else:
            where_part = (
                " where CONVERT(varchar(100),Study_time,112) = "
                + cls._quote(spec_date) + " "
            )
        return select_part + where_part

    def run(self):
        """Scan the RIS view and enqueue rows until stopped.

        Database and unexpected errors are logged and the loop continues;
        with ``spec_date`` set the loop ends after the first pass.
        """
        log.info('Get Study list from Ris threads are working....')
        while True:
            sql = self._build_sql(app_inst_id, self.retry_interval,
                                  self.spec_date)
            with self.rmt_pool.get_connect() as db:
                try:
                    log.info('sql = ' + sql)
                    db.cur.execute(sql)
                    record_list = []
                    for record in db.cur:
                        log.info('get ris study record: ' + str(record))
                        record_list.append(list(record))
                    rows = len(record_list)
                    # Logged before and after enqueueing so the queue growth
                    # caused by this pass is visible.
                    log.info('total get [' + str(rows) + '] records, now queue size [' + str(self.pull_queue.qsize()) + ']')
                    for item in record_list:
                        self.pull_queue.put(item)
                    log.info('total get [' + str(rows) + '] records, now queue size [' + str(self.pull_queue.qsize()) + ']')
                except DatabaseException.DatabaseError as exc:
                    log.error('Get dcm study list process request from RisDB error!')
                    log.error(exc)
                except Exception as exc:
                    log.error('unexpected error[ ' + str(exc) + ']')
            if self.spec_date:
                break
            time.sleep(60 * self.retry_interval)
class ThreadWriteStudy(threading.Thread):
    """Consumer thread: store queued study rows through a stored procedure.

    Args:
        pull_queue: queue of row lists; element 0 is the procedure name
            suffix, elements 1-3 are used to label log messages.
        loc_pool: pool object exposing ``get_connect()`` as a context manager
            whose value has a ``cur`` cursor attribute.
    """

    def __init__(self, pull_queue, loc_pool):
        threading.Thread.__init__(self)
        self.pull_queue = pull_queue
        self.loc_pool = loc_pool

    @staticmethod
    def _task_label(item):
        """Return elements 1-3 of ``item`` joined by '-' for log messages.

        ``str()`` is applied so non-string or NULL columns cannot break the
        log line.
        """
        return '-'.join(str(field) for field in item[1:4])

    def _store(self, item):
        """Call ``STORE_AUTOGET_<item[0]>`` with ``item`` and log the result."""
        label = self._task_label(item)
        with self.loc_pool.get_connect() as db:
            try:
                # A NUMBER bind variable is appended as the last procedure
                # argument; the procedure's status is read back from the
                # last element of the list returned by callproc().
                sp_ret = db.cur.var(DBManager.NUMBER)
                item.append(sp_ret)
                dbret = db.cur.callproc('STORE_AUTOGET_' + item[0], item)
                result = dbret[-1]
                log.info('call proc[STORE_AUTOGET_' + app_inst_id + ' ' + label + ']:' + str(result))
            except DatabaseException.DatabaseError as exc:
                log.error('insert ' + label + ' task failed')
                log.error(exc)
            else:
                if result == 0:
                    log.info('insert ' + label + ' task OK')
                else:
                    log.error('insert ' + label + ' task failed, db return is ' + str(result))

    def run(self):
        """Take rows from the queue forever and store each one.

        ``get()`` blocks until a row is available. Any failure is logged so
        the worker keeps running, and ``task_done()`` is always called for
        the row that was taken.
        """
        log.info('Get write Studys threads are working....')
        while True:
            item = self.pull_queue.get()
            try:
                self._store(item)
            except Exception as exc:
                log.error('unexpected error[ ' + str(exc) + ']')
            finally:
                time.sleep(1)
                self.pull_queue.task_done()
def get_args(argv=None):
    """Parse the command line and return the options as a dict.

    Args:
        argv: optional list of argument strings; when None, argparse reads
            ``sys.argv[1:]``.

    Returns:
        ``vars(args)`` (a dict with key ``'spec_date'``) when a date was
        given and ``is_valid_date`` accepts it; otherwise None.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-t', '--spec_date',
        # The example now matches the stated YYYYMMDD format.
        help='specify date(insert time) (format:YYYYMMDD eg. 20150501)',
        type=str, dest='spec_date')
    args = parser.parse_args(argv)
    if args.spec_date is None:
        # No date supplied: nothing to validate.
        return None
    # NOTE(review): a date rejected by is_valid_date is dropped silently,
    # so the caller falls back to its no-date mode - confirm this is intended.
    if is_valid_date(args.spec_date):
        return vars(args)
    return None
def main():
    """Create the DB pools, start the writer and scanner threads.

    Starts TOTAL_PULL_QUEUE ThreadWriteStudy workers and one ThreadScanRis
    producer sharing an unbounded queue, then sleeps 30 seconds and logs
    that the main thread has stopped.
    """
    # Parse arguments first so --help or a bad option exits before any
    # database pool is created.
    args = get_args()
    spec_date = None if args is None else args['spec_date']

    db_config['dbpool_size'] = TOTAL_PULL_QUEUE
    hdb_config['dbpool_size'] = 1
    loc_pool = create_pool(db_config=db_config, db_type='oracle', max_overflow=4)
    rmt_pool = create_pool(db_config=hdb_config, db_type=hdb_driver, max_overflow=4)

    pull_queue = Queue.Queue(0)  # maxsize 0: no upper bound on queued rows
    for _ in range(TOTAL_PULL_QUEUE):
        work_pull = ThreadWriteStudy(pull_queue, loc_pool)
        work_pull.start()

    dbscanget = ThreadScanRis(pull_queue, rmt_pool, interval=30, spec_date=spec_date)
    dbscanget.start()

    time.sleep(30)
    log.info('main thread stopped!')
# Script entry point. The dunder names had been stripped to `name`/'main',
# which raises NameError instead of running main().
if __name__ == '__main__':
    # Python 2 only: re-expose sys.setdefaultencoding and make UTF-8 the
    # default codec for implicit str/unicode conversions.
    reload(sys)
    sys.setdefaultencoding('utf-8')
    main()  # run the main function
中文注释说明内容:
# encoding=utf-8  # 设置编码格式为utf-8
# Module header: one import per line. The statements had been fused onto a
# single line, so everything after the first '#' was a comment and only
# `import Queue` would have run. Every imported name is kept.
import argparse  # command-line argument parsing
import datetime  # date and time handling
import Queue  # queue shared between the threads
import sys  # interpreter-level operations
import threading  # thread support
import time  # sleeping between passes

import cx_Oracle as DBManager  # Oracle database driver
import pyodbc  # ODBC database driver

from config import app_inst_id  # application instance id from the config
from config import db_config, hdb_config, hdb_driver  # DB connection settings
from utils import (  # project helpers: logging, pools, validation, errors
    log,
    DBConnManager,
    HDBConnManager,
    is_valid_date,
    dbPoolManager,
    create_pool,
    DatabaseException,
)

# Number of ThreadWriteStudy workers started by main(); main() also uses it
# as the 'dbpool_size' of the local database pool.
TOTAL_PULL_QUEUE = 40
class ThreadScanRis(threading.Thread):
    """Producer thread: read study rows from the RIS view and queue them.

    Each pass runs one SELECT against ``v_study_bal`` on a connection taken
    from ``rmt_pool`` and puts every returned row (converted to a list) on
    ``pull_queue``.

    Args:
        pull_queue: queue that receives one list per study row.
        rmt_pool: pool object exposing ``get_connect()`` as a context manager
            whose value has a ``cur`` cursor attribute.
        interval: polling period; the thread sleeps ``60 * interval`` seconds
            between passes and selects studies newer than ``interval + 32``
            minutes.
        spec_date: optional date string compared against
            ``CONVERT(varchar(100),Study_time,112)``. When given, a single
            pass is made and the thread exits.
    """

    def __init__(self, pull_queue, rmt_pool, interval, spec_date=None):
        threading.Thread.__init__(self)
        self.pull_queue = pull_queue
        self.rmt_pool = rmt_pool
        self.retry_interval = interval
        self.spec_date = spec_date

    @staticmethod
    def _quote(value):
        """Return ``value`` as a single-quoted SQL literal, doubling quotes."""
        return "'" + str(value).replace("'", "''") + "'"

    @classmethod
    def _build_sql(cls, inst_id, retry_interval, spec_date=None):
        """Build the SELECT statement for one scan pass.

        ``inst_id`` is emitted as the first (constant) column of every row.
        Without ``spec_date`` the rows are limited to a recent time window;
        with it, to the given day.
        """
        select_part = (
            " select " + cls._quote(inst_id) + ","
            " Accession_number,Patient_id,Study_UUID,"
            " CONVERT(varchar(100),Study_time,20),Modality"
            " from v_study_bal "
        )
        if spec_date is None:
            # int() guards against the interval being passed as an int
            # (main() passes interval=30), which cannot be concatenated
            # to a string directly.
            where_part = (
                " where DATEDIFF(minute,Study_time,GETDATE()) < ("
                + str(int(retry_interval)) + "+32) "
            )
        else:
            where_part = (
                " where CONVERT(varchar(100),Study_time,112) = "
                + cls._quote(spec_date) + " "
            )
        return select_part + where_part

    def run(self):
        """Scan the RIS view and enqueue rows until stopped.

        Database and unexpected errors are logged and the loop continues;
        with ``spec_date`` set the loop ends after the first pass.
        """
        log.info('Get Study list from Ris threads are working....')
        while True:
            sql = self._build_sql(app_inst_id, self.retry_interval,
                                  self.spec_date)
            with self.rmt_pool.get_connect() as db:
                try:
                    log.info('sql = ' + sql)
                    db.cur.execute(sql)
                    record_list = []
                    for record in db.cur:
                        log.info('get ris study record: ' + str(record))
                        record_list.append(list(record))
                    rows = len(record_list)
                    # Logged before and after enqueueing so the queue growth
                    # caused by this pass is visible.
                    log.info('total get [' + str(rows) + '] records, now queue size [' + str(self.pull_queue.qsize()) + ']')
                    for item in record_list:
                        self.pull_queue.put(item)
                    log.info('total get [' + str(rows) + '] records, now queue size [' + str(self.pull_queue.qsize()) + ']')
                except DatabaseException.DatabaseError as exc:
                    log.error('Get dcm study list process request from RisDB error!')
                    log.error(exc)
                except Exception as exc:
                    log.error('unexpected error[ ' + str(exc) + ']')
            if self.spec_date:
                break
            time.sleep(60 * self.retry_interval)
class ThreadWriteStudy(threading.Thread):
    """Consumer thread: store queued study rows through a stored procedure.

    Args:
        pull_queue: queue of row lists; element 0 is the procedure name
            suffix, elements 1-3 are used to label log messages.
        loc_pool: pool object exposing ``get_connect()`` as a context manager
            whose value has a ``cur`` cursor attribute.
    """

    def __init__(self, pull_queue, loc_pool):
        threading.Thread.__init__(self)
        self.pull_queue = pull_queue
        self.loc_pool = loc_pool

    @staticmethod
    def _task_label(item):
        """Return elements 1-3 of ``item`` joined by '-' for log messages.

        ``str()`` is applied so non-string or NULL columns cannot break the
        log line.
        """
        return '-'.join(str(field) for field in item[1:4])

    def _store(self, item):
        """Call ``STORE_AUTOGET_<item[0]>`` with ``item`` and log the result."""
        label = self._task_label(item)
        with self.loc_pool.get_connect() as db:
            try:
                # A NUMBER bind variable is appended as the last procedure
                # argument; the procedure's status is read back from the
                # last element of the list returned by callproc().
                sp_ret = db.cur.var(DBManager.NUMBER)
                item.append(sp_ret)
                dbret = db.cur.callproc('STORE_AUTOGET_' + item[0], item)
                result = dbret[-1]
                log.info('call proc[STORE_AUTOGET_' + app_inst_id + ' ' + label + ']:' + str(result))
            except DatabaseException.DatabaseError as exc:
                log.error('insert ' + label + ' task failed')
                log.error(exc)
            else:
                if result == 0:
                    log.info('insert ' + label + ' task OK')
                else:
                    log.error('insert ' + label + ' task failed, db return is ' + str(result))

    def run(self):
        """Take rows from the queue forever and store each one.

        ``get()`` blocks until a row is available. Any failure is logged so
        the worker keeps running, and ``task_done()`` is always called for
        the row that was taken.
        """
        log.info('Get write Studys threads are working....')
        while True:
            item = self.pull_queue.get()
            try:
                self._store(item)
            except Exception as exc:
                log.error('unexpected error[ ' + str(exc) + ']')
            finally:
                time.sleep(1)
                self.pull_queue.task_done()
def get_args(argv=None):
    """Parse the command line and return the options as a dict.

    Args:
        argv: optional list of argument strings; when None, argparse reads
            ``sys.argv[1:]``.

    Returns:
        ``vars(args)`` (a dict with key ``'spec_date'``) when a date was
        given and ``is_valid_date`` accepts it; otherwise None.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-t', '--spec_date',
        # The example now matches the stated YYYYMMDD format.
        help='specify date(insert time) (format:YYYYMMDD eg. 20150501)',
        type=str, dest='spec_date')
    args = parser.parse_args(argv)
    if args.spec_date is None:
        # No date supplied: nothing to validate.
        return None
    # NOTE(review): a date rejected by is_valid_date is dropped silently,
    # so the caller falls back to its no-date mode - confirm this is intended.
    if is_valid_date(args.spec_date):
        return vars(args)
    return None
def main():
    """Create the DB pools, start the writer and scanner threads.

    Starts TOTAL_PULL_QUEUE ThreadWriteStudy workers and one ThreadScanRis
    producer sharing an unbounded queue, then sleeps 30 seconds and logs
    that the main thread has stopped.
    """
    # Parse arguments first so --help or a bad option exits before any
    # database pool is created.
    args = get_args()
    spec_date = None if args is None else args['spec_date']

    db_config['dbpool_size'] = TOTAL_PULL_QUEUE
    hdb_config['dbpool_size'] = 1
    loc_pool = create_pool(db_config=db_config, db_type='oracle', max_overflow=4)
    rmt_pool = create_pool(db_config=hdb_config, db_type=hdb_driver, max_overflow=4)

    pull_queue = Queue.Queue(0)  # maxsize 0: no upper bound on queued rows
    for _ in range(TOTAL_PULL_QUEUE):
        work_pull = ThreadWriteStudy(pull_queue, loc_pool)
        work_pull.start()

    dbscanget = ThreadScanRis(pull_queue, rmt_pool, interval=30, spec_date=spec_date)
    dbscanget.start()

    time.sleep(30)
    log.info('main thread stopped!')
# Script entry point. The dunder names had been stripped to `name`/'main',
# which raises NameError instead of running main().
if __name__ == '__main__':
    # Python 2 only: re-expose sys.setdefaultencoding and make UTF-8 the
    # default codec for implicit str/unicode conversions.
    reload(sys)
    sys.setdefaultencoding('utf-8')
    main()  # run the main function
原文地址: https://www.cveoy.top/t/topic/nwjf 著作权归作者所有。请勿转载和采集!