quick-start-connectors

Форк
0
80 строк · 2.3 Кб
1
import requests
2
from flask import current_app as app
3

4
from . import UpstreamProviderError
5

6
client = None
7

8

9
class PagerdutySearchClient:
10
    base_url = "https://api.pagerduty.com"
11
    get_users_endpoint = "/users"
12
    get_teams_endpoint = "/teams"
13
    get_incidents_endpoint = "/incidents"
14

15
    def __init__(self, key, search_types):
16
        self.headers = {"Authorization": f"Token token={key}"}
17
        self.search_types = search_types
18

19
    def get_search_types(self):
20
        return self.search_types
21

22
    def _make_request(self, url, params={}):
23
        response = requests.get(
24
            url,
25
            headers=self.headers,
26
            params=params,
27
        )
28

29
        if response.status_code != 200:
30
            message = response.text or f"Error: HTTP {response.status_code}"
31
            raise UpstreamProviderError(message)
32

33
        return response.json()
34

35
    def search_incidents(self, query):
36
        url = f"{self.base_url}{self.get_incidents_endpoint}"
37
        params = {
38
            "limit": 100,
39
        }
40
        response = self._make_request(url, params)
41

42
        # GET Incidents does not have an in-built query feature, it only returns a list of incidents
43
        # Perform search locally on certain incident properties
44
        query = query.lower()
45
        keywords = query.split()
46
        search_properties = ["title", "description", "summary"]
47
        results = []
48
        for incident in response["incidents"]:
49
            for prop in search_properties:
50
                value = incident.get(prop, "").lower()
51

52
                if any(keyword in value for keyword in keywords):
53
                    results.append(incident)
54

55
        return results
56

57
    def search_users(self, query):
58
        url = f"{self.base_url}{self.get_users_endpoint}"
59
        params = {"query": query}
60
        response = self._make_request(url, params)
61

62
        return response["users"]
63

64
    def search_teams(self, query):
65
        url = f"{self.base_url}{self.get_teams_endpoint}"
66
        params = {"query": query}
67
        response = self._make_request(url, params)
68

69
        return response["teams"]
70

71

72
def get_client():
73
    global client
74
    assert (key := app.config.get("API_KEY")), "PAGERDUTY_API_KEY must be set"
75
    enabled_search_types = app.config.get("ENABLED_SEARCH_TYPES", ["incidents"])
76

77
    if not client:
78
        client = PagerdutySearchClient(key, enabled_search_types)
79

80
    return client
81

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.