diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fe80d63..6ebf847a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Mark schema replacements as `Stable` in sqlc templates, preventing query SQL from having to be reallocated over and over again.. [PR #1242](https://github.com/riverqueue/river/pull/1242). - Fix unsafe concurrent producer map access in client. [PR #1236](https://github.com/riverqueue/river/pull/1236). - Fix bug in `sqltemplate` cached path in order in which named args are passed to a query (previously, the order was unstable). [PR #1243](https://github.com/riverqueue/river/pull/1243). diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index d8a2d445..16a3ae35 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -126,7 +126,7 @@ func (e *Executor) ColumnExists(ctx context.Context, params *riverdriver.ColumnE schema = "'" + params.Schema + "'" } ctx = sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{ - "schema": {Value: schema}, + "schema": {Value: schema, Stable: true}, }, nil) exists, err := dbsqlc.New().ColumnExists(ctx, e.dbtx, &dbsqlc.ColumnExistsParams{ @@ -1247,6 +1247,6 @@ func schemaTemplateParam(ctx context.Context, schema string) context.Context { } return sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{ - "schema": {Value: schema}, + "schema": {Value: schema, Stable: true}, }, nil) } diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver_test.go b/riverdriver/riverdatabasesql/river_database_sql_driver_test.go index 3097dbbf..0d23bb43 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver_test.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver_test.go @@ -115,4 +115,59 @@ func TestSchemaTemplateParam(t *testing.T) { require.NoError(t, err) require.Equal(t, `SELECT 1 FROM "custom_schema".river_job`, updatedSQL) }) + + t.Run("SchemaReplacementIsStable", func(t *testing.T) { + t.Parallel() + + replacer, bundle := setup(t) + + const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job" + + updatedSQL1, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, "my_schema"), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, `SELECT 1 FROM "my_schema".river_job`, updatedSQL1) + + // Second call with same SQL + same schema produces identical result. + // Because schema is marked Stable, the Replacer caches the output + // after the first call and short-circuits regex on subsequent calls. + updatedSQL2, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, "my_schema"), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, updatedSQL1, updatedSQL2) + }) + + t.Run("EmptySchemaReplacementIsStable", func(t *testing.T) { + t.Parallel() + + replacer, bundle := setup(t) + + const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job" + + updatedSQL1, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, ""), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, "SELECT 1 FROM river_job", updatedSQL1) + + updatedSQL2, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, ""), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, updatedSQL1, updatedSQL2) + }) } diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index e0201a0e..efbddf1f 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -1310,6 +1310,6 @@ func schemaTemplateParam(ctx context.Context, schema string) context.Context { } return sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{ - "schema": {Value: schema}, + "schema": {Value: schema, Stable: true}, }, nil) } diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver_test.go b/riverdriver/riverpgxv5/river_pgx_v5_driver_test.go index f89538f4..ae366d5a 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver_test.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver_test.go @@ -238,6 +238,63 @@ func TestSchemaTemplateParam(t *testing.T) { require.NoError(t, err) require.Equal(t, `SELECT 1 FROM "custom_schema".river_job`, updatedSQL) }) + + t.Run("SchemaReplacementIsStable", func(t *testing.T) { + t.Parallel() + + replacer, bundle := setup(t) + + const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job" + + // First call + updatedSQL1, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, "my_schema"), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, `SELECT 1 FROM "my_schema".river_job`, updatedSQL1) + + // Second call with same SQL + same schema produces identical result. + // Because schema is marked Stable, the Replacer caches the output + // after the first call and short-circuits regex on subsequent calls. + updatedSQL2, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, "my_schema"), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, updatedSQL1, updatedSQL2) + }) + + t.Run("EmptySchemaReplacementIsStable", func(t *testing.T) { + t.Parallel() + + replacer, bundle := setup(t) + + const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job" + + updatedSQL1, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, ""), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, "SELECT 1 FROM river_job", updatedSQL1) + + // Repeat — same result from cache + updatedSQL2, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, ""), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, updatedSQL1, updatedSQL2) + }) } type nilConnDBTX struct{} diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 132d0e46..b7d729ae 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -1680,7 +1680,7 @@ func schemaTemplateParam(ctx context.Context, schema string) context.Context { } return sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{ - "schema": {Value: schema}, + "schema": {Value: schema, Stable: true}, }, nil) } diff --git a/riverdriver/riversqlite/river_sqlite_driver_test.go b/riverdriver/riversqlite/river_sqlite_driver_test.go index fcba8f36..1d445029 100644 --- a/riverdriver/riversqlite/river_sqlite_driver_test.go +++ b/riverdriver/riversqlite/river_sqlite_driver_test.go @@ -94,4 +94,59 @@ func TestSchemaTemplateParam(t *testing.T) { require.NoError(t, err) require.Equal(t, `SELECT 1 FROM "custom_schema".river_job`, updatedSQL) }) + + t.Run("SchemaReplacementIsStable", func(t *testing.T) { + t.Parallel() + + replacer, bundle := setup(t) + + const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job" + + updatedSQL1, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, "my_schema"), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, `SELECT 1 FROM "my_schema".river_job`, updatedSQL1) + + // Second call with same SQL + same schema produces identical result. + // Because schema is marked Stable, the Replacer caches the output + // after the first call and short-circuits regex on subsequent calls. + updatedSQL2, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, "my_schema"), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, updatedSQL1, updatedSQL2) + }) + + t.Run("EmptySchemaReplacementIsStable", func(t *testing.T) { + t.Parallel() + + replacer, bundle := setup(t) + + const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job" + + updatedSQL1, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, ""), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, "SELECT 1 FROM river_job", updatedSQL1) + + updatedSQL2, _, err := replacer.RunSafely( + schemaTemplateParam(ctx, ""), + bundle.driver.ArgPlaceholder(), + sql, + nil, + ) + require.NoError(t, err) + require.Equal(t, updatedSQL1, updatedSQL2) + }) }