-
Notifications
You must be signed in to change notification settings - Fork 154
Description
from Bio import SeqIO
from Bio import Entrez
import re
import io
import hashlib
import urllib.request
import urllib.error
import time
from itertools import chain
自定义异常类,用于区分服务器错误
class ServerException(Exception):
pass
class DownloadError(Exception):
"""基类,用于所有下载相关的错误。"""
pass
class ProteinDownloadError(DownloadError):
"""特定于蛋白质序列下载的错误。"""
pass
class CDSDownloadError(DownloadError):
"""特定于CDS序列下载的错误。"""
pass
class GenomicDownloadError(DownloadError):
"""特定于基因组序列下载的错误。"""
pass
重试请求函数
def retry_request(request_func, backoff, timeout, retries=3):
for _ in range(retries):
try:
handle = request_func
return handle
except urllib.error.HTTPError as e:
if e.code == 500:
time.sleep(backoff)
else:
raise ServerException(f"HTTP Error: {e}")
raise ServerException("Max retries exceeded for HTTP request")
添加延迟函数
def add_delay(seconds):
time.sleep(seconds)
定义拉丁学名
organisms = {
"Panax ginseng": "txid4054",
" Hevea brasiliensis": "txid3981",
"Robinia pseudoacacia": "txid35938",
"Pueraria lobata": "txid3893",
"Salvinia natans": "txid42333",
"Albizia julibrissin": "txid3813",
"Cercis chinensis": "txid161750",
}
定义输出文件名
protein_output_filename = "proteins.fasta"
cds_output_filename = "cds.fasta"
genomic_output_filename = "genomic.fasta"
Entrez.email = "[email protected]"
去重集合
seen_sequences = set()
搜索并保存protein序列到FASTA文件
def fetch_and_save_protein_sequences(organism, protein_output_filename):
# 获取esearch的HTTP响应对象
esearch_func = retry_request(lambda: Entrez.esearch(db="protein", term=organism, usehistory="y"), backoff=2, timeout=10)
esearch_response = esearch_func() # 调用函数获取HTTP响应对象
# 解析HTTP响应内容
esearch_record = Entrez.read(esearch_response) # 使用Entrez.read来解析XML文件
# 关闭HTTP响应对象
esearch_response.close()
add_delay(1)
# 直接获取IdList
seq_ids = esearch_record["IdList"] # 获取"IdList"键对应的值
# 打开输出文件
with open(protein_output_filename, "w") as protein_output_file:
for seq_id in seq_ids:
try:
# 获取efetch的HTTP响应对象
efetch_func = retry_request(
lambda: Entrez.efetch(db="protein", id=seq_id, rettype="fasta", retmode="text"), backoff=2,
timeout=10)
efetch_response = efetch_func() # 调用函数获取HTTP响应对象
# 读取efetch响应内容并写入文件
protein_output_file.write(efetch_response.read())
# 关闭efetch响应对象
efetch_response.close()
except urllib.error.URLError as e:
print(f"URL error for sequence ID {seq_id}: {e.reason}")
raise ProteinDownloadError(f"Failed to download protein sequence with ID {seq_id}")
except Exception as e:
print(f"An unexpected error occurred for sequence ID {seq_id}: {e}")
raise DownloadError(f"An unexpected error occurred for sequence ID {seq_id}")
print(f"All protein sequences for {organism} have been downloaded and saved to {protein_output_filename}.")
搜索并保存CDS序列到FASTA文件
def fetch_and_save_cds_sequences(organism, cds_output_filename):
# 使用esearch搜索指定物种的mRNA序列
esearch_func = retry_request(lambda: Entrez.esearch(db="nucleotide", term=f"{organism}[mRNA]", usehistory="y"), backoff=2, timeout=10)
esearch_response = esearch_func()
esearch_result = Entrez.read(esearch_response) # 读取esearch响应结果
esearch_response.close()
add_delay(1)
# 直接获取IdList
seq_ids = esearch_result["IdList"]
# 打开输出文件以写入CDS序列
with open(cds_output_filename, "w") as cds_output_file:
for seq_id in seq_ids:
try:
# 获取efetch的HTTP响应对象
efetch_func = retry_request(
lambda: Entrez.efetch(db="nucleotide", id=seq_id, rettype="gb", retmode="text"), backoff=2,
timeout=10)
efetch_response = efetch_func()
gb_record = SeqIO.read(io.BytesIO(efetch_response.read()), "genbank") # 读取GenBank记录
# 遍历特征并提取CDS序列
for feature in gb_record.features:
if feature.type == "CDS":
cds_seq = feature.location.extract(gb_record.seq)
clean_sequence = re.sub(r"[^A-Za-z*]", "", str(cds_seq))
sequence_hash = hashlib.md5(clean_sequence.encode('utf-8')).hexdigest()
# 检查序列是否已存在
if sequence_hash not in seen_sequences:
# 如果序列是唯一的,将其写入文件并添加到去重集合
cds_output_file.write(
f">CDS_{feature.location.start}..{feature.location.end} {gb_record.id}\n")
cds_output_file.write(str(cds_seq) + "\n")
seen_sequences.add(sequence_hash)
else:
print(f"Skipping duplicate sequence with ID {seq_id}.")
efetch_response.close()
except urllib.error.URLError as e:
print(f"URL error for sequence ID {seq_id}: {e.reason}")
except Exception as e:
print(f"An error occurred for sequence ID {seq_id}: {e}")
print(f"All CDS sequences for {organism} have been downloaded and saved to {cds_output_filename}.")
定义一个函数用于获取并保存基因组序列
def fetch_and_save_genomic_sequences(organism, genomic_output_filename):
# 使用esearch搜索指定物种的基因组序列
esearch_func = retry_request(lambda: Entrez.esearch(db="nucleotide", term=f"{organism}[Genomic]", usehistory="y"), backoff=2, timeout=10)
esearch_response = esearch_func()
esearch_result = Entrez.read(esearch_response) # 读取esearch响应结果
esearch_response.close()
add_delay(1)
# 直接获取IdList
seq_ids = esearch_result["IdList"]
# 打开输出文件以写入基因组序列
with open(genomic_output_filename, "w") as genomic_output_file:
for seq_id in seq_ids:
try:
# 获取efetch的HTTP响应对象
efetch_func = retry_request(
lambda: Entrez.efetch(db="nucleotide", id=seq_id, rettype="gb", retmode="text"), backoff=2,
timeout=10)
efetch_response = efetch_func()
genomic_record = SeqIO.read(io.BytesIO(efetch_response.read()), "genbank") # 读取GenBank记录
# 写入基因组序列
genomic_output_file.write(str(genomic_record.seq) + "\n")
efetch_response.close()
except urllib.error.URLError as e:
print(f"URL error for sequence ID {seq_id}: {e.reason}")
except Exception as e:
print(f"An error occurred for sequence ID {seq_id}: {e}")
print(f"All genomic sequences for {organism} have been downloaded and saved to {genomic_output_filename}.")
调用函数获取并保存蛋白质、CDS和基因组序列
for organism, taxid in organisms.items():
fetch_and_save_protein_sequences(organism, protein_output_filename)
fetch_and_save_cds_sequences(organism, cds_output_filename)
fetch_and_save_genomic_sequences(organism, genomic_output_filename)
调用函数获取并保存蛋白质、CDS和基因组序列
for organism, taxid in organisms.items():
fetch_and_save_protein_sequences(organism, protein_output_filename)
fetch_and_save_cds_sequences(organism, cds_output_filename)
fetch_and_save_genomic_sequences(organism, genomic_output_filename)
定义筛选匹配序列的函数
def find_matching_sequences(input_filename, btw, numc, seen_sequences):
"""
从给定的FASTA文件中筛选出符合特定条件的序列,并进行去重。
"""
with open(input_filename, "r") as input_file:
for record in SeqIO.parse(input_file, "fasta"):
sequence = str(record.seq) # 序列转换为字符串
c_positions = [m.start() for m in re.finditer('C', sequence)] # 找到所有'C'的位置
if len(c_positions) >= numc:
valid_pairs = []
for i in range(len(c_positions) - 1):
if c_positions[i + 1] - c_positions[i] <= btw:
valid_pairs.append((c_positions[i], c_positions[i + 1]))
consecutive_valid_pairs = 0
for i in range(len(valid_pairs) - 1):
if valid_pairs[i][1] == valid_pairs[i + 1][0] - 1: # 检查是否连续
consecutive_valid_pairs += 1
if consecutive_valid_pairs >= numc - 1: # 检查连续对的数量
# 使用SHA-256哈希算法
sequence_hash = hashlib.sha256(sequence.encode('utf-8')).hexdigest()
if sequence_hash not in seen_sequences:
yield record # 生成满足条件的序列
seen_sequences.add(sequence_hash) # 添加到已见集合
break # 找到后退出循环
else:
consecutive_valid_pairs = 0 # 重置计数器
将匹配序列写入文件的函数
def write_matching_sequences_to_file(matching_sequences_generator, output_file):
"""
将筛选出的匹配序列写入到指定的FASTA文件中。
参数:
matching_sequences_generator (迭代器): 包含匹配序列的生成器。
output_file (str): 输出文件的路径。
"""
with open(output_file, "w") as output_handle:
SeqIO.write(matching_sequences_generator, output_handle, "fasta")
设置筛选参数的范围
btw_range = range(20, 100) # btw的取值范围
numc_values = [3, 4, 5] # numc的取值
清空去重集合
seen_sequences.clear()
定义最终结果文件名
final_result_filename = "8.5_result.fasta"
初始化一个空的生成器列表
all_matching_sequences = []
对于每个btw和numc的组合,找到匹配的序列
for btw in btw_range:
for numc in numc_values:
# 找到匹配的序列并添加到生成器列表中
matching_sequences_proteins = find_matching_sequences(protein_output_filename, btw, numc, seen_sequences)
matching_sequences_cds = find_matching_sequences(cds_output_filename, btw, numc, seen_sequences)
matching_sequences_genomic = find_matching_sequences(genomic_output_filename, btw, numc, seen_sequences)
# 将所有匹配的序列合并到一个生成器中
all_matching_sequences.extend([matching_sequences_proteins, matching_sequences_cds, matching_sequences_genomic])
调用函数,传递生成器列表而不是单个生成器
write_matching_sequences_to_file(chain(*all_matching_sequences), final_result_filename)
打印完成消息
print(f"All matching sequences have been saved in {final_result_filename}")