Source code for carpyncho

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2020, 2021, 2022, Juan B Cabral
# License: BSD-3-Clause
#   Full Text: https://github.com/carpyncho/carpyncho-py/blob/master/LICENSE


# =============================================================================
# DOCS
# =============================================================================

"""Python client for Carpyncho VVV dataset collection.

This code exposes all the data of the web version of Carpyncho
(https://carpyncho.github.io/) as Pandas DataFrames.

"""


#: Names exported by ``from carpyncho import *``.
__all__ = ["Carpyncho", "CARPYNCHOPY_DATA_PATH"]


#: Version string of this module.
__version__ = "0.3"


# =============================================================================
# IMPORTS
# =============================================================================

import bz2
import functools
import hashlib
import inspect
import io
import json
import os
import pathlib
import pickle
import typing as t
import urllib

import attr

import diskcache as dcache

import humanize

import pandas as pd

import requests

import tqdm

import typer

# =============================================================================
# CONSTANTS
# =============================================================================

#: Alias of ``__version__``.
VERSION = __version__

#: Location of the entire dataset index.
CARPYNCHO_INDEX_URL = "https://raw.githubusercontent.com/carpyncho/carpyncho-py/master/data/index.json"  # noqa


#: Directory (inside the user's home) where carpyncho stores all its data.
CARPYNCHOPY_DATA_PATH = pathlib.Path(
    os.path.expanduser(os.path.join("~", "carpyncho_py_data"))
)


#: Chunk size used while downloading the big files of Carpyncho.
CHUNK_SIZE = 32768

#: Maximum cache size: 1e10, i.e. about 10 GB when interpreted as bytes
#: (this value is handed to ``diskcache.Cache`` as ``size_limit``).
DEFAULT_CACHE_SIZE_LIMIT = int(1e10)

#: The location of the cache database and files.
DEFAULT_CACHE_DIR = CARPYNCHOPY_DATA_PATH / "_cache_"

#: The default Parquet engine used by carpyncho ("auto" lets pandas choose).
DEFAULT_PARQUET_ENGINE = "auto"


# =============================================================================
# CACHE ORCHESTRATION
# =============================================================================


def from_cache(
    cache, tag, function, cache_expire, force=False, *args, **kwargs
):
    """Simplify cache orchestration.

    The cache key is built from ``tag`` plus every positional and keyword
    argument that is forwarded to ``function``, so two calls with the same
    tag and the same arguments share a single cache entry.

    Parameters
    ----------
    cache: ``diskcache.Cache`` like object
        The cache where the value is looked up and stored. It is used as a
        context manager and must provide ``expire``, ``get`` and ``set``.

    tag: str
        Normally every function calls the cache with its own tag.
        We suggest "module.function" or "module.Class.function"

    function: callable
        The function to be cached

    cache_expire: float or None
        Time in seconds to expire the function call (``None`` never expires)

    force: bool (default=False)
        If the value of the cache must be ignored and the function
        re-executed.

    args and kwargs:
        All the parameters needed to execute the function.

    Returns
    -------
    The result of calling the function or the cached version of the same value.

    """
    # start the cache orchestration
    key = dcache.core.args_to_key(
        base=("carpyncho", tag),
        args=args,
        kwargs=kwargs,
        typed=False,
        ignore=[],
    )

    with cache as c:
        # drop the entries that are already expired before looking up
        c.expire()

        # ENOVAL is the "missing" sentinel: a cached ``None`` is still a hit
        value = (
            dcache.core.ENOVAL
            if force
            else c.get(key, default=dcache.core.ENOVAL, retry=True)
        )

        if value is dcache.core.ENOVAL:
            # the positional arguments are part of the key, so they must be
            # forwarded to the function as well
            value = function(*args, **kwargs)
            c.set(
                key,
                value,
                expire=cache_expire,
                tag=f"carpyncho.{tag}",
                retry=True,
            )

    return value


# =============================================================================
# CLIENT
# =============================================================================


