# Source code for lidar.filling
"""Module for filling surface depressions.
"""
import os
import time
import numpy as np
import richdem as rd
from scipy import ndimage
from skimage import measure
from osgeo import gdal, ogr, osr
from .common import *
from .filtering import *
class Depression:
    """Container for the attributes of a single surface depression."""

    # Attribute names, in constructor-argument order.
    _FIELDS = (
        "id",
        "count",
        "size",
        "volume",
        "meanDepth",
        "maxDepth",
        "minElev",
        "bndElev",
        "perimeter",
        "major_axis",
        "minor_axis",
        "elongatedness",
        "eccentricity",
        "orientation",
        "area_bbox_ratio",
    )

    def __init__(
        self,
        id,
        count,
        size,
        volume,
        meanDepth,
        maxDepth,
        minElev,
        bndElev,
        perimeter,
        major_axis,
        minor_axis,
        elongatedness,
        eccentricity,
        orientation,
        area_bbox_ratio,
    ):
        # Each parameter name matches its attribute name, so copy them over
        # in one pass instead of fifteen individual assignments.
        values = locals()
        for name in self._FIELDS:
            setattr(self, name, values[name])
def regionGroup(img_array, min_size, no_data):
    """Identifies connected regions using a region-growing approach.

    Note: ``img_array`` is modified in place (no_data cells are zeroed).

    Args:
        img_array (array): The numpy array containing the image.
        min_size (int): The minimum number of pixels to be considered as a depression.
        no_data (float): The no_data value of the image.

    Returns:
        tuple: The labelled objects and total number of labels.
    """
    # Treat no_data cells as background before labeling.
    img_array[img_array == no_data] = 0
    labels, _ = ndimage.label(img_array)
    # Component 0 is the background; drop any component whose pixel count
    # is not strictly greater than min_size.
    component_sizes = np.bincount(labels.ravel())
    keep = component_sizes > min_size
    keep[0] = 0
    cleaned = keep[labels]
    # Relabel so the surviving regions get consecutive ids starting at 1.
    return ndimage.label(cleaned)
def np2rdarray(in_array, no_data, projection, geotransform):
    """Wraps a numpy array as a georeferenced richDEM array.

    Args:
        in_array (array): The input numpy array.
        no_data (float): The no_data value of the array.
        projection (str): The projection of the image.
        geotransform (str): The geotransform of the image.

    Returns:
        object: The richDEM array.
    """
    rda = rd.rdarray(in_array, no_data=no_data)
    # Carry the georeferencing over so the array can be saved with SaveGDAL.
    rda.geotransform = geotransform
    rda.projection = projection
    return rda
def get_dep_props(objects, resolution):
    """Computes depression attributes from labeled region properties.

    Args:
        objects (list): Labeled region objects (e.g., from
            ``skimage.measure.regionprops``).
        resolution (float): The spatial resolution of the image.

    Returns:
        list: A list of Depression objects with attributes.
    """
    dep_list = []
    # `obj` instead of `object`, which shadowed the builtin.
    for obj in objects:
        unique_id = obj.label
        count = obj.area
        size = count * pow(resolution, 2)  # depression area
        # np.float was removed in NumPy 1.24; the builtin float is equivalent.
        min_elev = float(obj.min_intensity)  # depression min elevation
        max_elev = float(obj.max_intensity)  # depression max elevation
        max_depth = max_elev - min_elev  # depression max depth
        mean_depth = float(
            (max_elev * count - np.sum(obj.intensity_image)) / count
        )  # depression mean depth
        volume = mean_depth * count * pow(resolution, 2)  # depression volume
        perimeter = obj.perimeter * resolution
        major_axis = obj.major_axis_length * resolution
        minor_axis = obj.minor_axis_length * resolution
        if minor_axis == 0:
            # Guard against division by zero for degenerate (line-like) regions.
            minor_axis = resolution
        elongatedness = major_axis * 1.0 / minor_axis
        eccentricity = obj.eccentricity
        # Radians to degrees; np.pi replaces the truncated constant 3.1415.
        orientation = obj.orientation / np.pi * 180
        area_bbox_ratio = obj.extent
        dep_list.append(
            Depression(
                unique_id,
                count,
                size,
                volume,
                mean_depth,
                max_depth,
                min_elev,
                max_elev,
                perimeter,
                major_axis,
                minor_axis,
                elongatedness,
                eccentricity,
                orientation,
                area_bbox_ratio,
            )
        )
    return dep_list
