Upsert Task
Upsert Task支持用户新增/更新/删除 特定的记录
task说明:
- Upsert类型的task支持行级数据的新增,修改和删除,由- actionColumn指定特定列,该列的数据将标识行的记录是执行新增,修改或删除操作.该列的值只允许- a,- u,- d三个值,分别代表新增,修改,删除操作.
- Upsert类型的task实现了类似MySQL的UPSERT特性,即如果存在则更新,如果不存在则新增.是否存在是根据指定的列值进行判断,通常是主键,比如订单表的主键是订单号orderid,则根据orderid判断记录是否存在.
- 为了简化数据处理逻辑,默认情况下无需指定actionColumn,所有记录都当做更新操作,如果存在则修改,如果不存在则新增.此时不能处理删除操作.如果需要处理删除操作,需要在context中添加allUpSert参数,并指定为false.
示例json配置
 {
  "type" : "lucene_upsert", 
  "filterColumns" : [ "so_no" ],
  "actionColumn" : "action",
  "dataSchema" : {
    "dataSource" : "tindex_Og_fvL1oK_project_0I3UWMyC9G",
    "parser" : {
      "parseSpec" : {
        "format" : "tsv",
        "timestampSpec" : {
          "column" : "so_entry_time",
          "format" : "yyyyy-MM-dd HH:mm:ss.SSS",
          "timezone" : "Asia/Shanghai"
        },
        "dimensionsSpec" : {
          "dimensionExclusions" : [ ],
          "spatialDimensions" : [ ],
          "dimensions" : [ {
            "name" : "so_no",
            "type" : "string"
          }, {
            "name" : "dealer_no",
            "type" : "string"
          }, {
            "name" : "dealer_name",
            "type" : "string"
          }, {
            "name" : "sex",
            "type" : "string"
          }, {
            "name" : "contact_mobile",
            "type" : "string"
          }, {
            "name" : "contact_tele",
            "type" : "string"
          }]
        },
        "delimiter" : "\u0001",
        "listDelimiter" : "\u0002",
        "columns" : [ "so_no", "dealer_no", "dealer_name", "sex", "contact_mobile", "contact_tele", "so_entry_time"]
      }
    },
    "granularitySpec" : {
      "type" : "uniform",
      "segmentGranularity" : "DAY",
      "intervals" : [ "1001-01-01T00:00:00.000Z/2999-01-01T00:00:00.000Z" ]
    }
  },
  "ioConfig" : {
    "type" : "lucene_index",
    "firehose" : {
      "type" : "hdfs",
      "baseDir" : "/user/hive/warehouse/wuxianji_poc.db/test2",
      "filter" : "*",
      "readThreads" : 3,
      "dataQueueThreshold" : 5000
    }
  },  
  "tuningConfig" : {
    "type" : "lucene_index",
    "maxRowsPerSegment" : 500000,
    "numShards" : -1,
    "basePersistDirectory" : null,
    "overwrite" : false,
    "reportParseExceptions" : true
  },
 "writerConfig" : {
   "type" : "lucene",
   "maxBufferedDocs" : -1,
   "ramBufferSizeMB" : 16.0,
   "indexRefreshIntervalSeconds" : 6
 },
  "context" : {
    "debug" : true,
    "throwAwayBadData" : false,
    "disableUpSet" : false
  }
}
| 属性名 | 值 | 类型 | 是否必需 | 默认值 | 说明 | 
|---|---|---|---|---|---|
| type | lucene_supervisor | string | 是 | - | 指定接入类型,注意: lucene_index也支持嵌套数据接入 | 
| filterColumns | 自定义 | list | 是 | - | 指定主键列,支持指定多个主键,以 ,分隔 | 
| actionColumn | 自定义 | string | 否 | - | 指定操作列,比如 action,记录值可选为a/au/u/d,context.allUpSert=true时可以不指定 | 
| dataSchema | 参见DataSchema | json | 是 | - | 定义表结构和数据粒度 | 
| ioConfig | 参见luceneIndexIOConfig | json | 是 | - | 定义数据来源 | 
| tuningConfig | 参见luceneIndexTuningConfig | json | 是 | - | 配置Task的优化参数 | 
| writerConfig | 参见WriterConfig | json | 是 | - | 配置数据段的写入参数 | 
| context | 自定义 | json | 否 | null | 配置Task的上下文环境参数 | 
特殊参数说明:
- worker: 制定task运行的worker节点,需要使用域名,或参考task管理界面的- Remote Workers表格
- dataSchema: 参考其他task的配置
- context.throwAwayBadData: 是否直接丢弃解析出错的数据而不停止task.默认为false,即如果遇到解析报错的数据,则停止task.如果确定只是少量数据有问题,并且直接丢弃也不影响业务,则可设置为true(请慎重).
- context.commitThreshold:写入时多少条记录commit一次,默认500.
- context.idealPersistThreadSize:处理数据更新时由于从数据源读数据比较快,但写入相对较慢,因此采用了生产者消费者模式,一个生产者读取数据后往队列中写数据,默认最大开启2个消费者.
- context.persistQueueThreshold:指定消息队列的大小,默认1000.
- context.allUpSert: 默认为true,即所有的记录都当做更新或新增,无需指定actionColumn参数.如果设置为false,则必须指定actionColumn,如果设置action为修改(- u),但原有数据中找的记录数不等于1,则丢弃数据,而不采用新增方式插入数据,即不执行insert操作..