[Frontend] Optionally remove memory buffer used for uploading to URLs in run_batch (#12927)

Signed-off-by: Pooya Davoodi <pooya.davoodi@parasail.io>
This commit is contained in:
Pooya Davoodi 2025-02-14 00:22:42 -08:00 committed by GitHub
parent 45f90bcbba
commit 185cc19f92
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,6 +1,7 @@
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
import asyncio import asyncio
import tempfile
from http import HTTPStatus from http import HTTPStatus
from io import StringIO from io import StringIO
from typing import Awaitable, Callable, List, Optional from typing import Awaitable, Callable, List, Optional
@ -51,6 +52,13 @@ def parse_args():
help="The path or url to a single output file. Currently supports " help="The path or url to a single output file. Currently supports "
"local file paths, or web (http or https) urls. If a URL is specified," "local file paths, or web (http or https) urls. If a URL is specified,"
" the file should be available via HTTP PUT.") " the file should be available via HTTP PUT.")
parser.add_argument(
"--output-tmp-dir",
type=str,
default=None,
help="The directory to store the output file before uploading it "
"to the output URL.",
)
parser.add_argument("--response-role", parser.add_argument("--response-role",
type=nullable_str, type=nullable_str,
default="assistant", default="assistant",
@ -134,17 +142,107 @@ async def read_file(path_or_url: str) -> str:
return f.read() return f.read()
async def write_local_file(output_path: str,
                           batch_outputs: List[BatchRequestOutput]) -> None:
    """
    Dump the batch outputs to a local file, one JSON document per line.

    output_path: Destination path; opened for writing as UTF-8 text.
    batch_outputs: The batch outputs to serialize (via model_dump_json()).
    """
    # This performs blocking file I/O inside a coroutine. That is acceptable
    # as long as run_batch runs as a standalone program: stalling the event
    # loop here does not affect performance.
    with open(output_path, "w", encoding="utf-8") as out_file:
        out_file.writelines(f"{item.model_dump_json()}\n"
                            for item in batch_outputs)
async def upload_data(output_url: str, data_or_file: str,
                      from_file: bool) -> None:
    """
    Upload data or a local file to a URL with HTTP PUT, retrying on failure.

    output_url: The URL to upload to.
    data_or_file: Either the payload to upload or the path to the file to
        upload.
    from_file: If True, data_or_file is the path to the file to upload;
        otherwise data_or_file is sent as the request body as-is.

    Returns as soon as one attempt succeeds (HTTP status 200).
    Raises Exception once max_retries attempts have all failed; the error
    from the last attempt is chained as the cause.
    """
    # Timeout is a common issue when uploading large files.
    # We retry max_retries times before giving up.
    max_retries = 5
    # Number of seconds to wait before retrying.
    delay = 5

    for attempt in range(1, max_retries + 1):
        try:
            # We increase the timeout to 1000 seconds to allow
            # for large files (default is 300).
            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(
                    total=1000)) as session:
                if from_file:
                    # The file is reopened on every attempt so that a retry
                    # always streams it from the beginning.
                    with open(data_or_file, "rb") as file:
                        async with session.put(output_url,
                                               data=file) as response:
                            if response.status != 200:
                                # response.text() is a coroutine; it must be
                                # awaited to get the actual response body.
                                body = await response.text()
                                raise Exception(f"Failed to upload file.\n"
                                                f"Status: {response.status}\n"
                                                f"Response: {body}")
                else:
                    async with session.put(output_url,
                                           data=data_or_file) as response:
                        if response.status != 200:
                            body = await response.text()
                            raise Exception(f"Failed to upload data.\n"
                                            f"Status: {response.status}\n"
                                            f"Response: {body}")
            # The upload succeeded: stop here. Without this the loop would
            # fall through and upload the same payload again on every
            # remaining attempt.
            return
        except Exception as e:
            if attempt < max_retries:
                logger.error(
                    f"Failed to upload data (attempt {attempt}). "
                    f"Error message: {str(e)}.\nRetrying in {delay} seconds..."
                )
                await asyncio.sleep(delay)
            else:
                raise Exception(f"Failed to upload data (attempt {attempt}). "
                                f"Error message: {str(e)}.") from e
async def write_file(path_or_url: str, batch_outputs: List[BatchRequestOutput],
                     output_tmp_dir: Optional[str] = None) -> None:
    """
    Write batch_outputs to a file or upload to a URL.

    path_or_url: The path or URL to write batch_outputs to. Values starting
        with "http://" or "https://" are uploaded; anything else is treated
        as a local file path.
    batch_outputs: The list of batch outputs to write.
    output_tmp_dir: The directory to store the output file before uploading it
        to the output URL. If None, the outputs are serialized in memory and
        uploaded from there instead. Unused for local paths.
    """
    if not path_or_url.startswith(("http://", "https://")):
        logger.info("Writing outputs to local file %s", path_or_url)
        await write_local_file(path_or_url, batch_outputs)
        return

    if output_tmp_dir is None:
        logger.info("Writing outputs to memory buffer")
        # One JSON document per line; surrounding whitespace is stripped
        # before encoding, so the payload carries no trailing newline.
        payload = "\n".join(o.model_dump_json() for o in batch_outputs)
        logger.info("Uploading outputs to %s", path_or_url)
        await upload_data(
            path_or_url,
            payload.strip().encode("utf-8"),
            from_file=False,
        )
        return

    # Write responses to a temporary file and then upload it to the URL.
    # The temporary file is removed when the with-block exits.
    with tempfile.NamedTemporaryFile(
            mode="w",
            encoding="utf-8",
            dir=output_tmp_dir,
            prefix="tmp_batch_output_",
            suffix=".jsonl",
    ) as f:
        logger.info("Writing outputs to temporary local file %s", f.name)
        # The file is written and later read back through its name rather
        # than through the handle `f`.
        await write_local_file(f.name, batch_outputs)
        logger.info("Uploading outputs to %s", path_or_url)
        await upload_data(path_or_url, f.name, from_file=True)
def make_error_request_output(request: BatchRequestInput, def make_error_request_output(request: BatchRequestInput,
@ -317,12 +415,7 @@ async def main(args):
with tracker.pbar(): with tracker.pbar():
responses = await asyncio.gather(*response_futures) responses = await asyncio.gather(*response_futures)
output_buffer = StringIO() await write_file(args.output_file, responses, args.output_tmp_dir)
for response in responses:
print(response.model_dump_json(), file=output_buffer)
output_buffer.seek(0)
await write_file(args.output_file, output_buffer.read().strip())
if __name__ == "__main__": if __name__ == "__main__":