# jzd/jzd_main.py
#pip install -r requirements.txt
import json,logging,time,os,tkinter,base64,requests,whatimage,rsa
from pathlib import Path
from getpass import getpass
from tkinter import filedialog
from tqdm import tqdm
from PyPDF2 import PdfMerger, PdfReader, PdfWriter
from pdf2image import convert_from_path
from io import BytesIO
# All paths are anchored at this script's own directory.
Start_dir = Path(__file__).parent
ConfigFile = Start_dir / 'config.json'
Pubkey_path = str(Start_dir / 'password_pubkey.pem')
Password_path = str(Start_dir / 'password_encrypt.txt')
# Accepted output formats / API type selectors used by the interactive menu.
SR_EXT_lst = ['jpeg', 'png', 'tiff', 'webp']
SR_Output_lst = ['file', 'base64']
Pdf_to_Image_lst = ['JPEG', 'PNG']
Usage_API_Type_lst = ['ocr', 'sr', 'pdf']
# Output / log directory layout.
Log_dir = Start_dir / 'Log'
Data_dir = Start_dir / 'Data'
Json_Data_dir = Data_dir / 'json'
Text_Data_dir = Data_dir / 'text'
Punct_Data_dir = Data_dir / 'punct'
PDF_Data_dir = Data_dir / 'pdf'
SR_Data_dir = Data_dir / 'sr'
Fail_dir = Log_dir / 'fail'
Fail_OCR_dir = Fail_dir / 'ocr'
Fail_Punct_dir = Fail_dir / 'punct'
Fail_SR_dir = Fail_dir / 'sr'
Fail_PDF_dir = Fail_dir / 'pdf'
# Create the whole tree up front. parents=True makes each call independent
# of creation order (the original relied on 12 ordered mkdir lines).
for _required_dir in (Log_dir, Data_dir, Json_Data_dir, Text_Data_dir,
                      Punct_Data_dir, PDF_Data_dir, SR_Data_dir, Fail_dir,
                      Fail_OCR_dir, Fail_Punct_dir, Fail_SR_dir, Fail_PDF_dir):
    _required_dir.mkdir(parents=True, exist_ok=True)
# Poppler_Path = None
Poppler_Path = r'D:\poppler-0.68.0\bin'
def resize_image(img_path, max_length: int):
    """
    Downscale the image at *img_path* so its longest side is at most
    *max_length* pixels.

    Returns (jpeg_bytes, resize_factor) on success, or (None, 1.0) when the
    image cannot be read, OpenCV is missing, or anything else fails.

    BUG FIX: the original encoded the path as GBK and decoded it with the
    default codec, which broke every non-ASCII path; np.fromfile + imdecode
    reads Unicode paths portably. The imencode success flag was also ignored.
    """
    try:
        import cv2
        import numpy as np
        img = None
        if Path(img_path).exists():
            raw = np.fromfile(str(img_path), dtype=np.uint8)
            if raw.size:
                img = cv2.imdecode(raw, cv2.IMREAD_COLOR)
        if img is None:
            # Fallback to the plain reader for anything imdecode rejected.
            img = cv2.imread(str(img_path))
        if img is None:
            return None, 1.0
        height, width = img.shape[:2]
        longest = max(height, width)
        resize_factor = longest / max_length if longest > max_length > 0 else 1.0
        if resize_factor > 1:
            img = cv2.resize(img, (round(width / resize_factor), round(height / resize_factor)))
        ok, buffer = cv2.imencode('.jpeg', img)  # re-encode as a JPEG byte stream
        if not ok:
            return None, 1.0
        return buffer.tobytes(), resize_factor
    except ImportError:
        print("错误需要安装opencv-python才能使用压缩功能: pip install opencv-python")
        return None, 1.0
    except Exception as e:
        print(f"图片压缩失败: {e}")
        return None, 1.0
def resize_data(data, resize_factor: float):
    """
    Scale the Width/Height fields and every coordinate box in *data* back up
    by *resize_factor*. A factor of 1 or less leaves *data* untouched.
    Non-list entries in 'coors' are silently dropped.
    """
    if resize_factor <= 1:
        return data
    width, height = data.get('Width'), data.get('Height')
    if width is not None and height is not None:
        data['Width'] = round(width * resize_factor)
        data['Height'] = round(height * resize_factor)
    coors = data.get('coors')
    if coors is not None and isinstance(coors, list):
        scaled = []
        for box in coors:
            if isinstance(box, list):
                scaled.append([round(value * resize_factor) for value in box])
        data['coors'] = scaled
    return data
def api_area(img_path, area_num, row_num, void_value, config):
    """
    Call the /area endpoint to detect the layout areas of one image.

    Retries up to config['retry_time'] times on network errors; returns the
    'area' field of the JSON response, or *void_value* on any failure.

    BUG FIX: `response` was unbound when every retry failed (NameError was
    only masked by the bare except); the uploaded file handle was leaked.
    """
    try:
        timeouts = (config['timeout_connect'], config['timeout_read'])
        retry_times = config['retry_time']
        url = config['server_lst'][config['server']] + '/area'
        headers = {'Authorization': f"gjcool {config['token']}"}
        img_name = Path(img_path).name
        mime = get_mime(img_path)
        # Read the image eagerly so the file handle is closed deterministically.
        with open(img_path, 'rb') as f:
            img_bytes = f.read()
        files = [('img', (img_name, img_bytes, mime))]
        data = {'area_num': area_num, 'row_num': row_num}
        response = None
        for attempt in range(retry_times):
            try:
                response = requests.post(url, headers=headers, data=data,
                                         files=files, timeout=timeouts)
                break
            except requests.exceptions.RequestException as e:
                print(f'retry {attempt + 1} times')
                print(e)
        if response is None:
            return void_value
        return response.json().get('area', void_value)
    except Exception:
        print('area failed')
        return void_value
def api_ocr_pro(img_path, layout, area, compact, void_value, config, compress_mode=False, max_length=2500):
    """
    Call the /ocr_pro endpoint for one image.

    When *compress_mode* is on the image is downscaled first (longest side
    <= *max_length*) and returned coordinates are scaled back up. Returns the
    parsed JSON result, or *void_value* on any failure or on a server-side
    'msg'/'detail' error payload.

    BUG FIX: `response` was unbound when every retry failed; the uploaded
    file handle was leaked when not compressing.
    """
    try:
        timeouts = (config['timeout_connect'], config['timeout_read'])
        retry_times = config['retry_time']
        url = config['server_lst'][config['server']] + '/ocr_pro'
        headers = {'Authorization': f"gjcool {config['token']}"}
        img_name = Path(img_path).name
        mime = get_mime(img_path)
        resize_factor = 1.0
        img_bytes = None
        if compress_mode and max_length > 0:
            img_bytes, resize_factor = resize_image(img_path, max_length)
        if img_bytes is None:
            # No compression requested, or compression failed: send the
            # original bytes (read eagerly so the handle is closed).
            with open(img_path, 'rb') as f:
                img_bytes = f.read()
            resize_factor = 1.0
        # NOTE(review): compressed bytes are JPEG but keep the original mime,
        # matching the original behaviour — confirm the server tolerates it.
        files = [('img', (img_name, img_bytes, mime))]
        data = {'layout': layout, 'area': str(area), 'compact': compact}
        response = None
        for attempt in range(retry_times):
            try:
                response = requests.post(url, headers=headers, data=data,
                                         files=files, timeout=timeouts)
                break
            except requests.exceptions.RequestException as e:
                print(f'retry {attempt + 1} times')
                print(e)
        if response is None:
            return void_value
        result = response.json()
        if result.get('msg') is None and result.get('detail') is None:
            # Map coordinates back to the original resolution if we downsized.
            if compress_mode and resize_factor > 1:
                result = resize_data(result, resize_factor)
            return result
        print(result)
        return void_value
    except Exception:
        print('ocr_pro failed')
        return void_value
