You need to enable JavaScript to run this app.
导航

如何使用Nginx代理访问VPC内的自建Kafka

最近更新时间2022.05.25 13:09:53

首次发布时间2022.05.25 13:09:53

前言

对于一些自建在VPC内的Kafka有暴露到外网的需求,那么我们就可以通过Nginx代理来做四层代理,转发请求。

关于实验

预计部署时间:30分钟
级别:初级
相关产品:同VPC内的ECS两台(1台做Nginx代理,1台做Kafka Server)
受众: 通用

环境说明
  1. 如果还没有火山引擎账号,点击此链接注册账号

  2. 如果您还没有VPC,请先点击链接创建VPC

  3. 云服务器ECS:Centos 7

  4. 本地电脑准备python环境,默认生产和消费消息。

实验步骤

步骤1:部署配置Nginx代理

1.下载安装nginx,确保编译过程中添加"--with-stream"模块,如果需要其他模块可以自行参考Nginx官网文档

#下载Nginx源码包
wget https://nginx.org/download/nginx-1.20.1.tar.gz

#解压源码包
tar -zxvf nginx-1.20.1.tar.gz 

#进入解压后的目录并编译安装软件
cd nginx-1.20.1
./configure --with-stream 
make && make install

2.检查运行Nginx是否有启动stream模块

[root@JMS conf.d]# nginx -V 2>&1| grep stream
configure arguments: --prefix=/etc/nginx --sbin-path=/usr/sbin/nginx --conf-path=/etc/nginx/nginx.conf --error-log-path=/var/log/nginx/error.log --http-log-path=/var/log/nginx/access.log --pid-path=/var/run/nginx.pid --lock-path=/var/run/nginx.lock --http-client-body-temp-path=/var/cache/nginx/client_temp --http-proxy-temp-path=/var/cache/nginx/proxy_temp --http-fastcgi-temp-path=/var/cache/nginx/fastcgi_temp --http-uwsgi-temp-path=/var/cache/nginx/uwsgi_temp --http-scgi-temp-path=/var/cache/nginx/scgi_temp --user=nginx --group=nginx --with-file-aio --with-threads --with-http_addition_module --with-http_auth_request_module --with-http_dav_module --with-http_flv_module --with-http_gunzip_module --with-http_gzip_static_module --with-http_mp4_module --with-http_random_index_module --with-http_realip_module --with-http_secure_link_module --with-http_slice_module --with-http_ssl_module --with-http_stub_status_module --with-http_sub_module --with-http_v2_module --with-mail --with-mail_ssl_module --with-stream --with-stream_realip_module --with-stream_ssl_module --with-stream_ssl_preread_module

3.修改配置文件,本实验只部署了单点的Kafka测试,如果是生产环境需要再upstream中添加多个kafka地址。

stream{
    upstream brokers{
        server 192.168.1.254:9092;
    }
    server{
        listen 9092;
        proxy_pass brokers;
    }
}

4.热加载配置文件,确认配置文件编写无误程序没有报错。

[root@JMS conf.d]# nginx -s reload

步骤2:部署配置Kafka

  1. 安装Kafka服务(需要配置好JAVA环境,本实验不做具体步骤说明)
# 下载并解压软件包
 wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz
 
# 解压进入目录
 tar -xzf kafka_2.13-3.0.0.tgz
 cd kafka_2.13-3.0.0
 
#启动自带的zookeeper
 `bin/zookeeper-server-start.sh config/zookeeper.properties`
  1. 添加域名解析
# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.1.254 opendts
  1. 修改server.properties的配置文件
listener.security.protocol.map=INTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
listeners=INTERNAL://192.168.1.254:9092
# 配置刚才的域名opendts
advertised.listeners=INTERNAL://opendts:9092
inter.broker.listener.name=INTERNAL
  1. 启动服务
bin/kafka-server-start.sh config/server.properties

步骤3:设置安全组

在两台ECS所在的安全组规则中设置开放9092端口

步骤4:本地使用Python脚本测试生产消费

1.本地/etc/hosts文件配置域名解析

#这里一定要配置,因为第一次访问kafka返回的地址是opendts,如果不解析到nginx的公网ip180.184.70.*
#数据将无法连接传输
180.184.70.* opendts

2.准备python环境运行如下脚本

from kafka import KafkaProducer
from kafka import KafkaConsumer
def test():
    producer = KafkaProducer(bootstrap_servers=['180.184.70.*:9092'])
    msg='demomessage'
    future = producer.send('my_topic' , msg.encode('utf-8'))
    result = future.get(timeout= 10)
    print(result)




    consumer = KafkaConsumer('my_topic', group_id= 'group2', bootstrap_servers= ['180.184.70.*:9092'])
    for msg1 in consumer:
        print(msg1)

if __name__ == '__main__':
      test()

3.查看本地客户端日志输出,确认已经正常生产和消费消息。

RecordMetadata(topic='my_topic', partition=0, topic_partition=TopicPartition(topic='my_topic', partition=0), offset=3, timestamp=1652182581743, log_start_offset=0, checksum=None, serialized_key_size=-1, serialized_value_size=11, serialized_header_size=-1)
ConsumerRecord(topic='my_topic', partition=0, offset=3, timestamp=1652182581743, timestamp_type=0, key=None, value=b'demomessage', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=11, serialized_header_size=-1)

如果您有其他问题,欢迎您联系火山引擎技术支持服务