xSky 实验室关注高性能计算,分布式系统/存储,大数据/机器学习/WebRTC
目录
  • 首页
  • 技术相关
  • 原创作品
  • 人工智能/机器学习
  • 系统与架构
  • 数据库/数据分析
  • 分布式系统/存储
  • 服务端开发
  • WEBRTC研究
  • 开发调试
  • 网络与安全
  • 常用工具
  • 杂七杂八

rd_kafka 的常见发布和订阅配置选项的整理

2021-05-29 16:25:25

rd_kafka 的常见发布和订阅配置选项的整理。这些选项可以用于设置 Kafka 生产者(发布者)和消费者(订阅者)的行为和属性。

发布者配置选项:

  1. bootstrap.servers:Kafka 集群的地址列表,用于引导连接。
    示例代码:

    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr));
    
  2. message.send.max.retries:消息发送的最大重试次数。
    示例代码:

    rd_kafka_conf_set(conf, "message.send.max.retries", "3", errstr, sizeof(errstr));
    
  3. request.required.acks:生产者需要接收到的分区写入确认数。
    示例代码:

    rd_kafka_conf_set(conf, "request.required.acks", "-1", errstr, sizeof(errstr));
    
  4. compression.codec:消息压缩编解码算法。
    示例代码:

    rd_kafka_conf_set(conf, "compression.codec", "gzip", errstr, sizeof(errstr));
    
  5. message.timeout.ms:消息发送超时时间(毫秒)。
    示例代码:

    rd_kafka_conf_set(conf, "message.timeout.ms", "5000", errstr, sizeof(errstr));
    
  6. partitioner:生产者使用的分区器。
    示例代码:

    rd_kafka_conf_set(conf, "partitioner", "random", errstr, sizeof(errstr));
  7. max.in.flight.requests.per.connection:单个连接上允许的未完成请求的最大数量。
    示例代码:

    rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "5", errstr, sizeof(errstr));
  8. max.poll.interval.ms:消费者在轮询期间可空闲的最大时间(毫秒)。
    示例代码:

    rd_kafka_conf_set(conf, "max.poll.interval.ms", "300000", errstr, sizeof(errstr));
  9. message.max.bytes:单个消息的最大字节数。
    示例代码:

    rd_kafka_conf_set(conf, "message.max.bytes", "1000000", errstr, sizeof(errstr));
  10. socket.send.buffer.bytes:发送套接字的缓冲区大小(字节)。
    示例代码:

    rd_kafka_conf_set(conf, "socket.send.buffer.bytes", "65536", errstr, sizeof(errstr));
    
  11. socket.receive.buffer.bytes:接收套接字的缓冲区大小(字节)。
    示例代码:

    rd_kafka_conf_set(conf, "socket.receive.buffer.bytes", "65536", errstr, sizeof(errstr));
    
  12. enable.idempotence:启用幂等性发送,确保消息的顺序性和唯一性。
    示例代码:

    rd_kafka_conf_set(conf, "enable.idempotence", "true", errstr, sizeof(errstr));
    
  13. retries:消息发送的总重试次数,包括网络错误和可重试的应用程序错误。
    示例代码:

    rd_kafka_conf_set(conf, "retries", "5", errstr, sizeof(errstr));


    14 在 librdkafka 中,设置自定义分区器回调函数的方式是通过 rd_kafka_conf_set_partitioner_cb 函数来实现。以下是一个正确的示例代码:

    int32_t my_partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) {

        // 自定义分区逻辑
        // 返回一个分区号(0 到 partition_cnt-1)
    }
    
    // 创建 Kafka 配置对象
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    
    // 设置自定义分区器回调函数
    rd_kafka_conf_set_partitioner_cb(conf, my_partitioner_cb);

