Source code for bundle.core.downloader

# Copyright 2026 HorusElohim

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at

#   http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import asyncio
from pathlib import Path
from typing import Any
from urllib.parse import parse_qs, urlparse

import aiohttp
from aiofiles import open as aio_open
from tqdm.asyncio import tqdm_asyncio

from . import data, logger, tracer
from .entity import Entity

log = logger.get_logger(__name__)


class Downloader(Entity):
    """
    Handles asynchronous downloading of files from a specified URL.

    Attributes:
        url (str): The URL to download the file from.
        destination (Path | None): The local file path to save the downloaded file.
            If None, data is stored in memory.
        chunk_size (int): The size of each chunk to download at a time.
        name (str): Identifier used for logging. When left at a placeholder value
            it is replaced by the destination file name or, failing that, the URL.
        buffer (bytearray): A buffer to temporarily store the file's content if no
            destination is specified.
        error_message (str): Description of the last failure; empty when the last
            download attempt did not record an error.

    Methods:
        start(byte_size: int): Placeholder for initialization logic before downloading starts.
        update(byte_count: int): Placeholder for update logic as chunks of data are downloaded.
        end(): Placeholder for cleanup logic after the download completes.
        download() -> bool: Asynchronously downloads a file from `url` to `destination`
            or to `buffer`.
    """

    url: str
    destination: Path | None = None
    chunk_size: int = 4096
    name: str = data.Field(default="downloader")

    _buffer: bytearray = data.PrivateAttr(default_factory=bytearray)
    _error_message: str = data.PrivateAttr(default="")

    @data.model_validator(mode="after")
    def _assign_name(self):
        """Use the destination or URL as a friendly identifier for logging."""
        # Only overwrite placeholder names; an explicitly chosen name is kept.
        if self.name in {"", "default", "downloader"}:
            if self.destination:
                self.name = Path(self.destination).name
            else:
                self.name = self.url
        return self

    @property
    def buffer(self) -> bytearray:
        """Expose the in-memory buffer for downstream consumers."""
        return self._buffer

    @property
    def error_message(self) -> str:
        """Return the error recorded by the most recent `download()` call ("" if none)."""
        return self._error_message

    def _request_headers(self) -> dict[str, str]:
        """
        Return request headers for downloads.

        For YouTube media URLs, match the user-agent with the stream client profile
        (`c=` query param).

        Returns:
            dict[str, str]: Always contains "User-Agent" and "Accept"; "Referer" and
            "Origin" are added depending on the URL host.
        """
        parsed = urlparse(self.url)
        query = parse_qs(parsed.query)
        # First value of the `c` query parameter, upper-cased; "" when absent.
        client = (query.get("c", [""])[0] or "").upper()

        # Desktop Chrome is the default profile.
        user_agent = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
        )
        if client == "ANDROID":
            user_agent = (
                "Mozilla/5.0 (Linux; Android 14; Pixel 8 Pro Build/AP2A.240805.005) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/131.0.0.0 Mobile Safari/537.36"
            )
        elif client == "IOS":
            user_agent = (
                "Mozilla/5.0 (iPhone; CPU iPhone OS 17_6 like Mac OS X) "
                "AppleWebKit/605.1.15 (KHTML, like Gecko) "
                "Version/17.0 Mobile/15E148 Safari/604.1"
            )

        headers = {
            "User-Agent": user_agent,
            "Accept": "*/*",
        }

        host = (parsed.hostname or "").lower()
        if "youtube.com" in host:
            headers["Referer"] = "https://www.youtube.com/"
            headers["Origin"] = "https://www.youtube.com"
        elif "ytimg.com" in host:
            headers["Referer"] = "https://www.youtube.com/"
        return headers

    def start(self, byte_size: int):
        """Initializes the download process. Placeholder for subclasses to implement."""
        pass

    def update(self, byte_count: int):
        """Updates the download progress. Placeholder for subclasses to implement."""
        pass

    def end(self):
        """Finalizes the download process. Placeholder for subclasses to implement."""
        pass

    async def download(self) -> bool:
        """
        Asynchronously downloads a file from the specified URL.

        The file is either saved to the given destination path or stored in an
        in-memory buffer. Utilizes aiohttp for asynchronous HTTP requests and
        aiofiles for async file I/O operations.

        The hooks are driven as follows: `start` receives the Content-Length value
        (0 when the header is missing) once an accepted status is seen, `update`
        receives the size of every chunk, and `end` is always invoked on exit, even
        when the request failed before `start` was reached.

        Exceptions raised while downloading are caught, logged and stored in
        `error_message`; they are not propagated.

        Returns:
            bool: True if the download was successful, False otherwise (unexpected
            HTTP status, empty body, or an exception).
        """
        status = False
        downloaded_bytes = 0
        self._error_message = ""
        # Start every attempt from an empty buffer; otherwise a retry or repeated
        # call would append the new body to bytes left over from a previous attempt.
        self._buffer.clear()

        try:
            async with aiohttp.ClientSession() as session:  # noqa: SIM117
                async with session.get(self.url, headers=self._request_headers()) as response:
                    # Only full (200) and partial (206) content responses are accepted.
                    if response.status not in {200, 206}:
                        self._error_message = f"HTTP {response.status}"
                        log.error(f"Error downloading {self.url}. Status: {response.status}")
                        return False

                    byte_size = int(response.headers.get("content-length", 0))
                    await tracer.Async.call_raise(self.start, byte_size)

                    if self.destination:
                        self.destination.parent.mkdir(parents=True, exist_ok=True)
                        async with aio_open(self.destination, "wb") as fd:
                            async for chunk in response.content.iter_chunked(self.chunk_size):
                                await tracer.Async.call_raise(fd.write, chunk, log_level=logger.Level.VERBOSE)
                                downloaded_bytes += len(chunk)
                                await tracer.Async.call_raise(
                                    self.update,
                                    len(chunk),
                                    log_level=logger.Level.VERBOSE,
                                )
                                # Give other tasks a chance to run between chunks.
                                await asyncio.sleep(0)
                    else:
                        async for chunk in response.content.iter_chunked(self.chunk_size):
                            self._buffer.extend(chunk)
                            downloaded_bytes += len(chunk)
                            await tracer.Async.call_raise(self.update, len(chunk))
                            # Give other tasks a chance to run between chunks.
                            await asyncio.sleep(0)

                    # An accepted status with no payload is still treated as a failure.
                    if downloaded_bytes <= 0:
                        content_type = response.headers.get("content-type", "")
                        self._error_message = f"Empty response body (status={response.status}, content-type={content_type})"
                        log.error(f"Error downloading {self.url}. {self._error_message}")
                        return False

                    status = True
        except Exception as ex:
            self._error_message = str(ex)
            log.error(f"Error downloading {self.url}. Exception: {ex}")
        finally:
            # Runs on every exit path, including the early `return False` branches.
            await tracer.Async.call_raise(self.end)
            log.debug("%s", logger.Emoji.status(status))

        return status
