# Source code for ultrayolo.ultrayolo

# -*- coding: utf-8 -*-
import numpy as np
from pathlib import Path
import tensorflow as tf
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Lambda
from tensorflow.keras.optimizers import Adam, RMSprop, SGD
from .layers.core import (DarknetBody, ResNetBody, DenseNetBody, MobileNetBody,
                          YoloHead, YoloOutput, DarknetBodyTiny)

from . import losses, helpers
from .helpers import darknet
import multiprocessing
from typing import List
import math

import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


@tf.function
def non_max_suppression(outputs, anchors, masks, classes, iou_threshold,
                        score_threshold, max_boxes_per_image, img_size):
    """Apply combined non-max suppression over all yolo branch outputs.

    Arguments:
        outputs {tf.tensor} -- the outputs of the yolo branches
        anchors {np.ndarray} -- the anchors scaled in [0,1]
        masks {np.ndarray} -- the list of the anchors to use
        classes {list} -- the list of classes
        iou_threshold {float} -- the minimum intersection over union threshold
        score_threshold {float} -- the minimum confidence score to use
        max_boxes_per_image {int} -- the number of maximum boxes to show
        img_size {int} -- the size of the image

    Returns:
        (boxes, scores, classes, valid_detections) -- a tuple of the results
    """
    # flatten every branch to (batch, num_boxes, features) before merging:
    # slot 0 holds the boxes, slot 1 the confidence, slot 2 the class probs
    flat_boxes, flat_conf, flat_probs = [], [], []
    for branch in outputs:
        for source, bucket in zip(branch[:3], (flat_boxes, flat_conf, flat_probs)):
            bucket.append(
                tf.reshape(source,
                           (tf.shape(source)[0], -1, tf.shape(source)[-1])))

    bbox = tf.concat(flat_boxes, axis=1)
    confidence = tf.concat(flat_conf, axis=1)
    class_probs = tf.concat(flat_probs, axis=1)
    # per-class score = objectness * class probability
    scores = confidence * class_probs

    boxes, scores, classes, valid_detections = tf.image.combined_non_max_suppression(
        boxes=tf.reshape(bbox, (tf.shape(bbox)[0], -1, 1, 4)),
        scores=tf.reshape(scores,
                          (tf.shape(scores)[0], -1, tf.shape(scores)[-1])),
        max_output_size_per_class=max_boxes_per_image,
        max_total_size=max_boxes_per_image,
        iou_threshold=iou_threshold,
        score_threshold=score_threshold)

    # boxes come back normalized in [0,1]; scale to pixel coordinates
    return tf.math.ceil(boxes * img_size), scores, classes, valid_detections
class BaseModel(object):
    """Base class holding the configuration shared by the yolo variants.

    Subclasses are expected to build ``self.model`` and fill in
    ``self.masks`` / ``self.anchors_scaled``.
    """

    def __init__(self,
                 img_shape=(None, None, 3),
                 max_objects=100,
                 iou_threshold=0.5,
                 score_threshold=0.5,
                 anchors=None,
                 num_classes=80,
                 base_grid_size: int = 32,
                 backbone='DarkNet',
                 training=False):
        """Store the model configuration.

        Keyword Arguments:
            img_shape {tuple} -- (Height, Width, Channels) of the input
                (default: {(None, None, 3)})
            max_objects {int} -- maximum number of detectable objects
                (default: {100})
            iou_threshold {float} -- IoU threshold for box suppression
                (default: {0.5})
            score_threshold {float} -- minimum confidence for outputs
                (default: {0.5})
            anchors {np.ndarray} -- detection anchors (default: {None})
            num_classes {int} -- number of classes (default: {80})
            base_grid_size {int} -- grid size in pixels (default: {32})
            backbone {str} -- backbone name (default: {'DarkNet'})
            training {bool} -- True when building for training
                (default: {False})
        """
        self.img_shape = img_shape
        self.max_objects = max_objects
        self.iou_threshold = iou_threshold
        self.score_threshold = score_threshold
        # FIX: the original assigned None here, silently discarding the
        # `anchors` argument; subclasses had to re-set it themselves
        self.anchors = anchors
        self.num_classes = num_classes
        self.base_grid_size = base_grid_size
        self.backbone = backbone
        self.training = training
        # filled in by the concrete subclass
        self.anchors_scaled = None
        self.masks = None
        self.model = None
        self.tiny = None
