SmartETL
SmartETL:一个简单、灵活、可配置、开箱即用的Python ETL框架,具有领域特色,拒绝重复造轮子!提供Wikidata / Wikipedia / GDELT等多种开源数据的处理流程; 支持txt/json/csv/excel等文件格式、MySQL/PostgreSQL/MongoDB/ClickHouse/ElasticSearch等数据库作为输入和输出; 提供大模型、Web API等多种处理算子
Install / Use
/learn @ictchenbo/SmartETLREADME

SmartETL:一个简单实用、灵活可配、开箱即用的ETL数据处理框架,内置过滤、转换、大模型调用、向量化、文件读写、数据库读写、API调用等丰富算子,内置新闻、百科、事件、知识库等多种开源情报数据的处理流程,支撑大数据分析系统、企业数仓、数据资源服务等数据加工准备
项目内置100+常用流程、400+常用ETL算子、10+领域特色数据处理流程,覆盖常见数据处理需求,欢迎试用并提出宝贵意见^_^
系统架构图:

项目特色
- 灵活可配的YAML流程定义:通过YAML文件低/无代码组装流程,快速响应数据处理需求
- 丰富的ETL算子:提供数据集成治理、图谱构建、大模型、NLP、信息抽取等常用算子
- 开箱即用的数据流程:面向开源情报分析提供多样化流程,覆盖数据清洗、大模型/大数据预处理、知识图谱构建、机器学习数据集生成、信息抽取、技术评估预测、数据库备份等任务。开箱即用,具体在这里查看
- 持续积累开源情报数据资源10+,所见即所得:
- 支持常见文档文件/数据文件格式读取
- 支持常见数据库读取和写入,覆盖常见OLTP和OLAP场景
- MySQL(关系数据库)基于pymysql
- PostgresSQL(关系数据库)
- ClickHouse(关系数据库、列存数据库)基于clickhouse_driver
- MongoDB(文档数据库)基于pymongo
- ElasticSearch(全文索引) 基于RESTFul API
- Qdrant(向量数据库)基于RESTFul API
- Kafka(消息队列)基于RESTFul API和confluent_kafka
- SQLite(关系数据库、嵌入式数据库)
- MinIO(文件系统、对象存储、KV数据库)
- Neo4J(图数据库)基于neo4j
应用场景
本项目具有众多数据处理分析应用场景:
- 大模型数据预处理:调用大模型进行主题分类、文本翻译、Embedding处理等。参考大模型调用示例 新闻处理
- 信息抽取与NLP处理:网页信息抽取、新闻主题分类、新闻地区识别、文档专门解析。参考网页信息抽取 新闻处理
- 多模态数据处理:调用多模态embedding对图像进行Embedding处理 参考图像索引
- 机器学习/数据挖掘数据集构建:漏洞PoC数据库构建、基于大模型的知识蒸馏、科技评估预测等。参考poc构建
- 开源数据采集:GDELT全球事件、Web API数据集成、网页URL数据采集,JsonP数据解析,图片采集等
- 开源数据处理:wikidata维基数据、维基百科、新闻、图片等数据处理。参考wikidata处理 GDELT数据采集
- 知识图谱构建:基于结构化数据和非结构化数据的实体抽取、关系抽取、事件抽取等(部分算子待完善) 。参考wikidata知识图谱 关于wikidata知识图谱的介绍,可以参考作者的一篇博客文章 https://blog.csdn.net/weixin_40338859/article/details/120571090
- 数据分析:针对Excel、Parquet等表格数据的转换、过滤、去重、统计等。参考项目数据统计1 项目数据统计2
- 数据库管理/DBA:数据库备份、同步、查询分析等。参考ClickHouse导出 MongoDB数据迁移
- 服务监测:定时轮询API/服务状态等。参考数据监测
- 科研数据处理:论文数据采集及处理(arXiv);论文PDF解析、建立全文索引/向量索引,形成统一论文库;论文相关github代码下载
- 代码分析:基于
ast对python代码进行分析,对类和函数进行统计
快速使用
- 安装基本依赖
pip install -r requirements.txt
- 查看帮助
python main_flow.py -h

