linkedIn_auto_jobs_applier_with_AI

Форк
0
/
linkedIn_easy_applier.py 
397 строк · 17.4 Кб
1
import base64
2
import json
3
import os
4
import random
5
import re
6
import tempfile
7
import time
8
import traceback
9
from datetime import date
10
from typing import List, Optional, Any, Tuple
11
from reportlab.lib.pagesizes import letter
12
from reportlab.pdfgen import canvas
13
from selenium.common.exceptions import NoSuchElementException
14
from selenium.webdriver.common.by import By
15
from selenium.webdriver.common.keys import Keys
16
from selenium.webdriver.remote.webelement import WebElement
17
from selenium.webdriver.support import expected_conditions as EC
18
from selenium.webdriver.support.ui import Select, WebDriverWait
19
from selenium.webdriver import ActionChains
20
import src.utils as utils
21

22
class LinkedInEasyApplier:
23
    def __init__(self, driver: Any, resume_dir: Optional[str], set_old_answers: List[Tuple[str, str, str]], gpt_answerer: Any, resume_generator_manager):
24
        if resume_dir is None or not os.path.exists(resume_dir):
25
            resume_dir = None
26
        self.driver = driver
27
        self.resume_path = resume_dir
28
        self.set_old_answers = set_old_answers
29
        self.gpt_answerer = gpt_answerer
30
        self.resume_generator_manager = resume_generator_manager
31
        self.all_data = self._load_questions_from_json()
32

33

34
    def _load_questions_from_json(self) -> List[dict]:
35
        output_file = 'answers.json'
36
        try:
37
            try:
38
                with open(output_file, 'r') as f:
39
                    try:
40
                        data = json.load(f)
41
                        if not isinstance(data, list):
42
                            raise ValueError("JSON file format is incorrect. Expected a list of questions.")
43
                    except json.JSONDecodeError:
44
                        data = []
45
            except FileNotFoundError:
46
                data = []
47
            return data
48
        except Exception:
49
            tb_str = traceback.format_exc()
50
            raise Exception(f"Error loading questions data from JSON file: \nTraceback:\n{tb_str}")
51

52

53
    def job_apply(self, job: Any):
        """Open the job page and run the Easy Apply flow for ``job``.

        The job's description and recruiter link are recorded on ``job``
        before the Easy Apply button is clicked and the form is filled.

        Raises:
            Exception: When any step after the page load fails. The
                application is discarded first and the message embeds the
                formatted traceback of the original error.
        """
        self.driver.get(job.link)
        time.sleep(random.uniform(3, 5))
        try:
            apply_button = self._find_easy_apply_button()
            description = self._get_job_description()
            job.set_job_description(description)
            recruiter_link = self._get_job_recruiter()
            job.set_recruiter_link(recruiter_link)
            ActionChains(self.driver).move_to_element(apply_button).click().perform()
            self.gpt_answerer.set_job(job)
            self._fill_application_form(job)
        except Exception:
            # Capture the traceback before the discard step can replace it.
            tb_str = traceback.format_exc()
            self._discard_application()
            raise Exception(f"Failed to apply to job! Original exception: \nTraceback:\n{tb_str}")
68

69
    def _find_easy_apply_button(self) -> WebElement:
        """Return the first clickable "Easy Apply" button on the page.

        Makes two passes; after the first unsuccessful pass the page is
        refreshed once before trying again.

        Raises:
            Exception: When no clickable button is found in either pass.
        """
        button_xpath = '//button[contains(@class, "jobs-apply-button") and contains(., "Easy Apply")]'
        for attempt in range(2):
            self._scroll_page()
            candidates = WebDriverWait(self.driver, 10).until(
                EC.presence_of_all_elements_located((By.XPATH, button_xpath))
            )
            # XPath positions are 1-based.
            for position in range(1, len(candidates) + 1):
                try:
                    return WebDriverWait(self.driver, 10).until(
                        EC.element_to_be_clickable((By.XPATH, f'({button_xpath})[{position}]'))
                    )
                except Exception:
                    # This candidate never became clickable; try the next one.
                    continue
            if attempt == 0:
                self.driver.refresh()
                time.sleep(3)
        raise Exception("No clickable 'Easy Apply' button found")
93
    
94

