Skip to content

Commit

Permalink
feat(host): (#245)
Browse files Browse the repository at this point in the history
1. adapt pytorch distributed communication style in torch-elastic.
2. support hostnetwork mode without traffic redirection.

Signed-off-by: SimonCqk <[email protected]>
  • Loading branch information
SimonCqk authored May 10, 2022
1 parent 32bc0df commit 5318ba3
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 8 deletions.
15 changes: 11 additions & 4 deletions controllers/pytorch/pytorchjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

training "github.com/alibaba/kubedl/apis/training/v1alpha1"
"github.com/alibaba/kubedl/cmd/options"
"github.com/alibaba/kubedl/pkg/features"
"github.com/alibaba/kubedl/pkg/gang_schedule/registry"
"github.com/alibaba/kubedl/pkg/job_controller"
v1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1"
Expand Down Expand Up @@ -210,14 +211,20 @@ func (r *PytorchJobReconciler) SetClusterSpec(ctx context.Context, job interface
return err
}

masterRole := rtype == strings.ToLower(string(training.PyTorchReplicaTypeMaster))
if masterHostPort, ok := job_controller.GetHostNetworkPortFromContext(ctx, "master", "0"); job_controller.EnableHostNetwork(pytorchJob) && ok {
if masterRole || features.KubeDLFeatureGates.Enabled(features.HostNetWithHeadlessSvc) {
masterPort = masterHostPort
}
}

masterAddr := commonutil.GenGeneralName(pytorchJob.Name, strings.ToLower(string(training.PyTorchReplicaTypeMaster)), strconv.Itoa(0))
if rtype == strings.ToLower(string(training.PyTorchReplicaTypeMaster)) {
if masterRole {
if rank != 0 {
return fmt.Errorf("invalid config: There should be only a single master with index=0")
}
masterAddr = "localhost"
if hostPort, ok := job_controller.GetHostNetworkPortFromContext(ctx, rtype, index); ok && job_controller.EnableHostNetwork(pytorchJob) {
masterPort = hostPort
if features.KubeDLFeatureGates.Enabled(features.PyTorchLocalMasterAddr) {
masterAddr = "localhost"
}
} else {
rank++
Expand Down
16 changes: 13 additions & 3 deletions pkg/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ const (
// DAGScheduling enables DAG scheduling workflow between different job roles.
DAGScheduling featuregate.Feature = "DAGScheduling"

// TODO: migrate other features into featuregates pattern.
// PyTorchLocalMasterAddr explicitly declares that the master uses localhost as its own
// listen address. This is usually adopted with torch versions < 1.9; in the >= 1.9
// distributed communication style, the master address should match the value used by
// the workers, i.e. it is set to the master service name.
PyTorchLocalMasterAddr featuregate.Feature = "PyTorchLocalMasterAddr"

// HostNetWithHeadlessSvc builds connections between pods through a headless service
// instead of a normal service with different port/targetPort. It bypasses traffic
// routing, but a pod may not find the correct host port after a failover.
HostNetWithHeadlessSvc featuregate.Feature = "HostNetWithHeadlessSvc"
)

func init() {
Expand All @@ -39,7 +47,9 @@ var (
KubeDLFeatureGates = featuregate.NewFeatureGate()

defaultKubeDLFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
GangScheduling: {Default: true, PreRelease: featuregate.Beta},
DAGScheduling: {Default: true, PreRelease: featuregate.Beta},
GangScheduling: {Default: true, PreRelease: featuregate.Beta},
DAGScheduling: {Default: true, PreRelease: featuregate.Beta},
PyTorchLocalMasterAddr: {Default: true, PreRelease: featuregate.Beta},
HostNetWithHeadlessSvc: {Default: false, PreRelease: featuregate.Alpha},
}
)
3 changes: 2 additions & 1 deletion pkg/job_controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

log "github.com/sirupsen/logrus"

"github.com/alibaba/kubedl/pkg/features"
apiv1 "github.com/alibaba/kubedl/pkg/job_controller/api/v1"
commonutil "github.com/alibaba/kubedl/pkg/util"
)
Expand Down Expand Up @@ -290,7 +291,7 @@ func (jc *JobController) CreateNewService(ctx context.Context, job metav1.Object
targetPort := svcPort
clusterIP := "None"

if EnableHostNetwork(job) {
if !features.KubeDLFeatureGates.Enabled(features.HostNetWithHeadlessSvc) && EnableHostNetwork(job) {
// Communication between replicas uses headless services by default. In hostnetwork
// mode, however, a headless service cannot forward traffic from one port to another,
// so we use a normal service when hostnetwork is enabled.
Expand Down

0 comments on commit 5318ba3

Please sign in to comment.