Upsert Task

Upsert Task支持用户新增/更新/删除 特定的记录

task说明:

  • Upsert类型的task支持行级数据的新增,修改和删除,由actionColumn指定特定列,该列的数据将标识行的记录是执行新增,修改或删除操作.该列的值只允许a,u,d三个值,分别代表新增,修改,删除操作.
  • Upsert类型的task实现了类似MySQL的UPSERT特性,即如果存在则更新,如果不存在则新增.是否存在是根据指定的列值进行判断,通常是主键,比如订单表的主键是订单号orderid,则根据orderid判断记录是否存在.
  • 为了简化数据处理逻辑,默认情况下无需指定actionColumn,所有记录都当做更新操作,如果存在则修改,如果不存在则新增.此时不能处理删除操作.如果需要处理删除操作,需要在context中添加allUpSert参数,并指定为false.

示例json配置

 {
  "type" : "lucene_upsert", 
  "filterColumns" : [ "so_no" ],
  "actionColumn" : "action",
  "dataSchema" : {
    "dataSource" : "tindex_Og_fvL1oK_project_0I3UWMyC9G",
    "parser" : {
      "parseSpec" : {
        "format" : "tsv",
        "timestampSpec" : {
          "column" : "so_entry_time",
          "format" : "yyyyy-MM-dd HH:mm:ss.SSS",
          "timezone" : "Asia/Shanghai"
        },
        "dimensionsSpec" : {
          "dimensionExclusions" : [ ],
          "spatialDimensions" : [ ],
          "dimensions" : [ {
            "name" : "so_no",
            "type" : "string"
          }, {
            "name" : "dealer_no",
            "type" : "string"
          }, {
            "name" : "dealer_name",
            "type" : "string"
          }, {
            "name" : "sex",
            "type" : "string"
          }, {
            "name" : "contact_mobile",
            "type" : "string"
          }, {
            "name" : "contact_tele",
            "type" : "string"
          }]
        },
        "delimiter" : "\u0001",
        "listDelimiter" : "\u0002",
        "columns" : [ "so_no", "dealer_no", "dealer_name", "sex", "contact_mobile", "contact_tele", "so_entry_time"]
      }
    },
    "granularitySpec" : {
      "type" : "uniform",
      "segmentGranularity" : "DAY",
      "intervals" : [ "1001-01-01T00:00:00.000Z/2999-01-01T00:00:00.000Z" ]
    }
  },
  "ioConfig" : {
    "type" : "lucene_index",
    "firehose" : {
      "type" : "hdfs",
      "baseDir" : "/user/hive/warehouse/wuxianji_poc.db/test2",
      "filter" : "*",
      "readThreads" : 3,
      "dataQueueThreshold" : 5000
    }
  },  
  "tuningConfig" : {
    "type" : "lucene_index",
    "maxRowsPerSegment" : 500000,
    "numShards" : -1,
    "basePersistDirectory" : null,
    "overwrite" : false,
    "reportParseExceptions" : true
  },
 "writerConfig" : {
   "type" : "lucene",
   "maxBufferedDocs" : -1,
   "ramBufferSizeMB" : 16.0,
   "indexRefreshIntervalSeconds" : 6
 },
  "context" : {
    "debug" : true,
    "throwAwayBadData" : false,
    "disableUpSet" : false
  }
}
属性名 类型 是否必需 默认值 说明
type lucene_supervisor string - 指定接入类型,注意:lucene_index也支持嵌套数据接入
filterColumns 自定义 list - 指定主键列,支持指定多个主键,以,分隔
actionColumn 自定义 string - 指定操作列,比如action,记录值可选为a/au/u/dcontext.allUpSert=true时可以不指定
dataSchema 参见DataSchema json - 定义表结构和数据粒度
ioConfig 参见luceneIndexIOConfig json - 定义数据来源
tuningConfig 参见luceneIndexTuningConfig json - 配置Task的优化参数
writerConfig 参见WriterConfig json - 配置数据段的写入参数
context 自定义 json null 配置Task的上下文环境参数

特殊参数说明:

  • worker: 制定task运行的worker节点,需要使用域名,或参考task管理界面的Remote Workers表格
  • dataSchema: 参考其他task的配置
  • context.throwAwayBadData: 是否直接丢弃解析出错的数据而不停止task.默认为false,即如果遇到解析报错的数据,则停止task.如果确定只是少量数据有问题,并且直接丢弃也不影响业务,则可设置为true(请慎重).
  • context.commitThreshold:写入时多少条记录commit一次,默认500.
  • context.idealPersistThreadSize:处理数据更新时由于从数据源读数据比较快,但写入相对较慢,因此采用了生产者消费者模式,一个生产者读取数据后往队列中写数据,默认最大开启2个消费者.
  • context.persistQueueThreshold:指定消息队列的大小,默认1000.
  • context.allUpSert: 默认为true,即所有的记录都当做更新或新增,无需指定actionColumn参数.如果设置为false,则必须指定actionColumn,如果设置action为修改(u),但原有数据中找的记录数不等于1,则丢弃数据,而不采用新增方式插入数据,即不执行insert操作..
© 广东数果 all right reserved,powered by Gitbook问题反馈邮件:developer@sugo.io 2020-11-12 17:52:00

results matching ""

    No results matching ""