Regression using TensorFlow with Google Earth Engine Python API#

This tutorial demonstrates how to train a model to predict continuous output (regression) from geospatial data. To do this, we will use the impervious surface area dataset from NLCD and a Landsat 8 composite. Both datasets will be acquired from Google Earth Engine. Using the Keras sequential API, we will show how to train a U-Net model to predict impervious surface area percentage for each pixel in an image.

This tutorial is an adaptation of this example.

This tutorial covers:

  1. Generating and exporting training/testing patches from Google Earth Engine (henceforth referred to as Earth Engine or EE).

  2. Using an iterator to supply batches furing training and testing.

  3. Training the regression model and saving it.

  4. Generating predictions with the trained model and plotting them.

What is regression?#

Suppose we want to predict something like earthquake intensity or soil mositure given a satellite image. These express as continuous targets across pixels. This makes the objective different than classification, where a discrete set of classes are predicted across pixels. Models designed to predict a continuous output are performing regression. In a supervised context, the goal of regression is to produce results with the minimum difference (error) between the true and predicted values. We can approach this through a number of machine learning algorithms, to include deep learning with convolutional neural networks. You may have seen ConvNets used extensively for classification, but they have significant power for modeling regression problems. That said, it’s important to be aware of when deep learning is useful for regression.

There are two common types of regression problems:

  1. Linear, in which the input (independent) values are linearly correlated with the output (dependent) values. For example, if an input variable increases so does the output. Linear regression produces real values of an arbitrary range.

  2. Logistic, which does not assume a linear correlation between the input and output, but rather seeks to model liklihood (probabilities). Logistic regression produces propabilities so within the range of [0,1].

Where a linear relationship exists, we don’t need a complex approach to accurately predict the target dependent variable. It is problems in which the relationship between the independent variables is varied and context-dependent that we might use deep learning for regression. One of the intrinsic characteristics of deep learning is the non-linearity between the layers of a network, which makes them a useful tool for solving logistic regression type problems.

The main distinctions that present when using ConvNets for regression instead of classification are:

  1. The labels are continuous. They will likely appear as float values between an arbitrary range that starts with zero. Note: It’s often helpful in deep learning to scale raw values to a sensible range, e.g. [0,1].

  2. The loss function is different. A common loss function used in regression is mean squared error (MSE), which measures the average of the squared differences between the predicted and true values. The lower the value of MSE the better, with a perfect value being 0.0. Note: squaring is used to amplify and penalize larger differences more than smaller ones.

  3. For regression of real values, either a linear or no activation function is used for the output layer. For regression of probabilities, a logistic activation function is used.

https://www.researchgate.net/profile/Fenglin-Sun-2/publication/352181228/figure/fig2/AS:1032041529368581@1623069283910/A-U-net-regression-architecture-example-of-50-80-pixels-in-the-lowest-resolution.png

Fig. 23 U-Net for regression from “Deep Learning-Based Radar Composite Reflectivity Factor Estimations from Fengyun-4A Geostationary Satellite Observations” by Sun et. al, 2021#

In this tutorial, we’ll look at a practical example of using satellite imagery with descriptive features to predict pixel-wise percentages. Naturally, our labels and predictions will be constrained to a range of [0,1]. The goals will be to train a model that:

  1. Generates output images in which pixels values are percentages learned from input images with descriptive features.

  2. Minimize the difference between the true percentages in the corresponding label images and the predicted percentages in the output images.

Imports and authentication#

# Cloud authentication.
from google.colab import auth
auth.authenticate_user()
# Import, authenticate and initialize the Earth Engine library.
import ee
ee.Authenticate()
ee.Initialize()
import tensorflow as tf
#print(tf.__version__)
from tensorflow.keras import layers, losses, models, metrics, optimizers

import os, folium, glob, json, tifffile
from PIL import Image
import numpy as np
from random import randrange
import time
import matplotlib.pyplot as plt
from matplotlib.pyplot import imshow
from pprint import pprint
from google.colab import drive