95
    def _get_job_description(self) -> str:
        """Expand the job description and return its text.

        Clicks the "see more" button, waits two seconds, then reads the text
        of the ``jobs-description-content__text`` element.

        Raises:
            Exception: When an element cannot be found or any other error
                occurs; the message embeds the formatted traceback.
        """
        try:
            see_more_button = self.driver.find_element(By.XPATH, '//button[@aria-label="Click to see more description"]')
            ActionChains(self.driver).move_to_element(see_more_button).click().perform()
            time.sleep(2)
            return self.driver.find_element(By.CLASS_NAME, 'jobs-description-content__text').text
        except NoSuchElementException:
            tb_str = traceback.format_exc()
            # The f-prefix is required so the traceback is actually interpolated.
            raise Exception(f"Job description 'See more' button not found: \nTraceback:\n{tb_str}")
        except Exception:
            tb_str = traceback.format_exc()
            raise Exception(f"Error getting Job description: \nTraceback:\n{tb_str}")
109

110

111
    def _get_job_recruiter(self):
        """Return the recruiter's profile link, or ``""`` when unavailable.

        Looks for the first LinkedIn profile link following the
        "Meet the hiring team" heading. Any failure yields an empty string.
        """
        try:
            heading_locator = (By.XPATH, '//h2[text()="Meet the hiring team"]')
            heading = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located(heading_locator)
            )
            profile_anchor = heading.find_element(By.XPATH, './/following::a[contains(@href, "linkedin.com/in/")]')
            return profile_anchor.get_attribute('href')
        except Exception:
            # Best effort: the recruiter link is optional.
            return ""
121

122
    def _scroll_page(self) -> None:
        """Slowly scroll the page forward, then back in reverse."""
        page_root = self.driver.find_element(By.TAG_NAME, 'html')
        for backwards in (False, True):
            utils.scroll_slow(self.driver, page_root, step=300, reverse=backwards)
126

127
    def _fill_application_form(self, job):
128
        while True:
129
            self.fill_up(job)
130
            if self._next_or_submit():
131
                break
132

133
    def _next_or_submit(self):
        """Click the primary button of the current form step.

        Returns:
            True when the button was the final "submit application" button.
            Otherwise the step is advanced, validation errors are checked and
            nothing (``None``) is returned.
        """
        primary_button = self.driver.find_element(By.CLASS_NAME, "artdeco-button--primary")
        is_final_step = 'submit application' in primary_button.text.lower()
        if is_final_step:
            self._unfollow_company()
        time.sleep(random.uniform(1.5, 2.5))
        primary_button.click()
        if is_final_step:
            time.sleep(random.uniform(1.5, 2.5))
            return True
        time.sleep(random.uniform(3.0, 5.0))
        self._check_for_errors()
146

147
    def _unfollow_company(self) -> None:
        """Click the "stay up to date with their page" label, ignoring any failure."""
        try:
            self.driver.find_element(
                By.XPATH, "//label[contains(.,'to stay up to date with their page.')]"
            ).click()
        except Exception:
            # Best effort: failure to find or click the label is ignored.
            pass
154

155
    def _check_for_errors(self) -> None:
        """Raise when the form shows inline error feedback.

        Raises:
            Exception: Listing the text of every error element found.
        """
        error_elements = self.driver.find_elements(By.CLASS_NAME, 'artdeco-inline-feedback--error')
        if not error_elements:
            return
        messages = [error.text for error in error_elements]
        raise Exception(f"Failed answering or file upload. {str(messages)}")
159

160
    def _discard_application(self) -> None:
        """Dismiss the application modal and confirm, ignoring any failure."""
        try:
            dismiss_button = self.driver.find_element(By.CLASS_NAME, 'artdeco-modal__dismiss')
            dismiss_button.click()
            time.sleep(random.uniform(3, 5))
            confirm_buttons = self.driver.find_elements(By.CLASS_NAME, 'artdeco-modal__confirm-dialog-btn')
            confirm_buttons[0].click()
            time.sleep(random.uniform(3, 5))
        except Exception:
            # Best effort: there may be no modal to close.
            pass
168

