Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cli-definition.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
"use": "run",
"example": "\n\tpgstream run --source postgres --source-url <source-postgres-url> --target postgres --target-url <target-postgres-url> --init\n\tpgstream run --source postgres --source-url <source-postgres-url> --target postgres --target-url <target-postgres-url> --snapshot-tables <schema.table> --reset\n\tpgstream run --source kafka --source-url <kafka-url> --target elasticsearch --target-url <elasticsearch-url>\n\tpgstream run --source postgres --source-url <postgres-url> --target kafka --target-url <kafka-url>\n\tpgstream run --config config.yaml --log-level info\n\tpgstream run --config config.env",
"flags": [
{
"name": "data-only",
"description": "When used with --snapshot-tables, skip schema restore and only snapshot data (use when schema is already present on target)",
"default": "false"
},
{
"name": "dump-file",
"description": "File where the pg_dump output will be written if initial snapshot is enabled",
Expand Down
1 change: 1 addition & 0 deletions cmd/root_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func Prepare() *cobra.Command {
runCmd.Flags().Bool("profile", false, "Whether to expose a /debug/pprof endpoint on localhost:6060")
runCmd.Flags().BoolVar(&initFlag, "init", false, "Whether to initialize pgstream before starting replication")
runCmd.Flags().String("dump-file", "", "File where the pg_dump output will be written if initial snapshot is enabled")
runCmd.Flags().Bool("data-only", false, "When used with --snapshot-tables, skip schema restore and only snapshot data (use when schema is already present on target)")
runCmd.Flags().Bool("with-injector", false, "Whether to enable the injection of pgstream metadata to the WAL events. Required for search targets.")

// status cmd
Expand Down
13 changes: 9 additions & 4 deletions cmd/run_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,15 @@ func initialSnapshotFlagBinding(cmd *cobra.Command) {
viper.BindPFlag("source.postgres.snapshot.tables", cmd.Flags().Lookup("snapshot-tables"))
if len(viper.GetStringSlice("source.postgres.snapshot.tables")) > 0 {
viper.Set("source.postgres.mode", "snapshot_and_replication")
viper.Set("source.postgres.snapshot.mode", "full")
viper.Set("source.postgres.snapshot.schema.mode", "schemalog")
if cmd.Flags().Lookup("target").Value.String() == postgres {
viper.Set("source.postgres.snapshot.schema.mode", "pgdump_pgrestore")
dataOnly, _ := cmd.Flags().GetBool("data-only")
if dataOnly {
viper.Set("source.postgres.snapshot.mode", "data")
} else {
viper.Set("source.postgres.snapshot.mode", "full")
viper.Set("source.postgres.snapshot.schema.mode", "schemalog")
if cmd.Flags().Lookup("target").Value.String() == postgres {
viper.Set("source.postgres.snapshot.schema.mode", "pgdump_pgrestore")
}
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/postgres/pg_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func isErrorLine(line string) bool {
func parseErrorLine(line string) error {
switch {
case strings.Contains(line, "already exists"),
strings.Contains(line, "already a partition"),
strings.Contains(line, "multiple primary keys for table"):
return &ErrRelationAlreadyExists{Details: line}
case strings.Contains(line, "cannot drop schema public because other objects depend on it"):
Expand Down
15 changes: 15 additions & 0 deletions internal/postgres/pg_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,16 @@ pg_restore: error: could not execute query: ERROR: permission denied to grant p
},
},
},
{
name: "partition already attached error",
output: "ERROR: \"linking_queue_000\" is already a partition\n",

wantErrs: &PGRestoreErrors{
ignoredErrs: []error{
&ErrRelationAlreadyExists{Details: "ERROR: \"linking_queue_000\" is already a partition"},
},
},
},
{
name: "mixed success and error output",
output: `pg_restore: processing data for table "users"
Expand Down Expand Up @@ -283,6 +293,11 @@ func TestParseErrorLine(t *testing.T) {
line: `ERROR: relation "public.vendor_products" does not exist`,
wantErr: &ErrRelationDoesNotExist{Details: `ERROR: relation "public.vendor_products" does not exist`},
},
{
name: "already a partition",
line: `ERROR: "linking_queue_000" is already a partition`,
wantErr: &ErrRelationAlreadyExists{Details: `ERROR: "linking_queue_000" is already a partition`},
},
{
name: "generic error",
line: "pg_restore: error: connection failed",
Expand Down
Loading