[docs] def summary(self): """return the tensorflow model summary """ self.model.summary()
[docs] def load_weights(self, path): """load: * saved checkpoints in h5 format * saved weights in darknet format Arguments: path {str} -- the path where the weights are saved backbone {str} -- the name of the backbone used """ path = Path(path) logger.info('loading checkpoint from %s', str(path.absolute())) if path.name.split('.')[-1] == 'weights': darknet.load_darknet_weights(self.model, path, self.tiny) elif path.name.split('.')[-1] == 'h5': helpers.unfreeze_checkpoint(path) self.model.load_weights(str(path.absolute()))
[docs] def get_loss_function(self, num_batches, loss_name: str = 'yolo') -> List: """utility to create the loss function Keyword Arguments: loss_fn {str} -- a literal value for Yolo and Focal loss (default: {'yolo'}) (values: {'yolo', 'focal'}) Returns: List -- [description] """ loss_function = losses.make_loss(self.num_classes, self.anchors, self.masks, self.img_shape[0], num_batches, self.iou_threshold, loss_name=loss_name) return loss_function
[docs] def get_optimizer(self, optimizer_name, lrate): """helper to create the optimizer using the class defined members Arguments: optimizer_name {str} -- the name of the optimizer to use: (values: adam, rmsprop, sgd) lrate {float} -- a valid starting value for the learning rate Raises: Exception: raise an exception if the optimizer is not supported Returns: tensorflow.keras.optimizer -- an instance of the selected optimizer """ logger.info('using %s optimize', optimizer_name) if optimizer_name == 'adam': return Adam(learning_rate=lrate, clipvalue=1) elif optimizer_name == 'rmsprop': return RMSprop(learning_rate=lrate, clipvalue=1) elif optimizer_name == 'sgd': return SGD(learning_rate=lrate, momentum=0.95, nesterov=True, clipvalue=1) else: raise Exception(f'not valid optimizer {optimizer_name}')
[docs] def set_mode_train(self): """unfreeze all the net read for a full training """ darknet.unfreeze(self.model)
[docs] def set_mode_transfer(self): """freeze the backbone of the network, check that the head and output layers are unfreezed """ # be sure the model is totally unfreezed darknet.unfreeze(self.model) logger.info('freeze backbone') darknet.freeze_backbone(self.model)
[docs] def set_mode_fine_tuning(self, num_layers_to_train): """unfreeze the backbone and freeze the first `num_layers_to_train` layers Arguments: num_layers_to_train {[type]} -- [description] """ darknet.unfreeze(self.model) darknet.freeze_backbone_layers(self.model, num_layers_to_train)
[docs] def compile(self, optimizer, loss, run_eagerly, summary=True): """compile the model Arguments: optimizer {tf.keras.optimizers} -- a valid tensorflow optimizer loss {ultrayolo.losses.Loss} -- the loss function for yolo run_eagerly {bool} -- if True is uses eager mode, that is you can see more explainable stack traces Keyword Arguments: summary {bool} -- if True print the summary of the model (default: {True}) """ self.model.compile(optimizer, loss, run_eagerly=run_eagerly) if summary: self.model.summary()
[docs] def fit(self, train_dataset, val_dataset, epochs, initial_epoch=0, callbacks=None, workers=1, max_queue_size=64): """train the model Arguments: train_dataset {ultrayolo.datasets.Dataset} -- an instance of the dataset val_dataset {ultrayolo.datasets.Dataset} -- an instance of the dataset epochs {int} -- the number of epochs Keyword Arguments: initial_epoch {int} -- the inital epoch (default: {0}) callbacks {[type]} -- a list of callbacks for the model (default: {None}) workers {int} -- the number of workers (default: {1}) max_queue_size {int} -- the max size of the queue (default: {64}) Returns: [type] -- [description] """ logger.info('training for %s epochs on the dataset %s', epochs, str(train_dataset.base_path.absolute())) if workers == -1: workers = multiprocessing.cpu_count() use_multiprocessing = False if workers > 1: use_multiprocessing = True return self.model.fit(train_dataset, epochs=epochs, validation_data=val_dataset, callbacks=callbacks, workers=workers, use_multiprocessing=use_multiprocessing, max_queue_size=64, initial_epoch=initial_epoch, verbose=1)
[docs] def save(self, path, save_format='h5'): """save the model to the given path Arguments: path {str|pathlib.Path} -- the path to save the checkpoint """ path = str(Path(path).absolute()) self.model.save(path, save_format=save_format)
[docs] def predict(self, x): return self.model.predict(x)
def __call__(self, x): return self.model(x)
class YoloV3(BaseModel):
    """Yolo v3 model with a configurable feature-extraction backbone."""

    default_masks = np.array([[6, 7, 8], [3, 4, 5], [0, 1, 2]])
    default_anchors = np.array([(10, 13), (16, 30), (33, 23), (30, 61),
                                (62, 45), (59, 119), (116, 90), (156, 198),
                                (373, 326)], np.float32)

    def __init__(self,
                 img_shape=(None, None, 3),
                 max_objects=100,
                 iou_threshold=0.5,
                 score_threshold=0.5,
                 anchors=None,
                 num_classes=80,
                 base_grid_size: int = 32,
                 backbone='DarkNet',
                 training=False):
        """Build the Yolo v3 graph.

        Keyword Arguments:
            img_shape {tuple} -- (Height, Width, Channels) of the input
                (default: {(None, None, 3)})
            max_objects {int} -- maximum number of detectable objects
                (default: {100})
            iou_threshold {float} -- IoU threshold used to filter multiple
                boxes for the same object (default: {0.5})
            score_threshold {float} -- minimum confidence for the output
                (default: {0.5})
            anchors {np.ndarray} -- detection anchors (default: {None})
            num_classes {int} -- the number of classes (default: {80})
            base_grid_size {int} -- the grid size in pixels (default: {32})
            backbone {str} -- a valid backbone among: (default: {'DarkNet'})
                * 'DarkNet', 'DarknetTiny'
                * 'ResNet50V2', 'ResNet101V2', 'ResNet152V2'
                * 'DenseNet121', 'DenseNet169', 'DenseNet201'
                * 'MobileNetV2'
            training {bool} -- True if the model is used for training
                (default: {False})

        Raises:
            ValueError: for an unsupported backbone or an invalid
                base_grid_size
        """
        super().__init__(img_shape, max_objects, iou_threshold,
                         score_threshold, anchors, num_classes,
                         base_grid_size, backbone, training)
        self.masks = self.default_masks
        if anchors is None:
            self.anchors = YoloV3.default_anchors
        else:
            self.anchors = anchors.astype(np.float32)
        # anchors normalized to [0,1] over the image width
        self.anchors_scaled = self.anchors / img_shape[1]

        x = inputs = Input(shape=img_shape)
        if backbone == 'DarkNet':
            x36, x61, x = DarknetBody(name=backbone)(x)
        elif 'ResNet' in backbone:
            x36, x61, x = ResNetBody(img_shape, version=backbone)(x)
        elif 'DenseNet' in backbone:
            x36, x61, x = DenseNetBody(img_shape, version=backbone)(x)
        elif 'MobileNet' in backbone:
            x36, x61, x = MobileNetBody(img_shape, version=backbone)(x)
        else:
            # FIX: an unknown backbone previously fell through and crashed
            # later with a confusing NameError on x36/x61
            raise ValueError(f'unsupported backbone {backbone}')

        # copies are created because tensorflow cannot save models holding
        # object thread-local variables
        masks = self.masks.copy()
        anchors = self.anchors.copy()
        anchors_scaled = self.anchors_scaled.copy()

        if int(img_shape[0] / base_grid_size) == 0:
            raise ValueError(
                f'the base_grid_size={base_grid_size} is too large')
        num_pooling = int(math.log2(base_grid_size) - math.log2(32))
        # FIX: use the module logger instead of a stray print
        logger.info('num pooling %d', num_pooling)

        x = YoloHead(x, 512, name='yolo_head_0')
        output0 = YoloOutput(x, 512, len(masks[0]), num_classes, num_pooling,
                             name='yolo_output_0')
        x = YoloHead((x, x61), 256, name='yolo_head_1')
        output1 = YoloOutput(x, 256, len(masks[1]), num_classes, num_pooling,
                             name='yolo_output_1')
        x = YoloHead((x, x36), 128, name='yolo_head_2')
        output2 = YoloOutput(x, 128, len(masks[2]), num_classes, num_pooling,
                             name='yolo_output_2')

        if training:
            self.model = Model(inputs, [output0, output1, output2],
                               name='yolov3')
        else:
            # decode raw branch outputs into boxes, then run NMS
            boxes0 = Lambda(lambda x: losses.process_predictions(
                x, num_classes, tf.gather(anchors_scaled, masks[0])),
                            name='yolo_boxes_0')(output0)
            boxes1 = Lambda(lambda x: losses.process_predictions(
                x, num_classes, tf.gather(anchors_scaled, masks[1])),
                            name='yolo_boxes_1')(output1)
            boxes2 = Lambda(lambda x: losses.process_predictions(
                x, num_classes, tf.gather(anchors_scaled, masks[2])),
                            name='yolo_boxes_2')(output2)
            outputs = Lambda(lambda x: non_max_suppression(
                x, anchors_scaled, masks, num_classes, iou_threshold,
                score_threshold, max_objects, img_shape[0]),
                             name='yolo_nms')(
                                 (boxes0[:3], boxes1[:3], boxes2[:3]))
            self.model = Model(inputs, outputs, name='yolov3')
