许可优化
产品
解决方案
服务支持
关于
软件库
当前位置:服务支持 >  软件文章 >  Python并行运算Dyna:cmd命令高效利用

Python并行运算Dyna:cmd命令高效利用

阅读数 6
点赞 0
article_banner

一、基础知识

1.1 dyna的cmd命令 方式

若dyna求解器的路径为https://www.gofarlic.com\Program Files\lsdyna\program\ls-dyna_smp_d_R11_1_0_winx64_ifort160.exe,需要执行的k文件路径为https://www.gofarlic.com\ProgramData\lsdyna\DOEv4.X\DOEv4.3\job.k,则通过cmd可调用 dyna求解器对k文件进行计算,cmd窗口中输入代码为

"https://www.gofarlic.com\Program Files\lsdyna\program\ls-dyna_smp_d_R11_1_0_winx64_ifort160.exe" I=https://www.gofarlic.com\ProgramData\lsdyna\DOEv4.X\DOEv4.3\job.k O=https://www.gofarlic.com\ProgramData\lsdyna\DOEv4.X\DOEv4.3\d3hsp NCPU=8

其中,NCPU=8 含义为8核SMP计算。

1.2 python执行cmd命令

python中,可以通过os库 中的os.system(command)来执行cmd命令,如python通过cmd设置定时关机的代码可表示为

import os
os.system('shutdown -s -t 3600')

1.3 python的并行机制

python的threading 模块 提供了多线程运行方式,具体细节可参考Python threading实现多线程 基础篇 - 知乎 (zhihu.com),总体而言,实现多线程运行有两种方式:

  1. 创建Thread 的实例,传给它一个可调用对象(函数或者类的实例方法)。
  2. 派生Thread 的子类,并创建子类的实例。

方式一的案例为:

from threading import Thread
from time import sleep, ctime

def func(name, sec):
    print('---开始---', name, '时间', ctime())
    sleep(sec)
    print('***结束***', name, '时间', ctime())

# 创建 Thread 实例
t1 = Thread(target=func, args=('第一个线程', 1))
t2 = Thread(target=func, args=('第二个线程', 2))

# 启动线程运行
t1.start()
t2.start()

# 等待所有线程执行完毕
t1.join()  # join() 等待线程终止,要不然一直挂起
t2.join()

方式二的案例为:

from threading import Thread
from time import sleep, ctime


# 创建 Thread 的子类
class MyThread(Thread):
    def __init__(self, func, args):
        '''
        :param func: 可调用的对象
        :param args: 可调用对象的参数
        '''
        Thread.__init__(self)   # 不要忘记调用Thread的初始化方法
        self.func = func
        self.args = args

    def run(self):
        self.func(*self.args)


def func(name, sec):
    print('---开始---', name, '时间', ctime())
    sleep(sec)
    print('***结束***', name, '时间', ctime())

def main():
    # 创建 Thread 实例
    t1 = MyThread(func, (1, 1))
    t2 = MyThread(func, (2, 2))
    # 启动线程运行
    t1.start()
    t2.start()
    # 等待所有线程执行完毕
    t1.join()
    t2.join()

if __name__ == '__main__':
    main()

两种方式的结果为:

---开始--- 一 时间 Fri Nov 29 11:34:31 2019
---开始--- 二 时间 Fri Nov 29 11:34:31 2019
***结束*** 一 时间 Fri Nov 29 11:34:32 2019
***结束*** 二 时间 Fri Nov 29 11:34:33 2019

二、 python并行运算

2.1 文件说明

任务目标为执行多个k文件,每个k文件位于一个独立的文件夹下,所有文件夹存放在同一个总体文件夹之中,文件组织如下。

文件夹目录
单个文件夹内

2.2 整体思路

