离线接入
概要
本流程主要说明把离线数据实时导入到
Tindex
1. csv文件导入
导入过程步骤:
第一步:找MiddleManagers服务IP
第二步:到MiddleManagers服务器上传本地数据文件、任务json文件
- 通过shell登录后,输入命令:cd /data1/tmp/druid
注: /data1/tmp/druid 根据 taskspec.json 文件实际路径决定
第三步: 创建、编辑、保存taskspec.json文件:
在 data1/tmp/druid 目录下,vim taskspec.json ,将下面json
配置说明内容拷贝,然后点击键盘 ESC 按键退出编辑, 再输入 wq 命令退出并保存。
第四步:执行命令上传csv文件
csv文件上传, 在shell工具登录:MiddleManagers服务器, cd /data1/tmp/druid 目录下,执行:
curl -X 'POST' -H 'Content-Type:application/json' -d @csv-task-spec.json http://{OverlordIP}:8090/druid/indexer/v1/task
overlordIP: druid的overlord节点ip地址
csv-task-spec.json task配置文件,详见下文
第五步:查看csv文件上传task的日志信息。进入overlord服务的任务列表页面,查看数据导入情况。
http://{overlordIP}:8090/console.html
overlordIP: druid的overlord节点ip地址,如果有多个overlord,必须指定leader的ip.
第六步:在需要停止task时,可以发送如下http post请求停止task任务
curl -X 'POST' -H 'Content-Type:application/json' http://{overlordIP}:8090/druid/indexer/v1/task/{taskId}/shutdown
overlordIP: druid的overlord节点ip地址,如果有多个overlord,必须指定leader的ip.
taskId: 在
http://{overlordIP}:8090/console.html
task详细页面对应 id 列的信息
csv-task-spec.json详细配置如下:
{
"type" : "lucene_index",
"worker": "dev224.sugo.net:8091",
"spec" : {
"dataSchema" : {
"dataSource" : "test_10million",
"parser": {
"type": "string",
"parseSpec": {
"format": "csv",
"timestampSpec": {
"column": "dealtime",
"format": "yy-MM-dd HH:mm:ss.SSS"
},
"dimensionsSpec": {
"dimensionExclusions": [],
"spatialDimensions": [],
"dimensions": [
{
"name": "old_card_no",
"type": "string"
},
{
"name": "gender",
"type": "string"
},
{
"name": "birthday",
"type": "string"
},
{
"name": "mobile_province",
"type": "string"
},
{
"name": "mobile_city",
"type": "string"
}
]
},
"listDelimiter": ",",
"multiValueDelimiter": "\u0002",
"columns": [
"dealtime",
"old_card_no",
"gender",
"birthday",
"mobile_province",
"mobile_city"
]
}
},
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "MONTH",
"intervals" : ["1000/3000"]
}
},
"ioConfig" : {
"type" : "lucene_index",
"firehose" : {
"type" : "local",
"baseDir" : "/tmp/test/aaa",
"filter" : "*",
"recursive" : true
}
},
"tuningConfig" : {
"type" : "lucene_index",
"maxRowsPerSegment" : 5000000,
"numShards" : -1,
"overwrite" : false,
"reportParseExceptions" : true,
"writeThreads" : 3,
"writeQueueThreshold" : 1000,
"mergeThreads" : 3,
"useDataSequenceMode" : false,
"maxSegmentIdleSeconds" : 10
}
},
"writerConfig" : {
"type" : "lucene",
"maxBufferedDocs" : -1,
"ramBufferSizeMB" : 16.0,
"indexRefreshIntervalSeconds" : 6
},
"context" : {
"debug" : true
}
}
参数说明:
属性名 | 值 | 类型 | 是否必需 | 默认值 | 说明 |
---|---|---|---|---|---|
type | lucene_index | string | 是 | - | 指定接入类型,固定 |
worker | 自定义 | string | 否 | - | 指定启动task的worker地址,前置条件参考配置worker策略 |
spec.dataSchema | 参见DataSchema | json | 是 | - | 定义表结构和数据粒度 |
spec.ioConfig | 参见luceneIndexIOConfig | json | 是 | - | 定义数据来源 |
spec.tuningConfig | 参见luceneIndexTuningConfig | json | 是 | - | 配置Task的优化参数 |
writerConfig | 参见WriterConfig | json | 是 | - | 配置数据段的写入参数 |
context | 自定义 | json | 否 | null | 配置Task的上下文环境参数 |
1.1 配置worker策略
worker分配策略支持以下几种:
- fillCapacity: 填充满一个worker容量优先
- fillCapacityWithAffinity: 支持配置数据源与worker的绑定关系,若不绑定,则采用填充满一个worker容量优先
- equalDistribution: 平均分配
- equalDistributionWithAffinity: 支持配置数据源与worker的绑定关系,若不绑定,则平均分配
- javascript: js自定义分配
specialEqualDistribution: 支持指定worker,若不指定,则平均分配
task在worker上的分配策略默认为
equalDistribution
均分策略,若要支持指定worker,需要发送以下命令:curl -X 'POST' -H 'Content-Type:application/json' -d '{"selectStrategy":{"type":"specialEqualDistribution"},"autoScaler":null}' http://{overlordIP}:8090/druid/indexer/v1/worker
2. tsv文件导入
TSV文件导入流程大体与CSV文件导入
一样,此处不做赘述。仅提供接入的json样例
tsv-task-spec.json详细配置如下:
{
"type" : "lucene_index",
"spec" : {
"dataSchema" : {
"dataSource" : "test_10million",
"parser": {
"type": "string",
"parseSpec": {
"format": "tsv",
"timestampSpec": {
"column": "dealtime",
"format": "yy-MM-dd HH:mm:ss.SSS"
},
"dimensionsSpec": {
"dimensionExclusions": [],
"spatialDimensions": [],
"dimensions": [
{
"name": "old_card_no",
"type": "string"
},
{
"name": "gender",
"type": "string"
},
{
"name": "birthday",
"type": "string"
},
{
"name": "mobile_province",
"type": "string"
},
{
"name": "mobile_city",
"type": "string"
}
]
},
"delimiter": "\t",
"listDelimiter": "\u0002",
"nullFormat": "",
"columns": [
"dealtime",
"old_card_no",
"gender",
"birthday",
"mobile_province",
"mobile_city"
]
}
},
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "MONTH",
"intervals" : ["1000/3000"]
}
},
"ioConfig" : {
"type" : "lucene_index",
"firehose" : {
"type" : "local",
"baseDir" : "/tmp/test/aaa",
"filter" : "*",
"recursive" : true
}
},
"tuningConfig" : {
"type" : "lucene_index",
"maxRowsPerSegment" : 5000000,
"numShards" : -1,
"overwrite" : false,
"reportParseExceptions" : true,
"writeThreads" : 3,
"writeQueueThreshold" : 1000,
"mergeThreads" : 3,
"useDataSequenceMode" : false,
"maxSegmentIdleSeconds" : 10
}
},
"writerConfig" : {
"type" : "lucene",
"maxBufferedDocs" : -1,
"ramBufferSizeMB" : 16.0,
"indexRefreshIntervalSeconds" : 6
},
"context" : {
"debug" : true
}
}
参数说明:
属性名 | 值 | 类型 | 是否必需 | 默认值 | 说明 |
---|---|---|---|---|---|
type | lucene_index | string | 是 | - | 指定接入类型,固定 |
spec.dataSchema | 参见DataSchema | json | 是 | - | 定义表结构和数据粒度 |
spec.ioConfig | 参见luceneIndexIOConfig | json | 是 | - | 定义数据来源 |
spec.tuningConfig | 参见luceneIndexTuningConfig | json | 是 | - | 配置Task的优化参数 |
writerConfig | 参见WriterConfig | json | 是 | - | 配置数据段的写入参数 |
context | 自定义 | json | 否 | null | 配置Task的上下文环境参数 |
3. hdfs文件导入
本节主要说明导入hdfs集群上的文本文件,注意需要保证Tindex集群中的MiddleManager节点与目标hdfs集群是可访问的。
hdfs-task-spec.json详细配置如下:
{
"type" : "lucene_index",
"spec" : {
"dataSchema" : {
"dataSource" : "test_10million",
"parser": {
"type": "string",
"parseSpec": {
"format": "tsv",
"timestampSpec": {
"column": "dealtime",
"format": "yy-MM-dd HH:mm:ss.SSS"
},
"dimensionsSpec": {
"dimensionExclusions": [],
"spatialDimensions": [],
"dimensions": [
{
"name": "old_card_no",
"type": "string"
},
{
"name": "gender",
"type": "string"
},
{
"name": "birthday",
"type": "string"
},
{
"name": "mobile_province",
"type": "string"
},
{
"name": "mobile_city",
"type": "string"
}
]
},
"delimiter": "\u0001",
"listDelimiter": "\u0002",
"nullFormat": "",
"columns": [
"dealtime",
"old_card_no",
"gender",
"birthday",
"mobile_province",
"mobile_city"
]
}
},
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "MONTH",
"intervals" : ["1000/3000"]
}
},
"ioConfig" : {
"type" : "lucene_index",
"firehose" : {
"type" : "hdfs",
"baseDir" : "/tmp/csv",
"filter" : "*.*",
"dirFilter" : null,
"recursive" : true
}
},
"tuningConfig" : {
"type" : "lucene_index",
"maxRowsPerSegment" : 5000000,
"numShards" : -1,
"overwrite" : false,
"reportParseExceptions" : true,
"writeThreads" : 3,
"writeQueueThreshold" : 1000,
"mergeThreads" : 3,
"useDataSequenceMode" : false,
"maxSegmentIdleSeconds" : 10
}
},
"writerConfig" : {
"type" : "lucene",
"maxBufferedDocs" : -1,
"ramBufferSizeMB" : 16.0,
"indexRefreshIntervalSeconds" : 6
},
"context" : {
"debug" : true
}
}
参数说明:
属性名 | 值 | 类型 | 是否必需 | 默认值 | 说明 |
---|---|---|---|---|---|
type | lucene_index | string | 是 | - | 指定接入类型,固定 |
spec.dataSchema | 参见DataSchema | json | 是 | - | 定义表结构和数据粒度 |
spec.ioConfig | 参见luceneIndexIOConfig | json | 是 | - | 定义数据来源 |
spec.tuningConfig | 参见luceneIndexTuningConfig | json | 是 | - | 配置Task的优化参数 |
writerConfig | 参见WriterConfig | json | 是 | - | 配置数据段的写入参数 |
context | 自定义 | json | 否 | null | 配置Task的上下文环境参数 |
4. parquet文件导入
本节主要说明该直接导入hdfs集群上的parquet文件,注意需要保证Tindex集群中的MiddleManager节点与目标hdfs集群是可访问的。
parquet-task-spec.json详细配置如下:
{
"type" : "lucene_index",
"spec" : {
"dataSchema" : {
"dataSource" : "test_10million",
"parser": {
"type": "parquet",
"parseSpec": {
"format": "parquet",
"timestampSpec": {
"column": "dealtime",
"format": "yy-MM-dd HH:mm:ss.SSS"
},
"dimensionsSpec": {
"dimensionExclusions": [],
"spatialDimensions": [],
"dimensions": [
{
"name": "old_card_no",
"type": "string"
},
{
"name": "gender",
"type": "string"
},
{
"name": "birthday",
"type": "string"
},
{
"name": "mobile_province",
"type": "string"
},
{
"name": "mobile_city",
"type": "string"
}
]
},
"delimiter": "\u0001",
"listDelimiter": "\u0002",
"nullFormat": "",
"columns": [
"dealtime",
"old_card_no",
"gender",
"birthday",
"mobile_province",
"mobile_city"
]
}
},
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "MONTH",
"intervals" : ["1000/3000"]
}
},
"ioConfig" : {
"type" : "lucene_index",
"firehose" : {
"type" : "parquet",
"baseDir" : "/tmp/csv",
"filter" : "*.parq",
"dirFilter" : null,
"recursive" : true
}
},
"tuningConfig" : {
"type" : "lucene_index",
"maxRowsPerSegment" : 5000000,
"numShards" : -1,
"overwrite" : false,
"reportParseExceptions" : true,
"writeThreads" : 3,
"writeQueueThreshold" : 1000,
"mergeThreads" : 3,
"useDataSequenceMode" : false,
"maxSegmentIdleSeconds" : 10
}
},
"writerConfig" : {
"type" : "lucene",
"maxBufferedDocs" : -1,
"ramBufferSizeMB" : 16.0,
"indexRefreshIntervalSeconds" : 6
},
"context" : {
"debug" : true
}
}
参数说明:
属性名 | 值 | 类型 | 是否必需 | 默认值 | 说明 |
---|---|---|---|---|---|
type | lucene_index | string | 是 | - | 指定接入类型,固定 |
spec.dataSchema | 参见DataSchema | json | 是 | - | 定义表结构和数据粒度 |
spec.ioConfig | 参见luceneIndexIOConfig | json | 是 | - | 定义数据来源 |
spec.tuningConfig | 参见luceneIndexTuningConfig | json | 是 | - | 配置Task的优化参数 |
writerConfig | 参见WriterConfig | json | 是 | - | 配置数据段的写入参数 |
context | 自定义 | json | 否 | null | 配置Task的上下文环境参数 |
5. 数据库导入
Tindex的数据库导入任务主要是基于JDBC协议实现数据库的连接和数据拉取
导入过程步骤:
第一步:获取驱动包
根据数据库类型和版本获取对应的jdbc 驱动jar包。
一般可以在对应数据官网可以获取,如MYSQL5的驱动包:mysql-connector-java-5.1.38.jar
第二步:上传驱动包到MiddleManagers服务器的驱动目录
驱动目录默认是:/data1/druid/driver_jars
, 实际配置以数果Ambari配置为准。
scp mysql-connector-java-5.1.38.jar root@192.168.0.224:/data1/druid/driver_jars
第三步: 创建、编辑、保存taskspec.json文件:
在 data1/tmp/druid 目录下,vim taskspec.json ,将下面json
配置说明内容拷贝,然后点击键盘 ESC 按键退出编辑, 再输入 wq 命令退出并保存。
task-spec.json详细配置如下:
{
"type": "lucene_upsert",
"filterColumns": [
"store_id"
],
"actionColumn": "action",
"dataSchema": {
"dataSource": "tindex_indices_tt2",
"parser": {
"type": "map",
"parseSpec": {
"format": "timeAndDims",
"timestampSpec": {
"column": "__time",
"reNewTime":true
},
"dimensionsSpec": {
"dimensionExclusions": [],
"spatialDimensions": [],
"dimensions": [
{
"name": "store_name",
"type": "string"
},
{
"name": "format_type",
"type": "string"
},
{
"name": "sub_format_type",
"type": "string"
},
{
"name": "store_floor",
"type": "string"
},
{
"name": "index_name",
"type": "string"
},
{
"name": "store_address",
"type": "string"
}
]
}
}
},
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"intervals": [
"1001-01-01T00:00:00.000Z/2999-01-01T00:00:00.000Z"
]
}
},
"ioConfig": {
"type": "lucene_index",
"firehose": {
"type": "jdbc",
"provider": {
"type": "default",
"url": "jdbc:mysql://192.168.0.217:3306/mall_coo",
"user": "root",
"password": "000000",
"encrypt": false,
"driverClassName":"com.mysql.jdbc.Driver",
"driverPath":"/data1/druid/driver_jars/mysql-connector-java-5.1.38.jar"
},
"sqls": [
"select * from store_info"
]
}
},
"tuningConfig": {
"type": "lucene_index",
"maxRowsPerSegment": 500000,
"numShards": -1,
"basePersistDirectory": null,
"overwrite": false,
"reportParseExceptions": true
},
"writerConfig": {
"type": "lucene",
"maxBufferedDocs": -1,
"ramBufferSizeMB": 16,
"indexRefreshIntervalSeconds": 6
},
"context": {
"debug": true,
"throwAwayBadData": false
}
}
参数说明:
属性名 | 值 | 类型 | 是否必需 | 默认值 | 说明 |
---|---|---|---|---|---|
type | lucene_index/lucene_upsert | string | 是 | - | 指定接入类型,枚举值 |
spec.dataSchema | 参见DataSchema | json | 是 | - | 定义表结构和数据粒度 |
spec.ioConfig | 参见luceneIndexIOConfig | json | 是 | - | 定义数据来源 |
spec.tuningConfig | 参见luceneIndexTuningConfig | json | 是 | - | 配置Task的优化参数 |
writerConfig | 参见WriterConfig | json | 是 | - | 配置数据段的写入参数 |
context | 自定义 | json | 否 | null | 配置Task的上下文环境参数 |