exec.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. /*
  2. Copyright 2018 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package exec
  14. import (
  15. "bytes"
  16. "context"
  17. "crypto/tls"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "net"
  22. "net/http"
  23. "os"
  24. "os/exec"
  25. "reflect"
  26. "sync"
  27. "time"
  28. "github.com/golang/glog"
  29. "golang.org/x/crypto/ssh/terminal"
  30. "k8s.io/apimachinery/pkg/apis/meta/v1"
  31. "k8s.io/apimachinery/pkg/runtime"
  32. "k8s.io/apimachinery/pkg/runtime/schema"
  33. "k8s.io/apimachinery/pkg/runtime/serializer"
  34. "k8s.io/client-go/pkg/apis/clientauthentication"
  35. "k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1"
  36. "k8s.io/client-go/pkg/apis/clientauthentication/v1beta1"
  37. "k8s.io/client-go/tools/clientcmd/api"
  38. "k8s.io/client-go/transport"
  39. "k8s.io/client-go/util/connrotation"
  40. )
  41. const execInfoEnv = "KUBERNETES_EXEC_INFO"
  42. var scheme = runtime.NewScheme()
  43. var codecs = serializer.NewCodecFactory(scheme)
  44. func init() {
  45. v1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
  46. v1alpha1.AddToScheme(scheme)
  47. v1beta1.AddToScheme(scheme)
  48. clientauthentication.AddToScheme(scheme)
  49. }
  50. var (
  51. // Since transports can be constantly re-initialized by programs like kubectl,
  52. // keep a cache of initialized authenticators keyed by a hash of their config.
  53. globalCache = newCache()
  54. // The list of API versions we accept.
  55. apiVersions = map[string]schema.GroupVersion{
  56. v1alpha1.SchemeGroupVersion.String(): v1alpha1.SchemeGroupVersion,
  57. v1beta1.SchemeGroupVersion.String(): v1beta1.SchemeGroupVersion,
  58. }
  59. )
  60. func newCache() *cache {
  61. return &cache{m: make(map[string]*Authenticator)}
  62. }
  63. func cacheKey(c *api.ExecConfig) string {
  64. return fmt.Sprintf("%#v", c)
  65. }
// cache is a mutex-protected map from cache key to Authenticator.
type cache struct {
	// mu is held by get and put while they access m.
	mu sync.Mutex
	// m maps a cache key to the authenticator stored under it.
	m map[string]*Authenticator
}
  70. func (c *cache) get(s string) (*Authenticator, bool) {
  71. c.mu.Lock()
  72. defer c.mu.Unlock()
  73. a, ok := c.m[s]
  74. return a, ok
  75. }
  76. // put inserts an authenticator into the cache. If an authenticator is already
  77. // associated with the key, the first one is returned instead.
  78. func (c *cache) put(s string, a *Authenticator) *Authenticator {
  79. c.mu.Lock()
  80. defer c.mu.Unlock()
  81. existing, ok := c.m[s]
  82. if ok {
  83. return existing
  84. }
  85. c.m[s] = a
  86. return a
  87. }
  88. // GetAuthenticator returns an exec-based plugin for providing client credentials.
  89. func GetAuthenticator(config *api.ExecConfig) (*Authenticator, error) {
  90. return newAuthenticator(globalCache, config)
  91. }
  92. func newAuthenticator(c *cache, config *api.ExecConfig) (*Authenticator, error) {
  93. key := cacheKey(config)
  94. if a, ok := c.get(key); ok {
  95. return a, nil
  96. }
  97. gv, ok := apiVersions[config.APIVersion]
  98. if !ok {
  99. return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion)
  100. }
  101. a := &Authenticator{
  102. cmd: config.Command,
  103. args: config.Args,
  104. group: gv,
  105. stdin: os.Stdin,
  106. stderr: os.Stderr,
  107. interactive: terminal.IsTerminal(int(os.Stdout.Fd())),
  108. now: time.Now,
  109. environ: os.Environ,
  110. }
  111. for _, env := range config.Env {
  112. a.env = append(a.env, env.Name+"="+env.Value)
  113. }
  114. return c.put(key, a), nil
  115. }
// Authenticator is a client credential provider that rotates credentials by executing a plugin.
// The plugin input and output are defined by the API group client.authentication.k8s.io.
type Authenticator struct {
	// Set by the config

	// cmd is the plugin command to execute.
	cmd string
	// args are the arguments passed to cmd.
	args []string
	// group is the API group/version the plugin is configured to use; the
	// plugin's output must be in this version.
	group schema.GroupVersion
	// env holds extra NAME=value entries appended to the plugin's environment.
	env []string

	// Stubbable for testing

	// stdin is connected to the plugin's stdin when interactive is true.
	stdin io.Reader
	// stderr receives the plugin's stderr.
	stderr io.Writer
	// interactive is reported to the plugin in the input spec and controls
	// whether stdin is passed through.
	interactive bool
	// now returns the current time; used to check credential expiry.
	now func() time.Time
	// environ returns the base environment for the plugin process.
	environ func() []string

	// Cached results.
	//
	// The mutex also guards calling the plugin. Since the plugin could be
	// interactive we want to make sure it's only called once.
	mu sync.Mutex
	// cachedCreds holds the credentials from the last successful plugin run,
	// or nil if the plugin has not been run successfully yet.
	cachedCreds *credentials
	// exp is when cachedCreds expire; the zero value means they do not expire.
	exp time.Time
	// onRotate, when non-nil, is called after a refresh changes the client
	// certificate.
	onRotate func()
}
// credentials holds what a plugin run produced: a bearer token, a client
// certificate, or both.
type credentials struct {
	// token is sent as a bearer token; empty when the plugin returned none.
	token string
	// cert is the parsed client certificate/key pair; nil when the plugin
	// returned none.
	cert *tls.Certificate
}
  143. // UpdateTransportConfig updates the transport.Config to use credentials
  144. // returned by the plugin.
  145. func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
  146. wt := c.WrapTransport
  147. c.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
  148. if wt != nil {
  149. rt = wt(rt)
  150. }
  151. return &roundTripper{a, rt}
  152. }
  153. if c.TLS.GetCert != nil {
  154. return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set")
  155. }
  156. c.TLS.GetCert = a.cert
  157. var dial func(ctx context.Context, network, addr string) (net.Conn, error)
  158. if c.Dial != nil {
  159. dial = c.Dial
  160. } else {
  161. dial = (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext
  162. }
  163. d := connrotation.NewDialer(dial)
  164. a.onRotate = d.CloseAll
  165. c.Dial = d.DialContext
  166. return nil
  167. }
// roundTripper is an http.RoundTripper that adds credentials obtained from an
// Authenticator to outgoing requests before delegating to base.
type roundTripper struct {
	// a supplies (and refreshes) the credentials.
	a *Authenticator
	// base performs the actual request.
	base http.RoundTripper
}
  172. func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  173. // If a user has already set credentials, use that. This makes commands like
  174. // "kubectl get --token (token) pods" work.
  175. if req.Header.Get("Authorization") != "" {
  176. return r.base.RoundTrip(req)
  177. }
  178. creds, err := r.a.getCreds()
  179. if err != nil {
  180. return nil, fmt.Errorf("getting credentials: %v", err)
  181. }
  182. if creds.token != "" {
  183. req.Header.Set("Authorization", "Bearer "+creds.token)
  184. }
  185. res, err := r.base.RoundTrip(req)
  186. if err != nil {
  187. return nil, err
  188. }
  189. if res.StatusCode == http.StatusUnauthorized {
  190. resp := &clientauthentication.Response{
  191. Header: res.Header,
  192. Code: int32(res.StatusCode),
  193. }
  194. if err := r.a.maybeRefreshCreds(creds, resp); err != nil {
  195. glog.Errorf("refreshing credentials: %v", err)
  196. }
  197. }
  198. return res, nil
  199. }
  200. func (a *Authenticator) credsExpired() bool {
  201. if a.exp.IsZero() {
  202. return false
  203. }
  204. return a.now().After(a.exp)
  205. }
  206. func (a *Authenticator) cert() (*tls.Certificate, error) {
  207. creds, err := a.getCreds()
  208. if err != nil {
  209. return nil, err
  210. }
  211. return creds.cert, nil
  212. }
  213. func (a *Authenticator) getCreds() (*credentials, error) {
  214. a.mu.Lock()
  215. defer a.mu.Unlock()
  216. if a.cachedCreds != nil && !a.credsExpired() {
  217. return a.cachedCreds, nil
  218. }
  219. if err := a.refreshCredsLocked(nil); err != nil {
  220. return nil, err
  221. }
  222. return a.cachedCreds, nil
  223. }
  224. // maybeRefreshCreds executes the plugin to force a rotation of the
  225. // credentials, unless they were rotated already.
  226. func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error {
  227. a.mu.Lock()
  228. defer a.mu.Unlock()
  229. // Since we're not making a new pointer to a.cachedCreds in getCreds, no
  230. // need to do deep comparison.
  231. if creds != a.cachedCreds {
  232. // Credentials already rotated.
  233. return nil
  234. }
  235. return a.refreshCredsLocked(r)
  236. }
  237. // refreshCredsLocked executes the plugin and reads the credentials from
  238. // stdout. It must be called while holding the Authenticator's mutex.
  239. func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error {
  240. cred := &clientauthentication.ExecCredential{
  241. Spec: clientauthentication.ExecCredentialSpec{
  242. Response: r,
  243. Interactive: a.interactive,
  244. },
  245. }
  246. env := append(a.environ(), a.env...)
  247. if a.group == v1alpha1.SchemeGroupVersion {
  248. // Input spec disabled for beta due to lack of use. Possibly re-enable this later if
  249. // someone wants it back.
  250. //
  251. // See: https://github.com/kubernetes/kubernetes/issues/61796
  252. data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred)
  253. if err != nil {
  254. return fmt.Errorf("encode ExecCredentials: %v", err)
  255. }
  256. env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data))
  257. }
  258. stdout := &bytes.Buffer{}
  259. cmd := exec.Command(a.cmd, a.args...)
  260. cmd.Env = env
  261. cmd.Stderr = a.stderr
  262. cmd.Stdout = stdout
  263. if a.interactive {
  264. cmd.Stdin = a.stdin
  265. }
  266. if err := cmd.Run(); err != nil {
  267. return fmt.Errorf("exec: %v", err)
  268. }
  269. _, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred)
  270. if err != nil {
  271. return fmt.Errorf("decoding stdout: %v", err)
  272. }
  273. if gvk.Group != a.group.Group || gvk.Version != a.group.Version {
  274. return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s",
  275. a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version})
  276. }
  277. if cred.Status == nil {
  278. return fmt.Errorf("exec plugin didn't return a status field")
  279. }
  280. if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" {
  281. return fmt.Errorf("exec plugin didn't return a token or cert/key pair")
  282. }
  283. if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") {
  284. return fmt.Errorf("exec plugin returned only certificate or key, not both")
  285. }
  286. if cred.Status.ExpirationTimestamp != nil {
  287. a.exp = cred.Status.ExpirationTimestamp.Time
  288. } else {
  289. a.exp = time.Time{}
  290. }
  291. newCreds := &credentials{
  292. token: cred.Status.Token,
  293. }
  294. if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" {
  295. cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData))
  296. if err != nil {
  297. return fmt.Errorf("failed parsing client key/certificate: %v", err)
  298. }
  299. newCreds.cert = &cert
  300. }
  301. oldCreds := a.cachedCreds
  302. a.cachedCreds = newCreds
  303. // Only close all connections when TLS cert rotates. Token rotation doesn't
  304. // need the extra noise.
  305. if a.onRotate != nil && oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) {
  306. a.onRotate()
  307. }
  308. return nil
  309. }