1. 线程
Python 中与线程相关的模块主要是 _thread 和 threading。
1.1 _thread 模块
1.1.1 创建线程
import time
import datetime
import _thread
# strftime pattern used for every timestamp printed below: HH:MM:SS (24-hour).
date_time_format = "%H:%M:%S"


def get_time_str():
    """Return the current time formatted according to ``date_time_format``."""
    return datetime.datetime.now().strftime(date_time_format)
def thread_function(thread_id):
    """Log a start message, sleep four seconds, then log a finish message.

    Args:
        thread_id: Integer label included in every message of this worker.
    """
    started_at = get_time_str()
    print("Thread %d\t start at %s" % (thread_id, started_at))
    print("Thread %d\t sleep" % thread_id)
    # The sleep stands in for real work done by the thread.
    time.sleep(4)
    finished_at = get_time_str()
    print("Thread %d\t finish at %s" % (thread_id, finished_at))
def main():
    """Start five ``_thread`` workers one second apart, then wait by sleeping."""
    print("Main thread start at %s" % get_time_str())
    for worker_id in range(5):
        # start_new_thread takes the target plus a tuple of positional
        # arguments and returns an identifier for the new thread.
        worker_args = (worker_id,)
        _thread.start_new_thread(thread_function, worker_args)
        time.sleep(1)
    # Nothing here joins the workers, so the main thread simply sleeps
    # to give them time to finish before it returns.
    time.sleep(6)
    print("Main thread finish at %s" % get_time_str())


if __name__ == "__main__":
    main()
1.1.2 使用锁
import time
import datetime
import _thread
# strftime pattern used for every timestamp printed below: HH:MM:SS (24-hour).
date_time_format = "%H:%M:%S"


def get_time_str():
    """Return the current time formatted according to ``date_time_format``."""
    current = datetime.datetime.now()
    return current.strftime(date_time_format)
def thread_function(thread_id, lock):
    """Log start/finish around a four-second sleep, then release ``lock``.

    Args:
        thread_id: Integer label included in every message of this worker.
        lock: A lock that the caller acquired before starting this thread;
            releasing it signals that the worker is done.
    """
    try:
        print("Thread %d\t start at %s" % (thread_id, get_time_str()))
        print("Thread %d\t sleep" % thread_id)
        time.sleep(4)
        print("Thread %d\t finish at %s" % (thread_id, get_time_str()))
    finally:
        # Always release, even if the body raises; otherwise the thread
        # waiting on this lock would never be woken up.
        lock.release()
def main():
    """Run five ``_thread`` workers and wait for each via its own lock.

    Every worker gets a lock that is acquired here before the worker starts
    and released by the worker when it finishes.
    """
    print("Main Thread start at %s" % get_time_str())
    locks = []
    for _ in range(5):
        lock = _thread.allocate_lock()
        # Held from now until the matching worker releases it.
        lock.acquire()
        locks.append(lock)
    for thread_id, lock in enumerate(locks):
        _thread.start_new_thread(thread_function, (thread_id, lock))
        time.sleep(1)
    for lock in locks:
        # A blocking acquire returns as soon as the worker releases the lock,
        # so no sleep-and-poll loop on locked() is needed.
        lock.acquire()
    print("Main Thread finish at %s" % get_time_str())


if __name__ == "__main__":
    main()
1.2 threading 模块
1.2.1 创建线程
import time
import datetime
import threading
# strftime pattern used for every timestamp printed below: HH:MM:SS (24-hour).
date_time_format = "%H:%M:%S"


def get_time_str():
    """Return the current time formatted according to ``date_time_format``."""
    return datetime.datetime.now().strftime(date_time_format)
def thread_function(thread_id):
    """Log a start message, sleep four seconds, then log a finish message.

    Args:
        thread_id: Integer label included in every message of this worker.
    """
    start_stamp = get_time_str()
    print("Thread %d\t start at %s" % (thread_id, start_stamp))
    print("Thread %d\t sleep" % thread_id)
    # The sleep stands in for real work done by the thread.
    time.sleep(4)
    end_stamp = get_time_str()
    print("Thread %d\t finish at %s" % (thread_id, end_stamp))
def main():
    """Create five ``threading.Thread`` workers, start them, and join them."""
    print("Main Thread start at %s" % get_time_str())
    # Create the threads (not started yet).
    threads = [
        threading.Thread(target=thread_function, args=(index,))
        for index in range(5)
    ]
    # Start the threads one second apart.
    for worker in threads:
        worker.start()
        time.sleep(1)
    # Wait for every thread to finish.
    for worker in threads:
        worker.join()
    print("Main Thread finish at %s" % get_time_str())


