博客
关于我
05-bid_document_generator
阅读量:798 次
发布时间:2023-03-23

本文共 16714 字,大约阅读时间需要 55 分钟。

import functools
import json
import os
import re
import shutil
import time
from datetime import datetime
from typing import Optional, Tuple, List

import pandas as pd
import pdfplumber
import requests
from docx import Document
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from docx.shared import Pt, RGBColor, Inches
from paddleocr import PaddleOCR
from pdf2image import convert_from_path
class PDFTextExtractor:
    """Extract the text of a PDF, using OCR for pages without a text layer.

    Pages whose text can be read directly are extracted with pdfplumber
    (including their tables); pages that yield no text are rendered to an
    image and passed to PaddleOCR.
    """

    def __init__(self, output_folder: str):
        """Remember the scratch folder and create the OCR engine.

        Args:
            output_folder: Directory used for temporary page images. It is
                created on demand and removed again (with everything in it)
                when extraction finishes.
        """
        self.output_folder = output_folder
        self.ocr = PaddleOCR(use_angle_cls=True, lang="ch", use_gpu=False)

    def extract_tender_text_content_from_pdf(self, pdf_path: str) -> str:
        """Return the text of every page of ``pdf_path`` joined by newlines.

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            The concatenated per-page text, each page preceded by a page
            marker line.

        Raises:
            Exception: Any error raised while opening or iterating the PDF is
                logged and re-raised unchanged.
        """
        os.makedirs(self.output_folder, exist_ok=True)
        full_text = []
        try:
            with pdfplumber.open(pdf_path) as pdf:
                total_pages = len(pdf.pages)
                for page_num, page in enumerate(pdf.pages, start=1):
                    full_text.append(
                        self._process_page(page, page_num, pdf_path, total_pages)
                    )
                    print(f"处理进度: {page_num}/{total_pages}页", end="\r")
        except Exception as e:
            print(f"PDF处理出错: {str(e)}")
            raise
        finally:
            # Remove the temporary page images on failure as well as success.
            # Cleanup is best effort: a locked file must neither hide the
            # original error nor discard text that was already extracted.
            shutil.rmtree(self.output_folder, ignore_errors=True)
        return '\n'.join(full_text)

    def _process_page(self, page, page_num: int, pdf_path: str, total_pages: int) -> str:
        """Return the text of one page, prefixed with a page marker.

        Args:
            page: pdfplumber page object.
            page_num: 1-based page number.
            pdf_path: Path of the PDF (needed to rasterise the page for OCR).
            total_pages: Total page count, used only in the marker line.
        """
        page_text = f"\n=== 第 {page_num}/{total_pages} 页内容 ===\n"
        text = page.extract_text()
        if text and text.strip():
            page_text += text + "\n\n"
            page_text += self._extract_tables(page)
        else:
            # No extractable text: treat the page as a scanned image.
            page_text += self._process_image_page(page_num, pdf_path)
        return page_text

    def _extract_tables(self, page) -> str:
        """Return all tables on ``page`` rendered as pipe-separated text.

        The first row of each table is used as the header; missing header
        cells get a generated column name and missing body cells become
        empty strings. Extraction errors are logged; whatever was rendered
        before the error is still returned.
        """
        rendered = []
        try:
            tables = page.extract_tables({
                "vertical_strategy": "lines",
                "horizontal_strategy": "lines",
                "explicit_vertical_lines": page.curves + page.edges,
                "explicit_horizontal_lines": page.curves + page.edges,
                "intersection_y_tolerance": 10
            })
            for table_idx, table in enumerate(tables, start=1):
                df = pd.DataFrame(table)
                row_count, col_count = df.shape
                table_str = [f'【表格{table_idx}开始】\n']
                if row_count > 0:
                    headers = [
                        str(df.iloc[0, j]) if pd.notna(df.iloc[0, j]) else f'列{j + 1}'
                        for j in range(col_count)
                    ]
                    table_str.append(' | '.join(headers) + '\n')
                    table_str.append('---' * len(headers) + '\n')
                    for i in range(1, row_count):
                        row = [
                            str(df.iloc[i, j]) if pd.notna(df.iloc[i, j]) else ''
                            for j in range(col_count)
                        ]
                        table_str.append(' | '.join(row) + '\n')
                table_str.append(f'【表格{table_idx}结束】\n\n')
                rendered.append(''.join(table_str))
        except Exception as e:
            print(f"表格提取出错: {str(e)}")
        return ''.join(rendered)

    def _process_image_page(self, page_num: int, pdf_path: str) -> str:
        """OCR a single page and return the recognised lines.

        The page is rendered to a PNG inside ``self.output_folder`` and passed
        to PaddleOCR. Returns an empty string when the page cannot be
        rendered, nothing is recognised, or OCR fails (the error is logged).
        """
        try:
            images = convert_from_path(pdf_path, first_page=page_num, last_page=page_num)
            if not images:
                return ""
            img_path = os.path.join(self.output_folder, f"page_{page_num}.png")
            images[0].save(img_path, 'PNG')
            result = self.ocr.ocr(img_path, cls=True)
            if result and result[0]:
                # The recognised string is read from line[1][0] of each entry.
                ocr_text = [line[1][0] for line in result[0] if line and line[1]]
                return '\n'.join(ocr_text) + '\n'
        except Exception as e:
            print(f"OCR处理出错: {str(e)}")
        return ""
