13
#define USING_LOG_PREFIX SERVER_OMT
14
#include "observer/omt/ob_multi_tenant.h"
15
#include "observer/omt/ob_multi_tenant_operator.h"
16
#include "share/rc/ob_tenant_base.h"
17
#include "observer/ob_server_struct.h"
18
#include "observer/omt/ob_tenant.h"
25
ObMultiTenantOperator::ObMultiTenantOperator() :
31
ObMultiTenantOperator::~ObMultiTenantOperator()
36
void ObMultiTenantOperator::reset()
41
if (tenant_idx_ >= 0 && tenant_idx_ < tenant_ids_.size()) {
42
uint64_t tenant_id = tenant_ids_.at(tenant_idx_);
43
if (tenant_ != nullptr) {
44
if (tenant_id != tenant_->id()) {
45
LOG_ERROR("ObMultiTenantOperator::reset", K(tenant_id), K(tenant_->id()));
49
share::ObTenantSwitchGuard guard(tenant_);
50
release_last_tenant();
52
tenant_->unlock(handle_);
67
int ObMultiTenantOperator::init()
70
ObMultiTenant *omt = GCTX.omt_;
71
TenantIdList tenant_list;
73
ret = OB_ERR_UNEXPECTED;
74
LOG_ERROR("operator init", K(ret));
77
LOG_ERROR("operator init", K(ret));
79
if (tenant_ids_.size() != 0 || tenant_idx_ != -1) {
80
ret = OB_ERR_UNEXPECTED;
81
LOG_ERROR("operator init", K(ret), K(tenant_idx_), K(tenant_ids_));
82
} else if (FALSE_IT(omt->get_tenant_ids(tenant_list))) {
84
for (int i=0; i < tenant_list.size(); i++) {
85
uint64_t tenant_id = tenant_list.at(i);
86
if (!is_virtual_tenant_id(tenant_id) && is_need_process(tenant_id)) {
87
tenant_ids_.push_back(tenant_id);
96
int ObMultiTenantOperator::execute(common::ObNewRow *&row)
103
if (tenant_idx_ == -1) {
106
while (OB_SUCC(ret)) {
107
if (tenant_idx_ >= tenant_ids_.size()) {
111
uint64_t tenant_id = tenant_ids_.at(tenant_idx_);
112
int process_ret = OB_SUCCESS;
113
if (tenant_ == nullptr) {
114
if (OB_FAIL(GCTX.omt_->get_active_tenant_with_tenant_lock(tenant_id, handle_, tenant_))) {
115
LOG_WARN("get_tenant_with_tenant_lock", K(ret), K(tenant_id));
119
if (tenant_->id() != tenant_id) {
120
LOG_ERROR("ObMultiTenantOperator tenant mismatch", K(tenant_ids_), K(tenant_idx_), K(tenant_id), K(tenant_->id()));
125
share::ObTenantSwitchGuard guard(tenant_);
126
process_ret = process_curr_tenant(row);
129
if (process_ret == OB_SUCCESS) {
132
} else if (process_ret == OB_ITER_END) {
135
share::ObTenantSwitchGuard guard(tenant_);
136
release_last_tenant();
138
tenant_->unlock(handle_);
143
LOG_WARN("operator process", K(ret), K(lbt()));
145
} else if (ret == OB_TENANT_NOT_IN_SERVER) {