# EPUB -> CBZ comic converter script.
# -*- coding: utf-8 -*-
# Usage: place this file in the same directory as the files to convert.
# import sys, time
import zipfile
import os
import ebookmeta
import ebooklib
import tqdm
from ebooklib import epub
from io import BytesIO
from PIL import Image
import xml.etree.ElementTree as ET
import pathlib
import re
from typing import Tuple, Optional, List, Union
class PageInfo:
    """Per-page metadata that is serialised as a ``<Page>`` XML element.

    Every field except ``image`` starts as an empty string, which means
    "unset": unset fields are left out of the generated element.
    ``double_page`` is tri-state: ``True`` / ``False`` are written as
    ``"true"`` / ``"false"``, anything else is omitted.
    """

    def __init__(self, idx: int):
        self.image = idx          # page index, written as the "Image" attribute
        self.Type = ""            # written as the "Type" attribute
        self.double_page = ""     # True / False, or "" when unknown
        self.image_size = ""      # written as "ImageSize"
        self.key = ""             # written as "Key"
        self.book_mark = ""       # written as "Bookmark"
        self.image_width = ""     # written as "ImageWidth"
        self.image_height = ""    # written as "ImageHeight"

    def to_xml_ele(self):
        """Build and return the ``<Page>`` element for this page.

        Returns:
            An ``xml.etree.ElementTree.Element`` tagged ``Page`` carrying
            ``Image`` plus one attribute per field that has been set.
        """
        ele = ET.Element("Page")
        ele.set("Image", str(self.image))
        # The attribute is spelled ``Type`` (capital T) in __init__; reading
        # ``self.type`` here raised AttributeError whenever a type was set.
        if self.Type:
            ele.set("Type", str(self.Type))
        # Identity checks on purpose: the default "" must not be written.
        if self.double_page is True:
            ele.set("DoublePage", "true")
        elif self.double_page is False:
            ele.set("DoublePage", "false")
        optional_attrs = (
            ("ImageSize", self.image_size),
            ("Key", self.key),
            ("Bookmark", self.book_mark),
            ("ImageWidth", self.image_width),
            ("ImageHeight", self.image_height),
        )
        for attr_name, value in optional_attrs:
            if value:
                # ElementTree can only serialise string attribute values.
                ele.set(attr_name, str(value))
        return ele
