1341 lines
54 KiB
Python
1341 lines
54 KiB
Python
#pip install -r requirements.txt
|
||
import json,logging,time,os,tkinter,base64,requests,whatimage,rsa
|
||
from pathlib import Path
|
||
from getpass import getpass
|
||
from tkinter import filedialog
|
||
from tqdm import tqdm
|
||
from PyPDF2 import PdfMerger, PdfReader, PdfWriter
|
||
from pdf2image import convert_from_path
|
||
|
||
# All paths are resolved relative to the directory containing this script.
Start_dir = Path(__file__).parent
ConfigFile = Start_dir / 'config.json'
# RSA public key used to encrypt the account password before storing it.
Pubkey_path = str(Start_dir / 'password_pubkey.pem')
# The encrypted password is persisted here between runs.
Password_path = str(Start_dir / 'password_encrypt.txt')


# Allowed output image formats for the super-resolution API.
SR_EXT_lst = ['jpeg', 'png', 'tiff', 'webp']
# Allowed SR delivery modes: raw image bytes or base64 text.
SR_Output_lst = ['file', 'base64']
# Image formats supported when rasterizing PDF pages.
Pdf_to_Image_lst= ['JPEG', 'PNG']
# API categories accepted by the usage-query endpoint.
Usage_API_Type_lst = ['ocr', 'sr', 'pdf']


# Output layout: logs under ./Log, results under ./Data, one subtree per task type.
Log_dir = Start_dir / 'Log'
Data_dir = Start_dir / 'Data'

Json_Data_dir = Data_dir / 'json'
Text_Data_dir = Data_dir / 'text'
Punct_Data_dir = Data_dir / 'punct'
PDF_Data_dir = Data_dir / 'pdf'
SR_Data_dir = Data_dir / 'sr'

# Per-task-type failure lists are written under Log/fail/<type>/.
Fail_dir = Log_dir / 'fail'
Fail_OCR_dir = Fail_dir / 'ocr'
Fail_Punct_dir = Fail_dir / 'punct'
Fail_SR_dir = Fail_dir / 'sr'
Fail_PDF_dir = Fail_dir / 'pdf'

# Create the whole tree at import time; parents are created before children,
# so parents=True is not needed.
Log_dir.mkdir(exist_ok=True)
Data_dir.mkdir(exist_ok=True)
Json_Data_dir.mkdir(exist_ok=True)
Text_Data_dir.mkdir(exist_ok=True)
Punct_Data_dir.mkdir(exist_ok=True)
PDF_Data_dir.mkdir(exist_ok=True)
SR_Data_dir.mkdir(exist_ok=True)
Fail_dir.mkdir(exist_ok=True)
Fail_OCR_dir.mkdir(exist_ok=True)
Fail_Punct_dir.mkdir(exist_ok=True)
Fail_SR_dir.mkdir(exist_ok=True)
Fail_PDF_dir.mkdir(exist_ok=True)

# Set to None when poppler binaries are on PATH.
# Poppler_Path = None
# NOTE(review): hard-coded Windows poppler location used by pdf2image —
# confirm/adjust per machine.
Poppler_Path = r'D:\poppler-0.68.0\bin'
|
||
|
||
def api_area(img_path, area_num, row_num, void_value, config):
    '''
    Call the /area endpoint to detect text areas in an image.

    img_path  : path of the image to analyse
    area_num  : expected number of areas
    row_num   : expected number of rows per area
    void_value: value returned on any failure
    config    : dict with 'token', 'timeout_connect', 'timeout_read',
                'retry_time', 'server' and 'server_lst'
    return    : the detected 'area' payload, or void_value on failure
    '''
    try:
        access_token = config['token']
        connect_timeout = config['timeout_connect']
        read_timeout = config['timeout_read']
        retry_times = config['retry_time']
        server_type = config['server']

        url = config['server_lst'][server_type] + '/area'
        headers = {'Authorization': f'gjcool {access_token}'}

        img_name = Path(img_path).name
        mime = get_mime(img_path)
        # BUG FIX: read the bytes once instead of passing an open file object.
        # The old file object was consumed by the first (failed) attempt, so
        # retries uploaded an empty body, and the handle was never closed.
        with open(img_path, 'rb') as fh:
            img_bytes = fh.read()
        files = [('img', (img_name, img_bytes, mime))]
        data = {'area_num': area_num, 'row_num': row_num}

        response = None
        for attempt in range(retry_times):
            try:
                response = requests.post(url, headers=headers, data=data, files=files,
                                         timeout=(connect_timeout, read_timeout))
                break
            except requests.exceptions.RequestException as e:
                print(f'retry {attempt + 1} times')
                print(e)

        if response is None:
            # All attempts failed (or retry_times == 0).
            return void_value
        result = response.json()
        return result.get('area', void_value)
    except Exception:
        print('area failed')
        return void_value
|
||
|
||
def api_ocr_pro(img_path, layout, area, compact, void_value, config):
    '''
    Call the /ocr_pro endpoint to recognise the text in an image.

    img_path  : path of the image to recognise
    layout    : layout mode forwarded to the API
    area      : area hints (from api_area); serialised with str()
    compact   : compact-output flag forwarded to the API
    void_value: value returned on any failure
    config    : API configuration dict (token, timeouts, retries, server)
    return    : the result dict, or void_value on failure / server error
    '''
    try:
        access_token = config['token']
        connect_timeout = config['timeout_connect']
        read_timeout = config['timeout_read']
        retry_times = config['retry_time']
        server_type = config['server']

        url = config['server_lst'][server_type] + '/ocr_pro'
        headers = {'Authorization': f'gjcool {access_token}'}

        img_name = Path(img_path).name
        mime = get_mime(img_path)
        # BUG FIX: read bytes once — a live file object would be consumed by a
        # failed attempt (retries sent an empty body) and leaked the handle.
        with open(img_path, 'rb') as fh:
            img_bytes = fh.read()
        files = [('img', (img_name, img_bytes, mime))]
        data = {'layout': layout, 'area': str(area), 'compact': compact}

        response = None
        for attempt in range(retry_times):
            try:
                response = requests.post(url, headers=headers, data=data, files=files,
                                         timeout=(connect_timeout, read_timeout))
                break
            except requests.exceptions.RequestException as e:
                print(f'retry {attempt + 1} times')
                print(e)

        if response is None:
            return void_value
        result = response.json()
        # 'msg'/'detail' keys indicate a server-side error report.
        if result.get('msg') is None and result.get('detail') is None:
            return result
        print(result)
        return void_value
    except Exception:
        print('ocr_pro failed')
        return void_value
|
||
|
||
def api_punct_pro(orig_path, encoding, void_value, config):
    '''
    Call the /punct_pro endpoint to punctuate the text in a file.

    orig_path : path of the source text file
    encoding  : text encoding of the source file
    void_value: value returned on any failure
    config    : API configuration dict (token, timeouts, retries, server)
    return    : result dict (with 'text' list), or void_value on failure
    '''
    try:
        access_token = config['token']
        connect_timeout = config['timeout_connect']
        read_timeout = config['timeout_read']
        retry_times = config['retry_time']
        server_type = config['server']

        url = config['server_lst'][server_type] + '/punct_pro'
        headers = {'Authorization': f'gjcool {access_token}'}

        # BUG FIX: close the file handle (was open(...).readlines() leak).
        with open(orig_path, 'r', encoding=encoding) as fh:
            raw = fh.read()
        # Flatten to a single line and drop OCR column separators.
        src = raw.replace('\n', '').replace('】【', '')
        payload = {'src': src}

        response = None
        for attempt in range(retry_times):
            try:
                response = requests.post(url, headers=headers, data=payload,
                                         timeout=(connect_timeout, read_timeout))
                break
            except requests.exceptions.RequestException as e:
                print(f'retry {attempt + 1} times')
                print(e)

        if response is None:
            return void_value
        result = response.json()
        # 'msg'/'detail' keys indicate a server-side error report.
        if result.get('msg') is None and result.get('detail') is None:
            return result
        print(result)
        return void_value
    except Exception:
        print('punct_pro failed')
        return void_value
