Source code for datacube.api.workflow.cell_dataset_band_chunk

#!/usr/bin/env python

# ===============================================================================
# Copyright (c)  2014 Geoscience Australia
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither Geoscience Australia nor the names of its contributors may be
#       used to endorse or promote products derived from this software
#       without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# ===============================================================================
import os
from datacube.api.utils import get_satellite_string

__author__ = "Simon Oldfield"


import abc
import datacube.api.workflow as workflow
import logging
import luigi
from datacube.api import dataset_type_arg
from datacube.api.model import Ls57Arg25Bands, get_bands, Tile, DatasetType

_log = logging.getLogger()


class Workflow(workflow.Workflow):
    """Abstract command-line workflow driven by dataset type, bands and chunk size.

    Extends ``workflow.Workflow`` with four additional required command-line
    arguments (``--dataset-type``, ``--band``, ``--chunk-size-x`` and
    ``--chunk-size-y``) and stores their parsed values on the instance.

    Concrete workflows must implement ``create_summary_task`` and
    ``get_supported_dataset_types``.
    """

    __metaclass__ = abc.ABCMeta

    def __init__(self, name):
        """Initialise the base workflow and reset the extra argument values.

        :param name: passed straight through to ``workflow.Workflow.__init__``.
        """
        # The base class is named explicitly: super(self.__class__, self)
        # would recurse endlessly as soon as this class is subclassed.
        workflow.Workflow.__init__(self, name)

        # Populated later by process_arguments().
        self.dataset_type = None
        self.bands = None
        self.chunk_size_x = None
        self.chunk_size_y = None

    def setup_arguments(self):
        """Register this workflow's arguments on ``self.parser``.

        The base-class arguments are registered first; ``self.parser`` is
        presumably created by the base class (not visible here).
        """
        workflow.Workflow.setup_arguments(self)

        self.parser.add_argument("--dataset-type", help="The type of dataset to process", action="store",
                                 dest="dataset_type",
                                 type=dataset_type_arg,
                                 choices=self.get_supported_dataset_types(), required=True,
                                 metavar=" ".join(
                                     [dataset_type.name for dataset_type in self.get_supported_dataset_types()]))

        # NOTE(review): the metavar always lists the Ls57Arg25Bands names,
        # whatever dataset type is selected; the real per-satellite check
        # happens in process_arguments().
        self.parser.add_argument("--band", help="The band(s) from the dataset to process", action="store",
                                 required=True,
                                 dest="bands", type=str, nargs="+",
                                 metavar=" ".join([b.name for b in Ls57Arg25Bands]))

        # Chunk sizes are restricted to the inclusive range 1..4000.
        self.parser.add_argument("--chunk-size-x", help="X chunk size", action="store", dest="chunk_size_x",
                                 type=int, choices=range(1, 4000 + 1), required=True)

        self.parser.add_argument("--chunk-size-y", help="Y chunk size", action="store", dest="chunk_size_y",
                                 type=int, choices=range(1, 4000 + 1), required=True)

    def process_arguments(self, args):
        """Copy the parsed arguments onto the instance and validate the bands.

        :param args: parsed argument namespace providing ``dataset_type``,
            ``bands``, ``chunk_size_x`` and ``chunk_size_y``.
        :raises Exception: if any requested band is not available for one of
            the satellites in ``self.satellites``.
        """
        workflow.Workflow.process_arguments(self, args)

        self.dataset_type = args.dataset_type
        self.bands = args.bands

        # Verify that all the requested satellites have the requested bands.
        # self.satellites is presumably set by the base-class call above.
        for satellite in self.satellites:
            # Build the set of available band names once per satellite rather
            # than once per requested band.
            available = set(b.name for b in get_bands(self.dataset_type, satellite))

            if not all(band in available for band in self.bands):
                _log.error("Requested bands [%s] not ALL present for satellite [%s]", self.bands, satellite)
                raise Exception("Not all bands present for all satellites")

        self.chunk_size_x = args.chunk_size_x
        self.chunk_size_y = args.chunk_size_y

    def log_arguments(self):
        """Log the base-class arguments followed by this workflow's own."""
        workflow.Workflow.log_arguments(self)

        _log.info("""
        dataset to retrieve = {dataset_type}
        bands = {bands}
        X chunk size = {chunk_size_x}
        Y chunk size = {chunk_size_y}
        """.format(dataset_type=self.dataset_type.name, bands=self.bands,
                   chunk_size_x=self.chunk_size_x, chunk_size_y=self.chunk_size_y))

    @abc.abstractmethod
    def create_summary_task(self, acq_min, acq_max, months):
        """Abstract hook; subclasses must override it.

        :raises Exception: always, when the base implementation is invoked.
        """
        raise Exception("Abstract method should be overridden")

    @abc.abstractmethod
    def get_supported_dataset_types(self):
        """Abstract hook; subclasses must override it.

        The result is used as the ``choices`` of ``--dataset-type`` and each
        element's ``name`` is shown in the help text.

        :raises Exception: always, when the base implementation is invoked.
        """
        raise Exception("Abstract method should be overridden")