def write_dep_csv(dep_list, csv_file):
    """Saves the depression list info to a CSV file.

    Args:
        dep_list (list): A list of depression objects with attributes.
        csv_file (str): File path to the output CSV file.
    """
    header_fields = [
        "region-id",
        "count",
        "area",
        "volume",
        "avg-depth",
        "max-depth",
        "min-elev",
        "max-elev",
        "perimeter",
        "major-axis",
        "minor-axis",
        "elongatedness",
        "eccentricity",
        "orientation",
        "area-bbox-ratio",
    ]
    # `with` guarantees the file is closed even if a write fails (the
    # original left the handle open on error).
    with open(csv_file, "w") as out:
        out.write(",".join(header_fields) + "\n")
        for dep in dep_list:
            # The original format string had a stray space before the
            # perimeter column; it is removed so every column is uniform.
            line = (
                "{},{},{:.2f},{:.2f},{:.2f},{:.2f},{:.2f},{:.2f},{:.2f},"
                "{:.2f},{:.2f},{:.2f},{:.2f},{:.2f},{:.2f}"
            ).format(
                dep.id,
                dep.count,
                dep.size,
                dep.volume,
                dep.meanDepth,
                dep.maxDepth,
                dep.minElev,
                dep.bndElev,
                dep.perimeter,
                dep.major_axis,
                dep.minor_axis,
                dep.elongatedness,
                dep.eccentricity,
                dep.orientation,
                dep.area_bbox_ratio,
            )
            out.write(line + "\n")
def polygonize(img, shp_path):
    """Converts a raster image to a polygon shapefile.

    Args:
        img (str): File path to the input image.
        shp_path (str): File path to the output shapefile.
    """
    # Map GDAL pixel types to the OGR field type suitable for the values.
    type_mapping = {
        gdal.GDT_Byte: ogr.OFTInteger,
        gdal.GDT_UInt16: ogr.OFTInteger,
        gdal.GDT_Int16: ogr.OFTInteger,
        gdal.GDT_UInt32: ogr.OFTInteger,
        gdal.GDT_Int32: ogr.OFTInteger,
        gdal.GDT_Float32: ogr.OFTReal,
        gdal.GDT_Float64: ogr.OFTReal,
        gdal.GDT_CInt16: ogr.OFTInteger,
        gdal.GDT_CInt32: ogr.OFTInteger,
        gdal.GDT_CFloat32: ogr.OFTReal,
        gdal.GDT_CFloat64: ogr.OFTReal,
    }

    src_ds = gdal.Open(img)
    wkt = src_ds.GetProjection()
    band = src_ds.GetRasterBand(1)

    driver = ogr.GetDriverByName("ESRI Shapefile")
    dst_ds = driver.CreateDataSource(shp_path)
    layer = dst_ds.CreateLayer("Shape", srs=osr.SpatialReference(wkt=wkt))
    # Each polygon's raster value is written into an integer 'id' field.
    layer.CreateField(ogr.FieldDefn("id", type_mapping[gdal.GDT_Int32]))

    # The band is passed twice: once as the data, once as its own mask, so
    # zero-valued (background) cells are skipped.
    gdal.Polygonize(band, band, layer, 0, [], callback=None)

    # Dropping the references closes the datasets and flushes the shapefile.
    del img, src_ds, band, dst_ds, layer
