Python Development: Exploring Checkpoint-and-Resume in Concurrent Scenarios

I recently had a small requirement: write the data returned by a batch of concurrent tasks into an Excel file, with support for resuming after an interruption. Here is the original code:

import pandas as pd
from threading import Lock

excel_output_path = 'output.xlsx'   # assumed config; mode='a' requires the workbook to exist
sheet_name = 'Sheet1'               # assumed config
thread_lock = Lock()

def write_entry_to_excel(entry):
    try:
        df = pd.DataFrame([entry])
        with thread_lock:  # openpyxl workbooks are not thread-safe, so serialize writers
            with pd.ExcelWriter(excel_output_path, engine='openpyxl', mode='a', if_sheet_exists='overlay') as writer:
                sheet_exists = sheet_name in writer.book.sheetnames
                df.to_excel(writer, sheet_name=sheet_name, index=False, header=not sheet_exists)
    except Exception as e:
        print(f"{e}")

def process_single_entry(entry):
    # placeholder: the real processing logic is elided; it returns a list of record dicts
    return [{"uuid": entry}]

Besides that, to support resuming, the entries that have already been processed need to be recorded:

import os

def load_done_entries(done_path):
    # read back the keys of entries that were already processed
    if os.path.exists(done_path):
        with open(done_path, 'r') as f:
            return set(line.strip() for line in f if line.strip())
    return set()

def append_done_entry(done_path, entry_key):
    # append-only log: one key per line
    with open(done_path, 'a') as f:
        f.write(f"{entry_key}\n")

The idea was to run everything concurrently and write each entry as soon as its result came back:

from concurrent.futures import ThreadPoolExecutor, as_completed

max_threads = 5

def process_input_file(input_path):
    done_path = input_path + ".done"

    with open(input_path, 'r') as f:
        all_entries = [line.strip() for line in f if line.strip()]

    done_entries = load_done_entries(done_path)

    if len(done_entries) >= len(all_entries):
        print(f"finished, ignore: {input_path}")
        return

    pending_entries = [entry for entry in all_entries if entry not in done_entries]
    print(f"pending entries count: {len(pending_entries)}")

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {executor.submit(process_single_entry, entry): entry for entry in pending_entries}

        for i, future in enumerate(as_completed(futures), 1):
            entry = futures[future]
            try:
                result_list = future.result()
                for result in result_list:
                    write_entry_to_excel(result)
                    # mark the entry done only after its row has been written
                    append_done_entry(done_path, result.get("uuid", entry))
            except Exception as e:
                print(f"{e}")

if __name__ == "__main__":
    input_file_path = "input.txt"
    process_input_file(input_file_path)

When I ran it, I noticed data was missing. Investigation showed that with pandas' ExcelWriter, if_sheet_exists='overlay' writes into an existing sheet starting from the first row, so each write overwrote the previous one. to_excel itself cannot locate the last used row and append; you have to compute the starting row yourself and pass it via the startrow parameter.

Reading the current row count before every write would solve this, but it is extremely inefficient.
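For reference, here is a minimal sketch of that workaround (the function and variable names are mine, not from the original code): it asks openpyxl for the sheet's current max_row on every call and passes it to to_excel as startrow.

def append_entry_with_startrow(entry, path, sheet):
    df = pd.DataFrame([entry])
    with pd.ExcelWriter(path, engine='openpyxl', mode='a', if_sheet_exists='overlay') as writer:
        if sheet in writer.book.sheetnames:
            # re-read the current sheet height on every single write
            start = writer.book[sheet].max_row
            df.to_excel(writer, sheet_name=sheet, index=False, header=False, startrow=start)
        else:
            df.to_excel(writer, sheet_name=sheet, index=False)

Since append mode loads the whole workbook and rewrites it on close, each row costs time proportional to the file size, which is why this approach collapses as the data grows.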

I considered a few alternatives:

  1. Collect all results in memory and write the Excel file once at the end
  2. Write each record to its own tmp file and merge them at the end
  3. Store the results in a database

Given the volume of data, holding everything in memory was not an option, and a database felt too heavyweight, so I went with the second approach: each record goes to its own tmp file, and a merge runs whenever the job is interrupted.

The final implementation:

import os
import csv
import glob
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock

directory_path = r'/'
output_path = 'output.xlsx'

tmp_dir = os.path.join(directory_path, "tmp_results")
os.makedirs(tmp_dir, exist_ok=True)

lock = Lock()


def load_completed_inputs(file_flag):
    if os.path.exists(file_flag):
        with open(file_flag, 'r') as f:
            return set(line.strip() for line in f if line.strip())
    return set()

def append_completed_input(file_flag, input_value):
    with open(file_flag, 'a') as f:
        f.write(f"{input_value}\n")

def process_single_entry(entry):
    # placeholder: the real lookup/processing logic is elided;
    # it returns (input_value, found, records)
    return entry, True, [{"uuid": entry}]

def save_record_to_csv(record):
    # one tmp CSV per uuid: appending a line is cheap, and a crash
    # can only affect the line currently being written
    input_value = record.get("uuid", "unknown")
    filename = os.path.join(tmp_dir, f"{input_value}.csv")

    with lock:
        file_exists = os.path.exists(filename)
        with open(filename, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=record.keys())
            if not file_exists:
                writer.writeheader()
            writer.writerow(record)

def merge_csv_outputs(tmp_dir, output_path):
    # concatenate all tmp CSVs and write the Excel file in one pass
    csv_files = glob.glob(os.path.join(tmp_dir, "*.csv"))
    frames = [pd.read_csv(file) for file in csv_files]
    if not frames:
        print("nothing to merge")
        return

    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        full_df = pd.concat(frames, ignore_index=True)
        full_df.to_excel(writer, index=False)

    print(f"saving to {output_path}")


file_list = [os.path.join(directory_path, f) for f in os.listdir(directory_path) if f.endswith('.csv')]

for path in sorted(file_list):
    print(f"processing: {path}")

    with open(path, 'r') as f:
        inputs = [line.strip() for line in f if line.strip()]

    file_flag = path + ".done"
    completed_inputs = load_completed_inputs(file_flag)

    if len(completed_inputs) >= len(inputs):
        print(f"finished, ignore: {path}")
        continue

    pending_inputs = [x for x in inputs if x not in completed_inputs]
    print(f"left: {len(pending_inputs)}")

    success_count = 0

    try:
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = {executor.submit(process_single_entry, x): x for x in pending_inputs}

            for i, future in enumerate(as_completed(futures), 1):
                input_value = futures[future]
                try:
                    input_value, found, records = future.result()
                    if found:
                        success_count += 1
                        for rec in records:
                            save_record_to_csv(rec)
                        print(f"success: {success_count}")
                    else:
                        print(f"not found: {input_value}")
                    # mark done only after the records are safely on disk
                    append_completed_input(file_flag, input_value)
                except Exception as e:
                    print(e)

    except KeyboardInterrupt:
        # merge what we have before exiting, so the Excel file is usable
        merge_csv_outputs(tmp_dir, output_path)
        raise

# merge once more after all input files complete normally
merge_csv_outputs(tmp_dir, output_path)
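One caveat worth noting: if the process dies after save_record_to_csv but before append_completed_input, that entry is reprocessed on the next run and its rows are appended to the same per-uuid CSV a second time. Assuming reprocessing an entry yields byte-identical rows (which is the case here, but check for your data), a one-line change inside merge_csv_outputs takes care of it:

        # drop rows re-appended after a crash between the CSV write
        # and the .done bookkeeping
        full_df = pd.concat(frames, ignore_index=True).drop_duplicates()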

With that, the Excel output is written correctly, and an interrupted run resumes where it left off.

2025/6/14, Suzhou