169
    def fill_up(self, job) -> None:
        """Process every ``pb4`` element inside the Easy Apply content area."""
        form_root = self.driver.find_element(By.CLASS_NAME, 'jobs-easy-apply-content')
        for form_part in form_root.find_elements(By.CLASS_NAME, 'pb4'):
            self._process_form_element(form_part, job)
174
        
175
    def _process_form_element(self, element: WebElement, job) -> None:
176
        if self._is_upload_field(element):
177
            self._handle_upload_fields(element, job)
178
        else:
179
            self._fill_additional_questions()
180

181
    def _is_upload_field(self, element: WebElement) -> bool:
        """Return True when ``element`` contains at least one file input."""
        file_inputs = element.find_elements(By.XPATH, ".//input[@type='file']")
        return len(file_inputs) > 0
183

184
    def _handle_upload_fields(self, element: WebElement, job) -> None:
        """Upload a resume or cover letter to every file input on the page.

        For each file input the GPT answerer decides, from the parent
        element's text, whether a resume or a cover letter is expected. An
        existing resume file is uploaded when available; otherwise a resume
        is generated for ``job``.

        Args:
            element: The form element that triggered the upload handling. The
                search itself covers all file inputs found via the driver.
            job: The job being applied to, used when generating a resume.
        """
        for upload_input in self.driver.find_elements(By.XPATH, "//input[@type='file']"):
            parent = upload_input.find_element(By.XPATH, "..")
            self.driver.execute_script("arguments[0].classList.remove('hidden')", upload_input)
            output = self.gpt_answerer.resume_or_cover(parent.text.lower())
            if 'resume' in output:
                # resume_path may be a plain string or a path-like object;
                # os.fspath/os.path handle both, unlike calling .resolve().
                resume_file = None
                if self.resume_path is not None:
                    candidate = os.path.realpath(os.fspath(self.resume_path))
                    if os.path.isfile(candidate):
                        resume_file = candidate
                if resume_file is not None:
                    upload_input.send_keys(resume_file)
                else:
                    self._create_and_upload_resume(upload_input, job)
            elif 'cover' in output:
                self._create_and_upload_cover_letter(upload_input)
197

198
    def _create_and_upload_resume(self, element, job):
        """Generate a resume PDF for ``job``, save it and upload it.

        The PDF is written to ``generated_cv/CV_<random>.pdf`` and its
        absolute path is sent to ``element`` and stored on ``job.pdf_path``.

        Raises:
            Exception: When generation, saving or upload fails; the message
                embeds the formatted traceback.
        """
        folder_path = 'generated_cv'
        os.makedirs(folder_path, exist_ok=True)
        try:
            # Generate the content first so a generation failure cannot leave
            # an empty file behind.
            pdf_bytes = base64.b64decode(self.resume_generator_manager.pdf_base64(job_description_text=job.description))
            file_path_pdf = None
            # Random names can collide with earlier files; retry a bounded
            # number of times instead of failing on the first collision.
            for _ in range(100):
                candidate = os.path.join(folder_path, f"CV_{random.randint(0, 9999)}.pdf")
                try:
                    with open(candidate, "xb") as f:
                        f.write(pdf_bytes)
                except FileExistsError:
                    continue
                file_path_pdf = candidate
                break
            if file_path_pdf is None:
                raise FileExistsError(f"No free CV file name found in {folder_path}")
            element.send_keys(os.path.abspath(file_path_pdf))
            job.pdf_path = os.path.abspath(file_path_pdf)
            time.sleep(2)
        except Exception:
            tb_str = traceback.format_exc()
            raise Exception(f"Upload failed: \nTraceback:\n{tb_str}")
211

212
    def _create_and_upload_cover_letter(self, element: WebElement) -> None:
        """Ask the GPT answerer for a cover letter, render it to a PDF and upload it.

        The PDF is written to a temporary ``.pdf`` file that is not deleted
        afterwards; its path is sent to ``element``.
        """
        letter_text = self.gpt_answerer.answer_question_textual_wide_range("Write a cover letter")
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as pdf_handle:
            letter_path = pdf_handle.name
            page = canvas.Canvas(letter_path, pagesize=letter)
            page_height = letter[1]
            body = page.beginText(100, page_height - 100)
            body.setFont("Helvetica", 12)
            body.textLines(letter_text)
            page.drawText(body)
            page.save()
            element.send_keys(letter_path)
224

