Skip to content

Commit

Permalink
feat(influx_tools): Add export to parquet files
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Sep 9, 2024
1 parent 0582cf4 commit d150b4f
Show file tree
Hide file tree
Showing 5 changed files with 832 additions and 123 deletions.
6 changes: 6 additions & 0 deletions cmd/influx_tools/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
geninit "github.com/influxdata/influxdb/cmd/influx_tools/generate/init"
"github.com/influxdata/influxdb/cmd/influx_tools/help"
"github.com/influxdata/influxdb/cmd/influx_tools/importer"
"github.com/influxdata/influxdb/cmd/influx_tools/parquet"
"github.com/influxdata/influxdb/cmd/influx_tools/server"
"github.com/influxdata/influxdb/cmd/influxd/run"
"github.com/influxdata/influxdb/services/meta"
Expand Down Expand Up @@ -67,6 +68,11 @@ func (m *Main) Run(args ...string) error {
if err := c.Run(args); err != nil {
return fmt.Errorf("export failed: %s", err)
}
case "export-parquet":
c := parquet.NewCommand(&ossServer{logger: zap.NewNop()})
if err := c.Run(args); err != nil {
return fmt.Errorf("export failed: %s", err)
}
case "import":
c := importer.NewCommand(&ossServer{logger: zap.NewNop()})
if err := c.Run(args); err != nil {
Expand Down
94 changes: 94 additions & 0 deletions cmd/influx_tools/parquet/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package parquet

import (
"context"
"errors"
"flag"
"fmt"
"io"
"os"

"go.uber.org/zap"

"github.com/influxdata/influxdb/cmd/influx_tools/server"
)

// Command represents the program execution for "store query".
type Command struct {
// Standard input/output, overridden for testing.
Stderr io.Writer
Logger *zap.Logger

server server.Interface
}

// NewCommand returns a new instance of the export Command.
func NewCommand(server server.Interface) *Command {
return &Command{
Stderr: os.Stderr,
server: server,
}
}

// Run executes the export command using the specified args.
func (cmd *Command) Run(args []string) (err error) {
var (
configPath string
database string
rp string
measurement string
output string
dryRun bool
)

cwd, err := os.Getwd()
if err != nil {
return fmt.Errorf("getting current working directory failed: %w", err)
}

flags := flag.NewFlagSet("export", flag.ContinueOnError)
flags.StringVar(&configPath, "config", "", "Config file")
flags.StringVar(&database, "database", "", "Database name")
flags.StringVar(&rp, "rp", "", "Retention policy name")
flags.StringVar(&measurement, "measurement", "*", "Measurement name")
flags.StringVar(&output, "output", cwd, "Output directory for parquet files")
flags.BoolVar(&dryRun, "print-only", false, "Print plan to stderr and exit")

if err := flags.Parse(args); err != nil {
return err
}

if database == "" {
return errors.New("database is required")
}

if err := cmd.server.Open(configPath); err != nil {
return err
}
defer cmd.server.Close()

cfg := &config{
Database: database,
RP: rp,
Measurement: measurement,
Output: output,
Stderr: cmd.Stderr,
}
exp, err := newExporter(cmd.server, cfg)
if err != nil {
return err
}

if err := exp.open(); err != nil {
return err
}
defer exp.close()

exp.printPlan()

if dryRun {
return nil
}

return exp.export(context.Background())
}
Loading

0 comments on commit d150b4f

Please sign in to comment.