Source code for avroconvert.sources.gcs.reader

from google.cloud import storage
from os import getenv, path
from avroconvert import logger


[docs]class GCS: ''' A class used to read files from google cloud storage and convert them to list of bytes :param client: google cloud storage bucket client object. With this, we can interact with google cloud's bucket :type client: gcs bucket client :param bucket: Name of the bucket in google cloud storage. This is where the avro file is read from :type bucket: str :param datatype: format of the source file, which will be read from google cloud storage. Default value is `avro` :type datatype: str :param prefix: str prefix is the starting letters of the file names in the cloud storage. For example, if the bucket contains files with name `test-01`, `test-02` and `test-03`, the file prefix can be `test`. All the files with this prefix will be read :type prefix: str ''' def __init__(self, auth_file: str = None, bucket: str = None, datatype: str = 'avro', prefix: str = None): ''' :param auth_file: path to the google cloud service account json file :type auth_file: str :param bucket: Name of the bucket in google cloud storage. This is where the avro file is read from :type bucket: str :param datatype: str format of the source file, which will be read from google cloud storage. Default value is `avro` :type datatype: str :param prefix: str prefix is the starting letters of the file names in the cloud storage. For example, if the bucket contains files with name `test-01`, `test-02` and `test-03`, the file prefix can be `test`. All the files with this prefix will be read :type prefix: str ''' self.client = self._auth(auth_file=auth_file, bucket=bucket) self.bucket = bucket logger.debug(f'Bucket name as received is {self.bucket}') self.datatype = datatype logger.debug(f'File datatype as received is {self.datatype}') self.prefix = prefix logger.debug(f'File prefix as received is {self.prefix}') def _auth(self, auth_file: str = None, bucket: str = None): ''' This method authenticates the code to interact with google cloud, and creates the client object. :param auth_file: path to the google cloud service account json file :type bucket: str :param bucket: Name of the bucket in google cloud storage. This is where the avro file is read from :type bucket: str :returns: the client object is used to interact with google cloud storage :rtype : google storage client object ''' if not getenv('BUCKET', bucket): raise AttributeError('Please pass the GCS bucket name') if not getenv('GOOGLE_APPLICATION_CREDENTIALS', auth_file): err = 'Credentials not set. Please set GOOGLE_APPLICATION_CREDENTIALS or' err += ' pass the path of service account json file' raise AttributeError(err) gcs_client = storage.Client.from_service_account_json( getenv('GOOGLE_APPLICATION_CREDENTIALS', auth_file)) return gcs_client.get_bucket(getenv('BUCKET', bucket)) def _extract_raw_data(self) -> list: ''' It lists all the files in google storage bucket starting with a prefix (if prefix is passed). It then calls another method called `read_files` to read each file :returns: dictionary of bytes where each key of the dict is the file name of the input file and it's value is the data read (as bytes) from that file :rtype: dict ''' logger.info('Listing files in GCS') gcs_files = self._filter() if not gcs_files: logger.info(f'No files with prefix {self.prefix} found in GCS') return None data = {gcs_file: self._read_files(filename=gcs_file) for gcs_file in gcs_files if gcs_file.endswith('.avro')} return data def _filter(self): ''' Helper function to avoid reading empty folders from google storage buckets ''' files = list() for blob in self.client.list_blobs(prefix=self.prefix): try: filename = blob.name if filename.split('/')[1] == '' \ and path.dirname(filename) == filename.split('/')[0]: continue else: files.append(filename) except IndexError as e: files.append(filename) continue return files def _read_files(self, filename: str) -> bytes: ''' Read file from google cloud bucket and convert it into bytes :param filename: Name of the file to read from google cloud bucket :type filename: str :returns: avro file from google, converted to bytes :rtype: bytes ''' logger.info(f'Reading file {filename} from GCS in bytes') gcs_blob = self.client.get_blob(filename) raw_data = gcs_blob.download_as_string() return raw_data
[docs] def get_data(self) -> list: ''' Lists all files in S3 (filtering by prefix if one is provided), reads them as bytes, and returns a list of data read from the files. :returns: list of bytes where each element of the list represents a file read (in bytes) from Google Storage Bucket. :rtype: list ''' supported_types = ['avro'] if self.datatype not in supported_types: raise TypeError( f'Given datatype {self.datatype} not supported yet') raw_data_dict = self._extract_raw_data() return raw_data_dict