368 lines
14 KiB
Python
368 lines
14 KiB
Python
# last update: 2025-07-13
|
|
# 下载离线安装包
|
|
# mkdir lib
|
|
# pip download requests whatimage tqdm opencv-python -d lib
|
|
# 离线安装
|
|
# 1. 解压/lib (python>=3.9) ;或解压/lib38 (python=3.8)
|
|
# 2. pip install --no-index --find-links=lib requests whatimage tqdm opencv-python
|
|
# 在线安装
|
|
# pip install requests whatimage tqdm opencv-python
|
|
|
|
import json,logging,time,os
|
|
from pathlib import Path
|
|
from tkinter import filedialog, Tk
|
|
import requests, whatimage
|
|
from tqdm import tqdm
|
|
from tempfile import NamedTemporaryFile
|
|
from io import BytesIO
|
|
|
|
|
|
# Every path used by the script is anchored at the folder that holds this file.
Start_dir = Path(__file__).parent
ConfigFile = Start_dir / 'ocr_config.json'
AuthFile = Start_dir / 'ocr_auth.json'

# Output tree: Log/ (containing fail/ocr/) and Data/ (containing json/ and text/).
Log_dir = Start_dir / 'Log'
Fail_dir = Log_dir / 'fail'
Fail_OCR_dir = Fail_dir / 'ocr'
Data_dir = Start_dir / 'Data'
Json_Data_dir = Data_dir / 'json'
Text_Data_dir = Data_dir / 'text'

# Create the tree at import time. Parents are listed before their children
# because mkdir() is called without parents=True.
for _folder in (Log_dir, Data_dir, Json_Data_dir, Text_Data_dir, Fail_dir, Fail_OCR_dir):
    _folder.mkdir(exist_ok=True)
del _folder

# strftime pattern used for timestamps (and for default task names).
TimeStampStr = '%Y-%m-%d_%H.%M.%S'
|
|
|
|
def get_timestamp(time_stamp_format=TimeStampStr):
    """Return the current local time formatted with *time_stamp_format*.

    The pattern is passed to ``time.strftime`` and defaults to the
    module-level ``TimeStampStr``.
    """
    now = time.localtime()
    stamp = time.strftime(time_stamp_format, now)
    return stamp
|
|
|
|
|
|
def data_to_text(data):
    """Rebuild plain text from the parallel ``line_ids`` / ``chars`` lists of *data*.

    Characters are emitted in order; a newline is appended after a character
    whenever the next entry of ``line_ids`` differs from the current one, or
    when the character sits at the last index of ``line_ids``.

    Args:
        data: mapping that may contain ``'line_ids'`` and ``'chars'``.

    Returns:
        The assembled text, or ``''`` when either key is missing or ``None``.
    """
    line_ids = data.get('line_ids')
    chars = data.get('chars')
    if line_ids is None or chars is None:
        return ''

    last_index = len(line_ids) - 1
    # Collect pieces and join once instead of growing a string with ``+=``.
    pieces = []
    for i, (line_id, char) in enumerate(zip(line_ids, chars)):
        pieces.append(char)
        # End of a line: no following id, or the following id belongs to another line.
        if i >= last_index or line_id != line_ids[i + 1]:
            pieces.append('\n')
    return ''.join(pieces)
