#encoding=utf-8 import time import os import sys import pandas as pd sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) import time import glob import numpy as np np.random.seed(42) from datetime import datetime import json from src.utils import get_meshgrid, get_world_points, get_camera_points, get_screen_points, write_point_cloud, get_white_mask, get_meshgrid_contour, post_process, find_notch, post_process_with_grad from src.phase import extract_phase, unwrap_phase from src.recons import reconstruction_cumsum, poisson_recons_with_smoothed_gradients from src.pcl_postproc import smooth_pcl, align2ref import matplotlib.pyplot as plt from src.calibration import calibrate_world, calibrate_screen_chessboard, calibrate_screen_circlegrid, map_screen_to_world from src.calibration.get_camera_params import calibrate_world_aprilgrid import argparse from src.vis import plot_coords import cv2 from src.eval import get_eval_result import pickle from collections import defaultdict from scipy.io import loadmat, savemat from scipy.optimize import minimize import copy def pmdstart(config_path, img_folder): start_time = time.time() print(f"config_path: {config_path}") #time.sleep(15) main(config_path, img_folder) print(f"img_folder: {img_folder}") print('test pass') end_time = time.time() print(f"Time taken: {end_time - start_time} seconds") return True def optimize_calibration_with_gradient_constraint(cam_params, screen_params, point_cloud, gradient_data, config_path): # 只使用屏幕参数(旋转矩阵和平移向量) initial_params = np.concatenate([ screen_params['screen_rotation_matrix'].flatten(), screen_params['screen_translation_vector'].flatten() ]) # 定义屏幕参数的尺度因子 scale_factors = np.ones_like(initial_params) scale_factors[0:9] = 1.0 # 旋转矩阵 scale_factors[9:] = 100.0 # 平移向量(假设单位是毫米) # 归一化初始参数 normalized_params = initial_params / scale_factors iteration_count = [0] print("\nInitial screen parameters:") print(f"Rotation matrix:\n{screen_params['screen_rotation_matrix']}") print(f"Translation vector:\n{screen_params['screen_translation_vector']}") def objective_function(normalized_params): try: # 反归一化参数 params = normalized_params * scale_factors iteration_count[0] += 1 print(f"\nIteration {iteration_count[0]}:") # 解析屏幕参数 screen_R = params[:9].reshape(3, 3) screen_T = params[9:].reshape(3, 1) # 构建参数字典 temp_cam_params = [{ 'camera_matrix': cam_params['camera_matrix'], 'distortion_coefficients': cam_params['distortion_coefficients'], 'rotation_matrix': cam_params['rotation_matrix'], 'rotation_vector': cam_params['rotation_vector'], 'translation_vector': cam_params['translation_vector'] }] temp_screen_params = { 'screen_matrix': screen_params['screen_matrix'], 'screen_distortion': screen_params['screen_distortion'], 'screen_rotation_matrix': screen_R, 'screen_rotation_vector': cv2.Rodrigues(screen_R)[0], 'screen_translation_vector': screen_T } if iteration_count[0] % 5 == 0: # 每5次迭代打印一次详细信息 print("\nCurrent screen parameters:") print(f"Rotation matrix:\n{screen_R}") print(f"Translation vector:\n{screen_T.flatten()}") try: reconstructed_points, reconstructed_gradients = reconstruct_surface( temp_cam_params, temp_screen_params, config_path, img_folder ) # 只计算平面度误差 planarity_error = compute_planarity_error(reconstructed_points) total_error = planarity_error print(f"Planarity error: {planarity_error:.6f}") # 监控点云变化 if hasattr(objective_function, 'prev_points'): point_changes = reconstructed_points - objective_function.prev_points print(f"Max point change: {np.max(np.abs(point_changes)):.8f}") print(f"Mean point change: {np.mean(np.abs(point_changes)):.8f}") objective_function.prev_points = reconstructed_points.copy() return total_error except Exception as e: print(f"Error in reconstruction: {str(e)}") import traceback traceback.print_exc() return 1e10 except Exception as e: print(f"Error in parameter processing: {str(e)}") import traceback traceback.print_exc() return 1e10 # 设置边界(只针对屏幕参数) bounds = [] # 旋转矩阵边界 for i in range(9): bounds.append((normalized_params[i]-0.5, normalized_params[i]+0.5)) # 平移向量边界 for i in range(3): bounds.append((normalized_params[i+9]-0.5, normalized_params[i+9]+0.5)) # 优化 result = minimize( objective_function, normalized_params, method='L-BFGS-B', bounds=bounds, options={ 'maxiter': 100, 'ftol': 1e-8, 'gtol': 1e-8, 'disp': True, 'eps': 1e-3 } ) # 反归一化获取最终结果 final_params = result.x * scale_factors # 构建最终的参数字典 optimized_screen_params = { 'screen_matrix': screen_params['screen_matrix'], 'screen_distortion': screen_params['screen_distortion'], 'screen_rotation_matrix': final_params[:9].reshape(3, 3), 'screen_rotation_vector': cv2.Rodrigues(final_params[:9].reshape(3, 3))[0], 'screen_translation_vector': final_params[9:].reshape(3, 1) } print("\nOptimization completed!") print("Final screen parameters:") print(f"Rotation matrix:\n{optimized_screen_params['screen_rotation_matrix']}") print(f"Translation vector:\n{optimized_screen_params['screen_translation_vector']}") return cam_params, optimized_screen_params def compute_planarity_error(points): """计算点云的平面度误差""" # 移除中心 centered_points = points - np.mean(points, axis=0) # 使用SVD找到最佳拟合平面 _, s, vh = np.linalg.svd(centered_points) # 最奇异值表示点到平面的平均距离 planarity_error = s[2] / len(points) return planarity_error # 在主程序中使用 def optimize_parameters(cam_params, screen_params, point_cloud, gradient_data): """ 优化参数的包装函数 """ print("开始优化标定参数...") # 保存原始参数 original_cam_params = copy.deepcopy(cam_params) original_screen_params = copy.deepcopy(screen_params) try: # 执行优化 new_cam_params, new_screen_params = optimize_calibration_with_gradient_constraint( cam_params, screen_params, point_cloud, gradient_data, config_path ) # 打印优化前后的参数变化 print("\n参数优化结果:") print("机内参变化:") print("原始值:\n", original_cam_params['camera_matrix']) print("优化后:\n", new_cam_params['camera_matrix']) print("\n屏幕姿态变化:") print("原始平移向量:", original_screen_params['screen_translation_vector']) print("优化后平移向量:", new_screen_params['screen_translation_vector']) return new_cam_params, new_screen_params except Exception as e: print("优化过程出错:") print(f"错误类型: {type(e).__name__}") print(f"错误信息: {str(e)}") print("错误详细信息:") import traceback traceback.print_exc() return original_cam_params, original_screen_params def optimization_callback(xk): """ 优化过程的回调函数,用于监控优化进度 """ global iteration_count iteration_count += 1 # 每10次迭代打印一次当前误差 if iteration_count % 10 == 0: error = objective_function(xk) print(f"Iteration {iteration_count}, Error: {error:.6f}") # 可以保存中间结果 points, grads = reconstruct_surface(...) np.save(f'intermediate_points_{iteration_count}.npy', points) np.save(f'intermediate_grads_{iteration_count}.npy', grads) def validate_optimization(original_points, original_grads, optimized_points, optimized_grads): """ 验证优化结果 """ # 计算平面度改善 original_planarity = compute_planarity_error(original_points) optimized_planarity = compute_planarity_error(optimized_points) # 计算梯度改善 - 使用法向量梯度 original_gradient_error = np.sum(np.abs(original_grads[:, 2:])) # gx, gy optimized_gradient_error = np.sum(np.abs(optimized_grads[:, 2:])) print("\n优化结果验证:") print(f"平面度误差: {original_planarity:.6f} -> {optimized_planarity:.6f}") print(f"梯度误差: {original_gradient_error:.6f} -> {optimized_gradient_error:.6f}") # 可视化结 fig = plt.figure(figsize=(15, 5)) # 原始点云 ax1 = fig.add_subplot(131, projection='3d') ax1.scatter(original_points[:, 0], original_points[:, 1], original_points[:, 2], c=original_grads[:, 2], cmap='viridis') ax1.set_title('Original Surface') # 优化后点云 ax2 = fig.add_subplot(132, projection='3d') ax2.scatter(optimized_points[:, 0], optimized_points[:, 1], optimized_points[:, 2], c=optimized_grads[:, 2], cmap='viridis') ax2.set_title('Optimized Surface') # 梯度分布对比 ax3 = fig.add_subplot(133) ax3.hist([original_grads[:, 2], optimized_grads[:, 2]], label=['Original', 'Optimized'], bins=50, alpha=0.7) ax3.set_title('Gradient Distribution') ax3.legend() plt.tight_layout() plt.show() def reconstruct_surface(cam_params, screen_params, config_path, img_folder): # 添加参数验证 # print("\nDebug - reconstruct_surface input parameters:") # print("Camera parameters:") # print(f"Matrix:\n{cam_params[0]['camera_matrix']}") # print(f"Translation:\n{cam_params[0]['translation_vector']}") cfg = json.load(open(config_path, 'r')) n_cam = cfg['cam_num'] grid_spacing = cfg['grid_spacing'] num_freq = cfg['num_freq'] x_uns, y_uns = [], [] binary_masks = [] for cam_id in range(n_cam): print('cam_id = ', cam_id) # 验证使用的是传入的参数而不是其他来源 #print(f"\nUsing camera parameters for camera {cam_id}:") #print(f"Matrix:\n{cam_params[cam_id]['camera_matrix']}") white_path = os.path.join(img_folder, f'{cam_id}_frame_24.bmp') #binary = get_white_mask(white_path, bin_thresh=82, size_thresh=0.5, debug=0) #凹 凸 平晶 平面镜 binary = get_white_mask(white_path, bin_thresh=40, size_thresh=0.8, debug=0) #binary = get_white_mask(white_path, bin_thresh=30, size_thresh=0.8, debug=0) #四相机 binary_masks.append(binary) mtx = cam_params[cam_id]['camera_matrix'] dist_coeffs = cam_params[cam_id]['distortion_coefficients'] phases = extract_phase(img_folder, cam_id, binary, mtx, dist_coeffs, num_freq) x_un, y_un = unwrap_phase(phases, num_freq, debug=0) x_uns.append(x_un) y_uns.append(y_un) np.save('./x_phase_unwrapped_python.npy', x_un) np.save('./y_phase_unwrapped_python.npy', y_un) if n_cam == 1: #screen_to_world = map_screen_to_world(screen_params, cam_params[0], -30, 10, 80) #screen_to_world = map_screen_to_world(screen_params, cam_params[0], -10, -10, 20) screen_to_world = map_screen_to_world(screen_params, cam_params[0], 0, 0, 0) elif n_cam == 4: screen_to_world = map_screen_to_world(screen_params, cam_params[3], 0, 0, 0) else: print('camera number should be 1 or 4') return 0 print("\n4. 获得不同坐标系下点的位置") total_cloud_point = np.empty((0, 3)) total_gradient = np.empty((0, 4)) for i in range(n_cam): print('cam_id = ', i) contours_point = get_meshgrid_contour(binary_masks[i], debug=0) world_points, world_points_boundary, world_points_boundary_3 = get_world_points(contours_point, cam_params[i], i, grid_spacing, cfg['d'], erosion_pixels=0, debug=0) camera_points, u_p, v_p = get_camera_points(world_points, cam_params[i], i, debug=0) point_data = {'x_w': world_points[:, 0], 'y_w': world_points[:, 1], 'z_w': world_points[:, 2], 'x_c': camera_points[:, 0], 'y_c': camera_points[:, 1], 'z_c': camera_points[:, 2], 'u_p': u_p, 'v_p': v_p} screen_points = get_screen_points(point_data, x_uns[i], y_uns[i], screen_to_world, cfg, i, debug=0) # 添加调试信息 # print(f"\nDebug - Reconstruction:") # print(f"Camera points shape: {camera_points.shape}") # print(f"Screen points shape: {screen_points.shape}") #ds(world_points, camera_points, screen_points) #plot_coords(world_points, camera_points, screen_points) z1_cumsum, gradient_xy = reconstruction_cumsum(world_points, camera_points, screen_points, debug=0) write_point_cloud(os.path.join(img_folder, str(i) + '_cloudpoint.txt'), np.round(z1_cumsum[:, 0]), np.round(z1_cumsum[:, 1]), z1_cumsum[:, 2]) np.savetxt(os.path.join(img_folder, str(i) + '_gradient.txt'), gradient_xy, fmt='%.10f', delimiter=',') total_cloud_point = np.vstack([total_cloud_point, np.column_stack((z1_cumsum[:, 0], z1_cumsum[:, 1], z1_cumsum[:, 2]))]) total_gradient = np.vstack([total_gradient, np.column_stack((gradient_xy[:, 0], gradient_xy[:, 1], gradient_xy[:, 2], gradient_xy[:, 3]))]) # 检查返回值 # print(f"z1_cumsum shape: {z1_cumsum.shape}") # print(f"gradient_xy shape: {gradient_xy.shape}") # print(f"Sample z1_cumsum values:\n{z1_cumsum[:3]}") return total_cloud_point, total_gradient def main(config_path, img_folder): current_dir = os.path.dirname(os.path.abspath(__file__)) os.chdir(current_dir) cfg = json.load(open(config_path, 'r')) n_cam = cfg['cam_num'] matlab_align = 0 if n_cam == 4: screen_img_path = 'D:\\data\\four_cam\\calibrate\\cam3-screen-1008' camera_img_path = 'D:\\data\\four_cam\\calibrate\\calibrate-1016' elif n_cam == 1: camera_img_path = 'D:\\data\\one_cam\\padtest1125\\test1\\' screen_img_path = 'D:\\data\\one_cam\\pad-test-1125\\test4\\calibration\\screen\\' print(f"开始执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("\n1. 机标定") preprocess_start = time.time() cam_para_path = os.path.join(current_dir, 'config', cfg['cam_params']) print('cam_para_path = ', cam_para_path) if os.path.exists(cam_para_path): #if False: with open(cam_para_path, 'rb') as pkl_file: cam_params = pickle.load(pkl_file) else: if n_cam == 4: cam_params = [] camera_subdir = [item for item in os.listdir(camera_img_path) if os.path.isdir(os.path.join(camera_img_path, item))] camera_subdir.sort() assert len(camera_subdir) == 4, f"found {len(camera_subdir)} cameras, should be 4" for i in range(n_cam): cam_img_path = glob.glob(os.path.join(camera_img_path, camera_subdir[i], "*.bmp")) cam_img_path.sort() cam_param_raw = calibrate_world_aprilgrid(cam_img_path, cfg['world_chessboard_size'], cfg['world_square_size'], cfg['world_square_spacing'], debug=1) cam_params.append(cam_param_raw) with open(cam_para_path, 'wb') as pkl_file: pickle.dump(cam_params, pkl_file) elif n_cam == 1: cam_params = [] cam_img_path = glob.glob(os.path.join(camera_img_path, "*.bmp")) cam_img_path.sort() if matlab_align: cfg['world_chessboard_size'] = [41, 40] cfg['world_square_size'] = 2 cam_param_raw = calibrate_world(cam_img_path, cfg['world_chessboard_size'], cfg['world_square_size'], debug=1) cam_params.append(cam_param_raw) with open(cam_para_path, 'wb') as pkl_file: pickle.dump(cam_params, pkl_file) print('raw cam_param = ', cam_params) print("\n2. 屏幕标定") screen_cal_start = time.time() screen_img_path = glob.glob(os.path.join(screen_img_path, "*.bmp")) screen_para_path = os.path.join('config', cfg['screen_params']) if os.path.exists(screen_para_path): #if False: with open(screen_para_path, 'rb') as pkl_file: screen_params = pickle.load(pkl_file) else: if n_cam == 3: screen_params = calibrate_screen_chessboard(screen_img_path, cam_params[3]['camera_matrix'], cam_params[3]['distortion_coefficients'], cfg['screen_chessboard_size'], cfg['screen_square_size'], debug=0) elif n_cam == 1: raw_cam_mtx = cam_params[0]['camera_matrix'] screen_params = calibrate_screen_circlegrid(screen_img_path, raw_cam_mtx, cam_params[0]['distortion_coefficients'], cfg['screen_chessboard_size'], cfg['screen_square_size'], debug=1) with open(screen_para_path, 'wb') as pkl_file: pickle.dump(screen_params, pkl_file) screen_cal_end = time.time() print(f" 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f" 耗时: {screen_cal_end - screen_cal_start:.2f} s") total_cloud_point, total_gradient = reconstruct_surface(cam_params, screen_params, config_path, img_folder) print("\n5. 后处理") post_process_start = time.time() total_cloud_point[:,0] = np.round(total_cloud_point[:,0]) total_cloud_point[:,1] = np.round(total_cloud_point[:,1]) #fitted_points = post_process(total_cloud_point, debug=0) clout_points = post_process_with_grad(img_folder, n_cam, 1) fitted_points = total_cloud_point align_fitted = fitted_points write_point_cloud(os.path.join(img_folder, 'cloudpoint.txt'), np.round(clout_points[:, 0]-np.mean(clout_points[:, 0])), np.round(clout_points[:, 1]-np.mean(clout_points[:, 1])), 1000 * clout_points[:,2]) print("\n6. 评估") eval_start = time.time() point_cloud_path = os.path.join(img_folder, 'cloudpoint.txt') json_path = os.path.join(img_folder, 'result.json') theta_notch = 0 get_eval_result(point_cloud_path, json_path, theta_notch, 1) eval_end = time.time() print(f" 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f" 耗时: {eval_end - eval_start:.2f} 秒") # new_cam_params, new_screen_params = optimize_parameters( # cam_params[0], # 假设单相系统 # screen_params, # total_cloud_point, # total_gradient # ) #print('after optimazation:', new_cam_params, new_screen_params) #total_cloud_point, total_gradient = reconstruct_surface(cam_params, screen_params, config_path) #print('cam_params = ', cam_params) #print('screen_params = ', screen_params) # print("\n3. 相位提取,相位展开") # phase_start = time.time() # scales = [0.995291, 0.993975, 0.993085, 0.994463] # scales = [1,1,1,1] # phase_end = time.time() # print(f" 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") # print(f" 耗时: {phase_end - phase_start:.2f} 秒") # z_raw_xy = np.round(z_poisson[:, :2]).astype(int) # #创建布尔掩码,初始为 True # mask = np.ones(len(z_raw_xy), dtype=bool) # # 使用掩码过滤出非边界点 # non_boundary_points = z_poisson[mask] # z_raw_aligned, _ = align2ref(non_boundary_points) # #non_boundary_points = smoothed # write_point_cloud(os.path.join(img_folder, str(i) + '_cloudpoint.txt'), np.round(z_raw_aligned[:, 0]), np.round(z_raw_aligned[:, 1]), z_raw_aligned[:, 2]) # np.savetxt(os.path.join(img_folder, str(i) + '_gradient.txt'), gradient_xy, fmt='%.10f', delimiter=',') # total_cloud_point = np.vstack([total_cloud_point, np.column_stack((z_raw_aligned[:, 0], z_raw_aligned[:, 1], z_raw_aligned[:, 2]))]) # total_gradient = np.vstack([total_gradient, np.column_stack((gradient_xy[:, 0], gradient_xy[:, 1], gradient_xy[:, 2], gradient_xy[:, 3]))]) # get_point_end = time.time() # print(f" 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") # print(f" 耗时: {get_point_end - get_point_start:.2f} 秒") # if 0: # fig = plt.figure() # ax = fig.add_subplot(111, projection='3d') # # 提取 x, y, z 坐标 # x_vals = np.round(fitted_points[:, 0]-np.mean(fitted_points[:, 0])) # y_vals = np.round(fitted_points[:, 1]-np.mean(fitted_points[:, 1])) # z_vals = align_fitted[:,2]-np.min(align_fitted[:,2]) # x_vals = fitted_points[:, 0] # y_vals = fitted_points[:, 1] # z_vals = fitted_points[:, 2] # # 绘制3D点云 # ax.scatter(x_vals, y_vals, z_vals, c=z_vals, cmap='viridis', marker='o') # # 设置轴标签和标题 # ax.set_xlabel('X (mm)') # ax.set_ylabel('Y (mm)') # ax.set_zlabel('Z (mm)') # ax.set_title('post 3D Point Cloud Visualization') # plt.show() # post_process_end = time.time() # print(f" 完成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") # print(f" 耗时: {post_process_end - post_process_start:.2f} 秒") return True if __name__ == '__main__': #config_path = 'config\\cfg_3freq_wafer_1226.json' config_path = 'config\\cfg_3freq_wafer_1226.json' #config_path = 'config\\cfg_3freq_wafer_matlab.json' img_folder = 'D:\\data\\one_cam\\pad-test-1125\\test4\\pingjing\\' img_folder = 'D:\\data\\one_cam\\pad-test-1106\\1104-test-other\\' #img_folder = 'D:\\data\\one_cam\\pad-test-1125\\test7-4\\20241213183308114\\' #凸 #img_folder = 'D:\\data\\one_cam\\pad-test-1125\\test7-4\\20241213183451799\\' #凹 img_folder = 'D:\\data\\one_cam\\pad-test-1125\\test7-4\\20241213182959977\\' #平面镜 img_folder = 'D:\\data\\one_cam\\pad-test-1125\\test7-4\\20241219180143344\\' #平晶 #img_folder = 'D:\\data\\four_cam\\1223\\20241223134629640-0\\' #晶圆 #img_folder = 'D:\\data\\four_cam\\1223\\20241223135156305-1\\' #晶圆 #img_folder = 'D:\\data\\four_cam\\1223\\20241223135626457-2-0\\' #晶圆 img_folder = 'D:\\data\\four_cam\\1223\\20241223135935517-2-1\\' #晶圆 #img_folder = 'D:\\data\\four_cam\\1223\\20241223172437775-2-0\\' #晶圆 #img_folder = 'D:\\data\\four_cam\\1223\\20241223172712226-2-1\\' #晶圆 # img_folder = 'D:\\data\\four_cam\\1223\\20241223172931654-1\\' #晶圆 img_folder = 'D:\\data\\four_cam\\1223\\20241223173521117-0\\' #晶圆 #img_folder = 'D:\\data\\four_cam\\betone_1011\\20241011142901251-2\\' # img_folder = 'D:\\data\\one_cam\\1226image\\20241226142937478\\' #凹 # #img_folder = 'D:\\data\\one_cam\\1226image\\20241226143014962\\' #凸 # #img_folder = 'D:\\data\\one_cam\\1226image\\20241226143043070\\' #平面镜 # #img_folder = 'D:\\data\\one_cam\\1226image\\20241226142826690\\' #平晶 # img_folder = 'D:\\data\\one_cam\\1226image\\20241226143916513\\' #晶圆 # #img_folder = 'D:\\data\\one_cam\\1226image\\20241226144357273\\' # img_folder = 'D:\\data\\one_cam\\1226image\\20241226144616239\\' img_folder = 'D:\\data\\one_cam\\betone1230\\20241230110516029\\' img_folder = 'D:\\data\\one_cam\\betone1230\\20241230110826833\\' #2 #img_folder = 'D:\\data\\one_cam\\betone1230\\20241230111002224\\' #img_folder = 'D:\\data\\one_cam\\betone1230\\20250103151342402-pingjing\\' # img_folder = 'D:\\data\\one_cam\\betone1230\\20250102181845157-1\\' #img_folder = 'D:\\data\\one_cam\\betone1230\\20250102181648331-2\\' #img_folder = 'D:\\data\\one_cam\\betone1230\\20250102182008126-3\\' json_path = os.path.join(img_folder, 'result.json') pmdstart(config_path, img_folder)