KafkaConnecttransform初体验
可以配置Kafka Connector transforms以进⾏⼀个轻量级的消息修改。transforms可以⽅⾯的修改数据以及事件路由。
接下来我们将介绍如何配置⼀个transform。
⾸先,我们要在任务中配置transform,⽆论source 还是sink都可以配置transform
{
"name": "test-source",
"config": {
"connector.class": "sql.MySqlConnector",
"databa.hostname": "localhost",
"databa.port": "3306",
"databa.ur": "root",
"databa.password": "root",
"delete": "fal",
"databa.rver.id": "1",
"databa.rver.name": "test-rver",
"databa.pic": "test-rver-history",
"databa.history.kafka.bootstrap.rvers": "localhost:9092",
"include.schema.changes": "true",
"databa.rverTimezone": "Asia/Shanghai",
"databa.driver": "sql.jdbc.Driver",
"databa.very.poll.interval.ms": "3000",
"defaultFetchSize": "1000",
"databa.tinyInt1isBit": "fal",
"de": "none",
"de": "string",
"transforms": "MakeMap,InrtSource",
"pe":"org.t.transforms.HoistField$Value",
"transforms.MakeMap.field":"line",
"pe":"org.t.transforms.InrtField$Value",
"transforms.InrtSource.static.field":"data_source",
"transforms.InrtSource.static.value":"test-file-source"
}
}
这是官⽅配置⼀个transforms的例⼦。主要配置有transforms,这个key下⾯主要配置有哪些transforms,在有多个transforms的情况下,transform执⾏的顺序是按照配置的顺序,在本⽂后⾯我们会贴出源码来向⼤家说明。
官⽅以及第三⽅提供了很多transforms,在满⾜我们需求的情况下可以直接使⽤,如果不能满⾜我们的需求我们将需要⾃⼰开发或者是⼆次开发transform。
我们来介绍下官⽅⽬前有哪些transforms:
1. Cast
2. ExtractField
3. Flatten
4. HoistField
5. InrtField
6. MaskField
7. RegexRouter
8. ReplaceField
9. TimestampConverter
10. SetSchemaMetadata
11. TimestampRouter
12. ValueToKey
confluent⽀持的transform有:
1. Cast 使⽤的是Kafka的包(org.t.transforms.Cast$Key 或者是
org.t.transforms.Cast$Value)
2. Drop(t.transforms.Drop$Key 或者是 t.transforms.Drop$Value)
3. ExtractField 使⽤的是Kafka的包 org.t.transforms.ExtractField$Value
or org.t.transforms.ExtractField$Key
4. ExtractTopic (t.transforms.ExtractTopic$Key 或者是
5. Filter (t.transforms.Filter$Key 或者是 t.transforms.Filter$Value)
6. Flatten (org.t.transforms.Flatten$Key 或者是 org.t.transforms.Flatten$Value)
7. HoistField (org.t.transforms.HoistField$Key 或者是
org.t.transforms.HoistField$Value)
8. InrtField (org.t.transforms.InrtField$Key 或者是
org.t.transforms.InrtField$Value)
9. MaskField (org.t.transforms.MaskField$Key 或者是
org.t.transforms.MaskField$Value)
10. MessageTimeStampRouter (t.transforms.MessageTimestampRouter)
11. RegexRouter 使⽤Kafka的包 org.t.transforms.RegexRouter
12. ReplaceField (org.t.transforms.ReplaceField$Key 或者是
org.t.transforms.ReplaceField$Value)
13. SetSchemaMetadata (org.t.transforms.SetSchemaMetadata$Key 或者是
org.t.transforms.SetSchemaMetadata$Value)
14. TimestampConverter (org.t.transforms.TimestampConverter$Key 或者是
org.t.transforms.TimestampConverter$Value)
15. TimestampRouter (org.t.transforms.TimestampRouter)
16. TombstoneHandler (t.transforms.TombstoneHandler)
17. ValueToKey (org.t.transforms.ValueToKey)
具体使⽤将在后期的⽂档中详细说明。
打个比方在这些transform不能满⾜我们需求的情况下,我们只能开发⾃⼰的transform了,
开发transform主要是实现这个接⼝:Transformation
针灸作用实现的⽅法有:
1. configure 因为Transformation继承了Configurable类,这⾥从source、sink 配置的json中获取transform的配置信息
2. apply 主要处理数据的地⽅,例如类型转换、数据结果转换、数据值的转换都在这⾥处理
3. clo 因为 Transformation 继承了Cloable类
4. config 这⾥主要配置transform相关的配置,例如我们上⾯提到的:
"pe":"org.t.transforms.HoistField$Value",
"transforms.MakeMap.field":"line",
这⾥配置的值在config值能否在transform中能获取到可以在这个⽅法⾥做处理。
接下来我们将介绍⼀个transform的源码:
public class RegexRouter<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String OVERVIEW_DOC = "Update the record topic using the configured regular expression and replacement string."
海字成语+ "<p/>Under the hood, the regex is compiled to a <code>Pattern</code>. "
+ "If the pattern matches the input topic, <code>Matcher#replaceFirst()</code> is ud with the replacement string to obtain the new topic.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new RegexValidator(),
ConfigDef.Importance.HIGH,
"Regular expression to u for matching.")
.define(ConfigName.REPLACEMENT, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH,
"Replacement string.");
private interface ConfigName {
String REGEX = "regex";
String REPLACEMENT = "replacement";
}
private Pattern regex;
private String replacement;
@Override
public void configure(Map<String, ?> props) {
扬州游玩攻略
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
regex = String(ConfigName.REGEX));
replacement = String(ConfigName.REPLACEMENT);
}
@Override
public R apply(R record) {
final Matcher matcher = regex.pic());
if (matcher.matches()) {
final String topic = placeFirst(replacement);
wRecord(topic, record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
}
return record;
}
@Override
public void clo() {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
}
从这个代码⽚段可以看出,这个transform只接受两个配置项的值:
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new RegexValidator(),
ConfigDef.Importance.HIGH,
"Regular expression to u for matching.")
.define(ConfigName.REPLACEMENT, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH,
化妆品真假查询
"Replacement string.");
⼀个是ConfigName.REGEX(replacement), 另⼀个是ConfigName.REPLACEMENT(replacement)
从配置configure⽅法中可以看出根据我们配置的CONFIG_DEF获取其对于配置项的值
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
regex = String(ConfigName.REGEX));
replacement = String(ConfigName.REPLACEMENT);
}
⽽在这个例⼦中clo⽅法是空的,说明这边在调⽤clo⽅法的时候没有需要执⾏的,例如不需要释放资源、 不需要关闭连接等。
接下来我们将说这个transform中的重点了:apply⽅法,其主要思路是获取这条数据(record)的topic,然后根据我们配置的regex去匹配,如果能匹配上,将对topic的值替换成我们配置的值,并重新⽣成新的数据(record);如果没有匹配上则返回其原始值。
@Override
public R apply(R record) {
final Matcher matcher = regex.pic());
if (matcher.matches()) {
final String topic = placeFirst(replacement);盐菜的做法
wRecord(topic, record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(),
record.timestamp());
}
return record;
}
⾃此⼀个tranfrom的处理就完成了,会继续往下传,如果有多个transfrom将会⼀⼀执⾏。我们来看下transform在Kafka Connet运⾏的时候是如何执⾏的。
源码在 org.t.runtime.WorkerSourceTask#ndRecords 发送记录给下游的时候调⽤
在发送每条数据(record)的时候会调⽤:final SourceRecord record = transformationChain.apply(preTransformRecord);
⽽在apply⽅法的实现是:牙疼是什么原因
public R apply(R record) {
if (transformations.isEmpty()) return record;
for (Transformation<R> transformation : transformations) {
record = transformation.apply(record);
if (record == null) break;
}
return record;
}
代码简洁,如果我们没有配置transform直接原记录返回,如果有配置transform,会按照顺序,对数据(record)应⽤⼀次transform,然后返回结果。
transform配置项的获取源码:
public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {提防近义词
final List<String> transformAlias = getList(TRANSFORMS_CONFIG);
final List<Transformation<R>> transformations = new ArrayList<>(transformAlias.size());
for (String alias : transformAlias) {
final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
final Transformation<R> transformation;
try {
transformation = getClass(prefix + "type").asSubclass(Transformation.class).newInstance();
} catch (Exception e) {
throw new ConnectException(e);
}
transformations.add(transformation);
}
return transformations;
}
...
public static final String TRANSFORMS_CONFIG = "transforms";
Okay, 到这⾥我们我们介绍完了 transform的定义、使⽤、如何开发⼀个transform以及transform在Kafka Connect中是如何执⾏的。
后⾯我们将介绍Kafka Connect source connector 以及如何开发⼀个source connector。敬请期待,如果有问题可以和我留⾔哦~