Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.pipe.it.dual.treemodel.auto.basic;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
import org.apache.iotdb.rpc.TSStatusCode;

import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2DualTreeAutoBasic.class})
public class IoTDBPipeMutualSSLIT extends AbstractPipeDualTreeModelAutoIT {

private static final String STORE_PASSWORD = "thrift";

@Override
protected void setupConfig() {
super.setupConfig();

senderEnv
.getConfig()
.getCommonConfig()
.setTrustStorePath(trustStorePath())
.setTrustStorePwd(STORE_PASSWORD)
.setSslProtocol(SessionConfig.DEFAULT_SSL_PROTOCOL);
receiverEnv
.getConfig()
.getCommonConfig()
.setEnableThriftClientSSL(true)
.setThriftSSLClientAuth(true)
.setKeyStorePath(keyStorePath())
.setKeyStorePwd(STORE_PASSWORD)
.setTrustStorePath(trustStorePath())
.setTrustStorePwd(STORE_PASSWORD)
.setSslProtocol(SessionConfig.DEFAULT_SSL_PROTOCOL);
}

@Test
public void testPipeCanTransferWithMutualSSL() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.pipe_mtls.d1(time, s1) values (1, 11)",
"insert into root.pipe_mtls.d1(time, s1) values (2, 22)",
"flush"),
null);

final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> sinkAttributes = new HashMap<>();

sourceAttributes.put("source.realtime.mode", "log");
sourceAttributes.put("source.user", "root");

sinkAttributes.put("sink", "iotdb-thrift-ssl-sink");
sinkAttributes.put("sink.batch.enable", "false");
sinkAttributes.put("sink.ip", receiverDataNode.getIp());
sinkAttributes.put("sink.port", Integer.toString(receiverDataNode.getPort()));
sinkAttributes.put("sink.ssl.trust-store-path", trustStorePath());
sinkAttributes.put("sink.ssl.trust-store-pwd", STORE_PASSWORD);
sinkAttributes.put("sink.ssl.key-store-path", keyStorePath());
sinkAttributes.put("sink.ssl.key-store-pwd", STORE_PASSWORD);

final TSStatus status =
client.createPipe(
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(sourceAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList("insert into root.pipe_mtls.d1(time, s1) values (3, 33)", "flush"),
null);

try (final Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX
+ receiverDataNode.getIpAndPortString()
+ "?"
+ Config.USE_SSL
+ "=true&"
+ Config.TRUST_STORE
+ "="
+ trustStorePath()
+ "&"
+ Config.TRUST_STORE_PWD
+ "="
+ STORE_PASSWORD
+ "&"
+ Config.KEY_STORE
+ "="
+ keyStorePath()
+ "&"
+ Config.KEY_STORE_PWD
+ "="
+ STORE_PASSWORD,
SessionConfig.DEFAULT_USER,
SessionConfig.DEFAULT_PASSWORD);
final Statement statement = connection.createStatement()) {
Awaitility.await()
.pollInSameThread()
.pollDelay(1L, TimeUnit.SECONDS)
.pollInterval(1L, TimeUnit.SECONDS)
.atMost(600, TimeUnit.SECONDS)
.untilAsserted(
() ->
TestUtils.assertResultSetEqual(
statement.executeQuery("select s1 from root.pipe_mtls.d1"),
"Time,root.pipe_mtls.d1.s1,",
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList("1,11.0,", "2,22.0,", "3,33.0,")))));
}
}
}

private static String keyStorePath() {
return keyDir() + "test-keystore";
}

private static String trustStorePath() {
return keyDir() + "test-truststore";
}