Some of the operations we’ll be running, particularly model training, take prohibitively long when using Google Colab because the storage backend, Google Drive, has slow IO.

So in some spots we’ll use precomputed files from the tf-eo-devseed-2-processed-outputs Google Drive folder, which you can create a shortcut to by navigating to this link (no need to do this twice if you did it in an earlier lesson): https://drive.google.com/drive/folders/1FrSTn9Iq458qQhTw_7rbaVSc739TTw9V?usp=share_link

We’ll also create a directory in our personal Google Drive to store user generated outputs, tf-eo-devseed-2-user_outputs_dir

if 'google.colab' in str(get_ipython()):
    # mount google drive
    drive.mount('/content/gdrive')
    processed_outputs_dir = '/content/gdrive/My Drive/tf-eo-devseed-2-processed-outputs/'
    user_outputs_dir = '/content/gdrive/My Drive/tf-eo-devseed-2-user_outputs_dir'
    if not os.path.exists(user_outputs_dir):
        os.makedirs(user_outputs_dir)
    print('Running on Colab')
else:
    processed_outputs_dir = os.path.abspath("./data/tf-eo-devseed-2-processed-outputs")
    user_outputs_dir = os.path.abspath('./tf-eo-devseed-2-user_outputs_dir')
    if not os.path.exists(user_outputs_dir):
        os.makedirs(user_outputs_dir)
        os.makedirs(processed_outputs_dir)
    print(f'Not running on Colab, data needs to be downloaded locally at {os.path.abspath(processed_outputs_dir)}')
# Move to your user_outputs_dir directory in order to write data
%cd $user_outputs_dir

Set global parameters#

Below we’ll set up the FEATURES_DICT metadata objects that will help us access the satellite imagery bands we will use for training. We’ll also define the paths to our dataset splits we’ll use for training.

# Specify names locations for outputs in Google Drive.
FOLDER = 'fcnn-demo'
PREDICTIONS = 'predictions'
TEST_PATCHES = 'test_patches'
TRAINING_BASE = 'training_patches'
VAL_BASE = 'val_patches'

# Specify inputs (Landsat bands) to the model and the response variable.
opticalBands = ['B1', 'B2', 'B3', 'B4', 'B5', 'B6', 'B7']
thermalBands = ['B10', 'B11']
BANDS = opticalBands + thermalBands
RESPONSE = 'impervious'
FEATURES = BANDS + [RESPONSE]

# Specify the size and shape of patches expected by the model.
KERNEL_SIZE = 256
KERNEL_SHAPE = [KERNEL_SIZE, KERNEL_SIZE]
COLUMNS = [
  tf.io.FixedLenFeature(shape=KERNEL_SHAPE, dtype=tf.float32) for k in FEATURES
]
FEATURES_DICT = dict(zip(FEATURES, COLUMNS))
FEATURES_DICT

Then, we’ll specify some training parameters.

Since this is a regression problem, we’ll use Mean Squared Error, as opposed to another metric for classification (Accuracy) or segmentation (Dice/Intsersection over Union).

Larger batch sizes are better, as they allow the model to

  1. more accurately estimate the gradient of the loss function

  2. more fully utilize GPU resources for efficient training

Batch size can also affect model generalization, but it is somewhat configuration and problem dependent on whether this helps or hurts model generalization. It’s best to experiment to find out! See Revisiting Small Batch Training for Deep Neural Networks for an investigation on this issue.

SGD is a standard optimizer for deep learning models.

# Sizes of the training and evaluation datasets.
TRAIN_SIZE = 16000
VAL_SIZE = 8000

# Specify model training parameters.
BATCH_SIZE = 16
EPOCHS = 2
BUFFER_SIZE = 2000
OPTIMIZER = 'SGD'
LOSS = 'MeanSquaredError'
METRICS = ['RootMeanSquaredError']
dirs = [f"{FOLDER}", f"{FOLDER}/{PREDICTIONS}",  f"{FOLDER}/{TEST_PATCHES}"]
for d in dirs:
  if not os.path.exists(d):
    os.makedirs(d)