def ExtractSinks(in_dem, min_size, out_dir, filled_dem=None):
    """Extract sinks (e.g., maximum depression extent) from a DEM.

    Args:
        in_dem (str): File path to the input DEM.
        min_size (int): The minimum number of pixels to be considered as a sink.
        out_dir (str): File path to the output directory.
        filled_dem (str, optional): File path to an already-filled DEM. If None,
            depression filling is performed and the result is saved to out_dir.

    Returns:
        str: File path to the output sink raster (sink.tif).
    """
    start_time = time.time()

    # Output file paths.
    out_dem = os.path.join(out_dir, "dem.tif")
    out_dem_diff = os.path.join(out_dir, "dem_diff.tif")
    out_sink = os.path.join(out_dir, "sink.tif")
    out_region = os.path.join(out_dir, "region.tif")
    out_depth = os.path.join(out_dir, "depth.tif")
    out_csv_file = os.path.join(out_dir, "regions_info.csv")
    out_vec_file = os.path.join(out_dir, "regions.shp")
    if filled_dem is None:
        out_dem_filled = os.path.join(out_dir, "dem_filled.tif")
    else:
        out_dem_filled = filled_dem

    # Create the output folder if nonexistent. makedirs (instead of mkdir)
    # also creates intermediate directories, matching the other functions.
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)

    # Load the DEM and its georeferencing info.
    print("Loading data ...")
    dem = rd.LoadGDAL(in_dem)
    no_data = dem.no_data
    projection = dem.projection
    geotransform = dem.geotransform
    cell_size = np.round(geotransform[1], decimals=2)

    # Get min and max elevation of the DEM. The builtin float replaces
    # np.float, which was removed in NumPy 1.24.
    max_elev = float(np.max(dem[dem != no_data]))
    min_elev = float(np.min(dem[dem > 0]))
    print(
        "min = {:.2f}, max = {:.2f}, no_data = {}, cell_size = {}".format(
            min_elev, max_elev, no_data, cell_size
        )
    )

    # Depression filling (skipped when a pre-filled DEM is supplied).
    if filled_dem is None:
        print("Depression filling ...")
        dem_filled = rd.FillDepressions(dem, in_place=False)
    else:
        dem_filled = rd.LoadGDAL(filled_dem)
    dem_diff = dem_filled - dem
    dem_diff.no_data = 0

    if filled_dem is None:
        print("Saving filled dem ...")
        rd.SaveGDAL(out_dem_filled, dem_filled)
    rd.SaveGDAL(out_dem_diff, dem_diff)

    # nb_labels is the total number of objects; 0 represents the background.
    print("Region grouping ...")
    label_objects, nb_labels = regionGroup(dem_diff, min_size, no_data)
    dem_diff[label_objects == 0] = 0
    depth = np2rdarray(
        dem_diff, no_data=0, projection=projection, geotransform=geotransform
    )
    rd.SaveGDAL(out_depth, depth)
    del dem_diff, depth

    print("Computing properties ...")
    objects = measure.regionprops(label_objects, dem)
    dep_list = get_dep_props(objects, cell_size)
    write_dep_csv(dep_list, out_csv_file)
    del objects, dep_list

    # Convert the numpy label array to the richdem data format.
    region = np2rdarray(
        label_objects, no_data=0, projection=projection, geotransform=geotransform
    )
    del label_objects

    print("Saving sink dem ...")
    sink = np.copy(dem)
    sink[region == 0] = 0
    sink = np2rdarray(sink, no_data=0, projection=projection,
                      geotransform=geotransform)
    rd.SaveGDAL(out_sink, sink)

    print("Saving refined dem ...")
    # Restore the original elevations inside depressions so only spurious
    # pits remain filled in the refined DEM.
    dem_refined = dem_filled
    dem_refined[region > 0] = dem[region > 0]
    dem_refined = np2rdarray(
        dem_refined, no_data=no_data, projection=projection, geotransform=geotransform
    )
    rd.SaveGDAL(out_dem, dem_refined)
    rd.SaveGDAL(out_region, region)
    del dem_refined, region, dem

    print("Converting raster to vector ...")
    polygonize(out_region, out_vec_file)

    end_time = time.time()
    print("Total run time:\t\t\t {:.4f} s\n".format(end_time - start_time))

    return out_sink