def api_punct_pro(orig_path, encoding, void_value, config):
    """
    Send the text of *orig_path* to the /punct_pro punctuation endpoint.

    Newlines and '】【' separators are stripped before sending. Returns the
    parsed JSON result, or *void_value* on any failure or on a server-side
    'msg'/'detail' error payload.

    BUG FIX: `response` was unbound when every retry failed; the text file
    handle was leaked.
    """
    try:
        timeouts = (config['timeout_connect'], config['timeout_read'])
        retry_times = config['retry_time']
        url = config['server_lst'][config['server']] + '/punct_pro'
        headers = {'Authorization': f"gjcool {config['token']}"}
        with open(orig_path, 'r', encoding=encoding) as f:
            src = f.read().replace('\n', '').replace('】【', '')
        payload = {'src': src}
        response = None
        for attempt in range(retry_times):
            try:
                response = requests.post(url, headers=headers, data=payload, timeout=timeouts)
                break
            except requests.exceptions.RequestException as e:
                print(f'retry {attempt + 1} times')
                print(e)
        if response is None:
            return void_value
        result = response.json()
        if result.get('msg') is None and result.get('detail') is None:
            return result
        print(result)
        return void_value
    except Exception:
        print('punct_pro failed')
        return void_value
def api_pdf(img_path, data_path, pdf_path, config):
    """
    Ask the /pdf endpoint to build a searchable PDF from an image plus its
    OCR JSON, writing the result to *pdf_path*.

    Returns True on success, False on any failure.

    BUG FIXES: a None *data_path* left `files` undefined and crashed into
    the bare except; `response` was unbound when every retry failed; both
    uploaded file handles were leaked.
    """
    try:
        if data_path is None:
            print('pdf failed')
            return False
        timeouts = (config['timeout_connect'], config['timeout_read'])
        retry_times = config['retry_time']
        url = config['server_lst'][config['server']] + '/pdf'
        headers = {'Authorization': f"gjcool {config['token']}"}
        filename = Path(img_path).name
        mime = get_mime(img_path)
        with open(img_path, 'rb') as img_f, open(data_path, 'rb') as data_f:
            files = [('img', (filename, img_f.read(), mime)),
                     ('data', (filename, data_f.read(), 'application/json'))]
        response = None
        for attempt in range(retry_times):
            try:
                response = requests.post(url, headers=headers, files=files, timeout=timeouts)
                break
            except requests.exceptions.RequestException as e:
                print(f'retry {attempt + 1} times')
                print(e)
        if response is None:
            return False
        content_type = response.headers['content-type']
        if content_type == 'application/json':
            # Server returned an error payload instead of a PDF.
            print(response.json())
            return False
        if content_type == 'application/pdf':
            with open(pdf_path, "wb") as f:
                f.write(response.content)
            return True
        return False
    except Exception:
        print('pdf failed')
        return False
def api_sr(img_path, output_path, scale, ext, output, config):
    """
    Call the /sr super-resolution endpoint for one image and persist the
    result to *output_path* — base64 text when output=='base64', raw bytes
    when the server streams a file.

    Returns True on success, False on any failure.

    BUG FIXES: `response` was unbound when every retry failed; the uploaded
    file handle was leaked; corrected the 'ouput wrong' message typo.
    """
    try:
        timeouts = (config['timeout_connect'], config['timeout_read'])
        retry_times = config['retry_time']
        url = config['server_lst'][config['server']] + '/sr'
        headers = {'Authorization': f"gjcool {config['token']}"}
        img_name = Path(img_path).name
        mime = get_mime(img_path)
        with open(img_path, 'rb') as f:
            img_bytes = f.read()
        files = [('img', (img_name, img_bytes, mime))]
        data = {'scale': scale, 'ext': ext, 'output': output}
        response = None
        for attempt in range(retry_times):
            try:
                response = requests.post(url, headers=headers, data=data,
                                         files=files, timeout=timeouts)
                break
            except requests.exceptions.RequestException as e:
                print(f'retry {attempt + 1} times')
                print(e)
        if response is None:
            return False
        if response.headers['content-type'] == 'application/json':
            result = response.json()
            if output == 'base64':
                if output in result.keys():
                    with open(output_path, "w", encoding='utf-8') as f:
                        f.write(result[output])
                    return True
                print(result)
                return False
            if output == 'file':
                # JSON came back where a file stream was expected: error payload.
                print(result)
                return False
            print(f'output wrong: {output}')
            return False
        with open(output_path, "wb") as f:
            f.write(response.content)
        return True
    except Exception:
        print('sr failed')
        return False
def api_usage(api_type, void_value, config):
    """
    Query the /usage/<api_type> endpoint.

    Returns the parsed JSON result, or *void_value* on any failure.

    BUG FIXES: `response` was unbound when every retry failed; the except
    path returned False instead of the *void_value* sentinel every other
    path (and the caller) expects.
    """
    try:
        timeouts = (config['timeout_connect'], config['timeout_read'])
        retry_times = config['retry_time']
        url = config['server_lst'][config['server']] + '/usage'
        headers = {'Authorization': f"gjcool {config['token']}"}
        response = None
        for attempt in range(retry_times):
            try:
                response = requests.post(f'{url}/{api_type}', headers=headers, timeout=timeouts)
                break
            except requests.exceptions.RequestException as e:
                print(f'retry {attempt + 1} times')
                print(e)
        if response is None:
            return void_value
        return response.json()
    except Exception:
        print('get usage failed')
        return void_value
def batch_ocr_api(path_lst, layout, compact, area_num, row_num, task_name, config, compress_mode=False, max_length=2500):
    """
    Run OCR over every entry of *path_lst* (dicts with img/json/text paths),
    writing JSON + text outputs and logging per-image and total statistics.
    Failed images are appended to the per-task fail list under Fail_OCR_dir.

    FIXES: removed the stray debug `print(area)` left over from development;
    narrowed the bare except around the save path.
    """
    logging.info(f'\t\t任务:{task_name}\t\tSTART\t\t总数:{len(path_lst)}')
    if compress_mode:
        logging.info(f'\t\t压缩模式开启,最大像素: {max_length}')
    logging.info(f'\t\t序号\t用时\t字数\t列数\t大小\t宽度\t高度\t路径')
    # Running totals for the summary lines at the end.
    total_info = {'TimeCost': 0, 'CharNumber': 0, 'LineNumber': 0,
                  'ImageSize': 0, 'SuccessNumber': 0, 'FailNumber': 0}
    fail_list_path = str(Fail_OCR_dir.joinpath(f'{task_name}.txt'))
    save_text(fail_list_path, "", False)  # truncate the fail list
    start_time = time.time()
    for index, path_dict in enumerate(tqdm(path_lst, desc="OCR")):
        now_api_time = time.time()
        # Optional column/row layout detection before OCR.
        if area_num == 0:
            area = []
        else:
            area = api_area(path_dict['img_path'], area_num, row_num, [], config)
        data = api_ocr_pro(path_dict['img_path'], layout, area, compact, {}, config, compress_mode, max_length)
        last_api_time = time.time()
        if data == {}:
            logging.warning(f"\t{index+1:<5d}\tocr failed\t{path_dict['img_path']}")
            save_text(fail_list_path, f"{path_dict['img_path']}\n", True)
            total_info['FailNumber'] += 1
        else:
            try:
                with open(path_dict['json_path'], "w", encoding='utf-8') as f:
                    json.dump(data, f, ensure_ascii=False)
                text = data['text'] if data.get('text') is not None else data_to_text(data)
                with open(path_dict['text_path'], "w", encoding='utf-8') as f:
                    f.write(text)
                # 序号、用时、字数、列数、大小、宽度、高度、路径
                img_size = round(data['Size'] / 1024)  # KB
                time_cost = last_api_time - now_api_time  # s
                logging.info(f"\t\t{index+1:<6d}\t{time_cost:.2f}\t{data['CharNumber']:<6d}\t{data['LineNumber']:<6d}\t{img_size:<6d}\t{data['Width']:<6d}\t{data['Height']:<6d}\t{path_dict['img_path']}")
                total_info['TimeCost'] += time_cost
                total_info['CharNumber'] += data['CharNumber']
                total_info['LineNumber'] += data['LineNumber']
                total_info['ImageSize'] += data['Size']
                total_info['SuccessNumber'] += 1
            except Exception:
                # Any save/format problem marks this item as failed.
                logging.warning(f"\t\t{index+1:<6d}\tsave data wrong\t{path_dict['img_path']}")
                save_text(fail_list_path, f"{path_dict['img_path']}\n", True)
                total_info['FailNumber'] += 1
    logging.info(f"\t\t任务:{task_name}\t\tEND")
    logging.info(f"\t\t总数\t总用时\t总字数\t总列数\t总大小")
    logging.info(f"\t\t{total_info['SuccessNumber']}/{total_info['FailNumber']} \t{time.time()-start_time:.2f}\t{total_info['CharNumber']:<6d}\t{total_info['LineNumber']:<6d}\t{total_info['ImageSize']:<6d}\n")
