# last update: 2025-07-13
# Download the offline installation packages
#   mkdir lib
#   pip download requests whatimage tqdm opencv-python -d lib
# Offline installation
#   1. Unpack /lib (python>=3.9), or unpack /lib38 (python=3.8)
#   2. pip install --no-index --find-links=lib requests whatimage tqdm opencv-python
# Online installation
#   pip install requests whatimage tqdm
#   (opencv-python is only imported by resize_image, i.e. when max_length != 0)
import itertools
import json
import logging
import os
import time
from io import BytesIO
from pathlib import Path
from tempfile import NamedTemporaryFile
from tkinter import filedialog, Tk

import requests
import whatimage
from tqdm import tqdm

# Everything the script reads or writes lives next to the script itself.
Start_dir = Path(__file__).parent
ConfigFile = Start_dir / 'ocr_config.json'
AuthFile = Start_dir / 'ocr_auth.json'
Log_dir = Start_dir / 'Log'
Data_dir = Start_dir / 'Data'
Json_Data_dir = Data_dir / 'json'
Text_Data_dir = Data_dir / 'text'
Fail_dir = Log_dir / 'fail'
Fail_OCR_dir = Fail_dir / 'ocr'

# Create the working directories up front (parents are listed before children).
for _directory in (Log_dir, Data_dir, Json_Data_dir, Text_Data_dir, Fail_dir, Fail_OCR_dir):
    _directory.mkdir(parents=True, exist_ok=True)

TimeStampStr = '%Y-%m-%d_%H.%M.%S'


def get_timestamp(time_stamp_format=TimeStampStr):
    """Return the current local time formatted with *time_stamp_format*."""
    return time.strftime(time_stamp_format, time.localtime())


def data_to_text(data):
    """Rebuild plain text from an OCR result dict.

    Consecutive characters in ``data['chars']`` that share the same entry in
    ``data['line_ids']`` form one line; every line is terminated with a
    newline. Returns an empty string when either key is missing or ``None``.
    """
    line_ids = data.get('line_ids')
    chars = data.get('chars')
    if line_ids is None or chars is None:
        return ''
    pairs = zip(line_ids, chars)
    return ''.join(
        ''.join(char for _, char in group) + '\n'
        for _, group in itertools.groupby(pairs, key=lambda pair: pair[0])
    )


def resize_image(img_path, max_length: int):
    """Load an image, shrink it so its longest side is at most *max_length*.

    Args:
        img_path: Path of the image file.
        max_length: Maximum length in pixels of the longest side; values
            ``<= 0`` disable shrinking.

    Returns:
        ``(jpeg_bytes, resize_factor)``: the image re-encoded as JPEG and the
        factor by which it was shrunk (``1.0`` when it was not shrunk).

    Raises:
        ValueError: If the file cannot be decoded or re-encoded.
    """
    import cv2
    import numpy as np

    # cv2.imread cannot open non-ASCII paths on Windows; reading the bytes
    # ourselves and decoding from memory works for any path.
    raw = np.fromfile(str(img_path), dtype=np.uint8)
    img = cv2.imdecode(raw, cv2.IMREAD_COLOR)
    if img is None:
        raise ValueError(f'cannot decode image: {img_path}')

    height, width = img.shape[:2]
    longest = max(height, width)
    resize_factor: float = longest / max_length if longest > max_length > 0 else 1.0
    if resize_factor > 1:
        new_size = (max(1, round(width / resize_factor)), max(1, round(height / resize_factor)))
        img = cv2.resize(img, new_size)

    ok, buffer = cv2.imencode('.jpeg', img)  # encode as a JPEG byte stream
    if not ok:
        raise ValueError(f'cannot encode image as JPEG: {img_path}')
    return buffer.tobytes(), resize_factor


def resize_data(data, resize_factor: float):
    """Scale an OCR result back to the original image's pixel coordinates.

    When *resize_factor* is greater than 1, ``Width``/``Height`` and every
    list entry of ``coors`` are multiplied by it and rounded. *data* is
    modified in place and also returned.
    """
    if resize_factor <= 1:
        return data
    if data.get('Width') is not None and data.get('Height') is not None:
        data['Width'] = round(data['Width'] * resize_factor)
        data['Height'] = round(data['Height'] * resize_factor)
    coors = data.get('coors')
    if isinstance(coors, list):
        data['coors'] = [
            [round(x * resize_factor) for x in coor]
            for coor in coors
            if isinstance(coor, list)
        ]
    return data