def extract_sinks_by_bbox(
    bbox,
    filename,
    min_size=10,
    tmp_dir=None,
    mask=None,
    crs="EPSG:5070",
    kernel_size=3,
    resolution=10,
    to_cog=False,
    keep_files=True,
    ignore_warnings=True,
):
    """Extract sinks from a DEM within a bounding box.

    Downloads NED tiles covering the bbox, then mosaics, optionally clips,
    reprojects, resamples, and median-filters the DEM before running
    ExtractSinks and joining the region attributes into the output shapefile.

    Args:
        bbox (list): The bounding box in the form of [minx, miny, maxx, maxy].
        filename (str): The output depression file name (".shp" is appended if missing).
        min_size (int, optional): The minimum number of pixels to be considered as a sink. Defaults to 10.
        tmp_dir (str, optional): The temporary directory. Defaults to None, e.g., using "tmp" under the current directory.
        mask (str, optional): The mask file path used for clipping. Defaults to None (no clipping).
        crs (str, optional): The coordinate reference system. Defaults to "EPSG:5070".
        kernel_size (int, optional): The kernel size for smoothing the DEM. Defaults to 3.
        resolution (int, optional): The resolution of the DEM. Defaults to 10.
        to_cog (bool, optional): Whether to convert the smoothed DEM to COG. Defaults to False.
        keep_files (bool, optional): Whether to keep the intermediate files. Defaults to True.
        ignore_warnings (bool, optional): Whether to ignore warnings. Defaults to True.

    Returns:
        None. On failure the exception is printed and None is returned.
    """
    import shutil
    import warnings

    if ignore_warnings:
        warnings.filterwarnings("ignore")

    start_time = time.time()

    if not filename.endswith(".shp"):
        filename = filename + ".shp"
    filename = os.path.abspath(filename)

    if tmp_dir is None:
        tmp_dir = os.path.join(os.getcwd(), "tmp")
    if not os.path.exists(tmp_dir):
        os.makedirs(tmp_dir)

    # Intermediate file paths inside tmp_dir.
    merge = os.path.join(tmp_dir, "mosaic.tif")
    clip = os.path.join(tmp_dir, "clip.tif")
    reproj = os.path.join(tmp_dir, "reproj.tif")
    image = os.path.join(tmp_dir, "image.tif")
    median = os.path.join(tmp_dir, "median.tif")
    regions = os.path.join(tmp_dir, "regions.shp")
    regions_info = os.path.join(tmp_dir, "regions_info.csv")

    try:
        download_ned_by_bbox(bbox, out_dir=tmp_dir)
        if not os.path.exists(merge):
            print("Merging NED tiles ...")
            mosaic(tmp_dir, merge)
        if mask is not None:
            clip_image(merge, mask, clip)
        else:
            # No mask supplied: the mosaic itself is used as the "clipped" DEM.
            clip = merge
        reproject_image(clip, reproj, crs)
        resample(reproj, image, resolution)
        MedianFilter(image, kernel_size, median)
        if to_cog:
            image_to_cog(median, median)
        ExtractSinks(median, min_size, tmp_dir)
        join_tables(regions, regions_info, filename)
        # Large intermediates are removed regardless of keep_files.
        for file in [merge, clip, reproj, image]:
            if os.path.exists(file):
                os.remove(file)
        if not keep_files:
            shutil.rmtree(tmp_dir)
    except Exception as e:
        # Best-effort behavior: report the error and return None instead of raising.
        print(e)
        return None

    end_time = time.time()
    print("Total run time:\t\t\t {:.4f} s\n".format(end_time - start_time))
