Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.audit.AuditLogOperation;
import org.apache.iotdb.commons.audit.IAuditEntity;
import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.auth.entity.PrivilegeUnion;
import org.apache.iotdb.commons.conf.CommonDescriptor;
Expand Down Expand Up @@ -95,7 +96,6 @@
import org.apache.iotdb.confignode.manager.pipe.metric.receiver.PipeConfigNodeReceiverMetrics;
import org.apache.iotdb.confignode.manager.pipe.receiver.visitor.PipeConfigPhysicalPlanExceptionVisitor;
import org.apache.iotdb.confignode.manager.pipe.receiver.visitor.PipeConfigPhysicalPlanTSStatusVisitor;
import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigNodeHandshakeV1Req;
import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigNodeHandshakeV2Req;
import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigPlanReq;
import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotPieceReq;
Expand Down Expand Up @@ -185,21 +185,26 @@ public TPipeTransferResp receive(final TPipeTransferReq req) {
.setMessage(
"The receiver ConfigNode has set up a new receiver and the sender must re-send its handshake request."));
}
final TPipeTransferResp authResp = checkPipeTransferAuthenticated(type);
if (Objects.nonNull(authResp)) {
return authResp;
}
final TPipeTransferResp resp;
final long startTime = System.nanoTime();
switch (type) {
case HANDSHAKE_CONFIGNODE_V1:
resp =
handleTransferHandshakeV1(
PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req));
resp = new TPipeTransferResp(getUnsupportedHandshakeV1Status());
PipeConfigNodeReceiverMetrics.getInstance()
.recordHandshakeConfigNodeV1Timer(System.nanoTime() - startTime);
return resp;
case HANDSHAKE_CONFIGNODE_V2:
resp =
handleTransferHandshakeV2(
PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req));
userEntity.setAuditLogOperation(AuditLogOperation.DDL);
if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& Objects.nonNull(userEntity)) {
userEntity.setAuditLogOperation(AuditLogOperation.DDL);
}
PipeConfigNodeReceiverMetrics.getInstance()
.recordHandshakeConfigNodeV2Timer(System.nanoTime() - startTime);
return resp;
Expand Down Expand Up @@ -262,6 +267,37 @@ private boolean needHandshake(final PipeRequestType type) {
&& type != PipeRequestType.HANDSHAKE_CONFIGNODE_V2;
}

private TPipeTransferResp checkPipeTransferAuthenticated(final PipeRequestType type) {
if (!requiresAuthentication(type)) {
return null;
}

final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
if (hasPipeHandshakeCredential || (clientSession != null && clientSession.isLogin())) {
if (!hasPipeHandshakeCredential && clientSession != null) {
username = clientSession.getUsername();
userEntity =
new UserEntity(clientSession.getUserId(), username, clientSession.getClientAddress())
.setAuditLogOperation(AuditLogOperation.DDL);
}
return null;
}

return new TPipeTransferResp(getNotLoggedInStatus());
}

private static boolean requiresAuthentication(final PipeRequestType type) {
switch (type) {
case TRANSFER_CONFIG_PLAN:
case TRANSFER_CONFIG_SNAPSHOT_PIECE:
case TRANSFER_CONFIG_SNAPSHOT_SEAL:
case TRANSFER_COMPRESSED:
return true;
default:
return false;
}
}