if __name__ == "__main__":
    main()
1.2.2 通过子类创建线程
import time
import datetime
import threading
# strftime pattern used for every timestamp printed below: HH:MM:SS (24-hour).
date_time_format = "%H:%M:%S"


def get_time_str():
    """Return the current time formatted according to ``date_time_format``."""
    current = datetime.datetime.now()
    return current.strftime(date_time_format)
class MyThread(threading.Thread):
    """Thread subclass that logs its start, sleeps four seconds, logs its end."""

    def __init__(self, thread_id):
        """Initialise the base ``Thread`` and remember ``thread_id``."""
        super().__init__()
        self.thread_id = thread_id

    def run(self):
        """Thread body: print start, sleep, and finish messages."""
        tid = self.thread_id
        print("Thread %d\t start at %s" % (tid, get_time_str()))
        print("Thread %d\t sleep" % tid)
        # The sleep stands in for real work done by the thread.
        time.sleep(4)
        print("Thread %d\t finish at %s" % (tid, get_time_str()))
def main():
    """Create five ``MyThread`` workers, start them, and join them."""
    print("Main Thread start at %s" % get_time_str())
    # Create the threads (not started yet).
    threads = [MyThread(index) for index in range(5)]
    # Start the threads one second apart.
    for worker in threads:
        worker.start()
        time.sleep(1)
    # Wait for every thread to finish.
    for worker in threads:
        worker.join()
    print("Main Thread finish at %s" % get_time_str())


if __name__ == '__main__':
    main()
1.3 线程同步
import time
import threading
# Lock shared by all MyThread instances. It is created at definition time so
# the class also works when this code is imported rather than run as a script.
thread_lock = threading.Lock()


class MyThread(threading.Thread):
    """Thread that prints two batches of three lines, each under ``thread_lock``."""

    def __init__(self, thread_id):
        """Initialise the base ``Thread`` and remember ``thread_id``."""
        super(MyThread, self).__init__()
        self.thread_id = thread_id

    def run(self):
        """Print one batch, sleep one second without the lock, print another."""
        self._print_batch()
        # The lock is not held while sleeping, so other threads can print.
        time.sleep(1)
        self._print_batch()

    def _print_batch(self):
        """Print three numbered lines while holding the shared lock."""
        # ``with`` releases the lock even if printing raises.
        with thread_lock:
            for i in range(3):
                print("Thread %d\t printing! times:%d" % (self.thread_id, i))
def main():
    """Create five ``MyThread`` workers, start them all, and join them."""
    print("Main Thread start")
    # Create the threads (not started yet).
    threads = [MyThread(index) for index in range(5)]
    # Start the threads.
    for worker in threads:
        worker.start()
    # Wait for every thread to finish.
    for worker in threads:
        worker.join()
    print("Main Thread finish")


if __name__ == '__main__':
    # Create the lock shared by the workers before any of them starts.
    thread_lock = threading.Lock()
    main()
1.4 队列
# 1. Basic queue usage: put five numbers in, then drain them.
print('--------------------------------------- 1 ---------------------------------------')
from queue import Queue

q = Queue()
for number in range(5):
    q.put(number)
# Keep reading until the queue reports no remaining items.
while q.qsize() > 0:
    item = q.get()
    print(item)
#
# 2. Using queues together with threads
print('--------------------------------------- 2 ---------------------------------------')
import time
import threading
import queue

# Queue of pending work items, limited to at most 10 elements.
work_queue = queue.Queue(10)
# Queue of finished results, limited to at most 10 elements.
result_queue = queue.Queue(10)
class WorkThread(threading.Thread):
    """Worker that moves items from ``work_queue`` to ``result_queue``."""

    def __init__(self, thread_id):
        """Initialise the base ``Thread`` and remember ``thread_id``."""
        super(WorkThread, self).__init__()
        self.thread_id = thread_id

    def run(self):
        """Process work items until the work queue has none left."""
        while True:
            try:
                # Non-blocking fetch: checking empty() and then calling a
                # blocking get() is a race, because another worker can take
                # the last item in between and leave this thread blocked
                # forever.
                work = work_queue.get_nowait()
            except queue.Empty:
                return
            # Simulate three seconds of work.
            time.sleep(3)
            out = "Thread %d\t received %s" % (self.thread_id, work)
            # Publish the result.
            result_queue.put(out)
