#!/usr/bin/env python
# encoding=utf-8

import Queue
import argparse
import datetime
import sys
import threading
import time

import cx_Oracle as DBManager
import pyodbc

from config import app_inst_id
from config import db_config, hdb_config, hdb_driver
from utils import log, DBConnManager, HDBConnManager, is_valid_date
from utils import dbPoolManager, create_pool
from utils import create_pool, DatabaseException

# Number of ThreadWriteStudy workers main() starts; main() also stores this
# value in db_config['dbpool_size'] before creating the local pool.
TOTAL_PULL_QUEUE = 40

class ThreadScanRis(threading.Thread):
    """Fetch study information from the RIS and queue it for the writers.

    Each pass runs one SELECT against the ``v_study_bal`` view through a
    connection taken from ``rmt_pool`` and puts every returned row, converted
    to a list, on ``pull_queue``.

    Args:
        pull_queue: queue that receives one list per study row.
        rmt_pool: pool whose ``get_connect()`` is used as a context manager
            yielding an object with a ``cur`` cursor attribute.
        interval: polling period in minutes; also sizes the look-back window.
        spec_date: optional date string compared against ``Study_time``
            formatted with CONVERT style 112. When given, the thread performs
            a single pass and exits instead of polling.
    """

    def __init__(self, pull_queue, rmt_pool, interval, spec_date=None):
        threading.Thread.__init__(self)
        self.pull_queue = pull_queue
        self.rmt_pool = rmt_pool
        self.retry_interval = interval
        self.spec_date = spec_date

    @staticmethod
    def _build_sql(inst_id, retry_interval, spec_date):
        """Return the SELECT statement for one scan pass.

        ``inst_id`` is emitted as a quoted literal in the first column so that
        every queued row starts with the application instance id.
        NOTE(review): the values are concatenated into the statement, not
        bound as parameters - they must come from trusted config / validated
        input only.
        """
        str_select = (
            " select '" + inst_id + "', Accession_number,Patient_id,Study_UUID,"
            " CONVERT(varchar(100),Study_time,20),Modality from v_study_bal "
        )
        if spec_date is None:
            # retry_interval is numeric, so it must be converted before it is
            # concatenated; the window is retry_interval + 32 minutes.
            str_where = (
                " where DATEDIFF(minute,Study_time,GETDATE()) < ("
                + str(retry_interval) + "+32) "
            )
        else:
            str_where = (
                " where CONVERT(varchar(100),Study_time,112) = '"
                + spec_date + "' "
            )
        return str_select + str_where

    def run(self):
        """Scan the RIS once (``spec_date`` set) or forever (polling mode)."""
        log.info('Get Study list from Ris threads are working....')
        # The statement does not change between passes, so build it once.
        sql = self._build_sql(app_inst_id, self.retry_interval, self.spec_date)
        while True:
            with self.rmt_pool.get_connect() as db:
                try:
                    log.info('sql = ' + sql)
                    db.cur.execute(sql)
                    record_list = []
                    for record in db.cur:
                        log.info('get ris study record: ' + str(record))
                        record_list.append(list(record))
                    rows = len(record_list)

                    # Logged before and after queuing to show the queue growth.
                    log.info('total get [' + str(rows) + '] records, now queue size [' + str(self.pull_queue.qsize()) + ']')
                    for item in record_list:
                        self.pull_queue.put(item)
                    log.info('total get [' + str(rows) + '] records, now queue size [' + str(self.pull_queue.qsize()) + ']')
                except DatabaseException.DatabaseError as exc:
                    log.error('Get dcm study list process request from RisDB     error!')
                    log.error(exc)
                except Exception as exc:
                    # Keep the scanner alive; the next pass retries.
                    log.error('unexpected error[ ' + str(exc) + ']')

            if self.spec_date:
                break
            # retry_interval is expressed in minutes.
            time.sleep(60 * self.retry_interval)