|
||
|
||
def api_pdf(img_path, data_path, pdf_path, config):
    '''
    Call the /pdf endpoint to build a searchable (double-layer) PDF from an
    image plus its OCR JSON data, and save the result to pdf_path.

    img_path : source image path
    data_path: path of the OCR JSON for this image (must not be None)
    pdf_path : destination path for the generated PDF
    config   : API configuration dict (token, timeouts, retries, server)
    return   : True on success, False on any failure
    '''
    try:
        access_token = config['token']
        connect_timeout = config['timeout_connect']
        read_timeout = config['timeout_read']
        retry_times = config['retry_time']
        server_type = config['server']

        url = config['server_lst'][server_type] + '/pdf'
        headers = {'Authorization': f'gjcool {access_token}'}

        if data_path is None:
            # BUG FIX: the original fell through to an undefined-variable
            # crash (`files` never assigned); fail explicitly instead.
            print('pdf failed')
            return False

        filename = Path(img_path).name
        mime = get_mime(img_path)
        # Read both payloads into memory so retries re-send full bodies and
        # no handles are leaked.
        with open(img_path, 'rb') as img_fh:
            img_bytes = img_fh.read()
        with open(data_path, 'rb') as data_fh:
            data_bytes = data_fh.read()
        files = [('img', (filename, img_bytes, mime)),
                 ('data', (filename, data_bytes, 'application/json'))]

        response = None
        for attempt in range(retry_times):
            try:
                response = requests.post(url, headers=headers, files=files,
                                         timeout=(connect_timeout, read_timeout))
                break
            except requests.exceptions.RequestException as e:
                print(f'retry {attempt + 1} times')
                print(e)

        if response is None:
            return False
        elif response.headers['content-type'] == 'application/json':
            # A JSON body is a server-side error report.
            print(response.json())
            return False
        elif response.headers['content-type'] == 'application/pdf':
            with open(pdf_path, "wb") as f:
                f.write(response.content)
            return True
        else:
            return False
    except Exception:
        print('pdf failed')
        return False
|
||
|
||
def api_sr(img_path, output_path, scale, ext, output, config):
    '''
    Call the /sr (super-resolution) endpoint and store the result.

    img_path   : source image path
    output_path: destination — image bytes, or base64 text when output=='base64'
    scale      : upscale factor forwarded to the API
    ext        : output image format (see SR_EXT_lst)
    output     : 'file' for a binary image response, 'base64' for JSON text
    config     : API configuration dict (token, timeouts, retries, server)
    return     : True on success, False on any failure
    '''
    try:
        access_token = config['token']
        connect_timeout = config['timeout_connect']
        read_timeout = config['timeout_read']
        retry_times = config['retry_time']
        server_type = config['server']

        url = config['server_lst'][server_type] + '/sr'
        headers = {'Authorization': f'gjcool {access_token}'}

        img_name = Path(img_path).name
        mime = get_mime(img_path)
        # Read bytes once so retries re-send the payload and the handle
        # is not leaked.
        with open(img_path, 'rb') as fh:
            img_bytes = fh.read()
        files = [('img', (img_name, img_bytes, mime))]
        data = {'scale': scale, 'ext': ext, 'output': output}

        response = None
        for attempt in range(retry_times):
            try:
                response = requests.post(url, headers=headers, data=data, files=files,
                                         timeout=(connect_timeout, read_timeout))
                break
            except requests.exceptions.RequestException as e:
                print(f'retry {attempt + 1} times')
                print(e)

        if response is None:
            return False
        elif response.headers['content-type'] == 'application/json':
            result = response.json()
            if output == 'base64':
                if output in result.keys():
                    # Success: store the base64 payload as text.
                    with open(output_path, "w", encoding='utf-8') as f:
                        f.write(result[output])
                    return True
                else:
                    print(result)
                    return False
            elif output == 'file':
                # JSON for a 'file' request is an error report.
                print(result)
                return False
            else:
                # BUG FIX: message typo 'ouput wrong' corrected.
                print(f'output wrong: {output}')
                return False
        else:
            # Binary (image) response: write it verbatim.
            with open(output_path, "wb") as f:
                f.write(response.content)
            return True
    except Exception:
        print('sr failed')
        return False
|
||
|
||
def api_usage(api_type, void_value, config):
    '''
    Query usage statistics for one API type via /usage/<api_type>.

    api_type  : 'ocr', 'sr' or 'pdf' (see Usage_API_Type_lst)
    void_value: value returned on any failure
    config    : API configuration dict (token, timeouts, retries, server)
    return    : the usage dict, or void_value on failure
    '''
    try:
        access_token = config['token']
        connect_timeout = config['timeout_connect']
        read_timeout = config['timeout_read']
        retry_times = config['retry_time']
        server_type = config['server']

        url = config['server_lst'][server_type] + '/usage'
        headers = {'Authorization': f'gjcool {access_token}'}

        response = None
        for attempt in range(retry_times):
            try:
                response = requests.post(f'{url}/{api_type}', headers=headers,
                                         timeout=(connect_timeout, read_timeout))
                break
            except requests.exceptions.RequestException as e:
                print(f'retry {attempt + 1} times')
                print(e)

        if response is None:
            return void_value
        return response.json()
    except Exception:
        print('get usage failed')
        # BUG FIX: previously returned False here, breaking the void_value
        # contract every other failure path (and callers) rely on.
        return void_value
|
||
|
||
def batch_ocr_api(path_lst, layout, compact, area_num, row_num, task_name, config): #area_num, row_num, area_url
    '''
    Run OCR over every entry of path_lst, saving the raw JSON result and a
    plain-text extraction per image.

    path_lst : list of dicts with 'img_path', 'json_path', 'text_path'
    layout   : layout flag forwarded to the /ocr_pro endpoint
    compact  : compact-output flag forwarded to the endpoint
    area_num : number of areas for /area pre-detection; 0 skips that call
    row_num  : rows per area for /area pre-detection
    task_name: label used in log lines and the failure-list file name
    config   : API configuration dict (token, timeouts, retries, server)

    Failed image paths are appended to Log/fail/ocr/<task_name>.txt.
    '''
    logging.info(f'\t\t任务:{task_name}\t\tSTART\t\t总数:{len(path_lst)}')
    logging.info(f'\t\t序号\t用时\t字数\t列数\t大小\t宽度\t高度\t路径')

    # Initialize accumulators and truncate the failure list left over from
    # any previous run of the same task.
    total_info = {'TimeCost':0,'CharNumber':0, 'LineNumber':0, 'ImageSize':0, 'SuccessNumber':0, 'FailNumber':0}
    fail_list_path = str(Fail_OCR_dir.joinpath(f'{task_name}.txt'))
    save_text(fail_list_path, "", False)

    start_time = time.time()
    index = 0
    for path_dict in tqdm(path_lst, desc="OCR"):
        now_api_time = time.time()

        # Optional area pre-detection; an empty list means "no area hints".
        if area_num == 0:
            area = []
        else:
            area = api_area(path_dict['img_path'], area_num, row_num, [], config)
            print(area)

        data = api_ocr_pro(path_dict['img_path'], layout, area, compact, {}, config)

        last_api_time = time.time()
        if data=={}:
            # {} is the void_value passed to api_ocr_pro: the call failed.
            logging.warning(f"\t{index+1:<5d}\tocr failed\t{path_dict['img_path']}")
            save_text(fail_list_path, f"{path_dict['img_path']}\n", True)
            total_info['FailNumber'] += 1
        else:
            try:
                with open(path_dict['json_path'], "w", encoding='utf-8') as f:
                    json.dump(data, f, ensure_ascii=False)

                # Prefer the server-supplied text; otherwise rebuild it from
                # the per-character payload.
                text = data['text'] if data.get('text') is not None else data_to_text(data)
                with open(path_dict['text_path'], "w", encoding='utf-8') as f:
                    f.write(text)

                # Log columns: index, elapsed, chars, lines, size, width, height, path.
                # img_size = round(os.path.getsize(path_dict['img_path'])/1024) #KB
                img_size = round(data['Size']/1024) #KB
                time_cost= last_api_time - now_api_time #s
                logging.info(f"\t\t{index+1:<6d}\t{time_cost:.2f}\t{data['CharNumber']:<6d}\t{data['LineNumber']:<6d}\t{img_size:<6d}\t{data['Width']:<6d}\t{data['Height']:<6d}\t{path_dict['img_path']}")

                total_info['TimeCost'] += time_cost
                total_info['CharNumber'] += data['CharNumber']
                total_info['LineNumber'] += data['LineNumber']
                total_info['ImageSize'] += data['Size']
                total_info['SuccessNumber'] += 1
            except:
                # Result arrived but saving/bookkeeping failed: count as failure.
                logging.warning(f"\t\t{index+1:<6d}\tsave data wrong\t{path_dict['img_path']}")
                save_text(fail_list_path, f"{path_dict['img_path']}\n", True)
                total_info['FailNumber'] += 1

        index += 1

    logging.info(f"\t\t任务:{task_name}\t\tEND")
    logging.info(f"\t\t总数\t总用时\t总字数\t总列数\t总大小")
    logging.info(f"\t\t{total_info['SuccessNumber']}/{total_info['FailNumber']} \t{time.time()-start_time:.2f}\t{total_info['CharNumber']:<6d}\t{total_info['LineNumber']:<6d}\t{total_info['ImageSize']:<6d}\n")
