kafka接入
概要
- 本流程主要演示把
kafka
上的url
数据导入到Druid
。url
数据样式:dt=1500714597057&index=163&str=9wevoy
(dt
的类型为date
,格式为millis
)前提:
- 部署好druid平台环境
- kafka集群已经部署好,数据已经正常写入特定的topic
- druid集群服务所在网络与kafka集群所在网络是相通的
第一步:编辑json文件
从 kafka 加载数据到 druid 平台,需要通过发送一个 http post 请求到 druid 接入接口。
该 post 请求中包含 json 格式的请求数据信息。为了方便修改,建议先编辑一个 json 文件。
第二步:建立Supervisor
使用 curl 命令发送 post 请求。假设 json 文件名为 wiki.json,curl 命令如下:
curl -X POST -H 'Content-Type: application/json' -d @/tmp/json/wiki.json http://overlord_ip:8090/druid/indexer/v1/supervisor
/tmp/json/wiki.json: 详见
wiki.json
overlord_ip: druid的overlord节点ip地址,如果有多个overlord,必须指定leader的ip.
也可以通过网页来操作:
- 登录 overlord_ip:8090/supervisor.html
- 删除页面上原来文本框里的内容,将
wiki.json
的内容复制,粘贴到文本框中,点击下面的Create Supervisor
,即可创建Supervisor
。
第三步:查看Task执行情况
- 查看日志
访问:
http://{OverlordIP}:8090/console.html
,点击Task
的日志,查看Task
的执行情况OverlordIP: druid的overlord节点ip地址,如果有多个overlord,必须指定leader的ip.
查看执行结果
使用
sugo-plyql
查询Task
的执行结果,具体的命令格式为:./plyql -h {OverlordIP} -q 'select count(*) from {datasource}'
OverlordIP: druid的overlord节点ip地址
datasource:json
配置文件中定义的datasource
名称如果查询的结果是"No Such Datasorce",则说明数据接入没有成功。
如果数据接入成功,那么查询到的结果如下:
该数字为数据条数。关于
sugo-plyql
的安装和使用,详见 sugo-plyql 使用文档
wiki.json文件内容:
{
"type": "lucene_supervisor",
"dataSchema": {
"dataSource": "wiki",
"parser": {
"type": "string",
"parseSpec": {
"format": "url",
"timestampSpec": {
"column": "dt",
"format": "millis"
},
"dimensionsSpec": {
"dimensions": [
{
"name": "index",
"type": "int"
},
{
"name": "str",
"type": "string"
}
]
}
}
},
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "YEAR"
}
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": 500000,
"maxRowsPerSegment": 20000000
},
"ioConfig": {
"topic": "wiki",
"consumerProperties": {
"bootstrap.servers": "192.168.0.220:9092,192.168.0.221:9092,192.168.0.222:9092"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT3600S",
"useEarliestOffset": "true"
}
}
type:
从kafka加载数据到druid时固定配置为lucene_supervisordataSchema.dataSource
数据源的名称,类似关系数据库中的表名dataSchema.parser.type
固定stringdataSchema.parser.parseSpec.format
kafka中单条数据的格式,比如url/json/csv/tsv/hive
。url:
数据类似url
参数串,如:name=jack&age=21&score=78.5&the_date= 1489131528601
dataSchema.parser.parseSpec.timestampSpec:
时间戳列以及时间的格式dataSchema.parser.parseSpec.timestampSpec.format
时间格式类型:推荐millis
yy-MM-dd HH:mm:ss
: 自定义的时间格式auto
: 自动识别时间,支持iso和millis格式iso
:iso标准时间格式,如”2016-08-03T12:53:51.999Z”posix
:从1970年1月1日开始所经过的秒数,10位的数字millis
:从1970年1月1日开始所经过的毫秒数,13位数字dataSchema.parser.parseSpec.dimensionsSpec.dimensions:
维度定义列表,每个维度的格式为:{“name”: “age”, “type”:”string”}
。Type支持的类型:string
、int
、float
、long
、date
json:
json格式数据- 数据格式:
{"name":"jack","age": 21,"score": 78.5,"the_date": 1489131528601}
- 数据格式:
csv:
csv格式数据dataSchema.parser.parseSpec.listDelimiter:
分隔符dataSchema.parser.parseSpec.columns:
维度列表,格式:["name","age","score"]
hive:
hive数据文件dataSchema.parser.parseSpec.timestampSpec
dataSchema.parser.parseSpec.dimensionsSpec
dataSchema.parser.parseSpec.separator:
维度分隔符dataSchema.parser.parseSpec.columns:
维度列表
dataSchema.granularitySpec.type:
默认使用uniform
dataSchema.granularitySpec.segmentGranularity:
段粒度,根据每天的数据量进行设置。 小数据量建议DAY
,大数据量(每天百亿)可以选择HOUR
。可选项:SECOND
、MINUTE
、FIVE_MINUTE
、TEN_MINUTE
、FIFTEEN_MINUTE
、HOUR
、SIX_HOUR
、DAY
、MONTH
、YEAR
。tuningConfig.type:
设置为kafka
ioConfig.topic:
kafka中的topic名ioConfig.consumerProperties:
kafka消费端接口的配置,比如kafka的服务器配置taskCount:
启动的任务进程数replicas:
任务的副本数taskDuration:
任务持续时间,超过指定时间后,任务会停止接收数据,等数据持久化之后会创建新的任务进程。可设置的格式:一分钟:PT60S
, 十分钟:PT10M
, 一天:P1D
useEarliestOffset:
从kafka的最早的offset
开始消费