upload.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package gcs
  14. import (
  15. "context"
  16. "fmt"
  17. "io"
  18. "os"
  19. "sync"
  20. "cloud.google.com/go/storage"
  21. "github.com/sirupsen/logrus"
  22. "k8s.io/test-infra/prow/errorutil"
  23. )
  // UploadFunc knows how to upload into an object. It receives the handle of
  // the destination GCS object and returns a non-nil error if the upload
  // failed. FileUpload and DataUpload build values of this type; Upload runs
  // them.
  type UploadFunc func(obj *storage.ObjectHandle) error
  26. // Upload uploads all of the data in the
  27. // uploadTargets map to GCS in parallel. The map is
  28. // keyed on GCS path under the bucket
  29. func Upload(bucket *storage.BucketHandle, uploadTargets map[string]UploadFunc) error {
  30. errCh := make(chan error, len(uploadTargets))
  31. group := &sync.WaitGroup{}
  32. group.Add(len(uploadTargets))
  33. for dest, upload := range uploadTargets {
  34. obj := bucket.Object(dest)
  35. logrus.WithField("dest", dest).Info("Queued for upload")
  36. go func(f UploadFunc, obj *storage.ObjectHandle, name string) {
  37. defer group.Done()
  38. if err := f(obj); err != nil {
  39. errCh <- err
  40. }
  41. logrus.WithField("dest", name).Info("Finished upload")
  42. }(upload, obj, dest)
  43. }
  44. group.Wait()
  45. close(errCh)
  46. if len(errCh) != 0 {
  47. var uploadErrors []error
  48. for err := range errCh {
  49. uploadErrors = append(uploadErrors, err)
  50. }
  51. return fmt.Errorf("encountered errors during upload: %v", uploadErrors)
  52. }
  53. return nil
  54. }
  55. // FileUpload returns an UploadFunc which copies all
  56. // data from the file on disk to the GCS object
  57. func FileUpload(file string) UploadFunc {
  58. return func(obj *storage.ObjectHandle) error {
  59. reader, err := os.Open(file)
  60. if err != nil {
  61. return err
  62. }
  63. uploadErr := DataUpload(reader)(obj)
  64. closeErr := reader.Close()
  65. return errorutil.NewAggregate(uploadErr, closeErr)
  66. }
  67. }
  68. // DataUpload returns an UploadFunc which copies all
  69. // data from src reader into GCS
  70. func DataUpload(src io.Reader) UploadFunc {
  71. return func(obj *storage.ObjectHandle) error {
  72. writer := obj.NewWriter(context.Background())
  73. _, copyErr := io.Copy(writer, src)
  74. closeErr := writer.Close()
  75. return errorutil.NewAggregate(copyErr, closeErr)
  76. }
  77. }