1
0
mirror of https://github.com/ami-sc/fire-detection.git synced 2024-07-07 08:27:50 +02:00

Update project code

This commit is contained in:
ami-sc 2023-11-17 16:34:48 -06:00
parent 83dbf73171
commit 9bc1c56f96
8 changed files with 376 additions and 214 deletions

View File

@ -1,22 +1,29 @@
"""
**Dataset Generation**
Generates a dataset containing wavelet features for a given fire and non-fire
image dataset.
"""
import os
import cv2
import numpy as np
import pandas as pd
from PCAFeatureExtraction import getPCAFeatures
from WaveletFeatureExtraction import getWaveletFeatures
dataset_name = "effective_forest_fire_detection"
fire_dir = f"data/{dataset_name}/fire"
non_fire_dir = f"data/{dataset_name}/nonfire"
# Change this to switch datasets.
dataset_name = "fire_dataset"
fire_dir = f"./data/raw/{dataset_name}/fire"
non_fire_dir = f"./data/raw/{dataset_name}/nonfire"
pca_features = []
wavelet_features = []
print("[ > ] Processing FIRE images.")
# Load fire images.
# Load all fire images.
fire_img_files = os.listdir(fire_dir)
i = 1
@ -26,18 +33,13 @@ for fire_img_file in fire_img_files:
# Load the image.
fire_img = cv2.imread(os.path.join(fire_dir, fire_img_file))
# Get PCA features.
pca = getPCAFeatures(fire_img)
# Get wavelet features.
wav = getWaveletFeatures(fire_img)
# Add the label of the image (1 - FIRE).
pca = np.append(pca, 1)
wav += [1]
# Save extracted features.
pca_features.append(pca)
wavelet_features.append(wav)
i += 1
@ -45,7 +47,7 @@ print("[ > ] Finished processing FIRE images.\n")
print("[ > ] Processing NON-FIRE images.")
# Load non-fire images.
# Load all non-fire images.
non_fire_img_files = os.listdir(non_fire_dir)
i = 1
@ -55,47 +57,26 @@ for non_fire_img_file in non_fire_img_files:
# Load the image.
non_fire_img = cv2.imread(os.path.join(non_fire_dir, non_fire_img_file))
# Get PCA features.
pca = getPCAFeatures(non_fire_img)
# Get wavelet features.
wav = getWaveletFeatures(non_fire_img)
# Add the label of the image (0 - NON-FIRE).
pca = np.append(pca, 0)
wav += [0]
# Save extracted features.
pca_features.append(pca)
wavelet_features.append(wav)
i += 1
print("[ > ] Finished processing NON-FIRE images.\n")
# -- PCA Features --
# Create a feature DataFrame.
dataset = pd.DataFrame(data = pca_features)
dataset = pd.DataFrame(data=wavelet_features)
# Randomize the order of the samples.
dataset = dataset.sample(frac = 1)
dataset.reset_index(drop = True, inplace = True)
dataset = dataset.sample(frac=1)
dataset.reset_index(drop=True, inplace=True)
# Save generated features to a .csv file.
dataset.to_csv(f"data/PCA__{dataset_name}.csv", index = False)
print("[ > ] Saved PCA feature dataset.")
# -- Wavelet Features --
# Create a feature DataFrame.
dataset = pd.DataFrame(data = wavelet_features)
# Randomize the order of the samples.
dataset = dataset.sample(frac = 1)
dataset.reset_index(drop = True, inplace = True)
# Save generated features to a .csv file.
dataset.to_csv(f"data/WAV__{dataset_name}.csv", index = False)
dataset.to_csv(f"./data/processed/WAV__{dataset_name}.csv", index=False, header=False)
print("[ > ] Saved wavelet feature dataset.")

View File

@ -1,36 +0,0 @@
import cv2
import numpy as np
def getImageFeatures (image):
# Get the channels of the image.
b, g, r = cv2.split(image)
# Normalize each channel.
r_norm = r / 255
g_norm = g / 255
b_norm = b / 255
# "Flatten" the channels.
r_norm = r_norm.reshape([-1])
g_norm = g_norm.reshape([-1])
b_norm = b_norm.reshape([-1])
# Stack the individual arrays to a single array.
flat_rgb = np.vstack([r_norm, g_norm, b_norm])
# Center the data.
x_center = flat_rgb - np.mean(flat_rgb, axis = 1, keepdims = True)
# Calculate covariance matrix.
cov = np.cov(x_center)
# Perform eigendecomposition of the covariance matrix.
eigenvalues, eigenvectors = np.linalg.eig(cov)
# Sort the eigenvalues and eigenvectors.
idx = np.argsort(-eigenvalues)
eigenvalues = eigenvalues[idx]
eigenvectors = eigenvectors[:,idx]
return eigenvectors.reshape([-1])

