SkillAgentSearch skills...

SmartETL

SmartETL:一个简单、灵活、可配置、开箱即用的Python ETL框架,具有领域特色,拒绝重复造轮子!提供Wikidata / Wikipedia / GDELT等多种开源数据的处理流程; 支持txt/json/csv/excel等文件格式、MySQL/PostgreSQL/MongoDB/ClickHouse/ElasticSearch等数据库作为输入和输出; 提供大模型、Web API等多种处理算子

Install / Use

/learn @ictchenbo/SmartETL
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

logo

SmartETL:一个简单实用、灵活可配、开箱即用的ETL数据处理框架,内置过滤、转换、大模型调用向量化、文件读写、数据库读写、API调用等丰富算子,内置新闻、百科、事件、知识库等多种开源情报数据的处理流程,支撑大数据分析系统、企业数仓、数据资源服务等数据加工准备

项目内置100+常用流程、400+常用ETL算子、10+领域特色数据处理流程,覆盖常见数据处理需求,欢迎试用并提出宝贵意见^_^

系统架构图: 系统架构

项目特色

  1. 灵活可配的YAML流程定义:通过YAML文件低/无代码组装流程,快速响应数据处理需求
  2. 丰富的ETL算子:提供数据集成治理图谱构建大模型NLP信息抽取等常用算子
  3. 开箱即用的数据流程:面向开源情报分析提供多样化流程,覆盖数据清洗、大模型/大数据预处理、知识图谱构建、机器学习数据集生成、信息抽取、技术评估预测、数据库备份等任务。开箱即用,具体在这里查看
  4. 持续积累开源情报数据资源10+,所见即所得:
  5. 支持常见文档文件/数据文件格式读取
    • txt
    • csv
    • html
    • Markdown(支持识别内容层级,支持提取表格、图片)
    • pdf
    • docx(doc, docx)
    • eml
    • Excel(xls, xlsx)
    • PPT(ppt, pptx)
    • PST
    • OST
    • OMG
    • json(支持json json-line, json-array, json-free多种格式)
    • yaml
    • parquet
    • zip
    • rar
    • 7zip
    • tar(.gz|.bz2|.xz)
    • gzip
    • bz2
    • xz
  6. 支持常见数据库读取和写入,覆盖常见OLTP和OLAP场景
    • MySQL(关系数据库)基于pymysql
    • PostgresSQL(关系数据库)
    • ClickHouse(关系数据库、列存数据库)基于clickhouse_driver
    • MongoDB(文档数据库)基于pymongo
    • ElasticSearch(全文索引) 基于RESTFul API
    • Qdrant(向量数据库)基于RESTFul API
    • Kafka(消息队列)基于RESTFul API和confluent_kafka
    • SQLite(关系数据库、嵌入式数据库)
    • MinIO(文件系统、对象存储、KV数据库)
    • Neo4J(图数据库)基于neo4j

应用场景

本项目具有众多数据处理分析应用场景:

  • 大模型数据预处理:调用大模型进行主题分类、文本翻译、Embedding处理等。参考大模型调用示例 新闻处理
  • 信息抽取与NLP处理:网页信息抽取、新闻主题分类、新闻地区识别、文档专门解析。参考网页信息抽取 新闻处理
  • 多模态数据处理:调用多模态embedding对图像进行Embedding处理 参考图像索引
  • 机器学习/数据挖掘数据集构建:漏洞PoC数据库构建、基于大模型的知识蒸馏、科技评估预测等。参考poc构建
  • 开源数据采集:GDELT全球事件、Web API数据集成、网页URL数据采集,JsonP数据解析,图片采集
  • 开源数据处理:wikidata维基数据、维基百科、新闻、图片等数据处理。参考wikidata处理 GDELT数据采集
  • 知识图谱构建:基于结构化数据和非结构化数据的实体抽取、关系抽取、事件抽取等(部分算子待完善) 。参考wikidata知识图谱 关于wikidata知识图谱的介绍,可以参考作者的一篇博客文章 https://blog.csdn.net/weixin_40338859/article/details/120571090
  • 数据分析:针对Excel、Parquet等表格数据的转换、过滤、去重、统计等。参考项目数据统计1 项目数据统计2
  • 数据库管理/DBA:数据库备份、同步、查询分析等。参考ClickHouse导出 MongoDB数据迁移
  • 服务监测:定时轮询API/服务状态等。参考数据监测
  • 科研数据处理:论文数据采集及处理(arXiv);论文PDF解析、建立全文索引/向量索引,形成统一论文库;论文相关github代码下载
  • 代码分析:基于ast对python代码进行分析,对类和函数进行统计

