#pip install -r requirements.txt import json,logging,time,os,tkinter,base64,requests,whatimage,rsa from pathlib import Path from getpass import getpass from tkinter import filedialog from tqdm import tqdm from PyPDF2 import PdfMerger, PdfReader, PdfWriter from pdf2image import convert_from_path Start_dir = Path(__file__).parent ConfigFile = Start_dir / 'config.json' Pubkey_path = str(Start_dir / 'password_pubkey.pem') Password_path = str(Start_dir / 'password_encrypt.txt') SR_EXT_lst = ['jpeg', 'png', 'tiff', 'webp'] SR_Output_lst = ['file', 'base64'] Pdf_to_Image_lst= ['JPEG', 'PNG'] Usage_API_Type_lst = ['ocr', 'sr', 'pdf'] Log_dir = Start_dir / 'Log' Data_dir = Start_dir / 'Data' Json_Data_dir = Data_dir / 'json' Text_Data_dir = Data_dir / 'text' Punct_Data_dir = Data_dir / 'punct' PDF_Data_dir = Data_dir / 'pdf' SR_Data_dir = Data_dir / 'sr' Fail_dir = Log_dir / 'fail' Fail_OCR_dir = Fail_dir / 'ocr' Fail_Punct_dir = Fail_dir / 'punct' Fail_SR_dir = Fail_dir / 'sr' Fail_PDF_dir = Fail_dir / 'pdf' Log_dir.mkdir(exist_ok=True) Data_dir.mkdir(exist_ok=True) Json_Data_dir.mkdir(exist_ok=True) Text_Data_dir.mkdir(exist_ok=True) Punct_Data_dir.mkdir(exist_ok=True) PDF_Data_dir.mkdir(exist_ok=True) SR_Data_dir.mkdir(exist_ok=True) Fail_dir.mkdir(exist_ok=True) Fail_OCR_dir.mkdir(exist_ok=True) Fail_Punct_dir.mkdir(exist_ok=True) Fail_SR_dir.mkdir(exist_ok=True) Fail_PDF_dir.mkdir(exist_ok=True) # Poppler_Path = None Poppler_Path = r'D:\poppler-0.68.0\bin' def api_area(img_path, area_num, row_num, void_value, config): try: access_token = config['token'] connect_timeout = config['timeout_connect'] read_timeout = config['timeout_read'] retry_times = config['retry_time'] server_type = config['server'] url = config['server_lst'][server_type] + '/area' headers = {'Authorization': f'gjcool {access_token}'} img_name = Path(img_path).name mime = get_mime(img_path) files = [('img', (img_name, open(img_path, 'rb'), mime))] data = {'area_num':area_num, 'row_num':row_num} i = 0 while i=retry_times or response is None: return void_value else: result = response.json() return result.get('area', void_value) except: print('area failed') return void_value def api_ocr_pro(img_path, layout, area, compact, void_value, config): try: access_token = config['token'] connect_timeout = config['timeout_connect'] read_timeout = config['timeout_read'] retry_times = config['retry_time'] server_type = config['server'] url = config['server_lst'][server_type] + '/ocr_pro' headers = {'Authorization': f'gjcool {access_token}'} img_name = Path(img_path).name mime = get_mime(img_path) files = [('img', (img_name, open(img_path, 'rb'), mime))] data = {'layout':layout, 'area':str(area), 'compact':compact} i = 0 while i=retry_times or response is None: return void_value else: result = response.json() if result.get('msg') is None and result.get('detail') is None: return result else: print(result) return void_value except: print('ocr_pro failed') return void_value def api_punct_pro(orig_path, encoding,void_value, config): try: access_token = config['token'] connect_timeout = config['timeout_connect'] read_timeout = config['timeout_read'] retry_times = config['retry_time'] server_type = config['server'] url = config['server_lst'][server_type] + '/punct_pro' headers = {'Authorization': f'gjcool {access_token}'} lines = open(orig_path, 'r', encoding=encoding).readlines() src =''.join(lines).replace('\n', '').replace('】【', '') payload = {'src':src} i = 0 while i=retry_times or response is None: return void_value else: result = response.json() if result.get('msg') is None and result.get('detail') is None: return result else: print(result) return void_value except: print('punct_pro failed') return void_value def api_pdf(img_path, data_path, pdf_path, config): try: access_token = config['token'] connect_timeout = config['timeout_connect'] read_timeout = config['timeout_read'] retry_times = config['retry_time'] server_type = config['server'] url = config['server_lst'][server_type] + '/pdf' headers = {'Authorization': f'gjcool {access_token}'} if data_path is not None: filename = Path(img_path).name mime = get_mime(img_path) files = [('img', (filename, open(img_path, 'rb'), mime)), ('data', (filename, open(data_path, 'rb'), 'application/json'))] i = 0 while i=retry_times or response is None: return False elif response.headers['content-type'] == 'application/json': print(response.json()) return False elif response.headers['content-type'] == 'application/pdf': with open(pdf_path, "wb") as f: f.write(response.content) return True else: return False except: print('pdf failed') return False def api_sr(img_path, output_path, scale, ext, output, config): try: access_token = config['token'] connect_timeout = config['timeout_connect'] read_timeout = config['timeout_read'] retry_times = config['retry_time'] server_type = config['server'] url = config['server_lst'][server_type] + '/sr' headers = {'Authorization': f'gjcool {access_token}'} img_name = Path(img_path).name mime = get_mime(img_path) files = [('img', (img_name, open(img_path, 'rb'), mime))] data = {'scale':scale, 'ext':ext, 'output':output} i = 0 while i=retry_times or response is None: return False elif response.headers['content-type'] == 'application/json': result = response.json() if output == 'base64': if output in result.keys(): # result.get(output): with open(output_path, "w", encoding='utf-8') as f: f.write(result[output]) return True else: print(result) return False elif output == 'file': print(result) return False else: print(f'ouput wrong: {output}') return False else: with open(output_path, "wb") as f: f.write(response.content) return True except: print('sr failed') return False def api_usage(api_type, void_value, config): try: access_token = config['token'] connect_timeout = config['timeout_connect'] read_timeout = config['timeout_read'] retry_times = config['retry_time'] server_type = config['server'] url = config['server_lst'][server_type] + '/usage' headers = {'Authorization': f'gjcool {access_token}'} i = 0 while i=retry_times or response is None: return void_value else: result = response.json() return result except: print('get usage failed') return False def batch_ocr_api(path_lst, layout, compact, area_num, row_num, task_name, config): #area_num, row_num, area_url logging.info(f'\t\t任务:{task_name}\t\tSTART\t\t总数:{len(path_lst)}') logging.info(f'\t\t序号\t用时\t字数\t列数\t大小\t宽度\t高度\t路径') #初始化记录变量 total_info = {'TimeCost':0,'CharNumber':0, 'LineNumber':0, 'ImageSize':0, 'SuccessNumber':0, 'FailNumber':0} fail_list_path = str(Fail_OCR_dir.joinpath(f'{task_name}.txt')) save_text(fail_list_path, "", False) start_time = time.time() index = 0 for path_dict in tqdm(path_lst, desc="OCR"): now_api_time = time.time() if area_num == 0: area = [] else: area = api_area(path_dict['img_path'], area_num, row_num, [], config) print(area) data = api_ocr_pro(path_dict['img_path'], layout, area, compact, {}, config) last_api_time = time.time() if data=={}: logging.warning(f"\t{index+1:<5d}\tocr failed\t{path_dict['img_path']}") save_text(fail_list_path, f"{path_dict['img_path']}\n", True) total_info['FailNumber'] += 1 else: try: with open(path_dict['json_path'], "w", encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False) text = data['text'] if data.get('text') is not None else data_to_text(data) with open(path_dict['text_path'], "w", encoding='utf-8') as f: f.write(text) #序号、用时、字数、列数、大小、宽度、高度、路径 # img_size = round(os.path.getsize(path_dict['img_path'])/1024) #KB img_size = round(data['Size']/1024) #KB time_cost= last_api_time - now_api_time #s logging.info(f"\t\t{index+1:<6d}\t{time_cost:.2f}\t{data['CharNumber']:<6d}\t{data['LineNumber']:<6d}\t{img_size:<6d}\t{data['Width']:<6d}\t{data['Height']:<6d}\t{path_dict['img_path']}") total_info['TimeCost'] += time_cost total_info['CharNumber'] += data['CharNumber'] total_info['LineNumber'] += data['LineNumber'] total_info['ImageSize'] += data['Size'] total_info['SuccessNumber'] += 1 except: logging.warning(f"\t\t{index+1:<6d}\tsave data wrong\t{path_dict['img_path']}") save_text(fail_list_path, f"{path_dict['img_path']}\n", True) total_info['FailNumber'] += 1 index += 1 logging.info(f"\t\t任务:{task_name}\t\tEND") logging.info(f"\t\t总数\t总用时\t总字数\t总列数\t总大小") logging.info(f"\t\t{total_info['SuccessNumber']}/{total_info['FailNumber']} \t{time.time()-start_time:.2f}\t{total_info['CharNumber']:<6d}\t{total_info['LineNumber']:<6d}\t{total_info['ImageSize']:<6d}\n") def batch_punct_api(path_lst, task_name, config): logging.info(f'\t\t任务:{task_name}\t\tSTART\t\t总数:{len(path_lst)}') logging.info(f'\t\t序号\t用时\t原字数\t字数\t路径') #初始化记录变量 total_info = {'TimeCost':0, 'OrigNumber':0, 'PunctNumber':0, 'SuccessNumber':0, 'FailNumber':0} fail_list_path = str(Fail_Punct_dir /(f'{task_name}.txt')) save_text(fail_list_path, "", False) start_time = time.time() index = 0 for path_dict in tqdm(path_lst, desc="Punct"): now_api_time = time.time() data = api_punct_pro(path_dict['orig_path'], path_dict['encoding'], {}, config) last_api_time = time.time() if data=={}: logging.warning(f"\t{index+1:<6d}\tocr failed\t{path_dict['orig_path']}") save_text(fail_list_path, f"{path_dict['orig_path']}\n", True) total_info['FailNumber'] += 1 else: try: text = data['text'][0] with open(path_dict['punct_path'], "w", encoding=path_dict['encoding']) as f: f.write(text) #序号、用时、字数、列数、大小、宽度、高度、路径 time_cost= last_api_time - now_api_time #s orig_num = path_dict['orig_num'] punct_num = len(list(text)) logging.info(f"\t\t{index+1:<6d}\t{time_cost:.2f}\t{orig_num:<6d}\t{punct_num:<6d}\t{path_dict['orig_path']}") total_info['TimeCost'] += time_cost total_info['OrigNumber'] += orig_num total_info['PunctNumber'] += punct_num total_info['SuccessNumber'] += 1 except: logging.warning(f"\t\t{index+1:<6d}\tsave data wrong\t{path_dict['orig_path']}") save_text(fail_list_path, f"{path_dict['orig_path']}\n", True) total_info['FailNumber'] += 1 index += 1 logging.info(f"\t\t任务:{task_name}\t\tEND") logging.info(f"\t\t总数\t总用时\t总原字数\t总字数") logging.info(f"\t\t{total_info['SuccessNumber']}/{total_info['FailNumber']} \t{time.time()-start_time:.2f}\t{total_info['OrigNumber']:<6d}\t{total_info['PunctNumber']:<6d}\n") def batch_pdf_api(path_lst, task_name, config): logging.info(f'\t\t任务:{task_name}\t\tSTART\t总数:{len(path_lst)}') logging.info(f'\t\t序号\t用时\t大小\t路径') #初始化记录变量 total_info = {'TimeCost':0, 'PDFSize':0, 'SuccessNumber':0, 'FailNumber':0} fail_list_path = str(Fail_PDF_dir.joinpath(f'{task_name}.txt')) save_text(fail_list_path, "", False) # access_token = get_access_token_by_refresh(refresh_token) # last_refresh_time = time.time() start_time = time.time() index = 0 for path_dict in tqdm(path_lst, desc="PDF"): # if time.time()-last_refresh_time > Access_Token_Refresh_Interval: # access_token = get_access_token_by_refresh(refresh_token) # last_refresh_time = time.time() # if access_token == '': # logging.warning(f"get access token failed") # print('get access token failed') # break now_api_time = time.time() pdf_file = api_pdf(path_dict['img_path'], path_dict['data_path'], path_dict['pdf_path'], config) last_api_time = time.time() if not pdf_file: logging.warning(f"\t{index+1:<5d}\tpdf failed\t{path_dict['img_path']}") save_text(fail_list_path, f"{path_dict['img_path']}\n", True) total_info['FailNumber'] += 1 else: #序号、用时、大小、路径 pdf_size = round(os.path.getsize(path_dict['pdf_path'])/1024) #KB time_cost= last_api_time - now_api_time #s logging.info(f"\t\t{index+1:<5d}\t{time_cost:.2f}\t{pdf_size:<6d}\t{path_dict['pdf_path']}") total_info['TimeCost'] += time_cost total_info['PDFSize'] += pdf_size total_info['SuccessNumber'] += 1 index += 1 # while time.time()-last_api_time < interval: # time.sleep(1) logging.info(f"\t\t任务:{task_name}\t\tEND") logging.info(f"\t\t总数\t总用时\t总大小") logging.info(f"\t\t{total_info['SuccessNumber']}/{total_info['FailNumber']} \t{time.time()-start_time:.2f}\t{total_info['PDFSize']:<8d}\n") def batch_sr_api(path_lst, scale, ext, output, task_name, config): logging.info(f'\t\t任务:{task_name}\t\tSTART\t总数:{len(path_lst)}') logging.info(f'\t\t序号\t\t用时\t\t大小\t\t路径') #初始化记录变量 total_info = {'TimeCost':0, 'SRSize':0, 'SuccessNumber':0, 'FailNumber':0} fail_list_path = str(Fail_SR_dir.joinpath(f'{task_name}.txt')) save_text(fail_list_path, "", False) start_time = time.time() index = 0 for path_dict in tqdm(path_lst, desc="SR"): now_api_time = time.time() pdf_file = api_sr(path_dict['img_path'], path_dict['sr_path'], scale, ext, output, config) last_api_time = time.time() if not pdf_file: logging.warning(f"\t{index+1:<5d}\tsr failed\t{path_dict['img_path']}") save_text(fail_list_path, f"{path_dict['img_path']}\n", True) total_info['FailNumber'] += 1 else: #序号、用时、大小、路径 sr_size = round(os.path.getsize(path_dict['sr_path'])/1024) #KB time_cost= last_api_time - now_api_time #s logging.info(f"\t\t{index+1:<5d}\t{time_cost:.2f}\t{sr_size:<6d}\t{path_dict['sr_path']}") total_info['TimeCost'] += time_cost total_info['SRSize'] += sr_size total_info['SuccessNumber'] += 1 index += 1 # while time.time()-last_api_time < interval: # time.sleep(1) logging.info(f"\t\t任务:{task_name}\t\tEND") logging.info(f"\t\t总数\t\t总用时\t总大小") logging.info(f"\t\t{total_info['SuccessNumber']}/{total_info['FailNumber']} \t{time.time()-start_time:.2f}\t{total_info['SRSize']:<8d}\n") def get_usage_api(api_type, config): # access_token = get_access_token_by_refresh(refresh_token) usage = api_usage(api_type, config, {}) if usage != {}: if usage.get('msg') is not None: print(usage['msg']) elif usage.get('detail') is not None: print(usage['detail']) else: result = f'本期已用: {usage["usage"]}; 本期剩余: {usage["remain"]}; 本期总量: {usage["total"]}; 历史总量: {usage["history"]}' print(result) else: print('查询失败') def check_apiid(apiid): return len(apiid) !=0 and len(apiid)==30 def check_config(config, password_path=Password_path): if not check_apiid(config['apiid']): print('注意: apiid无效, 请设置apiid') elif load_password(password_path) == '': print('注意: 密码无效,请设置密码') elif config['token'] == '': print('注意: Token无效,请刷新Token') elif config['server'] not in config['server_lst'].keys(): print('注意: 服务器无效,请设置服务器') elif config['server'] == 'local' and config['server_lst']['local'] == '': print('注意: 本地服务器为空,请设置本地服务器IP地址') def check_password(password, pubkey_path, password_encrypt_path): try: if password=='': return False encrypt_password = encrypt_by_rsa(read_key(pubkey_path), password) if encrypt_password=='': return False else: save_text(password_encrypt_path, encrypt_password) return True except: print('密码检查失败') return False def encrypt_by_rsa(pubkey, message:str): ''' 用RSA公钥加密密码, 不超过117bytes pubkey: rsa.key.PublicKey message: str return : str ''' try: if len(message.encode('utf-8')) <= 117: encrypted = rsa.encrypt(message.encode('utf-8'), pubkey) return str(base64.encodebytes(encrypted), encoding='utf-8') else: raise ValueError except ValueError: print('message length longer than 117 bytes') return '' except: print("encrypt failed") return '' def get_allfile_alldir_in_dir(path): alldir_path =[] allfile_path=[] path_tuple = os.walk(path) for dirpath, dirnames, filenames in path_tuple: for dir in dirnames: alldir_path.append(os.path.join(dirpath, dir)) for f in filenames: allfile_path.append(os.path.join(dirpath, f)) alldir_path = sorted(alldir_path) allfile_path = sorted(allfile_path) return alldir_path, allfile_path def get_token_by_login(apiid, password, url): try: payload = {'apiid':apiid, 'password':password, 'encrypt':1, 'is_long':1} response = requests.post(url, data=payload).json() token = response['access_token'] except: token = '' return token def get_mime(img_path): with open(img_path, 'rb') as f: img = f.read() mime_type = whatimage.identify_image(img) if mime_type is None or mime_type=='None': mime_type = Path(img_path).suffix.replace('.', '') return f'image/{mime_type}' def get_encodeing(text_path, encoding_detect, default_encoding): encoding = default_encoding if encoding_detect: import chardet try: data = open(text_path,'rb').read() encoding = chardet.detect(data)['encoding'] except: encoding = default_encoding return encoding def get_text_length(file_path, encoding): try: text = open(file_path, 'r', encoding=encoding).read() return len(list(text)) except: return 0 def load_config(config_path=ConfigFile): try: with open(config_path, 'r', encoding='utf-8') as f: return json.load(f) except: print('配置文件读取失败') return None def load_password(filepath): try: return open(filepath, "r" ,encoding='utf-8').read() except: return '' def logging_init(log_type:str, dir:Path=Log_dir): ''' 初始化日志记录器 ''' log_dir = dir / log_type log_dir.mkdir(exist_ok=True) log_filepath = log_dir / (time.strftime("%Y-%m-%d", time.localtime()) + '.log') logging.basicConfig( filename=str(log_filepath), level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", encoding='utf-8' ) def pdf_merge(pdf_lst, save_path): try: merger = PdfMerger() for pdf in pdf_lst: merger.append(pdf) merger.write(save_path) merger.close() return True except: return False def prepare_ocr_dir_task_paths(dir, task_name, max_size): #创建目录 json_save_dir = Json_Data_dir.joinpath(task_name) json_save_dir.mkdir(exist_ok=True) text_save_dir = Text_Data_dir.joinpath(task_name) text_save_dir.mkdir(exist_ok=True) alldir_path, allfile_path = get_allfile_alldir_in_dir(dir) for dir_path in alldir_path: data_dir = Path(str(dir_path).replace(dir, str(json_save_dir))) data_dir.mkdir(exist_ok=True) text_dir = Path(str(dir_path).replace(dir, str(text_save_dir))) text_dir.mkdir(exist_ok=True) path_lst, fail_lst =[], [] for file_path in allfile_path: if os.path.getsize(file_path) < max_size: filename = Path(file_path).stem json_dir = Path(str(file_path).replace(dir, str(json_save_dir))).parent text_dir = Path(str(file_path).replace(dir, str(text_save_dir))).parent path_dict = {'img_path':file_path, 'json_path':str(json_dir.joinpath(f'{filename}.json')), 'text_path':str(text_dir.joinpath(f'{filename}.txt'))} path_lst.append(path_dict) else: print(f'{file_path}体积过大, {os.path.getsize(file_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB') fail_lst.append(file_path) return path_lst, fail_lst def prepare_ocr_files_task_paths(paths, task_name, max_size): json_save_dir = Json_Data_dir.joinpath(task_name) json_save_dir.mkdir(exist_ok=True) text_save_dir = Text_Data_dir.joinpath(task_name) text_save_dir.mkdir(exist_ok=True) path_lst, fail_lst =[], [] for file_path in paths: if os.path.getsize(file_path) < max_size: filename = Path(file_path).stem path_lst.append({'img_path':file_path, 'json_path':str(json_save_dir.joinpath(f'{filename}.json')), 'text_path':str(text_save_dir.joinpath(f'{filename}.txt'))}) else: print(f'{file_path}体积过大, {os.path.getsize(file_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB') fail_lst.append(file_path) return path_lst, fail_lst def prepare_punct_dir_task_paths(dir, task_name, config): detect_encoding = config['punct_detect_encoding'] default_encoding = config['punct_default_encoding'] max_length = config['punct_max_length'] #创建目录 save_dir = Punct_Data_dir / task_name save_dir.mkdir(exist_ok=True) alldir_path, allfile_path = get_allfile_alldir_in_dir(dir) for dir_path in alldir_path: punct_dir = Path(str(dir_path).replace(dir, str(save_dir))) punct_dir.mkdir(exist_ok=True) path_lst, fail_lst =[], [] for file_path in allfile_path: encoding = get_encodeing(file_path, detect_encoding, default_encoding) num = get_text_length(file_path, encoding) if num == 0: print(f'{file_path}读取失败') fail_lst.append(file_path) elif num > max_length: print(f'{file_path}长度过大, , 超过最大限量{max_length}字') fail_lst.append(file_path) else: filename = Path(file_path).stem punct_dir = Path(str(file_path).replace(dir, str(save_dir))).parent path_dict = {'orig_path':file_path, 'punct_path':str(punct_dir.joinpath(f'{filename}.txt')), 'encoding':encoding, 'orig_num':num} path_lst.append(path_dict) return path_lst, fail_lst def prepare_punct_files_task_paths(paths, task_name, config): detect_encoding = config['punct_detect_encoding'] default_encoding = config['punct_default_encoding'] max_length = config['punct_max_length'] #创建目录 save_dir = Punct_Data_dir / task_name save_dir.mkdir(exist_ok=True) path_lst, fail_lst =[], [] for file_path in paths: encoding = get_encodeing(file_path, detect_encoding, default_encoding) num = get_text_length(file_path, encoding) if num == 0: print(f'{file_path}读取失败') fail_lst.append(file_path) elif num > max_length: print(f'{file_path}长度过大, , 超过最大限量{max_length}字') fail_lst.append(file_path) else: filename = Path(file_path).stem path_dict = {'orig_path':file_path, 'punct_path':str(save_dir.joinpath(f'{filename}.txt')), 'encoding':encoding, 'orig_num':num} path_lst.append(path_dict) return path_lst, fail_lst def prepare_ocr_list_task_paths(list_paths, task_name, max_size): img_paths = [] for lst_path in list_paths: with open(lst_path, 'r',encoding='utf-8') as f: for line in f.readlines(): img_path = line.strip() if Path(img_path).exists(): img_paths.append(img_path) # [img_paths.append(line.strip()) for line in f.readlines() if len(line.strip())>0 and Path(line.strip()).exists()] path_lst, fail_lst = prepare_ocr_files_task_paths(img_paths, task_name, max_size) return path_lst, fail_lst def prepare_pdf_merge(root_dir, task_name): try: pdf_root_dir = PDF_Data_dir.joinpath(task_name) pdf_root_dir.mkdir(exist_ok=True) all_dir, all_files = get_allfile_alldir_in_dir(root_dir) if len(all_dir)>0: for dir in all_dir: #寻找最底层子目录 subdirs, item_paths = get_allfile_alldir_in_dir(dir) if len(subdirs)==0: save_path = str(pdf_root_dir.joinpath(f'{Path(dir).name}.pdf')) if pdf_merge(item_paths, save_path): print(f'{Path(save_path).name}合并完成') else: print(f'{Path(save_path).name}合并失败') else: save_path = str(pdf_root_dir.joinpath(Path(root_dir).name)) if pdf_merge(all_files, save_path): print(f'{Path(save_path).name}合并完成') else: print(f'{Path(save_path).name}合并失败') except: print(f'{task_name}失败') def prepare_pdf_split(pdf_paths, task_name): try: pdf_root_dir = PDF_Data_dir.joinpath(task_name) pdf_root_dir.mkdir(exist_ok=True) for pdf_path in tqdm(pdf_paths, desc='PDF拆分'): reader = PdfReader(pdf_path) pdf_dir = pdf_root_dir.joinpath(Path(pdf_path).stem) pdf_dir.mkdir(exist_ok=True) for i in range(len(reader.pages)): writer = PdfWriter() writer.add_page(reader.pages[i]) page_path = pdf_dir.joinpath(f'{i+1:04d}.pdf') with open(str(page_path), "wb") as fp: writer.write(fp) print(f'{task_name}完成') except: print(f'{task_name}失败') def prepare_pdf_to_image(pdf_paths, task_name, dpi=300, image_format='JPEG', first_page=None, last_page=None): try: pdf_root_dir = PDF_Data_dir.joinpath(task_name) pdf_root_dir.mkdir(exist_ok=True) for pdf_path in tqdm(pdf_paths, desc='PDF转图片'): pdf_dir = pdf_root_dir.joinpath(Path(pdf_path).stem) pdf_dir.mkdir(exist_ok=True) filename = Path(pdf_path).stem try: _ = convert_from_path(pdf_path,poppler_path=Poppler_Path, dpi=dpi, output_folder=pdf_dir, fmt=image_format, first_page=first_page, last_page=last_page, thread_count=os.cpu_count(), output_file=filename) except Exception as e: print(e.args) print(f'{pdf_path}失败') print(f'{task_name}完成') except: print(f'{task_name}失败') def prepare_pdf_dir_task_paths(img_dir, data_dir, task_name, max_size): pdf_save_dir = PDF_Data_dir.joinpath(task_name) pdf_save_dir.mkdir(exist_ok=True) all_img_dir, all_img_file = get_allfile_alldir_in_dir(img_dir) for dir in all_img_dir: pdf_dir = Path(str(dir).replace(img_dir, str(pdf_save_dir))) pdf_dir.mkdir(exist_ok=True) path_lst, fail_lst =[], [] for img_path in all_img_file: if os.path.getsize(img_path) < max_size: filename = Path(img_path).stem data_path = Path(str(img_path).replace(img_dir, data_dir)).parent.joinpath(f'{filename}.json') if data_path.exists(): pdf_path = Path(str(img_path).replace(img_dir, str(pdf_save_dir))).parent.joinpath(f'{filename}.pdf') path_lst.append({'img_path':img_path, 'data_path':str(data_path), 'pdf_path':str(pdf_path)}) else: print(f'数据不存在: {data_path}') fail_lst.append(img_path) else: print(f'{img_path}体积过大, {os.path.getsize(img_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB') fail_lst.append(img_path) return path_lst, fail_lst def prepare_pdf_files_task_paths(img_paths, data_dir, task_name, max_size): pdf_save_dir = PDF_Data_dir.joinpath(task_name) pdf_save_dir.mkdir(exist_ok=True) path_lst, fail_lst =[], [] for img_path in img_paths: if os.path.getsize(img_path) < max_size: filename = Path(img_path).stem data_path = Path(data_dir).joinpath(f'{filename}.json') if data_path.exists(): pdf_path = Path(pdf_save_dir).joinpath(f'{filename}.pdf') path_lst.append({'img_path':img_path, 'data_path':str(data_path), 'pdf_path':str(pdf_path)}) else: print(f'数据不存在: {data_path}') fail_lst.append(img_path) else: print(f'{img_path}体积过大, {os.path.getsize(img_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB') fail_lst.append(img_path) return path_lst, fail_lst def prepare_sr_dir_task_paths(img_dir, task_name, ext, output, max_size): #创建目录 sr_save_dir = SR_Data_dir.joinpath(task_name) sr_save_dir.mkdir(exist_ok=True) alldir_path, allfile_path = get_allfile_alldir_in_dir(img_dir) for dir_path in alldir_path: data_dir = Path(str(dir_path).replace(img_dir, str(sr_save_dir))) data_dir.mkdir(exist_ok=True) path_lst, fail_lst =[], [] for file_path in allfile_path: # img_PIL = Image.open(file_path) if os.path.getsize(file_path) > max_size: print(f'{file_path}体积过大, {os.path.getsize(file_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB') fail_lst.append(file_path) # elif max(img_PIL.size)>max_length: # print(f'{file_path}尺寸过大, {img_PIL.size[0]}x{img_PIL.size[1]}px, 超过最大限量{max_length}px') # fail_lst.append(file_path) else: filename = Path(file_path).stem sr_dir = Path(str(file_path).replace(img_dir, str(sr_save_dir))).parent ext_format = 'txt' if output=='base64' else ext path_lst.append( {'img_path':file_path, 'sr_path':str(sr_dir.joinpath(f'{filename}.{ext_format}'))}) return path_lst, fail_lst def prepare_sr_files_task_paths(img_paths, task_name, ext, output, max_size): #创建目录 sr_save_dir = SR_Data_dir.joinpath(task_name) sr_save_dir.mkdir(exist_ok=True) path_lst, fail_lst =[], [] for file_path in img_paths: # img_PIL = Image.open(file_path) if os.path.getsize(file_path) > max_size : print(f'{file_path}体积过大, {os.path.getsize(file_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB') fail_lst.append(file_path) # elif max(img_PIL.size)>max_length: # print(f'{file_path}尺寸过大, {img_PIL.size[0]}x{img_PIL.size[1]}px, 超过最大限量{max_length}px') # fail_lst.append(file_path) else: filename = Path(file_path).stem ext_format = 'txt' if output=='base64' else ext path_lst.append( {'img_path':file_path, 'sr_path':str(sr_save_dir.joinpath(f'{filename}.{ext_format}'))}) return path_lst, fail_lst def read_key(key_path): ''' 读取公钥文件 key_path: pem文件地址 return: rsa.key.PublicKey ''' with open(key_path, 'rb') as f: pubkey_data = f.read() return rsa.PublicKey.load_pkcs1(pubkey_data) def read_paths(pathtype='file', init_dir='./'): root = tkinter.Tk() root.focus_force() root.after(10, root.withdraw) if pathtype == 'file': return filedialog.askopenfilenames(parent=root, initialdir=init_dir) elif pathtype == 'dir': return filedialog.askdirectory(parent=root, initialdir=init_dir) def save_text(filepath, content, is_add=False): if not filepath: return with open(filepath, "a" if is_add else "w",encoding='utf-8') as f: f.write(content) def save_config(config, config_path=ConfigFile): with open(config_path, 'w', encoding='utf-8') as f: json.dump(config, f, ensure_ascii=False, indent=4) def data_to_text(data): result = '' if data.get('line_ids') is not None and data.get('chars') is not None: for i, (id_i, char_i) in enumerate(zip(data['line_ids'], data['chars'])): #处理非结尾的字符 if i < len(data['line_ids'])-1 and id_i==data['line_ids'][i+1]: result+=char_i #处理结尾处的字符 else: result+=char_i+'\n' return result if __name__ == "__main__": key = input('选择任务: 0.设置; 1.识别; 2.自动标点; 3.PDF; 4.超分辨率增强; 5.查询使用量. 输入其他键, 退出\t') while key in ['0', '1', '2', '3', '4', '5']: if key in ['0']: config = load_config() check_config(config) sub_key = input('1.设置apiid; 2.设置密码; 3.刷新Token; 4.选择服务器; 5.设置本地服务器IP地址. 输入其他键, 返回上层\t') while sub_key in ['1', '2', '3', '4', '5']: if sub_key in ['1']: config = load_config() apiid_now = config['apiid'] apiid = input(f'当前apiid: {apiid_now}. 如果重新设置, 请输入apiid, 否则请回车:\t') if len(apiid) == 0: print('放弃设置apiid') else: while not check_apiid(apiid): apiid = input('请输入apiid, 回车则放弃设置:\t') if len(apiid) == 0: print('放弃设置apiid') break config['apiid'] = apiid save_config(config) elif sub_key in ['2']: password = getpass('请输入密码,密码加密后保存于Config/password_encrypt.txt。回车则放弃设置:\t') while not check_password(password, Pubkey_path, Password_path): password = getpass('请输入密码:\t') if len(password) == 0: print('放弃设置密码') break elif sub_key in ['3']: config = load_config() apiid = config['apiid'] login_url = config['login_url'] encrypt_password = load_password(Password_path) token = get_token_by_login(apiid, encrypt_password, login_url) if len(token) == 0: print('Token刷新失败,请重新检查账号信息或网络连接') else: print('Token刷新成功') config['token'] = token save_config(config) elif sub_key in ['4']: config = load_config() server_now = config['server'] server_lst = list(config['server_lst'].keys()) server = input(f'当前服务器: {server_now}. 如果重新选择,请选择{",".join(server_lst)} , 否则请回车:\t') if len(server)>0: if server in server_lst: config['server'] = server save_config(config) else: while len(server)>0 and server not in server_lst: server = input(f'请选择{", ".join(server_lst)} , 放弃请回车:\t') if len(server) == 0: print('放弃选择服务器') break elif server in server_lst: config['server'] = server save_config(config) elif sub_key in ['5']: config = load_config() if config['server'] == 'local': local_ip = input(f"当前本地服务器: {config['server_lst']['local']}. 如果重新设置, 输入本地服务器的IP地址, 否则请回车:\t") if len(local_ip) == 0: print('放弃修改本地服务器IP') else: while len(local_ip.split('.'))!= 4: print('IP地址格式错误,请重新输入') local_ip = input(f"请输入本地服务器的IP地址, 放弃请回车:\t") if len(local_ip) == 0: break if len(local_ip) != 0: config['server_lst']['local'] = f"{config['local_head']}{local_ip}:{config['local_port']}" save_config(config) else: print('请先将服务器设置为local') config = load_config() check_config(config) sub_key = input('1.设置apiid; 2.设置密码; 3.刷新Token; 4.选择服务器; 5.设置本地服务器IP地址. 输入其他键, 返回上层\t') elif key in ['1']: logging_init('OCR') sub_key = input('选择图片: 1.目录; 2.文件; 3.列表. 输入其他键, 返回上层\t') while sub_key in ['1', '2', '3']: layout = input('请输入排版方向: 0:竖版,1:横版. 默认值:0\t') if layout not in ['0', '1']: layout = '0' compact = input('请输入Compact参数: 1, 2, 4, 6. 默认值: 1\t') if compact not in ['1', '2', '4', '6']: compact = '1' compact = int(compact) #task_name task_name = input('请输入任务名称:\t') if not task_name: break config = load_config() max_size = config['max_size'] * 1024 * 1024 #图片列表:path_lst,fail_lst if sub_key in ['1']: print('请选择图片目录') dir = read_paths(pathtype='dir', init_dir=str(Start_dir)) if not dir: break print(dir) path_lst, fail_lst = prepare_ocr_dir_task_paths(dir, task_name, max_size) elif sub_key in ['2']: print('请选择图片文件') img_paths = read_paths(init_dir=str(Start_dir)) if not img_paths: break print(f'已选择{len(img_paths)}个文件') path_lst, fail_lst = prepare_ocr_files_task_paths(img_paths, task_name, max_size) elif sub_key in ['3']: print('请选择列表文件') list_paths = read_paths(init_dir=str(Start_dir)) if not list_paths: break print(f'已选择{len(list_paths)}个列表') path_lst, fail_lst = prepare_ocr_list_task_paths(list_paths, task_name, max_size) #path_lst, layout,task_name,url, fail_lst if len(fail_lst)>0: check_size = input(f'有{len(fail_lst)}个文件体积超标, 是否停止任务: 1. 继续; 其他, 中止\t') if check_size not in ['1']: break area_check = input('如果需要自动检测封闭区域, 则输入区域数量(1-4); 不需要则跳过\t') if area_check in ['1', '2', '3', '4']: area_num = int(area_check) row_num_check = input('如果需要分栏,则输入分栏数量(2-4); 不需要则跳过\t') if row_num_check in ['2','3','4']: row_num = int(row_num_check) else: row_num = 1 else: area_num = 0 row_num = 1 try: batch_ocr_api(path_lst, layout, compact, area_num, row_num, task_name, config) except: print(f'{task_name}任务失败') sub_key = input('选择方式: 1.目录; 2.文件; 3.列表. 输入其他键, 返回上层\t') elif key in ['2']: logging_init('Punct') sub_key = input('选择文本: 1.目录; 2.文件. 输入其他键, 返回上层\t') while sub_key in ['1', '2']: task_name = input('请输入任务名称:\t') if not task_name: break config = load_config() if sub_key in ['1']: print('请选择目录') dir = read_paths(pathtype='dir', init_dir=str(Start_dir)) if not dir: break print(dir) path_lst, fail_lst = prepare_punct_dir_task_paths(dir, task_name, config) elif sub_key in ['2']: print('请选择文件') orig_paths = read_paths(init_dir=str(Start_dir)) if not orig_paths: break print(f'已选择{len(orig_paths)}个文件') path_lst, fail_lst = prepare_punct_files_task_paths(orig_paths, task_name, config) if len(fail_lst)>0: check_length = input(f'有{len(fail_lst)}个文件长度超标, 是否停止任务: 1. 继续; 其他, 中止\t') if check_length not in ['1']: break batch_punct_api(path_lst, task_name, config) sub_key = input('选择方式: 1.目录; 2.文件. 输入其他键, 返回上层\t') elif key in ['3']: logging_init('PDF') sub_key = input('选择方式: 1.按目录生成; 2.按文件生成; 3.PDF合并; 4.PDF拆分; 5.PDF转图片. 输入其他键, 返回上层\t') while sub_key in ['1', '2', '3', '4', '5']: task_name = input('请输入任务名称:\t') if not task_name: break if sub_key in ['1', '2']: config = load_config() max_size = config['max_size'] * 1024 * 1024 if sub_key in ['1']: print('请选择图片目录') img_dir = read_paths(pathtype='dir', init_dir=str(Start_dir)) if not img_dir: break print(img_dir) print('请选择数据目录') data_dir = read_paths(pathtype='dir', init_dir=str(Start_dir)) if not data_dir: print('数据为空') break else: print(data_dir) path_lst, fail_lst = prepare_pdf_dir_task_paths(img_dir, data_dir, task_name, max_size) elif sub_key in ['2']: print('请选择图片文件') img_paths = read_paths(init_dir=str(Start_dir)) if not img_paths: break print(f'已选择{len(img_paths)}个文件') print('请选择数据目录') data_dir = read_paths(pathtype='dir', init_dir=str(Start_dir)) if not data_dir: print('数据为空') break else: print(data_dir) path_lst, fail_lst = prepare_pdf_files_task_paths(img_paths, data_dir, task_name, max_size) # if len(fail_lst)>0: check_size = input(f'有{len(fail_lst)}个文件存在问题, 是否停止任务: 1. 继续; 其他, 中止\t') if check_size not in ['1']: break try: config = load_config() batch_pdf_api(path_lst, task_name, config) except: print(f'{task_name}任务失败') #合并 elif sub_key in ['3']: print('请选择PDF目录') pdf_dir = read_paths(pathtype='dir', init_dir=str(Start_dir)) if not pdf_dir: break print(pdf_dir) prepare_pdf_merge(pdf_dir, task_name) #拆分 elif sub_key in ['4']: print('请选择PDF文件') pdf_paths = read_paths(init_dir=str(Start_dir)) if not pdf_paths: break print(f'已选择{len(pdf_paths)}个文件') prepare_pdf_split(pdf_paths, task_name) #转图片 elif sub_key in ['5']: print('请选择PDF文件') pdf_paths = read_paths(init_dir=str(Start_dir)) if not pdf_paths: break print(f'已选择{len(pdf_paths)}个文件') dpi_str = input('请输入dpi:\t') try: dpi = int(dpi_str) if dpi<=0: break except: break format_str = input('请选择图片格式: 1.jpeg; 2.png.\t') if format_str not in ['1', '2']: break format_id = int(format_str)-1 image_format = Pdf_to_Image_lst[format_id] prepare_pdf_to_image(pdf_paths, task_name, dpi=dpi, image_format=image_format) sub_key = input('选择方式: 1.按目录生成; 2.按文件生成; 3.PDF合并; 4.PDF拆分; 5.PDF转图片. 输入其他键, 返回上层\t') elif key in ['4']: logging_init('SR') sub_key = input('选择方式: 1.目录; 2.文件. 输入其他键, 返回上层\t') while sub_key in ['1', '2']: task_name = input('请输入任务名称:\t') if not task_name: break scale_str = input('请输入放大倍数: 1, 2, 4. 默认值:2\t') if scale_str not in ['1','2','4']: scale_str = '2' scale = int(scale_str) ext_str = input('请输入保存图片格式: 1.jpeg, 2.png, 3.tiff, 4.webp. 默认值:jpeg\t') if ext_str not in ['1','2','3','4']: ext_str = '1' ext = SR_EXT_lst[int(ext_str)-1] output_str = input('请输入数据格式: 1.file, 2.base64. 默认值:file\t') if output_str not in ['1','2','3','4']: output_str = '1' output = SR_Output_lst[int(output_str)-1] config = load_config() max_size = config['max_size'] * 1024 * 1024 if sub_key in ['1']: print('请选择图片目录') img_dir = read_paths(pathtype='dir', init_dir=str(Start_dir)) if not img_dir: break print(img_dir) path_lst, fail_lst = prepare_sr_dir_task_paths(img_dir, task_name, ext, output, max_size) elif sub_key in ['2']: print('请选择图片文件') img_paths = read_paths(init_dir=str(Start_dir)) if not img_paths: break # print(img_paths) path_lst, fail_lst = prepare_sr_files_task_paths(img_paths, task_name, ext, output, max_size) if len(fail_lst)>0: check_size = input(f'有{len(fail_lst)}个文件存在问题, 是否停止任务: 1. 继续; 其他, 中止\t') if check_size not in ['1']: break try: config = load_config() batch_sr_api(path_lst, scale, ext, output, task_name, config) except: print(f'{task_name}任务失败') sub_key = input('选择方式: 1.目录; 2.文件. 输入其他键, 返回上层\t') elif key in ['5']: sub_key = input('选择参数: 1.ocr; 2.sr; 3.pdf. 输入其他键, 返回上层\t') while sub_key in ['1', '2', '3']: api_type = Usage_API_Type_lst[int(sub_key)-1] try: config = load_config() get_usage_api(api_type, config) except: print(f'查询失败') sub_key = input('选择参数: 1.ocr; 2.sr; 3.pdf. 输入其他键, 返回上层\t') key = input('选择任务: 0.设置; 1.OCR; 2.自动标点; 3.PDF; 4.超分辨率增强; 5.查询使用量. 输入其他键, 退出\t')