Source code for datakit_data.s3
import os
import logging
from logging import NullHandler
import subprocess
logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())
[docs]class S3:
"""
A limited, human-friendly interface to S3.
:param aws_user_profile: From ~/.aws/credentials
:param bucket: S3 bucket URL
"""
def __init__(self, aws_user_profile, s3_bucket):
self.user_profile = aws_user_profile
self.bucket = s3_bucket
# Public
[docs] def push(self, data_dir, s3_path='', extra_flags=[]):
args = ('push', data_dir, s3_path, extra_flags)
payload = self.prepare_command_meta(*args)
self.run(payload['cmd'], payload['project_dir'])
[docs] def pull(self, data_dir, s3_path='', extra_flags=[]):
args = ('pull', data_dir, s3_path, extra_flags)
payload = self.prepare_command_meta(*args)
self.run(payload['cmd'], payload['project_dir'])
# Private
[docs] def run(self, cmd, project_dir):
logger.info("EXECUTING: {}".format(' '.join(cmd)))
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
logger.info(output.decode('utf-8'))
except subprocess.CalledProcessError as e:
msg = "\n*** Error ***\n{}\n".format(
e.output.decode('utf-8').strip()
)
logger.info(msg)
[docs] def build_s3_sync_cmd(self, source, target, extra_flags=[]):
cmd = [
'aws', 's3', 'sync',
'--profile', self.user_profile,
source,
target,
]
if extra_flags:
cmd.extend(extra_flags)
return cmd
[docs] def build_s3_url(self, s3_path=None):
target_url = self.s3_endpoint(s3_path)
if s3_path:
target_url += self.fix_uri_portion(s3_path)
return target_url
[docs] def s3_endpoint(self, s3_path):
return "s3://{}".format(self.bucket)
[docs] def fix_uri_portion(self, pth):
if not pth.startswith("/"):
pth = "/" + pth
if not pth.endswith("/"):
pth += "/"
return pth