
HfCacheManager

tfbpapi.HfCacheManager.HfCacheManager

Bases: DataCard

Enhanced cache management for Hugging Face Hub with metadata-focused retrieval.
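
Example (a minimal usage sketch; the repository id and DuckDB path below are placeholder values, not part of this package):

import duckdb
from tfbpapi.HfCacheManager import HfCacheManager

conn = duckdb.connect("metadata.duckdb")  # local DuckDB database file
manager = HfCacheManager(
    repo_id="some-org/some-dataset",  # hypothetical Hugging Face dataset repo
    duckdb_conn=conn,
    token=None,  # supply a Hugging Face token for private repositories
)

# Preview an automated cache cleanup without deleting anything
strategies = manager.auto_clean_cache(max_total_size="10GB", dry_run=True)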

Source code in tfbpapi/HfCacheManager.py
class HfCacheManager(DataCard):
    """Enhanced cache management for Hugging Face Hub with metadata-focused
    retrieval."""

    def __init__(
        self,
        repo_id: str,
        duckdb_conn: duckdb.DuckDBPyConnection,
        token: str | None = None,
        logger: logging.Logger | None = None,
    ):
        super().__init__(repo_id, token)
        self.duckdb_conn = duckdb_conn
        self.logger = logger or logging.getLogger(__name__)

    def _get_metadata_for_config(
        self, config, force_refresh: bool = False
    ) -> dict[str, Any]:
        """Get metadata for a specific configuration using 3-case strategy.

        :param config: Configuration object to process
        :param force_refresh: If True, skip cache checks and download fresh from remote
        """
        config_result = {
            "config_name": config.config_name,
            "strategy": None,
            "table_name": None,
            "success": False,
            "message": "",
        }

        table_name = f"metadata_{config.config_name}"

        try:
            # Skip cache checks if force_refresh is True
            if not force_refresh:
                # Case 1: Check if metadata already exists in DuckDB
                if self._check_metadata_exists_in_duckdb(table_name):
                    config_result.update(
                        {
                            "strategy": "duckdb_exists",
                            "table_name": table_name,
                            "success": True,
                            "message": f"Metadata table {table_name} "
                            "already exists in DuckDB",
                        }
                    )
                    return config_result

                # Case 2: Check if HF data is in cache, create DuckDB representation
                if self._load_metadata_from_cache(config, table_name):
                    config_result.update(
                        {
                            "strategy": "cache_loaded",
                            "table_name": table_name,
                            "success": True,
                            "message": "Loaded metadata from cache "
                            f"into table {table_name}",
                        }
                    )
                    return config_result

            # Case 3: Download from HF (explicit vs embedded)
            if self._download_and_load_metadata(config, table_name):
                config_result.update(
                    {
                        "strategy": "downloaded",
                        "table_name": table_name,
                        "success": True,
                        "message": "Downloaded and loaded metadata "
                        f"into table {table_name}",
                    }
                )
                return config_result

            config_result["message"] = (
                f"Failed to retrieve metadata for {config.config_name}"
            )

        except Exception as e:
            config_result["message"] = f"Error processing {config.config_name}: {e}"
            self.logger.error(f"Error in metadata config {config.config_name}: {e}")

        return config_result

    def _check_metadata_exists_in_duckdb(self, table_name: str) -> bool:
        """Case 1: Check if metadata table already exists in DuckDB database."""
        try:
            # Query information schema to check if table exists
            result = self.duckdb_conn.execute(
                "SELECT table_name FROM information_schema.tables WHERE table_name = ?",
                [table_name],
            ).fetchone()

            exists = result is not None
            if exists:
                self.logger.debug(f"Table {table_name} already exists in DuckDB")
            return exists

        except Exception as e:
            self.logger.debug(f"Error checking DuckDB table existence: {e}")
            return False

    def _load_metadata_from_cache(self, config, table_name: str) -> bool:
        """Case 2: HF data in cache, create DuckDB representation."""
        try:
            # Check if metadata files are cached locally
            cached_files = []
            for data_file in config.data_files:
                cached_path = try_to_load_from_cache(
                    repo_id=self.repo_id,
                    filename=data_file.path,
                    repo_type="dataset",
                )

                if isinstance(cached_path, str) and Path(cached_path).exists():
                    cached_files.append(cached_path)

            if not cached_files:
                self.logger.debug(f"No cached files found for {config.config_name}")
                return False

            # Load cached parquet files into DuckDB
            self._create_duckdb_table_from_files(cached_files, table_name)
            self.logger.info(
                f"Loaded {len(cached_files)} cached files into {table_name}"
            )
            return True

        except Exception as e:
            self.logger.debug(f"Error loading from cache for {config.config_name}: {e}")
            return False

    def _download_and_load_metadata(self, config, table_name: str) -> bool:
        """Case 3: Download from HF (explicit vs embedded)."""
        try:
            from huggingface_hub import snapshot_download

            # Download specific files for this metadata config
            file_patterns = [data_file.path for data_file in config.data_files]

            downloaded_path = snapshot_download(
                repo_id=self.repo_id,
                repo_type="dataset",
                allow_patterns=file_patterns,
                token=self.token,
            )

            # Find downloaded parquet files
            downloaded_files = []
            for pattern in file_patterns:
                file_path = Path(downloaded_path) / pattern
                if file_path.exists() and file_path.suffix == ".parquet":
                    downloaded_files.append(str(file_path))
                else:
                    # Handle wildcard patterns, including nested wildcards
                    if "*" in pattern:
                        # Use glob on the full pattern relative to downloaded_path
                        base_path = Path(downloaded_path)
                        matching_files = list(base_path.glob(pattern))
                        downloaded_files.extend([str(f) for f in matching_files if f.suffix == ".parquet"])
                    else:
                        # Handle non-wildcard patterns that might be directories
                        parent_dir = Path(downloaded_path) / Path(pattern).parent
                        if parent_dir.exists():
                            downloaded_files.extend(
                                [str(f) for f in parent_dir.glob("*.parquet")]
                            )

            if not downloaded_files:
                self.logger.warning(
                    f"No parquet files found after download for {config.config_name}"
                )
                return False

            # Load downloaded files into DuckDB
            self._create_duckdb_table_from_files(downloaded_files, table_name)
            self.logger.info(
                f"Downloaded and loaded {len(downloaded_files)} files into {table_name}"
            )
            return True

        except Exception as e:
            self.logger.error(
                f"Error downloading metadata for {config.config_name}: {e}"
            )
            return False

    def _create_duckdb_table_from_files(
        self, file_paths: list[str], table_name: str
    ) -> None:
        """Create DuckDB table/view from parquet files."""
        if len(file_paths) == 1:
            # Single file
            create_sql = f"""
            CREATE OR REPLACE VIEW {table_name} AS
            SELECT * FROM read_parquet('{file_paths[0]}')
            """
        else:
            # Multiple files
            files_str = "', '".join(file_paths)
            create_sql = f"""
            CREATE OR REPLACE VIEW {table_name} AS
            SELECT * FROM read_parquet(['{files_str}'])
            """

        self.duckdb_conn.execute(create_sql)
        self.logger.debug(
            f"Created DuckDB view {table_name} from {len(file_paths)} files"
        )

    def _extract_embedded_metadata_field(
        self, data_table_name: str, field_name: str, metadata_table_name: str
    ) -> bool:
        """Extract a specific metadata field from a data table."""
        try:
            # Create a metadata view with unique values from the specified field
            extract_sql = f"""
            CREATE OR REPLACE VIEW {metadata_table_name} AS
            SELECT DISTINCT {field_name} as value, COUNT(*) as count
            FROM {data_table_name}
            WHERE {field_name} IS NOT NULL
            GROUP BY {field_name}
            ORDER BY count DESC
            """

            self.duckdb_conn.execute(extract_sql)

            # Verify the table was created and has data
            count_result = self.duckdb_conn.execute(
                f"SELECT COUNT(*) FROM {metadata_table_name}"
            ).fetchone()

            if count_result and count_result[0] > 0:
                self.logger.info(
                    f"Extracted {count_result[0]} unique values for {field_name} "
                    f"into {metadata_table_name}"
                )
                return True
            else:
                self.logger.warning(f"No data found for field {field_name}")
                return False

        except Exception as e:
            self.logger.error(f"Error extracting field {field_name}: {e}")
            return False

    def clean_cache_by_age(
        self,
        max_age_days: int = 30,
        dry_run: bool = True,
    ) -> DeleteCacheStrategy:
        """
        Clean cache entries older than specified age.

        :param max_age_days: Remove revisions older than this many days
        :param dry_run: If True, show what would be deleted without executing

        :return: DeleteCacheStrategy object that can be executed

        """
        cache_info = scan_cache_dir()
        cutoff_date = datetime.now() - timedelta(days=max_age_days)

        old_revisions = []
        for repo in cache_info.repos:
            for revision in repo.revisions:
                # Check if revision is older than cutoff
                revision_date = datetime.fromtimestamp(revision.last_modified)
                if revision_date < cutoff_date:
                    old_revisions.append(revision.commit_hash)
                    self.logger.debug(
                        f"Marking for deletion: {revision.commit_hash} "
                        f"(last modified: {revision.last_modified})"
                    )

        if not old_revisions:
            self.logger.info("No old revisions found to delete")

        delete_strategy = cache_info.delete_revisions(*old_revisions)

        self.logger.info(
            f"Found {len(old_revisions)} old revisions. "
            f"Will free {delete_strategy.expected_freed_size_str}"
        )

        if not dry_run:
            delete_strategy.execute()
            self.logger.info(
                f"Cache cleanup completed. Freed "
                f"{delete_strategy.expected_freed_size_str}"
            )
        else:
            self.logger.info("Dry run completed. Use dry_run=False to execute deletion")

        return delete_strategy

    def clean_cache_by_size(
        self,
        target_size: str,
        strategy: Literal[
            "oldest_first", "largest_first", "least_used"
        ] = "oldest_first",
        dry_run: bool = True,
    ) -> DeleteCacheStrategy:
        """
        Clean cache to reach target size by removing revisions.

        :param target_size: Target cache size (e.g., "5GB", "500MB")
        :param strategy: Deletion strategy - "oldest_first", "largest_first",
            "least_used"
        :param dry_run: If True, show what would be deleted without executing

        :return: DeleteCacheStrategy object that can be executed

        """
        cache_info = scan_cache_dir()
        current_size = cache_info.size_on_disk
        target_bytes = self._parse_size_string(target_size)

        if current_size <= target_bytes:
            self.logger.info(
                f"Cache size ({cache_info.size_on_disk_str}) already below "
                f"target ({target_size})"
            )
            # Already at or below target; return an empty delete strategy
            return cache_info.delete_revisions()

        bytes_to_free = current_size - target_bytes

        # Get all revisions sorted by strategy
        all_revisions = []
        for repo in cache_info.repos:
            for revision in repo.revisions:
                all_revisions.append(revision)

        # Sort revisions based on strategy
        if strategy == "oldest_first":
            all_revisions.sort(key=lambda r: r.last_modified)
        elif strategy == "largest_first":
            all_revisions.sort(key=lambda r: r.size_on_disk, reverse=True)
        elif strategy == "least_used":
            # Use last_modified as proxy for usage
            all_revisions.sort(key=lambda r: r.last_modified)
        else:
            raise ValueError(f"Unknown strategy: {strategy}")

        # Select revisions to delete
        revisions_to_delete = []
        freed_bytes = 0

        for revision in all_revisions:
            if freed_bytes >= bytes_to_free:
                break
            revisions_to_delete.append(revision.commit_hash)
            freed_bytes += revision.size_on_disk

        if not revisions_to_delete:
            self.logger.warning("No revisions selected for deletion")

        delete_strategy = cache_info.delete_revisions(*revisions_to_delete)

        self.logger.info(
            f"Selected {len(revisions_to_delete)} revisions for deletion. "
            f"Will free {delete_strategy.expected_freed_size_str}"
        )

        if not dry_run:
            delete_strategy.execute()
            self.logger.info(
                f"Cache cleanup completed. Freed "
                f"{delete_strategy.expected_freed_size_str}"
            )
        else:
            self.logger.info("Dry run completed. Use dry_run=False to execute deletion")

        return delete_strategy

    def clean_unused_revisions(
        self, keep_latest: int = 2, dry_run: bool = True
    ) -> DeleteCacheStrategy:
        """
        Clean unused revisions, keeping only the latest N revisions per repo.

        :param keep_latest: Number of latest revisions to keep per repo
        :param dry_run: If True, show what would be deleted without executing
        :return: DeleteCacheStrategy object that can be executed

        """
        cache_info = scan_cache_dir()
        revisions_to_delete = []

        for repo in cache_info.repos:
            # Sort revisions by last modified (newest first)
            sorted_revisions = sorted(
                repo.revisions, key=lambda r: r.last_modified, reverse=True
            )

            # Keep the latest N, mark the rest for deletion
            if len(sorted_revisions) > keep_latest:
                old_revisions = sorted_revisions[keep_latest:]
                for revision in old_revisions:
                    revisions_to_delete.append(revision.commit_hash)
                    self.logger.debug(
                        f"Marking old revision for deletion: {repo.repo_id} - "
                        f"{revision.commit_hash}"
                    )

        delete_strategy = cache_info.delete_revisions(*revisions_to_delete)

        self.logger.info(
            f"Found {len(revisions_to_delete)} unused revisions. "
            f"Will free {delete_strategy.expected_freed_size_str}"
        )

        if not dry_run:
            delete_strategy.execute()
            self.logger.info(
                f"Cache cleanup completed. Freed "
                f"{delete_strategy.expected_freed_size_str}"
            )
        else:
            self.logger.info("Dry run completed. Use dry_run=False to execute deletion")

        return delete_strategy

    def auto_clean_cache(
        self,
        max_age_days: int = 30,
        max_total_size: str = "10GB",
        keep_latest_per_repo: int = 2,
        dry_run: bool = True,
    ) -> list[DeleteCacheStrategy]:
        """
        Automated cache cleaning with multiple strategies.

        :param max_age_days: Remove revisions older than this
        :param max_total_size: Target maximum cache size
        :param keep_latest_per_repo: Keep this many latest revisions per repo
        :param dry_run: If True, show what would be deleted without executing
        :return: List of DeleteCacheStrategy objects that were executed

        """
        strategies_executed = []

        self.logger.info("Starting automated cache cleanup...")

        # Step 1: Remove very old revisions
        strategy = self.clean_cache_by_age(max_age_days=max_age_days, dry_run=dry_run)
        if strategy:
            strategies_executed.append(strategy)

        # Step 2: Remove unused revisions (keep only latest per repo)
        strategy = self.clean_unused_revisions(
            keep_latest=keep_latest_per_repo, dry_run=dry_run
        )
        if strategy:
            strategies_executed.append(strategy)

        # Step 3: If still over size limit, remove more aggressively
        cache_info = scan_cache_dir()
        if cache_info.size_on_disk > self._parse_size_string(max_total_size):
            strategy = self.clean_cache_by_size(
                target_size=max_total_size, strategy="oldest_first", dry_run=dry_run
            )
            if strategy:
                strategies_executed.append(strategy)

        total_freed = sum(s.expected_freed_size for s in strategies_executed)
        self.logger.info(
            f"Automated cleanup complete. Total freed: "
            f"{self._format_bytes(total_freed)}"
        )

        return strategies_executed

    def _parse_size_string(self, size_str: str) -> int:
        """Parse size string like '10GB' to bytes."""
        size_str = size_str.upper().strip()

        # Check longer units first to avoid partial matches
        multipliers = {"TB": 1024**4, "GB": 1024**3, "MB": 1024**2, "KB": 1024, "B": 1}

        for unit, multiplier in multipliers.items():
            if size_str.endswith(unit):
                number = float(size_str[: -len(unit)])
                return int(number * multiplier)

        # If no unit specified, assume bytes
        return int(size_str)

    def _format_bytes(self, bytes_size: int) -> str:
        """Format bytes into human readable string."""
        if bytes_size == 0:
            return "0B"

        # iterate over common units, dividing by 1024 each time, to find an
        # appropriate unit. Default to TB if the size is very large
        size = float(bytes_size)
        for unit in ["B", "KB", "MB", "GB", "TB"]:
            if size < 1024.0:
                return f"{size:.1f}{unit}"
            size /= 1024.0
        return f"{size:.1f}TB"

auto_clean_cache(max_age_days=30, max_total_size='10GB', keep_latest_per_repo=2, dry_run=True)

Automated cache cleaning with multiple strategies.

Parameters:

    max_age_days (int, default 30): Remove revisions older than this
    max_total_size (str, default '10GB'): Target maximum cache size
    keep_latest_per_repo (int, default 2): Keep this many latest revisions per repo
    dry_run (bool, default True): If True, show what would be deleted without executing

Returns:

    list[DeleteCacheStrategy]: List of DeleteCacheStrategy objects that were executed
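
Example (a sketch; "manager" is assumed to be an HfCacheManager instance constructed as shown above):

# Preview a multi-step cleanup and inspect what each step would free
strategies = manager.auto_clean_cache(
    max_age_days=30,
    max_total_size="10GB",
    keep_latest_per_repo=2,
    dry_run=True,  # report only; nothing is deleted
)
for strategy in strategies:
    print(strategy.expected_freed_size_str)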

Source code in tfbpapi/HfCacheManager.py
def auto_clean_cache(
    self,
    max_age_days: int = 30,
    max_total_size: str = "10GB",
    keep_latest_per_repo: int = 2,
    dry_run: bool = True,
) -> list[DeleteCacheStrategy]:
    """
    Automated cache cleaning with multiple strategies.

    :param max_age_days: Remove revisions older than this
    :param max_total_size: Target maximum cache size
    :param keep_latest_per_repo: Keep this many latest revisions per repo
    :param dry_run: If True, show what would be deleted without executing
    :return: List of DeleteCacheStrategy objects that were executed

    """
    strategies_executed = []

    self.logger.info("Starting automated cache cleanup...")

    # Step 1: Remove very old revisions
    strategy = self.clean_cache_by_age(max_age_days=max_age_days, dry_run=dry_run)
    if strategy:
        strategies_executed.append(strategy)

    # Step 2: Remove unused revisions (keep only latest per repo)
    strategy = self.clean_unused_revisions(
        keep_latest=keep_latest_per_repo, dry_run=dry_run
    )
    if strategy:
        strategies_executed.append(strategy)

    # Step 3: If still over size limit, remove more aggressively
    cache_info = scan_cache_dir()
    if cache_info.size_on_disk > self._parse_size_string(max_total_size):
        strategy = self.clean_cache_by_size(
            target_size=max_total_size, strategy="oldest_first", dry_run=dry_run
        )
        if strategy:
            strategies_executed.append(strategy)

    total_freed = sum(s.expected_freed_size for s in strategies_executed)
    self.logger.info(
        f"Automated cleanup complete. Total freed: "
        f"{self._format_bytes(total_freed)}"
    )

    return strategies_executed

clean_cache_by_age(max_age_days=30, dry_run=True)

Clean cache entries older than specified age.

Parameters:

    max_age_days (int, default 30): Remove revisions older than this many days
    dry_run (bool, default True): If True, show what would be deleted without executing

Returns:

    DeleteCacheStrategy: DeleteCacheStrategy object that can be executed
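
Example (a sketch; "manager" is an assumed HfCacheManager instance):

# Report revisions untouched for 60 or more days, then delete them once reviewed
strategy = manager.clean_cache_by_age(max_age_days=60, dry_run=True)
print(strategy.expected_freed_size_str)  # space that deletion would free
strategy.execute()  # or call again with dry_run=False to delete immediately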

Source code in tfbpapi/HfCacheManager.py
def clean_cache_by_age(
    self,
    max_age_days: int = 30,
    dry_run: bool = True,
) -> DeleteCacheStrategy:
    """
    Clean cache entries older than specified age.

    :param max_age_days: Remove revisions older than this many days
    :param dry_run: If True, show what would be deleted without executing

    :return: DeleteCacheStrategy object that can be executed

    """
    cache_info = scan_cache_dir()
    cutoff_date = datetime.now() - timedelta(days=max_age_days)

    old_revisions = []
    for repo in cache_info.repos:
        for revision in repo.revisions:
            # Check if revision is older than cutoff
            revision_date = datetime.fromtimestamp(revision.last_modified)
            if revision_date < cutoff_date:
                old_revisions.append(revision.commit_hash)
                self.logger.debug(
                    f"Marking for deletion: {revision.commit_hash} "
                    f"(last modified: {revision.last_modified})"
                )

    if not old_revisions:
        self.logger.info("No old revisions found to delete")

    delete_strategy = cache_info.delete_revisions(*old_revisions)

    self.logger.info(
        f"Found {len(old_revisions)} old revisions. "
        f"Will free {delete_strategy.expected_freed_size_str}"
    )

    if not dry_run:
        delete_strategy.execute()
        self.logger.info(
            f"Cache cleanup completed. Freed "
            f"{delete_strategy.expected_freed_size_str}"
        )
    else:
        self.logger.info("Dry run completed. Use dry_run=False to execute deletion")

    return delete_strategy

clean_cache_by_size(target_size, strategy='oldest_first', dry_run=True)

Clean cache to reach target size by removing revisions.

Parameters:

    target_size (str, required): Target cache size (e.g., "5GB", "500MB")
    strategy (Literal['oldest_first', 'largest_first', 'least_used'], default 'oldest_first'):
        Deletion strategy - "oldest_first", "largest_first", "least_used"
    dry_run (bool, default True): If True, show what would be deleted without executing

Returns:

    DeleteCacheStrategy: DeleteCacheStrategy object that can be executed
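
Example (a sketch; "manager" is an assumed HfCacheManager instance):

# Shrink the cache to roughly 5GB by removing the largest revisions first
strategy = manager.clean_cache_by_size(
    target_size="5GB",
    strategy="largest_first",
    dry_run=True,
)
print(strategy.expected_freed_size_str)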

Source code in tfbpapi/HfCacheManager.py
def clean_cache_by_size(
    self,
    target_size: str,
    strategy: Literal[
        "oldest_first", "largest_first", "least_used"
    ] = "oldest_first",
    dry_run: bool = True,
) -> DeleteCacheStrategy:
    """
    Clean cache to reach target size by removing revisions.

    :param target_size: Target cache size (e.g., "5GB", "500MB")
    :param strategy: Deletion strategy - "oldest_first", "largest_first",
        "least_used"
    :param dry_run: If True, show what would be deleted without executing

    :return: DeleteCacheStrategy object that can be executed

    """
    cache_info = scan_cache_dir()
    current_size = cache_info.size_on_disk
    target_bytes = self._parse_size_string(target_size)

    if current_size <= target_bytes:
        self.logger.info(
            f"Cache size ({cache_info.size_on_disk_str}) already below "
            f"target ({target_size})"
        )
        # Already at or below target; return an empty delete strategy
        return cache_info.delete_revisions()

    bytes_to_free = current_size - target_bytes

    # Get all revisions sorted by strategy
    all_revisions = []
    for repo in cache_info.repos:
        for revision in repo.revisions:
            all_revisions.append(revision)

    # Sort revisions based on strategy
    if strategy == "oldest_first":
        all_revisions.sort(key=lambda r: r.last_modified)
    elif strategy == "largest_first":
        all_revisions.sort(key=lambda r: r.size_on_disk, reverse=True)
    elif strategy == "least_used":
        # Use last_modified as proxy for usage
        all_revisions.sort(key=lambda r: r.last_modified)
    else:
        raise ValueError(f"Unknown strategy: {strategy}")

    # Select revisions to delete
    revisions_to_delete = []
    freed_bytes = 0

    for revision in all_revisions:
        if freed_bytes >= bytes_to_free:
            break
        revisions_to_delete.append(revision.commit_hash)
        freed_bytes += revision.size_on_disk

    if not revisions_to_delete:
        self.logger.warning("No revisions selected for deletion")

    delete_strategy = cache_info.delete_revisions(*revisions_to_delete)

    self.logger.info(
        f"Selected {len(revisions_to_delete)} revisions for deletion. "
        f"Will free {delete_strategy.expected_freed_size_str}"
    )

    if not dry_run:
        delete_strategy.execute()
        self.logger.info(
            f"Cache cleanup completed. Freed "
            f"{delete_strategy.expected_freed_size_str}"
        )
    else:
        self.logger.info("Dry run completed. Use dry_run=False to execute deletion")

    return delete_strategy

clean_unused_revisions(keep_latest=2, dry_run=True)

Clean unused revisions, keeping only the latest N revisions per repo.

Parameters:

    keep_latest (int, default 2): Number of latest revisions to keep per repo
    dry_run (bool, default True): If True, show what would be deleted without executing

Returns:

    DeleteCacheStrategy: DeleteCacheStrategy object that can be executed
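
Example (a sketch; "manager" is an assumed HfCacheManager instance):

# Keep only the single most recent revision of each cached repo
strategy = manager.clean_unused_revisions(keep_latest=1, dry_run=True)
print(strategy.expected_freed_size_str)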

Source code in tfbpapi/HfCacheManager.py
def clean_unused_revisions(
    self, keep_latest: int = 2, dry_run: bool = True
) -> DeleteCacheStrategy:
    """
    Clean unused revisions, keeping only the latest N revisions per repo.

    :param keep_latest: Number of latest revisions to keep per repo
    :param dry_run: If True, show what would be deleted without executing
    :return: DeleteCacheStrategy object that can be executed

    """
    cache_info = scan_cache_dir()
    revisions_to_delete = []

    for repo in cache_info.repos:
        # Sort revisions by last modified (newest first)
        sorted_revisions = sorted(
            repo.revisions, key=lambda r: r.last_modified, reverse=True
        )

        # Keep the latest N, mark the rest for deletion
        if len(sorted_revisions) > keep_latest:
            old_revisions = sorted_revisions[keep_latest:]
            for revision in old_revisions:
                revisions_to_delete.append(revision.commit_hash)
                self.logger.debug(
                    f"Marking old revision for deletion: {repo.repo_id} - "
                    f"{revision.commit_hash}"
                )

    delete_strategy = cache_info.delete_revisions(*revisions_to_delete)

    self.logger.info(
        f"Found {len(revisions_to_delete)} unused revisions. "
        f"Will free {delete_strategy.expected_freed_size_str}"
    )

    if not dry_run:
        delete_strategy.execute()
        self.logger.info(
            f"Cache cleanup completed. Freed "
            f"{delete_strategy.expected_freed_size_str}"
        )
    else:
        self.logger.info("Dry run completed. Use dry_run=False to execute deletion")

    return delete_strategy