diff --git a/pkg/sql/adapters_pgx.go b/pkg/sql/adapters_pgx.go index 81db775..67b5422 100644 --- a/pkg/sql/adapters_pgx.go +++ b/pkg/sql/adapters_pgx.go @@ -66,25 +66,25 @@ func (c PgxBeginner) BeginTx(ctx context.Context, options *stdSQL.TxOptions) (Tx } func (c PgxBeginner) ExecContext(ctx context.Context, query string, args ...any) (Result, error) { - res, err := c.Conn.Exec(ctx, query, args...) + res, err := c.Exec(ctx, query, args...) return PgxResult{res}, err } func (c PgxBeginner) QueryContext(ctx context.Context, query string, args ...any) (Rows, error) { - rows, err := c.Conn.Query(ctx, query, args...) + rows, err := c.Query(ctx, query, args...) return PgxRows{rows}, err } func (t PgxTx) ExecContext(ctx context.Context, query string, args ...any) (Result, error) { - res, err := t.Tx.Exec(ctx, query, args...) + res, err := t.Exec(ctx, query, args...) return PgxResult{res}, err } func (t PgxTx) QueryContext(ctx context.Context, query string, args ...any) (Rows, error) { - rows, err := t.Tx.Query(ctx, query, args...) + rows, err := t.Query(ctx, query, args...) return PgxRows{rows}, err } diff --git a/pkg/sql/queue_schema_adapter_mysql_test.go b/pkg/sql/queue_schema_adapter_mysql_test.go index a1b0787..e0c6c8a 100644 --- a/pkg/sql/queue_schema_adapter_mysql_test.go +++ b/pkg/sql/queue_schema_adapter_mysql_test.go @@ -56,6 +56,7 @@ func TestMySQLQueueSchemaAdapter(t *testing.T) { } var receivedMessages []*message.Message +ReceiveLoop: for i := 0; i < 5; i++ { select { case msg := <-messages: @@ -63,7 +64,7 @@ func TestMySQLQueueSchemaAdapter(t *testing.T) { msg.Ack() case <-time.After(5 * time.Second): t.Errorf("expected to receive message") - break + break ReceiveLoop } } diff --git a/pkg/sql/queue_schema_adapter_postgresql.go b/pkg/sql/queue_schema_adapter_postgresql.go index 5faa769..974a3ab 100644 --- a/pkg/sql/queue_schema_adapter_postgresql.go +++ b/pkg/sql/queue_schema_adapter_postgresql.go @@ -74,7 +74,7 @@ func queueInsertMarkers(count int) string { index := 1 for i := 0; i < count; i++ { - result.WriteString(fmt.Sprintf("($%d,$%d,$%d),", index, index+1, index+2)) + fmt.Fprintf(&result, "($%d,$%d,$%d),", index, index+1, index+2) index += 3 } @@ -104,7 +104,7 @@ func (s PostgreSQLQueueSchema) SelectQuery(params SelectQueryParams) (Query, err if s.GenerateWhereClause != nil { where, args = s.GenerateWhereClause(whereParams) if where != "" { - where = "AND " + where + where = "AND (" + where + ")" } } diff --git a/pkg/sql/queue_schema_adapter_postgresql_test.go b/pkg/sql/queue_schema_adapter_postgresql_test.go index 20f151b..73a0a0e 100644 --- a/pkg/sql/queue_schema_adapter_postgresql_test.go +++ b/pkg/sql/queue_schema_adapter_postgresql_test.go @@ -56,6 +56,7 @@ func TestPostgreSQLQueueSchemaAdapter(t *testing.T) { } var receivedMessages []*message.Message +ReceiveLoop: for i := 0; i < 5; i++ { select { case msg := <-messages: @@ -63,7 +64,7 @@ func TestPostgreSQLQueueSchemaAdapter(t *testing.T) { msg.Ack() case <-time.After(5 * time.Second): t.Errorf("expected to receive message") - break + break ReceiveLoop } } diff --git a/pkg/sql/schema_adapter_postgresql.go b/pkg/sql/schema_adapter_postgresql.go index 4088cd0..1197e8b 100644 --- a/pkg/sql/schema_adapter_postgresql.go +++ b/pkg/sql/schema_adapter_postgresql.go @@ -95,7 +95,7 @@ func defaultInsertMarkers(count int) string { index := 1 for i := 0; i < count; i++ { - result.WriteString(fmt.Sprintf("($%d,$%d,$%d,pg_current_xact_id()),", index, index+1, index+2)) + fmt.Fprintf(&result, "($%d,$%d,$%d,pg_current_xact_id()),", index, index+1, index+2) index += 3 }