def batch_punct_api(path_lst, task_name, config):
    """
    Punctuate every text in *path_lst* (dicts with orig/punct paths, encoding
    and original length), logging per-file and total statistics. Failures go
    to the per-task fail list under Fail_Punct_dir.

    BUG FIX: the failure log said "ocr failed" (copy-paste from the OCR
    batcher); it now says "punct failed". Also narrowed the bare except.
    """
    logging.info(f'\t\t任务:{task_name}\t\tSTART\t\t总数:{len(path_lst)}')
    logging.info(f'\t\t序号\t用时\t原字数\t字数\t路径')
    # Running totals for the summary lines at the end.
    total_info = {'TimeCost': 0, 'OrigNumber': 0, 'PunctNumber': 0,
                  'SuccessNumber': 0, 'FailNumber': 0}
    fail_list_path = str(Fail_Punct_dir / (f'{task_name}.txt'))
    save_text(fail_list_path, "", False)  # truncate the fail list
    start_time = time.time()
    for index, path_dict in enumerate(tqdm(path_lst, desc="Punct")):
        now_api_time = time.time()
        data = api_punct_pro(path_dict['orig_path'], path_dict['encoding'], {}, config)
        last_api_time = time.time()
        if data == {}:
            logging.warning(f"\t{index+1:<6d}\tpunct failed\t{path_dict['orig_path']}")
            save_text(fail_list_path, f"{path_dict['orig_path']}\n", True)
            total_info['FailNumber'] += 1
        else:
            try:
                text = data['text'][0]
                with open(path_dict['punct_path'], "w", encoding=path_dict['encoding']) as f:
                    f.write(text)
                # 序号、用时、原字数、字数、路径
                time_cost = last_api_time - now_api_time  # s
                orig_num = path_dict['orig_num']
                punct_num = len(text)
                logging.info(f"\t\t{index+1:<6d}\t{time_cost:.2f}\t{orig_num:<6d}\t{punct_num:<6d}\t{path_dict['orig_path']}")
                total_info['TimeCost'] += time_cost
                total_info['OrigNumber'] += orig_num
                total_info['PunctNumber'] += punct_num
                total_info['SuccessNumber'] += 1
            except Exception:
                logging.warning(f"\t\t{index+1:<6d}\tsave data wrong\t{path_dict['orig_path']}")
                save_text(fail_list_path, f"{path_dict['orig_path']}\n", True)
                total_info['FailNumber'] += 1
    logging.info(f"\t\t任务:{task_name}\t\tEND")
    logging.info(f"\t\t总数\t总用时\t总原字数\t总字数")
    logging.info(f"\t\t{total_info['SuccessNumber']}/{total_info['FailNumber']} \t{time.time()-start_time:.2f}\t{total_info['OrigNumber']:<6d}\t{total_info['PunctNumber']:<6d}\n")
def batch_pdf_api(path_lst, task_name, config):
    """
    Generate a searchable PDF for every entry of *path_lst* (dicts with
    img/data/pdf paths), logging per-file and total statistics. Failures are
    appended to the per-task fail list under Fail_PDF_dir.
    """
    logging.info(f'\t\t任务:{task_name}\t\tSTART\t总数:{len(path_lst)}')
    logging.info(f'\t\t序号\t用时\t大小\t路径')
    # Running totals for the summary lines at the end.
    totals = {'TimeCost': 0, 'PDFSize': 0, 'SuccessNumber': 0, 'FailNumber': 0}
    fail_list_path = str(Fail_PDF_dir.joinpath(f'{task_name}.txt'))
    save_text(fail_list_path, "", False)  # truncate the fail list
    batch_start = time.time()
    for index, entry in enumerate(tqdm(path_lst, desc="PDF")):
        call_start = time.time()
        succeeded = api_pdf(entry['img_path'], entry['data_path'], entry['pdf_path'], config)
        call_end = time.time()
        if not succeeded:
            logging.warning(f"\t{index+1:<5d}\tpdf failed\t{entry['img_path']}")
            save_text(fail_list_path, f"{entry['img_path']}\n", True)
            totals['FailNumber'] += 1
            continue
        # 序号、用时、大小、路径
        pdf_size = round(os.path.getsize(entry['pdf_path']) / 1024)  # KB
        elapsed = call_end - call_start  # s
        logging.info(f"\t\t{index+1:<5d}\t{elapsed:.2f}\t{pdf_size:<6d}\t{entry['pdf_path']}")
        totals['TimeCost'] += elapsed
        totals['PDFSize'] += pdf_size
        totals['SuccessNumber'] += 1
    logging.info(f"\t\t任务:{task_name}\t\tEND")
    logging.info(f"\t\t总数\t总用时\t总大小")
    logging.info(f"\t\t{totals['SuccessNumber']}/{totals['FailNumber']} \t{time.time()-batch_start:.2f}\t{totals['PDFSize']:<8d}\n")
def batch_sr_api(path_lst, scale, ext, output, task_name, config):
    """
    Run super-resolution over every entry of *path_lst* (dicts with img/sr
    paths), logging per-file and total statistics. Failures are appended to
    the per-task fail list under Fail_SR_dir.
    """
    logging.info(f'\t\t任务:{task_name}\t\tSTART\t总数:{len(path_lst)}')
    logging.info(f'\t\t序号\t\t用时\t\t大小\t\t路径')
    # Running totals for the summary lines at the end.
    totals = {'TimeCost': 0, 'SRSize': 0, 'SuccessNumber': 0, 'FailNumber': 0}
    fail_list_path = str(Fail_SR_dir.joinpath(f'{task_name}.txt'))
    save_text(fail_list_path, "", False)  # truncate the fail list
    batch_start = time.time()
    for index, entry in enumerate(tqdm(path_lst, desc="SR")):
        call_start = time.time()
        succeeded = api_sr(entry['img_path'], entry['sr_path'], scale, ext, output, config)
        call_end = time.time()
        if not succeeded:
            logging.warning(f"\t{index+1:<5d}\tsr failed\t{entry['img_path']}")
            save_text(fail_list_path, f"{entry['img_path']}\n", True)
            totals['FailNumber'] += 1
            continue
        # 序号、用时、大小、路径
        sr_size = round(os.path.getsize(entry['sr_path']) / 1024)  # KB
        elapsed = call_end - call_start  # s
        logging.info(f"\t\t{index+1:<5d}\t{elapsed:.2f}\t{sr_size:<6d}\t{entry['sr_path']}")
        totals['TimeCost'] += elapsed
        totals['SRSize'] += sr_size
        totals['SuccessNumber'] += 1
    logging.info(f"\t\t任务:{task_name}\t\tEND")
    logging.info(f"\t\t总数\t\t总用时\t总大小")
    logging.info(f"\t\t{totals['SuccessNumber']}/{totals['FailNumber']} \t{time.time()-batch_start:.2f}\t{totals['SRSize']:<8d}\n")