!ls fcnn-demo

Imagery#

Collect and process the input imagery (cloud masking, compositing). Display the composite.

# Use Landsat 8 surface reflectance data.
l8sr = ee.ImageCollection('LANDSAT/LC08/C01/T1_SR')

# Cloud masking function.
def maskL8sr(image):
  cloudShadowBitMask = ee.Number(2).pow(3).int()
  cloudsBitMask = ee.Number(2).pow(5).int()
  qa = image.select('pixel_qa')
  mask1 = qa.bitwiseAnd(cloudShadowBitMask).eq(0).And(
    qa.bitwiseAnd(cloudsBitMask).eq(0))
  mask2 = image.mask().reduce('min')
  mask3 = image.select(opticalBands).gt(0).And(
          image.select(opticalBands).lt(10000)).reduce('min')
  mask = mask1.And(mask2).And(mask3)
  return image.select(opticalBands).divide(10000).addBands(
          image.select(thermalBands).divide(10).clamp(273.15, 373.15)
            .subtract(273.15).divide(100)).updateMask(mask)

# The image input data is a cloud-masked median composite.
image = l8sr.filterDate('2015-01-01', '2017-12-31').map(maskL8sr).median()

# Use folium to visualize the imagery.
mapid = image.getMapId({'bands': ['B4', 'B3', 'B2'], 'min': 0, 'max': 0.3})
map = folium.Map(location=[38., -122.5])
folium.TileLayer(
    tiles=mapid['tile_fetcher'].url_format,
    attr='Map Data &copy; <a href="https://earthengine.google.com/">Google Earth Engine</a>',
    overlay=True,
    name='median composite',
  ).add_to(map)

mapid = image.getMapId({'bands': ['B10'], 'min': 0, 'max': 0.5})
folium.TileLayer(
    tiles=mapid['tile_fetcher'].url_format,
    attr='Map Data &copy; <a href="https://earthengine.google.com/">Google Earth Engine</a>',
    overlay=True,
    name='thermal',
  ).add_to(map)
map.add_child(folium.LayerControl())
map

Collect the labels (impervious surface area (in fraction of a pixel)) and display.

nlcd = ee.Image('USGS/NLCD/NLCD2016').select('impervious')
nlcd = nlcd.divide(100).float()

mapid = nlcd.getMapId({'min': 0, 'max': 1})
map = folium.Map(location=[38., -122.5])
folium.TileLayer(
    tiles=mapid['tile_fetcher'].url_format,
    attr='Map Data &copy; <a href="https://earthengine.google.com/">Google Earth Engine</a>',
    overlay=True,
    name='nlcd impervious',
  ).add_to(map)
map.add_child(folium.LayerControl())
map

Now we will combine the Landsat composite and NLCD impervious surface raster into a single stacked image array. From that, we will break the image down into patches with width and height of 256 pixels.

To convert the EE multi-band image collection to an image array, we use neighborhoodToArray(), then proceed to sample the image at selective areas.

featureStack = ee.Image.cat([
  image.select(BANDS),
  nlcd.select(RESPONSE)
]).float()

list = ee.List.repeat(1, KERNEL_SIZE)
lists = ee.List.repeat(list, KERNEL_SIZE)
kernel = ee.Kernel.fixed(KERNEL_SIZE, KERNEL_SIZE, lists)

arrays = featureStack.neighborhoodToArray(kernel)

Use some pre-made geometries to sample the stack in strategic locations. Specifically, these are hand-made polygons in which to take the 256x256 samples. Display the sampling polygons on a map, red for training polygons, blue for evaluation.

We will strategiclly sample the imagery using some diverse, representative geometries. The red geometries plotted below are for training, while the blue are for validation.

trainingPolys = ee.FeatureCollection('projects/google/DemoTrainingGeometries')
valPolys = ee.FeatureCollection('projects/google/DemoEvalGeometries')