class ThreadWriteStudy(threading.Thread):
    """Worker thread that stores queued study rows through a stored procedure.

    Args:
        pull_queue: queue of row lists; element 0 is appended to
            ``'STORE_AUTOGET_'`` to form the procedure name and elements 1-3
            identify the study in log messages.
        loc_pool: pool whose ``get_connect()`` is used as a context manager
            yielding an object with a ``cur`` cursor attribute.
    """

    def __init__(self, pull_queue, loc_pool):
        threading.Thread.__init__(self)
        self.pull_queue = pull_queue
        self.loc_pool = loc_pool

    @staticmethod
    def _task_label(item):
        """Return ``item[1]-item[2]-item[3]`` for log messages.

        ``%s`` formatting is used so a non-string field (e.g. ``None``) cannot
        raise while a log line is being built.
        """
        return '-'.join('%s' % (field,) for field in item[1:4])

    def _store(self, item, label):
        """Call ``STORE_AUTOGET_<item[0]>`` for one row and log the outcome."""
        with self.loc_pool.get_connect() as db:
            try:
                # The procedure's status is read back from an extra NUMBER
                # variable appended as the last argument.
                sp_ret = db.cur.var(DBManager.NUMBER)
                item.append(sp_ret)
                dbret = db.cur.callproc('STORE_AUTOGET_' + item[0], item)
                result = dbret[-1]
                log.info('call proc[STORE_AUTOGET_' + app_inst_id + ' ' + label + ']:' + str(result))
            except DatabaseException.DatabaseError as exc:
                log.error('insert ' + label + ' task failed')
                log.error(exc)
            else:
                if result == 0:
                    log.info('insert ' + label + ' task OK')
                else:
                    log.error('insert ' + label + ' task failed, db return is ' + str(result))

    def run(self):
        """Consume rows from the queue forever, one stored-procedure call each."""
        log.info('Get write Studys threads are working....')
        while True:
            # A blocking get() replaces the empty()/sleep polling: no race
            # between the check and the fetch, and no idle wake-ups.
            item = self.pull_queue.get()
            label = self._task_label(item)
            try:
                self._store(item, label)
            except Exception as exc:
                # An unexpected failure must not kill the worker thread.
                log.error('unexpected error[ ' + str(exc) + ']')
            finally:
                # Always balance get() so a Queue.join() caller cannot hang.
                self.pull_queue.task_done()
            time.sleep(1)

def get_args():
    """Parse the command line and return the validated options.

    Returns:
        A dict with the key ``'spec_date'`` when ``-t/--spec_date`` was given
        and ``is_valid_date`` accepts it; ``None`` when the option is absent
        or rejected.
    """
    parser = argparse.ArgumentParser()
    # The example matches the stated YYYYMMDD format: the scanner compares the
    # value with CONVERT(..., 112) output, so a dashed date would never match.
    parser.add_argument('-t', '--spec_date', help='specify date(insert time) (format:YYYYMMDD eg. 20150501)',
                        type=str, dest='spec_date')
    args = parser.parse_args()
    if args.spec_date is None:
        # Option not given: nothing to validate.
        return None
    if is_valid_date(args.spec_date):
        return vars(args)
    # Report the rejected value instead of ignoring it silently.
    log.error('invalid spec_date [' + args.spec_date + '], ignored')
    return None

def main():
    """Create the connection pools and start the writer and scanner threads.

    The local pool is sized to TOTAL_PULL_QUEUE and the remote pool to 1.
    TOTAL_PULL_QUEUE ThreadWriteStudy workers and one ThreadScanRis scanner
    share an unbounded queue. The main thread logs and returns after 30
    seconds.
    NOTE(review): the started threads are not marked as daemons - confirm the
    process is meant to keep running after this function returns.
    """
    db_config['dbpool_size'] = TOTAL_PULL_QUEUE
    hdb_config['dbpool_size'] = 1
    loc_pool = create_pool(db_config=db_config, db_type='oracle', max_overflow=4)
    rmt_pool = create_pool(db_config=hdb_config, db_type=hdb_driver, max_overflow=4)

    # get_args() returns None when no usable date was supplied.
    args = get_args()
    spec_date = None if args is None else args['spec_date']

    pull_queue = Queue.Queue(0)  # maxsize 0 means unbounded
    for _ in range(TOTAL_PULL_QUEUE):
        ThreadWriteStudy(pull_queue, loc_pool).start()

    dbscanget = ThreadScanRis(pull_queue, rmt_pool, interval=30, spec_date=spec_date)
    dbscanget.start()

    time.sleep(30)
    log.info('main thread stopped!')

if __name__ == '__main__':
    # Python 2 only: reload(sys) makes setdefaultencoding() reachable again so
    # implicit str/unicode conversions use utf-8.
    reload(sys)
    sys.setdefaultencoding('utf-8')
    main()  # run the main function

# Annotated copy of the script above (comments translated from Chinese).
# encoding=utf-8  (sets the source encoding to utf-8)