class LLMClient:
    """Minimal client for an Ollama-style text generation HTTP endpoint."""

    def __init__(self, api_url: str, model: str, timeout: Optional[float] = None):
        """Store the connection settings.

        Args:
            api_url: Full URL of the generate endpoint.
            model: Model name sent with every request.
            timeout: Seconds to wait for the HTTP response. ``None`` (the
                default) waits indefinitely, which matches the previous
                behaviour of sending the request without a timeout.
        """
        self.api_url = api_url
        self.model = model
        self.timeout = timeout

    def generate_response(self, prompt: str, max_retries: int = 3) -> Optional[str]:
        """Send ``prompt`` to the model and return the cleaned answer.

        Failed attempts are retried with exponential backoff (1s, 2s, ...).

        Args:
            prompt: Prompt text.
            max_retries: Maximum number of attempts.

        Returns:
            The cleaned response text, or ``None`` when every attempt failed.
        """
        for attempt in range(max_retries):
            try:
                print(f"正在生成投标文件 (尝试 {attempt + 1}/{max_retries})...")
                response = requests.post(
                    self.api_url,
                    json=self._build_payload(prompt),
                    timeout=self.timeout,
                )
                response.raise_for_status()
                result = response.json()
            except json.JSONDecodeError as e:
                # Checked before RequestException: recent versions of requests
                # raise a JSON error that derives from both classes, and the
                # JSON-specific message is the accurate one.
                print(f"JSON解析失败: {str(e)}")
            except requests.exceptions.RequestException as e:
                print(f"API请求失败: {str(e)}")
            else:
                # `or ""` tolerates an explicit null in the "response" field.
                response_text = (result.get("response") or "").strip()
                return self._clean_response(response_text)
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                print(f"等待{wait_time}秒后重试...")
                time.sleep(wait_time)
        return None

    def _build_payload(self, prompt: str) -> dict:
        """Build the JSON body of a non-streaming generate request.

        Sampling parameters are nested under ``options``, which is where the
        Ollama generate API reads model parameters from; they have no effect
        as top-level keys.
        """
        return {
            "model": self.model,
            "prompt": prompt,
            "options": {
                "temperature": 0.3,
                "top_k": 50,
                "top_p": 0.9,
            },
            "stream": False
        }

    def _clean_response(self, text: str) -> str:
        """Strip the model's reasoning block and collapse excess blank lines.

        NOTE(review): the pattern was lost when this file was scraped (it
        appeared as an unterminated string). It is reconstructed here as the
        "think" tag pair that reasoning models such as DeepSeek-R1 wrap their
        chain of thought in -- confirm against real model output.
        """
        # The backreference spells the closing tag, so the source never
        # contains the literal tag pair that markup-stripping tools remove.
        text = re.sub(r'<(think)>.*?</\1>', ' ', text, flags=re.DOTALL)
        text = re.sub(r'\n{3,}', '\n\n', text)
        return text.strip()
