TensorFlow新手之路:深入理解队列机制

TensorFlow 队列

队列(Queue)是一种最为常用的数据输入输出方式,其通过先进先出的线性数据结构,一端只负责增加队列中的数据元素,而数据的输出和删除在队列的另一端实现。


队列的创建

队列的使用和Python中队列的函数类似。


操作描述
class tf.QueueBase基本的队列应用类,队列(queue)是一种数据结构,该结构通过多个步骤存储tensors,并且对tensors进行入列(enqueue)与出列(dequeue)操作。
tf.enqueue(vals,name=None)将一个元素编入该队列中。如果在执行该操作时队列已满,那么将会阻塞起到元素输入队列之中。
tf.enqueue_many(vals,name=None)将零个或多个元素编入该队列中
tf.dequeue(name=None)将元素从队列中移出,如果在执行该操作时队列已空,那么将会阻塞元素出殡,返回出殡的tensors的tuple
tf.size(name=None)计算队列中的元素个数
tf.close关闭该队列
f.dequeue_up_to(n,name=None)从该队列中移出n个元素并将之连接
tf.dtypes列出组成元素的数据类型
tf.from_list(index,queues)根据queues[index]的参考队列创建一个队列
​ ​tf.name​​返回队列最下面元素的名称
tf.names返回队列每一个组成部分的名称
class tf.FIFOQueue在出殡时依照先入先出顺序
class tf.PaddingFIFOQueue一个FIFOQueue,同时根据padding支持batching变长的tensor
class tf.RandomShuffleQueue该队列将随机元素出列

一般而言,创建一个队列首先要选定数据出入类型,例如是使用FIFOQueue函数设定数据为先入先出,还是RandomShuffleQueue这种随机元素出列的方式。

q = tf.FIFOQueue(3,"float")  # 第一个参数为队列中数据的个数,第二个参数是队列中元素的类型1.

之后要对队列中元素进行初始化和进行操作。

示例:

import tensorflow as tfwith tf.Session() as sess:    q = tf.FIFOQueue(3, "float")              
# 设定一个先进先出队列    init = q.enqueue_many(([0.1, 0.2, 0.3],))  # 填进数据    init2 = q.dequeue()
# 弹出数据   [0.2, 0.3]    init3 = q.enqueue(1.)                      
# [0.2, 0.3, 1]    sess.run(init)    sess.run(init2)    sess.run(init3)    quelen = sess.run(q.size())
# 获取队列的数据个数    for i in range(quelen):                   
# 循环弹出数据        print(sess.run(q.dequeue()))1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.

另外,TensorFlow 提供了QueueRunner函数用以解决异步操作问题。其可创建一系列的线程同时进入主线程内进行操作,数据的读取与操作是同步,即主线程在进行训练模型的工作的同时将数据从硬盘读入。

import tensorflow as tfq = tf.FIFOQueue(1000, "float32")counter = tf
.Variable(0.0)add_op = tf.assign_add(counter,tf.constant(1.0))enqueueData_op = q
.enqueue(counter)sess = tf.Session()qr = tf.train.QueueRunner(q,enqueue_ops=[add_op, enqueueData_op] * 2)sess
.run(tf.initialize_all_variables())enqueue_threads = qr.create_threads(sess, start=True)    
# 启动入队线程for i in range(10):    print(sess.run(q.dequeue()))1.2.3.4.5.6.7.8.9.10.11.12.13.14.

运行会话是正确的,但程序也没有结束,而是被挂起。造成这种情况的原因是add操作和入队操作没有同步,即TensorFlow在队列设计时为了优化IO系统,队列的操作一般使用批处理,这样入队线程没有发送结束的信息而程序主线程期望将程序结束,因此造成线程堵塞程序被挂起。


线程同步与停止

TensorFlow中的会话是支持多线程的,多个线程可以很方便地在一个会话下共同工作,并行地相互执行。但通过上面程序看到,这种同步会造成某个线程想要关闭对话时,对话被强行关闭而未完成工作的线程也被强行关闭。

TensorFlow为了解决多线程的同步和处理问题,提供了Coordinator和QueueRunner函数来对线程进行控制和协调。在使用上,这2个类必须被同时工作,共同协作来停止会话中所有线程,并向等待所有工作线程终止的程序报告。

import tensorflow as tfq = tf.FIFOQueue(1000, "float32")counter = tf
.Variable(0.0)add_op = tf.assign_add(counter,tf.constant(1.0))enqueueData_op = q
.enqueue(counter)sess = tf.Session()qr = tf.train.QueueRunner(q, 
enqueue_ops=[add_op, enqueueData_op] * 2)sess.run(tf.initialize_all_variables())enqueue_threads = qr
.create_threads(sess, start=True)    # 启动入队线程coord = tf.train.Coordinator()enqueue_threads = qr
.create_threads(sess, coord = coord, start=True)for i in range(0, 10):    
print(sess.run(q.dequeue()))coord.request_stop()coord.join(enqueue_threads)1.2.3.4.5.6.7.8.9.10.11.12
.13.14.15.16.17.18.19.

这里create_threads 函数被添加了一个新的参数:线程协调器,用于协调线程之间的关系,之后启动线程以后,线程协调器在最后负责所有线程的接受和处理,即当一个线程结束时,线程协调器会对所有的线程发出通知,协调其完毕。


从有磁盘读取数据步骤

  1. 从磁盘读取数据的名称与路径
  2. 将文件名堆入列队尾部
  3. 从队列头部读取文件名并读取数据
  4. Decoder将读取的数据解码
  5. 将数据输入样本队列,供后续使用。

 

免责声明:本文系网络转载或改编,未找到原创作者,版权归原作者所有。如涉及版权,请联系删

QR Code
微信扫一扫,欢迎咨询~

联系我们
武汉格发信息技术有限公司
湖北省武汉市经开区科技园西路6号103孵化器
电话:155-2731-8020 座机:027-59821821
邮件:tanzw@gofarlic.com
Copyright © 2023 Gofarsoft Co.,Ltd. 保留所有权利
遇到许可问题?该如何解决!?
评估许可证实际采购量? 
不清楚软件许可证使用数据? 
收到软件厂商律师函!?  
想要少购买点许可证,节省费用? 
收到软件厂商侵权通告!?  
有正版license,但许可证不够用,需要新购? 
联系方式 155-2731-8020
预留信息,一起解决您的问题
* 姓名:
* 手机:

* 公司名称:

姓名不为空

手机不正确

公司不为空