Browse Source

增加元数据的查询schemalist方法

master
xueyinfei 4 weeks ago
parent
commit
268276d994
  1. 15
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java
  2. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
  3. 118
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java

15
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java

@ -242,6 +242,21 @@ public class DataSourceController extends BaseController {
return dataSourceService.checkConnection(dataSourceParam.getType(), connectionParams);
}
@ApiOperation(value = "listSchemas", notes = "CONNECT_DATA_SOURCE_AND_LISTSCHEMAS")
@GetMapping(value = "/schemas")
@ResponseStatus(HttpStatus.OK)
@ApiException(CONNECT_DATASOURCE_FAILURE)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result getSchemasList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "datasource") int datasource) {
BaseDataSourceParamDTO dataSourceParam = dataSourceService.queryDataSourceById(datasource);
DataSourceUtils.checkDatasourceParam(dataSourceParam);
ConnectionParam connectionParams = DataSourceUtils.buildConnectionParams(dataSourceParam);
Map<String, Object> result = dataSourceService.getSchemasList(dataSourceParam.getType(), connectionParams);
return returnDataList(result);
}
/**
* connection test
*

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java

@ -56,6 +56,7 @@ public interface DataSourceService {
* @return data source detail
*/
Map<String, Object> queryDataSource(int id);
BaseDataSourceParamDTO queryDataSourceById(int id);
/**
* query datasource list by keyword
@ -131,6 +132,8 @@ public interface DataSourceService {
*/
Map<String, Object> authedDatasource(User loginUser, Integer userId);
Map<String, Object> getSchemasList(Integer datasourceId);
/**
* get tables
* @param datasourceId
@ -145,4 +148,6 @@ public interface DataSourceService {
* @return
*/
Map<String, Object> getTableColumns(Integer datasourceId, String tableName);
Map<String, Object> getSchemasList(DbType type, ConnectionParam connectionParams);
}

118
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java

@ -58,6 +58,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.eclipse.jetty.util.ajax.JSON;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -245,6 +247,22 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
putMsg(result, Status.SUCCESS);
return result;
}
@Override
public BaseDataSourceParamDTO queryDataSourceById(int id) {
DataSource dataSource = dataSourceMapper.selectById(id);
if (dataSource == null) {
throw new RuntimeException("数据源不存在");
}
// type
BaseDataSourceParamDTO baseDataSourceParamDTO = DataSourceUtils.buildDatasourceParamDTO(
dataSource.getType(), dataSource.getConnectionParams());
baseDataSourceParamDTO.setId(dataSource.getId());
baseDataSourceParamDTO.setName(dataSource.getName());
baseDataSourceParamDTO.setNote(dataSource.getNote());
ObjectNode connParam = JSONUtils.parseObject(dataSource.getConnectionParams());
baseDataSourceParamDTO.setPassword(connParam.path(Constants.PASSWORD).asText());
return baseDataSourceParamDTO;
}
/**
* query datasource list by keyword
@ -517,6 +535,72 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
return result;
}
@Override
public Map<String, Object> getSchemasList(Integer datasourceId) {
Map<String, Object> result = new HashMap<>();
DataSource dataSource = dataSourceMapper.selectById(datasourceId);
List<String> tableList = null;
BaseConnectionParam connectionParam =
(BaseConnectionParam) DataSourceUtils.buildConnectionParams(
dataSource.getType(),
dataSource.getConnectionParams());
if (null == connectionParam) {
putMsg(result, Status.DATASOURCE_CONNECT_FAILED);
return result;
}
Connection connection =
DataSourceUtils.getConnection(dataSource.getType(), connectionParam);
ResultSet tables = null;
try {
if (null == connection) {
putMsg(result, Status.DATASOURCE_CONNECT_FAILED);
return result;
}
DatabaseMetaData metaData = connection.getMetaData();
String schema = null;
try {
schema = metaData.getConnection().getSchema();
} catch (SQLException e) {
logger.error("cant not get the schema : {}", e.getMessage(), e);
}
tables = metaData.getTables(
connectionParam.getDatabase(),
getDbSchemaPattern(dataSource.getType(), schema, connectionParam),
"%", TABLE_TYPES);
if (null == tables) {
putMsg(result, Status.GET_DATASOURCE_TABLES_ERROR);
return result;
}
tableList = new ArrayList<>();
while (tables.next()) {
String name = tables.getString(TABLE_NAME);
tableList.add(name);
}
} catch (Exception e) {
logger.error(e.toString(), e);
putMsg(result, Status.GET_DATASOURCE_TABLES_ERROR);
return result;
} finally {
closeResult(tables);
releaseConnection(connection);
}
List<ParamsOptions> options = getParamsOptions(tableList);
result.put(Constants.DATA_LIST, options);
putMsg(result, Status.SUCCESS);
return result;
}
@Override
public Map<String, Object> getTables(Integer datasourceId) {
Map<String, Object> result = new HashMap<>();
@ -637,6 +721,40 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
return result;
}
@Override
public Map<String, Object> getSchemasList(DbType type, ConnectionParam connectionParams) {
Map<String, Object> result = new HashMap<>();
List<String> schemas = new ArrayList<>();
ResultSet rs = null;
try (Connection connection = DataSourceUtils.getConnection(type, connectionParams)) {
if (connection == null) {
throw new RuntimeException("连接失败");
}
DatabaseMetaData metaData = connection.getMetaData();
rs = metaData.getCatalogs();
while (rs.next()) {
// getString(1) 或 getString("TABLE_CAT") 获取数据库名
String schemaName = rs.getString(1);
schemas.add(schemaName);
}
result.put(Constants.DATA_LIST, schemas);
putMsg(result, Status.SUCCESS);
return result;
} catch (Exception e) {
String message = Optional.of(e).map(Throwable::getCause)
.map(Throwable::getMessage)
.orElse(e.getMessage());
logger.error("datasource test connection error, dbType:{}, connectionParam:{}, message:{}.", type,
connectionParams, message);
// return (Map<String, Object>) new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(), message);
}finally {
if (rs != null) {
try { rs.close(); } catch (SQLException e) { /* 忽略 */ }
}
}
return null;
}
private List<ParamsOptions> getParamsOptions(List<String> columnList) {
List<ParamsOptions> options = null;
if (CollectionUtils.isNotEmpty(columnList)) {

Loading…
Cancel
Save