Source code for avroconvert.sources.s3.reader

import boto3 as bt
from os import getenv
from avroconvert import logger


class S3:
    '''
    A class used to read files from amazon s3 and convert them to bytes

    Every connection setting can be passed explicitly or, when omitted,
    taken from an environment variable. Explicitly passed values always
    take precedence over the environment.

    :param access_key: AWS access key id. Falls back to the
        ``AWS_ACCESS_KEY_ID`` environment variable when not passed
    :type access_key: str
    :param secret_key: AWS secret access key. Falls back to the
        ``AWS_SECRET_ACCESS_KEY`` environment variable when not passed
    :type secret_key: str
    :param session_token: AWS session token (if any). Falls back to the
        ``AWS_SESSION_TOKEN`` environment variable when not passed
    :type session_token: str
    :param bucket: Name of the bucket in s3. This is where the files are
        read from. Falls back to the ``BUCKET`` environment variable when
        not passed
    :type bucket: str
    :param prefix: prefix is the starting letters of the file names in the
        cloud storage. For example, if the bucket contains files with name
        `test-01`, `test-02` and `test-03`, the file prefix can be `test`.
        All the files with this prefix will be read
    :type prefix: str
    :param datatype: format of the source file, which will be read from s3.
        Only objects whose key ends with ``.<datatype>`` are read.
        Default value is `avro`
    :type datatype: str
    '''

    # File formats accepted by get_data(); any other datatype is rejected.
    SUPPORTED_DATATYPES = ('avro', 'json', 'csv', 'parquet')

    def __init__(self, access_key: str = None, secret_key: str = None,
                 session_token: str = None, bucket: str = None,
                 prefix: str = '', datatype: str = 'avro'):
        '''
        Build the S3 bucket client and store the read settings.
        The parameters are documented on the class.

        :raises AttributeError: if no bucket name is passed and the
            ``BUCKET`` environment variable is not set
        '''
        self.client = self._auth(access_key, secret_key, session_token,
                                 bucket)
        # Keep the bucket name that was actually resolved (argument or
        # BUCKET env var) so it always matches the bucket being listed.
        self.bucket = self.client.name
        logger.debug(f'Bucket name as received is {self.bucket}')
        self.datatype = datatype
        logger.debug(f'File datatype as received is {self.datatype}')
        self.prefix = prefix
        logger.debug(f'File prefix as received is {self.prefix}')

    @staticmethod
    def _first_set(explicit, env_var: str):
        '''
        Return ``explicit`` if it is truthy, otherwise the value of the
        environment variable ``env_var``.

        :param explicit: value passed by the caller (may be None)
        :param env_var: name of the environment variable used as fallback
        :type env_var: str
        :returns: the chosen value, or ``None`` if neither is set
            (an empty string is treated as not set)
        '''
        # Explicit arguments win over the environment, the same precedence
        # boto3 itself applies. `or None` stops an empty env var from being
        # forwarded to boto3 as an empty credential.
        return explicit or getenv(env_var) or None

    def _auth(self, access_key: str = None, secret_key: str = None,
              session_token: str = None, bucket: str = None):
        '''
        Create the boto3 S3 Bucket resource used for listing and reading.

        :param access_key: AWS access key id (env: ``AWS_ACCESS_KEY_ID``)
        :type access_key: str
        :param secret_key: AWS secret access key
            (env: ``AWS_SECRET_ACCESS_KEY``)
        :type secret_key: str
        :param session_token: AWS session token, if any
            (env: ``AWS_SESSION_TOKEN``)
        :type session_token: str
        :param bucket: Name of the bucket in s3 (env: ``BUCKET``)
        :type bucket: str
        :returns: amazon s3 bucket client object
        :raises AttributeError: if no bucket name can be resolved
        '''
        bucket_name = self._first_set(bucket, 'BUCKET')
        if not bucket_name:
            raise AttributeError('Please pass the S3 bucket name')
        # Credentials left as None let boto3 fall back to its own
        # default credential chain.
        client_params = {
            'aws_access_key_id':
                self._first_set(access_key, 'AWS_ACCESS_KEY_ID'),
            'aws_secret_access_key':
                self._first_set(secret_key, 'AWS_SECRET_ACCESS_KEY'),
            'aws_session_token':
                self._first_set(session_token, 'AWS_SESSION_TOKEN'),
        }
        s3_resource = bt.resource('s3', **client_params)
        return s3_resource.Bucket(bucket_name)

    def _extract_raw_data(self) -> dict:
        '''
        List all the files in the s3 bucket starting with the prefix (if a
        prefix is passed). Then read every file whose key ends with
        ``.<datatype>`` using `_read_files`.

        :returns: dictionary of bytes where each key of the dict is the file
            name of the input file and its value is the data read (as bytes)
            from that file. Returns ``None`` when no object matches the
            prefix at all.
        :rtype: dict
        '''
        logger.info('Listing files in S3')
        s3_files = [obj.key for obj in self.client.objects.filter(
            Prefix=self.prefix)]
        if not s3_files:
            logger.info(f'No files with prefix {self.prefix} found in S3')
            return None
        # Match the configured format (e.g. datatype='json' -> '.json')
        # rather than always assuming avro files.
        suffix = f'.{self.datatype}'
        data = {s3_file: self._read_files(filename=s3_file)
                for s3_file in s3_files if s3_file.endswith(suffix)}
        return data

    def _read_files(self, filename: str) -> bytes:
        '''
        Read a file from s3 and return its content as bytes

        :param filename: key of the object inside the bucket
        :type filename: str
        :returns: file from s3, converted to bytes
        :rtype: bytes
        '''
        logger.info(f'Reading file {filename} from S3 in bytes')
        # Use the name of the bucket the resource was created for, so the
        # object is read from the same bucket it was listed in.
        data_s3_object = self.client.meta.client.get_object(
            Bucket=self.client.name, Key=filename)
        return data_s3_object['Body'].read()

    def get_data(self) -> dict:
        '''
        List all files in S3 (filtered by prefix, if it is passed), read the
        ones matching ``datatype`` as bytes and return them keyed by file
        name.

        :returns: dictionary mapping each s3 key to its content as bytes;
            ``None`` if no object matches the prefix; the error message
            string ``'Given datatype <datatype> not supported yet'`` if
            ``datatype`` is not one of ``SUPPORTED_DATATYPES``
        :rtype: dict
        '''
        if self.datatype not in self.SUPPORTED_DATATYPES:
            return f'Given datatype {self.datatype} not supported yet'
        raw_data_dict = self._extract_raw_data()
        return raw_data_dict