diff --git a/go.mod b/go.mod index 81401c09e4..955df17083 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/charmbracelet/lipgloss v0.11.0 github.com/client9/gospell v0.0.0-20160306015952-90dfc71015df github.com/confluentinc/ccloud-sdk-go-v1-public v0.0.0-20250521223017-0e8f6f971b52 + github.com/confluentinc/ccloud-sdk-go-v2-internal/ksql v0.5.0 github.com/confluentinc/ccloud-sdk-go-v2/ai v0.1.0 github.com/confluentinc/ccloud-sdk-go-v2/apikeys v0.4.0 github.com/confluentinc/ccloud-sdk-go-v2/billing v0.3.0 @@ -36,7 +37,6 @@ require ( github.com/confluentinc/ccloud-sdk-go-v2/identity-provider v0.3.0 github.com/confluentinc/ccloud-sdk-go-v2/kafka-quotas v0.4.0 github.com/confluentinc/ccloud-sdk-go-v2/kafkarest v0.0.0-20250909043602-f80bee0eb280 - github.com/confluentinc/ccloud-sdk-go-v2/ksql v0.2.0 github.com/confluentinc/ccloud-sdk-go-v2/mds v0.4.0 github.com/confluentinc/ccloud-sdk-go-v2/metrics v0.2.0 github.com/confluentinc/ccloud-sdk-go-v2/networking v0.14.0 diff --git a/go.sum b/go.sum index 36dd82a19f..3475d86383 100644 --- a/go.sum +++ b/go.sum @@ -172,6 +172,8 @@ github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnht github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/confluentinc/ccloud-sdk-go-v1-public v0.0.0-20250521223017-0e8f6f971b52 h1:19qEGhkbZa5fopKCe0VPIV+Sasby4Pv10z9ZaktwWso= github.com/confluentinc/ccloud-sdk-go-v1-public v0.0.0-20250521223017-0e8f6f971b52/go.mod h1:62EMf+5uFEt1BJ2q8WMrUoI9VUSxAbDnmZCGRt/MbA0= +github.com/confluentinc/ccloud-sdk-go-v2-internal/ksql v0.5.0 h1:/t7vM8UkEGJWfdDxsRkDCaRhDoDhAupphRl2QZ039zI= +github.com/confluentinc/ccloud-sdk-go-v2-internal/ksql v0.5.0/go.mod h1:NHFvzwBb2RJmy7RVvq9Z3gregvm08HgquCUtsjTj1Ko= github.com/confluentinc/ccloud-sdk-go-v2/ai v0.1.0 h1:zSF4OQUJXWH2JeAo9rsq13ibk+JFdzITGR8S7cFMpzw= github.com/confluentinc/ccloud-sdk-go-v2/ai v0.1.0/go.mod h1:DoxqzzF3JzvJr3fWkvCiOHFlE0GoYpozWxFZ1Ud9ntA= github.com/confluentinc/ccloud-sdk-go-v2/apikeys v0.4.0 h1:8fWyLwMuy8ec0MVF5Avd54UvbIxhDFhZzanHBVwgxdw= @@ -214,8 +216,6 @@ github.com/confluentinc/ccloud-sdk-go-v2/kafka-quotas v0.4.0 h1:T9e7lNj/VjxE89+t github.com/confluentinc/ccloud-sdk-go-v2/kafka-quotas v0.4.0/go.mod h1:7gqwWFIyj2MAGpL/kf6SGXm/pi2Z6qpMJIjKlgEEhhg= github.com/confluentinc/ccloud-sdk-go-v2/kafkarest v0.0.0-20250909043602-f80bee0eb280 h1:GFVI3pGckhpP66Xb05usB8txzubnnoigZHp292ax5Rg= github.com/confluentinc/ccloud-sdk-go-v2/kafkarest v0.0.0-20250909043602-f80bee0eb280/go.mod h1:b8v8EIBtpQDx0zAxCpGxhuSWBRAwh/+PRFNtaBR5P7c= -github.com/confluentinc/ccloud-sdk-go-v2/ksql v0.2.0 h1:g6OHa1iW3HO3N/YiTAL9Q6Y7rdjMBAjOPYK37akTt0M= -github.com/confluentinc/ccloud-sdk-go-v2/ksql v0.2.0/go.mod h1:0LAvd4VqlaRwKU4yvDEkVCtV43yNezt56+hBe9Lmg7Q= github.com/confluentinc/ccloud-sdk-go-v2/mds v0.4.0 h1:jIXXhGi+Xn+XYFCErnMvd035QijbYXla1Bo8W7V7lFM= github.com/confluentinc/ccloud-sdk-go-v2/mds v0.4.0/go.mod h1:ufn9In8kDsyJ7Nru2ygpAaWdGw7DSDTOTtDhQVSmZjs= github.com/confluentinc/ccloud-sdk-go-v2/metrics v0.2.0 h1:TWwZHdfo2XNKrnGOuxXx4LF8WgahqqDC47Ap51L4thM= diff --git a/internal/ksql/command.go b/internal/ksql/command.go index 739a49ade2..1070781d94 100644 --- a/internal/ksql/command.go +++ b/internal/ksql/command.go @@ -9,7 +9,7 @@ import ( "github.com/spf13/cobra" "golang.org/x/oauth2" - ksqlv2 "github.com/confluentinc/ccloud-sdk-go-v2/ksql/v2" + ksqlv2 "github.com/confluentinc/ccloud-sdk-go-v2-internal/ksql/v2" pauth "github.com/confluentinc/cli/v4/pkg/auth" "github.com/confluentinc/cli/v4/pkg/ccloudv2" diff --git a/internal/ksql/command_cluster.go b/internal/ksql/command_cluster.go index 97dd022d6a..af6a0f7e25 100644 --- a/internal/ksql/command_cluster.go +++ b/internal/ksql/command_cluster.go @@ -21,6 +21,7 @@ func newClusterCommand(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Comm cmd.AddCommand(c.newDeleteCommand()) cmd.AddCommand(c.newDescribeCommand()) cmd.AddCommand(c.newListCommand()) + cmd.AddCommand(c.newUpdateCommand()) } else { c := &ksqlCommand{pcmd.NewAuthenticatedWithMDSCLICommand(cmd, prerunner)} cmd.AddCommand(c.newListCommandOnPrem()) diff --git a/internal/ksql/command_cluster_configureacls.go b/internal/ksql/command_cluster_configureacls.go index baa552cb99..4ddaba62c0 100644 --- a/internal/ksql/command_cluster_configureacls.go +++ b/internal/ksql/command_cluster_configureacls.go @@ -6,7 +6,7 @@ import ( "github.com/spf13/cobra" kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" - ksqlv2 "github.com/confluentinc/ccloud-sdk-go-v2/ksql/v2" + ksqlv2 "github.com/confluentinc/ccloud-sdk-go-v2-internal/ksql/v2" "github.com/confluentinc/cli/v4/pkg/acl" "github.com/confluentinc/cli/v4/pkg/ccstructs" diff --git a/internal/ksql/command_cluster_delete.go b/internal/ksql/command_cluster_delete.go index e297e5c878..6f53b5d09a 100644 --- a/internal/ksql/command_cluster_delete.go +++ b/internal/ksql/command_cluster_delete.go @@ -10,7 +10,7 @@ import ( "github.com/spf13/cobra" "golang.org/x/oauth2" - ksqlv2 "github.com/confluentinc/ccloud-sdk-go-v2/ksql/v2" + ksqlv2 "github.com/confluentinc/ccloud-sdk-go-v2-internal/ksql/v2" pauth "github.com/confluentinc/cli/v4/pkg/auth" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" diff --git a/internal/ksql/command_cluster_update.go b/internal/ksql/command_cluster_update.go new file mode 100644 index 0000000000..25d0042f9a --- /dev/null +++ b/internal/ksql/command_cluster_update.go @@ -0,0 +1,170 @@ +package ksql + +import ( + "fmt" + "sort" + + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/errors" + "github.com/confluentinc/cli/v4/pkg/examples" + "github.com/confluentinc/cli/v4/pkg/output" +) + +// Valid CSU sizes that customers may target via self-serve cluster update. +// Mirrors the server-side authoritative list in cc-control-plane-ksql: +// internal/service/update_ksql_cluster_resize.go::validCSUSizes. +// Values 1, 2 are legacy and not user-selectable. Values above maxSelfServeCSU +// (the largest entry in this slice) still require a support ticket. +// +//nolint:gochecknoglobals +var validCsuSizes = []int32{4, 8, 12, 16, 20, 24, 28} + +// maxSelfServeCSU is derived from validCsuSizes so the support-ticket +// threshold and the "Valid values" listing stay in lockstep — if the +// validCsuSizes slice is extended, the threshold moves with it. +// +//nolint:gochecknoglobals +var maxSelfServeCSU = func() int32 { + max := int32(0) + for _, v := range validCsuSizes { + if v > max { + max = v + } + } + return max +}() + +func csuSupportTicketMessage() string { + return fmt.Sprintf( + "CSU values above %d require a support ticket. "+ + "Please contact Confluent Support to request a larger cluster size.", + maxSelfServeCSU) +} + +func (c *ksqlCommand) newUpdateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "update ", + Short: "Update a ksqlDB cluster.", + Long: buildUpdateLongDescription(), + Args: cobra.ExactArgs(1), + ValidArgsFunction: pcmd.NewValidArgsFunction(c.validArgs), + RunE: c.update, + // Hidden while the SDK call is shimmed (see Client.UpdateKsqlCluster + // in pkg/ccloudv2/ksql.go). Once the SDK is regenerated from cc-api + // PR #2507 and the shim is replaced with the real call, drop Hidden. + Hidden: true, + Example: examples.BuildExampleString( + examples.Example{ + Text: `Resize ksqlDB cluster "lksqlc-12345" to 8 CSUs.`, + Code: "confluent ksql cluster update lksqlc-12345 --csu 8", + }, + ), + } + + cmd.Flags().Int32("csu", 0, fmt.Sprintf( + "Target number of CSUs for the cluster. Valid values: %s.", + formatCsuList(validCsuSizes))) + pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + cobra.CheckErr(cmd.MarkFlagRequired("csu")) + + return cmd +} + +func buildUpdateLongDescription() string { + return fmt.Sprintf( + "Update an existing ksqlDB cluster. Currently only the CSU count may be modified, "+ + "and only to larger sizes (shrink is not supported).\n\n"+ + "Valid CSU values are %s. Larger sizes require a support ticket. "+ + "The cluster will undergo a rolling restart to apply the new size; "+ + "the command returns once the resize has been accepted by the control plane.", + formatCsuList(validCsuSizes)) +} + +func (c *ksqlCommand) update(cmd *cobra.Command, args []string) error { + csu, err := cmd.Flags().GetInt32("csu") + if err != nil { + return err + } + if err := validateCsuForUpdate(csu); err != nil { + return err + } + + environmentId, err := c.Context.EnvironmentId() + if err != nil { + return err + } + + clusterId := args[0] + + // Pre-check current CSU so we can short-circuit a no-op locally before + // issuing the PATCH. The server-side validator also rejects no-op resizes + // with 400 ("new CSU size is the same as old CSU size, no-op"), but a + // client-side check produces a clearer message and avoids a wasted API + // round trip. Note: shrink is not supported server-side either. + current, err := c.V2Client.DescribeKsqlCluster(clusterId, environmentId) + if err != nil { + return errors.CatchKSQLNotFoundError(err, clusterId) + } + currentCsu := current.Spec.GetCsu() + if currentCsu == csu { + return fmt.Errorf("ksqlDB cluster %q is already at %d CSUs; no change requested", + clusterId, csu) + } + if csu < currentCsu { + return fmt.Errorf("ksqlDB cluster %q is currently %d CSUs; shrinking is not supported "+ + "(target %d < current %d)", clusterId, currentCsu, csu, currentCsu) + } + + cluster, err := c.V2Client.UpdateKsqlCluster(clusterId, environmentId, csu) + if err != nil { + return err + } + + // Print the rolling-restart notice only AFTER the PATCH was accepted — + // otherwise a failed call (e.g., a 4xx from the server) would leave the + // customer with a misleading "Resizing…" message even though no resize + // is happening. + output.ErrPrintf(c.Config.EnableColor, + "Resizing ksqlDB cluster %q from %d to %d CSUs. A rolling restart will be "+ + "performed asynchronously; the cluster will continue serving queries during the resize.\n", + clusterId, currentCsu, csu) + + table := output.NewTable(cmd) + table.Add(c.formatClusterForDisplayAndList(&cluster)) + return table.Print() +} + +// validateCsuForUpdate returns nil if csu is in validCsuSizes, and a +// customer-safe error otherwise. The server-side check in +// cc-control-plane-ksql is authoritative; this client-side validation exists +// to fail fast with a clearer message before issuing the API call. +func validateCsuForUpdate(csu int32) error { + if csu > maxSelfServeCSU { + return fmt.Errorf("%d CSUs: %s", csu, csuSupportTicketMessage()) + } + for _, valid := range validCsuSizes { + if csu == valid { + return nil + } + } + return fmt.Errorf("%d is not a valid CSU size for cluster update. Valid sizes are %s", + csu, formatCsuList(validCsuSizes)) +} + +func formatCsuList(sizes []int32) string { + sorted := append([]int32(nil), sizes...) + sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] }) + out := "" + for i, s := range sorted { + if i > 0 { + out += ", " + } + out += fmt.Sprintf("%d", s) + } + return out +} diff --git a/internal/ksql/command_cluster_update_test.go b/internal/ksql/command_cluster_update_test.go new file mode 100644 index 0000000000..ab13bae852 --- /dev/null +++ b/internal/ksql/command_cluster_update_test.go @@ -0,0 +1,78 @@ +package ksql + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestValidateCsuForUpdate(t *testing.T) { + tests := []struct { + name string + csu int32 + expectErr bool + errContains string + }{ + {name: "valid 4", csu: 4}, + {name: "valid 8", csu: 8}, + {name: "valid 12", csu: 12}, + {name: "valid 16", csu: 16}, + {name: "valid 20", csu: 20}, + {name: "valid 24", csu: 24}, + {name: "valid 28", csu: 28}, + { + name: "legacy size 1 rejected", + csu: 1, + expectErr: true, + errContains: "not a valid CSU size", + }, + { + name: "legacy size 2 rejected", + csu: 2, + expectErr: true, + errContains: "not a valid CSU size", + }, + { + name: "in-range but non-canonical (5) rejected", + csu: 5, + expectErr: true, + errContains: "not a valid CSU size", + }, + { + name: "in-range but non-canonical (10) rejected", + csu: 10, + expectErr: true, + errContains: "not a valid CSU size", + }, + { + name: "above 28 routes to support-ticket message", + csu: 32, + expectErr: true, + errContains: "support ticket", + }, + { + name: "well above ceiling routes to support-ticket message", + csu: 128, + expectErr: true, + errContains: "support ticket", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := validateCsuForUpdate(tc.csu) + if tc.expectErr { + require.Error(t, err) + require.Contains(t, err.Error(), tc.errContains) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestFormatCsuList(t *testing.T) { + require.Equal(t, "4, 8, 12, 16, 20, 24, 28", formatCsuList(validCsuSizes)) + // Input order should not matter; output is sorted ascending. + require.Equal(t, "4, 8, 16", formatCsuList([]int32{16, 4, 8})) +} diff --git a/pkg/ccloudv2/client.go b/pkg/ccloudv2/client.go index a736857f5b..914c096e3e 100644 --- a/pkg/ccloudv2/client.go +++ b/pkg/ccloudv2/client.go @@ -20,7 +20,7 @@ import ( iamv2 "github.com/confluentinc/ccloud-sdk-go-v2/iam/v2" identityproviderv2 "github.com/confluentinc/ccloud-sdk-go-v2/identity-provider/v2" kafkaquotasv1 "github.com/confluentinc/ccloud-sdk-go-v2/kafka-quotas/v1" - ksqlv2 "github.com/confluentinc/ccloud-sdk-go-v2/ksql/v2" + ksqlv2 "github.com/confluentinc/ccloud-sdk-go-v2-internal/ksql/v2" mdsv2 "github.com/confluentinc/ccloud-sdk-go-v2/mds/v2" networkingaccesspointv1 "github.com/confluentinc/ccloud-sdk-go-v2/networking-access-point/v1" networkingdnsforwarderv1 "github.com/confluentinc/ccloud-sdk-go-v2/networking-dnsforwarder/v1" diff --git a/pkg/ccloudv2/ksql.go b/pkg/ccloudv2/ksql.go index 2327e25a6e..585c50c1c1 100644 --- a/pkg/ccloudv2/ksql.go +++ b/pkg/ccloudv2/ksql.go @@ -4,7 +4,7 @@ import ( "context" "net/http" - ksqlv2 "github.com/confluentinc/ccloud-sdk-go-v2/ksql/v2" + ksqlv2 "github.com/confluentinc/ccloud-sdk-go-v2-internal/ksql/v2" "github.com/confluentinc/cli/v4/pkg/errors" ) @@ -66,10 +66,34 @@ func (c *Client) CreateKsqlCluster(displayName, environmentId, kafkaClusterId, c DisplayName: &displayName, UseDetailedProcessingLog: &useDetailedProcessingLog, Csu: &csus, - KafkaCluster: &ksqlv2.ObjectReference{Id: kafkaClusterId, Related: "-", ResourceName: "-"}, - CredentialIdentity: &ksqlv2.ObjectReference{Id: credentialIdentity, Related: "-", ResourceName: "-"}, - Environment: &ksqlv2.ObjectReference{Id: environmentId, Related: "-", ResourceName: "-"}, + KafkaCluster: &ksqlv2.EnvScopedObjectReference{Id: kafkaClusterId, Related: "-", ResourceName: "-"}, + CredentialIdentity: &ksqlv2.TypedGlobalObjectReference{Id: credentialIdentity, Related: "-", ResourceName: "-"}, + Environment: &ksqlv2.GlobalObjectReference{Id: environmentId, Related: "-", ResourceName: "-"}, }} res, httpResp, err := c.KsqlClient.ClustersKsqldbcmV2Api.CreateKsqldbcmV2Cluster(c.ksqlApiContext()).KsqldbcmV2Cluster(cluster).Execute() return res, errors.CatchCCloudV2Error(err, httpResp) } + +// UpdateKsqlCluster issues PATCH /ksqldbcm/v2/clusters/{id} with +// {"spec":{"environment":{"id":...},"csu":N}} to trigger a self-serve resize. +// +// Currently wired against ccloud-sdk-go-v2-internal/ksql (NOT the public +// SDK) — cc-api PR #2507 (KSQL-14844) is held from merge until release- +// ready per the cc-api owners' direction, and per @sgagniere the internal +// SDK can be regenerated from the cc-api branch. Before merging cli #3368 +// to main, switch back to the public ccloud-sdk-go-v2/ksql SDK once +// cc-api #2507 merges and the public SDK has a release with +// UpdateKsqldbcmV2Cluster. See KSQL-14849 for the work item. +func (c *Client) UpdateKsqlCluster(id, environmentId string, csu int32) (ksqlv2.KsqldbcmV2Cluster, error) { + update := ksqlv2.KsqldbcmV2ClusterUpdate{ + Spec: &ksqlv2.KsqldbcmV2ClusterSpecUpdate{ + Environment: &ksqlv2.GlobalObjectReference{Id: environmentId, Related: "-", ResourceName: "-"}, + Csu: &csu, + }, + } + res, httpResp, err := c.KsqlClient.ClustersKsqldbcmV2Api. + UpdateKsqldbcmV2Cluster(c.ksqlApiContext(), id). + KsqldbcmV2ClusterUpdate(update). + Execute() + return res, errors.CatchCCloudV2Error(err, httpResp) +} diff --git a/test/test-server/ksql_handlers.go b/test/test-server/ksql_handlers.go index b71b52f79f..ae08da514c 100644 --- a/test/test-server/ksql_handlers.go +++ b/test/test-server/ksql_handlers.go @@ -8,18 +8,18 @@ import ( "github.com/gorilla/mux" "github.com/stretchr/testify/require" - ksqlv2 "github.com/confluentinc/ccloud-sdk-go-v2/ksql/v2" + ksqlv2 "github.com/confluentinc/ccloud-sdk-go-v2-internal/ksql/v2" ) var ksqlCluster1 = ksqlv2.KsqldbcmV2Cluster{ Id: ksqlv2.PtrString("lksqlc-ksql5"), Spec: &ksqlv2.KsqldbcmV2ClusterSpec{ DisplayName: ksqlv2.PtrString("account ksql"), - KafkaCluster: &ksqlv2.ObjectReference{ + KafkaCluster: &ksqlv2.EnvScopedObjectReference{ Id: "lkc-qwert", Environment: ksqlv2.PtrString("env-12345"), }, - Environment: &ksqlv2.ObjectReference{Id: "env-12345"}, + Environment: &ksqlv2.GlobalObjectReference{Id: "env-12345"}, UseDetailedProcessingLog: ksqlv2.PtrBool(true), }, Status: &ksqlv2.KsqldbcmV2ClusterStatus{ @@ -34,11 +34,11 @@ var ksqlCluster2 = ksqlv2.KsqldbcmV2Cluster{ Id: ksqlv2.PtrString("lksqlc-woooo"), Spec: &ksqlv2.KsqldbcmV2ClusterSpec{ DisplayName: ksqlv2.PtrString("kay cee queue elle"), - KafkaCluster: &ksqlv2.ObjectReference{ + KafkaCluster: &ksqlv2.EnvScopedObjectReference{ Id: "lkc-zxcvb", Environment: ksqlv2.PtrString("env-12345"), }, - Environment: &ksqlv2.ObjectReference{Id: "env-12345"}, + Environment: &ksqlv2.GlobalObjectReference{Id: "env-12345"}, }, Status: &ksqlv2.KsqldbcmV2ClusterStatus{ HttpEndpoint: ksqlv2.PtrString("SASL_SSL://ksql-endpoint"), @@ -52,11 +52,11 @@ var ksqlClusterForDetailedProcessingLogFalse = ksqlv2.KsqldbcmV2Cluster{ Id: ksqlv2.PtrString("lksqlc-woooo"), Spec: &ksqlv2.KsqldbcmV2ClusterSpec{ DisplayName: ksqlv2.PtrString("kay cee queue elle"), - KafkaCluster: &ksqlv2.ObjectReference{ + KafkaCluster: &ksqlv2.EnvScopedObjectReference{ Id: "lkc-zxcvb", Environment: ksqlv2.PtrString("env-12345"), }, - Environment: &ksqlv2.ObjectReference{Id: "env-12345"}, + Environment: &ksqlv2.GlobalObjectReference{Id: "env-12345"}, UseDetailedProcessingLog: ksqlv2.PtrBool(false), }, Status: &ksqlv2.KsqldbcmV2ClusterStatus{ @@ -121,12 +121,12 @@ func handleKsqlCluster(t *testing.T) http.HandlerFunc { Id: ksqlv2.PtrString("lksqlc-ksql1"), Spec: &ksqlv2.KsqldbcmV2ClusterSpec{ DisplayName: ksqlv2.PtrString("account ksql"), - KafkaCluster: &ksqlv2.ObjectReference{ + KafkaCluster: &ksqlv2.EnvScopedObjectReference{ Id: "lkc-12345", Environment: ksqlv2.PtrString("env-12345"), }, - CredentialIdentity: &ksqlv2.ObjectReference{Id: "u-123456"}, - Environment: &ksqlv2.ObjectReference{Id: "env-12345"}, + CredentialIdentity: &ksqlv2.TypedGlobalObjectReference{Id: "u-123456"}, + Environment: &ksqlv2.GlobalObjectReference{Id: "env-12345"}, }, Status: &ksqlv2.KsqldbcmV2ClusterStatus{ HttpEndpoint: ksqlv2.PtrString("SASL_SSL://ksql-endpoint"), @@ -140,12 +140,12 @@ func handleKsqlCluster(t *testing.T) http.HandlerFunc { Id: ksqlv2.PtrString("lksqlc-12345"), Spec: &ksqlv2.KsqldbcmV2ClusterSpec{ DisplayName: ksqlv2.PtrString("account ksql"), - KafkaCluster: &ksqlv2.ObjectReference{ + KafkaCluster: &ksqlv2.EnvScopedObjectReference{ Id: "lkc-abcde", Environment: ksqlv2.PtrString("env-12345"), }, - Environment: &ksqlv2.ObjectReference{Id: "env-12345"}, - CredentialIdentity: ksqlv2.NewObjectReference("sa-12345", "", ""), + Environment: &ksqlv2.GlobalObjectReference{Id: "env-12345"}, + CredentialIdentity: ksqlv2.NewTypedGlobalObjectReference("sa-12345", "", ""), }, Status: &ksqlv2.KsqldbcmV2ClusterStatus{ HttpEndpoint: ksqlv2.PtrString("SASL_SSL://ksql-endpoint"),