def extract_sinks_by_huc8(
    huc8,
    min_size=10,
    filename=None,
    tmp_dir=None,
    wbd=None,
    crs="EPSG:5070",
    kernel_size=3,
    resolution=10,
    keep_files=True,
    error_file=None,
    ignore_warnings=True,
):
    """Extract sinks from a DEM for a single HUC8 watershed.

    Args:
        huc8 (str): The HUC8 code, e.g., 01010002.
        min_size (int, optional): The minimum number of pixels to be considered as a sink. Defaults to 10.
        filename (str, optional): The output depression file name. Defaults to None, e.g., using the HUC8 code.
        tmp_dir (str, optional): The temporary directory. Defaults to None, e.g., using the current directory.
        wbd (str | GeoDataFrame, optional): The watershed boundary file. Defaults to None (downloaded).
        crs (str, optional): The coordinate reference system. Defaults to "EPSG:5070".
        kernel_size (int, optional): The kernel size for smoothing the DEM. Defaults to 3.
        resolution (int, optional): The resolution of the DEM. Defaults to 10.
        keep_files (bool, optional): Whether to keep the intermediate files. Defaults to True.
        error_file (str, optional): The file to save the error IDs. Defaults to None.
        ignore_warnings (bool, optional): Whether to ignore warnings. Defaults to True.

    Returns:
        None. On failure the HUC8 id is appended to error_file (if given),
        the temporary directory is removed, and None is returned.
    """
    import shutil
    import warnings

    import geopandas as gpd

    if ignore_warnings:
        warnings.filterwarnings("ignore")

    start_time = time.time()

    if filename is None:
        filename = huc8
    if not filename.endswith(".shp"):
        filename = filename + ".shp"
    filename = os.path.abspath(filename)

    if tmp_dir is None:
        tmp_dir = os.path.join(os.getcwd(), huc8)
    if not os.path.exists(tmp_dir):
        os.makedirs(tmp_dir)

    # Intermediate file paths inside tmp_dir.
    merge = os.path.join(tmp_dir, "mosaic.tif")
    mask = os.path.join(tmp_dir, "mask.geojson")
    clip = os.path.join(tmp_dir, "clip.tif")
    reproj = os.path.join(tmp_dir, "reproj.tif")
    image = os.path.join(tmp_dir, "image.tif")
    median = os.path.join(tmp_dir, "median.tif")
    regions = os.path.join(tmp_dir, "regions.shp")
    regions_info = os.path.join(tmp_dir, "regions_info.csv")

    try:
        download_ned_by_huc(huc8, out_dir=tmp_dir)

        if wbd is None:
            print("Downloading WBD ...")
            hu8_url = "https://drive.google.com/file/d/1AVBPVVAzsLs8dnF_bCvFvGMCAEgaPthh/view?usp=sharing"
            output = os.path.join(tmp_dir, "WBDHU8_CONUS.zip")
            wbd = download_file(hu8_url, output=output, unzip=False)

        if isinstance(wbd, str):
            print("Reading WBD ...")
            gdf = gpd.read_file(wbd)
        elif isinstance(wbd, gpd.GeoDataFrame):
            gdf = wbd
        else:
            # Message names the actual parameter (previously said "shp_path").
            raise ValueError("wbd must be a filepath or a GeoDataFrame.")

        # Write the selected watershed boundary as the clipping mask.
        selected = gdf[gdf["huc8"] == huc8].copy()
        selected.to_crs(epsg=4326, inplace=True)
        selected.to_file(mask)

        if not os.path.exists(merge):
            print("Merging NED tiles ...")
            mosaic(tmp_dir, merge)

        # DEM preprocessing pipeline: clip -> reproject -> resample -> smooth.
        clip_image(merge, mask, clip)
        reproject_image(clip, reproj, crs)
        resample(reproj, image, resolution)
        MedianFilter(image, kernel_size, median)

        ExtractSinks(median, min_size, tmp_dir)
        join_tables(regions, regions_info, filename)

        # Large intermediates are removed regardless of keep_files.
        for file in [merge, mask, clip, reproj, image]:
            if os.path.exists(file):
                os.remove(file)
        if not keep_files:
            shutil.rmtree(tmp_dir)
    except Exception as e:
        # Best-effort batch processing: record the failing HUC8 and clean up.
        if error_file is not None:
            with open(error_file, "a") as f:
                f.write(huc8 + "\n")
        if os.path.exists(tmp_dir):
            shutil.rmtree(tmp_dir)
        print(e)
        return None

    end_time = time.time()
    print("Total run time:\t\t\t {:.4f} s\n".format(end_time - start_time))