|
||
|
||
def batch_punct_api(path_lst, task_name, config):
    '''
    Punctuate every text file in path_lst via the /punct_pro endpoint.

    path_lst : list of dicts with 'orig_path', 'punct_path', 'encoding'
               and 'orig_num' (character count of the source file)
    task_name: label used in log lines and the failure-list file name
    config   : API configuration dict (token, timeouts, retries, server)

    Failed source paths are appended to Log/fail/punct/<task_name>.txt.
    '''
    logging.info(f'\t\t任务:{task_name}\t\tSTART\t\t总数:{len(path_lst)}')
    logging.info(f'\t\t序号\t用时\t原字数\t字数\t路径')

    # Initialize accumulators and truncate the failure list.
    total_info = {'TimeCost':0, 'OrigNumber':0, 'PunctNumber':0, 'SuccessNumber':0, 'FailNumber':0}
    fail_list_path = str(Fail_Punct_dir /(f'{task_name}.txt'))
    save_text(fail_list_path, "", False)

    start_time = time.time()
    index = 0
    for path_dict in tqdm(path_lst, desc="Punct"):
        now_api_time = time.time()
        data = api_punct_pro(path_dict['orig_path'], path_dict['encoding'], {}, config)

        last_api_time = time.time()
        if data=={}:
            # BUG FIX: the warning said "ocr failed" (copy-paste from the OCR
            # batch); this is the punctuation task.
            logging.warning(f"\t{index+1:<6d}\tpunct failed\t{path_dict['orig_path']}")
            save_text(fail_list_path, f"{path_dict['orig_path']}\n", True)
            total_info['FailNumber'] += 1
        else:
            try:
                # The API returns a list of punctuated strings; a single input
                # file maps to the first element.
                text = data['text'][0]
                with open(path_dict['punct_path'], "w", encoding=path_dict['encoding']) as f:
                    f.write(text)

                # Log columns: index, elapsed, original chars, punctuated chars, path.
                time_cost= last_api_time - now_api_time #s
                orig_num = path_dict['orig_num']
                punct_num = len(text)
                logging.info(f"\t\t{index+1:<6d}\t{time_cost:.2f}\t{orig_num:<6d}\t{punct_num:<6d}\t{path_dict['orig_path']}")

                total_info['TimeCost'] += time_cost
                total_info['OrigNumber'] += orig_num
                total_info['PunctNumber'] += punct_num
                total_info['SuccessNumber'] += 1
            except Exception:
                # Result arrived but saving/bookkeeping failed: count as failure.
                logging.warning(f"\t\t{index+1:<6d}\tsave data wrong\t{path_dict['orig_path']}")
                save_text(fail_list_path, f"{path_dict['orig_path']}\n", True)
                total_info['FailNumber'] += 1
        index += 1

    logging.info(f"\t\t任务:{task_name}\t\tEND")
    logging.info(f"\t\t总数\t总用时\t总原字数\t总字数")
    logging.info(f"\t\t{total_info['SuccessNumber']}/{total_info['FailNumber']} \t{time.time()-start_time:.2f}\t{total_info['OrigNumber']:<6d}\t{total_info['PunctNumber']:<6d}\n")
|
||
|
||
|
||
def batch_pdf_api(path_lst, task_name, config):
    '''
    Build a searchable PDF for every (image, OCR-json) pair in path_lst.

    path_lst : list of dicts with 'img_path', 'data_path', 'pdf_path'
    task_name: label used in log lines and the failure-list file name
    config   : API configuration dict (token, timeouts, retries, server)

    Failed image paths are appended to Log/fail/pdf/<task_name>.txt.
    '''
    logging.info(f'\t\t任务:{task_name}\t\tSTART\t总数:{len(path_lst)}')
    logging.info(f'\t\t序号\t用时\t大小\t路径')

    # Initialize accumulators and truncate the failure list.
    total_info = {'TimeCost':0, 'PDFSize':0, 'SuccessNumber':0, 'FailNumber':0}
    fail_list_path = str(Fail_PDF_dir.joinpath(f'{task_name}.txt'))
    save_text(fail_list_path, "", False)

    # Remnants of an older refresh-token flow, kept for reference:
    # access_token = get_access_token_by_refresh(refresh_token)
    # last_refresh_time = time.time()
    start_time = time.time()
    index = 0
    for path_dict in tqdm(path_lst, desc="PDF"):
        # if time.time()-last_refresh_time > Access_Token_Refresh_Interval:
        #     access_token = get_access_token_by_refresh(refresh_token)
        #     last_refresh_time = time.time()
        #     if access_token == '':
        #         logging.warning(f"get access token failed")
        #         print('get access token failed')
        #         break

        now_api_time = time.time()
        # api_pdf returns a success flag, not a file object.
        pdf_file = api_pdf(path_dict['img_path'], path_dict['data_path'], path_dict['pdf_path'], config)
        last_api_time = time.time()

        if not pdf_file:
            logging.warning(f"\t{index+1:<5d}\tpdf failed\t{path_dict['img_path']}")
            save_text(fail_list_path, f"{path_dict['img_path']}\n", True)
            total_info['FailNumber'] += 1
        else:
            # Log columns: index, elapsed, size, path.
            pdf_size = round(os.path.getsize(path_dict['pdf_path'])/1024) #KB
            time_cost= last_api_time - now_api_time #s
            logging.info(f"\t\t{index+1:<5d}\t{time_cost:.2f}\t{pdf_size:<6d}\t{path_dict['pdf_path']}")

            total_info['TimeCost'] += time_cost
            total_info['PDFSize'] += pdf_size
            total_info['SuccessNumber'] += 1

        index += 1
        # Optional client-side rate limiting, currently disabled:
        # while time.time()-last_api_time < interval:
        #     time.sleep(1)

    logging.info(f"\t\t任务:{task_name}\t\tEND")
    logging.info(f"\t\t总数\t总用时\t总大小")
    logging.info(f"\t\t{total_info['SuccessNumber']}/{total_info['FailNumber']} \t{time.time()-start_time:.2f}\t{total_info['PDFSize']:<8d}\n")
|
||
|
||
def batch_sr_api(path_lst, scale, ext, output, task_name, config):
    '''
    Run super-resolution over every image in path_lst.

    path_lst : list of dicts with 'img_path', 'sr_path'
    scale    : upscale factor forwarded to the /sr endpoint
    ext      : output image format (see SR_EXT_lst)
    output   : 'file' or 'base64' (see SR_Output_lst)
    task_name: label used in log lines and the failure-list file name
    config   : API configuration dict (token, timeouts, retries, server)

    Failed image paths are appended to Log/fail/sr/<task_name>.txt.
    '''
    logging.info(f'\t\t任务:{task_name}\t\tSTART\t总数:{len(path_lst)}')
    logging.info(f'\t\t序号\t\t用时\t\t大小\t\t路径')

    # Initialize accumulators and truncate the failure list.
    total_info = {'TimeCost':0, 'SRSize':0, 'SuccessNumber':0, 'FailNumber':0}
    fail_list_path = str(Fail_SR_dir.joinpath(f'{task_name}.txt'))
    save_text(fail_list_path, "", False)

    start_time = time.time()
    index = 0
    for path_dict in tqdm(path_lst, desc="SR"):

        now_api_time = time.time()
        # NOTE: variable name kept from the PDF batch; this is the SR
        # success flag returned by api_sr.
        pdf_file = api_sr(path_dict['img_path'], path_dict['sr_path'], scale, ext, output, config)
        last_api_time = time.time()
        if not pdf_file:
            logging.warning(f"\t{index+1:<5d}\tsr failed\t{path_dict['img_path']}")
            save_text(fail_list_path, f"{path_dict['img_path']}\n", True)
            total_info['FailNumber'] += 1
        else:
            # Log columns: index, elapsed, size, path.
            sr_size = round(os.path.getsize(path_dict['sr_path'])/1024) #KB
            time_cost= last_api_time - now_api_time #s
            logging.info(f"\t\t{index+1:<5d}\t{time_cost:.2f}\t{sr_size:<6d}\t{path_dict['sr_path']}")

            total_info['TimeCost'] += time_cost
            total_info['SRSize'] += sr_size
            total_info['SuccessNumber'] += 1

        index += 1
        # Optional client-side rate limiting, currently disabled:
        # while time.time()-last_api_time < interval:
        #     time.sleep(1)

    logging.info(f"\t\t任务:{task_name}\t\tEND")
    logging.info(f"\t\t总数\t\t总用时\t总大小")
    logging.info(f"\t\t{total_info['SuccessNumber']}/{total_info['FailNumber']} \t{time.time()-start_time:.2f}\t{total_info['SRSize']:<8d}\n")
|
||
|
||
def get_usage_api(api_type, config):
    '''
    Query and print the remaining quota for one API type.

    api_type: 'ocr', 'sr' or 'pdf' (see Usage_API_Type_lst)
    config  : loaded configuration dict
    '''
    # BUG FIX: arguments were passed as api_usage(api_type, config, {}) —
    # the config dict landed in the void_value slot and an empty dict was
    # used as the config, so the query could never succeed.
    usage = api_usage(api_type, {}, config)
    if usage != {}:
        # 'msg'/'detail' keys indicate a server-side error report.
        if usage.get('msg') is not None:
            print(usage['msg'])
        elif usage.get('detail') is not None:
            print(usage['detail'])
        else:
            result = f'本期已用: {usage["usage"]}; 本期剩余: {usage["remain"]}; 本期总量: {usage["total"]}; 历史总量: {usage["history"]}'
            print(result)
    else:
        print('查询失败')
