directus

Форк
0
/
subscription.ts 
133 строки · 3.9 Кб
1
import { EventEmitter, on } from 'events';
2
import { useBus } from '../../bus/index.js';
3
import type { GraphQLService } from './index.js';
4
import { getSchema } from '../../utils/get-schema.js';
5
import type { GraphQLResolveInfo, SelectionNode } from 'graphql';
6
import { refreshAccountability } from '../../websocket/authenticate.js';
7
import { getPayload } from '../../websocket/utils/items.js';
8
import type { Subscription } from '../../websocket/types.js';
9
import type { WebSocketEvent } from '../../websocket/messages.js';
10

11
const messages = createPubSub(new EventEmitter());
12

13
export function bindPubSub() {
14
	const messenger = useBus();
15

16
	messenger.subscribe('websocket.event', (message: Record<string, any>) => {
17
		messages.publish(`${message['collection']}_mutated`, message);
18
	});
19
}
20

21
export function createSubscriptionGenerator(self: GraphQLService, event: string) {
22
	return async function* (_x: unknown, _y: unknown, _z: unknown, request: GraphQLResolveInfo) {
23
		const fields = parseFields(self, request);
24
		const args = parseArguments(request);
25

26
		for await (const payload of messages.subscribe(event)) {
27
			const eventData = payload as WebSocketEvent;
28

29
			if ('event' in args && eventData['action'] !== args['event']) {
30
				continue; // skip filtered events
31
			}
32

33
			const accountability = await refreshAccountability(self.accountability);
34
			const schema = await getSchema();
35

36
			const subscription: Omit<Subscription, 'client'> = {
37
				collection: eventData['collection'],
38
				event: eventData['action'],
39
				query: { fields },
40
			};
41

42
			if (eventData['action'] === 'delete') {
43
				// we have no data to send besides the key
44
				for (const key of eventData.keys) {
45
					yield { [event]: { key, data: null, event: eventData['action'] } };
46
				}
47
			}
48

49
			if (eventData['action'] === 'create') {
50
				try {
51
					subscription.item = eventData['key'];
52
					const result = await getPayload(subscription, accountability, schema, eventData);
53

54
					yield {
55
						[event]: {
56
							key: eventData['key'],
57
							data: result['data'],
58
							event: eventData['action'],
59
						},
60
					};
61
				} catch {
62
					// dont notify the subscription of permission errors
63
				}
64
			}
65

66
			if (eventData['action'] === 'update') {
67
				for (const key of eventData['keys']) {
68
					try {
69
						subscription.item = key;
70
						const result = await getPayload(subscription, accountability, schema, eventData);
71

72
						yield {
73
							[event]: {
74
								key,
75
								data: result['data'],
76
								event: eventData['action'],
77
							},
78
						};
79
					} catch {
80
						// dont notify the subscription of permission errors
81
					}
82
				}
83
			}
84
		}
85
	};
86
}
87

88
function createPubSub<P extends { [key: string]: unknown }>(emitter: EventEmitter) {
89
	return {
90
		publish: <T extends Extract<keyof P, string>>(event: T, payload: P[T]) =>
91
			void emitter.emit(event as string, payload),
92
		subscribe: async function* <T extends Extract<keyof P, string>>(event: T): AsyncIterableIterator<P[T]> {
93
			const asyncIterator = on(emitter, event);
94

95
			for await (const [value] of asyncIterator) {
96
				yield value;
97
			}
98
		},
99
	};
100
}
101

102
function parseFields(service: GraphQLService, request: GraphQLResolveInfo) {
103
	const selections = request.fieldNodes[0]?.selectionSet?.selections ?? [];
104

105
	const dataSelections = selections.reduce((result: readonly SelectionNode[], selection: SelectionNode) => {
106
		if (
107
			selection.kind === 'Field' &&
108
			selection.name.value === 'data' &&
109
			selection.selectionSet?.kind === 'SelectionSet'
110
		) {
111
			return selection.selectionSet.selections;
112
		}
113

114
		return result;
115
	}, []);
116

117
	const { fields } = service.getQuery({}, dataSelections, request.variableValues);
118
	return fields ?? [];
119
}
120

121
function parseArguments(request: GraphQLResolveInfo) {
122
	const args = request.fieldNodes[0]?.arguments ?? [];
123
	return args.reduce(
124
		(result, current) => {
125
			if ('value' in current.value && typeof current.value.value === 'string') {
126
				result[current.name.value] = current.value.value;
127
			}
128

129
			return result;
130
		},
131
		{} as Record<string, string>,
132
	);
133
}
134

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.