# must / main.py
# 433 lines · 18.6 KB
import ctypes
import datetime
import json
import math
import os
import random
import sys
import time
from typing import Any, Optional

import log
import com_base
import com_tcp
import com_udp
import backend_base
import backend_winmm
import backend_sdl2
import backend_fmodex


class App:
    """Music player application that runs either as a playback server or as a client.

    Constructing an instance runs the whole program (see ``__init__``). The
    outcome is left in ``exit_code``: it starts at 1 and is set to 0 when the
    client delivered its commands or the server shut down normally.
    """

    # Seconds to wait before retrying when no track could be opened.
    IDLE_RETRY_DELAY = 0.05

    def __init__(self, argv: Any) -> None:
        """Load the configuration and run in server or client mode.

        Args:
            argv: Full argument vector. ``argv[0]`` is stored as ``first_arg``;
                the remaining items are flags, music file paths or commands.

        Raises:
            RuntimeError: ``--server-only`` was given but the server could not
                be started, the backend failed to initialise, or the
                configured volume is above 1.0.
            FileNotFoundError: The configured communication type or audio
                backend is unknown.
        """
        self.exit_code = 1
        self.first_arg = argv[0]
        self.argv = argv[1:]
        self.is_le = sys.byteorder == 'little'
        self.cwd = os.path.dirname(__file__) or os.getcwd()
        self.encoding = 'utf-8'
        if sys.platform == 'win32':
            self.sig_kill = 0
            self.paths = [self.cwd] + (os.getenv('PATH') or '').split(';')
            self.auto_postfix = ''
            self.auto_prefix = ''
            self.load_library = ctypes.windll.LoadLibrary
        else:
            self.sig_kill = 9
            self.auto_postfix = '.so'
            self.auto_prefix = 'lib'
            self.load_library = ctypes.CDLL
            self.paths = (
                [self.cwd]
                + (os.getenv('LD_LIBRARY_PATH') or '').split(':')
                + (os.getenv('PATH') or '').split(':')
            )
        self.config_path = os.path.join(self.cwd, 'config.json')
        if not os.path.isfile(self.config_path):
            # First run: seed config.json from the shipped defaults.
            self.write_json(
                self.config_path, self.read_json(os.path.join(self.cwd, 'default_config.json'))
            )
        self.config = self.read_json(self.config_path)
        log.enable_logging = self.config['allow_logging']
        try:
            self.server: 'com_base.BaseServer' = self._create_server()
        except RuntimeError:
            # Either client mode was requested or the server constructor
            # raised RuntimeError; in both cases act as a client instead.
            self._run_client()
            return
        self.bk: 'backend_base.BaseBackend' = self._create_backend()
        self._init_backend()
        self.display_info()
        self.volume = self.config['volume']
        self.speed = self.config['speed']
        if self.volume > 1.0:
            raise RuntimeError(f'Volume {self.volume} is bigger than 1.0')
        self.full_list = []
        self.temp_list = []
        self.full_list_group = {}
        self.rescan()
        self.current_music: 'Optional[backend_base.BaseMusic]' = None
        self.running = True
        self.default_track_id = -1
        self.next_is_switch_to_main = False
        try:
            self.main_loop()
        except KeyboardInterrupt:
            # Ctrl+C ends playback; fall through to the regular shutdown.
            pass
        self.should_kill = self.server.should_kill
        self.cleanup()
        self.bk.quit()
        self.bk.destroy()
        self.exit_code = 0
        if self.should_kill:
            os.kill(os.getpid(), self.sig_kill)  # FIXME

    def _create_server(self) -> 'com_base.BaseServer':
        """Build the server for the configured communication type.

        Raises:
            RuntimeError: Client mode was requested (``'Client Only!'``).
            FileNotFoundError: ``com_type`` is neither ``tcp`` nor ``udp``.
        """
        if '--client-only' in self.argv or (
            self.config['need_server_arg'] and '--server-only' not in self.argv
        ):
            raise RuntimeError('Client Only!')
        com_type = self.config['com_type']
        if com_type == 'tcp':
            return com_tcp.TCPServer(self)
        if com_type == 'udp':
            return com_udp.UDPServer(self)
        raise FileNotFoundError('Unknown communication type')

    def _run_client(self) -> None:
        """Send the command-line arguments to the server, or open the prompt."""
        if '--server-only' in self.argv:
            raise RuntimeError('Server Only!')
        com_type = self.config['com_type']
        if com_type == 'tcp':
            self.client: 'com_base.BaseClient' = com_tcp.TCPClient(self)
        elif com_type == 'udp':
            self.client: 'com_base.BaseClient' = com_udp.UDPClient(self)
        else:
            raise FileNotFoundError('Unknown communication type')
        only_client_flag = len(self.argv) <= 1 and bool(self.argv) and self.argv[0] == '--client-only'
        if self.argv and not only_client_flag:
            # All arguments travel as one ';'-separated message.
            self.client.send(';'.join(self.argv))
            self.exit_code = 0
        else:
            self.client_prompt()
        self.client.destroy()

    def _create_backend(self) -> 'backend_base.BaseBackend':
        """Build the audio backend named by ``config['audio_backend']``.

        Raises:
            FileNotFoundError: The backend name is unknown (``winmm`` is only
                accepted on win32).
        """
        name = self.config['audio_backend']
        if name == 'sdl2':
            # Result is discarded; presumably this pre-loads codec libraries
            # that SDL2_mixer depends on -- TODO confirm.
            self.search_libs('libopusfile-0', 'libopus-0', 'libogg-0', 'libmodplug-1')
            return backend_sdl2.SDL2Backend(
                self, self.search_libs('SDL2', 'SDL2_mixer', prefix=self.auto_prefix)
            )
        if name == 'fmod':
            if sys.platform == 'win32':
                self.search_libs('VCRUNTIME140_APP')
            # Best-effort load; search_libs ignores libraries it cannot find.
            self.search_libs('libfsbvorbis64')
            return backend_fmodex.FmodExBackend(
                self,
                self.search_libs('opus', 'media_foundation', 'fsbank', 'fmod', prefix=self.auto_prefix),
            )
        if name == 'winmm' and sys.platform == 'win32':
            return backend_winmm.WinMMBackend(self, ctypes.windll.winmm)
        raise FileNotFoundError('Unknown audio backend')

    def _init_backend(self) -> None:
        """Initialise the backend, retrying up to 20 times if configured to."""
        if not self.config['force_try_init']:
            self.bk.init()
            return
        attempts = 20
        for attempt in range(attempts):
            try:
                self.bk.init()
                return
            except RuntimeError:
                if attempt == attempts - 1:
                    raise
                time.sleep(0.25)

    def _is_supported_format(self, path: str) -> bool:
        """Return True when the extension of *path* is listed in ``config['formats']``."""
        ext = os.path.splitext(path)[1][1:].lower()
        return bool(ext) and ext in self.config['formats']

    def rescan(self) -> None:
        """Rebuild ``full_list`` and ``full_list_group``.

        Supported files named on the command line win; only when there are
        none is ``config['music_path']`` listed. Tracks are grouped by the
        part of the file name before the first ``' - '``.
        """
        self.full_list.clear()
        self.full_list_group.clear()
        self.full_list.extend(arg for arg in self.argv if self._is_supported_format(arg))
        music_dir = self.config['music_path']
        if not self.full_list and music_dir:
            for fn in os.listdir(music_dir):
                if self._is_supported_format(fn):
                    self.full_list.append(os.path.join(music_dir, fn))
        for track_fp in self.full_list:
            music_group = os.path.basename(track_fp).split(' - ')[0].strip()
            self.full_list_group.setdefault(music_group, []).append(track_fp)
        if self.config['main_playlist_mode'] == 'random_pick':
            random.shuffle(self.full_list)
        log.info('Music scan results:', len(self.full_list), 'tracks in the full list')

    @staticmethod
    def _print_json(text: str) -> None:
        """Write one JSON status object (text/class/alt) as a line on stdout."""
        output = {
            'text': text,
            'class': 'custom-mediaplayer',
            'alt': 'mediaplayer'
        }
        sys.stdout.write(json.dumps(output) + '\n')
        sys.stdout.flush()

    def track_loop(self) -> None:
        """Service the server, commands and backend until the current track stops.

        When both ``print_json`` and ``print_json_time`` are enabled, a JSON
        status line is printed every time the formatted position changes.
        """
        show_time = bool(self.config['print_json'] and self.config['print_json_time'])
        last_format = ''
        len_format = ''
        if show_time and self.current_music:
            # ``length`` may be falsy (main_loop checks for that); show 0:00 then.
            len_format = self.format_time(self.current_music.length or 0)
        while self.running and self.current_music and self.current_music.is_playing():
            self.server.update()
            self.poll_commands()
            self.bk.update()
            if not show_time:
                continue
            cur_format = self.format_time(self.current_music.get_pos())
            if cur_format != last_format:
                last_format = cur_format
                self._print_json(
                    '[' + cur_format + '/' + len_format + '] ' + self.current_music.fn_no_ext
                )

    def _try_open(self, fp: str) -> 'Optional[backend_base.BaseMusic]':
        """Open *fp* with the backend; return None if it raises RuntimeError."""
        try:
            return self.bk.open_music(fp)
        except RuntimeError:
            return None

    def next_track(self) -> 'Optional[backend_base.BaseMusic]':
        """Open and return the next track, or None when none could be opened.

        The temporary list is drained first; afterwards the track is chosen
        from the main list according to ``config['main_playlist_mode']``.
        """
        # TODO: maybe allow to change mode in real time?
        if self.temp_list:
            fp = self.temp_list.pop(0)  # Only default and random pick modes currently
            if not self.temp_list:
                self.next_is_switch_to_main = True
            return self._try_open(fp)
        if self.next_is_switch_to_main:
            self.next_is_switch_to_main = False
            log.info('Switched back to main list')
        if not self.full_list:
            # Nothing to choose from; indexing or random.choice would raise.
            return None
        mode = self.config['main_playlist_mode']
        if mode in ('default', 'random_pick'):
            self.default_track_id += 1
            if self.default_track_id >= len(self.full_list):
                if mode == 'random_pick':
                    random.shuffle(self.full_list)
                self.default_track_id = 0
            fp = self.full_list[self.default_track_id]
        elif mode == 'random_full':
            fp = random.choice(self.full_list)
        elif mode == 'random_group':
            group_tracks = random.choice(tuple(self.full_list_group.values()))
            fp = random.choice(group_tracks)
        else:
            return None
        return self._try_open(fp)

    @staticmethod
    def format_time(need_time: float) -> str:
        """Format a duration in seconds as ``M:SS`` (minutes are not capped at 59).

        Negative values are rendered with a leading ``-``.
        """
        total = round(need_time)
        sign = '-' if total < 0 else ''
        minutes, seconds = divmod(abs(total), 60)
        return f'{sign}{minutes}:{seconds:02d}'

    def _describe_track(self, mus: 'backend_base.BaseMusic') -> str:
        """Build the log line for a track: name plus length, rate, type and mtime."""
        stat = os.stat(mus.fp)
        info = mus.fn_no_ext
        if mus.length:
            info += f' [{self.format_time(mus.length)}]'
        if mus.freq:
            info += f' [{int(mus.freq)}Hz]'
        if not mus.type == 'none':
            info += f' [{mus.type.upper()}]'
        if stat.st_mtime:
            info += f' [{str(datetime.datetime.fromtimestamp(int(stat.st_mtime)))}]'
        return info

    def main_loop(self) -> None:
        """Play tracks one after another until ``running`` becomes False."""
        pause_first = self.config['pause_first']
        while self.running:
            mus = self.next_track()
            if not mus:
                # No playable track right now. Keep servicing commands (so
                # 'exit', 'rescan' or a queued file still work) and back off
                # briefly instead of spinning.
                self.server.update()
                self.poll_commands()
                time.sleep(self.IDLE_RETRY_DELAY)
                continue
            self.play_new_music(mus)
            if pause_first:
                mus.set_paused(True)
                pause_first = False
            log.info(self._describe_track(mus))
            if self.config['print_json'] and not self.config['print_json_time']:
                self._print_json(mus.fn_no_ext)
            self.track_loop()

    def play_new_music(self, mus: 'backend_base.BaseMusic') -> None:
        """Stop and destroy the current track, then start *mus* with current volume and speed."""
        if self.current_music:
            self.current_music.stop()
            self.current_music.destroy()
        mus.play()
        mus.set_volume(self.volume)
        mus.set_speed(self.speed)
        self.current_music = mus

    @staticmethod
    def _parse_number(cmd: str, what: str) -> Optional[float]:
        """Parse the last space-separated token of *cmd* as a finite float.

        Logs a warning naming *what* and returns None when the token is not a
        finite number.
        """
        try:
            value = float(cmd.split(' ')[-1])
            if not math.isfinite(value):
                raise ValueError(f'non-finite value: {value}')
        except ValueError as _err:
            log.warn(f'Could not convert {what} value:', _err)
            return None
        return value

    def poll_commands(self) -> None:
        """Run every queued server command.

        Each queue entry is a ';'-separated list. Entries that are existing
        files of a supported format are collected and, after all commands ran,
        become the new temporary playlist.
        """
        temp_mus = []
        while self.server.commands:
            cmds = self.server.commands.pop(0)
            for _cmd in cmds.split(';'):
                cmd = _cmd.strip()
                if os.path.isfile(cmd) and self._is_supported_format(cmd):
                    temp_mus.append(cmd)
                    continue
                self._run_command(cmd)
        if temp_mus:
            self.temp_list = temp_mus
            self.temp_list_prepare()
            log.info('Playing Temp Playlist')
            if self.current_music:
                # Stopping ends track_loop so the temp list starts right away.
                self.current_music.stop()

    def _run_command(self, cmd: str) -> None:
        """Execute one textual command; unknown commands are logged.

        For the numeric commands, ``'<name> <value>'`` sets an absolute value
        and any other ``'<name>...'`` spelling applies the value as a delta.
        """
        mus = self.current_music
        if cmd == 'next':
            if mus:
                mus.stop()
        elif cmd in ('toggle_pause', 'pause', 'resume'):
            if mus:
                if cmd == 'toggle_pause':
                    paused = not mus.paused
                else:
                    paused = cmd == 'pause'
                mus.set_paused(paused)
                log.info('Paused:', mus.paused)
        elif cmd == '--client-only' or cmd == '--server-only':
            pass  # Mode flags forwarded by a client; nothing to do here.
        elif cmd == 'clear_temp':
            self.temp_list.clear()
            if mus:
                mus.stop()
        elif cmd == 'show_pos':
            if mus:
                log.info(f'Music Position: {self.format_time(mus.get_pos())}')
        elif cmd == 'rewind':
            if mus:
                mus.rewind()
        elif cmd.startswith('pos_sec'):
            if not mus:
                return
            new_pos = self._parse_number(cmd, 'position')
            if new_pos is None:
                return
            cur_pos = 0.0 if cmd.startswith('pos_sec ') else mus.get_pos()
            # Upper bound is a sanity limit of 100 hours.
            need_pos = max(min(cur_pos + new_pos, 60.0 * 60.0 * 100.0), 0.0)
            mus.set_pos(need_pos)
            log.info('New Position:', self.format_time(mus.get_pos()))
        elif cmd.startswith('pos_rel'):
            if not mus or not mus.length:
                return
            new_pos = self._parse_number(cmd, 'position')
            if new_pos is None:
                return
            cur_pos = 0.0 if cmd.startswith('pos_rel ') else mus.get_pos() / mus.length
            # NOTE(review): the value is a fraction of the track length, yet the
            # upper bound is 100.0 rather than 1.0 -- kept as in the original.
            need_pos = max(min(cur_pos + new_pos, 100.0), 0.0)
            mus.set_pos(need_pos * mus.length)
            log.info('New Position:', self.format_time(mus.get_pos()))
        elif cmd.startswith('volume'):
            new_volume = self._parse_number(cmd, 'volume')
            if new_volume is None:
                return
            if cmd.startswith('volume '):
                self.volume = 0.0
            self.volume = max(min(self.volume + new_volume, 1.0), 0.0)
            if mus:
                mus.set_volume(self.volume)
            log.info('New Volume:', self.volume)
        elif cmd.startswith('speed'):
            new_speed = self._parse_number(cmd, 'speed')
            if new_speed is None:
                return
            if cmd.startswith('speed '):
                self.speed = 0.0
            self.speed = max(min(self.speed + new_speed, 1000.0), -1000.0)
            if mus:
                mus.set_speed(self.speed)
            log.info('New Speed:', self.speed)
        elif cmd == 'rescan':
            self.rescan()
        elif cmd == 'exit' or cmd == 'quit':
            self.running = False
        else:
            log.warn('Unknown Command', cmd)

    def temp_list_prepare(self) -> None:
        """Adjust the main-list cursor and ordering for a new temporary list."""
        if not self.temp_list:
            return
        if self.config['main_playlist_mode'] == 'default' and self.temp_list[-1] in self.full_list:
            # NOTE(review): next_track() increments the cursor before use, so
            # the "+ 1" makes the main list resume two entries after this
            # track -- confirm that the skip is intended.
            self.default_track_id = self.full_list.index(self.temp_list[-1]) + 1
        if self.config['temp_playlist_mode'] == 'random_pick':  # Trick
            random.shuffle(self.temp_list)

    def display_info(self) -> None:
        """Log the welcome banner and the backend's drivers and devices."""
        log.info('Welcome to the Pixelsuft MUST!')
        log.info('Found Drivers:', ', '.join(self.bk.get_audio_drivers()))
        log.info('Current Driver:', self.bk.get_current_audio_driver())
        log.info('Found Devices:', ', '.join(self.bk.get_audio_devices_names()))
        log.info('Current Device:', self.bk.get_current_audio_device_name())

    def cleanup(self) -> None:
        """Destroy the server and release the current track, if any."""
        if self.server:
            self.server.destroy()
        if self.current_music:
            self.current_music.stop()
            self.current_music.destroy()
            self.current_music = None

    def client_prompt(self) -> None:
        """Interactive client loop: send an initial message, then every typed line.

        Returns after ``disconnect``/``exit``/``quit`` was sent (setting
        ``exit_code`` to 0), when sending raises RuntimeError, or when input
        ends (EOF or Ctrl+C).
        """
        msg = 'i_want_to_live_please_do\'nt_die'
        while True:
            try:
                self.client.send(msg)
            except RuntimeError:
                return
            if msg in ('disconnect', 'exit', 'quit'):
                self.exit_code = 0
                return
            try:
                msg = input('>>> ')
            except (EOFError, KeyboardInterrupt):
                # Closed stdin or Ctrl+C: leave so the caller can destroy the client.
                return

    def read_json(self, fp: str) -> dict:
        """Read the file *fp* (decoded with ``self.encoding``) and parse it as JSON."""
        with open(fp, 'r', encoding=self.encoding) as f:
            return json.loads(f.read())

    def write_json(self, fp: str, content: dict) -> int:
        """Write *content* to *fp* as indented JSON; return the number of characters written."""
        # Serialise first so a failure does not leave a truncated file behind.
        text = json.dumps(content, indent=4)
        with open(fp, 'w', encoding=self.encoding) as f:
            return f.write(text)

    def stb(self, str_to_encode: str, encoding=None) -> bytes:
        """Encode a string to bytes (default ``self.encoding``), replacing unencodable characters."""
        return str_to_encode.encode(encoding or self.encoding, errors='replace')

    def bts(self, bytes_to_decode: bytes, encoding=None) -> str:
        """Decode bytes to a string (default ``self.encoding``), replacing undecodable bytes."""
        return bytes_to_decode.decode(encoding or self.encoding, errors='replace')

    def _try_load(self, name: str) -> Any:
        """Load a library via ``self.load_library``; return None if it raises OSError."""
        try:
            return self.load_library(name)
        except OSError:  # FileNotFoundError is a subclass of OSError
            return None

    def search_libs(self, *libs: Any, prefix: str = '') -> dict:
        """Load shared libraries by base name.

        Each name is expanded to ``prefix + name + self.auto_postfix`` and
        tried in every directory of ``self.paths``; names still missing are
        then tried without a directory.

        Returns:
            Mapping of base name to loaded library. Libraries that could not
            be loaded are simply absent.
        """
        result = {}
        for path in self.paths:
            for lib in libs:
                if result.get(lib):
                    continue
                handle = self._try_load(os.path.join(path, prefix + lib + self.auto_postfix))
                if handle is not None:
                    result[lib] = handle
        for lib in libs:
            if result.get(lib):
                continue
            handle = self._try_load(prefix + lib + self.auto_postfix)
            if handle is not None:
                result[lib] = handle
        return result


if __name__ == '__main__':
    # Constructing App runs the whole program; its exit_code becomes the
    # process exit status.
    app = App(sys.argv)
    sys.exit(app.exit_code)