#!/usr/bin/env python

encoding=utf-8

import Queue import threading import datetime import sys import time import pyodbc import argparse from utils import log, DBConnManager, HDBConnManager,is_valid_date from config import app_inst_id from config import db_config, hdb_config, hdb_driver from utils import dbPoolManager, create_pool import cx_Oracle as DBManager from utils import create_pool, DatabaseException

TOTAL_PULL_QUEUE = 40

class ThreadScanRis(threading.Thread): ''' 从ris获取study信息 ''' def init(self, pull_queue, rmt_pool, interval,spec_date=None): threading.Thread.init(self); self.pull_queue = pull_queue self.rmt_pool = rmt_pool self.retry_interval = interval self.spec_date = spec_date def run(self): log.info('Get Study list from Ris threads are working....') while True: str_select = ''' select '''' + app_inst_id + '''', Accession_number,Patient_id,Study_UUID, CONVERT(varchar(100),Study_time,20),Modality from v_study_bal ''' if self.spec_date is None: str_where = ''' where DATEDIFF(minute,Study_time,GETDATE()) < (''' + str(self.retry_interval) + '''+32) ''' else: str_where = ''' where CONVERT(varchar(100),Study_time,112) = '''' + self.spec_date + '''' ''' rows = 0 with self.rmt_pool.get_connect() as db: try: log.info('sql = {}'.format(str_select + str_where)) db.cur.execute(str_select + str_where) record_list = [] for record in db.cur: log.info('get ris study record: {}'.format(record)) tmp = list(record) record_list.append(tmp) rows = rows + 1

                log.info('total get [{}] records, now queue size [{}]'.format(
                                                          rows, self.pull_queue.qsize()))
                [self.pull_queue.put(item) for item in record_list]
                log.info('total get [{}] records, now queue size [{}]'.format(
                                                          rows,self.pull_queue.qsize()))
            except DatabaseException.DatabaseError, exc:
                log.error('Get dcm study list process request from RisDB     error!');
                log.error(exc)
            except Exception, exc:
                log.error('unexpected error[ {}]'.format(exc))

        if self.spec_date:
            break
        else:
            time.sleep(60 * self.retry_interval)

class ThreadWriteStudy(threading.Thread): def init(self, pull_queue, loc_pool): threading.Thread.init(self); self.pull_queue = pull_queue self.loc_pool = loc_pool

def run(self):
    log.info('Get write Studys threads are working....')
    while True:
        if self.pull_queue.empty():
            time.sleep(1)
            continue
        item = self.pull_queue.get()
        with self.loc_pool.get_connect() as db:
            try:
                result = -1
                sp_ret = db.cur.var(DBManager.NUMBER)
                item.append(sp_ret)
                dbret = db.cur.callproc('STORE_AUTOGET_' + item[0], item)
                result = dbret[-1]
                log.info('call proc[STORE_AUTOGET_' + app_inst_id + ' ' + item[1] + '-' + item[2] + '-' + item[3] + ']:' + str(result))
            except DatabaseException.DatabaseError, exc:
                log.error('insert ' + item[1] + '-' + item[2] + '-' + item[3] + ' task failed')
                log.error(exc)
            else:
                if result == 0:
                    log.info('insert ' + item[1] + '-' + item[2] + '-' + item[3] + ' task OK')
                else:
                    log.error('insert ' + item[1] + '-' + item[2] + '-' + item[3] + ' task failed, db return is ' + str(result))
        time.sleep(1)
        self.pull_queue.task_done()

def get_args():

parser = argparse.ArgumentParser()
parser.add_argument('-t', '--spec_date', help='specify date(insert time) (format:YYYYMMDD eg. 2015-05-01)',
                        type=str,  dest='spec_date')
args = parser.parse_args()
if is_valid_date(args.spec_date):
    return vars(args)
else:
    return None

def main(): db_config['dbpool_size'] = TOTAL_PULL_QUEUE hdb_config['dbpool_size'] = 1 loc_pool = create_pool(db_config = db_config, db_type = 'oracle',max_overflow=4 ) rmt_pool = create_pool(db_config = hdb_config, db_type = hdb_driver,max_overflow=4)

args = get_args()
if args == None:
    spec_date = None
else:
    spec_date = args['spec_date']

