diff --git a/cli-definition.json b/cli-definition.json index e37567e5..feefa35f 100644 --- a/cli-definition.json +++ b/cli-definition.json @@ -67,6 +67,11 @@ "use": "run", "example": "\n\tpgstream run --source postgres --source-url --target postgres --target-url --init\n\tpgstream run --source postgres --source-url --target postgres --target-url --snapshot-tables --reset\n\tpgstream run --source kafka --source-url --target elasticsearch --target-url \n\tpgstream run --source postgres --source-url --target kafka --target-url \n\tpgstream run --config config.yaml --log-level info\n\tpgstream run --config config.env", "flags": [ + { + "name": "data-only", + "description": "When used with --snapshot-tables, skip schema restore and only snapshot data (use when schema is already present on target)", + "default": "false" + }, { "name": "dump-file", "description": "File where the pg_dump output will be written if initial snapshot is enabled", diff --git a/cmd/root_cmd.go b/cmd/root_cmd.go index 7d800adb..3503ab6d 100644 --- a/cmd/root_cmd.go +++ b/cmd/root_cmd.go @@ -86,6 +86,7 @@ func Prepare() *cobra.Command { runCmd.Flags().Bool("profile", false, "Whether to expose a /debug/pprof endpoint on localhost:6060") runCmd.Flags().BoolVar(&initFlag, "init", false, "Whether to initialize pgstream before starting replication") runCmd.Flags().String("dump-file", "", "File where the pg_dump output will be written if initial snapshot is enabled") + runCmd.Flags().Bool("data-only", false, "When used with --snapshot-tables, skip schema restore and only snapshot data (use when schema is already present on target)") runCmd.Flags().Bool("with-injector", false, "Whether to enable the injection of pgstream metadata to the WAL events. Required for search targets.") // status cmd diff --git a/cmd/run_cmd.go b/cmd/run_cmd.go index 5c25a592..1399dfab 100644 --- a/cmd/run_cmd.go +++ b/cmd/run_cmd.go @@ -93,10 +93,15 @@ func initialSnapshotFlagBinding(cmd *cobra.Command) { viper.BindPFlag("source.postgres.snapshot.tables", cmd.Flags().Lookup("snapshot-tables")) if len(viper.GetStringSlice("source.postgres.snapshot.tables")) > 0 { viper.Set("source.postgres.mode", "snapshot_and_replication") - viper.Set("source.postgres.snapshot.mode", "full") - viper.Set("source.postgres.snapshot.schema.mode", "schemalog") - if cmd.Flags().Lookup("target").Value.String() == postgres { - viper.Set("source.postgres.snapshot.schema.mode", "pgdump_pgrestore") + dataOnly, _ := cmd.Flags().GetBool("data-only") + if dataOnly { + viper.Set("source.postgres.snapshot.mode", "data") + } else { + viper.Set("source.postgres.snapshot.mode", "full") + viper.Set("source.postgres.snapshot.schema.mode", "schemalog") + if cmd.Flags().Lookup("target").Value.String() == postgres { + viper.Set("source.postgres.snapshot.schema.mode", "pgdump_pgrestore") + } } } diff --git a/internal/postgres/pg_restore.go b/internal/postgres/pg_restore.go index cefac8ee..d1bd8e2b 100644 --- a/internal/postgres/pg_restore.go +++ b/internal/postgres/pg_restore.go @@ -180,6 +180,7 @@ func isErrorLine(line string) bool { func parseErrorLine(line string) error { switch { case strings.Contains(line, "already exists"), + strings.Contains(line, "already a partition"), strings.Contains(line, "multiple primary keys for table"): return &ErrRelationAlreadyExists{Details: line} case strings.Contains(line, "cannot drop schema public because other objects depend on it"): diff --git a/internal/postgres/pg_restore_test.go b/internal/postgres/pg_restore_test.go index 1b97c266..e9a44ea2 100644 --- a/internal/postgres/pg_restore_test.go +++ b/internal/postgres/pg_restore_test.go @@ -127,6 +127,16 @@ pg_restore: error: could not execute query: ERROR: permission denied to grant p }, }, }, + { + name: "partition already attached error", + output: "ERROR: \"linking_queue_000\" is already a partition\n", + + wantErrs: &PGRestoreErrors{ + ignoredErrs: []error{ + &ErrRelationAlreadyExists{Details: "ERROR: \"linking_queue_000\" is already a partition"}, + }, + }, + }, { name: "mixed success and error output", output: `pg_restore: processing data for table "users" @@ -283,6 +293,11 @@ func TestParseErrorLine(t *testing.T) { line: `ERROR: relation "public.vendor_products" does not exist`, wantErr: &ErrRelationDoesNotExist{Details: `ERROR: relation "public.vendor_products" does not exist`}, }, + { + name: "already a partition", + line: `ERROR: "linking_queue_000" is already a partition`, + wantErr: &ErrRelationAlreadyExists{Details: `ERROR: "linking_queue_000" is already a partition`}, + }, { name: "generic error", line: "pg_restore: error: connection failed",