import Queue # 导入Queue模块,用于多线程队列 import threading # 导入threading模块,用于多线程处理 import datetime # 导入datetime模块,用于日期时间处理 import sys # 导入sys模块,用于系统相关操作 import time # 导入time模块,用于时间相关操作 import pyodbc # 导入pyodbc模块,用于连接数据库 import argparse # 导入argparse模块,用于解析命令行参数 from utils import log, DBConnManager, HDBConnManager, is_valid_date # 导入自定义的工具函数 from config import app_inst_id # 导入配置文件中的应用实例ID from config import db_config, hdb_config, hdb_driver # 导入配置文件中的数据库连接配置 from utils import dbPoolManager, create_pool, DatabaseException # 导入自定义的数据库连接池和异常类 import cx_Oracle as DBManager # 导入cx_Oracle模块,用于连接Oracle数据库

TOTAL_PULL_QUEUE = 40 # number of writer threads main() starts; also stored in db_config['dbpool_size']

class ThreadScanRis(threading.Thread):
    """Fetch study information from the RIS and queue it for the writers.

    Each pass runs one SELECT against the ``v_study_bal`` view through a
    connection taken from ``rmt_pool`` and puts every returned row, converted
    to a list, on ``pull_queue``.

    Args:
        pull_queue: queue that receives one list per study row.
        rmt_pool: pool whose ``get_connect()`` is used as a context manager
            yielding an object with a ``cur`` cursor attribute.
        interval: polling period in minutes; also sizes the look-back window.
        spec_date: optional date string compared against ``Study_time``
            formatted with CONVERT style 112. When given, the thread performs
            a single pass and exits instead of polling.
    """

    def __init__(self, pull_queue, rmt_pool, interval, spec_date=None):
        threading.Thread.__init__(self)
        self.pull_queue = pull_queue
        self.rmt_pool = rmt_pool
        self.retry_interval = interval
        self.spec_date = spec_date

    @staticmethod
    def _build_sql(inst_id, retry_interval, spec_date):
        """Return the SELECT statement for one scan pass.

        ``inst_id`` is emitted as a quoted literal in the first column so that
        every queued row starts with the application instance id.
        NOTE(review): the values are concatenated into the statement, not
        bound as parameters - they must come from trusted config / validated
        input only.
        """
        str_select = (
            " select '" + inst_id + "', Accession_number,Patient_id,Study_UUID,"
            " CONVERT(varchar(100),Study_time,20),Modality from v_study_bal "
        )
        if spec_date is None:
            # retry_interval is numeric, so it must be converted before it is
            # concatenated; the window is retry_interval + 32 minutes.
            str_where = (
                " where DATEDIFF(minute,Study_time,GETDATE()) < ("
                + str(retry_interval) + "+32) "
            )
        else:
            str_where = (
                " where CONVERT(varchar(100),Study_time,112) = '"
                + spec_date + "' "
            )
        return str_select + str_where

    def run(self):
        """Scan the RIS once (``spec_date`` set) or forever (polling mode)."""
        log.info('Get Study list from Ris threads are working....')
        # The statement does not change between passes, so build it once.
        sql = self._build_sql(app_inst_id, self.retry_interval, self.spec_date)
        while True:
            with self.rmt_pool.get_connect() as db:
                try:
                    log.info('sql = ' + sql)
                    db.cur.execute(sql)
                    record_list = []
                    for record in db.cur:
                        log.info('get ris study record: ' + str(record))
                        record_list.append(list(record))
                    rows = len(record_list)

                    # Logged before and after queuing to show the queue growth.
                    log.info('total get [' + str(rows) + '] records, now queue size [' + str(self.pull_queue.qsize()) + ']')
                    for item in record_list:
                        self.pull_queue.put(item)
                    log.info('total get [' + str(rows) + '] records, now queue size [' + str(self.pull_queue.qsize()) + ']')
                except DatabaseException.DatabaseError as exc:
                    log.error('Get dcm study list process request from RisDB     error!')
                    log.error(exc)
                except Exception as exc:
                    # Keep the scanner alive; the next pass retries.
                    log.error('unexpected error[ ' + str(exc) + ']')

            if self.spec_date:
                break
            # retry_interval is expressed in minutes.
            time.sleep(60 * self.retry_interval)

