Python多线程和多进程深度解析

张开发
2026/4/14 2:38:38 15 分钟阅读

分享文章

Python多线程和多进程深度解析
Python多线程和多进程深度解析作为一名从后端开发转向Rust的开发者我发现Python的多线程和多进程与Rust的并发模型有很多相似之处但也有一些不同。Python的多线程和多进程可以用于并发执行任务提高程序的性能。今天我想分享一下我对Python多线程和多进程的理解和实践。多线程的基本概念线程是进程内的一个执行单元多个线程可以共享进程的资源。Python的多线程通过threading模块实现。import threading import time def worker(): print(fWorker thread {threading.current_thread().name} started) time.sleep(1) print(fWorker thread {threading.current_thread().name} completed) # 创建线程 threads [] for i in range(5): t threading.Thread(targetworker, namefThread-{i}) threads.append(t) t.start() # 等待所有线程完成 for t in threads: t.join() print(All threads completed)多进程的基本概念进程是操作系统分配资源的基本单位每个进程有自己的内存空间。Python的多进程通过multiprocessing模块实现。import multiprocessing import time def worker(): print(fWorker process {multiprocessing.current_process().name} started) time.sleep(1) print(fWorker process {multiprocessing.current_process().name} completed) # 创建进程 processes [] for i in range(5): p multiprocessing.Process(targetworker, namefProcess-{i}) processes.append(p) p.start() # 等待所有进程完成 for p in processes: p.join() print(All processes completed)线程安全由于多个线程共享进程的资源需要注意线程安全问题。Python提供了多种同步原语来解决线程安全问题。1. 锁Lockimport threading import time counter 0 lock threading.Lock() def increment(): global counter for _ in range(1000000): with lock: counter 1 # 创建线程 threads [] for i in range(5): t threading.Thread(targetincrement) threads.append(t) t.start() # 等待所有线程完成 for t in threads: t.join() print(fFinal counter value: {counter})2. 条件变量Conditionimport threading import time condition threading.Condition() data [] def producer(): for i in range(5): with condition: data.append(i) print(fProduced: {i}) condition.notify() # 通知消费者 time.sleep(0.5) def consumer(): for _ in range(5): with condition: while not data: condition.wait() # 等待生产者通知 item data.pop(0) print(fConsumed: {item}) # 创建线程 producer_thread threading.Thread(targetproducer) consumer_thread threading.Thread(targetconsumer) producer_thread.start() consumer_thread.start() producer_thread.join() consumer_thread.join()3. 
信号量（Semaphore）

    import threading
    import time

    # 最多允许3个线程同时访问
    semaphore = threading.Semaphore(3)

    def worker(i):
        with semaphore:
            print(f"Worker {i} started")
            time.sleep(1)
            print(f"Worker {i} completed")

    # 创建线程
    threads = []
    for i in range(10):
        t = threading.Thread(target=worker, args=(i,))
        threads.append(t)
        t.start()

    # 等待所有线程完成
    for t in threads:
        t.join()

    print("All workers completed")

## 进程间通信

由于进程之间不共享内存，需要使用进程间通信（IPC）机制来交换数据。

### 1. 队列（Queue）

    import multiprocessing
    import time

    def producer(queue):
        for i in range(5):
            queue.put(i)
            print(f"Produced: {i}")
            time.sleep(0.5)

    def consumer(queue):
        for _ in range(5):
            item = queue.get()
            print(f"Consumed: {item}")
            time.sleep(1)

    # 创建队列
    queue = multiprocessing.Queue()

    # 创建进程
    producer_process = multiprocessing.Process(target=producer, args=(queue,))
    consumer_process = multiprocessing.Process(target=consumer, args=(queue,))
    producer_process.start()
    consumer_process.start()
    producer_process.join()
    consumer_process.join()

### 2. 管道（Pipe）

    import multiprocessing
    import time

    def sender(conn):
        for i in range(5):
            conn.send(i)
            print(f"Sent: {i}")
            time.sleep(0.5)
        conn.close()

    def receiver(conn):
        while True:
            try:
                item = conn.recv()
                print(f"Received: {item}")
                time.sleep(1)
            except EOFError:
                break

    # 创建管道
    parent_conn, child_conn = multiprocessing.Pipe()

    # 创建进程
    sender_process = multiprocessing.Process(target=sender, args=(child_conn,))
    receiver_process = multiprocessing.Process(target=receiver, args=(parent_conn,))
    sender_process.start()
    receiver_process.start()
    sender_process.join()
    receiver_process.join()