class SummaryTask(workflow.SummaryTask):
    """Abstract summary task carrying the dataset/band/chunk-size parameters.

    Adds four luigi parameters on top of ``workflow.SummaryTask``; concrete
    subclasses must provide ``create_cell_tasks``.
    """

    __metaclass__ = abc.ABCMeta

    # Declaration order is kept as-is: luigi orders parameters by definition.
    dataset_type = luigi.Parameter()
    bands = luigi.Parameter(is_list=True)
    chunk_size_x = luigi.IntParameter()
    chunk_size_y = luigi.IntParameter()

    @abc.abstractmethod
    def create_cell_tasks(self, x, y):
        """Abstract hook; subclasses must override it.

        :raises Exception: always, when the base implementation is invoked.
        """
        raise Exception("Abstract method should be overridden")
class CellTask(workflow.CellTask):
    """Abstract cell task that fans out into one sub-task per requested band."""

    __metaclass__ = abc.ABCMeta

    # Declaration order is kept as-is: luigi orders parameters by definition.
    dataset_type = luigi.Parameter()
    bands = luigi.Parameter(is_list=True)
    chunk_size_x = luigi.IntParameter()
    chunk_size_y = luigi.IntParameter()

    def requires(self):
        """Return a list with one band-level task for each entry in ``self.bands``."""
        band_tasks = []
        for band_name in self.bands:
            band_tasks.append(self.create_cell_dataset_band_task(band_name))
        return band_tasks

    @abc.abstractmethod
    def create_cell_dataset_band_task(self, band):
        """Abstract hook called once per band by ``requires``; must be overridden.

        :raises Exception: always, when the base implementation is invoked.
        """
        raise Exception("Abstract method should be overridden")
class CellDatasetBandTask(workflow.Task):
    """Abstract per-cell, per-band task that fans out into one sub-task per chunk."""

    __metaclass__ = abc.ABCMeta

    # Declaration order is kept as-is: luigi orders parameters by definition.
    x = luigi.IntParameter()
    y = luigi.IntParameter()

    acq_min = luigi.DateParameter()
    acq_max = luigi.DateParameter()

    months = luigi.Parameter(is_list=True)

    satellites = luigi.Parameter(is_list=True)

    output_directory = luigi.Parameter()

    csv = luigi.BooleanParameter()
    dummy = luigi.BooleanParameter()

    mask_pqa_apply = luigi.BooleanParameter()
    mask_pqa_mask = luigi.Parameter()

    mask_wofs_apply = luigi.BooleanParameter()
    mask_wofs_mask = luigi.Parameter()

    dataset_type = luigi.Parameter()
    band = luigi.Parameter()

    chunk_size_x = luigi.IntParameter()
    chunk_size_y = luigi.IntParameter()

    def requires(self):
        """Return a list with one chunk-level task per offset pair from ``get_chunks``."""
        chunk_tasks = []
        for offset_x, offset_y in self.get_chunks():
            chunk_tasks.append(self.create_cell_dataset_band_chunk_task(offset_x, offset_y))
        return chunk_tasks

    def get_chunks(self):
        """Yield ``(x_offset, y_offset)`` pairs covering a 4000 x 4000 grid.

        Offsets step from 0 up to (but excluding) 4000 by ``chunk_size_x`` and
        ``chunk_size_y``; the Y offset varies fastest.
        """
        # Both ranges are built before iteration starts, as itertools.product
        # would do with its arguments.
        x_offsets = range(0, 4000, self.chunk_size_x)
        y_offsets = range(0, 4000, self.chunk_size_y)

        for offset_x in x_offsets:
            for offset_y in y_offsets:
                yield offset_x, offset_y

    @abc.abstractmethod
    def create_cell_dataset_band_chunk_task(self, x_offset, y_offset):
        """Abstract hook called once per chunk by ``requires``; must be overridden.

        :raises Exception: always, when the base implementation is invoked.
        """
        raise Exception("Abstract method should be overridden")