订阅者配置选项:

  1. bootstrap.servers:Kafka 集群的地址列表,用于引导连接。
    示例代码:

    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr));
  2. group.id:消费者所属的消费者组的唯一标识符。
    示例代码:

    rd_kafka_conf_set(conf, "group.id", "my-consumer-group", errstr, sizeof(errstr));
  3. auto.offset.reset:当消费者在启动时没有有效的偏移量时,对应的消费位置。
    示例代码:

    rd_kafka_conf_set(conf, "auto.offset.reset","earliest", errstr, sizeof(errstr));
  4. enable.auto.commit:启用自动提交消费位移。
    示例代码:

    rd_kafka_conf_set(conf, "enable.auto.commit", "true", errstr, sizeof(errstr));
  5. auto.commit.interval.ms:自动提交消费位移的间隔时间(毫秒)。
    示例代码:

    rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", errstr, sizeof(errstr));
  6. max.poll.records:每次轮询从单个分区中返回的最大记录数。
    示例代码:

    rd_kafka_conf_set(conf, "max.poll.records", "100", errstr, sizeof(errstr));
  7. fetch.wait.max.ms:在没有可用消息时,消费者等待获取新消息的最大时间(毫秒)。
    示例代码:

    rd_kafka_conf_set(conf, "fetch.wait.max.ms", "100", errstr, sizeof(errstr));
  8. session.timeout.ms:消费者组中消费者被认为失效之前的超时时间(毫秒)。
    示例代码:

    rd_kafka_conf_set(conf, "session.timeout.ms", "6000", errstr, sizeof(errstr));
  9. heartbeat.interval.ms:心跳间隔时间(毫秒),用于检测消费者组中的消费者是否存活。
    示例代码:

    rd_kafka_conf_set(conf, "heartbeat.interval.ms", "2000", errstr, sizeof(errstr));
  10. fetch.max.bytes:单次拉取请求从代理返回的最大数据量(字节)。
    示例代码:

    rd_kafka_conf_set(conf, "fetch.max.bytes", "1048576", errstr, sizeof(errstr));
    
  11. queued.min.messages:当消息排队的消息数低于此阈值时,轮询将阻塞(等待更多消息)。
    示例代码:

    rd_kafka_conf_set(conf, "queued.min.messages", "1000", errstr, sizeof(errstr));
    
  12. queued.max.messages.kbytes:待处理消息队列的最大总大小(千字节)。
    示例代码:

    rd_kafka_conf_set(conf, "queued.max.messages.kbytes", "10240", errstr, sizeof(errstr));
    

这些是一些常见的 rd_kafka 发布和订阅配置选项。

https://blog.csdn.net/dwjlyl/article/details/127432628

By:xSky | 技术相关 |

  • 分类目录

    • 技术相关 (35)
    • 原创作品 (13)
    • 人工智能/机器学习 (6)
    • 系统与架构 (9)
    • 数据库/数据分析 (11)
    • 分布式系统/存储 (4)
    • 服务端开发 (7)
    • WEBRTC研究 (7)
    • 开发调试 (7)
    • 网络与安全 (9)
    • 常用工具 (9)
    • 杂七杂八 (6)
  • 最新文章

    • 嵌入式分析型数据库(DuckDB)
    • WSL从C盘迁移到其他盘区
    • 赵何娟:中国AI追随之路的五大误区,我们至少落后十年
    • zap  发送日志到 websocket
    • QUIC(隐藏的)超能力
    • MYSQL 生成日期/时间序列总结
    • Linux bash终端设置代理(proxy)访问
    • centos 下 yum安装python3
    • 使用SQL查询Milvus 向量数据库
    • 浅谈 MySQL 新的身份验证插件 caching_sha2_password
    • Milvus v2.2.1 开源向量搜索引擎使用教程
    • 部署了一个SRS的demo
    • Dockerfile 详解
    • Docker常用命令
    • Tus文件上传协议
    • 编译运行Milvus
    • MinIO 快速入门
    • ESP32
    • Prometheus监控报警系统搭建
    • go语言JSON字典模拟
    • go语言的sql解析器
    • Grafana配置数据源,自定义查询语法
    • TDengine + Telegraf + Grafana
    • gRPC-Gateway 返回JSON数据int64类型被转为string类型问题
    • LLAMA模型试玩
    • 语音识别的一些开源项目整理
    • 使用MYSQL8进行统计分析
    • 记录FFmpeg抽帧、合流、转码、加水印等操作
    • 移动网络弱网处理研究
    • 翻译:使用 Semgrep 进行热点代码评审
  • 链接

    • xSky的Blog
    • 我的Github
    • 实时监控图表
    • 预印本
    • xRedis 在线文档
    • xSkyProxy
    • xChart 数据在线测试
    • 我的电子书
    • xChart 数据可视化系统
    • 树莓派技术圈
    • WebRTC开发者社区
  • 开源项目

    • xReis C++的redis客户端库
    • xBlog-C++ 博客程序
    • xSkyProxy-新型MySQL代理网关
    • 数据可视化平台- xChart
    • xhttpcache 高速数据缓存服务
    • xMonitor-图形监测工具
    • 网址收集

Powered By xBlog

Copyright 2010~2024 0xsky.com All Rights Reserved.