import io
import os
import hashlib
import requests
import pandas as pd
from pathlib import Path
from datetime import datetime
from urllib.parse import urljoin
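
# Benchmark setup: generate 100 random 10 MiB files, download them from a
# local server at localhost:8000 (first sequentially, then concurrently with
# gevent), and compare bandwidth for the filesystem-backed and MinIO-backed
# endpoints.
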
def calculate_checksum(content):
    # MD5 is used purely as a fast content fingerprint here, not for security.
    return hashlib.md5(content).hexdigest()

project_root = Path.cwd() / ".."
data_root = project_root / "example" / "data"
data_root.mkdir(parents=True, exist_ok=True)

%%time
# dont_test
file_size = 1024 * 1024 * 10  # 10 MiB per file
num_to_hash = {}
for num in range(100):
    file_path = data_root / str(num)
    if file_path.exists():
        # Reuse an existing file so repeated runs keep the same checksums.
        with file_path.open("rb") as f:
            data = f.read()
    else:
        with file_path.open("wb") as f:
            data = os.urandom(file_size)
            f.write(data)
    num_to_hash[num] = calculate_checksum(data)
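
# Helpers to turn (started, stopped) timestamps into a per-client bandwidth
# timeline and plot it.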
def df_from_timestamps(timestamps, file_size):
    # Build a millisecond-resolution timeline spanning all downloads and record
    # each client's average bandwidth over its own download window.
    min_started = min(started for started, stopped in timestamps)
    max_stopped = max(stopped for started, stopped in timestamps)
    index = pd.date_range(start=min_started, end=max_stopped, freq="ms")
    df = pd.DataFrame(index=index)
    for i, (start, end) in enumerate(timestamps):
        duration = (end - start).total_seconds()
        bandwidth = (file_size / duration) / 10 ** 6  # bytes/s -> MB/s
        column = f"client_{i}"
        df.loc[:, column] = 0.0
        df.loc[start:end, column] = bandwidth
    return df

def plot_download_df(df):
    ax = df.plot(figsize=(10, 6), legend=False, title="Bandwidth used by clients")
    ax.set_xlabel("Time")
    _ = ax.set_ylabel("MB/s")

timestamps = []
base_url = "http://localhost:8000/sync/"
for num, expected_hash in num_to_hash.items():
    url = urljoin(base_url, f"{num}")
    started = datetime.now()
    r = requests.get(url)
    stopped = datetime.now()
    timestamps.append((started, stopped))
    r.raise_for_status()
    # Verify that the downloaded content matches the file we generated.
    actual_hash = calculate_checksum(r.content)
    assert expected_hash == actual_hash

df = df_from_timestamps(timestamps, file_size)
plot_download_df(df)
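
# Async variant: issue all downloads concurrently using gevent greenlets.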
import gevent
from gevent import monkey

# Patch the standard library so blocking socket calls yield to other greenlets.
# Ideally this runs before anything else imports socket-using modules.
monkey.patch_all()

class Response:
    def __init__(self, url, content, started, stopped):
        self.url = url
        self.content = content
        self.started = started
        self.stopped = stopped

def streaming_fetch(url):
    # Stream the body in 4 KiB chunks; timing starts after the headers arrive
    # and stops once the last chunk has been read.
    chunks = []
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        started = datetime.now()
        for chunk in r.iter_content(chunk_size=4096):
            chunks.append(chunk)
        stopped = datetime.now()
    return Response(url, b"".join(chunks), started, stopped)

def make_async_requests(prefix):
    base_url = f"http://localhost:8000/{prefix}/"
    urls = [urljoin(base_url, f"{num}") for num in num_to_hash]
    # Spawn one greenlet per download and wait for all of them to finish.
    # joinall (unlike gevent.wait, which yields results in completion order)
    # lets us read the results in spawn order, so each response lines up with
    # its num; raise_error surfaces any failed download immediately.
    jobs = [gevent.spawn(streaming_fetch, _url) for _url in urls]
    gevent.joinall(jobs, raise_error=True)
    responses = [job.value for job in jobs]
    timestamps = []
    for num, response in enumerate(responses):
        timestamps.append((response.started, response.stopped))
        expected_hash = num_to_hash[num]
        actual_hash = calculate_checksum(response.content)
        assert expected_hash == actual_hash
    return timestamps

timestamps = make_async_requests("async_filesystem")
df = df_from_timestamps(timestamps, file_size)
plot_download_df(df)
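
# Same concurrent download, but against the MinIO-backed endpoint.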
timestamps = make_async_requests("async_minio")
df = df_from_timestamps(timestamps, file_size)
plot_download_df(df)
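
# Seed the MinIO bucket with the same 100 random 10 MiB objects, mirroring the
# filesystem setup above.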
from minio import Minio
from minio.error import S3Error

def get_minio_client_and_bucket(endpoint, params, bucket):
    client = Minio(endpoint, **params)
    # Create the bucket on first run; subsequent runs find it already present.
    if not client.bucket_exists(bucket):
        client.make_bucket(bucket)
    return client

def checksum_for_minio(client, bucket, key):
    # Acquire the object outside the try block: if get_object raises, there is
    # no response to clean up, and a finally that closes it would hit a
    # NameError.
    response = client.get_object(bucket, key)
    try:
        data = response.read()
    finally:
        response.close()
        response.release_conn()
    return calculate_checksum(data)

def create_file_minio(client, bucket, key, size):
    data = os.urandom(size)
    client.put_object(
        bucket,
        key,
        io.BytesIO(data),
        size,
    )
    return calculate_checksum(data)

endpoint = "127.0.0.1:9000"
params = {
    "access_key": "minioadmin",
    "secret_key": "minioadmin",
    "secure": False,
}
bucket = "fileresponse"
client = get_minio_client_and_bucket(endpoint, params, bucket)

file_size = 1024 * 1024 * 10
num_to_hash = {}
for num in range(100):
    key = str(num)
    try:
        # If the object already exists, just recompute its checksum.
        client.stat_object(bucket, key)
        checksum = checksum_for_minio(client, bucket, key)
    except S3Error:
        # Object does not exist -> create it with fresh random content.
        checksum = create_file_minio(client, bucket, key, file_size)
    num_to_hash[num] = checksum
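
# num_to_hash now maps object keys to the checksums stored in MinIO, the same
# mapping the async_minio run downloads and verifies against.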