feat(schema): 新增数据集相关模型并添加文档扫描功能
- 新增 dataset.py 文件,定义数据集相关模型 - 新增 tools 目录,包含解析 Markdown 和扫描文档的功能 - 修改 parse_markdown.py,增加处理 Markdown 文件的函数 - 新增 scan_doc_dir.py,实现文档目录扫描功能
This commit is contained in:
parent
6a00699472
commit
541d37c674
28
schema/dataset.py
Normal file
28
schema/dataset.py
Normal file
@ -0,0 +1,28 @@
|
||||
from typing import Optional
|
||||
from pydantic import BaseModel, Field
|
||||
from datetime import datetime, timezone
|
||||
|
||||
class doc(BaseModel):
|
||||
id: Optional[int] = Field(default=None, description="文档ID")
|
||||
name: str = Field(default="", description="文档名称")
|
||||
path: str = Field(default="", description="文档路径")
|
||||
markdown_files: list[str] = Field(default_factory=list, description="文档路径列表")
|
||||
|
||||
class Q_A(BaseModel):
|
||||
question: str = Field(default="", min_length=1,description="问题")
|
||||
answer: str = Field(default="", min_length=1, description="答案")
|
||||
|
||||
class dataset_item(BaseModel):
|
||||
id: Optional[int] = Field(default=None, description="数据集项ID")
|
||||
message: list[Q_A] = Field(description="数据集项内容")
|
||||
|
||||
class dataset(BaseModel):
|
||||
id: Optional[int] = Field(default=None, description="数据集ID")
|
||||
name: Optional[str] = Field(default=None, description="数据集名称")
|
||||
model_id: Optional[list[str]] = Field(default=None, description="数据集使用的模型ID")
|
||||
description: Optional[str] = Field(default="", description="数据集描述")
|
||||
created_at: datetime = Field(
|
||||
default_factory=lambda: datetime.now(timezone.utc),
|
||||
description="记录创建时间"
|
||||
)
|
||||
dataset_items: list[dataset_item] = Field(default_factory=list, description="数据集项列表")
|
2
tools/__init__.py
Normal file
2
tools/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
from .parse_markdown import parse_markdown
|
||||
from .scan_doc_dir import *
|
@ -6,6 +6,27 @@ from pathlib import Path
|
||||
sys.path.append(str(Path(__file__).resolve().parent.parent))
|
||||
from schema import MarkdownNode
|
||||
|
||||
def process_markdown_file(file_path):
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
root = parse_markdown(content)
|
||||
results = []
|
||||
|
||||
def traverse(node, parent_titles):
|
||||
current_titles = parent_titles.copy()
|
||||
current_titles.append(node.title)
|
||||
|
||||
if not node.children: # 叶子节点
|
||||
if node.content:
|
||||
full_text = ' -> '.join(current_titles) + '\n' + node.content
|
||||
results.append(full_text)
|
||||
else:
|
||||
for child in node.children:
|
||||
traverse(child, current_titles)
|
||||
|
||||
traverse(root, [])
|
||||
return results
|
||||
def add_child(parent, child):
|
||||
parent.children.append(child)
|
||||
|
||||
@ -60,10 +81,13 @@ def parse_markdown(markdown):
|
||||
return root
|
||||
|
||||
if __name__=="__main__":
|
||||
# 从文件读取 Markdown 内容
|
||||
with open("workdir/example.md", "r", encoding="utf-8") as f:
|
||||
markdown = f.read()
|
||||
# # 从文件读取 Markdown 内容
|
||||
# with open("workdir/example.md", "r", encoding="utf-8") as f:
|
||||
# markdown = f.read()
|
||||
|
||||
# 解析 Markdown 并打印树结构
|
||||
root = parse_markdown(markdown)
|
||||
print_tree(root)
|
||||
# # 解析 Markdown 并打印树结构
|
||||
# root = parse_markdown(markdown)
|
||||
# print_tree(root)
|
||||
for i in process_markdown_file("workdir/example.md"):
|
||||
print("~"*20)
|
||||
print(i)
|
32
tools/scan_doc_dir.py
Normal file
32
tools/scan_doc_dir.py
Normal file
@ -0,0 +1,32 @@
|
||||
import sys
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
# 添加项目根目录到sys.path
|
||||
sys.path.append(str(Path(__file__).resolve().parent.parent))
|
||||
from schema import doc
|
||||
|
||||
def scan_docs_directory(workdir: str):
|
||||
docs_dir = os.path.join(workdir, "docs")
|
||||
|
||||
doc_list = os.listdir(docs_dir)
|
||||
|
||||
to_return = []
|
||||
|
||||
for doc_name in doc_list:
|
||||
doc_path = os.path.join(docs_dir, doc_name)
|
||||
if os.path.isdir(doc_path):
|
||||
markdown_files = []
|
||||
for root, dirs, files in os.walk(doc_path):
|
||||
for file in files:
|
||||
if file.endswith(".md"):
|
||||
markdown_files.append(os.path.join(root, file))
|
||||
to_return.append(doc(name=doc_name, path=doc_path, markdown_files=markdown_files))
|
||||
|
||||
return to_return
|
||||
|
||||
# 添加测试代码
|
||||
if __name__ == "__main__":
|
||||
workdir = os.path.join(os.path.dirname(__file__), "..", "workdir")
|
||||
docs = scan_docs_directory(workdir)
|
||||
print(docs)
|
Loading…
x
Reference in New Issue
Block a user