#!/usr/bin/env python
# ===============================================================================
# Copyright (c) 2014 Geoscience Australia
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither Geoscience Australia nor the names of its contributors may be
# used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# ===============================================================================
__author__ = "Simon Oldfield"

import abc
import logging
import os

import luigi

import datacube.api.workflow as workflow
from datacube.api import dataset_type_arg
from datacube.api.model import Ls57Arg25Bands, get_bands, Tile, DatasetType
from datacube.api.utils import get_satellite_string

# Module-specific logger instead of the root logger, so output from this module
# can be identified and configured on its own; records still propagate to the
# root logger's handlers.
_log = logging.getLogger(__name__)
class Workflow(workflow.Workflow):
    """Abstract command-line workflow over one dataset type, a set of bands and a chunk size.

    Adds the ``--dataset-type``, ``--band``, ``--chunk-size-x`` and
    ``--chunk-size-y`` arguments on top of those registered by
    :class:`workflow.Workflow`. Subclasses must implement
    :meth:`create_summary_task` and :meth:`get_supported_dataset_types`.

    Base-class methods are called explicitly (``workflow.Workflow.method(self)``)
    rather than through ``super(self.__class__, self)``, which would recurse
    forever as soon as this class is itself subclassed.
    """

    __metaclass__ = abc.ABCMeta

    # Inclusive bounds accepted for --chunk-size-x / --chunk-size-y.
    # NOTE(review): 4000 presumably matches the cell extent in pixels that
    # CellDatasetBandTask.get_chunks iterates over - keep the two in sync.
    _CHUNK_SIZE_MIN = 1
    _CHUNK_SIZE_MAX = 4000

    def __init__(self, name):
        workflow.Workflow.__init__(self, name)

        self.dataset_type = None
        self.bands = None
        self.chunk_size_x = None
        self.chunk_size_y = None

    @classmethod
    def _chunk_size_arg(cls, value):
        """Convert a chunk-size command-line value to an int within the accepted bounds.

        Used as the argparse ``type`` instead of ``choices=range(...)``: with
        ``choices`` argparse prints every one of the allowed values in the
        usage line and in the "invalid choice" error message.

        :param value: the raw command-line string
        :return: the chunk size as an int
        :raises argparse.ArgumentTypeError: if value is not an integer in
            [_CHUNK_SIZE_MIN, _CHUNK_SIZE_MAX]
        """
        import argparse

        try:
            size = int(value)
        except ValueError:
            raise argparse.ArgumentTypeError("invalid chunk size [%s]: not an integer" % value)

        if not cls._CHUNK_SIZE_MIN <= size <= cls._CHUNK_SIZE_MAX:
            raise argparse.ArgumentTypeError(
                "invalid chunk size [%d]: must be between %d and %d"
                % (size, cls._CHUNK_SIZE_MIN, cls._CHUNK_SIZE_MAX))

        return size

    def setup_arguments(self):
        """Register this workflow's command-line arguments on ``self.parser``."""
        workflow.Workflow.setup_arguments(self)

        # Materialise once: used both as the choices and to build the metavar,
        # and a generator returned by a subclass would otherwise be exhausted.
        supported_dataset_types = list(self.get_supported_dataset_types())

        self.parser.add_argument("--dataset-type", help="The type of dataset to process", action="store",
                                 dest="dataset_type",
                                 type=dataset_type_arg,
                                 choices=supported_dataset_types, required=True,
                                 metavar=" ".join(
                                     [dataset_type.name for dataset_type in supported_dataset_types]))

        self.parser.add_argument("--band", help="The band(s) from the dataset to process", action="store", required=True,
                                 dest="bands", type=str, nargs="+", metavar=" ".join([b.name for b in Ls57Arg25Bands]))

        self.parser.add_argument("--chunk-size-x", help="X chunk size", action="store", dest="chunk_size_x",
                                 type=self._chunk_size_arg, required=True)

        self.parser.add_argument("--chunk-size-y", help="Y chunk size", action="store", dest="chunk_size_y",
                                 type=self._chunk_size_arg, required=True)

    def process_arguments(self, args):
        """Copy the parsed arguments onto the instance and validate the requested bands.

        :param args: the parsed argparse namespace
        :raises Exception: if a requested band is not available for one of
            ``self.satellites`` (presumably populated by the base class's
            process_arguments - confirm)
        """
        workflow.Workflow.process_arguments(self, args)

        self.dataset_type = args.dataset_type
        self.bands = args.bands

        # Every requested band must be available for every requested satellite
        requested_bands = set(self.bands)

        for satellite in self.satellites:
            available_bands = set(b.name for b in get_bands(self.dataset_type, satellite))

            if not requested_bands.issubset(available_bands):
                _log.error("Requested bands [%s] not ALL present for satellite [%s]", self.bands, satellite)
                raise Exception("Not all bands present for all satellites")

        self.chunk_size_x = args.chunk_size_x
        self.chunk_size_y = args.chunk_size_y

    def log_arguments(self):
        """Log the base-class arguments followed by this workflow's own arguments."""
        workflow.Workflow.log_arguments(self)

        _log.info((
            "\n"
            "dataset to retrieve = {dataset_type}\n"
            "bands = {bands}\n"
            "X chunk size = {chunk_size_x}\n"
            "Y chunk size = {chunk_size_y}\n"
        ).format(dataset_type=self.dataset_type.name, bands=self.bands,
                 chunk_size_x=self.chunk_size_x, chunk_size_y=self.chunk_size_y))

    @abc.abstractmethod
    def create_summary_task(self, acq_min, acq_max, months):
        """Create the summary task for the given acquisition range and months; must be overridden."""
        raise NotImplementedError("Abstract method should be overridden")

    @abc.abstractmethod
    def get_supported_dataset_types(self):
        """Return the dataset types accepted by ``--dataset-type``; must be overridden."""
        raise NotImplementedError("Abstract method should be overridden")