def extract_sinks_by_huc8_batch(
    huc_ids=None,
    min_size=10,
    out_dir=None,
    tmp_dir=None,
    wbd=None,
    crs="EPSG:5070",
    kernel_size=3,
    resolution=10,
    keep_files=False,
    reverse=False,
    error_file=None,
    ignore_warnings=True,
    ignored_ids=None,
    overwrite=False,
):
    """Extract sinks from NED for a batch of HUC8 watersheds.

    Args:
        huc_ids (list | str, optional): HUC8 codes to process. Defaults to None,
            which downloads the full CONUS HUC8 list.
        min_size (int, optional): The minimum number of pixels to be considered as a sink. Defaults to 10.
        out_dir (str, optional): Directory for the output shapefiles. Defaults to the current directory.
        tmp_dir (str, optional): The temporary directory. Defaults to None, e.g., using the current directory.
        wbd (str | GeoDataFrame, optional): The watershed boundary file. Defaults to None.
        crs (str, optional): The coordinate reference system. Defaults to "EPSG:5070".
        kernel_size (int, optional): The kernel size for smoothing the DEM. Defaults to 3.
        resolution (int, optional): The resolution of the DEM. Defaults to 10.
        keep_files (bool, optional): Whether to keep the intermediate files. Defaults to False.
        reverse (bool, optional): Whether to reverse the HUC8 list. Defaults to False.
        error_file (str, optional): The file to save the error IDs. Defaults to None.
        ignore_warnings (bool, optional): Whether to ignore warnings. Defaults to True.
        ignored_ids (list, optional): HUC8 codes to skip. Defaults to None.
        overwrite (bool, optional): Whether to overwrite the existing files. Defaults to False.
    """
    import pandas as pd

    # Avoid the mutable-default-argument pitfall (was `ignored_ids=[]`).
    if ignored_ids is None:
        ignored_ids = []

    if huc_ids is None:
        url = "https://raw.githubusercontent.com/giswqs/lidar/master/examples/data/huc8.csv"
        df = pd.read_csv(url, dtype=str)
        huc_ids = df["huc8_id"].tolist()
    if not isinstance(huc_ids, list):
        huc_ids = [huc_ids]
    if reverse:
        huc_ids = huc_ids[::-1]

    if out_dir is None:
        out_dir = os.getcwd()

    for index, huc8 in enumerate(huc_ids):
        print(f"Processing {index+1}:{len(huc_ids)}: {huc8} ...")
        if huc8 in ignored_ids:
            continue
        filename = os.path.join(out_dir, str(huc8) + ".shp")
        if not os.path.exists(filename) or (os.path.exists(filename) and overwrite):
            extract_sinks_by_huc8(
                huc8,
                min_size,
                filename,
                tmp_dir,
                wbd,
                crs,
                kernel_size,
                resolution,
                keep_files,
                error_file,
                ignore_warnings,
            )
        else:
            # Report the actual path (the original printed a placeholder).
            print(f"File already exists: {filename}")
def image_to_cog(source, dst_path=None, profile="deflate", **kwargs):
    """Converts an image to a Cloud Optimized GeoTIFF (COG) file.

    Args:
        source (str): A dataset path or URL. NOTE(review): the original
            docstring also claimed a rasterio DatasetReader is accepted, but
            ``source.startswith`` below requires a string — confirm intended inputs.
        dst_path (str, optional): An output dataset path or PathLike object.
            Defaults to None, in which case "_cog.tif" is appended to the
            source name (or a temp file is used for remote sources).
        profile (str, optional): COG profile. More at https://cogeotiff.github.io/rio-cogeo/profile. Defaults to "deflate".

    Raises:
        ImportError: If rio-cogeo is not installed.
        FileNotFoundError: If the source file could not be found.
    """
    # rio-cogeo is an optional dependency; fail with an actionable message.
    try:
        from rio_cogeo.cogeo import cog_translate
        from rio_cogeo.profiles import cog_profiles
    except ImportError:
        raise ImportError(
            "The rio-cogeo package is not installed. Please install it with `pip install rio-cogeo` or `conda install rio-cogeo -c conda-forge`."
        )
    # Local files must exist; remote (http/https) sources are passed through.
    if not source.startswith("http"):
        source = check_file_path(source)

        if not os.path.exists(source):
            raise FileNotFoundError(
                "The provided input file could not be found.")

    if dst_path is None:
        if not source.startswith("http"):
            dst_path = os.path.splitext(source)[0] + "_cog.tif"
        else:
            dst_path = temp_file_path(extension=".tif")

    dst_path = check_file_path(dst_path)

    dst_profile = cog_profiles.get(profile)
    cog_translate(source, dst_path, dst_profile, **kwargs)