前言 由于我们部门的一些大数据服务是用到 kafka 的,这个时期正值我们对 golang 语言对一个转型阶段,对比了一下开源对 kafka 客户端,决定使用 confluent-kafka-go
, 所以在这里记录一下confluent-kafka-go
的一些内容
说明 confluent-kafka-go
是一个 confluent 官方的 golang 语言库,其依赖于 librdkafka
实现,大多数机器,librakafka 都已经预编译进去 golang 扩展了,不需要额外安装 librdkafka,如果不支持预编译的话,则需要额外安装。
提供一个 dockerfile 的 demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 FROM golang:1.14 .3 -busterENV GO111MODULE=onENV GOPROXY=https://goproxy.io,directENV GOPRIVATE=git.mingchao.comRUN echo \ deb http://mirrors.aliyun.com/debian/ buster main non-free contrib \ deb-src http://mirrors.aliyun.com/debian/ buster main non-free contrib \ deb http://mirrors.aliyun.com/debian-security buster/updates main \ deb-src http://mirrors.aliyun.com/debian-security buster/updates main \ deb http://mirrors.aliyun.com/debian/ buster-updates main non-free contrib \ deb-src http://mirrors.aliyun.com/debian/ buster-updates main non-free contrib \ deb http://mirrors.aliyun.com/debian/ buster-backports main non-free contrib \ deb-src http://mirrors.aliyun.com/debian/ buster-backports main non-free contrib \ > /etc/apt/sources.list && \ apt-get update -y && \ wget -qO - https://packages.confluent.io/deb/5.5/archive.key | apt-key add - > /dev/null && \ sed -i '$a deb [arch=amd64] https://packages.confluent.io/deb/5.5 stable main' /etc/apt/sources.list && \ apt-get install -y librdkafka-dev ARG GIT_USERNAME=gitARG GIT_PASSWORD=git-passwordARG GIT_CREDEN_FILE=/.git-credentialsRUN touch ${GIT_CREDEN_FILE} && \ chown 600 ${GIT_CREDEN_FILE} && \ git config --global credential.helper 'store --file ' ${GIT_CREDEN_FILE} && \ echo https://${GIT_USERNAME} :${GIT_PASSWORD} @git.mingchao.com | tee ${GIT_CREDEN_FILE}
源码 Api 说明 consumer.go
Subscribe(topic string, rebalanceCb RebalanceCb) error 订阅一个 topic,这个 api 会覆盖之前设置过了的 topic 订阅
1 2 3 func (c *Consumer) Subscribe (topic string , rebalanceCb RebalanceCb) error { return c.SubscribeTopics([]string {topic}, rebalanceCb) }
SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)
订阅多个 topic
这个 api 会覆盖之前设置过了的 topic 订阅
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (c *Consumer) SubscribeTopics (topics []string , rebalanceCb RebalanceCb) (err error) { ctopics := C.rd_kafka_topic_partition_list_new(C.int (len (topics))) defer C.rd_kafka_topic_partition_list_destroy(ctopics) for _, topic := range topics { ctopic := C.CString(topic) defer C.free(unsafe.Pointer(ctopic)) C.rd_kafka_topic_partition_list_add(ctopics, ctopic, C.RD_KAFKA_PARTITION_UA) } e := C.rd_kafka_subscribe(c.handle.rk, ctopics) if e != C.RD_KAFKA_RESP_ERR_NO_ERROR { return newError(e) } c.rebalanceCb = rebalanceCb c.handle.currAppRebalanceEnable = c.rebalanceCb != nil || c.appRebalanceEnable return nil }
Unsubscribe() (err error)
1 2 3 4 func (c *Consumer) Unsubscribe () (err error) { C.rd_kafka_unsubscribe(c.handle.rk) return nil }
Assign(partitions []TopicPartition) (err error)
分配一组要使用的 partition
这个 api 会覆盖之前分配过的
1 2 3 4 5 6 7 8 9 10 11 12 13 func (c *Consumer) Assign (partitions []TopicPartition) (err error) { c.appReassigned = true cparts := newCPartsFromTopicPartitions(partitions) defer C.rd_kafka_topic_partition_list_destroy(cparts) e := C.rd_kafka_assign(c.handle.rk, cparts) if e != C.RD_KAFKA_RESP_ERR_NO_ERROR { return newError(e) } return nil }
Unassign() (err error)
1 2 3 4 5 6 7 8 9 10 func (c *Consumer) Unassign () (err error) { c.appReassigned = true e := C.rd_kafka_assign(c.handle.rk, nil ) if e != C.RD_KAFKA_RESP_ERR_NO_ERROR { return newError(e) } return nil }
Commit() ([]TopicPartition, error)
提交当前已经分配的 partition 的 offset 值
基于 StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error)
这是一个阻塞请求,如果需要异步操作,需要调用者自行用协程
返回成功提交 offset 的 topicPartition
1 2 3 func (c *Consumer) Commit () ([]TopicPartition, error) { return c.commit(nil ) }
CommitMessage(m *Message) ([]TopicPartition, error)
这个 API 基于 message 结构体
这是一个阻塞请求,如果需要异步操作,需要调用者自行用协程
返回成功提交 offset 的 topicPartition
1 2 3 4 5 6 7 8 func (c *Consumer) CommitMessage (m *Message) ([]TopicPartition, error) { if m.TopicPartition.Error != nil { return nil , newErrorFromString(ErrInvalidArg, "Can't commit errored message" ) } offsets := []TopicPartition{m.TopicPartition} offsets[0 ].Offset++ return c.commit(offsets) }
CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
根据 []TopicPartition 来提交 offset
这是一个阻塞请求,如果需要异步操作,需要调用者自行用协程
返回成功提交 offset 的 topicPartition
1 2 3 func (c *Consumer) CommitOffsets (offsets []TopicPartition) ([]TopicPartition, error) { return c.commit(offsets) }
StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error)
根据 []TopicPartition 来记录将会被提交的 offset(如果允许自动提交的话,那么会受auto.commit.interval.ms
的影响,一定周期性提交,如果是手动提交的话则依赖 Commit()
Api)
返回成功存储的 offsets,如果至少有一个偏移量无法存储,则返回一个错误和偏移量列表。每个偏移量都可以通过它的来检查特定的错误
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (c *Consumer) StoreOffsets (offsets []TopicPartition) (storedOffsets []TopicPartition, err error) { coffsets := newCPartsFromTopicPartitions(offsets) defer C.rd_kafka_topic_partition_list_destroy(coffsets) cErr := C.rd_kafka_offsets_store(c.handle.rk, coffsets) storedOffsets = newTopicPartitionsFromCparts(coffsets) if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR { return storedOffsets, newError(cErr) } return storedOffsets, nil }
Seek(partition TopicPartition, timeoutMs int) error
获取指定 partition 的 offset
如果timeoutMs
不是 0,则调用将等待这么长时间以执行查找。如果超时到达,内部状态将未知,并且此函数返回 ErrTimedOut。
如果timeoutMs
为 0,它将发起查找,但立即返回,不报告任何错误(例如,异步)。
Seek()只能用于已经使用的分区(通过 Assign()或隐式使用通过自平衡订阅())。
要设置起始偏移量,最好使用 Assign()并为每个分区提供一个起始偏移量。
1 2 3 4 5 6 7 8 9 10 11 func (c *Consumer) Seek (partition TopicPartition, timeoutMs int ) error { rkt := c.handle.getRkt(*partition.Topic) cErr := C.rd_kafka_seek(rkt, C.int32_t(partition.Partition), C.int64_t(partition.Offset), C.int (timeoutMs)) if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR { return newError(cErr) } return nil }
Poll(timeoutMs int) (event Event)
轮询消息或事件。
将阻塞最多 timeoutMs
的超时时间
以下回调可能会被触发
Subscribe()’s rebalanceCb
如果超时则返回 nil,否则返回一个事件
1 2 3 4 func (c *Consumer) Poll (timeoutMs int ) (event Event) { ev, _ := c.handle.eventPoll(nil , timeoutMs, 1 , nil ) return ev }
ReadMessage(timeout time.Duration) (*Message, error)
返回一条消息
这是一个方便的 API,它封装了 Poll(),只返回消息或错误。所有其他事件类型都被丢弃。
该调用最多会阻塞 timeout
等待新消息或错误。timeout
可以设置为-1,表示无限期等待。
超时将会返回(nil, err)
当 err 是 kafka.(Error).Code == Kafka.ErrTimedOut
消息将会返回 (msg, nil)
, 当有错误当时候将会返回 (nil, err)
, 当指定 partition 错误的时候(topic,partition,offset),将会返回 (msg,err)
全部其他的事件类型,像PartitionEOF
,AssingedPartitions
等等将会被默认丢弃
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 func (c *Consumer) ReadMessage (timeout time.Duration) (*Message, error) { var absTimeout time.Time var timeoutMs int if timeout > 0 { absTimeout = time.Now().Add(timeout) timeoutMs = (int )(timeout.Seconds() * 1000.0 ) } else { timeoutMs = (int )(timeout) } for { ev := c.Poll(timeoutMs) switch e := ev.(type ) { case *Message: if e.TopicPartition.Error != nil { return e, e.TopicPartition.Error } return e, nil case Error: return nil , e default : } if timeout > 0 { timeoutMs = int (math.Max(0.0 , absTimeout.Sub(time.Now()).Seconds()*1000.0 )) } if timeoutMs == 0 && ev == nil { return nil , newError(C.RD_KAFKA_RESP_ERR__TIMED_OUT) } } }
Close() (err error)
关闭一个 Consumer 对象
调用后,对象不再可用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (c *Consumer) Close () (err error) { close (c.readerTermChan) c.handle.waitGroup.Wait() if c.eventsChanEnable { close (c.events) } C.rd_kafka_queue_destroy(c.handle.rkq) c.handle.rkq = nil e := C.rd_kafka_consumer_close(c.handle.rk) if e != C.RD_KAFKA_RESP_ERR_NO_ERROR { return newError(e) } c.handle.cleanup() C.rd_kafka_destroy(c.handle.rk) return nil }
用于查询集群中 broker 和 topic 的元数据
如果 topic
参数不为 nil,则返回和 topoic 相关数据,否则(如果allTopics
参数为 false,那么将会返回当前使用 topic 的元数据,如果 allTopics
参数为 true, 那么将返回 broker 中所有 topic 的元数据)
GetMetadata 相当于 Java API 中的 listTopics、describeTopics 和 describeCluster。
1 2 3 func (c *Consumer) GetMetadata (topic *string , allTopics bool , timeoutMs int ) (*Metadata, error) { return getMetadata(c, topic, allTopics, timeoutMs) }
QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
根据 topic 和 partition,查询当前 broker 中他们的低水位和高水位的 offset
1 2 3 func (c *Consumer) QueryWatermarkOffsets (topic string , partition int32 , timeoutMs int ) (low, high int64 , err error) { return queryWatermarkOffsets(c, topic, partition, timeoutMs) }
GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
根据 topic 和 partition 返回当前服务存储的低水位和高水位的 offset
每个 fetch 响应或通过调用 QueryWatermarkOffsets
填充高水位的 offset
如果设置了 statistics.interval.ms
, 低水位将会有一个 statistics.interval.ms
的周期来更新
1 2 3 func (c *Consumer) GetWatermarkOffsets (topic string , partition int32 ) (low, high int64 , err error) { return getWatermarkOffsets(c, topic, partition) }
OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
每个分区返回的偏移量是最早的偏移量,其时间戳大于或等于相应分区中的给定时间戳。如果提供的时间戳超过分区中最后一条消息的时间戳,则返回-1 值。
1 2 3 func (c *Consumer) OffsetsForTimes (times []TopicPartition, timeoutMs int ) (offsets []TopicPartition, err error) { return offsetsForTimes(c, times, timeoutMs) }
Subscription() (topics []string, err error)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (c *Consumer) Subscription () (topics []string , err error) { var cTopics *C.rd_kafka_topic_partition_list_t cErr := C.rd_kafka_subscription(c.handle.rk, &cTopics) if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR { return nil , newError(cErr) } defer C.rd_kafka_topic_partition_list_destroy(cTopics) topicCnt := int (cTopics.cnt) topics = make ([]string , topicCnt) for i := 0 ; i < topicCnt; i++ { crktpar := C._c_rdkafka_topic_partition_list_entry(cTopics, C.int (i)) topics[i] = C.GoString(crktpar.topic) } return topics, nil }
Assignment() (partitions []TopicPartition, err error)
1 2 3 4 5 6 7 8 9 10 11 12 13 func (c *Consumer) Assignment () (partitions []TopicPartition, err error) { var cParts *C.rd_kafka_topic_partition_list_t cErr := C.rd_kafka_assignment(c.handle.rk, &cParts) if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR { return nil , newError(cErr) } defer C.rd_kafka_topic_partition_list_destroy(cParts) partitions = newTopicPartitionsFromCparts(cParts) return partitions, nil }
Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
1 2 3 4 5 6 7 8 9 10 func (c *Consumer) Committed (partitions []TopicPartition, timeoutMs int ) (offsets []TopicPartition, err error) { cparts := newCPartsFromTopicPartitions(partitions) defer C.rd_kafka_topic_partition_list_destroy(cparts) cerr := C.rd_kafka_committed(c.handle.rk, cparts, C.int (timeoutMs)) if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR { return nil , newError(cerr) } return newTopicPartitionsFromCparts(cparts), nil }
Position(partitions []TopicPartition) (offsets []TopicPartition, err error)
根据 partition 返回其 offset
典型的用法是调用 assign()来获取分区列表,然后将其传递给 Position()来获取每个分区的当前 offset
消费的位置是分区读取的下一个消息,例如(最后一条信息的+1)
1 2 3 4 5 6 7 8 9 10 func (c *Consumer) Position (partitions []TopicPartition) (offsets []TopicPartition, err error) { cparts := newCPartsFromTopicPartitions(partitions) defer C.rd_kafka_topic_partition_list_destroy(cparts) cerr := C.rd_kafka_position(c.handle.rk, cparts) if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR { return nil , newError(cerr) } return newTopicPartitionsFromCparts(cparts), nil }
Pause(partitions []TopicPartition) (err error)
根据提供的 partition 暂停消费
如果设置了go.events.channel.enable
,只会受到go.events.channel.size
的影响,这个 API 将不会生效
1 2 3 4 5 6 7 8 9 func (c *Consumer) Pause (partitions []TopicPartition) (err error) { cparts := newCPartsFromTopicPartitions(partitions) defer C.rd_kafka_topic_partition_list_destroy(cparts) cerr := C.rd_kafka_pause_partitions(c.handle.rk, cparts) if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR { return newError(cerr) } return nil }
Resume(partitions []TopicPartition) (err error)
1 2 3 4 5 6 7 8 9 func (c *Consumer) Resume (partitions []TopicPartition) (err error) { cparts := newCPartsFromTopicPartitions(partitions) defer C.rd_kafka_topic_partition_list_destroy(cparts) cerr := C.rd_kafka_resume_partitions(c.handle.rk, cparts) if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR { return newError(cerr) } return nil }
返回当前消费者组的元数据
这个返回的对象,应该传递给事务生产者的 SendOffsetsToTransaction() API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (c *Consumer) GetConsumerGroupMetadata () (*ConsumerGroupMetadata, error) { cgmd := C.rd_kafka_consumer_group_metadata(c.handle.rk) if cgmd == nil { return nil , NewError(ErrState, "Consumer group metadata not available" , false ) } defer C.rd_kafka_consumer_group_metadata_destroy(cgmd) serialized, err := serializeConsumerGroupMetadata(cgmd) if err != nil { return nil , err } return &ConsumerGroupMetadata{serialized}, nil }