polyImage = ee.Image(0).byte().paint(trainingPolys, 1).paint(valPolys, 2)
polyImage = polyImage.updateMask(polyImage)

mapid = polyImage.getMapId({'min': 1, 'max': 2, 'palette': ['red', 'blue']})
map = folium.Map(location=[38., -100.], zoom_start=5)
folium.TileLayer(
    tiles=mapid['tile_fetcher'].url_format,
    attr='Map Data &copy; <a href="https://earthengine.google.com/">Google Earth Engine</a>',
    overlay=True,
    name='training polygons',
  ).add_to(map)
map.add_child(folium.LayerControl())
map
# How many polygons do we have in total?
trainingPolys.size().getInfo(), valPolys.size().getInfo()

Sampling#

Now we will use those geometries to extract samples from the stacked image array. Within each geometry we take a 256x256 neighborhood of pixels around several points, shard them to prevent memory error and then collect them into a single tfrecord. This is done for the training and validation data separately.

Note: for brevity’s sake in this tutorial, we are only sampling from 2 training geometries. You can revise range(2): #range(trainingPolys.size().getInfo()) if you wish to use all geometries.

 # Convert the feature collections to lists for iteration.
trainingPolysList = trainingPolys.toList(trainingPolys.size())
valPolysList = valPolys.toList(valPolys.size())

# These numbers determined experimentally.
n = 200 # Number of shards in each polygon.
N = 2000 # Total sample size in each polygon.

# Export all the training data (in many pieces), with one task
# per geometry.
for g in range(2): #range(trainingPolys.size().getInfo()):
  geomSample = ee.FeatureCollection([])
  for i in range(n):
    sample = arrays.sample(
      region = ee.Feature(trainingPolysList.get(g)).geometry(),
      scale = 30,
      numPixels = N / n, # Size of the shard.
      seed = i,
      tileScale = 8
    )
    geomSample = geomSample.merge(sample)
    #print(geomSample)

  desc = TRAINING_BASE + '_g' + str(g)
  task_train = ee.batch.Export.table.toDrive(
    collection = geomSample,
    description = desc,
    folder = FOLDER,
    fileNamePrefix = desc,
    fileFormat = 'TFRecord',
    selectors = BANDS + [RESPONSE]
  )
  print(FOLDER, desc)
  task_train.start()

We’ll do a similar sampling procedure for the validation data. To keep the dataset size reasonable for the Colab + Google Drive setup, we will only sample and export 1 validation polygon. Revise the corresponding line for the validation set to sample from the full range of validation polygons.


# Export all the evaluation data.
for g in range(1): #range(valPolys.size().getInfo()):
  geomSample = ee.FeatureCollection([])
  for i in range(n):
    sample = arrays.sample(
      region = ee.Feature(valPolysList.get(g)).geometry(),
      scale = 30,
      numPixels = N / n,
      seed = i,
      tileScale = 8
    )
    geomSample = geomSample.merge(sample)

  desc = VAL_BASE + '_g' + str(g)
  task_validation = ee.batch.Export.table.toDrive(
    collection = geomSample,
    description = desc,
    folder = FOLDER,
    fileNamePrefix = desc,
    fileFormat = 'TFRecord',
    selectors = BANDS + [RESPONSE]
  )
  print(FOLDER, desc)
  task_validation.start()

These tasks take a while depending on the amount of data. In this case, we’re exporting a small TFRecord but it still takes 20 minutes because of slow IO to Google Drive. We can use the EE API’s getTaskStatus function to report the status of the background export task.

def check_task_status(task):
    while True:
        status = ee.data.getTaskStatus(task.id)
        if not status:  # if the status list is empty
            print("Task not found.")
            break

        state = status[0].get('state')
        if state == 'RUNNING':
            print("Task is still running.")
            time.sleep(10)  # waits for n seconds before checking again
        else:
            print(f"Task is no longer running. Current state: {state}")
            break