|
||
|
||
|
||
|
||
def check_apiid(apiid):
    '''Return True when the apiid has the expected length of 30 characters.'''
    # A 30-character id is necessarily non-empty, so a single length
    # comparison is equivalent to the original non-empty-and-length test.
    return len(apiid) == 30
|
||
|
||
def check_config(config, password_path=Password_path):
    '''
    Print a warning for the first invalid configuration item found:
    apiid, stored password, token, server selection, local server IP.
    Checks are ordered by setup sequence; only warns, returns nothing.
    '''
    if not check_apiid(config['apiid']):
        print('注意: apiid无效, 请设置apiid')
    elif load_password(password_path) == '':
        print('注意: 密码无效,请设置密码')
    elif config['token'] == '':
        print('注意: Token无效,请刷新Token')
    elif config['server'] not in config['server_lst'].keys():
        print('注意: 服务器无效,请设置服务器')
    elif config['server'] == 'local' and config['server_lst']['local'] == '':
        print('注意: 本地服务器为空,请设置本地服务器IP地址')
|
||
|
||
def check_password(password, pubkey_path, password_encrypt_path):
    '''
    Encrypt `password` with the RSA public key at `pubkey_path` and persist
    the ciphertext to `password_encrypt_path`.

    Returns True on success; False for an empty password, a failed
    encryption, or any unexpected error.
    '''
    try:
        # Guard clauses: reject empty input and failed encryption early.
        if password == '':
            return False
        cipher_text = encrypt_by_rsa(read_key(pubkey_path), password)
        if cipher_text == '':
            return False
        save_text(password_encrypt_path, cipher_text)
        return True
    except:
        print('密码检查失败')
        return False
|
||
|
||
|
||
|
||
|
||
|
||
def encrypt_by_rsa(pubkey, message:str):
    '''
    Encrypt a password with an RSA public key; the UTF-8 encoded message
    must not exceed 117 bytes (presumably the PKCS#1 v1.5 limit for a
    1024-bit key — confirm against the key used).

    pubkey: rsa.key.PublicKey
    message: str
    return : str -- base64 text of the ciphertext, or '' on any failure
    '''
    try:
        if len(message.encode('utf-8')) <= 117:
            encrypted = rsa.encrypt(message.encode('utf-8'), pubkey)
            return str(base64.encodebytes(encrypted), encoding='utf-8')
        else:
            # Route the over-length case to the dedicated handler below.
            # NOTE: a ValueError raised by rsa.encrypt itself takes the
            # same path and prints the length message.
            raise ValueError
    except ValueError:
        print('message length longer than 117 bytes')
        return ''
    except:
        print("encrypt failed")
        return ''
|
||
|
||
def get_allfile_alldir_in_dir(path):
    '''
    Recursively collect every sub-directory and every file under `path`.

    return: (sorted list of directory paths, sorted list of file paths)
    '''
    dir_paths = []
    file_paths = []
    for dirpath, dirnames, filenames in os.walk(path):
        dir_paths.extend(os.path.join(dirpath, name) for name in dirnames)
        file_paths.extend(os.path.join(dirpath, name) for name in filenames)
    return sorted(dir_paths), sorted(file_paths)
|
||
|
||
|
||
def get_token_by_login(apiid, password, url):
    '''
    Log in with apiid and the RSA-encrypted password ('encrypt':1) and
    request a long-lived access token ('is_long':1).

    apiid   : 30-character account id
    password: encrypted password text (see check_password)
    url     : login endpoint URL
    return  : the access token string, or '' on any failure
    '''
    try:
        payload = {'apiid': apiid, 'password': password, 'encrypt': 1, 'is_long': 1}
        response = requests.post(url, data=payload).json()
        return response['access_token']
    except Exception:
        # Narrowed from a bare except; any network/JSON/key error yields ''.
        return ''
|
||
|
||
|
||
def get_mime(img_path):
    '''
    Return an 'image/<subtype>' MIME string for the image at img_path.

    Detection is content-based via whatimage; when that yields nothing,
    fall back to the file extension (without the dot).
    '''
    with open(img_path, 'rb') as handle:
        raw = handle.read()
    subtype = whatimage.identify_image(raw)
    # whatimage may report the string 'None' as well as the None object.
    if subtype is None or subtype == 'None':
        subtype = Path(img_path).suffix.replace('.', '')
    return f'image/{subtype}'
|
||
|
||
def get_encodeing(text_path, encoding_detect, default_encoding):
    '''
    Determine the text encoding of `text_path`.

    text_path       : file whose encoding should be detected
    encoding_detect : when truthy, detect with chardet; otherwise return
                      default_encoding without touching the file
    default_encoding: fallback encoding
    return          : encoding name (may be None if chardet is unsure)

    NOTE: the misspelled name is kept for backward compatibility.
    '''
    encoding = default_encoding
    if encoding_detect:
        # Imported lazily so chardet is only required when detection is on.
        import chardet
        try:
            # BUG FIX: close the file handle (was open(...).read() leak).
            with open(text_path, 'rb') as fh:
                data = fh.read()
            encoding = chardet.detect(data)['encoding']
        except Exception:
            encoding = default_encoding

    return encoding
|
||
|
||
def get_text_length(file_path, encoding):
    '''
    Return the number of characters in the text file, or 0 when the file
    cannot be read or decoded with `encoding`.
    '''
    try:
        # BUG FIX: close the file handle; len(text) replaces len(list(text)).
        with open(file_path, 'r', encoding=encoding) as fh:
            return len(fh.read())
    except Exception:
        return 0
|
||
|
||
def load_config(config_path=ConfigFile):
    '''
    Load and return the JSON configuration dict from config_path
    (defaults to ./config.json next to this script); None when the file
    is missing or not valid JSON.
    '''
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except:
        print('配置文件读取失败')
        return None
|
||
|
||
|
||
def load_password(filepath):
    '''
    Return the stored (RSA-encrypted, base64) password text, or '' when the
    file cannot be read.
    '''
    try:
        # BUG FIX: close the file handle (was open(...).read() leak).
        with open(filepath, 'r', encoding='utf-8') as fh:
            return fh.read()
    except Exception:
        return ''
|
||
|
||
def logging_init(log_type:str, dir:Path=Log_dir):
    '''
    Initialize the root logger to write to <dir>/<log_type>/<YYYY-MM-DD>.log.

    log_type: sub-directory name — one log category per API task type
    dir     : base log directory (defaults to ./Log next to this script)
    '''
    log_dir = dir / log_type
    log_dir.mkdir(exist_ok=True)

    # One log file per calendar day (local time).
    log_filepath = log_dir / (time.strftime("%Y-%m-%d", time.localtime()) + '.log')
    logging.basicConfig(
        filename=str(log_filepath),
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        encoding='utf-8'
    )
|
||
|
||
|
||
def pdf_merge(pdf_lst, save_path):
    '''
    Merge the PDFs in pdf_lst (in order) into a single file at save_path.

    return: True on success, False on any failure
    '''
    try:
        merger = PdfMerger()
        try:
            for pdf in pdf_lst:
                merger.append(pdf)
            merger.write(save_path)
        finally:
            # BUG FIX: close the merger (and its open source files) even
            # when appending/writing raises.
            merger.close()
        return True
    except Exception:
        return False
