directus

Форк
0
/
sync-extensions.ts 
88 строк · 3.1 Кб
1
import { useEnv } from '@directus/env';
2
import { exists } from 'fs-extra';
3
import mid from 'node-machine-id';
4
import { createWriteStream } from 'node:fs';
5
import { mkdir, rm } from 'node:fs/promises';
6
import { dirname, join, relative, resolve, sep } from 'node:path';
7
import { pipeline } from 'node:stream/promises';
8
import Queue from 'p-queue';
9
import { useBus } from '../../bus/index.js';
10
import { useLock } from '../../lock/index.js';
11
import { useLogger } from '../../logger.js';
12
import { getStorage } from '../../storage/index.js';
13
import { getExtensionsPath } from './get-extensions-path.js';
14
import { SyncStatus, getSyncStatus, setSyncStatus } from './sync-status.js';
15

16
export const syncExtensions = async (options?: { force: boolean }): Promise<void> => {
17
	const lock = useLock();
18
	const messenger = useBus();
19
	const env = useEnv();
20
	const logger = useLogger();
21

22
	if (!options?.force) {
23
		const isDone = (await getSyncStatus()) === SyncStatus.DONE;
24
		if (isDone) return;
25
	}
26

27
	const machineId = await mid.machineId();
28
	const machineKey = `extensions-sync/${machineId}`;
29

30
	const processId = await lock.increment(machineKey);
31

32
	const currentProcessShouldHandleSync = processId === 1;
33

34
	if (currentProcessShouldHandleSync === false) {
35
		logger.trace('Extensions already being synced to this machine from another process.');
36

37
		// Wait until the process that called the lock publishes a message that the syncing is complete
38
		return new Promise((resolve) => {
39
			messenger.subscribe(machineKey, () => resolve());
40
		});
41
	}
42

43
	try {
44
		const extensionsPath = getExtensionsPath();
45
		const storageExtensionsPath = env['EXTENSIONS_PATH'] as string;
46

47
		if (await exists(extensionsPath)) {
48
			// In case the FS still contains the cached extensions from a previous invocation. We have to
49
			// clear them out to ensure the remote extensions folder remains the source of truth for all
50
			// extensions that are loaded.
51
			await rm(extensionsPath, { recursive: true, force: true });
52
		}
53

54
		// Ensure that the local extensions cache path exists
55
		await mkdir(extensionsPath, { recursive: true });
56
		await setSyncStatus(SyncStatus.SYNCING);
57

58
		logger.trace('Syncing extensions from configured storage location...');
59

60
		const storage = await getStorage();
61

62
		const disk = storage.location(env['EXTENSIONS_LOCATION'] as string);
63

64
		// Make sure we don't overload the file handles
65
		const queue = new Queue({ concurrency: 1000 });
66

67
		for await (const filepath of disk.list(storageExtensionsPath)) {
68
			const readStream = await disk.read(filepath);
69

70
			// We want files to be stored in the root of `$TEMP_PATH/extensions`, so gotta remove the
71
			// extensions path on disk from the start of the file path
72
			const destPath = join(extensionsPath, relative(resolve(sep, storageExtensionsPath), resolve(sep, filepath)));
73

74
			// Ensure that the directory path exists
75
			await mkdir(dirname(destPath), { recursive: true });
76

77
			const writeStream = createWriteStream(destPath);
78

79
			queue.add(() => pipeline(readStream, writeStream));
80
		}
81

82
		await queue.onIdle();
83
		await setSyncStatus(SyncStatus.DONE);
84
		messenger.publish(machineKey, { ready: true });
85
	} finally {
86
		await lock.delete(machineKey);
87
	}
88
};
89

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.