download module
This module provides functions for downloading geospatial data, including NAIP imagery and STAC items from Microsoft's Planetary Computer as well as building data from Overture Maps.
convert_vector_format(input_file, output_format='geojson', filter_expression=None)
Convert the downloaded data to a different format or filter it.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
input_file | str | Path to the input file. | required |
output_format | str | Format to convert to, one of "geojson", "parquet", "shapefile", "csv". | 'geojson' |
filter_expression | Optional[str] | Optional GeoDataFrame query expression to filter the data. | None |

Returns:

Type | Description |
---|---|
str | Path to the converted file. |
Source code in geoai/download.py
def convert_vector_format(
input_file: str,
output_format: str = "geojson",
filter_expression: Optional[str] = None,
) -> str:
"""Convert the downloaded data to a different format or filter it.
Args:
input_file: Path to the input file.
output_format: Format to convert to, one of "geojson", "parquet", "shapefile", "csv".
filter_expression: Optional GeoDataFrame query expression to filter the data.
Returns:
Path to the converted file.
"""
try:
# Read the input file
logger.info(f"Reading {input_file}")
gdf = gpd.read_file(input_file)
# Apply filter if specified
if filter_expression:
logger.info(f"Filtering data using expression: {filter_expression}")
gdf = gdf.query(filter_expression)
logger.info(f"After filtering: {len(gdf)} features")
# Define output file path
base_path = os.path.splitext(input_file)[0]
if output_format == "geojson":
output_file = f"{base_path}.geojson"
logger.info(f"Converting to GeoJSON: {output_file}")
gdf.to_file(output_file, driver="GeoJSON")
elif output_format == "parquet":
output_file = f"{base_path}.parquet"
logger.info(f"Converting to Parquet: {output_file}")
gdf.to_parquet(output_file)
elif output_format == "shapefile":
output_file = f"{base_path}.shp"
logger.info(f"Converting to Shapefile: {output_file}")
gdf.to_file(output_file)
elif output_format == "csv":
output_file = f"{base_path}.csv"
logger.info(f"Converting to CSV: {output_file}")
# For CSV, we need to convert geometry to WKT
gdf["geometry_wkt"] = gdf.geometry.apply(lambda g: g.wkt)
# Save to CSV with geometry as WKT
gdf.drop(columns=["geometry"]).to_csv(output_file, index=False)
else:
raise ValueError(f"Unsupported output format: {output_format}")
return output_file
except Exception as e:
logger.error(f"Error converting data: {str(e)}")
raise
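Example:

A minimal usage sketch; the file name and the height filter are illustrative placeholders, not values from the source.

```python
from geoai.download import convert_vector_format

# Convert a previously downloaded GeoJSON file to GeoParquet, keeping only
# features taller than 10 m ("buildings.geojson" and the filter are placeholders).
parquet_path = convert_vector_format(
    "buildings.geojson",
    output_format="parquet",
    filter_expression="height > 10",  # GeoDataFrame.query() syntax
)
print(parquet_path)  # -> buildings.parquet
```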
download_naip(bbox, output_dir, year=None, max_items=10, overwrite=False, preview=False, **kwargs)
Download NAIP imagery from Planetary Computer based on a bounding box.
This function searches for NAIP (National Agriculture Imagery Program) imagery from Microsoft's Planetary Computer that intersects with the specified bounding box. It downloads the imagery and saves it as GeoTIFF files.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
bbox | Tuple[float, float, float, float] | Bounding box in the format (min_lon, min_lat, max_lon, max_lat) in WGS84 coordinates. | required |
output_dir | str | Directory to save the downloaded imagery. | required |
year | Optional[int] | Specific year of NAIP imagery to download (e.g., 2020). If None, returns imagery from all available years. | None |
max_items | int | Maximum number of items to download. | 10 |
overwrite | bool | If True, overwrite existing files with the same name. | False |
preview | bool | If True, display a preview of the downloaded imagery. | False |

Returns:

Type | Description |
---|---|
List[str] | List of downloaded file paths. |

Exceptions:

Type | Description |
---|---|
Exception | If there is an error downloading or saving the imagery. |
Source code in geoai/download.py
def download_naip(
bbox: Tuple[float, float, float, float],
output_dir: str,
year: Optional[int] = None,
max_items: int = 10,
overwrite: bool = False,
preview: bool = False,
**kwargs: Any,
) -> List[str]:
"""Download NAIP imagery from Planetary Computer based on a bounding box.
This function searches for NAIP (National Agriculture Imagery Program) imagery
from Microsoft's Planetary Computer that intersects with the specified bounding box.
It downloads the imagery and saves it as GeoTIFF files.
Args:
bbox: Bounding box in the format (min_lon, min_lat, max_lon, max_lat) in WGS84 coordinates.
output_dir: Directory to save the downloaded imagery.
year: Specific year of NAIP imagery to download (e.g., 2020). If None, returns imagery from all available years.
max_items: Maximum number of items to download.
overwrite: If True, overwrite existing files with the same name.
preview: If True, display a preview of the downloaded imagery.
Returns:
List of downloaded file paths.
Raises:
Exception: If there is an error downloading or saving the imagery.
"""
# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
# Create a geometry from the bounding box
geometry = box(*bbox)
# Connect to Planetary Computer STAC API
catalog = Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")
# Build query for NAIP data
search_params = {
"collections": ["naip"],
"intersects": geometry,
"limit": max_items,
}
# Add year filter if specified
if year:
search_params["query"] = {"naip:year": {"eq": year}}
for key, value in kwargs.items():
search_params[key] = value
# Search for NAIP imagery
search_results = catalog.search(**search_params)
items = list(search_results.items())
if len(items) > max_items:
items = items[:max_items]
if not items:
print("No NAIP imagery found for the specified region and parameters.")
return []
print(f"Found {len(items)} NAIP items.")
# Download and save each item
downloaded_files = []
for i, item in enumerate(items):
# Sign the assets (required for Planetary Computer)
signed_item = pc.sign(item)
# Get the RGB asset URL
rgb_asset = signed_item.assets.get("image")
if not rgb_asset:
print(f"No RGB asset found for item {i+1}")
continue
# Use the original filename from the asset
original_filename = os.path.basename(
rgb_asset.href.split("?")[0]
) # Remove query parameters
output_path = os.path.join(output_dir, original_filename)
if not overwrite and os.path.exists(output_path):
print(f"Skipping existing file: {output_path}")
downloaded_files.append(output_path)
continue
print(f"Downloading item {i+1}/{len(items)}: {original_filename}")
try:
            # Stream the file with a progress bar when the asset is an HTTP URL
if rgb_asset.href.startswith("http"):
download_with_progress(rgb_asset.href, output_path)
else:
# Fallback to direct rioxarray opening (less common case)
data = rxr.open_rasterio(rgb_asset.href)
data.rio.to_raster(output_path)
downloaded_files.append(output_path)
print(f"Successfully saved to {output_path}")
            # Optionally display a preview of the downloaded image
if preview:
data = rxr.open_rasterio(output_path)
preview_raster(data)
except Exception as e:
print(f"Error downloading item {i+1}: {str(e)}")
return downloaded_files
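Example:

A minimal usage sketch; the bounding box coordinates are illustrative.

```python
from geoai.download import download_naip

# Small area in WGS84 (min_lon, min_lat, max_lon, max_lat); placeholder values.
bbox = (-122.27, 37.87, -122.25, 37.89)
files = download_naip(bbox, output_dir="naip_data", year=2020, max_items=2)
print(files)  # paths of the downloaded GeoTIFFs
```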
download_overture_buildings(bbox, output, overture_type='building', **kwargs)
Download building data from Overture Maps for a given bounding box using the overturemaps Python package.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
bbox | Tuple[float, float, float, float] | Bounding box in the format (min_lon, min_lat, max_lon, max_lat) in WGS84 coordinates. | required |
output | str | Path to save the output file. | required |
overture_type | str | The Overture Maps data type to download (building, place, etc.). | 'building' |

Returns:

Type | Description |
---|---|
str | Path to the output file. |
Source code in geoai/download.py
def download_overture_buildings(
bbox: Tuple[float, float, float, float],
output: str,
overture_type: str = "building",
**kwargs: Any,
) -> str:
"""Download building data from Overture Maps for a given bounding box using the overturemaps CLI tool.
Args:
bbox: Bounding box in the format (min_lon, min_lat, max_lon, max_lat) in WGS84 coordinates.
output: Path to save the output file.
overture_type: The Overture Maps data type to download (building, place, etc.).
Returns:
Path to the output file.
"""
return get_overture_data(
overture_type=overture_type, bbox=bbox, output=output, **kwargs
)
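Example:

A minimal usage sketch with placeholder coordinates and output path.

```python
from geoai.download import download_overture_buildings

bbox = (-122.27, 37.87, -122.25, 37.89)  # placeholder coordinates
geojson_path = download_overture_buildings(bbox, output="buildings.geojson")
print(geojson_path)
```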
download_pc_stac_item(item_url, bands=None, output_dir=None, show_progress=True, merge_bands=False, merged_filename=None, overwrite=False, cell_size=None)
Downloads a STAC item from Microsoft Planetary Computer with specified bands.
This function fetches a STAC item by URL, signs the assets using Planetary Computer credentials, and downloads the specified bands with a progress bar. Can optionally merge bands into a single multi-band GeoTIFF.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
item_url | str | The URL of the STAC item to download. | required |
bands | list | List of specific bands to download (e.g., ['B01', 'B02']). If None, all available bands will be downloaded. | None |
output_dir | str | Directory to save downloaded bands. If None, bands are returned as xarray DataArrays. | None |
show_progress | bool | Whether to display a progress bar. | True |
merge_bands | bool | Whether to merge downloaded bands into a single multi-band GeoTIFF file. | False |
merged_filename | str | Filename for the merged bands. If None and merge_bands is True, uses "{item_id}_merged.tif". | None |
overwrite | bool | Whether to overwrite existing files. | False |
cell_size | float | Resolution in meters for the merged output. If None, uses the resolution of the first band. | None |

Returns:

Type | Description |
---|---|
dict | Dictionary mapping band names to their corresponding xarray DataArrays, or file paths if output_dir is provided. If merge_bands is True, also includes a 'merged' key with the path to the merged file. |

Exceptions:

Type | Description |
---|---|
ValueError | If the item cannot be retrieved or a requested band is not available. |
Source code in geoai/download.py
def download_pc_stac_item(
item_url,
bands=None,
output_dir=None,
show_progress=True,
merge_bands=False,
merged_filename=None,
overwrite=False,
cell_size=None,
):
"""
Downloads a STAC item from Microsoft Planetary Computer with specified bands.
This function fetches a STAC item by URL, signs the assets using Planetary Computer
credentials, and downloads the specified bands with a progress bar. Can optionally
merge bands into a single multi-band GeoTIFF.
Args:
item_url (str): The URL of the STAC item to download.
bands (list, optional): List of specific bands to download (e.g., ['B01', 'B02']).
If None, all available bands will be downloaded.
output_dir (str, optional): Directory to save downloaded bands. If None,
bands are returned as xarray DataArrays.
show_progress (bool, optional): Whether to display a progress bar. Default is True.
merge_bands (bool, optional): Whether to merge downloaded bands into a single
multi-band GeoTIFF file. Default is False.
merged_filename (str, optional): Filename for the merged bands. If None and
merge_bands is True, uses "{item_id}_merged.tif".
overwrite (bool, optional): Whether to overwrite existing files. Default is False.
cell_size (float, optional): Resolution in meters for the merged output. If None,
uses the resolution of the first band.
Returns:
dict: Dictionary mapping band names to their corresponding xarray DataArrays
or file paths if output_dir is provided. If merge_bands is True, also
includes a 'merged' key with the path to the merged file.
Raises:
ValueError: If the item cannot be retrieved or a requested band is not available.
"""
from rasterio.enums import Resampling
# Get the item ID from the URL
item_id = item_url.split("/")[-1]
collection = item_url.split("/collections/")[1].split("/items/")[0]
# Connect to the Planetary Computer STAC API
catalog = Client.open(
"https://planetarycomputer.microsoft.com/api/stac/v1",
modifier=pc.sign_inplace,
)
# Search for the specific item
search = catalog.search(collections=[collection], ids=[item_id])
# Get the first item from the search results
    items = list(search.items())
if not items:
raise ValueError(f"Item with ID {item_id} not found")
item = items[0]
# Determine which bands to download
available_assets = list(item.assets.keys())
if bands is None:
# If no bands specified, download all band assets
bands_to_download = [
asset for asset in available_assets if asset.startswith("B")
]
else:
# Verify all requested bands exist
missing_bands = [band for band in bands if band not in available_assets]
if missing_bands:
raise ValueError(
f"The following bands are not available: {missing_bands}. "
f"Available assets are: {available_assets}"
)
bands_to_download = bands
# Create output directory if specified and doesn't exist
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
result = {}
band_data_arrays = []
resampled_arrays = []
band_names = [] # Track band names in order
# Set up progress bar
progress_iter = (
tqdm(bands_to_download, desc="Downloading bands")
if show_progress
else bands_to_download
)
# Download each requested band
for band in progress_iter:
if band not in item.assets:
if show_progress and not isinstance(progress_iter, list):
progress_iter.write(
f"Warning: Band {band} not found in assets, skipping."
)
continue
band_url = item.assets[band].href
if output_dir:
file_path = os.path.join(output_dir, f"{item.id}_{band}.tif")
# Check if file exists and skip if overwrite is False
if os.path.exists(file_path) and not overwrite:
if show_progress and not isinstance(progress_iter, list):
progress_iter.write(
f"File {file_path} already exists, skipping (use overwrite=True to force download)."
)
# Still need to open the file to get the data for merging
if merge_bands:
band_data = rxr.open_rasterio(file_path)
band_data_arrays.append((band, band_data))
band_names.append(band)
result[band] = file_path
continue
if show_progress and not isinstance(progress_iter, list):
progress_iter.set_description(f"Downloading {band}")
band_data = rxr.open_rasterio(band_url)
# Store the data array for potential merging later
if merge_bands:
band_data_arrays.append((band, band_data))
band_names.append(band)
if output_dir:
file_path = os.path.join(output_dir, f"{item.id}_{band}.tif")
band_data.rio.to_raster(file_path)
result[band] = file_path
else:
result[band] = band_data
# Merge bands if requested
if merge_bands and output_dir:
if merged_filename is None:
merged_filename = f"{item.id}_merged.tif"
merged_path = os.path.join(output_dir, merged_filename)
# Check if merged file exists and skip if overwrite is False
if os.path.exists(merged_path) and not overwrite:
if show_progress:
print(
f"Merged file {merged_path} already exists, skipping (use overwrite=True to force creation)."
)
result["merged"] = merged_path
else:
if show_progress:
print("Resampling and merging bands...")
# Determine target cell size if not provided
if cell_size is None and band_data_arrays:
# Use the resolution of the first band (usually 10m for B02, B03, B04, B08)
# Get the affine transform (containing resolution info)
first_band_data = band_data_arrays[0][1]
# Extract resolution from transform
cell_size = abs(first_band_data.rio.transform()[0])
if show_progress:
print(f"Using detected resolution: {cell_size}m")
elif cell_size is None:
# Default to 10m if no bands are available
cell_size = 10
if show_progress:
print(f"Using default resolution: {cell_size}m")
# Process bands in memory-efficient way
for i, (band_name, data_array) in enumerate(band_data_arrays):
if show_progress:
print(f"Processing band: {band_name}")
# Get current resolution
current_res = abs(data_array.rio.transform()[0])
# Resample if needed
if (
abs(current_res - cell_size) > 0.01
): # Small tolerance for floating point comparison
if show_progress:
print(
f"Resampling {band_name} from {current_res}m to {cell_size}m"
)
# Use bilinear for downsampling (higher to lower resolution)
# Use nearest for upsampling (lower to higher resolution)
resampling_method = (
Resampling.bilinear
if current_res < cell_size
else Resampling.nearest
)
resampled = data_array.rio.reproject(
data_array.rio.crs,
resolution=(cell_size, cell_size),
resampling=resampling_method,
)
resampled_arrays.append(resampled)
else:
resampled_arrays.append(data_array)
if show_progress:
print("Stacking bands...")
# Concatenate all resampled arrays along the band dimension
try:
merged_data = xr.concat(resampled_arrays, dim="band")
if show_progress:
print(f"Writing merged data to {merged_path}...")
# Add description metadata
merged_data.attrs["description"] = (
f"Multi-band image containing {', '.join(band_names)}"
)
# Create a dictionary mapping band indices to band names
band_descriptions = {}
for i, name in enumerate(band_names):
band_descriptions[i + 1] = name
# Write the merged data to file with band descriptions
merged_data.rio.to_raster(
merged_path,
tags={"BAND_NAMES": ",".join(band_names)},
descriptions=band_names,
)
result["merged"] = merged_path
if show_progress:
print(f"Merged bands saved to: {merged_path}")
print(f"Band order in merged file: {', '.join(band_names)}")
except Exception as e:
if show_progress:
print(f"Error during merging: {str(e)}")
print(f"Error details: {type(e).__name__}: {str(e)}")
raise
return result
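Example:

A usage sketch; the item URL is hypothetical, so substitute a real URL, e.g. item.get_self_href() from a pc_stac_search() result.

```python
from geoai.download import download_pc_stac_item

# Hypothetical Sentinel-2 L2A item URL; replace <ITEM_ID> with a real item ID.
item_url = (
    "https://planetarycomputer.microsoft.com/api/stac/v1/"
    "collections/sentinel-2-l2a/items/<ITEM_ID>"
)
result = download_pc_stac_item(
    item_url,
    bands=["B02", "B03", "B04", "B08"],
    output_dir="s2_data",
    merge_bands=True,
    merged_filename="s2_b02030408.tif",
)
print(result["merged"])  # path to the merged 4-band GeoTIFF
```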
download_with_progress(url, output_path)
Download a file with a progress bar.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
url | str | URL of the file to download. | required |
output_path | str | Path where the file will be saved. | required |
Source code in geoai/download.py
def download_with_progress(url: str, output_path: str) -> None:
"""Download a file with a progress bar.
Args:
url: URL of the file to download.
output_path: Path where the file will be saved.
"""
    response = requests.get(url, stream=True)
    response.raise_for_status()
    total_size = int(response.headers.get("content-length", 0))
block_size = 1024 # 1 Kibibyte
with (
open(output_path, "wb") as file,
tqdm(
desc=os.path.basename(output_path),
total=total_size,
unit="iB",
unit_scale=True,
unit_divisor=1024,
) as bar,
):
for data in response.iter_content(block_size):
size = file.write(data)
bar.update(size)
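Example:

A minimal sketch; the URL is a placeholder for any direct HTTP(S) file link.

```python
from geoai.download import download_with_progress

download_with_progress("https://example.com/imagery/tile.tif", "tile.tif")
```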
extract_building_stats(data)
Extract statistics from the building data.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
data | str | Path to the GeoJSON file or GeoDataFrame containing building data. | required |

Returns:

Type | Description |
---|---|
Dict[str, Any] | Dictionary with statistics. |
Source code in geoai/download.py
def extract_building_stats(data: str) -> Dict[str, Any]:
"""Extract statistics from the building data.
Args:
data: Path to the GeoJSON file or GeoDataFrame containing building data.
Returns:
Dictionary with statistics.
"""
try:
# Read the GeoJSON file
if isinstance(data, gpd.GeoDataFrame):
gdf = data
else:
gdf = gpd.read_file(data)
# Calculate statistics
bbox = gdf.total_bounds.tolist()
# Convert numpy values to Python native types
bbox = [float(x) for x in bbox]
stats = {
"total_buildings": int(len(gdf)),
"has_height": (
int(gdf["height"].notna().sum()) if "height" in gdf.columns else 0
),
"has_name": (
int(gdf["names.common.value"].notna().sum())
if "names.common.value" in gdf.columns
else 0
),
"bbox": bbox,
}
return stats
except Exception as e:
logger.error(f"Error extracting statistics: {str(e)}")
return {"error": str(e)}
get_all_overture_types()
Get a list of all available Overture Maps data types.
Returns:

Type | Description |
---|---|
list | List of available Overture Maps data types. |
Source code in geoai/download.py
def get_all_overture_types():
"""Get a list of all available Overture Maps data types.
Returns:
list: List of available Overture Maps data types.
"""
from overturemaps import core
return core.get_all_overture_types()
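Example:

A minimal sketch listing the available type names (the sample output is illustrative).

```python
from geoai.download import get_all_overture_types

print(get_all_overture_types())
# e.g. ['address', 'building', 'building_part', 'division', ...]
```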
get_overture_data(overture_type, bbox=None, columns=None, output=None, **kwargs)
Fetches Overture Maps data and returns it as a GeoDataFrame.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
overture_type | str | The type of Overture data to fetch. It can be one of the following: address, building, building_part, division, division_area, division_boundary, place, segment, connector, infrastructure, land, land_cover, land_use, water. | required |
bbox | Tuple[float, float, float, float] | The bounding box to filter the data. | None |
columns | List[str] | The columns to include in the output. | None |
output | str | The file path to save the output GeoDataFrame. | None |

Returns:

Type | Description |
---|---|
gpd.GeoDataFrame | The fetched Overture data as a GeoDataFrame. |

Exceptions:

Type | Description |
---|---|
ImportError | If the overturemaps package is not installed. |
Source code in geoai/download.py
def get_overture_data(
overture_type: str,
bbox: Tuple[float, float, float, float] = None,
columns: List[str] = None,
output: str = None,
**kwargs: Any,
) -> "gpd.GeoDataFrame":
"""Fetches overture data and returns it as a GeoDataFrame.
Args:
overture_type (str): The type of overture data to fetch.It can be one of the following:
address|building|building_part|division|division_area|division_boundary|place|
segment|connector|infrastructure|land|land_cover|land_use|water
bbox (Tuple[float, float, float, float], optional): The bounding box to
filter the data. Defaults to None.
columns (List[str], optional): The columns to include in the output.
Defaults to None.
output (str, optional): The file path to save the output GeoDataFrame.
Defaults to None.
Returns:
gpd.GeoDataFrame: The fetched overture data as a GeoDataFrame.
Raises:
ImportError: If the overture package is not installed.
"""
try:
from overturemaps import core
except ImportError:
raise ImportError("The overturemaps package is required to use this function")
gdf = core.geodataframe(overture_type, bbox=bbox)
if columns is not None:
gdf = gdf[columns]
gdf.crs = "EPSG:4326"
    if output is not None:
        out_dir = os.path.dirname(os.path.abspath(output))
        os.makedirs(out_dir, exist_ok=True)
        gdf.to_file(output, **kwargs)
    return gdf
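Example:

A minimal sketch; the coordinates and column names are illustrative.

```python
from geoai.download import get_overture_data

bbox = (-122.27, 37.87, -122.25, 37.89)  # placeholder coordinates
gdf = get_overture_data(
    "building",
    bbox=bbox,
    columns=["id", "height", "geometry"],  # hypothetical column subset
    output="buildings.geojson",
)
print(len(gdf), "buildings")
```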
get_overture_latest_release(patch=True)
Retrieves the value of the 'latest' key from the Overture Maps release JSON file.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
patch | bool | If True, returns the full version string (e.g., "2025-02-19.0"); if False, returns the version without the patch suffix (e.g., "2025-02-19"). | True |

Returns:

Type | Description |
---|---|
str | The value of the 'latest' key from the releases.json file. |

Exceptions:

Type | Description |
---|---|
requests.RequestException | If there's an issue with the HTTP request. |
KeyError | If the 'latest' key is not found in the JSON data. |
json.JSONDecodeError | If the response cannot be parsed as JSON. |
Source code in geoai/download.py
def get_overture_latest_release(patch=True) -> str:
"""
Retrieves the value of the 'latest' key from the Overture Maps release JSON file.
Args:
patch (bool): If True, returns the full version string (e.g., "2025-02-19.0").
Returns:
str: The value of the 'latest' key from the releases.json file.
Raises:
requests.RequestException: If there's an issue with the HTTP request.
KeyError: If the 'latest' key is not found in the JSON data.
json.JSONDecodeError: If the response cannot be parsed as JSON.
"""
url = "https://labs.overturemaps.org/data/releases.json"
try:
response = requests.get(url)
response.raise_for_status() # Raise an exception for HTTP errors
data = response.json()
        latest_release = data.get("latest")
        if latest_release is None:
            raise KeyError("The 'latest' key was not found in the releases.json file")
        if not patch:
            latest_release = latest_release.split(".")[0]  # Drop the patch suffix
        return latest_release
except requests.RequestException as e:
print(f"Error making the request: {e}")
raise
except json.JSONDecodeError as e:
print(f"Error parsing JSON response: {e}")
raise
except KeyError as e:
print(f"Key error: {e}")
raise
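Example:

A minimal sketch; the printed versions are illustrative.

```python
from geoai.download import get_overture_latest_release

print(get_overture_latest_release())             # e.g. "2025-02-19.0"
print(get_overture_latest_release(patch=False))  # e.g. "2025-02-19"
```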
json_serializable(obj)
Convert NumPy types to native Python types for JSON serialization.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
obj | Any | Any object to convert. | required |

Returns:

Type | Description |
---|---|
Any | JSON serializable version of the object. |
Source code in geoai/download.py
def json_serializable(obj: Any) -> Any:
"""Convert NumPy types to native Python types for JSON serialization.
Args:
obj: Any object to convert.
Returns:
JSON serializable version of the object.
"""
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return obj
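Example:

A sketch of the typical use as a default hook for json.dumps, e.g. when serializing the dictionary returned by extract_building_stats(); the values are made up.

```python
import json

import numpy as np

from geoai.download import json_serializable

stats = {"total_buildings": np.int64(42), "bbox": np.array([-122.3, 37.8, -122.2, 37.9])}
print(json.dumps(stats, default=json_serializable))
# {"total_buildings": 42, "bbox": [-122.3, 37.8, -122.2, 37.9]}
```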
pc_collection_list(endpoint='https://planetarycomputer.microsoft.com/api/stac/v1', detailed=False, filter_by=None, sort_by='id')
Retrieves and displays the list of available collections from Planetary Computer.
This function connects to the Planetary Computer STAC API and retrieves the list of all available collections, with options to filter and sort the results.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
endpoint | str | STAC API endpoint URL. | 'https://planetarycomputer.microsoft.com/api/stac/v1' |
detailed | bool | Whether to return detailed information for each collection. If False, returns only basic info. | False |
filter_by | dict | Dictionary of field:value pairs to filter collections. For example, {"license": "CC-BY-4.0"}. | None |
sort_by | str | Field to sort the collections by. | 'id' |

Returns:

Type | Description |
---|---|
pandas.DataFrame | DataFrame containing collection information. |

Exceptions:

Type | Description |
---|---|
ConnectionError | If there's an issue connecting to the API. |
Source code in geoai/download.py
def pc_collection_list(
endpoint="https://planetarycomputer.microsoft.com/api/stac/v1",
detailed=False,
filter_by=None,
sort_by="id",
):
"""
Retrieves and displays the list of available collections from Planetary Computer.
This function connects to the Planetary Computer STAC API and retrieves the
list of all available collections, with options to filter and sort the results.
Args:
endpoint (str, optional): STAC API endpoint URL.
Defaults to "https://planetarycomputer.microsoft.com/api/stac/v1".
detailed (bool, optional): Whether to return detailed information for each
collection. If False, returns only basic info. Defaults to False.
filter_by (dict, optional): Dictionary of field:value pairs to filter
collections. For example, {"license": "CC-BY-4.0"}. Defaults to None.
sort_by (str, optional): Field to sort the collections by.
Defaults to "id".
Returns:
pandas.DataFrame: DataFrame containing collection information.
Raises:
ConnectionError: If there's an issue connecting to the API.
"""
# Initialize the STAC client
try:
catalog = Client.open(endpoint)
except Exception as e:
raise ConnectionError(f"Failed to connect to STAC API at {endpoint}: {str(e)}")
# Get all collections
try:
collections = list(catalog.get_collections())
except Exception as e:
raise Exception(f"Error retrieving collections: {str(e)}")
# Basic info to extract from all collections
collection_info = []
# Extract information based on detail level
for collection in collections:
# Basic information always included
info = {
"id": collection.id,
"title": collection.title or "No title",
"description": (
collection.description[:100] + "..."
if collection.description and len(collection.description) > 100
else collection.description
),
}
# Add detailed information if requested
if detailed:
# Get temporal extent if available
temporal_extent = "Unknown"
if collection.extent and collection.extent.temporal:
interval = (
collection.extent.temporal.intervals[0]
if collection.extent.temporal.intervals
else None
)
if interval:
start = interval[0] or "Unknown Start"
end = interval[1] or "Present"
if isinstance(start, datetime.datetime):
start = start.strftime("%Y-%m-%d")
if isinstance(end, datetime.datetime):
end = end.strftime("%Y-%m-%d")
temporal_extent = f"{start} to {end}"
# Add additional details
info.update(
{
"license": collection.license or "Unknown",
"keywords": (
", ".join(collection.keywords)
if collection.keywords
else "None"
),
"temporal_extent": temporal_extent,
"asset_count": len(collection.assets) if collection.assets else 0,
"providers": (
", ".join([p.name for p in collection.providers])
if collection.providers
else "Unknown"
),
}
)
# Add spatial extent if available
if collection.extent and collection.extent.spatial:
info["bbox"] = (
str(collection.extent.spatial.bboxes[0])
if collection.extent.spatial.bboxes
else "Unknown"
)
collection_info.append(info)
# Convert to DataFrame for easier filtering and sorting
df = pd.DataFrame(collection_info)
# Apply filtering if specified
if filter_by:
for field, value in filter_by.items():
if field in df.columns:
df = df[df[field].astype(str).str.contains(value, case=False, na=False)]
# Apply sorting
if sort_by in df.columns:
df = df.sort_values(by=sort_by)
print(f"Retrieved {len(df)} collections from Planetary Computer")
return df
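Example:

A minimal sketch; the filter value is illustrative.

```python
from geoai.download import pc_collection_list

df = pc_collection_list(detailed=True, filter_by={"license": "CC-BY-4.0"})
print(df[["id", "title", "license"]].head())
```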
pc_item_asset_list(item)
Retrieve the list of asset keys from a STAC item in the Planetary Computer catalog.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
item | str | The URL of the STAC item, or a pystac.Item object. | required |

Returns:

Type | Description |
---|---|
list | A list of asset keys available in the STAC item. |
Source code in geoai/download.py
def pc_item_asset_list(item):
"""
Retrieve the list of asset keys from a STAC item in the Planetary Computer catalog.
Args:
        item (str or pystac.Item): The URL of the STAC item, or a pystac.Item object.
    Returns:
        list: A list of asset keys available in the STAC item.
"""
if isinstance(item, str):
item = pystac.Item.from_file(item)
if not isinstance(item, pystac.Item):
raise ValueError("item_url must be a string (URL) or a pystac.Item object")
return list(item.assets.keys())
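Example:

A minimal sketch; the item URL is a placeholder.

```python
from geoai.download import pc_item_asset_list

item_url = (
    "https://planetarycomputer.microsoft.com/api/stac/v1/"
    "collections/sentinel-2-l2a/items/<ITEM_ID>"
)
print(pc_item_asset_list(item_url))  # e.g. ['B01', 'B02', ..., 'visual']
```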
pc_stac_download(items, output_dir='.', assets=None, max_workers=4, skip_existing=True)
Download assets from STAC items retrieved from the Planetary Computer.
This function downloads specified assets from a list of STAC items to the specified output directory. It supports parallel downloads and can skip already downloaded files.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
items | list or pystac.Item | STAC Item object or list of STAC Item objects. | required |
output_dir | str | Directory where assets will be saved. | '.' |
assets | list | List of asset keys to download. If None, downloads all available assets. | None |
max_workers | int | Maximum number of concurrent download threads. | 4 |
skip_existing | bool | Skip download if the file already exists. | True |

Returns:

Type | Description |
---|---|
dict | Dictionary mapping STAC item IDs to dictionaries of their downloaded assets {asset_key: file_path}. |

Exceptions:

Type | Description |
---|---|
TypeError | If items is not a STAC Item or list of STAC Items. |
IOError | If there's an error writing the downloaded assets to disk. |
Source code in geoai/download.py
def pc_stac_download(
items,
output_dir=".",
assets=None,
max_workers=4,
skip_existing=True,
):
"""
Download assets from STAC items retrieved from the Planetary Computer.
This function downloads specified assets from a list of STAC items to the
specified output directory. It supports parallel downloads and can skip
already downloaded files.
Args:
items (list or pystac.Item): STAC Item object or list of STAC Item objects.
output_dir (str, optional): Directory where assets will be saved.
Defaults to current directory.
assets (list, optional): List of asset keys to download. If None,
downloads all available assets. Defaults to None.
max_workers (int, optional): Maximum number of concurrent download threads.
Defaults to 4.
skip_existing (bool, optional): Skip download if the file already exists.
Defaults to True.
Returns:
dict: Dictionary mapping STAC item IDs to dictionaries of their downloaded
assets {asset_key: file_path}.
Raises:
TypeError: If items is not a STAC Item or list of STAC Items.
IOError: If there's an error writing the downloaded assets to disk.
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
# Handle single item case
if isinstance(items, pystac.Item) or isinstance(items, str):
items = [items]
elif not isinstance(items, list):
raise TypeError("items must be a STAC Item or list of STAC Items")
# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
# Function to download a single asset
def download_asset(item, asset_key, asset):
item = pc.sign(item)
item_id = item.id
# Get the asset URL and sign it if needed
asset_url = item.assets[asset_key].href
# Determine output filename
if asset.media_type:
# Use appropriate file extension based on media type
if "tiff" in asset.media_type or "geotiff" in asset.media_type:
ext = ".tif"
elif "jpeg" in asset.media_type:
ext = ".jpg"
elif "png" in asset.media_type:
ext = ".png"
elif "json" in asset.media_type:
ext = ".json"
else:
# Default extension based on the original URL
ext = os.path.splitext(asset_url.split("?")[0])[1] or ".data"
else:
# Default extension based on the original URL
ext = os.path.splitext(asset_url.split("?")[0])[1] or ".data"
output_path = os.path.join(output_dir, f"{item_id}_{asset_key}{ext}")
# Skip if file exists and skip_existing is True
if skip_existing and os.path.exists(output_path):
print(f"Skipping existing asset: {asset_key} -> {output_path}")
return asset_key, output_path
try:
# Download the asset with progress bar
with requests.get(asset_url, stream=True) as r:
r.raise_for_status()
total_size = int(r.headers.get("content-length", 0))
with open(output_path, "wb") as f:
with tqdm(
total=total_size,
unit="B",
unit_scale=True,
unit_divisor=1024,
desc=f"Downloading {item_id}_{asset_key}",
ncols=100,
) as pbar:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
pbar.update(len(chunk))
return asset_key, output_path
except Exception as e:
print(f"Error downloading {asset_key} for item {item_id}: {str(e)}")
if os.path.exists(output_path):
os.remove(output_path) # Clean up partial download
return asset_key, None
# Process all items and their assets
results = {}
for item in items:
item_assets = {}
if isinstance(item, str):
item = pystac.Item.from_file(item)
item_id = item.id
print(f"Processing STAC item: {item_id}")
# Determine which assets to download
if assets:
assets_to_download = {k: v for k, v in item.assets.items() if k in assets}
if not assets_to_download:
print(
f"Warning: None of the specified asset keys {assets} found in item {item_id}"
)
print(f"Available asset keys: {list(item.assets.keys())}")
continue
else:
assets_to_download = item.assets
# Download assets concurrently
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all download tasks
future_to_asset = {
executor.submit(download_asset, item, asset_key, asset): (
asset_key,
asset,
)
for asset_key, asset in assets_to_download.items()
}
# Process results as they complete
for future in as_completed(future_to_asset):
asset_key, asset = future_to_asset[future]
try:
key, path = future.result()
if path:
item_assets[key] = path
except Exception as e:
print(
f"Error processing asset {asset_key} for item {item_id}: {str(e)}"
)
results[item_id] = item_assets
# Count total downloaded assets
total_assets = sum(len(assets) for assets in results.values())
print(f"\nDownloaded {total_assets} assets for {len(results)} items")
return results
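Example:

A sketch combining pc_stac_search() and pc_stac_download(); the search parameters and asset keys are illustrative.

```python
from geoai.download import pc_stac_download, pc_stac_search

items = pc_stac_search(
    "sentinel-2-l2a",
    bbox=[-122.27, 37.87, -122.25, 37.89],  # placeholder bbox
    time_range="2023-06-01/2023-06-30",
    max_items=1,
)
results = pc_stac_download(items, output_dir="s2_assets", assets=["B04", "B08"])
print(results)  # {item_id: {"B04": path, "B08": path}}
```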
pc_stac_search(collection, bbox=None, time_range=None, query=None, limit=10, max_items=None, endpoint='https://planetarycomputer.microsoft.com/api/stac/v1')
Search for STAC items in the Planetary Computer catalog.
This function queries the Planetary Computer STAC API to find items matching the specified criteria, including collection, bounding box, time range, and additional query parameters.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
collection | str | The STAC collection ID to search within. | required |
bbox | list | Bounding box coordinates [west, south, east, north]. | None |
time_range | str or tuple | Time range as a string "start/end" or a tuple of (start, end) datetime objects. | None |
query | dict | Additional query parameters for filtering. | None |
limit | int | Number of items to return per page. | 10 |
max_items | int | Maximum total number of items to return. If None, returns all matching items. | None |
endpoint | str | STAC API endpoint URL. | 'https://planetarycomputer.microsoft.com/api/stac/v1' |

Returns:

Type | Description |
---|---|
list | List of STAC Item objects matching the search criteria. |

Exceptions:

Type | Description |
---|---|
ValueError | If invalid parameters are provided. |
ConnectionError | If there's an issue connecting to the API. |
Source code in geoai/download.py
def pc_stac_search(
collection,
bbox=None,
time_range=None,
query=None,
limit=10,
max_items=None,
endpoint="https://planetarycomputer.microsoft.com/api/stac/v1",
):
"""
Search for STAC items in the Planetary Computer catalog.
This function queries the Planetary Computer STAC API to find items matching
the specified criteria, including collection, bounding box, time range, and
additional query parameters.
Args:
collection (str): The STAC collection ID to search within.
bbox (list, optional): Bounding box coordinates [west, south, east, north].
Defaults to None.
time_range (str or tuple, optional): Time range as a string "start/end" or
a tuple of (start, end) datetime objects. Defaults to None.
query (dict, optional): Additional query parameters for filtering.
Defaults to None.
limit (int, optional): Number of items to return per page. Defaults to 10.
max_items (int, optional): Maximum total number of items to return.
Defaults to None (returns all matching items).
endpoint (str, optional): STAC API endpoint URL.
Defaults to "https://planetarycomputer.microsoft.com/api/stac/v1".
Returns:
list: List of STAC Item objects matching the search criteria.
Raises:
ValueError: If invalid parameters are provided.
ConnectionError: If there's an issue connecting to the API.
"""
import datetime
# Initialize the STAC client
try:
catalog = Client.open(endpoint)
except Exception as e:
raise ConnectionError(f"Failed to connect to STAC API at {endpoint}: {str(e)}")
# Process time_range if provided
if time_range:
if isinstance(time_range, tuple) and len(time_range) == 2:
# Convert datetime objects to ISO format strings
start, end = time_range
if isinstance(start, datetime.datetime):
start = start.isoformat()
if isinstance(end, datetime.datetime):
end = end.isoformat()
time_str = f"{start}/{end}"
elif isinstance(time_range, str):
time_str = time_range
else:
raise ValueError(
"time_range must be a 'start/end' string or tuple of (start, end)"
)
else:
time_str = None
# Create the search object
search = catalog.search(
collections=[collection], bbox=bbox, datetime=time_str, query=query, limit=limit
)
# Collect the items
items = []
try:
# Use max_items if specified, otherwise get all items
if max_items:
            items_gen = search.items()
for item in items_gen:
items.append(item)
if len(items) >= max_items:
break
else:
            items = list(search.items())
except Exception as e:
raise Exception(f"Error retrieving search results: {str(e)}")
print(f"Found {len(items)} items matching search criteria")
return items
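Example:

A minimal sketch; collection, bbox, and dates are illustrative.

```python
from geoai.download import pc_stac_search

items = pc_stac_search(
    "naip",
    bbox=[-122.27, 37.87, -122.25, 37.89],
    time_range="2020-01-01/2021-12-31",
    max_items=5,
)
for item in items:
    print(item.id, item.datetime)
```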
preview_raster(data, title=None)
Display a preview of the downloaded imagery.
This function creates a visualization of the downloaded NAIP imagery by converting it to an RGB array and displaying it with matplotlib.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
data | Any | The raster data as a rioxarray object. | required |
title | str | The title for the preview plot. | None |
Source code in geoai/download.py
def preview_raster(data: Any, title: str = None) -> None:
"""Display a preview of the downloaded imagery.
This function creates a visualization of the downloaded NAIP imagery
by converting it to an RGB array and displaying it with matplotlib.
Args:
data: The raster data as a rioxarray object.
title: The title for the preview plot.
"""
# Convert to 8-bit RGB for display
rgb_data = data.transpose("y", "x", "band").values[:, :, 0:3]
rgb_data = np.where(rgb_data > 255, 255, rgb_data).astype(np.uint8)
plt.figure(figsize=(10, 10))
plt.imshow(rgb_data)
if title is not None:
plt.title(title)
plt.axis("off")
plt.show()
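Example:

A minimal sketch; "naip_image.tif" stands in for a file returned by download_naip().

```python
import rioxarray as rxr

from geoai.download import preview_raster

data = rxr.open_rasterio("naip_image.tif")
preview_raster(data, title="NAIP preview")
```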
read_pc_item_asset(item, asset, output=None, as_cog=True, **kwargs)
Read a specific asset from a STAC item in the Planetary Computer catalog.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
item | str | The URL of the STAC item, or a pystac.Item object. | required |
asset | str | The key of the asset to read. | required |
output | str | If specified, the path to save the asset as a raster file. | None |
as_cog | bool | If True, save the asset as a Cloud Optimized GeoTIFF (COG). | True |

Returns:

Type | Description |
---|---|
xarray.DataArray | The data array for the specified asset. |
Source code in geoai/download.py
def read_pc_item_asset(item, asset, output=None, as_cog=True, **kwargs):
"""
Read a specific asset from a STAC item in the Planetary Computer catalog.
Args:
        item (str or pystac.Item): The URL of the STAC item, or a pystac.Item object.
asset (str): The key of the asset to read.
output (str, optional): If specified, the path to save the asset as a raster file.
as_cog (bool, optional): If True, save the asset as a Cloud Optimized GeoTIFF (COG).
Returns:
xarray.DataArray: The data array for the specified asset.
"""
if isinstance(item, str):
item = pystac.Item.from_file(item)
if not isinstance(item, pystac.Item):
raise ValueError("item must be a string (URL) or a pystac.Item object")
signed_item = pc.sign(item)
if asset not in signed_item.assets:
raise ValueError(
f"Asset '{asset}' not found in item '{item.id}'. It has available assets: {list(signed_item.assets.keys())}"
)
asset_url = signed_item.assets[asset].href
ds = rxr.open_rasterio(asset_url)
if as_cog:
kwargs["driver"] = "COG" # Ensure the output is a Cloud Optimized GeoTIFF
if output:
print(f"Saving asset '{asset}' to {output}...")
ds.rio.to_raster(output, **kwargs)
print(f"Asset '{asset}' saved successfully.")
return ds
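Example:

A minimal sketch; the item URL is a placeholder, and pc_item_asset_list() can be used to discover valid asset keys.

```python
from geoai.download import read_pc_item_asset

item_url = (
    "https://planetarycomputer.microsoft.com/api/stac/v1/"
    "collections/sentinel-2-l2a/items/<ITEM_ID>"
)
da = read_pc_item_asset(item_url, asset="B04", output="B04.tif")
print(da.shape)  # (band, y, x)
```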