- 启动流程
- 方式一:基于命令行CLI运行:
python main_flow.py --loader "String(arg1, sep=';')" --processor "Print" local "1;2;3"
- 方式二:基于YAML流程定义运行:
python main_flow.py flows/test.yaml
可以在这里找到很多开箱即用的流程。查看Yaml定义规范
- 方式三:基于Python代码运行: 以下示例与test.yaml等价:
from smartetl.flow_engine import run
from smartetl.loader import JsonLine
from smartetl.processor import Print, Count, Chain, Fork, Select, AddFields
# 定义节点
# 定义数据源 新闻数据
loader = JsonLine('test_data/news.jsonl')
# 选择id和url两个字段
select = Select('id', 'url')
# 添加一个字段chain,值为1,然后打印
chain1 = Chain(AddFields(chain='1'), Print())
# 添加一个字段chain,值为2,然后打印
chain2 = Chain(AddFields(chain='2'), Print())
# 定义整体处理流程
processor = Chain(select, Fork(chain1, chain2, copy_data=True))
# 执行流程
run(loader, processor)
CLI流程示例
- 加载
EarthCamjsonp数据
python .\main_flow.py --loader "Function('wikidata_filter.gestata.jsonp.E', 'https://www.earthcam.com/cams/common/gethofi
tems.php?hofsource=com&tm=ecn&camera=all&start=0&length=21&ec_favorite=0&cdn=0&date_start=undefined&date_end=undefined&id=&callback=onjsonpload')" --processor "Chain(SelectVal('hofdata'), Flat(), Print())" list1
- 加载
EarthCam摄像头搜索结果,显示摄像头名字
python .\main_flow.py --loader "web.api.Get('https://www.earthcam.com/api/mapsearch/get_locations?nwx=37.38509688580934&nwy=-126.40319824218751&nex=37.38509688580934&ney=-109.18212890625001&sex=29.95662353271325&sey=-109.18212890625001&swx=29.95662353271325&swy=-126.40319824218751&zoom=9')" --processor "Chain(Flat(), SelectVal('places'), Flat(), SelectVal('name'), Print())" test1
YAML流程定义示例
Tips:可先查看已有流程,看是否有相关任务的,尽量基于已有流程修改。
loader: CSV('test_data/unesco-projects-20241113.csv')
nodes:
print: Print
rename1: RenameFields(**rename)
group: Group(by='beneficiary_country', emit_fast=False)
g_count: aggs.Count
g_total_budget: aggs.Sum('budget')
g_total_cost: aggs.Sum('cumulative_cost')
processor: Chain(rename1, group, g_total_cost, print)
- 示例2:输入wikidata dump文件(gz/json)生成id-name映射文件(方便根据ID查询名称),同时对数据结构进行简化,查看详情
name: p1_idname_simple
arguments: 1
loader: WikidataJsonDump(arg1)
nodes:
n1: wikidata.IDNameMap
n2: WriteJson('data/id-name.json')
n3: wikidata.Simplify
n4: wikidata.SimplifyProps
n5: WriteJson('test_data/p1.json')
chain1: Chain(n1, n2)
chain2: Chain(n3, n4, n5)
processor: Fork(chain1, chain2)
- 示例3:基于wikidata生成简单图谱结构,包含Item/Property/Item_Property/Property_Property四张表 查看详情
name: p1_wikidata_graph
description: transform wikidata dump to graph, including item/property/item_property/property_property
arguments: 1
loader: WikidataJsonDump(arg1)
nodes:
writer1: WriteJson('test_data/item.json')
writer2: WriteJson('test_data/property.json')
writer3: WriteJson('test_data/item_property.json')
writer4: WriteJson('test_data/property_property.json')
rm_type: RemoveFields('_type')
entity: wikidata.Entity
filter_item: matcher.SimpleMatch(_type='item')
filter_property: matcher.SimpleMatch(_type='property')
chain1: Chain(filter_item, rm_type, writer1)
chain2: Chain(filter_property, rm_type, writer2)
group1: Fork(chain1, chain2)
property: wikidata.Property
filter_item_property: matcher.SimpleMatch(_type='item_property')
filter_property_property: matcher.SimpleMatch(_type='property_property')
chain3: Chain(filter_item_property, rm_type, writer3)
chain4: Chain(filter_property_property, rm_type, writer4)
group2: Fork(chain3, chain4)
chain_entity: Chain(entity, group1)
chain_property: Chain(property, group2)
processor: Fork(chain_entity, chain_property)
算子统计结果(截至2025.12.28)
| 算子类型 | 所在模块 | 数量 | |------------|------|--------------| | loader | smartetl/loader | 48 | | processor | smartetl/processor | 119 | | database | smartetl/database |16 | | util | smartetl/util | 80 | | gestata | smartetl/gestata | 182 | | 合计 | - | 443 |
TIPS: 统计方法 python main_flow.py flows/scan_py.yaml
流程统计(截至2025.12.28)
| 分类 | 数量 | |--------------|--------| | arxiv | 27 | | news | 19 | | dba | 17 | | wikidata | 16 | | paper | 14 | | files | 13 | | nl2poc | 12 | | kg | 11 | | demos | 7 | | llm | 6 | | crawler | 4 | | web | 4 | | agent | 2 | | event | 2 | | osint | 1 | | paperwithcode| 1 | | wikipedia | 1 | | work-eval | 1 | | 其他 | 15 | | 合计 | 173 |
TIPS:统计方法 python main_flow.py flows/scan_flow.yaml
New!
-
2025.12.29 修改算子统计方法,设计为
gestata.python模块 统计方法:python main_flow.py flows/scan_py.yaml -
2025.12.28
- 实现项目类算子(loader+processor+database)和函数式算子(util+gestata)统计
python analyze_class.py -o data/metadata/database.json smartetl smartetl.database实现类算子统计;python analyze_function.py -o data/metadata/util.json smartetl/util实现函数式算子统计 - 实现流程统计流程
- 将文件加载器进行统一管理
- 新增
util.files.read实现各种类型文件读取
- 2025.12.25
- 实现
paperwithcode数据集论文关联代码下载,查看
- 2025.9.14
- 完善
gestata.mineru算子 - 新增
apps.pdf_parser_parallel,搭配mineru的并行化部署,实现大规模PDF解析 - arXiv相关处理流程优化
- 2025.8.21
- 完善论文处理流程
- 完善新的gdelt处理流程
- 新增微信公众号文章采集流程
-
2025.8.14 启动V3版本开发
-
2025.8.13
- 更新GDELT解析入库流程
- 实现日志机制,通过YAML中配置
logging,全局注入debuginfowarningerror函数,方便组件使用;也支持替换print。查看示例
- 2025.8.12
Map支持多个位置参数 #37- GDELT流程拆分为
