You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python道路GPX发夹弯检测脚本的索引问题——末尾直线路段缺失导致距离统计不一致

Python道路GPX发夹弯检测脚本的索引问题——末尾直线路段缺失导致距离统计不一致

我现在有一个能处理同目录下GPX文件的脚本,它可以检测路线里的发夹弯,还能分别统计直线路段和弯道路段的长度、坡度数据并生成报告。

脚本的核心算法是通过向前搜索一段距离内的累计方位角变化来识别发夹弯:当在50米范围内,单侧累计方位角变化超过120度时,就判定为发夹弯。

目前脚本整体运行正常,但有个小问题:最后一段直线路段总是被漏掉,导致我在交叉验证总距离(GPX原始总长度、各段累计长度、弯段+直段长度之和)时,能明显看到数据不一致。

脚本自带打印报告和内部调试功能,对比不同方式计算的距离时,这个错误就很明显了:

import math
import os
import gpxpy

def calculate_distance(point1, point2):
    # Create gpxpy points for distance calculation
    p1 = gpxpy.gpx.GPXTrackPoint(point1['latitude'], point1['longitude'])
    p2 = gpxpy.gpx.GPXTrackPoint(point2['latitude'], point2['longitude'])
    return p1.distance_2d(p2)

def calculate_bearing(lat1, lon1, lat2, lon2):
    # Convert to radians
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    
    # Calculate bearing
    dlon = lon2 - lon1
    y = math.sin(dlon) * math.cos(lat2)
    x = math.cos(lat1) * math.sin(lat2) - math.sin(lat1) * math.cos(lat2) * math.cos(dlon)
    bearing = math.atan2(y, x)
    
    # Convert to degrees
    return math.degrees(bearing)

def calculate_angle_change(p1, p2, p3):
    # Calculate bearings for both segments
    bearing1 = calculate_bearing(p1['latitude'], p1['longitude'], 
                               p2['latitude'], p2['longitude'])
    bearing2 = calculate_bearing(p2['latitude'], p2['longitude'], 
                               p3['latitude'], p3['longitude'])
    
    # Calculate the change in bearing
    angle_change = bearing2 - bearing1
    
    # Normalize to -180 to +180
    if angle_change > 180:
        angle_change -= 360
    elif angle_change < -180:
        angle_change += 360
    
    return angle_change  # Positive for right turns, negative for left turns

def combine_segments_by_curvature(points, curve_angle_threshold=120, look_ahead_distance=50):
    print("Starting curve detection...")
    combined = []
    i = 0
    straight_points = [points[0]]  # First point is included
    
    while i < len(points) - 2:
        print(f"Processing point {i} of {len(points)}")
        
        look_ahead_points = []
        cumulative_distance = 0
        cumulative_angle = 0
        last_angle_sign = 0
        j = i + 1
        
        while j < len(points) - 2 and cumulative_distance < look_ahead_distance:
            angle = calculate_angle_change(points[j-1], points[j], points[j+1])
            
            if last_angle_sign == 0:
                cumulative_angle = abs(angle)
                last_angle_sign = 1 if angle > 0 else -1
            elif (angle > 0 and last_angle_sign > 0) or (angle < 0 and last_angle_sign < 0):
                cumulative_angle += abs(angle)
            else:
                cumulative_angle = abs(angle)
                last_angle_sign = 1 if angle > 0 else -1
            
            look_ahead_points.append(points[j])
            cumulative_distance += points[j]['distance']
            
            if cumulative_angle >= curve_angle_threshold:
                # Found a curve - add any collected straight points first
                if straight_points:
                    # Include the distance TO the first point of the curve
                    total_distance = sum(p['distance'] for p in straight_points[1:])
                    straight_segment = {
                        'start_point': straight_points[0],
                        'end_point': straight_points[-1],
                        'distance': total_distance,
                        'elevation_change': straight_points[-1]['elevation'] - straight_points[0]['elevation'],
                        'points': straight_points.copy(),
                        'type': 'straight',
                        'gradient': ((straight_points[-1]['elevation'] - straight_points[0]['elevation']) / 
                                   total_distance * 100) if total_distance > 0 else 0
                    }
                    combined.append(straight_segment)
                    straight_points = []
                
                # Add the curve segment - include ALL points from i to j inclusive
                curve_points = []
                for k in range(i, j + 1):  # Include point j
                    curve_points.append(points[k])
                
                # Calculate curve distance including the distance TO point j
                curve_distance = sum(points[k]['distance'] for k in range(i + 1, j + 1))
                
                curve_segment = {
                    'start_point': points[i],
                    'end_point': points[j],
                    'distance': curve_distance,
                    'elevation_change': points[j]['elevation'] - points[i]['elevation'],
                    'points': curve_points,
                    'type': 'curve',
                    'cumulative_angle': cumulative_angle,
                    'turn_direction': 'right' if last_angle_sign > 0 else 'left',
                    'gradient': ((points[j]['elevation'] - points[i]['elevation']) / 
                               curve_distance * 100) if curve_distance > 0 else 0
                }
                combined.append(curve_segment)
                i = j
                straight_points = [points[j]]  # Start new straight section from end of curve
                break
            
            j += 1
        
        if cumulative_angle < curve_angle_threshold:
            if i < len(points) - 1:
                straight_points.append(points[i+1])
            i += 1
    
    # Add any remaining straight points as final segment
    if straight_points:
        total_distance = sum(p['distance'] for p in straight_points[1:])
        if total_distance > 0:
            straight_segment = {
                'start_point': straight_points[0],
                'end_point': straight_points[-1],
                'distance': total_distance,
                'elevation_change': straight_points[-1]['elevation'] - straight_points[0]['elevation'],
                'points': straight_points,
                'type': 'straight',
                'gradient': ((straight_points[-1]['elevation'] - straight_points[0]['elevation']) / 
                           total_distance * 100) if total_distance > 0 else 0
            }
            combined.append(straight_segment)
    
    print("Finished curve detection")
    return combined
    
