使用 Python 实现自定义 Transformer，以增强 PySpark 的 Pipeline
# 一 示例 (Part 1: Example)
from typing import Optional

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable


class SelectColsTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that projects a DataFrame onto a configurable column list.

    The column list is held in the ``cols`` Param. Its default value,
    ``["*"]``, is registered in ``__init__`` via ``_setDefault``, so a stage
    built without arguments selects every column.

    The class mixes in ``DefaultParamsReadable`` / ``DefaultParamsWritable``
    from ``pyspark.ml.util`` in addition to ``Transformer``.

    Example::

        stage = SelectColsTransformer(cols=["id", "name"])
        projected = stage.transform(df)
    """

    # Param descriptor; the ``Params._dummy()`` parent is a placeholder.
    cols = Param(Params._dummy(), "cols", "cols to select")

    @keyword_only
    def __init__(self, cols: Optional[list[str]] = None):
        """Create the stage.

        Args:
            cols: Column names/expressions to select. Must be passed by
                keyword. Omit it to keep the registered default ``["*"]``.
        """
        super().__init__()
        self._setDefault(cols=["*"])
        # ``keyword_only`` stores only the explicitly passed keyword arguments
        # in ``_input_kwargs``; the signature default is never forwarded, so
        # the effective default is the one registered with ``_setDefault``.
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, cols: Optional[list[str]] = None):
        """Set the params of this stage from keyword arguments.

        Args:
            cols: Column names/expressions to select. Must be passed by
                keyword. Omit it to leave the current value untouched.

        Returns:
            Whatever ``self._set(**kwargs)`` returns.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        """Return ``dataset`` projected onto the configured columns.

        Args:
            dataset: Input object exposing ``select`` (a DataFrame when run
                inside a pipeline).

        Returns:
            The result of ``dataset.select`` applied to the current value of
            ``cols`` (explicitly set value, else the default).
        """
        return dataset.select(self.getOrDefault(self.cols))