jzd/jzd_ocr.py

368 lines
14 KiB
Python
Raw Permalink Normal View History

2025-08-20 16:52:27 +08:00
# last update: 2025-07-13
# 下载离线安装包
# mkdir lib
# pip download requests whatimage tqdm opencv-python -d lib
# 离线安装
# 1. 解压/lib (python>=3.9) ;或解压/lib38 (python=3.8)
# 2. pip install --no-index --find-links=lib requests whatimage tqdm opencv-python
# 在线安装
# pip install requests whatimage tqdm
import json,logging,time,os
from pathlib import Path
from tkinter import filedialog, Tk
import requests, whatimage
from tqdm import tqdm
from tempfile import NamedTemporaryFile
from io import BytesIO
# Everything the tool reads or writes lives next to this script.
Start_dir = Path(__file__).parent

# JSON settings files loaded at the start of every task.
ConfigFile = Start_dir / 'ocr_config.json'
AuthFile = Start_dir / 'ocr_auth.json'

# Output tree: logs (plus failure lists) and OCR results (raw JSON and text).
Log_dir = Start_dir / 'Log'
Data_dir = Start_dir / 'Data'
Json_Data_dir = Data_dir / 'json'
Text_Data_dir = Data_dir / 'text'
Fail_dir = Log_dir / 'fail'
Fail_OCR_dir = Fail_dir / 'ocr'

# Create the tree at import time. Parents are listed before their children
# because mkdir() is called without parents=True.
for _required_dir in (Log_dir, Data_dir, Json_Data_dir, Text_Data_dir, Fail_dir, Fail_OCR_dir):
    _required_dir.mkdir(exist_ok=True)
# strftime pattern used for default task names (safe for file names: no ':').
TimeStampStr = '%Y-%m-%d_%H.%M.%S'


def get_timestamp(time_stamp_format=TimeStampStr):
    '''
    Return the current local time formatted with ``time_stamp_format``.

    The format is a ``time.strftime`` pattern; it defaults to ``TimeStampStr``.
    '''
    # strftime() without an explicit time tuple formats the current local time.
    return time.strftime(time_stamp_format)
def data_to_text(data):
    '''
    Rebuild plain text from the parallel ``line_ids`` / ``chars`` lists in ``data``.

    Consecutive characters sharing the same line id are joined; a newline is
    emitted after the last character of each run (including the final one).
    Returns an empty string when either list is missing.
    '''
    line_ids = data.get('line_ids')
    chars = data.get('chars')
    if line_ids is None or chars is None:
        return ''

    last_index = len(line_ids) - 1
    pieces = []
    for pos, (line_id, char) in enumerate(zip(line_ids, chars)):
        pieces.append(char)
        # The line continues only if the next entry carries the same id.
        continues = pos < last_index and line_id == line_ids[pos + 1]
        if not continues:
            pieces.append('\n')
    return ''.join(pieces)
def resize_image(img_path, max_length:int):
    '''
    Load an image, shrink it so its longest side is at most ``max_length``,
    and return it re-encoded as JPEG.

    Parameters:
        img_path: path of the image file (str or Path).
        max_length: maximum allowed length of the longest side in pixels;
            a value <= 0 disables resizing.

    Returns:
        (img_bytes, resize_factor): the JPEG bytes and the factor the image
        was divided by (1.0 when it was not resized).

    Raises:
        OSError: the file cannot be read.
        ValueError: the file is empty, cannot be decoded, or cannot be encoded.
    '''
    # Read the bytes in Python and decode from memory instead of handing the
    # path to cv2.imread. The previous encode('gbk') / decode() round-trip
    # raised UnicodeDecodeError on non-ASCII paths.
    with open(img_path, 'rb') as f:
        raw = f.read()
    if not raw:
        raise ValueError(f'empty image file: {img_path}')

    # Imported lazily: OpenCV is only required when resizing is enabled.
    import cv2
    import numpy as np

    img = cv2.imdecode(np.frombuffer(raw, dtype=np.uint8), cv2.IMREAD_COLOR)
    if img is None:
        raise ValueError(f'cannot decode image: {img_path}')

    height, width = img.shape[:2]
    longest = max(height, width)
    resize_factor:float = longest / max_length if longest > max_length > 0 else 1.0
    if resize_factor > 1:
        # Keep at least one pixel per side for extreme aspect ratios.
        new_size = (max(1, round(width / resize_factor)), max(1, round(height / resize_factor)))
        img = cv2.resize(img, new_size)

    ok, buffer = cv2.imencode('.jpeg', img)  # encode as a JPEG byte stream
    if not ok:
        raise ValueError(f'cannot encode image as JPEG: {img_path}')
    return buffer.tobytes(), resize_factor
