import os
import rasterio
from tqdm import tqdm
from multiprocessing.pool import ThreadPool
from threading import Lock
from .raster import Band
from .raster import BandCollection
class SequentialSampler:
    """Sequence-like view over a BandCollection split into fixed-size blocks."""

    def __init__(self, band_collection, channels, sample_size, bound=0):
        """ Iterate over BandCollection sequentially with specified shape (+ bounds)

        Args:
            band_collection: BandCollection instance
            channels: list of str, required channels with required order
            sample_size: (height, width), size of `pure` sample in pixels (bounds not included)
            bound: int, bounds in pixels added to each side of the sample

        Return:
            Iterable object; each item is a `(sample, block)` pair where `block`
            is a dict with keys `x`, `y`, `height`, `width` and `bounds`
        """
        self.band_collection = band_collection
        self.sample_size = sample_size
        self.bound = bound
        self.channels = channels
        self.blocks = self._compute_blocks()

    def __len__(self):
        """Number of blocks covering the band collection."""
        return len(self.blocks)

    def __getitem__(self, i):
        """Return `(sample, block)` for the i-th block.

        Raises IndexError past the last block, which is what lets a plain
        `for sample, block in sampler` loop terminate.
        """
        block = self.blocks[i]
        ordered = self.band_collection.ordered(*self.channels)
        sample = ordered.sample(block['y'], block['x'], block['height'], block['width'])
        return sample, block

    def _compute_blocks(self):
        """Build the list of block descriptors in row-major order.

        Each block is `sample_size` enlarged by `bound` pixels on every side, so
        the first block starts at `-bound`. `bounds` holds, per axis, how many
        pixels have to be cut from the [begin, end] of the processed block so
        that only pixels inside the raster (and outside the overlap) remain:
        `[[top, bottom], [left, right]]`.
        """
        sample_h, sample_w = self.sample_size
        bound = self.bound
        total_h = self.band_collection.height
        total_w = self.band_collection.width

        height = sample_h + 2 * bound
        width = sample_w + 2 * bound

        blocks = []
        # A block at offset `y` has its pure area starting at `y + bound`, so the
        # offsets must stop before `total - bound`; going up to `total` would emit
        # trailing blocks whose pure area is empty (or of negative size).
        for y in range(-bound, total_h - bound, sample_h):
            bottom_y_bound = max(bound, y + height - total_h)
            for x in range(-bound, total_w - bound, sample_w):
                right_x_bound = max(bound, x + width - total_w)
                blocks.append({
                    'x': x,
                    'y': y,
                    'height': height,
                    'width': width,
                    'bounds': [[bound, bottom_y_bound], [bound, right_x_bound]],
                })
        return blocks
class SampleWindowWriter:
    """Single-band GeoTIFF writer that is filled window by window."""

    def __init__(self, fp, shape, transform, crs, nodata, dtype='uint8'):
        """ Create empty `Band` (rasterio open file) and write blocks sequentially

        Args:
            fp: file path of created Band
            shape: (height, width), size of band in pixels
            transform: rasterio Affine object
            crs: rasterio CRS or epsg code of coordinate system
            nodata: value of pixels without data
            dtype: str, one of rasterio data types

        Returns:
            when closed return `Band`

        Examples:
            ```python
            # create band
            bc = BandCollection(['/path/to/RED.tif', '/path/to/GRN.tif'])
            src = SequentialSampler(bc, channels, (1024, 1024), 512)
            dst = SampleWindowWriter('./test.tif', bc.shape[1:], **bc.profile)

            for sample, block in src:
                # read raster
                raster = sample.ordered('RED').numpy()
                # transform raster
                raster += 1
                # write raster
                dst.write(raster, **block)

            # close file when all data processed
            created_band = dst.close()
            ```
        """
        self.fp = fp
        self.shape = shape
        self.transform = transform
        self.nodata = nodata
        self.crs = crs
        self.dtype = dtype
        # the destination file is created (opened for writing) immediately
        self.dst = self.open()

    @property
    def height(self):
        """Band height in pixels (first element of `shape`)."""
        return self.shape[0]

    @property
    def width(self):
        """Band width in pixels (second element of `shape`)."""
        return self.shape[1]

    def open(self):
        """Open `fp` for writing as a single-band GeoTIFF and return the dataset."""
        return rasterio.open(self.fp, 'w', driver='GTiff', transform=self.transform, crs=self.crs,
                             height=self.height, width=self.width, count=1,
                             dtype=self.dtype, nodata=self.nodata)

    def close(self):
        """Close the destination dataset and return it re-opened as a `Band`."""
        self.dst.close()
        return Band(self.fp)

    def write(self, raster, x, y, width, height, bounds=None):
        """ Writes the specified raster into a window in dst

        The raster boundaries can be cut by 'bounds' pixels to prevent boundary effects on the algorithm output.
        If width and height are not equal to size of raster (after the bounds are cut), which is not typical,
        the raster is stretched to the window size (width and height)

        A window that ends up with zero or negative size after the bounds are
        cut contains no pixels, so it is skipped and nothing is written.

        Args:
            raster: numpy array to be written into dst
            x: begin position of window
            y: begin position of window
            width: size of window
            height: size of window
            bounds: [[top, bottom], [left, right]] - number of pixels to cut off
                from each side of the raster before writing

        Returns:
            None
        """
        if bounds:
            top, bottom = bounds[0][0], bounds[0][1]
            left, right = bounds[1][0], bounds[1][1]
            raster = raster[top:raster.shape[0] - bottom, left:raster.shape[1] - right]
            # shift the window origin past the cut-off margin and shrink it accordingly
            x += left
            y += top
            width = width - right - left
            height = height - bottom - top

        if height <= 0 or width <= 0:
            # nothing left to write for this block
            return

        self.dst.write(raster, 1, window=((y, y + height), (x, x + width)))