def api_ocr_pro(img_path, void_value, auth_dict, config):
    """Send one image to the OCR server and return the parsed result.

    Args:
        img_path: Path of the image to recognise.
        void_value: Value returned on any failure.
        auth_dict: Dict providing the access token under ``'token'``.
        config: Dict providing ``timeout_connect``, ``timeout_read``,
            ``retry_time``, ``server``, ``ocr_type``, ``max_length`` and
            ``server_lst``.

    Returns:
        The response dict (rescaled with :func:`resize_data`) on success,
        otherwise *void_value*.
    """
    try:
        access_token = auth_dict['token']
        timeout = (config['timeout_connect'], config['timeout_read'])
        retry_times = config['retry_time']
        server_type = config['server']
        ocr_type = config['ocr_type']
        max_length = config['max_length']
        url = config['server_lst'][server_type] + f'/{ocr_type}'
        headers = {'Authorization': f'gjcool {access_token}'}
        img_name = Path(img_path).name

        # Always hold the payload as bytes: no file handle is left open, and a
        # retry re-sends the full image instead of an exhausted stream.
        if max_length == 0:
            img_bytes = Path(img_path).read_bytes()
            mime = get_mime(img_path)
            resize_factor = 1.0
        else:
            img_bytes, resize_factor = resize_image(img_path, max_length)
            mime = 'image/jpeg'  # resize_image always re-encodes as JPEG
        data = {}

        # NOTE(review): the body of the original retry loop was truncated in
        # the source this was restored from; this reconstruction retries on
        # transport errors and accepts the first response received - confirm.
        response = None
        for attempt in range(1, retry_times + 1):
            try:
                response = requests.post(
                    url,
                    headers=headers,
                    data=data,
                    files=[('img', (img_name, img_bytes, mime))],
                    timeout=timeout,
                )
                break
            except requests.RequestException as err:
                logging.warning(f"\tocr request failed ({attempt}/{retry_times})\t{img_path}\t{err}")
        if response is None:
            return void_value

        result = response.json()
        # A 'msg' or 'detail' field is treated as an error report.
        if result.get('msg') is None and result.get('detail') is None:
            return resize_data(result, resize_factor)
        print(result)
        return void_value
    except Exception:
        logging.exception(f"\tocr_pro failed\t{img_path}")
        print('ocr_pro failed')
        return void_value


def batch_ocr_api(path_lst, task_name, auth_dict, config):
    """Run OCR over every entry of *path_lst*, saving results and logging stats.

    Each entry is a dict with ``img_path``, ``json_path`` and ``text_path``.
    The JSON response and the extracted text are written to the given paths;
    images that fail are appended to ``Fail_OCR_dir/<task_name>.txt``.
    """
    logging.info(f'\t\t任务:{task_name}\t\tSTART\t\t总数:{len(path_lst)}')
    logging.info(f'\t\t序号\t用时\t字数\t列数\t大小\t宽度\t高度\t路径')
    # Running totals for the summary line.
    total_info = {'TimeCost': 0, 'CharNumber': 0, 'LineNumber': 0, 'ImageSize': 0, 'SuccessNumber': 0, 'FailNumber': 0}
    fail_list_path = str(Fail_OCR_dir.joinpath(f'{task_name}.txt'))
    save_text(fail_list_path, "", False)  # start with an empty failure list
    start_time = time.time()

    for number, path_dict in enumerate(tqdm(path_lst, desc="OCR"), start=1):
        img_path = path_dict['img_path']
        request_start = time.time()
        data = api_ocr_pro(img_path, {}, auth_dict, config)
        time_cost = time.time() - request_start  # seconds

        if not data:
            logging.warning(f"\t{number:<5d}\tocr failed\t{img_path}")
            save_text(fail_list_path, f"{img_path}\n", True)
            total_info['FailNumber'] += 1
            continue

        try:
            with open(path_dict['json_path'], "w", encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False)
            # Only rebuild the text when the server did not provide it.
            text = data['text'] if 'text' in data else data_to_text(data)
            with open(path_dict['text_path'], "w", encoding='utf-8') as f:
                f.write(text)
            # Columns: index, time, chars, lines, size, width, height, path
            img_size = round(data['Size'] / 1024)  # KB
            logging.info(f"\t\t{number:<6d}\t{time_cost:.2f}\t{data['CharNumber']:<6d}\t{data['LineNumber']:<6d}\t{img_size:<6d}\t{data['Width']:<6d}\t{data['Height']:<6d}\t{img_path}")
            total_info['TimeCost'] += time_cost
            total_info['CharNumber'] += data['CharNumber']
            total_info['LineNumber'] += data['LineNumber']
            total_info['ImageSize'] += data['Size']
            total_info['SuccessNumber'] += 1
        except Exception:
            logging.warning(f"\t\t{number:<6d}\tsave data wrong\t{img_path}", exc_info=True)
            save_text(fail_list_path, f"{img_path}\n", True)
            total_info['FailNumber'] += 1

    logging.info(f"\t\t任务:{task_name}\t\tEND")
    logging.info(f"\t\t总数\t总用时\t总字数\t总列数\t总大小")
    logging.info(f"\t\t{total_info['SuccessNumber']}/{total_info['FailNumber']} \t{time.time()-start_time:.2f}\t{total_info['CharNumber']:<6d}\t{total_info['LineNumber']:<6d}\t{total_info['ImageSize']:<6d}\n")


