Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 57 additions & 22 deletions cmd/airbyte-source/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,33 @@ func init() {

func ReadCommand(ch *Helper) *cobra.Command {
readCmd := &cobra.Command{
Use: "read",
Short: "Converts rows from a PlanetScale database into AirbyteRecordMessages",
Run: func(cmd *cobra.Command, args []string) {
Use: "read",
Short: "Converts rows from a PlanetScale database into AirbyteRecordMessages",
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

ch.Logger = internal.NewLogger(cmd.OutOrStdout())
if readSourceConfigFilePath == "" {
fmt.Fprintf(cmd.ErrOrStderr(), "Please pass path to a valid source config file via the [%v] argument", "config")
os.Exit(1)
return fmt.Errorf("missing config file path")
}

if readSourceCatalogPath == "" {
fmt.Fprintf(cmd.OutOrStdout(), "Please pass path to a valid source catalog file via the [%v] argument", "catalog")
os.Exit(1)
return fmt.Errorf("missing catalog file path")
}

psc, err := parseSource(ch.FileReader, readSourceConfigFilePath)
if err != nil {
fmt.Fprintln(cmd.OutOrStdout(), "Please provide path to a valid configuration file")
return
return err
}

ch.Logger.Log(internal.LOGLEVEL_INFO, "Ensure database")
if err := ch.EnsureDB(psc); err != nil {
fmt.Fprintln(cmd.OutOrStdout(), "Unable to connect to PlanetScale Database")
return
return err
}

defer func() {
Expand All @@ -60,19 +61,19 @@ func ReadCommand(ch *Helper) *cobra.Command {
cs, err := checkConnectionStatus(ctx, ch.Database, psc)
if err != nil {
ch.Logger.ConnectionStatus(cs)
return
return err
}

ch.Logger.Log(internal.LOGLEVEL_INFO, "Reading catalog")
catalog, err := readCatalog(readSourceCatalogPath)
if err != nil {
ch.Logger.Error(fmt.Sprintf("Unable to read catalog: %+v", err))
os.Exit(1)
return fmt.Errorf("unable to read catalog: %w", err)
}

if len(catalog.Streams) == 0 {
ch.Logger.Log(internal.LOGLEVEL_ERROR, "Catalog has no streams")
return
return nil
}

state := ""
Expand All @@ -81,7 +82,7 @@ func ReadCommand(ch *Helper) *cobra.Command {
b, err := os.ReadFile(stateFilePath)
if err != nil {
ch.Logger.Error(fmt.Sprintf("Unable to read state : %v", err))
os.Exit(1)
return fmt.Errorf("unable to read state: %w", err)
}
state = string(b)
}
Expand All @@ -90,16 +91,17 @@ func ReadCommand(ch *Helper) *cobra.Command {
shards, err := ch.Database.ListShards(ctx, psc)
if err != nil {
ch.Logger.Error(fmt.Sprintf("Unable to list shards : %v", err))
os.Exit(1)
return fmt.Errorf("unable to list shards: %w", err)
}

ch.Logger.Log(internal.LOGLEVEL_INFO, "Reading state")
syncState, err := readState(state, psc, catalog.Streams, shards, ch.Logger)
if err != nil {
ch.Logger.Error(fmt.Sprintf("Unable to read state : %v", err))
os.Exit(1)
return fmt.Errorf("unable to read state: %w", err)
}

var readErr error
for _, configuredStream := range catalog.Streams {
keyspaceOrDatabase := configuredStream.Stream.Namespace
if keyspaceOrDatabase == "" {
Expand All @@ -109,33 +111,49 @@ func ReadCommand(ch *Helper) *cobra.Command {
streamState, ok := syncState.Streams[streamStateKey]
if !ok {
ch.Logger.Error(fmt.Sprintf("Unable to read state for stream %v", streamStateKey))
os.Exit(1)
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_INCOMPLETE)
return fmt.Errorf("unable to read state for stream %v", streamStateKey)
}

ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_STARTED)

streamFailed := false
for shardName, shardState := range streamState.Shards {
var tc *psdbconnectv1alpha1.TableCursor

tc, err = shardState.SerializedCursorToTableCursor(configuredStream)
ch.Logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("Using serialized cursor for stream %s", streamStateKey))
if err != nil {
ch.Logger.Error(fmt.Sprintf("Invalid serialized cursor for stream %v, failed with [%v]", streamStateKey, err))
os.Exit(1)
streamFailed = true
break
}

sc, err := ch.Database.Read(ctx, cmd.OutOrStdout(), psc, configuredStream, tc)
if err != nil {
ch.Logger.Error(err.Error())
os.Exit(1)
streamFailed = true
break
}

if sc != nil {
// if we get any new state, we assign it here.
// otherwise, the older state is round-tripped back to Airbyte.
syncState.Streams[streamStateKey].Shards[shardName] = sc
}
ch.Logger.State(syncState)
}

// Always emit state to checkpoint whatever progress was made,
// including partial progress when only some shards succeeded.
ch.Logger.StreamState(keyspaceOrDatabase, configuredStream.Stream.Name, syncState.Streams[streamStateKey])

if streamFailed {
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_INCOMPLETE)
readErr = fmt.Errorf("read failed for stream %v", streamStateKey)
} else {
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_COMPLETE)
}
}

return readErr
},
}
readCmd.Flags().StringVar(&readSourceCatalogPath, "catalog", "", "Path to the PlanetScale catalog configuration")
Expand All @@ -153,9 +171,26 @@ func readState(state string, psc internal.PlanetScaleSource, streams []internal.
Streams: map[string]internal.ShardStates{},
}
if state != "" {
err := json.Unmarshal([]byte(state), &syncState)
if err != nil {
return syncState, err
// Try parsing as Airbyte v2 per-stream state array first
var perStreamStates []internal.AirbyteState
if err := json.Unmarshal([]byte(state), &perStreamStates); err == nil && len(perStreamStates) > 0 && perStreamStates[0].Type == internal.STATE_TYPE_STREAM {
logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("Parsing Airbyte v2 per-stream state (%d streams)", len(perStreamStates)))
for _, s := range perStreamStates {
if s.Stream != nil && s.Stream.StreamState != nil {
ns := s.Stream.StreamDescriptor.Namespace
if ns == "" {
ns = psc.Database
}
key := ns + ":" + s.Stream.StreamDescriptor.Name
syncState.Streams[key] = *s.Stream.StreamState
}
}
} else {
// Fall back to legacy global state format
err := json.Unmarshal([]byte(state), &syncState)
if err != nil {
return syncState, err
}
}
}

Expand Down
Loading