aggregated_point_extraction module¶
AggregatedPointExtraction
¶
Source code in skiba/aggregated_point_extraction.py
class AggregatedPointExtraction:
def __init__(self):
# File Upload
self.file_upload = widgets.FileUpload(
accept=".csv, .txt", # Accepted file extension e.g. '.txt', '.pdf', 'image/*', 'image/*,.pdf'
multiple=False, # True to accept multiple files upload else False
)
# Dropdown
url = "https://raw.githubusercontent.com/opengeos/geospatial-data-catalogs/master/gee_catalog.json"
data = AggregatedPointExtraction.fetch_geojson(url)
data_dict = {item["title"]: item["id"] for item in data if "title" in item}
self.dropdown = widgets.Dropdown(
options=data_dict, # keys shown, values returned
description="Dataset:",
disabled=False,
)
self.start_date = widgets.DatePicker(description="Start Date", disabled=False)
self.end_date = widgets.DatePicker(description="End Date", disabled=False)
self.run_button = widgets.Button(
description="Run Query",
disabled=False,
button_style="", # 'success', 'info', 'warning', 'danger' or ''
tooltip="Click me",
icon="rotate right", # (FontAwesome names without the `fa-` prefix)
)
self.output = widgets.Output()
self.run_button.on_click(self.on_button_clicked)
self.dropdown.observe(self.on_dropdown_change, names="value")
self.hbox = widgets.HBox(
[
self.file_upload,
self.dropdown,
self.start_date,
self.end_date,
self.run_button,
]
)
self.vbox = widgets.VBox([self.hbox, self.output])
def on_dropdown_change(self, change):
if change["new"]:
with self.output:
self.output.clear_output()
catalog = "https://raw.githubusercontent.com/opengeos/geospatial-data-catalogs/master/gee_catalog.json"
data = AggregatedPointExtraction.fetch_geojson(catalog)
data_dict = {item["id"]: item["url"] for item in data if "id" in item}
change_value = str(change["new"])
url = data_dict.get(change_value)
print(f"Selected dataset: {change['new']}")
print(f"URL: {url}")
def on_button_clicked(self, b):
with self.output:
self.output.clear_output()
print(
f"You entered: {self.dropdown.value}. CSV file will be saved to Downloads folder under this name."
)
import io
if self.file_upload.value:
# For the first file (if multiple=False)
file_info = self.file_upload.value[0]
content_bytes = file_info["content"].tobytes() # file content as bytes
points = pd.read_csv(io.BytesIO(content_bytes))
lat_cols = ["lat", "latitude", "y", "LAT", "Latitude", "Lat", "Y"]
lon_cols = [
"lon",
"long",
"longitude",
"x",
"LON",
"Longitude",
"Long",
"X",
]
id_cols = ["id", "ID", "plot_ID", "plot_id", "plotID", "plotId"]
def find_column(possible_names, columns):
for name in possible_names:
if name in columns:
return name
# fallback: check case-insensitive match
lower_columns = {c.lower(): c for c in columns}
for name in possible_names:
if name.lower() in lower_columns:
return lower_columns[name.lower()]
raise ValueError(f"No matching column found for {possible_names}")
lat_col = find_column(lat_cols, points.columns)
lon_col = find_column(lon_cols, points.columns)
id_col = find_column(id_cols, points.columns)
points = points.rename(
columns={lat_col: "LAT", lon_col: "LON", id_col: "plot_ID"}
)
else:
print("Please upload a CSV file.")
geedata = self.dropdown.value
start_date = self.start_date.value
end_date = self.end_date.value
self.get_coordinate_data(
data=points, geedata=geedata, start_date=start_date, end_date=end_date
)
def get_coordinate_data(self, data, geedata, start_date, end_date, **kwargs):
"""
Pull data from provided coordinates from GEE.
Args:
data (str): The data to get the coordinate data from.
Returns:
data (str): CSV file contained GEE data.
"""
# Load data with safety checks
if isinstance(data, str):
coordinates = pd.read_csv(data)
gdf = gpd.GeoDataFrame(
coordinates,
geometry=gpd.points_from_xy(coordinates.LON, coordinates.LAT),
crs="EPSG:4326", # Directly set CRS during creation
)
elif isinstance(data, pd.DataFrame):
coordinates = data
gdf = gpd.GeoDataFrame(
coordinates,
geometry=gpd.points_from_xy(coordinates.LON, coordinates.LAT),
crs="EPSG:4326", # Directly set CRS during creation
)
else:
gdf = data.to_crs(epsg=4326) # Ensure WGS84
geojson = gdf.__geo_interface__
fc = gm.geojson_to_ee(geojson)
# Load the GEE dataset as an image
geeimage = AggregatedPointExtraction.load_gee_as_image(
geedata, start_date, end_date
)
name = f"{geedata}"
file_name = name.replace("/", "_")
out_dir = os.path.join(os.path.expanduser("~"), "Downloads")
output_file = f"{file_name}.csv"
out_path = os.path.join(out_dir, output_file)
# Retrieve data from the image using sampleRegions
sampled_data = gm.extract_values_to_points(fc, geeimage, scale=None)
sampled_df = gm.ee_to_df(sampled_data)
filtered_df = sampled_df.drop(["LAT", "LON", "Unnamed: 0"], axis=1)
print("Pre-aggregation data preview:")
print(filtered_df.head())
aggregated_df = filtered_df.groupby("plot_ID").mean()
aggregated_df.to_csv(out_path)
return aggregated_df
def fetch_geojson(url):
try:
response = requests.get(url)
response.raise_for_status() # Raises an exception for HTTP errors
geojson_data = response.json() # Parse the JSON response
return geojson_data
except requests.exceptions.HTTPError as http_err:
print(f"HTTP error occurred: {http_err}")
except requests.exceptions.ConnectionError as conn_err:
print(f"Error connecting to the server: {conn_err}")
except Exception as err:
print(f"An error occurred: {err}")
return None
def create_dropdown():
"""
Creates an ipywidgets dropdown menu from a GeoJSON catalog.
Args:
url (str, optional): URL to the GeoJSON catalog. Defaults to the Opengeos catalog.
Returns:
ipywidgets.Dropdown: A dropdown widget with the names from the catalog.
"""
url = "https://raw.githubusercontent.com/opengeos/geospatial-data-catalogs/master/gee_catalog.json"
data = AggregatedPointExtraction.fetch_geojson(url)
data_dict = {item["title"]: item["id"] for item in data if "title" in item}
dropdown = widgets.Dropdown(
options=data_dict, # keys shown, values returned
description="Dataset:",
disabled=False,
)
return dropdown
def add_date_picker():
date_picker = widgets.DatePicker(description="Pick a Date", disabled=False)
return date_picker
def load_gee_as_image(dataset_id, start_date, end_date, **kwargs):
"""
Loads any GEE dataset (Image, ImageCollection, FeatureCollection) as an ee.Image.
Optionally filters by start and end date if applicable.
Parameters:
dataset_id (str): The Earth Engine dataset ID.
start_date (str): Optional start date in 'YYYY-MM-DD' format.
end_date (str): Optional end date in 'YYYY-MM-DD' format.
Returns:
ee.Image: The resulting image.
"""
url = "https://raw.githubusercontent.com/opengeos/geospatial-data-catalogs/master/gee_catalog.json"
response = requests.get(url)
response.raise_for_status() # Raises an exception for HTTP errors
geojson_data = response.json()
data_type = [item["type"] for item in geojson_data if item["id"] == dataset_id]
data_str = " ".join(data_type)
start_date = str(start_date)
end_date = str(end_date)
# Try loading as Image
if data_str == "image":
img = ee.Image(dataset_id)
# If .getInfo() doesn't throw, it's an Image
img.getInfo()
return img
elif data_str == "image_collection":
col = ee.ImageCollection(dataset_id)
# If date filters are provided, apply them
if start_date is None and end_date is None:
col = col.filterDate(start_date, end_date)
else:
pass
# Reduce to a single image (e.g., median composite)
img = col.median()
return img
# Try loading as FeatureCollection (convert to raster)
else:
# fc_temp = ee.FeatureCollection(dataset_id)
# if start_date is None and end_date is None:
# fc_temp = fc_temp.filterDate(start_date, end_date)
# # Convert to raster: burn a value of 1 into a new image
# img = fc_temp.reduceToImage(properties=[], reducer=ee.Reducer.median())
# img.getInfo()
# return img
# or print(f"Dataset must be either an Image or Image Collection")
raise ValueError("Dataset ID is not a valid Image or ImageCollection.")
create_dropdown()
¶
Creates an ipywidgets dropdown menu from a GeoJSON catalog.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url |
str |
URL to the GeoJSON catalog. Defaults to the Opengeos catalog. |
required |
Returns:
| Type | Description |
|---|---|
ipywidgets.Dropdown |
A dropdown widget with the names from the catalog. |
Source code in skiba/aggregated_point_extraction.py
def create_dropdown():
"""
Creates an ipywidgets dropdown menu from a GeoJSON catalog.
Args:
url (str, optional): URL to the GeoJSON catalog. Defaults to the Opengeos catalog.
Returns:
ipywidgets.Dropdown: A dropdown widget with the names from the catalog.
"""
url = "https://raw.githubusercontent.com/opengeos/geospatial-data-catalogs/master/gee_catalog.json"
data = AggregatedPointExtraction.fetch_geojson(url)
data_dict = {item["title"]: item["id"] for item in data if "title" in item}
dropdown = widgets.Dropdown(
options=data_dict, # keys shown, values returned
description="Dataset:",
disabled=False,
)
return dropdown
get_coordinate_data(self, data, geedata, start_date, end_date, **kwargs)
¶
Pull data from provided coordinates from GEE.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data |
str |
The data to get the coordinate data from. |
required |
Returns:
| Type | Description |
|---|---|
data (str) |
CSV file contained GEE data. |
Source code in skiba/aggregated_point_extraction.py
def get_coordinate_data(self, data, geedata, start_date, end_date, **kwargs):
"""
Pull data from provided coordinates from GEE.
Args:
data (str): The data to get the coordinate data from.
Returns:
data (str): CSV file contained GEE data.
"""
# Load data with safety checks
if isinstance(data, str):
coordinates = pd.read_csv(data)
gdf = gpd.GeoDataFrame(
coordinates,
geometry=gpd.points_from_xy(coordinates.LON, coordinates.LAT),
crs="EPSG:4326", # Directly set CRS during creation
)
elif isinstance(data, pd.DataFrame):
coordinates = data
gdf = gpd.GeoDataFrame(
coordinates,
geometry=gpd.points_from_xy(coordinates.LON, coordinates.LAT),
crs="EPSG:4326", # Directly set CRS during creation
)
else:
gdf = data.to_crs(epsg=4326) # Ensure WGS84
geojson = gdf.__geo_interface__
fc = gm.geojson_to_ee(geojson)
# Load the GEE dataset as an image
geeimage = AggregatedPointExtraction.load_gee_as_image(
geedata, start_date, end_date
)
name = f"{geedata}"
file_name = name.replace("/", "_")
out_dir = os.path.join(os.path.expanduser("~"), "Downloads")
output_file = f"{file_name}.csv"
out_path = os.path.join(out_dir, output_file)
# Retrieve data from the image using sampleRegions
sampled_data = gm.extract_values_to_points(fc, geeimage, scale=None)
sampled_df = gm.ee_to_df(sampled_data)
filtered_df = sampled_df.drop(["LAT", "LON", "Unnamed: 0"], axis=1)
print("Pre-aggregation data preview:")
print(filtered_df.head())
aggregated_df = filtered_df.groupby("plot_ID").mean()
aggregated_df.to_csv(out_path)
return aggregated_df
load_gee_as_image(dataset_id, start_date, end_date, **kwargs)
¶
Loads any GEE dataset (Image, ImageCollection, FeatureCollection) as an ee.Image. Optionally filters by start and end date if applicable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
dataset_id |
str |
The Earth Engine dataset ID. |
required |
start_date |
str |
Optional start date in 'YYYY-MM-DD' format. |
required |
end_date |
str |
Optional end date in 'YYYY-MM-DD' format. |
required |
Returns:
| Type | Description |
|---|---|
ee.Image |
The resulting image. |
Source code in skiba/aggregated_point_extraction.py
def load_gee_as_image(dataset_id, start_date, end_date, **kwargs):
"""
Loads any GEE dataset (Image, ImageCollection, FeatureCollection) as an ee.Image.
Optionally filters by start and end date if applicable.
Parameters:
dataset_id (str): The Earth Engine dataset ID.
start_date (str): Optional start date in 'YYYY-MM-DD' format.
end_date (str): Optional end date in 'YYYY-MM-DD' format.
Returns:
ee.Image: The resulting image.
"""
url = "https://raw.githubusercontent.com/opengeos/geospatial-data-catalogs/master/gee_catalog.json"
response = requests.get(url)
response.raise_for_status() # Raises an exception for HTTP errors
geojson_data = response.json()
data_type = [item["type"] for item in geojson_data if item["id"] == dataset_id]
data_str = " ".join(data_type)
start_date = str(start_date)
end_date = str(end_date)
# Try loading as Image
if data_str == "image":
img = ee.Image(dataset_id)
# If .getInfo() doesn't throw, it's an Image
img.getInfo()
return img
elif data_str == "image_collection":
col = ee.ImageCollection(dataset_id)
# If date filters are provided, apply them
if start_date is None and end_date is None:
col = col.filterDate(start_date, end_date)
else:
pass
# Reduce to a single image (e.g., median composite)
img = col.median()
return img
# Try loading as FeatureCollection (convert to raster)
else:
# fc_temp = ee.FeatureCollection(dataset_id)
# if start_date is None and end_date is None:
# fc_temp = fc_temp.filterDate(start_date, end_date)
# # Convert to raster: burn a value of 1 into a new image
# img = fc_temp.reduceToImage(properties=[], reducer=ee.Reducer.median())
# img.getInfo()
# return img
# or print(f"Dataset must be either an Image or Image Collection")
raise ValueError("Dataset ID is not a valid Image or ImageCollection.")