def main():
    """Queue ten messages, start two workers, and print ten results."""
    # Fill the work queue.
    for message_id in range(10):
        work_queue.put("message id %d" % message_id)
    # Start two worker threads.
    for worker_id in range(2):
        WorkThread(worker_id).start()
    # Print ten results; each get() blocks until a result is available.
    for _ in range(10):
        print(result_queue.get())


if __name__ == '__main__':
    main()
2. 进程
2.1 os 模块
2.1.1 system 函数
system 函数是最简单的创建进程的方式,函数只有一个参数,就是要执行的命令
import os
# Choose the directory-listing command: "dir" on Windows, "ls" elsewhere.
command = "dir" if os.name == 'nt' else "ls"
return_code = os.system(command)
# A return value of 0 means the command succeeded.
print("Run success!" if return_code == 0 else "Something wrong!")
2.1.2 fork
fork 调用系统 API 创建子进程。注意:fork 函数在 Windows 上不存在,只能在 Linux 和 macOS 上使用。
import os
print("Main Process ID (%s)" % os.getpid())
# fork() returns 0 in the child and the child's pid in the parent.
pid = os.fork()
if pid == 0:
    # os.getpid must be called; passing the function itself would print its
    # repr instead of the child's pid.
    print("This is child process(%s) and main process is %s." % (os.getpid(), os.getppid()))
else:
    print("Created a child process (%s)." % (pid, ))
2.2 subprocess 模块
2.2.1 call
import os
import subprocess
# Choose the directory-listing command: via cmd on Windows, "ls -l" elsewhere.
listing_command = ["cmd", "/c", "dir"] if os.name == 'nt' else ["ls", "-l"]
return_code = subprocess.call(listing_command)
# A return code of 0 means the command succeeded.
if return_code != 0:
    print("Something wrong!")
else:
    print("Run success!")
2.2.2 Popen
使用 Popen 调用外部命令
import os
import subprocess
# ping takes the packet count as -n on Windows and -c elsewhere.
if os.name == 'nt':
    ping = subprocess.Popen("ping -n 5 www.baidu.com", shell=True, stdout=subprocess.PIPE)
else:
    ping = subprocess.Popen("ping -c 5 www.baidu.com", shell=True, stdout=subprocess.PIPE)
# Wait for the command to finish. communicate() reads stdout while waiting;
# wait() alone can deadlock once the child fills the pipe buffer.
out, _ = ping.communicate()
# Print the process id of the external command.
print(ping.pid)
# Print the return code of the external command.
print(ping.returncode)
# Print the captured output of the external command.
print(out)
2.3 multiprocessing Process
2.3.1 创建进程
from multiprocessing import Process
import os
def info(title):
    """Print ``title`` followed by the module name, parent pid, and own pid."""
    print(title)
    details = (
        ('module name:', __name__),
        ('parent process:', os.getppid()),
        ('process id:', os.getpid()),
    )
    for label, value in details:
        print(label, value)
def f(name):
    """Process target: print process details, then greet ``name``."""
    info('function f')
    greeting = ('hello', name)
    print(*greeting)
if __name__ == '__main__':
    # Show the parent's details, then run f('Python') in a child process
    # and wait for it to exit.
    info('main line')
    child = Process(target=f, args=('Python',))
    child.start()
    child.join()
2.3.2 使用子类创建子进程
from multiprocessing import Process
import os
class MyProcess(Process):
    """Process subclass whose body prints the module name and process ids."""

    def __init__(self):
        """Initialise the base ``Process``; no extra state is stored."""
        super().__init__()

    def run(self):
        """Process body: print module name, parent pid, and own pid."""
        details = (
            ('module name:', __name__),
            ('parent process:', os.getppid()),
            ('process id:', os.getpid()),
        )
        for label, value in details:
            print(label, value)
def main():
    """Create five ``MyProcess`` children, start them all, and join them."""
    # Create the processes (not started yet).
    processes = [MyProcess() for _ in range(5)]
    # Start the processes.
    for child in processes:
        child.start()
    # Wait for every process to exit.
    for child in processes:
        child.join()


if __name__ == '__main__':
    main()
2.3.3 multiprocessing.Queue
使用 multiprocessing.Queue 实现进程间通信:子进程把结果放入队列,父进程再从队列中读取
from multiprocessing import Process, Queue
import os
# Create the queue used to pass results from the child processes to the parent.
result_queue = Queue()
class MyProcess(Process):
    """Process subclass that reports its module name and pids through a queue."""

    def __init__(self, q):
        """Initialise the base ``Process`` and keep the queue to report into."""
        super().__init__()
        # Queue that run() puts its report on.
        self.q = q

    def run(self):
        """Process body: build a three-line report and put it on the queue."""
        report_lines = [
            'module name %s\n' % __name__,
            'parent process: %d\n' % os.getppid(),
            'process id: %d\n' % os.getpid(),
        ]
        self.q.put(''.join(report_lines))
