# 多进程编程

多进程是绕过 GIL 实现真正并行计算的主要方式,适用于 CPU 密集型任务。

## 基础使用

### 创建进程

```python
from multiprocessing import Process
import os

def worker(name):
    print(f"Worker {name}, PID: {os.getpid()}, Parent PID: {os.getppid()}")

if __name__ == '__main__':  # Windows 必须
    processes = []
    for i in range(4):
        p = Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Main process PID: {os.getpid()}")
```

:::{warning}
在 Windows 上,多进程代码必须放在 `if __name__ == '__main__':` 块中,否则子进程重新导入主模块时会再次创建进程,导致无限递归。
:::
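上面的警告与进程的"启动方式"有关:Windows(以及较新 Python 版本下的 macOS)默认使用 spawn,子进程会重新导入主模块;Linux 目前默认使用 fork。下面是一个查看和显式指定启动方式的最小示意(使用标准库的 `get_start_method` / `get_context`,仅作演示):

```python
import multiprocessing as mp

def task():
    print("child running")

if __name__ == '__main__':
    # 查看当前平台默认的启动方式:Windows/macOS 通常为 'spawn',Linux 为 'fork'
    print(mp.get_start_method())

    # 显式使用 spawn 上下文创建进程,使行为在各平台间更一致
    ctx = mp.get_context('spawn')
    p = ctx.Process(target=task)
    p.start()
    p.join()
```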
### 进程池

```python
from multiprocessing import Pool

def cpu_intensive(n):
    """CPU 密集型任务"""
    total = 0
    for i in range(n):
        total += i * i
    return total

def add(a, b):
    return a + b

if __name__ == '__main__':
    # 使用进程池
    with Pool(4) as pool:
        # map:阻塞,保持顺序
        results = pool.map(cpu_intensive, [1000000] * 8)
        print(f"map results: {len(results)}")

        # map_async:非阻塞
        async_result = pool.map_async(cpu_intensive, [1000000] * 8)
        results = async_result.get(timeout=10)

        # apply:单次阻塞调用
        result = pool.apply(cpu_intensive, (1000000,))

        # apply_async:单次非阻塞调用
        async_result = pool.apply_async(cpu_intensive, (1000000,))
        result = async_result.get()

        # starmap:解包参数(add 需定义在模块顶层才能被 pickle)
        results = pool.starmap(add, [(1, 2), (3, 4), (5, 6)])
```

### ProcessPoolExecutor(推荐)

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def process_data(data):
    """处理数据"""
    time.sleep(0.1)
    return data * 2

if __name__ == '__main__':
    data_list = list(range(100))

    with ProcessPoolExecutor(max_workers=4) as executor:
        # 方式1:map
        results = list(executor.map(process_data, data_list))

        # 方式2:submit + as_completed
        futures = [executor.submit(process_data, d) for d in data_list]
        for future in as_completed(futures):
            result = future.result()
            print(f"Got result: {result}")
```

## 进程间通信

### Queue(队列)

```python
from multiprocessing import Process, Queue
import time

def producer(q):
    for i in range(10):
        q.put(f"item_{i}")
        time.sleep(0.1)
    q.put(None)  # 哨兵

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consumed: {item}")

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
```

### Pipe(管道)

```python
from multiprocessing import Process, Pipe

def sender(conn):
    conn.send("Hello from sender")
    conn.send([1, 2, 3])
    conn.close()

def receiver(conn):
    print(conn.recv())  # Hello from sender
    print(conn.recv())  # [1, 2, 3]
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
```

### 共享内存

```python
from multiprocessing import Process, Value, Array, Lock

def increment_counter(counter, lock):
    for _ in range(10000):
        with lock:
            counter.value += 1

def modify_array(arr):
    for i in range(len(arr)):
        arr[i] = arr[i] * 2

if __name__ == '__main__':
    # 共享值
    counter = Value('i', 0)  # 'i' = int
    lock = Lock()

    processes = [
        Process(target=increment_counter, args=(counter, lock))
        for _ in range(4)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Counter: {counter.value}")  # 40000

    # 共享数组
    arr = Array('d', [1.0, 2.0, 3.0, 4.0])  # 'd' = double
    p = Process(target=modify_array, args=(arr,))
    p.start()
    p.join()
    print(f"Array: {list(arr)}")  # [2.0, 4.0, 6.0, 8.0]
```
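除了 `Value` / `Array`,Python 3.8+ 的标准库还提供了 `multiprocessing.shared_memory`,适合在进程间共享较大的原始字节缓冲区。下面是一个最小示意(仅演示创建、按名称附加和清理,实际使用时请按需求封装):

```python
from multiprocessing import Process, shared_memory

def child(name):
    # 通过名称附加到已存在的共享内存块
    shm = shared_memory.SharedMemory(name=name)
    shm.buf[0] = 42      # 直接修改共享缓冲区
    shm.close()          # 子进程只关闭,不负责销毁

if __name__ == '__main__':
    shm = shared_memory.SharedMemory(create=True, size=8)
    p = Process(target=child, args=(shm.name,))
    p.start()
    p.join()
    print(shm.buf[0])    # 42
    shm.close()
    shm.unlink()         # 由创建者负责释放
```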
### Manager(共享复杂对象)

```python
from multiprocessing import Process, Manager

def worker(shared_dict, shared_list, worker_id):
    shared_dict[worker_id] = f"result_{worker_id}"
    shared_list.append(worker_id)

if __name__ == '__main__':
    with Manager() as manager:
        # Manager 支持复杂数据类型
        shared_dict = manager.dict()
        shared_list = manager.list()

        processes = [
            Process(target=worker, args=(shared_dict, shared_list, i))
            for i in range(4)
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()

        print(f"Dict: {dict(shared_dict)}")
        print(f"List: {list(shared_list)}")
```

## 进程同步

### Lock 和 RLock

```python
from multiprocessing import Process, Lock, Value

def safe_increment(counter, lock):
    for _ in range(10000):
        with lock:
            counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)
    lock = Lock()

    processes = [
        Process(target=safe_increment, args=(counter, lock))
        for _ in range(4)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(f"Final counter: {counter.value}")  # 正确: 40000
```

### Semaphore

```python
from multiprocessing import Process, Semaphore
import time

def limited_resource(sem, process_id):
    with sem:
        print(f"Process {process_id} acquired resource")
        time.sleep(1)
        print(f"Process {process_id} released resource")

if __name__ == '__main__':
    # 最多 3 个进程同时访问
    sem = Semaphore(3)

    processes = [
        Process(target=limited_resource, args=(sem, i))
        for i in range(10)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```

### Event

```python
from multiprocessing import Process, Event
import time

def wait_for_event(event, process_id):
    print(f"Process {process_id} waiting...")
    event.wait()
    print(f"Process {process_id} got signal!")

if __name__ == '__main__':
    event = Event()

    processes = [
        Process(target=wait_for_event, args=(event, i))
        for i in range(3)
    ]
    for p in processes:
        p.start()

    time.sleep(2)
    print("Setting event...")
    event.set()

    for p in processes:
        p.join()
```

## 实用模式

### 工作池模式

```python
from multiprocessing import Pool
import os

def init_worker():
    """初始化每个 worker 进程"""
    print(f"Worker {os.getpid()} initialized")

def process_item(item):
    return item * 2

if __name__ == '__main__':
    with Pool(4, initializer=init_worker) as pool:
        results = pool.map(process_item, range(10))
        print(results)
```

### 分块处理

```python
from multiprocessing import Pool

def process_chunk(chunk):
    """处理数据块"""
    return [x * 2 for x in chunk]

if __name__ == '__main__':
    data = list(range(1000))
    chunks = [data[i:i + 100] for i in range(0, len(data), 100)]

    with Pool(4) as pool:
        # chunksize 让每次分发携带多个数据块,减少进程间通信开销
        results = pool.map(process_chunk, chunks, chunksize=2)

    # 展平结果
    flat_results = [item for chunk in results for item in chunk]
```

### 超时处理

```python
from multiprocessing import Pool, TimeoutError
import time

def slow_function(x):
    time.sleep(x)
    return x

if __name__ == '__main__':
    with Pool(2) as pool:
        async_result = pool.apply_async(slow_function, (10,))
        try:
            result = async_result.get(timeout=3)
        except TimeoutError:
            print("Task timed out!")
            pool.terminate()  # 强制终止
```

### 错误处理

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def risky_function(x):
    if x == 5:
        raise ValueError(f"Bad value: {x}")
    return x * 2

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(risky_function, i): i for i in range(10)}

        for future in as_completed(futures):
            x = futures[future]
            try:
                result = future.result()
                print(f"Input {x} -> Result {result}")
            except Exception as e:
                print(f"Input {x} raised {e}")
```

## 注意事项

::::{grid} 1
:gutter: 2

:::{grid-item-card} 进程 vs 线程

| 特性 | 进程 | 线程 |
|------|------|------|
| 创建开销 | 大 | 小 |
| 内存共享 | 需要特殊机制 | 自动共享 |
| GIL 影响 | 不受影响 | 受影响 |
| 通信方式 | Queue/Pipe/共享内存 | 直接共享 |
| 适用场景 | CPU 密集型 | I/O 密集型 |
:::

:::{grid-item-card} 常见陷阱

```python
# ❌ 在 main 块外创建 Pool(Windows 上会出问题)
pool = Pool(4)  # 错误!

# ✅ 在 main 块中创建
if __name__ == '__main__':
    with Pool(4) as pool:
        pass

# ❌ 传递不可 pickle 的对象
def worker(callback):
    pass  # 若 callback 是 lambda,则无法被 pickle

# ✅ 使用模块顶层定义的普通函数
def my_callback():
    pass

# ❌ 依赖模块级的全局锁
lock = Lock()  # spawn 启动方式下,这个锁不会在子进程间共享

# ✅ 通过参数传递锁
if __name__ == '__main__':
    lock = Lock()
    Process(target=worker, args=(lock,)).start()
```
:::

:::{grid-item-card} 性能建议

1. **减少进程间通信**:通信开销大
2. **使用合适的数据结构**:Queue vs Pipe vs 共享内存
3. **批量处理**:减少函数调用次数
4. **进程数量**:通常等于 CPU 核心数(综合示例见下文)
:::

::::
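针对上面的性能建议,下面给出一个把几条结合起来的示意(进程数取 `os.cpu_count()`,用 `imap_unordered` + `chunksize` 批量分发任务以减少通信次数;任务函数与参数均为示例假设):

```python
from multiprocessing import Pool
import os

def cpu_bound(n):
    """示例任务:CPU 密集型计算"""
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    tasks = [50_000] * 1_000

    # 进程数通常取 CPU 核心数
    with Pool(processes=os.cpu_count()) as pool:
        # chunksize 让每次进程间通信携带一批任务,减少往返次数;
        # imap_unordered 按完成顺序返回,适合结果与顺序无关的场景
        results = list(pool.imap_unordered(cpu_bound, tasks, chunksize=50))

    print(len(results))  # 1000
```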