check_task_status(task_validation)
check_task_status(task_train)

Once the tasks finish, we can see the results here.

!ls fcnn-demo

Since the above cell takes a long time to run, let’s cancel it and switch to using the directory with the prepared intermediate and final data.

%cd $processed_outputs_dir

and check that the files are there

!ls fcnn-demo

Training data#

Now that we have exported a TFRecord from Earth Engine, let’s load it into a tf.data.Dataset. Unfortunately, there isn’t a path to load the data directly from Earth Engine into a tf.data.Dataset.

def parse_tfrecord(example_proto):
  """The parsing function.
  Read a serialized example into the structure defined by FEATURES_DICT.
  Args:
    example_proto: a serialized Example.
  Returns:
    A dictionary of tensors, keyed by feature name.
  """
  return tf.io.parse_single_example(example_proto, FEATURES_DICT)


def to_tuple(inputs):
  """Function to convert a dictionary of tensors to a tuple of (inputs, outputs).
  Turn the tensors returned by parse_tfrecord into a stack in HWC shape.
  Args:
    inputs: A dictionary of tensors, keyed by feature name.
  Returns:
    A tuple of (inputs, outputs).
  """
  inputsList = [inputs.get(key) for key in FEATURES]
  stacked = tf.stack(inputsList, axis=0)
  # Convert from CHW to HWC
  stacked = tf.transpose(stacked, [1, 2, 0])
  return stacked[:,:,:len(BANDS)], stacked[:,:,len(BANDS):]


def get_dataset(pattern):
  """Function to read, parse and format to tuple a set of input tfrecord files.
  Get all the files matching the pattern, parse and convert to tuple.
  Args:
    pattern: A file pattern to match in a google drive bucket.
  Returns:
    A tf.data.Dataset
  """
  glob = tf.io.gfile.glob(pattern)
  dataset = tf.data.TFRecordDataset(glob, compression_type='GZIP')
  dataset = dataset.map(parse_tfrecord, num_parallel_calls=5)
  dataset = dataset.map(to_tuple, num_parallel_calls=5)
  return dataset

Parse the training dataset.

def get_training_dataset():
	"""Get the preprocessed training dataset
  Returns:
    A tf.data.Dataset of training data.
  """
	globb = f"{FOLDER}/{TRAINING_BASE}*"
	dataset = get_dataset(globb)
	dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE).repeat()
	return dataset

training = get_training_dataset()

#view the first element
#print(iter(training.take(1)).next())

Validation data#

Parse the validation dataset. Note that the validation dataset has a batch size of 1 which is different from the training dataset. Another distinction is that the validation dataset is not shuffled.

def get_val_dataset():
	"""Get the preprocessed validation dataset
  Returns:
    A tf.data.Dataset of validation data.
  """
	globb = f"{FOLDER}/{VAL_BASE}*"
	dataset = get_dataset(globb)
	dataset = dataset.batch(1).repeat()
	return dataset

validation = get_val_dataset()

Model#

For our model, we will use a Keras implementation of the U-Net model, and provide the network with 256x256 pixel image patches as input. The output will be probabilities for each pixel (a continuous output).

As this is a regression problem, we apply mean squared error as our loss function. As well, we implement a saturating activation function to address any artifacts that are produced from squashing the range of values to [0,1] (something we need to do to make a sensible measurement of impervious surface fraction per pixel).

def conv_block(input_tensor, num_filters):
	encoder = layers.Conv2D(num_filters, (3, 3), padding='same')(input_tensor)
	encoder = layers.BatchNormalization()(encoder)
	encoder = layers.Activation('relu')(encoder)
	encoder = layers.Conv2D(num_filters, (3, 3), padding='same')(encoder)
	encoder = layers.BatchNormalization()(encoder)
	encoder = layers.Activation('relu')(encoder)
	return encoder

def encoder_block(input_tensor, num_filters):
	encoder = conv_block(input_tensor, num_filters)
	encoder_pool = layers.MaxPooling2D((2, 2), strides=(2, 2))(encoder)
	return encoder_pool, encoder

