@@ -183,6 +183,21 @@ func ResourceManagedKafkaCluster() *schema.Resource {
183183 ForceNew : true ,
184184 Description : `ID of the location of the Kafka resource. See https://cloud.google.com/managed-kafka/docs/locations for a list of supported locations.` ,
185185 },
186+ "broker_capacity_config" : {
187+ Type : schema .TypeList ,
188+ Optional : true ,
189+ Description : `Capacity configuration at a per-broker level within the Kafka cluster. The config will be appled to each broker in the cluster.` ,
190+ MaxItems : 1 ,
191+ Elem : & schema.Resource {
192+ Schema : map [string ]* schema.Schema {
193+ "disk_size_gb" : {
194+ Type : schema .TypeString ,
195+ Optional : true ,
196+ Description : `The disk to provision for each broker in Gigabytes. Minimum: 100 GB.` ,
197+ },
198+ },
199+ },
200+ },
186201 "labels" : {
187202 Type : schema .TypeMap ,
188203 Optional : true ,
@@ -312,6 +327,12 @@ func resourceManagedKafkaClusterCreate(d *schema.ResourceData, meta interface{})
312327 } else if v , ok := d .GetOkExists ("capacity_config" ); ! tpgresource .IsEmptyValue (reflect .ValueOf (capacityConfigProp )) && (ok || ! reflect .DeepEqual (v , capacityConfigProp )) {
313328 obj ["capacityConfig" ] = capacityConfigProp
314329 }
330+ brokerCapacityConfigProp , err := expandManagedKafkaClusterBrokerCapacityConfig (d .Get ("broker_capacity_config" ), d , config )
331+ if err != nil {
332+ return err
333+ } else if v , ok := d .GetOkExists ("broker_capacity_config" ); ! tpgresource .IsEmptyValue (reflect .ValueOf (brokerCapacityConfigProp )) && (ok || ! reflect .DeepEqual (v , brokerCapacityConfigProp )) {
334+ obj ["brokerCapacityConfig" ] = brokerCapacityConfigProp
335+ }
315336 rebalanceConfigProp , err := expandManagedKafkaClusterRebalanceConfig (d .Get ("rebalance_config" ), d , config )
316337 if err != nil {
317338 return err
@@ -494,6 +515,12 @@ func resourceManagedKafkaClusterUpdate(d *schema.ResourceData, meta interface{})
494515 } else if v , ok := d .GetOkExists ("capacity_config" ); ! tpgresource .IsEmptyValue (reflect .ValueOf (v )) && (ok || ! reflect .DeepEqual (v , capacityConfigProp )) {
495516 obj ["capacityConfig" ] = capacityConfigProp
496517 }
518+ brokerCapacityConfigProp , err := expandManagedKafkaClusterBrokerCapacityConfig (d .Get ("broker_capacity_config" ), d , config )
519+ if err != nil {
520+ return err
521+ } else if v , ok := d .GetOkExists ("broker_capacity_config" ); ! tpgresource .IsEmptyValue (reflect .ValueOf (v )) && (ok || ! reflect .DeepEqual (v , brokerCapacityConfigProp )) {
522+ obj ["brokerCapacityConfig" ] = brokerCapacityConfigProp
523+ }
497524 rebalanceConfigProp , err := expandManagedKafkaClusterRebalanceConfig (d .Get ("rebalance_config" ), d , config )
498525 if err != nil {
499526 return err
@@ -530,6 +557,10 @@ func resourceManagedKafkaClusterUpdate(d *schema.ResourceData, meta interface{})
530557 updateMask = append (updateMask , "capacityConfig" )
531558 }
532559
560+ if d .HasChange ("broker_capacity_config" ) {
561+ updateMask = append (updateMask , "brokerCapacityConfig" )
562+ }
563+
533564 if d .HasChange ("rebalance_config" ) {
534565 updateMask = append (updateMask , "rebalanceConfig" )
535566 }
@@ -976,6 +1007,32 @@ func expandManagedKafkaClusterCapacityConfigMemoryBytes(v interface{}, d tpgreso
9761007 return v , nil
9771008}
9781009
1010+ func expandManagedKafkaClusterBrokerCapacityConfig (v interface {}, d tpgresource.TerraformResourceData , config * transport_tpg.Config ) (interface {}, error ) {
1011+ if v == nil {
1012+ return nil , nil
1013+ }
1014+ l := v .([]interface {})
1015+ if len (l ) == 0 || l [0 ] == nil {
1016+ return nil , nil
1017+ }
1018+ raw := l [0 ]
1019+ original := raw .(map [string ]interface {})
1020+ transformed := make (map [string ]interface {})
1021+
1022+ transformedDiskSizeGb , err := expandManagedKafkaClusterBrokerCapacityConfigDiskSizeGb (original ["disk_size_gb" ], d , config )
1023+ if err != nil {
1024+ return nil , err
1025+ } else if val := reflect .ValueOf (transformedDiskSizeGb ); val .IsValid () && ! tpgresource .IsEmptyValue (val ) {
1026+ transformed ["diskSizeGb" ] = transformedDiskSizeGb
1027+ }
1028+
1029+ return transformed , nil
1030+ }
1031+
1032+ func expandManagedKafkaClusterBrokerCapacityConfigDiskSizeGb (v interface {}, d tpgresource.TerraformResourceData , config * transport_tpg.Config ) (interface {}, error ) {
1033+ return v , nil
1034+ }
1035+
9791036func expandManagedKafkaClusterRebalanceConfig (v interface {}, d tpgresource.TerraformResourceData , config * transport_tpg.Config ) (interface {}, error ) {
9801037 if v == nil {
9811038 return nil , nil
0 commit comments