class SummaryTask(workflow.SummaryTask):
    """Abstract summary task parameterised by dataset type, bands and chunk size.

    Subclasses must implement :meth:`create_cell_tasks`.
    """

    __metaclass__ = abc.ABCMeta

    dataset_type = luigi.Parameter()
    bands = luigi.Parameter(is_list=True)

    chunk_size_x = luigi.IntParameter()
    chunk_size_y = luigi.IntParameter()

    @abc.abstractmethod
    def create_cell_tasks(self, x, y):
        """Create the cell task(s) for cell (x, y); must be overridden."""
        raise NotImplementedError("Abstract method should be overridden")
class CellTask(workflow.CellTask):
    """Abstract per-cell task that depends on one band task per requested band.

    Subclasses must implement :meth:`create_cell_dataset_band_task`.
    """

    __metaclass__ = abc.ABCMeta

    dataset_type = luigi.Parameter()
    bands = luigi.Parameter(is_list=True)

    chunk_size_x = luigi.IntParameter()
    chunk_size_y = luigi.IntParameter()

    def requires(self):
        """Return one task per entry in ``self.bands``, in band order."""
        return [self.create_cell_dataset_band_task(band) for band in self.bands]

    @abc.abstractmethod
    def create_cell_dataset_band_task(self, band):
        """Create the task that processes ``band`` for this cell; must be overridden."""
        raise NotImplementedError("Abstract method should be overridden")
class CellDatasetBandTask(workflow.Task):
    """Abstract task for one band of one cell, split into (x_offset, y_offset) chunks.

    Depends on one chunk task per chunk produced by :meth:`get_chunks`.
    Subclasses must implement :meth:`create_cell_dataset_band_chunk_task`.
    """

    __metaclass__ = abc.ABCMeta

    x = luigi.IntParameter()
    y = luigi.IntParameter()

    acq_min = luigi.DateParameter()
    acq_max = luigi.DateParameter()

    months = luigi.Parameter(is_list=True)

    satellites = luigi.Parameter(is_list=True)

    output_directory = luigi.Parameter()

    csv = luigi.BooleanParameter()

    dummy = luigi.BooleanParameter()

    mask_pqa_apply = luigi.BooleanParameter()
    mask_pqa_mask = luigi.Parameter()

    mask_wofs_apply = luigi.BooleanParameter()
    mask_wofs_mask = luigi.Parameter()

    dataset_type = luigi.Parameter()
    band = luigi.Parameter()

    chunk_size_x = luigi.IntParameter()
    chunk_size_y = luigi.IntParameter()

    def requires(self):
        """Return one chunk task for every (x_offset, y_offset) produced by get_chunks."""
        return [self.create_cell_dataset_band_chunk_task(x_offset, y_offset)
                for x_offset, y_offset in self.get_chunks()]

    def get_chunks(self):
        """Yield the (x_offset, y_offset) origin of each chunk of the cell.

        Offsets step by chunk_size_x / chunk_size_y over [0, 4000), x-major
        (all y offsets for the first x offset, then the next x offset, ...).
        NOTE(review): 4000 is presumably the cell size in pixels - confirm.

        :raises ValueError: if either chunk size is not positive (range() would
            otherwise fail opaquely for 0, or yield no chunks for negatives)
        """
        import itertools

        if self.chunk_size_x <= 0 or self.chunk_size_y <= 0:
            raise ValueError("Chunk sizes must be positive, got x=[%s] y=[%s]"
                             % (self.chunk_size_x, self.chunk_size_y))

        for x_offset, y_offset in itertools.product(range(0, 4000, self.chunk_size_x),
                                                    range(0, 4000, self.chunk_size_y)):
            yield x_offset, y_offset

    @abc.abstractmethod
    def create_cell_dataset_band_chunk_task(self, x_offset, y_offset):
        """Create the task for the chunk at (x_offset, y_offset); must be overridden."""
        raise NotImplementedError("Abstract method should be overridden")
