# linkedIn_auto_jobs_applier_with_AI
# 397 lines · 17.4 KB
import base64
import json
import logging
import os
import random
import re
import tempfile
import time
import traceback
from datetime import date
from pathlib import Path
from typing import List, Optional, Any, Tuple

from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import Select, WebDriverWait

import src.utils as utils


class LinkedInEasyApplier:
    """Fill in and submit LinkedIn "Easy Apply" forms through a Selenium driver.

    Answers to form questions are obtained from ``gpt_answerer`` and cached in
    ``answers.json`` (relative to the current working directory) so that a
    question that was already answered is not asked again.

    Attributes:
        driver: Selenium WebDriver used for all page interaction.
        resume_path: ``pathlib.Path`` of an existing resume to upload, or
            ``None`` when a resume has to be generated per job.
        set_old_answers: Stored as given; not read by this class.
        gpt_answerer: Object that produces answers for form questions.
        resume_generator_manager: Object exposing ``pdf_base64(...)`` used to
            build a tailored resume.
        all_data: In-memory copy of the entries stored in ``answers.json``.
    """

    # Cache file for answered questions, relative to the working directory.
    _ANSWERS_FILE = 'answers.json'
    # Locator shared by the "is there a button" and "is it clickable" waits.
    _EASY_APPLY_XPATH = '//button[contains(@class, "jobs-apply-button") and contains(., "Easy Apply")]'
    _log = logging.getLogger(__name__)

    def __init__(self, driver: Any, resume_dir: Optional[str], set_old_answers: List[Tuple[str, str, str]], gpt_answerer: Any, resume_generator_manager):
        """Store collaborators and load the cached answers.

        Args:
            driver: Selenium WebDriver instance.
            resume_dir: Path of the resume to upload (string or path-like).
                Ignored (treated as ``None``) when it does not exist.
            set_old_answers: Previously collected answers; only stored.
            gpt_answerer: Answer provider for form questions.
            resume_generator_manager: Resume generator used when no resume
                file is available.

        Raises:
            Exception: If ``answers.json`` exists but cannot be loaded.
        """
        self.driver = driver
        # Normalise to Path: the upload code calls .resolve()/.is_file() on
        # this attribute, which would fail on a plain string.
        if resume_dir is not None and os.path.exists(resume_dir):
            self.resume_path = Path(resume_dir)
        else:
            self.resume_path = None
        self.set_old_answers = set_old_answers
        self.gpt_answerer = gpt_answerer
        self.resume_generator_manager = resume_generator_manager
        self.all_data = self._load_questions_from_json()

    def _load_questions_from_json(self) -> List[dict]:
        """Return the list stored in ``answers.json``.

        A missing file or a file that is not valid JSON yields an empty list.

        Raises:
            Exception: If the file holds valid JSON that is not a list, or on
                any other unexpected error; the message embeds the traceback.
        """
        try:
            try:
                with open(self._ANSWERS_FILE, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            except (FileNotFoundError, json.JSONDecodeError):
                return []
            if not isinstance(data, list):
                raise ValueError("JSON file format is incorrect. Expected a list of questions.")
            return data
        except Exception as exc:
            tb_str = traceback.format_exc()
            raise Exception(f"Error loading questions data from JSON file: \nTraceback:\n{tb_str}") from exc

    def _find_saved_answer(self, question_text: str, question_type: str) -> Optional[dict]:
        """Look up a cached answer for a question.

        The question is sanitised the same way it is when saved and must match
        a stored entry exactly (same text and same type). Empty questions
        never match, and malformed cache entries are skipped.

        Returns:
            The matching cache entry (a dict with an ``'answer'`` key), or
            ``None`` when there is no match.
        """
        wanted = self._sanitize_text(question_text)
        if not wanted:
            return None
        for item in self.all_data:
            if not isinstance(item, dict) or 'answer' not in item:
                continue
            if item.get('type') == question_type and item.get('question') == wanted:
                return item
        return None

    def job_apply(self, job: Any):
        """Open the job page and run the complete Easy Apply flow for ``job``.

        Args:
            job: Object exposing ``link``, ``set_job_description`` and
                ``set_recruiter_link``.

        Raises:
            Exception: If any step fails. The open application dialog is
                discarded first; the message embeds the original traceback.
        """
        self.driver.get(job.link)
        time.sleep(random.uniform(3, 5))
        try:
            easy_apply_button = self._find_easy_apply_button()
            job.set_job_description(self._get_job_description())
            job.set_recruiter_link(self._get_job_recruiter())
            actions = ActionChains(self.driver)
            actions.move_to_element(easy_apply_button).click().perform()
            self.gpt_answerer.set_job(job)
            self._fill_application_form(job)
        except Exception as exc:
            tb_str = traceback.format_exc()
            self._discard_application()
            raise Exception(f"Failed to apply to job! Original exception: \nTraceback:\n{tb_str}") from exc

    def _find_easy_apply_button(self) -> WebElement:
        """Return the first clickable "Easy Apply" button on the page.

        The page is scrolled and searched; if nothing clickable is found the
        page is refreshed once and the search is repeated.

        Raises:
            Exception: If no clickable button is found after both attempts.
        """
        for attempt in range(2):
            self._scroll_page()
            try:
                buttons = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_all_elements_located((By.XPATH, self._EASY_APPLY_XPATH))
                )
            except TimeoutException:
                # No button appeared in time; fall through to the refresh
                # retry instead of aborting on the first attempt.
                buttons = []
            # XPath positions are 1-based.
            for position in range(1, len(buttons) + 1):
                try:
                    return WebDriverWait(self.driver, 10).until(
                        EC.element_to_be_clickable(
                            (By.XPATH, f'({self._EASY_APPLY_XPATH})[{position}]')
                        )
                    )
                except Exception:
                    self._log.debug("Easy Apply button #%d is not clickable", position, exc_info=True)
            if attempt == 0:
                self.driver.refresh()
                time.sleep(3)
        raise Exception("No clickable 'Easy Apply' button found")

    def _get_job_description(self) -> str:
        """Expand the job description and return its text.

        Raises:
            Exception: If the "see more" button or the description element is
                missing, or on any other error; the message embeds the
                traceback.
        """
        try:
            see_more_button = self.driver.find_element(By.XPATH, '//button[@aria-label="Click to see more description"]')
            actions = ActionChains(self.driver)
            actions.move_to_element(see_more_button).click().perform()
            time.sleep(2)
            return self.driver.find_element(By.CLASS_NAME, 'jobs-description-content__text').text
        except NoSuchElementException as exc:
            tb_str = traceback.format_exc()
            # f-string so the traceback is actually interpolated.
            raise Exception(f"Job description 'See more' button not found: \nTraceback:\n{tb_str}") from exc
        except Exception as exc:
            tb_str = traceback.format_exc()
            raise Exception(f"Error getting Job description: \nTraceback:\n{tb_str}") from exc

    def _get_job_recruiter(self):
        """Return the recruiter's profile URL, or ``""`` when unavailable.

        Best effort: any failure (section missing, timeout, ...) yields ``""``.
        """
        try:
            hiring_team_section = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.XPATH, '//h2[text()="Meet the hiring team"]'))
            )
            recruiter_element = hiring_team_section.find_element(By.XPATH, './/following::a[contains(@href, "linkedin.com/in/")]')
            return recruiter_element.get_attribute('href')
        except Exception:
            self._log.debug("Recruiter link not found", exc_info=True)
            return ""

    def _scroll_page(self) -> None:
        """Scroll the whole page down and then back up via ``utils.scroll_slow``."""
        scrollable_element = self.driver.find_element(By.TAG_NAME, 'html')
        utils.scroll_slow(self.driver, scrollable_element, step=300, reverse=False)
        utils.scroll_slow(self.driver, scrollable_element, step=300, reverse=True)

    def _fill_application_form(self, job):
        """Fill each step of the form until the application is submitted."""
        while True:
            self.fill_up(job)
            if self._next_or_submit():
                break

    def _next_or_submit(self) -> bool:
        """Click the primary button of the current form step.

        Returns:
            ``True`` if the button was "Submit application" (the flow is
            finished), ``False`` if the form advanced to the next step.

        Raises:
            Exception: If the form shows validation errors after advancing.
        """
        next_button = self.driver.find_element(By.CLASS_NAME, "artdeco-button--primary")
        button_text = next_button.text.lower()
        if 'submit application' in button_text:
            self._unfollow_company()
            time.sleep(random.uniform(1.5, 2.5))
            next_button.click()
            time.sleep(random.uniform(1.5, 2.5))
            return True
        time.sleep(random.uniform(1.5, 2.5))
        next_button.click()
        time.sleep(random.uniform(3.0, 5.0))
        self._check_for_errors()
        return False

    def _unfollow_company(self) -> None:
        """Click the "follow company" label on the final step, if present.

        Best effort: failures are logged at debug level and ignored.
        """
        try:
            follow_checkbox = self.driver.find_element(
                By.XPATH, "//label[contains(.,'to stay up to date with their page.')]")
            follow_checkbox.click()
        except Exception:
            self._log.debug("Follow-company checkbox not toggled", exc_info=True)

    def _check_for_errors(self) -> None:
        """Raise if the form currently displays inline error feedback.

        Raises:
            Exception: Listing the text of every inline error element.
        """
        error_elements = self.driver.find_elements(By.CLASS_NAME, 'artdeco-inline-feedback--error')
        if error_elements:
            raise Exception(f"Failed answering or file upload. {str([e.text for e in error_elements])}")

    def _discard_application(self) -> None:
        """Dismiss the application modal and confirm the dialog that follows.

        Best effort: failures are logged at debug level and ignored so that
        the original error in ``job_apply`` is the one reported.
        """
        try:
            self.driver.find_element(By.CLASS_NAME, 'artdeco-modal__dismiss').click()
            time.sleep(random.uniform(3, 5))
            self.driver.find_elements(By.CLASS_NAME, 'artdeco-modal__confirm-dialog-btn')[0].click()
            time.sleep(random.uniform(3, 5))
        except Exception:
            self._log.debug("Could not discard the application dialog", exc_info=True)

    def fill_up(self, job) -> None:
        """Process every ``pb4`` block of the current Easy Apply step."""
        easy_apply_content = self.driver.find_element(By.CLASS_NAME, 'jobs-easy-apply-content')
        pb4_elements = easy_apply_content.find_elements(By.CLASS_NAME, 'pb4')
        for element in pb4_elements:
            self._process_form_element(element, job)

    def _process_form_element(self, element: WebElement, job) -> None:
        """Dispatch a form block to the upload handler or the question handler."""
        if self._is_upload_field(element):
            self._handle_upload_fields(element, job)
        else:
            # NOTE(review): this processes every question section on the page,
            # not only those inside `element`; kept as in the original flow.
            self._fill_additional_questions()

    def _is_upload_field(self, element: WebElement) -> bool:
        """Return whether ``element`` contains a file input."""
        return bool(element.find_elements(By.XPATH, ".//input[@type='file']"))

    def _handle_upload_fields(self, element: WebElement, job) -> None:
        """Upload a resume or cover letter into every file input on the page.

        ``gpt_answerer.resume_or_cover`` decides, from the text of each
        input's parent, which document is expected. An existing resume file is
        uploaded as is; otherwise a resume is generated for ``job``.

        Args:
            element: The form block that triggered the upload handling. The
                file inputs themselves are looked up page-wide.
            job: Job being applied to.
        """
        file_upload_elements = self.driver.find_elements(By.XPATH, "//input[@type='file']")
        for upload_input in file_upload_elements:
            parent = upload_input.find_element(By.XPATH, "..")
            # The input is hidden by a CSS class; reveal it so keys can be sent.
            self.driver.execute_script("arguments[0].classList.remove('hidden')", upload_input)
            output = self.gpt_answerer.resume_or_cover(parent.text.lower())
            if 'resume' in output:
                resume_file = None
                if self.resume_path is not None:
                    # Path(...) also covers the attribute being set to a str.
                    resume_file = Path(self.resume_path).resolve()
                if resume_file is not None and resume_file.is_file():
                    upload_input.send_keys(str(resume_file))
                else:
                    self._create_and_upload_resume(upload_input, job)
            elif 'cover' in output:
                self._create_and_upload_cover_letter(upload_input)

    def _create_and_upload_resume(self, element, job):
        """Generate a resume PDF for ``job``, upload it and record its path.

        The PDF is written to a uniquely named ``CV_*.pdf`` file inside the
        ``generated_cv`` folder and its absolute path is stored in
        ``job.pdf_path``.

        Raises:
            Exception: If generation, writing or upload fails; the message
                embeds the traceback.
        """
        folder_path = 'generated_cv'
        os.makedirs(folder_path, exist_ok=True)
        try:
            # Decode first so a generation failure leaves no empty file behind.
            pdf_bytes = base64.b64decode(
                self.resume_generator_manager.pdf_base64(job_description_text=job.description)
            )
            # A unique name avoids the collisions a random 4-digit suffix
            # combined with exclusive-create mode could produce.
            with tempfile.NamedTemporaryFile(
                mode='wb', dir=folder_path, prefix='CV_', suffix='.pdf', delete=False
            ) as f:
                f.write(pdf_bytes)
                file_path_pdf = os.path.abspath(f.name)
            element.send_keys(file_path_pdf)
            job.pdf_path = file_path_pdf
            time.sleep(2)
        except Exception as exc:
            tb_str = traceback.format_exc()
            raise Exception(f"Upload failed: \nTraceback:\n{tb_str}") from exc

    def _create_and_upload_cover_letter(self, element: WebElement) -> None:
        """Ask for a cover letter, render it to a temporary PDF and upload it."""
        cover_letter = self.gpt_answerer.answer_question_textual_wide_range("Write a cover letter")
        # Only reserve the file name here; the handle is closed before
        # reportlab opens the same path for writing.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_pdf_file:
            letter_path = temp_pdf_file.name
        c = canvas.Canvas(letter_path, pagesize=letter)
        _, height = letter
        text_object = c.beginText(100, height - 100)
        text_object.setFont("Helvetica", 12)
        text_object.textLines(cover_letter)
        c.drawText(text_object)
        c.save()
        element.send_keys(letter_path)

    def _fill_additional_questions(self) -> None:
        """Answer every question section currently present on the page."""
        form_sections = self.driver.find_elements(By.CLASS_NAME, 'jobs-easy-apply-form-section__grouping')
        for section in form_sections:
            self._process_form_section(section)

    def _process_form_section(self, section: WebElement) -> None:
        """Try each question handler in order; stop at the first that applies."""
        # NOTE(review): the textbox handler matches any <input>, so it runs
        # before (and may pre-empt) the date handler; order kept as original.
        handlers = (
            self._handle_terms_of_service,
            self._find_and_handle_radio_question,
            self._find_and_handle_textbox_question,
            self._find_and_handle_date_question,
            self._find_and_handle_dropdown_question,
        )
        for handler in handlers:
            if handler(section):
                return

    def _handle_terms_of_service(self, element: WebElement) -> bool:
        """Click the first label if it mentions terms of service/use or privacy policy.

        Returns:
            ``True`` if such a label was clicked, otherwise ``False``.
        """
        checkbox = element.find_elements(By.TAG_NAME, 'label')
        if checkbox and any(term in checkbox[0].text.lower() for term in ['terms of service', 'privacy policy', 'terms of use']):
            checkbox[0].click()
            return True
        return False

    def _find_and_handle_radio_question(self, section: WebElement) -> bool:
        """Answer a radio-button question in ``section``.

        A cached answer is reused when available; otherwise the answer
        provider chooses among the option texts and the answer is cached.

        Returns:
            ``True`` if the section held radio options and one was selected,
            ``False`` if this is not a radio question.
        """
        # find_elements so that a section without a form element is simply
        # "not a radio question" rather than an error.
        questions = section.find_elements(By.CLASS_NAME, 'jobs-easy-apply-form-element')
        if not questions:
            return False
        radios = questions[0].find_elements(By.CLASS_NAME, 'fb-text-selectable__option')
        if not radios:
            return False

        question_text = section.text.lower()
        saved = self._find_saved_answer(question_text, 'radio')
        if saved is not None:
            self._select_radio(radios, saved['answer'])
            return True

        options = [radio.text.lower() for radio in radios]
        answer = self.gpt_answerer.answer_question_from_options(question_text, options)
        self._save_questions_to_json({'type': 'radio', 'question': question_text, 'answer': answer})
        self._select_radio(radios, answer)
        return True

    def _find_and_handle_textbox_question(self, section: WebElement) -> bool:
        """Answer a text or numeric input question in ``section``.

        The first ``<input>``/``<textarea>`` is filled. A cached answer is
        reused when available; the answer provider is only consulted on a
        cache miss.

        Returns:
            ``True`` if a field was found and filled, otherwise ``False``.
        """
        text_fields = section.find_elements(By.TAG_NAME, 'input') + section.find_elements(By.TAG_NAME, 'textarea')
        if not text_fields:
            return False
        text_field = text_fields[0]
        question_text = section.find_element(By.TAG_NAME, 'label').text.lower()
        is_numeric = self._is_numeric_field(text_field)
        question_type = 'numeric' if is_numeric else 'textbox'

        saved = self._find_saved_answer(question_text, question_type)
        if saved is not None:
            self._enter_text(text_field, saved['answer'])
            return True

        if is_numeric:
            answer = self.gpt_answerer.answer_question_numeric(question_text)
        else:
            answer = self.gpt_answerer.answer_question_textual_wide_range(question_text)
        self._save_questions_to_json({'type': question_type, 'question': question_text, 'answer': answer})
        self._enter_text(text_field, answer)
        return True

    def _find_and_handle_date_question(self, section: WebElement) -> bool:
        """Answer a date-picker question in ``section``.

        A cached answer is reused when available; otherwise the date returned
        by the answer provider is entered as ``YYYY-MM-DD`` and cached.

        Returns:
            ``True`` if a date field was found and filled, otherwise ``False``.
        """
        # Class name without the stray trailing space of the original locator.
        date_fields = section.find_elements(By.CLASS_NAME, 'artdeco-datepicker__input')
        if not date_fields:
            return False
        date_field = date_fields[0]
        question_text = section.text.lower()

        saved = self._find_saved_answer(question_text, 'date')
        if saved is not None:
            self._enter_text(date_field, saved['answer'])
            return True

        answer_date = self.gpt_answerer.answer_question_date()
        answer_text = answer_date.strftime("%Y-%m-%d")
        self._save_questions_to_json({'type': 'date', 'question': question_text, 'answer': answer_text})
        self._enter_text(date_field, answer_text)
        return True

    def _find_and_handle_dropdown_question(self, section: WebElement) -> bool:
        """Answer a ``<select>`` question in ``section``.

        Best effort: any failure (no select present, option not found, ...)
        is logged at debug level and reported as ``False``.

        Returns:
            ``True`` if an option was selected, otherwise ``False``.
        """
        try:
            question = section.find_element(By.CLASS_NAME, 'jobs-easy-apply-form-element')
            question_text = question.find_element(By.TAG_NAME, 'label').text.lower()
            dropdown = question.find_element(By.TAG_NAME, 'select')

            saved = self._find_saved_answer(question_text, 'dropdown')
            if saved is not None:
                self._select_dropdown_option(dropdown, saved['answer'])
                return True

            options = [option.text for option in Select(dropdown).options]
            answer = self.gpt_answerer.answer_question_from_options(question_text, options)
            self._save_questions_to_json({'type': 'dropdown', 'question': question_text, 'answer': answer})
            self._select_dropdown_option(dropdown, answer)
            return True
        except Exception:
            self._log.debug("Section not handled as a dropdown question", exc_info=True)
            return False

    def _is_numeric_field(self, field: WebElement) -> bool:
        """Return whether ``field``'s ``type`` or ``id`` attribute contains "numeric"."""
        # get_attribute returns None for a missing attribute.
        field_type = (field.get_attribute('type') or '').lower()
        if 'numeric' in field_type:
            return True
        field_id = field.get_attribute("id") or ''
        return 'numeric' in field_id

    def _enter_text(self, element: WebElement, text: str) -> None:
        """Replace the content of ``element`` with ``text``."""
        element.clear()
        # str() because numeric answers (e.g. read back from JSON) may be ints.
        element.send_keys(str(text))

    def _select_radio(self, radios: List[WebElement], answer: str) -> None:
        """Click the first radio whose text contains ``answer`` (case-insensitive).

        Falls back to the last radio when none matches.
        """
        wanted = str(answer).lower()
        for radio in radios:
            if wanted in radio.text.lower():
                radio.find_element(By.TAG_NAME, 'label').click()
                return
        radios[-1].find_element(By.TAG_NAME, 'label').click()

    def _select_dropdown_option(self, element: WebElement, text: str) -> None:
        """Select the option of ``element`` whose visible text equals ``text``."""
        select = Select(element)
        select.select_by_visible_text(text)

    def _save_questions_to_json(self, question_data: dict) -> None:
        """Append an answered question to ``answers.json`` and the in-memory cache.

        Args:
            question_data: Dict with ``'type'``, ``'question'`` and
                ``'answer'``. The question is stored sanitised; the dict
                passed in is not modified.

        Raises:
            Exception: If the file cannot be read or written; the message
                embeds the traceback.
        """
        entry = dict(question_data)
        entry['question'] = self._sanitize_text(question_data['question'])
        try:
            data = self._load_questions_from_json()
            data.append(entry)
            with open(self._ANSWERS_FILE, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=4)
        except Exception as exc:
            tb_str = traceback.format_exc()
            raise Exception(f"Error saving questions data to JSON file: \nTraceback:\n{tb_str}") from exc
        # Keep the lookup cache in sync so answers saved during this run are
        # reused instead of being requested (and stored) again.
        self.all_data = data

    def _sanitize_text(self, text: str) -> str:
        """Normalise question text for storage and cache lookup.

        Lower-cases and trims the text, drops double quotes and backslashes,
        turns newlines into single spaces, removes remaining control
        characters and strips trailing commas.
        """
        sanitized_text = text.lower()
        sanitized_text = sanitized_text.strip()
        sanitized_text = sanitized_text.replace('"', '')
        sanitized_text = sanitized_text.replace('\\', '')
        # Newlines must be converted before the control-character pass below,
        # which would otherwise delete them and fuse adjacent words.
        sanitized_text = sanitized_text.replace('\n', ' ').replace('\r', '')
        sanitized_text = re.sub(r'[\x00-\x1F\x7F]', '', sanitized_text)
        sanitized_text = sanitized_text.rstrip(',')
        return sanitized_text