
Confluent Schema Registry serializer initialization failure in the Flink Scala Shell

Let's break down the root cause and fix this step by step. You're hitting a combination of Avro SpecificRecord requirements and Flink Scala Shell class-loading quirks.

Core Problem

The error Not a Specific class: class com.github.geoheil.streamingreference.Tweet means your Tweet class doesn't fully satisfy Avro's SpecificRecord contract. Avro expects the generated methods (getSchema(), get(int), put(int, Object)) and inheritance from SpecificRecordBase. Handwritten Scala case classes, and even avrohugger-generated classes, can fail this check in Flink's class-loader environment, especially in older versions such as 1.10.1.
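To make the failing check concrete, here is a minimal sketch of the kind of assignability test Avro performs. The types are hypothetical stand-ins (Avro itself is not assumed on the classpath): SpecificData rejects any class that is not assignable to org.apache.avro.specific.SpecificRecord, which is exactly what the "Not a Specific class" message reports.

```java
// Hypothetical stand-in for org.apache.avro.specific.SpecificRecord
interface SpecificRecordLike {}

// What Avro code generation produces: a class implementing the interface
class GeneratedTweet implements SpecificRecordLike {}

// A plain hand-written class, as a case class compiles to on the JVM
class HandwrittenTweet {}

public class SpecificCheck {
    // The same kind of check Avro's SpecificData runs internally
    static boolean isSpecific(Class<?> c) {
        return SpecificRecordLike.class.isAssignableFrom(c);
    }

    public static void main(String[] args) {
        System.out.println(isSpecific(GeneratedTweet.class));   // true
        System.out.println(isSpecific(HandwrittenTweet.class)); // false -> "Not a Specific class"
    }
}
```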


Step 1: Generate a Valid Avro SpecificRecord Class

Ditch avrohugger for this use case and use the official Avro code generation tools to ensure compatibility with Flink. Here's how to set it up with Gradle:

1.1 Add Gradle Avro Plugin

Update your build.gradle to use the gradle-avro-plugin:

plugins {
    id "com.github.davidmc24.gradle.plugin.avro" version "1.7.0"
}

dependencies {
    // Match the Avro version bundled with Flink 1.10.1 (1.8.2 is tested)
    implementation "org.apache.avro:avro:1.8.2"
}

avro {
    createSetters = true
    fieldVisibility = "PRIVATE"
}

The plugin picks up schemas from src/main/avro automatically, so no sourceSets wiring is needed. Note that newer plugin releases are built against newer Avro versions; if plugin 1.7.0 clashes with Avro 1.8.2, drop back to an older plugin release.

1.2 Define Avro Schema

Create a schema file src/main/avro/Tweet.avsc:

{
  "type": "record",
  "name": "Tweet",
  "namespace": "com.github.geoheil.streamingreference",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "text", "type": "string"}
  ]
}

Add your remaining fields to the fields array. Keep in mind that .avsc files are plain JSON, so comments and trailing commas are not allowed.

1.3 Generate the Class

Run gradle generateAvroJava. This creates a Java class com.github.geoheil.streamingreference.Tweet that properly implements SpecificRecord and extends SpecificRecordBase.


Step 2: Add Your Classes to the Shell Classpath

Flink's shell uses a custom class loader, so you need to explicitly load your generated Avro class and the required dependencies:

  1. Package your generated Avro class into a JAR (run gradle jar).
  2. Launch the Flink Scala Shell with all required dependencies added to the classpath:
./bin/start-scala-shell.sh local \
  --addclasspath /path/to/your-project-jar.jar \
  --addclasspath /path/to/flink-avro-confluent-registry-1.10.1.jar \
  --addclasspath /path/to/kafka-schema-registry-client-5.4.0.jar

Note: Use a Confluent 5.x client (e.g., 5.4.0) for compatibility with Flink 1.10.1, and note that there is no flink-scala-shell.sh; the shell is launched via start-scala-shell.sh with an execution mode such as local.
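Once the shell is up, a quick sanity check is to try loading the generated class reflectively; Class.forName is a plain JVM call, so the same expression works verbatim at the Scala shell prompt. A sketch (in this standalone form the Tweet class is of course absent, so it reports the class as missing; inside a correctly configured shell it should report "visible"):

```java
public class ClasspathCheck {
    // Returns "visible" when the named class can be loaded by the
    // current class loader, otherwise "NOT on classpath".
    static String check(String className) {
        try {
            Class.forName(className);
            return "visible";
        } catch (ClassNotFoundException e) {
            return "NOT on classpath";
        }
    }

    public static void main(String[] args) {
        System.out.println(check("com.github.geoheil.streamingreference.Tweet"));
    }
}
```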


Step 3: Initialize the Deserialization Schema Correctly

In the shell, import the generated class and initialize the deserializer:

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import com.github.geoheil.streamingreference.Tweet

val schemaRegistryUrl = "http://your-schema-registry:8081"
val deserializer = ConfluentRegistryAvroDeserializationSchema.forSpecific[Tweet](classOf[Tweet], schemaRegistryUrl)

Troubleshooting Checks

  • Verify the generated Tweet class: Open the compiled class and confirm it implements org.apache.avro.specific.SpecificRecord.
  • Rule out version conflicts: Ensure no conflicting Avro or Confluent JARs are present in Flink's lib directory.
  • Test with GenericRecord first: If you still hit issues, use ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, schemaRegistryUrl), which takes an explicit reader Schema, to validate that your Schema Registry connection is working.
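The first check above doesn't require decompiling anything: the supertype relationship can be verified reflectively. A sketch of the technique, demonstrated on JDK classes since the generated Tweet and Avro's SpecificRecord interface are not assumed present here; substitute their fully qualified names in your shell:

```java
public class InterfaceCheck {
    // Returns true when the named class is assignable to the named interface,
    // e.g. implementsIface("com.github.geoheil.streamingreference.Tweet",
    //                      "org.apache.avro.specific.SpecificRecord")
    static boolean implementsIface(String className, String ifaceName) throws Exception {
        Class<?> c = Class.forName(className);
        Class<?> i = Class.forName(ifaceName);
        return i.isAssignableFrom(c);
    }

    public static void main(String[] args) throws Exception {
        // Demo on JDK classes: ArrayList implements List
        System.out.println(implementsIface("java.util.ArrayList", "java.util.List")); // true
    }
}
```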

The question comes from Stack Exchange, asked by Georg Heiler.