快速使用

  1. 安装基本依赖
 pip install -r requirements.txt
  1. 查看帮助
 python main_flow.py -h

系统使用

  1. 启动流程
  • 方式一:基于命令行CLI运行:
 python main_flow.py --loader "String(arg1, sep=';')" --processor "Print" local "1;2;3"
  • 方式二:基于YAML流程定义运行:
 python main_flow.py flows/test.yaml

可以在这里找到很多开箱即用的流程。查看Yaml定义规范

  • 方式三:基于Python代码运行: 以下示例与test.yaml等价:
from smartetl.flow_engine import run
from smartetl.loader import JsonLine
from smartetl.processor import Print, Count, Chain, Fork, Select, AddFields

# 定义节点

# 定义数据源  新闻数据
loader = JsonLine('test_data/news.jsonl')

# 选择id和url两个字段
select = Select('id', 'url')
# 添加一个字段chain,值为1,然后打印
chain1 = Chain(AddFields(chain='1'), Print())
# 添加一个字段chain,值为2,然后打印
chain2 = Chain(AddFields(chain='2'), Print())

# 定义整体处理流程
processor = Chain(select, Fork(chain1, chain2, copy_data=True))

# 执行流程
run(loader, processor)

CLI流程示例

  1. 加载EarthCamjsonp数据
python .\main_flow.py --loader "Function('wikidata_filter.gestata.jsonp.E', 'https://www.earthcam.com/cams/common/gethofi
tems.php?hofsource=com&tm=ecn&camera=all&start=0&length=21&ec_favorite=0&cdn=0&date_start=undefined&date_end=undefined&id=&callback=onjsonpload')" --processor "Chain(SelectVal('hofdata'), Flat(), Print())" list1
  1. 加载EarthCam摄像头搜索结果,显示摄像头名字
python .\main_flow.py --loader "web.api.Get('https://www.earthcam.com/api/mapsearch/get_locations?nwx=37.38509688580934&nwy=-126.40319824218751&nex=37.38509688580934&ney=-109.18212890625001&sex=29.95662353271325&sey=-109.18212890625001&swx=29.95662353271325&swy=-126.40319824218751&zoom=9')" --processor "Chain(Flat(), SelectVal('places'), Flat(), SelectVal('name'), Print())" test1

YAML流程定义示例

Tips:可先查看已有流程,看是否有相关任务的,尽量基于已有流程修改。

  • 示例1:加载联合国教科文组织的项目清单CSV文件,按受益国家分组,统计项目数量、总预算、总支出 查看详情 流程对比
loader: CSV('test_data/unesco-projects-20241113.csv')

nodes:
  print: Print
  rename1: RenameFields(**rename)
  group: Group(by='beneficiary_country', emit_fast=False)
  g_count: aggs.Count
  g_total_budget: aggs.Sum('budget')
  g_total_cost: aggs.Sum('cumulative_cost')

processor: Chain(rename1, group, g_total_cost, print)
  • 示例2:输入wikidata dump文件(gz/json)生成id-name映射文件(方便根据ID查询名称),同时对数据结构进行简化,查看详情
name: p1_idname_simple
arguments: 1

loader: WikidataJsonDump(arg1)

nodes:
  n1: wikidata.IDNameMap
  n2: WriteJson('data/id-name.json')
  n3: wikidata.Simplify
  n4: wikidata.SimplifyProps
  n5: WriteJson('test_data/p1.json')
  chain1: Chain(n1, n2)
  chain2: Chain(n3, n4, n5)