def resize_data(data, resize_factor:float):
    '''
    Scale the geometry in an OCR result back to the original image size.

    When ``resize_factor`` is greater than 1, ``Width`` / ``Height`` (if both
    present) and every list entry of ``coors`` are multiplied by the factor
    and rounded. ``data`` is modified in place and also returned.
    '''
    if not (resize_factor > 1):
        return data

    width, height = data.get('Width'), data.get('Height')
    if width is not None and height is not None:
        data['Width'] = round(width * resize_factor)
        data['Height'] = round(height * resize_factor)

    coors = data.get('coors')
    if type(coors) is list:
        scaled = []
        for coor in coors:
            # Entries that are not plain lists are dropped, as before.
            if type(coor) is list:
                scaled.append([round(value * resize_factor) for value in coor])
        data['coors'] = scaled
    return data
def api_ocr_pro(img_path, void_value, auth_dict, config):
    '''
    Upload one image to the OCR server and return the parsed JSON result.

    Parameters:
        img_path: path of the image to recognise.
        void_value: value returned on any failure.
        auth_dict: mapping providing ``token``.
        config: mapping providing ``timeout_connect``, ``timeout_read``,
            ``retry_time``, ``server``, ``server_lst``, ``ocr_type`` and
            ``max_length`` (0 uploads the file unchanged; otherwise the image
            is resized through ``resize_image`` first).

    Returns:
        The response dict, with geometry scaled back by ``resize_data``, or
        ``void_value`` if the request failed, every retry was used up, or the
        response carries a ``msg`` / ``detail`` field.
    '''
    try:
        access_token = auth_dict['token']
        connect_timeout = config['timeout_connect']
        read_timeout = config['timeout_read']
        retry_times = config['retry_time']
        server_type = config['server']
        ocr_type = config['ocr_type']
        max_length = config['max_length']
        url = config['server_lst'][server_type] + f'/{ocr_type}'
        headers = {'Authorization': f'gjcool {access_token}'}
        img_name = Path(img_path).name
        mime = get_mime(img_path)

        if max_length == 0:
            # Read the payload once into memory. An open file object would be
            # left unclosed, and after the first attempt it sits at EOF, so a
            # retry would upload an empty body.
            with open(img_path, 'rb') as f:
                img_payload = f.read()
            resize_factor = 1.0
        else:
            img_payload, resize_factor = resize_image(img_path, max_length)
        files = [('img', (img_name, img_payload, mime))]

        response = None
        for attempt in range(1, retry_times + 1):
            try:
                response = requests.post(url, headers=headers, data={}, files=files,
                                         timeout=(connect_timeout, read_timeout))
                break
            except requests.exceptions.RequestException as e:
                print(f'retry {attempt} times')
                print(e)
        if response is None:
            # Every attempt failed (or retry_time is 0).
            return void_value

        result = response.json()
        # The code treats a 'msg' or 'detail' field as an error report.
        if result.get('msg') is None and result.get('detail') is None:
            return resize_data(result, resize_factor)
        print(result)
        return void_value
    except Exception:
        # Best-effort by design: any failure yields void_value. Unlike a bare
        # except, KeyboardInterrupt / SystemExit still propagate.
        print('ocr_pro failed')
        return void_value