class BidDocumentGenerator:
    """Render Markdown bid text into a Word (.docx) document.

    Recognised Markdown: ``#``-style headings, ``---`` dividers, lines that
    are bold as a whole, ordered (``1.``) and unordered (``-``) lists, pipe
    tables and plain paragraphs. All members are static; the class only
    groups the helpers.
    """

    @staticmethod
    def generate_from_markdown(markdown_text: str, output_file: str):
        """Convert ``markdown_text`` to a Word document saved at ``output_file``.

        Args:
            markdown_text: Markdown source.
            output_file: Destination path; missing parent directories are
                created.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            doc = Document()
            BidDocumentGenerator._setup_document_styles(doc)
            for kind, payload in BidDocumentGenerator._parse_markdown_blocks(markdown_text):
                if kind == 'heading':
                    level, text = payload
                    BidDocumentGenerator._add_heading(doc, text, level=level)
                elif kind == 'table':
                    BidDocumentGenerator._add_table(doc, payload)
                elif kind == 'divider':
                    BidDocumentGenerator._add_divider(doc)
                elif kind == 'bold':
                    BidDocumentGenerator._add_bold_paragraph(doc, payload)
                elif kind == 'numbered':
                    BidDocumentGenerator._add_numbered_list(doc, payload)
                elif kind == 'bullet':
                    BidDocumentGenerator._add_bullet_list(doc, payload)
                else:
                    BidDocumentGenerator._add_normal_paragraph(doc, payload)
            # Saving fails if the target directory does not exist yet.
            output_dir = os.path.dirname(output_file)
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)
            doc.save(output_file)
            print(f"\nWord文档已生成: {os.path.basename(output_file)}")
        except Exception as e:
            print(f"\n文档生成失败: {str(e)}")
            raise

    @staticmethod
    def _parse_markdown_blocks(markdown_text: str) -> List[Tuple[str, object]]:
        """Split Markdown into ``(kind, payload)`` blocks in document order.

        Kinds and payloads:
            ``heading``   -> ``(level, text)`` with level 0-3 (``#`` .. ``####``;
                             deeper headings are clamped to level 3)
            ``table``     -> list of rows, each a list of cell strings
                             (the ``|---|`` separator row is dropped)
            ``divider``   -> ``None``
            ``bold``      -> text of a line wrapped in ``**``
            ``numbered``  -> list of item strings
            ``bullet``    -> list of item strings
            ``paragraph`` -> text
        Blank lines are skipped.
        """
        lines = [raw.strip() for raw in markdown_text.split('\n')]
        blocks = []
        i = 0
        while i < len(lines):
            line = lines[i]
            if not line:
                i += 1
                continue
            heading = re.match(r'^(#{1,6})\s+(.*)$', line)
            if heading:
                # Only four heading styles are configured, so clamp the depth.
                level = min(len(heading.group(1)), 4) - 1
                blocks.append(('heading', (level, heading.group(2).strip('* '))))
            elif line.startswith('|'):
                first = i
                rows = []
                while i < len(lines) and lines[i].startswith('|'):
                    cells = [cell.strip() for cell in lines[i].strip('|').split('|')]
                    # Skip the header separator row (e.g. |---|:---:|).
                    if not all(re.fullmatch(r':?-+:?', cell) for cell in cells):
                        rows.append(cells)
                    i += 1
                if len(rows) >= 2:
                    blocks.append(('table', rows))
                else:
                    # Not a real table: keep the lines as ordinary text.
                    blocks.extend(
                        ('paragraph', lines[j].strip('* ')) for j in range(first, i)
                    )
                continue
            elif line.startswith('---'):
                blocks.append(('divider', None))
            elif line.startswith('**') and line.endswith('**'):
                blocks.append(('bold', line[2:-2].strip()))
            elif re.match(r'^\d+\.\s+', line):
                items = []
                while i < len(lines) and re.match(r'^\d+\.\s+', lines[i]):
                    items.append(re.sub(r'^\d+\.\s+', '', lines[i]))
                    i += 1
                blocks.append(('numbered', items))
                continue
            elif line.startswith('- '):
                items = []
                while i < len(lines) and lines[i].startswith('- '):
                    items.append(lines[i][2:].strip())
                    i += 1
                blocks.append(('bullet', items))
                continue
            else:
                blocks.append(('paragraph', line.strip('* ')))
            i += 1
        return blocks

    @staticmethod
    def _setup_document_styles(doc):
        """Apply the document font to the Normal and Heading 1-4 styles."""
        style = doc.styles['Normal']
        font = style.font
        font.name = '微软雅黑'
        font.size = Pt(10.5)
        # The East Asian font is a separate attribute that font.name does not set.
        doc.styles['Normal']._element.rPr.rFonts.set(qn('w:eastAsia'), '微软雅黑')
        for level in range(4):
            heading_style = doc.styles[f'Heading {level + 1}']
            heading_font = heading_style.font
            heading_font.name = '微软雅黑'
            heading_font.size = Pt(16 - level * 2)
            heading_font.bold = True
            heading_style._element.rPr.rFonts.set(qn('w:eastAsia'), '微软雅黑')

    @staticmethod
    def _add_heading(doc, text: str, level: int):
        """Add a heading; ``level`` 0 is the centred title, 1-3 are left-aligned."""
        heading = doc.add_heading(level=level + 1)
        run = heading.add_run(text)
        run.font.name = '微软雅黑'
        run.font.size = Pt(16 - level * 2)
        run.font.bold = True
        run.font.color.rgb = RGBColor(0, 0, 0)
        heading.paragraph_format.space_after = Pt(6)
        heading.paragraph_format.space_before = Pt(12 if level == 0 else 6)
        heading.alignment = (
            WD_PARAGRAPH_ALIGNMENT.CENTER if level == 0 else WD_PARAGRAPH_ALIGNMENT.LEFT
        )

    @staticmethod
    def _add_bold_paragraph(doc, text: str):
        """Add an indented body paragraph whose text is entirely bold."""
        p = doc.add_paragraph()
        p.paragraph_format.space_after = Pt(6)
        run = p.add_run(text)
        run.bold = True
        run.font.name = '微软雅黑'
        run.font.size = Pt(10.5)
        p.paragraph_format.first_line_indent = Inches(0.3)

    @staticmethod
    def _add_normal_paragraph(doc, text: str):
        """Add an indented body paragraph."""
        p = doc.add_paragraph()
        p.paragraph_format.space_after = Pt(6)
        run = p.add_run(text)
        run.font.name = '微软雅黑'
        run.font.size = Pt(10.5)
        p.paragraph_format.first_line_indent = Inches(0.3)

    @staticmethod
    def _add_numbered_list(doc, items: List[str]):
        """Add one 'List Number' paragraph per item."""
        for item in items:
            p = doc.add_paragraph(style='List Number')
            p.paragraph_format.space_after = Pt(6)
            p.paragraph_format.left_indent = Inches(0.5)
            run = p.add_run(item)
            run.font.name = '微软雅黑'
            run.font.size = Pt(10.5)

    @staticmethod
    def _add_bullet_list(doc, items: List[str]):
        """Add one 'List Bullet' paragraph per item."""
        for item in items:
            p = doc.add_paragraph(style='List Bullet')
            p.paragraph_format.space_after = Pt(6)
            p.paragraph_format.left_indent = Inches(0.5)
            run = p.add_run(item)
            run.font.name = '微软雅黑'
            run.font.size = Pt(10.5)

    @staticmethod
    def _add_table(doc, table_data: List[List[str]]):
        """Add a grid table; input with fewer than two rows is ignored.

        The column count is the widest row, so rows of unequal length do not
        raise; shorter rows simply leave their trailing cells empty.
        """
        if len(table_data) < 2:
            return
        col_count = max(len(row) for row in table_data)
        if col_count == 0:
            return
        table = doc.add_table(rows=len(table_data), cols=col_count)
        table.style = 'Table Grid'
        for row_idx, row in enumerate(table_data):
            for col_idx, value in enumerate(row):
                cell = table.cell(row_idx, col_idx)
                cell.text = value
                for run in cell.paragraphs[0].runs:
                    run.font.name = '微软雅黑'
                    run.font.size = Pt(10)

    @staticmethod
    def _add_divider(doc):
        """Add a horizontal rule: a paragraph with a single bottom border."""
        p = doc.add_paragraph()
        run = p.add_run()
        run.add_break()
        p_border = OxmlElement('w:pBdr')
        bottom = OxmlElement('w:bottom')
        bottom.set(qn('w:val'), 'single')
        bottom.set(qn('w:sz'), '6')
        bottom.set(qn('w:space'), '1')
        bottom.set(qn('w:color'), 'auto')
        p_border.append(bottom)
        p_pr = p._element.get_or_add_pPr()
        p_pr.append(p_border)
        # Spacing is set after the border on purpose: the WordprocessingML
        # schema expects w:pBdr before w:spacing inside w:pPr, and the border
        # is appended manually at the end of w:pPr.
        p.paragraph_format.space_after = Pt(12)
class ContractInfoExtractor:
    """Main workflow: extract the tender PDF text and ask the LLM for a bid."""

    # Endpoint used when the caller does not supply one.
    DEFAULT_OLLAMA_API_URL = "http://10.80.0.230:11434/api/generate"

    def __init__(self, pdf_path, output_folder, model, ocr_txt_file_path,
                 ollama_api_url=DEFAULT_OLLAMA_API_URL):
        """Store the job configuration.

        Args:
            pdf_path: Path of the tender PDF.
            output_folder: Scratch folder handed to the PDF text extractor.
            model: Name of the LLM model to query.
            ocr_txt_file_path: Base path for the dump of the extracted text;
                a timestamp and the model name are inserted before saving.
            ollama_api_url: Generate endpoint of the LLM server.
        """
        self.pdf_path = pdf_path
        self.output_folder = output_folder
        self.model = model
        self.ollama_api_url = ollama_api_url
        self.ocr_txt_file_path = ocr_txt_file_path

    def save_string_to_file(self, content, file_path):
        """Write ``content`` to ``file_path`` as UTF-8, creating parent folders.

        Errors are logged and swallowed; a failed dump does not abort the
        workflow.
        """
        try:
            directory = os.path.dirname(file_path)
            # A bare filename has no directory part and os.makedirs('') raises.
            if directory:
                os.makedirs(directory, exist_ok=True)
            with open(file_path, 'w', encoding='utf-8') as file:
                file.write(content)
            print(f"字符串已成功保存到 {file_path}")
        except Exception as e:
            print(f"保存文件时出错: {e}")

    def get_bid_template_info(self):
        """Run the workflow and return the generated bid text.

        Steps: extract the PDF text, dump it to a timestamped text file, build
        the prompt and query the LLM.

        Returns:
            The bid document as Markdown text, or ``None`` when extraction
            yields nothing, the LLM returns nothing, or any step raises (the
            error is logged).
        """
        print(f"开始处理文件: {os.path.basename(self.pdf_path)}")
        try:
            # Step 1: extract the PDF text.
            pdf_extractor = PDFTextExtractor(self.output_folder)
            print("正在提取PDF文本内容...")
            tender_pdf_text_content = pdf_extractor.extract_tender_text_content_from_pdf(
                self.pdf_path
            )
            new_ocr_file_path = FileNameHelper.add_timestamp_to_filename(
                self.ocr_txt_file_path, self.model
            )
            self.save_string_to_file(tender_pdf_text_content, new_ocr_file_path)
            if not tender_pdf_text_content:
                print("未能提取招标PDF文本内容")
                return None
            print("\nPDF文本提取成功")
            # Step 2: generate the bid document content.
            llm_client = LLMClient(self.ollama_api_url, self.model)
            prompt = self._build_extraction_prompt(tender_pdf_text_content)
            bid_text = llm_client.generate_response(prompt)
            if not bid_text:
                print("未能生成投标文件内容")
                return None
            print("投标文件内容生成成功")
            return bid_text
        except Exception as e:
            print(f"处理过程中出错: {str(e)}")
            return None

    def _build_extraction_prompt(self, tender_pdf_text: str) -> str:
        """Return the LLM prompt with the tender text embedded.

        The template lines are kept flush-left because the text inside the
        triple-quoted string is sent to the model verbatim.
        """
        return f""" 【任务说明】
