请教这个 concurrent.futures 多进程处理 SQL 队列,为什么只处理了第一个,就停了下来?

# coding=utf-8
import time
import pymysql
import MySQLdb
import AnalyFunc 

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, wait, ALL_COMPLETED, as_completed
from dbutils.pooled_db import PooledDB

GloSQLQueueBreakFlag = 1 # 处理队列退出判断信号

# 处理 SQL 队列
def procSQLcmd(sqlqueue):
    import time
    from dbutils.pooled_db import PooledDB
    import pymysql

    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=80,  # 连接池允许的最大连接数,0 和 None 表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0 和 None 不限制
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待; False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None 表示无限制
        setsession=[],  # 开始会话前执行的命令列表。
        ping=0,  # ping MySQL 服务端,检查是否服务可用。
        host=\'192.168.89.48\',
        port=3306,
        user=\'root\',
        password=\'root123\',
        database=\'eee\',
        charset=\'utf8\'
    )

    while True:
        while not sqlqueue.empty():
            print(GloSQLQueueBreakFlag:\', str(GloSQLQueueBreakFlag))
            sqlTask = sqlqueue.get()
            DBconn = POOL.connection()
            cur = DBconn.cursor()
            print(\"sqlTask:\",sqlTask)
            ses = cur.execute(sqlTask)
            cur.close()  # or del cur
            DBconn.close()  # or del db
            time.sleep(0.5)
        if GloSQLQueueBreakFlag == 0:
            break
        else:
            time.sleep(1)
    return

if __name__ == \'__main__\':
    from concurrent import futures
    from multiprocessing import Manager
    from teFunc import TranDicttoSQLcmd

    SQLQueue = Manager().Queue()
    ProcessSQLQueue = futures.ProcessPoolExecutor(max_workers=1)
    ProcessSQLQueueRet = ProcessSQLQueue.submit(procSQLcmd,SQLQueue)

    # 导入测试数据,成为字典列表
    teList = eval(AnalyFunc.ReadFiletoStr(\'h:/testdict.dict\'))

    for i in teList:
        print(i)
        TranDicttoSQLcmd(\'testSheet\', i, SQLQueue)  # 把字典转换成 MySQL 的 INSERT 语句,同时把语句作为任务交到全局队列,交由独立进程的 procSQLcmd 函数去处理


    # 最后确保队列全部弄完,才完全退出整个程序
    waitSQLQueue = True
    while waitSQLQueue == True:
        time.sleep(0.5)
        SQLQueueCount = SQLQueue.qsize()
        print(f\'SQLQueue 队列还有:{SQLQueueCount} 未处理完.\')
        if SQLQueueCount == 0:
            GloSQLQueueBreakFlag = 0
            waitSQLQueue = False
# 将字典转换成 SQL 语句
def TranDicttoSQLcmd(tblName,DictObj,SQLQueue,printSQL=False):
    import time
    # 组合字段
    FiledStr = \'\'
    ValueStr = \'\'
    SQLText = \'\'

    # 生成 INSERT 语句
    SQLcmd = \"INSERT INTO %s ({}) VALUE ({});\" % tblName

    # 单一字典
    if isinstance(DictObj, dict):
        FiledStr = \'\'
        ValueStr = \'\'
        for k, v in DictObj.items():
            if v == None:
                continue
            FiledStr = FiledStr + \"`%s`\" % (k) + \',\'
            ValueStr = ValueStr + \"\'%s\'\" % (str(v)) + \',\'
        FiledStr = FiledStr[:-1]
        ValueStr = ValueStr[:-1]
        SQLText = SQLcmd.format(FiledStr, ValueStr)
        if printSQL:
            print(\'TranDicttoSQLcmd:\',SQLText)
        if SQLQueue:
            SQLQueue.put(SQLText)
        return SQLText

    # 字典列表
    if isinstance(DictObj, list):
        kvDict = {}
        ccount = 0
        for i in DictObj:
            FiledStr = \'\'
            ValueStr = \'\'
            ccount += 1
            for k,v in i.items():
                if v == None:
                    continue
                FiledStr = FiledStr + \"`%s`\" % (k) + \',\'
                ValueStr = ValueStr + \"\'%s\'\" % (str(v)) + \',\'
            FiledStr = FiledStr[:-1]
            ValueStr = ValueStr[:-1]
            SQLText = SQLcmd.format(FiledStr, ValueStr)
            if printSQL:
                print(\'TranDicttoSQLcmd:\',SQLText)
            if SQLQueue:
                SQLQueue.put(SQLText)
    return

运行过程:

SQLQueue 队列还有:538
GloSQLQueueBreakFlag: 1
sqlTask: INSERT INTO testSheet (`tename`,`amount`,`weight`) VALUE (\'椰子\',\'218\',\'72170\');
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
SQLQueue 队列还有:537
#一直刷下去重复

检查 testSheet 表,一个椰子内容被插入,处理 SQL 语句队列的进程函数工作不正常是什么原因呢?

相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注