DL之MaskR-CNN:基于类MaskR-CNN算法(RetinaNet+mask head)利用数据集(resnet50_coco_v0.2.0.h5)实现图像分割

DL之MaskR-CNN:基于类MaskR-CNN算法(RetinaNet+mask head)利用数据集(resnet50_coco_v0.2.0.h5)实现图像分割


输出结果

更新……

设计思路

参考文章:DL之MaskR-CNN:Mask R-CNN算法的简介(论文介绍)、架构详解、案例应用等配图集合之详细攻略
    在ResNet的基础上,增加了ROI_Align、mask_submodel、masks(ConcatenateBoxes,计算loss的拼接)。

核心代码

更新……

1、retinanet.py

default_mask_model函数内,定义了类别个数num_classes、金字塔特征的大小pyramid_feature_size=256等
    mask_feature_size=256,
    roi_size=(14, 14),
    mask_size=(28, 28),

"""
Copyright 2017-2018 Fizyr (https://fizyr.com)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import keras
import keras.backend
import keras.models

import keras_retinanet.layers
import keras_retinanet.models.retinanet
import keras_retinanet.utils.anchors
import keras_retinanet.backend.tensorflow_backend as backend

from ..layers.roi import RoiAlign
from ..layers.upsample import Upsample
from ..layers.misc import Shape, ConcatenateBoxes, Cast

def default_mask_model(
    num_classes,
    pyramid_feature_size=256,
    mask_feature_size=256,
    roi_size=(14, 14),
    mask_size=(28, 28),
    name='mask_submodel',
    mask_dtype=keras.backend.floatx(),
    retinanet_dtype=keras.backend.floatx()
):
    """ Build the default mask prediction submodel.

    # Arguments
        num_classes          : Number of classes to predict a mask for.
        pyramid_feature_size : Channel count of the incoming pyramid (ROI) features.
        mask_feature_size    : Number of filters in the intermediate conv layers.
        roi_size             : Spatial size of the incoming ROI-aligned features.
        mask_size            : Spatial size of the predicted masks.
        name                 : Name of the submodel.
        mask_dtype           : Data type used inside this submodel.
        retinanet_dtype      : Data type of the surrounding retinanet model.

    # Returns
        A keras.models.Model mapping ROI features to per-class sigmoid masks.
    """
    conv_kwargs = {
        'kernel_size'        : 3,
        'strides'            : 1,
        'padding'            : 'same',
        'kernel_initializer' : keras.initializers.normal(mean=0.0, stddev=0.01, seed=None),
        'bias_initializer'   : 'zeros',
        'activation'         : 'relu',
    }

    inputs = keras.layers.Input(shape=(None, roi_size[0], roi_size[1], pyramid_feature_size))
    x = inputs

    # Cast to the desired data type when it differs from the one used by the
    # underlying keras-retinanet model.
    if mask_dtype != retinanet_dtype:
        x = keras.layers.TimeDistributed(
            Cast(dtype=mask_dtype),
            name='cast_masks')(x)

    # Stack of four 3x3 convolutions applied per ROI.
    for layer_index in range(4):
        conv = keras.layers.Conv2D(filters=mask_feature_size, **conv_kwargs)
        x = keras.layers.TimeDistributed(conv, name='roi_mask_{}'.format(layer_index))(x)

    # Upsample + conv instead of a transposed convolution (as in the paper) to
    # avoid checkerboard artifacts: https://distill.pub/2016/deconv-checkerboard/
    x = keras.layers.TimeDistributed(
        Upsample(mask_size),
        name='roi_mask_upsample')(x)
    x = keras.layers.TimeDistributed(
        keras.layers.Conv2D(filters=mask_feature_size, **conv_kwargs),
        name='roi_mask_features')(x)

    # Final 1x1 convolution producing one sigmoid mask per class.
    x = keras.layers.TimeDistributed(
        keras.layers.Conv2D(filters=num_classes, kernel_size=1, activation='sigmoid'),
        name='roi_mask')(x)

    # Cast back to the data type of the underlying keras-retinanet model.
    if mask_dtype != retinanet_dtype:
        x = keras.layers.TimeDistributed(
            Cast(dtype=retinanet_dtype),
            name='recast_masks')(x)

    return keras.models.Model(inputs=inputs, outputs=x, name=name)

def default_roi_submodels(num_classes, mask_dtype=keras.backend.floatx(), retinanet_dtype=keras.backend.floatx()):
    """ Create the default list of (name, model) ROI submodels: a single mask head. """
    mask_model = default_mask_model(num_classes, mask_dtype=mask_dtype, retinanet_dtype=retinanet_dtype)
    return [('masks', mask_model)]

def retinanet_mask(
    inputs,
    num_classes,
    retinanet_model=None,
    anchor_params=None,
    nms=True,
    class_specific_filter=True,
    name='retinanet-mask',
    roi_submodels=None,
    mask_dtype=keras.backend.floatx(),
    modifier=None,
    **kwargs
):
    """ Construct a RetinaNet mask model on top of a retinanet bbox model.

    This model uses the retinanet bbox model and appends a few layers to compute masks.

    # Arguments
        inputs                : List of keras.layers.Input. The first input is the image, the second input the blob of masks.
        num_classes           : Number of classes to classify.
        retinanet_model       : keras_retinanet.models.retinanet model, returning regression and classification values.
        anchor_params         : Struct containing anchor parameters. If None, default values are used.
        nms                   : Use NMS.
        class_specific_filter : Use class specific filtering.
        roi_submodels         : Submodels for processing ROIs.
        mask_dtype            : Data type of the masks, can be different from the main one.
        modifier              : Modifier for the underlying retinanet model, such as freeze.
        name                  : Name of the model.
        **kwargs              : Additional kwargs to pass to the retinanet bbox model.
    # Returns
        Model with inputs as input and as output the output of each submodel for each pyramid level and the detections.

        The order is as defined in submodels.
        ```
        [
            regression, classification, other[0], other[1], ..., boxes_masks, boxes, scores, labels, masks, other[0], other[1], ...
        ]
        ```
    """
    if anchor_params is None:
        # NOTE(review): keras_retinanet.utils.anchors is reachable here only when
        # it has been imported (directly or transitively via
        # keras_retinanet.models.retinanet) — verify the import is in place.
        anchor_params = keras_retinanet.utils.anchors.AnchorParameters.default

    if roi_submodels is None:
        # Temporarily switch the global Keras floatx so the mask submodel is
        # *built* with mask_dtype, then restore the retinanet dtype afterwards.
        retinanet_dtype = keras.backend.floatx()
        keras.backend.set_floatx(mask_dtype)
        roi_submodels = default_roi_submodels(num_classes, mask_dtype, retinanet_dtype)
        keras.backend.set_floatx(retinanet_dtype)

    image = inputs
    # dynamic image shape tensor, needed later by RoiAlign for box normalization
    image_shape = Shape()(image)

    if retinanet_model is None:
        retinanet_model = keras_retinanet.models.retinanet.retinanet(
            inputs=image,
            num_classes=num_classes,
            num_anchors=anchor_params.num_anchors(),
            **kwargs
        )

    # invoke modifier if given (e.g. to freeze backbone layers)
    if modifier:
        retinanet_model = modifier(retinanet_model)

    # parse outputs
    regression     = retinanet_model.outputs[0]
    classification = retinanet_model.outputs[1]
    other          = retinanet_model.outputs[2:]
    # assumes the retinanet model names its pyramid layers 'P3'..'P7' — the
    # keras-retinanet convention; confirm if a custom model is passed in
    features       = [retinanet_model.get_layer(name).output for name in ['P3', 'P4', 'P5', 'P6', 'P7']]

    # build boxes: decode anchor regressions, then clip boxes to the image
    anchors = keras_retinanet.models.retinanet.__build_anchors(anchor_params, features)
    boxes = keras_retinanet.layers.RegressBoxes(name='boxes')([anchors, regression])
    boxes = keras_retinanet.layers.ClipBoxes(name='clipped_boxes')([image, boxes])

    # filter detections (apply NMS / score threshold / select top-k)
    detections = keras_retinanet.layers.FilterDetections(
        nms                   = nms,
        class_specific_filter = class_specific_filter,
        max_detections        = 100,
        name                  = 'filtered_detections'
    )([boxes, classification] + other)

    # split up in known outputs and "other"
    boxes  = detections[0]
    scores = detections[1]

    # get the region of interest features for the surviving detections
    rois = RoiAlign()([image_shape, boxes, scores] + features)

    # execute maskrcnn submodels on the ROI features
    maskrcnn_outputs = [submodel(rois) for _, submodel in roi_submodels]

    # concatenate boxes with each submodel output so the loss function can
    # recover which box each prediction belongs to
    trainable_outputs = [ConcatenateBoxes(name=name)([boxes, output]) for (name, _), output in zip(roi_submodels, maskrcnn_outputs)]

    # reconstruct the new output, in the order documented in the docstring
    outputs = [regression, classification] + other + trainable_outputs + detections + maskrcnn_outputs

    return keras.models.Model(inputs=inputs, outputs=outputs, name=name)

2、resnet.py

resnet_maskrcnn模型以ResNet作为骨架网络,代码中可选用resnet50、resnet101、resnet152三种骨架模型。

"""
Copyright 2017-2018 Fizyr (https://fizyr.com)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import warnings

import keras
import keras_resnet
import keras_resnet.models
import keras_retinanet.models.resnet
from ..models import retinanet, Backbone

class ResNetBackbone(Backbone, keras_retinanet.models.resnet.ResNetBackbone):
    # Combines the local Backbone interface with keras-retinanet's ResNet
    # backbone. `self.backbone` is presumably set by the parent class
    # __init__ — confirm against keras_retinanet.models.resnet.
    def maskrcnn(self, *args, **kwargs):
        """ Returns a maskrcnn model using the correct backbone.
        """
        return resnet_maskrcnn(*args, backbone=self.backbone, **kwargs)

def resnet_maskrcnn(num_classes, backbone='resnet50', inputs=None, modifier=None, mask_dtype=keras.backend.floatx(), **kwargs):
    """ Construct a Mask R-CNN model on a ResNet backbone.

    # Arguments
        num_classes : Number of classes to classify.
        backbone    : Backbone variant: 'resnet50', 'resnet101' or 'resnet152'.
        inputs      : Optional input tensor; a (None, None, 3) image input is created when omitted.
        modifier    : Optional function applied to the backbone model (e.g. to freeze layers).
        mask_dtype  : Data type used for the mask head, may differ from the main model.
        **kwargs    : Additional kwargs passed on to retinanet.retinanet_mask.

    # Returns
        A keras.models.Model implementing Mask R-CNN.

    # Raises
        ValueError : If `backbone` is not one of the supported variants.
    """
    # choose default input
    if inputs is None:
        inputs = keras.layers.Input(shape=(None, None, 3), name='image')

    # create the resnet backbone; fail loudly on unsupported names instead of
    # letting an unbound `resnet` surface later as a confusing NameError
    if backbone == 'resnet50':
        resnet = keras_resnet.models.ResNet50(inputs, include_top=False, freeze_bn=True)
    elif backbone == 'resnet101':
        resnet = keras_resnet.models.ResNet101(inputs, include_top=False, freeze_bn=True)
    elif backbone == 'resnet152':
        resnet = keras_resnet.models.ResNet152(inputs, include_top=False, freeze_bn=True)
    else:
        raise ValueError("Backbone '{}' not recognized; expected 'resnet50', 'resnet101' or 'resnet152'.".format(backbone))

    # invoke modifier if given
    if modifier:
        resnet = modifier(resnet)

    # create the full model
    # NOTE(review): outputs[0] is skipped — presumably the earliest stage output
    # is not used as an FPN input; confirm against keras_resnet's output order.
    model = retinanet.retinanet_mask(inputs=inputs, num_classes=num_classes, backbone_layers=resnet.outputs[1:], mask_dtype=mask_dtype, **kwargs)

    return model

def resnet50_maskrcnn(num_classes, inputs=None, **kwargs):
    """ Mask R-CNN model with a ResNet-50 backbone. """
    return resnet_maskrcnn(num_classes=num_classes, inputs=inputs, backbone='resnet50', **kwargs)

def resnet101_maskrcnn(num_classes, inputs=None, **kwargs):
    """ Mask R-CNN model with a ResNet-101 backbone. """
    return resnet_maskrcnn(num_classes=num_classes, inputs=inputs, backbone='resnet101', **kwargs)

def resnet152_maskrcnn(num_classes, inputs=None, **kwargs):
    """ Mask R-CNN model with a ResNet-152 backbone. """
    return resnet_maskrcnn(num_classes=num_classes, inputs=inputs, backbone='resnet152', **kwargs)

3、roi.py

包含RoiAlign类的实现,其中含有map_to_level等计算函数。

import keras.backend
import keras.layers
import keras_retinanet.backend

from .. import backend

class RoiAlign(keras.layers.Layer):
    """ Keras layer performing ROI Align over a feature pyramid.

    Given the image shape, detection boxes and scores plus one feature tensor
    per pyramid level, extracts a fixed-size feature crop for every box from
    the pyramid level matching the box size.
    """

    def __init__(self, crop_size=(14, 14), **kwargs):
        # (height, width) of the feature crop extracted per box
        self.crop_size = crop_size

        super(RoiAlign, self).__init__(**kwargs)

    def map_to_level(self, boxes, canonical_size=224, canonical_level=1, min_level=0, max_level=4):
        """ Assign each box (x1, y1, x2, y2) to a pyramid-level index.

        Computes level = floor(canonical_level + log2(size / canonical_size))
        with size = sqrt(box area), clipped to [min_level, max_level]. The
        returned index addresses the `fpn` list given to `call` (index 0 is
        presumably the finest level passed in — confirm against the caller).
        """
        x1 = boxes[:, 0]
        y1 = boxes[:, 1]
        x2 = boxes[:, 2]
        y2 = boxes[:, 3]

        w = x2 - x1
        h = y2 - y1

        # characteristic box size; epsilon below guards log2(0) for degenerate boxes
        size = keras.backend.sqrt(w * h)

        levels = backend.floor(canonical_level + backend.log2(size / canonical_size + keras.backend.epsilon()))
        levels = keras.backend.clip(levels, min_level, max_level)

        return levels

    def call(self, inputs, **kwargs):
        """ Extract ROI features.

        # Arguments
            inputs : [image_shape, boxes, scores, fpn_level_0, fpn_level_1, ...].
        # Returns
            Tensor of shape (1, num_boxes, crop_h, crop_w, channels).
        """
        # TODO: Support batch_size > 1
        image_shape = keras.backend.cast(inputs[0], keras.backend.floatx())
        # stop_gradient: ROI extraction must not backpropagate into the
        # detection outputs or the pyramid features
        boxes       = keras.backend.stop_gradient(inputs[1][0])
        scores      = keras.backend.stop_gradient(inputs[2][0])
        fpn         = [keras.backend.stop_gradient(i[0]) for i in inputs[3:]]

        # compute from which level to get features from
        target_levels = self.map_to_level(boxes)

        # process each pyramid independently
        rois           = []
        ordered_indices = []
        for i in range(len(fpn)):
            # select the boxes and classification from this pyramid level
            indices = keras_retinanet.backend.where(keras.backend.equal(target_levels, i))
            ordered_indices.append(indices)

            level_boxes = keras_retinanet.backend.gather_nd(boxes, indices)
            fpn_shape   = keras.backend.cast(keras.backend.shape(fpn[i]), dtype=keras.backend.floatx())

            # convert to expected format for crop_and_resize: normalized
            # [y1, x1, y2, x2] — presumably the tf.image.crop_and_resize
            # convention; confirm against the backend implementation
            x1 = level_boxes[:, 0]
            y1 = level_boxes[:, 1]
            x2 = level_boxes[:, 2]
            y2 = level_boxes[:, 3]
            level_boxes = keras.backend.stack([
                (y1 / image_shape[1] * fpn_shape[0]) / (fpn_shape[0] - 1),
                (x1 / image_shape[2] * fpn_shape[1]) / (fpn_shape[1] - 1),
                (y2 / image_shape[1] * fpn_shape[0] - 1) / (fpn_shape[0] - 1),
                (x2 / image_shape[2] * fpn_shape[1] - 1) / (fpn_shape[1] - 1),
            ], axis=1)

            # append the rois to the list of rois (all crops use box_ind 0,
            # i.e. the single image in the batch)
            rois.append(backend.crop_and_resize(
                keras.backend.expand_dims(fpn[i], axis=0),
                level_boxes,
                keras.backend.zeros((keras.backend.shape(level_boxes)[0],), dtype='int32'),
                self.crop_size
            ))

        # concatenate rois to one blob
        rois = keras.backend.concatenate(rois, axis=0)

        # reorder rois back to the original box order by scattering each
        # per-level slice to its original indices
        indices = keras.backend.concatenate(ordered_indices, axis=0)
        rois    = keras_retinanet.backend.scatter_nd(indices, rois, keras.backend.cast(keras.backend.shape(rois), 'int64'))

        # re-add the batch dimension
        return keras.backend.expand_dims(rois, axis=0)

    def compute_output_shape(self, input_shape):
        # (batch, num_boxes=None, crop_h, crop_w, channels of the first pyramid input)
        return (input_shape[1][0], None, self.crop_size[0], self.crop_size[1], input_shape[3][-1])

    def get_config(self):
        """ Return the layer configuration so the layer can be re-created from a saved model. """
        config = super(RoiAlign, self).get_config()
        config.update({
            'crop_size' : self.crop_size,
        })

        return config

4、losses.py

MaskR-CNN计算损失
        # compute mask loss
        mask_loss  = keras.backend.binary_crossentropy(masks_target, masks)
        normalizer = keras.backend.shape(masks)[0] * keras.backend.shape(masks)[1] * keras.backend.shape(masks)[2]
        normalizer = keras.backend.maximum(keras.backend.cast(normalizer, keras.backend.floatx()), 1)
        mask_loss  = keras.backend.sum(mask_loss) / normalizer

import keras.backend
import keras_retinanet.backend
from . import backend

def mask(iou_threshold=0.5, mask_size=(28, 28)):
    """ Create the Mask R-CNN mask loss function.

    # Arguments
        iou_threshold : Minimum IoU between a predicted box and an annotation
                        for the box to contribute to the mask loss.
        mask_size     : Spatial size (height, width) of the predicted masks.
    # Returns
        A Keras-compatible loss function `(y_true, y_pred) -> scalar`.
    """
    def _mask_conditional(y_true, y_pred):
        # if there are no masks annotations, return 0; else, compute the masks loss
        loss = backend.cond(
            keras.backend.any(keras.backend.equal(keras.backend.shape(y_true), 0)),
            lambda: keras.backend.cast_to_floatx(0.0),
            lambda: _mask(y_true, y_pred, iou_threshold=iou_threshold, mask_size=mask_size)
        )
        return loss

    def _mask(y_true, y_pred, iou_threshold=0.5, mask_size=(28, 28)):
        """ Compute the binary cross-entropy mask loss.

        Blob layouts (per box, last axis):
          y_pred : [x1, y1, x2, y2, flattened per-class masks]
          y_true : [x1, y1, x2, y2, label, width, height, flattened target mask]
        """
        # split up the different predicted blobs
        boxes = y_pred[:, :, :4]
        masks = y_pred[:, :, 4:]

        # split up the different blobs; width/height are replicated in every
        # row, so reading element [0, 0] is sufficient
        annotations  = y_true[:, :, :5]
        width        = keras.backend.cast(y_true[0, 0, 5], dtype='int32')
        height       = keras.backend.cast(y_true[0, 0, 6], dtype='int32')
        masks_target = y_true[:, :, 7:]

        # reshape the masks back to their original size
        masks_target = keras.backend.reshape(masks_target, (keras.backend.shape(masks_target)[0], keras.backend.shape(masks_target)[1], height, width))
        masks        = keras.backend.reshape(masks, (keras.backend.shape(masks)[0], keras.backend.shape(masks)[1], mask_size[0], mask_size[1], -1))

        # TODO: Fix batch_size > 1
        boxes        = boxes[0]
        masks        = masks[0]
        annotations  = annotations[0]
        masks_target = masks_target[0]

        # compute overlap of boxes with annotations; each box is matched to
        # its highest-IoU annotation
        iou                  = backend.overlap(boxes, annotations)
        argmax_overlaps_inds = keras.backend.argmax(iou, axis=1)
        max_iou              = keras.backend.max(iou, axis=1)

        # keep only boxes whose best IoU reaches the threshold
        indices              = keras_retinanet.backend.where(keras.backend.greater_equal(max_iou, iou_threshold))
        boxes                = keras_retinanet.backend.gather_nd(boxes, indices)
        masks                = keras_retinanet.backend.gather_nd(masks, indices)
        argmax_overlaps_inds = keras.backend.cast(keras_retinanet.backend.gather_nd(argmax_overlaps_inds, indices), 'int32')
        # class label of the matched annotation (annotations column 4)
        labels               = keras.backend.cast(keras.backend.gather(annotations[:, 4], argmax_overlaps_inds), 'int32')

        # make normalized boxes in [y1, x1, y2, x2] order for crop_and_resize
        x1 = boxes[:, 0]
        y1 = boxes[:, 1]
        x2 = boxes[:, 2]
        y2 = boxes[:, 3]
        boxes = keras.backend.stack([
            y1 / (keras.backend.cast(height, dtype=keras.backend.floatx()) - 1),
            x1 / (keras.backend.cast(width, dtype=keras.backend.floatx()) - 1),
            (y2 - 1) / (keras.backend.cast(height, dtype=keras.backend.floatx()) - 1),
            (x2 - 1) / (keras.backend.cast(width, dtype=keras.backend.floatx()) - 1),
        ], axis=1)

        # crop and resize masks_target to the predicted mask resolution
        masks_target = keras.backend.expand_dims(masks_target, axis=3)  # append a fake channel dimension
        masks_target = backend.crop_and_resize(
            masks_target,
            boxes,
            argmax_overlaps_inds,
            mask_size
        )
        masks_target = masks_target[:, :, :, 0]  # remove fake channel dimension

        # gather, for each box, only the predicted mask of its annotation label
        masks = backend.transpose(masks, (0, 3, 1, 2))
        label_indices = keras.backend.stack([
            keras.backend.arange(keras.backend.shape(labels)[0]),
            labels
        ], axis=1)
        masks = keras_retinanet.backend.gather_nd(masks, label_indices)

        # compute mask loss: mean binary cross-entropy over all mask pixels
        # (binary_crossentropy takes (target, output) in that order)
        mask_loss  = keras.backend.binary_crossentropy(masks_target, masks)
        normalizer = keras.backend.shape(masks)[0] * keras.backend.shape(masks)[1] * keras.backend.shape(masks)[2]
        normalizer = keras.backend.maximum(keras.backend.cast(normalizer, keras.backend.floatx()), 1)
        mask_loss  = keras.backend.sum(mask_loss) / normalizer

        return mask_loss

    return _mask_conditional
(0)

相关推荐