# NOTE(review): removed stray line-number residue (non-Python text) that was
# pasted at the top of the file and would be a syntax error.
import csv
import glob
import os
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock

import pandas as pd
import requests
# --- Configuration ----------------------------------------------------------
# Root directory scanned for input *.csv files.
# NOTE(review): r'/' scans the filesystem root — confirm this is the intended
# input directory before running.
directory_path = r'/'
# Destination workbook for the merged results.
# (Fixed: was f'output.xlsx' — an f-string with no placeholders.)
output_path = 'output.xlsx'

# Per-record CSV fragments are staged here before being merged into the
# final Excel workbook.
tmp_dir = os.path.join(directory_path, "tmp_results")
os.makedirs(tmp_dir, exist_ok=True)

# Serializes writes to the per-uuid CSV files across worker threads.
lock = Lock()
def load_completed_inputs(file_flag):
    """Return the set of input values already recorded in the flag file.

    A missing flag file means nothing has been completed yet, so an empty
    set is returned. Blank lines in the file are ignored.
    """
    if not os.path.exists(file_flag):
        return set()
    completed = set()
    with open(file_flag, 'r') as flag:
        for raw_line in flag:
            value = raw_line.strip()
            if value:
                completed.add(value)
    return completed
def append_completed_input(file_flag, input_value):
    """Record *input_value* as completed by appending one line to *file_flag*."""
    line = f"{input_value}\n"
    with open(file_flag, 'a') as flag:
        flag.write(line)
def process_single_entry(entry):
    """Process one input entry and return an ``(input_value, found, records)`` triple.

    BUG FIX: the original stub returned ``[entry]`` (a one-element list), but
    the consumer unpacks the result as ``input_value, found, records =
    future.result()`` — so every future raised ValueError, which the broad
    ``except`` silently swallowed. The stub now honors the 3-tuple contract:
    the entry itself, a "found" flag, and a list of record dicts to persist.

    NOTE(review): this still looks like a placeholder — presumably the real
    implementation performs a lookup (``requests`` is imported at module
    level but unused); fill in the actual fetch/parse logic here.
    """
    return (entry, False, [])
def save_record_to_csv(record):
    """Append *record* to its per-uuid CSV file under ``tmp_dir``.

    The target file is named after the record's ``uuid`` key (``unknown`` if
    absent). The first write to a file also emits the CSV header. The whole
    check-and-write sequence runs under the module-level ``lock`` so worker
    threads never interleave writes to the same file.
    """
    uuid = record.get("uuid", "unknown")
    target = os.path.join(tmp_dir, f"{uuid}.csv")
    with lock:
        is_new_file = not os.path.exists(target)
        with open(target, 'a', newline='', encoding='utf-8') as out:
            writer = csv.DictWriter(out, fieldnames=record.keys())
            if is_new_file:
                writer.writeheader()
            writer.writerow(record)
def merge_csv_outputs(tmp_dir, output_path):
    """Combine every per-record CSV in *tmp_dir* into one Excel workbook.

    BUG FIX: the original looped ``for dfs in grouped_data`` and called
    ``pd.concat(dfs)`` on each *individual* DataFrame — iterating a single
    DataFrame yields its column labels (strings), so ``pd.concat`` raised
    TypeError; and each iteration would also have overwritten the same
    default sheet. All frames are now concatenated once and written once.
    If *tmp_dir* contains no CSV files, nothing is written.
    """
    csv_files = glob.glob(os.path.join(tmp_dir, "*.csv"))
    frames = [pd.read_csv(file) for file in csv_files]
    if not frames:
        # Nothing to merge — avoid pd.concat([]) (ValueError) and an empty workbook.
        return
    full_df = pd.concat(frames, ignore_index=True)
    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        full_df.to_excel(writer, index=False)
    print(f"saving to {output_path}")
# ---------------------------------------------------------------------------
# Driver: walk every input *.csv in directory_path, process its pending
# entries with a thread pool, checkpoint progress in a per-file ".done"
# flag, and merge all per-record CSVs into a single Excel workbook.
# ---------------------------------------------------------------------------
file_list = [os.path.join(directory_path, f)
             for f in os.listdir(directory_path)
             if f.endswith('.csv')]

for path in sorted(file_list):
    print(f"{path}")
    with open(path, 'r') as f:
        inputs = [line.strip() for line in f if line.strip()]

    # Resume support: the ".done" flag file records entries finished on
    # earlier runs; skip the file entirely once everything is checkpointed.
    file_flag = path + ".done"
    completed_inputs = load_completed_inputs(file_flag)
    if len(completed_inputs) >= len(inputs):
        print(f"finished, ignore{path}")
        continue

    pending_inputs = [x for x in inputs if x not in completed_inputs]
    print(f"left {len(pending_inputs)} ")
    success_count = 0

    try:
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = {executor.submit(process_single_entry, x): x
                       for x in pending_inputs}
            # (Dropped the unused enumerate() index from the original loop.)
            for future in as_completed(futures):
                input_value = futures[future]
                try:
                    # Contract: process_single_entry returns an
                    # (input_value, found, records) triple.
                    input_value, found, records = future.result()
                    if found:
                        success_count += 1
                        for rec in records:
                            save_record_to_csv(rec)
                        print(f"{success_count})")
                    else:
                        print(f"{input_value}")
                    # Checkpoint the entry whether or not it was found, so a
                    # rerun does not repeat it.
                    append_completed_input(file_flag, input_value)
                except Exception as e:
                    # BUG FIX: was print({e}), which printed a one-element
                    # set instead of the message. The entry is deliberately
                    # NOT checkpointed so the next run retries it.
                    print(f"{e}")
    except KeyboardInterrupt:
        # BUG FIX: the original called the undefined name merge_csv_to_excel,
        # so Ctrl-C raised NameError instead of saving partial results.
        merge_csv_outputs(tmp_dir, output_path)
        raise

# BUG FIX: the merge previously ran only on KeyboardInterrupt, so a clean
# run never produced the output workbook. Merge once everything finishes.
merge_csv_outputs(tmp_dir, output_path)
# NOTE(review): removed a stray trailing "|" left over from the paste.