Apache Nifi 使用案例


安装

本地是 Mac 环境,直接用 brew 安装:

brew install nifi

安装完成后执行:

nifi start

浏览器访问 http://localhost:8080/nifi 可以看到图形界面。

Apache Nifi 文档中可用组件非常多,可用于数据库、ES、消息队列、Hbase 等非常多中间件。本文仅会用到几个简单的组件:

  • QueryDatabaseTable:执行 SQL 语句查询数据库,数据默认格式是 Avro。
  • ConvertAvroToJSON:把 Avro 数据转换成 JSON。
  • EvaluateJsonPath:执行一些 JsonPath 查询,这里是为了设置 ES 文档的 id。
  • PutElasticsearchHttp:把 JSON 数据同步到 ES。

MySQL 设置

设计一个 book 表,往这个表加入数据,update_time 是插入时间:

CREATE TABLE `book` (
  `id` varchar(100) NOT NULL,
  `title` varchar(50) DEFAULT NULL,
  `desc` varchar(1000) DEFAULT NULL,
  `path` varchar(200) DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

ES 配置

新建一个索引 book,并加入 mapping 名为 idx:

curl -XPUT "http://localhost:9200/book" -H "Content-Type: application/json" -d'
{
  "mappings": {
    "idx": {
      "properties": {
        "id": {
          "type": "keyword"
        },
        "title": {
          "type": "text"
        },
        "desc": {
          "type": "text"
        },
        "path": {
          "type": "keyword"
        },
        "attachment": {
          "properties": {
            "content": {
              "type": "text"
            }
          }
        }
      }
    }
  }
}'

定义 flow

用上文提及的几个组件创建一个 flow,篇幅问题就不赘述如何创建了,直接用我导出的 template

把附件模板导入到 Nifi,并把组件启动:

nifi.png

这时候往 MySQL 插入数据就可以看见数据被同步到 ES 了。