
SHA-256 issues #10

@Zella0828

Description

from Bio import SeqIO
from Bio import Entrez
import re
import io
import hashlib
import urllib.request
import urllib.error
import time
from itertools import chain

# Custom exception classes, used to distinguish server-side errors

class ServerException(Exception):
    pass

class DownloadError(Exception):
    """Base class for all download-related errors."""
    pass

class ProteinDownloadError(DownloadError):
    """Error specific to protein sequence downloads."""
    pass

class CDSDownloadError(DownloadError):
    """Error specific to CDS sequence downloads."""
    pass

class GenomicDownloadError(DownloadError):
    """Error specific to genomic sequence downloads."""
    pass
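
# Minimal usage sketch (hypothetical call, added for illustration): the shared
# DownloadError base class lets a caller handle any of the three download
# failures in one place, e.g.
#   try:
#       fetch_and_save_protein_sequences("Panax ginseng", "proteins.fasta")
#   except DownloadError as err:
#       print(f"Download failed: {err}")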

# Retry helper for HTTP requests

def retry_request(request_func, backoff, timeout, retries=3):
    # Invoke request_func inside the retry loop so that HTTP 500 errors are
    # actually caught and retried after a backoff. (timeout is accepted for the
    # call sites below but is not applied here.)
    for _ in range(retries):
        try:
            return request_func()
        except urllib.error.HTTPError as e:
            if e.code == 500:
                time.sleep(backoff)
            else:
                raise ServerException(f"HTTP Error: {e}")
    raise ServerException("Max retries exceeded for HTTP request")
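
# Usage sketch (illustration only, assuming NCBI is reachable): the whole Entrez
# call goes inside the lambda so retry_request can re-run it on an HTTP 500, e.g.
#   handle = retry_request(lambda: Entrez.esearch(db="protein", term="Panax ginseng"),
#                          backoff=2, timeout=10)
#   record = Entrez.read(handle)
#   handle.close()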

# Helper that pauses between NCBI requests

def add_delay(seconds):
    time.sleep(seconds)

# Latin scientific names (with NCBI taxonomy IDs)

organisms = {
    "Panax ginseng": "txid4054",
    "Hevea brasiliensis": "txid3981",
    "Robinia pseudoacacia": "txid35938",
    "Pueraria lobata": "txid3893",
    "Salvinia natans": "txid42333",
    "Albizia julibrissin": "txid3813",
    "Cercis chinensis": "txid161750",
}

# Output file names

protein_output_filename = "proteins.fasta"
cds_output_filename = "cds.fasta"
genomic_output_filename = "genomic.fasta"
Entrez.email = "[email protected]"

# Set of sequence hashes used for de-duplication

seen_sequences = set()

# Search for protein sequences and save them to a FASTA file

def fetch_and_save_protein_sequences(organism, protein_output_filename):
    # Run esearch (with retries) and get the HTTP response handle
    esearch_response = retry_request(
        lambda: Entrez.esearch(db="protein", term=organism, usehistory="y"),
        backoff=2, timeout=10)

    # Parse the XML response
    esearch_record = Entrez.read(esearch_response)

    # Close the HTTP response handle
    esearch_response.close()

    add_delay(1)

    # Get the list of matching IDs
    seq_ids = esearch_record["IdList"]

    # Open the output file
    with open(protein_output_filename, "w") as protein_output_file:
        for seq_id in seq_ids:
            try:
                # Run efetch (with retries) and get the HTTP response handle
                efetch_response = retry_request(
                    lambda: Entrez.efetch(db="protein", id=seq_id, rettype="fasta", retmode="text"),
                    backoff=2, timeout=10)

                # Read the efetch response and write it to the output file
                protein_output_file.write(efetch_response.read())

                # Close the efetch response handle
                efetch_response.close()
            except urllib.error.URLError as e:
                print(f"URL error for sequence ID {seq_id}: {e.reason}")
                raise ProteinDownloadError(f"Failed to download protein sequence with ID {seq_id}")
            except Exception as e:
                print(f"An unexpected error occurred for sequence ID {seq_id}: {e}")
                raise DownloadError(f"An unexpected error occurred for sequence ID {seq_id}")

    print(f"All protein sequences for {organism} have been downloaded and saved to {protein_output_filename}.")

# Search for CDS sequences and save them to a FASTA file

def fetch_and_save_cds_sequences(organism, cds_output_filename):
    # Use esearch to find mRNA records for the given organism
    esearch_response = retry_request(
        lambda: Entrez.esearch(db="nucleotide", term=f"{organism}[mRNA]", usehistory="y"),
        backoff=2, timeout=10)
    esearch_result = Entrez.read(esearch_response)  # parse the esearch response
    esearch_response.close()
    add_delay(1)

    # Get the list of matching IDs
    seq_ids = esearch_result["IdList"]

    # Open the output file for the CDS sequences
    with open(cds_output_filename, "w") as cds_output_file:
        for seq_id in seq_ids:
            try:
                # Run efetch (with retries) and get the HTTP response handle
                efetch_response = retry_request(
                    lambda: Entrez.efetch(db="nucleotide", id=seq_id, rettype="gb", retmode="text"),
                    backoff=2, timeout=10)
                # efetch with retmode="text" returns text, so wrap it in StringIO
                gb_record = SeqIO.read(io.StringIO(efetch_response.read()), "genbank")

                # Walk the features and extract each CDS sequence
                for feature in gb_record.features:
                    if feature.type == "CDS":
                        cds_seq = feature.location.extract(gb_record.seq)
                        clean_sequence = re.sub(r"[^A-Za-z*]", "", str(cds_seq))
                        sequence_hash = hashlib.md5(clean_sequence.encode('utf-8')).hexdigest()

                        # Skip sequences that have already been written
                        if sequence_hash not in seen_sequences:
                            # Unique sequence: write it out and remember its hash
                            cds_output_file.write(
                                f">CDS_{feature.location.start}..{feature.location.end} {gb_record.id}\n")
                            cds_output_file.write(str(cds_seq) + "\n")
                            seen_sequences.add(sequence_hash)
                        else:
                            print(f"Skipping duplicate sequence with ID {seq_id}.")

                efetch_response.close()

            except urllib.error.URLError as e:
                print(f"URL error for sequence ID {seq_id}: {e.reason}")
            except Exception as e:
                print(f"An error occurred for sequence ID {seq_id}: {e}")

    print(f"All CDS sequences for {organism} have been downloaded and saved to {cds_output_filename}.")

# Fetch and save genomic sequences

def fetch_and_save_genomic_sequences(organism, genomic_output_filename):
    # Use esearch to find genomic records for the given organism
    esearch_response = retry_request(
        lambda: Entrez.esearch(db="nucleotide", term=f"{organism}[Genomic]", usehistory="y"),
        backoff=2, timeout=10)
    esearch_result = Entrez.read(esearch_response)  # parse the esearch response
    esearch_response.close()
    add_delay(1)

    # Get the list of matching IDs
    seq_ids = esearch_result["IdList"]

    # Open the output file for the genomic sequences
    with open(genomic_output_filename, "w") as genomic_output_file:
        for seq_id in seq_ids:
            try:
                # Run efetch (with retries) and get the HTTP response handle
                efetch_response = retry_request(
                    lambda: Entrez.efetch(db="nucleotide", id=seq_id, rettype="gb", retmode="text"),
                    backoff=2, timeout=10)
                # efetch with retmode="text" returns text, so wrap it in StringIO
                genomic_record = SeqIO.read(io.StringIO(efetch_response.read()), "genbank")

                # Write the genomic sequence with a FASTA header so the file can
                # be parsed as FASTA later
                genomic_output_file.write(f">{genomic_record.id}\n")
                genomic_output_file.write(str(genomic_record.seq) + "\n")
                efetch_response.close()
            except urllib.error.URLError as e:
                print(f"URL error for sequence ID {seq_id}: {e.reason}")
            except Exception as e:
                print(f"An error occurred for sequence ID {seq_id}: {e}")

    print(f"All genomic sequences for {organism} have been downloaded and saved to {genomic_output_filename}.")

# Download and save protein, CDS, and genomic sequences for each organism

for organism, taxid in organisms.items():
    fetch_and_save_protein_sequences(organism, protein_output_filename)
    fetch_and_save_cds_sequences(organism, cds_output_filename)
    fetch_and_save_genomic_sequences(organism, genomic_output_filename)

# Function that filters matching sequences

def find_matching_sequences(input_filename, btw, numc, seen_sequences):
    """
    Filter sequences in the given FASTA file that satisfy the spacing criteria,
    de-duplicating them as they are found.
    """
    with open(input_filename, "r") as input_file:
        for record in SeqIO.parse(input_file, "fasta"):
            sequence = str(record.seq)  # sequence as a string
            c_positions = [m.start() for m in re.finditer('C', sequence)]  # positions of every 'C'
            if len(c_positions) >= numc:
                valid_pairs = []
                for i in range(len(c_positions) - 1):
                    if c_positions[i + 1] - c_positions[i] <= btw:
                        valid_pairs.append((c_positions[i], c_positions[i + 1]))

                consecutive_valid_pairs = 0
                for i in range(len(valid_pairs) - 1):
                    if valid_pairs[i][1] == valid_pairs[i + 1][0]:  # consecutive pairs share a 'C'
                        consecutive_valid_pairs += 1
                        if consecutive_valid_pairs >= numc - 1:  # enough consecutive pairs
                            # Hash the sequence with SHA-256
                            sequence_hash = hashlib.sha256(sequence.encode('utf-8')).hexdigest()
                            if sequence_hash not in seen_sequences:
                                yield record  # yield the matching record
                                seen_sequences.add(sequence_hash)  # remember it
                            break  # stop once this record has matched
                    else:
                        consecutive_valid_pairs = 0  # reset the counter
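
# Worked toy example (not NCBI data): for the sequence "MCAACAACAAC" the 'C'
# positions are [1, 4, 7, 10]; with btw=3 every adjacent gap qualifies, so
# valid_pairs = [(1, 4), (4, 7), (7, 10)]. The pairs share endpoints, the
# consecutive counter reaches numc - 1 = 2 for numc=3, and the record is yielded
# once, keyed by its SHA-256 digest so later btw/numc combinations skip it.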

# Function that writes matching sequences to a FASTA file

def write_matching_sequences_to_file(matching_sequences_generator, output_file):
    """
    Write the filtered matching sequences to the given FASTA file.

    Parameters:
        matching_sequences_generator (iterator): generator yielding the matching records.
        output_file (str): path of the output file.
    """
    with open(output_file, "w") as output_handle:
        SeqIO.write(matching_sequences_generator, output_handle, "fasta")

# Ranges of the filtering parameters

btw_range = range(20, 100)  # range of btw values
numc_values = [3, 4, 5]  # numc values to try

# Clear the de-duplication set before filtering

seen_sequences.clear()

# Final result file name

final_result_filename = "8.5_result.fasta"

# Start with an empty list of generators

all_matching_sequences = []

# For each combination of btw and numc, collect the matching sequences

for btw in btw_range:
    for numc in numc_values:
        # Build generators of matching sequences for each input file
        matching_sequences_proteins = find_matching_sequences(protein_output_filename, btw, numc, seen_sequences)
        matching_sequences_cds = find_matching_sequences(cds_output_filename, btw, numc, seen_sequences)
        matching_sequences_genomic = find_matching_sequences(genomic_output_filename, btw, numc, seen_sequences)

        # Collect all of the matching-sequence generators
        all_matching_sequences.extend([matching_sequences_proteins, matching_sequences_cds, matching_sequences_genomic])

# Pass the chained generators (rather than a single generator) to the writer

write_matching_sequences_to_file(chain(*all_matching_sequences), final_result_filename)

# Print a completion message

print(f"All matching sequences have been saved in {final_result_filename}")
