Source code for dymaxionlabs.upload

import requests
from google.resumable_media.requests import ResumableUpload
from google.resumable_media import common
from six.moves import http_client

_DEFAULT_RETRY_STRATEGY = common.RetryStrategy()
RETRYABLE = (
    common.TOO_MANY_REQUESTS,
    http_client.INTERNAL_SERVER_ERROR,
    http_client.BAD_GATEWAY,
    http_client.SERVICE_UNAVAILABLE,
    http_client.GATEWAY_TIMEOUT,
)


[docs]class CustomResumableUpload(ResumableUpload):
[docs] def initiate( self, stream, metadata, content_type, resumable_url, total_bytes=None, stream_final=True, ): method, url, payload, headers = self._prepare_initiate_request( stream, metadata, content_type, total_bytes=total_bytes, stream_final=stream_final, ) self._resumable_url = resumable_url return True
@classmethod def _transmit_chunk_wait_and_retry( cls, url, payload, headers, retry_strategy=_DEFAULT_RETRY_STRATEGY, ): response = requests.put( url, data=payload, headers=headers, ) if response.status_code not in RETRYABLE: return response total_sleep = 0.0 num_retries = 0 base_wait = 0.5 # When doubled will give 1.0 while retry_strategy.retry_allowed(total_sleep, num_retries): base_wait, wait_time = calculate_retry_wait( base_wait, retry_strategy.max_sleep) num_retries += 1 total_sleep += wait_time time.sleep(wait_time) response = requests.put( url, data=payload, headers=headers, ) if response.status_code not in RETRYABLE: return response return response
[docs] def transmit_next_chunk(self): method, url, payload, headers = self._prepare_request() response = self._transmit_chunk_wait_and_retry(url, payload, headers) self._process_response(response, len(payload)) return response