|
|
|
|
|
|
def resize_image(img_path, max_length:int):
    """Load an image, shrink it so its longer side is at most *max_length*, and return JPEG bytes.

    Args:
        img_path: path of the image file (``str`` or ``Path``).
        max_length: maximum allowed length in pixels of the longer side;
            a value ``<= 0`` disables shrinking.

    Returns:
        ``(img_bytes, resize_factor)``: the JPEG-encoded image and the factor
        the original dimensions were divided by (``1.0`` when not shrunk).

    Raises:
        ValueError: if the file is empty, cannot be decoded as an image, or
            cannot be encoded to JPEG.
    """
    import cv2
    import numpy as np

    # The previous ``str(path).encode('gbk').decode()`` round trip raised for any
    # non-ASCII path (GBK bytes are not valid UTF-8). Read the raw bytes with
    # numpy and decode in memory so the path never has to pass through OpenCV's
    # own file opening, which is known to be unreliable for non-ASCII paths.
    raw = np.fromfile(str(img_path), dtype=np.uint8)
    if raw.size == 0:
        raise ValueError(f'empty image file: {img_path}')
    img = cv2.imdecode(raw, cv2.IMREAD_COLOR)
    if img is None:
        raise ValueError(f'cannot decode image: {img_path}')

    height, width = img.shape[:2]
    longest = max(height, width)
    resize_factor: float = longest / max_length if longest > max_length > 0 else 1.0
    if resize_factor > 1:
        # Keep each side at least 1 px so extreme aspect ratios do not break cv2.resize.
        new_size = (max(1, round(width / resize_factor)), max(1, round(height / resize_factor)))
        img = cv2.resize(img, new_size)

    ok, buffer = cv2.imencode('.jpeg', img)  # encode as a JPEG byte stream
    if not ok:
        raise ValueError(f'cannot encode image as JPEG: {img_path}')
    return buffer.tobytes(), resize_factor
|
|
|
|
|
|
|
|
def resize_data(data, resize_factor:float):
    """Scale the geometry in an OCR result by *resize_factor*.

    Nothing is changed unless ``resize_factor > 1``. Otherwise ``Width`` and
    ``Height`` (when both are present) and every number in ``coors`` are
    multiplied by the factor and rounded.

    Args:
        data: result mapping; modified in place.
        resize_factor: multiplier applied to the stored dimensions and coordinates.

    Returns:
        The same *data* object.
    """
    if not resize_factor > 1:
        return data

    if data.get('Width') is not None and data.get('Height') is not None:
        data['Width'] = round(data['Width'] * resize_factor)
        data['Height'] = round(data['Height'] * resize_factor)

    coors = data.get('coors')
    if isinstance(coors, list):
        # Entries that are not lists are dropped, as before.
        data['coors'] = [
            [round(value * resize_factor) for value in box]
            for box in coors
            if isinstance(box, list)
        ]
    return data
|
|
|
|
def api_ocr_pro(img_path, void_value, auth_dict, config):
    """Upload one image to the OCR endpoint and return the parsed JSON result.

    The image is posted as multipart field ``img`` to
    ``config['server_lst'][config['server']] + '/' + config['ocr_type']`` with
    an ``Authorization: gjcool <token>`` header. When ``config['max_length']``
    is non-zero the image is first passed through ``resize_image`` and the
    returned geometry is scaled back with ``resize_data``.

    Args:
        img_path: path of the image to recognise.
        void_value: value returned on any failure.
        auth_dict: mapping holding the access token under ``'token'``.
        config: mapping with ``timeout_connect``, ``timeout_read``,
            ``retry_time``, ``server``, ``server_lst``, ``ocr_type`` and
            ``max_length``.

    Returns:
        The response JSON when it contains neither ``msg`` nor ``detail``;
        otherwise *void_value*. This function does not raise for ordinary
        errors; it prints the failure and returns *void_value*.
    """
    try:
        access_token = auth_dict['token']
        connect_timeout = config['timeout_connect']
        read_timeout = config['timeout_read']
        retry_times = config['retry_time']
        server_type = config['server']
        ocr_type = config['ocr_type']
        max_length = config['max_length']

        url = config['server_lst'][server_type] + f'/{ocr_type}'
        headers = {'Authorization': f'gjcool {access_token}'}
        img_name = Path(img_path).name

        if max_length == 0:
            mime = get_mime(img_path)
            # Read the payload once: the handle is closed promptly, and every
            # retry sends the full content instead of an already-consumed stream.
            with open(img_path, 'rb') as f:
                img_bytes = f.read()
            resize_factor = 1.0
        else:
            img_bytes, resize_factor = resize_image(img_path, max_length)
            # resize_image always re-encodes to JPEG, so declare that type.
            mime = 'image/jpeg'
        files = [('img', (img_name, img_bytes, mime))]
        data = {}

        response = None
        i = 0
        while i < retry_times:
            try:
                response = requests.post(url, headers=headers, data=data, files=files,
                                         timeout=(connect_timeout, read_timeout))
                break
            except requests.exceptions.RequestException as e:
                i += 1
                print(f'retry {i} times')
                print(e)

        if response is None:
            # Every attempt failed (or retry_times was 0).
            return void_value

        result = response.json()
        if result.get('msg') is None and result.get('detail') is None:
            return resize_data(result, resize_factor)

        # The server answered with an error payload.
        print(result)
        return void_value
    except Exception as e:
        # Best-effort boundary: report the cause instead of hiding it, but let
        # KeyboardInterrupt / SystemExit propagate.
        print('ocr_pro failed')
        print(e)
        return void_value
