server.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. //go:generate ./regenerate.sh
  19. // Package health provides a service that exposes server's health and it must be
  20. // imported to enable support for client-side health checks.
  21. package health
  22. import (
  23. "context"
  24. "sync"
  25. "google.golang.org/grpc/codes"
  26. healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
  27. healthpb "google.golang.org/grpc/health/grpc_health_v1"
  28. "google.golang.org/grpc/status"
  29. )
  30. // Server implements `service Health`.
  31. type Server struct {
  32. mu sync.Mutex
  33. // statusMap stores the serving status of the services this Server monitors.
  34. statusMap map[string]healthpb.HealthCheckResponse_ServingStatus
  35. updates map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus
  36. }
  37. // NewServer returns a new Server.
  38. func NewServer() *Server {
  39. return &Server{
  40. statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING},
  41. updates: make(map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus),
  42. }
  43. }
  44. // Check implements `service Health`.
  45. func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
  46. s.mu.Lock()
  47. defer s.mu.Unlock()
  48. if servingStatus, ok := s.statusMap[in.Service]; ok {
  49. return &healthpb.HealthCheckResponse{
  50. Status: servingStatus,
  51. }, nil
  52. }
  53. return nil, status.Error(codes.NotFound, "unknown service")
  54. }
  55. // Watch implements `service Health`.
  56. func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  57. service := in.Service
  58. // update channel is used for getting service status updates.
  59. update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1)
  60. s.mu.Lock()
  61. // Puts the initial status to the channel.
  62. if servingStatus, ok := s.statusMap[service]; ok {
  63. update <- servingStatus
  64. } else {
  65. update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN
  66. }
  67. // Registers the update channel to the correct place in the updates map.
  68. if _, ok := s.updates[service]; !ok {
  69. s.updates[service] = make(map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus)
  70. }
  71. s.updates[service][stream] = update
  72. defer func() {
  73. s.mu.Lock()
  74. delete(s.updates[service], stream)
  75. s.mu.Unlock()
  76. }()
  77. s.mu.Unlock()
  78. var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1
  79. for {
  80. select {
  81. // Status updated. Sends the up-to-date status to the client.
  82. case servingStatus := <-update:
  83. if lastSentStatus == servingStatus {
  84. continue
  85. }
  86. lastSentStatus = servingStatus
  87. err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus})
  88. if err != nil {
  89. return status.Error(codes.Canceled, "Stream has ended.")
  90. }
  91. // Context done. Removes the update channel from the updates map.
  92. case <-stream.Context().Done():
  93. return status.Error(codes.Canceled, "Stream has ended.")
  94. }
  95. }
  96. }
  97. // SetServingStatus is called when need to reset the serving status of a service
  98. // or insert a new service entry into the statusMap.
  99. func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
  100. s.mu.Lock()
  101. defer s.mu.Unlock()
  102. s.statusMap[service] = servingStatus
  103. for _, update := range s.updates[service] {
  104. // Clears previous updates, that are not sent to the client, from the channel.
  105. // This can happen if the client is not reading and the server gets flow control limited.
  106. select {
  107. case <-update:
  108. default:
  109. }
  110. // Puts the most recent update to the channel.
  111. update <- servingStatus
  112. }
  113. }