def decoder_block(input_tensor, concat_tensor, num_filters):
	decoder = layers.Conv2DTranspose(num_filters, (2, 2), strides=(2, 2), padding='same')(input_tensor)
	decoder = layers.concatenate([concat_tensor, decoder], axis=-1)
	decoder = layers.BatchNormalization()(decoder)
	decoder = layers.Activation('relu')(decoder)
	decoder = layers.Conv2D(num_filters, (3, 3), padding='same')(decoder)
	decoder = layers.BatchNormalization()(decoder)
	decoder = layers.Activation('relu')(decoder)
	decoder = layers.Conv2D(num_filters, (3, 3), padding='same')(decoder)
	decoder = layers.BatchNormalization()(decoder)
	decoder = layers.Activation('relu')(decoder)
	return decoder

def get_model():
	inputs = layers.Input(shape=[None, None, len(BANDS)]) # 256
	encoder0_pool, encoder0 = encoder_block(inputs, 32) # 128
	encoder1_pool, encoder1 = encoder_block(encoder0_pool, 64) # 64
	encoder2_pool, encoder2 = encoder_block(encoder1_pool, 128) # 32
	encoder3_pool, encoder3 = encoder_block(encoder2_pool, 256) # 16
	encoder4_pool, encoder4 = encoder_block(encoder3_pool, 512) # 8
	center = conv_block(encoder4_pool, 1024) # center
	decoder4 = decoder_block(center, encoder4, 512) # 16
	decoder3 = decoder_block(decoder4, encoder3, 256) # 32
	decoder2 = decoder_block(decoder3, encoder2, 128) # 64
	decoder1 = decoder_block(decoder2, encoder1, 64) # 128
	decoder0 = decoder_block(decoder1, encoder0, 32) # 256
	outputs = layers.Conv2D(1, (1, 1), activation='sigmoid')(decoder0)

	model = models.Model(inputs=[inputs], outputs=[outputs])

	model.compile(
		optimizer=optimizers.get(OPTIMIZER),
		loss=losses.get(LOSS),
		metrics=[metrics.get(metric) for metric in METRICS])

	return model

Training the model#

Now we train the compiled network by calling .fit(). Typically we would train at least a full epoch, which is a full pass through the trainind dataset. However, this takes prohibitively long when using Google Clab + Google Drive for storage, an epoch takes 10 minutes on a T4 GPU. We will instead train for 2 reduced epochs of step size 20, meaning we will train on 40 samples, which works for a demo. On a faster infrasturcture setup, the model may improve with more iteration, which can be experimented with by hyperparameter tuning and implementation of early stopping mechanisms.

Notice in the Google Colab Resource usage window how long it takes for GPU memory to start being utilized. it’s helpful during training to inspect the GPU and CPU resource usage in real time to understand what resources your model training needs.

m = get_model()
m.fit(
    x=training,
    epochs=EPOCHS,
    steps_per_epoch=20, # typically we would specify int(TRAIN_SIZE / BATCH_SIZE) to iterate through all batches
    validation_data=validation,
    validation_steps=VAL_SIZE)

Let’s save the trained model to file. We need to switch back to the user outputs dir briefly in order to save to a writeable user folder in your personal Google Drive.

%cd $user_outputs_dir
save_model_path = os.path.join(f"{FOLDER}/model_out_batch_{BATCH_SIZE}_ep{EPOCHS}/")
if (not os.path.isdir(save_model_path)):
  os.mkdir(save_model_path)
m.save(save_model_path)

We will switch back to the already processed outputs to load a model trained for 2 full epochs.

%cd $processed_outputs_dir

After saving the trained model, you may want to run predictions later. Let’s try loading our saved model to show how it can be reused without having to retrain the model.

# Load a trained model.
MODEL_DIR = f"{FOLDER}/model_out_batch_{BATCH_SIZE}_ep{EPOCHS}/"
m = tf.keras.models.load_model(MODEL_DIR)
m.summary()