def analyze_gpx(gpx_file):
    with open(gpx_file, 'r') as f:
        gpx = gpxpy.parse(f)
    
    # Get the true track length
    true_length_2d = gpx.length_2d()
    
    # Process points and calculate distances between them
    points = []
    
    for track in gpx.tracks:
        for segment in track.segments:
            # First, collect all points without distances
            track_points = []
            for point in segment.points:
                track_points.append({
                    'latitude': point.latitude,
                    'longitude': point.longitude,
                    'elevation': point.elevation,
                    'distance': 0
                })
            
            # Then calculate distances, starting from the first point
            for i in range(len(track_points)):
                if i > 0:  # For all points except the first
                    p1 = gpxpy.gpx.GPXTrackPoint(
                        track_points[i-1]['latitude'], 
                        track_points[i-1]['longitude'], 
                        elevation=track_points[i-1]['elevation']
                    )
                    p2 = gpxpy.gpx.GPXTrackPoint(
                        track_points[i]['latitude'], 
                        track_points[i]['longitude'], 
                        elevation=track_points[i]['elevation']
                    )
                    track_points[i]['distance'] = p1.distance_2d(p2)
            
            points.extend(track_points)

    # Continue with segment analysis...
    segments = combine_segments_by_curvature(points)
    
    # Calculate overall statistics
    curve_segments = [s for s in segments if s['type'] == 'curve']
    straight_segments = [s for s in segments if s['type'] == 'straight']
    
    curve_distance = sum(s['distance'] for s in curve_segments)
    straight_distance = sum(s['distance'] for s in straight_segments)

    # Verify distances
    calculated_total = sum(p['distance'] for p in points)
    print(f"Check: Curve distances and straight distances should match total length: {curve_distance + straight_distance:.1f} m")
    print(f"Check: The distance diff. from cum. segments and gpx length is: {calculated_total:.1f} m, {true_length_2d:.1f} m\n")
    if abs(calculated_total - true_length_2d) > 1:  # Allow 1m difference for rounding
        print(f"Warning: Distance mismatch!")
        print(f"  True GPX length: {true_length_2d:.1f} m")
        print(f"  Calculated total: {calculated_total:.1f} m")
        print(f"  Difference: {abs(true_length_2d - calculated_total):.1f} m")
    
    return {
        'filename': os.path.basename(gpx_file),
        'total_distance': true_length_2d,  # Use 2D length
        'curve_distance': curve_distance,
        'straight_distance': straight_distance,
        'curve_percentage': (curve_distance / true_length_2d * 100) if true_length_2d > 0 else 0,
        # 原内容此处存在粘贴错误,已修正
    }

备注:内容来源于stack exchange,提问作者Kay

火山引擎 最新活动