Source code for aeronet.dataset.raster.band

import os
import numpy as np
import warnings

import rasterio
from rasterio import Affine
from rasterio.crs import CRS
from rasterio.coords import BoundingBox
from rasterio.warp import calculate_default_transform, reproject, Resampling

from ..coords import get_utm_zone
from .geoobject import GeoObject
from ._utils import band_shape_guard, random_name

TMP_DIR = '/tmp/raster'


[docs]class Band(GeoObject): """Filesystem object `Band` - Rasterio DatasetReader wrapper. The Band provides access to a georeferenced raster file placed in the filesystem. It stores all the necessary metadata and allows to read the raster data on request The majority of properties are inherited from rasterio `DatasetReader <https://rasterio.readthedocs.io/en/latest/api/rasterio.io.html#rasterio.io.DatasetReader>`_. """ def __init__(self, fp): """ Args: fp: path to GeoTiff file """ super().__init__() self._band = rasterio.open(fp) self._tmp_file = False def __del__(self): fp = self._band.name self._band.close() if self._tmp_file: os.remove(fp) # ======================== PROPERTY BLOCK ======================== @property def crs(self): return self._band.crs @property def transform(self): return self._band.transform @property def nodata(self): return self._band.nodata @property def res(self): return self._band.res @property def width(self): return self._band.width @property def height(self): return self._band.height @property def count(self): return self._band.count @property def shape(self): return self.height, self.width @property def name(self): """Name of the file associated with the Band, without extension.""" return os.path.basename(self._band.name).split('.')[0] @property def bounds(self): return self._band.bounds @property def meta(self): """The basic metadata of the associated rasterio DatasetReader""" return self._band.meta @property def dtype(self): """ Returns: numpy.dtype: Numerical type of the data stored in raster """ return self._band.dtypes[0] # ======================== METHODS BLOCK ========================
[docs] def numpy(self): """ Read all the raster data into memory Returns: `BandSample` representing the whole Band """ return self.sample(0, 0, self.height, self.width).numpy()
[docs] def same(self, other): """Compare bands by crs, transform, width, height. Returns: True if all match, False otherwise """ res = True res = res and (self.crs == other.crs) res = res and (self.transform == other.transform) res = res and (self.height == other.height) res = res and (self.width == other.width) return res
def _same_extent(self, other: GeoObject): """ Compares the spatial extent of the current and onther Bands based on their CRSes and transforms The extent is treated as 'same' if the boundaries differ not more than half of the biggest pixel Args: other: Band to compare the extent to Returns: bool result of check """ # explicitly calculate the other Band's dimensions and resolution in the current crs other_bounds = rasterio.warp.transform_bounds(other.crs, self.crs, *other.bounds) other_res = [abs(other_bounds[0] - other_bounds[2])/other.width, abs(other_bounds[1] - other_bounds[3])/other.height] max_pixel = [max(self.res[0], other_res[0]), max(self.res[1], other_res[1])] # check every bound to be different not more than half of the bigger pixel if abs(other_bounds[0] - self.bounds[0]) > 0.5*max_pixel[0] or \ abs(other_bounds[1] - self.bounds[1]) > 0.5 * max_pixel[1] or \ abs(other_bounds[2] - self.bounds[2]) > 0.5 * max_pixel[0] or \ abs(other_bounds[3] - self.bounds[3]) > 0.5 * max_pixel[1]: return False else: return True
[docs] def sample(self, y, x, height, width, **kwargs): """ Read sample of the Band to memory. The sample is defined by its size and position in the raster, without respect to the georeference. In case if the sample coordinates spread out of the image boundaries, the image is padded with nodata value. Args: x: pixel horizontal coordinate of left top corner of the sample y: pixel vertical coordinate of left top corner of the sample width: spatial dimension of sample in pixels height: spatial dimension of sample in pixels Return: `BandSample` object """ coord_x = self.transform.c + x * self.transform.a coord_y = self.transform.f + y * self.transform.e dst_crs = self.crs dst_name = os.path.basename(self.name) dst_nodata = self.nodata if self.nodata is not None else 0 dst_transform = Affine(self.transform.a, self.transform.b, coord_x, self.transform.d, self.transform.e, coord_y) dst_raster = self._band.read(window=((y, y + height), (x, x + width)), boundless=True, fill_value=dst_nodata) sample = BandSample(dst_name, dst_raster, dst_crs, dst_transform, dst_nodata) return sample
[docs] def resample(self, dst_res, fp=None, interpolation='nearest'): """ Change spatial resolution of the Args: dst_res: fp: interpolation: Returns: """ # get temporary filepath if such is not provided tmp_file = False if fp is not None else True if fp is None: fp = '{tmp}/resampled/{directory}/{name}.tif'.format( tmp=TMP_DIR, directory=random_name(), name=self.name) os.makedirs(os.path.dirname(fp), exist_ok=True) transform = Affine(dst_res[0], self.transform.b, self.transform.c, self.transform.d, - dst_res[1], self.transform.f) width = round(self.width / (dst_res[0]/self.res[0])) height = round(self.height / (dst_res[1]/self.res[1])) kwargs = self.meta.copy() kwargs.update({ 'transform': transform, 'width': width, 'height': height }) with rasterio.open(fp, 'w', **kwargs) as dst: reproject( source=rasterio.band(self._band, 1), destination=rasterio.band(dst, 1), src_transform=self.transform, src_crs=self.crs, dst_transform=transform, dst_crs=self.crs, resampling=getattr(Resampling, interpolation)) # new band band = Band(fp) band._tmp_file = tmp_file # file will be automatically removed when `Band` instance will be deleted return band
[docs] def reproject(self, dst_crs, fp=None, interpolation='nearest'): if dst_crs == 'utm': dst_crs = get_utm_zone(self.crs, self.transform, (self.height, self.width)) # get temporary filepath if such is not provided tmp_file = False if fp is not None else True if fp is None: fp = '{tmp}/reprojected_{crs}/{directory}/{name}.tif'.format( tmp=TMP_DIR, crs=dst_crs, directory=random_name(), name=self.name) os.makedirs(os.path.dirname(fp), exist_ok=True) # calculate params of new reprojected Band transform, width, height = calculate_default_transform( self.crs, dst_crs, self.width, self.height, *self.bounds) kwargs = self.meta.copy() kwargs.update({ 'crs': dst_crs, 'transform': transform, 'width': width, 'height': height }) # reproject with rasterio.open(fp, 'w', **kwargs) as dst: reproject( source=rasterio.band(self._band, 1), destination=rasterio.band(dst, 1), src_transform=self.transform, src_crs=self.crs, dst_transform=transform, dst_crs=dst_crs, resampling=getattr(Resampling, interpolation)) # new band band = Band(fp) band._tmp_file = tmp_file # file will be automatically removed when `Band` instance will be deleted return band
[docs] def reproject_to(self, other: GeoObject, fp=None, interpolation='nearest'): """ Reprojects self to match exactly the `other`. This function ensures that the raster size, crs and transform will be the same, allowing them to be merged into one BandCollection. If the intial raster exceeds the other in coverage, it will be cut, and if it is insufficient or displaced, it will be zero-padded. It aims to overpass the rounding problem which may cause an image to be misaligned with itself after a different series of transforms. If the images are far from each other, the warning will be shown, because the raster may be zero due to severe misalignment. Args: other: the Band with the parameters to fit to fp: specifies where to save new Band; if None, a temporary file is created interpolation: interpolation type parameter, defaults to Nearest Neighbor. Returns: new reprojected and resampled Band """ if not self._same_extent(other): warnings.warn('You are trying to match two bands that are not even approxiamtely aligned. ' 'The resulting raster may be empty') # get temporary filepath if such is not provided tmp_file = False if fp is not None else True if fp is None: fp = '{tmp}/reprojected_{crs}/{directory}/{name}.tif'.format( tmp=TMP_DIR, crs=other.crs, directory=random_name(), name=self.name) os.makedirs(os.path.dirname(fp), exist_ok=True) kwargs = self.meta.copy() kwargs.update({ 'crs': other.crs, 'transform': other.transform, 'width': other.width, 'height': other.height }) # reproject - as in rio.warp --like with rasterio.open(fp, 'w', **kwargs) as dst: reproject( source=rasterio.band(self._band, 1), destination=rasterio.band(dst, 1), src_transform=self.transform, src_crs=self.crs, dst_transform=other.transform, dst_crs=other.crs, resampling=getattr(Resampling, interpolation)) # new band band = Band(fp) band._tmp_file = tmp_file # file will be automatically removed when `Band` instance will be deleted return band
[docs] def reproject_to_utm(self, fp=None, interpolation='nearest'): """ Alias of `reproject` method with automatic Band utm zone determining """ dst_crs = get_utm_zone(self.crs, self.transform, (self.height, self.width)) return self.reproject(dst_crs, fp=fp, interpolation=interpolation)
[docs] def generate_samples(self, width, height): """ Yield `Sample`s with defined grid Args: width: dimension of sample in pixels and step along `X` axis height: dimension of sample in pixels and step along `Y` axis Returns: Generator object """ for x in range(0, self.width, width): for y in range(0, self.height, height): yield self.sample(y, x, height, width)
[docs]class BandSample(GeoObject): """ A wrapper over numpy array representing an in-memory georeferenced raster image. It implements all the interfaces of the GeoObject, and stores the raster data in memory """ def __init__(self, name, raster, crs, transform, nodata=0): super().__init__() self._name = name self._raster = band_shape_guard(raster) self._nodata = nodata self._transform = Affine(*transform) if not isinstance(transform, Affine) else transform self._crs = CRS(init=crs) if not isinstance(crs, CRS) else crs def __eq__(self, other): res = np.allclose(self.numpy(), other.numpy()) res = res and (self.crs.get('init') == other.crs.get('init')) res = res and np.allclose(np.array(self.transform), np.array(other.transform)) return res def __repr__(self): return '<BandSample: name={}, shape={}, dtype={}>'.format(self.name, self.shape, self.dtype) # ======================== PROPERTY BLOCK ======================== @property def width(self): return self._raster.shape[1] @property def height(self): return self._raster.shape[0] @property def count(self): return 1 @property def shape(self): return self.height, self.width @property def dtype(self): return self._raster.dtype @property def res(self): return abs(self.transform.a), abs(self.transform.e) @property def transform(self): return self._transform @property def crs(self): return self._crs @property def nodata(self): return self._nodata @property def bounds(self): left = self.transform.c top = self.transform.f right= left + self.transform.a * self.width bottom = top + self.transform.e * self.height return BoundingBox(left, bottom, right, top) @property def name(self): return self._name # ======================== METHODS BLOCK ========================
[docs] @classmethod def from_file(cls, fp): band = Band(fp) return band.sample(0, 0, band.width, band.height)
[docs] def same(self, other): """Compare if samples have same resolution, crs and shape. This means that the samples represent the same territory (like different spectral channels of the same image) and can be processed together as collection. Args: other: BandSample object to compare with Returns: True if the BandSamples match in shape and georeference """ res = True res = res and (self.crs == other.crs) res = res and (self.transform == self.transform) res = res and (self.height == self.height) res = res and (self.width == self.width) return res
[docs] def save(self, directory, ext='.tif', **kwargs): fp = os.path.join(directory, self._name + ext) with rasterio.open(fp, mode='w', driver='GTiff', width=self.width, height=self.height, count=1, crs=self.crs.get('init'), transform=self.transform, nodata=self.nodata, dtype=self.dtype, **kwargs) as dst: dst.write(self._raster.squeeze(), 1)
[docs] def sample(self, y, x, height, width): """ Subsample of Sample with specified: x, y - pixel coordinates of left top corner width, height - spatial dimension of sample in pixels Return: `Sample` object """ coord_x = self.transform.c + x * self.transform.a coord_y = self.transform.f + y * self.transform.e dst_transform = Affine(self.transform.a, self.transform.b, coord_x, self.transform.d, self.transform.e, coord_y) dst_raster = self._raster[y:y+height, x:x+width] return BandSample(self.name, dst_raster, self.crs, dst_transform, self.nodata)
[docs] def reproject(self, dst_crs, interpolation='nearest'): if dst_crs == 'utm': dst_crs = get_utm_zone(self.crs, self.transform, (self.height, self.width)) dst_transform, dst_width, dst_height = calculate_default_transform( self.crs, dst_crs, self.width, self.height, *self.bounds) new_raster = np.empty(shape=(1, dst_height, dst_width), dtype=self.dtype) reproject( self._raster, new_raster, src_transform=self.transform, dst_transform=dst_transform, src_crs=self.crs, dst_crs=dst_crs, resampling=getattr(Resampling, interpolation)) return BandSample(self.name, new_raster, dst_crs, dst_transform, self.nodata)
[docs] def reproject_to_utm(self, interpolation='nearest'): """Alias of `reproject` method with automatic Band utm zone determining""" dst_crs = get_utm_zone(self.crs, self.transform, (self.height, self.width)) return self.reproject(dst_crs, interpolation=interpolation)
[docs] def resample(self, dst_res=None, dst_shape=None, interpolation='nearest'): transform = self.transform if dst_res is None else Affine(dst_res[1], self.transform.b, self.transform.c, self.transform.d, - dst_res[0], self.transform.f) if dst_res is not None and dst_shape is None: target_height = int(self.height * self.res[0] / dst_res[0]) target_width = int(self.width * self.res[1] / dst_res[1]) elif dst_shape is not None: target_height = dst_shape[1] target_width = dst_shape[2] else: target_height = self.height target_width = self.width new_raster = np.empty(shape=(1, target_height, target_width), dtype=self.dtype) reproject( self._raster, new_raster, src_transform=self.transform, dst_transform=transform, src_crs=self.crs, dst_crs=self.crs, resampling=getattr(Resampling, interpolation)) return BandSample(self._name, new_raster, self.crs, transform, self.nodata)
[docs] def numpy(self): return self._raster
[docs] def generate_samples(self, width, height): """ Yield `Sample`s with defined grid Args: width: dimension of sample in pixels and step along `X` axis height: dimension of sample in pixels and step along `Y` axis Returns: Generator object """ for x in range(0, self.width, width): for y in range(0, self.height, height): yield self.sample(y, x, height, width)