pubsub.go

package tcp

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"strconv"
	"time"
	"unicode"

	"go-common/app/infra/databus/conf"
	"go-common/library/conf/env"
	"go-common/library/log"
	"go-common/library/queue/databus"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
	pb "github.com/gogo/protobuf/proto"
)

func stringify(b []byte) []byte {
	return bytes.Map(
		func(r rune) rune {
			if unicode.IsSymbol(r) || unicode.IsControl(r) {
				return rune('-')
			}
			return r
		},
		b,
	)
}

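// proto is a single unit of the redis-style reply protocol written back to
// clients: prefix selects the reply type (status, error, array header, bulk
// string), integer carries array lengths and message carries the payload.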
type proto struct {
	prefix  byte
	integer int
	message string
}

// psCommon is pub sub common
type psCommon struct {
	c      *conn
	err    error
	closed bool
	// kafka
	group   string
	topic   string
	cluster string
	addr    string
	color   []byte
}

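// newPsCommon builds the state shared by producers and consumers and records
// the remote address when a TCP connection is present (c is nil for HTTP reuse).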
func newPsCommon(c *conn, group, topic, color, cluster string) (ps *psCommon) {
	ps = &psCommon{
		c:       c,
		group:   group,
		topic:   topic,
		cluster: cluster,
		color:   []byte(color),
	}
	if c != nil {
		ps.addr = c.conn.RemoteAddr().String()
	}
	return
}

func (ps *psCommon) write(protos ...proto) (err error) {
	for _, p := range protos {
		if err = ps.c.Write(p); err != nil {
			return
		}
	}
	err = ps.c.Flush()
	return
}

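// batchWrite sends an array header followed by every queued proto, then
// flushes the connection once at the end.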
func (ps *psCommon) batchWrite(protos []proto) (err error) {
	if err = ps.c.Write(proto{prefix: _protoArray, integer: len(protos)}); err != nil {
		return
	}
	for _, p := range protos {
		if err = ps.c.Write(p); err != nil {
			return
		}
		// FIXME(felix): disabled in prod for now because of ops-log performance issues
		if env.DeployEnv != env.DeployEnvProd {
			log.Info("batchWrite group(%s) topic(%s) cluster(%s) color(%s) addr(%s) consumer(%s) ok", ps.group, ps.topic, ps.cluster, ps.color, ps.addr, stringify([]byte(p.message)))
		}
	}
	err = ps.c.Flush()
	return
}

func (ps *psCommon) pong() (err error) {
	if err = ps.write(proto{prefix: _protoStr, message: _pong}); err != nil {
		return
	}
	log.Info("pong group(%s) topic(%s) cluster(%s) color(%s) addr(%s) ping success", ps.group, ps.topic, ps.cluster, ps.color, ps.addr)
	return
}

func (ps *psCommon) Closed() bool {
	return ps.closed
}

// Close is tightly coupled to the redis protocol, hence the sendRedisErr switch.
func (ps *psCommon) Close(sendRedisErr bool) {
	if ps.closed {
		return
	}
	if ps.err == nil {
		ps.err = errConnClosedByServer // when closed by self, send close event to client.
	}
	// write error
	if ps.err != errConnRead && ps.err != errConnClosedByClient && sendRedisErr {
		ps.write(proto{prefix: _protoErr, message: ps.err.Error()})
	}
	if ps.c != nil {
		ps.c.Close()
	}
	ps.closed = true
}

func (ps *psCommon) fatal(err error) {
	if err == nil || ps.closed {
		return
	}
	ps.err = err
	ps.Close(true)
}

// Pub databus producer
type Pub struct {
	*psCommon
	// producer
	producer sarama.SyncProducer
}

// NewPub new databus producer.
// The HTTP interface reuses this method; pass nil for c.
func NewPub(c *conn, group, topic, color string, pCfg *conf.Kafka) (p *Pub, err error) {
	producer, err := newProducer(group, topic, pCfg)
	if err != nil {
		log.Error("group(%s) topic(%s) cluster(%s) NewPub producer error(%v)", group, topic, pCfg.Cluster, err)
		return
	}
	p = &Pub{
		psCommon: newPsCommon(c, group, topic, color, pCfg.Cluster),
		producer: producer,
	}
	// connections from the HTTP protocol need no special handling
	if c != nil {
		// set producer read connection timeout
		p.c.readTimeout = _pubReadTimeout
	}
	log.Info("NewPub() success group(%s) topic(%s) color(%s) cluster(%s) addr(%s)", group, topic, color, pCfg.Cluster, p.addr)
	return
}

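// Producer connections speak a redis-style protocol:
//	AUTH                  -> +OK
//	PING                  -> +PONG
//	SET key value         -> publish value to the topic under key
//	HSET key header value -> publish value with a metadata header
//	QUIT                  -> close the connection
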
// Serve databus producer goroutine
func (p *Pub) Serve() {
	var (
		err  error
		cmd  string
		args [][]byte
	)
	for {
		if cmd, args, err = p.c.Read(); err != nil {
			if err != io.EOF {
				log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) read error(%v)", p.group, p.topic, p.cluster, p.color, p.addr, err)
			}
			p.fatal(errConnRead)
			return
		}
		if p.Closed() {
			log.Warn("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) p.Closed()", p.group, p.topic, p.cluster, p.color, p.addr)
			return
		}
		select {
		case <-quit:
			p.fatal(errConnClosedByServer)
			return
		default:
		}
		switch cmd {
		case _auth:
			err = p.write(proto{prefix: _protoStr, message: _ok})
		case _ping:
			err = p.pong()
		case _set:
			if len(args) != 2 {
				p.write(proto{prefix: _protoErr, message: errPubParams.Error()})
				continue
			}
			err = p.publish(args[0], nil, args[1])
		case _hset:
			if len(args) != 3 {
				p.write(proto{prefix: _protoErr, message: errPubParams.Error()})
				continue
			}
			err = p.publish(args[0], args[1], args[2])
		case _quit:
			err = errConnClosedByClient
		default:
			err = errCmdNotSupport
		}
		if err != nil {
			p.fatal(err)
			log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) serve error(%v)", p.group, p.topic, p.cluster, p.color, p.addr, p.err)
			return
		}
	}
}

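// publish sends one message and acknowledges the client with +OK on success.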
func (p *Pub) publish(key, header, value []byte) (err error) {
	if _, _, err = p.Publish(key, header, value); err != nil {
		return
	}
	return p.write(proto{prefix: _protoStr, message: _ok})
}

// Publish sends a message; shared by the redis and HTTP protocols.
func (p *Pub) Publish(key, header, value []byte) (partition int32, offset int64, err error) {
	var message = &sarama.ProducerMessage{
		Topic: p.topic,
		Key:   sarama.ByteEncoder(key),
		Value: sarama.ByteEncoder(value),
		Headers: []sarama.RecordHeader{
			{Key: _headerColor, Value: p.color},
			{Key: _headerMetadata, Value: header},
		},
	}
	now := time.Now()
	// TODO(felix): support RecordHeader
	if partition, offset, err = p.producer.SendMessage(message); err != nil {
		log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) publish(%v) error(%v)", p.group, p.topic, p.cluster, p.color, p.addr, message, err)
		return
	}
	if svc != nil {
		svc.TimeProm.Timing(p.group, int64(time.Since(now)/time.Millisecond))
		svc.CountProm.Incr(_opProducerMsgSpeed, p.group, p.topic)
	}
	// FIXME(felix): disabled in prod for now because of ops-log performance issues
	if env.DeployEnv != env.DeployEnvProd {
		log.Info("publish group(%s) topic(%s) cluster(%s) color(%s) addr(%s) key(%s) header(%s) value(%s) ok", p.group, p.topic, p.cluster, p.color, p.addr, key, stringify(header), stringify(value))
	}
	return
}

// Sub databus consumer
type Sub struct {
	*psCommon
	// kafka consumer
	consumer    *cluster.Consumer
	waitClosing bool
	batch       int
	// ticker
	ticker *time.Ticker
}

// NewSub new databus consumer
func NewSub(c *conn, group, topic, color string, sCfg *conf.Kafka, batch int64) (s *Sub, err error) {
	select {
	case <-consumerLimter:
	default:
	}
	// NOTE color is used to filter dyed (traffic-colored) messages during consumption
	if color != "" {
		group = fmt.Sprintf("%s-%s", group, color)
	}
	if err = validate(group, topic, sCfg.Brokers); err != nil {
		return
	}
	s = &Sub{
		psCommon: newPsCommon(c, group, topic, color, sCfg.Cluster),
		ticker:   time.NewTicker(_batchInterval),
	}
	if batch == 0 {
		s.batch = _batchNum
	} else {
		s.batch = int(batch)
	}
	// set consumer read connection timeout
	s.c.readTimeout = _subReadTimeout
	// cluster config
	cfg := cluster.NewConfig()
	cfg.Version = sarama.V1_0_0_0
	cfg.ClientID = fmt.Sprintf("%s-%s", group, topic)
	cfg.Net.KeepAlive = 30 * time.Second
	// NOTE cluster auto commit offset interval
	cfg.Consumer.Offsets.CommitInterval = time.Second * 1
	// NOTE set fetch.wait.max.ms
	cfg.Consumer.MaxWaitTime = time.Millisecond * 250
	cfg.Consumer.MaxProcessingTime = 50 * time.Millisecond
	// NOTE errors that occur during offset management; if enabled, the Errors channel must be read
	cfg.Consumer.Return.Errors = true
	// NOTE notifications that occur during consuming; if enabled, the Notifications channel must be read
	cfg.Group.Return.Notifications = true
	// The initial offset to use if no offset was previously committed.
	// default: OffsetOldest
	cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
	if s.consumer, err = cluster.NewConsumer(sCfg.Brokers, group, []string{topic}, cfg); err != nil {
		log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) cluster.NewConsumer() error(%v)", s.group, s.topic, s.cluster, s.color, s.addr, err)
	} else {
		log.Info("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) cluster.NewConsumer() ok", s.group, s.topic, s.cluster, s.color, s.addr)
	}
	return
}

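// validate asks the group coordinator for the current member assignments and
// returns errUseLessConsumer when every partition of topic is already owned by
// a member of group, so an extra consumer would sit idle.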
func validate(group, topic string, brokers []string) (err error) {
	var (
		cli              *cluster.Client
		c                *cluster.Config
		broker           *sarama.Broker
		gresp            *sarama.DescribeGroupsResponse
		memberAssignment *sarama.ConsumerGroupMemberAssignment
		consumerNum      int
		partitions       []int32
	)
	c = cluster.NewConfig()
	c.Version = sarama.V0_10_0_1
	if cli, err = cluster.NewClient(brokers, c); err != nil {
		log.Error("group(%s) topic(%s) cluster.NewClient() error(%v)", group, topic, err)
		err = errKafKaData
		return
	}
	defer cli.Close()
	if partitions, err = cli.Partitions(topic); err != nil {
		log.Error("group(%s) topic(%s) cli.Partitions error(%v)", group, topic, err)
		err = errKafKaData
		return
	}
	if len(partitions) <= 0 {
		err = errKafKaData
		return
	}
	if err = cli.RefreshCoordinator(group); err != nil {
		log.Error("group(%s) topic(%s) cli.RefreshCoordinator error(%v)", group, topic, err)
		err = errKafKaData
		return
	}
	if broker, err = cli.Coordinator(group); err != nil {
		log.Error("group(%s) topic(%s) cli.Coordinator error(%v)", group, topic, err)
		err = errKafKaData
		return
	}
	defer broker.Close()
	if gresp, err = broker.DescribeGroups(&sarama.DescribeGroupsRequest{
		Groups: []string{group},
	}); err != nil {
		log.Error("group(%s) topic(%s) cli.DescribeGroups error(%v)", group, topic, err)
		err = errKafKaData
		return
	}
	if len(gresp.Groups) != 1 {
		err = errKafKaData
		return
	}
	for _, member := range gresp.Groups[0].Members {
		if memberAssignment, err = member.GetMemberAssignment(); err != nil {
			log.Error("group(%s) topic(%s) member.GetMemberAssignment error(%v)", group, topic, err)
			err = errKafKaData
			return
		}
		for mtopic := range memberAssignment.Topics {
			if mtopic == topic {
				consumerNum++
				break
			}
		}
	}
	if consumerNum >= len(partitions) {
		err = errUseLessConsumer
		return
	}
	return nil
}

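// Consumer connections speak the same redis-style protocol:
//	AUTH                 -> +OK
//	PING                 -> +PONG
//	MGET [encoding]      -> fetch the next batch of messages (pb or json)
//	SET partition offset -> commit the given offset
//	QUIT                 -> close the connection
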
// Serve databus consumer goroutine
func (s *Sub) Serve() {
	var (
		err  error
		cmd  string
		args [][]byte
	)
	defer func() {
		svc.CountProm.Decr(_opCurrentConsumer, s.group, s.topic)
	}()
	log.Info("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) begin serve", s.group, s.topic, s.cluster, s.color, s.addr)
	for {
		if cmd, args, err = s.c.Read(); err != nil {
			if err != io.EOF {
				log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) read error(%v)", s.group, s.topic, s.cluster, s.color, s.addr, err)
			}
			s.fatal(errConnRead)
			return
		}
		if s.consumer == nil {
			log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) s.consumer is nil", s.group, s.topic, s.cluster, s.color, s.addr)
			s.fatal(errConsumerClosed)
			return
		}
		if s.Closed() {
			log.Warn("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) s.Closed()", s.group, s.topic, s.cluster, s.color, s.addr)
			return
		}
		switch cmd {
		case _auth:
			err = s.write(proto{prefix: _protoStr, message: _ok})
		case _ping:
			err = s.pong()
		case _mget:
			var enc []byte
			if len(args) > 0 {
				enc = args[0]
			}
			err = s.message(enc)
		case _set:
			err = s.commit(args)
		case _quit:
			err = errConnClosedByClient
		default:
			err = errCmdNotSupport
		}
		if err != nil {
			s.fatal(err)
			log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) serve error(%v)", s.group, s.topic, s.cluster, s.color, s.addr, err)
			return
		}
	}
}

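// message streams kafka messages into one batch of replies: it drops messages
// whose color header does not match, decodes metadata headers, encodes each
// message as protobuf when enc equals _encodePB (JSON otherwise), and returns
// when the batch is full, the ticker fires, or the consumer reports a problem.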
func (s *Sub) message(enc []byte) (err error) {
	var (
		msg    *sarama.ConsumerMessage
		notify *cluster.Notification
		protos []proto
		ok     bool
		bs     []byte
		last   = time.Now()
		ret    = &databus.MessagePB{}
		p      = proto{prefix: _protoBulk}
	)
	for {
		select {
		case err = <-s.consumer.Errors():
			log.Error("group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", s.group, s.topic, s.cluster, s.addr, err)
			return
		case notify, ok = <-s.consumer.Notifications():
			if !ok {
				log.Info("notification notOk group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", s.group, s.topic, s.cluster, s.addr, err)
				err = errClosedNotifyChannel
				return
			}
			switch notify.Type {
			case cluster.UnknownNotification, cluster.RebalanceError:
				log.Error("notification(%s) group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", notify.Type, s.group, s.topic, s.cluster, s.addr, err)
				err = errClosedNotifyChannel
				return
			case cluster.RebalanceStart:
				log.Info("notification(%s) group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", notify.Type, s.group, s.topic, s.cluster, s.addr, err)
				continue
			case cluster.RebalanceOK:
				log.Info("notification(%s) group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", notify.Type, s.group, s.topic, s.cluster, s.addr, err)
			}
			if len(notify.Current[s.topic]) == 0 {
				log.Warn("notification(%s) no topic group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", notify.Type, s.group, s.topic, s.cluster, s.addr, err)
				err = errConsumerOver
				return
			}
		case msg, ok = <-s.consumer.Messages():
			if !ok {
				log.Error("group(%s) topic(%s) cluster(%s) addr(%s) message channel closed", s.group, s.topic, s.cluster, s.addr)
				err = errClosedMsgChannel
				return
			}
			// reset timestamp
			last = time.Now()
			ret.Key = string(msg.Key)
			ret.Value = msg.Value
			ret.Topic = s.topic
			ret.Partition = msg.Partition
			ret.Offset = msg.Offset
			ret.Timestamp = msg.Timestamp.Unix()
			if len(msg.Headers) > 0 {
				var notMatchColor bool
				for _, h := range msg.Headers {
					if bytes.Equal(h.Key, _headerColor) && !bytes.Equal(h.Value, s.color) {
						// color does not match, skip this message
						notMatchColor = true
					} else if bytes.Equal(h.Key, _headerMetadata) && h.Value != nil {
						// parse metadata
						dh := new(databus.Header)
						if err = pb.Unmarshal(h.Value, dh); err != nil {
							log.Error("pb.Unmarshal(%s) error(%v)", h.Value, err)
							err = nil
						} else {
							ret.Metadata = dh.Metadata
						}
					}
				}
				if notMatchColor {
					continue
				}
			}
			if bytes.Equal(enc, _encodePB) {
				// encode to pb bytes
				if bs, err = pb.Marshal(ret); err != nil {
					log.Error("proto.Marshal(%v) error(%v)", ret, err)
					s.consumer.MarkPartitionOffset(s.topic, msg.Partition, msg.Offset, "")
					return s.write(proto{prefix: _protoErr, message: errMsgFormat.Error()})
				}
			} else {
				// encode to json bytes
				if bs, err = json.Marshal(ret); err != nil {
					log.Error("json.Marshal(%v) error(%v)", ret, err)
					s.consumer.MarkPartitionOffset(s.topic, msg.Partition, msg.Offset, "")
					return s.write(proto{prefix: _protoErr, message: errMsgFormat.Error()})
				}
			}
			svc.StatProm.State(_opPartitionOffset, msg.Offset, s.group, s.topic, strconv.Itoa(int(msg.Partition)))
			svc.CountProm.Incr(_opConsumerMsgSpeed, s.group, s.topic)
			svc.StatProm.Incr(_opConsumerPartition, s.group, s.topic, strconv.Itoa(int(msg.Partition)))
			p.message = string(bs)
			protos = append(protos, p)
			if len(protos) >= s.batch {
				return s.batchWrite(protos)
			}
		case <-s.ticker.C:
			if len(protos) != 0 {
				return s.batchWrite(protos)
			}
			if time.Since(last) < _batchTimeout {
				continue
			}
			if s.waitClosing {
				log.Info("consumer group(%s) topic(%s) cluster(%s) addr(%s) wait closing then exit, maybe cluster changed", s.group, s.topic, s.cluster, s.addr)
				err = errConsumerTimeout
				return
			}
			return s.batchWrite(protos)
		}
	}
}

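// commit parses the partition and offset arguments and marks the consumed
// offset on the consumer group.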
func (s *Sub) commit(args [][]byte) (err error) {
	var (
		partition, offset int64
	)
	if len(args) != 2 {
		log.Error("group(%v) topic(%v) cluster(%s) addr(%s) commit offset error, args(%v) is illegal", s.group, s.topic, s.cluster, s.addr, args)
		// write error
		return s.write(proto{prefix: _protoErr, message: errCommitParams.Error()})
	}
	if partition, err = strconv.ParseInt(string(args[0]), 10, 32); err != nil {
		return s.write(proto{prefix: _protoErr, message: errCommitParams.Error()})
	}
	if offset, err = strconv.ParseInt(string(args[1]), 10, 64); err != nil {
		return s.write(proto{prefix: _protoErr, message: errCommitParams.Error()})
	}
	// mark partition offset
	s.consumer.MarkPartitionOffset(s.topic, int32(partition), offset, "")
	// FIXME(felix): disabled in prod for now because of ops-log performance issues
	if env.DeployEnv != env.DeployEnvProd {
		log.Info("commit group(%s) topic(%s) cluster(%s) color(%s) addr(%s) partition(%d) offset(%d) mark offset succeed", s.group, s.topic, s.cluster, s.color, s.addr, partition, offset)
	}
	return s.write(proto{prefix: _protoStr, message: _ok})
}

// Closed reports whether the consumer is closed.
func (s *Sub) Closed() bool {
	return s.psCommon != nil && s.psCommon.Closed()
}

// Close closes the consumer.
func (s *Sub) Close() {
	if !s.psCommon.Closed() {
		s.psCommon.Close(true)
	}
	if s.consumer != nil {
		s.consumer.Close()
		s.consumer = nil
	}
	if s.ticker != nil {
		s.ticker.Stop()
	}
	log.Info("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) consumer exit", s.group, s.topic, s.cluster, s.color, s.addr)
}

// WaitClosing marks the closing state; the consumer then exits once it has been
// idle for the batch timeout (about 30s), e.g. after a cluster change.
func (s *Sub) WaitClosing() {
	s.waitClosing = true
}

func (s *Sub) fatal(err error) {
	if err == nil || s.closed {
		return
	}
	if s.psCommon != nil {
		s.psCommon.fatal(err)
	}
	s.Close()
}