google-research
113 строк · 3.8 Кб
1# coding=utf-8
2# Copyright 2024 The Google Research Authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""`beam.PTransform` to read from alignment files in Stockholm format."""
17
18from __future__ import annotations
19
20import functools
21from typing import ClassVar
22
23import apache_beam as beam
24
25from dedal.preprocessing import types
26from dedal.preprocessing import utils
27
28
# Type aliases
# Local shorthand for `dedal.preprocessing.types.KeyRecordPair`.
# NOTE(review): presumably a `(key, record)` tuple -- confirm against the
# declaration in `dedal.preprocessing.types`.
KeyRecordPair = types.KeyRecordPair
31
# Constants
# Maps two-letter Stockholm `#=GS` feature codes to the field names used in the
# output records. It is passed as `field_name_mapping` when renaming the fields
# of parsed annotations, and is used to look up the accession field by name.
STOCKHOLM_GS_FIELD_NAMES = dict(
    AC='accession',
    DR='database_references',
)
37
38
def rekey_by_accession(key_record):
  """Re-keys a Pfam domain record by its `pfamseq_acc` and endpoints.

  Args:
    key_record: A `(key, record)` pair. `key` is a string of the form
      `'<seqname>/<region>'` and `record` is a mapping that holds the `#=GS AC`
      annotation under the field name `STOCKHOLM_GS_FIELD_NAMES['AC']`.

  Returns:
    A `(new_key, record)` pair where `new_key` is `'<pfamseq_acc>/<region>'`.
    `<pfamseq_acc>` is the accession truncated at its first `.` (presumably
    dropping a version suffix) and `<region>` is the second `/`-separated
    component of the original key. `record` is returned as-is.

  Raises:
    KeyError: If `record` has no accession field.
    ValueError: If the accession is a list (i.e. the `#=GS AC` annotation was
      duplicated) or is otherwise not a string.
    IndexError: If the original key contains no `/`.
  """
  seq_name, fields = key_record
  acc = fields[STOCKHOLM_GS_FIELD_NAMES['AC']]

  # A list-valued accession means the `AC` annotation appeared more than once
  # for this sequence, which should never happen in a Stockholm file.
  if isinstance(acc, list):
    raise ValueError(f'{seq_name} has duplicated #=GS AC annotations.')
  if not isinstance(acc, str):
    raise ValueError(f'{acc} must be a string.')

  # Keep only what precedes the first `.` of the accession.
  unversioned_acc, _, _ = acc.partition('.')
  region = seq_name.split('/')[1]

  return f'{unversioned_acc}/{region}', fields
55
56
class Latin1Coder(beam.coders.Coder):
  """Beam coder that serializes strings using the `latin-1` encoding."""

  # Name of the codec shared by `encode` and `decode`.
  _ENCODING_NAME = 'latin-1'

  def encode(self, value: str) -> bytes:
    """Returns `value` encoded as `latin-1` bytes."""
    return value.encode(self._ENCODING_NAME)

  def decode(self, value: bytes) -> str:
    """Returns the string obtained by decoding `value` as `latin-1`."""
    return value.decode(self._ENCODING_NAME)

  def is_deterministic(self) -> bool:
    """Reports this coder as deterministic (always returns `True`)."""
    return True
68
69
class ReadFromStockholm(beam.PTransform):
  """Generates key-value pairs from alignments in Pfam's Stockholm format.

  The input files are read line by line, decoding each line as `latin-1`.
  Aligned-sequence lines and `#=GS` annotation lines are extracted with
  `ALI_PATTERN` and `GS_PATTERN` respectively; lines matching neither pattern
  contribute nothing to the output. Both collections are keyed by `<seqname>`,
  joined with `beam.CoGroupByKey`, flattened by `utils.flatten_grouped_records`
  and finally re-keyed by `rekey_by_accession`.

  Attributes:
    file_pattern: File path or glob forwarded to `beam.io.ReadFromText`.
  """

  # Format: `"#=GS <seqname> <feature> <free text>"` where:
  # + <seqname> has regex `r'\w+/\d+-\d+'`.
  # + <feature> has regex `r'[A-Z]{2}'`.
  # + <free text> has regex `r'\S.*'`.
  GS_PATTERN: ClassVar[str] = r'^#=GS\s+(\w+/\d+-\d+)\s+([A-Z]{2})\s+(\S.*)$'
  # Format: `"<seqname> <aligned sequence>"` where:
  # + <seqname> has regex `r'\w+/\d+-\d+'`.
  # + <aligned sequence> has regex `r'[a-zA-Z\.-]+'`.
  # NOTE: the upper-case range must be `A-Z`. The range `A-z` would also accept
  # the ASCII characters lying between `Z` and `a` (brackets, backslash, caret,
  # underscore and backtick), none of which is a valid alignment symbol.
  ALI_PATTERN: ClassVar[str] = r'^(\w+/\d+-\d+)\s+([a-zA-Z\.-]+)$'

  def __init__(self, file_pattern):
    """Initializes the transform.

    Args:
      file_pattern: File path or glob of the Stockholm file(s) to read; passed
        unchanged to `beam.io.ReadFromText`.
    """
    # Let `beam.PTransform` run its own initialization.
    super().__init__()
    self.file_pattern = file_pattern

  def expand(self, root):
    """Builds the sub-pipeline that parses the Stockholm file(s).

    Args:
      root: The pipeline object this transform is applied to; it is used as the
        input of `beam.io.ReadFromText`.

    Returns:
      The `PCollection` produced by mapping `rekey_by_accession` over the
      joined and flattened alignment / annotation records.
    """
    lines = (
        root
        | 'ReadFromText' >> beam.io.ReadFromText(
            file_pattern=self.file_pattern,
            coder=Latin1Coder()))
    # `Regex.all_matches` yields `[group0, group1, ...]` for every matching
    # line; group 0 (the whole line) is discarded before building the pairs.
    # Each element becomes `(<seqname>, {'gapped_sequence': <aligned sequence>})`.
    alignments = (
        lines
        | 'AlignmentRegex' >> beam.Regex.all_matches(regex=self.ALI_PATTERN)
        | 'AlignmentDiscardGroup0' >> beam.Map(lambda matches: matches[1:])
        | 'AlignmentsToKeyValue' >> beam.Map(
            lambda matches: (matches[0], {'gapped_sequence': matches[1]})))
    # Each element becomes `(<seqname>, {<feature>: <free text>})`, after which
    # the record fields are renamed according to `STOCKHOLM_GS_FIELD_NAMES`.
    annotations = (
        lines
        | 'AnnotationsRegex' >> beam.Regex.all_matches(regex=self.GS_PATTERN)
        | 'AnnotationsDiscardGroup0' >> beam.Map(lambda matches: matches[1:])
        | 'AnnotationsToKeyValue' >> beam.Map(
            lambda matches: (matches[0], {matches[1]: matches[2]}))
        | 'RenameElementFields' >> beam.Map(
            functools.partial(
                utils.rename_key_record_fields,
                field_name_mapping=STOCKHOLM_GS_FIELD_NAMES,
            )))
    # Join both collections on `<seqname>`, merge the grouped values and
    # replace the key with one derived from the sequence accession.
    return (
        {'alignments': alignments, 'annotations': annotations}
        | 'JoinDatabases' >> beam.CoGroupByKey()
        | 'FlattenJoinedElements' >> beam.Map(utils.flatten_grouped_records)
        | 'ReKeyByAccession' >> beam.Map(rekey_by_accession))
114