class CellDatasetBandChunkTask(workflow.Task):
    """Task for a single chunk of one band of one dataset type in one cell.

    Besides declaring its luigi parameters, the class offers helpers for
    working out which dataset types are needed and for listing the tiles of
    the cell, either from a CSV file in ``output_directory`` or from the
    database.
    """

    __metaclass__ = abc.ABCMeta

    # Declaration order is kept as-is: luigi orders parameters by definition.
    x = luigi.IntParameter()
    y = luigi.IntParameter()

    acq_min = luigi.DateParameter()
    acq_max = luigi.DateParameter()

    months = luigi.Parameter(is_list=True)

    satellites = luigi.Parameter(is_list=True)

    output_directory = luigi.Parameter()

    # When true, get_tiles() reads the tile CSV instead of querying the DB.
    csv = luigi.BooleanParameter()
    dummy = luigi.BooleanParameter()

    # When true, DatasetType.PQ25 is added to the required dataset types.
    mask_pqa_apply = luigi.BooleanParameter()
    mask_pqa_mask = luigi.Parameter()

    # When true, DatasetType.WATER is added to the required dataset types.
    mask_wofs_apply = luigi.BooleanParameter()
    mask_wofs_mask = luigi.Parameter()

    dataset_type = luigi.Parameter()
    band = luigi.Parameter()

    x_offset = luigi.IntParameter()
    y_offset = luigi.IntParameter()

    chunk_size_x = luigi.IntParameter()
    chunk_size_y = luigi.IntParameter()

    def get_dataset_types(self):
        """Return the primary dataset type plus any enabled mask dataset types."""
        required = [self.dataset_type]

        if self.mask_pqa_apply:
            required += [DatasetType.PQ25]

        if self.mask_wofs_apply:
            required += [DatasetType.WATER]

        return required

    def get_tiles(self):
        """Return the tiles as a list, from the CSV file or from the database."""
        # self.csv picks the source; both sources are generators.
        tile_source = self.get_tiles_from_csv if self.csv else self.get_tiles_from_db
        return list(tile_source())

    def get_tiles_from_csv(self):
        """Yield a ``Tile`` for each record of the tile CSV file.

        Yields nothing when the CSV file does not exist.
        """
        import csv

        csv_path = self.get_tile_csv_filename()

        if not os.path.isfile(csv_path):
            return

        with open(csv_path, "rb") as handle:
            for row in csv.DictReader(handle):
                _log.debug("Found CSV record [%s]", row)
                yield Tile.from_csv_record(row)

    def get_tile_csv_filename(self):
        """Return the path of this cell's tile CSV inside ``output_directory``.

        The file name encodes the satellites, the cell's x and y (each used
        as both minimum and maximum) and the formatted acquisition date range.
        """
        acq_min = workflow.format_date(self.acq_min)
        acq_max = workflow.format_date(self.acq_max)

        # TODO other distinguishing characteristics (e.g. dataset types)

        filename = "tiles_{satellites}_{x_min:03d}_{x_max:03d}_{y_min:04d}_{y_max:04d}_{acq_min}_{acq_max}.csv".format(
            satellites=get_satellite_string(self.satellites),
            x_min=self.x, x_max=self.x,
            y_min=self.y, y_max=self.y,
            acq_min=acq_min, acq_max=acq_max)

        return os.path.join(self.output_directory, filename)

    def get_tiles_from_db(self):
        """Yield the tiles that ``list_tiles`` returns for this single cell."""
        from datacube.api.query import list_tiles

        matches = list_tiles(x=[self.x], y=[self.y],
                             acq_min=self.acq_min, acq_max=self.acq_max,
                             include=self.months,
                             satellites=list(self.satellites),
                             dataset_types=self.get_dataset_types())

        for tile in matches:
            yield tile