def get_allfile_alldir_in_dir(path):
    """Return ``(all_dirs, all_files)`` found recursively under *path*, sorted."""
    alldir_path = []
    allfile_path = []
    for dirpath, dirnames, filenames in os.walk(path):
        alldir_path.extend(os.path.join(dirpath, name) for name in dirnames)
        allfile_path.extend(os.path.join(dirpath, name) for name in filenames)
    return sorted(alldir_path), sorted(allfile_path)


def get_token_by_login(apiid, password, url, timeout=(10, 60)):
    """Log in and return the access token, or ``''`` on any failure.

    Args:
        apiid: Account id sent as ``apiid``.
        password: Password sent as ``password``.
        url: Login endpoint.
        timeout: ``(connect, read)`` timeout in seconds, so a dead server
            cannot block the script forever.
    """
    payload = {'apiid': apiid, 'password': password, 'encrypt': 1, 'is_long': 1}
    try:
        response = requests.post(url, data=payload, timeout=timeout)
        return response.json()['access_token']
    except (requests.RequestException, ValueError, KeyError, TypeError):
        return ''


def get_mime(img_path):
    """Return the ``image/<type>`` MIME string for *img_path*.

    The type is detected from the file content with ``whatimage``; when that
    fails, the file extension is used instead.
    """
    with open(img_path, 'rb') as f:
        img = f.read()
    mime_type = whatimage.identify_image(img)
    if mime_type is None or mime_type == 'None':
        mime_type = Path(img_path).suffix.lstrip('.').lower()
    return f'image/{mime_type}'


def load_config(config_path):
    """Load a JSON file and return its content, or ``None`` if it is unreadable."""
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError):  # missing file, bad encoding, invalid JSON
        print('配置文件读取失败')
        return None


def logging_init(log_type: str, dir: Path = Log_dir, level=logging.INFO):
    """Initialise the root logger to write to ``<dir>/<log_type>/<date>.log``."""
    log_dir = dir / log_type
    log_dir.mkdir(parents=True, exist_ok=True)
    if logging.getLogger().handlers:
        # basicConfig would be a no-op; do not open a second log file handle.
        return
    log_filepath = log_dir / (time.strftime("%Y-%m-%d", time.localtime()) + '.log')
    # basicConfig(encoding=...) only exists on Python >= 3.9; an explicit
    # FileHandler keeps UTF-8 logging working on Python 3.8 as well.
    file_handler = logging.FileHandler(str(log_filepath), encoding='utf-8')
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        handlers=[file_handler],
    )


def _report_oversize(file_path, file_size, max_size):
    """Print the standard message for a file that exceeds the size limit."""
    print(f'{file_path}体积过大, {file_size/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')


def prepare_ocr_dir_task_paths(dir, task_name, max_size):
    """Build the task list for every file under directory *dir*.

    The sub-directory layout of *dir* is mirrored under the task's JSON and
    text output directories.

    Returns:
        ``(path_lst, fail_lst)``: dicts with ``img_path``/``json_path``/
        ``text_path`` for files smaller than *max_size* bytes, and the paths
        of the files that are too large.
    """
    # Create the output directories for this task.
    json_save_dir = Json_Data_dir.joinpath(task_name)
    json_save_dir.mkdir(parents=True, exist_ok=True)
    text_save_dir = Text_Data_dir.joinpath(task_name)
    text_save_dir.mkdir(parents=True, exist_ok=True)

    source_root = Path(dir)
    alldir_path, allfile_path = get_allfile_alldir_in_dir(dir)
    # relative_to only strips the leading root; a plain str.replace would also
    # rewrite later occurrences of the same substring inside the path.
    for dir_path in alldir_path:
        relative_dir = Path(dir_path).relative_to(source_root)
        (json_save_dir / relative_dir).mkdir(parents=True, exist_ok=True)
        (text_save_dir / relative_dir).mkdir(parents=True, exist_ok=True)

    path_lst, fail_lst = [], []
    for file_path in allfile_path:
        file_size = os.path.getsize(file_path)
        if file_size < max_size:
            relative_file = Path(file_path).relative_to(source_root)
            filename = relative_file.stem
            path_lst.append({
                'img_path': file_path,
                'json_path': str(json_save_dir / relative_file.parent / f'{filename}.json'),
                'text_path': str(text_save_dir / relative_file.parent / f'{filename}.txt'),
            })
        else:
            _report_oversize(file_path, file_size, max_size)
            fail_lst.append(file_path)
    return path_lst, fail_lst


