Youtu-Parsing结构化输出教程如何定制JSON Schema适配内部业务系统字段映射你是不是遇到过这样的问题公司内部系统有一套固定的数据格式但每次从文档里提取信息都要手动整理、复制粘贴费时费力还容易出错。特别是财务报销单、合同、发票这些结构化文档每个字段都要对号入座简直让人头疼。今天要介绍的Youtu-Parsing就能帮你解决这个痛点。它不仅能识别文档里的文字、表格、公式还能按照你定义的格式直接输出结构化的JSON数据。这意味着你可以让AI自动把文档内容映射到公司系统的数据库字段里省去大量人工整理的时间。1. 为什么需要定制化JSON输出在开始之前我们先搞清楚一个核心问题为什么标准的文档解析不够用想象一下你公司的人力资源系统需要录入员工信息表。系统要求的数据格式是这样的{ employee_id: E001, full_name: 张三, department: 技术部, position: 高级工程师, hire_date: 2023-06-15, salary: 25000 }但文档解析模型通常输出的是这样的员工编号E001 姓名张三 所属部门技术部 职位高级工程师 入职日期2023年6月15日 月薪25000元看到问题了吗虽然内容都对但格式完全不一样。你需要的是结构化的JSON数据可以直接导入数据库而不是一段需要再次处理的文本。这就是定制化JSON Schema的价值所在——让AI按照你的规则输出数据一步到位。2. Youtu-Parsing的核心能力不只是文字识别Youtu-Parsing基于腾讯优图的Youtu-LLM-2B模型构建它和普通的OCR工具最大的区别在于“理解能力”。普通OCR只能识别文字而Youtu-Parsing能理解文档的结构和语义。2.1 全要素解析看到文档的每一个细节这个模型能识别文档中的六类关键元素文本内容不仅仅是识别文字还能理解段落、标题、列表等结构表格数据自动识别表格边框、行列转换为HTML或结构化数据数学公式把复杂的数学表达式转换成LaTeX格式图表信息识别图表类型提取关键数据点印章识别定位文档中的印章位置和内容手写文字即使是手写体也能准确识别2.2 像素级定位知道每个字在哪里传统的文档解析往往只关注内容不关心位置。但Youtu-Parsing能做到像素级的精确定位为每个识别出的元素提供坐标信息。这个功能特别有用比如你要提取发票上的“金额”字段不仅要知道金额是多少还要知道它在发票的哪个位置方便后续的验证和核对。2.3 结构化输出从文档到数据的直接转换这是本文的重点。Youtu-Parsing支持三种输出格式纯文本干净的、去除了格式干扰的文本内容Markdown保留文档结构适合阅读和展示JSON完全结构化的数据可以直接用于程序处理我们今天要深入探讨的就是如何定制JSON Schema让输出完全符合你的业务需求。3. 实战开始定义你的JSON Schema让我们通过一个实际的例子来学习。假设你是一家电商公司的技术负责人需要处理供应商发来的产品报价单。报价单通常包含产品信息、价格、规格等数据你需要把这些信息提取出来导入到公司的采购系统中。3.1 第一步分析业务需求先看看采购系统需要哪些字段{ product: { sku: PROD-001, name: 无线蓝牙耳机, category: 电子产品/音频设备, specifications: { color: 黑色, battery_life: 20小时, charging_time: 1.5小时 } }, supplier: { name: 某某科技有限公司, contact: 李经理, phone: 13800138000 }, pricing: { unit_price: 299.00, currency: CNY, moq: 100, delivery_days: 7 }, document_info: { quote_number: QT20240115001, date: 2024-01-15, valid_until: 2024-02-15 } }这是一个嵌套结构包含了产品、供应商、价格和文档信息四个主要部分。每个部分都有特定的字段有些字段还是嵌套的对象。3.2 第二步设计Schema模板Youtu-Parsing使用JSON Schema来定义输出格式。JSON Schema是一种描述JSON数据结构的标准就像给JSON数据定了一个“模板”。针对上面的采购报价单我们可以设计这样的Schema{ $schema: http://json-schema.org/draft-07/schema#, title: 采购报价单解析模板, description: 用于解析供应商产品报价单的结构化模板, type: object, properties: { product: { type: object, description: 产品信息, properties: { sku: { type: string, description: 产品SKU编码 }, name: { type: string, description: 产品名称 }, category: { type: string, description: 产品分类 }, specifications: { type: object, description: 产品规格参数, properties: { color: {type: string}, battery_life: {type: string}, charging_time: {type: string} } } }, required: [sku, name] }, supplier: { type: object, description: 供应商信息, properties: { name: {type: string}, contact: {type: string}, phone: {type: string} } }, pricing: { type: object, description: 价格信息, properties: { unit_price: { type: number, description: 单价 }, currency: { type: string, enum: [CNY, USD, EUR], default: CNY }, moq: { type: integer, description: 最小起订量 }, delivery_days: { type: integer, description: 交货天数 } } }, document_info: { type: object, description: 文档信息, properties: { quote_number: {type: string}, date: {type: string, format: date}, valid_until: {type: string, format: date} } } }, required: [product, pricing] }这个Schema定义了每个字段的数据类型string、number、integer、object等字段的描述信息帮助模型理解要提取什么哪些字段是必填的required枚举值限制比如currency只能取CNY、USD、EUR默认值设置3.3 第三步准备示例文档和标注要让模型学会按照你的Schema提取信息你需要提供一些示例。这就是所谓的“少样本学习”few-shot learning。准备3-5张不同类型的报价单图片然后为每张图片标注出Schema中定义的字段。标注格式如下{ product: { sku: PROD-001, name: 无线蓝牙耳机, category: 电子产品/音频设备, specifications: { color: 黑色, battery_life: 20小时, charging_time: 1.5小时 } }, supplier: { name: 某某科技有限公司, contact: 李经理, phone: 13800138000 }, pricing: { unit_price: 299.00, currency: CNY, moq: 100, delivery_days: 7 }, document_info: { quote_number: QT20240115001, date: 2024-01-15, valid_until: 2024-02-15 } }把这些示例保存为JSON文件和对应的图片放在一起。模型会从这些示例中学习如何从新文档中提取信息。4. 代码实现让Youtu-Parsing按你的规则工作现在我们来写代码把上面设计的Schema应用到实际的文档解析中。4.1 安装和配置首先确保你已经部署了Youtu-Parsing服务。如果还没部署可以参考项目文档快速搭建。服务运行后WebUI界面通常在http://localhost:7860。4.2 使用Python调用APIYoutu-Parsing提供了API接口我们可以用Python程序来调用。下面是一个完整的示例import requests import json from pathlib import Path from typing import Dict, Any class YoutuParsingClient: def __init__(self, base_url: str http://localhost:7860): self.base_url base_url self.api_url f{base_url}/api/parse def load_schema(self, schema_path: str) - Dict[str, Any]: 加载JSON Schema文件 with open(schema_path, r, encodingutf-8) as f: schema json.load(f) return schema def prepare_prompt(self, schema: Dict[str, Any], examples: list None) - str: 构建解析提示词 prompt f请按照以下JSON Schema解析文档内容 {schema[description]} 输出要求 1. 严格按照Schema定义的结构输出JSON 2. 只提取Schema中定义的字段 3. 如果某个字段在文档中找不到可以留空或使用null 4. 保持数据类型一致字符串、数字、布尔值等 Schema结构 {json.dumps(schema, ensure_asciiFalse, indent2)} if examples: prompt \n\n参考示例\n for i, example in enumerate(examples, 1): prompt f示例{i}\n{json.dumps(example, ensure_asciiFalse, indent2)}\n return prompt def parse_document( self, image_path: str, schema: Dict[str, Any], output_format: str json, examples: list None ) - Dict[str, Any]: 解析文档并返回结构化数据 # 准备请求数据 with open(image_path, rb) as f: files {image: f} data { prompt: self.prepare_prompt(schema, examples), output_format: output_format, structured: true } # 发送请求 response requests.post( self.api_url, filesfiles, datadata, timeout60 # 设置超时时间 ) if response.status_code 200: result response.json() return result else: raise Exception(f解析失败: {response.status_code} - {response.text}) def batch_parse( self, image_dir: str, schema: Dict[str, Any], output_dir: str ./outputs ) - list: 批量解析文档 image_dir Path(image_dir) output_dir Path(output_dir) output_dir.mkdir(exist_okTrue) results [] image_files list(image_dir.glob(*.jpg)) list(image_dir.glob(*.png)) for image_file in image_files: try: print(f正在处理: {image_file.name}) result self.parse_document(str(image_file), schema) # 保存结果 output_file output_dir / f{image_file.stem}.json with open(output_file, w, encodingutf-8) as f: json.dump(result, f, ensure_asciiFalse, indent2) results.append({ file: image_file.name, success: True, output: output_file }) except Exception as e: results.append({ file: image_file.name, success: False, error: str(e) }) print(f处理失败 {image_file.name}: {e}) return results # 使用示例 if __name__ __main__: # 初始化客户端 client YoutuParsingClient() # 加载Schema schema client.load_schema(purchase_quote_schema.json) # 加载示例数据如果有 examples [] try: with open(examples.json, r, encodingutf-8) as f: examples json.load(f) except FileNotFoundError: print(未找到示例文件将不使用示例学习) # 解析单个文档 result client.parse_document( image_pathquote_001.jpg, schemaschema, examplesexamples[:3] if examples else None # 使用前3个示例 ) print(解析结果) print(json.dumps(result, ensure_asciiFalse, indent2)) # 批量处理 # results client.batch_parse(quotes/, schema) # print(f批量处理完成成功{sum(1 for r in results if r[success])}失败{sum(1 for r in results if not r[success])})4.3 解析结果后处理有时候模型提取的数据可能需要进一步处理比如格式转换、数据清洗等。我们可以添加一个后处理模块class ResultProcessor: staticmethod def clean_string(value: str) - str: 清理字符串去除多余空格和换行 if not value: return value return .join(value.split()).strip() staticmethod def parse_price(price_str: str) - float: 解析价格字符串提取数字 import re if not price_str: return 0.0 # 匹配数字包括小数 match re.search(r[\d,]\.?\d*, price_str) if match: # 去除千分位逗号 num_str match.group().replace(,, ) return float(num_str) return 0.0 staticmethod def parse_date(date_str: str) - str: 解析日期统一格式为YYYY-MM-DD import re from datetime import datetime if not date_str: return # 尝试多种日期格式 patterns [ r(\d{4})年(\d{1,2})月(\d{1,2})日, r(\d{4})-(\d{1,2})-(\d{1,2}), r(\d{4})/(\d{1,2})/(\d{1,2}), r(\d{1,2})/(\d{1,2})/(\d{4}) # 美式日期 ] for pattern in patterns: match re.match(pattern, date_str) if match: groups match.groups() if len(groups) 3: try: if pattern patterns[3]: # 美式日期 MM/DD/YYYY month, day, year groups else: # YYYY-MM-DD 或类似格式 year, month, day groups # 补齐前导零 month month.zfill(2) day day.zfill(2) return f{year}-{month}-{day} except: continue return date_str # 无法解析返回原字符串 def process_result(self, result: Dict[str, Any]) - Dict[str, Any]: 处理解析结果进行数据清洗和转换 processed result.copy() # 清理产品信息 if product in processed: product processed[product] if name in product: product[name] self.clean_string(product[name]) if sku in product: product[sku] self.clean_string(product[sku]) # 处理价格信息 if pricing in processed: pricing processed[pricing] if unit_price in pricing and isinstance(pricing[unit_price], str): pricing[unit_price] self.parse_price(pricing[unit_price]) # 处理日期信息 if document_info in processed: doc_info processed[document_info] for date_field in [date, valid_until]: if date_field in doc_info and doc_info[date_field]: doc_info[date_field] self.parse_date(doc_info[date_field]) return processed # 使用后处理器 processor ResultProcessor() cleaned_result processor.process_result(result)5. 高级技巧提升解析准确率即使有了好的Schema有时候解析结果可能还是不够准确。这里分享几个提升准确率的技巧。5.1 提供更详细的字段描述在Schema中字段的description非常重要。好的描述能帮助模型更好地理解要提取什么。{ unit_price: { type: number, description: 产品的单价通常是数字可能带有货币符号如¥、$、€或者包含元、美元等单位。注意提取纯数字部分忽略货币符号和单位。 }, moq: { type: integer, description: 最小起订量通常是整数可能写为MOQ: 100、最小订单量: 50、起订量100件等形式。提取数字部分即可。 } }5.2 使用枚举限制选项对于有限选项的字段使用enum可以显著提高准确率{ currency: { type: string, enum: [CNY, USD, EUR, GBP, JPY], description: 货币类型从文档中识别货币符号或文字映射到标准代码人民币→CNY美元→USD欧元→EUR英镑→GBP日元→JPY }, payment_terms: { type: string, enum: [T/T, L/C, D/P, D/A, Western Union], description: 付款方式常见的有电汇(T/T)、信用证(L/C)、付款交单(D/P)、承兑交单(D/A)、西联汇款等 } }5.3 添加字段依赖关系有些字段之间存在逻辑关系可以在Schema中体现{ delivery: { type: object, properties: { method: { type: string, enum: [海运, 空运, 陆运, 快递] }, lead_time: { type: integer, description: 交货时间单位是天 }, port: { type: string, description: 目的港当运输方式为海运时需要此字段 } }, required: [method, lead_time], if: { properties: { method: { const: 海运 } } }, then: { required: [port] } } }5.4 多示例学习策略提供多样化的示例非常重要不同版式的文档横版、竖版、带水印、扫描质量不同的不同字段组合有些字段可能在某些文档中缺失不同表述方式同一个意思可能有多种表达方式建议至少准备5-10个高质量的标注示例覆盖各种常见情况。6. 集成到业务系统解析出来的结构化数据最终要导入到业务系统中。这里提供几种常见的集成方式。6.1 直接写入数据库import sqlite3 import pandas as pd from datetime import datetime class DatabaseExporter: def __init__(self, db_path: str purchase_system.db): self.conn sqlite3.connect(db_path) self.create_tables() def create_tables(self): 创建数据库表 cursor self.conn.cursor() # 产品表 cursor.execute( CREATE TABLE IF NOT EXISTS products ( id INTEGER PRIMARY KEY AUTOINCREMENT, sku TEXT UNIQUE, name TEXT, category TEXT, specifications TEXT, -- JSON格式存储 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ) # 供应商表 cursor.execute( CREATE TABLE IF NOT EXISTS suppliers ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, contact TEXT, phone TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ) # 报价表 cursor.execute( CREATE TABLE IF NOT EXISTS quotes ( id INTEGER PRIMARY KEY AUTOINCREMENT, quote_number TEXT UNIQUE, product_id INTEGER, supplier_id INTEGER, unit_price REAL, currency TEXT, moq INTEGER, delivery_days INTEGER, quote_date DATE, valid_until DATE, document_path TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (product_id) REFERENCES products (id), FOREIGN KEY (supplier_id) REFERENCES suppliers (id) ) ) self.conn.commit() def export_result(self, result: Dict[str, Any], image_path: str): 将解析结果导出到数据库 cursor self.conn.cursor() try: # 插入或更新产品信息 product result.get(product, {}) cursor.execute( INSERT OR REPLACE INTO products (sku, name, category, specifications) VALUES (?, ?, ?, ?) , ( product.get(sku), product.get(name), product.get(category), json.dumps(product.get(specifications, {}), ensure_asciiFalse) )) product_id cursor.lastrowid # 插入或更新供应商信息 supplier result.get(supplier, {}) cursor.execute( INSERT OR REPLACE INTO suppliers (name, contact, phone) VALUES (?, ?, ?) , ( supplier.get(name), supplier.get(contact), supplier.get(phone) )) supplier_id cursor.lastrowid # 插入报价信息 pricing result.get(pricing, {}) doc_info result.get(document_info, {}) cursor.execute( INSERT OR REPLACE INTO quotes (quote_number, product_id, supplier_id, unit_price, currency, moq, delivery_days, quote_date, valid_until, document_path) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) , ( doc_info.get(quote_number), product_id, supplier_id, pricing.get(unit_price), pricing.get(currency, CNY), pricing.get(moq), pricing.get(delivery_days), doc_info.get(date), doc_info.get(valid_until), image_path )) self.conn.commit() print(f成功导入报价单: {doc_info.get(quote_number, 未知编号)}) except Exception as e: self.conn.rollback() print(f导入失败: {e}) raise def close(self): 关闭数据库连接 self.conn.close() # 使用示例 exporter DatabaseExporter() exporter.export_result(cleaned_result, quote_001.jpg) exporter.close()6.2 生成API接口如果你需要让其他系统调用可以封装成REST APIfrom fastapi import FastAPI, File, UploadFile, HTTPException from pydantic import BaseModel import tempfile import os app FastAPI(title文档解析API, description基于Youtu-Parsing的文档结构化解析服务) class ParseRequest(BaseModel): schema_name: str use_examples: bool True class ParseResponse(BaseModel): success: bool data: dict None error: str None app.post(/parse/quote, response_modelParseResponse) async def parse_quote_document( file: UploadFile File(...), request: ParseRequest None ): 解析采购报价单 try: # 保存上传的文件 with tempfile.NamedTemporaryFile(deleteFalse, suffix.jpg) as tmp_file: content await file.read() tmp_file.write(content) tmp_path tmp_file.name # 加载对应的Schema if request.schema_name purchase_quote: schema_path schemas/purchase_quote_schema.json elif request.schema_name invoice: schema_path schemas/invoice_schema.json else: raise HTTPException(status_code400, detail不支持的Schema类型) # 解析文档 client YoutuParsingClient() schema client.load_schema(schema_path) examples None if request.use_examples: examples_path fexamples/{request.schema_name}_examples.json if os.path.exists(examples_path): with open(examples_path, r, encodingutf-8) as f: examples json.load(f) result client.parse_document(tmp_path, schema, examplesexamples) # 清理临时文件 os.unlink(tmp_path) # 后处理 processor ResultProcessor() cleaned_result processor.process_result(result) # 导出到数据库可选 # exporter DatabaseExporter() # exporter.export_result(cleaned_result, file.filename) return ParseResponse(successTrue, datacleaned_result) except Exception as e: return ParseResponse(successFalse, errorstr(e)) app.get(/schemas) async def list_schemas(): 列出所有可用的Schema schemas [] schema_dir schemas for file in os.listdir(schema_dir): if file.endswith(.json): with open(os.path.join(schema_dir, file), r, encodingutf-8) as f: schema json.load(f) schemas.append({ name: file.replace(.json, ), title: schema.get(title, ), description: schema.get(description, ) }) return schemas if __name__ __main__: import uvicorn uvicorn.run(app, host0.0.0.0, port8000)6.3 自动化工作流集成对于需要批量处理的场景可以建立完整的工作流import schedule import time from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import shutil class DocumentProcessor: def __init__(self): self.client YoutuParsingClient() self.processor ResultProcessor() self.exporter DatabaseExporter() # 加载所有Schema self.schemas {} schema_dir schemas for file in os.listdir(schema_dir): if file.endswith(.json): schema_name file.replace(.json, ) self.schemas[schema_name] self.client.load_schema( os.path.join(schema_dir, file) ) def process_folder(self, folder_path: str, schema_name: str): 处理文件夹中的所有文档 folder Path(folder_path) if not folder.exists(): print(f文件夹不存在: {folder_path}) return image_extensions [.jpg, .jpeg, .png, .bmp, .tiff] processed_dir folder / processed processed_dir.mkdir(exist_okTrue) for ext in image_extensions: for image_file in folder.glob(f*{ext}): if image_file.parent processed_dir: continue print(f处理文件: {image_file.name}) try: # 解析文档 result self.client.parse_document( str(image_file), self.schemas[schema_name] ) # 后处理 cleaned_result self.processor.process_result(result) # 导出到数据库 self.exporter.export_result(cleaned_result, str(image_file)) # 移动已处理文件 shutil.move(str(image_file), str(processed_dir / image_file.name)) print(f处理完成: {image_file.name}) except Exception as e: print(f处理失败 {image_file.name}: {e}) def start_watching(self, watch_folder: str, schema_name: str): 监控文件夹自动处理新文件 class NewFileHandler(FileSystemEventHandler): def __init__(self, processor, schema_name): self.processor processor self.schema_name schema_name def on_created(self, event): if not event.is_directory: file_path Path(event.src_path) if file_path.suffix.lower() in [.jpg, .jpeg, .png]: print(f检测到新文件: {file_path.name}) # 等待文件完全写入 time.sleep(1) self.processor.process_folder(str(file_path.parent), self.schema_name) event_handler NewFileHandler(self, schema_name) observer Observer() observer.schedule(event_handler, watch_folder, recursiveFalse) observer.start() print(f开始监控文件夹: {watch_folder}) try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join() # 使用示例 processor DocumentProcessor() # 方式1立即处理一个文件夹 processor.process_folder(./incoming_quotes/, purchase_quote) # 方式2定时处理每天上午9点 schedule.every().day.at(09:00).do( processor.process_folder, ./incoming_quotes/, purchase_quote ) # 方式3实时监控文件夹 # processor.start_watching(./watch_folder/, purchase_quote)7. 总结通过定制JSON SchemaYoutu-Parsing从一个通用的文档解析工具变成了能够直接适配你业务系统的智能数据提取引擎。整个过程可以总结为四个关键步骤第一步定义数据结构。分析你的业务系统需要什么数据设计对应的JSON Schema。记住好的Schema应该像一份清晰的数据说明书告诉模型每个字段的含义、格式和要求。第二步准备学习材料。收集一些典型的文档作为示例仔细标注每个字段。这些示例就像给模型看的“参考答案”质量越高、覆盖越广模型学得越好。第三步编写处理代码。用Python把整个流程串起来调用API解析文档、按照Schema提取数据、清洗和转换格式、最后导入业务系统。代码要健壮能处理各种异常情况。第四步持续优化迭代。在实际使用中你会遇到各种特殊情况。把这些情况补充到示例中优化Schema的描述调整后处理逻辑。模型会越用越聪明。这种定制化的好处很明显自动化程度高文档来了自动处理准确率高按照你的业务规则提取集成简单输出就是系统需要的格式。不过也要注意几个实际问题文档质量会影响识别效果模糊、倾斜、复杂的版式可能识别不准字段太多太复杂时需要更多的示例来训练业务规则变化时Schema和代码都需要更新。最好的实践是从简单的文档开始先处理那些格式规范、内容清晰的积累经验后再逐步扩展到更复杂的场景。记住这是一个持续优化的过程不是一蹴而就的。现在你可以开始设计自己的Schema让Youtu-Parsing为你的业务系统提供智能化的文档处理能力了。从一张发票、一份合同、一个报表开始你会发现很多重复性的文档处理工作其实可以交给AI来完成。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。