|
||
|
||
|
||
|
||
def prepare_ocr_dir_task_paths(dir, task_name, max_size):
    '''
    Build the OCR work list for every file under `dir`, mirroring the
    directory tree below Data/json/<task_name> and Data/text/<task_name>.

    dir      : root directory of the source images (string)
    task_name: task label, used as the output sub-directory name
    max_size : maximum allowed image size in bytes
    return   : (path_lst, fail_lst) — path_lst entries are dicts with
               'img_path', 'json_path', 'text_path'; fail_lst holds
               oversized source paths
    '''
    # Create the mirrored output directory trees.
    json_save_dir = Json_Data_dir.joinpath(task_name)
    json_save_dir.mkdir(exist_ok=True)
    text_save_dir = Text_Data_dir.joinpath(task_name)
    text_save_dir.mkdir(exist_ok=True)

    alldir_path, allfile_path = get_allfile_alldir_in_dir(dir)
    for dir_path in alldir_path:
        data_dir = Path(str(dir_path).replace(dir, str(json_save_dir)))
        data_dir.mkdir(exist_ok=True)
        text_dir = Path(str(dir_path).replace(dir, str(text_save_dir)))
        text_dir.mkdir(exist_ok=True)

    path_lst, fail_lst = [], []
    for file_path in allfile_path:
        if os.path.getsize(file_path) < max_size:
            filename = Path(file_path).stem
            json_dir = Path(str(file_path).replace(dir, str(json_save_dir))).parent
            text_dir = Path(str(file_path).replace(dir, str(text_save_dir))).parent
            # BUG FIX: output names used a literal '(unknown)' placeholder;
            # use the source file's stem so results map back to their input.
            path_dict = {'img_path': file_path,
                         'json_path': str(json_dir.joinpath(f'{filename}.json')),
                         'text_path': str(text_dir.joinpath(f'{filename}.txt'))}
            path_lst.append(path_dict)
        else:
            print(f'{file_path}体积过大, {os.path.getsize(file_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
            fail_lst.append(file_path)
    return path_lst, fail_lst
|
||
|
||
def prepare_ocr_files_task_paths(paths, task_name, max_size):
    '''
    Build the OCR work list for an explicit list of image files; outputs go
    flat into Data/json/<task_name> and Data/text/<task_name>.

    paths    : iterable of image file paths
    task_name: task label, used as the output sub-directory name
    max_size : maximum allowed image size in bytes
    return   : (path_lst, fail_lst) as in prepare_ocr_dir_task_paths
    '''
    json_save_dir = Json_Data_dir.joinpath(task_name)
    json_save_dir.mkdir(exist_ok=True)
    text_save_dir = Text_Data_dir.joinpath(task_name)
    text_save_dir.mkdir(exist_ok=True)

    path_lst, fail_lst = [], []
    for file_path in paths:
        if os.path.getsize(file_path) < max_size:
            filename = Path(file_path).stem
            # BUG FIX: output names used a literal '(unknown)' placeholder;
            # use the source file's stem so results map back to their input.
            path_lst.append({'img_path': file_path,
                             'json_path': str(json_save_dir.joinpath(f'{filename}.json')),
                             'text_path': str(text_save_dir.joinpath(f'{filename}.txt'))})
        else:
            print(f'{file_path}体积过大, {os.path.getsize(file_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
            fail_lst.append(file_path)
    return path_lst, fail_lst
|
||
|
||
|
||
|
||
def prepare_punct_dir_task_paths(dir, task_name, config):
    '''
    Build the punctuation work list for every text file under `dir`,
    mirroring the tree below Data/punct/<task_name>.

    dir      : root directory of the source text files (string)
    task_name: task label, used as the output sub-directory name
    config   : supplies 'punct_detect_encoding', 'punct_default_encoding'
               and 'punct_max_length'
    return   : (path_lst, fail_lst) — path_lst entries are dicts with
               'orig_path', 'punct_path', 'encoding', 'orig_num'
    '''
    detect_encoding = config['punct_detect_encoding']
    default_encoding = config['punct_default_encoding']
    max_length = config['punct_max_length']

    # Create the mirrored output directory tree.
    save_dir = Punct_Data_dir / task_name
    save_dir.mkdir(exist_ok=True)

    alldir_path, allfile_path = get_allfile_alldir_in_dir(dir)
    for dir_path in alldir_path:
        punct_dir = Path(str(dir_path).replace(dir, str(save_dir)))
        punct_dir.mkdir(exist_ok=True)

    path_lst, fail_lst = [], []
    for file_path in allfile_path:
        encoding = get_encodeing(file_path, detect_encoding, default_encoding)
        num = get_text_length(file_path, encoding)
        if num == 0:
            # Zero also covers unreadable files (get_text_length returns 0).
            print(f'{file_path}读取失败')
            fail_lst.append(file_path)
        elif num > max_length:
            print(f'{file_path}长度过大, , 超过最大限量{max_length}字')
            fail_lst.append(file_path)
        else:
            filename = Path(file_path).stem
            punct_dir = Path(str(file_path).replace(dir, str(save_dir))).parent
            # BUG FIX: output names used a literal '(unknown)' placeholder;
            # use the source file's stem so results map back to their input.
            path_dict = {'orig_path': file_path,
                         'punct_path': str(punct_dir.joinpath(f'{filename}.txt')),
                         'encoding': encoding, 'orig_num': num}
            path_lst.append(path_dict)

    return path_lst, fail_lst
|
||
|
||
def prepare_punct_files_task_paths(paths, task_name, config):
    '''
    Build the punctuation work list for an explicit list of text files;
    outputs go flat into Data/punct/<task_name>.

    paths    : iterable of text file paths
    task_name: task label, used as the output sub-directory name
    config   : supplies 'punct_detect_encoding', 'punct_default_encoding'
               and 'punct_max_length'
    return   : (path_lst, fail_lst) as in prepare_punct_dir_task_paths
    '''
    detect_encoding = config['punct_detect_encoding']
    default_encoding = config['punct_default_encoding']
    max_length = config['punct_max_length']

    # Create the output directory.
    save_dir = Punct_Data_dir / task_name
    save_dir.mkdir(exist_ok=True)

    path_lst, fail_lst = [], []
    for file_path in paths:
        encoding = get_encodeing(file_path, detect_encoding, default_encoding)
        num = get_text_length(file_path, encoding)
        if num == 0:
            # Zero also covers unreadable files (get_text_length returns 0).
            print(f'{file_path}读取失败')
            fail_lst.append(file_path)
        elif num > max_length:
            print(f'{file_path}长度过大, , 超过最大限量{max_length}字')
            fail_lst.append(file_path)
        else:
            filename = Path(file_path).stem
            # BUG FIX: output names used a literal '(unknown)' placeholder;
            # use the source file's stem so results map back to their input.
            path_dict = {'orig_path': file_path,
                         'punct_path': str(save_dir.joinpath(f'{filename}.txt')),
                         'encoding': encoding, 'orig_num': num}
            path_lst.append(path_dict)

    return path_lst, fail_lst
|
||
|
||
|
||
def prepare_ocr_list_task_paths(list_paths, task_name, max_size):
    '''
    Read image paths from one or more list files (one path per line), keep
    only those that exist on disk, and delegate to
    prepare_ocr_files_task_paths to build the OCR work list.

    list_paths: iterable of list-file paths
    task_name : task label forwarded to prepare_ocr_files_task_paths
    max_size  : maximum allowed image size in bytes
    return    : (path_lst, fail_lst)
    '''
    img_paths = []
    for lst_path in list_paths:
        with open(lst_path, 'r', encoding='utf-8') as f:
            for line in f:
                candidate = line.strip()
                # Silently skip blank lines and dangling paths.
                if Path(candidate).exists():
                    img_paths.append(candidate)

    return prepare_ocr_files_task_paths(img_paths, task_name, max_size)
|
||
|
||
def prepare_pdf_merge(root_dir, task_name):
    '''
    Merge single-page PDFs into whole documents under Data/pdf/<task_name>.

    When root_dir has sub-directories, each LEAF directory (one with no
    sub-directories of its own) is merged into <leaf-name>.pdf; otherwise
    every file directly under root_dir is merged into one PDF named after
    root_dir. Assumes the directories contain only PDF files.
    '''
    try:
        pdf_root_dir = PDF_Data_dir.joinpath(task_name)
        pdf_root_dir.mkdir(exist_ok=True)

        all_dir, all_files = get_allfile_alldir_in_dir(root_dir)
        if len(all_dir) > 0:
            for dir in all_dir:
                # Merge only the lowest-level (leaf) directories.
                subdirs, item_paths = get_allfile_alldir_in_dir(dir)
                if len(subdirs) == 0:
                    save_path = str(pdf_root_dir.joinpath(f'{Path(dir).name}.pdf'))
                    if pdf_merge(item_paths, save_path):
                        print(f'{Path(save_path).name}合并完成')
                    else:
                        print(f'{Path(save_path).name}合并失败')
        else:
            # BUG FIX: this branch produced a file without the '.pdf'
            # extension, unlike the per-directory branch above.
            save_path = str(pdf_root_dir.joinpath(f'{Path(root_dir).name}.pdf'))
            if pdf_merge(all_files, save_path):
                print(f'{Path(save_path).name}合并完成')
            else:
                print(f'{Path(save_path).name}合并失败')

    except Exception:
        print(f'{task_name}失败')
|
||
|
||
def prepare_pdf_split(pdf_paths, task_name):
    '''
    Split each PDF into single-page PDFs under Data/pdf/<task_name>/<stem>/,
    with pages named 0001.pdf, 0002.pdf, ... (1-based, zero-padded so the
    lexical order matches the page order).
    '''
    try:
        pdf_root_dir = PDF_Data_dir.joinpath(task_name)
        pdf_root_dir.mkdir(exist_ok=True)
        for pdf_path in tqdm(pdf_paths, desc='PDF拆分'):
            reader = PdfReader(pdf_path)
            # One output directory per source PDF, named after its stem.
            pdf_dir = pdf_root_dir.joinpath(Path(pdf_path).stem)
            pdf_dir.mkdir(exist_ok=True)
            for i in range(len(reader.pages)):
                writer = PdfWriter()
                writer.add_page(reader.pages[i])
                page_path = pdf_dir.joinpath(f'{i+1:04d}.pdf')
                with open(str(page_path), "wb") as fp:
                    writer.write(fp)
        print(f'{task_name}完成')
    except:
        print(f'{task_name}失败')
|
||
|
||
def prepare_pdf_to_image(pdf_paths, task_name, dpi=300, image_format='JPEG', first_page=None, last_page=None):
    '''
    Rasterize each PDF to images under Data/pdf/<task_name>/<stem>/.

    pdf_paths   : iterable of source PDF paths
    task_name   : task label, used as the output sub-directory name
    dpi         : render resolution
    image_format: 'JPEG' or 'PNG' (see Pdf_to_Image_lst)
    first_page / last_page: optional 1-based page range passed to pdf2image

    Relies on the module-level Poppler_Path (hard-coded Windows location —
    confirm per machine). A failure of one PDF is reported and the loop
    continues with the next.
    '''
    try:
        pdf_root_dir = PDF_Data_dir.joinpath(task_name)
        pdf_root_dir.mkdir(exist_ok=True)

        for pdf_path in tqdm(pdf_paths, desc='PDF转图片'):
            pdf_dir = pdf_root_dir.joinpath(Path(pdf_path).stem)
            pdf_dir.mkdir(exist_ok=True)
            filename = Path(pdf_path).stem
            try:
                # pdf2image writes the images itself (output_folder/output_file);
                # the returned page list is not needed.
                _ = convert_from_path(pdf_path,poppler_path=Poppler_Path, dpi=dpi, output_folder=pdf_dir, fmt=image_format, first_page=first_page, last_page=last_page, thread_count=os.cpu_count(), output_file=filename)
            except Exception as e:
                print(e.args)
                print(f'{pdf_path}失败')

        print(f'{task_name}完成')
    except:
        print(f'{task_name}失败')
|
||
|
||
|
||
def prepare_pdf_dir_task_paths(img_dir, data_dir, task_name, max_size):
    '''
    Build the double-layer-PDF work list for every image under img_dir,
    pairing each image with its OCR JSON under data_dir and mirroring the
    tree below Data/pdf/<task_name>.

    img_dir  : root directory of the source images (string)
    data_dir : root directory of the OCR JSON results (string)
    task_name: task label, used as the output sub-directory name
    max_size : maximum allowed image size in bytes
    return   : (path_lst, fail_lst) — path_lst entries are dicts with
               'img_path', 'data_path', 'pdf_path'
    '''
    pdf_save_dir = PDF_Data_dir.joinpath(task_name)
    pdf_save_dir.mkdir(exist_ok=True)

    # Mirror the image directory tree in the output location.
    all_img_dir, all_img_file = get_allfile_alldir_in_dir(img_dir)
    for dir in all_img_dir:
        pdf_dir = Path(str(dir).replace(img_dir, str(pdf_save_dir)))
        pdf_dir.mkdir(exist_ok=True)

    path_lst, fail_lst = [], []
    for img_path in all_img_file:
        if os.path.getsize(img_path) < max_size:
            filename = Path(img_path).stem

            # BUG FIX: paths used a literal '(unknown)' placeholder; use the
            # image's stem so each image finds its own JSON/PDF.
            data_path = Path(str(img_path).replace(img_dir, data_dir)).parent.joinpath(f'{filename}.json')
            if data_path.exists():
                pdf_path = Path(str(img_path).replace(img_dir, str(pdf_save_dir))).parent.joinpath(f'{filename}.pdf')
                path_lst.append({'img_path': img_path, 'data_path': str(data_path), 'pdf_path': str(pdf_path)})
            else:
                print(f'数据不存在: {data_path}')
                fail_lst.append(img_path)
        else:
            print(f'{img_path}体积过大, {os.path.getsize(img_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
            fail_lst.append(img_path)
    return path_lst, fail_lst
|
||
|
||
def prepare_pdf_files_task_paths(img_paths, data_dir, task_name, max_size):
    '''
    Build the double-layer-PDF work list for an explicit list of images;
    JSON is looked up flat in data_dir and PDFs go flat into
    Data/pdf/<task_name>.

    img_paths: iterable of image file paths
    data_dir : directory containing the OCR JSON results
    task_name: task label, used as the output sub-directory name
    max_size : maximum allowed image size in bytes
    return   : (path_lst, fail_lst) as in prepare_pdf_dir_task_paths
    '''
    pdf_save_dir = PDF_Data_dir.joinpath(task_name)
    pdf_save_dir.mkdir(exist_ok=True)

    path_lst, fail_lst = [], []
    for img_path in img_paths:
        if os.path.getsize(img_path) < max_size:
            filename = Path(img_path).stem

            # BUG FIX: paths used a literal '(unknown)' placeholder; use the
            # image's stem so each image finds its own JSON/PDF.
            data_path = Path(data_dir).joinpath(f'{filename}.json')
            if data_path.exists():
                pdf_path = Path(pdf_save_dir).joinpath(f'{filename}.pdf')
                path_lst.append({'img_path': img_path, 'data_path': str(data_path), 'pdf_path': str(pdf_path)})
            else:
                print(f'数据不存在: {data_path}')
                fail_lst.append(img_path)
        else:
            print(f'{img_path}体积过大, {os.path.getsize(img_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
            fail_lst.append(img_path)
    return path_lst, fail_lst
|
||
|
||
def prepare_sr_dir_task_paths(img_dir, task_name, ext, output, max_size):
    '''
    Build the super-resolution work list for every image under img_dir,
    mirroring the tree below Data/sr/<task_name>.

    img_dir  : root directory of the source images (string)
    task_name: task label, used as the output sub-directory name
    ext      : output image format (see SR_EXT_lst)
    output   : 'file' or 'base64' — base64 results are saved as .txt
    max_size : maximum allowed image size in bytes
    return   : (path_lst, fail_lst) — path_lst entries are dicts with
               'img_path', 'sr_path'
    '''
    # Create the mirrored output directory tree.
    sr_save_dir = SR_Data_dir.joinpath(task_name)
    sr_save_dir.mkdir(exist_ok=True)

    alldir_path, allfile_path = get_allfile_alldir_in_dir(img_dir)
    for dir_path in alldir_path:
        data_dir = Path(str(dir_path).replace(img_dir, str(sr_save_dir)))
        data_dir.mkdir(exist_ok=True)

    path_lst, fail_lst = [], []
    for file_path in allfile_path:
        if os.path.getsize(file_path) > max_size:
            print(f'{file_path}体积过大, {os.path.getsize(file_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
            fail_lst.append(file_path)
        else:
            filename = Path(file_path).stem
            sr_dir = Path(str(file_path).replace(img_dir, str(sr_save_dir))).parent
            # base64 responses are stored as text files.
            ext_format = 'txt' if output == 'base64' else ext
            # BUG FIX: output names used a literal '(unknown)' placeholder;
            # use the source file's stem so results map back to their input.
            path_lst.append({'img_path': file_path, 'sr_path': str(sr_dir.joinpath(f'{filename}.{ext_format}'))})

    return path_lst, fail_lst
|
||
|
||
def prepare_sr_files_task_paths(img_paths, task_name, ext, output, max_size):
    '''
    Build the super-resolution work list for an explicit list of images;
    outputs go flat into Data/sr/<task_name>.

    img_paths: iterable of image file paths
    task_name: task label, used as the output sub-directory name
    ext      : output image format (see SR_EXT_lst)
    output   : 'file' or 'base64' — base64 results are saved as .txt
    max_size : maximum allowed image size in bytes
    return   : (path_lst, fail_lst) as in prepare_sr_dir_task_paths
    '''
    # Create the output directory.
    sr_save_dir = SR_Data_dir.joinpath(task_name)
    sr_save_dir.mkdir(exist_ok=True)

    path_lst, fail_lst = [], []
    for file_path in img_paths:
        if os.path.getsize(file_path) > max_size:
            print(f'{file_path}体积过大, {os.path.getsize(file_path)/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
            fail_lst.append(file_path)
        else:
            filename = Path(file_path).stem
            # base64 responses are stored as text files.
            ext_format = 'txt' if output == 'base64' else ext
            # BUG FIX: output names used a literal '(unknown)' placeholder;
            # use the source file's stem so results map back to their input.
            path_lst.append({'img_path': file_path, 'sr_path': str(sr_save_dir.joinpath(f'{filename}.{ext_format}'))})

    return path_lst, fail_lst
|
||
|
||
def read_key(key_path):
    '''
    Load an RSA public key from a PKCS#1 PEM file.

    key_path: path to the public-key .pem file.
    return: rsa.key.PublicKey
    '''
    pem_data = Path(key_path).read_bytes()
    return rsa.PublicKey.load_pkcs1(pem_data)
|
||
|
||
def read_paths(pathtype='file', init_dir='./'):
    '''
    Show a native picker dialog and return the user's selection.

    pathtype: 'file' -> tuple of selected file paths;
              'dir'  -> a single directory path string.
    init_dir: directory the dialog opens in.
    return: the selection, or None for an unrecognized pathtype.
    '''
    root = tkinter.Tk()
    root.focus_force()
    # Hide the empty Tk root window as soon as the event loop starts.
    root.after(10, root.withdraw)
    if pathtype == 'dir':
        return filedialog.askdirectory(parent=root, initialdir=init_dir)
    if pathtype == 'file':
        return filedialog.askopenfilenames(parent=root, initialdir=init_dir)
|
||
|
||
def save_text(filepath, content, is_add=False):
    '''
    Write *content* to *filepath* as UTF-8 text.

    filepath: destination path; a falsy value makes this a no-op.
    content: text to write.
    is_add: True appends to the file, False overwrites it.
    '''
    if not filepath:
        return
    mode = 'a' if is_add else 'w'
    with open(filepath, mode, encoding='utf-8') as fh:
        fh.write(content)
|
||
|
||
def save_config(config, config_path=ConfigFile):
    '''
    Persist *config* to *config_path* as human-readable UTF-8 JSON
    (non-ASCII kept as-is, 4-space indentation).
    '''
    serialized = json.dumps(config, ensure_ascii=False, indent=4)
    with open(config_path, 'w', encoding='utf-8') as fh:
        fh.write(serialized)
|
||
|
||
def data_to_text(data):
    '''
    Reassemble recognized characters into plain text.

    data: dict with parallel sequences 'line_ids' and 'chars'; characters
          sharing a line id form one line, and every line ends with '\n'.
    return: the joined text, or '' when either key is missing or None.
    '''
    line_ids = data.get('line_ids')
    chars = data.get('chars')
    if line_ids is None or chars is None:
        return ''
    pieces = []
    last = len(line_ids) - 1
    for i, (line_id, ch) in enumerate(zip(line_ids, chars)):
        pieces.append(ch)
        # Close the line when the id changes or the sequence ends.
        if i == last or line_id != line_ids[i + 1]:
            pieces.append('\n')
    return ''.join(pieces)
|
||
|
||
# Interactive CLI entry point: loops over a top-level menu (settings / OCR /
# punctuation / PDF / super-resolution / usage query), each with its own
# sub-menu loop. All prompts are Chinese; helper functions (load_config,
# check_config, batch_*_api, prepare_*_task_paths, ...) are defined elsewhere
# in this file.
if __name__ == "__main__":

    key = input('选择任务: 0.设置; 1.识别; 2.自动标点; 3.PDF; 4.超分辨率增强; 5.查询使用量. 输入其他键, 退出\t')
    while key in ['0', '1', '2', '3', '4', '5']:
        # --- 0: settings (apiid / password / token / server selection) ---
        if key in ['0']:
            config = load_config()
            check_config(config)

            sub_key = input('1.设置apiid; 2.设置密码; 3.刷新Token; 4.选择服务器; 5.设置本地服务器IP地址. 输入其他键, 返回上层\t')
            while sub_key in ['1', '2', '3', '4', '5']:
                # 1: set apiid (re-prompt until check_apiid accepts or user gives up)
                if sub_key in ['1']:
                    config = load_config()
                    apiid_now = config['apiid']
                    apiid = input(f'当前apiid: {apiid_now}. 如果重新设置, 请输入apiid, 否则请回车:\t')
                    if len(apiid) == 0:
                        print('放弃设置apiid')
                    else:
                        while not check_apiid(apiid):
                            apiid = input('请输入apiid, 回车则放弃设置:\t')
                            if len(apiid) == 0:
                                print('放弃设置apiid')
                                break
                        # NOTE(review): if the user gives up via the break above,
                        # an empty apiid is still written to the config here.
                        config['apiid'] = apiid
                        save_config(config)

                # 2: set password (check_password encrypts and saves it on success)
                elif sub_key in ['2']:
                    password = getpass('请输入密码,密码加密后保存于Config/password_encrypt.txt。回车则放弃设置:\t')
                    while not check_password(password, Pubkey_path, Password_path):
                        password = getpass('请输入密码:\t')
                        if len(password) == 0:
                            print('放弃设置密码')
                            break

                # 3: refresh the API token by logging in with the stored credentials
                elif sub_key in ['3']:
                    config = load_config()
                    apiid = config['apiid']
                    login_url = config['login_url']
                    encrypt_password = load_password(Password_path)
                    token = get_token_by_login(apiid, encrypt_password, login_url)
                    if len(token) == 0:
                        print('Token刷新失败,请重新检查账号信息或网络连接')
                    else:
                        print('Token刷新成功')
                        config['token'] = token
                        save_config(config)

                # 4: choose the active server from config['server_lst']
                elif sub_key in ['4']:
                    config = load_config()
                    server_now = config['server']
                    server_lst = list(config['server_lst'].keys())
                    server = input(f'当前服务器: {server_now}. 如果重新选择,请选择{",".join(server_lst)} , 否则请回车:\t')
                    if len(server)>0:
                        if server in server_lst:
                            config['server'] = server
                            save_config(config)
                        else:
                            while len(server)>0 and server not in server_lst:
                                server = input(f'请选择{", ".join(server_lst)} , 放弃请回车:\t')
                                if len(server) == 0:
                                    print('放弃选择服务器')
                                    break
                                elif server in server_lst:
                                    config['server'] = server
                                    save_config(config)

                # 5: set the local server IP (only when server == 'local')
                elif sub_key in ['5']:
                    config = load_config()
                    if config['server'] == 'local':
                        local_ip = input(f"当前本地服务器: {config['server_lst']['local']}. 如果重新设置, 输入本地服务器的IP地址, 否则请回车:\t")
                        if len(local_ip) == 0:
                            print('放弃修改本地服务器IP')
                        else:
                            # Only a shallow format check: four dot-separated parts.
                            while len(local_ip.split('.'))!= 4:
                                print('IP地址格式错误,请重新输入')
                                local_ip = input(f"请输入本地服务器的IP地址, 放弃请回车:\t")
                                if len(local_ip) == 0:
                                    break
                            if len(local_ip) != 0:
                                config['server_lst']['local'] = f"{config['local_head']}{local_ip}:{config['local_port']}"
                                save_config(config)
                    else:
                        print('请先将服务器设置为local')

                # Reload + re-validate config, then re-prompt the settings menu.
                config = load_config()
                check_config(config)
                sub_key = input('1.设置apiid; 2.设置密码; 3.刷新Token; 4.选择服务器; 5.设置本地服务器IP地址. 输入其他键, 返回上层\t')

        # --- 1: OCR (by directory / file selection / list files) ---
        elif key in ['1']:
            logging_init('OCR')
            sub_key = input('选择图片: 1.目录; 2.文件; 3.列表. 输入其他键, 返回上层\t')
            while sub_key in ['1', '2', '3']:

                # Layout direction: '0' vertical, '1' horizontal (default '0').
                layout = input('请输入排版方向: 0:竖版,1:横版. 默认值:0\t')
                if layout not in ['0', '1']:
                    layout = '0'

                # Compact parameter (default 1).
                compact = input('请输入Compact参数: 1, 2, 4, 6. 默认值: 1\t')
                if compact not in ['1', '2', '4', '6']:
                    compact = '1'
                compact = int(compact)

                # Task name (empty input aborts this sub-menu).
                task_name = input('请输入任务名称:\t')
                if not task_name:
                    break

                config = load_config()
                max_size = config['max_size'] * 1024 * 1024

                # Build the image lists: path_lst (to process) / fail_lst (rejected).
                if sub_key in ['1']:
                    print('请选择图片目录')
                    dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
                    if not dir:
                        break
                    print(dir)
                    path_lst, fail_lst = prepare_ocr_dir_task_paths(dir, task_name, max_size)
                elif sub_key in ['2']:
                    print('请选择图片文件')
                    img_paths = read_paths(init_dir=str(Start_dir))
                    if not img_paths:
                        break
                    print(f'已选择{len(img_paths)}个文件')
                    path_lst, fail_lst = prepare_ocr_files_task_paths(img_paths, task_name, max_size)
                elif sub_key in ['3']:
                    print('请选择列表文件')
                    list_paths = read_paths(init_dir=str(Start_dir))
                    if not list_paths:
                        break
                    print(f'已选择{len(list_paths)}个列表')
                    path_lst, fail_lst = prepare_ocr_list_task_paths(list_paths, task_name, max_size)

                # Let the user abort when some files exceeded the size limit.
                if len(fail_lst)>0:
                    check_size = input(f'有{len(fail_lst)}个文件体积超标, 是否停止任务: 1. 继续; 其他, 中止\t')
                    if check_size not in ['1']:
                        break

                # Optional closed-area detection (1-4 areas) and column split (2-4).
                area_check = input('如果需要自动检测封闭区域, 则输入区域数量(1-4); 不需要则跳过\t')
                if area_check in ['1', '2', '3', '4']:
                    area_num = int(area_check)

                    row_num_check = input('如果需要分栏,则输入分栏数量(2-4); 不需要则跳过\t')
                    if row_num_check in ['2','3','4']:
                        row_num = int(row_num_check)
                    else:
                        row_num = 1
                else:
                    area_num = 0
                    row_num = 1

                try:
                    batch_ocr_api(path_lst, layout, compact, area_num, row_num, task_name, config)
                # NOTE(review): bare except swallows all errors, including KeyboardInterrupt.
                except:
                    print(f'{task_name}任务失败')

                sub_key = input('选择方式: 1.目录; 2.文件; 3.列表. 输入其他键, 返回上层\t')

        # --- 2: automatic punctuation (by directory / files) ---
        elif key in ['2']:
            logging_init('Punct')
            sub_key = input('选择文本: 1.目录; 2.文件. 输入其他键, 返回上层\t')
            while sub_key in ['1', '2']:
                task_name = input('请输入任务名称:\t')
                if not task_name:
                    break

                config = load_config()

                if sub_key in ['1']:
                    print('请选择目录')
                    dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
                    if not dir:
                        break
                    print(dir)
                    path_lst, fail_lst = prepare_punct_dir_task_paths(dir, task_name, config)
                elif sub_key in ['2']:
                    print('请选择文件')
                    orig_paths = read_paths(init_dir=str(Start_dir))
                    if not orig_paths:
                        break
                    print(f'已选择{len(orig_paths)}个文件')
                    path_lst, fail_lst = prepare_punct_files_task_paths(orig_paths, task_name, config)

                # Let the user abort when some texts exceeded the length limit.
                if len(fail_lst)>0:
                    check_length = input(f'有{len(fail_lst)}个文件长度超标, 是否停止任务: 1. 继续; 其他, 中止\t')
                    if check_length not in ['1']:
                        break

                batch_punct_api(path_lst, task_name, config)

                sub_key = input('选择方式: 1.目录; 2.文件. 输入其他键, 返回上层\t')


        # --- 3: PDF (generate from dir/files, merge, split, convert to images) ---
        elif key in ['3']:
            logging_init('PDF')
            sub_key = input('选择方式: 1.按目录生成; 2.按文件生成; 3.PDF合并; 4.PDF拆分; 5.PDF转图片. 输入其他键, 返回上层\t')
            while sub_key in ['1', '2', '3', '4', '5']:

                task_name = input('请输入任务名称:\t')
                if not task_name:
                    break

                # 1/2: generate searchable PDFs from images + OCR data.
                if sub_key in ['1', '2']:
                    config = load_config()
                    max_size = config['max_size'] * 1024 * 1024

                    if sub_key in ['1']:
                        print('请选择图片目录')
                        img_dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
                        if not img_dir:
                            break
                        print(img_dir)

                        print('请选择数据目录')
                        data_dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
                        if not data_dir:
                            print('数据为空')
                            break
                        else:
                            print(data_dir)

                        path_lst, fail_lst = prepare_pdf_dir_task_paths(img_dir, data_dir, task_name, max_size)

                    elif sub_key in ['2']:
                        print('请选择图片文件')
                        img_paths = read_paths(init_dir=str(Start_dir))
                        if not img_paths:
                            break
                        print(f'已选择{len(img_paths)}个文件')

                        print('请选择数据目录')
                        data_dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
                        if not data_dir:
                            print('数据为空')
                            break
                        else:
                            print(data_dir)

                        path_lst, fail_lst = prepare_pdf_files_task_paths(img_paths, data_dir, task_name, max_size)

                    # Let the user abort when some files were rejected.
                    if len(fail_lst)>0:
                        check_size = input(f'有{len(fail_lst)}个文件存在问题, 是否停止任务: 1. 继续; 其他, 中止\t')
                        if check_size not in ['1']:
                            break
                    try:
                        config = load_config()
                        batch_pdf_api(path_lst, task_name, config)
                    # NOTE(review): bare except swallows all errors.
                    except:
                        print(f'{task_name}任务失败')

                # 3: merge all PDFs in a directory
                elif sub_key in ['3']:
                    print('请选择PDF目录')
                    pdf_dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
                    if not pdf_dir:
                        break
                    print(pdf_dir)

                    prepare_pdf_merge(pdf_dir, task_name)

                # 4: split selected PDF files
                elif sub_key in ['4']:
                    print('请选择PDF文件')
                    pdf_paths = read_paths(init_dir=str(Start_dir))
                    if not pdf_paths:
                        break
                    print(f'已选择{len(pdf_paths)}个文件')

                    prepare_pdf_split(pdf_paths, task_name)

                # 5: convert PDFs to images
                elif sub_key in ['5']:
                    print('请选择PDF文件')
                    pdf_paths = read_paths(init_dir=str(Start_dir))
                    if not pdf_paths:
                        break
                    print(f'已选择{len(pdf_paths)}个文件')

                    # Abort on a non-positive or non-numeric dpi.
                    dpi_str = input('请输入dpi:\t')
                    try:
                        dpi = int(dpi_str)
                        if dpi<=0:
                            break
                    except:
                        break

                    format_str = input('请选择图片格式: 1.jpeg; 2.png.\t')
                    if format_str not in ['1', '2']:
                        break
                    format_id = int(format_str)-1
                    image_format = Pdf_to_Image_lst[format_id]

                    prepare_pdf_to_image(pdf_paths, task_name, dpi=dpi, image_format=image_format)


                sub_key = input('选择方式: 1.按目录生成; 2.按文件生成; 3.PDF合并; 4.PDF拆分; 5.PDF转图片. 输入其他键, 返回上层\t')

        # --- 4: super-resolution enhancement (by directory / files) ---
        elif key in ['4']:
            logging_init('SR')
            sub_key = input('选择方式: 1.目录; 2.文件. 输入其他键, 返回上层\t')
            while sub_key in ['1', '2']:

                task_name = input('请输入任务名称:\t')
                if not task_name:
                    break

                # Upscale factor (default 2).
                scale_str = input('请输入放大倍数: 1, 2, 4. 默认值:2\t')
                if scale_str not in ['1','2','4']:
                    scale_str = '2'
                scale = int(scale_str)

                # Output image format (default jpeg).
                ext_str = input('请输入保存图片格式: 1.jpeg, 2.png, 3.tiff, 4.webp. 默认值:jpeg\t')
                if ext_str not in ['1','2','3','4']:
                    ext_str = '1'
                ext = SR_EXT_lst[int(ext_str)-1]

                # Result format: file or base64 (default file).
                output_str = input('请输入数据格式: 1.file, 2.base64. 默认值:file\t')
                # NOTE(review): this accepts '3'/'4' but SR_Output_lst has only
                # 2 entries, so '3'/'4' would raise IndexError below — the
                # check should probably be `not in ['1','2']`.
                if output_str not in ['1','2','3','4']:
                    output_str = '1'
                output = SR_Output_lst[int(output_str)-1]

                config = load_config()
                max_size = config['max_size'] * 1024 * 1024

                if sub_key in ['1']:
                    print('请选择图片目录')
                    img_dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
                    if not img_dir:
                        break
                    print(img_dir)
                    path_lst, fail_lst = prepare_sr_dir_task_paths(img_dir, task_name, ext, output, max_size)

                elif sub_key in ['2']:
                    print('请选择图片文件')
                    img_paths = read_paths(init_dir=str(Start_dir))
                    if not img_paths:
                        break
                    # print(img_paths)
                    path_lst, fail_lst = prepare_sr_files_task_paths(img_paths, task_name, ext, output, max_size)

                # Let the user abort when some files were rejected.
                if len(fail_lst)>0:
                    check_size = input(f'有{len(fail_lst)}个文件存在问题, 是否停止任务: 1. 继续; 其他, 中止\t')
                    if check_size not in ['1']:
                        break

                try:
                    config = load_config()
                    batch_sr_api(path_lst, scale, ext, output, task_name, config)
                # NOTE(review): bare except swallows all errors.
                except:
                    print(f'{task_name}任务失败')
                sub_key = input('选择方式: 1.目录; 2.文件. 输入其他键, 返回上层\t')

        # --- 5: query API usage for ocr / sr / pdf ---
        elif key in ['5']:
            sub_key = input('选择参数: 1.ocr; 2.sr; 3.pdf. 输入其他键, 返回上层\t')
            while sub_key in ['1', '2', '3']:
                api_type = Usage_API_Type_lst[int(sub_key)-1]

                try:
                    config = load_config()
                    get_usage_api(api_type, config)
                # NOTE(review): bare except swallows all errors.
                except:
                    print(f'查询失败')

                sub_key = input('选择参数: 1.ocr; 2.sr; 3.pdf. 输入其他键, 返回上层\t')

        key = input('选择任务: 0.设置; 1.OCR; 2.自动标点; 3.PDF; 4.超分辨率增强; 5.查询使用量. 输入其他键, 退出\t')