1
import { EventEmitter, on } from 'events';
2
import { useBus } from '../../bus/index.js';
3
import type { GraphQLService } from './index.js';
4
import { getSchema } from '../../utils/get-schema.js';
5
import type { GraphQLResolveInfo, SelectionNode } from 'graphql';
6
import { refreshAccountability } from '../../websocket/authenticate.js';
7
import { getPayload } from '../../websocket/utils/items.js';
8
import type { Subscription } from '../../websocket/types.js';
9
import type { WebSocketEvent } from '../../websocket/messages.js';
11
// Module-level pub/sub built on a fresh EventEmitter. bindPubSub() publishes bus messages into it
// and the generators made by createSubscriptionGenerator() read from it via messages.subscribe().
const messages = createPubSub(new EventEmitter());
13
/**
 * Forwards bus traffic into the local pub/sub.
 *
 * Subscribes to the 'websocket.event' topic on the bus returned by useBus() and republishes
 * every received message on `messages` under the event name `<collection>_mutated`, where
 * `<collection>` is read from the message's `collection` property.
 */
export function bindPubSub() {
14
const messenger = useBus();
16
// The message is typed loosely (Record<string, any>); only its `collection` property is read here.
messenger.subscribe('websocket.event', (message: Record<string, any>) => {
17
messages.publish(`${message['collection']}_mutated`, message);
21
/**
 * Creates the async generator function that serves one GraphQL subscription.
 *
 * The returned generator parses the requested fields and arguments from the resolve info once,
 * then iterates over every payload published on the local pub/sub for `event`. Payloads whose
 * `action` differs from the optional `event` argument are skipped. For every remaining payload
 * the accountability and schema are fetched again before the action-specific handling runs:
 *  - 'delete': yields `{ [event]: { key, data: null, event } }` once per key in `keys`.
 *  - 'create': loads the payload for the single `key` via getPayload().
 *  - 'update': loads the payload for each entry of `keys` via getPayload().
 *
 * @param self - GraphQL service; its `accountability` is refreshed per event and it is passed to parseFields().
 * @param event - Name subscribed to on the local pub/sub; also the property name the delete results are wrapped in.
 * @returns An async generator function; only its fourth parameter (the resolve info) is read, the first three are ignored.
 */
export function createSubscriptionGenerator(self: GraphQLService, event: string) {
22
return async function* (_x: unknown, _y: unknown, _z: unknown, request: GraphQLResolveInfo) {
23
// Fields and arguments are derived once, before the event loop starts.
const fields = parseFields(self, request);
24
const args = parseArguments(request);
26
for await (const payload of messages.subscribe(event)) {
27
const eventData = payload as WebSocketEvent;
29
// The filter only applies when the client supplied an `event` argument.
if ('event' in args && eventData['action'] !== args['event']) {
30
continue; // skip filtered events
33
// Re-fetched for every event that passes the filter rather than cached across events.
const accountability = await refreshAccountability(self.accountability);
34
const schema = await getSchema();
36
// Subscription descriptor handed to getPayload(); `item` is filled in per key further down.
const subscription: Omit<Subscription, 'client'> = {
37
collection: eventData['collection'],
38
event: eventData['action'],
42
if (eventData['action'] === 'delete') {
43
// we have no data to send besides the key
44
for (const key of eventData.keys) {
45
yield { [event]: { key, data: null, event: eventData['action'] } };
49
// A create event carries a single `key`; it becomes the subscription item before the payload is fetched.
if (eventData['action'] === 'create') {
51
subscription.item = eventData['key'];
52
const result = await getPayload(subscription, accountability, schema, eventData);
56
key: eventData['key'],
58
event: eventData['action'],
62
// don't notify the subscription of permission errors
// NOTE(review): presumably a failure thrown by getPayload() is swallowed at this point — confirm.
66
// An update event carries several `keys`; the payload is fetched separately for each one.
if (eventData['action'] === 'update') {
67
for (const key of eventData['keys']) {
69
subscription.item = key;
70
const result = await getPayload(subscription, accountability, schema, eventData);
76
event: eventData['action'],
80
// don't notify the subscription of permission errors
// NOTE(review): presumably handled per key so one failing key does not stop the others — confirm.
88
/**
 * Wraps an EventEmitter in a small typed publish/subscribe interface.
 *
 * The type parameter `P` maps event names to their payload types; both methods restrict the
 * event name to the string keys of `P`.
 *
 * @param emitter - Emitter used as the transport for all events.
 */
function createPubSub<P extends { [key: string]: unknown }>(emitter: EventEmitter) {
90
// Emits `payload` under the event name; the result of emit() is discarded via `void`.
publish: <T extends Extract<keyof P, string>>(event: T, payload: P[T]) =>
91
void emitter.emit(event as string, payload),
92
// Async generator over the emissions of `event`, built on events.on().
subscribe: async function* <T extends Extract<keyof P, string>>(event: T): AsyncIterableIterator<P[T]> {
93
const asyncIterator = on(emitter, event);
95
// Each iteration value is destructured as an array; only its first element (the payload) is taken.
// NOTE(review): presumably that element is what gets yielded to the consumer — confirm.
for await (const [value] of asyncIterator) {
102
/**
 * Derives the requested item fields from a subscription request.
 *
 * Looks at the selections of the first field node, picks out the nested selections of the
 * field named `data`, and passes them to service.getQuery() together with the request's
 * variable values to obtain `fields`.
 *
 * @param service - GraphQL service whose getQuery() turns selections into a query.
 * @param request - Resolve info of the subscription field.
 */
function parseFields(service: GraphQLService, request: GraphQLResolveInfo) {
103
// Falls back to an empty list when there is no first field node or it has no selection set.
const selections = request.fieldNodes[0]?.selectionSet?.selections ?? [];
105
const dataSelections = selections.reduce((result: readonly SelectionNode[], selection: SelectionNode) => {
107
// Match only a `data` field that has its own selection set.
selection.kind === 'Field' &&
108
selection.name.value === 'data' &&
109
selection.selectionSet?.kind === 'SelectionSet'
111
return selection.selectionSet.selections;
117
// The first argument to getQuery() is an empty object; only the selections and variables carry information.
// NOTE(review): presumably `fields` is the return value of this function — confirm.
const { fields } = service.getQuery({}, dataSelections, request.variableValues);
121
/**
 * Collects the string-valued arguments of the subscription field.
 *
 * Reads the arguments of the first field node and records, by argument name, every argument
 * whose value node has a `value` property of type string. Other arguments are ignored.
 *
 * @param request - Resolve info of the subscription field.
 */
function parseArguments(request: GraphQLResolveInfo) {
122
// Falls back to an empty list when there is no first field node or it has no arguments.
const args = request.fieldNodes[0]?.arguments ?? [];
124
(result, current) => {
125
// Value nodes without a string `value` property are left out of the result.
if ('value' in current.value && typeof current.value.value === 'string') {
126
result[current.name.value] = current.value.value;
131
// Accumulator: argument name -> string value.
{} as Record<string, string>,