|
|
|
|
|
|
def batch_ocr_api(path_lst, task_name, auth_dict, config):
    """Run OCR over every entry of *path_lst*, save the results and log statistics.

    For each entry the image is sent through ``api_ocr_pro``; the JSON result
    is written to ``json_path`` and its text (the ``text`` field, or the output
    of ``data_to_text`` when that field is absent) to ``text_path``. Images
    that fail are appended to ``<Fail_OCR_dir>/<task_name>.txt``, which is
    truncated at the start of the run.

    Args:
        path_lst: list of dicts with ``img_path``, ``json_path`` and ``text_path``.
        task_name: label used in the log and for the failure-list file name.
        auth_dict: authentication mapping forwarded to ``api_ocr_pro``.
        config: configuration mapping forwarded to ``api_ocr_pro``.
    """
    logging.info(f'\t\t任务:{task_name}\t\tSTART\t\t总数:{len(path_lst)}')
    logging.info('\t\t序号\t用时\t字数\t列数\t大小\t宽度\t高度\t路径')

    # Running totals for the summary line.
    total_info = {'TimeCost':0,'CharNumber':0, 'LineNumber':0, 'ImageSize':0, 'SuccessNumber':0, 'FailNumber':0}
    fail_list_path = str(Fail_OCR_dir.joinpath(f'{task_name}.txt'))
    save_text(fail_list_path, "", False)

    start_time = time.time()
    for index, path_dict in enumerate(tqdm(path_lst, desc="OCR"), start=1):
        img_path = path_dict['img_path']

        now_api_time = time.time()
        data = api_ocr_pro(img_path, {}, auth_dict, config)
        last_api_time = time.time()

        if data == {}:
            logging.warning(f"\t{index:<5d}\tocr failed\t{img_path}")
            save_text(fail_list_path, f"{img_path}\n", True)
            total_info['FailNumber'] += 1
            continue

        try:
            with open(path_dict['json_path'], "w", encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False)

            # Rebuild the text only when the server did not supply it.
            text = data['text'] if 'text' in data else data_to_text(data)
            with open(path_dict['text_path'], "w", encoding='utf-8') as f:
                f.write(text)

            # Columns: index, time, chars, lines, size, width, height, path.
            img_size = round(data['Size']/1024)  # KB
            time_cost = last_api_time - now_api_time  # seconds
            logging.info(f"\t\t{index:<6d}\t{time_cost:.2f}\t{data['CharNumber']:<6d}\t{data['LineNumber']:<6d}\t{img_size:<6d}\t{data['Width']:<6d}\t{data['Height']:<6d}\t{img_path}")

            total_info['TimeCost'] += time_cost
            total_info['CharNumber'] += data['CharNumber']
            total_info['LineNumber'] += data['LineNumber']
            total_info['ImageSize'] += data['Size']
            total_info['SuccessNumber'] += 1
        except Exception:
            # Keep the batch going, but record the cause and do not swallow
            # KeyboardInterrupt / SystemExit.
            logging.warning(f"\t\t{index:<6d}\tsave data wrong\t{img_path}", exc_info=True)
            save_text(fail_list_path, f"{img_path}\n", True)
            total_info['FailNumber'] += 1

    logging.info(f"\t\t任务:{task_name}\t\tEND")
    logging.info("\t\t总数\t总用时\t总字数\t总列数\t总大小")
    logging.info(f"\t\t{total_info['SuccessNumber']}/{total_info['FailNumber']} \t{time.time()-start_time:.2f}\t{total_info['CharNumber']:<6d}\t{total_info['LineNumber']:<6d}\t{total_info['ImageSize']:<6d}\n")