def batch_ocr_api(path_lst, task_name, auth_dict, config): #layout, compact, area_num, row_num, , anno_open:bool=True
    '''
    Run OCR on every entry of ``path_lst`` and store the results.

    Parameters:
        path_lst: list of dicts with ``img_path``, ``json_path`` and ``text_path``.
        task_name: task label, used in the log and as the failure-list name.
        auth_dict, config: passed through to ``api_ocr_pro``.

    For each image the raw JSON and the plain text are written to the paths
    given in the entry, and one log line is emitted. Images whose OCR call or
    result saving fails are appended to ``Fail_OCR_dir/<task_name>.txt``.
    A summary line is logged at the end.
    '''
    logging.info(f'\t\t任务:{task_name}\t\tSTART\t\t总数:{len(path_lst)}')
    logging.info(f'\t\t序号\t用时\t字数\t列数\t大小\t宽度\t高度\t路径')
    # Running totals for the summary line.
    total_info = {'TimeCost':0,'CharNumber':0, 'LineNumber':0, 'ImageSize':0, 'SuccessNumber':0, 'FailNumber':0}
    fail_list_path = str(Fail_OCR_dir.joinpath(f'{task_name}.txt'))
    save_text(fail_list_path, "", False)  # start with an empty failure list
    start_time = time.time()

    for seq, path_dict in enumerate(tqdm(path_lst, desc="OCR"), start=1):
        img_path = path_dict['img_path']
        request_started = time.time()
        data = api_ocr_pro(img_path, {}, auth_dict, config)
        time_cost = time.time() - request_started  # seconds

        if data == {}:
            # api_ocr_pro returns the void value ({}) on any failure.
            logging.warning(f"\t{seq:<5d}\tocr failed\t{img_path}")
            save_text(fail_list_path, f"{img_path}\n", True)
            total_info['FailNumber'] += 1
            continue

        try:
            with open(path_dict['json_path'], "w", encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False)
            # Rebuild the text from line_ids/chars only when the server did
            # not send a ready-made 'text' field.
            text = data['text'] if 'text' in data else data_to_text(data)
            with open(path_dict['text_path'], "w", encoding='utf-8') as f:
                f.write(text)
            # Columns: index, time, chars, lines, size, width, height, path.
            img_size = round(data['Size']/1024) #KB
            logging.info(f"\t\t{seq:<6d}\t{time_cost:.2f}\t{data['CharNumber']:<6d}\t{data['LineNumber']:<6d}\t{img_size:<6d}\t{data['Width']:<6d}\t{data['Height']:<6d}\t{img_path}")
            total_info['TimeCost'] += time_cost
            total_info['CharNumber'] += data['CharNumber']
            total_info['LineNumber'] += data['LineNumber']
            total_info['ImageSize'] += data['Size']
            total_info['SuccessNumber'] += 1
        except Exception:
            # One bad result must not abort the batch, but Ctrl-C still can.
            # exc_info records the actual cause in the log.
            logging.warning(f"\t\t{seq:<6d}\tsave data wrong\t{img_path}", exc_info=True)
            save_text(fail_list_path, f"{img_path}\n", True)
            total_info['FailNumber'] += 1

    logging.info(f"\t\t任务:{task_name}\t\tEND")
    logging.info(f"\t\t总数\t总用时\t总字数\t总列数\t总大小")
    logging.info(f"\t\t{total_info['SuccessNumber']}/{total_info['FailNumber']} \t{time.time()-start_time:.2f}\t{total_info['CharNumber']:<6d}\t{total_info['LineNumber']:<6d}\t{total_info['ImageSize']:<6d}\n")
def get_allfile_alldir_in_dir(path):
    '''
    Recursively list everything below ``path``.

    Returns ``(dir_paths, file_paths)``: two sorted lists of paths built by
    joining each ``os.walk`` directory with its sub-directory / file names.
    '''
    dir_paths, file_paths = [], []
    for current, subdir_names, file_names in os.walk(path):
        dir_paths.extend(os.path.join(current, name) for name in subdir_names)
        file_paths.extend(os.path.join(current, name) for name in file_names)
    dir_paths.sort()
    file_paths.sort()
    return dir_paths, file_paths
def get_token_by_login(apiid, password, url, timeout=None):
    '''
    Log in to the API and return the access token.

    Parameters:
        apiid, password: credentials posted as form fields.
        url: login endpoint.
        timeout: optional ``requests`` timeout (seconds or a
            ``(connect, read)`` tuple). ``None`` keeps the previous behaviour
            of waiting indefinitely.

    Returns:
        The ``access_token`` from the JSON response, or ``''`` on any failure
        (network error, invalid JSON, missing field).
    '''
    payload = {'apiid':apiid, 'password':password, 'encrypt':1, 'is_long':1}
    try:
        response = requests.post(url, data=payload, timeout=timeout).json()
        return response['access_token']
    except Exception:
        # Best-effort login; unlike a bare except, Ctrl-C still interrupts.
        return ''
def get_mime(img_path):
with open(img_path, 'rb') as f:
img = f.read()
mime_type = whatimage.identify_image(img)
if mime_type is None or mime_type=='None':
mime_type = Path(img_path).suffix.replace('.', '')
return f'image/{mime_type}'
def load_config(config_path):
    '''
    Load a UTF-8 JSON file and return the parsed content.

    Returns ``None`` (after printing a notice) when the file cannot be read
    or does not contain valid JSON; callers must check for ``None``.
    '''
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError covers
        # json.JSONDecodeError and UnicodeDecodeError.
        print('配置文件读取失败')
        return None
