使用 Python 实现自定义 Transformer，以增强 PySpark 的 Pipeline
# 1. Example: a custom Transformer that selects a subset of DataFrame columns.
from typing import Optional

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param import TypeConverters
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable


class SelectColsTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that keeps only the configured columns of a DataFrame.

    The ``cols`` param defaults to ``["*"]`` (keep every column). Mixing in
    ``DefaultParamsReadable``/``DefaultParamsWritable`` lets the stage be
    saved and loaded like the built-in PySpark stages.

    Example::

        stage = SelectColsTransformer(cols=["id", "label"])
        stage.transform(df)  # equivalent to df.select(["id", "label"])
    """

    # Params._dummy() is the placeholder parent PySpark uses for class-level
    # Param declarations. toListString validates that the value is a list of
    # strings and keeps it JSON-serializable for DefaultParamsWriter.
    cols = Param(
        Params._dummy(),
        "cols",
        "cols to select",
        typeConverter=TypeConverters.toListString,
    )

    @keyword_only
    def __init__(self, *, cols: Optional[list[str]] = None):
        """Create the stage; ``cols`` must be passed by keyword."""
        super().__init__()
        self._setDefault(cols=["*"])
        # keyword_only records exactly the keyword arguments the caller passed,
        # so unspecified params keep their defaults.
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, *, cols: Optional[list[str]] = None):
        """Set params from keyword arguments and return ``self``.

        An explicit ``cols=None`` is ignored so the ``["*"]`` default stays in
        effect instead of ``None`` reaching ``DataFrame.select``.
        """
        kwargs = {k: v for k, v in self._input_kwargs.items() if v is not None}
        return self._set(**kwargs)

    def setCols(self, value: list[str]) -> "SelectColsTransformer":
        """Set the list of column names to select and return ``self``."""
        return self._set(cols=value)

    def getCols(self) -> list[str]:
        """Return the column names to select (``["*"]`` unless set)."""
        return self.getOrDefault(self.cols)

    def _transform(self, dataset):
        """Return ``dataset`` projected onto the configured columns."""
        return dataset.select(self.getCols())