Create Files

import io
import os
import hashlib
import requests

import pandas as pd

from pathlib import Path
from datetime import datetime
from urllib.parse import urljoin


def calculate_checksum(content):
    return hashlib.md5(content).hexdigest()
project_root = Path.cwd() / ".."
data_root = project_root / "example" / "data"
# parents=True so the nested example/data directory is created in one go
data_root.mkdir(parents=True, exist_ok=True)
%%time
# dont_test
file_size = 1024 * 1024 * 10
num_to_hash = {}
for num in range(100):
    file_path = data_root / str(num)
    if file_path.exists():
        with file_path.open("rb") as f:
            data = f.read()
    else:
        with file_path.open("wb") as f:
            data = os.urandom(file_size)
            f.write(data)
    num_to_hash[num] = calculate_checksum(data)
CPU times: user 1.55 s, sys: 177 ms, total: 1.72 s
Wall time: 1.73 s
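
As a quick sanity check, we can confirm that all 100 files exist and have the expected size (a minimal sketch, assuming the cell above has run):

for num in range(100):
    file_path = data_root / str(num)
    assert file_path.exists(), f"missing file {num}"
    assert file_path.stat().st_size == file_size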

Plotting Helper

def df_from_timestamps(timestamps, file_size):
    # one row per millisecond, covering the whole benchmark window
    min_started = min(started for started, stopped in timestamps)
    max_stopped = max(stopped for started, stopped in timestamps)
    index = pd.date_range(start=min_started, end=max_stopped, freq="ms")
    df = pd.DataFrame(index=index)
    for i, (start, end) in enumerate(timestamps):
        duration = (end - start).total_seconds()
        bandwidth = (file_size / duration) / 10 ** 6  # MB/s
        column = f"client_{i}"
        df.loc[:, column] = 0
        df.loc[start:end, column] = bandwidth
    return df


def plot_download_df(df):
    ax = df.plot(figsize=(10, 6), legend=False, title="Bandwidth used by clients")
    ax.set_xlabel("Time")
    _ = ax.set_ylabel("MB/s")
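
To see what these helpers produce, here is a tiny synthetic example with two overlapping fake transfers (hypothetical timestamps, just for illustration):

from datetime import timedelta

now = datetime.now()
fake_timestamps = [
    (now, now + timedelta(seconds=1)),
    (now + timedelta(milliseconds=500), now + timedelta(seconds=2)),
]
demo_df = df_from_timestamps(fake_timestamps, file_size)
plot_download_df(demo_df)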

Download Synchronously

Start the server with:

cd example
gunicorn -w 2 -k uvicorn.workers.UvicornWorker -b :8000 "example.asgi:application"
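
Before starting the measurement it is worth checking that the server is reachable (a small sketch, assuming the /sync/ route of the example application):

r = requests.get("http://localhost:8000/sync/0", timeout=5)
r.raise_for_status()
print("server is up, received", len(r.content), "bytes")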
timestamps = []
base_url = "http://localhost:8000/sync/"
for num, expected_hash in num_to_hash.items():
    url = urljoin(base_url, f"{num}")
    started = datetime.now()
    r = requests.get(url)
    stopped = datetime.now()
    timestamps.append((started, stopped))
    r.raise_for_status()
    actual_hash = calculate_checksum(r.content)
    assert expected_hash == actual_hash
df = df_from_timestamps(timestamps, file_size)
plot_download_df(df)

Download Concurrently

import gevent

from gevent import monkey

monkey.patch_all()


class Response:
    def __init__(self, url, content, started, stopped):
        self.url = url
        self.content = content
        self.started = started
        self.stopped = stopped


def streaming_fetch(url):
    chunks = []
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        # start the clock after the headers arrived: we time the body transfer
        started = datetime.now()
        for chunk in r.iter_content(chunk_size=4096):
            chunks.append(chunk)
    stopped = datetime.now()
    return Response(url, b"".join(chunks), started, stopped)
<ipython-input-8-181895b379cf>:7: MonkeyPatchWarning: Monkey-patching ssl after ssl has already been imported may lead to errors, including RecursionError on Python 3.6. It may also silently lead to incorrect behaviour on Python 3.7. Please monkey-patch earlier. See https://github.com/gevent/gevent/issues/1016. Modules that had direct imports (NOT patched): ['urllib3.util.ssl_ (/Users/jochen/.virtualenvs/django-fileresponse/lib/python3.9/site-packages/urllib3/util/ssl_.py)', 'urllib3.util (/Users/jochen/.virtualenvs/django-fileresponse/lib/python3.9/site-packages/urllib3/util/__init__.py)']. 
  monkey.patch_all()
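
To try streaming_fetch on its own before spawning a hundred greenlets (a minimal sketch, assuming the server from above is still running):

response = streaming_fetch("http://localhost:8000/async_filesystem/0")
assert calculate_checksum(response.content) == num_to_hash[0]
print(response.stopped - response.started)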
def make_async_requests(prefix):
    urls = []
    base_url = f"http://localhost:8000/{prefix}/"
    for num in num_to_hash:
        url = urljoin(base_url, f"{num}")
        urls.append(url)

    jobs = [gevent.spawn(streaming_fetch, _url) for _url in urls]
    # joinall keeps the jobs in spawn order, so responses line up with nums
    gevent.joinall(jobs)
    responses = [job.value for job in jobs]

    timestamps = []
    for num, response in enumerate(responses):
        timestamps.append((response.started, response.stopped))
        expected_hash = num_to_hash[num]
        actual_hash = calculate_checksum(response.content)
        assert expected_hash == actual_hash
    return timestamps
timestamps = make_async_requests("async_filesystem")
df = df_from_timestamps(timestamps, file_size)
plot_download_df(df)
timestamps = make_async_requests("async_minio")
df = df_from_timestamps(timestamps, file_size)
plot_download_df(df)
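
To compare the runs numerically as well, we can compute the overall wall time from the same timestamps the plots use (a small helper sketch):

def total_wall_time(timestamps):
    started = min(s for s, _ in timestamps)
    stopped = max(e for _, e in timestamps)
    return (stopped - started).total_seconds()

print(total_wall_time(timestamps), "seconds")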

Create MinIO Objects

from minio import Minio
from minio.error import S3Error


def get_minio_client_and_bucket(endpoint, params, bucket):
    client = Minio(endpoint, **params)
    found = client.bucket_exists(bucket)
    if not found:
        client.make_bucket(bucket)
    return client


def checksum_for_minio(client, bucket, key):
    # get_object outside the try block: if it raises, there is no
    # response to close yet
    response = client.get_object(bucket, key)
    try:
        data = response.read()
    finally:
        response.close()
        response.release_conn()
    return calculate_checksum(data)


def create_file_minio(client, bucket, key, size):
    data = os.urandom(size)
    client.put_object(
        bucket,
        key,
        io.BytesIO(data),
        size,
    )
    return calculate_checksum(data)
endpoint = "127.0.0.1:9000"
params = {
    "access_key": "minioadmin",
    "secret_key": "minioadmin",
    "secure": False,
}
bucket = "fileresponse"
client = get_minio_client_and_bucket(endpoint, params, bucket)

file_size = 1024 * 1024 * 10
num_to_hash = {}
for num in range(100):
    key = str(num)
    try:
        client.stat_object(bucket, key)
        checksum = checksum_for_minio(client, bucket, key)
    except S3Error:
        # object does not exist yet -> create it
        checksum = create_file_minio(client, bucket, key, file_size)
    num_to_hash[num] = checksum
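
As a final check, we can list the bucket and verify that every object has the expected size (a short sketch using the client's list_objects):

objects = list(client.list_objects(bucket))
assert len(objects) == len(num_to_hash)
for obj in objects:
    assert obj.size == file_size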