# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Module database."""fromtypingimportDictfrompy4j.protocolimportPy4JJavaErrorfrompydolphinscheduler.exceptionsimportPyDSParamExceptionfrompydolphinscheduler.java_gatewayimportlaunch_gateway
class Database(dict):
    """Database object, used to get information about a database.

    The ``database_name`` you provide identifies the connection information;
    it determines which database type and database instance a task runs
    against. On construction the type and id looked up through the java
    gateway are stored in this dict under ``type_key`` and ``database_key``.

    :param database_name: Name of the datasource to look up via the java gateway.
    :param type_key: Dict key under which the database type is stored.
    :param database_key: Dict key under which the database id is stored.
    :param args: Extra positional arguments forwarded to :class:`dict`.
    :param kwargs: Extra keyword arguments forwarded to :class:`dict`.
    """

    def __init__(self, database_name: str, type_key, database_key, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Cache for the gateway lookup; must exist before the properties
        # below are read, because they go through ``get_database_info``.
        self._database = {}
        self.database_name = database_name
        self[type_key] = self.database_type
        self[database_key] = self.database_id

    @property
    def database_type(self) -> str:
        """Get database type from java gateway, a wrapper for :func:`get_database_info`."""
        return self.get_database_info(self.database_name).get("type")

    @property
    def database_id(self) -> str:
        """Get database id from java gateway, a wrapper for :func:`get_database_info`."""
        return self.get_database_info(self.database_name).get("id")

    def get_database_info(self, name) -> Dict:
        """Get database info from java gateway, contains database id, type, name.

        The first successful lookup is cached on the instance and returned by
        every later call.

        NOTE(review): the cache is not keyed by ``name``; once populated, the
        cached info is returned regardless of the ``name`` passed in.

        :param name: Name of the datasource to query.
        :return: The datasource info returned by the gateway (or the cached copy).
        :raises PyDSParamException: If the java gateway reports an error for
            ``name``, e.g. the datasource does not exist.
        """
        if self._database:
            return self._database

        gateway = launch_gateway()
        try:
            self._database = gateway.entry_point.getDatasourceInfo(name)
        # Handle the "datasource does not exist" error; for now we just abort
        # by raising a parameter error, keeping the gateway error as the cause.
        except Py4JJavaError as ex:
            raise PyDSParamException(str(ex.java_exception)) from ex
        return self._database