### 3. 共享内存（Shared Memory）

    import multiprocessing
    import time

    def increment(counter):
        for _ in range(1000000):
            with counter.get_lock():
                counter.value += 1

    # 创建共享内存
    counter = multiprocessing.Value('i', 0)

    # 创建进程
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=increment, args=(counter,))
        processes.append(p)
        p.start()

    # 等待所有进程完成
    for p in processes:
        p.join()

    print(f"Final counter value: {counter.value}")

## 线程池和进程池

使用线程池和进程池可以更高效地管理线程和进程。

### 1.
线程池

    import concurrent.futures
    import time

    def worker(i):
        print(f"Worker {i} started")
        time.sleep(1)
        print(f"Worker {i} completed")
        return i * 2

    # 创建线程池
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # 提交任务
        futures = [executor.submit(worker, i) for i in range(10)]
        # 收集结果
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
        print(f"Results: {results}")

### 2. 进程池

    import concurrent.futures
    import time

    def worker(i):
        print(f"Worker {i} started")
        time.sleep(1)
        print(f"Worker {i} completed")
        return i * 2

    # 创建进程池
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        # 提交任务
        futures = [executor.submit(worker, i) for i in range(10)]
        # 收集结果
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
        print(f"Results: {results}")

## 多线程与多进程的选择

适合使用多线程的场景：

- I/O密集型任务：如文件操作、网络请求等，因为线程在等待I/O操作时会释放GIL，允许其他线程执行。
- 需要共享内存的任务：线程之间共享内存，数据交换更高效。
- 启动速度快的任务：线程的启动速度比进程快。

适合使用多进程的场景：

- CPU密集型任务：如数学计算、图像处理等，因为多进程可以利用多核CPU，避免GIL的限制。
- 需要隔离的任务：进程之间相互隔离，一个进程崩溃不会影响其他进程。
- 需要大量内存的任务：每个进程有自己的内存空间，避免内存竞争。

## 多线程和多进程与Rust的对比

相似之处：

- 都支持并发执行任务
- 都提供了同步原语来解决并发问题
- 都支持线程池和进程池
- 都可以处理I/O密集型和CPU密集型任务

不同之处：

- Python的多线程受GIL限制，而Rust的线程没有此限制
- Python的多进程启动较慢，而Rust的线程启动较快
- Python的并发模型相对简单，而Rust的并发模型更加安全和强大
- Python的多线程和多进程使用不同的模块，而Rust的并发功能集成在标准库中

## 实战案例：使用多线程和多进程处理任务

    import concurrent.futures
    import time
    import requests

    # 模拟I/O密集型任务
    def fetch_url(url):
        response = requests.get(url)
        return url, response.status_code

    # 模拟CPU密集型任务
    def compute_factorial(n):
        result = 1
        for i in range(1, n + 1):
            result *= i
        return n, result

    def main():
        # 测试I/O密集型任务（使用线程池）
        urls = [
            "https://www.google.com",
            "https://www.github.com",
            "https://www.python.org",
            "https://www.rust-lang.org",
            "https://www.csdn.net",
        ]
        print("Testing I/O密集型任务 with ThreadPoolExecutor:")
        start_time = time.time()
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            results = list(executor.map(fetch_url, urls))
        for url, status_code in results:
            print(f"{url}: {status_code}")
        print(f"I/O密集型任务 took {time.time() - start_time:.2f} seconds")

        # 测试CPU密集型任务（使用进程池）
        numbers = [100000, 200000, 300000,
                   400000, 500000]
        print("\nTesting CPU密集型任务 with ProcessPoolExecutor:")
        start_time = time.time()
        with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
            results = list(executor.map(compute_factorial, numbers))
        for n, result in results:
            print(f"{n}! has {len(str(result))} digits")
        print(f"CPU密集型任务 took {time.time() - start_time:.2f} seconds")

    if __name__ == "__main__":
        main()

## 总结

Python的多线程和多进程是一种强大的工具，它们可以帮助我们并发执行任务，提高程序的性能。通过threading和multiprocessing模块，我们可以轻松地创建和管理线程和进程。

更多文章