@attr.s(hash=False, frozen=True)
class Carpyncho:
    """Client to access the *Carpyncho VVV dataset collection*.

    This code exposes, as Pandas DataFrames, all the data of the web version
    of Carpyncho: https://carpyncho.github.io/.

    Parameters
    ----------
    cache_path : path-like (default: ``DEFAULT_CACHE_DIR``)
        Directory handed to ``diskcache.Cache``; the index and the
        downloaded catalogs are cached there.
        More information: http://www.grantjenks.com/docs/diskcache
    cache_expire : ``float`` or None (default=``None``)
        Seconds until a cached catalog expires (default ``None``, no expiry)
        More information: http://www.grantjenks.com/docs/diskcache
    parquet_engine : ``str`` (default="auto")
        Default Parquet library to use. Remotely carpyncho stores all the
        data as compressed parquet files; when a download happens the file
        must be parsed. If 'auto', then the pandas option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    index_url : ``str`` (default: ``CARPYNCHO_INDEX_URL``)
        Location of the index. Values with an ``http``, ``https`` or ``ftp``
        scheme are fetched with ``requests``; anything else is opened as a
        local file.

    """

    #: Location of the catalog cache
    cache_path: str = attr.ib(default=DEFAULT_CACHE_DIR)

    #: Default timeout of the catalog-cache.
    #: Try to always set to None (default), the catalogs are big and mostly
    #: never change.
    cache_expire: float = attr.ib(default=None, repr=False)

    #: Default Parquet library to use.
    parquet_engine: str = attr.ib(default=DEFAULT_PARQUET_ENGINE)

    #: Location of the carpyncho index (useful for development)
    index_url: str = attr.ib(default=CARPYNCHO_INDEX_URL)

    # =========================================================================
    # Cache property
    # =========================================================================

    @property
    @functools.lru_cache(maxsize=None)
    def cache(self):
        """Return the internal cache of the client."""
        # memoized, so repeated accesses reuse the same Cache object
        return dcache.Cache(
            directory=self.cache_path,
            size_limit=DEFAULT_CACHE_SIZE_LIMIT,
            default_pickle_protocol=pickle.DEFAULT_PROTOCOL,
        )

    # =========================================================================
    # UTILITIES FOR CHECK THE REMOTE DATA
    # =========================================================================

    def retrieve_index(self, reset):
        """Access the remote index of the Carpyncho-Dataset.

        The index is stored internally for 1 hr.

        Parameters
        ----------
        reset: bool
            If it's True the entire cache is ignored and a new index is
            downloaded and cached.

        Returns
        -------
        dict with the index structure.

        Raises
        ------
        requests.HTTPError:
            If the index is remote and the server answers with an error
            status.

        """

        def get_json_data(url):
            parsed = urllib.parse.urlparse(url)
            if parsed.scheme in ("http", "https", "ftp"):
                response = requests.get(
                    url, headers={"Cache-Control": "no-cache"}
                )
                # fail loudly instead of parsing (and caching for one hour)
                # the body of an HTTP error response
                response.raise_for_status()
                return response.json()
            # no remote scheme: the index is a local file
            with open(url) as fp:
                return json.load(fp)

        return from_cache(
            cache=self.cache,
            tag="get_index",
            function=get_json_data,
            cache_expire=3600,
            force=reset,
            url=self.index_url,
        )

    @property
    def index_(self):
        """Structure of the Carpyncho dataset information as a Python-dict."""
        return self.retrieve_index(reset=False)

    def list_tiles(self):
        """Retrieve available tiles with catalogs as a tuple of str."""
        index = self.index_
        # keys starting with "_" are not tiles and are left out
        return tuple(k for k in index.keys() if not k.startswith("_"))

    def list_catalogs(self, tile):
        """Retrieve the available catalogs for a given tile.

        Parameters
        ----------
        tile: str
            The name of the tile to retrieve the catalogs.

        Returns
        -------
        tuple of str:
            The names of available catalogs in the given tile.

        Raises
        ------
        ValueError:
            If the tile is not found.

        """
        index = self.index_
        if tile not in index:
            raise ValueError(f"Tile {tile} not found")
        return tuple(index[tile])

    def has_catalog(self, tile, catalog):
        """Check if a given catalog and tile exists.

        Parameters
        ----------
        tile: str
            The name of the tile.
        catalog:
            The name of the catalog.

        Returns
        -------
        bool:
            True if the combination tile+catalog exists.

        """
        cat = self.index_.get(tile, {}).get(catalog)
        return bool(cat)

    def catalog_info(self, tile, catalog):
        """Retrieve the information about a given catalog.

        Parameters
        ----------
        tile: str
            The name of the tile.
        catalog:
            The name of the catalog.

        Returns
        -------
        dict:
            The entire information of the given catalog file. This include
            url, md5 checksum, size in bytes, number of total records, etc.

        Raises
        ------
        ValueError:
            If the tile or the catalog is not found.

        """
        index = self.index_
        if tile not in index:
            raise ValueError(f"Tile {tile} not found")

        # keep ``tile`` bound to the tile *name* so the error message below
        # reports the name and not the whole dict of catalogs
        catalogs = index[tile]
        if catalog not in catalogs:
            raise ValueError(f"Catalog {catalog} for tile {tile} not found")

        return catalogs[catalog]

    # =========================================================================
    # THE DOWNLOAD PART
    # =========================================================================

    def _http_download(self, tile, catalog, url, size, md5sum):
        """Download, verify and parse one bz2-compressed parquet catalog.

        Parameters
        ----------
        tile, catalog: str
            Only used to label the progress bar and the error message.
        url: str
            Location of the compressed file.
        size: int
            Total used by the progress bar.
        md5sum: str
            Expected hexadecimal md5 digest of the downloaded (still
            compressed) bytes.

        Returns
        -------
        pandas.DataFrame

        Raises
        ------
        requests.HTTPError:
            If the server answers with an error status.
        IOError:
            If the md5 of the downloaded bytes does not match ``md5sum``.

        """
        # the file is a bz2 file, we are going to decompress and store
        # the raw parquet data into a BytesIO
        decompressor = bz2.BZ2Decompressor()
        parquet_stream = io.BytesIO()

        # we also need to check that the md5 of the download is ok
        file_hash = hashlib.md5()

        # the context managers release the connection and the progress bar
        # even when the download fails midway
        with requests.Session() as session, session.get(
            url, stream=True
        ) as response:
            # do not feed the body of an HTTP error to the decompressor
            response.raise_for_status()

            with tqdm.tqdm(
                total=size,
                initial=0,
                unit="B",
                unit_scale=True,
                desc=f"{tile}-{catalog}",
            ) as pbar:
                # retrieve all the data one chunk at the time
                for chunk in response.iter_content(CHUNK_SIZE):
                    if not chunk:
                        break
                    parquet_stream.write(decompressor.decompress(chunk))
                    file_hash.update(chunk)
                    # the last chunk is usually shorter than CHUNK_SIZE
                    pbar.update(len(chunk))

        # check if the file was downloaded correctly
        calculated = file_hash.hexdigest()
        if calculated != md5sum:
            raise IOError(
                f"'{tile}-{catalog}' incorrect download.\n"
                f"expected: {md5sum}\n"
                f"caclulated: {calculated}"
            )

        # read the entire stream into a dataframe
        parquet_stream.seek(0)
        df = pd.read_parquet(parquet_stream, engine=self.parquet_engine)
        return df

    def get_catalog(self, tile, catalog, force=False):
        """Retrieve a catalog from the carpyncho dataset.

        Parameters
        ----------
        tile: str
            The name of the tile.
        catalog:
            The name of the catalog.
        force: bool (default=False)
            If it's True, the cached version of the catalog is ignored and
            redownloaded. Try to always set force to False.

        Returns
        -------
        pandas.DataFrame:
            The columns of the DataFrame changes between the different catalog.

        Raises
        ------
        ValueError:
            If the tile or the catalog is not found.
        IOError:
            If the checksum not match.

        """
        info = self.catalog_info(tile, catalog)
        url, size = info["url"], info["size"]

        # only the first whitespace-separated token of the stored value is
        # the digest; it is compared in lowercase
        md5sum = info["md5sum"].split()[0].strip().lower()

        df = from_cache(
            cache=self.cache,
            tag="get_catalog",
            function=self._http_download,
            cache_expire=self.cache_expire,
            force=force,
            # params to _http_download
            tile=tile,
            catalog=catalog,
            url=url,
            size=size,
            md5sum=md5sum,
        )
        return df