def get_usage_api(api_type, config):
    """
    Query and print the account usage numbers for *api_type*.

    BUG FIX: the arguments to api_usage were swapped — its signature is
    (api_type, void_value, config), but config was passed as the void value
    and {} as the config, so every call fell into the failure path.
    """
    usage = api_usage(api_type, {}, config)
    # Guard against both failure sentinels ({} and the legacy False).
    if usage == {} or usage is False:
        print('查询失败')
    elif usage.get('msg') is not None:
        print(usage['msg'])
    elif usage.get('detail') is not None:
        print(usage['detail'])
    else:
        print(f'本期已用: {usage["usage"]}; 本期剩余: {usage["remain"]}; 本期总量: {usage["total"]}; 历史总量: {usage["history"]}')
def check_apiid(apiid):
    """Return True iff *apiid* is exactly 30 characters long.

    (The original `len(apiid) != 0 and len(apiid) == 30` made the first
    clause redundant — length 30 already implies non-empty.)
    """
    return len(apiid) == 30
def check_config(config, password_path=Password_path):
    """
    Print a warning for the first invalid configuration item found, checking
    in order: apiid, saved password, token, server selection, local server
    address. Silent when everything looks valid.
    """
    if not check_apiid(config['apiid']):
        print('注意: apiid无效, 请设置apiid')
        return
    if load_password(password_path) == '':
        print('注意: 密码无效,请设置密码')
        return
    if config['token'] == '':
        print('注意: Token无效请刷新Token')
        return
    if config['server'] not in config['server_lst']:
        print('注意: 服务器无效,请设置服务器')
        return
    if config['server'] == 'local' and config['server_lst']['local'] == '':
        print('注意: 本地服务器为空请设置本地服务器IP地址')
def check_password(password, pubkey_path, password_encrypt_path):
    """
    Encrypt *password* with the RSA public key and persist the ciphertext.

    Returns True only when the password is non-empty and encryption
    succeeded; False (after a notice on unexpected errors) otherwise.
    """
    try:
        if password == '':
            return False
        encrypted = encrypt_by_rsa(read_key(pubkey_path), password)
        if encrypted == '':
            return False
        save_text(password_encrypt_path, encrypted)
        return True
    except BaseException:  # keep the original catch-everything behaviour
        print('密码检查失败')
        return False
def encrypt_by_rsa(pubkey, message: str):
    '''
    Encrypt *message* with an RSA public key (payload limit 117 bytes).

    pubkey: rsa.key.PublicKey
    message: str
    return: base64 ciphertext as str, or '' on any failure
    '''
    try:
        raw = message.encode('utf-8')
        if len(raw) > 117:
            # Same diagnostic the original printed via its ValueError path.
            print('message length longer than 117 bytes')
            return ''
        ciphertext = rsa.encrypt(raw, pubkey)
        return str(base64.encodebytes(ciphertext), encoding='utf-8')
    except BaseException:  # keep the original catch-everything behaviour
        print("encrypt failed")
        return ''
def get_allfile_alldir_in_dir(path):
    """
    Recursively collect every sub-directory and file under *path*.

    Returns (sorted_dir_paths, sorted_file_paths) as os.path.join strings.
    """
    dirs_found, files_found = [], []
    for root, subdirs, names in os.walk(path):
        dirs_found.extend(os.path.join(root, d) for d in subdirs)
        files_found.extend(os.path.join(root, name) for name in names)
    return sorted(dirs_found), sorted(files_found)
def get_token_by_login(apiid, password, url):
    """
    POST the (already-encrypted) credentials to the login endpoint and
    return the long-lived access token, or '' on any failure.
    """
    payload = {'apiid': apiid, 'password': password, 'encrypt': 1, 'is_long': 1}
    try:
        return requests.post(url, data=payload).json()['access_token']
    except BaseException:  # keep the original catch-everything behaviour
        return ''
def get_mime(img_path):
    """
    Guess the MIME type of the image at *img_path* from its magic bytes,
    falling back to the file extension when detection fails.
    """
    raw = Path(img_path).read_bytes()
    detected = whatimage.identify_image(raw)
    if detected is None or detected == 'None':
        detected = Path(img_path).suffix.replace('.', '')
    return f'image/{detected}'
def get_encodeing(text_path, encoding_detect, default_encoding):
    """
    Guess the text encoding of *text_path*.

    When *encoding_detect* is true, tries chardet on the raw bytes and falls
    back to *default_encoding* on any failure — including a missing chardet
    package, which previously raised an uncaught ImportError. The file
    handle is now closed via a context manager instead of being leaked.
    """
    if not encoding_detect:
        return default_encoding
    try:
        import chardet
        with open(text_path, 'rb') as f:
            raw = f.read()
        detected = chardet.detect(raw)['encoding']
        # chardet can return None for undecidable input; use the default then.
        return detected if detected else default_encoding
    except Exception:
        return default_encoding
def get_text_length(file_path, encoding):
    """
    Return the number of characters in *file_path*, or 0 on any read error.

    Note: callers treat 0 as "unreadable", so an empty file is reported the
    same as a failure. FIX: the file handle is now closed via a context
    manager instead of being leaked.
    """
    try:
        with open(file_path, 'r', encoding=encoding) as f:
            return len(f.read())
    except Exception:
        return 0
def load_config(config_path=ConfigFile):
    """
    Load the JSON configuration file; print a notice and return None on
    failure. FIX: narrowed the bare except to the read/parse errors that can
    actually occur, so Ctrl-C is no longer swallowed.
    """
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, json.JSONDecodeError):
        print('配置文件读取失败')
        return None
def load_password(filepath):
    """
    Return the saved (encrypted) password text, or '' if the file cannot be
    read. FIX: the file handle is now closed via a context manager instead
    of being leaked.
    """
    try:
        with open(filepath, "r", encoding='utf-8') as f:
            return f.read()
    except Exception:
        return ''
def logging_init(log_type: str, dir: Path = Log_dir):
    '''
    Configure the root logger to append to <dir>/<log_type>/<YYYY-MM-DD>.log
    with a timestamped "level message" format.
    '''
    target_dir = dir / log_type
    target_dir.mkdir(exist_ok=True)
    day_stamp = time.strftime("%Y-%m-%d", time.localtime())
    logging.basicConfig(
        filename=str(target_dir / f'{day_stamp}.log'),
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        encoding='utf-8',
    )
def pdf_merge(pdf_lst, save_path):
    """
    Concatenate the PDFs in *pdf_lst* (in order) into a single file at
    *save_path*. Returns True on success, False on any failure.
    """
    try:
        merger = PdfMerger()
        for source_pdf in pdf_lst:
            merger.append(source_pdf)
        merger.write(save_path)
        merger.close()
    except BaseException:  # keep the original catch-everything behaviour
        return False
    return True
