1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package gcs
- import (
- "context"
- "fmt"
- "io"
- "os"
- "sync"
- "cloud.google.com/go/storage"
- "github.com/sirupsen/logrus"
- "k8s.io/test-infra/prow/errorutil"
- )
- // UploadFunc knows how to upload into an object
- type UploadFunc func(obj *storage.ObjectHandle) error
- // Upload uploads all of the data in the
- // uploadTargets map to GCS in parallel. The map is
- // keyed on GCS path under the bucket
- func Upload(bucket *storage.BucketHandle, uploadTargets map[string]UploadFunc) error {
- errCh := make(chan error, len(uploadTargets))
- group := &sync.WaitGroup{}
- group.Add(len(uploadTargets))
- for dest, upload := range uploadTargets {
- obj := bucket.Object(dest)
- logrus.WithField("dest", dest).Info("Queued for upload")
- go func(f UploadFunc, obj *storage.ObjectHandle, name string) {
- defer group.Done()
- if err := f(obj); err != nil {
- errCh <- err
- }
- logrus.WithField("dest", name).Info("Finished upload")
- }(upload, obj, dest)
- }
- group.Wait()
- close(errCh)
- if len(errCh) != 0 {
- var uploadErrors []error
- for err := range errCh {
- uploadErrors = append(uploadErrors, err)
- }
- return fmt.Errorf("encountered errors during upload: %v", uploadErrors)
- }
- return nil
- }
- // FileUpload returns an UploadFunc which copies all
- // data from the file on disk to the GCS object
- func FileUpload(file string) UploadFunc {
- return func(obj *storage.ObjectHandle) error {
- reader, err := os.Open(file)
- if err != nil {
- return err
- }
- uploadErr := DataUpload(reader)(obj)
- closeErr := reader.Close()
- return errorutil.NewAggregate(uploadErr, closeErr)
- }
- }
- // DataUpload returns an UploadFunc which copies all
- // data from src reader into GCS
- func DataUpload(src io.Reader) UploadFunc {
- return func(obj *storage.ObjectHandle) error {
- writer := obj.NewWriter(context.Background())
- _, copyErr := io.Copy(writer, src)
- closeErr := writer.Close()
- return errorutil.NewAggregate(copyErr, closeErr)
- }
- }
|