无需浏览器高效批量下载OpenStreetMap PNG地图的方法咨询
高效批量下载OpenStreetMap地图图片的方案
首先得说,用Selenium处理500万张图确实完全不现实——每次启动浏览器、渲染页面、截图的开销太大,单条请求就要好几秒,500万次的话真的是天文数字。咱们得换个思路,直接绕开浏览器,利用OSM的瓦片服务来实现高效批量下载,下面给你几个可行的方案和具体步骤:
核心思路:直接调用OSM瓦片API
OpenStreetMap是基于瓦片系统的,整个地图被切成了无数256×256像素的瓦片,每个瓦片对应特定的缩放级别(z)和瓦片坐标(x,y)。我们可以把经纬度坐标转换成瓦片坐标,直接下载对应的瓦片,再拼接/裁剪成你需要的n×n像素图,这比用Selenium快至少100倍以上。
步骤1:坐标转瓦片(用现成库简化操作)
不用自己写转换公式,用mercantile库就能轻松把经纬度(x,y)转换成瓦片的z/x/y值:
import mercantile # 示例:把经纬度(116.3972, 39.9075)转换成缩放级别18的瓦片坐标 lat, lon = 39.9075, 116.3972 z = 18 # 缩放级别,数值越大地图越精细 tile = mercantile.tile(lon, lat, z) print(f"瓦片坐标:z={tile.z}, x={tile.x}, y={tile.y}")
步骤2:下载瓦片并生成目标尺寸图片
OSM的公开瓦片URL格式是:https://tile.openstreetmap.org/{z}/{x}/{y}.png。我们可以用requests下载瓦片,再用Pillow裁剪/拼接成需要的n×n图:
import requests from PIL import Image import os # 缓存瓦片的目录 TILE_CACHE_DIR = "./osm_tiles" os.makedirs(TILE_CACHE_DIR, exist_ok=True) def download_tile(z, x, y): # 先检查本地缓存 tile_path = os.path.join(TILE_CACHE_DIR, str(z), str(x), f"{y}.png") os.makedirs(os.path.dirname(tile_path), exist_ok=True) if os.path.exists(tile_path): return Image.open(tile_path) # 构造请求URL,一定要加合法的User-Agent! url = f"https://tile.openstreetmap.org/{z}/{x}/{y}.png" headers = { "User-Agent": "MyMapDownloader/1.0 (your_email@example.com)" # 替换成你的信息 } response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() with open(tile_path, "wb") as f: f.write(response.content) return Image.open(tile_path) def get_centered_map(lat, lon, z, target_size=(500,500)): # 获取目标坐标所在的瓦片 tile = mercantile.tile(lon, lat, z) tile_img = download_tile(tile.z, tile.x, tile.y) # 计算坐标在瓦片内的像素位置(瓦片左上角是原点) px, py = mercantile.xy(lon, lat) tile_bounds = mercantile.bounds(tile) tile_width = tile_bounds.east - tile_bounds.west tile_height = tile_bounds.north - tile_bounds.south # 转换为像素坐标 x_px = int((px - tile_bounds.west) / tile_width * 256) y_px = int((tile_bounds.north - py) / tile_height * 256) # 裁剪出中心区域 half_size = target_size[0] // 2 left = max(0, x_px - half_size) top = max(0, y_px - half_size) right = min(256, x_px + half_size) bottom = min(256, y_px + half_size) # 如果裁剪区域不足,可扩展下载相邻瓦片拼接(这里简化处理,只取当前瓦片内的区域) cropped = tile_img.crop((left, top, right, bottom)) # 调整到目标尺寸(可选) cropped = cropped.resize(target_size, Image.Resampling.LANCZOS) return cropped
批量处理:并行化+速率控制
500万条数据单线程肯定慢,我们可以用多线程(IO密集型任务多线程效率更高),但一定要遵守OSM的请求限制——不要超过每秒1-2次请求,否则会被封IP。
示例:用ThreadPoolExecutor批量处理CSV
import pandas as pd from concurrent.futures import ThreadPoolExecutor, as_completed from tenacity import retry, stop_after_attempt, wait_exponential # 重试机制:处理网络波动 @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) def process_single_row(row, z=18, target_size=(500,500), output_dir="./map_images"): lat, lon = row["y"], row["x"] # 假设CSV列名是x(经度)、y(纬度) try: img = get_centered_map(lat, lon, z, target_size) img_path = os.path.join(output_dir, f"map_{lat:.6f}_{lon:.6f}.png") os.makedirs(output_dir, exist_ok=True) img.save(img_path, "PNG") return f"成功保存:{img_path}" except Exception as e: return f"处理失败({lat},{lon}):{str(e)}" def batch_process_csv(csv_path, max_workers=5): # 分批读取CSV,避免内存溢出 chunk_size = 1000 for chunk in pd.read_csv(csv_path, chunksize=chunk_size): with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [executor.submit(process_single_row, row) for _, row in chunk.iterrows()] for future in as_completed(futures): print(future.result()) # 调用示例 batch_process_csv("./coordinates.csv", max_workers=5)
进阶优化建议
- 缓存优先:上面的代码已经实现了瓦片缓存,重复坐标直接用本地瓦片,避免重复请求
- 自定义样式:如果需要非默认OSM样式,可以用Maptiler、Stamen等提供的OSM衍生瓦片服务(注意查看他们的使用政策)
- 搭建私有瓦片服务器:如果500万张的需求非常紧急,可以自己搭建OSM瓦片服务器(用
mod_tile+renderd),完全不受公共API的速率限制,速度会快很多 - 错误处理:可以把失败的坐标记录到日志文件,后续单独处理
- 速率限制:可以在代码中加入
time.sleep(0.5)(每次请求后等待0.5秒),确保不超过OSM的请求限制
关键注意事项
一定要遵守OpenStreetMap的使用条款:
- 必须设置合法的User-Agent,提供联系方式
- 不要进行大规模批量请求(建议控制在每秒1次以内)
- 非商业用途免费,商业用途需要申请授权
内容的提问来源于stack exchange,提问作者Avatrin




