experiments

This commit is contained in:
leca 2025-05-10 13:41:22 +03:00
parent 9f153eae91
commit 8183e8ceed
1 changed file with 78 additions and 45 deletions

123
main.py
View File

@ -4,33 +4,42 @@ from dotenv import load_dotenv
from base64 import b64decode from base64 import b64decode
import re import re
import requests import requests
import tf2onnx
import cv2 import cv2
import keras import keras
import numpy as np import numpy as np
from keras.callbacks import EarlyStopping, ModelCheckpoint
load_dotenv()

# Constants
# Captcha image size in pixels; used as the input shape of the network.
IMAGE_HEIGHT = 70
IMAGE_WIDTH = 200

# Dataset directories, read from the environment (None when a variable is unset).
DOWNLOAD_PATH = environ.get("DOWNLOAD_PATH")
TESTING_PATH = environ.get("TESTING_PATH")
TRAINING_PATH = environ.get("TRAINING_PATH")

# Percentage of newly downloaded captchas that is routed to the testing set.
# Falls back to 0 when the variable is unset or empty, so importing the module
# does not fail with a TypeError/ValueError raised by int().
PERCENT_OF_TESTING = int(environ.get("PERCENT_OF_TESTING") or 0)
def prepare_dirs():
    """Create the download, testing and training directories if missing."""
    for directory in (DOWNLOAD_PATH, TESTING_PATH, TRAINING_PATH):
        # exist_ok=True makes repeated calls harmless for existing directories.
        makedirs(directory, exist_ok=True)
def fetch_captcha(id):
    """Download one captcha from the aggregator API into ``DOWNLOAD_PATH``.

    The image is stored as ``<hash>_<solution>.jpeg``. Network errors, HTTP
    error statuses and malformed payloads are reported on stdout and the
    captcha is skipped, so a single bad entry does not abort a bulk download.

    Args:
        id: Identifier of the captcha in the aggregator API. The name shadows
            the builtin but is kept so existing callers keep working.
    """
    url = f"{environ.get('CAPTCHA_AGGREGATOR_API')}/captcha/{id}"
    try:
        # requests waits forever by default; bound the wait so a stalled
        # server cannot hang the whole download run.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        captcha = response.json()["captcha"]
        file_name = f"{captcha['hash']}_{captcha['solution']}.jpeg"
        # Decode before opening the file: a decoding failure must not leave an
        # empty file behind that would later be treated as already downloaded.
        image_bytes = b64decode(captcha['image'])
    except (requests.RequestException, KeyError, TypeError, ValueError) as e:
        # KeyError/TypeError: unexpected JSON structure;
        # ValueError: invalid JSON body or invalid base64 data.
        print(f"Error fetching captcha {id}: {e}")
        return

    with open(path.join(DOWNLOAD_PATH, file_name), 'wb') as captcha_file:
        captcha_file.write(image_bytes)
def search_saved_captcha(hash, path): def search_saved_captcha(hash, path):
# print(f"searching captcha with hash {hash} in {path}") """Check if a captcha with the given hash exists in the specified path."""
regex = re.compile(hash + '_\\w{6}\\.jpeg') regex = re.compile(f"{hash}_\\w{{6}}\\.jpeg")
for _, _, files in walk(path): for _, _, files in walk(path):
for file in files: for file in files:
if regex.match(file): if regex.match(file):
@ -38,41 +47,45 @@ def search_saved_captcha(hash, path):
return False return False
def search_and_download_new(captchas):
    """Download every listed captcha that is not stored locally yet.

    Args:
        captchas: Iterable of mappings, each providing an "id" and a "hash" key.
    """
    known_locations = (TRAINING_PATH, TESTING_PATH, DOWNLOAD_PATH)
    for captcha in captchas:
        # Local names avoid shadowing the builtins id() and hash().
        captcha_id = captcha["id"]
        captcha_hash = captcha["hash"]
        # any() stops at the first directory that already holds this captcha.
        if any(search_saved_captcha(captcha_hash, location) for location in known_locations):
            continue
        fetch_captcha(captcha_id)
def sort_datasets():
    """Move downloaded captchas into the testing and training directories.

    The first ``PERCENT_OF_TESTING`` percent of the files in ``DOWNLOAD_PATH``
    (in sorted name order) are moved to ``TESTING_PATH``; the remaining files
    are moved to ``TRAINING_PATH``. Sub-directories are left in place.
    """
    # Build the file list once so the split size and the entries actually
    # moved always agree. Sorting makes the split reproducible, because
    # listdir() returns entries in arbitrary order.
    new_files = sorted(
        name for name in listdir(DOWNLOAD_PATH)
        if path.isfile(path.join(DOWNLOAD_PATH, name))
    )
    amount_to_send_to_test = round(len(new_files) * (PERCENT_OF_TESTING / 100))

    for index, name in enumerate(new_files):
        destination = TESTING_PATH if index < amount_to_send_to_test else TRAINING_PATH
        move(path.join(DOWNLOAD_PATH, name), destination)
