问题描述
我没有看到如何使用 camel-avro 组件生成和使用 kafka avro 消息的示例?目前我的骆驼路线是这样的.为了使用模式注册和其他类似的道具,应该改变它使用camel-kafka-avro consumer &制片人.
I dont see an example of how to use camel-avro component to produce and consume kafka avro messages? Currently my camel route is this. what should it be changed in order to work with schema-registry and other props like this using camel-kafka-avro consumer & producer.
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); public void configure() { PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class); pc.setLocation("classpath:application.properties"); log.info("About to start route: Kafka Server -> Log "); from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}" + "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount={{consumer.consumersCount}}" + "&seekTo={{consumer.seekTo}}" + "&groupId={{consumer.group}}" +"&valueDeserializer="+KafkaAvroDeserializer.class +"&keyDeserializer="+StringDeserializer.class ) .routeId("FromKafka") .log("${body}");
推荐答案
我正在回答我自己的问题,因为我在这个问题上坐了几天.我希望这个答案对其他人有帮助.
I'm answering my own question because I sat on this problem for couple days. I hope this answer will be helpful for others.
我尝试使用 io.confluent.kafka.serializers.KafkaAvroDeserializer 反序列化器并得到 kafka 异常.所以我不得不编写自己的反序列化器来做以下事情:
I tried to use io.confluent.kafka.serializers.KafkaAvroDeserializer deserializer and got kafka exception. so i had to write my own deserializer to do following things:
- 设置架构注册表
- 使用特定的 avro 阅读器(这意味着不是默认的 stringDeserializer)
然后我们必须访问schemaRegistry"、useSpecificAvroReader"并设置 AbstractKafkaAvroDeserializer(io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer) 的这些字段
then we must access "schemaRegistry", "useSpecificAvroReader" and set those fields of the AbstractKafkaAvroDeserializer(io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer)
这里是解决方案...
public static void main(String[] args) throws Exception { LOG.info("About to run Kafka-camel integration..."); CamelContext camelContext = new DefaultCamelContext(); // Add route to send messages to Kafka camelContext.addRoutes(new RouteBuilder() { public void configure() throws Exception { PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class); pc.setLocation("classpath:application.properties"); log.info("About to start route: Kafka Server -> Log "); from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}" + "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount={{consumer.consumersCount}}" + "&seekTo={{consumer.seekTo}}" + "&groupId={{consumer.group}}" + "&keyDeserializer="+ StringDeserializer.class.getName() + "&valueDeserializer="+CustomKafkaAvroDeserializer.class.getName() ) .routeId("FromKafka") .log("${body}"); } }); camelContext.start(); // let it run for 5 minutes before shutting down Thread.sleep(5 * 60 * 1000); camelContext.stop(); }
DESERIALIZER CLASSS - 这设置了 schema.registry.url &在抽象 AbstractKafkaAvroDeserializer 级别使用.specific.avro.reader.如果我不设置这个,我会得到 kafka-config-exception.
package com.example.camel.kafka.avro; import java.util.Collections; import java.util.List; import java.util.Map; import io.confluent.common.config.ConfigException; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import org.apache.kafka.common.serialization.Deserializer; public class CustomKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object> { private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081"; @Override public void configure(KafkaAvroDeserializerConfig config) { try { final List<String> schemas = Collections.singletonList(SCHEMA_REGISTRY_URL); this.schemaRegistry = new CachedSchemaRegistryClient(schemas, Integer.MAX_VALUE); this.useSpecificAvroReader = true; } catch (ConfigException e) { throw new org.apache.kafka.common.config.ConfigException(e.getMessage()); } } @Override public void configure(Map<String, ?> configs, boolean isKey) { configure(null); } @Override public Object deserialize(String s, byte[] bytes) { return deserialize(bytes); } @Override public void close() { } }