# =============================================================================
# CLI
# =============================================================================


@attr.s(frozen=True)
class CLI:
    """Carpyncho console client.

    Explore and download the entire https://carpyncho.github.io/
    catalogs from your command line.

    """

    footnotes = "\n".join(
        [
            "This software is under the BSD 3-Clause License.",
            "Copyright (c) 2020, 2021, 2022, Juan Cabral.",
            "For bug reporting or other instructions please check:"
            " https://github.com/carpyncho/carpyncho-py",
        ]
    )

    #: The typer application; built automatically from the public methods.
    run = attr.ib(init=False)

    #: Keyword arguments used to create the ``Carpyncho`` client of every
    #: command; filled by the global options callback.
    client_config = attr.ib(factory=dict)

    @run.default
    def _set_run_default(self):
        # build the typer app: the global options are handled by the
        # callback and every public method becomes a command
        app = typer.Typer()

        decorator = app.callback()
        decorator(self._set_global_state)

        for k in dir(self):
            if k.startswith("_"):
                continue
            v = getattr(self, k)
            if inspect.ismethod(v):
                decorator = app.command()
                decorator(v)

        return app

    # NOTE: documented with comments on purpose; this method is registered
    # as the typer callback, and a docstring here could end up in the
    # application help text.
    # It stores the global command line options so every command creates
    # its client with the same configuration.
    def _set_global_state(
        self,
        cache_path: str = typer.Option(
            default=DEFAULT_CACHE_DIR, help="Path of the cache."
        ),
        cache_expire: t.Optional[float] = typer.Option(
            default=None,
            help="Default timeout of the cache. By default never expire.",
        ),
        parquet_engine: str = typer.Option(
            default=DEFAULT_PARQUET_ENGINE,
            help="Parquet engine to decode de file",
        ),
        index_url: str = typer.Option(
            default=CARPYNCHO_INDEX_URL, help="Path of the index.json file"
        ),
    ):
        self.client_config.update(
            cache_path=cache_path,
            cache_expire=cache_expire,
            parquet_engine=parquet_engine,
            index_url=index_url,
        )

    def version(self):
        """Print Carpyncho version."""
        typer.echo(VERSION)

    def list_tiles(self):
        """Show available tiles."""
        client = Carpyncho(**self.client_config)
        msg = typer.style("Tiles", fg=typer.colors.GREEN)
        typer.echo(msg)
        for tile in client.list_tiles():
            typer.echo(f" - {tile}")

    def list_catalogs(
        self,
        tile: str = typer.Argument(
            ..., help="The name of the tile to retrieve the catalogs"
        ),
    ):
        """Show the available catalogs for a given tile."""
        client = Carpyncho(**self.client_config)
        msg = typer.style(f"Tile '{tile}'", fg=typer.colors.GREEN)
        typer.echo(msg)
        for catalog in client.list_catalogs(tile=tile):
            typer.echo(f" - {catalog}")

    def has_catalog(
        self,
        tile: str = typer.Argument(..., help="The name of the tile"),
        catalog: str = typer.Argument(..., help="Tha name of the catalog"),
    ):
        """Check if a given catalog and tile exists."""
        client = Carpyncho(**self.client_config)

        if client.has_catalog(tile, catalog):
            has, fg = "exists", typer.colors.GREEN
        else:
            has, fg = "NO exists", typer.colors.RED

        msg = typer.style(
            f"Catalog '{catalog}' or tile '{tile}': {has}", fg=fg
        )
        typer.echo(msg)

    def catalog_info(
        self,
        tile: str = typer.Argument(..., help="The name of the tile"),
        catalog: str = typer.Argument(..., help="Tha name of the catalog"),
    ):
        """Retrieve the information about a given catalog."""
        # human friendly representation for some well known fields; every
        # other field is printed with ``str``
        FORMATTERS = {
            "size": functools.partial(humanize.naturalsize, binary=True),
            "records": humanize.intcomma,
        }

        client = Carpyncho(**self.client_config)

        msg = typer.style(
            f"Catalog {tile}-{catalog}", fg=typer.colors.GREEN, bold=True
        )
        typer.echo(msg)

        for k, v in client.catalog_info(tile, catalog).items():
            fmt = FORMATTERS.get(k, str)
            typer.echo(f" - {k}: {fmt(v)}")

    def download_catalog(
        self,
        tile: str = typer.Argument(..., help="The name of the tile"),
        catalog: str = typer.Argument(..., help="Tha name of the catalog"),
        force: bool = typer.Option(
            default=False,
            help=(
                "Force to ignore the cached value and redownload the catalog. "
                "Try to always set force to False."
            ),
        ),
        out: str = typer.Option(
            ...,
            help=(
                "Path to store the catalog. The extension of the file "
                "determines the format. Options are '.xlsx' (Excel), '.csv', "
                "'.pkl' (Python pickle) and '.parquet'"
            ),
        ),
    ):
        """Retrieves a catalog from the Carpyncho dataset collection.

        tile:
            The name of the tile.
        catalog:
            The name of the catalog.
        out:
            The location to store the catalog.
        force:
            Download a new version of the catalog even if it already
            exists in the cache.

        """
        PARSERS = {
            ".xlsx": pd.DataFrame.to_excel,
            ".csv": pd.DataFrame.to_csv,
            ".pkl": pd.DataFrame.to_pickle,
            ".parquet": pd.DataFrame.to_parquet,
        }

        # validate the output format *before* downloading: the catalogs are
        # big and a typo in the extension must not cost a full download
        ext = os.path.splitext(out)[-1].lower()
        if ext not in PARSERS:
            typer.echo(f"format '{ext}' not recognized", err=True)
            # non-zero status, so shells and scripts can detect the failure
            raise typer.Exit(code=1)

        client = Carpyncho(**self.client_config)
        df = client.get_catalog(tile, catalog, force=force)

        typer.echo(f"Writing {out}...")
        parser = PARSERS[ext]
        parser(df, out)


def main():
    """Run the carpyncho CLI interface."""
    cli = CLI()
    cli.run()


if __name__ == "__main__":
    main()