DataInfo Package

The datainfo package manages dataset information for HuggingFace datasets. It enables exploration of dataset metadata, structure, and relationships without loading the actual genomic data.

Overview

The datainfo package consists of three main components (a brief import sketch follows the list):

  • DataCard: High-level interface for exploring dataset metadata
  • Fetchers: Low-level components for retrieving data from HuggingFace Hub
  • Models: Pydantic models for validation and type safety
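
The sketch below shows how these pieces relate in practice. The repository id is a placeholder; DataCard is importable from tfbpapi.datainfo (as in the usage examples later on), while the model import assumes the submodule layout shown in the source listings below.

# A minimal orientation sketch; "user/dataset" is a placeholder repository id.
from tfbpapi.datainfo import DataCard             # high-level interface
from tfbpapi.datainfo.models import DatasetCard   # Pydantic model returned by .dataset_card

card = DataCard("user/dataset")              # internally wires up the HuggingFace fetchers
validated: DatasetCard = card.dataset_card   # fetched and validated lazily on first access
print(card.summary())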

Main Interface

DataCard

tfbpapi.datainfo.datacard.DataCard

Easy-to-use interface for exploring HuggingFace dataset metadata.

Provides methods to discover and explore dataset contents, configurations, and metadata without loading the actual genomic data.

Source code in tfbpapi/datainfo/datacard.py
class DataCard:
    """
    Easy-to-use interface for exploring HuggingFace dataset metadata.

    Provides methods to discover and explore dataset contents, configurations, and
    metadata without loading the actual genomic data.

    """

    def __init__(self, repo_id: str, token: str | None = None):
        """
        Initialize DataCard for a repository.

        :param repo_id: HuggingFace repository identifier (e.g., "user/dataset")
        :param token: Optional HuggingFace token for authentication

        """
        self.repo_id = repo_id
        self.token = token
        self.logger = logging.getLogger(self.__class__.__name__)

        # Initialize fetchers
        self._card_fetcher = HfDataCardFetcher(token=token)
        self._structure_fetcher = HfRepoStructureFetcher(token=token)
        self._size_fetcher = HfSizeInfoFetcher(token=token)

        # Cache for parsed card
        self._dataset_card: DatasetCard | None = None
        self._metadata_cache: dict[str, list[ExtractedMetadata]] = {}

    @property
    def dataset_card(self) -> DatasetCard:
        """Get the validated dataset card."""
        if self._dataset_card is None:
            self._load_and_validate_card()
        # this is here for type checking purposes. _load_and_validate_card()
        # will either set the _dataset_card or raise an error
        assert self._dataset_card is not None
        return self._dataset_card

    def _load_and_validate_card(self) -> None:
        """Load and validate the dataset card from HuggingFace."""
        try:
            self.logger.debug(f"Loading dataset card for {self.repo_id}")
            card_data = self._card_fetcher.fetch(self.repo_id)

            if not card_data:
                raise DataCardValidationError(
                    f"No dataset card found for {self.repo_id}"
                )

            # Validate using Pydantic model
            self._dataset_card = DatasetCard(**card_data)
            self.logger.debug(f"Successfully validated dataset card for {self.repo_id}")

        except ValidationError as e:
            # Create a more user-friendly error message
            error_details = []
            for error in e.errors():
                field_path = " -> ".join(str(x) for x in error["loc"])
                error_type = error["type"]
                error_msg = error["msg"]
                input_value = error.get("input", "N/A")

                if "dtype" in field_path and error_type == "string_type":
                    error_details.append(
                        f"Field '{field_path}': Expected a simple data type string (like 'string', 'int64', 'float64') "
                        f"but got a complex structure. This might be a categorical field with class labels. "
                        f"Actual value: {input_value}"
                    )
                else:
                    error_details.append(
                        f"Field '{field_path}': {error_msg} (got: {input_value})"
                    )

            detailed_msg = (
                f"Dataset card validation failed for {self.repo_id}:\n"
                + "\n".join(f"  - {detail}" for detail in error_details)
            )
            self.logger.error(detailed_msg)
            raise DataCardValidationError(detailed_msg) from e
        except HfDataFetchError as e:
            raise DataCardError(f"Failed to fetch dataset card: {e}") from e

    @property
    def configs(self) -> list[DatasetConfig]:
        """Get all dataset configurations."""
        return self.dataset_card.configs

    def get_config(self, config_name: str) -> DatasetConfig | None:
        """Get a specific configuration by name."""
        return self.dataset_card.get_config_by_name(config_name)

    def get_configs_by_type(
        self, dataset_type: DatasetType | str
    ) -> list[DatasetConfig]:
        """Get configurations by dataset type."""
        if isinstance(dataset_type, str):
            dataset_type = DatasetType(dataset_type)
        return self.dataset_card.get_configs_by_type(dataset_type)

    def get_regulators(self, config_name: str | None = None) -> set[str]:
        """
        Get all regulators mentioned in the dataset.

        :param config_name: Optional specific config to search, otherwise searches all
        :return: Set of regulator identifiers found

        """
        raise NotImplementedError("Method not yet implemented")

    def get_experimental_conditions(self, config_name: str | None = None) -> set[str]:
        """
        Get all experimental conditions mentioned in the dataset.

        :param config_name: Optional specific config to search, otherwise searches all
        :return: Set of experimental conditions found

        """
        raise NotImplementedError("Method not yet implemented")

    def get_field_values(self, config_name: str, field_name: str) -> set[str]:
        """
        Get all unique values for a specific field in a configuration.

        :param config_name: Configuration name
        :param field_name: Field name to extract values from
        :return: Set of unique values
        :raises DataCardError: If config or field not found

        """
        config = self.get_config(config_name)
        if not config:
            raise DataCardError(f"Configuration '{config_name}' not found")

        # Check if field exists in the config
        field_names = [f.name for f in config.dataset_info.features]
        if field_name not in field_names:
            raise DataCardError(
                f"Field '{field_name}' not found in config '{config_name}'"
            )

        return self._extract_field_values(config, field_name)

    def _extract_field_values(self, config: DatasetConfig, field_name: str) -> set[str]:
        """Extract unique values for a field from various sources."""
        values = set()

        # Check cache first
        cache_key = f"{config.config_name}:{field_name}"
        if cache_key in self._metadata_cache:
            cached_metadata = self._metadata_cache[cache_key]
            for meta in cached_metadata:
                if meta.field_name == field_name:
                    values.update(meta.values)
                    return values

        try:
            # For partitioned datasets, extract from file structure
            if (
                config.dataset_info.partitioning
                and config.dataset_info.partitioning.enabled
            ):
                partition_values = self._extract_partition_values(config, field_name)
                if partition_values:
                    values.update(partition_values)

            # For embedded metadata fields, we would need to query the actual data
            # This is a placeholder - in practice, you might use the HF datasets server API
            if config.metadata_fields and field_name in config.metadata_fields:
                # Placeholder for actual data extraction
                self.logger.debug(
                    f"Would extract embedded metadata for {field_name} in {config.config_name}"
                )

        except Exception as e:
            self.logger.warning(f"Failed to extract values for {field_name}: {e}")

        return values

    def _extract_partition_values(
        self, config: DatasetConfig, field_name: str
    ) -> set[str]:
        """Extract values from partition structure."""
        if (
            not config.dataset_info.partitioning
            or not config.dataset_info.partitioning.enabled
        ):
            return set()

        partition_columns = config.dataset_info.partitioning.partition_by or []
        if field_name not in partition_columns:
            return set()

        try:
            # Get partition values from repository structure
            partition_values = self._structure_fetcher.get_partition_values(
                self.repo_id, field_name
            )
            return set(partition_values)
        except HfDataFetchError:
            self.logger.warning(f"Failed to extract partition values for {field_name}")
            return set()

    def get_metadata_relationships(
        self, refresh_cache: bool = False
    ) -> list[MetadataRelationship]:
        """Get relationships between data configs and their metadata.

        :param refresh_cache: If True, force refresh dataset card from remote
        """
        # Clear cached dataset card if refresh requested
        if refresh_cache:
            self._dataset_card = None

        relationships = []
        data_configs = self.dataset_card.get_data_configs()
        metadata_configs = self.dataset_card.get_metadata_configs()

        for data_config in data_configs:
            # Check for explicit applies_to relationships
            for meta_config in metadata_configs:
                if (
                    meta_config.applies_to
                    and data_config.config_name in meta_config.applies_to
                ):
                    relationships.append(
                        MetadataRelationship(
                            data_config=data_config.config_name,
                            metadata_config=meta_config.config_name,
                            relationship_type="explicit",
                        )
                    )

            # Check for embedded metadata (always runs regardless of explicit relationships)
            if data_config.metadata_fields:
                relationships.append(
                    MetadataRelationship(
                        data_config=data_config.config_name,
                        metadata_config=f"{data_config.config_name}_embedded",
                        relationship_type="embedded",
                    )
                )

        return relationships

    def get_repository_info(self) -> dict[str, Any]:
        """Get general repository information."""
        card = self.dataset_card

        try:
            structure = self._structure_fetcher.fetch(self.repo_id)
            total_files = structure.get("total_files", 0)
            last_modified = structure.get("last_modified")
        except HfDataFetchError:
            total_files = None
            last_modified = None

        return {
            "repo_id": self.repo_id,
            "pretty_name": card.pretty_name,
            "license": card.license,
            "tags": card.tags,
            "language": card.language,
            "size_categories": card.size_categories,
            "num_configs": len(card.configs),
            "dataset_types": [config.dataset_type.value for config in card.configs],
            "total_files": total_files,
            "last_modified": last_modified,
            "has_default_config": self.dataset_card.get_default_config() is not None,
        }

    def explore_config(self, config_name: str) -> dict[str, Any]:
        """Get detailed information about a specific configuration."""
        config = self.get_config(config_name)
        if not config:
            raise DataCardError(f"Configuration '{config_name}' not found")

        info: dict[str, Any] = {
            "config_name": config.config_name,
            "description": config.description,
            "dataset_type": config.dataset_type.value,
            "is_default": config.default,
            "num_features": len(config.dataset_info.features),
            "features": [
                {"name": f.name, "dtype": f.dtype, "description": f.description}
                for f in config.dataset_info.features
            ],
            "data_files": [
                {"split": df.split, "path": df.path} for df in config.data_files
            ],
        }

        # Add partitioning info if present
        if config.dataset_info.partitioning:
            info["partitioning"] = {
                "enabled": config.dataset_info.partitioning.enabled,
                "partition_by": config.dataset_info.partitioning.partition_by,
                "path_template": config.dataset_info.partitioning.path_template,
            }

        # Add metadata-specific fields
        if config.applies_to:
            info["applies_to"] = config.applies_to

        if config.metadata_fields:
            info["metadata_fields"] = config.metadata_fields

        return info

    def summary(self) -> str:
        """Get a human-readable summary of the dataset."""
        card = self.dataset_card
        info = self.get_repository_info()

        lines = [
            f"Dataset: {card.pretty_name or self.repo_id}",
            f"Repository: {self.repo_id}",
            f"License: {card.license or 'Not specified'}",
            f"Configurations: {len(card.configs)}",
            f"Dataset Types: {', '.join(info['dataset_types'])}",
        ]

        if card.tags:
            lines.append(f"Tags: {', '.join(card.tags)}")

        # Add config summaries
        lines.append("\nConfigurations:")
        for config in card.configs:
            default_mark = " (default)" if config.default else ""
            lines.append(
                f"  - {config.config_name}: {config.dataset_type.value}{default_mark}"
            )
            lines.append(f"    {config.description}")

        return "\n".join(lines)

configs property

Get all dataset configurations.

dataset_card property

Get the validated dataset card.

__init__(repo_id, token=None)

Initialize DataCard for a repository.

Parameters:

  • repo_id (str, required): HuggingFace repository identifier (e.g., "user/dataset")
  • token (str | None, default None): Optional HuggingFace token for authentication
Source code in tfbpapi/datainfo/datacard.py
def __init__(self, repo_id: str, token: str | None = None):
    """
    Initialize DataCard for a repository.

    :param repo_id: HuggingFace repository identifier (e.g., "user/dataset")
    :param token: Optional HuggingFace token for authentication

    """
    self.repo_id = repo_id
    self.token = token
    self.logger = logging.getLogger(self.__class__.__name__)

    # Initialize fetchers
    self._card_fetcher = HfDataCardFetcher(token=token)
    self._structure_fetcher = HfRepoStructureFetcher(token=token)
    self._size_fetcher = HfSizeInfoFetcher(token=token)

    # Cache for parsed card
    self._dataset_card: DatasetCard | None = None
    self._metadata_cache: dict[str, list[ExtractedMetadata]] = {}

explore_config(config_name)

Get detailed information about a specific configuration.

Source code in tfbpapi/datainfo/datacard.py
def explore_config(self, config_name: str) -> dict[str, Any]:
    """Get detailed information about a specific configuration."""
    config = self.get_config(config_name)
    if not config:
        raise DataCardError(f"Configuration '{config_name}' not found")

    info: dict[str, Any] = {
        "config_name": config.config_name,
        "description": config.description,
        "dataset_type": config.dataset_type.value,
        "is_default": config.default,
        "num_features": len(config.dataset_info.features),
        "features": [
            {"name": f.name, "dtype": f.dtype, "description": f.description}
            for f in config.dataset_info.features
        ],
        "data_files": [
            {"split": df.split, "path": df.path} for df in config.data_files
        ],
    }

    # Add partitioning info if present
    if config.dataset_info.partitioning:
        info["partitioning"] = {
            "enabled": config.dataset_info.partitioning.enabled,
            "partition_by": config.dataset_info.partitioning.partition_by,
            "path_template": config.dataset_info.partitioning.path_template,
        }

    # Add metadata-specific fields
    if config.applies_to:
        info["applies_to"] = config.applies_to

    if config.metadata_fields:
        info["metadata_fields"] = config.metadata_fields

    return info

get_config(config_name)

Get a specific configuration by name.

Source code in tfbpapi/datainfo/datacard.py
def get_config(self, config_name: str) -> DatasetConfig | None:
    """Get a specific configuration by name."""
    return self.dataset_card.get_config_by_name(config_name)

get_configs_by_type(dataset_type)

Get configurations by dataset type.

Source code in tfbpapi/datainfo/datacard.py
def get_configs_by_type(
    self, dataset_type: DatasetType | str
) -> list[DatasetConfig]:
    """Get configurations by dataset type."""
    if isinstance(dataset_type, str):
        dataset_type = DatasetType(dataset_type)
    return self.dataset_card.get_configs_by_type(dataset_type)

get_experimental_conditions(config_name=None)

Get all experimental conditions mentioned in the dataset.

Parameters:

  • config_name (str | None, default None): Optional specific config to search, otherwise searches all

Returns:

  • set[str]: Set of experimental conditions found

Source code in tfbpapi/datainfo/datacard.py
def get_experimental_conditions(self, config_name: str | None = None) -> set[str]:
    """
    Get all experimental conditions mentioned in the dataset.

    :param config_name: Optional specific config to search, otherwise searches all
    :return: Set of experimental conditions found

    """
    raise NotImplementedError("Method not yet implemented")

get_field_values(config_name, field_name)

Get all unique values for a specific field in a configuration.

Parameters:

  • config_name (str, required): Configuration name
  • field_name (str, required): Field name to extract values from

Returns:

  • set[str]: Set of unique values

Raises:

  • DataCardError: If config or field not found

Source code in tfbpapi/datainfo/datacard.py
def get_field_values(self, config_name: str, field_name: str) -> set[str]:
    """
    Get all unique values for a specific field in a configuration.

    :param config_name: Configuration name
    :param field_name: Field name to extract values from
    :return: Set of unique values
    :raises DataCardError: If config or field not found

    """
    config = self.get_config(config_name)
    if not config:
        raise DataCardError(f"Configuration '{config_name}' not found")

    # Check if field exists in the config
    field_names = [f.name for f in config.dataset_info.features]
    if field_name not in field_names:
        raise DataCardError(
            f"Field '{field_name}' not found in config '{config_name}'"
        )

    return self._extract_field_values(config, field_name)

get_metadata_relationships(refresh_cache=False)

Get relationships between data configs and their metadata.

Parameters:

  • refresh_cache (bool, default False): If True, force refresh dataset card from remote
Source code in tfbpapi/datainfo/datacard.py
def get_metadata_relationships(
    self, refresh_cache: bool = False
) -> list[MetadataRelationship]:
    """Get relationships between data configs and their metadata.

    :param refresh_cache: If True, force refresh dataset card from remote
    """
    # Clear cached dataset card if refresh requested
    if refresh_cache:
        self._dataset_card = None

    relationships = []
    data_configs = self.dataset_card.get_data_configs()
    metadata_configs = self.dataset_card.get_metadata_configs()

    for data_config in data_configs:
        # Check for explicit applies_to relationships
        for meta_config in metadata_configs:
            if (
                meta_config.applies_to
                and data_config.config_name in meta_config.applies_to
            ):
                relationships.append(
                    MetadataRelationship(
                        data_config=data_config.config_name,
                        metadata_config=meta_config.config_name,
                        relationship_type="explicit",
                    )
                )

        # Check for embedded metadata (always runs regardless of explicit relationships)
        if data_config.metadata_fields:
            relationships.append(
                MetadataRelationship(
                    data_config=data_config.config_name,
                    metadata_config=f"{data_config.config_name}_embedded",
                    relationship_type="embedded",
                )
            )

    return relationships

get_regulators(config_name=None)

Get all regulators mentioned in the dataset.

Parameters:

  • config_name (str | None, default None): Optional specific config to search, otherwise searches all

Returns:

  • set[str]: Set of regulator identifiers found

Source code in tfbpapi/datainfo/datacard.py
def get_regulators(self, config_name: str | None = None) -> set[str]:
    """
    Get all regulators mentioned in the dataset.

    :param config_name: Optional specific config to search, otherwise searches all
    :return: Set of regulator identifiers found

    """
    raise NotImplementedError("Method not yet implemented")

get_repository_info()

Get general repository information.

Source code in tfbpapi/datainfo/datacard.py
def get_repository_info(self) -> dict[str, Any]:
    """Get general repository information."""
    card = self.dataset_card

    try:
        structure = self._structure_fetcher.fetch(self.repo_id)
        total_files = structure.get("total_files", 0)
        last_modified = structure.get("last_modified")
    except HfDataFetchError:
        total_files = None
        last_modified = None

    return {
        "repo_id": self.repo_id,
        "pretty_name": card.pretty_name,
        "license": card.license,
        "tags": card.tags,
        "language": card.language,
        "size_categories": card.size_categories,
        "num_configs": len(card.configs),
        "dataset_types": [config.dataset_type.value for config in card.configs],
        "total_files": total_files,
        "last_modified": last_modified,
        "has_default_config": self.dataset_card.get_default_config() is not None,
    }

summary()

Get a human-readable summary of the dataset.

Source code in tfbpapi/datainfo/datacard.py
def summary(self) -> str:
    """Get a human-readable summary of the dataset."""
    card = self.dataset_card
    info = self.get_repository_info()

    lines = [
        f"Dataset: {card.pretty_name or self.repo_id}",
        f"Repository: {self.repo_id}",
        f"License: {card.license or 'Not specified'}",
        f"Configurations: {len(card.configs)}",
        f"Dataset Types: {', '.join(info['dataset_types'])}",
    ]

    if card.tags:
        lines.append(f"Tags: {', '.join(card.tags)}")

    # Add config summaries
    lines.append("\nConfigurations:")
    for config in card.configs:
        default_mark = " (default)" if config.default else ""
        lines.append(
            f"  - {config.config_name}: {config.dataset_type.value}{default_mark}"
        )
        lines.append(f"    {config.description}")

    return "\n".join(lines)

The DataCard class is the primary interface for exploring HuggingFace datasets; a short usage sketch follows the list below. It provides methods to:

  • Discover dataset configurations and types
  • Explore feature schemas and data types
  • Understand metadata relationships
  • Extract field values and experimental conditions
  • Navigate partitioned dataset structures
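
A compact sketch exercising several of these methods; the repository, configuration, and field names are placeholders:

from tfbpapi.datainfo import DataCard

card = DataCard("user/dataset")

# Configurations of a given type (strings are coerced to DatasetType internally)
for config in card.get_configs_by_type("genomic_features"):
    print(config.config_name)

# Detailed view of a single configuration, including features and data files
details = card.explore_config("binding_data")
print(details["num_features"], [f["name"] for f in details["features"]])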

Data Models

Core Models

tfbpapi.datainfo.models.DatasetCard

Bases: BaseModel

Complete dataset card model.

Source code in tfbpapi/datainfo/models.py
class DatasetCard(BaseModel):
    """Complete dataset card model."""

    configs: list[DatasetConfig] = Field(..., description="Dataset configurations")
    license: str | None = Field(default=None, description="Dataset license")
    language: list[str] | None = Field(default=None, description="Dataset languages")
    tags: list[str] | None = Field(default=None, description="Descriptive tags")
    pretty_name: str | None = Field(
        default=None, description="Human-readable dataset name"
    )
    size_categories: list[str] | None = Field(
        default=None, description="Dataset size categories"
    )

    @field_validator("configs")
    @classmethod
    def configs_not_empty(cls, v):
        """Ensure at least one config is present."""
        if not v:
            raise ValueError("At least one dataset configuration is required")
        return v

    @field_validator("configs")
    @classmethod
    def unique_config_names(cls, v):
        """Ensure config names are unique."""
        names = [config.config_name for config in v]
        if len(names) != len(set(names)):
            raise ValueError("Configuration names must be unique")
        return v

    @field_validator("configs")
    @classmethod
    def at_most_one_default(cls, v):
        """Ensure at most one config is marked as default."""
        defaults = [config for config in v if config.default]
        if len(defaults) > 1:
            raise ValueError("At most one configuration can be marked as default")
        return v

    def get_config_by_name(self, name: str) -> DatasetConfig | None:
        """Get a configuration by name."""
        for config in self.configs:
            if config.config_name == name:
                return config
        return None

    def get_configs_by_type(self, dataset_type: DatasetType) -> list[DatasetConfig]:
        """Get all configurations of a specific type."""
        return [
            config for config in self.configs if config.dataset_type == dataset_type
        ]

    def get_default_config(self) -> DatasetConfig | None:
        """Get the default configuration if one exists."""
        defaults = [config for config in self.configs if config.default]
        return defaults[0] if defaults else None

    def get_data_configs(self) -> list[DatasetConfig]:
        """Get all non-metadata configurations."""
        return [
            config
            for config in self.configs
            if config.dataset_type != DatasetType.METADATA
        ]

    def get_metadata_configs(self) -> list[DatasetConfig]:
        """Get all metadata configurations."""
        return [
            config
            for config in self.configs
            if config.dataset_type == DatasetType.METADATA
        ]

at_most_one_default(v) classmethod

Ensure at most one config is marked as default.

Source code in tfbpapi/datainfo/models.py
@field_validator("configs")
@classmethod
def at_most_one_default(cls, v):
    """Ensure at most one config is marked as default."""
    defaults = [config for config in v if config.default]
    if len(defaults) > 1:
        raise ValueError("At most one configuration can be marked as default")
    return v

configs_not_empty(v) classmethod

Ensure at least one config is present.

Source code in tfbpapi/datainfo/models.py
@field_validator("configs")
@classmethod
def configs_not_empty(cls, v):
    """Ensure at least one config is present."""
    if not v:
        raise ValueError("At least one dataset configuration is required")
    return v

get_config_by_name(name)

Get a configuration by name.

Source code in tfbpapi/datainfo/models.py
def get_config_by_name(self, name: str) -> DatasetConfig | None:
    """Get a configuration by name."""
    for config in self.configs:
        if config.config_name == name:
            return config
    return None

get_configs_by_type(dataset_type)

Get all configurations of a specific type.

Source code in tfbpapi/datainfo/models.py
def get_configs_by_type(self, dataset_type: DatasetType) -> list[DatasetConfig]:
    """Get all configurations of a specific type."""
    return [
        config for config in self.configs if config.dataset_type == dataset_type
    ]

get_data_configs()

Get all non-metadata configurations.

Source code in tfbpapi/datainfo/models.py
def get_data_configs(self) -> list[DatasetConfig]:
    """Get all non-metadata configurations."""
    return [
        config
        for config in self.configs
        if config.dataset_type != DatasetType.METADATA
    ]

get_default_config()

Get the default configuration if one exists.

Source code in tfbpapi/datainfo/models.py
def get_default_config(self) -> DatasetConfig | None:
    """Get the default configuration if one exists."""
    defaults = [config for config in self.configs if config.default]
    return defaults[0] if defaults else None

get_metadata_configs()

Get all metadata configurations.

Source code in tfbpapi/datainfo/models.py
def get_metadata_configs(self) -> list[DatasetConfig]:
    """Get all metadata configurations."""
    return [
        config
        for config in self.configs
        if config.dataset_type == DatasetType.METADATA
    ]

unique_config_names(v) classmethod

Ensure config names are unique.

Source code in tfbpapi/datainfo/models.py
@field_validator("configs")
@classmethod
def unique_config_names(cls, v):
    """Ensure config names are unique."""
    names = [config.config_name for config in v]
    if len(names) != len(set(names)):
        raise ValueError("Configuration names must be unique")
    return v
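
These validators run whenever a card is parsed into the model. A minimal sketch of the empty-configs failure mode:

from pydantic import ValidationError

from tfbpapi.datainfo.models import DatasetCard

try:
    DatasetCard(configs=[])  # rejected by configs_not_empty
except ValidationError as err:
    print(err)  # includes "At least one dataset configuration is required"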

tfbpapi.datainfo.models.DatasetConfig

Bases: BaseModel

Configuration for a dataset within a repository.

Source code in tfbpapi/datainfo/models.py
class DatasetConfig(BaseModel):
    """Configuration for a dataset within a repository."""

    config_name: str = Field(..., description="Unique configuration identifier")
    description: str = Field(..., description="Human-readable description")
    dataset_type: DatasetType = Field(..., description="Type of dataset")
    default: bool = Field(
        default=False, description="Whether this is the default config"
    )
    applies_to: list[str] | None = Field(
        default=None, description="Configs this metadata applies to"
    )
    metadata_fields: list[str] | None = Field(
        default=None, description="Fields for embedded metadata extraction"
    )
    data_files: list[DataFileInfo] = Field(..., description="Data file information")
    dataset_info: DatasetInfo = Field(..., description="Dataset structure information")

    @field_validator("applies_to")
    @classmethod
    def applies_to_only_for_metadata(cls, v, info):
        """Validate that applies_to is only used for metadata configs."""
        if v is not None:
            dataset_type = info.data.get("dataset_type")
            if dataset_type != DatasetType.METADATA:
                raise ValueError(
                    "applies_to field is only valid for metadata dataset types"
                )
        return v

    @field_validator("metadata_fields")
    @classmethod
    def metadata_fields_validation(cls, v):
        """Validate metadata_fields usage."""
        if v is not None and len(v) == 0:
            raise ValueError("metadata_fields cannot be empty list, use None instead")
        return v

applies_to_only_for_metadata(v, info) classmethod

Validate that applies_to is only used for metadata configs.

Source code in tfbpapi/datainfo/models.py
@field_validator("applies_to")
@classmethod
def applies_to_only_for_metadata(cls, v, info):
    """Validate that applies_to is only used for metadata configs."""
    if v is not None:
        dataset_type = info.data.get("dataset_type")
        if dataset_type != DatasetType.METADATA:
            raise ValueError(
                "applies_to field is only valid for metadata dataset types"
            )
    return v

metadata_fields_validation(v) classmethod

Validate metadata_fields usage.

Source code in tfbpapi/datainfo/models.py
@field_validator("metadata_fields")
@classmethod
def metadata_fields_validation(cls, v):
    """Validate metadata_fields usage."""
    if v is not None and len(v) == 0:
        raise ValueError("metadata_fields cannot be empty list, use None instead")
    return v

tfbpapi.datainfo.models.FeatureInfo

Bases: BaseModel

Information about a dataset feature/column.

Source code in tfbpapi/datainfo/models.py
class FeatureInfo(BaseModel):
    """Information about a dataset feature/column."""

    name: str = Field(..., description="Column name in the data")
    dtype: str | dict[str, ClassLabelType] = Field(
        ...,
        description="Data type (string, int64, float64, etc.) or categorical class labels",
    )
    description: str = Field(..., description="Detailed description of the field")
    role: str | None = Field(
        default=None,
        description="Semantic role of the feature (e.g., 'target_identifier', 'regulator_identifier', 'quantitative_measure')",
    )

    @field_validator("dtype", mode="before")
    @classmethod
    def validate_dtype(cls, v):
        """Validate and normalize dtype field."""
        if isinstance(v, str):
            return v
        elif isinstance(v, dict):
            # Handle class_label structure
            if "class_label" in v:
                # Convert to our ClassLabelType structure
                class_label_data = v["class_label"]
                if isinstance(class_label_data, dict) and "names" in class_label_data:
                    return {"class_label": ClassLabelType(**class_label_data)}
                else:
                    raise ValueError(
                        f"Invalid class_label structure: expected dict with 'names' key, got {class_label_data}"
                    )
            else:
                raise ValueError(
                    f"Unknown dtype structure: expected 'class_label' key in dict, got keys: {list(v.keys())}"
                )
        else:
            raise ValueError(
                f"dtype must be a string or dict with class_label info, got {type(v)}: {v}"
            )

    def get_dtype_summary(self) -> str:
        """Get a human-readable summary of the data type."""
        if isinstance(self.dtype, str):
            return self.dtype
        elif isinstance(self.dtype, dict) and "class_label" in self.dtype:
            names = self.dtype["class_label"].names
            return f"categorical ({len(names)} classes: {', '.join(names)})"
        else:
            return str(self.dtype)

get_dtype_summary()

Get a human-readable summary of the data type.

Source code in tfbpapi/datainfo/models.py
def get_dtype_summary(self) -> str:
    """Get a human-readable summary of the data type."""
    if isinstance(self.dtype, str):
        return self.dtype
    elif isinstance(self.dtype, dict) and "class_label" in self.dtype:
        names = self.dtype["class_label"].names
        return f"categorical ({len(names)} classes: {', '.join(names)})"
    else:
        return str(self.dtype)

validate_dtype(v) classmethod

Validate and normalize dtype field.

Source code in tfbpapi/datainfo/models.py
@field_validator("dtype", mode="before")
@classmethod
def validate_dtype(cls, v):
    """Validate and normalize dtype field."""
    if isinstance(v, str):
        return v
    elif isinstance(v, dict):
        # Handle class_label structure
        if "class_label" in v:
            # Convert to our ClassLabelType structure
            class_label_data = v["class_label"]
            if isinstance(class_label_data, dict) and "names" in class_label_data:
                return {"class_label": ClassLabelType(**class_label_data)}
            else:
                raise ValueError(
                    f"Invalid class_label structure: expected dict with 'names' key, got {class_label_data}"
                )
        else:
            raise ValueError(
                f"Unknown dtype structure: expected 'class_label' key in dict, got keys: {list(v.keys())}"
            )
    else:
        raise ValueError(
            f"dtype must be a string or dict with class_label info, got {type(v)}: {v}"
        )
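
A short sketch of how a categorical dtype is normalized and summarized; the feature and class names are made up, and the class_label dict follows the shape expected by validate_dtype above:

from tfbpapi.datainfo.models import FeatureInfo

feature = FeatureInfo(
    name="strand",
    dtype={"class_label": {"names": ["+", "-"]}},  # converted to ClassLabelType by the validator
    description="Strand of the genomic feature",
)
print(feature.get_dtype_summary())  # categorical (2 classes: +, -)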

Dataset Types

tfbpapi.datainfo.models.DatasetType

Bases: str, Enum

Supported dataset types.

Source code in tfbpapi/datainfo/models.py
class DatasetType(str, Enum):
    """Supported dataset types."""

    GENOMIC_FEATURES = "genomic_features"
    ANNOTATED_FEATURES = "annotated_features"
    GENOME_MAP = "genome_map"
    METADATA = "metadata"
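
Because DatasetType is a string enum, DataCard.get_configs_by_type accepts either the enum member or its string value; a small sketch with a placeholder repository id:

from tfbpapi.datainfo import DataCard
from tfbpapi.datainfo.models import DatasetType

card = DataCard("user/dataset")

# Equivalent calls: the string form is coerced to the enum
metadata_configs = card.get_configs_by_type(DatasetType.METADATA)
metadata_configs = card.get_configs_by_type("metadata")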

Relationship Models

tfbpapi.datainfo.models.MetadataRelationship

Bases: BaseModel

Relationship between a data config and its metadata.

Source code in tfbpapi/datainfo/models.py
class MetadataRelationship(BaseModel):
    """Relationship between a data config and its metadata."""

    data_config: str = Field(..., description="Data configuration name")
    metadata_config: str = Field(..., description="Metadata configuration name")
    relationship_type: str = Field(
        ..., description="Type of relationship (explicit, embedded)"
    )

tfbpapi.datainfo.models.ExtractedMetadata

Bases: BaseModel

Metadata extracted from datasets.

Source code in tfbpapi/datainfo/models.py
class ExtractedMetadata(BaseModel):
    """Metadata extracted from datasets."""

    config_name: str = Field(..., description="Source configuration name")
    field_name: str = Field(
        ..., description="Field name the metadata was extracted from"
    )
    values: set[str] = Field(..., description="Unique values found")
    extraction_method: str = Field(..., description="How the metadata was extracted")

    model_config = ConfigDict(
        # Allow sets in JSON serialization
        json_encoders={set: list}
    )
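
Both models are plain Pydantic models and can be constructed directly; the names and values below are illustrative only:

from tfbpapi.datainfo.models import ExtractedMetadata, MetadataRelationship

rel = MetadataRelationship(
    data_config="binding_data",
    metadata_config="sample_metadata",
    relationship_type="explicit",
)

extracted = ExtractedMetadata(
    config_name="binding_data",
    field_name="regulator",
    values={"GCN4", "CBF1"},
    extraction_method="partition_structure",
)
print(rel.relationship_type, sorted(extracted.values))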

Data Fetchers

HuggingFace Integration

tfbpapi.datainfo.fetchers.HfDataCardFetcher

Handles fetching dataset cards from HuggingFace Hub.

Source code in tfbpapi/datainfo/fetchers.py
class HfDataCardFetcher:
    """Handles fetching dataset cards from HuggingFace Hub."""

    def __init__(self, token: str | None = None):
        """
        Initialize the fetcher.

        :param token: HuggingFace token for authentication

        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.token = token or get_hf_token()

    def fetch(self, repo_id: str, repo_type: str = "dataset") -> dict[str, Any]:
        """
        Fetch and return dataset card data.

        :param repo_id: Repository identifier (e.g., "user/dataset")
        :param repo_type: Type of repository ("dataset", "model", "space")
        :return: Dataset card data as dictionary
        :raises HfDataFetchError: If fetching fails

        """
        try:
            self.logger.debug(f"Fetching dataset card for {repo_id}")
            card = DatasetCard.load(repo_id, repo_type=repo_type, token=self.token)

            if not card.data:
                self.logger.warning(f"Dataset card for {repo_id} has no data section")
                return {}

            return card.data.to_dict()

        except Exception as e:
            error_msg = f"Failed to fetch dataset card for {repo_id}: {e}"
            self.logger.error(error_msg)
            raise HfDataFetchError(error_msg) from e

__init__(token=None)

Initialize the fetcher.

Parameters:

  • token (str | None, default None): HuggingFace token for authentication
Source code in tfbpapi/datainfo/fetchers.py
def __init__(self, token: str | None = None):
    """
    Initialize the fetcher.

    :param token: HuggingFace token for authentication

    """
    self.logger = logging.getLogger(self.__class__.__name__)
    self.token = token or get_hf_token()

fetch(repo_id, repo_type='dataset')

Fetch and return dataset card data.

Parameters:

  • repo_id (str, required): Repository identifier (e.g., "user/dataset")
  • repo_type (str, default 'dataset'): Type of repository ("dataset", "model", "space")

Returns:

  • dict[str, Any]: Dataset card data as dictionary

Raises:

  • HfDataFetchError: If fetching fails

Source code in tfbpapi/datainfo/fetchers.py
def fetch(self, repo_id: str, repo_type: str = "dataset") -> dict[str, Any]:
    """
    Fetch and return dataset card data.

    :param repo_id: Repository identifier (e.g., "user/dataset")
    :param repo_type: Type of repository ("dataset", "model", "space")
    :return: Dataset card data as dictionary
    :raises HfDataFetchError: If fetching fails

    """
    try:
        self.logger.debug(f"Fetching dataset card for {repo_id}")
        card = DatasetCard.load(repo_id, repo_type=repo_type, token=self.token)

        if not card.data:
            self.logger.warning(f"Dataset card for {repo_id} has no data section")
            return {}

        return card.data.to_dict()

    except Exception as e:
        error_msg = f"Failed to fetch dataset card for {repo_id}: {e}"
        self.logger.error(error_msg)
        raise HfDataFetchError(error_msg) from e
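
DataCard uses this fetcher internally, but it can also be used on its own when only the raw card mapping is needed; the repository id below is a placeholder:

from tfbpapi.datainfo.fetchers import HfDataCardFetcher

fetcher = HfDataCardFetcher()              # token resolved via get_hf_token() when not passed
card_data = fetcher.fetch("user/dataset")  # dict form of the card's YAML front matter
print(sorted(card_data.keys()))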

tfbpapi.datainfo.fetchers.HfRepoStructureFetcher

Handles fetching repository structure from HuggingFace Hub.

Source code in tfbpapi/datainfo/fetchers.py
class HfRepoStructureFetcher:
    """Handles fetching repository structure from HuggingFace Hub."""

    def __init__(self, token: str | None = None):
        """
        Initialize the fetcher.

        :param token: HuggingFace token for authentication

        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.token = token or get_hf_token()
        self._cached_structure: dict[str, dict[str, Any]] = {}

    def fetch(self, repo_id: str, force_refresh: bool = False) -> dict[str, Any]:
        """
        Fetch repository structure information.

        :param repo_id: Repository identifier (e.g., "user/dataset")
        :param force_refresh: If True, bypass cache and fetch fresh data
        :return: Repository structure information
        :raises HfDataFetchError: If fetching fails

        """
        # Check cache first unless force refresh is requested
        if not force_refresh and repo_id in self._cached_structure:
            self.logger.debug(f"Using cached repo structure for {repo_id}")
            return self._cached_structure[repo_id]

        try:
            self.logger.debug(f"Fetching repo structure for {repo_id}")
            info = repo_info(repo_id=repo_id, repo_type="dataset", token=self.token)

            # Extract file structure
            files = []
            partitions: dict[str, set] = {}

            for sibling in info.siblings or []:
                file_info = {
                    "path": sibling.rfilename,
                    "size": sibling.size,
                    "is_lfs": sibling.lfs is not None,
                }
                files.append(file_info)

                # Extract partition information from file paths
                self._extract_partition_info(sibling.rfilename, partitions)

            result = {
                "repo_id": repo_id,
                "files": files,
                "partitions": partitions,
                "total_files": len(files),
                "last_modified": (
                    info.last_modified.isoformat() if info.last_modified else None
                ),
            }

            # Cache the result
            self._cached_structure[repo_id] = result
            return result

        except Exception as e:
            error_msg = f"Failed to fetch repo structure for {repo_id}: {e}"
            self.logger.error(error_msg)
            raise HfDataFetchError(error_msg) from e

    def _extract_partition_info(
        self, file_path: str, partitions: dict[str, set[str]]
    ) -> None:
        """
        Extract partition information from file paths.

        :param file_path: Path to analyze for partitions
        :param partitions: Dictionary to update with partition info

        """
        # Look for partition patterns like "column=value" in path
        partition_pattern = r"([^/=]+)=([^/]+)"
        matches = re.findall(partition_pattern, file_path)

        for column, value in matches:
            if column not in partitions:
                partitions[column] = set()
            partitions[column].add(value)

    def get_partition_values(
        self, repo_id: str, partition_column: str, force_refresh: bool = False
    ) -> list[str]:
        """
        Get all values for a specific partition column.

        :param repo_id: Repository identifier
        :param partition_column: Name of the partition column
        :param force_refresh: If True, bypass cache and fetch fresh data
        :return: List of unique partition values
        :raises HfDataFetchError: If fetching fails

        """
        structure = self.fetch(repo_id, force_refresh=force_refresh)
        partition_values = structure.get("partitions", {}).get(partition_column, set())
        return sorted(list(partition_values))

    def get_dataset_files(
        self, repo_id: str, path_pattern: str | None = None, force_refresh: bool = False
    ) -> list[dict[str, Any]]:
        """
        Get dataset files, optionally filtered by path pattern.

        :param repo_id: Repository identifier
        :param path_pattern: Optional regex pattern to filter files
        :param force_refresh: If True, bypass cache and fetch fresh data
        :return: List of matching files
        :raises HfDataFetchError: If fetching fails

        """
        structure = self.fetch(repo_id, force_refresh=force_refresh)
        files = structure["files"]

        if path_pattern:
            pattern = re.compile(path_pattern)
            files = [f for f in files if pattern.search(f["path"])]

        return files

__init__(token=None)

Initialize the fetcher.

Parameters:

  • token (str | None, default None): HuggingFace token for authentication
Source code in tfbpapi/datainfo/fetchers.py
def __init__(self, token: str | None = None):
    """
    Initialize the fetcher.

    :param token: HuggingFace token for authentication

    """
    self.logger = logging.getLogger(self.__class__.__name__)
    self.token = token or get_hf_token()
    self._cached_structure: dict[str, dict[str, Any]] = {}

fetch(repo_id, force_refresh=False)

Fetch repository structure information.

Parameters:

  • repo_id (str, required): Repository identifier (e.g., "user/dataset")
  • force_refresh (bool, default False): If True, bypass cache and fetch fresh data

Returns:

  • dict[str, Any]: Repository structure information

Raises:

  • HfDataFetchError: If fetching fails

Source code in tfbpapi/datainfo/fetchers.py
def fetch(self, repo_id: str, force_refresh: bool = False) -> dict[str, Any]:
    """
    Fetch repository structure information.

    :param repo_id: Repository identifier (e.g., "user/dataset")
    :param force_refresh: If True, bypass cache and fetch fresh data
    :return: Repository structure information
    :raises HfDataFetchError: If fetching fails

    """
    # Check cache first unless force refresh is requested
    if not force_refresh and repo_id in self._cached_structure:
        self.logger.debug(f"Using cached repo structure for {repo_id}")
        return self._cached_structure[repo_id]

    try:
        self.logger.debug(f"Fetching repo structure for {repo_id}")
        info = repo_info(repo_id=repo_id, repo_type="dataset", token=self.token)

        # Extract file structure
        files = []
        partitions: dict[str, set] = {}

        for sibling in info.siblings or []:
            file_info = {
                "path": sibling.rfilename,
                "size": sibling.size,
                "is_lfs": sibling.lfs is not None,
            }
            files.append(file_info)

            # Extract partition information from file paths
            self._extract_partition_info(sibling.rfilename, partitions)

        result = {
            "repo_id": repo_id,
            "files": files,
            "partitions": partitions,
            "total_files": len(files),
            "last_modified": (
                info.last_modified.isoformat() if info.last_modified else None
            ),
        }

        # Cache the result
        self._cached_structure[repo_id] = result
        return result

    except Exception as e:
        error_msg = f"Failed to fetch repo structure for {repo_id}: {e}"
        self.logger.error(error_msg)
        raise HfDataFetchError(error_msg) from e

get_dataset_files(repo_id, path_pattern=None, force_refresh=False)

Get dataset files, optionally filtered by path pattern.

Parameters:

  • repo_id (str, required): Repository identifier
  • path_pattern (str | None, default None): Optional regex pattern to filter files
  • force_refresh (bool, default False): If True, bypass cache and fetch fresh data

Returns:

  • list[dict[str, Any]]: List of matching files

Raises:

  • HfDataFetchError: If fetching fails

Source code in tfbpapi/datainfo/fetchers.py
def get_dataset_files(
    self, repo_id: str, path_pattern: str | None = None, force_refresh: bool = False
) -> list[dict[str, Any]]:
    """
    Get dataset files, optionally filtered by path pattern.

    :param repo_id: Repository identifier
    :param path_pattern: Optional regex pattern to filter files
    :param force_refresh: If True, bypass cache and fetch fresh data
    :return: List of matching files
    :raises HfDataFetchError: If fetching fails

    """
    structure = self.fetch(repo_id, force_refresh=force_refresh)
    files = structure["files"]

    if path_pattern:
        pattern = re.compile(path_pattern)
        files = [f for f in files if pattern.search(f["path"])]

    return files

get_partition_values(repo_id, partition_column, force_refresh=False)

Get all values for a specific partition column.

Parameters:

  • repo_id (str, required): Repository identifier
  • partition_column (str, required): Name of the partition column
  • force_refresh (bool, default False): If True, bypass cache and fetch fresh data

Returns:

  • list[str]: List of unique partition values

Raises:

  • HfDataFetchError: If fetching fails

Source code in tfbpapi/datainfo/fetchers.py
def get_partition_values(
    self, repo_id: str, partition_column: str, force_refresh: bool = False
) -> list[str]:
    """
    Get all values for a specific partition column.

    :param repo_id: Repository identifier
    :param partition_column: Name of the partition column
    :param force_refresh: If True, bypass cache and fetch fresh data
    :return: List of unique partition values
    :raises HfDataFetchError: If fetching fails

    """
    structure = self.fetch(repo_id, force_refresh=force_refresh)
    partition_values = structure.get("partitions", {}).get(partition_column, set())
    return sorted(list(partition_values))
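
Partition values are recovered from hive-style path segments of the form column=value, matched by the regex in _extract_partition_info. A sketch with a made-up file path, plus the public helper that applies the same parsing across a whole repository:

from tfbpapi.datainfo.fetchers import HfRepoStructureFetcher

fetcher = HfRepoStructureFetcher()

# Local illustration of the path parsing (no network access needed)
partitions: dict[str, set] = {}
fetcher._extract_partition_info("data/regulator=GCN4/part-0.parquet", partitions)
print(partitions)  # {'regulator': {'GCN4'}}

# Against a real repository (placeholder id), get_partition_values walks every file path
# values = fetcher.get_partition_values("user/dataset", "regulator")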

tfbpapi.datainfo.fetchers.HfSizeInfoFetcher

Handles fetching size information from HuggingFace Dataset Server API.

Source code in tfbpapi/datainfo/fetchers.py
class HfSizeInfoFetcher:
    """Handles fetching size information from HuggingFace Dataset Server API."""

    def __init__(self, token: str | None = None):
        """
        Initialize the fetcher.

        :param token: HuggingFace token for authentication

        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.token = token or get_hf_token()
        self.base_url = "https://datasets-server.huggingface.co"

    def _build_headers(self) -> dict[str, str]:
        """Build request headers with authentication if available."""
        headers = {"User-Agent": "TFBP-API/1.0"}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        return headers

    def fetch(self, repo_id: str) -> dict[str, Any]:
        """
        Fetch dataset size information.

        :param repo_id: Repository identifier (e.g., "user/dataset")
        :return: Size information as dictionary
        :raises HfDataFetchError: If fetching fails

        """
        url = f"{self.base_url}/size"
        params = {"dataset": repo_id}
        headers = self._build_headers()

        try:
            self.logger.debug(f"Fetching size info for {repo_id}")
            response = requests.get(url, params=params, headers=headers, timeout=30)
            response.raise_for_status()

            data = response.json()
            self.logger.debug(f"Size info fetched successfully for {repo_id}")
            return data

        except HTTPError as e:
            if e.response.status_code == 404:
                error_msg = f"Dataset {repo_id} not found"
            elif e.response.status_code == 403:
                error_msg = (
                    f"Access denied to dataset {repo_id} (check token permissions)"
                )
            else:
                error_msg = f"HTTP error fetching size for {repo_id}: {e}"

            self.logger.error(error_msg)
            raise HfDataFetchError(error_msg) from e

        except requests.RequestException as e:
            error_msg = f"Request failed fetching size for {repo_id}: {e}"
            self.logger.error(error_msg)
            raise HfDataFetchError(error_msg) from e

        except ValueError as e:
            error_msg = f"Invalid JSON response fetching size for {repo_id}: {e}"
            self.logger.error(error_msg)
            raise HfDataFetchError(error_msg) from e

__init__(token=None)

Initialize the fetcher.

Parameters:

  • token (str | None, default None): HuggingFace token for authentication
Source code in tfbpapi/datainfo/fetchers.py
def __init__(self, token: str | None = None):
    """
    Initialize the fetcher.

    :param token: HuggingFace token for authentication

    """
    self.logger = logging.getLogger(self.__class__.__name__)
    self.token = token or get_hf_token()
    self.base_url = "https://datasets-server.huggingface.co"

fetch(repo_id)

Fetch dataset size information.

Parameters:

  • repo_id (str, required): Repository identifier (e.g., "user/dataset")

Returns:

  • dict[str, Any]: Size information as dictionary

Raises:

  • HfDataFetchError: If fetching fails

Source code in tfbpapi/datainfo/fetchers.py
def fetch(self, repo_id: str) -> dict[str, Any]:
    """
    Fetch dataset size information.

    :param repo_id: Repository identifier (e.g., "user/dataset")
    :return: Size information as dictionary
    :raises HfDataFetchError: If fetching fails

    """
    url = f"{self.base_url}/size"
    params = {"dataset": repo_id}
    headers = self._build_headers()

    try:
        self.logger.debug(f"Fetching size info for {repo_id}")
        response = requests.get(url, params=params, headers=headers, timeout=30)
        response.raise_for_status()

        data = response.json()
        self.logger.debug(f"Size info fetched successfully for {repo_id}")
        return data

    except HTTPError as e:
        if e.response.status_code == 404:
            error_msg = f"Dataset {repo_id} not found"
        elif e.response.status_code == 403:
            error_msg = (
                f"Access denied to dataset {repo_id} (check token permissions)"
            )
        else:
            error_msg = f"HTTP error fetching size for {repo_id}: {e}"

        self.logger.error(error_msg)
        raise HfDataFetchError(error_msg) from e

    except requests.RequestException as e:
        error_msg = f"Request failed fetching size for {repo_id}: {e}"
        self.logger.error(error_msg)
        raise HfDataFetchError(error_msg) from e

    except ValueError as e:
        error_msg = f"Invalid JSON response fetching size for {repo_id}: {e}"
        self.logger.error(error_msg)
        raise HfDataFetchError(error_msg) from e
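
This fetcher wraps the datasets-server /size endpoint; the returned dictionary is whatever that endpoint reports, so treat the keys below as illustrative rather than guaranteed. The repository id is a placeholder:

from tfbpapi.datainfo.fetchers import HfSizeInfoFetcher

size_fetcher = HfSizeInfoFetcher()
size_info = size_fetcher.fetch("user/dataset")

# The datasets-server response typically nests per-dataset, per-config, and
# per-split byte counts under a top-level "size" key.
print(size_info.get("size", {}).get("dataset"))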

Usage Examples

Basic Dataset Exploration

from tfbpapi.datainfo import DataCard

# Initialize DataCard for a repository
card = DataCard('BrentLab/rossi_2021')

# Get repository overview
repo_info = card.get_repository_info()
print(f"Dataset: {repo_info['pretty_name']}")
print(f"Configurations: {repo_info['num_configs']}")

# Explore configurations
for config in card.configs:
    print(f"{config.config_name}: {config.dataset_type.value}")

Understanding Dataset Structure

# Get detailed config information
config_info = card.explore_config('metadata')
print(f"Features: {config_info['num_features']}")

# Check for partitioned data
if 'partitioning' in config_info:
    partition_info = config_info['partitioning']
    print(f"Partitioned by: {partition_info['partition_by']}")

Metadata Relationships

# Discover metadata relationships
relationships = card.get_metadata_relationships()
for rel in relationships:
    print(f"{rel.data_config} -> {rel.metadata_config} ({rel.relationship_type})")

Integration with HfQueryAPI

The datainfo package is designed to work seamlessly with HfQueryAPI for efficient data loading:

from tfbpapi import HfQueryAPI
from tfbpapi.datainfo import DataCard

# Explore dataset structure first
card = DataCard('BrentLab/rossi_2021')
config_info = card.explore_config('genome_map')

# Use insights to load data efficiently
query_api = HfQueryAPI('BrentLab/rossi_2021')
data = query_api.get_pandas('genome_map',
                            filters={'run_accession': 'SRR123456'})

For a complete tutorial, see the DataCard Tutorial.