package tcp

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"strconv"
	"time"
	"unicode"

	"go-common/app/infra/databus/conf"
	"go-common/library/conf/env"
	"go-common/library/log"
	"go-common/library/queue/databus"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
	pb "github.com/gogo/protobuf/proto"
)
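// stringify replaces symbol and control runes with '-' so raw message
// bytes can be printed in logs without corrupting the output.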
func stringify(b []byte) []byte {
	return bytes.Map(
		func(r rune) rune {
			if unicode.IsSymbol(r) || unicode.IsControl(r) {
				return '-'
			}
			return r
		},
		b,
	)
}
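// proto is a single unit of the redis wire protocol: a type prefix plus
// an integer payload (array length) or a string payload (status/error/bulk).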
type proto struct {
	prefix  byte
	integer int
	message string
}
// psCommon holds the state shared by the pub (producer) and sub (consumer) sides.
type psCommon struct {
	c      *conn
	err    error
	closed bool
	// kafka
	group   string
	topic   string
	cluster string
	addr    string
	color   []byte
}
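// newPsCommon builds the shared pub/sub state; c may be nil when the
// caller comes from the HTTP interface rather than a TCP connection.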
func newPsCommon(c *conn, group, topic, color, cluster string) (ps *psCommon) {
	ps = &psCommon{
		c:       c,
		group:   group,
		topic:   topic,
		cluster: cluster,
		color:   []byte(color),
	}
	if c != nil {
		ps.addr = c.conn.RemoteAddr().String()
	}
	return
}
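// write sends each proto to the client and flushes the connection once at the end.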
func (ps *psCommon) write(protos ...proto) (err error) {
	for _, p := range protos {
		if err = ps.c.Write(p); err != nil {
			return
		}
	}
	err = ps.c.Flush()
	return
}
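// batchWrite writes the protos as a single redis array reply, then flushes.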
func (ps *psCommon) batchWrite(protos []proto) (err error) {
	if err = ps.c.Write(proto{prefix: _protoArray, integer: len(protos)}); err != nil {
		return
	}
	for _, p := range protos {
		if err = ps.c.Write(p); err != nil {
			return
		}
		// FIXME(felix): disabled in prod for now because of ops-log performance issues
		if env.DeployEnv != env.DeployEnvProd {
			log.Info("batchWrite group(%s) topic(%s) cluster(%s) color(%s) addr(%s) consumer(%s) ok", ps.group, ps.topic, ps.cluster, ps.color, ps.addr, stringify([]byte(p.message)))
		}
	}
	err = ps.c.Flush()
	return
}
func (ps *psCommon) pong() (err error) {
	if err = ps.write(proto{prefix: _protoStr, message: _pong}); err != nil {
		return
	}
	log.Info("pong group(%s) topic(%s) cluster(%s) color(%s) addr(%s) ping success", ps.group, ps.topic, ps.cluster, ps.color, ps.addr)
	return
}
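// Closed reports whether this pub/sub connection has been closed.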
func (ps *psCommon) Closed() bool {
	return ps.closed
}
// Close is still coupled tightly to the redis protocol, so the sendRedisErr
// switch controls whether the final error is written back to the client.
func (ps *psCommon) Close(sendRedisErr bool) {
	if ps.closed {
		return
	}
	if ps.err == nil {
		ps.err = errConnClosedByServer // when closed by self, send close event to client.
	}
	// write the error back unless the client connection is already gone
	if ps.err != errConnRead && ps.err != errConnClosedByClient && sendRedisErr {
		ps.write(proto{prefix: _protoErr, message: ps.err.Error()})
	}
	if ps.c != nil {
		ps.c.Close()
	}
	ps.closed = true
}
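// fatal records the first fatal error and closes the connection,
// reporting the error back to the client over the redis protocol.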
func (ps *psCommon) fatal(err error) {
	if err == nil || ps.closed {
		return
	}
	ps.err = err
	ps.Close(true)
}
// Pub is a databus producer backed by a sarama SyncProducer.
type Pub struct {
	*psCommon
	// producer
	producer sarama.SyncProducer
}
// NewPub creates a new databus producer.
// The HTTP interface reuses this method; pass nil for c.
func NewPub(c *conn, group, topic, color string, pCfg *conf.Kafka) (p *Pub, err error) {
	producer, err := newProducer(group, topic, pCfg)
	if err != nil {
		log.Error("group(%s) topic(%s) cluster(%s) NewPub producer error(%v)", group, topic, pCfg.Cluster, err)
		return
	}
	p = &Pub{
		psCommon: newPsCommon(c, group, topic, color, pCfg.Cluster),
		producer: producer,
	}
	// HTTP-originated publishers have no TCP connection to configure
	if c != nil {
		// set producer read connection timeout
		p.c.readTimeout = _pubReadTimeout
	}
	log.Info("NewPub() success group(%s) topic(%s) color(%s) cluster(%s) addr(%s)", group, topic, color, pCfg.Cluster, p.addr)
	return
}
// Serve reads redis commands from the producer connection and dispatches
// them until the client quits or a fatal error occurs.
func (p *Pub) Serve() {
	var (
		err  error
		cmd  string
		args [][]byte
	)
	for {
		if cmd, args, err = p.c.Read(); err != nil {
			if err != io.EOF {
				log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) read error(%v)", p.group, p.topic, p.cluster, p.color, p.addr, err)
			}
			p.fatal(errConnRead)
			return
		}
		if p.Closed() {
			log.Warn("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) p.Closed()", p.group, p.topic, p.cluster, p.color, p.addr)
			return
		}
		select {
		case <-quit:
			p.fatal(errConnClosedByServer)
			return
		default:
		}
		switch cmd {
		case _auth:
			err = p.write(proto{prefix: _protoStr, message: _ok})
		case _ping:
			err = p.pong()
		case _set:
			if len(args) != 2 {
				p.write(proto{prefix: _protoErr, message: errPubParams.Error()})
				continue
			}
			err = p.publish(args[0], nil, args[1])
		case _hset:
			if len(args) != 3 {
				p.write(proto{prefix: _protoErr, message: errPubParams.Error()})
				continue
			}
			err = p.publish(args[0], args[1], args[2])
		case _quit:
			err = errConnClosedByClient
		default:
			err = errCmdNotSupport
		}
		if err != nil {
			p.fatal(err)
			log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) serve error(%v)", p.group, p.topic, p.cluster, p.color, p.addr, p.err)
			return
		}
	}
}
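// A producer session, sketched from the cases above (command names follow the
// _auth/_ping/_set/_hset constants; the AUTH argument format and the exact
// reply bytes of _ok and _pong are defined elsewhere):
//
//	AUTH <credentials>         -> +OK
//	SET <key> <value>          -> +OK  (publish without metadata header)
//	HSET <key> <header> <val>  -> +OK  (publish with metadata header)
//	PING                       -> pong reply
//	QUIT                       -> server closes the connection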
func (p *Pub) publish(key, header, value []byte) (err error) {
	if _, _, err = p.Publish(key, header, value); err != nil {
		return
	}
	return p.write(proto{prefix: _protoStr, message: _ok})
}
// Publish sends one message to Kafka; it is shared by the redis and HTTP protocols.
func (p *Pub) Publish(key, header, value []byte) (partition int32, offset int64, err error) {
	var message = &sarama.ProducerMessage{
		Topic: p.topic,
		Key:   sarama.ByteEncoder(key),
		Value: sarama.ByteEncoder(value),
		Headers: []sarama.RecordHeader{
			{Key: _headerColor, Value: p.color},
			{Key: _headerMetadata, Value: header},
		},
	}
	now := time.Now()
	// TODO(felix): support RecordHeader
	if partition, offset, err = p.producer.SendMessage(message); err != nil {
		log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) publish(%v) error(%v)", p.group, p.topic, p.cluster, p.color, p.addr, message, err)
		return
	}
	if svc != nil {
		svc.TimeProm.Timing(p.group, int64(time.Since(now)/time.Millisecond))
		svc.CountProm.Incr(_opProducerMsgSpeed, p.group, p.topic)
	}
	// FIXME(felix): disabled in prod for now because of ops-log performance issues
	if env.DeployEnv != env.DeployEnvProd {
		log.Info("publish group(%s) topic(%s) cluster(%s) color(%s) addr(%s) key(%s) header(%s) value(%s) ok", p.group, p.topic, p.cluster, p.color, p.addr, key, stringify(header), stringify(value))
	}
	return
}
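// An illustrative call from the HTTP path (variable names hypothetical; the
// real handler lives elsewhere in the package):
//
//	p, err := NewPub(nil, group, topic, color, kafkaCfg) // nil conn: HTTP reuse
//	if err == nil {
//		partition, offset, err = p.Publish(key, header, value)
//	}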
// Sub is a databus consumer wrapping a sarama-cluster consumer.
type Sub struct {
	*psCommon
	// kafka consumer
	consumer    *cluster.Consumer
	waitClosing bool
	batch       int
	// ticker flushes partial batches periodically
	ticker *time.Ticker
}
// NewSub creates a new databus consumer.
func NewSub(c *conn, group, topic, color string, sCfg *conf.Kafka, batch int64) (s *Sub, err error) {
	// non-blocking receive on the consumer limiter
	select {
	case <-consumerLimter:
	default:
	}
	// NOTE color is appended to the group so dyed (colored) traffic is consumed and filtered separately
	if color != "" {
		group = fmt.Sprintf("%s-%s", group, color)
	}
	if err = validate(group, topic, sCfg.Brokers); err != nil {
		return
	}
	s = &Sub{
		psCommon: newPsCommon(c, group, topic, color, sCfg.Cluster),
		ticker:   time.NewTicker(_batchInterval),
	}
	if batch == 0 {
		s.batch = _batchNum
	} else {
		s.batch = int(batch)
	}
	// set consumer read connection timeout
	s.c.readTimeout = _subReadTimeout
	// cluster config
	cfg := cluster.NewConfig()
	cfg.Version = sarama.V1_0_0_0
	cfg.ClientID = fmt.Sprintf("%s-%s", group, topic)
	cfg.Net.KeepAlive = 30 * time.Second
	// NOTE cluster auto commit offset interval
	cfg.Consumer.Offsets.CommitInterval = time.Second * 1
	// NOTE set fetch.wait.max.ms
	cfg.Consumer.MaxWaitTime = time.Millisecond * 250
	cfg.Consumer.MaxProcessingTime = 50 * time.Millisecond
	// NOTE errors that occur during offset management; if enabled, the Errors channel must be read
	cfg.Consumer.Return.Errors = true
	// NOTE notifications that occur during consuming; if enabled, the Notifications channel must be read
	cfg.Group.Return.Notifications = true
	// The initial offset to use if no offset was previously committed.
	// default: OffsetOldest
	cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
	if s.consumer, err = cluster.NewConsumer(sCfg.Brokers, group, []string{topic}, cfg); err != nil {
		log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) cluster.NewConsumer() error(%v)", s.group, s.topic, s.cluster, s.color, s.addr, err)
	} else {
		log.Info("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) cluster.NewConsumer() ok", s.group, s.topic, s.cluster, s.color, s.addr)
	}
	return
}
// validate checks that the topic has partitions on the given brokers and that
// the consumer group is not already saturated (at most one consumer per partition).
func validate(group, topic string, brokers []string) (err error) {
	var (
		cli              *cluster.Client
		c                *cluster.Config
		broker           *sarama.Broker
		gresp            *sarama.DescribeGroupsResponse
		memberAssignment *sarama.ConsumerGroupMemberAssignment
		consumerNum      int
		partitions       []int32
	)
	c = cluster.NewConfig()
	c.Version = sarama.V0_10_0_1
	if cli, err = cluster.NewClient(brokers, c); err != nil {
		log.Error("group(%s) topic(%s) cluster.NewClient() error(%v)", group, topic, err)
		err = errKafKaData
		return
	}
	defer cli.Close()
	if partitions, err = cli.Partitions(topic); err != nil {
		log.Error("group(%s) topic(%s) cli.Partitions error(%v)", group, topic, err)
		err = errKafKaData
		return
	}
	if len(partitions) == 0 {
		err = errKafKaData
		return
	}
	if err = cli.RefreshCoordinator(group); err != nil {
		log.Error("group(%s) topic(%s) cli.RefreshCoordinator error(%v)", group, topic, err)
		err = errKafKaData
		return
	}
	if broker, err = cli.Coordinator(group); err != nil {
		log.Error("group(%s) topic(%s) cli.Coordinator error(%v)", group, topic, err)
		err = errKafKaData
		return
	}
	defer broker.Close()
	if gresp, err = broker.DescribeGroups(&sarama.DescribeGroupsRequest{
		Groups: []string{group},
	}); err != nil {
		log.Error("group(%s) topic(%s) cli.DescribeGroups error(%v)", group, topic, err)
		err = errKafKaData
		return
	}
	if len(gresp.Groups) != 1 {
		err = errKafKaData
		return
	}
	// count the group members already assigned to this topic
	for _, member := range gresp.Groups[0].Members {
		if memberAssignment, err = member.GetMemberAssignment(); err != nil {
			log.Error("group(%s) topic(%s) member.GetMemberAssignment error(%v)", group, topic, err)
			err = errKafKaData
			return
		}
		for mtopic := range memberAssignment.Topics {
			if mtopic == topic {
				consumerNum++
				break
			}
		}
	}
	if consumerNum >= len(partitions) {
		err = errUseLessConsumer
		return
	}
	return nil
}
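// For example, a topic with 8 partitions admits at most 8 members of one
// group; validate rejects a 9th subscriber with errUseLessConsumer, since
// that consumer could never be assigned a partition.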
// Serve reads redis commands from the consumer connection and dispatches
// them until the client quits or a fatal error occurs.
func (s *Sub) Serve() {
	var (
		err  error
		cmd  string
		args [][]byte
	)
	defer func() {
		svc.CountProm.Decr(_opCurrentConsumer, s.group, s.topic)
	}()
	log.Info("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) begin serve", s.group, s.topic, s.cluster, s.color, s.addr)
	for {
		if cmd, args, err = s.c.Read(); err != nil {
			if err != io.EOF {
				log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) read error(%v)", s.group, s.topic, s.cluster, s.color, s.addr, err)
			}
			s.fatal(errConnRead)
			return
		}
		if s.consumer == nil {
			log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) s.consumer is nil", s.group, s.topic, s.cluster, s.color, s.addr)
			s.fatal(errConsumerClosed)
			return
		}
		if s.Closed() {
			log.Warn("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) s.Closed()", s.group, s.topic, s.cluster, s.color, s.addr)
			return
		}
		switch cmd {
		case _auth:
			err = s.write(proto{prefix: _protoStr, message: _ok})
		case _ping:
			err = s.pong()
		case _mget:
			var enc []byte
			if len(args) > 0 {
				enc = args[0]
			}
			err = s.message(enc)
		case _set:
			err = s.commit(args)
		case _quit:
			err = errConnClosedByClient
		default:
			err = errCmdNotSupport
		}
		if err != nil {
			s.fatal(err)
			log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) serve error(%v)", s.group, s.topic, s.cluster, s.color, s.addr, err)
			return
		}
	}
}
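// A consumer session, sketched from the cases above (command names follow the
// _auth/_ping/_mget/_set constants; reply bytes depend on _ok and _pong):
//
//	AUTH <credentials>        -> +OK
//	MGET [<enc>]              -> array of up to `batch` messages, JSON-encoded
//	                             (protobuf when <enc> matches _encodePB)
//	SET <partition> <offset>  -> +OK  (commits the consumed offset)
//	QUIT                      -> server closes the connection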
// message drains the consumer channels and replies with one batch of
// messages, encoded as JSON or protobuf according to enc.
func (s *Sub) message(enc []byte) (err error) {
	var (
		msg    *sarama.ConsumerMessage
		notify *cluster.Notification
		protos []proto
		ok     bool
		bs     []byte
		last   = time.Now()
		ret    = &databus.MessagePB{}
		p      = proto{prefix: _protoBulk}
	)
	for {
		select {
		case err = <-s.consumer.Errors():
			log.Error("group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", s.group, s.topic, s.cluster, s.addr, err)
			return
		case notify, ok = <-s.consumer.Notifications():
			if !ok {
				log.Info("notification notOk group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", s.group, s.topic, s.cluster, s.addr, err)
				err = errClosedNotifyChannel
				return
			}
			switch notify.Type {
			case cluster.UnknownNotification, cluster.RebalanceError:
				log.Error("notification(%s) group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", notify.Type, s.group, s.topic, s.cluster, s.addr, err)
				err = errClosedNotifyChannel
				return
			case cluster.RebalanceStart:
				log.Info("notification(%s) group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", notify.Type, s.group, s.topic, s.cluster, s.addr, err)
				continue
			case cluster.RebalanceOK:
				log.Info("notification(%s) group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", notify.Type, s.group, s.topic, s.cluster, s.addr, err)
			}
			if len(notify.Current[s.topic]) == 0 {
				log.Warn("notification(%s) no topic group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", notify.Type, s.group, s.topic, s.cluster, s.addr, err)
				err = errConsumerOver
				return
			}
		case msg, ok = <-s.consumer.Messages():
			if !ok {
				log.Error("group(%s) topic(%s) cluster(%s) addr(%s) message channel closed", s.group, s.topic, s.cluster, s.addr)
				err = errClosedMsgChannel
				return
			}
			// reset timestamp
			last = time.Now()
			ret.Key = string(msg.Key)
			ret.Value = msg.Value
			ret.Topic = s.topic
			ret.Partition = msg.Partition
			ret.Offset = msg.Offset
			ret.Timestamp = msg.Timestamp.Unix()
			if len(msg.Headers) > 0 {
				var notMatchColor bool
				for _, h := range msg.Headers {
					if bytes.Equal(h.Key, _headerColor) && !bytes.Equal(h.Value, s.color) {
						// the message is dyed with a different color; skip it
						notMatchColor = true
					} else if bytes.Equal(h.Key, _headerMetadata) && h.Value != nil {
						// parse metadata
						dh := new(databus.Header)
						if err = pb.Unmarshal(h.Value, dh); err != nil {
							log.Error("pb.Unmarshal(%s) error(%v)", h.Value, err)
							err = nil
						} else {
							ret.Metadata = dh.Metadata
						}
					}
				}
				if notMatchColor {
					continue
				}
			}
			if bytes.Equal(enc, _encodePB) {
				// encode to pb bytes
				if bs, err = pb.Marshal(ret); err != nil {
					log.Error("proto.Marshal(%v) error(%v)", ret, err)
					s.consumer.MarkPartitionOffset(s.topic, msg.Partition, msg.Offset, "")
					return s.write(proto{prefix: _protoErr, message: errMsgFormat.Error()})
				}
			} else {
				// encode to json bytes
				if bs, err = json.Marshal(ret); err != nil {
					log.Error("json.Marshal(%v) error(%v)", ret, err)
					s.consumer.MarkPartitionOffset(s.topic, msg.Partition, msg.Offset, "")
					return s.write(proto{prefix: _protoErr, message: errMsgFormat.Error()})
				}
			}
			svc.StatProm.State(_opPartitionOffset, msg.Offset, s.group, s.topic, strconv.Itoa(int(msg.Partition)))
			svc.CountProm.Incr(_opConsumerMsgSpeed, s.group, s.topic)
			svc.StatProm.Incr(_opConsumerPartition, s.group, s.topic, strconv.Itoa(int(msg.Partition)))
			p.message = string(bs)
			protos = append(protos, p)
			if len(protos) >= s.batch {
				return s.batchWrite(protos)
			}
		case <-s.ticker.C:
			if len(protos) != 0 {
				return s.batchWrite(protos)
			}
			if time.Since(last) < _batchTimeout {
				continue
			}
			if s.waitClosing {
				log.Info("consumer group(%s) topic(%s) cluster(%s) addr(%s) wait closing then exit, maybe cluster changed", s.group, s.topic, s.cluster, s.addr)
				err = errConsumerTimeout
				return
			}
			return s.batchWrite(protos)
		}
	}
}
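// commit parses the partition and offset arguments of a SET command and
// marks the consumed offset; the auto-commit loop configured in NewSub
// (Consumer.Offsets.CommitInterval) flushes it to Kafka.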
func (s *Sub) commit(args [][]byte) (err error) {
	var (
		partition, offset int64
	)
	if len(args) != 2 {
		log.Error("group(%v) topic(%v) cluster(%s) addr(%s) commit offset error, args(%v) is illegal", s.group, s.topic, s.cluster, s.addr, args)
		// write error
		return s.write(proto{prefix: _protoErr, message: errCommitParams.Error()})
	}
	if partition, err = strconv.ParseInt(string(args[0]), 10, 32); err != nil {
		return s.write(proto{prefix: _protoErr, message: errCommitParams.Error()})
	}
	if offset, err = strconv.ParseInt(string(args[1]), 10, 64); err != nil {
		return s.write(proto{prefix: _protoErr, message: errCommitParams.Error()})
	}
	// mark partition offset
	s.consumer.MarkPartitionOffset(s.topic, int32(partition), offset, "")
	// FIXME(felix): disabled in prod for now because of ops-log performance issues
	if env.DeployEnv != env.DeployEnvProd {
		log.Info("commit group(%s) topic(%s) cluster(%s) color(%s) addr(%s) partition(%d) offset(%d) mark offset succeed", s.group, s.topic, s.cluster, s.color, s.addr, partition, offset)
	}
	return s.write(proto{prefix: _protoStr, message: _ok})
}
// Closed reports whether the consumer has been closed.
func (s *Sub) Closed() bool {
	return s.psCommon != nil && s.psCommon.Closed()
}
// Close closes the consumer and releases its kafka resources.
func (s *Sub) Close() {
	if !s.psCommon.Closed() {
		s.psCommon.Close(true)
	}
	if s.consumer != nil {
		s.consumer.Close()
		s.consumer = nil
	}
	if s.ticker != nil {
		s.ticker.Stop()
	}
	log.Info("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) consumer exit", s.group, s.topic, s.cluster, s.color, s.addr)
}
// WaitClosing marks the closing state; the consumer then exits once it has
// been idle past the batch timeout (up to 30s), e.g. after a cluster change.
func (s *Sub) WaitClosing() {
	s.waitClosing = true
}
func (s *Sub) fatal(err error) {
	if err == nil || s.closed {
		return
	}
	if s.psCommon != nil {
		s.psCommon.fatal(err)
	}
	s.Close()
}