ClickHouse
173 строки · 6.7 Кб
1#include <Columns/ColumnString.h>
2#include <DataTypes/DataTypeString.h>
3#include <Functions/FunctionFactory.h>
4#include <Functions/FunctionHelpers.h>
5#include <Functions/IFunction.h>
6#include <Interpreters/Context.h>
7#include <IO/ReadBufferFromString.h>
8#include <Common/FieldVisitorToString.h>
9#include "config.h"
10
11#if USE_RAPIDJSON
12
13#include "rapidjson/document.h"
14#include "rapidjson/writer.h"
15#include "rapidjson/stringbuffer.h"
16#include "rapidjson/filewritestream.h"
17#include "rapidjson/prettywriter.h"
18#include "rapidjson/filereadstream.h"
19
20
21namespace DB
22{
23
24namespace ErrorCodes
25{
26extern const int BAD_ARGUMENTS;
27extern const int ILLEGAL_COLUMN;
28extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
29extern const int ILLEGAL_TYPE_OF_ARGUMENT;
30}
31
32namespace
33{
34// select jsonMergePatch('{"a":1}','{"name": "joey"}','{"name": "tom"}','{"name": "zoey"}');
35// ||
36// \/
37// ┌───────────────────────┐
38// │ {"a":1,"name":"zoey"} │
39// └───────────────────────┘
40class FunctionjsonMergePatch : public IFunction
41{
42public:
43static constexpr auto name = "jsonMergePatch";
44static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionjsonMergePatch>(); }
45
46String getName() const override { return name; }
47bool isVariadic() const override { return true; }
48bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
49
50size_t getNumberOfArguments() const override { return 0; }
51bool useDefaultImplementationForConstants() const override { return true; }
52
53DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
54{
55if (arguments.empty())
56throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} requires at least one argument.", getName());
57
58for (const auto & arg : arguments)
59if (!isString(arg.type))
60throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Function {} requires string arguments", getName());
61
62return std::make_shared<DataTypeString>();
63}
64
65ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
66{
67chassert(!arguments.empty());
68
69rapidjson::Document::AllocatorType allocator;
70std::function<void(rapidjson::Value &, const rapidjson::Value &)> merge_objects;
71
72merge_objects = [&merge_objects, &allocator](rapidjson::Value & dest, const rapidjson::Value & src) -> void
73{
74if (!src.IsObject())
75return;
76
77for (auto it = src.MemberBegin(); it != src.MemberEnd(); ++it)
78{
79rapidjson::Value key(it->name, allocator);
80rapidjson::Value value(it->value, allocator);
81if (dest.HasMember(key))
82{
83if (dest[key].IsObject() && value.IsObject())
84merge_objects(dest[key], value);
85else
86dest[key] = value;
87}
88else
89{
90dest.AddMember(key, value, allocator);
91}
92}
93};
94
95auto parse_json_document = [](const ColumnString & column, rapidjson::Document & document, size_t i)
96{
97auto str_ref = column.getDataAt(i);
98const char * json = str_ref.data;
99
100document.Parse(json);
101if (document.HasParseError() || !document.IsObject())
102throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong JSON string to merge. Expected JSON object");
103};
104
105const bool is_first_const = isColumnConst(*arguments[0].column);
106const auto * first_column_arg_string = is_first_const
107? checkAndGetColumnConstData<ColumnString>(arguments[0].column.get())
108: checkAndGetColumn<ColumnString>(arguments[0].column.get());
109
110if (!first_column_arg_string)
111throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Arguments of function {} must be strings", getName());
112
113std::vector<rapidjson::Document> merged_jsons;
114merged_jsons.reserve(input_rows_count);
115
116for (size_t i = 0; i < input_rows_count; ++i)
117{
118auto & merged_json = merged_jsons.emplace_back(rapidjson::Type::kObjectType, &allocator);
119if (is_first_const)
120parse_json_document(*first_column_arg_string, merged_json, 0);
121else
122parse_json_document(*first_column_arg_string, merged_json, i);
123}
124
125for (size_t col_idx = 1; col_idx < arguments.size(); ++col_idx)
126{
127const bool is_const = isColumnConst(*arguments[col_idx].column);
128const auto * column_arg_string = is_const
129? checkAndGetColumnConstData<ColumnString>(arguments[col_idx].column.get())
130: checkAndGetColumn<ColumnString>(arguments[col_idx].column.get());
131
132if (!column_arg_string)
133throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Arguments of function {} must be strings", getName());
134
135for (size_t i = 0; i < input_rows_count; ++i)
136{
137rapidjson::Document document(&allocator);
138if (is_const)
139parse_json_document(*column_arg_string, document, 0);
140else
141parse_json_document(*column_arg_string, document, i);
142merge_objects(merged_jsons[i], document);
143}
144}
145
146auto result = ColumnString::create();
147auto & result_string = assert_cast<ColumnString &>(*result);
148rapidjson::CrtAllocator buffer_allocator;
149
150for (size_t i = 0; i < input_rows_count; ++i)
151{
152rapidjson::StringBuffer buffer(&buffer_allocator);
153rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
154
155merged_jsons[i].Accept(writer);
156result_string.insertData(buffer.GetString(), buffer.GetSize());
157}
158
159return result;
160}
161};
162
163}
164
165REGISTER_FUNCTION(jsonMergePatch)
166{
167factory.registerFunction<FunctionjsonMergePatch>(FunctionDocumentation{
168.description="Returns the merged JSON object string, which is formed by merging multiple JSON objects."});
169}
170
171}
172
173#endif
174