hadoop接入
概要
- 本流程主要演示的是把
hadoop上的csv数据导入到Druid。csv数据样式:1500711222228,165,cye1z8(第一个字段的类型为date,格式为millis)前提:
1.部署好druid平台环境
2.部署好hadoop环境
第一步:导入数据到hdfs(如果数据已经在hdfs上则不用)
在部署了 hadoop 环境的机器上进行操作。
- 切换到 hdfs 用户,指令为:
su hdfs - 在 hdfs 上创建存放数据的目录,指令为:
hadoop fs -mkdir -p dirName
例如,要把数据放到 hdfs 下的 /data1/tmp/druid 下,指令为:hadoop fs -mkdir -p /data1/tmp/druid - 将数据放到目录下,指令为:
hadoop fs -put fileName dirName
例如,将 /data/tmp/test.json 文件放到hdfs下的 /data1/tmp/druid 下,指令为:hadoop fs -put /data/tmp/test.json /data1/tmp/druid
第二步:创建json文件
创建、编辑、保存 taskspec.json 文件说明:
- 按照数据的格式,创建 taskspec.json 文件,具体参数参照
task-spec.json。 - 执行指令:
cd /data1/tmp/druid
进入该目录下,创建 json 文件。指令为vim taskspec.json
将 json 内容拷贝,保存退出。
说明:taskspec.json 文件定义 csv 文件分2种,一种是有时间列,一种是没时间列,需要将json文件中的
"timestampSpec":{
"column":"ts",
"format":"millis"
}
改成:
"timestampSpec":{
"missingValue":"2017-08-01T12:00:000Z"
}
(固定时间,时间可改)。其中ts为表中时间列的列名。
第三步:执行命令,建立task
- 进入存放 taskspec.json 文件的目录:
cd /data1/tmp/druid - 执行建立task的指令:
curl -X 'POST' -H 'Content-Type:application/json' -d @task-spec.json http://{overlordIP}:8090/druid/indexer/v1/task task-spec.json:json文件的名字overlordIp: druid的overlord节点ip地址
第四步:查看Task执行情况
查看日志
访问:http://{OverlordIP}:8090/console.html,点击Task的日志,查看Task的执行情况OverlordIP: druid的overlord节点ip地址,如果有多个overlord,必须指定leader的ip.

查看执行结果
使用sugo-plyql查询Task的执行结果,具体的命令格式为:./plyql -h {OverlordIP} -q 'select count(*) from {datasource}'OverlordIP: druid的overlord节点ip地址
datasource:json配置文件中定义的datasource名称如果查询的结果是"No Such Datasorce",则说明数据接入没有成功。
如果数据接入成功,那么查询到的结果如下:
该数字为数据条数。关于
sugo-plyql的安装和使用,详见 sugo-plyql 使用文档
第五步:停止task(需要时再用)
在需要停止 task 时,可以发送如下 http post 请求停止 task 任务
curl -X 'POST' -H 'Content-Type:application/json' http://{overlordIP}:8090/druid/indexer/v1/task/{taskId}/shutdown
overlord_ip: druid的overlord节点ip地址taskId: 在http://{overlordIP}:8090/console.htmltask详细页面对应 id 列的信息
注意事项
spec.dataSchema.parser.parseSpec.columns:包含时间戳列,并且先后顺序要和源数据对应spec.dataSchema.parser.parseSpec.dimensionsSpec.dimensions:不包含时间戳列,并且name要和parser.parseSpec.columns对应,type要和源数据对应起来。spec.ioConfig.inputSpec.paths:数据文件在hdfs上的目录,注意是否配置正确- 数据源有 date 类型时,要对 taskspec.json 文件 date 类型按要求定义:
Date类型格式 2017-01-01 00:00,则定义为:"date","format":"yy-MM-dd HH:mm"
Date类型格式 2017-01-01 00:00:00,则定义为:"date","format":"yy-MM-dd HH:mm:ss"
Date类型格式 2017-01-01,则定义为:"date","format":"yy-MM-dd"
Date类型格式 2017-01,则定义为:"date","format":"yyyy-MM"
Date类型格式 2017,则定义为:"date","format":"yyyy"
Date类型格式是从1970年1月1日开始所经过的秒数,10位的数字,则定义为:"date","format":"posix"
Date类型格式是从1970年1月1日开始所经过的毫秒数,13位数字,则定义为:"date","format":"millis" - 如果
segmentGranularity即段粒度较小,interval的范围也要较小,否则分片过多,非常耗时。
task-spec.json详细配置如下:
{
"type": "lucene_index_hadoop",
"spec": {
"dataSchema": {
"dataSource": "com_HyoaKhQMl_project_r1U1FDLjg",
"parser": {
"type": "string",
"parseSpec": {
"format": "csv",
"timestampSpec": {
"column": "da",
"format": "millis"
},
"listDelimiter": ",",
"columns": [
"da",
"id",
"md5_str"
],
"dimensionsSpec": {
"dynamicDimension": false,
"dimensions": [
{"name": "id","type": "int"},
{"name": "md5_str","type": "string"}
]
}
}
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "MONTH",
"queryGranularity": "NONE",
"intervals": ["2016-05-21/2017-07-02"]
}
},
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"paths": "/data1/tmp/test"
}
},
"tuningConfig": {
"type": "hadoop",
"partitionsSpec": {
"numShards": 3
},
"jobProperties": {
"mapreduce.job.queuename": "root.default",
"mapreduce.job.classloader": "true",
"mapreduce.job.classloader.system.classes": "-javax.validation.,java.,javax.,org.apache.commons.logging.,org.apache.log4j.,org.apache.hadoop.,com.hadoop.compression"
}
}
}
}
spec.dataSchema.parser.parseSpec.timestampSpec.column:时间戳列spec.dataSchema.parser.parseSpec.timestampSpec.format:时间格式类型:推荐millisyy-MM-dd HH:mm:ss.SSS:自定义的时间格式auto:自动识别时间,支持iso和millis格式iso:iso标准时间格式,如2016-08-03T12:53:51.999Zposix:从1970年1月1日开始所经过的秒数,10位的数字millis:从1970年1月1日开始所经过的毫秒数,13位数字
spec.dataSchema.parser.parseSpec.listDelimiter:csv列分隔符spec.dataSchema.parser.parseSpec.columns:维度列表,包含时间戳列,如:["ts","ProductID"]spec.dataSchema.parser.parseSpec.dimensionsSpec.dimensions:维度定义列表,不包含时间戳列,每个维度的格式为:{“name”: “name_string”, “type”:”type_string”}。Type支持的类型:string、int、float、long、datespec.dataSchema.granularitySpec.intervals:数据时间戳范围,不能为空spec.dataSchema.granularitySpec.segmentGranularity:段粒度,根据每天的数据量进行设置。
小数据量建议DAY,大数据量(每天百亿)可以选择HOUR。可选项:SECOND、MINUTE、FIVE_MINUTE、TEN_MINUTE、FIFTEEN_MINUTE、HOUR、SIX_HOUR、DAY、MONTH、YEAR。spec.dataSchema.dataSource:数据源的名称,类似关系数据库中的表名spec.ioConfig.type:这里是从hadoop接入数据,固定为hadoopspec.ioConfig.inputSpec.paths:数据文件在hdfs上的目录,注意该目录下是否有多个文件
spec.tuningConfig.partitionsSpec.numShards:指定分片数spec.tuningConfig.jobProperties.mapreduce.job.queuename:指定yarn上的队列名spec.tuningConfig.jobProperties.mapreduce.job.classloader:如果为true,task会用druid中的hadoop-dependencies,而不是hadoop集群的配置