class SampleCollectionWindowWriter:
    """Multi-channel writer: one single-band `SampleWindowWriter` per channel."""

    def __init__(self, directory, channels, shape, transform, crs, nodata, dtype='uint8'):
        """ Create empty `Band`s (rasterio open files) and write blocks sequentially

        Args:
            directory: directory path of created BandCollection; a falsy value
                means file names are used relative to the current directory
            channels: channel names of created BandCollection, one
                `<channel>.tif` file is created per name
            shape: (height, width), size of band in pixels
            transform: rasterio Affine object
            crs: rasterio CRS or epsg code of coordinate system
            nodata: value of pixels without data
            dtype: str, one of rasterio data types

        Returns:
            when closed return `BandCollection`

        Examples:
            ```python
            # create band collection
            bc = BandCollection(['/path/to/RED.tif', '/path/to/GRN.tif'])
            src = SequentialSampler(bc, channels, (1024, 1024), 512)
            dst = SampleCollectionWindowWriter('./output', channels, bc.shape[1:], **bc.profile)

            for sample, block in src:
                # read raster
                raster = sample.numpy()
                # transform raster
                raster += 1
                # write raster
                dst.write(raster, **block)

            # close files when all data processed
            created_bc = dst.close()
            ```
        """
        if directory:
            os.makedirs(directory, exist_ok=True)
        # `directory or ''` keeps a falsy directory (e.g. None) from breaking os.path.join
        self.fps = [os.path.join(directory or '', channel + '.tif') for channel in channels]
        self.channels = channels
        self.shape = shape
        self.transform = transform
        self.nodata = nodata
        self.crs = crs
        self.dtype = dtype
        self.writers = self.open()

    def open(self):
        """Create one `SampleWindowWriter` per output file, in channel order."""
        return [
            SampleWindowWriter(fp, self.shape, self.transform,
                               self.crs, self.nodata, self.dtype)
            for fp in self.fps
        ]

    def write(self, raster, x, y, height, width, bounds=None):
        """Write `raster[i]` into the window of the i-th channel's file.

        Args:
            raster: indexable of per-channel 2D rasters, ordered like `channels`
            x: begin position of window
            y: begin position of window
            height: size of window
            width: size of window
            bounds: [[top, bottom], [left, right]] - number of pixels to cut off
                from each side of every channel raster before writing
        """
        for i, writer in enumerate(self.writers):
            # Pass sizes by keyword: the single-band writer takes (width, height)
            # in the opposite positional order to this method.
            writer.write(raster[i], x, y, width=width, height=height, bounds=bounds)

    def close(self):
        """Close every channel writer and return the results as a `BandCollection`."""
        bands = [writer.close() for writer in self.writers]
        return BandCollection(bands)
class Predictor:
    """Run a processing function block-by-block over a BandCollection."""

    def __init__(self, input_channels, output_labels, processing_fn,
                 sample_size=(1024, 1024), bound=256, n_workers=1, verbose=True, **kwargs):
        """
        Args:
            input_channels: list of str, names of bands/channels
            output_labels: list of str, names of output classes
            processing_fn: callable, function that take as an input `SampleCollection`
                and return raster with shape (output_labels, H, W)
            sample_size: (height, width), size of `pure` sample in pixels (bounds not included)
            bound: int, bounds in pixels added to sample
            n_workers: int, a thread pool of this size is used when greater than 1
            verbose: bool, show a progress bar while processing
            **kwargs: extra keyword arguments forwarded to `SampleCollectionWindowWriter`

        Returns:
            processed BandCollection
        """
        self.input_channels = input_channels
        self.output_labels = output_labels
        self.processing_fn = processing_fn
        self.sample_size = sample_size
        self.bound = bound
        self.kwargs = kwargs
        self.n_workers = n_workers
        self.verbose = verbose
        # serializes writes to the destination when several threads are used
        self.lock = Lock()

    def _threaded_processing(self, args):
        """Adapter for `ThreadPool.imap`: unpack a `(sample, block, dst)` tuple."""
        self._processing(*args)

    def _processing(self, sample, block, dst):
        """Process one sample and write the result into `dst` under the lock."""
        raster = self.processing_fn(sample)
        with self.lock:
            dst.write(raster, **block)

    def process(self, bc, output_directory):
        """Process the whole band collection and write the result to disk.

        Args:
            bc: source BandCollection
            output_directory: directory where one file per output label is created

        Returns:
            whatever `SampleCollectionWindowWriter.close()` returns for the
            written files
        """
        src = SequentialSampler(bc, self.input_channels, self.sample_size, self.bound)
        dst = SampleCollectionWindowWriter(output_directory, self.output_labels,
                                           bc.shape[1:], **bc.profile, **self.kwargs)
        hide_progress = not self.verbose

        if self.n_workers > 1:
            # All samples are read here, in the calling thread, before the pool
            # starts; only processing and (locked) writing run in workers.
            # NOTE(review): this holds every sample in memory at once - kept
            # as-is because reading from worker threads would need confirming
            # that the source datasets tolerate concurrent access.
            args = [(sample, block, dst) for sample, block in src]
            with ThreadPool(self.n_workers) as pool:
                with tqdm(total=len(args), disable=hide_progress) as pbar:
                    for _ in pool.imap(self._threaded_processing, args):
                        pbar.update()
        else:
            # Stream samples one at a time instead of materializing them all
            # first, so memory use stays bounded by a single block.
            with tqdm(src, total=len(src), disable=hide_progress) as data:
                for sample, block in data:
                    self._processing(sample, block, dst)

        return dst.close()