|
|
|
|
|
|
|
|
def get_allfile_alldir_in_dir(path):
    """Recursively list everything below *path*.

    Returns:
        ``(directories, files)``: two sorted lists of paths, each built by
        joining the walked parent directory with the entry name.
    """
    folders, files = [], []
    for parent, sub_names, file_names in os.walk(path):
        folders.extend(os.path.join(parent, name) for name in sub_names)
        files.extend(os.path.join(parent, name) for name in file_names)
    folders.sort()
    files.sort()
    return folders, files
|
|
|
|
|
|
def get_token_by_login(apiid, password, url, timeout=(10, 30)):
    """Log in to the API and return the access token.

    Args:
        apiid: account identifier sent as ``apiid``.
        password: password sent as ``password``.
        url: login endpoint.
        timeout: ``(connect, read)`` timeout in seconds passed to
            ``requests.post``; without one the call could block indefinitely.

    Returns:
        The ``access_token`` field of the JSON response, or ``''`` when the
        request or the parsing fails (the cause is logged as a warning).
    """
    try:
        payload = {'apiid':apiid, 'password':password, 'encrypt':1, 'is_long':1}
        response = requests.post(url, data=payload, timeout=timeout)
        token = response.json()['access_token']
    except Exception as e:
        # Best effort: callers rely on '' to signal failure, but keep the reason.
        logging.warning('get_token_by_login failed: %r', e)
        token = ''
    return token
|
|
|
|
|
|
def get_mime(img_path):
    """Return an ``image/<type>`` string for the file at *img_path*.

    The type comes from ``whatimage.identify_image`` applied to the file
    content; when that yields ``None`` (or the string ``'None'``) the file
    extension, with dots removed, is used instead.
    """
    with open(img_path, 'rb') as stream:
        detected = whatimage.identify_image(stream.read())
    if detected is not None and detected != 'None':
        return f'image/{detected}'
    # Detection failed: fall back to the file extension.
    extension = Path(img_path).suffix.replace('.', '')
    return f'image/{extension}'
|
|
|
|
|
|
|
|
def load_config(config_path):
    """Load a UTF-8 JSON file and return its parsed content.

    Args:
        config_path: path of the JSON file.

    Returns:
        The decoded JSON value, or ``None`` when the file cannot be read or
        parsed (a message and the cause are printed).
    """
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print('配置文件读取失败')
        print(e)
        return None
