#encoding=utf-8
"""Four-camera reconstruction pipeline entry point.

Runs, in order: camera calibration, screen calibration, phase extraction and
unwrapping, per-camera point reconstruction, post-processing of the merged
point cloud, and evaluation. Calibration results are cached as pickle files
under ``config/``.

NOTE(review): "pmd" in ``pmdstart`` presumably stands for phase-measuring
deflectometry -- confirm.
"""
import time
import os
import sys
import pandas as pd
# Make the parent directory importable before the ``src.*`` imports below.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import glob
import numpy as np
np.random.seed(42)
from datetime import datetime
import json
from src.utils import get_meshgrid, get_world_points, get_camera_points, get_screen_points, write_point_cloud, get_white_mask, get_meshgrid_contour, post_process, find_notch
from src.phase import extract_phase, unwrap_phase
from src.recons import reconstruction_cumsum
from src.pcl_postproc import smooth_pcl, align2ref
import matplotlib.pyplot as plt
from src.calibration import calibrate_world, calibrate_screen, map_screen_to_world
import argparse
from src.vis import plot_coords
import cv2
from src.eval import get_eval_result
import pickle
from collections import defaultdict

# Calibration image folders (machine-specific absolute paths).
# Earlier calibration sets: 'calibrate\\calibration_0913', 'calibrate\\screen0920'.
CAMERA_CALIB_DIR = 'D:\\data\\four_cam\\calibrate\\calibrate-1008'
SCREEN_CALIB_DIR = 'D:\\data\\four_cam\\calibrate\\cam3-screen-1008'

# Number of cameras in the rig, and the index of the camera whose parameters
# are used for screen calibration and the screen-to-world mapping.
N_CAM = 4
SCREEN_CAM_ID = 3


def pmdstart(config_path, img_folder):
    """Run the full pipeline once and print the wall-clock time it took.

    Args:
        config_path: Path of the JSON configuration file, forwarded to ``main``.
        img_folder: Folder holding the captured frames, forwarded to ``main``.

    Returns:
        Always ``True``.
    """
    start_time = time.time()
    print(f"config_path: {config_path}")
    main(config_path, img_folder)
    print(f"img_folder: {img_folder}")
    print('test pass')
    end_time = time.time()
    print(f"Time taken: {end_time - start_time} seconds")
    return True


