From 82c59532dd93be8fae81412a49195dd0b2c4139b Mon Sep 17 00:00:00 2001 From: General Kroll Date: Sat, 4 Apr 2026 09:52:22 +1100 Subject: [PATCH] format-requirements Extended query protocol support with error recovery Summary: - Implement full extended query protocol (Parse, Bind, Describe, Execute, Close, Sync, Flush) with proper wire message parsing and serialization - Add IExtendedQueryBackend interface with opt-in support via type assertion; DefaultExtendedQueryBackend delegates to simple query path for backwards compat - Restructure command loop: ReadyForQuery sent once initially, then only after SimpleQuery completion and Sync (per PostgreSQL protocol spec) - Add error state tracking: after extended query failure, discard messages until Sync, then send ReadyForQuery('E') matching PostgreSQL behavior - Fix ClientClose ('C') to close prepared statements/portals, not the connection; connection close moved to ClientTerminate ('X') where it belongs - Add text format bypass in Column.Write: string/[]byte sources write raw bytes directly, preserving exact backend representation (e.g. sqlite "t"/"f" for bools) - Thread resultFormats from Bind through to RowDescription and data encoding, supporting per-column text/binary format selection per protocol spec - Per-connection PreparedStatement and Portal caches - Enable previously disabled pgx query test - Add comprehensive test coverage: extended query happy path, error recovery (Parse/Bind/Describe/Execute errors), statement/portal lifecycle, multiple Sync recovery, simple query after extended query error, text bypass preservation, resolveResultFormat protocol rules, sqlbackend unit tests - Soft linting: lint findings are uploaded as a workflow artifact but do not fail the build. - Documentation: add postgres_emulation.md and developer-guide.md; remove superseded aot_metadata.md and stackql_backend_migration.md. 
--- .claude/.gitignore | 1 + .github/workflows/build.yaml | 32 ++++ .github/workflows/lint.yaml | 31 +++- .golangci.yml | 178 ++++++++++++++++++ Makefile | 4 +- command.go | 8 +- docs/aot_metadata.md | 31 ---- docs/developer-guide.md | 8 + docs/postgres_emulation.md | 62 +++++++ docs/stackql_backend_migration.md | 217 ---------------------- extended_query.go | 36 +++- row.go | 38 +++- row_test.go | 293 ++++++++++++++++++++++++++++++ writer.go | 11 +- 14 files changed, 679 insertions(+), 271 deletions(-) create mode 100644 .claude/.gitignore create mode 100644 .github/workflows/build.yaml create mode 100644 .golangci.yml delete mode 100644 docs/aot_metadata.md create mode 100644 docs/developer-guide.md create mode 100644 docs/postgres_emulation.md delete mode 100644 docs/stackql_backend_migration.md create mode 100644 row_test.go diff --git a/.claude/.gitignore b/.claude/.gitignore new file mode 100644 index 0000000..be6bdc7 --- /dev/null +++ b/.claude/.gitignore @@ -0,0 +1 @@ +projects/ \ No newline at end of file diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml new file mode 100644 index 0000000..ae4faac --- /dev/null +++ b/.github/workflows/build.yaml @@ -0,0 +1,32 @@ +name: Build and test + +on: + push: + tags: + - 'v*' + - 'build*' + branches: + - main + - develop + pull_request: + branches: + - main + - develop + +jobs: + + test: + name: test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Set up Go + uses: actions/setup-go@v2 + with: + go-version: 1.17 + + - name: Unit tests + run: | + go mod tidy + go test ./... 
-v diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 8540a34..9a9b2f5 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -1,11 +1,27 @@ -name: CI +name: Linting checks on: push: + tags: + - 'v*' + - 'build*' + - 'lint*' + branches: + - main + - develop + pull_request: + branches: + - main + - develop + +env: + GOLANGCI_LINT_VERSION: ${{ vars.GOLANGCI_LINT_VERSION == '' && 'v2.5.0' || vars.GOLANGCI_LINT_VERSION }} + DEFAULT_STEP_TIMEOUT: ${{ vars.DEFAULT_STEP_TIMEOUT_MIN == '' && '20' || vars.DEFAULT_STEP_TIMEOUT_MIN }} jobs: lint: + name: lint runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 @@ -16,16 +32,23 @@ jobs: go-version: 1.17 - name: Restore bin - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: ./bin key: ${{ runner.os }}-bin-${{ hashFiles('**/go.sum') }} - name: Linting - run: make lint + run: | + make lint GOLANGCI_LINT_VERSION=${GOLANGCI_LINT_VERSION} | tee lint.log 2>&1 || true + + - name: Upload linting results + uses: actions/upload-artifact@v4 + with: + name: lint-results + path: lint.log - name: Cache bin - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: ./bin key: ${{ runner.os }}-bin-${{ hashFiles('**/go.sum') }} diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..3456286 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,178 @@ +version: "2" +output: + formats: + text: + path: stdout +linters: + default: none + enable: + - asasalint + - asciicheck + - bidichk + - bodyclose + - cyclop + - dupl + - durationcheck + - errcheck + - errname + - errorlint + - exhaustive + - forbidigo + - funlen + - gocheckcompilerdirectives + - gochecknoglobals + - gochecknoinits + - gocognit + # - goconst + - gocritic + - gocyclo + - godot + - gomodguard + - goprintffuncname + - gosec + - govet + - ineffassign + - lll + - loggercheck + - makezero + - mnd + - musttag + - nakedret + - nestif + - nilerr + - nilnil + # - noctx + # - nolintlint + - nonamedreturns + - 
nosprintfhostport + - predeclared + - promlinter + - reassign + - revive + - rowserrcheck + - sqlclosecheck + # - staticcheck + - testableexamples + - testpackage + - tparallel + - unconvert + - unparam + - unused + - usestdlibvars + - wastedassign + - whitespace + settings: + cyclop: + max-complexity: 30 + package-average: 10 + errcheck: + check-type-assertions: true + exhaustive: + check: + - switch + - map + exhaustruct: + exclude: + - ^net/http.Client$ + - ^net/http.Cookie$ + - ^net/http.Request$ + - ^net/http.Response$ + - ^net/http.Server$ + - ^net/http.Transport$ + - ^net/url.URL$ + - ^os/exec.Cmd$ + - ^reflect.StructField$ + - ^github.com/Shopify/sarama.Config$ + - ^github.com/Shopify/sarama.ProducerMessage$ + - ^github.com/mitchellh/mapstructure.DecoderConfig$ + - ^github.com/prometheus/client_golang/.+Opts$ + - ^github.com/spf13/cobra.Command$ + - ^github.com/spf13/cobra.CompletionOptions$ + - ^github.com/stretchr/testify/mock.Mock$ + - ^github.com/testcontainers/testcontainers-go.+Request$ + - ^github.com/testcontainers/testcontainers-go.FromDockerfile$ + - ^golang.org/x/tools/go/analysis.Analyzer$ + - ^google.golang.org/protobuf/.+Options$ + - ^gopkg.in/yaml.v3.Node$ + funlen: + lines: 100 + statements: 50 + gocognit: + min-complexity: 20 + gocritic: + settings: + captLocal: + paramsOnly: false + underef: + skipRecvDeref: false + gomodguard: + blocked: + modules: + - github.com/golang/protobuf: + recommendations: + - google.golang.org/protobuf + reason: see https://developers.google.com/protocol-buffers/docs/reference/go/faq#modules + - github.com/satori/go.uuid: + recommendations: + - github.com/google/uuid + reason: satori's package is not maintained + - github.com/gofrs/uuid: + recommendations: + - github.com/google/uuid + reason: gofrs' package is not go module + govet: + disable: + - fieldalignment + enable-all: true + settings: + shadow: + strict: true + mnd: + ignored-functions: + - os.Chmod + - os.Mkdir + - os.MkdirAll + - os.OpenFile + - 
os.WriteFile + - prometheus.ExponentialBuckets + - prometheus.ExponentialBucketsRange + - prometheus.LinearBuckets + nakedret: + max-func-lines: 0 + nolintlint: + require-explanation: true + require-specific: true + allow-no-explanation: + - funlen + - gocognit + - lll + rowserrcheck: + packages: + - github.com/jmoiron/sqlx + exclusions: + generated: lax + presets: + - comments + - common-false-positives + - legacy + - std-error-handling + rules: + - linters: + - stylecheck + path: command\.go + paths: + - third_party$ + - '.*_test\.go$' + - builtin$ + - examples$ +issues: + max-same-issues: 50 +formatters: + enable: + - goimports + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ diff --git a/Makefile b/Makefile index a3bb998..2bd7081 100644 --- a/Makefile +++ b/Makefile @@ -7,6 +7,8 @@ GOPATH = $(HOME)/go GOBIN = $(GOPATH)/bin GO ?= GOGC=off $(shell which go) +GOLANGCI_LINT_VERSION = v2.5.0 + # Printing V = 0 Q = $(if $(filter 1,$V),,@) @@ -23,7 +25,7 @@ $(BIN)/%: | $(BIN) ; $(info $(M) building $(@F)…) GOLANGCI_LINT = $(BIN)/golangci-lint $(BIN)/golangci-lint: | $(BIN) ; - curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s v1.42.1 + curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/$(GOLANGCI_LINT_VERSION)/install.sh| sh -s $(GOLANGCI_LINT_VERSION) STRINGER = $(BIN)/stringer GOIMPORTS = $(BIN)/goimports diff --git a/command.go b/command.go index 6d43936..0bbb8fa 100644 --- a/command.go +++ b/command.go @@ -244,7 +244,7 @@ func (srv *Server) handleSimpleQuery(ctx context.Context, cn SQLConnection) erro } if !headersWritten { headersWritten = true - srv.writeSQLResultHeader(ctx, res, dw) + srv.writeSQLResultHeader(ctx, res, dw, nil) } srv.writeSQLResultRows(ctx, res, dw) // TODO: add debug messages, configurably @@ -283,16 +283,16 @@ func (srv *Server) writeSQLResultRows(ctx context.Context, res sqldata.ISQLResul return nil } -func (srv *Server) writeSQLResultHeader(ctx 
context.Context, res sqldata.ISQLResult, writer DataWriter) error { +func (srv *Server) writeSQLResultHeader(ctx context.Context, res sqldata.ISQLResult, writer DataWriter, resultFormats []int16) error { var colz Columns - for _, c := range res.GetColumns() { + for i, c := range res.GetColumns() { colz = append(colz, Column{ Table: c.GetTableId(), Name: c.GetName(), Oid: oid.Oid(c.GetObjectID()), Width: c.GetWidth(), - Format: TextFormat, + Format: resolveResultFormat(resultFormats, i), }, ) } diff --git a/docs/aot_metadata.md b/docs/aot_metadata.md deleted file mode 100644 index d351ab1..0000000 --- a/docs/aot_metadata.md +++ /dev/null @@ -1,31 +0,0 @@ - -# AOT metadata - -The postgres wire protocol supports many ahead of time (AOT) metadata exchanges, eg: - -- Columns and types returned by a query. -- Other aspects of extended querying per [the postgres documentation](https://www.postgresql.org/docs/16/protocol-overview.html#PROTOCOL-QUERY-CONCEPTS). - - -These are required for prepared statement execution and are used liberally by client libraries in various language runtimes. - - -## Important Aspects - -### Support for full wire protocol messages - -The main focus is support for extended queries; the theory being that this results in seamless consumption of stackql by myriad client libraries. - -That said, there is little reason not to fully support all messages [per the docs](https://www.postgresql.org/docs/16/protocol-message-formats.html). - -The SQL backend interface needs some breadth in order to achieve this. We support a blended and opt in implementation that leverages a combination of simple query only implementation plus method stubs for extended. A word of caution, the prior logic is integral to correct function of the simple query function and so ideally changes should not break existing clients. The regression test suite is visible in [the `stackql` core repository](https://github.com/stackql/stackql). 
- -### Full wire protocol support with stackql - -The `psql-wire` library exposes an `IExtendedQueryBackend` interface that stackql can implement to participate in the extended query protocol. A `DefaultExtendedQueryBackend` exists that delegates execution to the simple query path, so basic compatibility comes for free. Richer support involves the following areas within stackql: - -- **Parameter type resolution**: `HandleParse` receives parameter OIDs (which may be zero/unspecified). stackql resolves these against its own type system and returns concrete OIDs so that clients can format bind values correctly. -- **Result column metadata**: `HandleDescribeStatement` and `HandleDescribePortal` return column names, types, and widths. stackql derives this from its query planner or schema metadata so that clients like pgx can allocate typed scan targets before any rows arrive. -- **Parameterised execution**: `HandleExecute` receives the bound parameter values alongside the original query. stackql substitutes these into its execution pipeline, replacing the simple string-only path. -- **Statement and portal lifecycle**: `HandleCloseStatement` and `HandleClosePortal` allow stackql to release any cached plans or intermediate state associated with a named prepared statement or portal. -- **Bind-time validation**: `HandleBind` gives stackql an opportunity to validate parameter formats and values before execution, enabling early error reporting within the extended query error-recovery model (errors before `Sync` do not tear down the connection). 
diff --git a/docs/developer-guide.md b/docs/developer-guide.md new file mode 100644 index 0000000..9f789d5 --- /dev/null +++ b/docs/developer-guide.md @@ -0,0 +1,8 @@ + +# Developer Guide + +## Running linter locally + +```bash +make lint +``` diff --git a/docs/postgres_emulation.md b/docs/postgres_emulation.md new file mode 100644 index 0000000..7c50238 --- /dev/null +++ b/docs/postgres_emulation.md @@ -0,0 +1,62 @@ + +# Postgres protocol emulation + +How `psql-wire` + stackql emulate a real PostgreSQL server for client libraries. + +## What is implemented + +### Connection lifecycle + +Handshake, TLS upgrade, clear-text password auth, parameter exchange, and graceful termination per [protocol flow](https://www.postgresql.org/docs/16/protocol-flow.html). + +### Simple query protocol + +Full support. `ClientSimpleQuery` ('Q') parses, executes, and streams results with `RowDescription`, `DataRow`, `CommandComplete`, and `ReadyForQuery` per [message formats](https://www.postgresql.org/docs/16/protocol-message-formats.html). Compound queries (`;`-separated) are split and executed sequentially. 
+ +### Extended query protocol + +Full message support for the [extended query cycle](https://www.postgresql.org/docs/16/protocol-overview.html#PROTOCOL-QUERY-CONCEPTS): + +| Message | Direction | Status | +|---------|-----------|--------| +| Parse ('P') | client | Implemented — caches prepared statement | +| Bind ('B') | client | Implemented — binds params, creates portal | +| Describe ('D') | client | Implemented — returns ParameterDescription + RowDescription/NoData | +| Execute ('E') | client | Implemented — executes portal, streams results | +| Close ('C') | client | Implemented — closes statement or portal | +| Sync ('S') | client | Implemented — ends cycle, sends ReadyForQuery | +| Flush ('H') | client | Implemented — no-op (writes are unbuffered) | + +### Error recovery + +Matches PostgreSQL behaviour: after an extended query error, all messages are discarded until `Sync`, which sends `ReadyForQuery('E')`. Subsequent `Sync` returns to `ReadyForQuery('I')`. + +### Text format passthrough + +For `FormatCode=0` (text), string/`[]byte` values bypass `pgtype` encoding and write raw bytes directly. This preserves exact backend representation (e.g. sqlite `"t"`/`"f"` for booleans) while advertising correct OIDs in `RowDescription`. + +### Result format negotiation + +`resultFormats` from `Bind` are threaded through to `RowDescription` and data encoding, supporting per-column text/binary format selection per [protocol spec](https://www.postgresql.org/docs/16/protocol-message-formats.html). + +## What requires stackql-side implementation + +The `IExtendedQueryBackend` interface is fully wired. A `DefaultExtendedQueryBackend` delegates to `HandleSimpleQuery`, providing basic compatibility. For full fidelity, stackql implements: + +- **`HandleParse`** — resolve unspecified parameter OIDs against stackql's type system. +- **`HandleDescribeStatement` / `HandleDescribePortal`** — return column metadata from the query planner so clients can allocate typed scan targets. 
+- **`HandleExecute`** — substitute bound parameter values into the execution pipeline. +- **`HandleBind`** — validate parameter formats/values before execution. +- **`HandleCloseStatement` / `HandleClosePortal`** — release cached plans. + +See [stackql core repository](https://github.com/stackql/stackql) for the backend implementation. + +## Not implemented + +- **Copy protocol** — `CopyData`/`CopyDone`/`CopyFail` messages are intentionally ignored per [protocol spec](https://www.postgresql.org/docs/16/protocol-message-formats.html). +- **SASL / MD5 / SCRAM auth** — only clear-text password is supported. +- **Transaction blocks** — `ReadyForQuery` status is always `'I'` (idle) or `'E'` (error); `'T'` (in transaction) is never sent. +- **Portal suspension** — `Execute` with `maxRows > 0` does not yet suspend and resume portals via `ServerPortalSuspended`. +- **`FunctionCall` message** — deprecated in PostgreSQL, not implemented. +- **Notification / `LISTEN`/`NOTIFY`** — async notification messages are not emitted. + diff --git a/docs/stackql_backend_migration.md b/docs/stackql_backend_migration.md deleted file mode 100644 index 95c134d..0000000 --- a/docs/stackql_backend_migration.md +++ /dev/null @@ -1,217 +0,0 @@ -# Migrating a stackql backend to extended query support - -This document describes how to add extended query protocol support to a stackql `ISQLBackend` implementation, replacing the default stubs provided by `psql-wire`. - -## Background - -The `psql-wire` library auto-detects whether an `ISQLBackend` also implements `IExtendedQueryBackend` via a type assertion in `connection.go`: - -```go -if eb, ok := sqlBackend.(sqlbackend.IExtendedQueryBackend); ok { - extBackend = eb -} else if sqlBackend != nil { - extBackend = sqlbackend.NewDefaultExtendedQueryBackend(sqlBackend) -} -``` - -If the backend does not implement `IExtendedQueryBackend`, a `DefaultExtendedQueryBackend` wraps it. 
This default delegates `HandleExecute` to `HandleSimpleQuery` and stubs out everything else. Client libraries like pgx can connect and run unparameterised queries through this path. - -No factory or wiring changes are needed to opt in. Adding the methods to the existing struct is sufficient. - -## The IExtendedQueryBackend interface - -```go -type IExtendedQueryBackend interface { - HandleParse(ctx context.Context, stmtName string, query string, paramOIDs []uint32) ([]uint32, error) - HandleBind(ctx context.Context, portalName string, stmtName string, paramFormats []int16, paramValues [][]byte, resultFormats []int16) error - HandleDescribeStatement(ctx context.Context, stmtName string, query string, paramOIDs []uint32) ([]uint32, []sqldata.ISQLColumn, error) - HandleDescribePortal(ctx context.Context, portalName string, stmtName string, query string, paramOIDs []uint32) ([]sqldata.ISQLColumn, error) - HandleExecute(ctx context.Context, portalName string, stmtName string, query string, paramFormats []int16, paramValues [][]byte, resultFormats []int16, maxRows int32) (sqldata.ISQLResultStream, error) - HandleCloseStatement(ctx context.Context, stmtName string) error - HandleClosePortal(ctx context.Context, portalName string) error -} -``` - -## Migration steps - -### Step 1: Add no-op stubs to the existing backend - -Locate the struct in stackql that implements `ISQLBackend` (the one with `HandleSimpleQuery`, `SplitCompoundQuery`, and `GetDebugStr`). Add the seven methods below. These are direct copies of the `DefaultExtendedQueryBackend` behaviour, so robot tests should produce identical results. 
- -```go -func (sb *YourBackend) HandleParse(ctx context.Context, stmtName string, query string, paramOIDs []uint32) ([]uint32, error) { - return paramOIDs, nil -} - -func (sb *YourBackend) HandleBind(ctx context.Context, portalName string, stmtName string, paramFormats []int16, paramValues [][]byte, resultFormats []int16) error { - return nil -} - -func (sb *YourBackend) HandleDescribeStatement(ctx context.Context, stmtName string, query string, paramOIDs []uint32) ([]uint32, []sqldata.ISQLColumn, error) { - return paramOIDs, nil, nil -} - -func (sb *YourBackend) HandleDescribePortal(ctx context.Context, portalName string, stmtName string, query string, paramOIDs []uint32) ([]sqldata.ISQLColumn, error) { - return nil, nil -} - -func (sb *YourBackend) HandleExecute(ctx context.Context, portalName string, stmtName string, query string, paramFormats []int16, paramValues [][]byte, resultFormats []int16, maxRows int32) (sqldata.ISQLResultStream, error) { - return sb.HandleSimpleQuery(ctx, query) -} - -func (sb *YourBackend) HandleCloseStatement(ctx context.Context, stmtName string) error { - return nil -} - -func (sb *YourBackend) HandleClosePortal(ctx context.Context, portalName string) error { - return nil -} -``` - -**Verification**: run the full robot test suite. Behaviour should be unchanged. - -### Step 2: Add a compile-time interface check - -Near the top of the file, add: - -```go -var _ sqlbackend.IExtendedQueryBackend = (*YourBackend)(nil) -``` - -This ensures the compiler catches missing methods if the interface changes. - -### Step 3: Implement HandleExecute with parameter substitution - -`HandleExecute` receives the query string and bound parameter values. The parameters arrive as: - -- `paramFormats []int16` — one entry per parameter: `0` = text, `1` = binary. May be empty (all text) or length 1 (applies to all). -- `paramValues [][]byte` — raw bytes for each parameter. `nil` entry = SQL NULL. 
-- `resultFormats []int16` — requested result column formats (currently safe to ignore; the wire library encodes as text). - -#### Option A: String interpolation (simplest) - -Replace positional parameters (`$1`, `$2`, ...) with their text values, applying appropriate quoting, then delegate to `HandleSimpleQuery`: - -```go -func (sb *YourBackend) HandleExecute(ctx context.Context, portalName string, stmtName string, query string, paramFormats []int16, paramValues [][]byte, resultFormats []int16, maxRows int32) (sqldata.ISQLResultStream, error) { - resolved := substituteParams(query, paramFormats, paramValues) - return sb.HandleSimpleQuery(ctx, resolved) -} -``` - -Where `substituteParams` replaces `$N` tokens with the corresponding text value. Rules: - -- `paramValues[i] == nil` → substitute `NULL` (no quotes). -- Otherwise use the text representation from `paramValues[i]`. Quote string values with single quotes and escape embedded single quotes by doubling them (`'` → `''`). -- If `paramFormats` is empty or has length 1, treat all parameters as that format (usually 0 = text). - -#### Option B: Native parameterisation - -If the stackql execution engine supports parameterised queries, pass the values through directly. This avoids quoting issues and is more correct long-term. - -**Verification**: write a test that connects with pgx, runs a parameterised query, and checks the results: - -```go -rows, err := conn.Query(ctx, "SELECT $1::text, $2::int", "hello", 42) -``` - -### Step 4: Implement HandleDescribeStatement - -Client libraries call Describe after Parse to learn the result column types before any rows arrive. This allows typed scanning (e.g., pgx allocates `int32` vs `string` targets). 
- -Return parameter OIDs and result column metadata: - -```go -func (sb *YourBackend) HandleDescribeStatement(ctx context.Context, stmtName string, query string, paramOIDs []uint32) ([]uint32, []sqldata.ISQLColumn, error) { - columns, err := sb.planQuery(query) // derive columns from query planner / schema - if err != nil { - return nil, nil, err - } - return paramOIDs, columns, nil -} -``` - -Each `ISQLColumn` requires: -- `GetName()` — column name -- `GetObjectID()` — PostgreSQL type OID (e.g., `25` for text, `23` for int4, `16` for bool) -- `GetWidth()` — column width in bytes (use `-1` if variable) -- `GetTableId()`, `GetAttrNum()` — can be `0` if not applicable -- `GetTypeModifier()` — usually `-1` -- `GetFormat()` — `"text"` for text format - -If the query planner cannot derive columns (e.g., for DDL), return `nil` columns — the wire library sends `NoData`, which is valid. - -**Verification**: use pgx to prepare a statement and check that `FieldDescriptions()` returns the expected column metadata. - -### Step 5: Implement HandleDescribePortal - -Similar to `HandleDescribeStatement`, but for a bound portal. The portal already has its parameters bound, so column metadata may be more precise. In many cases this can delegate to the same logic: - -```go -func (sb *YourBackend) HandleDescribePortal(ctx context.Context, portalName string, stmtName string, query string, paramOIDs []uint32) ([]sqldata.ISQLColumn, error) { - _, columns, err := sb.HandleDescribeStatement(ctx, stmtName, query, paramOIDs) - return columns, err -} -``` - -### Step 6: Implement HandleParse with type resolution - -If stackql can resolve unspecified parameter types (OID = 0) from the query, do so in `HandleParse`. Otherwise, the current pass-through is fine — clients that send OID 0 will format parameters as text, which works with string interpolation. 
- -```go -func (sb *YourBackend) HandleParse(ctx context.Context, stmtName string, query string, paramOIDs []uint32) ([]uint32, error) { - resolved, err := sb.resolveParamTypes(query, paramOIDs) - if err != nil { - return nil, err - } - return resolved, nil -} -``` - -### Step 7: Implement HandleBind with validation - -If stackql can validate parameter values at bind time, do so here. Errors returned from `HandleBind` are reported to the client before execution, and the connection enters error recovery mode (messages discarded until Sync). This gives the client a chance to re-bind with corrected values. - -### Step 8: Implement HandleCloseStatement / HandleClosePortal - -If stackql caches query plans or intermediate state, release them here. If not, the no-ops from step 1 are correct. - -## Error recovery - -The wire library handles error recovery automatically. If any `IExtendedQueryBackend` method returns an error: - -1. An `ErrorResponse` is sent to the client. -2. All subsequent messages are discarded until the client sends `Sync`. -3. `Sync` sends `ReadyForQuery('E')` (failed transaction status). -4. The client can then retry or issue new commands. - -Backend methods should return errors freely. They do not need to manage connection state. - -## Testing strategy - -Each step should be verified independently: - -1. **Step 1 (stubs)**: full robot test suite — no regressions. -2. **Step 3 (execute)**: pgx test with parameterised `SELECT $1::text` — returns correct value. -3. **Step 4 (describe)**: pgx `Prepare` + check `FieldDescriptions()` — column names and OIDs match. -4. **Step 5 (describe portal)**: pgx query with `QueryRow` — typed scan works without explicit type hints. -5. **Step 6 (parse)**: pgx query with untyped parameters — server resolves types, client formats correctly. - -For each step, a pgx integration test is the most realistic validation since pgx exercises the full Parse → Describe → Bind → Execute → Sync pipeline. 
- -## Common OIDs for reference - -| Type | OID | Go type | -|---------|------|---------------| -| bool | 16 | bool | -| int2 | 21 | int16 | -| int4 | 23 | int32 | -| int8 | 20 | int64 | -| float4 | 700 | float32 | -| float8 | 701 | float64 | -| text | 25 | string | -| varchar | 1043 | string | -| json | 114 | string/[]byte | -| jsonb | 3802 | string/[]byte | - -These are defined in `github.com/lib/pq/oid` as `oid.T_bool`, `oid.T_int4`, etc. diff --git a/extended_query.go b/extended_query.go index 52457f7..9dd74c7 100644 --- a/extended_query.go +++ b/extended_query.go @@ -214,8 +214,9 @@ func (srv *Server) handleDescribeStatement(ctx context.Context, conn SQLConnecti } // Send RowDescription or NoData + // Describe on a statement has no result formats yet (Bind hasn't happened) if columns != nil { - return writeRowDescriptionFromSQLColumns(ctx, conn, columns) + return writeRowDescriptionFromSQLColumns(ctx, conn, columns, nil) } return writeNoData(conn) } @@ -238,7 +239,7 @@ func (srv *Server) handleDescribePortal(ctx context.Context, conn SQLConnection, } if columns != nil { - return writeRowDescriptionFromSQLColumns(ctx, conn, columns) + return writeRowDescriptionFromSQLColumns(ctx, conn, columns, portal.ResultFormats) } return writeNoData(conn) } @@ -291,8 +292,9 @@ func (srv *Server) handleExecute(ctx context.Context, conn SQLConnection) error } dw := &dataWriter{ - ctx: ctx, - client: conn, + ctx: ctx, + client: conn, + resultFormats: portal.ResultFormats, } var headersWritten bool @@ -307,7 +309,7 @@ func (srv *Server) handleExecute(ctx context.Context, conn SQLConnection) error } if !headersWritten { headersWritten = true - srv.writeSQLResultHeader(ctx, res, dw) + srv.writeSQLResultHeader(ctx, res, dw, portal.ResultFormats) } srv.writeSQLResultRows(ctx, res, dw) dw.Complete(notices, "OK") @@ -416,17 +418,35 @@ func writeParameterDescription(writer buffer.Writer, paramOIDs []uint32) error { return writer.End() } -func writeRowDescriptionFromSQLColumns(ctx 
context.Context, writer buffer.Writer, columns []sqldata.ISQLColumn) error { +func writeRowDescriptionFromSQLColumns(ctx context.Context, writer buffer.Writer, columns []sqldata.ISQLColumn, resultFormats []int16) error { var colz Columns - for _, c := range columns { + for i, c := range columns { colz = append(colz, Column{ Table: c.GetTableId(), Name: c.GetName(), AttrNo: c.GetAttrNum(), Oid: oid.Oid(c.GetObjectID()), Width: c.GetWidth(), - Format: TextFormat, + Format: resolveResultFormat(resultFormats, i), }) } return colz.Define(ctx, writer) } + +// resolveResultFormat determines the format code for column i based on the +// result format codes from the Bind message, per the PostgreSQL protocol: +// - 0 format codes: all columns use text +// - 1 format code: all columns use that format +// - N format codes: each column uses its corresponding format +func resolveResultFormat(formats []int16, i int) FormatCode { + if len(formats) == 0 { + return TextFormat + } + if len(formats) == 1 { + return FormatCode(formats[0]) + } + if i < len(formats) { + return FormatCode(formats[i]) + } + return TextFormat +} diff --git a/row.go b/row.go index de852dc..28eb0c1 100644 --- a/row.go +++ b/row.go @@ -78,12 +78,32 @@ func (column Column) Define(ctx context.Context, writer buffer.Writer) { // Write encodes the given source value using the column type definition and connection // info. The encoded byte buffer is added to the given write buffer. This method -// Is used to encode values and return them inside a DataRow message. +// is used to encode values and return them inside a DataRow message. +// +// For text format (FormatCode=0), if the source is already a string or []byte, +// the raw bytes are written directly without going through pgtype encoding. +// This preserves the exact representation from the backend (e.g. sqlite's "t"/"f" +// for booleans) while still advertising the correct OID in RowDescription. 
func (column Column) Write(ctx context.Context, writer buffer.Writer, src interface{}) (err error) { if ctx.Err() != nil { return ctx.Err() } + // For text format, bypass pgtype encoding when the source is already + // a string or []byte. This avoids representation changes (e.g. pgtype.Bool + // encoding "true"/"false" instead of the original "t"/"f"). + if column.Format == TextFormat { + if b, ok := asTextBytes(src); ok { + if b == nil { + writer.AddInt32(-1) // NULL + return nil + } + writer.AddInt32(int32(len(b))) + writer.AddBytes(b) + return nil + } + } + ci := TypeInfo(ctx) if ci == nil { return errors.New("postgres connection info has not been defined inside the given context") @@ -120,3 +140,19 @@ func (column Column) Write(ctx context.Context, writer buffer.Writer, src interf return nil } + +// asTextBytes extracts raw bytes from string or []byte sources for text-format +// passthrough. Returns (nil, true) for NULL-representing strings. +func asTextBytes(src interface{}) ([]byte, bool) { + switch v := src.(type) { + case string: + if strings.ToLower(v) == "null" { + return nil, true + } + return []byte(v), true + case []byte: + return v, true + default: + return nil, false + } +} diff --git a/row_test.go b/row_test.go new file mode 100644 index 0000000..1250866 --- /dev/null +++ b/row_test.go @@ -0,0 +1,293 @@ +package wire + +import ( + "bytes" + "context" + "encoding/binary" + "testing" + + "github.com/lib/pq/oid" + "github.com/stackql/psql-wire/internal/buffer" +) + +// readDataRowValues reads DataRow column values from raw bytes written by Columns.Write. +// Returns the raw byte slices for each column (nil for NULL). 
+func readDataRowValues(t *testing.T, data []byte) [][]byte { + t.Helper() + r := bytes.NewReader(data) + + // ServerDataRow type byte + msgType := make([]byte, 1) + if _, err := r.Read(msgType); err != nil { + t.Fatalf("read type byte: %v", err) + } + + // message length (int32) + var msgLen int32 + if err := binary.Read(r, binary.BigEndian, &msgLen); err != nil { + t.Fatalf("read msg length: %v", err) + } + + // column count (int16) + var numCols int16 + if err := binary.Read(r, binary.BigEndian, &numCols); err != nil { + t.Fatalf("read column count: %v", err) + } + + values := make([][]byte, numCols) + for i := 0; i < int(numCols); i++ { + var length int32 + if err := binary.Read(r, binary.BigEndian, &length); err != nil { + t.Fatalf("read value length for col %d: %v", i, err) + } + if length == -1 { + values[i] = nil + } else { + val := make([]byte, length) + if _, err := r.Read(val); err != nil { + t.Fatalf("read value for col %d: %v", i, err) + } + values[i] = val + } + } + return values +} + +func TestAsTextBytes(t *testing.T) { + tests := []struct { + name string + src interface{} + wantData []byte + wantOK bool + }{ + { + name: "string value", + src: "hello", + wantData: []byte("hello"), + wantOK: true, + }, + { + name: "byte slice", + src: []byte("world"), + wantData: []byte("world"), + wantOK: true, + }, + { + name: "null string", + src: "null", + wantData: nil, + wantOK: true, + }, + { + name: "NULL string uppercase", + src: "NULL", + wantData: nil, + wantOK: true, + }, + { + name: "integer not matched", + src: 42, + wantData: nil, + wantOK: false, + }, + { + name: "bool not matched", + src: true, + wantData: nil, + wantOK: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + data, ok := asTextBytes(tt.src) + if ok != tt.wantOK { + t.Fatalf("ok = %v, want %v", ok, tt.wantOK) + } + if !bytes.Equal(data, tt.wantData) { + t.Fatalf("data = %q, want %q", data, tt.wantData) + } + }) + } +} + +func 
TestColumnWrite_TextBypass_PreservesExactString(t *testing.T) { + ctx := setTypeInfo(context.Background()) + + tests := []struct { + name string + colOid oid.Oid + src interface{} + wantText string + }{ + { + name: "bool column with sqlite t/f preserved", + colOid: oid.T_bool, + src: "t", + wantText: "t", + }, + { + name: "bool column with 1/0 preserved", + colOid: oid.T_bool, + src: "1", + wantText: "1", + }, + { + name: "int8 column with string preserved exactly", + colOid: oid.T_int8, + src: "100000001", + wantText: "100000001", + }, + { + name: "text column with plain string", + colOid: oid.T_text, + src: "hello world", + wantText: "hello world", + }, + { + name: "int4 column with byte slice", + colOid: oid.T_int4, + src: []byte("42"), + wantText: "42", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + col := Column{ + Name: "test", + Oid: tt.colOid, + Width: -1, + Format: TextFormat, + } + + var buf bytes.Buffer + writer := buffer.NewWriter(&buf) + + // Wrap in a DataRow message to get valid framing + cols := Columns{col} + err := cols.Write(ctx, writer, []interface{}{tt.src}) + if err != nil { + t.Fatalf("Write error: %v", err) + } + + values := readDataRowValues(t, buf.Bytes()) + if len(values) != 1 { + t.Fatalf("got %d values, want 1", len(values)) + } + + got := string(values[0]) + if got != tt.wantText { + t.Errorf("got %q, want %q (OID %d)", got, tt.wantText, tt.colOid) + } + }) + } +} + +func TestColumnWrite_TextBypass_NullHandling(t *testing.T) { + ctx := setTypeInfo(context.Background()) + + col := Column{ + Name: "test", + Oid: oid.T_text, + Width: -1, + Format: TextFormat, + } + + var buf bytes.Buffer + writer := buffer.NewWriter(&buf) + + cols := Columns{col} + err := cols.Write(ctx, writer, []interface{}{"null"}) + if err != nil { + t.Fatalf("Write error: %v", err) + } + + values := readDataRowValues(t, buf.Bytes()) + if values[0] != nil { + t.Errorf("expected NULL (nil), got %q", values[0]) + } +} + +func 
TestColumnWrite_NonString_UsesStandardEncoder(t *testing.T) { + ctx := setTypeInfo(context.Background()) + + // When source is a native Go type (not string/[]byte), + // the standard pgtype encoder should be used. + col := Column{ + Name: "test", + Oid: oid.T_int4, + Width: 4, + Format: TextFormat, + } + + var buf bytes.Buffer + writer := buffer.NewWriter(&buf) + + cols := Columns{col} + err := cols.Write(ctx, writer, []interface{}{int32(42)}) + if err != nil { + t.Fatalf("Write error: %v", err) + } + + values := readDataRowValues(t, buf.Bytes()) + got := string(values[0]) + if got != "42" { + t.Errorf("got %q, want %q", got, "42") + } +} + +func TestResolveResultFormat(t *testing.T) { + tests := []struct { + name string + formats []int16 + index int + want FormatCode + }{ + { + name: "empty formats defaults to text", + formats: nil, + index: 0, + want: TextFormat, + }, + { + name: "single format applies to all columns", + formats: []int16{1}, + index: 0, + want: BinaryFormat, + }, + { + name: "single format applies to column 5", + formats: []int16{1}, + index: 5, + want: BinaryFormat, + }, + { + name: "per-column format code 0", + formats: []int16{0, 1, 0}, + index: 0, + want: TextFormat, + }, + { + name: "per-column format code 1", + formats: []int16{0, 1, 0}, + index: 1, + want: BinaryFormat, + }, + { + name: "index beyond formats defaults to text", + formats: []int16{1, 0}, + index: 5, + want: TextFormat, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := resolveResultFormat(tt.formats, tt.index) + if got != tt.want { + t.Errorf("resolveResultFormat(%v, %d) = %d, want %d", tt.formats, tt.index, got, tt.want) + } + }) + } +} diff --git a/writer.go b/writer.go index 197bd00..a065dc0 100644 --- a/writer.go +++ b/writer.go @@ -49,11 +49,12 @@ var ErrClosedWriter = errors.New("closed writer") // dataWriter is a implementation of the DataWriter interface. 
type dataWriter struct { - columns Columns - ctx context.Context - client buffer.Writer - closed bool - written uint64 + columns Columns + ctx context.Context + client buffer.Writer + closed bool + written uint64 + resultFormats []int16 // result format codes from the Bind message; nil means all columns use text format } func (writer *dataWriter) Define(columns Columns) error {