def main():
    """Run five ``MyProcess`` children and print the report each one sends."""
    # Create the processes and hand each one the shared result queue.
    processes = [MyProcess(result_queue) for _ in range(5)]
    # Start the processes.
    for process in processes:
        process.start()
    # Read exactly one report per child BEFORE joining. A blocking get()
    # waits for each report, whereas empty() on a multiprocessing queue is
    # documented as unreliable, and joining a child that still has queued
    # data can block.
    for _ in processes:
        output = result_queue.get()
        print(output)
    # Wait for every process to exit.
    for process in processes:
        process.join()


if __name__ == '__main__':
    main()
3. 进程池
3.1 multiprocessing.Pool
使用 multiprocessing 的 Pool 管理进程
import multiprocessing.pool
def process_func(process_id):
    """Pool task: print a start line, sleep one second, print an end line.

    Args:
        process_id: Integer label included in both messages.
    """
    # This snippet does not import ``time`` at the top, so import it here.
    # Without it the task raises NameError, which apply_async hides unless
    # the result is fetched.
    import time

    print("process id %d start" % process_id)
    time.sleep(1)
    print("process id %d end" % process_id)
def main():
    """Submit ten ``process_func`` tasks to a three-process pool and wait."""
    pool = multiprocessing.Pool(3)
    for task_id in range(10):
        # Queue one task on the pool without waiting for its result.
        task_args = (task_id,)
        pool.apply_async(process_func, task_args)
    # Close the pool first so that no further tasks can be submitted.
    pool.close()
    # Then wait for all worker processes to finish.
    pool.join()


if __name__ == '__main__':
    main()
3.2 使用 Pool 的map函数
import multiprocessing.pool
def process_func(process_id):
    """Pool task: print a start line, sleep one second, print an end line.

    Args:
        process_id: Integer label included in both messages.
    """
    # This snippet does not import ``time`` at the top, so import it here
    # to keep the task from failing with NameError.
    import time

    print("process id %d start" % process_id)
    time.sleep(1)
    print("process id %d end" % process_id)
def main():
    """Run ``process_func`` over 0..9 on a three-process pool via ``map``."""
    pool = multiprocessing.Pool(3)
    task_ids = range(10)
    pool.map(process_func, task_ids)
    # Close the pool first so that no further tasks can be submitted.
    pool.close()
    # Then wait for all worker processes to finish.
    pool.join()


if __name__ == '__main__':
    main()
4. 线程池
4.1 multiprocessing.dummy
使用 multiprocessing 的 dummy 管理线程
import multiprocessing.dummy
def process_func(process_id):
    """Pool task: print a start line, sleep three seconds, print an end line.

    Args:
        process_id: Integer label included in both messages.
    """
    # This snippet does not import ``time`` at the top, so import it here.
    # Without it the task raises NameError, which apply_async hides unless
    # the result is fetched.
    import time

    print("process id %d start" % process_id)
    time.sleep(3)
    print("process id %d end" % process_id)
def main():
    """Submit ten ``process_func`` tasks to a three-worker thread pool and wait."""
    pool = multiprocessing.dummy.Pool(processes=3)
    for i in range(10):
        # Queue one task on the thread pool without waiting for its result.
        pool.apply_async(process_func, args=(i,))
    # Close the pool first so that no further tasks can be submitted.
    pool.close()
    # Then wait for all workers to finish.
    pool.join()


# Entry point, matching the other examples; without it main() never runs.
if __name__ == '__main__':
    main()
4.2 使用 dummy.Pool 的map函数
import multiprocessing
import time
def process_func(process_id):
    """Pool task: print a start line, sleep one second, print an end line.

    Args:
        process_id: Integer label included in both messages.
    """
    start_line = "process id %d start" % process_id
    print(start_line)
    time.sleep(1)
    end_line = "process id %d end" % process_id
    print(end_line)
def main():
    """Run ``process_func`` over 0..9 on a three-worker thread pool via ``map``."""
    # ``import multiprocessing`` alone does not load the ``dummy`` submodule,
    # so ``multiprocessing.dummy`` would raise AttributeError; import it here.
    import multiprocessing.dummy

    pool = multiprocessing.dummy.Pool(processes=3)
    pool.map(process_func, range(10))
    # Close the pool first so that no further tasks can be submitted.
    pool.close()
    # Then wait for all workers to finish.
    pool.join()


if __name__ == '__main__':
    main()