processor: Fork(chain1, chain2)
  • 示例3:基于wikidata生成简单图谱结构,包含Item/Property/Item_Property/Property_Property四张表 查看详情
name: p1_wikidata_graph
description: transform wikidata dump to graph, including item/property/item_property/property_property
arguments: 1

loader: WikidataJsonDump(arg1)

nodes:
  writer1: WriteJson('test_data/item.json')
  writer2: WriteJson('test_data/property.json')
  writer3: WriteJson('test_data/item_property.json')
  writer4: WriteJson('test_data/property_property.json')

  rm_type: RemoveFields('_type')

  entity: wikidata.Entity
  filter_item: matcher.SimpleMatch(_type='item')
  filter_property: matcher.SimpleMatch(_type='property')
  chain1: Chain(filter_item, rm_type, writer1)
  chain2: Chain(filter_property, rm_type, writer2)
  group1: Fork(chain1, chain2)

  property: wikidata.Property
  filter_item_property: matcher.SimpleMatch(_type='item_property')
  filter_property_property: matcher.SimpleMatch(_type='property_property')
  chain3: Chain(filter_item_property, rm_type, writer3)
  chain4: Chain(filter_property_property, rm_type, writer4)
  group2: Fork(chain3, chain4)

  chain_entity: Chain(entity, group1)
  chain_property: Chain(property, group2)

processor: Fork(chain_entity, chain_property)

算子统计结果(截至2025.12.28)

| 算子类型 | 所在模块 | 数量 | |------------|------|--------------| | loader | smartetl/loader | 48 | | processor | smartetl/processor | 119 | | database | smartetl/database |16 | | util | smartetl/util | 80 | | gestata | smartetl/gestata | 182 | | 合计 | - | 443 |

TIPS: 统计方法 python main_flow.py flows/scan_py.yaml

流程统计(截至2025.12.28)

| 分类 | 数量 | |--------------|--------| | arxiv | 27 | | news | 19 | | dba | 17 | | wikidata | 16 | | paper | 14 | | files | 13 | | nl2poc | 12 | | kg | 11 | | demos | 7 | | llm | 6 | | crawler | 4 | | web | 4 | | agent | 2 | | event | 2 | | osint | 1 | | paperwithcode| 1 | | wikipedia | 1 | | work-eval | 1 | | 其他 | 15 | | 合计 | 173 |

TIPS:统计方法 python main_flow.py flows/scan_flow.yaml

New!

  • 2025.12.29 修改算子统计方法,设计为 gestata.python模块 统计方法:python main_flow.py flows/scan_py.yaml

  • 2025.12.28

  1. 实现项目类算子(loader+processor+database)和函数式算子(util+gestata)统计 python analyze_class.py -o data/metadata/database.json smartetl smartetl.database 实现类算子统计;python analyze_function.py -o data/metadata/util.json smartetl/util 实现函数式算子统计
  2. 实现流程统计流程
  3. 将文件加载器进行统一管理
  4. 新增util.files.read实现各种类型文件读取
  • 2025.12.25
  1. 实现paperwithcode数据集论文关联代码下载,查看
  • 2025.9.14
  1. 完善gestata.mineru算子
  2. 新增apps.pdf_parser_parallel,搭配mineru的并行化部署,实现大规模PDF解析
  3. arXiv相关处理流程优化
  • 2025.8.21
  1. 完善论文处理流程
  2. 完善新的gdelt处理流程
  3. 新增微信公众号文章采集流程
  • 2025.8.14 启动V3版本开发

  • 2025.8.13

  1. 更新GDELT解析入库流程
  2. 实现日志机制,通过YAML中配置logging,全局注入debug info warning error函数,方便组件使用;也支持替换print。查看示例
  • 2025.8.12
  1. Map支持多个位置参数 #37
  2. GDELT流程拆分为
View on GitHub
GitHub Stars28
CategoryData
Updated2mo ago
Forks6

Languages

Python

Security Score

95/100

Audited on Jan 8, 2026

No findings