Prediction#

Now, let’s make predictions for a new area to see how the model generalizes to new data. Bear in mind, the model was trained on data in the US because that is where the labels were available, but let’s see the trained model applies to a region of Lima, Peru.

Again, we will use a defined geometry to process and export imagery from Earth Engine in TFRecord format. Then we’ll use our trained model to predict impervious surface area percentages on the new imagery and write the predictions to both a TFRecord file and image patches (true color and prediction).

We separate the image export from the predict function because the export only needs to happen once, but perhaps you’ll experiment with the model setup and run new predictions several times.

def doExport(out_image_base, kernel_buffer, region):
  """Run the image export task.  Block until complete.
  """

  task = ee.batch.Export.image.toDrive(
    image = image.select(BANDS),
    description = out_image_base,
    folder = FOLDER,
    fileNamePrefix = f"{out_image_base}",
    region = region.getInfo()['coordinates'],
    scale = 30,
    fileFormat = 'TFRecord',
    maxPixels = 1e10,
    formatOptions = {
      'patchDimensions': KERNEL_SHAPE,
      'kernelSize': kernel_buffer,
      'compressed': True,
      'maxFileSize': 104857600
    }
  )

  task.start()

  # Block until the task completes.
  print('Running image export to google drive...')
  import time
  while task.active():
    time.sleep(30)

  # Error condition
  if task.status()['state'] != 'COMPLETED':
    print('Error with image export.')
  else:
    print('Image export completed.')
def get_files_list(out_image_base):
    """Retrieve the list of image and JSON files."""
    filesList = glob.glob(f"{FOLDER}/*")
    exportFilesList = [s for s in filesList if out_image_base in s]

    imageFilesList = []
    jsonFile = None
    for f in exportFilesList:
        if f.endswith('.tfrecord.gz'):
            imageFilesList.append(f)
        elif f.endswith('.json'):
            jsonFile = f
            
    imageFilesList.sort()
    
    return imageFilesList, jsonFile

def load_json(jsonFile):
    """Load the JSON mixer file."""
    jsonText = !cat {jsonFile}
    mixer = json.loads(jsonText.nlstr)
    return mixer

def save_predictions(predictions, patches, x_buffer, y_buffer):
    """Process and save the predictions."""
    for i, prediction in zip(range(len(predictions)), predictions):
        predictionPatch = prediction[
            x_buffer:x_buffer+KERNEL_SIZE, y_buffer:y_buffer+KERNEL_SIZE]
        p_image_array = np.array(predictionPatch)
        tifffile.imsave(f"{FOLDER}/{PREDICTIONS}/patch_pred_{i}.tif", p_image_array)

def get_image_features_dict(bands, buffered_shape):
    """Generate the image features dictionary."""
    imageColumns = [
        tf.io.FixedLenFeature(shape=buffered_shape, dtype=tf.float32) for k in bands
    ]
    imageFeaturesDict = dict(zip(bands, imageColumns))
    return imageFeaturesDict

def process_dataset(imageFilesList, imageFeaturesDict):
    """Process the image dataset."""
    def parse_image(example_proto):
        return tf.io.parse_single_example(example_proto, imageFeaturesDict)

    def toTupleImage(inputs):
        inputsList = [inputs.get(key) for key in BANDS]
        stacked = tf.stack(inputsList, axis=0)
        stacked = tf.transpose(stacked, [1, 2, 0])
        return stacked

    imageDataset = tf.data.TFRecordDataset(imageFilesList, compression_type='GZIP')
    imageDataset = imageDataset.map(parse_image, num_parallel_calls=5)
    imageDataset = imageDataset.map(toTupleImage).batch(1)
    
    return imageDataset