class YoloV3Tiny(BaseModel):
    """Yolo v3 tiny variant: a lighter model with two output branches."""

    default_anchors = np.array([(10, 14), (23, 27), (37, 58), (81, 82),
                                (135, 169), (344, 319)], np.float32)
    default_masks = np.array([[3, 4, 5], [0, 1, 2]])

    def __init__(self,
                 img_shape=(None, None, 3),
                 max_objects=100,
                 iou_threshold=0.7,
                 score_threshold=0.7,
                 anchors=None,
                 num_classes=80,
                 base_grid_size: int = 32,
                 backbone='DarknetTiny',
                 training=False):
        """Build the Yolo v3 tiny graph.

        Keyword Arguments:
            img_shape {tuple} -- (Height, Width, Channels) of the input
                (default: {(None, None, 3)})
            max_objects {int} -- maximum number of detectable objects
                (default: {100})
            iou_threshold {float} -- IoU threshold used to filter multiple
                boxes for the same object (default: {0.7})
            score_threshold {float} -- minimum confidence for the output
                (default: {0.7})
            anchors {np.ndarray} -- detection anchors (default: {None})
            num_classes {int} -- the number of classes (default: {80})
            base_grid_size {int} -- the grid size in pixels (default: {32})
            backbone {str} -- the backbone name (default: {'DarknetTiny'})
            training {bool} -- True if the model is used for training
                (default: {False})
        """
        # forward the full configuration (the original dropped
        # base_grid_size/backbone/training from the super call)
        super().__init__(img_shape, max_objects, iou_threshold,
                         score_threshold, anchors, num_classes,
                         base_grid_size, backbone, training)
        self.masks = self.default_masks
        if anchors is None:
            # FIX: the original read YoloV3.default_anchors (9 anchors);
            # the tiny model must use its own 6 anchors
            self.anchors = self.default_anchors.copy()
        else:
            # FIX: user-supplied anchors were previously discarded,
            # crashing on the astype call below
            self.anchors = anchors
        self.anchors = self.anchors.astype(np.float32)
        self.anchors_scaled = self.anchors / img_shape[1]
        self.training = training
        self.tiny = True

        x = inputs = Input(shape=img_shape)
        x8, x = DarknetBodyTiny(name='DarkNet')(x)

        num_pooling = int(math.log2(base_grid_size) - math.log2(32))
        # FIX: use the module logger instead of a stray print
        logger.info('num pooling %d', num_pooling)

        x = YoloHead(x, 256, name='yolo_head_0', is_tiny=True)
        output0 = YoloOutput(x, 256, len(self.masks[0]), self.num_classes,
                             num_pooling, name='yolo_output_0')
        x = YoloHead((x, x8), 128, name='yolo_head_1', is_tiny=True)
        output1 = YoloOutput(x, 128, len(self.masks[1]), self.num_classes,
                             num_pooling, name='yolo_output_1')

        if training:
            # FIX: training model was inconsistently named 'yolov3'
            self.model = Model(inputs, [output0, output1], name='yolov3_tiny')
        else:
            boxes0 = Lambda(lambda x: losses.process_predictions(
                x, self.num_classes, self.anchors_scaled[self.masks[0]]),
                            name='yolo_boxes_0')(output0)
            boxes1 = Lambda(lambda x: losses.process_predictions(
                x, self.num_classes, self.anchors_scaled[self.masks[1]]),
                            name='yolo_boxes_1')(output1)
            outputs = Lambda(lambda x: non_max_suppression(
                x, self.anchors_scaled, self.masks, self.num_classes,
                self.iou_threshold, self.score_threshold, self.max_objects,
                self.img_shape[0]),
                             name='yolo_nms')((boxes0[:3], boxes1[:3]))
            self.model = Model(inputs, outputs, name='yolov3_tiny')