pull_queue = Queue.Queue(0)
for i in range(TOTAL_PULL_QUEUE):
    work_pull = ThreadWriteStudy(pull_queue, loc_pool)
    work_pull.start()

dbscanget = ThreadScanRis(pull_queue, rmt_pool,interval=30,spec_date=spec_date)
dbscanget.start()

time.sleep(30)
log.info('main thread stopped!')

if name == 'main':

reload(sys)
sys.setdefaultencoding('utf-8')
main()

中文注释说明内容:#encoding=utf-8:文件编码格式为utf-8 import Queue:导入Queue模块,用于实现多线程之间的数据传输 import threading:导入threading模块,用于实现多线程 import datetime:导入datetime模块,用于处理日期时间相关的操作 import sys:导入sys模块,用于系统相关的操作 import time:导入time模块,用于处理时间相关的操作 import pyodbc:导入pyodbc模块,用于Python操作ODBC import argparse:导入argparse模块,用于解析命令行参数 from utils import log, DBConnManager, HDBConnManager,is_valid_date:从utils模块中导入log、DBConnManager、HDBConnManager和is_valid_date函数 from config import app_inst_id:从config模块中导入app_inst_id变量 from config import db_config, hdb_config, hdb_driver:从config模块中导入db_config、hdb_config和hdb_driver变量 from utils import dbPoolManager, create_pool:从utils模块中导入dbPoolManager和create_pool函数 import cx_Oracle as DBManager:导入cx_Oracle模块,并将其重命名为DBManager from utils import create_pool, DatabaseException:从utils模块中导入create_pool和DatabaseException函数

TOTAL_PULL_QUEUE = 40:定义常量TOTAL_PULL_QUEUE,值为40,表示队列的大小为40

class ThreadScanRis(threading.Thread):定义名为ThreadScanRis的类,继承自threading.Thread类

def __init__(self, pull_queue, rmt_pool, interval,spec_date=None):定义构造函数,初始化pull_queue、rmt_pool、retry_interval、spec_date属性

def run(self):定义run方法,用于执行线程的任务

class ThreadWriteStudy(threading.Thread):定义名为ThreadWriteStudy的类,继承自threading.Thread类

def __init__(self, pull_queue, loc_pool):定义构造函数,初始化pull_queue和loc_pool属性

def run(self):定义run方法,用于执行线程的任务

def get_args():定义get_args函数,用于解析命令行参数

def main():定义main函数,用于主程序的执行

db_config['dbpool_size'] = TOTAL_PULL_QUEUE:将db_config字典中的dbpool_size键对应的值设置为TOTAL_PULL_QUEUE

hdb_config['dbpool_size'] = 1:将hdb_config字典中的dbpool_size键对应的值设置为1

loc_pool = create_pool(db_config = db_config, db_type = 'oracle',max_overflow=4 ):创建Oracle数据库连接池loc_pool

rmt_pool = create_pool(db_config = hdb_config, db_type = hdb_driver,max_overflow=4):创建HDB数据库连接池rmt_pool

args = get_args():调用get_args函数,获取命令行参数

if args == None::如果args为空

    spec_date = None:将spec_date设置为None

else::否则

    spec_date = args['spec_date']:将spec_date设置为命令行参数中的spec_date值

pull_queue = Queue.Queue(0):创建一个大小为0的队列pull_queue

for i in range(TOTAL_PULL_QUEUE):循环40次

    work_pull = ThreadWriteStudy(pull_queue, loc_pool):创建ThreadWriteStudy线程对象work_pull,并将pull_queue和loc_pool作为参数传入

    work_pull.start():启动work_pull线程

dbscanget = ThreadScanRis(pull_queue, rmt_pool,interval=30,spec_date=spec_date):创建ThreadScanRis线程对象dbscanget,并将pull_queue、rmt_pool、interval和spec_date作为参数传入

dbscanget.start():启动dbscanget线程

time.sleep(30):暂停程序执行30秒

log.info('main thread stopped!'):输出日志信息,表示主线程已停止执行

if name == 'main'::如果当前脚本为主程序

reload(sys):重新加载sys模块

sys.setdefaultencoding('utf-8'):设置默认编码为utf-8

main():调用main函数,开始执行程序
Python多线程数据同步程序:从RIS系统获取Study信息并写入数据库

原文地址: https://www.cveoy.top/t/topic/nwi3 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录