class ComicInfo:
    """Collects comic metadata and renders it as a ComicInfo XML document.

    All scalar fields are strings; an empty string means "unset" and the
    corresponding XML element is not written.  ``pages`` holds the page
    objects whose ``to_xml_ele()`` results are placed under ``<Pages>``.
    """

    def __init__(self):
        self.series = ""
        self.series_sort = ""
        self.writer = ""
        self.publisher = ""
        self.title = ""
        self.number = ""
        self.volume = ""
        self.language_iso = "zh-CN"
        self.year = ""
        self.month = ""
        self.day = ""
        self.GTIN = ""
        self.tags = ""
        self.notes = ""
        self.summary = ""
        self.locations = ""
        self.pages = []

    def add_page(self, page: "PageInfo"):
        """Append one page entry; it must provide ``to_xml_ele()``."""
        self.pages.append(page)

    @staticmethod
    def _whole_number(raw) -> str:
        """Return ``raw`` as a whole-number string, or "" if it is not numeric."""
        try:
            return str(int(float(raw)))
        except (TypeError, ValueError, OverflowError):
            return ""

    def merge_with_epub_info(self, meta):
        """Overwrite fields with the values present on an epub metadata object.

        Reads ``identifier``, ``author_list``, ``series``, ``series_index``,
        ``tag_list``, ``description``, ``lang``, ``title`` and
        ``publish_info`` from ``meta``.  Only truthy values overwrite the
        current ones; values from ``publish_info`` are applied last and so
        take precedence.  A series index that is not numeric is ignored
        instead of aborting the merge.
        """
        if meta.identifier:
            self.GTIN = meta.identifier
        if meta.author_list:
            self.writer = ",".join(meta.author_list)
        if meta.series:
            self.series = meta.series
            self.series_sort = meta.series
        if meta.series_index:
            volume = self._whole_number(meta.series_index)
            if volume:
                self.volume = volume
        if meta.tag_list:
            self.tags = ",".join(meta.tag_list)
        if meta.description:
            self.summary = meta.description
        if meta.lang:
            self.language_iso = meta.lang
        if meta.title:
            self.title = meta.title
        self.notes = str(meta)

        pub_info = meta.publish_info
        if pub_info is None:
            return
        if pub_info.title:
            self.title = pub_info.title
        if pub_info.publisher:
            self.publisher = pub_info.publisher
        if pub_info.year:
            self.year = pub_info.year
        if pub_info.city:
            self.locations = pub_info.city
        if pub_info.series:
            self.series = pub_info.series
        if pub_info.series_index:
            volume = self._whole_number(pub_info.series_index)
            if volume:
                self.volume = volume
        if pub_info.isbn:
            self.GTIN = pub_info.isbn

    def merge_with_name_info(self, series, vol, chapter, publisher):
        """Overwrite fields with the truthy values parsed from a file name."""
        if series:
            self.series = series
            self.series_sort = series
        if vol:
            self.volume = str(vol)
        if chapter:
            self.number = chapter
        if publisher:
            self.publisher = publisher

    def build_comic_info_xml(self):
        """Render the collected metadata as an indented XML string.

        Returns:
            ``(True, xml_text, "")`` on success, or ``(False, "", message)``
            when building or serialising the document fails.
        """
        try:
            root = ET.Element("ComicInfo")
            # Namespace names are identifiers, not links: the W3C XML Schema
            # namespaces are spelled with "http://", so "https://" would name
            # a different, unknown namespace.
            root.set("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
            root.set("xmlns:xsd", "http://www.w3.org/2001/XMLSchema")

            entries = (
                ("Title", self.title),
                ("Series", self.series),
                ("SeriesSort", self.series_sort),
                ("Writer", self.writer),
                ("Publisher", self.publisher),
                ("Number", self.number),
                ("Volume", self.volume),
                ("LanguageISO", self.language_iso),
                ("Year", self.year),
                ("Month", self.month),
                ("Day", self.day),
                ("GTIN", self.GTIN),
                ("Tags", self.tags),
                ("Notes", self.notes),
                ("Summary", self.summary),
                ("Locations", self.locations),
            )
            for tag, value in entries:
                # The root is new, so an unset field simply gets no element.
                if value:
                    ET.SubElement(root, tag).text = str(value)

            if self.pages:
                pages_node = ET.SubElement(root, "Pages")
                for p in self.pages:
                    pages_node.append(p.to_xml_ele())

            ET.indent(root)
            xml_bytes = ET.tostring(root, encoding="utf-8", xml_declaration=True)
            return True, xml_bytes.decode("utf-8"), ""
        except Exception as e:
            # Callers rely on the (ok, data, message) triple rather than an
            # exception, so report the failure through the return value.
            m = f"convert comic info xml failed with {e}"
            print(m)
            return False, "", m
# File-name rules.  Each rule is a (compiled regex, group-index tuple) pair.
# The tuple lists the regex group numbers in the fixed order
# (vol, chapter, series, publisher, sub_name); -1 means the rule does not
# capture that field.

# name_Vol.01_Ch.001-002_[publisher].epub
# series:1 vol:2 ch:3 publisher:4 sub_name:-1
VOL_CH_RE_PAIR = (re.compile(r"([^_]+)_Vol\.(\d+)_Ch\.([^_]+)_\[([^\]]+)\]\."),
                  (2, 3, 1, 4, -1))
# name_Vol.01_[publisher].epub
# series:1 vol:2 ch:-1 publisher:3 sub_name:-1
VOL_RE_PAIR = (re.compile(r"([^_]+)_Vol\.(\d+)_\[([^\]]+)\]\."), (2, -1, 1, 3, -1))
# [publisher][series]sub_name第01卷.kepub.epub
# publisher:1 series:2 sub_name:3 vol:4 ch:-1
MOE_SUBNAME_RE = (re.compile(r"\[([^\[]+)\]\[([^\[]+)\](.+)第(\d+)卷"), (4, -1, 2, 1, 3))
# [publisher][series]卷01.kepub.epub
# publisher:1 series:2 vol:3 ch:-1 sub_name:-1
# This rule was missing (the sub-name rule had been assigned twice instead),
# which made the NAME_RULE list below fail with NameError.
MOE_VOL_RE_PAIR = (re.compile(r"\[([^\[]+)\]\[([^\[]+)\]卷(\d+)"), (3, -1, 2, 1, -1))
# [publisher][series]話01-002.kepub.epub
# publisher:1 series:2 vol:-1 ch:3 sub_name:-1
MOE_CH_RE_PAIR = (re.compile(r"\[([^\[]+)\]\[([^\[]+)\]話([\d-]+)"), (-1, 3, 2, 1, -1))
# Rules are tried in this order; the first one that matches wins.
NAME_RULE = [
    VOL_CH_RE_PAIR,
    VOL_RE_PAIR,
    MOE_CH_RE_PAIR,
    MOE_SUBNAME_RE,
    MOE_VOL_RE_PAIR,
]
class Converter:
    """Converts image-based EPUB files into CBZ archives with a ComicInfo.xml.

    Problems met while handling a file are accumulated, one per line, in
    ``error_msg``; ``process`` resets it at the start of every file.
    """

    # Images recognised by suffix or media type are re-encoded as WebP;
    # everything else is copied into the archive unchanged.
    _WEBP_SUFFIXES = frozenset((".jpg", ".png", ".jpeg"))
    _WEBP_MEDIA_TYPES = frozenset(("image/jpeg", "image/png"))

    def __init__(self):
        self.error_msg = ""

    def _record(self, message: str) -> None:
        """Append ``message`` to ``error_msg`` and echo it to stdout."""
        self.error_msg += message + "\n"
        print(message)

    @staticmethod
    def _remove_partial(archive_path) -> None:
        """Delete a half-written archive, if one was started and still exists."""
        if archive_path and os.path.exists(archive_path):
            os.remove(archive_path)

    def produce_metda_data_name(self, path) -> Tuple["ComicInfo", str]:
        """Build the metadata object and the output file name for ``path``.

        The file name is matched against ``NAME_RULE`` first; metadata read
        from the epub itself is merged afterwards and therefore overrides
        the name-derived values.

        Returns:
            ``(comic_info, new_name)``.  ``new_name`` is "" when no rule
            matched the file name or the name could not be built.
        """
        cm = ComicInfo()
        name = str(pathlib.Path(path).name)
        matched = False
        vol, ch, series, publisher = 1, "", "", ""
        for rule in NAME_RULE:
            matched, vol, ch, series, publisher = self.extract_base_info_from_name(name, rule)
            if matched:
                cm.merge_with_name_info(series, vol, ch, publisher)
                break
        if not matched:
            self._record(f"filename {path} not support")
        try:
            metadata = ebookmeta.get_metadata(path)
            cm.merge_with_epub_info(metadata)
        except Exception as e:
            # Best effort: a file without readable metadata is still converted.
            self._record(f"parse metadata from epub failed with {e}")
        new_name = ""
        if matched:
            _, new_name = self.produce_new_name(series, vol, ch, publisher)
        return cm, new_name

    def convert_to_webp(self, img_bytes) -> Tuple[bool, bytes, Tuple[int, int]]:
        """Re-encode an image as WebP (quality 80).

        Returns:
            ``(True, webp_bytes, (width, height))`` on success, otherwise
            ``(False, img_bytes, (-1, -1))`` with the input bytes untouched.
        """
        try:
            img = Image.open(BytesIO(img_bytes))
            out = BytesIO()
            img.save(out, format="webp", quality=80)
            return True, out.getvalue(), img.size
        except Exception as e:
            self._record(f"convert to webp failed with {e}")
            return False, img_bytes, (-1, -1)

    def extract_base_info_from_name(self, name, re_pair) -> Tuple[bool, int, str, str, str]:
        """Apply one file-name rule to ``name``.

        Args:
            name: the file name to parse.
            re_pair: ``(compiled_regex, group_index)`` where ``group_index``
                holds the group numbers for
                (vol, chapter, series, publisher, sub_name), -1 for "absent".

        Returns:
            ``(True, vol, chapter, series, publisher)`` on a match, else
            ``(False, 1, "", "", "")``.  ``vol`` is 1000 when the rule has no
            volume group; a captured sub name is appended to the series as
            ``"{series}_{sub_name}"``.
        """
        not_found = (False, 1, "", "", "")
        pattern = re_pair[0]
        group_index = re_pair[1]
        if len(group_index) != 5:
            # Always hand back the 5-tuple that callers unpack.
            return not_found
        match = pattern.search(name)
        if not match:
            return not_found
        vol_idx, chapter_idx, series_idx, publisher_idx, sub_name_idx = group_index
        try:
            vol = 1000
            chapter = ""
            series = ""
            publisher = ""
            if vol_idx != -1:
                vol = int(float(match.group(vol_idx)))
            if chapter_idx != -1:
                chapter = match.group(chapter_idx)
            if series_idx != -1:
                series = match.group(series_idx)
            if publisher_idx != -1:
                publisher = match.group(publisher_idx)
            if sub_name_idx != -1:
                sub_name = match.group(sub_name_idx)
                if sub_name:
                    series = f"{series}_{sub_name}"
            return True, vol, chapter, series, publisher
        except Exception as e:
            self._record(f"extract info from {name} use {pattern.pattern} Failed for{e}")
            return not_found

    def produce_new_name(self, series, vol: int, chapter: str, publisher) -> Tuple[bool, str]:
        """Build the ``.cbz`` file name from the parsed name parts.

        Volume and chapter numbers are zero-padded to four digits; a chapter
        range such as ``"1-2"`` is padded part by part.  An empty publisher
        falls back to ``"ericma"``.

        Returns:
            ``(True, name)`` or ``(False, "")`` when formatting fails.
        """
        try:
            if not publisher:
                publisher = "ericma"
            if "-" in chapter:
                chapter = "-".join(f"{int(float(part)):04}" for part in chapter.split("-"))
            elif chapter:
                chapter = f"{int(float(chapter)):04}"
            if chapter:
                return True, f"{series}_[{publisher}]_Vol.{vol:04}_Ch.{chapter}.cbz"
            return True, f"{series}_[{publisher}]_Vol.{vol:04}.cbz"
        except Exception as e:
            self._record(f"build name on ({series},{vol, chapter, publisher}) failed for {e}")
            return False, ""

    def resolve_path_on_any_platform(self, root_path, rel_path):
        """Resolve ``rel_path`` against ``root_path`` using POSIX separators.

        ``..`` steps to the parent and ``.`` is skipped, so the result does
        not depend on the separator of the host operating system.
        """
        root = pathlib.PurePosixPath(root_path)
        for part in pathlib.PurePosixPath(rel_path).parts:
            if part == "..":
                root = root.parent
            elif part != '.':
                root = root / part
        return root.as_posix()

    def _collect_images(self, ebook):
        """Return ``(idx, href, spine_id, img_attrib)`` for each <img> in spine order."""
        img_list = []
        idx = 1
        for ref_id, _linear in ebook.spine:
            page = ebook.get_item_with_id(ref_id)
            # Exact type match kept from the original: subclasses of EpubHtml
            # are deliberately not treated as content pages here.
            if type(page) is not ebooklib.epub.EpubHtml:
                continue
            root_path = str(pathlib.PurePosixPath(page.file_name).parent)
            ele = ET.fromstring(page.content)
            for item in ele.findall(".//*"):
                if "img" in item.tag and "src" in item.attrib:
                    abs_path = self.resolve_path_on_any_platform(root_path, item.attrib["src"])
                    img_list.append((idx, abs_path, ref_id, item.attrib))
                    idx += 1
        return img_list

    def _build_page(self, ebook, idx, abs_path, ref_id, attr_dict, paddinglen):
        """Load one image and return ``(archive_name, image_bytes, PageInfo)``."""
        img_block = ebook.get_item_with_href(abs_path)
        if img_block is None:
            raise FileNotFoundError("image is not listed in the epub manifest")
        suffix = pathlib.Path(abs_path).suffix
        convertible = (suffix.lower() in self._WEBP_SUFFIXES
                       or img_block.media_type in self._WEBP_MEDIA_TYPES)
        if convertible:
            converted, img_d, shape = self.convert_to_webp(img_block.content)
        else:
            # Other formats are stored as-is, exactly like a failed
            # conversion; previously this path hit unbound (or stale)
            # variables left over from an earlier iteration.
            converted, img_d, shape = False, img_block.content, (-1, -1)
        ext = ".webp" if converted else suffix
        newname = f"{str(idx).rjust(paddinglen, '0')}-{ref_id}{ext}"

        page = PageInfo(idx)
        if "class" in attr_dict:
            if attr_dict["class"] == "singlePage":
                page.double_page = False
            elif attr_dict["class"] == "twoPage":
                page.double_page = True
        page.image_size = str(len(img_d))
        page.key = ref_id
        page.image_width = str(shape[0])
        page.image_height = str(shape[1])
        return newname, img_d, page

    def process(self, path) -> Tuple[bool, str]:
        """Convert the epub at ``path`` into a ``.cbz`` next to it.

        The archive is named by ``produce_new_name`` when the file name
        matched a rule, otherwise the epub's suffix is replaced by ``.cbz``.
        An existing target is left alone.  If any image fails, the partial
        archive is deleted.

        Returns:
            ``(True, warnings)`` on success (``warnings`` may be ""), or
            ``(False, error_text)`` on failure.
        """
        archive_path = None  # set only once this call starts writing a file
        try:
            print(f"process {path}")
            self.error_msg = ""
            cm, new_name = self.produce_metda_data_name(path)
            source = pathlib.Path(path)
            # Only the final path component changes; str.replace on the whole
            # path could also rewrite matching directory names.
            target = source.with_name(new_name) if new_name else source.with_suffix(".cbz")
            new_name = str(target)
            if os.path.exists(new_name):
                print(f"cbz {new_name} already exists")
                return True, ""

            ebook = ebooklib.epub.read_epub(path, options={"ignore_ncx": True})
            img_list = self._collect_images(ebook)
            paddinglen = len(str(len(img_list)))

            image_failed = False
            archive_path = new_name
            with zipfile.ZipFile(new_name, 'w') as zwrite:
                for idx, abs_path, ref_id, attr_dict in tqdm.tqdm(img_list):
                    try:
                        newname, img_d, page = self._build_page(
                            ebook, idx, abs_path, ref_id, attr_dict, paddinglen)
                        cm.add_page(page)
                        zwrite.writestr(newname, img_d)
                    except Exception as e:
                        m = f"process image on {ref_id} name {abs_path} failed with {e} "
                        self.error_msg += m + "\n"
                        image_failed = True
                        break
                if not image_failed:
                    res, data, msg = cm.build_comic_info_xml()
                    if msg:
                        self.error_msg += msg + "\n"
                    if data:
                        zwrite.writestr("ComicInfo.xml", data, zipfile.ZIP_DEFLATED)
            if image_failed:
                # The archive is closed by now, so it can be removed on any
                # platform.
                self._remove_partial(archive_path)
                return False, self.error_msg
            return True, self.error_msg
        except Exception as e:
            m = f"process {path} failed with {e}"
            self.error_msg += m + '\n'
            print(e)
            self._remove_partial(archive_path)
            return False, self.error_msg
if __name__ == '__main__':
    # Script entry point: convert every .epub found in the current working
    # directory and its sub-directories, then print a summary report.
    c = Converter()
    now = os.getcwd()

    def fn(file_dir):
        """Yield the path, relative to ``now``, of each ``.epub`` under ``file_dir``."""
        for root, _dirs, files in os.walk(file_dir):
            for f in files:
                _stem, ext = os.path.splitext(f)
                if ext != '.epub':
                    continue
                yield os.path.relpath(os.path.join(root, f), now)

    res_warning_dict = {}
    res_failed_dict = {}
    for filename in fn(now):
        res, msg = c.process(filename)
        if not res:
            print(f"process {filename} failed")
            res_failed_dict[filename] = msg
            continue
        print(f"process {filename} succeed")
        if msg:
            # Converted, but something was reported along the way.
            res_warning_dict[filename] = msg

    # Warnings first, failures second, each under its own banner.
    for banner, report in (
        ("==============below is convert with some warning ==============", res_warning_dict),
        ("==============below is convert failed ==============", res_failed_dict),
    ):
        print(banner)
        for k, v in report.items():
            print(f"> {k}\n {v}\n ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n")