private TPipeTransferResp handleTransferConfigPlan(final PipeTransferConfigPlanReq req)
throws IOException {
return new TPipeTransferResp(
Expand Down Expand Up @@ -1221,11 +1257,19 @@ protected String getClusterId() {
// 2. The detection period (300s) is too long for configPlans.
@Override
protected boolean shouldLogin() {
return true;
final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
return hasPipeHandshakeCredential || clientSession == null || !clientSession.isLogin();
}

@Override
protected TSStatus login() {
final IClientSession session = SESSION_MANAGER.getCurrSession();
if (!hasPipeHandshakeCredential) {
return session != null && session.isLogin()
? RpcUtils.SUCCESS_STATUS
: getNotLoggedInStatus();
}

return configManager.login(username, password, false).getStatus();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.pipe.agent.task.builder;

import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
Expand Down Expand Up @@ -47,6 +48,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_KEY;
Expand Down Expand Up @@ -131,6 +133,8 @@ public PipeDataNodeTask build() {
sinkStage.getPipeSinkPendingQueue(),
PROCESSOR_EXECUTOR,
pipeTaskMeta,
getSourceUserEntity(sourceParameters),
getSourcePassword(sourceParameters),
pipeStaticMeta
.getSinkParameters()
.getStringOrDefault(
Expand All @@ -143,6 +147,38 @@ public PipeDataNodeTask build() {
pipeStaticMeta.getPipeName(), regionId, sourceStage, processorStage, sinkStage);
}

private UserEntity getSourceUserEntity(final PipeParameters sourceParameters) {
final String username =
sourceParameters.getStringByKeys(
PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY,
PipeSourceConstant.SOURCE_IOTDB_USER_KEY,
PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY,
PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY);
if (Objects.isNull(username)) {
return null;
}

final String userId =
sourceParameters.getStringOrDefault(
Arrays.asList(
PipeSourceConstant.EXTRACTOR_IOTDB_USER_ID,
PipeSourceConstant.SOURCE_IOTDB_USER_ID),
"-1");
final String cliHostname =
sourceParameters.getStringOrDefault(
Arrays.asList(
PipeSourceConstant.EXTRACTOR_IOTDB_CLI_HOSTNAME,
PipeSourceConstant.SOURCE_IOTDB_CLI_HOSTNAME),
"");
return new UserEntity(Long.parseLong(userId), username, cliHostname);
}

private String getSourcePassword(final PipeParameters sourceParameters) {
return sourceParameters.getStringByKeys(
PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY,
PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY);
}

private void generateSystemParameters() {
if (!(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex)
|| pipeTaskMeta.isNewlyAdded()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.pipe.agent.task.stage;

import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.agent.task.connection.EventSupplier;
Expand Down Expand Up @@ -70,12 +71,14 @@ public PipeTaskProcessorStage(
final UnboundedBlockingPendingQueue<Event> pipeSinkOutputPendingQueue,
final PipeProcessorSubtaskExecutor executor,
final PipeTaskMeta pipeTaskMeta,
final UserEntity sourceUserEntity,
final String sourcePassword,
final boolean forceTabletFormat,
final boolean skipParsing) {
final PipeProcessorRuntimeConfiguration runtimeConfiguration =
new PipeTaskRuntimeConfiguration(
new PipeTaskProcessorRuntimeEnvironment(
pipeName, creationTime, regionId, pipeTaskMeta));
pipeName, creationTime, regionId, pipeTaskMeta, sourceUserEntity, sourcePassword));
final PipeProcessor pipeProcessor =
StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId))
|| PipeRuntimeMeta.isSourceExternal(regionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest;
import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.RequestType;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
Expand All @@ -37,6 +39,7 @@
public class TwoStageAggregateReceiver implements IoTDBReceiver {

private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageAggregateReceiver.class);
private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();

@Override
public IoTDBSinkRequestVersion getVersion() {
Expand All @@ -46,6 +49,14 @@ public IoTDBSinkRequestVersion getVersion() {
@Override
public TPipeTransferResp receive(TPipeTransferReq req) {
try {
final IClientSession clientSession = SESSION_MANAGER.getCurrSession();
if (!SESSION_MANAGER.checkLogin(clientSession)) {
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.NOT_LOGIN,
"Log in failed. Either you are not authorized or the session has timed out."));
}

final short rawRequestType = req.getType();
if (RequestType.isValidatedRequestType(rawRequestType)) {
switch (RequestType.valueOf(rawRequestType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.apache.iotdb.db.pipe.processor.twostage.exchange.sender;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
Expand All @@ -32,14 +34,18 @@
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;

import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.ZoneId;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -55,8 +61,12 @@ public class TwoStageAggregateSender implements AutoCloseable {

private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();

private static final String USE_ENCRYPTED_PASSWORD_KEY = "use_encrypted_password";

private final String pipeName;
private final long creationTime;
private final UserEntity sourceUserEntity;
private final String sourcePassword;

private static final AtomicLong DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME = new AtomicLong(0);
private static final AtomicReference<Map<Integer, TEndPoint>> DATANODE_ID_2_END_POINTS =
Expand All @@ -67,8 +77,15 @@ public class TwoStageAggregateSender implements AutoCloseable {
new ConcurrentHashMap<>();

public TwoStageAggregateSender(String pipeName, long creationTime) {
this(pipeName, creationTime, null, null);
}

public TwoStageAggregateSender(
String pipeName, long creationTime, UserEntity sourceUserEntity, String sourcePassword) {
this.pipeName = pipeName;
this.creationTime = creationTime;
this.sourceUserEntity = sourceUserEntity;
this.sourcePassword = sourcePassword;
}

public synchronized TPipeTransferResp request(long watermark, TPipeTransferReq req)
Expand Down Expand Up @@ -178,7 +195,7 @@ private void tryConstructClients(boolean endPointsChanged) {

try {
endPointIoTDBSyncClientMap.put(endPoint, constructIoTDBSyncClient(endPoint));
} catch (TTransportException e) {
} catch (TException e) {
LOGGER.warn(DataNodePipeMessages.FAILED_TO_CONSTRUCT_IOTDBSYNCCLIENT, e);
}
}
Expand All @@ -194,8 +211,7 @@ private void tryConstructClients(boolean endPointsChanged) {
}
}

private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint endPoint)
throws TTransportException {
private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint endPoint) throws TException {
final IoTDBSyncClient oldClient = endPointIoTDBSyncClientMap.remove(endPoint);
if (oldClient != null) {
try {
Expand All @@ -209,17 +225,46 @@ private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint endPoint)
return newClient;
}

private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws TTransportException {
return new IoTDBSyncClient(
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
.setRpcThriftCompressionEnabled(PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled())
.build(),
endPoint.getIp(),
endPoint.getPort(),
false,
null,
null);
private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws TException {
final IoTDBSyncClient client =
new IoTDBSyncClient(
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
.setRpcThriftCompressionEnabled(PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled())
.build(),
endPoint.getIp(),
endPoint.getPort(),
false,
null,
null);
openSession(client);
return client;
}

private void openSession(final IoTDBSyncClient client) throws TException {
if (Objects.isNull(sourceUserEntity) || Objects.isNull(sourcePassword)) {
throw new PipeException(
String.format(
"Missing source credentials for two-stage aggregate pipe %s-%s.",
pipeName, creationTime));
}

final TSOpenSessionReq openSessionReq = new TSOpenSessionReq();
openSessionReq.setUsername(sourceUserEntity.getUsername());
openSessionReq.setPassword(sourcePassword);
openSessionReq.setZoneId(ZoneId.systemDefault().toString());
openSessionReq.setClient_protocol(TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
openSessionReq.putToConfiguration("version", IoTDBConstant.ClientVersion.V_1_0.toString());
openSessionReq.putToConfiguration(USE_ENCRYPTED_PASSWORD_KEY, Boolean.TRUE.toString());

final TSOpenSessionResp openSessionResp = client.openSession(openSessionReq);
if (openSessionResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
"Failed to login for two-stage aggregate pipe %s-%s, status: %s.",
pipeName, creationTime, openSessionResp.getStatus()));
}
client.setTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.pipe.processor.twostage.plugin;

import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
Expand Down Expand Up @@ -102,6 +103,9 @@ public class TwoStageCountProcessor implements PipeProcessor {
private final Queue<Pair<long[], ProgressIndex> /* ([timestamp, local count], progress index) */>
localCommitQueue = new ConcurrentLinkedQueue<>();

private UserEntity sourceUserEntity;
private String sourcePassword;

private TwoStageAggregateSender twoStageAggregateSender;
private final Queue<Pair<Long, Long> /* (timestamp, global count) */> globalCountQueue =
new ConcurrentLinkedQueue<>();
Expand Down Expand Up @@ -139,6 +143,8 @@ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfigurati
creationTime = runtimeEnvironment.getCreationTime();
regionId = runtimeEnvironment.getRegionId();
pipeTaskMeta = runtimeEnvironment.getPipeTaskMeta();
sourceUserEntity = runtimeEnvironment.getSourceUserEntity();
sourcePassword = runtimeEnvironment.getSourcePassword();
dataBaseName =
StorageEngine.getInstance()
.getDataRegion(new DataRegionId(runtimeEnvironment.getRegionId()))
Expand Down Expand Up @@ -176,7 +182,8 @@ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfigurati
PipeCombineHandlerManager.getInstance()
.register(
pipeName, creationTime, (combineId) -> new CountOperator(combineId, globalCountQueue));
twoStageAggregateSender = new TwoStageAggregateSender(pipeName, creationTime);
twoStageAggregateSender =
new TwoStageAggregateSender(pipeName, creationTime, sourceUserEntity, sourcePassword);
}

static PartialPath parseOutputSeries(final PipeParameters parameters)
Expand Down
Loading
Loading