Upsert Task
Upsert Task支持用户新增/更新/删除 特定的记录
task说明:
Upsert
类型的task支持行级数据的新增,修改和删除,由actionColumn
指定特定列,该列的数据将标识行的记录是执行新增,修改或删除操作.该列的值只允许a
,u
,d
三个值,分别代表新增,修改,删除操作.Upsert
类型的task实现了类似MySQL的UPSERT特性,即如果存在则更新,如果不存在则新增.是否存在是根据指定的列值进行判断,通常是主键,比如订单表的主键是订单号orderid,则根据orderid判断记录是否存在.- 为了简化数据处理逻辑,默认情况下无需指定
actionColumn
,所有记录都当做更新操作,如果存在则修改,如果不存在则新增.此时不能处理删除操作.如果需要处理删除操作,需要在context
中添加allUpSert
参数,并指定为false
.
示例json配置
{
"type" : "lucene_upsert",
"filterColumns" : [ "so_no" ],
"actionColumn" : "action",
"dataSchema" : {
"dataSource" : "tindex_Og_fvL1oK_project_0I3UWMyC9G",
"parser" : {
"parseSpec" : {
"format" : "tsv",
"timestampSpec" : {
"column" : "so_entry_time",
"format" : "yyyyy-MM-dd HH:mm:ss.SSS",
"timezone" : "Asia/Shanghai"
},
"dimensionsSpec" : {
"dimensionExclusions" : [ ],
"spatialDimensions" : [ ],
"dimensions" : [ {
"name" : "so_no",
"type" : "string"
}, {
"name" : "dealer_no",
"type" : "string"
}, {
"name" : "dealer_name",
"type" : "string"
}, {
"name" : "sex",
"type" : "string"
}, {
"name" : "contact_mobile",
"type" : "string"
}, {
"name" : "contact_tele",
"type" : "string"
}]
},
"delimiter" : "\u0001",
"listDelimiter" : "\u0002",
"columns" : [ "so_no", "dealer_no", "dealer_name", "sex", "contact_mobile", "contact_tele", "so_entry_time"]
}
},
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "DAY",
"intervals" : [ "1001-01-01T00:00:00.000Z/2999-01-01T00:00:00.000Z" ]
}
},
"ioConfig" : {
"type" : "lucene_index",
"firehose" : {
"type" : "hdfs",
"baseDir" : "/user/hive/warehouse/wuxianji_poc.db/test2",
"filter" : "*",
"readThreads" : 3,
"dataQueueThreshold" : 5000
}
},
"tuningConfig" : {
"type" : "lucene_index",
"maxRowsPerSegment" : 500000,
"numShards" : -1,
"basePersistDirectory" : null,
"overwrite" : false,
"reportParseExceptions" : true
},
"writerConfig" : {
"type" : "lucene",
"maxBufferedDocs" : -1,
"ramBufferSizeMB" : 16.0,
"indexRefreshIntervalSeconds" : 6
},
"context" : {
"debug" : true,
"throwAwayBadData" : false,
"disableUpSet" : false
}
}
属性名 | 值 | 类型 | 是否必需 | 默认值 | 说明 |
---|---|---|---|---|---|
type | lucene_supervisor | string | 是 | - | 指定接入类型,注意:lucene_index 也支持嵌套数据接入 |
filterColumns | 自定义 | list | 是 | - | 指定主键列,支持指定多个主键,以, 分隔 |
actionColumn | 自定义 | string | 否 | - | 指定操作列,比如action ,记录值可选为a/au/u/d ,context.allUpSert=true 时可以不指定 |
dataSchema | 参见DataSchema | json | 是 | - | 定义表结构和数据粒度 |
ioConfig | 参见luceneIndexIOConfig | json | 是 | - | 定义数据来源 |
tuningConfig | 参见luceneIndexTuningConfig | json | 是 | - | 配置Task的优化参数 |
writerConfig | 参见WriterConfig | json | 是 | - | 配置数据段的写入参数 |
context | 自定义 | json | 否 | null | 配置Task的上下文环境参数 |
特殊参数说明:
worker
: 制定task运行的worker节点,需要使用域名,或参考task管理界面的Remote Workers
表格dataSchema
: 参考其他task的配置context.throwAwayBadData
: 是否直接丢弃解析出错的数据而不停止task.默认为false,即如果遇到解析报错的数据,则停止task.如果确定只是少量数据有问题,并且直接丢弃也不影响业务,则可设置为true(请慎重).context.commitThreshold
:写入时多少条记录commit一次,默认500.context.idealPersistThreadSize
:处理数据更新时由于从数据源读数据比较快,但写入相对较慢,因此采用了生产者消费者模式,一个生产者读取数据后往队列中写数据,默认最大开启2个消费者.context.persistQueueThreshold
:指定消息队列的大小,默认1000.context.allUpSert
: 默认为true,即所有的记录都当做更新或新增,无需指定actionColumn参数.如果设置为false,则必须指定actionColumn,如果设置action为修改(u
),但原有数据中找的记录数不等于1,则丢弃数据,而不采用新增方式插入数据,即不执行insert操作..