diff --git a/src/data_designer/config/utils/visualization.py b/src/data_designer/config/utils/visualization.py index 85a230a95..0f0725acb 100644 --- a/src/data_designer/config/utils/visualization.py +++ b/src/data_designer/config/utils/visualization.py @@ -123,7 +123,7 @@ def display_sample_record( display_sample_record( record=record, processor_data_to_display=processor_data_to_display, - config_builder=self._config_builder, + config_builder=self.config_builder, background_color=background_color, syntax_highlighting_theme=syntax_highlighting_theme, hide_seed_columns=hide_seed_columns, diff --git a/src/data_designer/integrations/huggingface/__init__.py b/src/data_designer/integrations/huggingface/__init__.py new file mode 100644 index 000000000..a379d8a18 --- /dev/null +++ b/src/data_designer/integrations/huggingface/__init__.py @@ -0,0 +1,8 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from data_designer.integrations.huggingface.client import HuggingFaceHubClient, resolve_hf_token +from data_designer.integrations.huggingface.hub_results import HubDatasetResults +from data_designer.integrations.huggingface.reconstruction import reconstruct_dataset_creation_results + +__all__ = ["HuggingFaceHubClient", "HubDatasetResults", "resolve_hf_token", "reconstruct_dataset_creation_results"] diff --git a/src/data_designer/integrations/huggingface/client.py b/src/data_designer/integrations/huggingface/client.py new file mode 100644 index 000000000..d22a2064a --- /dev/null +++ b/src/data_designer/integrations/huggingface/client.py @@ -0,0 +1,1061 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

# Standard library
import json
import logging
import shutil
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Protocol

# Third-party
import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset
from huggingface_hub import DatasetCardData, HfApi, get_token, hf_hub_download, list_repo_files
from huggingface_hub.utils import HfHubHTTPError

# Project-local
from data_designer.config.analysis.dataset_profiler import DatasetProfilerResults
from data_designer.config.column_types import DataDesignerColumnType, get_column_display_order
from data_designer.config.config_builder import DataDesignerConfigBuilder
from data_designer.engine.analysis.utils.column_statistics_calculations import (
    convert_pyarrow_dtype_to_simple_dtype,
)
from data_designer.engine.dataset_builders.errors import ArtifactStorageError
from data_designer.integrations.huggingface.dataset_card import DataDesignerDatasetCard
from data_designer.integrations.huggingface.hub_results import HubDatasetResults

logger = logging.getLogger(__name__)


class HasDataset(Protocol):
    """Protocol for classes that have a load_dataset method."""

    # Structural typing: any object exposing load_dataset() -> DataFrame qualifies.
    def load_dataset(self) -> pd.DataFrame: ...


class HasArtifactStorage(Protocol):
    """Protocol for classes that have artifact_storage with metadata_file_path."""

    # NOTE(review): the artifact_storage object is duck-typed throughout this
    # module (hasattr checks) -- its concrete type is not visible here.
    @property
    def artifact_storage(self) -> Any: ...
def resolve_hf_token(token: str | None) -> str | None:
    """Resolve the Hugging Face token from parameter or huggingface_hub.

    Resolution order:
    1. Token provided as parameter.
    2. huggingface_hub's get_token() (environment variables, cache, config file, etc.).

    Args:
        token: Token provided as parameter.

    Returns:
        Resolved token or None if not found.
    """
    if token is not None:
        return token

    try:
        token = get_token()
        if token:
            return token
    except Exception:
        # Best-effort: get_token() can fail in unusual environments; treat as "no token".
        pass

    return None


def parse_size_category(num_records: int) -> str:
    """Parse dataset size into a Hugging Face size category.

    Uses the same category names as Argilla's size_categories_parser and the
    Hub's ``size_categories`` dataset-card field.

    NOTE(review): the original body of this function was lost to markup
    mangling (only "n<1K" ... "1T" survived in the docstring); reconstructed
    from the standard Hub category ladder -- confirm against upstream.

    Args:
        num_records: Number of records in the dataset.

    Returns:
        Size category string matching the Hugging Face format (e.g., "n<1K",
        "1K<n<10K", ..., "n>1T").
    """
    # Upper bounds are exclusive: the first bucket whose bound exceeds
    # num_records wins; anything beyond 1T falls through to "n>1T".
    ladder: list[tuple[int, str]] = [
        (1_000, "n<1K"),
        (10_000, "1K<n<10K"),
        (100_000, "10K<n<100K"),
        (1_000_000, "100K<n<1M"),
        (10_000_000, "1M<n<10M"),
        (100_000_000, "10M<n<100M"),
        (1_000_000_000, "100M<n<1B"),
        (10_000_000_000, "1B<n<10B"),
        (100_000_000_000, "10B<n<100B"),
        (1_000_000_000_000, "100B<n<1T"),
    ]
    for bound, category in ladder:
        if num_records < bound:
            return category
    return "n>1T"


def pydantic_to_dict(obj: Any) -> dict[str, Any]:
    """Convert a Pydantic model to a dict, handling enum fields properly.

    Args:
        obj: Pydantic model instance (objects without ``model_dump`` are
            returned unchanged).

    Returns:
        Dictionary representation, with ``column_type`` / ``sampler_type``
        values normalized to the ``{"value": ...}`` shape.
    """
    if not hasattr(obj, "model_dump"):
        return obj

    result = obj.model_dump(mode="json")
    for key in ("column_type", "sampler_type"):
        if key not in result:
            continue
        value = result[key]
        if isinstance(value, dict) and "value" in value:
            continue  # already normalized
        if hasattr(value, "value"):
            result[key] = {"value": value.value}
        elif isinstance(value, str):
            result[key] = {"value": value}
    return result
class HuggingFaceHubClient:
    """Client for pushing and pulling datasets to/from Hugging Face Hub.

    Encapsulates all Hub operations so other classes can gain hub
    functionality through composition rather than mixins.
    """

    def __init__(
        self,
        dataset_provider: HasDataset,
        artifact_storage_provider: HasArtifactStorage | None = None,
        analysis: DatasetProfilerResults | None = None,
        config_builder: DataDesignerConfigBuilder | None = None,
    ) -> None:
        """Initialize the Hugging Face Hub client.

        Args:
            dataset_provider: Supplies the dataset via load_dataset().
            artifact_storage_provider: Supplies artifact storage, if any.
            analysis: Optional profiling results for card generation.
            config_builder: Optional config builder for card generation.
        """
        self._dataset_provider = dataset_provider
        self._artifact_storage_provider = artifact_storage_provider
        self._analysis = analysis
        self._config_builder = config_builder

    def push_to_hub(
        self,
        repo_id: str,
        *,
        token: str | None = None,
        generate_card: bool = True,
        **kwargs: Any,
    ) -> None:
        """Push the dataset (plus optional artifacts and card) to the Hub.

        Converts the pandas DataFrame to a HuggingFace Dataset, pushes it,
        uploads any additional artifacts, and optionally a dataset card.

        Args:
            repo_id: Hub repository ID (e.g., "username/dataset-name").
            token: Authentication token; resolved from env/cache when None.
            generate_card: Whether to generate and upload a dataset card.
            **kwargs: Forwarded to ``dataset.push_to_hub()``.

        Raises:
            ArtifactStorageError: If loading the dataset or metadata fails.
        """
        auth = resolve_hf_token(token)
        frame = self._dataset_provider.load_dataset()
        Dataset.from_pandas(frame).push_to_hub(repo_id, token=auth, **kwargs)

        if self._artifact_storage_provider:
            self._upload_additional_artifacts(repo_id, auth)
        if generate_card:
            self._upload_dataset_card(repo_id, auth, frame)
+ """ + if not self._artifact_storage_provider: + return + + hf_api = HfApi(token=token) + artifact_storage = self._artifact_storage_provider.artifact_storage + + self._upload_analysis(hf_api, repo_id) + self._upload_processor_artifacts(hf_api, repo_id, artifact_storage) + self._upload_metadata(hf_api, repo_id, artifact_storage) + self._upload_config_files(hf_api, repo_id, artifact_storage) + + def _upload_analysis(self, hf_api: HfApi, repo_id: str) -> None: + """Upload analysis results as JSON. + + Args: + hf_api: Hugging Face API client. + repo_id: The ID of the Hugging Face Hub repository. + """ + if self._analysis is None: + return + + try: + analysis_json = self._analysis.model_dump(mode="json") + with TemporaryDirectory() as tmpdir: + analysis_path = Path(tmpdir) / "analysis.json" + with open(analysis_path, "w") as f: + json.dump(analysis_json, f, indent=2, default=str) + hf_api.upload_file( + path_or_fileobj=str(analysis_path), + path_in_repo="analysis.json", + repo_id=repo_id, + repo_type="dataset", + ) + except Exception as e: + logger.warning(f"Failed to upload analysis results: {e}") + + def _upload_processor_artifacts( + self, + hf_api: HfApi, + repo_id: str, + artifact_storage: Any, + ) -> None: + """Upload processor datasets and artifacts. + + Args: + hf_api: Hugging Face API client. + repo_id: The ID of the Hugging Face Hub repository. + artifact_storage: Artifact storage object. 
+ """ + if not hasattr(artifact_storage, "processors_outputs_path"): + return + + processors_path = artifact_storage.processors_outputs_path + if not processors_path.exists(): + return + + for processor_dir in processors_path.iterdir(): + if not processor_dir.is_dir(): + continue + processor_name = processor_dir.name + self._upload_processor_dataset(hf_api, repo_id, processor_dir, processor_name) + self._upload_processor_files(hf_api, repo_id, processors_path, processor_dir, processor_name) + + def _upload_processor_dataset( + self, + hf_api: HfApi, + repo_id: str, + processor_dir: Path, + processor_name: str, + ) -> None: + """Upload a processor dataset as a parquet file. + + Args: + hf_api: Hugging Face API client. + repo_id: The ID of the Hugging Face Hub repository. + processor_dir: Directory containing the processor files. + processor_name: Name of the processor. + """ + parquet_files = list(processor_dir.glob("*.parquet")) + if not parquet_files: + return + + try: + dfs = [pd.read_parquet(f) for f in parquet_files] + combined_df = pd.concat(dfs, ignore_index=True) + + with TemporaryDirectory() as tmpdir: + processor_parquet = Path(tmpdir) / f"{processor_name}.parquet" + combined_df.to_parquet(processor_parquet, index=False) + hf_api.upload_file( + path_or_fileobj=str(processor_parquet), + path_in_repo=f"processors/{processor_name}.parquet", + repo_id=repo_id, + repo_type="dataset", + ) + except Exception as e: + logger.warning(f"Failed to upload processor dataset {processor_name}: {e}") + + def _upload_processor_files( + self, + hf_api: HfApi, + repo_id: str, + processors_path: Path, + processor_dir: Path, + processor_name: str, + ) -> None: + """Upload non-parquet files from a processor directory. + + Args: + hf_api: Hugging Face API client. + repo_id: The ID of the Hugging Face Hub repository. + processors_path: Base path for all processors. + processor_dir: Directory containing the processor files. + processor_name: Name of the processor. 
+ """ + for artifact_file in processor_dir.rglob("*"): + if not artifact_file.is_file() or artifact_file.suffix == ".parquet": + continue + try: + relative_path = artifact_file.relative_to(processors_path) + hf_api.upload_file( + path_or_fileobj=str(artifact_file), + path_in_repo=f"processors/{relative_path.as_posix()}", + repo_id=repo_id, + repo_type="dataset", + ) + except Exception as e: + logger.warning(f"Failed to upload processor artifact {artifact_file}: {e}") + + def _upload_metadata( + self, + hf_api: HfApi, + repo_id: str, + artifact_storage: Any, + ) -> None: + """Upload metadata file with sanitized file paths. + + Args: + hf_api: Hugging Face API client. + repo_id: The ID of the Hugging Face Hub repository. + artifact_storage: Artifact storage object. + """ + if not hasattr(artifact_storage, "metadata_file_path"): + return + + metadata_path = artifact_storage.metadata_file_path + if not metadata_path.exists(): + return + + try: + with open(metadata_path, "r") as f: + metadata = json.load(f) + + sanitized_metadata = self._sanitize_metadata_file_paths(metadata, artifact_storage) + + with TemporaryDirectory() as tmpdir: + sanitized_metadata_path = Path(tmpdir) / "metadata.json" + with open(sanitized_metadata_path, "w") as f: + json.dump(sanitized_metadata, f, indent=2, default=str) + hf_api.upload_file( + path_or_fileobj=str(sanitized_metadata_path), + path_in_repo="metadata.json", + repo_id=repo_id, + repo_type="dataset", + ) + except Exception as e: + logger.warning(f"Failed to upload metadata: {e}") + + def _sanitize_metadata_file_paths(self, metadata: dict[str, Any], artifact_storage: Any) -> dict[str, Any]: + """Sanitize file paths in metadata by converting local paths to remote paths. + + Args: + metadata: Metadata dictionary that may contain file_paths. + artifact_storage: Artifact storage object. + + Returns: + Metadata dictionary with sanitized file paths. 
+ """ + if "file_paths" not in metadata or not isinstance(metadata["file_paths"], list): + return metadata + + sanitized_paths = [] + base_path = artifact_storage.base_dataset_path + + for file_path in metadata["file_paths"]: + path_obj = Path(str(file_path)) + sanitized = None + + if path_obj.is_absolute(): + try: + relative_path = path_obj.relative_to(base_path) + sanitized = f"data/{relative_path.as_posix()}" + except ValueError: + pass + + if not sanitized: + path_str = str(file_path) + if "parquet-files" in path_str: + idx = path_str.find("parquet-files") + remaining = path_str[idx + len("parquet-files") :].lstrip("/\\") if idx != -1 else path_obj.name + sanitized = f"data/parquet-files/{remaining}" + else: + sanitized = f"data/{path_obj.name}" + + sanitized_paths.append(sanitized) + + result = metadata.copy() + if sanitized_paths: + result["file_paths"] = sanitized_paths + else: + result.pop("file_paths", None) + return result + + def _upload_config_files( + self, + hf_api: HfApi, + repo_id: str, + artifact_storage: Any, + ) -> None: + """Upload configuration files (column_configs.json, model_configs.json). + + Args: + hf_api: Hugging Face API client. + repo_id: The ID of the Hugging Face Hub repository. + artifact_storage: Artifact storage object. + """ + if not hasattr(artifact_storage, "base_dataset_path"): + return + + base_path = artifact_storage.base_dataset_path + config_files = ["column_configs.json", "model_configs.json"] + for config_file in config_files: + config_path = base_path / config_file + if config_path.exists(): + try: + hf_api.upload_file( + path_or_fileobj=str(config_path), + path_in_repo=config_file, + repo_id=repo_id, + repo_type="dataset", + ) + except Exception as e: + logger.warning(f"Failed to upload {config_file}: {e}") + + def _upload_dataset_card( + self, + repo_id: str, + token: str | None, + dataset_df: pd.DataFrame, + ) -> None: + """Generate and upload the dataset card to Hugging Face Hub. 
+ + Args: + repo_id: The ID of the Hugging Face Hub repository. + token: Hugging Face token for authentication. + dataset_df: The dataset as a pandas DataFrame. + + Raises: + ArtifactStorageError: If analysis or config_builder is missing. + """ + if self._analysis is None or self._config_builder is None: + raise ArtifactStorageError( + "Cannot generate dataset card: missing analysis or config_builder. " + "Ensure the client was initialized with analysis and config_builder." + ) + + metadata = self._load_metadata_for_card() + template_variables = self._build_card_template_variables( + dataset_df=dataset_df, + analysis=self._analysis, + config_builder=self._config_builder, + metadata=metadata, + repo_id=repo_id, + ) + + card = self._create_dataset_card(dataset_df, template_variables) + self._save_and_upload_card(card, repo_id, token) + + def _load_metadata_for_card(self) -> dict[str, Any] | None: + """Load and sanitize metadata for dataset card generation. + + Returns: + Sanitized metadata dictionary or None if not available. + """ + if not self._artifact_storage_provider: + return None + + artifact_storage = self._artifact_storage_provider.artifact_storage + if not hasattr(artifact_storage, "metadata_file_path"): + return None + + metadata_path = artifact_storage.metadata_file_path + if not metadata_path.exists(): + return None + + try: + with open(metadata_path, "r") as f: + metadata = json.load(f) + return self._sanitize_metadata_file_paths(metadata, artifact_storage) + except Exception: + return None + + def _build_card_template_variables( + self, + dataset_df: pd.DataFrame, + analysis: DatasetProfilerResults, + config_builder: DataDesignerConfigBuilder, + metadata: dict[str, Any] | None, + repo_id: str, + ) -> dict[str, Any]: + """Build template variables for the dataset card. + + Args: + dataset_df: The dataset as a pandas DataFrame. + analysis: Profiling analysis results. + config_builder: Configuration builder. + metadata: Optional metadata dictionary. 
    def _build_card_template_variables(
        self,
        dataset_df: pd.DataFrame,
        analysis: DatasetProfilerResults,
        config_builder: DataDesignerConfigBuilder,
        metadata: dict[str, Any] | None,
        repo_id: str,
    ) -> dict[str, Any]:
        """Build template variables for the dataset card.

        Args:
            dataset_df: The dataset as a pandas DataFrame.
            analysis: Profiling analysis results.
            config_builder: Configuration builder.
            metadata: Optional metadata dictionary.
            repo_id: Repository ID.

        Returns:
            Dictionary of template variables.
        """
        column_configs = config_builder.get_column_configs()
        column_names = set(dataset_df.columns)

        # Derived views of the dataset consumed by the card template.
        all_columns = self._build_column_info(dataset_df, column_names)
        unconfigured_columns = self._find_unconfigured_columns(dataset_df, column_names, column_configs)
        sample_records = self._build_sample_records(dataset_df)
        config_types = self._build_config_types_summary(column_configs)
        column_stats_by_type = self._build_column_stats_by_type(analysis)

        return {
            "size_categories": parse_size_category(len(dataset_df)),
            "num_records": len(dataset_df),
            "target_num_records": analysis.target_num_records,
            "percent_complete": analysis.percent_complete,
            "num_columns": len(dataset_df.columns),
            "repo_id": repo_id,
            "metadata": metadata or {},
            # Pydantic models are converted to plain dicts so the card
            # template can consume them directly.
            "column_configs": [pydantic_to_dict(col_config) for col_config in column_configs] if column_configs else [],
            "unconfigured_columns": unconfigured_columns,
            "all_columns": all_columns,
            "column_statistics": (
                [pydantic_to_dict(stat) for stat in analysis.column_statistics] if analysis.column_statistics else []
            ),
            "column_stats_by_type": column_stats_by_type,
            "sorted_column_types": self._sort_column_types(column_stats_by_type),
            "num_samples": len(sample_records),
            "sample_records": sample_records,
            "config_types": config_types,
        }
+ """ + all_columns: dict[str, str] = {} + for col_name in sorted(column_names): + try: + normalized_type = convert_pyarrow_dtype_to_simple_dtype(dataset_df[col_name].dtype.pyarrow_dtype) + except Exception: + normalized_type = str(dataset_df[col_name].dtype) + all_columns[col_name] = normalized_type + return all_columns + + def _find_unconfigured_columns( + self, + dataset_df: pd.DataFrame, + column_names: set[str], + column_configs: list[Any] | None, + ) -> dict[str, str]: + """Find columns that don't have configurations. + + Args: + dataset_df: The dataset as a pandas DataFrame. + column_names: Set of all column names. + column_configs: List of column configurations. + + Returns: + Dictionary mapping unconfigured column names to their types. + """ + if not column_configs: + return {} + + configured_names = {col.name for col in column_configs} + unconfigured = column_names - configured_names + return {col_name: str(dataset_df[col_name].dtype) for col_name in sorted(unconfigured)} + + def _build_sample_records(self, dataset_df: pd.DataFrame) -> list[dict[str, Any]]: + """Build sample records for the dataset card. + + Args: + dataset_df: The dataset as a pandas DataFrame. + + Returns: + List of sample records as dictionaries. + """ + num_samples = min(5, len(dataset_df)) + if num_samples == 0: + return [] + + sample_df = dataset_df.head(num_samples) + records = sample_df.to_dict(orient="records") + return [ + {k: v if isinstance(v, (str, int, float, bool, type(None))) else str(v) for k, v in record.items()} + for record in records + ] + + def _build_config_types_summary(self, column_configs: list[Any] | None) -> dict[str, int]: + """Build summary of configuration types. + + Args: + column_configs: List of column configurations. + + Returns: + Dictionary mapping config type names to counts. 
+ """ + if not column_configs: + return {} + + config_types: dict[str, int] = {} + for col_config in column_configs: + config_type = type(col_config).__name__ + config_types[config_type] = config_types.get(config_type, 0) + 1 + return config_types + + def _build_column_stats_by_type(self, analysis: DatasetProfilerResults) -> dict[str, list[dict[str, Any]]]: + """Build column statistics grouped by type. + + Args: + analysis: Profiling analysis results. + + Returns: + Dictionary mapping column types to lists of statistics dictionaries. + """ + column_stats_by_type: dict[str, list[Any]] = {} + for column_type in analysis.column_types: + try: + column_type_enum = DataDesignerColumnType(column_type) + stats = analysis.get_column_statistics_by_type(column_type_enum) + if stats: + column_stats_by_type[column_type] = stats + except (ValueError, TypeError): + continue + + return { + col_type: [pydantic_to_dict(stat) for stat in stats_list] + for col_type, stats_list in column_stats_by_type.items() + } + + def _sort_column_types(self, column_stats_by_type: dict[str, list[dict[str, Any]]]) -> list[str]: + """Sort column types by display order. + + Args: + column_stats_by_type: Dictionary mapping column types to statistics. + + Returns: + Sorted list of column type names. + """ + display_order = get_column_display_order() + return sorted( + column_stats_by_type.keys(), + key=lambda x: display_order.index(x) if x in display_order else len(display_order), + ) + + def _create_dataset_card( + self, + dataset_df: pd.DataFrame, + template_variables: dict[str, Any], + ) -> DataDesignerDatasetCard: + """Create a dataset card from template variables. + + Args: + dataset_df: The dataset as a pandas DataFrame. + template_variables: Template variables for the card. + + Returns: + DataDesignerDatasetCard instance. 
+ """ + tags_list = ["datadesigner", "synthetic"] + return DataDesignerDatasetCard.from_template( + card_data=DatasetCardData( + size_categories=parse_size_category(len(dataset_df)), + tags=tags_list, + ), + tags=tags_list, + **template_variables, + ) + + def _save_and_upload_card( + self, + card: DataDesignerDatasetCard, + repo_id: str, + token: str | None, + ) -> None: + """Save dataset card to temporary file and upload to hub. + + Args: + card: The dataset card to upload. + repo_id: The ID of the Hugging Face Hub repository. + token: Hugging Face token for authentication. + + Raises: + ArtifactStorageError: If card saving fails. + """ + with TemporaryDirectory() as tmpdir: + card_path = Path(tmpdir) / "README.md" + try: + card.save(filepath=str(card_path)) + except Exception as e: + raise ArtifactStorageError(f"Failed to save dataset card: {e}") from e + + HfApi(token=token).upload_file( + path_or_fileobj=str(card_path), + path_in_repo="README.md", + repo_id=repo_id, + repo_type="dataset", + ) + + @staticmethod + def pull_from_hub( + repo_id: str, + *, + token: str | None = None, + split: str | None = None, + include_analysis: bool = True, + include_processors: bool = True, + include_configs: bool = True, + **kwargs: Any, + ) -> HubDatasetResults: + """Load a dataset and all associated artifacts from Hugging Face Hub. + + This function loads a dataset from the Hugging Face Hub along with analysis results, + processor datasets, processor artifacts, and configuration files if available. + + Args: + repo_id: The ID of the Hugging Face Hub repository (e.g., "username/dataset-name"). + token: Hugging Face token for authentication. If None, will check environment + variables HF_TOKEN or HUGGINGFACE_HUB_TOKEN. + split: The split to load from the dataset. If None, the default split will be used. + include_analysis: Whether to load analysis results. Defaults to True. + include_processors: Whether to load processor datasets and artifacts. Defaults to True. 
    @staticmethod
    def pull_from_hub(
        repo_id: str,
        *,
        token: str | None = None,
        split: str | None = None,
        include_analysis: bool = True,
        include_processors: bool = True,
        include_configs: bool = True,
        **kwargs: Any,
    ) -> HubDatasetResults:
        """Load a dataset and all associated artifacts from Hugging Face Hub.

        Loads the dataset along with analysis results, processor datasets,
        processor artifacts, and configuration files when available.

        Args:
            repo_id: The ID of the Hugging Face Hub repository (e.g., "username/dataset-name").
            token: Hugging Face token for authentication. If None, will check environment
                variables HF_TOKEN or HUGGINGFACE_HUB_TOKEN.
            split: The split to load from the dataset. If None, the default split will be used.
            include_analysis: Whether to load analysis results. Defaults to True.
            include_processors: Whether to load processor datasets and artifacts. Defaults to True.
            include_configs: Whether to load configuration files. Defaults to True.
            **kwargs: Additional arguments to pass to `datasets.load_dataset()`.

        Returns:
            A HubDatasetResults object containing the dataset and all associated artifacts.
        """
        resolved_token = resolve_hf_token(token)
        hf_dataset = HuggingFaceHubClient._load_dataset_from_hub(repo_id, split, resolved_token, **kwargs)
        dataset_df = hf_dataset.to_pandas()

        # Each artifact group below is optional and loaded best-effort; a
        # missing artifact yields None rather than an error.
        analysis = None
        if include_analysis:
            analysis = HuggingFaceHubClient._load_analysis_from_hub(repo_id, resolved_token)

        processor_datasets = None
        processor_artifacts = None
        if include_processors:
            processor_datasets, processor_artifacts = HuggingFaceHubClient._load_processors_from_hub(
                repo_id, resolved_token
            )

        metadata = None
        column_configs = None
        model_configs = None
        if include_configs:
            metadata, column_configs, model_configs = HuggingFaceHubClient._load_configs_from_hub(
                repo_id, resolved_token
            )

        return HubDatasetResults(
            dataset=dataset_df,
            analysis=analysis,
            processor_datasets=processor_datasets,
            processor_artifacts=processor_artifacts,
            metadata=metadata,
            column_configs=column_configs,
            model_configs=model_configs,
        )

    @staticmethod
    def _load_dataset_from_hub(
        repo_id: str,
        split: str | None,
        token: str | None,
        **kwargs: Any,
    ) -> Dataset:
        """Load the main dataset from Hugging Face Hub.

        Args:
            repo_id: The ID of the Hugging Face Hub repository.
            split: The split to load. If None, the first split will be used.
            token: Hugging Face token for authentication.
            **kwargs: Additional arguments to pass to load_dataset.

        Returns:
            HuggingFace Dataset object.
        """
        hf_dataset = load_dataset(repo_id, split=split, token=token, **kwargs)

        # load_dataset returns a DatasetDict when no split is requested;
        # default to the first available split in that case.
        if isinstance(hf_dataset, (DatasetDict, dict)):
            if split is None:
                split = next(iter(hf_dataset.keys()))
            hf_dataset = hf_dataset[split]

        return hf_dataset
+ """ + hf_dataset = load_dataset(repo_id, split=split, token=token, **kwargs) + + if isinstance(hf_dataset, (DatasetDict, dict)): + if split is None: + split = next(iter(hf_dataset.keys())) + hf_dataset = hf_dataset[split] + + return hf_dataset + + @staticmethod + def _load_analysis_from_hub( + repo_id: str, + token: str | None, + ) -> DatasetProfilerResults | None: + """Load analysis results from Hugging Face Hub. + + Args: + repo_id: The ID of the Hugging Face Hub repository. + token: Hugging Face token for authentication. + + Returns: + DatasetProfilerResults if available, None otherwise. + """ + try: + analysis_path = hf_hub_download( + repo_id=repo_id, + filename="analysis.json", + repo_type="dataset", + token=token, + ) + with open(analysis_path, "r") as f: + return DatasetProfilerResults.model_validate(json.load(f)) + except (HfHubHTTPError, FileNotFoundError, Exception): + return None + + @staticmethod + def _load_processors_from_hub( + repo_id: str, + token: str | None, + ) -> tuple[dict[str, pd.DataFrame] | None, dict[str, Path] | None]: + """Load processor datasets and artifacts from Hugging Face Hub. + + Args: + repo_id: The ID of the Hugging Face Hub repository. + token: Hugging Face token for authentication. + + Returns: + Tuple of (processor_datasets dict, processor_artifacts dict), or (None, None) if unavailable. 
+ """ + try: + repo_files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=token) + processor_files = [f for f in repo_files if f.startswith("processors/")] + + processor_groups = HuggingFaceHubClient._group_processor_files(processor_files) + processor_datasets = HuggingFaceHubClient._download_processor_datasets(repo_id, token, processor_groups) + processor_artifacts = HuggingFaceHubClient._download_processor_artifacts(repo_id, token, processor_groups) + + return processor_datasets or None, processor_artifacts or None + except (HfHubHTTPError, FileNotFoundError, Exception): + return None, None + + @staticmethod + def _group_processor_files(processor_files: list[str]) -> dict[str, list[str]]: + """Group processor files by processor name. + + Args: + processor_files: List of file paths in the processors/ directory. + + Returns: + Dictionary mapping processor names to lists of file paths. + """ + processor_groups: dict[str, list[str]] = {} + for file_path in processor_files: + parts = file_path.replace("processors/", "").split("/") + processor_name = parts[0].replace(".parquet", "") + if processor_name not in processor_groups: + processor_groups[processor_name] = [] + processor_groups[processor_name].append(file_path) + return processor_groups + + @staticmethod + def _download_processor_datasets( + repo_id: str, + token: str | None, + processor_groups: dict[str, list[str]], + ) -> dict[str, pd.DataFrame]: + """Download processor datasets from the hub. + + Args: + repo_id: The ID of the Hugging Face Hub repository. + token: Hugging Face token for authentication. + processor_groups: Dictionary mapping processor names to file paths. + + Returns: + Dictionary mapping processor names to DataFrames. 
+ """ + processor_datasets: dict[str, pd.DataFrame] = {} + for processor_name, files in processor_groups.items(): + parquet_file = next((f for f in files if f.endswith(".parquet")), None) + if parquet_file: + try: + local_path = hf_hub_download( + repo_id=repo_id, + filename=parquet_file, + repo_type="dataset", + token=token, + ) + processor_datasets[processor_name] = pd.read_parquet(local_path) + except Exception: + pass + return processor_datasets + + @staticmethod + def _download_processor_artifacts( + repo_id: str, + token: str | None, + processor_groups: dict[str, list[str]], + ) -> dict[str, Path]: + """Download processor artifacts from the hub. + + Args: + repo_id: The ID of the Hugging Face Hub repository. + token: Hugging Face token for authentication. + processor_groups: Dictionary mapping processor names to file paths. + + Returns: + Dictionary mapping processor names to artifact directory paths. + """ + processor_artifacts: dict[str, Path] = {} + for processor_name, files in processor_groups.items(): + other_files = [f for f in files if not f.endswith(".parquet")] + if other_files: + with TemporaryDirectory() as tmpdir: + artifact_dir = Path(tmpdir) / processor_name + artifact_dir.mkdir(parents=True, exist_ok=True) + + for artifact_file in other_files: + try: + local_path = hf_hub_download( + repo_id=repo_id, + filename=artifact_file, + repo_type="dataset", + token=token, + ) + relative_path = artifact_file.replace(f"processors/{processor_name}/", "") + if relative_path: + target_path = artifact_dir / relative_path + target_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(local_path, target_path) + except Exception: + pass + + if any(artifact_dir.rglob("*")): + processor_artifacts[processor_name] = artifact_dir + + return processor_artifacts + + @staticmethod + def _load_configs_from_hub( + repo_id: str, + token: str | None, + ) -> tuple[dict[str, Any] | None, list[dict[str, Any]] | None, list[dict[str, Any]] | None]: + """Load 
configuration files from Hugging Face Hub. + + Args: + repo_id: The ID of the Hugging Face Hub repository. + token: Hugging Face token for authentication. + + Returns: + Tuple of (metadata, column_configs, model_configs), with None values if unavailable. + """ + metadata = HuggingFaceHubClient._load_metadata_from_hub(repo_id, token) + column_configs = HuggingFaceHubClient._load_column_configs_from_hub(repo_id, token) + model_configs = HuggingFaceHubClient._load_model_configs_from_hub(repo_id, token) + + return metadata, column_configs, model_configs + + @staticmethod + def _load_metadata_from_hub(repo_id: str, token: str | None) -> dict[str, Any] | None: + """Load metadata from Hugging Face Hub. + + Args: + repo_id: The ID of the Hugging Face Hub repository. + token: Hugging Face token for authentication. + + Returns: + Metadata dictionary or None if unavailable. + """ + try: + metadata_path = hf_hub_download( + repo_id=repo_id, + filename="metadata.json", + repo_type="dataset", + token=token, + ) + with open(metadata_path, "r") as f: + return json.load(f) + except (HfHubHTTPError, FileNotFoundError, Exception): + return None + + @staticmethod + def _load_column_configs_from_hub(repo_id: str, token: str | None) -> list[dict[str, Any]] | None: + """Load column configurations from Hugging Face Hub. + + Args: + repo_id: The ID of the Hugging Face Hub repository. + token: Hugging Face token for authentication. + + Returns: + List of column config dictionaries or None if unavailable. 
+ """ + try: + config_path = hf_hub_download( + repo_id=repo_id, + filename="column_configs.json", + repo_type="dataset", + token=token, + ) + with open(config_path, "r") as f: + raw_column_configs = json.load(f) + + column_configs = [] + for config in raw_column_configs: + if "columns" in config and isinstance(config["columns"], list): + column_configs.extend(config["columns"]) + else: + column_configs.append(config) + return column_configs + except (HfHubHTTPError, FileNotFoundError, Exception): + return None + + @staticmethod + def _load_model_configs_from_hub(repo_id: str, token: str | None) -> list[dict[str, Any]] | None: + """Load model configurations from Hugging Face Hub. + + Args: + repo_id: The ID of the Hugging Face Hub repository. + token: Hugging Face token for authentication. + + Returns: + List of model config dictionaries or None if unavailable. + """ + try: + config_path = hf_hub_download( + repo_id=repo_id, + filename="model_configs.json", + repo_type="dataset", + token=token, + ) + with open(config_path, "r") as f: + return json.load(f) + except (HfHubHTTPError, FileNotFoundError, Exception): + return None diff --git a/src/data_designer/integrations/huggingface/dataset_card.py b/src/data_designer/integrations/huggingface/dataset_card.py new file mode 100644 index 000000000..9b16b173d --- /dev/null +++ b/src/data_designer/integrations/huggingface/dataset_card.py @@ -0,0 +1,21 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from pathlib import Path + +from huggingface_hub import DatasetCard + +TEMPLATE_DATA_DESIGNER_DATASET_CARD_PATH = Path(__file__).parent / "dataset_card_template.md" + + +class DataDesignerDatasetCard(DatasetCard): + """Dataset card for NeMo Data Designer datasets. + + This class extends Hugging Face's DatasetCard with a custom template + specifically designed for Data Designer generated datasets. 
    The template is located at `data_designer/integrations/huggingface/dataset_card_template.md`.
    """

    default_template_path = TEMPLATE_DATA_DESIGNER_DATASET_CARD_PATH
diff --git a/src/data_designer/integrations/huggingface/dataset_card_template.md b/src/data_designer/integrations/huggingface/dataset_card_template.md
new file mode 100644
index 000000000..fd81c8d47
--- /dev/null
+++ b/src/data_designer/integrations/huggingface/dataset_card_template.md
@@ -0,0 +1,176 @@
---
size_categories:
  - {{ size_categories }}
tags:
{% for tag in tags %}
  - {{ tag }}
{% endfor %}
---

# Dataset Card

This dataset was generated using **NeMo Data Designer**, a comprehensive framework for creating high-quality synthetic datasets from scratch or using seed data.

## About NeMo Data Designer

NeMo Data Designer is a general framework for generating high-quality synthetic data that goes beyond simple LLM prompting. It provides:

- **Diverse data generation** using statistical samplers, LLMs, or existing seed datasets
- **Relationship control** between fields with dependency-aware generation
- **Quality validation** with built-in Python, SQL, and custom local and remote validators
- **LLM-as-a-judge** scoring for quality assessment
- **Fast iteration** with preview mode before full-scale generation

For more information, visit: [https://github.com/NVIDIA-NeMo/DataDesigner](https://github.com/NVIDIA-NeMo/DataDesigner) (`pip install data-designer`)

## Quick Start

Load this dataset for fine-tuning:

```python
from datasets import load_dataset

dataset = load_dataset("{{ repo_id }}")
# Access the data
df = dataset["train"].to_pandas()
```

Or with NeMo Data Designer:

```python
from data_designer.interface.results import DatasetCreationResults

# Load dataset with all artifacts (analysis, configs, etc.)
+results = DatasetCreationResults.pull_from_hub("{{ repo_id }}") + +# Access the dataset +df = results.load_dataset() + +# Access the analysis +analysis = results.load_analysis() + +# Access the config builder +config_builder = results.config_builder +``` + +## Dataset Summary + +- **Number of records**: {% if num_records is defined and num_records is not none %}{{ "{:,}".format(num_records) }}{% else %}N/A{% endif %} +- **Number of columns**: {{ num_columns }} +- **Size category**: {{ size_categories }} +{% if target_num_records is defined and target_num_records is not none and target_num_records != num_records %} +- **Target records**: {{ "{:,}".format(target_num_records) }} ({{ "%.1f" | format(percent_complete) if percent_complete is defined and percent_complete is not none else "N/A" }}% complete) +{% endif %} + +## Sample Data + +{% if num_samples > 0 %} +Here are sample records from the dataset: + +{% for idx in range(num_samples) %} +### Example {{ idx + 1 }} + +```json +{{ sample_records[idx] | tojson(indent=2) }} +``` +{% endfor %} +{% else %} +No sample records available. +{% endif %} + +## Schema + +{% if all_columns is defined and all_columns %} +| Column | Type | +|--------|------| +{% for col_name, dtype in all_columns | dictsort -%} +| `{{ col_name }}` | {{ dtype }} | +{% endfor -%} +{% else %} +No column information available. 
+{% endif %} + +## Data Quality + +{% if column_stats_by_type %} +### Column Statistics + +{% for col_type in sorted_column_types %} +{% set stats_list = column_stats_by_type[col_type] %} +{% if stats_list %} +{% set col_type_label = col_type.replace("_", " ").title().replace("Llm", "LLM") %} +#### {{ col_type_label }} Columns + +{% if col_type == "sampler" %} +| Column | Data Type | Unique Values | Sampler Type | +|--------|-----------|---------------|--------------| +{% for stat in stats_list -%} +| **{{ stat.get('column_name', 'unknown') }}** | {{ stat.get('simple_dtype', 'unknown') }} | {% if 'num_unique' in stat and stat['num_unique'] is not none %}{{ stat['num_unique'] }}{% else %}N/A{% endif %} ({% if 'num_unique' in stat and stat['num_unique'] is not none and num_records > 0 %}{{ "%.1f" | format((stat['num_unique'] / num_records * 100)) }}{% else %}0.0{% endif %}%) | {% if 'sampler_type' in stat and stat['sampler_type'] is not none %}{% set sampler_type = stat['sampler_type'] %}{% if sampler_type is mapping %}{{ sampler_type.get('value', 'N/A') }}{% else %}{{ sampler_type }}{% endif %}{% else %}N/A{% endif %} | +{% endfor -%} + +{% elif col_type in ["llm_text", "llm_structured", "llm_code", "llm_judge"] %} +| Column | Data Type | Unique Values | Prompt Tokens (avg) | Completion Tokens (avg) | +|--------|-----------|---------------|---------------------|--------------------------| +{% for stat in stats_list -%} +| **{{ stat.get('column_name', 'unknown') }}** | {{ stat.get('simple_dtype', 'unknown') }} | {% if 'num_unique' in stat and stat['num_unique'] is not none %}{{ stat['num_unique'] }}{% else %}N/A{% endif %} ({% if 'num_unique' in stat and stat['num_unique'] is not none and num_records > 0 %}{{ "%.1f" | format((stat['num_unique'] / num_records * 100)) }}{% else %}0.0{% endif %}%) | {% if 'prompt_tokens_mean' in stat and stat['prompt_tokens_mean'] is not none %}{{ "%.1f" | format(stat['prompt_tokens_mean']) }}{% else %}N/A{% endif %} ± {% if 
'prompt_tokens_stddev' in stat and stat['prompt_tokens_stddev'] is not none %}{{ "%.1f" | format(stat['prompt_tokens_stddev']) }}{% else %}N/A{% endif %} | {% if 'completion_tokens_mean' in stat and stat['completion_tokens_mean'] is not none %}{{ "%.1f" | format(stat['completion_tokens_mean']) }}{% else %}N/A{% endif %} ± {% if 'completion_tokens_stddev' in stat and stat['completion_tokens_stddev'] is not none %}{{ "%.1f" | format(stat['completion_tokens_stddev']) }}{% else %}N/A{% endif %} | +{% endfor -%} + +{% else %} +| Column | Data Type | Unique Values | Null Values | +|--------|-----------|---------------|-------------| +{% for stat in stats_list -%} +| **{{ stat.get('column_name', 'unknown') }}** | {{ stat.get('simple_dtype', 'unknown') }} | {% if 'num_unique' in stat and stat['num_unique'] is not none %}{{ stat['num_unique'] }}{% else %}N/A{% endif %} ({% if 'num_unique' in stat and stat['num_unique'] is not none and num_records > 0 %}{{ "%.1f" | format((stat['num_unique'] / num_records * 100)) }}{% else %}0.0{% endif %}%) | {% if 'num_null' in stat and stat['num_null'] is not none %}{{ stat['num_null'] }}{% else %}0{% endif %} ({% if 'num_null' in stat and stat['num_null'] is not none and num_records > 0 %}{{ "%.1f" | format((stat['num_null'] / num_records * 100)) }}{% else %}0.0{% endif %}%) | +{% endfor -%} +{% endif %} +{% endif %} + +{% endfor %} +{% elif column_statistics %} +{% for stat in column_statistics[:10] %} +- **{{ stat.get('column_name', 'unknown') }}** ({{ stat.get('column_type', 'unknown') }}): {% if 'num_unique' in stat and stat['num_unique'] is not none %}{{ stat['num_unique'] }} unique values{% if num_records > 0 %} ({{ "%.1f" | format((stat['num_unique'] / num_records * 100)) }}% coverage){% endif %}{% else %}N/A{% endif %}{% if 'num_null' in stat and stat['num_null'] is not none and stat['num_null'] > 0 %}, {{ stat['num_null'] }} nulls{% endif %} +{% endfor %} +{% if column_statistics | length > 10 %} +*... 
and {{ (column_statistics | length) - 10 }} more columns* +{% endif %} +{% endif %} + +## Configuration Details + +{% if column_configs %} +This dataset was generated with {{ column_configs | length }} column configuration(s). + +### Generation Strategy + +{% for config_type, count in config_types | dictsort %} +- **{{ config_type }}**: {{ count }} column(s) +{% endfor %} + +### Column Configurations + +{% for col_config in column_configs %} +- **{{ col_config.get('name', 'unknown') }}**: {% set col_type = col_config.get('column_type') %}{% if col_type is mapping %}{{ col_type.get('value', 'unknown') }}{% elif col_type %}{{ col_type }}{% else %}unknown{% endif %} +{% endfor %} +{% else %} +No column configurations available. +{% endif %} + +{% if metadata %} +## Metadata + +```json +{{ metadata | tojson(indent=2) }} +``` +{% endif %} + +## Citation + +If you use this dataset in your research, please cite: + +```bibtex +@misc{nemo-data-designer, + author = {The NeMo Data Designer Team, NVIDIA}, + title = {NeMo Data Designer: A framework for generating synthetic data from scratch or based on your own seed data}, + howpublished = {\url{https://github.com/NVIDIA-NeMo/DataDesigner}}, + year = {2025}, + note = {GitHub Repository}, +} +``` diff --git a/src/data_designer/integrations/huggingface/hub_results.py b/src/data_designer/integrations/huggingface/hub_results.py new file mode 100644 index 000000000..ec37e5b52 --- /dev/null +++ b/src/data_designer/integrations/huggingface/hub_results.py @@ -0,0 +1,42 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any

import pandas as pd

from data_designer.config.analysis.dataset_profiler import DatasetProfilerResults


@dataclass
class HubDatasetResults:
    """Results container for datasets pulled from Hugging Face Hub.

    Bundles the main dataset together with every optional artifact that may
    accompany it on the hub: analysis results, per-processor datasets and
    artifacts, and the raw configuration payloads. Every field except
    ``dataset`` defaults to ``None`` when the corresponding artifact was not
    found in the repository.
    """

    dataset: pd.DataFrame
    """The main dataset as a pandas DataFrame."""

    analysis: DatasetProfilerResults | None = None
    """Analysis results if available."""

    processor_datasets: dict[str, pd.DataFrame] | None = None
    """Dictionary of processor datasets, keyed by processor name."""

    processor_artifacts: dict[str, Path] | None = None
    """Dictionary of paths to processor artifacts, keyed by processor name."""

    metadata: dict[str, Any] | None = None
    """Metadata dictionary if available."""

    column_configs: list[dict[str, Any]] | None = None
    """Column configurations if available."""

    model_configs: list[dict[str, Any]] | None = None
    """Model configurations if available."""
diff --git a/src/data_designer/integrations/huggingface/reconstruction.py b/src/data_designer/integrations/huggingface/reconstruction.py
new file mode 100644
index 000000000..4f832d00a
--- /dev/null
+++ b/src/data_designer/integrations/huggingface/reconstruction.py
@@ -0,0 +1,270 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import json +import logging +import shutil +import tempfile +from pathlib import Path +from typing import Any + +from huggingface_hub import hf_hub_download + +from data_designer.config.config_builder import DataDesignerConfigBuilder +from data_designer.config.models import ModelConfig +from data_designer.engine.dataset_builders.artifact_storage import ArtifactStorage +from data_designer.engine.dataset_builders.errors import ArtifactStorageError +from data_designer.integrations.huggingface.client import resolve_hf_token +from data_designer.integrations.huggingface.hub_results import HubDatasetResults + +logger = logging.getLogger(__name__) + + +def reconstruct_dataset_creation_results( + hub_results: HubDatasetResults, + repo_id: str, + artifact_path: Path | str | None = None, + token: str | None = None, +) -> tuple[ArtifactStorage, DataDesignerConfigBuilder]: + """Reconstruct ArtifactStorage and DataDesignerConfigBuilder from hub results. + + This function downloads all artifacts from the Hugging Face Hub and reconstructs + the necessary components for a DatasetCreationResults object. + + Args: + hub_results: Results from pulling from the hub. + repo_id: The ID of the Hugging Face Hub repository. + artifact_path: Optional path to save downloaded artifacts. If None, a temporary + directory will be used. + token: Hugging Face token for authentication. + + Returns: + Tuple of (ArtifactStorage, DataDesignerConfigBuilder). + + Raises: + ArtifactStorageError: If analysis results are not found or reconstruction fails. 
+ """ + if hub_results.analysis is None: + raise ArtifactStorageError("Cannot reconstruct DatasetCreationResults: analysis results not found in hub.") + + if artifact_path is None: + temp_dir = tempfile.mkdtemp(prefix="data_designer_hub_") + artifact_path = Path(temp_dir) + else: + artifact_path = Path(artifact_path) + artifact_path.mkdir(parents=True, exist_ok=True) + + dataset_name = "dataset" + artifact_storage = ArtifactStorage( + artifact_path=artifact_path, + dataset_name=dataset_name, + ) + base_path = artifact_storage.base_dataset_path + base_path.mkdir(parents=True, exist_ok=True) + + _save_main_dataset(hub_results, artifact_storage) + _save_metadata(hub_results, base_path) + _save_processor_datasets(hub_results, base_path) + _save_processor_artifacts(hub_results, base_path) + _save_config_files(hub_results, base_path) + + config_builder = _reconstruct_config_builder(hub_results, repo_id, token) + + return artifact_storage, config_builder + + +def _save_main_dataset(hub_results: HubDatasetResults, artifact_storage: ArtifactStorage) -> None: + """Save the main dataset as parquet files. + + Args: + hub_results: Results from pulling from the hub. + artifact_storage: Artifact storage object. + """ + final_dataset_path = artifact_storage.final_dataset_path + final_dataset_path.mkdir(parents=True, exist_ok=True) + hub_results.dataset.to_parquet(final_dataset_path / "data.parquet", index=False) + + +def _save_metadata(hub_results: HubDatasetResults, base_path: Path) -> None: + """Save metadata if available. + + Args: + hub_results: Results from pulling from the hub. + base_path: Base path for artifacts. + """ + if hub_results.metadata: + with open(base_path / "metadata.json", "w") as f: + json.dump(hub_results.metadata, f, indent=2) + + +def _save_processor_datasets(hub_results: HubDatasetResults, base_path: Path) -> None: + """Save processor datasets if available. + + Args: + hub_results: Results from pulling from the hub. + base_path: Base path for artifacts. 
+ """ + if not hub_results.processor_datasets: + return + + processors_path = base_path / "processors-files" + processors_path.mkdir(parents=True, exist_ok=True) + for processor_name, processor_df in hub_results.processor_datasets.items(): + processor_dir = processors_path / processor_name + processor_dir.mkdir(parents=True, exist_ok=True) + processor_df.to_parquet(processor_dir / f"{processor_name}.parquet", index=False) + + +def _save_processor_artifacts(hub_results: HubDatasetResults, base_path: Path) -> None: + """Save processor artifacts if available. + + Args: + hub_results: Results from pulling from the hub. + base_path: Base path for artifacts. + """ + if not hub_results.processor_artifacts: + return + + processors_path = base_path / "processors-files" + processors_path.mkdir(parents=True, exist_ok=True) + for processor_name, artifact_dir in hub_results.processor_artifacts.items(): + if not artifact_dir.exists(): + continue + target_dir = processors_path / processor_name + if target_dir.exists(): + shutil.rmtree(target_dir) + shutil.copytree(artifact_dir, target_dir) + + +def _save_config_files(hub_results: HubDatasetResults, base_path: Path) -> None: + """Save configuration files if available. + + Args: + hub_results: Results from pulling from the hub. + base_path: Base path for artifacts. + """ + if hub_results.column_configs: + with open(base_path / "column_configs.json", "w") as f: + json.dump(hub_results.column_configs, f, indent=2) + + if hub_results.model_configs: + with open(base_path / "model_configs.json", "w") as f: + json.dump(hub_results.model_configs, f, indent=2) + + +def _reconstruct_config_builder( + hub_results: HubDatasetResults, + repo_id: str, + token: str | None, +) -> DataDesignerConfigBuilder: + """Reconstruct the config builder from hub results or hub files. + + Args: + hub_results: Results from pulling from the hub. + repo_id: The ID of the Hugging Face Hub repository. + token: Hugging Face token for authentication. 
+ + Returns: + DataDesignerConfigBuilder instance. + """ + if hub_results.column_configs and hub_results.model_configs: + model_configs = [ModelConfig.model_validate(mc) for mc in hub_results.model_configs] + config_builder = DataDesignerConfigBuilder(model_configs=model_configs) + column_config_class_mapping = _get_column_config_class_mapping() + + for col_config_dict in hub_results.column_configs: + configs_to_add = ( + col_config_dict["columns"] + if "columns" in col_config_dict and isinstance(col_config_dict["columns"], list) + else [col_config_dict] + ) + for single_col_config_dict in configs_to_add: + col_config = _load_column_config(single_col_config_dict, column_config_class_mapping) + if col_config is not None: + config_builder.add_column(col_config) + + return config_builder + + resolved_token = resolve_hf_token(token) + try: + model_configs_path = hf_hub_download( + repo_id=repo_id, + filename="model_configs.json", + repo_type="dataset", + token=resolved_token, + ) + with open(model_configs_path, "r") as f: + model_configs_data = json.load(f) + model_configs = [ModelConfig.model_validate(mc) for mc in model_configs_data] + return DataDesignerConfigBuilder(model_configs=model_configs) + except Exception: + return DataDesignerConfigBuilder() + + +def _get_column_config_class_mapping() -> dict[str, type[Any]]: + """Build a mapping from column_type string to config class dynamically. + + Returns: + Dictionary mapping column type strings to config classes. 
+ """ + from data_designer.config.column_configs import ( + ExpressionColumnConfig, + LLMCodeColumnConfig, + LLMJudgeColumnConfig, + LLMStructuredColumnConfig, + LLMTextColumnConfig, + SamplerColumnConfig, + SeedDatasetColumnConfig, + ValidationColumnConfig, + ) + from data_designer.plugin_manager import PluginManager + + mapping: dict[str, type[Any]] = { + "sampler": SamplerColumnConfig, + "llm_text": LLMTextColumnConfig, + "llm_structured": LLMStructuredColumnConfig, + "llm_code": LLMCodeColumnConfig, + "llm_judge": LLMJudgeColumnConfig, + "expression": ExpressionColumnConfig, + "seed_dataset": SeedDatasetColumnConfig, + "validation": ValidationColumnConfig, + } + + plugin_manager = PluginManager() + for plugin in plugin_manager.get_column_generator_plugins(): + mapping[plugin.name] = plugin.config_cls + + return mapping + + +def _load_column_config( + col_config_dict: dict[str, Any], + column_config_class_mapping: dict[str, type[Any]], +) -> Any | None: + """Load a single column config from dict using dynamic class mapping. + + Args: + col_config_dict: Dictionary representation of column config. + column_config_class_mapping: Mapping from column type to config class. + + Returns: + Column config instance or None if loading fails. + """ + column_type = col_config_dict.get("column_type") + if not column_type: + return None + + config_class = column_config_class_mapping.get(column_type) + if config_class is None: + logger.warning( + f"Skipping column config with unknown type '{column_type}': {col_config_dict.get('name', 'unknown')}" + ) + return None + + try: + return config_class.model_validate(col_config_dict) + except Exception as e: + logger.warning(f"Failed to load column config '{col_config_dict.get('name', 'unknown')}': {e}. 
Skipping.") + return None diff --git a/src/data_designer/interface/huggingface/__init__.py b/src/data_designer/interface/huggingface/__init__.py new file mode 100644 index 000000000..b225ea5e7 --- /dev/null +++ b/src/data_designer/interface/huggingface/__init__.py @@ -0,0 +1,17 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +# Backward compatibility: re-export from new location +from data_designer.integrations.huggingface import ( + HubDatasetResults, + HuggingFaceHubClient, + resolve_hf_token, +) + +# For backward compatibility, provide pull_from_hub as a function +pull_from_hub = HuggingFaceHubClient.pull_from_hub + +# Legacy alias for mixin (deprecated, use HuggingFaceHubClient instead) +HuggingFaceHubMixin = HuggingFaceHubClient + +__all__ = ["HuggingFaceHubMixin", "HuggingFaceHubClient", "pull_from_hub", "HubDatasetResults", "resolve_hf_token"] diff --git a/src/data_designer/interface/results.py b/src/data_designer/interface/results.py index 263173a70..824d1a739 100644 --- a/src/data_designer/interface/results.py +++ b/src/data_designer/interface/results.py @@ -4,6 +4,7 @@ from __future__ import annotations from pathlib import Path +from typing import Any import pandas as pd @@ -12,6 +13,10 @@ from data_designer.config.utils.visualization import WithRecordSamplerMixin from data_designer.engine.dataset_builders.artifact_storage import ArtifactStorage from data_designer.engine.dataset_builders.errors import ArtifactStorageError +from data_designer.integrations.huggingface import ( + HuggingFaceHubClient, + reconstruct_dataset_creation_results, +) class DatasetCreationResults(WithRecordSamplerMixin): @@ -39,6 +44,12 @@ def __init__( self.artifact_storage = artifact_storage self._analysis = analysis self._config_builder = config_builder + self._hub_client = HuggingFaceHubClient( + dataset_provider=self, + artifact_storage_provider=self, + analysis=analysis, + 
+            config_builder=config_builder,
+        )
 
     def load_analysis(self) -> DatasetProfilerResults:
         """Load the profiling analysis results for the generated dataset.
@@ -49,6 +60,15 @@ def load_analysis(self) -> DatasetProfilerResults:
         """
         return self._analysis
 
+    @property
+    def config_builder(self) -> DataDesignerConfigBuilder:
+        """The configuration builder used to create the dataset.
+
+        Returns:
+            DataDesignerConfigBuilder containing the configuration used to create the dataset.
+        """
+        return self._config_builder
+
     def load_dataset(self) -> pd.DataFrame:
         """Load the generated dataset as a pandas DataFrame.
 
@@ -89,3 +109,103 @@ def get_path_to_processor_artifacts(self, processor_name: str) -> Path:
         if not self.artifact_storage.processors_outputs_path.exists():
             raise ArtifactStorageError(f"Processor {processor_name} has no artifacts.")
         return self.artifact_storage.processors_outputs_path / processor_name
+
+    def push_to_hub(
+        self,
+        repo_id: str,
+        *,
+        token: str | None = None,
+        generate_card: bool = True,
+        **kwargs: Any,
+    ) -> None:
+        """Push the dataset to Hugging Face Hub.
+
+        This method converts the pandas DataFrame to a HuggingFace Dataset, pushes it to
+        the Hugging Face Hub, and optionally generates and uploads a dataset card.
+
+        Args:
+            repo_id: The ID of the Hugging Face Hub repository (e.g., "username/dataset-name").
+            token: Hugging Face token for authentication. If None, will check environment
+                variables HF_TOKEN or HUGGINGFACE_HUB_TOKEN.
+            generate_card: Whether to generate and upload a dataset card. Defaults to True.
+            **kwargs: Additional arguments to pass to `dataset.push_to_hub()`.
+
+        Raises:
+            ArtifactStorageError: If there's an error loading the dataset or metadata.
+        """
+        self._hub_client.push_to_hub(
+            repo_id=repo_id,
+            token=token,
+            generate_card=generate_card,
+            **kwargs,
+        )
+
+    @classmethod
+    def pull_from_hub(
+        cls,
+        repo_id: str,
+        *,
+        token: str | None = None,
+        artifact_path: Path | str | None = None,
+        split: str | None = None,
+        **kwargs: Any,
+    ) -> DatasetCreationResults:
+        """Load a dataset and all artifacts from Hugging Face Hub as a DatasetCreationResults object.
+
+        This classmethod downloads all artifacts from the Hugging Face Hub and reconstructs
+        a DatasetCreationResults object that can be used just like one created from a local
+        dataset generation run.
+
+        Args:
+            repo_id: The ID of the Hugging Face Hub repository (e.g., "username/dataset-name").
+            token: Hugging Face token for authentication. If None, will check environment
+                variables HF_TOKEN or HUGGINGFACE_HUB_TOKEN.
+            artifact_path: Optional path to save downloaded artifacts. If None, a temporary
+                directory will be used (note: temporary directories are cleaned up when
+                the object is garbage collected).
+            split: The split to load from the dataset. If None, the default split will be used.
+            **kwargs: Additional arguments forwarded to ``HuggingFaceHubClient.pull_from_hub``.
+
+        Returns:
+            A DatasetCreationResults object containing the dataset, analysis, and all artifacts.
+
+        Example:
+            ```python
+            from data_designer.interface.results import DatasetCreationResults
+
+            # Load from hub (uses temporary directory)
+            results = DatasetCreationResults.pull_from_hub("username/dataset-name")
+
+            # Load to a specific directory
+            results = DatasetCreationResults.pull_from_hub(
+                "username/dataset-name",
+                artifact_path="./downloaded_datasets/my_dataset"
+            )
+
+            # Access the dataset and analysis
+            df = results.load_dataset()
+            analysis = results.load_analysis()
+            ```
+        """
+        hub_results = HuggingFaceHubClient.pull_from_hub(
+            repo_id=repo_id,
+            token=token,
+            split=split,
+            include_analysis=True,
+            include_processors=True,
+            include_configs=True,
+            **kwargs,
+        )
+
+        artifact_storage, config_builder = reconstruct_dataset_creation_results(
+            hub_results=hub_results,
+            repo_id=repo_id,
+            artifact_path=artifact_path,
+            token=token,
+        )
+
+        return cls(
+            artifact_storage=artifact_storage,
+            analysis=hub_results.analysis,
+            config_builder=config_builder,
+        )
diff --git a/tests/integrations/huggingface/test_client.py b/tests/integrations/huggingface/test_client.py
new file mode 100644
index 000000000..a626ac6cf
--- /dev/null
+++ b/tests/integrations/huggingface/test_client.py
@@ -0,0 +1,433 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for HuggingFaceHubClient.""" + +import json +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest.mock import Mock, patch + +import pandas as pd +import pytest + +from data_designer.config.analysis.dataset_profiler import DatasetProfilerResults +from data_designer.config.config_builder import DataDesignerConfigBuilder +from data_designer.engine.dataset_builders.artifact_storage import ArtifactStorage +from data_designer.integrations.huggingface.client import ( + HuggingFaceHubClient, + parse_size_category, + pydantic_to_dict, + resolve_hf_token, +) + + +class TestResolveHfToken: + """Tests for resolve_hf_token function.""" + + def test_resolve_token_provided(self) -> None: + """Test that provided token is returned.""" + assert resolve_hf_token("test-token") == "test-token" + + def test_resolve_token_from_hub(self) -> None: + """Test that token is resolved from huggingface_hub.""" + with patch("data_designer.integrations.huggingface.client.get_token", return_value="hub-token"): + assert resolve_hf_token(None) == "hub-token" + + def test_resolve_token_none(self) -> None: + """Test that None is returned when no token is available.""" + with patch("data_designer.integrations.huggingface.client.get_token", return_value=None): + assert resolve_hf_token(None) is None + + def test_resolve_token_exception(self) -> None: + """Test that exceptions are handled gracefully.""" + with patch("data_designer.integrations.huggingface.client.get_token", side_effect=Exception()): + assert resolve_hf_token(None) is None + + +class TestParseSizeCategory: + """Tests for parse_size_category function.""" + + def test_small_dataset(self) -> None: + """Test small dataset category.""" + assert parse_size_category(500) == "n<1K" + + def test_medium_dataset(self) -> None: + """Test medium dataset category.""" + assert parse_size_category(5000) == "1K None: + """Test large dataset category.""" + assert 
parse_size_category(500000) == "100K None: + """Test very large dataset category.""" + assert parse_size_category(50000000) == "10M None: + """Test extremely large dataset category.""" + assert parse_size_category(5000000000) == "1B1T" + + +class TestPydanticToDict: + """Tests for pydantic_to_dict function.""" + + def test_pydantic_model(self) -> None: + """Test conversion of Pydantic model.""" + from pydantic import BaseModel + + class TestModel(BaseModel): + name: str + value: int + + obj = TestModel(name="test", value=42) + result = pydantic_to_dict(obj) + assert result == {"name": "test", "value": 42} + + def test_non_pydantic_object(self) -> None: + """Test that non-Pydantic objects are returned as-is.""" + obj = {"key": "value"} + assert pydantic_to_dict(obj) == obj + + +class TestHuggingFaceHubClient: + """Tests for HuggingFaceHubClient class.""" + + @pytest.fixture + def mock_dataset_provider(self) -> Mock: + """Create a mock dataset provider.""" + provider = Mock() + provider.load_dataset.return_value = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]}) + return provider + + @pytest.fixture + def mock_artifact_storage(self) -> Mock: + """Create a mock artifact storage.""" + storage = Mock(spec=ArtifactStorage) + storage.base_dataset_path = Path("/tmp/test") + storage.processors_outputs_path = Path("/tmp/test/processors") + storage.metadata_file_path = Path("/tmp/test/metadata.json") + return storage + + @pytest.fixture + def mock_artifact_storage_provider(self, mock_artifact_storage: Mock) -> Mock: + """Create a mock artifact storage provider.""" + provider = Mock() + provider.artifact_storage = mock_artifact_storage + return provider + + @pytest.fixture + def mock_analysis(self) -> Mock: + """Create a mock analysis.""" + analysis = Mock(spec=DatasetProfilerResults) + analysis.num_records = 3 + analysis.target_num_records = 3 + analysis.percent_complete = 100.0 + analysis.column_types = [] + analysis.column_statistics = [] + 
analysis.get_column_statistics_by_type.return_value = [] + return analysis + + @pytest.fixture + def mock_config_builder(self) -> Mock: + """Create a mock config builder.""" + builder = Mock(spec=DataDesignerConfigBuilder) + builder.get_column_configs.return_value = [] + return builder + + @pytest.fixture + def client( + self, + mock_dataset_provider: Mock, + mock_artifact_storage_provider: Mock, + mock_analysis: Mock, + mock_config_builder: Mock, + ) -> HuggingFaceHubClient: + """Create a HuggingFaceHubClient instance.""" + return HuggingFaceHubClient( + dataset_provider=mock_dataset_provider, + artifact_storage_provider=mock_artifact_storage_provider, + analysis=mock_analysis, + config_builder=mock_config_builder, + ) + + def test_init(self, client: HuggingFaceHubClient) -> None: + """Test client initialization.""" + assert client._dataset_provider is not None + assert client._artifact_storage_provider is not None + assert client._analysis is not None + assert client._config_builder is not None + + @patch("data_designer.integrations.huggingface.client.Dataset") + @patch("data_designer.integrations.huggingface.client.resolve_hf_token") + def test_push_to_hub_basic( + self, + mock_resolve_token: Mock, + mock_dataset_class: Mock, + client: HuggingFaceHubClient, + ) -> None: + """Test basic push_to_hub functionality.""" + mock_resolve_token.return_value = "test-token" + mock_hf_dataset = Mock() + mock_dataset_class.from_pandas.return_value = mock_hf_dataset + + with patch.object(client, "_upload_additional_artifacts"), patch.object(client, "_upload_dataset_card"): + client.push_to_hub("test-user/test-dataset", token="test-token", generate_card=False) + + mock_dataset_class.from_pandas.assert_called_once() + mock_hf_dataset.push_to_hub.assert_called_once_with("test-user/test-dataset", token="test-token") + + @patch("data_designer.integrations.huggingface.client.HfApi") + def test_upload_analysis( + self, + mock_hf_api_class: Mock, + client: HuggingFaceHubClient, + 
mock_analysis: Mock, + ) -> None: + """Test uploading analysis results.""" + mock_hf_api = Mock() + mock_hf_api_class.return_value = mock_hf_api + mock_analysis.model_dump.return_value = {"num_records": 3} + + client._upload_analysis(mock_hf_api, "test-user/test-dataset") + + mock_hf_api.upload_file.assert_called_once() + call_args = mock_hf_api.upload_file.call_args + assert call_args.kwargs["path_in_repo"] == "analysis.json" + assert call_args.kwargs["repo_id"] == "test-user/test-dataset" + + @patch("data_designer.integrations.huggingface.client.HfApi") + def test_upload_analysis_none( + self, + mock_hf_api_class: Mock, + mock_dataset_provider: Mock, + mock_artifact_storage_provider: Mock, + mock_config_builder: Mock, + ) -> None: + """Test uploading analysis when analysis is None.""" + client = HuggingFaceHubClient( + dataset_provider=mock_dataset_provider, + artifact_storage_provider=mock_artifact_storage_provider, + analysis=None, + config_builder=mock_config_builder, + ) + mock_hf_api = Mock() + mock_hf_api_class.return_value = mock_hf_api + + client._upload_analysis(mock_hf_api, "test-user/test-dataset") + + mock_hf_api.upload_file.assert_not_called() + + def test_sanitize_metadata_file_paths_absolute( + self, + client: HuggingFaceHubClient, + mock_artifact_storage: Mock, + ) -> None: + """Test sanitizing absolute file paths.""" + mock_artifact_storage.base_dataset_path = Path("/base/path") + metadata = { + "file_paths": [ + "/base/path/data/file1.parquet", + "/base/path/data/file2.parquet", + ] + } + + result = client._sanitize_metadata_file_paths(metadata, mock_artifact_storage) + + assert result["file_paths"] == ["data/data/file1.parquet", "data/data/file2.parquet"] + + def test_sanitize_metadata_file_paths_parquet_files( + self, + client: HuggingFaceHubClient, + mock_artifact_storage: Mock, + ) -> None: + """Test sanitizing parquet file paths.""" + mock_artifact_storage.base_dataset_path = Path("/base/path") + metadata = { + "file_paths": [ + 
"/some/path/parquet-files/file1.parquet", + ] + } + + result = client._sanitize_metadata_file_paths(metadata, mock_artifact_storage) + + assert result["file_paths"] == ["data/parquet-files/file1.parquet"] + + def test_sanitize_metadata_file_paths_no_file_paths( + self, + client: HuggingFaceHubClient, + mock_artifact_storage: Mock, + ) -> None: + """Test sanitizing metadata without file_paths.""" + metadata = {"other_key": "value"} + + result = client._sanitize_metadata_file_paths(metadata, mock_artifact_storage) + + assert result == metadata + + def test_build_column_info(self, client: HuggingFaceHubClient) -> None: + """Test building column information.""" + df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]}) + column_names = set(df.columns) + + result = client._build_column_info(df, column_names) + + assert "col1" in result + assert "col2" in result + assert isinstance(result["col1"], str) + assert isinstance(result["col2"], str) + + def test_find_unconfigured_columns(self, client: HuggingFaceHubClient) -> None: + """Test finding unconfigured columns.""" + df = pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"], "col3": [1.0, 2.0]}) + column_names = {"col1", "col2", "col3"} + mock_config1 = Mock() + mock_config1.name = "col1" + mock_config2 = Mock() + mock_config2.name = "col2" + mock_configs = [mock_config1, mock_config2] + + result = client._find_unconfigured_columns(df, column_names, mock_configs) + + assert "col3" in result + assert "col1" not in result + assert "col2" not in result + assert isinstance(result["col3"], str) + + def test_build_sample_records(self, client: HuggingFaceHubClient) -> None: + """Test building sample records.""" + df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]}) + + result = client._build_sample_records(df) + + assert len(result) == 3 + assert all(isinstance(r, dict) for r in result) + assert result[0]["col1"] == 1 + assert result[0]["col2"] == "a" + + def test_build_sample_records_empty(self, client: 
HuggingFaceHubClient) -> None: + """Test building sample records from empty dataset.""" + df = pd.DataFrame() + + result = client._build_sample_records(df) + + assert result == [] + + def test_build_config_types_summary(self, client: HuggingFaceHubClient) -> None: + """Test building config types summary.""" + + class Config1: + pass + + class Config2: + pass + + mock_configs = [ + type("Config1Instance", (Config1,), {})(), + type("Config1Instance2", (Config1,), {})(), + type("Config2Instance", (Config2,), {})(), + ] + + result = client._build_config_types_summary(mock_configs) + + assert result["Config1Instance"] == 1 + assert result["Config1Instance2"] == 1 + assert result["Config2Instance"] == 1 + + @patch("data_designer.integrations.huggingface.client.load_dataset") + @patch("data_designer.integrations.huggingface.client.resolve_hf_token") + def test_pull_from_hub_basic( + self, + mock_resolve_token: Mock, + mock_load_dataset: Mock, + ) -> None: + """Test basic pull_from_hub functionality.""" + mock_resolve_token.return_value = "test-token" + mock_hf_dataset = Mock() + mock_hf_dataset.to_pandas.return_value = pd.DataFrame({"col1": [1, 2, 3]}) + mock_load_dataset.return_value = mock_hf_dataset + + with ( + patch.object(HuggingFaceHubClient, "_load_analysis_from_hub", return_value=None), + patch.object(HuggingFaceHubClient, "_load_processors_from_hub", return_value=(None, None)), + patch.object(HuggingFaceHubClient, "_load_configs_from_hub", return_value=(None, None, None)), + ): + result = HuggingFaceHubClient.pull_from_hub( + "test-user/test-dataset", + token="test-token", + include_analysis=False, + include_processors=False, + include_configs=False, + ) + + assert result.dataset is not None + assert len(result.dataset) == 3 + + @patch("data_designer.integrations.huggingface.client.hf_hub_download") + def test_load_analysis_from_hub_success( + self, + mock_hf_hub_download: Mock, + ) -> None: + """Test loading analysis from hub successfully.""" + with 
TemporaryDirectory() as tmpdir: + analysis_path = Path(tmpdir) / "analysis.json" + analysis_data = { + "num_records": 10, + "target_num_records": 10, + "column_statistics": [ + { + "column_name": "test_col", + "num_records": 10, + "num_null": 0, + "num_unique": 5, + "pyarrow_dtype": "string", + "simple_dtype": "string", + "column_type": "general", + } + ], + } + with open(analysis_path, "w") as f: + json.dump(analysis_data, f) + + mock_hf_hub_download.return_value = str(analysis_path) + + result = HuggingFaceHubClient._load_analysis_from_hub("test-user/test-dataset", "test-token") + + assert result is not None + assert result.num_records == 10 + assert len(result.column_statistics) == 1 + + @patch("data_designer.integrations.huggingface.client.hf_hub_download") + def test_load_analysis_from_hub_not_found( + self, + mock_hf_hub_download: Mock, + ) -> None: + """Test loading analysis when file is not found.""" + from huggingface_hub.utils import HfHubHTTPError + + mock_hf_hub_download.side_effect = HfHubHTTPError("Not found", response=Mock(status_code=404)) + + result = HuggingFaceHubClient._load_analysis_from_hub("test-user/test-dataset", "test-token") + + assert result is None + + def test_group_processor_files(self) -> None: + """Test grouping processor files by processor name.""" + processor_files = [ + "processors/processor1/file1.parquet", + "processors/processor1/file2.txt", + "processors/processor2/file3.parquet", + ] + + result = HuggingFaceHubClient._group_processor_files(processor_files) + + assert "processor1" in result + assert "processor2" in result + assert len(result["processor1"]) == 2 + assert len(result["processor2"]) == 1 diff --git a/tests/interface/test_hub_integration.py b/tests/interface/test_hub_integration.py new file mode 100644 index 000000000..cb41e36f4 --- /dev/null +++ b/tests/interface/test_hub_integration.py @@ -0,0 +1,355 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Integration tests for Hugging Face Hub push/pull functionality.""" + +import json +import tempfile +from unittest.mock import MagicMock, patch + +import pandas as pd +import pytest + +from data_designer.essentials import ( + CategorySamplerParams, + DataDesigner, + DataDesignerConfigBuilder, + LLMTextColumnConfig, + SamplerColumnConfig, + SamplerType, +) +from data_designer.integrations.huggingface import HuggingFaceHubClient +from data_designer.interface.results import DatasetCreationResults + + +@pytest.fixture +def stub_model_configs(): + """Mock model configs for testing.""" + from data_designer.config.models import InferenceParameters, ModelConfig + + return [ + ModelConfig( + alias="nvidia-text", + model="nvidia/nvidia-nemotron-nano-9b-v2", + provider="nvidia", + inference_parameters=InferenceParameters( + temperature=0.5, + top_p=1.0, + max_tokens=1024, + ), + ) + ] + + +@pytest.fixture +def sample_dataset_config(stub_model_configs): + """Create a sample dataset configuration matching the README example.""" + config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) + + # Add a product category + config_builder.add_column( + SamplerColumnConfig( + name="product_category", + sampler_type=SamplerType.CATEGORY, + params=CategorySamplerParams( + values=["Electronics", "Clothing", "Home & Kitchen", "Books"], + ), + ) + ) + + # For integration tests, we'll mock LLM calls, but keep the config structure + # Generate personalized customer reviews (will be mocked in tests) + config_builder.add_column( + LLMTextColumnConfig( + name="review", + model_alias="nvidia-text", + prompt="""Write a brief product review for a {{ product_category }} item you recently purchased.""", + ) + ) + + return config_builder + + +@pytest.fixture +def simple_dataset_config(): + """Create a simple dataset configuration without LLM calls for faster testing.""" + config_builder = DataDesignerConfigBuilder() + + # Add a product 
category + config_builder.add_column( + SamplerColumnConfig( + name="product_category", + sampler_type=SamplerType.CATEGORY, + params=CategorySamplerParams( + values=["Electronics", "Clothing", "Home & Kitchen", "Books"], + ), + ) + ) + + # Add a simple numeric column + from data_designer.config.sampler_params import UniformSamplerParams + + config_builder.add_column( + SamplerColumnConfig( + name="rating", + sampler_type=SamplerType.UNIFORM, + params=UniformSamplerParams(low=1, high=5), + ) + ) + + return config_builder + + +@pytest.mark.integration +@patch("data_designer.integrations.huggingface.client.Dataset") +@patch("data_designer.integrations.huggingface.client.HfApi") +@patch("data_designer.integrations.huggingface.client.load_dataset") +def test_push_and_pull_from_hub_integration( + mock_load_dataset, + mock_hf_api_class, + mock_dataset_class, + simple_dataset_config, + tmp_path, +): + """Integration test: create dataset, push to hub, pull from hub, verify round-trip.""" + # Initialize DataDesigner + data_designer = DataDesigner() + + # Create a small dataset (10 records for testing) - using simple config without LLM + num_records = 10 + results = data_designer.create(config_builder=simple_dataset_config, num_records=num_records) + + # Verify dataset was created + original_df = results.load_dataset() + assert len(original_df) == num_records + assert "product_category" in original_df.columns + assert "rating" in original_df.columns + + # Get original analysis + original_analysis = results.load_analysis() + + # Mock Hugging Face Hub interactions + mock_hf_dataset = MagicMock() + mock_dataset_class.from_pandas.return_value = mock_hf_dataset + + mock_hf_api = MagicMock() + mock_hf_api_class.return_value = mock_hf_api + + # Mock the uploaded files for pull_from_hub + uploaded_files = {} + + def mock_upload_file(**kwargs): + """Capture uploaded files.""" + path_or_fileobj = kwargs.get("path_or_fileobj") + path_in_repo = kwargs.get("path_in_repo") + if 
isinstance(path_or_fileobj, str):
+            with open(path_or_fileobj, "rb") as f:
+                uploaded_files[path_in_repo] = f.read()
+        else:
+            uploaded_files[path_in_repo] = path_or_fileobj.read()
+
+    mock_hf_api.upload_file.side_effect = mock_upload_file
+
+    # Mock load_dataset for pull_from_hub
+    def mock_load_dataset_for_pull(repo_id, split=None, token=None, **kwargs):
+        """Mock loading dataset from hub."""
+        # Return the original dataset
+        mock_hf_dataset_for_pull = MagicMock()
+        mock_hf_dataset_for_pull.to_pandas.return_value = original_df
+        return mock_hf_dataset_for_pull
+
+    mock_load_dataset.side_effect = mock_load_dataset_for_pull
+
+    # Mock hf_hub_download for pull_from_hub
+    def mock_hf_hub_download(repo_id, filename, repo_type, token=None):
+        """Mock downloading files from hub."""
+        if filename in uploaded_files:
+            # Create a temporary file with the uploaded content
+            temp_file = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json")
+            if filename.endswith(".json"):
+                content = uploaded_files[filename].decode("utf-8") if isinstance(uploaded_files[filename], bytes) else uploaded_files[filename]
+                temp_file.write(content)
+            temp_file.close()
+            return temp_file.name
+        raise FileNotFoundError(f"File {filename} not found")
+
+    # Mock list_repo_files
+    def mock_list_repo_files(repo_id, repo_type, token=None):
+        """Mock listing repo files."""
+        return list(uploaded_files.keys())
+
+    # Push to hub
+    repo_id = "test-user/test-dataset"
+    with (
+        patch("data_designer.integrations.huggingface.client.hf_hub_download", side_effect=mock_hf_hub_download),
+        patch("data_designer.integrations.huggingface.client.list_repo_files", side_effect=mock_list_repo_files),
+    ):
+        results.push_to_hub(repo_id, token="test-token", generate_card=True)
+
+    # Verify dataset was pushed
+    mock_dataset_class.from_pandas.assert_called_once()
+    mock_hf_dataset.push_to_hub.assert_called_once_with(repo_id, token="test-token")
+
+    # Verify analysis.json was uploaded
+    assert "analysis.json" in
uploaded_files + analysis_data = json.loads(uploaded_files["analysis.json"].decode("utf-8")) + assert analysis_data["num_records"] == num_records + + # Verify README.md was uploaded + assert "README.md" in uploaded_files + readme_content = uploaded_files["README.md"].decode("utf-8") + assert "NeMo Data Designer" in readme_content + assert repo_id in readme_content + + # Pull from hub + pulled_results = DatasetCreationResults.pull_from_hub( + repo_id=repo_id, + token="test-token", + artifact_path=tmp_path / "pulled_artifacts", + ) + + # Verify pulled dataset matches original + pulled_df = pulled_results.load_dataset() + pd.testing.assert_frame_equal(pulled_df, original_df, check_dtype=False) + + # Verify pulled analysis matches original + pulled_analysis = pulled_results.load_analysis() + assert pulled_analysis.num_records == original_analysis.num_records + assert pulled_analysis.target_num_records == original_analysis.target_num_records + assert len(pulled_analysis.column_statistics) == len(original_analysis.column_statistics) + + # Verify config builder was reconstructed + pulled_config_builder = pulled_results.config_builder + assert pulled_config_builder is not None + pulled_column_configs = pulled_config_builder.get_column_configs() + assert len(pulled_column_configs) == 2 # product_category and rating + + # Verify artifact storage structure exists + assert pulled_results.artifact_storage.base_dataset_path.exists() + assert (pulled_results.artifact_storage.base_dataset_path / "parquet-files").exists() + + +@pytest.mark.integration +@patch("data_designer.integrations.huggingface.client.Dataset") +@patch("data_designer.integrations.huggingface.client.HfApi") +@patch("data_designer.integrations.huggingface.client.load_dataset") +def test_push_and_pull_with_pull_from_hub_function( + mock_load_dataset, + mock_hf_api_class, + mock_dataset_class, + simple_dataset_config, +): + """Integration test: create dataset, push to hub, pull using pull_from_hub function.""" + # 
Initialize DataDesigner
+    data_designer = DataDesigner()
+
+    # Create a small dataset - using simple config without LLM
+    num_records = 5
+    results = data_designer.create(config_builder=simple_dataset_config, num_records=num_records)
+
+    original_df = results.load_dataset()
+
+    # Mock Hugging Face Hub interactions
+    mock_hf_dataset = MagicMock()
+    mock_dataset_class.from_pandas.return_value = mock_hf_dataset
+
+    mock_hf_api = MagicMock()
+    mock_hf_api_class.return_value = mock_hf_api
+
+    uploaded_files = {}
+
+    def mock_upload_file(**kwargs):
+        """Capture uploaded files."""
+        path_or_fileobj = kwargs.get("path_or_fileobj")
+        path_in_repo = kwargs.get("path_in_repo")
+        if isinstance(path_or_fileobj, str):
+            with open(path_or_fileobj, "rb") as f:
+                uploaded_files[path_in_repo] = f.read()
+        else:
+            uploaded_files[path_in_repo] = path_or_fileobj.read()
+
+    mock_hf_api.upload_file.side_effect = mock_upload_file
+
+    # Mock load_dataset for pull_from_hub
+    def mock_load_dataset_for_pull(repo_id, split=None, token=None, **kwargs):
+        """Mock loading dataset from hub."""
+        mock_hf_dataset_for_pull = MagicMock()
+        mock_hf_dataset_for_pull.to_pandas.return_value = original_df
+        return mock_hf_dataset_for_pull
+
+    mock_load_dataset.side_effect = mock_load_dataset_for_pull
+
+    # Mock hf_hub_download for pull_from_hub
+    def mock_hf_hub_download(repo_id, filename, repo_type, token=None):
+        """Mock downloading files from hub."""
+        if filename in uploaded_files:
+            temp_file = tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json")
+            if filename.endswith(".json"):
+                content = uploaded_files[filename].decode("utf-8") if isinstance(uploaded_files[filename], bytes) else uploaded_files[filename]
+                temp_file.write(content)
+            temp_file.close()
+            return temp_file.name
+        raise FileNotFoundError(f"File {filename} not found")
+
+    # Mock list_repo_files
+    def mock_list_repo_files(repo_id, repo_type, token=None):
+        """Mock listing repo files."""
+        return list(uploaded_files.keys())
+
+ # Push to hub + repo_id = "test-user/test-dataset-2" + with ( + patch("data_designer.integrations.huggingface.client.hf_hub_download", side_effect=mock_hf_hub_download), + patch("data_designer.integrations.huggingface.client.list_repo_files", side_effect=mock_list_repo_files), + ): + results.push_to_hub(repo_id, token="test-token", generate_card=True) + + # Pull using pull_from_hub function + hub_results = HuggingFaceHubClient.pull_from_hub( + repo_id=repo_id, + token="test-token", + include_analysis=True, + include_configs=True, + ) + + # Verify pulled dataset matches original + pd.testing.assert_frame_equal(hub_results.dataset, original_df, check_dtype=False) + + # Verify analysis was loaded + assert hub_results.analysis is not None + assert hub_results.analysis.num_records == num_records + + # Verify configs were loaded + assert hub_results.column_configs is not None + assert len(hub_results.column_configs) == 2 + + +@pytest.mark.integration +def test_real_push_to_hub(simple_dataset_config): + """Real integration test: create dataset and push to actual Hugging Face Hub.""" + import os + + # Only run if HF_TOKEN is set + token = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_HUB_TOKEN") + if not token: + pytest.skip("HF_TOKEN or HUGGINGFACE_HUB_TOKEN not set, skipping real push test") + + # Initialize DataDesigner + data_designer = DataDesigner() + + # Create a small dataset - using simple config without LLM + num_records = 1 + results = data_designer.create(config_builder=simple_dataset_config, num_records=num_records) + + # Verify dataset was created + original_df = results.load_dataset() + assert len(original_df) == num_records + assert "product_category" in original_df.columns + assert "rating" in original_df.columns + + # Push to actual Hugging Face Hub + repo_id = "davidberenstein1957/datadesigner-test" + print(f"\n🚀 Pushing dataset to {repo_id}...") + results.push_to_hub(repo_id, token=token, generate_card=True) + print(f"✅ Successfully pushed dataset 
to {repo_id}!") + diff --git a/tests/interface/test_results.py b/tests/interface/test_results.py index ac159fc42..1ff598dad 100644 --- a/tests/interface/test_results.py +++ b/tests/interface/test_results.py @@ -40,7 +40,7 @@ def test_init(stub_artifact_storage, stub_dataset_profiler_results, stub_complet ) assert results.artifact_storage == stub_artifact_storage assert results._analysis == stub_dataset_profiler_results - assert results._config_builder == stub_complete_builder + assert results.config_builder == stub_complete_builder def test_load_dataset(stub_dataset_creation_results, stub_artifact_storage, stub_dataframe):