class CellDatasetBandChunkTask(workflow.Task):
    """Abstract task for one chunk of one band of one cell.

    Provides helpers to look up the tiles for the cell, either from a
    previously written CSV file (when ``csv`` is set) or from the database.
    """

    __metaclass__ = abc.ABCMeta

    x = luigi.IntParameter()
    y = luigi.IntParameter()

    acq_min = luigi.DateParameter()
    acq_max = luigi.DateParameter()

    months = luigi.Parameter(is_list=True)

    satellites = luigi.Parameter(is_list=True)

    output_directory = luigi.Parameter()

    csv = luigi.BooleanParameter()

    dummy = luigi.BooleanParameter()

    mask_pqa_apply = luigi.BooleanParameter()
    mask_pqa_mask = luigi.Parameter()

    mask_wofs_apply = luigi.BooleanParameter()
    mask_wofs_mask = luigi.Parameter()

    dataset_type = luigi.Parameter()
    band = luigi.Parameter()

    x_offset = luigi.IntParameter()
    y_offset = luigi.IntParameter()

    chunk_size_x = luigi.IntParameter()
    chunk_size_y = luigi.IntParameter()

    def get_dataset_types(self):
        """Return the dataset types to query: the primary type plus any mask datasets.

        PQ25 is appended when PQA masking is enabled and WATER when WOfS
        masking is enabled.
        """
        dataset_types = [self.dataset_type]

        if self.mask_pqa_apply:
            dataset_types.append(DatasetType.PQ25)

        if self.mask_wofs_apply:
            dataset_types.append(DatasetType.WATER)

        return dataset_types

    def get_tiles(self):
        """Return the cell's tiles as a list, read from CSV if ``csv`` is set, else from the DB."""
        return list(self.get_tiles_from_csv() if self.csv else self.get_tiles_from_db())

    def get_tiles_from_csv(self):
        """Yield a Tile for each record in this cell's tile CSV file.

        Yields nothing, after logging a warning, if the CSV file does not exist.
        """
        import csv
        import sys

        filename = self.get_tile_csv_filename()

        if not os.path.isfile(filename):
            _log.warning("Tile CSV file [%s] not found - no tiles will be returned", filename)
            return

        # The csv module requires a binary-mode file on Python 2 but a
        # text-mode file opened with newline="" on Python 3.
        if sys.version_info[0] < 3:
            f = open(filename, "rb")
        else:
            f = open(filename, "r", newline="")

        with f:
            for record in csv.DictReader(f):
                _log.debug("Found CSV record [%s]", record)
                yield Tile.from_csv_record(record)

    def get_tile_csv_filename(self):
        """Return the path of the tile CSV for this cell, satellites and acquisition range.

        The file lives in ``output_directory``. The name encodes the
        satellites, the cell x/y (as min and max) and the formatted
        acquisition dates.
        """
        acq_min = workflow.format_date(self.acq_min)
        acq_max = workflow.format_date(self.acq_max)

        # TODO other distinguishing characteristics (e.g. dataset types)

        return os.path.join(
            self.output_directory,
            "tiles_{satellites}_{x_min:03d}_{x_max:03d}_{y_min:04d}_{y_max:04d}_{acq_min}_{acq_max}.csv".format(
                satellites=get_satellite_string(self.satellites),
                x_min=self.x, x_max=self.x, y_min=self.y, y_max=self.y,
                acq_min=acq_min, acq_max=acq_max
            ))

    def get_tiles_from_db(self):
        """Yield the tiles for this cell from the database via list_tiles.

        The query is restricted to this cell's x/y, the acquisition date range,
        ``months``, ``satellites`` and the types from get_dataset_types().
        """
        from datacube.api.query import list_tiles

        for tile in list_tiles(x=[self.x], y=[self.y], acq_min=self.acq_min, acq_max=self.acq_max,
                               include=self.months, satellites=list(self.satellites),
                               dataset_types=self.get_dataset_types()):
            yield tile