class ThreadWriteStudy(threading.Thread):
    """Worker thread that stores queued study rows through a stored procedure.

    Args:
        pull_queue: queue of row lists; element 0 is appended to
            ``'STORE_AUTOGET_'`` to form the procedure name and elements 1-3
            identify the study in log messages.
        loc_pool: pool whose ``get_connect()`` is used as a context manager
            yielding an object with a ``cur`` cursor attribute.
    """

    def __init__(self, pull_queue, loc_pool):
        threading.Thread.__init__(self)
        self.pull_queue = pull_queue
        self.loc_pool = loc_pool

    @staticmethod
    def _task_label(item):
        """Return ``item[1]-item[2]-item[3]`` for log messages.

        ``%s`` formatting is used so a non-string field (e.g. ``None``) cannot
        raise while a log line is being built.
        """
        return '-'.join('%s' % (field,) for field in item[1:4])

    def _store(self, item, label):
        """Call ``STORE_AUTOGET_<item[0]>`` for one row and log the outcome."""
        with self.loc_pool.get_connect() as db:
            try:
                # The procedure's status is read back from an extra NUMBER
                # variable appended as the last argument.
                sp_ret = db.cur.var(DBManager.NUMBER)
                item.append(sp_ret)
                dbret = db.cur.callproc('STORE_AUTOGET_' + item[0], item)
                result = dbret[-1]
                log.info('call proc[STORE_AUTOGET_' + app_inst_id + ' ' + label + ']:' + str(result))
            except DatabaseException.DatabaseError as exc:
                log.error('insert ' + label + ' task failed')
                log.error(exc)
            else:
                if result == 0:
                    log.info('insert ' + label + ' task OK')
                else:
                    log.error('insert ' + label + ' task failed, db return is ' + str(result))

    def run(self):
        """Consume rows from the queue forever, one stored-procedure call each."""
        log.info('Get write Studys threads are working....')
        while True:
            # A blocking get() replaces the empty()/sleep polling: no race
            # between the check and the fetch, and no idle wake-ups.
            item = self.pull_queue.get()
            label = self._task_label(item)
            try:
                self._store(item, label)
            except Exception as exc:
                # An unexpected failure must not kill the worker thread.
                log.error('unexpected error[ ' + str(exc) + ']')
            finally:
                # Always balance get() so a Queue.join() caller cannot hang.
                self.pull_queue.task_done()
            time.sleep(1)

def get_args():
    """Parse the command line and return the validated options.

    Returns:
        A dict with the key ``'spec_date'`` when ``-t/--spec_date`` was given
        and ``is_valid_date`` accepts it; ``None`` when the option is absent
        or rejected.
    """
    parser = argparse.ArgumentParser()
    # The example matches the stated YYYYMMDD format: the scanner compares the
    # value with CONVERT(..., 112) output, so a dashed date would never match.
    parser.add_argument('-t', '--spec_date', help='specify date(insert time) (format:YYYYMMDD eg. 20150501)',
                        type=str, dest='spec_date')
    args = parser.parse_args()
    if args.spec_date is None:
        # Option not given: nothing to validate.
        return None
    if is_valid_date(args.spec_date):
        return vars(args)
    # Report the rejected value instead of ignoring it silently.
    log.error('invalid spec_date [' + args.spec_date + '], ignored')
    return None

def main():
    """Create the connection pools and start the writer and scanner threads.

    The local pool is sized to TOTAL_PULL_QUEUE and the remote pool to 1.
    TOTAL_PULL_QUEUE ThreadWriteStudy workers and one ThreadScanRis scanner
    share an unbounded queue. The main thread logs and returns after 30
    seconds.
    NOTE(review): the started threads are not marked as daemons - confirm the
    process is meant to keep running after this function returns.
    """
    db_config['dbpool_size'] = TOTAL_PULL_QUEUE
    hdb_config['dbpool_size'] = 1
    loc_pool = create_pool(db_config=db_config, db_type='oracle', max_overflow=4)
    rmt_pool = create_pool(db_config=hdb_config, db_type=hdb_driver, max_overflow=4)

    # get_args() returns None when no usable date was supplied.
    args = get_args()
    spec_date = None if args is None else args['spec_date']

    pull_queue = Queue.Queue(0)  # maxsize 0 means unbounded
    for _ in range(TOTAL_PULL_QUEUE):
        ThreadWriteStudy(pull_queue, loc_pool).start()

    dbscanget = ThreadScanRis(pull_queue, rmt_pool, interval=30, spec_date=spec_date)
    dbscanget.start()

    time.sleep(30)
    log.info('main thread stopped!')

if __name__ == '__main__':
    # Python 2 only: reload(sys) makes setdefaultencoding() reachable again so
    # implicit str/unicode conversions use utf-8.
    reload(sys)
    sys.setdefaultencoding('utf-8')
    main()  # run the main function
Python 多线程从 RIS 获取 Study 信息并写入数据库

原文地址: https://www.cveoy.top/t/topic/nwjf 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录