You need to enable JavaScript to run this app.
最新活动
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

KafkaConnectBigQuerySinkConnector从SchemaRegistry请求不存在的键-主题名称。

可以通过以下代码解决问题:

  1. 首先,确保您的Schema Registry主题在注册表中具有存在且正确配置的架构文件。

  2. 然后,您需要确认Sink Connector的配置文件中设置了正确的主题名称。

  3. 最后,您可以使用以下代码来验证设置是否正确:

import org.apache.kafka.connect.errors.ConnectException; 
import io.confluent.connect.avro.AvroData; 
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; 
import io.confluent.kafka.schemaregistry.client.rest.RestService; 
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; 
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; 
import org.apache.kafka.common.config.ConfigDef; 
import org.apache.kafka.common.config.ConfigException; 
import org.apache.kafka.connect.connector.ConnectorContext; 
import org.apache.kafka.connect.errors.ConnectException; 
import org.apache.kafka.connect.sink.SinkConnector; 
import org.apache.kafka.connect.sink.SinkRecord; 
import org.apache.kafka.connect.sink.SinkTask; 
import org.apache.kafka.connect.sink.SinkTaskContext; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

public class BigQuerySinkTask extends SinkTask { 

private static final Logger log = LoggerFactory.getLogger(BigQuerySinkTask.class); 

private String tableName; 
private AvroData avroData; 
private SchemaRegistryClient schemaRegistry; 
private RestService restService; 

@Override 
public String version() { 
return "1.0"; 
} 

@Override 
public void start(Map<String, String> props) { 
String topicName = props.get(BigQuerySinkConnector.TOPIC_CONFIG); 
if (topicName == null || topicName.isEmpty()) { 
throw new ConnectException("Missing required configuration \"" + BigQuerySinkConnector.TOPIC_CONFIG + "\""); 
} 

tableName = props.get(BigQuerySinkConnector.TABLE_CONFIG); 
if (tableName == null || tableName.isEmpty()) { 
throw new ConnectException("Missing required configuration \"" + BigQuerySinkConnector.TABLE_CONFIG + "\""); 
} 

KafkaAvroSerializerConfig serializerConfig = new KafkaAvroSerializerConfig(props); 
KafkaAvroDeserializerConfig deserializerConfig = new KafkaAvroDeserializerConfig(props); 

HashMap<String, Object> clientConfig = new HashMap<String, Object>(); 
clientConfig.put("schema.registry.url", props.get(BigQuerySinkConnector.SCHEMA_REGISTRY_CONFIG)); 

schemaRegistry = new CachedSchemaRegistryClient(restService = new RestService(props.get(BigQuerySinkConnector.SCHEMA_REGISTRY_CONFIG)), 0); 
avroData =
本文内容通过AI工具匹配关键字智能整合而成,仅供参考,火山引擎不对内容的真实、准确或完整作任何形式的承诺。如有任何问题或意见,您可以通过联系service@volcengine.com进行反馈,火山引擎收到您的反馈后将及时答复和处理。
展开更多
面向开发者的云福利中心,ECS 60元/年,域名1元起,助力开发者快速在云上构建可靠应用

社区干货

「火山引擎」数智平台 VeDI 数据中台产品双月刊 VOL.03

Kafka、ClickHouse、Hudi、Iceberg 等大数据生态组件,100%开源兼容,支持构建实时数据湖、数据仓库、湖仓一体等数据平台架构,帮助用户轻松完成企业大数据平台的建设,降低运维门槛,快速形成大数据分析能力。## **产... 支持 ETL 工具 DBT connector,进一步完善任务调度、上下游对接能力。 - 正式上架 AWS Marketplace,已有 AWS 账户可以便捷地开通和使用 ByteHouse。- **【** **新增** **ByteHouse** **企业版功能...

干货|字节跳动基于 Apache Hudi 的多流拼接实践

字节跳动存在较多业务场景需要基于具有相同主键的多个数据源实时构建一个大宽表,数据源一般包括 Kafka 中的指标数据,以及 KV 数据库中的维度数据。业务侧通常会基于实时计算引擎在流上做多个数据源的 JOIN 产出这个... ## **2.4 读取过程** 接下来,介绍多流拼接场景下 Snapshot Query 核心过程,即先对 LogFile 进行去重合并,然后再合并 BaseFile 和 去重后的 LogFile 中的数据。图 3 显示了整个数据合并的过程,具体可以拆分成以...

火山引擎 DataLeap 套件下构建数据目录(Data Catalog)系统的实践

产品名称 | 支持元数据种类 | 重要产品功能 | 机器学习能力 | 获取信息途径 | 特点分析 ... Connector市场等 | 有 | demo和文档 | 功能丰富,成熟度高,产品设计上有诸多可借鉴之处 || A** | 60+ | 搜索、血缘、标签、问答、Connect...

火山引擎 DataLeap 构建Data Catalog系统的实践(三):关键技术与总结

都含有名称、描述、字段等属性,他们都继承自DataStore这个父Type。另外一种情况,有些类型的实体可以作用于多种其他的实体,比如一张Hive表和一堆被组织在一起的业务报表,都可以被用户收藏或点赞。我们将收藏、点赞... 火山引擎 DataLeap 研发人员将某一种元数据类型的接入逻辑封装为一个connector,并通过提供SDK的方式简化connector的编写成本。以使用最广泛的T+1 bridge接入的connector SDK为例,我们参照时下流行的Flink流式处理...

特惠活动

热门爆款云服务器

100%性能独享,更高内存性能更佳,学习测试、web前端、企业应用首选,每日花费低至0.55元
60.00/1212.00/年
立即购买

域名注册服务

cn/top/com等热门域名,首年低至1元,邮箱建站必选
1.00/首年起32.00/首年起
立即购买

DCDN国内流量包100G

同时抵扣CDN与DCDN两种流量消耗,加速分发更实惠
2.00/20.00/年
立即购买

KafkaConnectBigQuerySinkConnector从SchemaRegistry请求不存在的键-主题名称。 -优选内容

Kafka/BMQ
用作数据目的(Sink)SQL CREATE TABLE kafka_sink ( name String, score INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_topic_01', 'properties.bootstrap.servers' = 'localhost:9092'... 在某些情况下可能无法自动提交 Kafka offset 信息。 topic 是 (none) String 指定 Kafka Topic 的名称。 properties.bootstrap.servers 是 (none) String 指定 Kafka Broker 的地址,格式为host:port。 ...
Upsert Kafka
Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新或删除消息将落在同一分区中。 使用限制Upsert-kafka 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。 DDL 定义SQL CREATE TABLE upsert_kafka_sink ( user_region STRING, pv BIGINT, uv BIGINT, PRIMARY KEY (user_region) NOT ENFORCED) WITH ( 'connector' = 'upsert-kafka', 'topic' = ' ', 'properties.bootstrap.ser...
通过 Kafka 协议消费日志
需要为指定日志主题开启 Kafka 消费功能。 登录日志服务控制台。 在左侧导航栏中选择日志服务 > 日志项目管理。 单击指定日志项目名称。 在日志主题列表中找到指定的日志主题,并单击日志主题名称。Kafka协议消费... 2 通过 Kafka 协议消费日志目前日志服务支持通过 Kafka Java SDK 或 Spark、Flink 等框架的 Kafka Connector 插件进行日志数据消费,您可以参考下文配置 Kafka 基本参数,并参考示例代码消费日志数据。 说明 Ka...
使用Logstash消费Kafka中的数据并写入到云搜索
前言 Kafka 是一个分布式、支持分区的(partition)、多副本的(replica) 分布式消息系统, 深受开发人员的青睐。 云搜索服务是火山引擎提供的完全托管的在线分布式搜索服务,兼容 Elasticsearch、Kibana 等软件及常用开... kafka.ivolces.com:9092 --topic quickstart-events> 1 rudonx> 2 liwangz> 步骤四:在云搜索中查查看数据我们可以在云搜索控制台找到 Kibana 地址,进行登录后运行如下语句: GET kafkatoes/_search{ "query": { ...

KafkaConnectBigQuerySinkConnector从SchemaRegistry请求不存在的键-主题名称。 -相关内容

配置 Kafka 数据源

*Topic名称 选择 Kafka 处理消息源的不同分类主题名称,下拉可选数据源下对应需写入数据的 Topic 名称。 *数据格式 默认仅支持 json 格式,不可编辑。 示例数据 需以 json 字符串形式描述 schema。必须填写完... "child_connector_type":"kafka220", "columns": [ { "upperCaseName": "ID", "name": "id", "type": "BIGINT"...

EMR-3.9.0发布说明

环境信息版本 环境 OS veLinux(Debian 10兼容版) Python2 2.7.16 Python3 3.10.13 Java ByteOpenJDK 1.8.0_352 系统环境应用程序版本 Hadoop集群 Flink集群 Kafka集群 Pulsar集群 Presto集群 Trino集群 HBase集群 ... 且在Spark和Flink中集成了StarRocks connector。 【组件】Hudi组件版本由0.12.2升级为0.14.1。 【组件】Iceberg组件版本由1.2.0升级为1.4.3。 【组件】Airflow组件版本由2.4.2升级为2.7.3。 【组件】DolphinSc...

创建 TOS Sink Connector 任务

本文档介绍创建 TOS Sink Connector 任务的操作步骤,成功创建任务后,Kafka Topic 中的消息将根据指定的聚合规则同步到对象存储 TOS 的存储桶中。 前提条件已创建消息队列 Kafka版实例,且实例状态为运行中。 已为指... Kafka版控制台。 在顶部菜单栏中选择地域,并在左侧导航栏中单击Connector任务。 在实例下拉列表中选择需要指定实例。 单击创建Connector任务。 填写任务的基本信息,并单击下一步。 配置 说明 任务名称 Conn...

热门爆款云服务器

100%性能独享,更高内存性能更佳,学习测试、web前端、企业应用首选,每日花费低至0.55元
60.00/1212.00/年
立即购买

域名注册服务

cn/top/com等热门域名,首年低至1元,邮箱建站必选
1.00/首年起32.00/首年起
立即购买

DCDN国内流量包100G

同时抵扣CDN与DCDN两种流量消耗,加速分发更实惠
2.00/20.00/年
立即购买

「火山引擎」数智平台 VeDI 数据中台产品双月刊 VOL.03

Kafka、ClickHouse、Hudi、Iceberg 等大数据生态组件,100%开源兼容,支持构建实时数据湖、数据仓库、湖仓一体等数据平台架构,帮助用户轻松完成企业大数据平台的建设,降低运维门槛,快速形成大数据分析能力。## **产... 支持 ETL 工具 DBT connector,进一步完善任务调度、上下游对接能力。 - 正式上架 AWS Marketplace,已有 AWS 账户可以便捷地开通和使用 ByteHouse。- **【** **新增** **ByteHouse** **企业版功能...

EMR-3.0.0版本说明

环境信息 系统环境版本 环境 OS veLinux(Debian 10兼容版) Python2 2.7.16 Python3 3.7.3 Java ByteOpenJDK 1.8.0_302 应用程序版本 Hadoop集群 Flink集群 Kafka集群 Presto集群 Trino集群 HBase集群 OpenSearch集... 修复某些查询不能回退到非向量化引擎并导致 BE Core的问题; 修复 Compaction 不能正常工作导致的 -235 错误。 【组件】Presto、Trino 自定义connector:可以在控制台 服务列表- Presto/Trino - 服务参数中的co...

EMR-3.8.0 版本说明

环境信息 版本 环境 OS veLinux(Debian 10兼容版) Python2 2.7.16 Python3 3.10.13 Java ByteOpenJDK 1.8.0_352 系统环境应用程序版本 Hadoop集群 Flink集群 Kafka集群 Pulsar集群 Presto集群 Trino集群 HBase集群... Flink 集成Bytehouse CE Connector,实现数据写入能力。 【组件】开箱参数优化: Kyuubi组件默认开启Spark动态资源调整参数。 Doris组件根据ECS机型动态设置内存。 【组件】存算分离场景下,优化 Spark 关于job c...

EMR-3.0.1版本说明

环境信息 系统环境版本 环境 OS veLinux(Debian 10兼容版) Python2 2.7.16 Python3 3.7.3 Java ByteOpenJDK 1.8.0_302 应用程序版本 Hadoop 集群 Flink 集群 Kafka 集群 Presto 集群 Trino 集群 HBase 集群 OpenSe... hive_server 3.1.2 用于将 Hive 查询作为 Web 请求接受的服务。 hive_client 3.1.2 Hive 命令行客户端。 hdfs_namenode 3.3.1 用于跟踪 HDFS 文件名和数据块的服务。 hdfs_datanode 3.3.1 存储 HDFS 数据块的节点...

EMR-3.4.0 版本说明

环境信息 系统环境版本 环境 OS veLinux(Debian 10兼容版) Python2 2.7.16 Python3 3.7.3 Java ByteOpenJDK 1.8.0_352 应用程序版本 Hadoop集群 Flink集群 Kafka集群 Presto集群 Trino集群 HBase集群 OpenSearch集... Flink组件增加对Connector的支持,提供HBase 、JDBC、Mysql CDC Connector 【组件】Spark组件增加数据源的支持,提供对Kudu、HBase、Phoenix数据源的支持 【组件】提供Tez webUI,便于分析任务的执行状态 更改、增...

DescribeKafkaConsumer

调用 DescribeKafkaConsumer 查看指定日志主题的 Kafka 消费功能状态。 使用说明此接口调用频率限制为 20 次/s,超出频率限制会报错 ExceedQPSLimit。 请求说明请求方式:GET 请求地址:https://tls-{Region}.ivolces.com/DescribeKafkaConsumer 请求参数下表仅列出该接口特有的请求参数和部分公共参数。更多信息请见公共参数。 Query参数 类型 是否必选 示例值 描述 TopicId String 是 c7e0e442-19bf-4fb3-b547-5992fb8b**** 日志主...

特惠活动

热门爆款云服务器

100%性能独享,更高内存性能更佳,学习测试、web前端、企业应用首选,每日花费低至0.55元
60.00/1212.00/年
立即购买

域名注册服务

cn/top/com等热门域名,首年低至1元,邮箱建站必选
1.00/首年起32.00/首年起
立即购买

DCDN国内流量包100G

同时抵扣CDN与DCDN两种流量消耗,加速分发更实惠
2.00/20.00/年
立即购买

产品体验

体验中心

云服务器特惠

云服务器
云服务器ECS新人特惠
立即抢购

白皮书

一图详解大模型
浓缩大模型架构,厘清生产和应用链路关系
立即获取

最新活动

爆款1核2G共享型服务器

首年60元,每月仅需5元,限量秒杀
立即抢购

火山引擎增长体验专区

丰富能力激励企业快速增长
查看详情

数据智能VeDI

易用的高性能大数据产品家族
了解详情

一键开启云上增长新空间

立即咨询