# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Task mlflow."""fromcopyimportdeepcopyfromtypingimportDict,Optionalfrompydolphinscheduler.constantsimportTaskTypefrompydolphinscheduler.core.taskimportTask
[docs]classMLflowModels(BaseMLflow):"""Task MLflow models object, declare behavior for MLflow models task to dolphinscheduler. Deploy machine learning models in diverse serving environments. :param name: task name :param model_uri: Model-URI of MLflow , support models:/<model_name>/suffix format and runs:/ format. See https://mlflow.org/docs/latest/tracking.html#artifact-stores :param mlflow_tracking_uri: MLflow tracking server uri, default is http://127.0.0.1:5000 :param deploy_mode: MLflow deploy mode, support MLFLOW, DOCKER, default is DOCKER :param port: deploy port, default is 7000 """mlflow_task_type=MLflowTaskType.MLFLOW_MODELS_child_task_mlflow_attr={"deploy_type","deploy_model_key","deploy_port",}def__init__(self,name:str,model_uri:str,mlflow_tracking_uri:Optional[str]=DEFAULT_MLFLOW_TRACKING_URI,deploy_mode:Optional[str]=MLflowDeployType.DOCKER,port:Optional[int]=7000,*args,**kwargs):"""Init mlflow models task."""super().__init__(name,mlflow_tracking_uri,*args,**kwargs)self.deploy_type=deploy_mode.upper()self.deploy_model_key=model_uriself.deploy_port=port
[docs]classMLFlowProjectsCustom(BaseMLflow):"""Task MLflow projects object, declare behavior for MLflow Custom projects task to dolphinscheduler. :param name: task name :param repository: Repository url of MLflow Project, Support git address and directory on worker. If it's in a subdirectory, We add # to support this (same as mlflow run) , for example https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native. :param mlflow_tracking_uri: MLflow tracking server uri, default is http://127.0.0.1:5000 :param experiment_name: MLflow experiment name, default is empty :param parameters: MLflow project parameters, default is empty :param version: MLflow project version, default is master """mlflow_task_type=MLflowTaskType.MLFLOW_PROJECTSmlflow_job_type=MLflowJobType.CUSTOM_PROJECT_child_task_mlflow_attr={"mlflow_job_type","experiment_name","params","mlflow_project_repository","mlflow_project_version",}def__init__(self,name:str,repository:str,mlflow_tracking_uri:Optional[str]=DEFAULT_MLFLOW_TRACKING_URI,experiment_name:Optional[str]="",parameters:Optional[str]="",version:Optional[str]="master",*args,**kwargs):"""Init mlflow projects task."""super().__init__(name,mlflow_tracking_uri,*args,**kwargs)self.mlflow_project_repository=repositoryself.experiment_name=experiment_nameself.params=parametersself.mlflow_project_version=version
[docs]classMLFlowProjectsAutoML(BaseMLflow):"""Task MLflow projects object, declare behavior for AutoML task to dolphinscheduler. :param name: task name :param data_path: data path of MLflow Project, Support git address and directory on worker. :param automl_tool: The AutoML tool used, currently supports autosklearn and flaml. :param mlflow_tracking_uri: MLflow tracking server uri, default is http://127.0.0.1:5000 :param experiment_name: MLflow experiment name, default is empty :param model_name: MLflow model name, default is empty :param parameters: MLflow project parameters, default is empty """mlflow_task_type=MLflowTaskType.MLFLOW_PROJECTSmlflow_job_type=MLflowJobType.AUTOML_child_task_mlflow_attr={"mlflow_job_type","experiment_name","model_name","register_model","data_path","params","automl_tool",}def__init__(self,name:str,data_path:str,automl_tool:Optional[str]="flaml",mlflow_tracking_uri:Optional[str]=DEFAULT_MLFLOW_TRACKING_URI,experiment_name:Optional[str]="",model_name:Optional[str]="",parameters:Optional[str]="",*args,**kwargs):"""Init mlflow projects task."""super().__init__(name,mlflow_tracking_uri,*args,**kwargs)self.data_path=data_pathself.experiment_name=experiment_nameself.model_name=model_nameself.params=parametersself.automl_tool=automl_tool.lower()self.register_model=bool(model_name)
[docs]classMLFlowProjectsBasicAlgorithm(BaseMLflow):"""Task MLflow projects object, declare behavior for BasicAlgorithm task to dolphinscheduler. :param name: task name :param data_path: data path of MLflow Project, Support git address and directory on worker. :param algorithm: The selected algorithm currently supports LR, SVM, LightGBM and XGboost based on scikit-learn form. :param mlflow_tracking_uri: MLflow tracking server uri, default is http://127.0.0.1:5000 :param experiment_name: MLflow experiment name, default is empty :param model_name: MLflow model name, default is empty :param parameters: MLflow project parameters, default is empty :param search_params: Whether to search the parameters, default is empty """mlflow_job_type=MLflowJobType.BASIC_ALGORITHMmlflow_task_type=MLflowTaskType.MLFLOW_PROJECTS_child_task_mlflow_attr={"mlflow_job_type","experiment_name","model_name","register_model","data_path","params","algorithm","search_params",}def__init__(self,name:str,data_path:str,algorithm:Optional[str]="lightgbm",mlflow_tracking_uri:Optional[str]=DEFAULT_MLFLOW_TRACKING_URI,experiment_name:Optional[str]="",model_name:Optional[str]="",parameters:Optional[str]="",search_params:Optional[str]="",*args,**kwargs):"""Init mlflow projects task."""super().__init__(name,mlflow_tracking_uri,*args,**kwargs)self.data_path=data_pathself.experiment_name=experiment_nameself.model_name=model_nameself.params=parametersself.algorithm=algorithm.lower()self.search_params=search_paramsself.register_model=bool(model_name)