225
    def _fill_additional_questions(self) -> None:
        """Process every form-section grouping currently on the page."""
        groupings = self.driver.find_elements(By.CLASS_NAME, 'jobs-easy-apply-form-section__grouping')
        for grouping in groupings:
            self._process_form_section(grouping)
229
            
230

231
    def _process_form_section(self, section: WebElement) -> None:
232
        if self._handle_terms_of_service(section):
233
            return
234
        if self._find_and_handle_radio_question(section):
235
            return
236
        if self._find_and_handle_textbox_question(section):
237
            return
238
        if self._find_and_handle_date_question(section):
239
            return
240
        if self._find_and_handle_dropdown_question(section):
241
            return
242

243
    def _handle_terms_of_service(self, element: WebElement) -> bool:
        """Click the first label when it mentions terms or a privacy policy.

        Returns:
            True when such a label was found and clicked, False otherwise.
        """
        labels = element.find_elements(By.TAG_NAME, 'label')
        if not labels:
            return False
        first_label = labels[0]
        for term in ['terms of service', 'privacy policy', 'terms of use']:
            if term in first_label.text.lower():
                first_label.click()
                return True
        return False
249

250
    def _find_and_handle_radio_question(self, section: WebElement) -> bool:
        """Answer a radio-button question found in ``section``.

        A saved radio answer whose question contains the sanitized section
        text is reused; otherwise the GPT answerer chooses among the options
        and the new answer is saved.

        Returns:
            True when radio options were found and one was selected,
            False when the section has no radio options.
        """
        question = section.find_element(By.CLASS_NAME, 'jobs-easy-apply-form-element')
        radios = question.find_elements(By.CLASS_NAME, 'fb-text-selectable__option')
        if not radios:
            return False

        question_text = section.text.lower()
        options = [radio.text.lower() for radio in radios]

        saved = next(
            (
                entry for entry in self.all_data
                if self._sanitize_text(question_text) in entry['question'] and entry['type'] == 'radio'
            ),
            None,
        )
        if saved:
            self._select_radio(radios, saved['answer'])
            return True

        answer = self.gpt_answerer.answer_question_from_options(question_text, options)
        self._save_questions_to_json({'type': 'radio', 'question': question_text, 'answer': answer})
        self._select_radio(radios, answer)
        return True
271

272
    def _find_and_handle_textbox_question(self, section: WebElement) -> bool:
        """Answer the first input/textarea found in ``section``.

        A saved answer whose question equals the sanitized label text and
        whose type matches ('numeric' or 'textbox') is reused. Only when no
        saved answer exists is the GPT answerer queried and the new answer
        saved.

        Returns:
            True when a text field was found and filled, False otherwise.
        """
        text_fields = section.find_elements(By.TAG_NAME, 'input') + section.find_elements(By.TAG_NAME, 'textarea')
        if not text_fields:
            return False
        text_field = text_fields[0]
        question_text = section.find_element(By.TAG_NAME, 'label').text.lower()
        question_type = 'numeric' if self._is_numeric_field(text_field) else 'textbox'

        sanitized_question = self._sanitize_text(question_text)
        for item in self.all_data:
            if item['question'] == sanitized_question and item['type'] == question_type:
                self._enter_text(text_field, item['answer'])
                return True

        # Query the model only on a cache miss so saved answers do not cost
        # an extra request.
        if question_type == 'numeric':
            answer = self.gpt_answerer.answer_question_numeric(question_text)
        else:
            answer = self.gpt_answerer.answer_question_textual_wide_range(question_text)
        self._save_questions_to_json({'type': question_type, 'question': question_text, 'answer': answer})
        self._enter_text(text_field, answer)
        return True
296

297
    def _find_and_handle_date_question(self, section: WebElement) -> bool:
        """Answer the first date-picker input found in ``section``.

        A saved date answer whose question contains the sanitized section
        text is reused. Only when no saved answer exists is the GPT answerer
        queried; its date is formatted as ``YYYY-MM-DD`` and saved.

        Returns:
            True when a date field was found and filled, False otherwise.
        """
        date_fields = section.find_elements(By.CLASS_NAME, 'artdeco-datepicker__input ')
        if not date_fields:
            return False
        date_field = date_fields[0]
        question_text = section.text.lower()

        sanitized_question = self._sanitize_text(question_text)
        for item in self.all_data:
            if sanitized_question in item['question'] and item['type'] == 'date':
                self._enter_text(date_field, item['answer'])
                return True

        # Query the model only on a cache miss so saved answers do not cost
        # an extra request.
        answer_date = self.gpt_answerer.answer_question_date()
        answer_text = answer_date.strftime("%Y-%m-%d")
        self._save_questions_to_json({'type': 'date', 'question': question_text, 'answer': answer_text})
        self._enter_text(date_field, answer_text)
        return True
