# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Task sql."""from__future__importannotationsimportloggingimportrefromcollections.abcimportSequencefrompydolphinscheduler.constantsimportTaskTypefrompydolphinscheduler.core.taskimportBatchTaskfrompydolphinscheduler.models.datasourceimportDatasourcelog=logging.getLogger(__file__)
class SqlType:
    """Marker values describing whether a SQL statement is a query.

    Only two kinds exist for now: ``SELECT`` for query statements and
    ``NOT_SELECT`` for every other statement.
    """

    # Value used for non-query statements.
    NOT_SELECT = "1"
    # Value used for query statements.
    SELECT = "0"
class Sql(BatchTask):
    """Task SQL object, declare behavior for SQL task to dolphinscheduler.

    It runs a SQL job on one of several SQL-like engines, such as:

    - ClickHouse
    - DB2
    - HIVE
    - MySQL
    - Oracle
    - Postgresql
    - Presto
    - SQLServer

    The datasource you provide through ``datasource_name`` holds the connection
    information; it decides which database type and database instance run the sql.

    :param name: SQL task name
    :param datasource_name: datasource name in dolphinscheduler, the name must exist and must be
        an ``online`` datasource instead of ``test``.
    :param sql: SQL statement, the sql script you want to run. Support resource plugin in this parameter.
    :param datasource_type: optional datasource type, forwarded together with ``datasource_name``
        when the datasource is looked up.
    :param sql_type: SQL type, whether sql statement is select query or not. If not provided, it will
        be auto detected according to sql statement using
        :func:`pydolphinscheduler.tasks.sql.Sql.sql_type`, and you can also set it manually
        by ``SqlType.SELECT`` for query statement or ``SqlType.NOT_SELECT`` for not query statement.
    :param pre_statements: SQL statements to be executed before the main SQL statement.
    :param post_statements: SQL statements to be executed after the main SQL statement.
    :param display_rows: The number of record rows to be displayed in the SQL task log,
        default is 10.
    """

    _task_custom_attr = {
        "sql",
        "sql_type",
        "pre_statements",
        "post_statements",
        "display_rows",
    }

    # NOTE(review): ``ext`` / ``ext_attr`` are presumably consumed by the base
    # class to resolve ``sql`` from a ``.sql`` resource file -- confirm in Task.
    ext: set = {".sql"}
    ext_attr: str = "_sql"

    def __init__(
        self,
        name: str,
        datasource_name: str,
        sql: str,
        datasource_type: str | None = None,
        sql_type: str | None = None,
        pre_statements: str | Sequence[str] | None = None,
        post_statements: str | Sequence[str] | None = None,
        display_rows: int | None = 10,
        *args,
        **kwargs,
    ):
        # ``_sql`` is assigned before the base initializer runs; keep this order
        # (presumably the base class reads ``ext_attr`` during init -- confirm).
        self._sql = sql
        super().__init__(name, TaskType.SQL, *args, **kwargs)
        self.param_sql_type = sql_type
        self.datasource_name = datasource_name
        self.datasource_type = datasource_type
        self.pre_statements = self.get_stm_list(pre_statements)
        self.post_statements = self.get_stm_list(post_statements)
        self.display_rows = display_rows

    @staticmethod
    def get_stm_list(stm: str | Sequence[str] | None) -> list[str]:
        """Convert statement(s) to a list of str.

        :param stm: a single statement string, a sequence of statement strings, or
            a falsy value (``None``, ``""``, empty sequence) meaning no statements.
        :return: statements list; empty when ``stm`` is falsy.
        """
        if not stm:
            return []
        elif isinstance(stm, str):
            # A bare string is one statement, not an iterable of characters.
            return [stm]
        return list(stm)

    @property
    def sql_type(self) -> str:
        """Judge the SQL type, returning ``SqlType.SELECT`` or ``SqlType.NOT_SELECT``.

        If ``param_sql_type`` was given as one of the two ``SqlType`` values it is
        returned as is. Otherwise the statement is inspected: when one of the keywords
        ``insert``, ``delete``, ``drop``, ``update``, ``truncate``, ``alter`` or
        ``create`` appears (case-insensitively) at the start of the statement or after
        whitespace, and is followed by whitespace, the type is ``NOT_SELECT``; in any
        other case it is ``SELECT``.

        :return: the SQL type value.
        """
        if self.param_sql_type in (SqlType.SELECT, SqlType.NOT_SELECT):
            log.info(
                "The sql type is specified by a parameter, with value %s",
                self.param_sql_type,
            )
            return self.param_sql_type

        # Keywords are delimited by any whitespace (``\s``), not only a literal
        # space, so statements spanning several lines, starting with a newline or
        # using tabs are classified the same way as single-line ones.
        pattern_not_select = re.compile(
            r"(?:\A|\s)(?:insert|delete|drop|update|truncate|alter|create)\s",
            re.IGNORECASE,
        )
        if pattern_not_select.search(self._sql) is None:
            return SqlType.SELECT
        return SqlType.NOT_SELECT

    @property
    def datasource(self) -> dict:
        """Get datasource for procedure sql.

        :return: dict with key ``datasource`` (the id) and key ``type`` taken from the
            object returned by ``Datasource.get_task_usage_4j``.
        """
        datasource_task_u = Datasource.get_task_usage_4j(
            self.datasource_name, self.datasource_type
        )
        return {
            "datasource": datasource_task_u.id,
            "type": datasource_task_u.type,
        }

    @property
    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> dict:
        """Override Task.task_params for sql task.

        The sql task has some special attributes for task_params, and it is odd to set
        them directly as python properties, so Task.task_params is overridden here: the
        base parameters are extended with the entries of :attr:`datasource`.

        ``camel_attr`` and ``custom_attr`` are not used in this override; as a property
        it is accessed without arguments, so they always keep their defaults.

        :return: the base task params updated with the datasource entries.
        """
        params = super().task_params
        params.update(self.datasource)
        return params