123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package kube
- import (
- "bytes"
- "crypto/tls"
- "crypto/x509"
- "encoding/base64"
- "encoding/json"
- "errors"
- "flag"
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "strconv"
- "strings"
- "time"
- "github.com/ghodss/yaml"
- "github.com/sirupsen/logrus"
- "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/util/sets"
- )
- var InClusterBaseURL string
- func init() {
- flag.StringVar(&InClusterBaseURL, "in-cluster-base-url", "https://kubernetes.default", "the base url to request k8s apiserver in cluster")
- }
- const (
- // TestContainerName specifies the primary container name.
- TestContainerName = "test"
- https = "https"
- maxRetries = 8
- retryDelay = 2 * time.Second
- requestTimeout = time.Minute
- // EmptySelector selects everything
- EmptySelector = ""
- // DefaultClusterAlias specifies the default cluster key to schedule jobs.
- DefaultClusterAlias = "default"
- )
- // newClient is used to allow mocking out the behavior of 'NewClient' while testing.
- var newClient = NewClient
- // Logger can print debug messages
- type Logger interface {
- Debugf(s string, v ...interface{})
- }
- // Client interacts with the Kubernetes api-server.
- type Client struct {
- // If logger is non-nil, log all method calls with it.
- logger Logger
- baseURL string
- deckURL string
- client *http.Client
- token string
- namespace string
- fake bool
- hiddenReposProvider func() []string
- hiddenOnly bool
- }
- // SetHiddenReposProvider takes a continuation that fetches a list of orgs and repos for
- // which PJs should not be returned.
- // NOTE: This function is not thread safe and should be called before the client is in use.
- func (c *Client) SetHiddenReposProvider(p func() []string, hiddenOnly bool) {
- c.hiddenReposProvider = p
- c.hiddenOnly = hiddenOnly
- }
- // Namespace returns a copy of the client pointing at the specified namespace.
- func (c *Client) Namespace(ns string) *Client {
- nc := *c
- nc.namespace = ns
- return &nc
- }
- func (c *Client) log(methodName string, args ...interface{}) {
- if c.logger == nil {
- return
- }
- var as []string
- for _, arg := range args {
- as = append(as, fmt.Sprintf("%v", arg))
- }
- c.logger.Debugf("%s(%s)", methodName, strings.Join(as, ", "))
- }
- // ConflictError is http 409.
- type ConflictError struct {
- e error
- }
- func (e ConflictError) Error() string {
- return e.e.Error()
- }
- // NewConflictError returns an error with the embedded inner error
- func NewConflictError(e error) ConflictError {
- return ConflictError{e: e}
- }
- // UnprocessableEntityError happens when the apiserver returns http 422.
- type UnprocessableEntityError struct {
- e error
- }
- func (e UnprocessableEntityError) Error() string {
- return e.e.Error()
- }
- // NewUnprocessableEntityError returns an error with the embedded inner error
- func NewUnprocessableEntityError(e error) UnprocessableEntityError {
- return UnprocessableEntityError{e: e}
- }
- // NotFoundError happens when the apiserver returns http 404
- type NotFoundError struct {
- e error
- }
- func (e NotFoundError) Error() string {
- return e.e.Error()
- }
- // NewNotFoundError returns an error with the embedded inner error
- func NewNotFoundError(e error) NotFoundError {
- return NotFoundError{e: e}
- }
- type request struct {
- method string
- path string
- deckPath string
- query map[string]string
- requestBody interface{}
- }
- func (c *Client) request(r *request, ret interface{}) error {
- out, err := c.requestRetry(r)
- if err != nil {
- return err
- }
- if ret != nil {
- if err := json.Unmarshal(out, ret); err != nil {
- return err
- }
- }
- return nil
- }
- func (c *Client) retry(r *request) (*http.Response, error) {
- var resp *http.Response
- var err error
- backoff := retryDelay
- for retries := 0; retries < maxRetries; retries++ {
- resp, err = c.doRequest(r.method, r.deckPath, r.path, r.query, r.requestBody)
- if err == nil {
- if resp.StatusCode < 500 {
- break
- }
- resp.Body.Close()
- }
- time.Sleep(backoff)
- backoff *= 2
- }
- return resp, err
- }
- // Retry on transport failures. Does not retry on 500s.
- func (c *Client) requestRetryStream(r *request) (io.ReadCloser, error) {
- if c.fake && r.deckPath == "" {
- return nil, nil
- }
- resp, err := c.retry(r)
- if err != nil {
- return nil, err
- }
- if resp.StatusCode == 409 {
- return nil, NewConflictError(fmt.Errorf("body cannot be streamed"))
- } else if resp.StatusCode < 200 || resp.StatusCode > 299 {
- return nil, fmt.Errorf("response has status \"%s\"", resp.Status)
- }
- return resp.Body, nil
- }
- // Retry on transport failures. Does not retry on 500s.
- func (c *Client) requestRetry(r *request) ([]byte, error) {
- if c.fake && r.deckPath == "" {
- return []byte("{}"), nil
- }
- resp, err := c.retry(r)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- rb, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return nil, err
- }
- if resp.StatusCode == 409 {
- return nil, NewConflictError(fmt.Errorf("body: %s", string(rb)))
- } else if resp.StatusCode == 422 {
- return nil, NewUnprocessableEntityError(fmt.Errorf("body: %s", string(rb)))
- } else if resp.StatusCode == 404 {
- return nil, NewNotFoundError(fmt.Errorf("body: %s", string(rb)))
- } else if resp.StatusCode < 200 || resp.StatusCode > 299 {
- return nil, fmt.Errorf("response has status \"%s\" and body \"%s\"", resp.Status, string(rb))
- }
- return rb, nil
- }
- func (c *Client) doRequest(method, deckPath, urlPath string, query map[string]string, body interface{}) (*http.Response, error) {
- url := c.baseURL + urlPath
- if c.deckURL != "" && deckPath != "" {
- url = c.deckURL + deckPath
- }
- var buf io.Reader
- if body != nil {
- b, err := json.Marshal(body)
- if err != nil {
- return nil, err
- }
- buf = bytes.NewBuffer(b)
- }
- req, err := http.NewRequest(method, url, buf)
- if err != nil {
- return nil, err
- }
- if c.token != "" {
- req.Header.Set("Authorization", "Bearer "+c.token)
- }
- if method == http.MethodPatch {
- req.Header.Set("Content-Type", "application/strategic-merge-patch+json")
- } else {
- req.Header.Set("Content-Type", "application/json")
- }
- q := req.URL.Query()
- for k, v := range query {
- q.Add(k, v)
- }
- req.URL.RawQuery = q.Encode()
- return c.client.Do(req)
- }
- // NewFakeClient creates a client that doesn't do anything. If you provide a
- // deck URL then the client will hit that for the supported calls.
- func NewFakeClient(deckURL string) *Client {
- return &Client{
- namespace: "default",
- deckURL: deckURL,
- client: &http.Client{},
- fake: true,
- }
- }
- // NewClientInCluster creates a Client that works from within a pod.
- func NewClientInCluster(namespace string) (*Client, error) {
- tokenFile := "/var/run/secrets/kubernetes.io/serviceaccount/token"
- token, err := ioutil.ReadFile(tokenFile)
- if err != nil {
- return nil, err
- }
- client := &http.Client{Timeout: requestTimeout}
- if strings.HasPrefix(InClusterBaseURL, https) {
- rootCAFile := "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
- certData, err := ioutil.ReadFile(rootCAFile)
- if err != nil {
- return nil, err
- }
- cp := x509.NewCertPool()
- cp.AppendCertsFromPEM(certData)
- client.Transport = &http.Transport{
- TLSClientConfig: &tls.Config{
- MinVersion: tls.VersionTLS12,
- RootCAs: cp,
- },
- }
- }
- return &Client{
- logger: logrus.WithField("client", "kube"),
- baseURL: InClusterBaseURL,
- client: client,
- token: string(token),
- namespace: namespace,
- }, nil
- }
- // Cluster represents the information necessary to talk to a Kubernetes
- // master endpoint.
- // NOTE: if your cluster runs on GKE you can use the following command to get these credentials:
- // gcloud --project <gcp_project> container clusters describe --zone <zone> <cluster_name>
- type Cluster struct {
- // The IP address of the cluster's master endpoint.
- Endpoint string `json:"endpoint"`
- // Base64-encoded public cert used by clients to authenticate to the
- // cluster endpoint.
- ClientCertificate string `json:"clientCertificate"`
- // Base64-encoded private key used by clients..
- ClientKey string `json:"clientKey"`
- // Base64-encoded public certificate that is the root of trust for the
- // cluster.
- ClusterCACertificate string `json:"clusterCaCertificate"`
- }
- // NewClientFromFile reads a Cluster object at clusterPath and returns an
- // authenticated client using the keys within.
- func NewClientFromFile(clusterPath, namespace string) (*Client, error) {
- data, err := ioutil.ReadFile(clusterPath)
- if err != nil {
- return nil, err
- }
- var c Cluster
- if err := yaml.Unmarshal(data, &c); err != nil {
- return nil, err
- }
- return NewClient(&c, namespace)
- }
- // UnmarshalClusterMap reads a map[string]Cluster in yaml bytes.
- func UnmarshalClusterMap(data []byte) (map[string]Cluster, error) {
- var raw map[string]Cluster
- if err := yaml.Unmarshal(data, &raw); err != nil {
- // If we failed to unmarshal the multicluster format try the single Cluster format.
- var singleConfig Cluster
- if err := yaml.Unmarshal(data, &singleConfig); err != nil {
- return nil, err
- }
- raw = map[string]Cluster{DefaultClusterAlias: singleConfig}
- }
- return raw, nil
- }
- // MarshalClusterMap writes c as yaml bytes.
- func MarshalClusterMap(c map[string]Cluster) ([]byte, error) {
- return yaml.Marshal(c)
- }
- // ClientMapFromFile reads the file at clustersPath and attempts to load a map of cluster aliases
- // to authenticated clients to the respective clusters.
- // The file at clustersPath is expected to be a yaml map from strings to Cluster structs OR it may
- // simply be a single Cluster struct which will be assigned the alias $DefaultClusterAlias.
- // If the file is an alias map, it must include the alias $DefaultClusterAlias.
- func ClientMapFromFile(clustersPath, namespace string) (map[string]*Client, error) {
- data, err := ioutil.ReadFile(clustersPath)
- if err != nil {
- return nil, fmt.Errorf("read error: %v", err)
- }
- raw, err := UnmarshalClusterMap(data)
- if err != nil {
- return nil, fmt.Errorf("unmarshal error: %v", err)
- }
- foundDefault := false
- result := map[string]*Client{}
- for alias, config := range raw {
- client, err := newClient(&config, namespace)
- if err != nil {
- return nil, fmt.Errorf("failed to load config for build cluster alias %q in file %q: %v", alias, clustersPath, err)
- }
- result[alias] = client
- if alias == DefaultClusterAlias {
- foundDefault = true
- }
- }
- if !foundDefault {
- return nil, fmt.Errorf("failed to find the required %q alias in build cluster config %q", DefaultClusterAlias, clustersPath)
- }
- return result, nil
- }
- // NewClient returns an authenticated Client using the keys in the Cluster.
- func NewClient(c *Cluster, namespace string) (*Client, error) {
- cc, err := base64.StdEncoding.DecodeString(c.ClientCertificate)
- if err != nil {
- return nil, err
- }
- ck, err := base64.StdEncoding.DecodeString(c.ClientKey)
- if err != nil {
- return nil, err
- }
- ca, err := base64.StdEncoding.DecodeString(c.ClusterCACertificate)
- if err != nil {
- return nil, err
- }
- cert, err := tls.X509KeyPair(cc, ck)
- if err != nil {
- return nil, err
- }
- cp := x509.NewCertPool()
- cp.AppendCertsFromPEM(ca)
- tr := &http.Transport{
- TLSClientConfig: &tls.Config{
- MinVersion: tls.VersionTLS12,
- Certificates: []tls.Certificate{cert},
- RootCAs: cp,
- },
- }
- return &Client{
- logger: logrus.WithField("client", "kube"),
- baseURL: c.Endpoint,
- client: &http.Client{Transport: tr, Timeout: requestTimeout},
- namespace: namespace,
- }, nil
- }
- // GetPod is analogous to kubectl get pods/NAME namespace=client.namespace
- func (c *Client) GetPod(name string) (Pod, error) {
- c.log("GetPod", name)
- var retPod Pod
- err := c.request(&request{
- path: fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", c.namespace, name),
- }, &retPod)
- return retPod, err
- }
- // ListPods is analogous to kubectl get pods --selector=SELECTOR --namespace=client.namespace
- func (c *Client) ListPods(selector string) ([]Pod, error) {
- c.log("ListPods", selector)
- var pl struct {
- Items []Pod `json:"items"`
- }
- err := c.request(&request{
- path: fmt.Sprintf("/api/v1/namespaces/%s/pods", c.namespace),
- query: map[string]string{"labelSelector": selector},
- }, &pl)
- return pl.Items, err
- }
- // DeletePod deletes the pod at name in the client's default namespace.
- //
- // Analogous to kubectl delete pod
- func (c *Client) DeletePod(name string) error {
- c.log("DeletePod", name)
- return c.request(&request{
- method: http.MethodDelete,
- path: fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", c.namespace, name),
- }, nil)
- }
- // CreateProwJob creates a prowjob in the client's default namespace.
- //
- // Analogous to kubectl create prowjob
- func (c *Client) CreateProwJob(j ProwJob) (ProwJob, error) {
- var representation string
- if out, err := json.Marshal(j); err == nil {
- representation = string(out[:])
- } else {
- representation = fmt.Sprintf("%v", j)
- }
- c.log("CreateProwJob", representation)
- var retJob ProwJob
- err := c.request(&request{
- method: http.MethodPost,
- path: fmt.Sprintf("/apis/prow.k8s.io/v1/namespaces/%s/prowjobs", c.namespace),
- requestBody: &j,
- }, &retJob)
- return retJob, err
- }
- func (c *Client) getHiddenRepos() sets.String {
- if c.hiddenReposProvider == nil {
- return nil
- }
- return sets.NewString(c.hiddenReposProvider()...)
- }
- func shouldHide(pj *ProwJob, hiddenRepos sets.String, showHiddenOnly bool) bool {
- if pj.Spec.Refs == nil {
- // periodic jobs do not have refs and therefore cannot be
- // hidden by the org/repo mechanism
- return false
- }
- shouldHide := hiddenRepos.HasAny(fmt.Sprintf("%s/%s", pj.Spec.Refs.Org, pj.Spec.Refs.Repo), pj.Spec.Refs.Org)
- if showHiddenOnly {
- return !shouldHide
- }
- return shouldHide
- }
- // GetProwJob returns the prowjob at name in the client's default namespace.
- //
- // Analogous to kubectl get prowjob/NAME
- func (c *Client) GetProwJob(name string) (ProwJob, error) {
- c.log("GetProwJob", name)
- var pj ProwJob
- err := c.request(&request{
- path: fmt.Sprintf("/apis/prow.k8s.io/v1/namespaces/%s/prowjobs/%s", c.namespace, name),
- }, &pj)
- if err == nil && shouldHide(&pj, c.getHiddenRepos(), c.hiddenOnly) {
- pj = ProwJob{}
- // Revealing the existence of this prow job is ok because the pj name cannot be used to
- // retrieve the pj itself. Furthermore, a timing attack could differentiate true 404s from
- // 404s returned when a hidden pj is queried so returning a 404 wouldn't hide the pj's existence.
- err = errors.New("403 ProwJob is hidden")
- }
- return pj, err
- }
- // ListProwJobs lists prowjobs using the specified labelSelector in the client's default namespace.
- //
- // Analogous to kubectl get prowjobs --selector=SELECTOR
- func (c *Client) ListProwJobs(selector string) ([]ProwJob, error) {
- c.log("ListProwJobs", selector)
- var jl struct {
- Items []ProwJob `json:"items"`
- }
- err := c.request(&request{
- path: fmt.Sprintf("/apis/prow.k8s.io/v1/namespaces/%s/prowjobs", c.namespace),
- deckPath: "/prowjobs.js",
- query: map[string]string{"labelSelector": selector},
- }, &jl)
- if err == nil {
- hidden := c.getHiddenRepos()
- var pjs []ProwJob
- for _, pj := range jl.Items {
- if !shouldHide(&pj, hidden, c.hiddenOnly) {
- pjs = append(pjs, pj)
- }
- }
- jl.Items = pjs
- }
- return jl.Items, err
- }
- // DeleteProwJob deletes the prowjob at name in the client's default namespace.
- func (c *Client) DeleteProwJob(name string) error {
- c.log("DeleteProwJob", name)
- return c.request(&request{
- method: http.MethodDelete,
- path: fmt.Sprintf("/apis/prow.k8s.io/v1/namespaces/%s/prowjobs/%s", c.namespace, name),
- }, nil)
- }
- // ReplaceProwJob will replace name with job in the client's default namespace.
- //
- // Analogous to kubectl replace prowjobs/NAME
- func (c *Client) ReplaceProwJob(name string, job ProwJob) (ProwJob, error) {
- c.log("ReplaceProwJob", name, job)
- var retJob ProwJob
- err := c.request(&request{
- method: http.MethodPut,
- path: fmt.Sprintf("/apis/prow.k8s.io/v1/namespaces/%s/prowjobs/%s", c.namespace, name),
- requestBody: &job,
- }, &retJob)
- return retJob, err
- }
- // CreatePod creates a pod in the client's default namespace.
- //
- // Analogous to kubectl create pod
- func (c *Client) CreatePod(p v1.Pod) (Pod, error) {
- c.log("CreatePod", p)
- var retPod Pod
- err := c.request(&request{
- method: http.MethodPost,
- path: fmt.Sprintf("/api/v1/namespaces/%s/pods", c.namespace),
- requestBody: &p,
- }, &retPod)
- return retPod, err
- }
- // GetLog returns the log of the default container in the specified pod, in the client's default namespace.
- //
- // Analogous to kubectl logs pod
- func (c *Client) GetLog(pod string) ([]byte, error) {
- c.log("GetLog", pod)
- return c.requestRetry(&request{
- path: fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/log", c.namespace, pod),
- })
- }
- // GetLogTail returns the last n bytes of the log of the specified container in the specified pod,
- // in the client's default namespace.
- //
- // Analogous to kubectl logs pod --tail -1 --limit-bytes n -c container
- func (c *Client) GetLogTail(pod, container string, n int64) ([]byte, error) {
- c.log("GetLogTail", pod, n)
- return c.requestRetry(&request{
- path: fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/log", c.namespace, pod),
- query: map[string]string{ // Because we want last n bytes, we fetch all lines and then limit to n bytes
- "tailLines": "-1",
- "container": container,
- "limitBytes": strconv.FormatInt(n, 10),
- },
- })
- }
- // GetContainerLog returns the log of a container in the specified pod, in the client's default namespace.
- //
- // Analogous to kubectl logs pod -c container
- func (c *Client) GetContainerLog(pod, container string) ([]byte, error) {
- c.log("GetContainerLog", pod)
- return c.requestRetry(&request{
- path: fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/log", c.namespace, pod),
- query: map[string]string{"container": container},
- })
- }
- // CreateConfigMap creates a configmap.
- //
- // Analogous to kubectl create configmap
- func (c *Client) CreateConfigMap(content ConfigMap) (ConfigMap, error) {
- c.log("CreateConfigMap")
- var retConfigMap ConfigMap
- err := c.request(&request{
- method: http.MethodPost,
- path: fmt.Sprintf("/api/v1/namespaces/%s/configmaps", c.namespace),
- requestBody: &content,
- }, &retConfigMap)
- return retConfigMap, err
- }
- // GetConfigMap gets the configmap identified.
- func (c *Client) GetConfigMap(name, namespace string) (ConfigMap, error) {
- c.log("GetConfigMap", name)
- if namespace == "" {
- namespace = c.namespace
- }
- var retConfigMap ConfigMap
- err := c.request(&request{
- path: fmt.Sprintf("/api/v1/namespaces/%s/configmaps/%s", namespace, name),
- }, &retConfigMap)
- return retConfigMap, err
- }
- // ReplaceConfigMap puts the configmap into name.
- //
- // Analogous to kubectl replace configmap
- //
- // If config.Namespace is empty, the client's default namespace is used.
- // Returns the content returned by the apiserver
- func (c *Client) ReplaceConfigMap(name string, config ConfigMap) (ConfigMap, error) {
- c.log("ReplaceConfigMap", name)
- namespace := c.namespace
- if config.Namespace != "" {
- namespace = config.Namespace
- }
- var retConfigMap ConfigMap
- err := c.request(&request{
- method: http.MethodPut,
- path: fmt.Sprintf("/api/v1/namespaces/%s/configmaps/%s", namespace, name),
- requestBody: &config,
- }, &retConfigMap)
- return retConfigMap, err
- }
|