288
src/MotionDetection.py Normal file
View File

@ -0,0 +1,288 @@
"""
**Motion Detection**
Detect moving regions in a given video.
"""
import cv2
import numpy as np
import xgboost as xgb
from WaveletFeatureExtraction import getWaveletFeatures
class MotionDetection:
def __init__(self, video_path=None, a=0.95, c=2, initial_threshold=40):
if video_path is None:
# Use camera by default.
self.video = cv2.VideoCapture(0)
else:
# If a path is given, take the video from that path.
self.video = cv2.VideoCapture(video_path)
# Load the model.
self.model = xgb.Booster()
self.model.load_model("./models/XGBoost_Tuned_Model.json")
self.run = False
self.contained = []
self.img = None
self.copy_img = 1
self.rectangles = []
self.rectangles_m = []
self.small_rectangles_index_m = []
self.moving_rectangles_index = []
# List with the index of all bigger moving rectangles.
self.moving_rectangles_index_m = []
# Parameters for the motion detection algorithm.
self.a = a
self.c = c
# Retrieve the shape of the input image.
self.shape = self.video.read(0)[1].shape
# Threshold matrix of equivalent size.
self.threshold = np.full(self.shape, initial_threshold)
# Red-colored matrix of equivalent size.
self.red = np.full(self.shape, [0, 0, 255], dtype=np.uint8)
def define_rectangles(self, bigger_rectangles=1):
"""
This method defines the borders of the moving regions.
"""
def find_overlapping_rectangles(bg_rectangles, smaller_rectangles):
"""
This is a helper function used only by this method. Its aim is to
find the index of the smaller rectangles that overlap bigger
rectangles. In this way, each bigger rectangle is associated to
a certain amount of smaller rectangles
"""
overlapping_lists = []
for big_idx, big_rect in enumerate(bg_rectangles):
overlapping_indices = []
for small_idx, small_rect in enumerate(smaller_rectangles):
if (small_rect[0] < big_rect[0] + big_rect[2] and
small_rect[0] + small_rect[2] > big_rect[0] and
small_rect[1] < big_rect[1] + big_rect[3] and
small_rect[1] + small_rect[3] > big_rect[1]):
overlapping_indices.append(small_idx)
overlapping_lists.append(overlapping_indices)
return overlapping_lists
# Number of rows of smaller rectangles.
h_n = 15
# Number of columns of smaller rectangles.
w_n = 15
# Shape of the image.
h = self.shape[0]
w = self.shape[1]
# Take w_n points from 0 to w.
width_points = np.linspace(0, w, w_n, dtype=int)
# Take h_n points from 0 to h.
height_points = np.linspace(0, h, h_n, dtype=int)
# Generate all the top left angles of each rectangle.
width_start, height_start = np.meshgrid(
width_points[:-1], height_points[:-1])
# Generate all bottom right angles of each rectangle.
width_end, height_end = np.meshgrid(
width_points[1:], height_points[1:])
self.rectangles = np.stack(
(width_start, height_start, width_end, height_end),
axis=-1).reshape(-1, 4)
# Could also be done recursively, having always bigger rectangles.
# degree of rectangles = "degree of smaller rectangle +1"
if bigger_rectangles != 0:
# self.rectangles_m = [[[X1_top, Y1_top, X1_bottom, Y1_bottom],
# [...], ...],
# [[X2_top, Y2_top, X2_bottom, Y2_bottom],
# [...], ...]]]
# 1 is for rectangles of degree 1, 2 is for rectangles of degree 2.
# self.small_rectangles_index_m = [[[1,5,...], [...], ...],
# [[4,0,...], [...], ...], [...], ...]
# The ith list represents the rectangles of degree i.
# The elements of the ith list represent, for each bigger rectangle,
# the indices of the smaller rectangles overlapping the bigger
# rectangle.
for i in range(1, bigger_rectangles + 1):
# Same procedure...
# If i = 1 the rectangles will be double the size of the
# smallest rectangles.
# If i = 2 the rectangles will be 4x the size of smallest
# rectangles.
width_points = np.linspace(0, w, w_n // (2 * i), dtype=int)
height_points = np.linspace(0, h, h_n // (2 * i), dtype=int)
width_start, height_start = np.meshgrid(
width_points[:-1], height_points[:-1])
width_end, height_end = np.meshgrid(
width_points[1:], height_points[1:])
bigger_rectangles = np.stack(
(width_start, height_start, width_end, height_end),
axis=-1).reshape(-1, 4)
# End of generation of rectangles of ith degree.
self.rectangles_m.append(bigger_rectangles)
self.small_rectangles_index_m.append(
find_overlapping_rectangles(bigger_rectangles,
self.rectangles))
def draw_rectangles(self):
# Draw bigger rectangles.
self.copy_img = self.img.copy()
for degree, moving_rectangles_degree in enumerate(
self.moving_rectangles_index_m):
for rectangle_index in moving_rectangles_degree:
u = self.rectangles_m[degree][rectangle_index]
top_left, bottom_right = (u[0], u[1]), (u[2], u[3])
image = self.img[top_left[1]:bottom_right[1], top_left[0]:bottom_right[0]]
# Get features for rectangle.
fts = np.array([getWaveletFeatures(image)])
# Predict using features.
a = self.model.predict(xgb.DMatrix(fts))
# Draw the borders of the rectangle.
cv2.rectangle(self.copy_img, top_left, bottom_right, [
255, 255, 0], 3)
# Show prediction score.
cv2.putText(
self.copy_img, f"{a[0]:.2f}", top_left,
cv2.FONT_HERSHEY_SIMPLEX, 1, (36, 255, 12), 2)
# Draw smaller rectangles.
for i in self.moving_rectangles_index:
u = self.rectangles[i]
top_left, bottom_right = (u[0], u[1]), (u[2], u[3])
image = self.img[top_left[1]:bottom_right[1], top_left[0]:bottom_right[0]]
# Get features for rectangle.
fts = np.array([getWaveletFeatures(image)])
# Predict using features.
a = self.model.predict(xgb.DMatrix(fts))
# Draw the borders of the rectangle.
cv2.rectangle(self.copy_img, top_left, bottom_right, [
255, 0, 0], 3)
# Show prediction score.
cv2.putText(self.copy_img, f"{a[0]:.2f}", top_left,
cv2.FONT_HERSHEY_SIMPLEX, 1, (36, 255, 12), 2)
def rectangle_moving(self, moving_pixels):
# List with the index of all smaller moving rectangles.
self.moving_rectangles_index = []
for i, borders in enumerate(self.rectangles):
# Take the pixels in the region.
rectangle = moving_pixels[borders[1]:borders[3], borders[0]:borders[2]]
# If more that 20% of the pixels are moving, then append the index
# of the rectangle to the list of moving rectangles index.
if rectangle.sum() > rectangle.size * 0.2:
self.moving_rectangles_index.append(i)
self.moving_rectangles_index = np.array(self.moving_rectangles_index)
for i, small_rectangles_index in enumerate(
self.small_rectangles_index_m):
# Contains the index of the rectangles of (i-1)th degree contained
# by the rectangles of (i)th degree.
self.contained = set()
# Create an empty list of the rectangles of ith degree.
self.moving_rectangles_index_m.append([])
# For every rectangle of ith degree.
for j, smaller_rectangles_in_i in enumerate(small_rectangles_index):
# Take the moving rectangles of (i-1)th degree contained in the
# ith degree rectangle.
common = np.intersect1d(
np.array(smaller_rectangles_in_i),
self.moving_rectangles_index)
# If more than 25% of rectangles of (i-1)th degree contained
# are moving...
if len(common) / len(smaller_rectangles_in_i) > 0.2:
# ...then the ith degree rectangle is moving.
self.moving_rectangles_index_m[i].append(j)
# Put the rectangles of (i-1)th degree in the set of moving
# and contained rectangles of (i-1)th degree.
self.contained.update(common)
# The rectangles of (i-1)th that are moving and are not contained.
self.moving_rectangles_index = np.setdiff1d(
self.moving_rectangles_index, list(self.contained))
def stop(self):
self.run = False
def fit(self):
cv2.namedWindow("Video", cv2.WINDOW_KEEPRATIO)
background = self.video.read()[1]
previous_image = self.video.read()[1]
threshold = self.threshold
a = self.a
c = self.c
self.define_rectangles()
self.run = True
# Start loop.
i = -1
while self.run:
i += 1
self.img = self.video.read()[1]
# Find if pixel is moving.
difference_with_previous = abs(self.img - previous_image)
changes = difference_with_previous > threshold # binary mask
# Update background.
updated_background = background * a + (1 - a) * self.img
# background if false, updated_background if true
background = np.where(changes, updated_background, background)
# Update threshold.
difference_with_background = abs(self.img - background)
changes_compared_to_background = difference_with_background > threshold # binary mask
updated_threshold = threshold * a + (1 - a) * (
c * difference_with_background)
threshold = np.where(changes, updated_threshold, threshold)
# Detecting moving regions.
moving_pixels = np.any(changes_compared_to_background, axis=2)
previous_image = self.img
self.rectangle_moving(moving_pixels)
self.draw_rectangles()
cv2.imshow("Video", self.copy_img)
if cv2.waitKey(1) & 0xFF == ord('q'):
break

View File

@ -1,48 +0,0 @@
import cv2
import numpy as np
def getPCAFeatures (image):
# Get the channels of the image.
b, g, r = cv2.split(image)
# Create an HSV representation of the image.
img_hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
# Get the HSV channels of the image.
h, s, v = cv2.split(img_hsv)
# Normalize each channel.
r_norm = r / 255
g_norm = g / 255
b_norm = b / 255
h_norm = h / 179
s_norm = s / 255
v_norm = v / 255
# "Flatten" the channels.
r_norm = r_norm.reshape([-1])
g_norm = g_norm.reshape([-1])
b_norm = b_norm.reshape([-1])
h_norm = h_norm.reshape([-1])
s_norm = s_norm.reshape([-1])
v_norm = v_norm.reshape([-1])
# Stack the individual arrays to a single array.
flat_ch = np.vstack([r_norm, g_norm, b_norm, h_norm, s_norm, v_norm])
# Center the data.
x_center = flat_ch - np.mean(flat_ch, axis = 1, keepdims = True)
# Calculate covariance matrix.
cov = np.cov(x_center)
# Perform eigendecomposition of the covariance matrix.
eigenvalues, eigenvectors = np.linalg.eig(cov)
# Sort the eigenvalues and eigenvectors.
idx = np.argsort(-eigenvalues)
eigenvalues = eigenvalues[idx]
eigenvectors = eigenvectors[:,idx]
return eigenvectors.reshape([-1])

16
src/README.md Normal file
View File

@ -0,0 +1,16 @@
# `/src`
***
## Directory Contents
- `./README.md`:
- This file.
- `./main.py`:
- Contains the GUI and driver code for the application.
- `./MotionDetection.py`:
- The motion detection algorithm and model prediction code.
- `./WaveletFeatureExtraction.py`:
- The wavelet feature extractor code that takes a given image or video frame and extracts relevant features from the color histograms.
- `./DatasetGeneration.py`:
- Code for generating the `WAV` `.csv` files from the image datasets.

View File

@ -1,7 +1,16 @@
"""
**Wavelet Feature Extraction**
Performs a Wavelet Decomposition of the RGB and HSV channels of a given image,
and generates features given each coefficient of the decomposition.
"""
import cv2
import pywt
import numpy as np
def getWaveletFeatures(img):
# Create an HSV representation of the image.
@ -26,25 +35,25 @@ def getWaveletFeatures(img):
for hist in img_hists:
# Get the first (most relevant) wavelet coefficient.
coeff = pywt.wavedec(hist, "db20")[0]
wav_coefficient = pywt.wavedec(hist, "db20")[0]
# Get features for the coewfficient.
img_features += getCoefficientFeatures(coeff)
# Get features for the coefficient.
img_features += getCoefficientFeatures(wav_coefficient)
return img_features
def getCoefficientFeatures(coeff):
n5 = np.nanpercentile(coeff, 5)
n25 = np.nanpercentile(coeff, 25)
n75 = np.nanpercentile(coeff, 75)
n95 = np.nanpercentile(coeff, 95)
def getCoefficientFeatures(wav_coefficient):
median = np.nanpercentile(coeff, 50)
mean = np.nanmean(coeff)
std = np.nanstd(coeff)
var = np.nanvar(coeff)
rms = np.nanmean(np.sqrt(coeff**2))
n5 = np.nanpercentile(wav_coefficient, 5)
n25 = np.nanpercentile(wav_coefficient, 25)
n75 = np.nanpercentile(wav_coefficient, 75)
n95 = np.nanpercentile(wav_coefficient, 95)
median = np.nanpercentile(wav_coefficient, 50)
mean = np.nanmean(wav_coefficient)
std = np.nanstd(wav_coefficient)
var = np.nanvar(wav_coefficient)
rms = np.nanmean(np.sqrt(wav_coefficient**2))
return [n5, n25, n75, n95, median, mean, std, var, rms]

30
src/main.py Normal file
View File

@ -0,0 +1,30 @@
import os
from PySimpleGUI import Window, WIN_CLOSED, Button
from threading import Thread
from MotionDetection import MotionDetection
layout = [[Button("START", size=(10, 1), font="Helvetica 14")],
[Button("STOP", size=(10, 1), font="Helvetica 14")]]
window = Window("Demo Application - Fire Detection",
layout, location=(800, 400))
# Change to select a different video.
motion_detect = MotionDetection(video_path="./assets/example_video.webm")
md_thread = Thread(target=motion_detect.fit, daemon=True)
while True:
event, values = window.read(timeout=20)
if event == "Exit" or event == WIN_CLOSED or event == "STOP":
motion_detect.run = False
md_thread.join()
break
elif event == "START":
md_thread.start()
os._exit(0)

View File

@ -1,78 +0,0 @@
import cv2
import numpy as np
from matplotlib import pyplot as plt
# import pickle
# import os
import pandas as pd
from FeatureExtraction import getImageFeatures
#structure of the folder:
#projects
#|fire-detection
#||fire_dataset
#|||fire_images
#|||non_fire_images
#||preprocessing
#||data
#|||dataset.pk
#do this only the first time, then comment from here:
#we remove image 370 from fire_images because it gives some errors
######################################################
#os.remove("projects/fire-detection/fire_dataset/fire_images/fire.370.png")
#renaming to keep consistency
#for i in range(371, 756):
# os.rename(f"projects/fire-detection/fire_dataset/fire_images/fire.{i}.png",
# f"projects/fire-detection/fire_dataset/fire_images/fire.{i-1}.png")
#to here:
######################################################"""
#1) coment this out to load the images the first time
fire_images = [cv2.imread(f"data/fire_dataset/fire_images/fire.{i}.png") for i in range(1,756)]
non_fire_images = [cv2.imread(f"data/fire_dataset/non_fire_images/non_fire.{i}.png") for i in range(1,245)]
# #2) comment this out to save lists in a file
# with open("projects/fire-detection/data/dataset.pk", "wb") as f:
# pickle.dump((fire_images, non_fire_images), f)
# #3) comment this out to load the lists from the file
# """with open("projects/fire-detection/data/dataset.pk", "rb") as f:
# fire_images, non_fire_images = pickle.load(f)"""
# ******************************************************************************
# Store extracted features from PCA.
img_features = []
i = 1
# Extract features from "fire" images.
for image in fire_images:
print(f"[ > ] Extracting features from fire image {i}.")
img_features.append(getImageFeatures(image))
i += 1
i = 1
# Extract features from "non-fire" images.
for image in non_fire_images:
print(f"[ > ] Extracting features from non-fire image {i}.")
img_features.append(getImageFeatures(image))
i += 1
# Generate the labels for the data.
fire_labels = np.ones(len(fire_images))
non_fire_labels = np.zeros(len(non_fire_images))
label_set = np.hstack([fire_labels, non_fire_labels])
# Create a feature DataFrame.
dataset = pd.DataFrame(data = img_features)
dataset = pd.concat([dataset, pd.DataFrame(label_set, columns = ["Label"])],
axis = 1)
# Randomize the order of the samples.
dataset = dataset.sample(frac = 1)
dataset.reset_index(drop = True, inplace = True)
# Save generated features to a .csv file.
dataset.to_csv("data/PCA_Features.csv", index = False)