def doPrediction(out_image_base, kernel_buffer, save_prediction = False):
    """Perform inference on exported imagery, upload to Earth Engine."""
    # Assuming BANDS, KERNEL_SHAPE, and FOLDER are defined globally or can be set here
    
    # 1. Calculate required parameters
    x_buffer = int(kernel_buffer[0] / 2)
    y_buffer = int(kernel_buffer[1] / 2)
    buffered_shape = [
        KERNEL_SHAPE[0] + kernel_buffer[0],
        KERNEL_SHAPE[1] + kernel_buffer[1]
    ]
    
    # 2. Retrieve the list of files
    imageFilesList, jsonFile = get_files_list(out_image_base)

    # 3. Generate the image features dictionary
    imageFeaturesDict = get_image_features_dict(BANDS, buffered_shape)

    # 4. Process the image dataset
    imageDataset = process_dataset(imageFilesList, imageFeaturesDict)

    # 5. Perform predictions
    mixer = load_json(jsonFile)
    patches = mixer['totalPatches']
    predictions = m.predict(imageDataset, steps=patches, verbose=1)

    # 6. Save the predictions
    if save_prediction:
      save_predictions(predictions, patches, x_buffer, y_buffer)

    return predictions, patches

Let’s supply our area of interest to make predictions for. We also provide some string parameters for file naming and finally, the shape for the image outputs. On that, the model can accept larger dimensions than 256x256 (which it was trained on) provided that they are uniform in width and height (note that we didn’t specify an input shape in the first layer of the network) but at some point as the dimensions increase a memory ceiling will be encountered (reference). So, proceed with caution on that. We will try one technique here to address the common issue of edge artifacts. Specifically, we will buffer our images during prediction using a 128x128 kernel, which pads the image with an additional 64 pixels on both width and height, and then clip the prediction down to the original central region of 256x256 pixels.

# Output assets

# Base file name to use for TFRecord files and assets.
lima_image_base = 'FCNN_demo_lima_384_'
# Half this will extend on the sides of each patch.
lima_kernel_buffer = [128, 128]
# Lima [-77.133581 -12.164808 -76.876993 -11.93859]
lima_region = ee.Geometry.Polygon(
        [[[-77.133581, -11.93859],
          [-77.133581, -12.164808],
          [-76.876993, -12.164808],
          [-76.876993, -11.93859]]], None, False)

Exporting the image we ran prediction takes too long with Colab + Google Drive, so we’ll comment this out for now.

# Run the export.
# doExport(lima_image_base, lima_kernel_buffer, lima_region)
# Run the prediction.
predictions, patches = doPrediction(lima_image_base, lima_kernel_buffer, save_prediction=False)

Display the output#

Let’s take a look at randomly indexed samples from our prediction output. We also will check the minimum and maximum values (impervious surface area percentages) in the prediction image.

import matplotlib.gridspec as gridspec

# Define a gridspec layout to keep the left and right plot the same size
fig = plt.figure(figsize=(12, 6))
gs = gridspec.GridSpec(1, 3, width_ratios=[1, 1, 0.05])

ax0 = plt.subplot(gs[0])
ax1 = plt.subplot(gs[1])
cax = plt.subplot(gs[2])

index_random = randrange(1) # randomly pick different integers
print(index_random)

ax0.imshow(np.array(Image.open(f"{FOLDER}/{TEST_PATCHES}/patch_test_{index_random}.tif")))
ax0.set_title(f'RGB').set_fontsize(14)

# Show the predicted image with a colormap (e.g., 'viridis') which indicates probability.
im = ax1.imshow(np.array(Image.open(f"{FOLDER}/{PREDICTIONS}/patch_pred_{index_random}.tif")), cmap='viridis', vmin=0, vmax=1)
ax1.set_title(f'Predicted impervious surface area percentage').set_fontsize(14)

# Add a colorbar for the prediction image in its own axis
cbar = fig.colorbar(im, cax=cax, orientation='vertical')

plt.tight_layout()
plt.show()
image_test = np.array(Image.open(f"{FOLDER}/{PREDICTIONS}/patch_pred_{index_random}.tif"))
image_test.min(), image_test.max()