datax抽取es数据到hive
收到一个需求:将 ES 集群的数据抽取到大数据平台。
首先在 Hive 中创建一个对应的数据表:
-- Landing table for the Elasticsearch extract; the DataX hdfswriter job
-- writes ORC files into this table's warehouse directory.
-- No ROW FORMAT DELIMITED clause is declared: ORC is a self-describing
-- binary format, so a text field terminator is not used when reading or
-- writing the data and only misleads readers of the DDL.
-- Hive identifiers are case-insensitive, so the camelCase column names
-- below are matched regardless of case.
CREATE TABLE IF NOT EXISTS ods.pr_es_test_orc (
    clueId     STRING,
    brandId    STRING,
    clueEstype STRING
)
STORED AS ORC;
主要需要配置的参数如下:
“endpoint”:ES 的地址(ip:端口),
“accessId”:用户名,
“accessKey”:密码,
“index”:索引(相当于数据库)名前缀*(其中的 * 是通配符,匹配所有以该前缀开头的索引),
“scroll”:每次滚动读取(scroll)时数据上下文的缓存保留时间,
{
  "job": {
    "setting": {
      "speed": {
        "channel": 7
      }
    },
    "content": [
      {
        "reader": {
          "name": "elasticsearchreader",
          "parameter": {
            "endpoint": "XXX.XXX.XXX.XXX:9200",
            "accessId": "XXXXXXX*",
            "accessKey": "XXXXXXXXXXX",
            "index": "XXXXXX-*",
            "type": "_doc",
            "scroll": "3m",
            "headers": {},
            "search": [
              {
                "query": {
                  "bool": {
                    "filter": [
                      {
                        "range": {
                          "createdTime": {
                            "boost": 1,
                            "from": "${st}",
                            "include_lower": true,
                            "include_upper": true,
                            "to": "${et}"
                          }
                        }
                      }
                    ]
                  }
                },
                "size": 10
              }
            ],
            "table": {
              "column": [
                {"name": "clueId"},
                {"name": "brandId"},
                {"name": "clueEstype"}
              ]
            }
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://${hdfs}",
            "fileType": "ORC",
            "path": "/user/hive/warehouse/ods.db/pr_es_test_orc",
            "fileName": "aaaaaa",
            "column": [
              {"name": "clueId", "type": "STRING"},
              {"name": "brandId", "type": "STRING"},
              {"name": "clueEstype", "type": "STRING"}
            ],
            "writeMode": "append",
            "fieldDelimiter": "|",
            "compress": "NONE"
          }
        }
      }
    ]
  }
}