程序的输入为总体文件夹的路径、进程数量NProcessor、单进程调用核的数量NCPU、输出为执行有限元仿真的cmd代码及在cmd中的并行执行,并辅以日志和实时显示的功能。步骤如下:

  1. 定义类的主体,确定dyna路径,输入NCPU和NProcessor来初始化类;
  2. 搜索总体文件夹下的子文件夹,记录在类变量KfileDirs中;
  3. 将KfileDirs视为队列,实施并行计算;
    1. 每次从KfileDirs中取出一个KfileDir,将它作为参数调用pulltask函数来启用进程(根据k文件路径调用cmd执行,并返回进程);
    2. 动态展示当前各个进程的状态和信息;
    3. 只要KfileDirs未完全遍历 或 全部进程均空闲,就继续执行;

总体代码如下:

from threading import Thread
import os
import time
import logging
from utilis.logger import setlogger
from utilis.ParrellelRunLib import watchdog_check,pull_task,check_single_thread
from datetime import datetime
import threading
import time


class Parel_Runer(object):
    lsdyna_dir = "https://www.gofarlic.com\\Program Files\\lsdyna\\program\\ls-dyna_smp_d_R11_1_0_winx64_ifort160.exe"
    Kfile_name = 'job.k'

    def __init__(self,NCPU = 6, NProcessor = 7):
        self.NCPU = NCPU
        self.thread_register = [None for i in range(NProcessor)]

    def run_in_dir(self,dir): #获取需要执行的k文件路径列表,并调用函数执行
        self.dir = dir
        setlogger(os.path.join(self.dir, 'training-%s.log' % datetime.strftime(datetime.now(), '%m%d-%H%M%S')))

        #获取需要执行的k文件路径列表
        self.KfileDirs = []
        subdirs = next(os.walk(self.dir))[1]
        for subdir in subdirs:
            if 'condition' in subdir:
                self.KfileDirs.append(os.path.join(dir,subdir))
        
        #记录
        for KfileDir in self.KfileDirs:
            commandline = '"%s" I=%s NCPU=%d' % (self.lsdyna_dir, os.path.join(KfileDir, self.Kfile_name), self.NCPU)
            logging.info('dir:\t' + KfileDir)
            logging.info('command:\t' + commandline + '\n \n')
        logging.info('--' * 20)

        #执行
        self.run_actually()

    def run_actually(self):
        start = time.time()
        # 循环执行条件 :KfileDirs不为空或 线程不全为空
        while len(self.KfileDirs) > 0 or not register_is_free(self.thread_register):
            response = watchdog_check(self.thread_register)
            if response != None:  # 检查是否有空闲,空闲返回
                KfileDir= self.KfileDirs.pop(0)
                commandline = '"%s" I=%s NCPU=%d' % (self.lsdyna_dir, os.path.join(KfileDir, self.Kfile_name), self.NCPU)
                self.thread_register[response] = pull_task(commandline, KfileDir, "thread-%d" % (response + 1))
            time.sleep(2)
            # 动态刷新
            timeinfo = "<{:s}> processtime:{:6.2f}min |||||\n".format(datetime.strftime(datetime.now(), '%H:%M:%S'),
                                                                    (time.time() - start) / 60)
            showinfo = ["<thread-{:<2d} | mode: {:^9s} | running time:{:5.2f}min> | processdir: {:s}\n"
                            .format(i + 1, check_single_thread(item), 0 if item is None else item.runningtime() / 60,
                                    "None" if item is None else os.path.split(item.cal_dir)[-1])
                        for i, item in enumerate(self.thread_register)]
            print('\r', timeinfo, *showinfo, end='')

if __name__ == '__main__':
    runner = Parel_Runer()
    runner.run_in_dir(r'https://www.gofarlic.com\ProgramData\lsdyna\Ex1-Model2\1_SimplifiedModel\5-generation\table_v2')

