import numpy as np import json, os, shutil, bisect, subprocess, sys, time from geopy.distance import geodesic from netCDF4 import Dataset from osgeo import gdal # 获取当前文件的父目录的绝对路径 current_dir = os.path.dirname(os.path.abspath(__file__)) # 获取父目录的绝对路径 parent_dir = os.path.abspath(os.path.join(current_dir, os.pardir)) # 将父目录添加到 Python 模块搜索路径中 sys.path.append(parent_dir) from Utils import utils as globalUtils # 求2个数组的交集 def intersection(list1, list2): set1 = set(list1) set2 = set(list2) return list(set1 & set2) # 读取一个文件的内容,文件的格式是 “10,20,30.....” def readValueListFromTxt(filePath): valueList = [] with open(filePath, "r") as file: for line in file: value = [float(x) for x in line.split(",")] valueList += value return valueList # 测试矩阵 def matriTest(): # 两个示例数组 array1 = [1, 2, 3] array2 = [4, 5, 6] # 将数组转换为NumPy数组 array1_np = np.array(array1) array2_np = np.array(array2) # 创建矩阵,其中每一行分别是数组1和数组2 matrix = np.vstack((array1_np, array2_np)) print("Matrix:") print(matrix) pass # 计算一个数组中与目标值差值绝对值最小的值 def calClosestValue(target, arrList: list): # 定义数组和目标值 arr = np.array(arrList) # 计算数组中每个元素与目标值的绝对差值 differences = np.abs(arr - target) # 找到差值最小的元素的索引 minDifferenceIndex = np.argmin(differences) # 获取差值最小的元素 closestValue = arr[minDifferenceIndex] return closestValue # 找到最近的值 def searchNearest(sourceList: list, computeList: list) -> list: closestValueList = [] for v in sourceList: closestValueList.append(calClosestValue(v, computeList)) return closestValueList # 计算2个点的地理距离 def calDistance(pointA, pointB) -> float: return geodesic(pointA, pointB).miles # 使用geodesic计算距离 # 读取nc文件内容 def readNC(ncFilePath: str): print(os.getcwd()) print(ncFilePath) # 打开 .nc 文件,'r 表示只读 ncData = Dataset(ncFilePath, "r") # 查看文件的维度 print("Dimensions:", ncData.dimensions) # 查看文件的变量 print("Variables:", list(ncData.variables.keys())) # # 获取所有的变量名称 # variables = ncData.variables.keys() # # 根据变量名称获取数据 longitude = ncData.variables["longitude"][:] latitude = ncData.variables["latitude"][:] print("longitude ", len(longitude)) print("latitude ", len(latitude)) print("tco3 ", len(ncData.variables["tco3"][:])) print("tcw ", len(ncData.variables["tcw"][:])) tco3Matrix = ncData.variables["tco3"][0] # 是 721 个元素的数组 print(len(tco3Matrix)) # 每个元素是 1440 长度的数组 print(len(tco3Matrix[0])) tcwMatrix = ncData.variables["tcw"][0] # 是 721 个元素的数组 print(len(tcwMatrix[0])) # 每个元素是 1440 长度的数组 print(len(tcwMatrix[0])) # 获取特定变量的值 # variable_name = 'temperature' # variable_values = nc_file.variables[variable_name][:] # 这里假设 'temperature' 是文件中的一个变量名 # 关闭 .nc 文件 ncData.close() return list(longitude), list(latitude), tcwMatrix, tco3Matrix # 读取tif文件内容 def readTif(tifFilePath: str): dataset = gdal.Open(tifFilePath) band = dataset.GetRasterBand(1) if dataset is not None: # 获取tif文件的投影和变换信息,用于将像素位置转化为地理坐标 print("Projection: ", dataset.GetProjection()) geotransform = dataset.GetGeoTransform() print("GeoTransform: ", geotransform) # 对第一波段的数据进行读取(这里假定大气数据在第一波段) band = dataset.GetRasterBand(1) aodMatrix = band.ReadAsArray() nrows, ncols = aodMatrix.shape # 计算每一个像素对应的经纬度 longitude_left_upper = geotransform[0] latitude_left_upper = geotransform[3] # 纬度是-90 到 90 if latitude_left_upper > 90: latitude_left_upper = 90 x_resolution = geotransform[1] # x像素分辨率 y_resolution = geotransform[5] # y像素分辨率(这个值通常为负) # 根据左上角点坐标以及分辨率计算其他点坐标 lon_list = [longitude_left_upper + i * x_resolution for i in range(ncols)] lat_list = [latitude_left_upper + i * y_resolution for i in range(nrows)] print("lon_list", len(lon_list)) print("lat_list", len(lat_list)) # 在此可以根据计算得到的经纬度信息对大气数据进行处理 # 例如简单地输出数据的形状和基本统计信息 # print('Array shape: ', aodMatrix.shape) # print('Min value: ', array.min()) # print('Max value: ', array.max()) # print('Mean value: ', array.mean()) return lon_list, lat_list, aodMatrix else: print("Could not open the tif file!") pass # 读取地表文件中的经纬度 def readSurfaceCoordinate( latFilePath: str, lonFilePath: str, saFilePath: str, szFilePath: str, vaFilePath: str, vzFilePath: str, ): latList = readValueListFromTxt(latFilePath) lonList = readValueListFromTxt(lonFilePath) szList = readValueListFromTxt(szFilePath) vzList = readValueListFromTxt(vzFilePath) saList = readValueListFromTxt(saFilePath) vaList = readValueListFromTxt(vaFilePath) return lonList, latList, szList, vzList, saList, vaList # 初始化输入参数文件 def initInputFiles(projectSurfaceDir: str, projectTOADir: str) -> bool: # 地表的经纬度文件 surFileList = [ "input_surface.txt", "Lat.txt", "Lon.txt", "SA.txt", "SZ.txt", "VA.txt", "VZ.txt", ] surFilePathList = [os.path.join(projectSurfaceDir, x) for x in surFileList] # 大气的参数、nc和tif文件 toaFileList = ["input_toa.txt", "input.nc", "input.tif"] toaFilePathList = [os.path.join(projectTOADir, x) for x in toaFileList] # 判断这些文件是否存在,如果有1个不存在进行报错 allInputFileList = surFilePathList + toaFilePathList for inputFile in allInputFileList: if not os.path.exists(inputFile): print(f"{inputFile} is not exist!") return False # 本地input目录 inputDir = "input" if not globalUtils.rmDir(inputDir): return False if not globalUtils.mkDir(inputDir): return False # 拷贝参数文件到目录下 for inputFile in allInputFileList: if not globalUtils.copyFile( inputFile, os.path.join(inputDir, os.path.basename(inputFile)) ): return False return True # 获取target在arr中的最接近左值和右值,arr是正序排序 def getLRValue(target, arr: list): # 使用bisect_left找到该数值应当插入的位置以维持有序 index = bisect.bisect_left(arr, target) # 左邻近值 leftValue = arr[index - 1] if index > 0 else None # 右邻近值 rightValue = arr[index] if index < len(arr) else None return leftValue, rightValue # 获取一个点在网格中的最接近点(网格的顶点) # 注意左值和右值可能是None def getNearestPoint(targetPoint, lonList, latList): lonList = sorted([x for x in lonList if x is not None]) latList = sorted([x for x in latList if x is not None]) pointList = [(lat, lon) for lat in latList for lon in lonList] # print(pointList) if len(pointList) == 0: return None, None minPoint = None minDistance = None for point in pointList: # print(targetPoint, point) distance = calDistance(targetPoint, point) if not minDistance or distance < minDistance: minDistance = distance minPoint = point return minPoint, minDistance def readInputSurface(inputSurfaceFilePath: str): # 打开文件 with open(inputSurfaceFilePath, "r") as file: # 使用readlines()方法读取所有行,并将结果存储在lines列表中 lines = file.readlines() # 检查文件是否至少有两行 if len(lines) >= 2: # 选择第二行(索引为1) secondLine = lines[1] splitList = secondLine.split(",") if len(splitList) == 3: month, wavestart, waveend = (int(x) for x in tuple(splitList)) # print("第二行内容:", secondLine) # print(month, wavestart, waveend) return month, wavestart, waveend return None, None, None # return month, wavestart, waveend def readInputTOA(inputTOAFilePath: str): # return aodmodel, atmodel # 打开文件 with open(inputTOAFilePath, "r") as file: # 使用readlines()方法读取所有行,并将结果存储在lines列表中 lines = file.readlines() # 检查文件是否至少有两行 if len(lines) >= 2: # 选择第二行(索引为1) secondLine = lines[1] splitList = [x for x in secondLine.split(" ") if x.strip()] print(splitList) if len(splitList) == 2: aodmodel, atmmodel = (int(x) for x in tuple(splitList)) # print("第二行内容:", secondLine) # print(month, wavestart, waveend) return aodmodel, atmmodel return None, None # 大气每次运行需要一个地表参数文件 def writeSurfaceInputFile( filePath: str, month: int, lon: float, lat: float, sz: float, vz: float, sa: float, va: float, wavestart: int, waveend: int, ): with open(filePath, "w") as file: file.write("month,lon,lat,sz,vz,sa,va,wavestart,waveend\n") file.write(f"{month},{lon},{lat},{sz},{vz},{sa},{va},{wavestart},{waveend}") return filePath # 大气每次运行需要一个地表参数文件 def writeTOAInputFile( filePath: str, sz: float, vz: float, sa: float, va: float, wavestart: int, waveend: int, aodmodel: int, atmmodel: int, aod: float, wv: float, ozone: float, inputpath: str, ): with open(filePath, "w") as file: file.write( "sz vz sa va wavestart waveend aodmodel atmmodel aod wv ozone inputpath\n" ) file.write( f"{sz} {vz} {sa} {va} {wavestart} {waveend} {aodmodel} {atmmodel} {aod} {wv/10} {ozone} {inputpath}" ) return filePath # 执行一次求解 def executeCompute(): # 先删除旧的output.txt globalUtils.rmPath("TOASimu.txt") # 启动求解任务 # result = subprocess.run(['AllSensorSimu.exe']) process = subprocess.Popen(["AllSensorSimu.exe"]) # 等待2秒后关闭 while 1: time.sleep(10) if globalUtils.isFileOpen("TOASimu.txt"): # 正在被其它程序写入,所以等待 print("writting TOASimu.txt") else: if process.poll() is None: # 如果进程还在运行,就终止它 process.kill() print("write TOASimu.txt over!") break # 将一个数组归一化,非排序的数组,每个值都缩小到 [0,1] def normalized(valueList: list): arr = np.array(valueList) return (arr - np.min(arr)) / (np.max(arr) - np.min(arr)) # 每个元素包含3个值,分别对应rgb三个颜色 # valueList: [(value445: float, value565: float, value670: float)] def normalizedColor(valueList: list): # 第1步,归一化 value445NormalizedList = list(normalized([float(x[0]) for x in valueList])) value565NormalizedList = list(normalized([float(x[1]) for x in valueList])) value670NormalizedList = list(normalized([float(x[2]) for x in valueList])) # print(value445NormalizedList) # 第二步,归一化的值转化为255的16进制 valueRList = [ hex(int(x * 255))[2:].zfill(2) if not np.isnan(x) else "ff" for x in value445NormalizedList ] # print(valueRList) # print(hex(int(value445NormalizedList[0] * 255))[2:].zfill(2)) valueGList = [ hex(int(x * 255))[2:].zfill(2) if not np.isnan(x) else "ff" for x in value565NormalizedList ] valueBList = [ hex(int(x * 255))[2:].zfill(2) if not np.isnan(x) else "ff" for x in value670NormalizedList ] return ["#" + "".join(item) for item in zip(valueRList, valueGList, valueBList)]