def _report_step_done(step_start):
    """Print the completion timestamp and elapsed seconds of a pipeline step."""
    elapsed = time.time() - step_start
    print(f" 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f" 耗时: {elapsed:.2f} 秒")


def _load_or_calibrate_cameras(cfg, n_cam):
    """Return the per-camera parameter list, from cache or by calibrating."""
    cam_para_path = os.path.join('config', cfg['cam_params'])
    if os.path.exists(cam_para_path):
        # NOTE: pickle executes arbitrary code on load; the cache file must
        # come from a trusted source.
        with open(cam_para_path, 'rb') as pkl_file:
            return pickle.load(pkl_file)

    # One sub-directory of calibration images per camera, in sorted order.
    camera_subdir = sorted(
        item for item in os.listdir(CAMERA_CALIB_DIR)
        if os.path.isdir(os.path.join(CAMERA_CALIB_DIR, item))
    )
    assert len(camera_subdir) == n_cam, f"found {len(camera_subdir)} cameras, should be {n_cam}"

    cam_params = []
    for i in range(n_cam):
        cam_img_path = sorted(glob.glob(os.path.join(CAMERA_CALIB_DIR, camera_subdir[i], "*.bmp")))
        print('cam_img_path = ', cam_img_path)
        cam_params.append(
            calibrate_world(cam_img_path, i, cfg['world_chessboard_size'], cfg['world_square_size'], debug=0)
        )

    with open(cam_para_path, 'wb') as pkl_file:
        pickle.dump(cam_params, pkl_file)
    return cam_params


def _load_or_calibrate_screen(cfg, screen_cam_params):
    """Return the screen parameters, from cache or by calibrating."""
    screen_para_path = os.path.join('config', cfg['screen_params'])
    if os.path.exists(screen_para_path):
        # NOTE: pickle executes arbitrary code on load; the cache file must
        # come from a trusted source. The cache stores a one-element list.
        with open(screen_para_path, 'rb') as pkl_file:
            return pickle.load(pkl_file)[0]

    screen_img_files = glob.glob(os.path.join(SCREEN_CALIB_DIR, "*.bmp"))
    screen_params = calibrate_screen(
        screen_img_files,
        screen_cam_params['camera_matrix'],
        screen_cam_params['distortion_coefficients'],
        cfg['screen_chessboard_size'],
        cfg['screen_square_size'],
        debug=0,
    )
    with open(screen_para_path, 'wb') as pkl_file:
        pickle.dump([screen_params], pkl_file)
    return screen_params


def _drop_points_at_xy(points, points_xy, excluded_points):
    """Return the rows of ``points`` whose xy matches no excluded point.

    ``points_xy`` holds the (rounded) xy coordinates of ``points`` row by row;
    a row is dropped when its xy equals the first two components of any entry
    in ``excluded_points``.
    """
    keep = np.ones(len(points_xy), dtype=bool)
    for excluded in excluded_points:
        keep &= ~np.all(points_xy == excluded[:2], axis=1)
    return points[keep]


def _show_point_cloud(x_vals, y_vals, z_vals):
    """Show a 3D scatter plot of a point cloud, coloured by z."""
    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')
    ax.scatter(x_vals, y_vals, z_vals, c=z_vals, cmap='viridis', marker='o')
    ax.set_xlabel('X (mm)')
    ax.set_ylabel('Y (mm)')
    ax.set_zlabel('Z (mm)')
    ax.set_title('3D Point Cloud Visualization')
    plt.show()


def main(config_path, img_folder):
    """Run calibration, reconstruction, post-processing and evaluation.

    Side effects: changes the working directory to this script's directory,
    may write calibration caches under ``config/``, writes
    ``<cam>_cloudpoint.txt`` per camera and ``cloudpoint.txt`` into
    ``img_folder``, and passes ``img_folder/result.json`` to
    ``get_eval_result``.

    Args:
        config_path: Path of the JSON configuration file. It is opened after
            the working directory change, so a relative path is resolved
            against this script's directory.
        img_folder: Folder holding the captured frames; also receives the
            output point clouds.

    Returns:
        Always ``True``.
    """
    current_dir = os.path.dirname(os.path.abspath(__file__))
    os.chdir(current_dir)
    with open(config_path, 'r') as cfg_file:
        cfg = json.load(cfg_file)

    n_cam = N_CAM
    num_freq = cfg['num_freq']
    grid_spacing = cfg['grid_spacing']
    save_path = 'debug'
    debug = False
    smooth = True
    align = True
    denoise = True

    print(f"开始执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    print("\n1. 相机标定")
    cam_params = _load_or_calibrate_cameras(cfg, n_cam)

    print("\n2. 屏幕标定")
    screen_cal_start = time.time()
    screen_params = _load_or_calibrate_screen(cfg, cam_params[SCREEN_CAM_ID])
    screen_to_world = map_screen_to_world(screen_params, cam_params[SCREEN_CAM_ID])
    _report_step_done(screen_cal_start)

    print("\n3. 相位提取,相位展开")
    phase_start = time.time()
    x_uns, y_uns = [], []
    binary_masks = []
    for cam_id in range(n_cam):
        print('cam_id = ', cam_id)
        # Frame 24 of each camera is the image handed to get_white_mask.
        white_path = os.path.join(img_folder, f'{cam_id}_frame_24.bmp')
        binary = get_white_mask(white_path, bin_thresh=12, debug=0)
        binary_masks.append(binary)
        phases = extract_phase(
            img_folder, cam_id, binary,
            cam_params[cam_id]['camera_matrix'],
            cam_params[cam_id]['distortion_coefficients'],
            num_freq=num_freq,
        )
        x_un, y_un = unwrap_phase(phases, save_path, num_freq, debug=0)
        x_uns.append(x_un)
        y_uns.append(y_un)
    _report_step_done(phase_start)

    print("\n4. 获得不同坐标系下点的位置")
    get_point_start = time.time()
    total_cloud_point = np.empty((0, 3))
    for i in range(n_cam):
        print('cam_id = ', i)
        contours_point = get_meshgrid_contour(binary_masks[i], save_path, debug=False)
        world_points, world_points_boundary, world_points_boundary_3 = get_world_points(
            contours_point, cam_params[i], i, grid_spacing, cfg['d'], erosion_pixels=15, debug=0)
        camera_points, u_p, v_p = get_camera_points(world_points, cam_params[i], save_path, i, debug=0)
        point_data = {
            'x_w': world_points[:, 0],
            'y_w': world_points[:, 1],
            'z_w': world_points[:, 2],
            'x_c': camera_points[:, 0],
            'y_c': camera_points[:, 1],
            'z_c': camera_points[:, 2],
            'u_p': u_p,
            'v_p': v_p,
        }
        screen_points = get_screen_points(
            point_data, x_uns[i], y_uns[i], screen_params, screen_to_world, cfg, save_path, i, debug=debug)
        # Only the raw reconstruction is used below; the other outputs are ignored.
        z_raw, _, _, _ = reconstruction_cumsum(
            world_points, camera_points, screen_points, save_path, i,
            debug=0, smooth=smooth, align=align, denoise=denoise)
        z_raw_xy = np.round(z_raw[:, :2]).astype(int)

        # The rotation is estimated from the points left after removing
        # world_points_boundary ...
        inner_points = _drop_points_at_xy(z_raw, z_raw_xy, world_points_boundary)
        _, rotation_matrix = align2ref(inner_points)

        # ... and then applied to the points left after removing
        # world_points_boundary_3.
        kept_points = _drop_points_at_xy(z_raw, z_raw_xy, world_points_boundary_3)
        z_raw_aligned = kept_points @ rotation_matrix.T

        # z is scaled by 1000 on output -- presumably a unit conversion; confirm.
        write_point_cloud(
            os.path.join(img_folder, str(i) + '_cloudpoint.txt'),
            np.round(z_raw_aligned[:, 0]),
            np.round(z_raw_aligned[:, 1]),
            1000*z_raw_aligned[:, 2],
        )
        total_cloud_point = np.vstack([
            total_cloud_point,
            np.column_stack((z_raw_aligned[:, 0], z_raw_aligned[:, 1], 1000*z_raw_aligned[:, 2])),
        ])

    if debug:
        _show_point_cloud(total_cloud_point[:, 0], total_cloud_point[:, 1], total_cloud_point[:, 2])
    _report_step_done(get_point_start)

    print("\n5. 后处理")
    post_process_start = time.time()
    total_cloud_point[:, 0] = np.round(total_cloud_point[:, 0])
    total_cloud_point[:, 1] = np.round(total_cloud_point[:, 1])
    fitted_points = post_process(total_cloud_point, debug=0)
    align_fitted, _ = align2ref(fitted_points)
    # Output is centred in x/y and shifted so that the minimum z is 0.
    write_point_cloud(
        os.path.join(img_folder, 'cloudpoint.txt'),
        np.round(align_fitted[:, 0]-np.mean(align_fitted[:, 0])),
        np.round(align_fitted[:, 1]-np.mean(align_fitted[:, 1])),
        align_fitted[:, 2]-np.min(align_fitted[:, 2]),
    )
    if debug:
        # NOTE(review): x/y come from fitted_points while z comes from
        # align_fitted, unlike the file written above -- confirm intent.
        _show_point_cloud(
            np.round(fitted_points[:, 0]-np.mean(fitted_points[:, 0])),
            np.round(fitted_points[:, 1]-np.mean(fitted_points[:, 1])),
            align_fitted[:, 2]-np.min(align_fitted[:, 2]),
        )
    _report_step_done(post_process_start)

    print("\n6. 评估")
    eval_start = time.time()
    point_cloud_path = os.path.join(img_folder, 'cloudpoint.txt')
    json_path = os.path.join(img_folder, 'result.json')
    theta_notch = 0
    get_eval_result(point_cloud_path, json_path, theta_notch, 0)
    _report_step_done(eval_start)
    return True


if __name__ == '__main__':
    config_path = 'config\\cfg_3freq_wafer.json'
    img_folder = 'D:\\data\\four_cam\\1008_storage\\20241009163255262'
    json_path = os.path.join(img_folder, 'result.json')
    pmdstart(config_path, img_folder)
    # Kept so the evaluation step can be re-run on its own:
    point_cloud_path = os.path.join(img_folder, 'cloudpoint.txt')
    theta_notch = 0
    #get_eval_result(point_cloud_path, json_path, theta_notch, 0)