【daft框架】和ray分布式计算的结合运行自定义函数

张开发
2026/4/17 17:35:36 15 分钟阅读

分享文章

【daft框架】和ray分布式计算的结合运行自定义函数
daft的框架主要分成python和raft两部分daft在ray上如何运行udf采用分布式执行框架Client 端: RemoteFlotillaRunner 负责把物理计划切成任务分发到各个节点Worker 端: 每个 Ray 节点上只跑一个 RaySwordfishActor内部调度: Actor 内部有任务队列根据 UDF 声明的资源 (num_gpus/num_cpus) 调度任务具体流程步骤1 ray.init()连接 Ray 集群这一步就是 Ray 本身的逻辑和 Daft 无关Daft 复用你已经初始化好的 Ray不需要自己重新初始化步骤2 daft.set_runner_ray()这一步才是 Daft 启动 Actor 的地方def set_runner_ray():# → 创建 RemoteFlotillaRunner# → 在每个 Ray 节点上启动一个 RaySwordfishActor# → 这些 Actors 启动好之后就一直运行等待任务# Daft 在 set_runner_ray 的时候在每个 Ray 节点启动一个 Actorray.remoteclassRaySwordfishActor:def__init__(self):# 这里启动好一直活着self.task_queue...self.resource_scheduler...self.instantiated_udfs{}# 缓存已经实例化的 UDF关键点: 此时只启动了一个空的 RaySwordfishActor 每个节点Actor 只是个空壳里面还没有任何 UDF。步骤3 UDF 定义与注册阶段daft.func(num_gpus0.5,concurrency2)defmy_udf(image):returnmodel.predict(image)dfdf.with_column(prediction,my_udf(col(image)))在 Python 层定义 UDF发生了什么:daft.func/daft.cls 将你的函数/类包装成 Daft 内部的 UDF 对象资源配置num_gpus/num_cpus/ray_options被存在 UDF 对象里UDF 信息被注册到 Daft 的函数注册表 。不申请资源不启动任何东西只是保存信息步骤4 查询规划阶段当执行 df.collect()resultdf.collect()发生了什么:Daft 从逻辑计划 → 优化 → 生成物理执行计划物理计划会把计算切分成多个任务块每个任务块处理一批数据Flotilla 调度器知道哪些 UDF 需要什么资源任务提交与调度物理计划生成后Flotilla 把任务发给各个节点的 RaySwordfishActor:Client → RemoteFlotillaRunner → 分推任务 → 各个 RaySwordfishActor调度逻辑:Daft 会根据每个 UDF 声明的资源需求num_gpus/num_cpus做内部调度concurrencyN 决定同一个 Actor 里最多同时跑几个该 UDF 的任务对于 GPU: 如果你声明 num_gpus0.5同一个 Actor 可以并行跑 2 个共享同一块 GPU这是 Daft 比原生 Ray 好的地方步骤5. UDF 执行当 RaySwordfishActor 收到一个 UDF 任务反序列化: 从任务描述中拿到 UDF 和输入数据实例化: 如果是类 UDFdaft.cls实例化你的类只实例化一次复用实例执行: 调用你的 UDF 处理输入 batch序列化: 把输出结果序列化传回给下游或者 client关键优化:UDF 实例复用: 相同 UDF 只实例化一次不会每个任务都新建节省初始化开销比如模型只加载一次到 GPU批处理: Daft 会把数据攒成批再给你的 UDF提升利用率内存管理: 大批次会自动拆分避免 OOM核心代码位置Flotilla 入口: daft/runners/flotilla.py → RemoteFlotillaRunnerSwordfish Actor: daft/runners/swordfish/actor.py → RaySwordfishActorUDF v2 实现: daft/udf/udf_v2.pyShuffle 实现: src/daft-shuffles/ (Rust)

更多文章