2.3 详解pull task函数

  1. CustomThread类,是threading.Thread的子类;
  2. pull_task函数,接受了K文件路径和计算结果放置路径,然后通过CustomThread唤起了进程执行execute_cmd_repeat函数并返回该进程变量;
  3. execute_cmd_repeat函数,目的为循环执行有限元。为防止有限元不收敛提前终止,因此通过该函数对生成文件d3plot的数量进行检查,如果生成d3plot的数量不达标且未达到重复上限,则清除生成的文件并再次执行有限元仿真;
  4. execute_cmd函数,单次执行有限元仿真。跟据生成文件夹更改执行路径,然后根据K文件路径执行有限元仿真;
  5. cal_d3plot_number函数:统计文件夹下d3plot文件的数量;
  6. clearfile函数:清除文件夹下k文件外的所有文件。
class CustomThread(threading.Thread):
    def __init__(self, *args,**kwargs):
        super().__init__(*args,**kwargs)
    def setdir(self,cal_dir):
        self.cal_dir = cal_dir
    def run(self):
        self.starttime = time.time()
        self._target(*self._args)

    def runningtime(self):
        try:
            return time.time() - self.starttime
        except:
            return 0

def pull_task(commandline,cal_dir=None,name = None):
    t = CustomThread(target=execute_cmd_repeat, args=(commandline,cal_dir), name= name)
    t.setdir(cal_dir)
    t.start()
    return t

def execute_cmd_repeat(command,cal_dir='./',repeat=2,d3plotfile_target=100):
     runtime = 0
     number = cal_d3plot_number(cal_dir)
     while (number < d3plotfile_target and runtime < repeat):
         clearfile(cal_dir)
         execute_cmd(command,cal_dir,runtime)
         number = cal_d3plot_number(cal_dir)
         runtime += 1



def execute_cmd(command,cal_dir='./',repeattime=0):
    log_info=['-----' * 10, "calculation start in %s:"%threading.current_thread().name,
              cal_dir,command,'\nrepeattime: %d\n'%repeattime,'-----' * 10]
    for item in log_info:
        logging.info(item)

    #获取当前路径
    os.chdir(cal_dir)
    os.system(command)

    log_info=['-----' * 10, "calculation finished in %s:"%threading.current_thread().name,
              cal_dir,command,'\nrepeattime: %d\n'%repeattime,'-----' * 10]
    for item in log_info:
        logging.info(item)


def cal_d3plot_number(cal_dir):
    files = next(os.walk(cal_dir))[2]
    count = 0
    for file in files:
        if 'd3plot' in file:
            count += 1
    return count

def clearfile(target_dir):
    files = next(os.walk(target_dir))[2]
    for file in files:
        if not file.endswith('.k'):
            os.remove(os.path.join(target_dir,file))

2.4 其他函数

  1. watchdog函数:检查线程列表中的线程是否有空闲,若有空闲,返回空闲的线程编号,否则返回None;
  2. register_is_free函数:检查线程列表是否全部空闲;
  3. check_single_thread函数:检查单个线程是否在运行;
def watchdog_check(register):
    for i,item in enumerate(register):
        if item == None or not item.is_alive():
            return i
    return None

def register_is_free(register):
    for item in register:
        if item != None and item.is_alive():
            return False
    return True


def check_single_thread(td):
    if td == None:
        return 'undefined'
    else:
        return "%s"%td.is_alive()

2.5 运行时的情况


免责声明:本文系网络转载或改编,未找到原创作者,版权归原作者所有。如涉及版权,请联系删
相关文章
QR Code
微信扫一扫,欢迎咨询~

联系我们
武汉格发信息技术有限公司
湖北省武汉市经开区科技园西路6号103孵化器
电话:155-2731-8020 座机:027-59821821
邮件:tanzw@gofarlic.com
Copyright © 2023 Gofarsoft Co.,Ltd. 保留所有权利
遇到许可问题?该如何解决!?
评估许可证实际采购量? 
不清楚软件许可证使用数据? 
收到软件厂商律师函!?  
想要少购买点许可证,节省费用? 
收到软件厂商侵权通告!?  
有正版license,但许可证不够用,需要新购? 
联系方式 155-2731-8020
预留信息,一起解决您的问题
* 姓名:
* 手机:

* 公司名称:

姓名不为空

手机不正确

公司不为空