本地文件接入
概要:
本例主要演示
LuceneIndexTask
接入本地文件数据的流程,整个过程会与csv文件接入
的流程比较类似,但与csv文件接入
中的lucene_index_realtime
相比,本流程的LuceneIndexTask
支持一些额外的优化设置:
- 通过
maxRowsPerSegment
或numShards
属性可以设定不同segment
生成策略,二者区别如下:maxRowsPerSegment
: 每个segment
最大的存储行数numShards
: 设置总的segment
数- 通过
overwrite
设置对datasource
进行覆盖写入还是追加写入
基于以上的优化,接入本地文件数据时推荐使用本流程
以下以接入 csv
数据为例。
第一步:准备好目标csv文件数据
- 本例的部分
csv
数据如下:1500520434191,1,jkcdashjfksa 1500520430411,2,fdfjdkhjfk 1500020434191,4,fkdsjfkskjfa 1420520434191,5,kljfdsaj 1500510434191,9,sagfkldjgf 1500520214191,8,fdjsklklf
第二步:创建、编辑、保存json文件
- 具体的json文件配置请参考后文的LuceneIndexTask_Json实例
第三步:上传csv、json文件到MiddleManager服务器上
- 使用
scp
命令将csv
、json
文件上传到MiddleManager
scp {file_path} root@{MiddleManagerIP}:{storage_dir}
file_path
csv
或json
文件在本地的绝对路径,要指定到文件名这一层
MiddleManagerIP: druid的MiddleManager
节点ip地址
storage_dir 在MiddleManager
的存放目录,使用绝对路径
第四步:执行命令,启动Task
- 通过
ssh
远程登录到第三步中选择的MiddleManager
中 - 执行
cd
命令进入第三步中选择存放json
文件的目录中 - 如果task在worker上的分配策略为均分策略,执行
curl
命令取消之curl -X 'POST' -H 'Content-Type:application/json' -d '{"selectStrategy":{"type":"specialEqualDistribution"},"autoScaler":null}' http://{overlordIP}:8090/druid/indexer/v1/worker
执行
curl
命令启动Task
,具体命令格式如下:curl -X 'POST' -H 'Content-Type:application/json' -d @{file_name} http://{OverlordIP}:8090/druid/indexer/v1/task
OverlordIP: druid的overlord节点ip地址,如果有多个overlord,必须指定leader的ip.
file_name
json
文件名
第五步:查看Task执行情况
查看日志 访问:
http://{OverlordIP}:8090/console.html
,点击Task
的日志,查看Task
的执行情况OverlordIP: druid的overlord节点ip地址
查看执行结果 使用
sugo-plyql
查询Task
的执行结果,具体的命令格式为:./plyql -h {OverlordIP} -q 'select count(*) from {datasource}'
OverlordIP: druid的overlord节点ip地址
datasource:json
配置文件中定义的datasource
名称如果查询的结果是"No Such Datasorce",则说明数据接入没有成功。
如果数据接入成功,那么查询到的结果如下:
该数字为数据条数。关于
sugo-plyql
的安装和使用,详见 sugo-plyql 使用文档
注意事项
- 如果task在worker上的分配策略为均分策略,执行
curl
命令取消之curl -X 'POST' -H 'Content-Type:application/json' -d '{"selectStrategy":{"type":"specialEqualDistribution"},"autoScaler":null}' http://{overlordIP}:8090/druid/indexer/v1/worker
worker:
指定具体的worker的address,格式为hostname:port
,如dev224.sugo.net:8091
spec.dataSchema.granularitySpec.intervals:
是数据时间戳范围,不能为空maxRowsPerSegment
和概要中提到的numShards
是两种不同的segment
生成策略,所以不可同时指定,只能二选一spec.tuningConfig.overwrite:
设置对datasource
进行覆盖写入还是追加写入,默认为false
spec.tuningConfig.reportParseExceptions:
是否汇报数据解析错误,默认为false
context:
任务上下文环境设置,可以不设置context.debug:
开启debug
模式,调试时开启,生成环境不开启
本流程使用的json配置实例如下:
{
"type": "lucene_index",
"worker": "dev224.sugo.net:8091",
"spec": {
"dataSchema": {
"dataSource": "index-224-cyz113",
"metricsSpec": [],
"parser": {
"parseSpec": {
"format": "csv",
"timestampSpec": {"column": "da","format": "millis"},
"dimensionsSpec": {
"dimensionExclusions": [],
"spatialDimensions": [],
"dimensions": [
{"name": "num","type": "int"},
{"name": "strvalue","type": "string"}
]
},
"listDelimiter": ",",
"columns": ["da","num","strvalue"]
}
},
"granularitySpec": {
"intervals": ["2010-01-01/2018-01-01"],
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"type": "uniform"
}
},
"ioConfig": {
"type":"lucene_index",
"firehose":{
"type": "local",
"filter": "data1.csv",
"baseDir": "/data1/csv/"
}
},
"tuningConfig": {
"type": "lucene_index",
"maxRowsPerSegment": 5000000,
"overwrite": false,
"reportParseExceptions":true
}
},
"context":{
"debug":true
}
}
type:
指定数据接入的类型,固定为lucene_index
worker:
指定具体的worker的address,一般的形式为hostname:port
spec:
一些参数说明spec.dataSchema:
关于接入数据的概要说明spec.dataSchema.datasource:
数据源的名称,类似关系数据库中的表名spec.dataSchema.metricsSpec:
所有的指标列和所使用的聚合函数,可以不设置spec.dataSchema.parser:
数据转换器spec.dataSchema.parser.parseSpec:
数据转换的参数说明spec.dataSchema.parser.parseSpec.format:
单条数据的格式spec.dataSchema.parser.parseSpec.timestampSpec:
时间戳的说明spec.dataSchema.parser.parseSpec.timestampSpec.column:
时间戳的列名spec.dataSchema.parser.parseSpec.timestampSpec.format:
时间格式类型, 推荐millis
- [ ]
yy-MM-dd HH:mm:ss.SSS:
自定义的时间格式 - [ ]
auto:
自动识别时间,支持iso
和millis
格式 - [ ]
iso:
iso标准时间格式,如2016-08-03T12:53:51.999Z
- [ ]
posix:
从1970年1月1日开始所经过的秒数,10位的数字 - [ ]
millis:
从1970年1月1日开始所经过的毫秒数,13位数字
- [ ]
spec.dataSchema.parser.parseSpec.dimensionsSpec:
维度说明spec.dataSchema.parser.parseSpec.dimensionsSpec.dimensionExclusions:
排除在外的维度spec.dataSchema.parser.parseSpec.dimensionsSpec.spatialDimensions:
维度的空间说明spec.dataSchema.parser.parseSpec.dimensionsSpec.dimensions:
维度定义列表,每个维度的格式为:{“name”: “age”, “type”:”string”}
。Type支持的类型:string
、int
、float
、long
、date
spec.dataSchema.parser.parseSpec.dimensionsSpec.listDelimiter:
csv列分隔符spec.dataSchema.parser.parseSpec.dimensionsSpec.multiValueDelimiter:
csv多值列分隔符spec.dataSchema.parser.parseSpec.dimensionsSpec.columns:
维度列表,包含时间戳列,eg:["da","ProductID"]
spec.dataSchema.granularitySpec:
数据粒度说明spec.dataSchema.granularitySpec.intervals:
数据时间戳范围,不能为空,可以指定多个范围spec.dataSchema.granularitySpec.segmentGranularity:
段粒度,根据每天的数据量进行设置。 小数据量建议DAY
,大数据量(每天百亿)可以选择HOUR
。可选项:SECOND
、MINUTE
、FIVE_MINUTE
、TEN_MINUTE
、FIFTEEN_MINUTE
、HOUR、SIX_HOUR
、DAY
、MONTH
、YEAR
。spec.dataSchema.granularitySpec.queryGranularity:
查询粒度spec.dataSchema.granularitySpec.type:
粒度说明的类型,默认使用uniform
spec.ioConfig:
数据的IO说明spec.ioConfig.type:
固定为lucene_index
spec.ioConfig.firehose:
数据源适配器spec.ioConfig.firehose.type:
数据源适配器的类型,一般用local
spec.ioConfig.firehose.filter:
文件名匹配符,如*.csv
匹配所有的csv
文件spec.ioConfig.firehose.parser:
数据转换器,与上面配置的spec.dataSchema.parser
一样spec.ioConfig.firehose.baseDir:
数据文件所在目录spec.tuningConfig:
优化说明spec.tuningConfig.type:
固定为lucene_index
spec.tuningConfig.maxRowsPerSegment:
每个segment
最大的存储行数,默认为5000000