def download_dataset():
    """Download new captchas and sort them into training and testing sets.

    Fetches the captcha list from the aggregator API, downloads the entries
    that are not stored locally yet, then distributes the downloaded files.
    Network errors and malformed list payloads are reported on stdout instead
    of being raised; in that case nothing is downloaded or sorted.
    """
    prepare_dirs()
    try:
        # requests waits forever by default; bound the wait explicitly.
        response = requests.get(
            f"{environ.get('CAPTCHA_AGGREGATOR_API')}/captcha/all",
            timeout=30,
        )
        response.raise_for_status()
        captchas = response.json()["captchas"]
    except (requests.RequestException, KeyError, TypeError, ValueError) as e:
        # KeyError/TypeError: unexpected JSON structure; ValueError: invalid JSON.
        print(f"Error downloading dataset: {e}")
    else:
        search_and_download_new(captchas)
        sort_datasets()
def load_dataset(dataset_path): def load_dataset(dataset_path):
"""Load images and their corresponding solutions from the specified dataset path."""
images = [] images = []
solutions = [] solutions = []
for filename in listdir(dataset_path): for filename in listdir(dataset_path):
img = cv2.imread(f"{dataset_path}/{filename}") img = cv2.imread(path.join(dataset_path, filename))
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
img = img / 255.0 img = img / 255.0
images.append(img) images.append(img)
@ -83,39 +96,59 @@ def load_dataset(dataset_path):
solution_to_label = {solution: i for i, solution in enumerate(unique_solutions)} solution_to_label = {solution: i for i, solution in enumerate(unique_solutions)}
labels = [solution_to_label[solution] for solution in solutions] labels = [solution_to_label[solution] for solution in solutions]
return images, labels, unique_solutions return np.array(images), np.array(labels), unique_solutions
def load_training_dataset():
    """Load the training dataset from ``TRAINING_PATH``.

    Returns:
        The ``(images, labels, unique_solutions)`` tuple produced by
        ``load_dataset``.
    """
    return load_dataset(TRAINING_PATH)
def load_testing_dataset():
    """Load the testing dataset from ``TESTING_PATH``.

    Returns:
        The ``(images, labels, unique_solutions)`` tuple produced by
        ``load_dataset``.
    """
    return load_dataset(TESTING_PATH)
def _align_labels(labels, source_solutions, target_solutions):
    """Translate labels indexed by *source_solutions* into *target_solutions* indices.

    Returns a boolean mask selecting the samples whose solution is present in
    *target_solutions*, and the re-indexed labels for exactly those samples.
    """
    source_solutions = list(source_solutions)
    target_index = {solution: i for i, solution in enumerate(target_solutions)}
    aligned = np.asarray(
        [target_index.get(source_solutions[label], -1) for label in labels],
        dtype=np.int64,
    )
    keep = aligned >= 0
    return keep, aligned[keep]


def train_nn():
    """Train the captcha classifier and save it as ``captcha_solver.keras``.

    The network is trained on the training dataset. When ``PERCENT_OF_TESTING``
    is positive and the testing dataset contains usable samples, it is used as
    validation data. The best checkpoint seen during training is written to
    ``best_model.keras``.
    """
    training_images, training_labels, unique_solutions = load_training_dataset()

    validation_data = None
    if PERCENT_OF_TESTING > 0:
        testing_images, testing_labels, testing_solutions = load_testing_dataset()
        # load_dataset() builds its solution -> label mapping per call, so the
        # testing labels appear to be numbered independently of the training
        # ones (TODO confirm against load_dataset). Translate them into the
        # training label space and drop samples whose solution the network has
        # no output class for.
        keep, testing_labels = _align_labels(
            testing_labels, testing_solutions, unique_solutions
        )
        if keep.any():
            validation_data = (np.asarray(testing_images)[keep], testing_labels)

    model = keras.Sequential([
        keras.layers.Conv2D(128, (3, 3), activation='relu', input_shape=(IMAGE_HEIGHT, IMAGE_WIDTH, 1)),
        keras.layers.MaxPooling2D((2, 2)),
        keras.layers.Conv2D(256, (3, 3), activation='relu'),
        keras.layers.MaxPooling2D((2, 2)),
        keras.layers.Conv2D(256, (3, 3), activation='relu'),
        keras.layers.Flatten(),
        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dropout(0.5),  # Dropout for regularization
        keras.layers.Dense(len(unique_solutions), activation='softmax'),  # One class per known solution
    ])
    model.summary()
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

    # Monitor a metric that actually exists for this run: validation accuracy
    # when validation data is supplied, training accuracy otherwise.
    # ModelCheckpoint would otherwise default to 'val_loss', which is not
    # reported without validation data.
    monitored_metric = 'val_accuracy' if validation_data is not None else 'accuracy'
    callbacks = [
        EarlyStopping(monitor=monitored_metric, patience=3),
        ModelCheckpoint('best_model.keras', monitor=monitored_metric, save_best_only=True),
    ]

    epochs = 100
    batch_size = 8

    # validation_data=None is Keras' default and simply disables validation.
    model.fit(
        training_images, training_labels,
        epochs=epochs, batch_size=batch_size, callbacks=callbacks,
        validation_data=validation_data,
    )

    keras.saving.save_model(model, 'captcha_solver.keras')
    # NOTE: alternative exports (model.save('model.h5') and a tf2onnx
    # conversion) were present only as commented-out code and stay disabled.
if __name__ == "__main__": if __name__ == "__main__":