Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I am trying to setup a simple kafka stack in local and I am at the point where I need to create a toy Producer. This: https://lombardo-chcg.github.io/tools/2017/09/29/kafka-avro-producer-in-scala.html (see below for the piece of code I'm interested in) is almost exactly what I want except:

Here the producer sends a GenericData.Record object, so the whole schema is sent and it doesn't leverage the schema registry. I want to send an Array[Byte] with the first few bytes being the id of the schema and the following bytes being the data, without the schema (or so I think it is the optimal way to do it)

The piece of code I am talking about:

import java.util.Properties

import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.slf4j.LoggerFactory

case class User(name: String, favoriteNumber: Int, favoriteColor: String)

class AvroProducer {
  val logger = LoggerFactory.getLogger(getClass)

  val kafkaBootstrapServer = sys.env("KAFKA_BOOTSTRAP_SERVER")
  val schemaRegistryUrl = sys.env("SCHEMA_REGISTRY_URL")

  val props = new Properties()
  props.put("bootstrap.servers", kafkaBootstrapServer)
  props.put("schema.registry.url", schemaRegistryUrl)
  props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("acks", "1")

  val producer = new KafkaProducer[String, GenericData.Record](props)
  val schemaParser = new Parser

  val key = "key1"
  val valueSchemaJson =
  s"""
    {
      "namespace": "com.avro.junkie",
      "type": "record",
      "name": "User2",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "favoriteNumber",  "type": "int"},
        {"name": "favoriteColor", "type": "string"}
      ]
    }
  """
  val valueSchemaAvro = schemaParser.parse(valueSchemaJson)
  val avroRecord = new GenericData.Record(valueSchemaAvro)

  val mary = new User("Mary", 840, "Green")
  avroRecord.put("name", mary.name)
  avroRecord.put("favoriteNumber", mary.favoriteNumber)
  avroRecord.put("favoriteColor", mary.favoriteColor)

  def start = {
    try {
      val record = new ProducerRecord("users", key, avroRecord)
      val ack = producer.send(record).get()
      // grabbing the ack and logging for visibility
      logger.info(s"${ack.toString} written to partition ${ack.partition.toString}")
    }
    catch {
      case e: Throwable => logger.error(e.getMessage, e)
    }
  }
}

Problem(s):

  • I don't know how to retrieve the id of the schema from schema-registry
  • I don't know how to send only the data without the schema + the id as Array[Byte]

I know how to write the whole avro to Array[Byte]:

    val writer = new SpecificDatumWriter[GenericData.Record](valueSchemaAvro)
    val out = new ByteArrayOutputStream
    val encoder = EncoderFactory.get.binaryEncoder(out, null)
    writer.write(avroRecord, encoder) // but here I am also writing the schema, right?
    encoder.flush
    out.close
    out.toByteArray

thanks so much


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
3.9k views
Welcome To Ask or Share your Answers For Others

1 Answer

The first code does use the Schema Registry, and computes an ID + replaces the schema in the byte array for you within KafkaAvroSerializer

If you want to bypass the Schema Registry, use ByteArraySerializer and send the result of out.toByteArray in the second code block to the producer.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...