private static String keyDir() {
return System.getProperty("user.dir")
+ File.separator
+ "target"
+ File.separator
+ "test-classes"
+ File.separator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ public static class ValueHider {

static {
KEYS.add("ssl.trust-store-pwd");
KEYS.add("ssl.key-store-pwd");
KEYS.add("scp.password");
KEYS.add("password");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,16 @@ public void keyReducerTest() {
parameters.addAttribute("opcua.sink.value-name", "false");
Assert.assertNull(parameters.getString("value-name"));
}

@Test
public void valueHiderShouldHideSslKeyStorePassword() {
Assert.assertEquals(
"******", PipeParameters.ValueHider.hide("sink.ssl.key-store-pwd", "secret"));
Assert.assertEquals(
"******", PipeParameters.ValueHider.hide("connector.ssl.key-store-pwd", "secret"));
Assert.assertEquals("******", PipeParameters.ValueHider.hide("ssl.key-store-pwd", "secret"));
Assert.assertEquals(
"******", PipeParameters.ValueHider.hide("connector.ssl.trust-store-pwd", "secret"));
Assert.assertEquals("secret", PipeParameters.ValueHider.hide("ssl.key-store-path", "secret"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public IoTDBConnection(String url, Properties info) throws SQLException, TTransp
}
params = Utils.parseUrl(url, info);
this.url = url;
this.userName = info.get("user").toString();
this.userName = params.getUsername();
this.networkTimeout = params.getNetworkTimeout();
this.zoneId = ZoneId.of(params.getTimeZone());
this.charset = params.getCharset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public IoTDBConfigNodeSyncClientManager(
boolean useSSL,
String trustStorePath,
String trustStorePwd,
String keyStorePath,
String keyStorePwd,
/* The following parameters are used locally. */
String loadBalanceStrategy,
/* The following parameters are used to handshake with the receiver. */
Expand All @@ -54,6 +56,8 @@ public IoTDBConfigNodeSyncClientManager(
useSSL,
trustStorePath,
trustStorePwd,
keyStorePath,
keyStorePwd,
false,
loadBalanceStrategy,
userEntity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ protected IoTDBSyncClientManager constructClient(
final boolean useSSL,
final String trustStorePath,
final String trustStorePwd,
final String keyStorePath,
final String keyStorePwd,
/* The following parameters are used locally. */
final boolean useLeaderCache,
final String loadBalanceStrategy,
Expand All @@ -84,6 +86,8 @@ protected IoTDBSyncClientManager constructClient(
useSSL,
Objects.nonNull(trustStorePath) ? ConfigNodeConfig.addHomeDir(trustStorePath) : null,
trustStorePwd,
Objects.nonNull(keyStorePath) ? ConfigNodeConfig.addHomeDir(keyStorePath) : null,
keyStorePwd,
loadBalanceStrategy,
userEntity,
password,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,50 @@ public void testIoTDBSchemaConnector() {
Assert.fail();
}
}

@Test
public void testIoTDBConfigRegionSinkAcceptsMutualSslParameters() {
try (IoTDBConfigRegionSink connector = new IoTDBConfigRegionSink()) {
connector.validate(
new PipeParameterValidator(
new PipeParameters(
new HashMap<String, String>() {
{
put(
PipeSinkConstant.CONNECTOR_KEY,
BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName());
put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6668");
put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY, "truststore");
put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY, "trustpwd");
put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY, "keystore");
put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PWD_KEY, "keypwd");
}
})));
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}

@Test
public void testIoTDBConfigRegionSinkRejectsIncompleteKeyStoreParameters() {
try (IoTDBConfigRegionSink connector = new IoTDBConfigRegionSink()) {
connector.validate(
new PipeParameterValidator(
new PipeParameters(
new HashMap<String, String>() {
{
put(
PipeSinkConstant.CONNECTOR_KEY,
BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName());
put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6668");
put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY, "truststore");
put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY, "trustpwd");
put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY, "keystore");
}
})));
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains(PipeSinkConstant.SINK_IOTDB_SSL_KEY_STORE_PWD_KEY));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public IoTDBDataNodeSyncClientManager(
final boolean useSSL,
final String trustStorePath,
final String trustStorePwd,
final String keyStorePath,
final String keyStorePwd,
/* The following parameters are used locally. */
final boolean useLeaderCache,
final String loadBalanceStrategy,
Expand All @@ -65,6 +67,8 @@ public IoTDBDataNodeSyncClientManager(
useSSL,
trustStorePath,
trustStorePwd,
keyStorePath,
keyStorePwd,
useLeaderCache,
loadBalanceStrategy,
userEntity,
Expand Down
Loading
Loading