class DownloaderTQDM(Downloader):
    """
    Extends Downloader with TQDM progress bar for visual feedback during download.

    Overrides the start, update, and end methods of Downloader to integrate a TQDM
    progress bar that updates with each downloaded chunk.
    """

    # Active progress bar; None until `start` has been called.
    _progress_bar: Any = data.PrivateAttr(default=None)

    def start(self, byte_size: int):
        """
        Initializes the TQDM progress bar.

        Args:
            byte_size (int): Expected total number of bytes, used as the bar's total.
        """
        self._progress_bar = tqdm_asyncio(
            total=byte_size,
            desc=f"Downloading {self.destination.name if self.destination else 'file'}",
            unit="B",
            unit_scale=True,
            unit_divisor=1024,
        )

    def update(self, byte_count: int):
        """
        Updates the TQDM progress bar with the number of bytes downloaded.

        Args:
            byte_count (int): Number of bytes to advance the bar by.
        """
        # Compare against None explicitly: a tqdm bar's truth value is derived from
        # its total, so a bar created with total=0 (unknown size) is falsy and a
        # plain truthiness check would silently skip it.
        if self._progress_bar is not None:
            self._progress_bar.update(byte_count)

    def end(self):
        """Closes the TQDM progress bar, if one was created."""
        # Same explicit None check as in `update`, so a bar with total=0 is still closed.
        if self._progress_bar is not None:
            self._progress_bar.close()