319

320
    def _find_and_handle_dropdown_question(self, section: WebElement) -> bool:
        """Answer a ``<select>`` dropdown question found in ``section``.

        A saved dropdown answer whose question contains the sanitized label
        text is reused; otherwise the GPT answerer chooses among the option
        texts and the new answer is saved.

        Returns:
            True when an option was selected; False when any step raised
            (for example because the section has no dropdown).
        """
        try:
            question = section.find_element(By.CLASS_NAME, 'jobs-easy-apply-form-element')
            question_text = question.find_element(By.TAG_NAME, 'label').text.lower()
            dropdown = question.find_element(By.TAG_NAME, 'select')
            if not dropdown:
                return None
            option_texts = [option.text for option in Select(dropdown).options]

            saved = next(
                (
                    entry for entry in self.all_data
                    if self._sanitize_text(question_text) in entry['question'] and entry['type'] == 'dropdown'
                ),
                None,
            )
            if saved:
                self._select_dropdown_option(dropdown, saved['answer'])
                return True

            answer = self.gpt_answerer.answer_question_from_options(question_text, option_texts)
            self._save_questions_to_json({'type': 'dropdown', 'question': question_text, 'answer': answer})
            self._select_dropdown_option(dropdown, answer)
            return True
        except Exception:
            return False
344

345
    def _is_numeric_field(self, field: WebElement) -> bool:
346
        field_type = field.get_attribute('type').lower()
347
        if 'numeric' in field_type:
348
            return True
349
        class_attribute = field.get_attribute("id")
350
        return class_attribute and 'numeric' in class_attribute
351

352
    def _enter_text(self, element: WebElement, text: str) -> None:
353
        element.clear()
354
        element.send_keys(text)
355

356
    def _select_radio(self, radios: List[WebElement], answer: str) -> None:
        """Click the first radio whose lower-cased text contains ``answer``.

        Falls back to the last radio when none matches.
        """
        chosen = next(
            (radio for radio in radios if answer in radio.text.lower()),
            radios[-1],
        )
        chosen.find_element(By.TAG_NAME, 'label').click()
362

363
    def _select_dropdown_option(self, element: WebElement, text: str) -> None:
        """Select the option of ``element`` whose visible text is ``text``."""
        Select(element).select_by_visible_text(text)
366

367
    def _save_questions_to_json(self, question_data: dict) -> None:
368
        output_file = 'answers.json'
369
        question_data['question'] = self._sanitize_text(question_data['question'])
370
        try:
371
            try:
372
                with open(output_file, 'r') as f:
373
                    try:
374
                        data = json.load(f)
375
                        if not isinstance(data, list):
376
                            raise ValueError("JSON file format is incorrect. Expected a list of questions.")
377
                    except json.JSONDecodeError:
378
                        data = []
379
            except FileNotFoundError:
380
                data = []
381
            data.append(question_data)
382
            with open(output_file, 'w') as f:
383
                json.dump(data, f, indent=4)
384
        except Exception:
385
            tb_str = traceback.format_exc()
386
            raise Exception(f"Error saving questions data to JSON file: \nTraceback:\n{tb_str}")
387

388

389
    def _sanitize_text(self, text: str) -> str:
390
        sanitized_text = text.lower()
391
        sanitized_text = sanitized_text.strip()
392
        sanitized_text = sanitized_text.replace('"', '')
393
        sanitized_text = sanitized_text.replace('\\', '')
394
        sanitized_text = re.sub(r'[\x00-\x1F\x7F]', '', sanitized_text)
395
        sanitized_text = sanitized_text.replace('\n', ' ').replace('\r', '')
396
        sanitized_text = sanitized_text.rstrip(',')
397
        return sanitized_text
398

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.