DL之MaskR-CNN:基于类MaskR-CNN算法(RetinaNet+mask head)利用数据集(resnet50_coco_v0.2.0.h5)实现图像分割
DL之MaskR-CNN:基于类MaskR-CNN算法(RetinaNet+mask head)利用数据集(resnet50_coco_v0.2.0.h5)实现图像分割
输出结果
更新……
设计思路
参考文章:DL之MaskR-CNN:Mask R-CNN算法的简介(论文介绍)、架构详解、案例应用等配图集合之详细攻略
在以ResNet为骨架的RetinaNet检测模型的基础上,增加了RoiAlign层、mask_submodel(掩码子模型)以及masks输出(通过ConcatenateBoxes将检测框与预测掩码拼接,用于计算loss)。
核心代码
更新……
1、retinanet.py
default_mask_model函数内,定义了类别个数num_classes、金字塔特征的大小pyramid_feature_size=256等
mask_feature_size=256,
roi_size=(14, 14),
mask_size=(28, 28),
"""
Copyright 2017-2018 Fizyr (https://fizyr.com)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import keras
import keras.backend
import keras.models
import keras_retinanet.layers
import keras_retinanet.models.retinanet
import keras_retinanet.backend.tensorflow_backend as backend
from ..layers.roi import RoiAlign
from ..layers.upsample import Upsample
from ..layers.misc import Shape, ConcatenateBoxes, Cast
def default_mask_model(
    num_classes,
    pyramid_feature_size=256,
    mask_feature_size=256,
    roi_size=(14, 14),
    mask_size=(28, 28),
    name='mask_submodel',
    mask_dtype=keras.backend.floatx(),
    retinanet_dtype=keras.backend.floatx()
):
    """ Build the default submodel that predicts per-class masks for each ROI.

    Every layer is wrapped in `keras.layers.TimeDistributed`, so the same
    weights are applied to each ROI along the second axis of the input.

    # Arguments
        num_classes          : Number of filters of the final 1x1 sigmoid convolution (one mask per class).
        pyramid_feature_size : Number of channels of the incoming ROI features.
        mask_feature_size    : Number of filters of the intermediate 3x3 convolutions.
        roi_size             : Spatial size (first two entries are used) of the incoming ROI features.
        mask_size            : Size handed to the `Upsample` layer.
        name                 : Name of the returned model.
        mask_dtype           : Data type used inside the mask submodel.
        retinanet_dtype      : Data type of the surrounding retinanet model.

    # Returns
        A keras.models.Model mapping ROI features to per-class mask predictions.
    """
    # Settings shared by every 3x3 convolution; a single initializer
    # instance is reused by all of them.
    conv_kwargs = dict(
        kernel_size=3,
        strides=1,
        padding='same',
        kernel_initializer=keras.initializers.normal(mean=0.0, stddev=0.01, seed=None),
        bias_initializer='zeros',
        activation='relu',
    )

    def _per_roi(layer, layer_name, tensor):
        # Apply `layer` independently to every ROI of `tensor`.
        return keras.layers.TimeDistributed(layer, name=layer_name)(tensor)

    # Casts are only inserted when the two data types actually differ.
    needs_cast = mask_dtype != retinanet_dtype

    inputs = keras.layers.Input(shape=(None, roi_size[0], roi_size[1], pyramid_feature_size))
    x = inputs

    # Switch to the data type requested for the mask branch, which may
    # differ from the one used by the underlying keras-retinanet model.
    if needs_cast:
        x = _per_roi(Cast(dtype=mask_dtype), 'cast_masks', x)

    for index in range(4):
        x = _per_roi(
            keras.layers.Conv2D(filters=mask_feature_size, **conv_kwargs),
            'roi_mask_{}'.format(index),
            x,
        )

    # Upsampling followed by a convolution is used instead of the
    # deconvolution from the paper, see
    # https://distill.pub/2016/deconv-checkerboard/
    x = _per_roi(Upsample(mask_size), 'roi_mask_upsample', x)
    x = _per_roi(
        keras.layers.Conv2D(filters=mask_feature_size, **conv_kwargs),
        'roi_mask_features',
        x,
    )
    x = _per_roi(
        keras.layers.Conv2D(filters=num_classes, kernel_size=1, activation='sigmoid'),
        'roi_mask',
        x,
    )

    # Return to the data type of the underlying keras-retinanet model.
    if needs_cast:
        x = _per_roi(Cast(dtype=retinanet_dtype), 'recast_masks', x)

    return keras.models.Model(inputs=inputs, outputs=x, name=name)
def default_roi_submodels(num_classes, mask_dtype=keras.backend.floatx(), retinanet_dtype=keras.backend.floatx()):
    """ Return the default list of (name, submodel) pairs applied to the ROIs.

    # Arguments
        num_classes     : Number of classes, forwarded to `default_mask_model`.
        mask_dtype      : Data type used inside the mask submodel.
        retinanet_dtype : Data type of the surrounding retinanet model.

    # Returns
        A list with a single entry: the name 'masks' paired with the default mask model.
    """
    mask_model = default_mask_model(
        num_classes,
        mask_dtype=mask_dtype,
        retinanet_dtype=retinanet_dtype,
    )
    return [('masks', mask_model)]
def retinanet_mask(
    inputs,
    num_classes,
    retinanet_model=None,
    anchor_params=None,
    nms=True,
    class_specific_filter=True,
    name='retinanet-mask',
    roi_submodels=None,
    mask_dtype=keras.backend.floatx(),
    modifier=None,
    **kwargs
):
    """ Construct a RetinaNet mask model on top of a retinanet bbox model.

    This model uses the retinanet bbox model and appends a few layers to compute masks.

    # Arguments
        inputs                : Image input of the model. It is used directly as the image tensor
                                (passed to the retinanet model, `Shape` and `ClipBoxes`).
        num_classes           : Number of classes to classify.
        retinanet_model       : keras_retinanet.models.retinanet model, returning regression and classification values.
                                If None, one is built from `inputs`, `num_classes` and `**kwargs`.
        anchor_params         : Struct containing anchor parameters. If None, default values are used.
        nms                   : Use NMS.
        class_specific_filter : Use class specific filtering.
        name                  : Name of the model.
        roi_submodels         : List of (name, submodel) pairs for processing ROIs. If None, the defaults are used.
        mask_dtype            : Data type of the masks, can be different from the main one.
        modifier              : Modifier for the underlying retinanet model, such as freeze.
        **kwargs              : Additional kwargs to pass to the retinanet bbox model (only used when it is built here).

    # Returns
        Model with `inputs` as input. The outputs are, in order: regression, classification, any further
        outputs of the retinanet model, one boxes+output concatenation per ROI submodel (used for the loss),
        the outputs of the detection filter, and the raw output of each ROI submodel.
    """
    if anchor_params is None:
        anchor_params = keras_retinanet.utils.anchors.AnchorParameters.default

    if roi_submodels is None:
        # The default submodels are built while the global Keras float type is
        # temporarily switched to `mask_dtype`. The previous value is restored
        # in a `finally` so a failure during construction cannot leave the
        # process-wide default changed.
        retinanet_dtype = keras.backend.floatx()
        keras.backend.set_floatx(mask_dtype)
        try:
            roi_submodels = default_roi_submodels(num_classes, mask_dtype, retinanet_dtype)
        finally:
            keras.backend.set_floatx(retinanet_dtype)

    image = inputs
    image_shape = Shape()(image)

    if retinanet_model is None:
        retinanet_model = keras_retinanet.models.retinanet.retinanet(
            inputs=image,
            num_classes=num_classes,
            num_anchors=anchor_params.num_anchors(),
            **kwargs
        )

    if modifier:
        retinanet_model = modifier(retinanet_model)

    # parse outputs
    regression = retinanet_model.outputs[0]
    classification = retinanet_model.outputs[1]
    other = retinanet_model.outputs[2:]

    # The loop variables below deliberately avoid the identifier `name`: on
    # Python 2 list-comprehension variables leak into the enclosing scope and
    # would overwrite the `name` argument used for the final model.
    features = [
        retinanet_model.get_layer(layer_name).output
        for layer_name in ['P3', 'P4', 'P5', 'P6', 'P7']
    ]

    # build boxes
    anchors = keras_retinanet.models.retinanet.__build_anchors(anchor_params, features)
    boxes = keras_retinanet.layers.RegressBoxes(name='boxes')([anchors, regression])
    boxes = keras_retinanet.layers.ClipBoxes(name='clipped_boxes')([image, boxes])

    # filter detections (apply NMS / score threshold / select top-k)
    detections = keras_retinanet.layers.FilterDetections(
        nms=nms,
        class_specific_filter=class_specific_filter,
        max_detections=100,
        name='filtered_detections'
    )([boxes, classification] + other)

    # the first two filtered outputs are the boxes and their scores
    boxes = detections[0]
    scores = detections[1]

    # get the region of interest features
    rois = RoiAlign()([image_shape, boxes, scores] + features)

    # execute maskrcnn submodels
    maskrcnn_outputs = [submodel(rois) for _, submodel in roi_submodels]

    # concatenate boxes for loss computation
    trainable_outputs = [
        ConcatenateBoxes(name=submodel_name)([boxes, output])
        for (submodel_name, _), output in zip(roi_submodels, maskrcnn_outputs)
    ]

    # reconstruct the new output
    outputs = [regression, classification] + other + trainable_outputs + detections + maskrcnn_outputs

    return keras.models.Model(inputs=inputs, outputs=outputs, name=name)
2、resnet.py
resnet_maskrcnn函数以ResNet作为骨架网络来构建模型,代码中可选用resnet50、resnet101或resnet152三种骨架。
"""
Copyright 2017-2018 Fizyr (https://fizyr.com)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import warnings
import keras
import keras_resnet
import keras_resnet.models
import keras_retinanet.models.resnet
from ..models import retinanet, Backbone
class ResNetBackbone(Backbone, keras_retinanet.models.resnet.ResNetBackbone):
    """ Backbone wrapper that can build a maskrcnn model for its ResNet variant.
    """

    def maskrcnn(self, *args, **kwargs):
        """ Build a maskrcnn model for the backbone stored on this instance.

        All positional and keyword arguments are forwarded to `resnet_maskrcnn`,
        with `backbone` set to `self.backbone`.
        """
        selected_backbone = self.backbone
        return resnet_maskrcnn(*args, backbone=selected_backbone, **kwargs)
def resnet_maskrcnn(num_classes, backbone='resnet50', inputs=None, modifier=None, mask_dtype=keras.backend.floatx(), **kwargs):
    """ Construct a maskrcnn model using a ResNet backbone.

    # Arguments
        num_classes : Number of classes to predict.
        backbone    : Which backbone to use (one of 'resnet50', 'resnet101', 'resnet152').
        inputs      : The image input of the network. If None, an input of shape (None, None, 3) named 'image' is created.
        modifier    : A function handler which can modify the backbone before using it (this can be used to freeze backbone layers for example).
        mask_dtype  : Data type of the masks, forwarded to `retinanet_mask`.
        **kwargs    : Additional kwargs forwarded to `retinanet_mask`.

    # Returns
        The model returned by `retinanet.retinanet_mask`.

    # Raises
        ValueError: If `backbone` is not one of the supported names.
    """
    # choose default input
    if inputs is None:
        inputs = keras.layers.Input(shape=(None, None, 3), name='image')

    # create the resnet backbone
    if backbone == 'resnet50':
        resnet = keras_resnet.models.ResNet50(inputs, include_top=False, freeze_bn=True)
    elif backbone == 'resnet101':
        resnet = keras_resnet.models.ResNet101(inputs, include_top=False, freeze_bn=True)
    elif backbone == 'resnet152':
        resnet = keras_resnet.models.ResNet152(inputs, include_top=False, freeze_bn=True)
    else:
        # Without this branch an unknown name would only surface further down
        # as an unbound local variable error on `resnet`.
        raise ValueError('Backbone (\'{}\') is invalid.'.format(backbone))

    # invoke modifier if given
    if modifier:
        resnet = modifier(resnet)

    # create the full model; the first backbone output is skipped
    model = retinanet.retinanet_mask(
        inputs=inputs,
        num_classes=num_classes,
        backbone_layers=resnet.outputs[1:],
        mask_dtype=mask_dtype,
        **kwargs
    )

    return model
def resnet50_maskrcnn(num_classes, inputs=None, **kwargs):
    """ Shortcut for `resnet_maskrcnn` with the 'resnet50' backbone.

    # Arguments
        num_classes : Number of classes to predict.
        inputs      : The image input of the network (may be None).
        **kwargs    : Additional kwargs forwarded to `resnet_maskrcnn`.
    """
    return resnet_maskrcnn(
        backbone='resnet50',
        num_classes=num_classes,
        inputs=inputs,
        **kwargs
    )
def resnet101_maskrcnn(num_classes, inputs=None, **kwargs):
    """ Shortcut for `resnet_maskrcnn` with the 'resnet101' backbone.

    # Arguments
        num_classes : Number of classes to predict.
        inputs      : The image input of the network (may be None).
        **kwargs    : Additional kwargs forwarded to `resnet_maskrcnn`.
    """
    return resnet_maskrcnn(
        backbone='resnet101',
        num_classes=num_classes,
        inputs=inputs,
        **kwargs
    )
def resnet152_maskrcnn(num_classes, inputs=None, **kwargs):
    """ Shortcut for `resnet_maskrcnn` with the 'resnet152' backbone.

    # Arguments
        num_classes : Number of classes to predict.
        inputs      : The image input of the network (may be None).
        **kwargs    : Additional kwargs forwarded to `resnet_maskrcnn`.
    """
    return resnet_maskrcnn(
        backbone='resnet152',
        num_classes=num_classes,
        inputs=inputs,
        **kwargs
    )
3、roi.py
包含RoiAlign类的实现,包含map_to_level等计算函数
import keras.backend
import keras.layers
import keras_retinanet.backend
from .. import backend
class RoiAlign(keras.layers.Layer):
    """ Keras layer that crops a fixed-size feature patch for every box.

    Each box is assigned to one pyramid level based on its size; the patch
    is then cropped from that level and resized to `crop_size`.
    """

    def __init__(self, crop_size=(14, 14), **kwargs):
        """ Store the output patch size and initialise the base layer.
        """
        self.crop_size = crop_size

        super(RoiAlign, self).__init__(**kwargs)

    def map_to_level(self, boxes, canonical_size=224, canonical_level=1, min_level=0, max_level=4):
        """ Compute, for every box, the index of the pyramid level to crop from.

        The level is floor(canonical_level + log2(sqrt(w * h) / canonical_size + epsilon)),
        clipped to the range [min_level, max_level].
        """
        widths = boxes[:, 2] - boxes[:, 0]
        heights = boxes[:, 3] - boxes[:, 1]
        box_scale = keras.backend.sqrt(widths * heights)

        unclipped = backend.floor(
            canonical_level + backend.log2(box_scale / canonical_size + keras.backend.epsilon())
        )
        return keras.backend.clip(unclipped, min_level, max_level)

    def call(self, inputs, **kwargs):
        """ Crop ROI features for the boxes of the first batch element.

        `inputs` is read as [image_shape, boxes, scores, feature_level_0, ...].
        """
        # TODO: Support batch_size > 1
        image_shape = keras.backend.cast(inputs[0], keras.backend.floatx())
        boxes = keras.backend.stop_gradient(inputs[1][0])
        # NOTE: `scores` is not used by the remainder of this method.
        scores = keras.backend.stop_gradient(inputs[2][0])
        fpn = [keras.backend.stop_gradient(level[0]) for level in inputs[3:]]

        # image extents that the y and x box coordinates are divided by
        image_y_extent = image_shape[1]
        image_x_extent = image_shape[2]

        # decide per box which pyramid level provides its features
        target_levels = self.map_to_level(boxes)

        # handle every pyramid level on its own
        level_rois = []
        ordered_indices = []
        for level, level_features in enumerate(fpn):
            # positions of the boxes that were assigned to this level
            level_indices = keras_retinanet.backend.where(keras.backend.equal(target_levels, level))
            ordered_indices.append(level_indices)

            selected_boxes = keras_retinanet.backend.gather_nd(boxes, level_indices)
            feature_shape = keras.backend.cast(keras.backend.shape(level_features), dtype=keras.backend.floatx())
            feature_rows = feature_shape[0]
            feature_cols = feature_shape[1]

            # rewrite the boxes as (y1, x1, y2, x2) in the normalized
            # coordinates handed to crop_and_resize
            x1 = selected_boxes[:, 0]
            y1 = selected_boxes[:, 1]
            x2 = selected_boxes[:, 2]
            y2 = selected_boxes[:, 3]
            normalized_boxes = keras.backend.stack([
                (y1 / image_y_extent * feature_rows) / (feature_rows - 1),
                (x1 / image_x_extent * feature_cols) / (feature_cols - 1),
                (y2 / image_y_extent * feature_rows - 1) / (feature_rows - 1),
                (x2 / image_x_extent * feature_cols - 1) / (feature_cols - 1),
            ], axis=1)

            # every box of this level refers to the single feature map at index 0
            box_map_indices = keras.backend.zeros(
                (keras.backend.shape(normalized_boxes)[0],), dtype='int32'
            )
            level_rois.append(backend.crop_and_resize(
                keras.backend.expand_dims(level_features, axis=0),
                normalized_boxes,
                box_map_indices,
                self.crop_size
            ))

        # merge the per-level crops into one blob
        rois = keras.backend.concatenate(level_rois, axis=0)

        # scatter the crops back to the positions the boxes were gathered from
        scatter_indices = keras.backend.concatenate(ordered_indices, axis=0)
        rois = keras_retinanet.backend.scatter_nd(
            scatter_indices,
            rois,
            keras.backend.cast(keras.backend.shape(rois), 'int64')
        )

        # re-add the batch axis that was stripped at the start
        return keras.backend.expand_dims(rois, axis=0)

    def compute_output_shape(self, input_shape):
        """ Return (batch, None, crop_size[0], crop_size[1], channels of the first feature level).
        """
        batch_size = input_shape[1][0]
        channels = input_shape[3][-1]
        return (batch_size, None, self.crop_size[0], self.crop_size[1], channels)

    def get_config(self):
        """ Return the base layer config extended with `crop_size`.
        """
        config = super(RoiAlign, self).get_config()
        config['crop_size'] = self.crop_size
        return config
4、losses.py
MaskR-CNN计算损失
# compute mask loss
# Excerpt: `masks_target` and `masks` are defined elsewhere (not in this snippet).
# Element-wise binary cross-entropy between the target and the predicted masks.
mask_loss = keras.backend.binary_crossentropy(masks_target, masks)
# Normalize by the product of the first three dimensions of `masks`.
normalizer = keras.backend.shape(masks)[0] * keras.backend.shape(masks)[1] * keras.backend.shape(masks)[2]
# Cast to the Keras float type and floor at 1 so the division below never divides by zero.
normalizer = keras.backend.maximum(keras.backend.cast(normalizer, keras.backend.floatx()), 1)
# Sum of all cross-entropy terms divided by the normalizer.
mask_loss = keras.backend.sum(mask_loss) / normalizer
import keras.backend
import keras_retinanet.backend
from . import backend
def mask(iou_threshold=0.5, mask_size=(28, 28)):
    """ Create the mask loss function.

    # Arguments
        iou_threshold : Minimum IoU a predicted box needs with its best matching annotation to contribute to the loss.
        mask_size     : Size the predicted masks are reshaped to and the target masks are cropped and resized to.

    # Returns
        A function taking (y_true, y_pred) that returns the mask loss,
        or 0 when any dimension of y_true is empty.
    """
    def _mask(y_true, y_pred, iou_threshold=0.5, mask_size=(28, 28)):
        # y_pred carries 4 box values followed by the flattened predicted masks
        pred_boxes = y_pred[:, :, :4]
        pred_masks = y_pred[:, :, 4:]

        # y_true carries 5 annotation values, then width and height
        # (read from the first entry), then the flattened target masks
        annotations = y_true[:, :, :5]
        width = keras.backend.cast(y_true[0, 0, 5], dtype='int32')
        height = keras.backend.cast(y_true[0, 0, 6], dtype='int32')
        target_masks = y_true[:, :, 7:]

        # restore the spatial layout of both mask blobs
        target_shape = keras.backend.shape(target_masks)
        target_masks = keras.backend.reshape(
            target_masks,
            (target_shape[0], target_shape[1], height, width)
        )
        pred_shape = keras.backend.shape(pred_masks)
        pred_masks = keras.backend.reshape(
            pred_masks,
            (pred_shape[0], pred_shape[1], mask_size[0], mask_size[1], -1)
        )

        # TODO: Fix batch_size > 1
        pred_boxes = pred_boxes[0]
        pred_masks = pred_masks[0]
        annotations = annotations[0]
        target_masks = target_masks[0]

        # match every predicted box with its best overlapping annotation
        iou = backend.overlap(pred_boxes, annotations)
        best_annotation = keras.backend.argmax(iou, axis=1)
        best_iou = keras.backend.max(iou, axis=1)

        # keep only the predictions whose best IoU reaches the threshold
        keep = keras_retinanet.backend.where(keras.backend.greater_equal(best_iou, iou_threshold))
        pred_boxes = keras_retinanet.backend.gather_nd(pred_boxes, keep)
        pred_masks = keras_retinanet.backend.gather_nd(pred_masks, keep)
        best_annotation = keras.backend.cast(
            keras_retinanet.backend.gather_nd(best_annotation, keep),
            'int32'
        )
        labels = keras.backend.cast(
            keras.backend.gather(annotations[:, 4], best_annotation),
            'int32'
        )

        # express the kept boxes as normalized (y1, x1, y2, x2)
        height_f = keras.backend.cast(height, dtype=keras.backend.floatx())
        width_f = keras.backend.cast(width, dtype=keras.backend.floatx())
        x1 = pred_boxes[:, 0]
        y1 = pred_boxes[:, 1]
        x2 = pred_boxes[:, 2]
        y2 = pred_boxes[:, 3]
        normalized_boxes = keras.backend.stack([
            y1 / (height_f - 1),
            x1 / (width_f - 1),
            (y2 - 1) / (height_f - 1),
            (x2 - 1) / (width_f - 1),
        ], axis=1)

        # crop the matched target mask for every kept box; a temporary
        # channel axis is added for crop_and_resize and dropped afterwards
        target_masks = keras.backend.expand_dims(target_masks, axis=3)
        target_masks = backend.crop_and_resize(
            target_masks,
            normalized_boxes,
            best_annotation,
            mask_size
        )
        target_masks = target_masks[:, :, :, 0]

        # pick, for every kept box, the predicted mask of its annotation label
        pred_masks = backend.transpose(pred_masks, (0, 3, 1, 2))
        label_indices = keras.backend.stack([
            keras.backend.arange(keras.backend.shape(labels)[0]),
            labels
        ], axis=1)
        pred_masks = keras_retinanet.backend.gather_nd(pred_masks, label_indices)

        # summed binary cross-entropy, divided by the number of mask elements (at least 1)
        mask_loss = keras.backend.binary_crossentropy(target_masks, pred_masks)
        selected_shape = keras.backend.shape(pred_masks)
        normalizer = selected_shape[0] * selected_shape[1] * selected_shape[2]
        normalizer = keras.backend.maximum(keras.backend.cast(normalizer, keras.backend.floatx()), 1)
        return keras.backend.sum(mask_loss) / normalizer

    def _mask_conditional(y_true, y_pred):
        # with no mask annotations (an empty dimension in y_true) the loss is 0;
        # otherwise the actual mask loss is computed
        has_empty_dimension = keras.backend.any(
            keras.backend.equal(keras.backend.shape(y_true), 0)
        )
        return backend.cond(
            has_empty_dimension,
            lambda: keras.backend.cast_to_floatx(0.0),
            lambda: _mask(y_true, y_pred, iou_threshold=iou_threshold, mask_size=mask_size)
        )

    return _mask_conditional