|
|
|
|
|
|
def logging_init(log_type:str, dir:Path=Log_dir, level=logging.INFO):
    """Initialise the root logger to write to a dated file.

    The log file is ``<dir>/<log_type>/<YYYY-MM-DD>.log``, opened in UTF-8.

    Args:
        log_type: sub-folder name under *dir*.
        dir: base log directory.
        level: logging level for the root logger.
    """
    log_dir = dir / log_type
    log_dir.mkdir(parents=True, exist_ok=True)

    log_filepath = log_dir / (time.strftime("%Y-%m-%d", time.localtime()) + '.log')

    # basicConfig is a no-op once the root logger has handlers; return early so
    # no unused file handle is opened.
    if logging.getLogger().handlers:
        return

    # ``basicConfig(encoding=...)`` only exists on Python 3.9+. Passing an
    # explicit FileHandler gives the same result and also works on 3.8.
    logging.basicConfig(
        handlers=[logging.FileHandler(str(log_filepath), encoding='utf-8')],
        level=level,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
|
|
|
|
|
|
def prepare_ocr_dir_task_paths(dir, task_name, max_size):
    """Build the OCR work list for every file below *dir*, mirroring its folder tree.

    The sub-folder structure of *dir* is recreated under
    ``Json_Data_dir/<task_name>`` and ``Text_Data_dir/<task_name>``.

    Args:
        dir: root directory to scan recursively.
        task_name: name of the per-task output folders.
        max_size: size limit in bytes; files of this size or larger are rejected.

    Returns:
        ``(path_lst, fail_lst)``: ``path_lst`` holds one dict per accepted file
        with ``img_path``, ``json_path`` and ``text_path``; ``fail_lst`` holds
        the paths of the files that were too large.
    """
    # Create the per-task output roots.
    json_save_dir = Json_Data_dir.joinpath(task_name)
    json_save_dir.mkdir(exist_ok=True)
    text_save_dir = Text_Data_dir.joinpath(task_name)
    text_save_dir.mkdir(exist_ok=True)

    alldir_path, allfile_path = get_allfile_alldir_in_dir(dir)

    # Map paths through their location relative to the root. A plain
    # str.replace() would rewrite every occurrence of the root string and
    # breaks when the root carries a trailing separator.
    for dir_path in alldir_path:
        rel_dir = os.path.relpath(dir_path, dir)
        (json_save_dir / rel_dir).mkdir(parents=True, exist_ok=True)
        (text_save_dir / rel_dir).mkdir(parents=True, exist_ok=True)

    path_lst, fail_lst = [], []
    for file_path in allfile_path:
        file_size = os.path.getsize(file_path)
        if file_size < max_size:
            filename = Path(file_path).stem
            rel_parent = os.path.dirname(os.path.relpath(file_path, dir))
            path_lst.append({
                'img_path': file_path,
                'json_path': str(json_save_dir / rel_parent / f'{filename}.json'),
                'text_path': str(text_save_dir / rel_parent / f'{filename}.txt'),
            })
        else:
            print(f'{file_path}体积过大, {file_size/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
            fail_lst.append(file_path)
    return path_lst, fail_lst
|
|
|
|
def prepare_ocr_files_task_paths(paths, task_name, max_size):
    """Build the OCR work list for an explicit collection of image files.

    All outputs go directly into ``Json_Data_dir/<task_name>`` and
    ``Text_Data_dir/<task_name>``, named after each file's stem.

    Returns:
        ``(path_lst, fail_lst)``: dicts with ``img_path``, ``json_path`` and
        ``text_path`` for the accepted files, and the paths of the files whose
        size is not below *max_size*.
    """
    json_save_dir = Json_Data_dir.joinpath(task_name)
    json_save_dir.mkdir(exist_ok=True)
    text_save_dir = Text_Data_dir.joinpath(task_name)
    text_save_dir.mkdir(exist_ok=True)

    accepted, oversized = [], []
    for file_path in paths:
        size_in_bytes = os.path.getsize(file_path)
        if size_in_bytes < max_size:
            stem = Path(file_path).stem
            entry = {
                'img_path': file_path,
                'json_path': str(json_save_dir.joinpath(f'{stem}.json')),
                'text_path': str(text_save_dir.joinpath(f'{stem}.txt')),
            }
            accepted.append(entry)
            continue
        # Too large: report it and remember it for the caller.
        print(f'{file_path}体积过大, {size_in_bytes/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
        oversized.append(file_path)
    return accepted, oversized
|
|
|
|
|
|
def prepare_ocr_list_task_paths(list_paths, task_name, max_size):
    """Build the OCR work list from text files that list one image path per line.

    Blank lines and entries that are not existing regular files are skipped.

    Args:
        list_paths: iterable of list-file paths (UTF-8, with or without BOM).
        task_name: name of the per-task output folders.
        max_size: size limit in bytes, forwarded to
            ``prepare_ocr_files_task_paths``.

    Returns:
        The ``(path_lst, fail_lst)`` pair produced by
        ``prepare_ocr_files_task_paths`` for the collected image paths.
    """
    img_paths = []
    for lst_path in list_paths:
        # 'utf-8-sig' also strips a BOM, which would otherwise corrupt the first path.
        with open(lst_path, 'r', encoding='utf-8-sig') as f:
            for line in f:
                img_path = line.strip()
                # Path('') is the current directory and "exists", so blank lines
                # must be rejected explicitly; directories are not images either.
                if img_path and Path(img_path).is_file():
                    img_paths.append(img_path)

    return prepare_ocr_files_task_paths(img_paths, task_name, max_size)
|
|
|
|
def read_paths(pathtype='file', init_dir='./'):
    """Ask the user for paths with a native Tk dialog.

    Args:
        pathtype: ``'file'`` opens a multi-file chooser, ``'dir'`` a directory
            chooser.
        init_dir: directory the dialog starts in.

    Returns:
        Whatever the Tk dialog returns (``askopenfilenames`` for ``'file'``,
        ``askdirectory`` for ``'dir'``), or ``None`` for any other *pathtype*.
    """
    root = Tk()
    try:
        root.focus_force()
        # Hide the empty root window shortly after start-up.
        root.after(10, root.withdraw)
        if pathtype == 'file':
            return filedialog.askopenfilenames(parent=root, initialdir=init_dir)
        if pathtype == 'dir':
            return filedialog.askdirectory(parent=root, initialdir=init_dir)
        return None
    finally:
        # Without this every call leaves a hidden Tk root window behind.
        root.destroy()
|
|
|
|
def save_text(filepath, content, is_add=False):
    """Write *content* to *filepath* as UTF-8.

    The file is appended to when *is_add* is true and overwritten otherwise.
    A falsy *filepath* makes the call a no-op.
    """
    if not filepath:
        return
    mode = "a" if is_add else "w"
    with open(filepath, mode, encoding='utf-8') as out:
        out.write(content)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Interactive entry point: pick a source (directory / files / list files),
    # build the work list, then run the batch. Repeats until another key is entered.
    logging_init('OCR')
    sub_key = input('选择图片: 1.目录; 2.文件; 3.列表. 输入其他键, 返回上层\t')
    while sub_key in ['1', '2', '3']:
        task_name = input(f'请输入任务名称. 默认取当前日期时间({TimeStampStr}):\t')
        if not task_name:
            task_name = get_timestamp(TimeStampStr)

        auth_dict = load_config(AuthFile)
        config = load_config(ConfigFile)
        # load_config returns None on failure; stop here instead of crashing
        # on config['max_size'] below.
        if auth_dict is None or config is None:
            print('配置文件或认证文件读取失败, 任务中止')
            break
        max_size = config['max_size'] * 1024 * 1024  # MB -> bytes

        # Build the work list: path_lst (accepted) and fail_lst (too large).
        if sub_key == '1':
            print('请选择图片目录')
            img_dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
            if not img_dir:
                break
            print(img_dir)
            path_lst, fail_lst = prepare_ocr_dir_task_paths(img_dir, task_name, max_size)
        elif sub_key == '2':
            print('请选择图片文件')
            img_paths = read_paths(init_dir=str(Start_dir))
            if not img_paths:
                break
            print(f'已选择{len(img_paths)}个文件')
            path_lst, fail_lst = prepare_ocr_files_task_paths(img_paths, task_name, max_size)
        else:  # sub_key == '3'
            print('请选择列表文件')
            list_paths = read_paths(init_dir=str(Start_dir))
            if not list_paths:
                break
            print(f'已选择{len(list_paths)}个列表')
            path_lst, fail_lst = prepare_ocr_list_task_paths(list_paths, task_name, max_size)

        # Let the user decide whether to go on when some files were rejected.
        if len(fail_lst) > 0:
            check_size = input(f'有{len(fail_lst)}个文件体积超标, 是否停止任务: 1. 继续; 其他, 中止\t')
            if check_size not in ['1']:
                break

        try:
            batch_ocr_api(path_lst, task_name, auth_dict, config)
        except Exception:
            # Record the traceback in the log; Ctrl-C is no longer swallowed.
            logging.exception(f'{task_name}任务失败')
            print(f'{task_name}任务失败')

        sub_key = input('选择方式: 1.目录; 2.文件; 3.列表. 输入其他键, 返回上层\t')
|
|
|
|
|