以下是使用并行方式执行DynamoDB查询的示例代码:
import boto3
from botocore.exceptions import NoCredentialsError
from multiprocessing import Pool
def query_items(table_name, key_name, keys):
    """Fetch items from a DynamoDB table for the given primary keys.

    Args:
        table_name: Name of the DynamoDB table to query.
        key_name: Attribute name to project in the response (optional
            narrowing of returned attributes).
        keys: List of key dicts, e.g. ``[{'pk': 'value1'}, ...]``.

    Returns:
        A list of matching items. Always a list (empty on credential
        failure) so callers can safely ``extend``/iterate the result.
    """
    try:
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table(table_name)
        items = []
        # batch_get_item accepts at most 100 keys per request, so page
        # through the key list in chunks of 100.
        for start in range(0, len(keys), 100):
            request = {
                table_name: {
                    'Keys': keys[start:start + 100],
                    'AttributesToGet': [key_name],  # optional projection
                }
            }
            # A throttled response may return only part of the batch in
            # 'UnprocessedKeys'; keep re-requesting until it is empty.
            while request:
                response = table.batch_get_item(RequestItems=request)
                items.extend(response['Responses'].get(table_name, []))
                request = response.get('UnprocessedKeys') or None
        return items
    except NoCredentialsError:
        print("AWS credentials not found")
        # Return an empty list (not None) so the caller's
        # all_responses.extend(result.get()) does not raise TypeError.
        return []
if __name__ == "__main__":
    table_name = 'your_table_name'
    key_name = 'your_key_name'
    # Replace with the real keys to look up (the original example left a
    # literal `...` placeholder inside the list, which is invalid input
    # for DynamoDB).
    keys = [
        {'your_key_name': 'your_key_value1'},
        {'your_key_name': 'your_key_value2'},
    ]

    # Number of parallel worker processes.
    num_processes = 4

    # Split the keys into chunks for parallel processing. Guard against a
    # zero chunk size (len(keys) < num_processes would make the floor
    # division 0 and range(..., 0) raise ValueError).
    chunk_size = max(1, len(keys) // num_processes)
    key_chunks = [keys[i:i + chunk_size] for i in range(0, len(keys), chunk_size)]

    all_responses = []
    # The context manager guarantees worker cleanup even if a task raises.
    with Pool(processes=num_processes) as pool:
        # Submit each chunk of keys to the pool for parallel processing.
        async_results = [
            pool.apply_async(query_items, (table_name, key_name, chunk))
            for chunk in key_chunks
        ]
        # .get() blocks until the worker finishes; guard with `or []` in
        # case query_items returned None (e.g. on a credential error).
        for result in async_results:
            all_responses.extend(result.get() or [])

    # Process the combined responses.
    for response in all_responses:
        print(response)
请注意,上述代码示例使用了Python的multiprocessing.Pool
来实现并行处理。首先,将要查询的key按照要启动的并行线程数进行划分,然后将每个子集分配给不同的进程进行查询。最后,收集并处理所有进程的结果。