# -*- python -*-
# -*- coding: utf-8 -*-
"""eartrack library provides useful functions to track ear on maize
"""
import os
import math as m
import multiprocessing as mp
import numpy as np
import cv2
from matplotlib.figure import Figure
from matplotlib.backends.backend_agg import FigureCanvasAgg
from skimage import measure
from skimage.morphology import skeletonize, medial_axis, label
from skimage import graph
# import matplotlib.pyplot as plt
import openalea.eartrack.binarisation as bin
writing_semaphore = mp.BoundedSemaphore()
[docs]def top_analysis(top_binary_img, existing_angles, center_mask):
""" Top image analysis
Analyse top binary image to determine best side view images allowing to
see the stem and find ear
:param top_binary_img: (numpy array of uint8) representing binary image
:param existing_angles: (list of int) list of existing angle for this
snapshot
:param center_mask: (numpy array of uint8) mask representing the center of
image to know if a leave can be considered as obstructing
:return:
(list of int) informative angles of view to analyse
(numpy array of uint8) result image for log
(string) log to write
"""
log = ""
# Determination of most informative angles for ear tracking
result_img, alpha90, alpha270, exclusions = \
get_view_angles(top_binary_img[::-1, ::-1], center_mask)
if alpha90 == -1 and alpha270 == -1:
log += "Binarisation error for top view image\n\n"
return list(), result_img, log
# TODO : refaire cette méthode plus proprement
existing_angles.sort()
angles_to_keep = list()
for angle in (alpha90, alpha270):
if angle > 345:
angle -= 360
for i in range(len(existing_angles)):
if abs(existing_angles[i] - angle) <= 10:
if i > 0:
angles_to_keep.append(existing_angles[i-1])
else:
angles_to_keep.append(existing_angles[len(existing_angles)-1])
angles_to_keep.append(existing_angles[i])
if i < len(existing_angles)-1:
angles_to_keep.append(existing_angles[i+1])
else:
angles_to_keep.append(existing_angles[0])
break
elif abs(existing_angles[i] - angle) <= 15:
angles_to_keep.append(existing_angles[i])
if existing_angles[i] < angle:
if i < len(existing_angles)-1:
angles_to_keep.append(existing_angles[i+1])
else:
angles_to_keep.append(existing_angles[0])
else:
if i > 0:
angles_to_keep.append(existing_angles[i-1])
else:
angles_to_keep.append(existing_angles[len(existing_angles)-1])
angles_to_keep.sort()
# Exclude some angles which could have leaves hamper the stem detection
excluded_angles = list()
for exclude_angle in exclusions:
exclude_negatives_angles = 1000
if exclude_angle > 335:
exclude_negatives_angles = exclude_angle - 360
i = 0
while i < len(angles_to_keep):
if abs(exclude_angle - angles_to_keep[i]) < 25:
excluded_angles.append(angles_to_keep.pop(i))
elif abs(exclude_negatives_angles - angles_to_keep[i]) < 25:
excluded_angles.append(angles_to_keep.pop(i))
else:
i += 1
# Log intermediate and final data on top images analyse
log += "Top view analyzed : alpha = " + str(alpha90) + ", alpha2 = " + \
str(alpha270)
if len(exclusions):
log += "\nExcluded angles : " + ";".join(map(str, exclusions))
log += "\n\n"
log += "Interesting angles : " + \
";".join(map(str, sorted(angles_to_keep + excluded_angles))) + "\n"
log += "Final kept angles : " + ";".join(map(str, angles_to_keep)) + "\n"
if not len(angles_to_keep):
log += "All views has been excluded\n"
return angles_to_keep, result_img, log
[docs]def side_analysis(binary_img, color_img, angle, pot_height, pot_center):
""" Side image analysis for ear tracking
Perform the analysis of side view maize plant's image to extract ear
position
:param binary_img: (numpy array of uint8) binary image
:param color_img: (numpy array of uint8) color image in BGR matrix
:param angle: (int) view angle of the image
:param pot_height: (int) height position of the top of the pot
:param pot_center: (int) width position of the center of the pot
:return: positions: (np array of uint numpy array) Kept position(s) as
probable(s) ear(s), each position as [x, y, angle]
useful_images: (np array of str) ids of images corresponding to
each position
log: (string) log to write
img_debug: (list of numpy array) list of output images from
different stages of calculation
"""
positions = np.empty([0, 3], 'int')
useful_images = np.empty([0], 'int')
log = ""
img_debug = dict()
image_name = "side_" + str(angle) + ".png"
log += "\n-----------------------------\n"
log += "Loading " + image_name + "\n"
''' LOADING BINARY AND ORIGINAL IMAGE '''
binary_img = bin.close(binary_img, iterations=4)
name, ext = os.path.splitext(image_name)
# Get the biggest region
biggest_binary_region = binary_biggest_region(binary_img)
# Extract skeleton of plant
output_skeleton_img = get_skeleton(biggest_binary_region)
# Extract distance transform
dist_trans_img = distance_transform(biggest_binary_region)
# skimage's graph library and skeleton cleaning
begin, end = get_endpoints(output_skeleton_img, pot_center, pot_height)
if begin == [-1, -1]:
log += "Error in bottom's stem detection\n\n"
return positions, useful_images, log, img_debug
output_skeleton_img = skeleton_cleaning(output_skeleton_img, begin)
route = find_cross_route(output_skeleton_img, begin)
route.reverse()
# Make color image with distance transform '''
output_dt_img = dist_trans_img*255/dist_trans_img.max()
output_dt_img = output_dt_img.astype(int)
# Make image binary and skeletons
output_binary_img = np.zeros(color_img.shape, 'uint8')
output_binary_img[:, :, 0] = biggest_binary_region
output_binary_img[:, :, 1] = biggest_binary_region
output_binary_img[:, :, 2] = biggest_binary_region
for pix in route:
output_binary_img[pix[0], pix[1]-2:pix[1]+2, :] = (0, 0, 255)
# Get main direction of stem, rotate the stem and adapt on it the
# following derivation algorithme
init_stem = np.zeros(biggest_binary_region.shape, 'uint8')
for pix in route:
mask = dist_trans_img[pix[0], pix[1]]
init_stem[pix[0]-mask:pix[0]+mask+1, pix[1]-mask:pix[1]+mask+1] = 255
output_stem_img, a, b, r_xy, alpha = majors_axes_regression_line(init_stem)
# Perform derivation on route to
diff, x, y = derivate(route)
# Eliminate noise on derivation curve
indices = differential_cleaning(diff, x, y, 10, 5, 5)
# Delete extrema error
i = len(indices)-1
while indices[i][2] == 0:
i -= 1
if x[len(y)-1] == x[indices[i][0]] or \
abs(float(y[len(y)-1] - y[indices[i][0]])/float(x[len(y)-1] -
x[indices[i][0]]) - a) > 1:
for j in range(len(indices)-1, i-1, -1):
route = route[:indices[i][0]]
indices.pop(len(indices)-1)
i = 0
while indices[i][2] == 0:
i += 1
if x[indices[i][1]] == x[0] or \
abs(float(y[indices[i][1]] - y[0])/float(x[indices[i][1]] -
x[0]) - a) > 1:
for j in range(i+1):
route = route[indices[0][1]:]
indices.pop(0)
# Stem reconstruction
cleaned_stem = np.zeros(biggest_binary_region.shape, 'uint8')
for pix in route:
mask = dist_trans_img[pix[0], pix[1]]
cleaned_stem[pix[0]-mask:pix[0]+mask+1, pix[1]-mask:pix[1]+mask+1] = \
biggest_binary_region[pix[0]-mask:pix[0]+mask+1,
pix[1]-mask:pix[1]+mask+1]
output_stem_img, a, b, r_xy, alpha = majors_axes_regression_line(cleaned_stem)
if r_xy > 30:
log += "Stem detection error\n\n"
img_debug[name + "_stem_error" + ext] = output_stem_img
return positions, useful_images, log, img_debug
skeleton_stem = np.zeros(binary_img.shape, 'uint8')
for pixel in route:
skeleton_stem[pixel] = 1
begin, end = get_endpoints(skeleton_stem, pot_center, pot_height)
if begin == [-1, -1] or end == [-1, -1]:
log += "Error in bottom or top of stem detection after cleaning stem"
return positions, useful_images, log, img_debug
route = find_route(skeleton_stem, begin, end)
# Statistics on distances curve to detect probable ear position
distances = get_distances(route, dist_trans_img)
distances_length = float(len(distances))
part_1 = int(round(len(distances)/2.5))
position = 0
solutions, stems, pics, poses = ear_detection(distances)
minus_pos = poses[0]
stem_pos_after_ear = poses[1]
kept_solutions = -1
for i in range(len(solutions)):
if solutions[i][1] > 0:
positions = np.append(positions, [[route[solutions[i][0]][0],
route[solutions[i][0]][1],
solutions[i][1]]], axis=0)
useful_images = np.append(useful_images, angle)
if kept_solutions < 0:
kept_solutions = i
position = solutions[i][0]
elif solutions[i][1] > solutions[kept_solutions][1]:
kept_solutions = i
position = solutions[i][0]
log += "Stem width bellow the ear = " + str(distances[minus_pos]) + "\n"
if kept_solutions >= 0:
log += "Stem with up to the ear = " + \
str(distances[stem_pos_after_ear]) + "\n"
log += "Probable ear position : " + str(route[position][0]) + "\n"
else:
log += "Ear detection error\n"
log += "Solutions : \n"
for sol in solutions:
log += "\tsolution : " + str(route[sol[0]][0]) + ", weight : " + \
str(sol[1]) + "\n"
log += "Peaks (leaves) : \n"
for pic in pics:
log += "\tpeak : " + str(route[pic[0]][0]) + ", begin : " + \
str(route[pic[1]][0]) + ", end : " + str(route[pic[2]][0]) + \
", relative length : " + \
str(float(pic[2] - pic[1])*100./distances_length) + "\n"
log += "Troughs (stem part) : \n"
for stem in stems:
log += "\tbegin : " + str(route[stem[0]][0]) + ", end : " + \
str(route[stem[1]][0]) + ", relative length : " + \
str(float(stem[1] - stem[0])*100./distances_length) + "\n"
log += "\n"
# The following part can fail on server without server X
width_curve = None
try:
fig = Figure()
canvas = FigureCanvasAgg(fig)
ax = fig.gca()
ax.plot(distances)
ax.plot(minus_pos, -1, 'rX')
ax.plot(stem_pos_after_ear, 0, 'gX')
ax.plot(position, -1, 'bX')
for i in pics[:, 0]:
ax.plot(i,distances[i], 'r*')
for stem in stems:
ax.plot(range(stem[0], stem[1]), distances[stem[0]:stem[1]], 'r')
im_size = fig.get_size_inches() * fig.dpi
canvas.draw()
width_curve = np.fromstring(canvas.tostring_rgb(), dtype='uint8')
width_curve = width_curve.reshape(int(im_size[1]), int(im_size[0]), 3)
width_curve = width_curve[:, :, ::-1]
except:
pass
# draw yellow square on solution
output_results_img = color_img.copy()
output_results_img[route[position][0]-31:route[position][0]+30,
route[position][1]-31:route[position][1]+30, :] = (0, 255, 255)
for i in range(len(route)):
output_binary_img[route[i][0], route[i][1]-2:route[i][1]+2, :] = (0, 255, 0)
if i < stem_pos_after_ear:
mask = distances[minus_pos]
else:
mask = distances[stem_pos_after_ear]
if distances[i] == distances[minus_pos] and i < part_1:
output_results_img[route[i][0], route[i][1]-mask:route[i][1]+mask+1, 0] \
= 255
elif distances[i] == distances[stem_pos_after_ear] \
and i >= part_1 and position:
output_results_img[route[i][0], route[i][1]-mask:route[i][1]+mask+1, 1] \
= 255
else:
output_results_img[route[i][0], route[i][1]-mask:route[i][1]+mask+1, 2] \
= 255
output_binary_img[route[position][0]-5:route[position][0]+5,
route[position][1]-5:route[position][1]+5, :] = (0, 0, 255)
output_binary_img[route[minus_pos][0]-5:route[minus_pos][0]+5,
route[minus_pos][1]-5:route[minus_pos][1]+5, :] = (255, 0, 0)
# save images
img_debug[name + "_distance_transform" + ext] = output_dt_img
img_debug[name + "_result" + ext] = output_results_img
img_debug[name + "_binary" + ext] = output_binary_img
img_debug[name + "_cleaned_stem" + ext] = output_stem_img
img_debug[name + "_skeleton" + ext] = output_skeleton_img * 255
if isinstance(width_curve, np.ndarray):
img_debug[name + "_width_curve" + ext] = width_curve
# Log distance values
log += "Stem width : " + ';'.join(map(str, distances)) + "\n"
return positions, useful_images, log, img_debug
[docs]def get_skeleton(binary_image):
""" Perform skeleton on image
Use skimage medial axis to perform skeleton on binary image
:param binary_image: (numpy 2D array of binary uint8) binary image to
perform skeleton
:return: (numpy 2D array of binary uint8) binary image of skeleton
"""
return (medial_axis(binary_image > 0)).astype(int)
[docs]def binary_biggest_region(binary_image):
""" Look for the biggest object on a binary image
:param binary_image: (numpy 2D array of binary uint8) binary image to
analyse
:return: (numpy 2D array of binary uint8) binary image containing only the
biggest object
"""
biggest = 0
lab = 0
labelled_img = measure.label(binary_image, neighbors=8)
for region in measure.regionprops(labelled_img):
if region['area'] > biggest and \
binary_image[region['coords'][0][0], region['coords'][0][1]]:
biggest = region['area']
lab = region['label']
return binary_image * (labelled_img == lab)
[docs]def get_endpoints(skeleton, center, height):
""" Look for stem extremities
Try to find the bottom and upper node of the stem in a maize plant
:param skeleton: (numpy 2D array of binary uint8) representing the skeleton
of side view image of a maize plant
:param center: (int) pixel in the width center of the pot
(depending on the plateform and the calibration)
:param height: (int) pixel in the height top of the pot
(depending on the plateform and the calibration)
:return: (list of 2 int) pixel of the bottom of the stem
(list of 2 int) pixel of the top of the stem
"""
mini = skeleton.shape[0]
down_look_for = height
skelet = np.where(skeleton > 0)
up_node = [-1, -1]
down_node = [-1, -1]
for i in range(0, skelet[0].shape[0]):
node = tuple([skelet[0][i], skelet[1][i]])
if node[0] < mini or \
(node[0] == mini and
skeleton[node[0], node[1]-1:node[1]+2].all()):
mini = node[0]
up_node = node
loop_again = 1
while loop_again:
if skeleton[down_look_for, :].any():
indices = np.where(skeleton[down_look_for, :])
loop_again = 0
for y in indices[0]:
if abs(y - center) < abs(down_node[1] - center):
down_node = [down_look_for, y]
down_look_for -= 1
if down_look_for < 1500:
loop_again = 0
return down_node, up_node
[docs]def skeleton_cleaning(skeleton, begin):
""" Clean the skeleton
:param skeleton: (numpy 2D array of binary uint8) representing the skeleton
of side view image of maize plant
:param begin: bottm of stem
:return: (numpy 2D array of binary uint8) representing cleaned skeleton
"""
cleaned_skeleton = np.array(skeleton)
skeleton_inverted = np.array(skeleton, 'float')
skeleton_inverted[skeleton_inverted == 0] = np.Inf
mcp = graph.MCP(skeleton_inverted)
cc, t = mcp.find_costs([begin])
s = np.where(skeleton)
cross = list()
ends = list()
for i in range(len(s[0])):
pattern = skeleton[s[0][i] - 1:s[0][i] + 2, s[1][i] - 1:s[1][i] + 2]
if len(np.where(pattern > 0)[0]) > 3:
cross.append([s[0][i], s[1][i]])
elif len(np.where(pattern > 0)[0]) < 3:
ends.append([s[0][i], s[1][i]])
for end in ends:
temp = list()
current = end
prec_in_cross = 0
loop_again = 1
while loop_again:
direction = t[current[0], current[1]]
if direction == -1:
break
temp.append(current)
a = np.zeros([8])
a[direction] = 1
a = np.insert(a, 4, 0)
a = a[::-1]
a = a.reshape([3, 3])
next_one = np.where(a == 1)
current = [current[0] + next_one[0][0]-1,
current[1] + next_one[1][0]-1]
if current in cross:
prec_in_cross = 1
else:
if prec_in_cross:
temp.pop()
loop_again = 0
if len(temp) < 100:
for pixel in temp:
cleaned_skeleton[pixel[0], pixel[1]] = 0
return cleaned_skeleton
[docs]def find_route(skeleton, begin, end):
""" Perform shortest path algorithm on skeleton image
Find the shortest route on a skeleton between 2 pixels using graph
shortest path algorithm
:param skeleton: (numpy 2D array of binary uint8) representing the skeleton
of side view image of a maize plant
:param begin: (list of 2 int) pixel of the bottom of the stem
:param end: (list of 2 int) pixel of the top of the stem
:return: (list of list of 2 int) list of all the pixels to follow to get
the shortest path between begin and end
"""
skeleton_inverted = np.array(skeleton)
skeleton_inverted[skeleton_inverted == 0] = 255
return graph.route_through_array(skeleton_inverted, begin, end)[0]
[docs]def find_cross_route(skeleton, begin):
""" Perform shortest path algorithm on skeleton image unknowing upper node
Find the shortest route on a skeleton between a beginning pixel and the
upper cross on the skeleton using graph shortest path algorithm
:param skeleton: (numpy 2D array of binary uint8) representing the skeleton
of side view image of a maize plant
:param begin: (list of 2 int) pixel of the bottom of the stem
:return: (list of list of 2 int) list of all the pixels to follow to get
the shortest path between begin and upper cross
"""
s = np.where(skeleton)
end = skeleton.shape
for i in range(len(s[0])):
if len(np.where(skeleton[s[0][i]-1:s[0][i]+2,
s[1][i]-1:s[1][i]+2] > 0)[0]) > 3:
if s[0][i] < end[0]:
end = [s[0][i], s[1][i]]
skeleton_inverted = np.array(skeleton)
skeleton_inverted[skeleton_inverted == 0] = 255
return graph.route_through_array(skeleton_inverted, begin, end)[0]
[docs]def get_distances(route, distance_transform_img):
""" Get the distances transform values along a route
'route' are coordinates in the 'distance_transform_img' shape.
:param route: (list of list of 2 int) list of all the pixels to follow a
route on image
:param distance_transform_img: (numpy 2D array of uint8) binary image
transformed in distances
:return: (list of int) representing the distances values all along the route
"""
distances = list()
for pixel in route:
distances.append(distance_transform_img[pixel])
return distances
[docs]def derivate(route):
""" Perform discrete derivative on a curve
Perform discrete derivative on a route in order to analyse variation of
directions
:param route: (list of list of 2 int) list of all the pixels to follow a
route on image
:return: diff: (list of int) values in [-1, 0, 1] representing the variation
of the route
x: (list of int) x original position of each diff value
y: (list of int) y original position of each diff value
"""
longueur = len(route)
x = np.zeros([1, 0], 'int')
y = np.zeros([1, 0], 'int')
diff = np.zeros([1, 0], 'float')
i = 0
while i < longueur-1:
superior_index = 1
x = np.append(x, route[i][0])
y = np.append(y, route[i][1])
while route[i+superior_index][0] >= route[i][0]:
x = np.append(x, route[i+superior_index][0])
y = np.append(y, route[i+superior_index][1])
diff = np.append(diff, float(route[i+superior_index][1] -
route[i+superior_index-1][1]))
superior_index += 1
if i + superior_index == len(route):
break
if i + superior_index < len(route):
if i == 0:
diff = np.append(diff, float(route[superior_index][1] -
route[0][1]) /
float(route[superior_index][0] - route[0][0]))
else:
if superior_index == 1:
diff = np.append(diff, float(route[i+superior_index][1] -
route[i][1]) /
float(route[i+superior_index][0] -
route[i][0]))
else:
diff = np.append(diff,
np.sign(float(route[i+superior_index][1] -
route[i][1]) /
float(route[i+superior_index][0] -
route[i][0]))*np.Inf)
else:
diff = np.append(diff, 1.)
i += superior_index
if not route[longueur-1][0] == route[longueur-2][0]:
x = np.append(x, route[longueur-1][0])
y = np.append(y, route[longueur-1][1])
diff = np.append(diff, float(route[longueur-1][1] -
route[longueur-2][1]) /
float(route[longueur-1][0] - route[longueur-2][0]))
return diff, x, y
[docs]def differential_cleaning(diff, x, y, max_space, min_length, min_height):
""" Clean derivatives values
Analyse derivatives values to keep only the significant variations
:param diff: (list of int) values in [-1, 0, 1] representing the variation
of a route
:param x: (list of int) x original position of each diff value
:param y: (list of int) y original position of each diff value
:param max_space: (int) max length (in pixels) of diff null to reckon that
the increase or decrease is no longer the same variation
:param min_length: (int) minimum length of variation to reckon that the
variation is significant
:param min_height: minimum height of variation to reckon that the
variation is significant
:return: (list of 3 int list) describing the diff values by parts of same
variation [[begin, end, variation]]
"""
# first loop to separate variations
indices = list()
begin = -1
end = -1
direction = 0
for i in range(0, len(diff)):
if not diff[i] == 0:
if begin > -1 and direction*diff[i] > 0:
end = i
if i == len(diff)-1:
indices.append(list([begin, end, direction]))
else:
if begin > -1 and direction*diff[i] < 0:
indices.append(list([begin, end+1, direction]))
if diff[i] > 0:
direction = 1
else:
direction = -1
begin = i
end = i
else:
if end > -1:
if abs(x[i] - x[end]) > max_space or i == len(diff)-1:
indices.append(list([begin, end+1, direction]))
begin = -1
end = -1
direction = 0
# second loop to group sames variations together
good_index = list()
end = 0
for i in indices:
if end < i[0]:
if len(good_index) > 0 and good_index[len(good_index)-1][2] == 0:
good_index[len(good_index)-1][1] = i[0]
else:
good_index.append(list([end, i[0], 0]))
end = i[0]
if abs(x[i[1]] - x[i[0]]) > min_length \
or abs(y[i[1]] - y[i[0]]) > min_height:
good_index.append(i)
end = i[1]
# Write small plane zone which should have been eliminate beacause of its
# small length
if end < indices[len(indices)-1][1]:
if len(good_index) > 0 and good_index[len(good_index)-1][2] == 0:
good_index[len(good_index)-1][1] = indices[len(indices)-1][1]
else:
good_index.append(list([end, indices[len(indices)-1][0], 0]))
end = indices[len(indices)-1][1]
# Write last plane zone if exists
if indices[len(indices)-1][1] < len(diff)-1:
if len(good_index) > 0 and good_index[len(good_index)-1][2] == 0:
good_index[len(good_index)-1][1] = len(diff)-1
else:
good_index.append(list([indices[len(indices)-1][1],
len(diff)-1, 0]))
return good_index
[docs]def differential_separate(x, y, indices):
""" Deep analysis of derivatives values
Go deeper in derivatives values analyse to find different fast of increase and
decrease in order to detect increases and decreases even on inclined stem
:param x: (list of int) x original position of each diff value
:param y: (list of int) y original position of each diff value
:param indices: (list of 3 int list) describing the differentials values by
parts of same variation [[begin, end, variation]]
:return: new_indexes : (list of 3 int list) describing new variations
total_means : (list of float) slope of each part of 'new_indexes'
"""
new_indexes = list()
total_means = list()
for ind in indices:
direction = ind[2]
if not direction == 0:
tab = list([list(ind)])
while tab[0][1] - tab[0][0] > 10:
temp = list(tab)
tab = list()
for elem in temp:
longueur = int(round((elem[1] - elem[0])/2))
tab.append(list([elem[0], elem[0]+longueur, direction]))
tab.append(list([elem[0]+longueur, elem[1], direction]))
means = list()
for elem in tab:
if x[elem[1]] - x[elem[0]] > 0:
means.append(float(y[elem[1]] - y[elem[0]]) /
float(x[elem[1]] - x[elem[0]]))
else:
means.append(np.sign(float(y[elem[1]] - y[elem[0]]))*np.Inf)
loop_again = 1
while loop_again:
loop_again = 0
i = 0
while 1:
if i+1 < len(means):
if abs(means[i]) == np.inf \
or abs(means[i] - means[i+1]) < 0.2:
tab[i][1] = tab[i+1][1]
if x[tab[i][1]] - x[tab[i][0]] > 0:
means[i] = float(y[tab[i][1]] - y[tab[i][0]]) /\
float(x[tab[i][1]] - x[tab[i][0]])
else:
means[i] = np.sign(float(y[tab[i][1]] -
y[tab[i][0]]))*np.Inf
tab.pop(i+1)
means.pop(i+1)
loop_again = 1
elif abs(means[i+1]) == np.inf:
if i+2 < len(means):
if abs(means[i+2]) > abs(means[i]):
i += 1
else:
tab[i][1] = tab[i+1][1]
if x[tab[i][1]] - x[tab[i][0]] > 0:
means[i] = float(y[tab[i][1]] - y[tab[i][0]])/float(x[tab[i][1]] - x[tab[i][0]])
else:
means[i] = np.sign(float(y[tab[i][1]] - y[tab[i][0]]))*np.Inf
tab.pop(i+1)
means.pop(i+1)
loop_again = 1
else:
tab[i][1] = tab[i+1][1]
if x[tab[i][1]] - x[tab[i][0]] > 0:
means[i] = float(y[tab[i][1]] - y[tab[i][0]])/float(x[tab[i][1]] - x[tab[i][0]])
else:
means[i] = np.sign(float(y[tab[i][1]] - y[tab[i][0]]))*np.Inf
tab.pop(i+1)
means.pop(i+1)
loop_again = 1
else:
i += 1
else:
break
for i in range(len(tab)):
new_indexes.append(tab[i])
total_means.append(means[i])
else:
if ind[1] - ind[0] < 4 and len(new_indexes):
new_indexes[len(new_indexes) - 1][1] = ind[1]
else:
new_indexes.append(ind)
total_means.append(0)
return new_indexes, total_means
[docs]def majors_axes_regression_ww(pixels):
""" Performs a major axis regression on 2D distributed dots
:param pixels: (np array of 2 np array of int) distributed dots to perform
regression
:return: a: (float) slope of regression line
b: (float) intercept of regression line
mean_error: (float) mean error of dots to regression line
"""
values = np.transpose(np.array([pixels[0], pixels[1]]))
mean_values = np.mean(values, 0)
s_xy = ((values[:, 0]-mean_values[0]) *
(values[:, 1]-mean_values[1])).sum()
s_xx = np.power(values[:, 0]-mean_values[0], 2).sum()
s_yy = np.power(values[:, 1]-mean_values[1], 2).sum()
if s_xy > 0:
a = m.sqrt(s_yy/s_xx)
else:
a = -m.sqrt(s_yy/s_xx)
b = mean_values[1] - a*mean_values[0]
errors = np.array(abs(values[:, 1] - a * values[:, 0] - b))
mean_error = np.mean(errors)
return a, b, mean_error
[docs]def majors_axes_regression_line(binary_img):
""" Performs a major axis regression on binary image
True pixels of image are used as distributed dots
:param binary_img: (numpy 2D binary uint8 array) binary image to perform
regression
:return: result: (numpy 3D uint8 array) color image with regression line
draws on it
a: (float) slope of regression line
b: (float) intercept of regression line
mean_error: (float) mean error of pixels to regression line
alpha: angle of regression line (in degrees)
"""
result = np.zeros([binary_img.shape[0], binary_img.shape[1], 3], 'uint8')
result[:, :, 0] = np.array(binary_img)
result[:, :, 1] = np.array(binary_img)
result[:, :, 2] = np.array(binary_img)
pixels = np.where(binary_img > 0)
n = len(pixels[0])
if n:
a, b, errors_means = majors_axes_regression_ww(pixels)
alpha = (m.atan2(a/m.sqrt(m.pow(a, 2)+1),
1/m.sqrt(m.pow(a, 2)+1)))*180/m.pi
cv2.line(result, (int(b + a*pixels[0][0]), pixels[0][0]),
(int(b + a*pixels[0][n-1]), pixels[0][n-1]), (0, 0, 255), 2)
return result, a, b, errors_means, alpha
[docs]def robust_majors_axes_regression_ww(pixels):
""" Performs a robust major axis regression on 2D distributed dots
Robustness come from 'hinich et al.' algorithm
:param pixels: (np array of 2 np array of int) distributed dots to perform
regression
:return: a: (float) slope of robust regression line
b: (float) intercept of robust regression line
useful_pixels: (np array of 2 np array of int) dots kept by robust
regression
useless_pixels: (np array of 2 np array of int) dots ousted by
robust regression
"""
n = len(pixels[0])
a = b = 0
useless_pixels = np.empty([0, 2], 'int')
useful_pixels = np.transpose(np.array([pixels[0], pixels[1]]))
values = useful_pixels[np.random.randint(n, size=int(n/2)), :]
loop_again = 1
while loop_again:
mean_values = np.mean(values, 0)
s_xy = ((values[:, 0]-mean_values[0]) *
(values[:, 1]-mean_values[1])).sum()
s_xx = np.power(values[:, 0]-mean_values[0], 2).sum()
s_yy = np.power(values[:, 1]-mean_values[1], 2).sum()
if s_xy > 0:
a = m.sqrt(s_yy/s_xx)
else:
a = - m.sqrt(s_yy/s_xx)
b = mean_values[1] - a*mean_values[0]
errors = np.array(abs(useful_pixels[:, 1] - a*useful_pixels[:, 0] - b))
sorted_errors = np.sort(errors)
u28 = sorted_errors[int(round(28*n/100))]
u72 = sorted_errors[int(round(72*n/100))]
s = (u72 - u28)/1.654
loop_again = 0
pixels_to_delete = np.where(errors > 4*s)[0]
if pixels_to_delete.shape[0]:
useless_pixels = np.append(useless_pixels,
useful_pixels[pixels_to_delete, :],
axis=0)
useful_pixels = np.delete(useful_pixels, pixels_to_delete, axis=0)
loop_again = 1
values = np.array(useful_pixels)
n = values.shape[0]
return a, b, useful_pixels, useless_pixels
[docs]def get_view_angles(binary_img, mask):
""" Extract interesting view angles from top image
:param binary_img: (numpy array of uint8) representing binary image
:param mask: (numpy array of uint8) mask representing the center of
image to know if a leave can be considered as obstructing
:return:
(list of int) informative angles of view to analyse
(numpy array of uint8) result image for log
(string) log to write
"""
result = np.zeros([binary_img.shape[0], binary_img.shape[1], 3],
'uint8')
pixels = np.where(binary_img > 0)
n = len(pixels[0])
exclusions = list()
if n > 1000:
a, b, useful_pixels, useless_pixels = \
robust_majors_axes_regression_ww(pixels)
alpha = (m.atan2(a/m.sqrt(m.pow(a, 2) + 1),
1/m.sqrt(m.pow(a, 2) + 1)))*180/m.pi
a90 = -1/a
alpha90 = ((m.atan2(a90/m.sqrt(m.pow(a90, 2) + 1),
1/m.sqrt(m.pow(a90, 2) + 1)))*180/m.pi) % 360
alpha270 = (alpha90 + 180) % 360
result[useful_pixels[:, 0], useful_pixels[:, 1], :] = (255, 255, 255)
cv2.line(result, (int(b+a*pixels[0][0]), pixels[0][0]),
(int(b+a*pixels[0][n-1]), pixels[0][n-1]), (0, 0, 255), 3)
cv2.line(result, (int(b+a*pixels[0][0]), pixels[0][0]+2),
(int(b+a*pixels[0][n-1]), pixels[0][n-1]+1), (0, 0, 255), 3)
cv2.line(result, (int(b+a*pixels[0][0]), pixels[0][0]-2),
(int(b+a*pixels[0][n-1]), pixels[0][n-1]-1), (0, 0, 255), 3)
loop_again = 1
while loop_again:
loop_again = 0
temp_img = np.zeros(binary_img.shape, 'uint8')
temp_img[useless_pixels[:, 0], useless_pixels[:, 1]] = 255
useless_pixels = np.empty([0, 2], 'int')
labelled_img = measure.label(temp_img, neighbors=8)
for region in measure.regionprops(labelled_img):
pixels2 = np.where(labelled_img == region['label'])
temp_useful_pixels = \
np.transpose(np.array([pixels2[0], pixels2[1]]))
n2 = region.area
if n2 > n/20:
a2, b2, useful_pixels2, useless_pixels2 = \
robust_majors_axes_regression_ww(pixels2)
alpha2 = (m.atan2(a2/m.sqrt(m.pow(a2, 2) + 1),
1/m.sqrt(m.pow(a2, 2) + 1)))*180/m.pi
errors = np.array(abs(useful_pixels2[:, 1] -
a * useful_pixels2[:, 0] - b))
x_intersection_line = int((b - b2)/(a2 - a))
y_intersection_line = int(a*x_intersection_line + b)
useless_pixels = np.append(useless_pixels, useless_pixels2, axis=0)
if 0 <= x_intersection_line < mask.shape[0] and \
0 <= y_intersection_line < mask.shape[1]:
if abs(alpha-alpha2) > 20 and mask[x_intersection_line, y_intersection_line] and \
errors.max() > 300:
max_error_pos = np.where(errors == errors.max())[0][0]
max_signed_error = useful_pixels2[max_error_pos,1] - a * useful_pixels2[max_error_pos,0] - b
diff = alpha - alpha2
if diff*max_signed_error < 0:
alpha2 = (alpha2 + 180) % 360
else:
alpha2 %= 360
exclusions.append(alpha2)
result[useful_pixels2[:, 0],
useful_pixels2[:, 1], :] = (0, 255, 0)
cv2.line(result,
(int(b2+a2*pixels2[0][0]), pixels2[0][0]),
(int(b2+a2*pixels2[0][n2-1]), pixels2[0][n2-1]),
(255, 0, 255), 2)
cv2.line(result,
(int(b2+a2*pixels2[0][0]), pixels2[0][0]+1),
(int(b2+a2*pixels2[0][n2-1]), pixels2[0][n2-1]+1),
(255, 0, 255), 2)
cv2.line(result,
(int(b2+a2*pixels2[0][0]), pixels2[0][0]-1),
(int(b2+a2*pixels2[0][n2-1]), pixels2[0][n2-1]-1),
(255, 0, 255), 2)
else:
result[temp_useful_pixels[:, 0],
temp_useful_pixels[:, 1], :] = (0, 0, 255)
else:
result[temp_useful_pixels[:, 0],
temp_useful_pixels[:, 1], :] = (0, 0, 255)
loop_again = 1
else:
result[temp_useful_pixels[:, 0],
temp_useful_pixels[:, 1], :] = (0, 0, 255)
return result[::-1, ::-1], alpha90, alpha270, exclusions
else:
return result, -1, -1, exclusions
[docs]def robust_mean(values, images, std_error=20):
""" Look for most representative position in a small set of positions
This function perform a 'vote' between few values to extract the most
representative(s) and the corresponding images
:param values: (2 dimensional numpy float array) the vote will be perform on
first value of each 2 values array
:param images: (numpy array of string) id of image corresponding to each
value
:param std_error: (int) maximum standard error to reckon that 2 values are
in the same group
:return: means: (2 values numpy array) mean value of kept 2 values array
((-1, -1) if standard error remains more than std_error param)
values: (2 dimensional numpy float array) kept values as most
representatives
images: (numpy array of string) id of image corresponding to each
kept value
"""
means = 0
loop_again = 1
while loop_again:
loop_again = 0
means = np.mean(values, 0)
std_deviation = m.sqrt(np.power(values[:, 0]-means[0], 2).sum()/values.shape[0])
if std_deviation > std_error:
loop_again = 1
errors = abs(values[:, 0] - means[0])
if len(np.unique(errors)) == 1:
means = np.array([-1, -1])
images = images[np.unique(values[:, 0], return_index=True)[1]]
values = values[np.unique(values[:, 0],
return_index=True)[1], :]
loop_again = 0
else:
values_to_delete = np.where(errors == errors.max())[0]
values = np.delete(values, values_to_delete, 0)
images = np.delete(images, values_to_delete, 0)
if values.shape[0] <= 1:
means = np.array([-1, -1])
loop_again = 0
else:
images = images[np.unique(values[:, 0], return_index=True)[1]]
values = values[np.unique(values[:, 0], return_index=True)[1], :]
return means, values, images
[docs]def ear_detection(distances):
""" Look for ear in a stem width curve
:param distances: (list of int) representing distance transform values all
along the stem
:return: (list of list of 2 int) first value of each 2 int list is a
probable solution, second value is its weight
(list of (list of (2 int and one list))) representing parts of
distances interpreted as stem (begin, end, [values])
(list of (list of (2 int and one list))) representing parts of
distances interpreted as leaves (begin, end, [values])
(list of 2 int), width of stem under ear and upper ear
"""
distances_length = float(len(distances))
part_1 = int(round(len(distances)/2.5))
td = distances[:part_1]
td.sort()
mini = td[int(round(len(td)*15/100))]
pos_min = np.where(td == mini)[0][0]
# Look for peaks
dist_array = np.array(distances)
sorted_distances = list(distances)
sorted_distances.sort()
median = sorted_distances[int(round(part_1))]
peak_begin = 0
peaks = np.empty([0, 3], 'int')
i = 1
while i < len(distances)-1:
if distances[i] > median:
if not peak_begin:
peak_begin = i
if distances[i] > distances[i+1] and distances[i] > distances[i-1]:
peaks = np.append(peaks, [[i, peak_begin, i]], axis=0)
elif distances[i] > distances[i-1] and \
distances[i] == distances[i+1]:
while distances[i] == distances[i+1]:
i += 1
if i >= len(distances)-1:
break
if (i >= len(distances)-1) or (distances[i] > distances[i+1]):
peaks = np.append(peaks, [[i, peak_begin, i]], axis=0)
elif peak_begin:
if peaks.shape[0]:
peaks[peaks.shape[0]-1, 2] = i
peak_begin = 0
i += 1
if (i < len(distances)) and (distances[i] > distances[i-1] and
distances[i] > median):
peaks = np.append(peaks, [[i, peak_begin, i]], axis=0)
i = 0
while i < peaks.shape[0]-1:
if (dist_array[peaks[i, 0]:peaks[i+1 ,0]] > median).all() or \
(dist_array[peaks[i, 0]:peaks[i+1,0]] <= median).sum() < 10:
peaks[i+1,1] = peaks[i,1]
peaks = np.delete(peaks, i, axis=0)
else:
i += 1
peaks = peaks[np.where(peaks[:,0] >= part_1)[0],:]
# Look for hollows representative of stem
stems = list()
route_distances = list()
for i in range(len(distances)):
route_distances.append([i, distances[i]])
dist_diff, dist_x, dist_y = derivate(route_distances)
dist_array = np.array(dist_y)
dist_indexes = differential_cleaning(dist_diff, dist_x, dist_y, 10, 5, 2)
dist_new_indexes, dist_total_means = differential_separate(dist_x, dist_y,
dist_indexes)
for ind in dist_new_indexes:
if ind[0] > part_1 and ind[1] < len(distances) and ind[1] - ind[0] > 20:
if dist_array[ind[0]:ind[1]].min() <= mini and \
dist_array[ind[0]:ind[1]].max() <= median and \
ind[2] == 0:
stems.append([ind[0], ind[1],
np.mean(dist_array[ind[0]:ind[1]])])
# Group hollows
i = 0
while i < len(stems) - 1:
j = 0
while j < len(peaks) and stems[i][0] > peaks[j, 0]:
j += 1
j -= 1
if (j == peaks.shape[0] - 1) or (stems[i][0] > peaks[j, 0] and
stems[i + 1][1] < peaks[j + 1, 0]):
if abs(stems[i][2] - stems[i + 1][2]) < 3:
stems[i][1] = stems[i + 1][1]
stems.pop(i + 1)
else:
if stems[i][2] > stems[i + 1][2]:
stems.pop(i)
else:
stems.pop(i + 1)
else:
i += 1
# Save previous peak of each hollow and weighting them from criteria
# detailed in method
td = distances[part_1:]
td.sort()
# keep only percentile at 15% on stem width to eliminate possible noise
superior_min = td[int(round(len(td)*15/100))]
solutions = np.empty([0, 2], 'int')
stem_pos_after_ear = 0
best_solution = 0
first_found = False
iteration = 0
while not first_found and iteration < 2:
for stem in stems:
comparison = np.where(peaks[:, 0] < stem[0])[0]
if len(comparison):
pic = peaks[comparison[len(comparison)-1]]
solutions = np.append(solutions, [[pic[0], 0]], axis=0)
if not first_found and \
dist_array[stem[0]:stem[1]].mean() < mini:
for dist in dist_array[stem[0]:stem[1]]:
if superior_min <= dist < mini:
first_found = True
solutions[len(solutions)-1, 1] += 2
break
if 8 < float(pic[2] - pic[1])*100./distances_length < 30:
solutions[len(solutions)-1, 1] += 1
if solutions[len(solutions)-1, 1] > best_solution:
stem_pos_after_ear = stem[0]
# If no solution found with percentile at 15%, redo calculation with
# percentile at 5% to force a result
if not first_found:
solutions = np.empty([0, 2], 'int')
superior_min = td[int(round(len(td)*5/100))]
iteration += 1
return solutions, stems, peaks, [pos_min, stem_pos_after_ear]