您是一位专业的投标文件撰写专家,需要根据提供的招标文件内容,严格按照招标要求编写一份完整、规范的投标文件。请确保投标文件内容全面、响应准确、格式规范。
【输入内容】
以下是招标文件的完整内容:
{tender_pdf_text}
【编写要求】
1. 严格遵循招标文件中规定的投标文件格式要求
2. 内容必须全面覆盖技术响应和商务响应两大部分
3. 技术响应部分需逐条对应招标文件的技术要求,明确说明响应情况
4. 商务响应部分需完整回应招标文件的商务条款
5. 使用专业、规范的投标文件语言
6. 结构清晰,层次分明
7. 必须包含招标文件中要求的所有证件和提到的必要信息,确保无遗漏
【输出格式要求】
请按照以下标准格式组织投标文件内容:
# 投标文件
## 一、投标函
(包含投标意向、承诺等正式函件内容)
## 二、技术响应
### 1. 技术方案
(详细说明技术实施方案,对应招标文件技术要求)
### 2. 技术偏离表
(以表格形式清晰列出各项技术要求的响应情况)
| 招标要求条目 | 我方响应 | 偏离说明 |
|-------------|---------|---------|
| ... | ... | ... |
## 三、商务响应
### 1. 商务条款响应
(逐条回应招标文件的商务条款)
### 2. 报价明细
(提供完整、清晰的报价明细表)
## 四、公司资质
(提供相关资质证明文件说明,必须包含招标书要求的所有证件)
## 五、服务承诺
(包括售后服务、质量保证等承诺内容)
【注意事项】
1. 请确保所有技术参数和商务条款都得到明确响应
2. 对于有偏离的条款,必须明确说明偏离性质和理由
3. 报价信息应当清晰、完整,符合招标要求
4. 使用正式、专业的商务语言
5. 避免出现模糊不清或不确定的表述
6. 特别检查是否包含了招标文件中提到的所有必要证件和信息
请根据以上要求,编写一份完整、规范的投标文件。
"""
class FileNameHelper:
    """Helpers for building output file names."""

    @staticmethod
    def add_timestamp_to_filename(file_path, model):
        """Insert a timestamp and the model name before the file extension.

        ``dir/name.ext`` becomes ``dir/name_YYYYmmdd_HHMMSS_<model>.ext``.
        Characters that are not allowed in file names (``\\ / : * ? " < > |``)
        are replaced with ``-`` in the model name, so names such as
        ``ns/model:tag`` cannot add path components or produce an invalid
        Windows file name.

        Args:
            file_path: Original file path.
            model: Model name to embed.

        Returns:
            The new path, in the same directory as ``file_path``.
        """
        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
        directory, filename = os.path.split(file_path)
        name, ext = os.path.splitext(filename)
        safe_model = re.sub(r'[\\/:*?"<>|]', '-', model)
        new_filename = f"{name}_{current_time}_{safe_model}{ext}"
        return os.path.join(directory, new_filename)
def measure_execution_time(func):
    """Decorator that prints how long each call of ``func`` takes.

    A start message is printed before the call and the elapsed time (minutes
    and seconds) after it returns. The wrapped function's return value is
    passed through unchanged; if it raises, the exception propagates and no
    completion message is printed.
    """
    @functools.wraps(func)  # keep the wrapped function's __name__ / __doc__
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so clock adjustments cannot skew the result.
        start_time = time.perf_counter()
        print(f"\n开始执行: {func.__name__}")
        result = func(*args, **kwargs)
        execution_time = time.perf_counter() - start_time
        minutes = int(execution_time // 60)
        seconds = execution_time % 60
        print(f"\n执行完成: 耗时 {minutes} 分钟 {seconds:.2f} 秒")
        return result
    return wrapper
@measure_execution_time
def main():
    """Generate a Word bid document from the configured tender PDF.

    Extracts the tender text, asks the LLM for the bid content and, when
    content was produced, renders it to a timestamped .docx file.
    """
    # Configuration
    pdf_path = r"C:\Leon\python_project\oceanxecm\2025\04\20250402-广电计量-demo\pdf\01-bidding_documents_measuring_instruments_technical_services.pdf"
    output_folder = r"C:\Leon\python_project\oceanxecm\2025\04\20250402-广电计量-demo\py\output-images"
    model = "deepseek-r1:32b"
    word_output_file = r"C:\Leon\python_project\oceanxecm\2025\04\20250402-广电计量-demo\word\投标文件_海南炼油计量器具技术服务协议.docx"
    ocr_txt_file_path = r"C:\Leon\python_project\oceanxecm\2025\04\20250402-广电计量-demo\ocr-txt\投标文件_海南炼油计量器具技术服务协议.txt"
    # Workflow
    extractor = ContractInfoExtractor(pdf_path, output_folder, model, ocr_txt_file_path)
    bid_template = extractor.get_bid_template_info()
    if bid_template:
        new_word_output_file = FileNameHelper.add_timestamp_to_filename(word_output_file, model)
        print(f"\n正在生成Word文档: {os.path.basename(new_word_output_file)}")
        BidDocumentGenerator.generate_from_markdown(bid_template, new_word_output_file)


# Run the workflow only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

转载地址:http://eoqfk.baihongyu.com/

你可能感兴趣的文章
Objective-C实现控制NRP8S功率计读取功率 (附完整源码)
查看>>
Objective-C实现控制程控电源2306读取电流 (附完整源码)
查看>>
Objective-C实现摄氏温度和华氏温度互转(附完整源码)
查看>>
Objective-C实现播放器(附完整源码)
查看>>
Objective-C实现操作MySQL(附完整源码)
查看>>
Objective-C实现操作注册表 (附完整源码)
查看>>
Objective-C实现改变图片亮度算法(附完整源码)
查看>>
Objective-C实现数字图像处理算法(附完整源码)
查看>>
Objective-C实现数组切片(附完整源码)
查看>>
Objective-C实现数组去重(附完整源码)
查看>>
Objective-C实现数组的循环左移(附完整源码)
查看>>
Objective-C实现数除以二divideByTwo算法(附完整源码)
查看>>
Objective-C实现文件分割(附完整源码)
查看>>
Objective-C实现文件的删除、复制与重命名操作实例(附完整源码)
查看>>
Objective-C实现无序表查找算法(附完整源码)
查看>>
Objective-C实现无锁链表(附完整源码)
查看>>
Objective-C实现无锁链表(附完整源码)
查看>>
Objective-C实现时间戳转为年月日时分秒(附完整源码)
查看>>
Objective-C实现是否为 Pythagoreantriplet 毕氏三元数组算法(附完整源码)
查看>>
Objective-C实现显示响应算法(附完整源码)
查看>>