Source code for invenio_s3.storage

# -*- coding: utf-8 -*-
#
# Copyright (C) 2018, 2019, 2020, 2021 Esteban J. G. Gabancho.
#
# Invenio-S3 is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""S3 file storage interface."""

from functools import partial, wraps
from math import ceil

import s3fs
from flask import current_app
from invenio_files_rest.errors import StorageError
from invenio_files_rest.storage import PyFSFileStorage, pyfs_storage_factory

from .helpers import redirect_stream


def set_blocksize(f):
    """Decorator to set the correct block size according to file size."""

    @wraps(f)
    def inner(self, *args, **kwargs):
        size = kwargs.get("size", None)
        block_size = (
            ceil(size / current_app.config["S3_MAXIMUM_NUMBER_OF_PARTS"])
            if size
            else current_app.config["S3_DEFAULT_BLOCK_SIZE"]
        )
        if block_size > self.block_size:
            self.block_size = block_size
        return f(self, *args, **kwargs)

    return inner
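
# A minimal sketch of the arithmetic ``set_blocksize`` performs, with
# illustrative values (S3 caps multipart uploads at 10,000 parts; the
# actual limit used here comes from S3_MAXIMUM_NUMBER_OF_PARTS):
#
#     >>> from math import ceil
#     >>> size = 50 * 1024**3              # a 50 GiB upload
#     >>> maximum_number_of_parts = 10000  # hypothetical config value
#     >>> ceil(size / maximum_number_of_parts)
#     5368710
#
# i.e. each multipart chunk must be at least ~5.1 MiB for the whole file to
# fit within the part limit; smaller files keep S3_DEFAULT_BLOCK_SIZE.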

class S3FSFileStorage(PyFSFileStorage):
    """File system storage using Amazon S3 API for accessing files."""

    def __init__(self, fileurl, **kwargs):
        """Storage initialization."""
        self.block_size = current_app.config["S3_DEFAULT_BLOCK_SIZE"]
        super(S3FSFileStorage, self).__init__(fileurl, **kwargs)

    def _get_fs(self, *args, **kwargs):
        """Get PyFilesystem instance and S3 real path."""
        if not self.fileurl.startswith("s3://"):
            return super(S3FSFileStorage, self)._get_fs(*args, **kwargs)

        info = current_app.extensions["invenio-s3"].init_s3fs_info
        fs = s3fs.S3FileSystem(default_block_size=self.block_size, **info)

        return (fs, self.fileurl)
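
    # A hedged usage sketch: inside a Flask application context the storage
    # can be built directly from an ``s3://`` URL (bucket and key below are
    # made up):
    #
    #     >>> storage = S3FSFileStorage("s3://my-bucket/path/to/data.txt")
    #     >>> fs, path = storage._get_fs()  # S3FileSystem + same s3:// path
    #
    # Non-S3 URLs fall back to the PyFilesystem handling of the parent class.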

    @set_blocksize
    def initialize(self, size=0):
        """Initialize file on storage and truncate to given size."""
        fs, path = self._get_fs()

        if fs.exists(path):
            fs.rm(path)
        fp = fs.open(path, mode="wb")

        try:
            to_write = size
            fs_chunk_size = fp.blocksize  # Force write every time

            while to_write > 0:
                current_chunk_size = (
                    to_write if to_write <= fs_chunk_size else fs_chunk_size
                )
                fp.write(b"\0" * current_chunk_size)
                to_write -= current_chunk_size
        except Exception:
            fp.close()
            self.delete()
            raise
        finally:
            fp.close()

        self._size = size

        return self.fileurl, size, None
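
    # Worked example of the zero-fill loop above (illustrative numbers):
    #
    #     >>> size, fs_chunk_size = 12 * 2**20, 5 * 2**20  # 12 MiB, 5 MiB blocks
    #     >>> chunks, to_write = [], size
    #     >>> while to_write > 0:
    #     ...     current = min(to_write, fs_chunk_size)
    #     ...     chunks.append(current)
    #     ...     to_write -= current
    #     >>> [c // 2**20 for c in chunks]
    #     [5, 5, 2]
    #
    # so the object is created on S3 with exactly ``size`` zero bytes.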

    def delete(self):
        """Delete a file."""
        fs, path = self._get_fs()
        if fs.exists(path):
            fs.rm(path)
        return True

    @set_blocksize
    def update(
        self,
        incoming_stream,
        seek=0,
        size=None,
        chunk_size=None,
        progress_callback=None,
    ):
        """Update a file in the file system."""
        old_fp = self.open(mode="rb")
        updated_fp = S3FSFileStorage(self.fileurl, size=self._size).open(
            mode="wb"
        )
        try:
            if seek >= 0:
                to_write = seek
                fs_chunk_size = updated_fp.blocksize

                while to_write > 0:
                    current_chunk_size = (
                        to_write if to_write <= fs_chunk_size else fs_chunk_size
                    )
                    updated_fp.write(old_fp.read(current_chunk_size))
                    to_write -= current_chunk_size

            bytes_written, checksum = self._write_stream(
                incoming_stream,
                updated_fp,
                chunk_size=chunk_size,
                size=size,
                progress_callback=progress_callback,
            )

            if (bytes_written + seek) < self._size:
                old_fp.seek((bytes_written + seek))
                to_write = self._size - (bytes_written + seek)
                fs_chunk_size = updated_fp.blocksize

                while to_write > 0:
                    current_chunk_size = (
                        to_write if to_write <= fs_chunk_size else fs_chunk_size
                    )
                    updated_fp.write(old_fp.read(current_chunk_size))
                    to_write -= current_chunk_size
        finally:
            old_fp.close()
            updated_fp.close()

        return bytes_written, checksum
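
    # Sketch of what ``update`` does, since S3 objects cannot be modified in
    # place: for an existing 10-byte object and ``update(stream, seek=4,
    # size=3)`` the method rewrites the whole object as
    #
    #     bytes 0-3  copied from the old object (the ``seek`` prefix)
    #     bytes 4-6  taken from ``incoming_stream``
    #     bytes 7-9  copied from the old object (the remaining suffix)
    #
    # all streamed through a fresh ``S3FSFileStorage`` writer on the same URL.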

    def send_file(
        self,
        filename,
        mimetype=None,
        restricted=True,
        checksum=None,
        trusted=False,
        chunk_size=None,
        as_attachment=False,
    ):
        """Send the file to the client."""
        try:
            fs, path = self._get_fs()
            s3_url_builder = partial(
                fs.url, path, expires=current_app.config["S3_URL_EXPIRATION"]
            )

            return redirect_stream(
                s3_url_builder,
                filename,
                mimetype=mimetype,
                restricted=restricted,
                trusted=trusted,
                as_attachment=as_attachment,
            )
        except Exception as e:
            raise StorageError("Could not send file: {}".format(e))
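
    # ``send_file`` never proxies the object through the application: it
    # builds a presigned S3 URL and redirects the client to it. A hedged
    # sketch of the underlying s3fs call (the expiry value is illustrative;
    # Invenio-S3 reads it from the S3_URL_EXPIRATION config):
    #
    #     >>> storage = S3FSFileStorage("s3://my-bucket/path/to/data.txt")
    #     >>> fs, path = storage._get_fs()
    #     >>> fs.url(path, expires=3600)  # doctest: +SKIP
    #     'https://my-bucket.s3.amazonaws.com/path/to/data.txt?X-Amz-...'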

    @set_blocksize
    def copy(self, src, *args, **kwargs):
        """Copy data from another file instance.

        If the source is an S3 stored object the copy process happens on the
        S3 server side, otherwise we use the normal ``FileStorage`` copy
        method.
        """
        if src.fileurl.startswith("s3://"):
            fs, path = self._get_fs()
            fs.copy(src.fileurl, path)
        else:
            super(S3FSFileStorage, self).copy(src, *args, **kwargs)

    @set_blocksize
    def save(self, *args, **kwargs):
        """Save incoming stream to storage.

        Overrides the parent method only so the decorator can set the
        correct block size.
        """
        return super(S3FSFileStorage, self).save(*args, **kwargs)

def s3fs_storage_factory(**kwargs):
    """File storage factory for S3."""
    return pyfs_storage_factory(filestorage_class=S3FSFileStorage, **kwargs)
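
# To make Invenio-Files-REST use this backend, point its storage factory at
# the function above (a configuration sketch; adjust to your deployment):
#
#     FILES_REST_STORAGE_FACTORY = "invenio_s3.s3fs_storage_factory"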