def prepare_ocr_dir_task_paths(dir, task_name, max_size):
    """
    Mirror *dir*'s tree under the json/text output roots and build the
    per-image OCR work list.

    Returns (path_lst, fail_lst): path_lst entries carry img/json/text
    paths; files at or above *max_size* bytes go to fail_lst.

    BUG FIX: the output names used a mangled literal where the f-string
    placeholder `{filename}` belonged (the computed stem was never used).
    """
    # Create the per-task output roots.
    json_save_dir = Json_Data_dir.joinpath(task_name)
    json_save_dir.mkdir(exist_ok=True)
    text_save_dir = Text_Data_dir.joinpath(task_name)
    text_save_dir.mkdir(exist_ok=True)
    alldir_path, allfile_path = get_allfile_alldir_in_dir(dir)
    # Mirror the source directory structure into both output roots.
    for dir_path in alldir_path:
        Path(str(dir_path).replace(dir, str(json_save_dir))).mkdir(exist_ok=True)
        Path(str(dir_path).replace(dir, str(text_save_dir))).mkdir(exist_ok=True)
    path_lst, fail_lst = [], []
    for file_path in allfile_path:
        if os.path.getsize(file_path) < max_size:
            filename = Path(file_path).stem
            json_dir = Path(str(file_path).replace(dir, str(json_save_dir))).parent
            text_dir = Path(str(file_path).replace(dir, str(text_save_dir))).parent
            path_lst.append({'img_path': file_path,
                             'json_path': str(json_dir.joinpath(f'{filename}.json')),
                             'text_path': str(text_dir.joinpath(f'{filename}.txt'))})
        else:
            print(f'{file_path}体积过大, {os.path.getsize(file_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
            fail_lst.append(file_path)
    return path_lst, fail_lst
def prepare_ocr_files_task_paths(paths, task_name, max_size):
    """
    Build the OCR work list for an explicit list of image *paths*, with all
    outputs flat under the per-task json/text roots.

    Returns (path_lst, fail_lst); files at or above *max_size* bytes fail.

    BUG FIX: the output names used a mangled literal where the f-string
    placeholder `{filename}` belonged (the computed stem was never used).
    """
    json_save_dir = Json_Data_dir.joinpath(task_name)
    json_save_dir.mkdir(exist_ok=True)
    text_save_dir = Text_Data_dir.joinpath(task_name)
    text_save_dir.mkdir(exist_ok=True)
    path_lst, fail_lst = [], []
    for file_path in paths:
        if os.path.getsize(file_path) < max_size:
            filename = Path(file_path).stem
            path_lst.append({'img_path': file_path,
                             'json_path': str(json_save_dir.joinpath(f'{filename}.json')),
                             'text_path': str(text_save_dir.joinpath(f'{filename}.txt'))})
        else:
            print(f'{file_path}体积过大, {os.path.getsize(file_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
            fail_lst.append(file_path)
    return path_lst, fail_lst
def prepare_punct_dir_task_paths(dir, task_name, config):
    """
    Mirror *dir* under the punct output root and build the punctuation work
    list.

    Returns (path_lst, fail_lst); unreadable (length 0) or over-length texts
    go to fail_lst.

    BUG FIXES: output names used a mangled literal where `{filename}`
    belonged; the over-length message was missing its `{num}` placeholder.
    """
    detect_encoding = config['punct_detect_encoding']
    default_encoding = config['punct_default_encoding']
    max_length = config['punct_max_length']
    # Create the per-task output root and mirror the source tree.
    save_dir = Punct_Data_dir / task_name
    save_dir.mkdir(exist_ok=True)
    alldir_path, allfile_path = get_allfile_alldir_in_dir(dir)
    for dir_path in alldir_path:
        Path(str(dir_path).replace(dir, str(save_dir))).mkdir(exist_ok=True)
    path_lst, fail_lst = [], []
    for file_path in allfile_path:
        encoding = get_encodeing(file_path, detect_encoding, default_encoding)
        num = get_text_length(file_path, encoding)
        if num == 0:
            print(f'{file_path}读取失败')
            fail_lst.append(file_path)
        elif num > max_length:
            print(f'{file_path}长度过大, {num}, 超过最大限量{max_length}')
            fail_lst.append(file_path)
        else:
            filename = Path(file_path).stem
            punct_dir = Path(str(file_path).replace(dir, str(save_dir))).parent
            path_lst.append({'orig_path': file_path,
                             'punct_path': str(punct_dir.joinpath(f'{filename}.txt')),
                             'encoding': encoding, 'orig_num': num})
    return path_lst, fail_lst
def prepare_punct_files_task_paths(paths, task_name, config):
    """
    Build the punctuation work list for an explicit list of text *paths*,
    with outputs flat under the per-task punct root.

    Returns (path_lst, fail_lst); unreadable (length 0) or over-length texts
    go to fail_lst.

    BUG FIXES: output names used a mangled literal where `{filename}`
    belonged; the over-length message was missing its `{num}` placeholder.
    """
    detect_encoding = config['punct_detect_encoding']
    default_encoding = config['punct_default_encoding']
    max_length = config['punct_max_length']
    # Create the per-task output root.
    save_dir = Punct_Data_dir / task_name
    save_dir.mkdir(exist_ok=True)
    path_lst, fail_lst = [], []
    for file_path in paths:
        encoding = get_encodeing(file_path, detect_encoding, default_encoding)
        num = get_text_length(file_path, encoding)
        if num == 0:
            print(f'{file_path}读取失败')
            fail_lst.append(file_path)
        elif num > max_length:
            print(f'{file_path}长度过大, {num}, 超过最大限量{max_length}')
            fail_lst.append(file_path)
        else:
            filename = Path(file_path).stem
            path_lst.append({'orig_path': file_path,
                             'punct_path': str(save_dir.joinpath(f'{filename}.txt')),
                             'encoding': encoding, 'orig_num': num})
    return path_lst, fail_lst
def prepare_ocr_list_task_paths(list_paths, task_name, max_size):
    """
    Read image paths (one per line) from each list file in *list_paths*,
    keep the ones that exist on disk, and delegate the work-list building to
    prepare_ocr_files_task_paths. Returns (path_lst, fail_lst).
    """
    img_paths = []
    for lst_path in list_paths:
        with open(lst_path, 'r', encoding='utf-8') as f:
            candidates = [line.strip() for line in f]
        img_paths.extend(p for p in candidates if Path(p).exists())
    return prepare_ocr_files_task_paths(img_paths, task_name, max_size)
def prepare_pdf_merge(root_dir, task_name):
    """
    Merge PDFs under *root_dir*: one merged PDF per bottom-level
    sub-directory, or a single merged PDF when *root_dir* has no
    sub-directories at all.

    BUG FIX: the flat-directory branch dropped the '.pdf' extension from the
    merged file name (the per-directory branch already added it).
    """
    try:
        pdf_root_dir = PDF_Data_dir.joinpath(task_name)
        pdf_root_dir.mkdir(exist_ok=True)
        all_dir, all_files = get_allfile_alldir_in_dir(root_dir)
        if len(all_dir) > 0:
            for dir in all_dir:
                # Merge only bottom-most sub-directories (no further subdirs).
                subdirs, item_paths = get_allfile_alldir_in_dir(dir)
                if len(subdirs) == 0:
                    save_path = str(pdf_root_dir.joinpath(f'{Path(dir).name}.pdf'))
                    if pdf_merge(item_paths, save_path):
                        print(f'{Path(save_path).name}合并完成')
                    else:
                        print(f'{Path(save_path).name}合并失败')
        else:
            save_path = str(pdf_root_dir.joinpath(f'{Path(root_dir).name}.pdf'))
            if pdf_merge(all_files, save_path):
                print(f'{Path(save_path).name}合并完成')
            else:
                print(f'{Path(save_path).name}合并失败')
    except Exception:
        print(f'{task_name}失败')
def prepare_pdf_split(pdf_paths, task_name):
    """
    Split each input PDF into one single-page PDF per page, written to
    PDF_Data_dir/<task_name>/<pdf stem>/NNNN.pdf (1-based, zero-padded).
    """
    try:
        task_dir = PDF_Data_dir.joinpath(task_name)
        task_dir.mkdir(exist_ok=True)
        for pdf_path in tqdm(pdf_paths, desc='PDF拆分'):
            reader = PdfReader(pdf_path)
            out_dir = task_dir.joinpath(Path(pdf_path).stem)
            out_dir.mkdir(exist_ok=True)
            for page_number, page in enumerate(reader.pages, start=1):
                writer = PdfWriter()
                writer.add_page(page)
                page_path = out_dir.joinpath(f'{page_number:04d}.pdf')
                with open(str(page_path), "wb") as fp:
                    writer.write(fp)
        print(f'{task_name}完成')
    except BaseException:  # keep the original catch-everything behaviour
        print(f'{task_name}失败')
def prepare_pdf_to_image(pdf_paths, task_name, dpi=300, image_format='JPEG', first_page=None, last_page=None):
    """
    Rasterize each PDF into per-page images under
    PDF_Data_dir/<task_name>/<pdf stem>/ using pdf2image/poppler. A failure
    on one PDF is reported and the rest continue.
    """
    try:
        task_dir = PDF_Data_dir.joinpath(task_name)
        task_dir.mkdir(exist_ok=True)
        for pdf_path in tqdm(pdf_paths, desc='PDF转图片'):
            stem = Path(pdf_path).stem
            out_dir = task_dir.joinpath(stem)
            out_dir.mkdir(exist_ok=True)
            try:
                _ = convert_from_path(
                    pdf_path,
                    poppler_path=Poppler_Path,
                    dpi=dpi,
                    output_folder=out_dir,
                    fmt=image_format,
                    first_page=first_page,
                    last_page=last_page,
                    thread_count=os.cpu_count(),
                    output_file=stem,
                )
            except Exception as e:
                print(e.args)
                print(f'{pdf_path}失败')
        print(f'{task_name}完成')
    except BaseException:  # keep the original catch-everything behaviour
        print(f'{task_name}失败')
def prepare_pdf_dir_task_paths(img_dir, data_dir, task_name, max_size):
    """
    Mirror *img_dir* under the PDF output root and pair every image with its
    OCR JSON found at the mirrored location inside *data_dir*.

    Returns (path_lst, fail_lst); over-size images and images without a JSON
    go to fail_lst.

    BUG FIX: the json/pdf names used a mangled literal where the f-string
    placeholder `{filename}` belonged (the computed stem was never used).
    """
    pdf_save_dir = PDF_Data_dir.joinpath(task_name)
    pdf_save_dir.mkdir(exist_ok=True)
    all_img_dir, all_img_file = get_allfile_alldir_in_dir(img_dir)
    # Mirror the source directory structure into the output root.
    for dir in all_img_dir:
        Path(str(dir).replace(img_dir, str(pdf_save_dir))).mkdir(exist_ok=True)
    path_lst, fail_lst = [], []
    for img_path in all_img_file:
        if os.path.getsize(img_path) < max_size:
            filename = Path(img_path).stem
            data_path = Path(str(img_path).replace(img_dir, data_dir)).parent.joinpath(f'{filename}.json')
            if data_path.exists():
                pdf_path = Path(str(img_path).replace(img_dir, str(pdf_save_dir))).parent.joinpath(f'{filename}.pdf')
                path_lst.append({'img_path': img_path, 'data_path': str(data_path), 'pdf_path': str(pdf_path)})
            else:
                print(f'数据不存在: {data_path}')
                fail_lst.append(img_path)
        else:
            print(f'{img_path}体积过大, {os.path.getsize(img_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
            fail_lst.append(img_path)
    return path_lst, fail_lst
def prepare_pdf_files_task_paths(img_paths, data_dir, task_name, max_size):
    """
    Build the PDF work list for an explicit list of images, pairing each
    with <data_dir>/<stem>.json; outputs go flat under the per-task root.

    Returns (path_lst, fail_lst); over-size images and images without a JSON
    go to fail_lst.

    BUG FIX: the json/pdf names used a mangled literal where the f-string
    placeholder `{filename}` belonged (the computed stem was never used).
    """
    pdf_save_dir = PDF_Data_dir.joinpath(task_name)
    pdf_save_dir.mkdir(exist_ok=True)
    path_lst, fail_lst = [], []
    for img_path in img_paths:
        if os.path.getsize(img_path) < max_size:
            filename = Path(img_path).stem
            data_path = Path(data_dir).joinpath(f'{filename}.json')
            if data_path.exists():
                pdf_path = Path(pdf_save_dir).joinpath(f'{filename}.pdf')
                path_lst.append({'img_path': img_path, 'data_path': str(data_path), 'pdf_path': str(pdf_path)})
            else:
                print(f'数据不存在: {data_path}')
                fail_lst.append(img_path)
        else:
            print(f'{img_path}体积过大, {os.path.getsize(img_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
            fail_lst.append(img_path)
    return path_lst, fail_lst
def prepare_sr_dir_task_paths(img_dir, task_name, ext, output, max_size):
    """
    Mirror *img_dir* under the SR output root and build the super-resolution
    work list. Outputs get a '.txt' extension when *output* is 'base64',
    otherwise the requested image *ext*.

    Returns (path_lst, fail_lst); files above *max_size* bytes fail.

    BUG FIX: the output names used a mangled literal where the f-string
    placeholder `{filename}` belonged (the computed stem was never used).
    """
    # Create the per-task output root and mirror the source tree.
    sr_save_dir = SR_Data_dir.joinpath(task_name)
    sr_save_dir.mkdir(exist_ok=True)
    alldir_path, allfile_path = get_allfile_alldir_in_dir(img_dir)
    for dir_path in alldir_path:
        Path(str(dir_path).replace(img_dir, str(sr_save_dir))).mkdir(exist_ok=True)
    path_lst, fail_lst = [], []
    for file_path in allfile_path:
        if os.path.getsize(file_path) > max_size:
            print(f'{file_path}体积过大, {os.path.getsize(file_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
            fail_lst.append(file_path)
        else:
            filename = Path(file_path).stem
            sr_dir = Path(str(file_path).replace(img_dir, str(sr_save_dir))).parent
            ext_format = 'txt' if output == 'base64' else ext
            path_lst.append({'img_path': file_path,
                             'sr_path': str(sr_dir.joinpath(f'{filename}.{ext_format}'))})
    return path_lst, fail_lst
def prepare_sr_files_task_paths(img_paths, task_name, ext, output, max_size):
    """
    Build the super-resolution work list for an explicit list of images,
    with outputs flat under the per-task SR root. Outputs get a '.txt'
    extension when *output* is 'base64', otherwise the requested image *ext*.

    Returns (path_lst, fail_lst); files above *max_size* bytes fail.

    BUG FIX: the output names used a mangled literal where the f-string
    placeholder `{filename}` belonged (the computed stem was never used).
    """
    # Create the per-task output root.
    sr_save_dir = SR_Data_dir.joinpath(task_name)
    sr_save_dir.mkdir(exist_ok=True)
    path_lst, fail_lst = [], []
    for file_path in img_paths:
        if os.path.getsize(file_path) > max_size:
            print(f'{file_path}体积过大, {os.path.getsize(file_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
            fail_lst.append(file_path)
        else:
            filename = Path(file_path).stem
            ext_format = 'txt' if output == 'base64' else ext
            path_lst.append({'img_path': file_path,
                             'sr_path': str(sr_save_dir.joinpath(f'{filename}.{ext_format}'))})
    return path_lst, fail_lst
def read_key(key_path):
    '''
    Load an RSA public key from a PKCS#1 PEM file.

    key_path: path to the .pem file
    return: rsa.key.PublicKey
    '''
    pem_bytes = Path(key_path).read_bytes()
    return rsa.PublicKey.load_pkcs1(pem_bytes)
def read_paths(pathtype='file', init_dir='./'):
    """
    Pop up a Tk picker and return the selection: a tuple of file paths for
    pathtype 'file', a directory string for 'dir', None for anything else.
    """
    root = tkinter.Tk()
    root.focus_force()
    # Hide the root window right after the dialog grabs focus.
    root.after(10, root.withdraw)
    dialogs = {
        'file': filedialog.askopenfilenames,
        'dir': filedialog.askdirectory,
    }
    picker = dialogs.get(pathtype)
    if picker is None:
        return None
    return picker(parent=root, initialdir=init_dir)
def save_text(filepath, content, is_add=False):
    """
    Write *content* to *filepath* as UTF-8, appending when *is_add* is true;
    silently does nothing for a falsy path.
    """
    if not filepath:
        return
    mode = "a" if is_add else "w"
    with open(filepath, mode, encoding='utf-8') as out:
        out.write(content)
def save_config(config, config_path=ConfigFile):
    """Persist *config* as pretty-printed (indent=4) UTF-8 JSON."""
    serialized = json.dumps(config, ensure_ascii=False, indent=4)
    with open(config_path, 'w', encoding='utf-8') as f:
        f.write(serialized)
def data_to_text(data):
    """
    Rebuild plain text from OCR output arrays: characters sharing a line id
    are joined, and a newline terminates every line (including the last).
    Returns '' when 'line_ids' or 'chars' is missing.
    """
    line_ids = data.get('line_ids')
    chars = data.get('chars')
    if line_ids is None or chars is None:
        return ''
    pieces = []
    total = len(line_ids)
    for i, (line_id, ch) in enumerate(zip(line_ids, chars)):
        pieces.append(ch)
        # End the line when the id changes, or at the very last character.
        if i == total - 1 or line_id != line_ids[i + 1]:
            pieces.append('\n')
    return ''.join(pieces)
if __name__ == "__main__":
    # Top-level interactive menu. Loops until the user types anything other
    # than 0-5; each branch runs its own sub-menu loop in the same style.
    key = input('选择任务: 0.设置; 1.识别; 2.自动标点; 3.PDF; 4.超分辨率增强; 5.查询使用量. 输入其他键, 退出\t')
    while key in ['0', '1', '2', '3', '4', '5']:
        if key in ['0']:
            # --- Settings: apiid / password / token / server / local IP ---
            config = load_config()
            check_config(config)
            sub_key = input('1.设置apiid; 2.设置密码; 3.刷新Token; 4.选择服务器; 5.设置本地服务器IP地址. 输入其他键, 返回上层\t')
            while sub_key in ['1', '2', '3', '4', '5']:
                if sub_key in ['1']:
                    # Set apiid; empty input abandons the change.
                    config = load_config()
                    apiid_now = config['apiid']
                    apiid = input(f'当前apiid: {apiid_now}. 如果重新设置, 请输入apiid, 否则请回车:\t')
                    if len(apiid) == 0:
                        print('放弃设置apiid')
                    else:
                        while not check_apiid(apiid):
                            apiid = input('请输入apiid, 回车则放弃设置:\t')
                            if len(apiid) == 0:
                                print('放弃设置apiid')
                                break
                        # NOTE(review): when the user gives up with empty input
                        # in the loop above, the empty string is still written
                        # to the config here.
                        config['apiid'] = apiid
                        save_config(config)
                elif sub_key in ['2']:
                    # Set password; check_password encrypts it with the public
                    # key and saves it to Password_path.
                    # NOTE(review): the prompt says Config/password_encrypt.txt
                    # but Password_path points at the script directory.
                    password = getpass('请输入密码密码加密后保存于Config/password_encrypt.txt。回车则放弃设置:\t')
                    while not check_password(password, Pubkey_path, Password_path):
                        password = getpass('请输入密码:\t')
                        if len(password) == 0:
                            print('放弃设置密码')
                            break
                elif sub_key in ['3']:
                    # Refresh the login token using the stored encrypted password.
                    config = load_config()
                    apiid = config['apiid']
                    login_url = config['login_url']
                    encrypt_password = load_password(Password_path)
                    token = get_token_by_login(apiid, encrypt_password, login_url)
                    if len(token) == 0:
                        print('Token刷新失败请重新检查账号信息或网络连接')
                    else:
                        print('Token刷新成功')
                        config['token'] = token
                        save_config(config)
                elif sub_key in ['4']:
                    # Choose the active server from the configured server list.
                    config = load_config()
                    server_now = config['server']
                    server_lst = list(config['server_lst'].keys())
                    server = input(f'当前服务器: {server_now}. 如果重新选择,请选择{",".join(server_lst)} , 否则请回车:\t')
                    if len(server)>0:
                        if server in server_lst:
                            config['server'] = server
                            save_config(config)
                        else:
                            # Re-prompt until a valid name or empty input.
                            while len(server)>0 and server not in server_lst:
                                server = input(f'请选择{", ".join(server_lst)} , 放弃请回车:\t')
                                if len(server) == 0:
                                    print('放弃选择服务器')
                                    break
                                elif server in server_lst:
                                    config['server'] = server
                                    save_config(config)
                elif sub_key in ['5']:
                    # Set the local server IP; only allowed while server=='local'.
                    config = load_config()
                    if config['server'] == 'local':
                        local_ip = input(f"当前本地服务器: {config['server_lst']['local']}. 如果重新设置, 输入本地服务器的IP地址, 否则请回车:\t")
                        if len(local_ip) == 0:
                            print('放弃修改本地服务器IP')
                        else:
                            # NOTE(review): only checks for four dot-separated
                            # fields, not that each field is a number in 0-255.
                            while len(local_ip.split('.'))!= 4:
                                print('IP地址格式错误请重新输入')
                                local_ip = input(f"请输入本地服务器的IP地址, 放弃请回车:\t")
                                if len(local_ip) == 0:
                                    break
                            if len(local_ip) != 0:
                                config['server_lst']['local'] = f"{config['local_head']}{local_ip}:{config['local_port']}"
                                save_config(config)
                    else:
                        print('请先将服务器设置为local')
                # Reload and re-validate after every action, then re-prompt.
                config = load_config()
                check_config(config)
                sub_key = input('1.设置apiid; 2.设置密码; 3.刷新Token; 4.选择服务器; 5.设置本地服务器IP地址. 输入其他键, 返回上层\t')
        elif key in ['1']:
            # --- OCR: pick images by directory / individual files / list files ---
            logging_init('OCR')
            sub_key = input('选择图片: 1.目录; 2.文件; 3.列表. 输入其他键, 返回上层\t')
            while sub_key in ['1', '2', '3']:
                layout = input('请输入排版方向: 0:竖版,1:横版. 默认值:0\t')
                if layout not in ['0', '1']:
                    layout = '0'
                compact = input('请输入Compact参数: 1, 2, 4, 6. 默认值: 1\t')
                if compact not in ['1', '2', '4', '6']:
                    compact = '1'
                compact = int(compact)
                # Optional client-side compression of large images before OCR.
                compress_choice = input('是否启用压缩模式(对大图片可能有更好的识别效果): 1.启用; 其他.不启用\t')
                compress_mode = compress_choice in ['1']
                max_length = 2500 # default max pixel length when compressing
                if compress_mode:
                    max_length_input = input('请输入压缩后的最大像素(默认2500):\t')
                    if max_length_input.isdigit() and int(max_length_input) > 0:
                        max_length = int(max_length_input)
                    print(f'压缩模式已启用,最大像素: {max_length}')
                else:
                    print('压缩模式未启用')
                # Task name (empty input aborts this sub-menu).
                task_name = input('请输入任务名称:\t')
                if not task_name:
                    break
                config = load_config()
                max_size = config['max_size'] * 1024 * 1024
                # Build the image lists: path_lst (accepted), fail_lst (oversized).
                if sub_key in ['1']:
                    print('请选择图片目录')
                    dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
                    if not dir:
                        break
                    print(dir)
                    path_lst, fail_lst = prepare_ocr_dir_task_paths(dir, task_name, max_size)
                elif sub_key in ['2']:
                    print('请选择图片文件')
                    img_paths = read_paths(init_dir=str(Start_dir))
                    if not img_paths:
                        break
                    print(f'已选择{len(img_paths)}个文件')
                    path_lst, fail_lst = prepare_ocr_files_task_paths(img_paths, task_name, max_size)
                elif sub_key in ['3']:
                    print('请选择列表文件')
                    list_paths = read_paths(init_dir=str(Start_dir))
                    if not list_paths:
                        break
                    print(f'已选择{len(list_paths)}个列表')
                    path_lst, fail_lst = prepare_ocr_list_task_paths(list_paths, task_name, max_size)
                # path_lst, layout, task_name, url, fail_lst
                if len(fail_lst)>0:
                    check_size = input(f'{len(fail_lst)}个文件体积超标, 是否停止任务: 1. 继续; 其他, 中止\t')
                    if check_size not in ['1']:
                        break
                # Optional closed-area detection (1-4 areas) and column split (2-4).
                area_check = input('如果需要自动检测封闭区域, 则输入区域数量(1-4); 不需要则跳过\t')
                if area_check in ['1', '2', '3', '4']:
                    area_num = int(area_check)
                    row_num_check = input('如果需要分栏,则输入分栏数量(2-4); 不需要则跳过\t')
                    if row_num_check in ['2','3','4']:
                        row_num = int(row_num_check)
                    else:
                        row_num = 1
                else:
                    area_num = 0
                    row_num = 1
                try:
                    batch_ocr_api(path_lst, layout, compact, area_num, row_num, task_name, config, compress_mode, max_length)
                # NOTE(review): bare except swallows the real error; consider
                # logging the exception before reporting failure.
                except:
                    print(f'{task_name}任务失败')
                sub_key = input('选择方式: 1.目录; 2.文件; 3.列表. 输入其他键, 返回上层\t')
        elif key in ['2']:
            # --- Auto punctuation of recognized text files ---
            logging_init('Punct')
            sub_key = input('选择文本: 1.目录; 2.文件. 输入其他键, 返回上层\t')
            while sub_key in ['1', '2']:
                task_name = input('请输入任务名称:\t')
                if not task_name:
                    break
                config = load_config()
                if sub_key in ['1']:
                    print('请选择目录')
                    dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
                    if not dir:
                        break
                    print(dir)
                    path_lst, fail_lst = prepare_punct_dir_task_paths(dir, task_name, config)
                elif sub_key in ['2']:
                    print('请选择文件')
                    orig_paths = read_paths(init_dir=str(Start_dir))
                    if not orig_paths:
                        break
                    print(f'已选择{len(orig_paths)}个文件')
                    path_lst, fail_lst = prepare_punct_files_task_paths(orig_paths, task_name, config)
                if len(fail_lst)>0:
                    check_length = input(f'{len(fail_lst)}个文件长度超标, 是否停止任务: 1. 继续; 其他, 中止\t')
                    if check_length not in ['1']:
                        break
                batch_punct_api(path_lst, task_name, config)
                sub_key = input('选择方式: 1.目录; 2.文件. 输入其他键, 返回上层\t')
        elif key in ['3']:
            # --- PDF: build from images+data, merge, split, or render to images ---
            logging_init('PDF')
            sub_key = input('选择方式: 1.按目录生成; 2.按文件生成; 3.PDF合并; 4.PDF拆分; 5.PDF转图片. 输入其他键, 返回上层\t')
            while sub_key in ['1', '2', '3', '4', '5']:
                task_name = input('请输入任务名称:\t')
                if not task_name:
                    break
                if sub_key in ['1', '2']:
                    # Generate PDFs from images plus their OCR data files.
                    config = load_config()
                    max_size = config['max_size'] * 1024 * 1024
                    if sub_key in ['1']:
                        print('请选择图片目录')
                        img_dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
                        if not img_dir:
                            break
                        print(img_dir)
                        print('请选择数据目录')
                        data_dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
                        if not data_dir:
                            print('数据为空')
                            break
                        else:
                            print(data_dir)
                        path_lst, fail_lst = prepare_pdf_dir_task_paths(img_dir, data_dir, task_name, max_size)
                    elif sub_key in ['2']:
                        print('请选择图片文件')
                        img_paths = read_paths(init_dir=str(Start_dir))
                        if not img_paths:
                            break
                        print(f'已选择{len(img_paths)}个文件')
                        print('请选择数据目录')
                        data_dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
                        if not data_dir:
                            print('数据为空')
                            break
                        else:
                            print(data_dir)
                        path_lst, fail_lst = prepare_pdf_files_task_paths(img_paths, data_dir, task_name, max_size)
                    #
                    if len(fail_lst)>0:
                        check_size = input(f'{len(fail_lst)}个文件存在问题, 是否停止任务: 1. 继续; 其他, 中止\t')
                        if check_size not in ['1']:
                            break
                    try:
                        config = load_config()
                        batch_pdf_api(path_lst, task_name, config)
                    except:
                        print(f'{task_name}任务失败')
                # Merge: combine all PDFs found in a chosen directory.
                elif sub_key in ['3']:
                    print('请选择PDF目录')
                    pdf_dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
                    if not pdf_dir:
                        break
                    print(pdf_dir)
                    prepare_pdf_merge(pdf_dir, task_name)
                # Split: break each chosen PDF into per-page outputs.
                elif sub_key in ['4']:
                    print('请选择PDF文件')
                    pdf_paths = read_paths(init_dir=str(Start_dir))
                    if not pdf_paths:
                        break
                    print(f'已选择{len(pdf_paths)}个文件')
                    prepare_pdf_split(pdf_paths, task_name)
                # Render PDF pages to images at a chosen dpi and format.
                elif sub_key in ['5']:
                    print('请选择PDF文件')
                    pdf_paths = read_paths(init_dir=str(Start_dir))
                    if not pdf_paths:
                        break
                    print(f'已选择{len(pdf_paths)}个文件')
                    dpi_str = input('请输入dpi:\t')
                    try:
                        dpi = int(dpi_str)
                        if dpi<=0:
                            break
                    except:
                        break
                    format_str = input('请选择图片格式: 1.jpeg; 2.png.\t')
                    if format_str not in ['1', '2']:
                        break
                    format_id = int(format_str)-1
                    image_format = Pdf_to_Image_lst[format_id]
                    prepare_pdf_to_image(pdf_paths, task_name, dpi=dpi, image_format=image_format)
                sub_key = input('选择方式: 1.按目录生成; 2.按文件生成; 3.PDF合并; 4.PDF拆分; 5.PDF转图片. 输入其他键, 返回上层\t')
        elif key in ['4']:
            # --- Super-resolution enhancement ---
            logging_init('SR')
            sub_key = input('选择方式: 1.目录; 2.文件. 输入其他键, 返回上层\t')
            while sub_key in ['1', '2']:
                task_name = input('请输入任务名称:\t')
                if not task_name:
                    break
                scale_str = input('请输入放大倍数: 1, 2, 4. 默认值:2\t')
                if scale_str not in ['1','2','4']:
                    scale_str = '2'
                scale = int(scale_str)
                ext_str = input('请输入保存图片格式: 1.jpeg, 2.png, 3.tiff, 4.webp. 默认值:jpeg\t')
                if ext_str not in ['1','2','3','4']:
                    ext_str = '1'
                ext = SR_EXT_lst[int(ext_str)-1]
                output_str = input('请输入数据格式: 1.file, 2.base64. 默认值:file\t')
                # NOTE(review): SR_Output_lst has only two entries, so '3'/'4'
                # pass this check and raise IndexError on the lookup below;
                # the membership test should be ['1','2'].
                if output_str not in ['1','2','3','4']:
                    output_str = '1'
                output = SR_Output_lst[int(output_str)-1]
                config = load_config()
                max_size = config['max_size'] * 1024 * 1024
                if sub_key in ['1']:
                    print('请选择图片目录')
                    img_dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
                    if not img_dir:
                        break
                    print(img_dir)
                    path_lst, fail_lst = prepare_sr_dir_task_paths(img_dir, task_name, ext, output, max_size)
                elif sub_key in ['2']:
                    print('请选择图片文件')
                    img_paths = read_paths(init_dir=str(Start_dir))
                    if not img_paths:
                        break
                    # print(img_paths)
                    path_lst, fail_lst = prepare_sr_files_task_paths(img_paths, task_name, ext, output, max_size)
                if len(fail_lst)>0:
                    check_size = input(f'{len(fail_lst)}个文件存在问题, 是否停止任务: 1. 继续; 其他, 中止\t')
                    if check_size not in ['1']:
                        break
                try:
                    config = load_config()
                    batch_sr_api(path_lst, scale, ext, output, task_name, config)
                except:
                    print(f'{task_name}任务失败')
                sub_key = input('选择方式: 1.目录; 2.文件. 输入其他键, 返回上层\t')
        elif key in ['5']:
            # --- Query API usage per type (ocr / sr / pdf) ---
            sub_key = input('选择参数: 1.ocr; 2.sr; 3.pdf. 输入其他键, 返回上层\t')
            while sub_key in ['1', '2', '3']:
                api_type = Usage_API_Type_lst[int(sub_key)-1]
                try:
                    config = load_config()
                    get_usage_api(api_type, config)
                except:
                    print(f'查询失败')
                sub_key = input('选择参数: 1.ocr; 2.sr; 3.pdf. 输入其他键, 返回上层\t')
        key = input('选择任务: 0.设置; 1.OCR; 2.自动标点; 3.PDF; 4.超分辨率增强; 5.查询使用量. 输入其他键, 退出\t')