# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Task Python."""

import inspect
import logging
import re
import textwrap
import types
from typing import Union

from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSParamException

log = logging.getLogger(__file__)
class Python(Task):
    """Task Python object, declare behavior for Python task to dolphinscheduler.

    The ``definition`` parameter accepts two kinds of value.

    A ``str`` holding the script to run:

    .. code-block:: python

        python_task = Python(name="str_type", definition="print('Hello Python task.')")

    Or a Python callable:

    .. code-block:: python

        def foo():
            print("Hello Python task.")

        python_task = Python(name="str_type", definition=foo)

    :param name: The name of the Python task.
    :param definition: Python script as a string, or a Python callable, to execute.
    """

    _task_custom_attr = {"raw_script", "definition"}

    ext: set = {".py"}
    ext_attr: Union[str, types.FunctionType] = "_definition"

    def __init__(
        self, name: str, definition: Union[str, types.FunctionType], *args, **kwargs
    ):
        # Store the raw value before the base initializer runs.
        # NOTE(review): the public ``definition`` attribute read below is
        # presumably derived from ``ext_attr`` by ``Task`` — confirm there.
        self._definition = definition
        super().__init__(name, TaskType.PYTHON, *args, **kwargs)

    def _build_exe_str(self) -> str:
        """Build the executable script string from ``self.definition``.

        * Callable: its source is fetched with :func:`inspect.getsource`,
          dedented so a function declared in an indented scope still yields a
          valid top-level script, and followed by a ``name()`` call.
        * String starting with ``def name(``: a ``name()`` call is appended on
          its own line.
        * Any other string: returned unchanged.

        :return: The script text to execute.
        """
        definition = self.definition
        if isinstance(definition, types.FunctionType):
            # ``getsource`` keeps the original indentation; without dedenting,
            # the call appended at column 0 would make the script invalid.
            source = textwrap.dedent(inspect.getsource(definition))
            # Make sure the call lands on its own line even when the source
            # has no trailing newline.
            if not source.endswith("\n"):
                source += "\n"
            # NOTE(review): a lambda has ``__name__ == "<lambda>"``, so the
            # appended call would not be valid Python — confirm whether lambdas
            # need to be supported.
            return f"{source}{definition.__name__}()"

        # Without ``re.MULTILINE`` the ``^`` anchor only matches at the very
        # start of the script, so only a leading ``def`` triggers the call.
        leading_def = re.match(r"^def (\w+)\(", definition)
        if leading_def is None:
            return definition
        # Keep exactly one line break between the function body and its call.
        separator = "" if definition.endswith("\n") else "\n"
        return f"{definition}{separator}{leading_def.group(1)}()"

    @property
    def raw_script(self) -> str:
        """Get python task define attribute `raw_script`.

        :return: The executable script built by :meth:`_build_exe_str`.
        :raises PyDSParamException: If ``definition`` is neither a ``str`` nor
            a plain Python function.
        """
        definition = self.definition
        if not isinstance(definition, (str, types.FunctionType)):
            # Interpolate the offending type into the message; a bare ``%``
            # plus a second exception argument never formats anything.
            raise PyDSParamException(
                f"Parameter definition do not support {type(definition)} for now."
            )
        return self._build_exe_str()