1
import { ForbiddenError, InvalidPayloadError } from '@directus/errors';
10
} from '@directus/types';
11
import { parseJSON, toArray } from '@directus/utils';
12
import { format, isValid, parseISO } from 'date-fns';
13
import { unflatten } from 'flat';
15
import type { Knex } from 'knex';
16
import { clone, cloneDeep, isNil, isObject, isPlainObject, omit, pick } from 'lodash-es';
17
import { randomUUID } from 'node:crypto';
18
import { parse as wktToGeoJSON } from 'wellknown';
19
import type { Helpers } from '../database/helpers/index.js';
20
import { getHelpers } from '../database/helpers/index.js';
21
import getDatabase from '../database/index.js';
22
import type { AbstractServiceOptions, ActionEventParams, MutationOptions } from '../types/index.js';
23
import { generateHash } from '../utils/generate-hash.js';
25
/**
 * The kind of operation a payload is being processed for.
 */
type Action =
	| 'create'
	| 'read'
	| 'update';
28
// NOTE(review): the opening line of this type is not visible in this extract; the shape
// looks like the `Transformers` type that annotates the `transformers` property — confirm.
// Index signature: every string key maps to a function that receives one context object.
[type: string]: (context: {
31
// The record handed to the transformer.
payload: Partial<Item>;
32
// Accountability passed along with the call; may be null.
accountability: Accountability | null;
42
/**
 * Prepares item payloads for a single collection: applies the transformers registered for a
 * field's `special` flags, converts geometry and date/time values, and processes nested
 * relational data found in a payload.
 */
export class PayloadService {
43
// Set by the constructor from `options.accountability`; `null` when none was supplied.
accountability: Accountability | null;
47
// Schema overview used to look up this collection's fields and the relations that involve it.
schema: SchemaOverview;
49
/**
 * Stores the target collection together with the accountability, database connection and
 * schema taken from `options`.
 *
 * @param collection - Name of the collection this service instance operates on.
 * @param options - Service options; `accountability`, `knex` and `schema` are read here.
 */
constructor(collection: string, options: AbstractServiceOptions) {
50
// A missing (falsy) accountability is normalised to null.
this.accountability = options.accountability || null;
51
// Use the supplied Knex instance when there is one, otherwise whatever getDatabase() returns.
this.knex = options.knex || getDatabase();
52
// Helpers are obtained for the Knex instance selected above.
this.helpers = getHelpers(this.knex);
53
this.collection = collection;
54
this.schema = options.schema;
59
/**
 * Transformer functions keyed by a field's `special` flag. Each one receives a context
 * object (the ones below destructure `action`, `value`, `accountability` and `helpers`)
 * and returns the value to use for the field.
 *
 * NOTE(review): this extract omits several lines (branch bodies and fall-through returns),
 * so the comments below only describe the branches that are actually visible.
 */
public transformers: Transformers = {
60
// hash: on create and update the value is stringified and replaced by its generated hash.
async hash({ action, value }) {
63
if (action === 'create' || action === 'update') {
64
return await generateHash(String(value));
69
// uuid: only acts on create when no value was supplied. The branch body is not visible
// here; presumably it generates a UUID (randomUUID is imported) — confirm.
async uuid({ action, value }) {
70
if (action === 'create' && !value) {
76
// cast-boolean: on read, distinguishes three groups of stored values. The value returned
// for each group is on lines not shown in this extract.
async 'cast-boolean'({ action, value }) {
77
if (action === 'read') {
78
// "true" encodings: boolean true, numeric 1, string '1'.
if (value === true || value === 1 || value === '1') {
80
// "false" encodings: boolean false, numeric 0, string '0'.
} else if (value === false || value === 0 || value === '0') {
82
// Empty encodings: null or the empty string.
} else if (value === null || value === '') {
89
// cast-json: on read, string values are parsed with parseJSON.
async 'cast-json'({ action, value }) {
90
if (action === 'read') {
91
if (typeof value === 'string') {
93
return parseJSON(value);
102
// conceal: on read, any truthy value is replaced by a fixed mask; falsy values become null.
async conceal({ action, value }) {
103
if (action === 'read') return value ? '**********' : null;
106
// user-created: on create, yields the accountability's user, or null when there is none.
async 'user-created'({ action, value, accountability }) {
107
if (action === 'create') return accountability?.user || null;
110
// user-updated: same as user-created, but for the update action.
async 'user-updated'({ action, value, accountability }) {
111
if (action === 'update') return accountability?.user || null;
114
// role-created: on create, yields the accountability's role, or null when there is none.
async 'role-created'({ action, value, accountability }) {
115
if (action === 'create') return accountability?.role || null;
118
// role-updated: same as role-created, but for the update action.
async 'role-updated'({ action, value, accountability }) {
119
if (action === 'update') return accountability?.role || null;
122
// date-created: on create, yields the current time (ISO string passed through
// helpers.date.writeTimestamp, then wrapped in a Date).
async 'date-created'({ action, value, helpers }) {
123
if (action === 'create') return new Date(helpers.date.writeTimestamp(new Date().toISOString()));
126
// date-updated: same as date-created, but for the update action.
async 'date-updated'({ action, value, helpers }) {
127
if (action === 'update') return new Date(helpers.date.writeTimestamp(new Date().toISOString()));
130
// cast-csv: converts between comma-separated strings and arrays.
async 'cast-csv'({ action, value }) {
131
// Anything that is neither an array nor a string yields undefined (bare return).
if (Array.isArray(value) === false && typeof value !== 'string') return;
133
if (action === 'read') {
134
// Already an array: pass through unchanged.
if (Array.isArray(value)) return value;
136
// An empty string means "no entries" rather than one empty entry.
if (value === '') return [];
138
return value.split(',');
141
// Reached when the read branch above did not return: arrays are joined into one string.
if (Array.isArray(value)) {
142
return value.join(',');
149
/**
 * Runs the special-field transformers and the geometry/date conversions over one payload or
 * a list of payloads.
 *
 * Overloads: passing an array yields an array, passing a single payload yields a single
 * payload. `aliasMap` (optional, defaults to `{}`) maps alias names to the field names they
 * stand for, so aliased fields receive the same transformers as the field they point to.
 */
processValues(action: Action, payloads: Partial<Item>[]): Promise<Partial<Item>[]>;
150
processValues(action: Action, payload: Partial<Item>): Promise<Partial<Item>>;
151
processValues(action: Action, payloads: Partial<Item>[], aliasMap: Record<string, string>): Promise<Partial<Item>[]>;
152
processValues(action: Action, payload: Partial<Item>, aliasMap: Record<string, string>): Promise<Partial<Item>>;
155
// NOTE(review): the first line of the implementation signature is not visible in this extract.
payload: Partial<Item> | Partial<Item>[],
156
aliasMap: Record<string, string> = {},
157
): Promise<Partial<Item> | Partial<Item>[]> {
158
// Work on an array regardless of which input form was used.
const processedPayload = toArray(payload);
160
if (processedPayload.length === 0) return [];
162
// The set of fields present is taken from the first record only.
const fieldsInPayload = Object.keys(processedPayload[0]!);
163
const fieldEntries = Object.entries(this.schema.collections[this.collection]!.fields);
164
const aliasEntries = Object.entries(aliasMap);
166
// [name, field] pairs for every field that needs a transformer run.
let specialFields: [string, FieldOverview][] = [];
168
for (const [name, field] of fieldEntries) {
169
if (field.special && field.special.length > 0) {
170
specialFields.push([name, field]);
172
// Aliases that point at this field get their own entry, with `field` rewritten to the
// alias name so the value is read from / written to the aliased key.
for (const [aliasName, fieldName] of aliasEntries) {
173
if (fieldName === name) {
174
specialFields.push([aliasName, { ...field, field: aliasName }]);
180
// On read, only special fields that actually occur in the payload are kept.
if (action === 'read') {
181
specialFields = specialFields.filter(([name]) => {
182
return fieldsInPayload.includes(name);
187
// Apply the transformers to every special field of every record.
// NOTE(review): whatever wraps/awaits these async `map` calls is on lines not shown here.
processedPayload.map(async (record: any) => {
189
specialFields.map(async ([name, field]) => {
190
const newValue = await this.processField(field, record, action, this.accountability);
191
// An undefined result leaves the record's current value untouched.
if (newValue !== undefined) record[name] = newValue;
197
this.processGeometries(processedPayload, action);
198
this.processDates(processedPayload, action);
200
// For create/update, arrays and objects (other than Date and null) are serialised to JSON
// strings, except values flagged with `isRawInstance`.
if (['create', 'update'].includes(action)) {
201
processedPayload.forEach((record) => {
202
for (const [key, value] of Object.entries(record)) {
203
if (Array.isArray(value) || (typeof value === 'object' && !(value instanceof Date) && value !== null)) {
204
if (!value.isRawInstance) {
205
record[key] = JSON.stringify(value);
212
// On read, flat aggregate keys are expanded into nested objects.
if (action === 'read') {
213
this.processAggregates(processedPayload);
216
// Return in the same shape the caller passed in.
if (Array.isArray(payload)) {
217
return processedPayload;
220
return processedPayload[0]!;
223
/**
 * Expands flat keys containing `->` into nested objects, in place, on every item.
 *
 * @param payload - Items to rewrite; the first item is read unconditionally, so the array
 *   must not be empty.
 */
processAggregates(payload: Partial<Item>[]) {
224
// The keys to expand are determined from the first item only.
const aggregateKeys = Object.keys(payload[0]!).filter((key) => key.includes('->'));
226
if (aggregateKeys.length) {
227
for (const item of payload) {
228
// Unflatten just the `->` keys and merge the nested result back onto the item.
Object.assign(item, unflatten(pick(item, aggregateKeys), { delimiter: '->' }));
229
// Then drop the original flat keys.
aggregateKeys.forEach((key) => delete item[key]);
235
// NOTE(review): the method header is outside this extract. The call
// `this.processField(field, record, action, this.accountability)` in processValues suggests
// these are processField's parameters — confirm.
// Schema entry of the field being processed.
field: SchemaOverview['collections'][string]['fields'][string],
236
// Record that holds the field's current value.
payload: Partial<Item>,
238
accountability: Accountability | null,
240
// Fields without specials are returned as they are.
if (!field.special) return payload[field.field];
241
const fieldSpecials = field.special ? toArray(field.special) : [];
243
// Start from a clone of the current value rather than the value itself.
let value = clone(payload[field.field]);
245
// Specials are applied in order; each transformer's result becomes the next one's input.
// Specials without a registered transformer are skipped.
for (const special of fieldSpecials) {
246
if (special in this.transformers) {
247
value = await this.transformers[special]!({
252
specials: fieldSpecials,
253
helpers: this.helpers,
267
/**
 * Converts the values of geometry columns on each payload, in place.
 *
 * @param payloads - Records to convert.
 * @param action - Current action. NOTE(review): presumably selects which of the two
 *   converters below is used; the line holding that condition is not visible in this extract.
 */
processGeometries<T extends Partial<Record<string, any>>[]>(payloads: T, action: Action): T {
270
// First converter: strings are parsed as WKT into GeoJSON, other values pass through.
? (value: any) => (typeof value === 'string' ? wktToGeoJSON(value) : value)
271
// Second converter: GeoJSON (parsed first when given as a string) goes to helpers.st.fromGeoJSON.
: (value: any) => this.helpers.st.fromGeoJSON(typeof value == 'string' ? parseJSON(value) : value);
273
const fieldsInCollection = Object.entries(this.schema.collections[this.collection]!.fields);
274
// Every field whose type starts with "geometry" counts as a geometry column.
const geometryColumns = fieldsInCollection.filter(([_, field]) => field.type.startsWith('geometry'));
276
for (const [name] of geometryColumns) {
277
for (const payload of payloads) {
279
payload[name] = process(payload[name]);
291
/**
 * Normalises the values of `dateTime`, `date`, `timestamp` and `time` columns on each
 * payload, in place.
 *
 * @param payloads - Records to convert.
 * @param action - Current action; the `read` branches format values for output.
 * @throws InvalidPayloadError when a string for a `date` or `dateTime` column does not parse
 *   to a valid date.
 */
processDates(payloads: Partial<Record<string, any>>[], action: Action): Partial<Record<string, any>>[] {
292
const fieldsInCollection = Object.entries(this.schema.collections[this.collection]!.fields);
294
const dateColumns = fieldsInCollection.filter(([_name, field]) =>
295
['dateTime', 'date', 'timestamp'].includes(field.type),
298
const timeColumns = fieldsInCollection.filter(([_name, field]) => {
299
return field.type === 'time';
302
// Nothing to convert in this collection.
if (dateColumns.length === 0 && timeColumns.length === 0) return payloads;
304
for (const [name, dateColumn] of dateColumns) {
305
for (const payload of payloads) {
306
let value: number | string | Date = payload[name];
308
// null, or a string of at least 10 characters made up only of ".", "0", space, ":" and "-"
// (an all-zero date such as "0000-00-00"), is stored as null.
if (value === null || (typeof value === 'string' && /^[.0 :-]{10,}$/.test(value))) {
309
payload[name] = null;
313
// Skip missing/empty values.
if (!value) continue;
315
if (action === 'read') {
316
// Numbers and strings are turned into Date objects first.
if (typeof value === 'number' || typeof value === 'string') {
317
value = new Date(value);
320
// timestamp: the ISO string goes through the date helper's readTimestampString.
if (dateColumn.type === 'timestamp') {
321
const newValue = this.helpers.date.readTimestampString(value.toISOString());
322
payload[name] = newValue;
325
// dateTime: built from the Date's local-time getters as YYYY-MM-DDTHH:mm:ss, no zone suffix.
if (dateColumn.type === 'dateTime') {
326
const year = String(value.getFullYear());
327
// getMonth() is zero-based, hence the + 1.
const month = String(value.getMonth() + 1).padStart(2, '0');
328
const day = String(value.getDate()).padStart(2, '0');
329
const hours = String(value.getHours()).padStart(2, '0');
330
const minutes = String(value.getMinutes()).padStart(2, '0');
331
const seconds = String(value.getSeconds()).padStart(2, '0');
333
const newValue = `${year}-${month}-${day}T${hours}:${minutes}:${seconds}`;
334
payload[name] = newValue;
337
// date: same approach, date part only (YYYY-MM-DD).
if (dateColumn.type === 'date') {
338
const year = String(value.getFullYear());
339
const month = String(value.getMonth() + 1).padStart(2, '0');
340
const day = String(value.getDate()).padStart(2, '0');
343
const newValue = `${year}-${month}-${day}`;
344
payload[name] = newValue;
347
// String input handling.
// NOTE(review): presumably the non-read path; the enclosing else is not visible in this extract.
if (value instanceof Date === false && typeof value === 'string') {
348
if (dateColumn.type === 'date') {
349
const parsedDate = parseISO(value);
351
if (!isValid(parsedDate)) {
352
throw new InvalidPayloadError({ reason: `Invalid Date format in field "${dateColumn.field}"` });
355
payload[name] = parsedDate;
358
if (dateColumn.type === 'dateTime') {
359
const parsedDate = parseISO(value);
361
if (!isValid(parsedDate)) {
362
throw new InvalidPayloadError({ reason: `Invalid DateTime format in field "${dateColumn.field}"` });
365
payload[name] = parsedDate;
368
// timestamp strings are handed to the date helper's writeTimestamp instead of parseISO.
if (dateColumn.type === 'timestamp') {
369
const newValue = this.helpers.date.writeTimestamp(value);
370
payload[name] = newValue;
381
for (const [name] of timeColumns) {
382
for (const payload of payloads) {
383
const value = payload[name];
385
if (!value) continue;
387
if (action === 'read') {
388
// Date objects in time columns are reduced to their HH:mm:ss part.
if (value instanceof Date) payload[name] = format(value, 'HH:mm:ss');
401
// NOTE(review): the method header (name and leading parameters, including `data`) is outside
// this extract. The visible body handles nested objects for relations whose target collection
// is named by another field of the payload (`one_collection_field`), restricted to
// `one_allowed_collections`.
// Optional mutation options; `bypassEmitAction`, `emitEvents` and `mutationTracker` are read below.
opts?: MutationOptions,
402
): Promise<{ payload: Partial<Item>; revisions: PrimaryKey[]; nestedActionEvents: ActionEventParams[] }> {
403
// Relations that originate from this collection.
const relations = this.schema.relations.filter((relation) => {
404
return relation.collection === this.collection;
407
// Revision keys reported by the nested create/update calls.
const revisions: PrimaryKey[] = [];
409
// Action events queued here when the caller gave no bypassEmitAction handler.
const nestedActionEvents: ActionEventParams[] = [];
411
// Work on a deep copy so the caller's object is not modified.
const payload = cloneDeep(data);
414
// Only relations whose field is present in the payload as a plain object are processed.
const relationsToProcess = relations.filter((relation) => {
415
return relation.field in payload && isPlainObject(payload[relation.field]);
418
for (const relation of relationsToProcess) {
420
// Skip relations that lack the collection-field / allowed-collections metadata.
if (!relation.meta?.one_collection_field || !relation.meta?.one_allowed_collections) continue;
422
// The payload itself names the collection the nested record belongs to.
const relatedCollection = payload[relation.meta.one_collection_field];
424
if (!relatedCollection) {
425
throw new InvalidPayloadError({
426
reason: `Can't update nested record "${relation.collection}.${relation.field}" without field "${relation.collection}.${relation.meta.one_collection_field}" being set`,
430
const allowedCollections = relation.meta.one_allowed_collections;
432
// The named collection has to be one of the collections this relation allows.
if (allowedCollections.includes(relatedCollection) === false) {
433
throw new InvalidPayloadError({
434
reason: `"${relation.collection}.${relation.field}" can't be linked to collection "${relatedCollection}"`,
438
// The service factory is loaded with a dynamic import at call time.
const { getService } = await import('../utils/get-service.js');
440
const service = getService(relatedCollection, {
441
accountability: this.accountability,
446
const relatedPrimary = this.schema.collections[relatedCollection]!.primary;
447
const relatedRecord: Partial<Item> = payload[relation.field];
449
// Bare string/number values are skipped here.
if (['string', 'number'].includes(typeof relatedRecord)) continue;
451
const hasPrimaryKey = relatedPrimary in relatedRecord;
453
let relatedPrimaryKey: PrimaryKey = relatedRecord[relatedPrimary];
458
// Look-up of the related row by primary key. The start of this query and the lines that
// use its result are not visible in this extract.
.select(relatedPrimary)
459
.from(relatedCollection)
460
.where({ [relatedPrimary]: relatedPrimaryKey })
464
const fieldsToUpdate = omit(relatedRecord, relatedPrimary);
466
// Only issue an update when something besides the primary key was supplied.
if (Object.keys(fieldsToUpdate).length > 0) {
467
await service.updateOne(relatedPrimaryKey, relatedRecord, {
468
onRevisionCreate: (pk) => revisions.push(pk),
469
// Forward to the caller's handler when given, otherwise queue the event locally.
bypassEmitAction: (params) =>
470
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
471
emitEvents: opts?.emitEvents,
472
mutationTracker: opts?.mutationTracker,
476
// Create path: keep the primary key returned by createOne.
relatedPrimaryKey = await service.createOne(relatedRecord, {
477
onRevisionCreate: (pk) => revisions.push(pk),
478
bypassEmitAction: (params) =>
479
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
480
emitEvents: opts?.emitEvents,
481
mutationTracker: opts?.mutationTracker,
486
// Replace the nested object with the primary key of the related record.
payload[relation.field] = relatedPrimaryKey;
489
return { payload, revisions, nestedActionEvents };
497
// NOTE(review): the method header (name and leading parameters, including `data`) is outside
// this extract. The visible body handles nested objects on relations that have a fixed
// `related_collection`, creating or updating the related record and substituting its key.
// Optional mutation options; `bypassEmitAction`, `emitEvents` and `mutationTracker` are read below.
opts?: MutationOptions,
498
): Promise<{ payload: Partial<Item>; revisions: PrimaryKey[]; nestedActionEvents: ActionEventParams[] }> {
499
// Work on a deep copy so the caller's object is not modified.
const payload = cloneDeep(data);
502
// Revision keys reported by the nested create/update calls.
const revisions: PrimaryKey[] = [];
504
// Action events queued here when the caller gave no bypassEmitAction handler.
const nestedActionEvents: ActionEventParams[] = [];
507
// Relations that originate from this collection.
const relations = this.schema.relations.filter((relation) => {
508
return relation.collection === this.collection;
512
// Only relations whose field is present in the payload with an object value are processed.
const relationsToProcess = relations.filter((relation) => {
513
return relation.field in payload && isObject(payload[relation.field]);
516
for (const relation of relationsToProcess) {
518
// Relations without a fixed related collection are not handled here.
if (!relation.related_collection) continue;
519
const relatedPrimaryKeyField = this.schema.collections[relation.related_collection]!.primary;
521
// The service factory is loaded with a dynamic import at call time.
const { getService } = await import('../utils/get-service.js');
523
const service = getService(relation.related_collection, {
524
accountability: this.accountability,
529
const relatedRecord: Partial<Item> = payload[relation.field];
531
// Bare string/number values are skipped here.
if (['string', 'number'].includes(typeof relatedRecord)) continue;
533
const hasPrimaryKey = relatedPrimaryKeyField in relatedRecord;
535
let relatedPrimaryKey: PrimaryKey = relatedRecord[relatedPrimaryKeyField];
540
// Look-up of the related row by primary key. The start of this query and the lines that
// use its result are not visible in this extract.
.select(relatedPrimaryKeyField)
541
.from(relation.related_collection)
542
.where({ [relatedPrimaryKeyField]: relatedPrimaryKey })
546
const fieldsToUpdate = omit(relatedRecord, relatedPrimaryKeyField);
548
// Only issue an update when something besides the primary key was supplied.
if (Object.keys(fieldsToUpdate).length > 0) {
549
await service.updateOne(relatedPrimaryKey, relatedRecord, {
550
onRevisionCreate: (pk) => revisions.push(pk),
551
// Forward to the caller's handler when given, otherwise queue the event locally.
bypassEmitAction: (params) =>
552
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
553
emitEvents: opts?.emitEvents,
554
mutationTracker: opts?.mutationTracker,
558
// Create path: keep the primary key returned by createOne.
relatedPrimaryKey = await service.createOne(relatedRecord, {
559
onRevisionCreate: (pk) => revisions.push(pk),
560
bypassEmitAction: (params) =>
561
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
562
emitEvents: opts?.emitEvents,
563
mutationTracker: opts?.mutationTracker,
568
// Replace the nested object with the primary key of the related record.
payload[relation.field] = relatedPrimaryKey;
571
return { payload, revisions, nestedActionEvents };
580
// NOTE(review): the method header (name and leading parameters) is outside this extract;
// `data` and `parent` used below are presumably among those parameters — confirm.
// The visible body handles relations that point AT this collection (one-to-many), given
// either as a full list of related items or as a { create, update, delete } object.
// Optional mutation options; `bypassEmitAction`, `emitEvents` and `mutationTracker` are read below.
opts?: MutationOptions,
581
): Promise<{ revisions: PrimaryKey[]; nestedActionEvents: ActionEventParams[] }> {
582
// Revision keys reported by the nested mutations.
const revisions: PrimaryKey[] = [];
584
// Action events queued here when the caller gave no bypassEmitAction handler.
const nestedActionEvents: ActionEventParams[] = [];
586
// Relations whose related (target) collection is this collection.
const relations = this.schema.relations.filter((relation) => {
587
return relation.related_collection === this.collection;
590
const payload = cloneDeep(data);
593
// Only relations that expose a `one_field` which is present in the payload are processed.
const relationsToProcess = relations.filter((relation) => {
594
if (!relation.meta?.one_field) return false;
595
return relation.meta.one_field in payload;
598
// Shape accepted for the object form: arrays of objects to create/update and an array of
// string or number keys to delete.
const nestedUpdateSchema = Joi.object({
599
create: Joi.array().items(Joi.object().unknown()),
600
update: Joi.array().items(Joi.object().unknown()),
601
delete: Joi.array().items(Joi.string(), Joi.number()),
604
for (const relation of relationsToProcess) {
605
if (!relation.meta) continue;
607
// Primary key field of this (parent) collection and of the related (child) collection.
const currentPrimaryKeyField = this.schema.collections[relation.related_collection!]!.primary;
608
const relatedPrimaryKeyField = this.schema.collections[relation.collection]!.primary;
610
// The service factory is loaded with a dynamic import at call time.
const { getService } = await import('../utils/get-service.js');
612
const service = getService(relation.collection, {
613
accountability: this.accountability,
618
const recordsToUpsert: Partial<Item>[] = [];
619
// Keys of related rows that stay linked; used further down to find the rows to unlink.
const savedPrimaryKeys: PrimaryKey[] = [];
622
const field = payload[relation.meta!.one_field!];
624
// List form: an array of related items; a falsy value is treated as an empty list.
if (!field || Array.isArray(field)) {
625
const updates = field || [];
627
for (let i = 0; i < updates.length; i++) {
628
const relatedRecord = updates[i];
630
let record = cloneDeep(relatedRecord);
632
// A bare string/number is the primary key of an existing related row.
if (typeof relatedRecord === 'string' || typeof relatedRecord === 'number') {
633
const existingRecord = await this.knex
634
.select(relatedPrimaryKeyField, relation.field)
635
.from(relation.collection)
636
.where({ [relatedPrimaryKeyField]: record })
639
// No row was found for the referenced key.
if (!!existingRecord === false) {
640
throw new ForbiddenError();
650
// Condition (its opening line is not visible here): the row's foreign key is set and
// loosely equals `parent` or the payload's own primary key, i.e. it is already linked.
isNil(existingRecord[relation.field]) === false &&
651
(existingRecord[relation.field] == parent ||
652
existingRecord[relation.field] == payload[currentPrimaryKeyField])
654
savedPrimaryKeys.push(existingRecord[relatedPrimaryKeyField]);
659
// The bare key is wrapped into a record keyed by the related primary key field.
[relatedPrimaryKeyField]: relatedRecord,
663
recordsToUpsert.push({
665
// Point the related row at the parent: `parent` when truthy, else the payload's own key.
[relation.field]: parent || payload[currentPrimaryKeyField],
669
// Upsert everything collected above and remember the keys that come back.
savedPrimaryKeys.push(
670
...(await service.upsertMany(recordsToUpsert, {
671
onRevisionCreate: (pk) => revisions.push(pk),
672
// Forward to the caller's handler when given, otherwise queue the event locally.
bypassEmitAction: (params) =>
673
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
674
emitEvents: opts?.emitEvents,
675
mutationTracker: opts?.mutationTracker,
679
// Query for related rows whose key is NOT among the saved ones; the remaining parts of
// the filter are on lines not shown in this extract.
const query: Query = {
688
[relatedPrimaryKeyField]: {
689
_nin: savedPrimaryKeys,
697
// Deselected rows are deleted when the relation's deselect action is 'delete'.
if (relation.meta.one_deselect_action === 'delete') {
699
await service.deleteByQuery(query, {
700
bypassEmitAction: (params) =>
701
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
702
emitEvents: opts?.emitEvents,
703
mutationTracker: opts?.mutationTracker,
706
// The other visible path unlinks the rows by nulling their foreign key.
await service.updateByQuery(
708
{ [relation.field]: null },
710
onRevisionCreate: (pk) => revisions.push(pk),
711
bypassEmitAction: (params) =>
712
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
713
emitEvents: opts?.emitEvents,
714
mutationTracker: opts?.mutationTracker,
721
// Object form: validate the { create, update, delete } structure before acting on it.
const alterations = field as Alterations;
722
const { error } = nestedUpdateSchema.validate(alterations);
723
if (error) throw new InvalidPayloadError({ reason: `Invalid one-to-many update structure: ${error.message}` });
725
if (alterations.create) {
726
const sortField = relation.meta.sort_field;
728
let createPayload: Alterations['create'];
730
if (sortField !== null) {
731
// Highest sort value currently used among the parent's related rows.
const highestOrderNumber: Record<'max', number | null> | undefined = await this.knex
732
.from(relation.collection)
733
.where({ [relation.field]: parent })
734
.whereNotNull(sortField)
735
.max(sortField, { as: 'max' })
738
createPayload = alterations.create.map((item, index) => {
739
const record = cloneDeep(item);
742
// New rows without an explicit sort value are appended after the current maximum,
// or numbered from 1 when there is no (truthy) maximum.
if (parent !== null && record[sortField] === undefined) {
743
record[sortField] = highestOrderNumber?.max ? highestOrderNumber.max + index + 1 : index + 1;
748
[relation.field]: parent || payload[currentPrimaryKeyField],
752
// Without a sort field the items only receive the foreign key.
createPayload = alterations.create.map((item) => ({
754
[relation.field]: parent || payload[currentPrimaryKeyField],
758
await service.createMany(createPayload, {
759
onRevisionCreate: (pk) => revisions.push(pk),
760
bypassEmitAction: (params) =>
761
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
762
emitEvents: opts?.emitEvents,
763
mutationTracker: opts?.mutationTracker,
767
if (alterations.update) {
768
const primaryKeyField = this.schema.collections[relation.collection]!.primary;
770
// Each update item is applied individually, identified by its own primary key.
for (const item of alterations.update) {
771
await service.updateOne(
772
item[primaryKeyField],
775
[relation.field]: parent || payload[currentPrimaryKeyField],
778
onRevisionCreate: (pk) => revisions.push(pk),
779
bypassEmitAction: (params) =>
780
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
781
emitEvents: opts?.emitEvents,
782
mutationTracker: opts?.mutationTracker,
788
if (alterations.delete) {
789
// Query for the related rows whose key is listed under `delete`; the remaining parts
// of the filter are on lines not shown in this extract.
const query: Query = {
798
[relatedPrimaryKeyField]: {
799
_in: alterations.delete,
806
// Same deselect handling as in the list form: delete the rows or null their foreign key.
if (relation.meta.one_deselect_action === 'delete') {
807
await service.deleteByQuery(query, {
808
bypassEmitAction: (params) =>
809
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
810
emitEvents: opts?.emitEvents,
811
mutationTracker: opts?.mutationTracker,
814
await service.updateByQuery(
816
{ [relation.field]: null },
818
onRevisionCreate: (pk) => revisions.push(pk),
819
bypassEmitAction: (params) =>
820
opts?.bypassEmitAction ? opts.bypassEmitAction(params) : nestedActionEvents.push(params),
821
emitEvents: opts?.emitEvents,
822
mutationTracker: opts?.mutationTracker,
830
return { revisions, nestedActionEvents };
837
/**
 * Produces the JSON string form of a payload after passing it through the `read` processing.
 *
 * @param data - Payload to serialise; it is deep-cloned, so the input is not modified.
 * @returns The JSON string, or `null` when the processed payload has no keys.
 */
async prepareDelta(data: Partial<Item>): Promise<string | null> {
838
let payload = cloneDeep(data);
840
for (const key in payload) {
841
// Values flagged as raw instances are replaced by their first binding.
if (payload[key]?.isRawInstance) {
842
payload[key] = payload[key].bindings[0];
846
// Run the same transformations that are applied when reading items.
payload = await this.processValues('read', payload);
848
if (Object.keys(payload).length === 0) return null;
850
return JSON.stringify(payload);