def prepare_ocr_files_task_paths(paths, task_name, max_size):
    """Build the task list for an explicit collection of image files.

    All outputs go directly into the task's JSON and text directories.
    Returns ``(path_lst, fail_lst)`` as :func:`prepare_ocr_dir_task_paths` does.
    """
    json_save_dir = Json_Data_dir.joinpath(task_name)
    json_save_dir.mkdir(parents=True, exist_ok=True)
    text_save_dir = Text_Data_dir.joinpath(task_name)
    text_save_dir.mkdir(parents=True, exist_ok=True)

    path_lst, fail_lst = [], []
    for file_path in paths:
        file_size = os.path.getsize(file_path)
        if file_size < max_size:
            filename = Path(file_path).stem
            path_lst.append({
                'img_path': file_path,
                'json_path': str(json_save_dir.joinpath(f'{filename}.json')),
                'text_path': str(text_save_dir.joinpath(f'{filename}.txt')),
            })
        else:
            _report_oversize(file_path, file_size, max_size)
            fail_lst.append(file_path)
    return path_lst, fail_lst


def prepare_ocr_list_task_paths(list_paths, task_name, max_size):
    """Build the task list from text files that hold one image path per line.

    Blank lines and entries that are not existing files are skipped.
    """
    img_paths = []
    for lst_path in list_paths:
        with open(lst_path, 'r', encoding='utf-8') as f:
            for line in f:
                img_path = line.strip()
                # Path('') is the current directory and "exists", so blank
                # lines must be rejected explicitly.
                if img_path and Path(img_path).is_file():
                    img_paths.append(img_path)
    return prepare_ocr_files_task_paths(img_paths, task_name, max_size)


def read_paths(pathtype='file', init_dir='./'):
    """Ask the user for paths with a native dialog.

    Args:
        pathtype: ``'file'`` to pick one or more files, ``'dir'`` to pick a
            directory.
        init_dir: Directory the dialog starts in.

    Returns:
        The dialog's result (empty when cancelled), or ``None`` for an
        unknown *pathtype*.
    """
    root = Tk()
    try:
        root.focus_force()
        root.after(10, root.withdraw)  # hide the empty root window
        if pathtype == 'file':
            return filedialog.askopenfilenames(parent=root, initialdir=init_dir)
        if pathtype == 'dir':
            return filedialog.askdirectory(parent=root, initialdir=init_dir)
        return None
    finally:
        # Without this every call would leave another Tk root alive.
        root.destroy()


def save_text(filepath, content, is_add=False):
    """Write *content* to *filepath*, appending when *is_add* is true.

    Does nothing when *filepath* is empty.
    """
    if not filepath:
        return
    with open(filepath, "a" if is_add else "w", encoding='utf-8') as f:
        f.write(content)


def main():
    """Interactive loop: choose images, then run one OCR task per round."""
    logging_init('OCR')
    sub_key = input('选择图片: 1.目录; 2.文件; 3.列表. 输入其他键, 返回上层\t')
    while sub_key in ['1', '2', '3']:
        task_name = input(f'请输入任务名称. 默认取当前日期时间({TimeStampStr}):\t')
        if not task_name:
            task_name = get_timestamp(TimeStampStr)

        auth_dict = load_config(AuthFile)
        config = load_config(ConfigFile)
        if auth_dict is None or config is None:
            break  # load_config has already reported the problem
        try:
            max_size = config['max_size'] * 1024 * 1024
        except (KeyError, TypeError):
            print('配置文件读取失败')
            break

        # Collect the image list: path_lst, fail_lst
        if sub_key == '1':
            print('请选择图片目录')
            dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
            if not dir:
                break
            print(dir)
            path_lst, fail_lst = prepare_ocr_dir_task_paths(dir, task_name, max_size)
        elif sub_key == '2':
            print('请选择图片文件')
            img_paths = read_paths(init_dir=str(Start_dir))
            if not img_paths:
                break
            print(f'已选择{len(img_paths)}个文件')
            path_lst, fail_lst = prepare_ocr_files_task_paths(img_paths, task_name, max_size)
        else:
            print('请选择列表文件')
            list_paths = read_paths(init_dir=str(Start_dir))
            if not list_paths:
                break
            print(f'已选择{len(list_paths)}个列表')
            path_lst, fail_lst = prepare_ocr_list_task_paths(list_paths, task_name, max_size)

        if fail_lst:
            check_size = input(f'有{len(fail_lst)}个文件体积超标, 是否停止任务: 1. 继续; 其他, 中止\t')
            if check_size != '1':
                break

        try:
            batch_ocr_api(path_lst, task_name, auth_dict, config)
        except Exception:
            logging.exception(f'{task_name}任务失败')
            print(f'{task_name}任务失败')
        sub_key = input('选择方式: 1.目录; 2.文件; 3.列表. 输入其他键, 返回上层\t')


if __name__ == "__main__":
    main()