def logging_init(log_type:str, dir:Path=Log_dir, level=logging.INFO):
    '''
    Initialise the root logger to write to ``<dir>/<log_type>/<YYYY-MM-DD>.log``.

    Parameters:
        log_type: sub-directory name under ``dir`` for this kind of log.
        dir: base log directory (defaults to ``Log_dir``).
        level: logging level for the root logger.
    '''
    log_dir = dir / log_type
    log_dir.mkdir(parents=True, exist_ok=True)
    log_filepath = log_dir / (time.strftime("%Y-%m-%d", time.localtime()) + '.log')
    # basicConfig(encoding=...) only exists on Python >= 3.9 and raises
    # ValueError on 3.8. An explicit FileHandler gives the same UTF-8 log file
    # on every supported version. delay=True defers opening the file until
    # the first record, so nothing is left open if basicConfig() is a no-op
    # because the root logger is already configured.
    file_handler = logging.FileHandler(str(log_filepath), encoding='utf-8', delay=True)
    logging.basicConfig(
        handlers=[file_handler],
        level=level,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
def prepare_ocr_dir_task_paths(dir, task_name, max_size):
    '''
    Build the work list for OCR-ing every file below the directory ``dir``.

    The sub-directory layout of ``dir`` is mirrored under
    ``Json_Data_dir/<task_name>`` and ``Text_Data_dir/<task_name>``.

    Parameters:
        dir: root directory to scan recursively.
        task_name: name of the output sub-directory.
        max_size: size limit in bytes; files of this size or larger are rejected.

    Returns:
        (path_lst, fail_lst): ``path_lst`` holds dicts with ``img_path``,
        ``json_path`` and ``text_path``; ``fail_lst`` holds the paths of the
        oversized files.

    NOTE(review): files in one directory that share a stem (``a.jpg`` /
    ``a.png``) map to the same output files.
    '''
    # Create the per-task output roots.
    json_save_dir = Json_Data_dir.joinpath(task_name)
    json_save_dir.mkdir(exist_ok=True)
    text_save_dir = Text_Data_dir.joinpath(task_name)
    text_save_dir.mkdir(exist_ok=True)

    alldir_path, allfile_path = get_allfile_alldir_in_dir(dir)
    # Mirror the tree using paths relative to the root. The previous
    # str.replace(dir, ...) rewrote every occurrence of the root string, so a
    # path that repeated it deeper down was corrupted.
    for dir_path in alldir_path:
        rel_dir = os.path.relpath(dir_path, dir)
        json_save_dir.joinpath(rel_dir).mkdir(parents=True, exist_ok=True)
        text_save_dir.joinpath(rel_dir).mkdir(parents=True, exist_ok=True)

    path_lst, fail_lst =[], []
    for file_path in allfile_path:
        file_size = os.path.getsize(file_path)
        if file_size < max_size:
            rel_parent = os.path.dirname(os.path.relpath(file_path, dir))
            filename = Path(file_path).stem
            path_lst.append({
                'img_path': file_path,
                'json_path': str(json_save_dir.joinpath(rel_parent, f'{filename}.json')),
                'text_path': str(text_save_dir.joinpath(rel_parent, f'{filename}.txt')),
            })
        else:
            print(f'{file_path}体积过大, {file_size/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
            fail_lst.append(file_path)
    return path_lst, fail_lst
def prepare_ocr_files_task_paths(paths, task_name, max_size):
    '''
    Build the work list for OCR-ing an explicit collection of image files.

    All results go directly into ``Json_Data_dir/<task_name>`` and
    ``Text_Data_dir/<task_name>``, named after each image's stem.

    Returns ``(path_lst, fail_lst)``: dicts with ``img_path`` / ``json_path`` /
    ``text_path`` for accepted files, and the paths of files whose size is
    ``max_size`` bytes or more.
    '''
    json_save_dir = Json_Data_dir.joinpath(task_name)
    text_save_dir = Text_Data_dir.joinpath(task_name)
    json_save_dir.mkdir(exist_ok=True)
    text_save_dir.mkdir(exist_ok=True)

    path_lst, fail_lst = [], []
    for file_path in paths:
        size_bytes = os.path.getsize(file_path)
        if size_bytes >= max_size:
            print(f'{file_path}体积过大, {size_bytes/1024/1024}MB, 超过最大限量{max_size/1024/1024}MB')
            fail_lst.append(file_path)
            continue
        stem = Path(file_path).stem
        path_lst.append({
            'img_path': file_path,
            'json_path': str(json_save_dir.joinpath(f'{stem}.json')),
            'text_path': str(text_save_dir.joinpath(f'{stem}.txt')),
        })
    return path_lst, fail_lst
def prepare_ocr_list_task_paths(list_paths, task_name, max_size):
    '''
    Build the work list from text files that contain one image path per line.

    Parameters:
        list_paths: paths of the list files (UTF-8, with or without BOM).
        task_name, max_size: passed to ``prepare_ocr_files_task_paths``.

    Blank lines and entries that are not existing regular files are skipped.
    Returns the ``(path_lst, fail_lst)`` pair from
    ``prepare_ocr_files_task_paths``.
    '''
    img_paths = []
    for lst_path in list_paths:
        # utf-8-sig strips a leading BOM, which would otherwise make the
        # first path fail the existence check.
        with open(lst_path, 'r', encoding='utf-8-sig') as f:
            for line in f:
                img_path = line.strip()
                # Path('') is '.', which exists, so a blank line must be
                # rejected explicitly. is_file() also rejects directories,
                # which would otherwise be queued as images.
                if img_path and Path(img_path).is_file():
                    img_paths.append(img_path)
    return prepare_ocr_files_task_paths(img_paths, task_name, max_size)
def read_paths(pathtype='file', init_dir='./'):
    '''
    Ask the user for paths through a native Tk dialog.

    Parameters:
        pathtype: ``'file'`` opens a multi-file chooser, ``'dir'`` a directory
            chooser; any other value returns ``None`` without a dialog.
        init_dir: directory the dialog starts in.

    Returns:
        What the Tk dialog returns: the selected file paths for ``'file'``,
        the selected directory for ``'dir'`` (an empty value when cancelled).
    '''
    root = Tk()
    try:
        root.focus_force()
        # Hide the empty root window shortly after the dialog appears.
        root.after(10, root.withdraw)
        if pathtype == 'file':
            return filedialog.askopenfilenames(parent=root, initialdir=init_dir)
        if pathtype == 'dir':
            return filedialog.askdirectory(parent=root, initialdir=init_dir)
        return None
    finally:
        # Each call creates its own Tk root; without destroy() a hidden
        # window leaked on every prompt.
        root.destroy()
def save_text(filepath, content, is_add=False):
    '''
    Write ``content`` to ``filepath`` as UTF-8.

    ``is_add=True`` appends; otherwise the file is overwritten. Nothing
    happens when ``filepath`` is empty or ``None``.
    '''
    if filepath:
        mode = "a" if is_add else "w"
        with open(filepath, mode, encoding='utf-8') as out:
            out.write(content)
# Interactive entry point: repeatedly ask for an image source (directory,
# files, or list files), build the work list, and run the OCR batch.
if __name__ == "__main__":
    logging_init('OCR')
    sub_key = input('选择图片: 1.目录; 2.文件; 3.列表. 输入其他键, 返回上层\t')
    while sub_key in ['1', '2', '3']:
        task_name = input(f'请输入任务名称. 默认取当前日期时间({TimeStampStr}):\t')
        if not task_name:
            task_name = get_timestamp(TimeStampStr)
        # Settings are reloaded for every task.
        auth_dict = load_config(AuthFile)
        config = load_config(ConfigFile)
        if auth_dict is None or config is None:
            # load_config already printed the reason. Stop here instead of
            # failing with a TypeError on config['max_size'].
            break
        max_size = config['max_size'] * 1024 * 1024  # MB -> bytes

        # Build the work list (path_lst) and the oversized list (fail_lst).
        if sub_key == '1':
            print('请选择图片目录')
            img_dir = read_paths(pathtype='dir', init_dir=str(Start_dir))
            if not img_dir:
                break
            print(img_dir)
            path_lst, fail_lst = prepare_ocr_dir_task_paths(img_dir, task_name, max_size)
        elif sub_key == '2':
            print('请选择图片文件')
            img_paths = read_paths(init_dir=str(Start_dir))
            if not img_paths:
                break
            print(f'已选择{len(img_paths)}个文件')
            path_lst, fail_lst = prepare_ocr_files_task_paths(img_paths, task_name, max_size)
        else:  # sub_key == '3'
            print('请选择列表文件')
            list_paths = read_paths(init_dir=str(Start_dir))
            if not list_paths:
                break
            print(f'已选择{len(list_paths)}个列表')
            path_lst, fail_lst = prepare_ocr_list_task_paths(list_paths, task_name, max_size)

        # Let the user decide whether to go on when some files are too large.
        if len(fail_lst)>0:
            check_size = input(f'{len(fail_lst)}个文件体积超标, 是否停止任务: 1. 继续; 其他, 中止\t')
            if check_size not in ['1']:
                break
        try:
            batch_ocr_api(path_lst, task_name, auth_dict, config)
        except Exception:
            print(f'{task_name}任务失败')
            # Keep the traceback in the log so the failure can be diagnosed.
            logging.exception(f'{task_name}任务失败')
        sub_key = input('选择方式: 1.目录; 2.文件; 3.列表. 输入其他键, 返回上层\t')