这是一个用 Go 语言开发的通达信数据导出工具,可以将通达信的本地数据导出为多种格式,方便用户进行数据分析和处理。
主要功能
- 支持多种数据类型导出:
- 日线数据
- 5分钟线数据
- 1分钟线数据
- 支持多种导出格式:
- CSV 格式
- SQLite 数据库
- Excel 文件
- Postgres数据库连接导出
特点
- 图形化界面,操作简单直观
- 支持增量导出,避免重复处理
- 可配置数据源和导出路径
- 实时显示处理进度
- 支持批量处理大量数据
使用说明
- 设置通达信数据路径
- 选择要导出的数据类型
- 选择导出格式
- 点击导出按钮开始处理
技术栈
- Go 语言开发
- Fyne GUI 框架
- SQLite/CSV/Excel/postgres 数据处理
配置要求
- 需要本地安装通达信软件
- 需要有通达信的历史数据
注意事项
- 首次使用需要配置通达信数据路径
- 建议定期备份导出的数据
- 大量数据导出可能需要较长时间所有测试工作都在在mac中进行,源码可以在多平台运行.
- 程序目录结构
main.go
package mainimport ("fmt""os""path/filepath""strconv""tdx_exporter/config""tdx_exporter/tdx""time"_ "github.com/lib/pq""image/color""fyne.io/fyne/v2""fyne.io/fyne/v2/app""fyne.io/fyne/v2/container""fyne.io/fyne/v2/dialog""fyne.io/fyne/v2/layout""fyne.io/fyne/v2/theme""fyne.io/fyne/v2/widget"
)var (dayGroup *widget.CheckGroupminGroup *widget.CheckGroup
)// 创建自定义主题
type myTheme struct {fyne.Theme
}func (m myTheme) Color(name fyne.ThemeColorName, variant fyne.ThemeVariant) color.Color {if name == theme.ColorNameForeground {return color.NRGBA{R: 0, G: 0, B: 0, A: 255} // 黑色}return theme.DefaultTheme().Color(name, variant)
}func main() {// 设置中文编码os.Setenv("LANG", "zh_CN.UTF-8")myApp := app.New()// 设置中文字体myApp.Settings().SetTheme(&myTheme{theme.DefaultTheme()})myWindow := myApp.NewWindow("通达信数据工具")// 创建数据类型选择容器,分组显示dayGroup = widget.NewCheckGroup([]string{"日据"}, nil)dayGroup.SetSelected([]string{"日线数据"})minGroup = widget.NewCheckGroup([]string{"1分钟线", "5分钟线"}, nil)// 然后创建格式选择formatSelect := widget.NewSelect([]string{"Postgres", "CSV", "SQLite", "Excel"}, func(value string) {if value == "SQLite" || value == "Postgres" {// 默认选中所有选项,但保持可选状态if len(dayGroup.Selected) == 0 {dayGroup.SetSelected([]string{"日线数据"})}if len(minGroup.Selected) == 0 {minGroup.SetSelected([]string{"1分钟线", "5分钟线"})}// 移除禁用代码,保持控件可选dayGroup.Enable()minGroup.Enable()} else {// CSV或Excel格式时保持原有逻辑dayGroup.Enable()minGroup.Enable()}})formatSelect.SetSelected("Postgres")// 创建结果显示区域 - 修改为更大的显示区域resultArea := widget.NewMultiLineEntry()resultArea.Disable()resultArea.SetPlaceHolder("导出结果将在这里显示")resultArea.Resize(fyne.NewSize(580, 300)) // 设置更大的尺寸resultArea.SetMinRowsVisible(15) // 显示更多行// 创建按钮settingsBtn := widget.NewButtonWithIcon("设置", theme.SettingsIcon(), func() {showSettingsDialog(myWindow)})var exportBtn, updateBtn *widget.Button// 创建左侧布局leftPanel := container.NewVBox(widget.NewLabel("导出格式:"),formatSelect,widget.NewSeparator(), // 添加隔线widget.NewLabel("数据类型选择:"),dayGroup,minGroup,)exportBtn = widget.NewButtonWithIcon("导出", theme.DocumentSaveIcon(), func() {exportBtn.Disable()settingsBtn.Disable()formatSelect.Disable()startExport(myWindow, formatSelect.Selected, func() {exportBtn.Enable()settingsBtn.Enable()formatSelect.Enable()})})updateBtn = widget.NewButtonWithIcon("更新", theme.ViewRefreshIcon(), func() {updateBtn.Disable()settingsBtn.Disable()exportBtn.Disable()formatSelect.Disable()startUpdate(myWindow, func() {updateBtn.Enable()settingsBtn.Enable()exportBtn.Enable()formatSelect.Enable()})})// 创建按钮布局buttons := container.NewHBox(layout.NewSpacer(), // 添加弹性空间使按钮居中settingsBtn,exportBtn,updateBtn,layout.NewSpacer(),)// 创建 memo 控件memo := widget.NewMultiLineEntry()memo.Disable() // 设置为只读memo.SetPlaceHolder("提示信息将在这里显示")memo.SetMinRowsVisible(5) // 设置最小显示行数memo.SetText("欢迎使用通达信数据工具\n请选择要导出的数据类型和格式")// 创建主布局content := container.NewBorder(buttons,nil,container.NewPadded(leftPanel),nil,memo,)myWindow.SetContent(content)myWindow.Resize(fyne.NewSize(800, 400))myWindow.ShowAndRun()
}func showSettingsDialog(window fyne.Window) {settings, err := config.LoadSettings()if err != nil {dialog.ShowError(err, window)return}// 基本设置页面tdxPath := widget.NewEntry()tdxPath.SetText(settings.TdxPath)tdxPath.SetPlaceHolder("请输入通达信数据目路径")exportPath := widget.NewEntry()exportPath.SetText(settings.ExportPath)exportPath.SetPlaceHolder("请输入导出数据保存路径")// 数据库设置页面dbHost := widget.NewEntry()dbHost.SetText(settings.DBConfig.Host)dbHost.SetPlaceHolder("数据库主机地址")dbPort := widget.NewEntry()dbPort.SetText(fmt.Sprintf("%d", settings.DBConfig.Port))dbPort.SetPlaceHolder("端口号")dbUser := widget.NewEntry()dbUser.SetText(settings.DBConfig.User)dbUser.SetPlaceHolder("用户名")dbPassword := widget.NewPasswordEntry()dbPassword.SetText(settings.DBConfig.Password)dbPassword.SetPlaceHolder("密码")dbName := widget.NewEntry()dbName.SetText(settings.DBConfig.DBName)dbName.SetPlaceHolder("数据库名")testConnBtn := widget.NewButton("测试连接", func() {// ... 测试连接代码 ...})// 修改数据库设置页面布局dbSettings := container.NewVBox(// 添加测试连接按钮到顶部container.NewHBox(layout.NewSpacer(),testConnBtn,layout.NewSpacer(),),widget.NewSeparator(), // 分隔线container.NewGridWithColumns(2,container.NewVBox(widget.NewLabel("主机地址:"),dbHost,widget.NewLabel("用户名:"),dbUser,widget.NewLabel("数据库名:"),dbName,),container.NewVBox(widget.NewLabel("端口号:"),dbPort,widget.NewLabel("密码:"),dbPassword,),),)// 修改基本设置页面布局basicSettings := container.NewVBox(widget.NewLabel("通达信数据路径:"),container.NewPadded(tdxPath),widget.NewSeparator(),widget.NewLabel("导出数据保存路径:"),container.NewPadded(exportPath),)// 创建标签页tabs := container.NewAppTabs(container.NewTabItem("基本设置", container.NewPadded(basicSettings)),container.NewTabItem("数据库设置", container.NewPadded(dbSettings)),)tabs.SetTabLocation(container.TabLocationTop)dialog := dialog.NewCustomConfirm("参数设置","确定","取消",tabs,func(ok bool) {if !ok {return}port, _ := strconv.Atoi(dbPort.Text)newSettings := &config.Settings{TdxPath: tdxPath.Text,ExportPath: exportPath.Text,DBConfig: config.DBConfig{Host: dbHost.Text,Port: port,User: dbUser.Text,Password: dbPassword.Text,DBName: dbName.Text,},ExportPaths: settings.ExportPaths,}if err := config.SaveSettings(newSettings); err != nil {dialog.ShowError(err, window)return}dialog.ShowInformation("成功", "设置已保存", window)},window,)// 设置对话框大小dialog.Resize(fyne.NewSize(500, 400))dialog.Show()
}// 修改 startExport 函数
func startExport(window fyne.Window, format string, onComplete func()) {settings, err := config.LoadSettings()if err != nil {dialog.ShowError(err, window)onComplete()return}if settings.TdxPath == "" {dialog.ShowError(fmt.Errorf("请先在参数设置中设置通达信数据路径"), window)onComplete()return}go func() {processor := tdx.NewDataProcessor(settings.TdxPath)lastExportInfo, hasLastExport := settings.GetLastExportInfo(format)exportOpts := tdx.ExportOptions{IsIncremental: hasLastExport,LastExportTime: lastExportInfo.LastTime,DataTypes: tdx.DataTypes{Day: contains(dayGroup.Selected, "日线数据"),Min1: contains(minGroup.Selected, "1分钟线"),Min5: contains(minGroup.Selected, "5分钟线"),},TargetDir: settings.ExportPath,}var exportErr errorswitch format {case "Postgres":dbConfig := tdx.DBConfig{Host: settings.DBConfig.Host,Port: settings.DBConfig.Port,User: settings.DBConfig.User,Password: settings.DBConfig.Password,DBName: settings.DBConfig.DBName,}exportErr = processor.ExportToPostgres(dbConfig, exportOpts)case "CSV":exportErr = processor.ExportToCSV(settings.ExportPath, exportOpts)case "SQLite":outputPath := filepath.Join(settings.ExportPath, "export.db")exportErr = processor.ExportToSQLite(outputPath, exportOpts)case "Excel":outputPath := filepath.Join(settings.ExportPath, "export.xlsx")exportErr = processor.ExportToExcel(outputPath, exportOpts)}if exportErr != nil {dialog.ShowError(exportErr, window)} else {dialog.ShowInformation("成功", "数据导出完成", window)settings.UpdateExportInfo(format, settings.ExportPath, time.Now().Format("2006-01-02"))config.SaveSettings(settings)}onComplete()}()
}// 添加更新处理函数
func startUpdate(window fyne.Window, onComplete func()) {settings, err := config.LoadSettings()if err != nil {dialog.ShowError(err, window)onComplete()return}if settings.TdxPath == "" {dialog.ShowError(fmt.Errorf("请先在参数设置中设置通达信数据路径"), window)onComplete()return}go func() {processor := tdx.NewDataProcessor(settings.TdxPath)progress := dialog.NewProgress("更新数据", "正在更新...", window)progress.Show()progressCallback := func(stockCode string, current, total int) {progress.SetValue(float64(current) / float64(total))}if err := processor.UpdateData(progressCallback); err != nil {progress.Hide()dialog.ShowError(err, window)} else {progress.Hide()dialog.ShowInformation("成功", "数据更新完成", window)}onComplete()}()
}// 辅助函数:检查字符串是否在切片中
func contains(slice []string, str string) bool {for _, v := range slice {if v == str {return true}}return false
}
go.mod
module tdx_exportergo 1.23.1require (fyne.io/fyne/v2 v2.5.2github.com/lib/pq v1.10.9github.com/mattn/go-sqlite3 v1.14.24github.com/xuri/excelize/v2 v2.9.0
)require (fyne.io/systray v1.11.0 // indirectgithub.com/BurntSushi/toml v1.4.0 // indirectgithub.com/davecgh/go-spew v1.1.1 // indirectgithub.com/fredbi/uri v1.1.0 // indirectgithub.com/fsnotify/fsnotify v1.7.0 // indirectgithub.com/fyne-io/gl-js v0.0.0-20220119005834-d2da28d9ccfe // indirectgithub.com/fyne-io/glfw-js v0.0.0-20240101223322-6e1efdc71b7a // indirectgithub.com/fyne-io/image v0.0.0-20220602074514-4956b0afb3d2 // indirectgithub.com/go-gl/gl v0.0.0-20211210172815-726fda9656d6 // indirectgithub.com/go-gl/glfw/v3.3/glfw v0.0.0-20240506104042-037f3cc74f2a // indirectgithub.com/go-text/render v0.2.0 // indirectgithub.com/go-text/typesetting v0.2.0 // indirectgithub.com/godbus/dbus/v5 v5.1.0 // indirectgithub.com/gopherjs/gopherjs v1.17.2 // indirectgithub.com/jeandeaual/go-locale v0.0.0-20240223122105-ce5225dcaa49 // indirectgithub.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e // indirectgithub.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirectgithub.com/nicksnyder/go-i18n/v2 v2.4.0 // indirectgithub.com/pmezard/go-difflib v1.0.0 // indirectgithub.com/richardlehane/mscfb v1.0.4 // indirectgithub.com/richardlehane/msoleps v1.0.4 // indirectgithub.com/rymdport/portal v0.2.6 // indirectgithub.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c // indirectgithub.com/srwiley/rasterx v0.0.0-20220730225603-2ab79fcdd4ef // indirectgithub.com/stretchr/testify v1.8.4 // indirectgithub.com/xuri/efp v0.0.0-20240408161823-9ad904a10d6d // indirectgithub.com/xuri/nfp v0.0.0-20240318013403-ab9948c2c4a7 // indirectgithub.com/yuin/goldmark v1.7.1 // indirectgolang.org/x/crypto v0.28.0 // indirectgolang.org/x/image v0.18.0 // indirectgolang.org/x/mobile v0.0.0-20231127183840-76ac6878050a // indirectgolang.org/x/net v0.30.0 // indirectgolang.org/x/sys v0.26.0 // indirectgolang.org/x/text v0.19.0 // indirectgopkg.in/yaml.v3 v3.0.1 // indirect
)
data_processor.go
package tdximport ("bytes""database/sql""encoding/binary""encoding/csv""errors""fmt""io""os""path/filepath""strconv""strings"_ "github.com/mattn/go-sqlite3""github.com/xuri/excelize/v2"
)type DBConfig struct {Host stringPort intUser stringPassword stringDBName string
}type DayData struct {Date string `json:"date"`Open float64 `json:"open"`High float64 `json:"high"`Low float64 `json:"low"`Close float64 `json:"close"`Amount float64 `json:"amount"`Volume int64 `json:"volume"`
}type TdxData struct {Date stringOpen float64High float64Low float64Close float64Volume int64Amount float64
}type DataProcessor struct {DataPath string
}type ExportOptions struct {LastExportTime stringIsIncremental boolTargetDir stringDataTypes DataTypesLogCallback LogCallback
}type LogCallback func(format string, args ...interface{})// 添加进度回调函数类型
type ProgressCallback func(stockCode string, current, total int)// 添加数据类型结构
type DataTypes struct {Day boolMin5 boolMin1 bool
}// 添加数据结构定义
type tdxMinRecord struct {Date uint16 // 日期,2字节Minute uint16 // 分钟,2字节Open float32 // 开盘价,4字节High float32 // 最高价,4字节Low float32 // 最低价,4字节Close float32 // 收盘价,4字节Amount float32 // 20-23字节:成交额(元),single floatVolume uint32 // 24-27字节:成交量(股),ulongReserved uint32 // 28-31字节:保留
}// 修改记录结构定义,分开日线和分钟线
type tdxDayRecord struct {Date uint32 // 日期,4字节,格式: YYYYMMDDOpen uint32 // 开盘价,4字节High uint32 // 最高价,4字节Low uint32 // 最低价,4字节Close uint32 // 收盘价,4字节Amount float32 // 成交额,4字节Volume uint32 // 成交量,4字节Reserved uint32 // 保留,4字节
}func NewDataProcessor(path string) *DataProcessor {return &DataProcessor{DataPath: path,}
}func (dp *DataProcessor) ExportToCSV(outputPath string, opts ExportOptions) error {// 使用传入的输出路径作为基础目录opts.TargetDir = outputPathreturn dp.TransformData(opts)
}func (dp *DataProcessor) ExportToSQLite(dbPath string, opts ExportOptions) error {log := opts.LogCallbackif log == nil {log = func(format string, args ...interface{}) {fmt.Printf(format+"\n", args...)}}log("创建SQLite数据库...")db, err := sql.Open("sqlite3", dbPath)if err != nil {return fmt.Errorf("创建SQLite数据库失败: %v", err)}defer db.Close()log("创建数据表...")if err := dp.createTables(db); err != nil {return fmt.Errorf("创建表失败: %v", err)}// 处理不同周期的数据if opts.DataTypes.Day {log("正在导出日线数据到SQLite...")if err := dp.exportDayDataToSQLite(db, log); err != nil {return fmt.Errorf("导出日线数据失败: %v", err)}}if opts.DataTypes.Min5 {log("正在导出5分钟数据到SQLite...")if err := dp.exportMinDataToSQLite(db, "5min", "fivemin", log); err != nil {return fmt.Errorf("导出5分钟数据失败: %v", err)}}if opts.DataTypes.Min1 {log("正在导出1分钟数据到SQLite...")if err := dp.exportMinDataToSQLite(db, "1min", "onemin", log); err != nil {return fmt.Errorf("导出1分钟数据失败: %v", err)}}log("数据导出完成")return nil
}func (dp *DataProcessor) createTables(db *sql.DB) error {// 日线数据表_, err := db.Exec(`CREATE TABLE IF NOT EXISTS stock_day_data (代码 TEXT,日期 TEXT,开盘价 REAL,最高价 REAL,最低价 REAL,收盘价 REAL,成交额 REAL,成交量 INTEGER,PRIMARY KEY (代码, 日期))`)if err != nil {return err}// 5分钟数据表_, err = db.Exec(`CREATE TABLE IF NOT EXISTS stock_5min_data (代码 TEXT,日期 TEXT,时间 TEXT,开盘价 REAL,最高价 REAL,最低价 REAL,收盘价 REAL,成交额 REAL,成交量 INTEGER,PRIMARY KEY (代码, 日期, 时间))`)if err != nil {return err}// 1分钟数据表_, err = db.Exec(`CREATE TABLE IF NOT EXISTS stock_1min_data (代码 TEXT,日期 TEXT,时间 TEXT,开盘价 REAL,最高价 REAL,最低价 REAL,收盘价 REAL,成交额 REAL,成交量 INTEGER,PRIMARY KEY (代码, 日期, 时间))`)return err
}func (dp *DataProcessor) exportMinDataToSQLite(db *sql.DB, period string, dirName string, log LogCallback) error {csvDir := filepath.Join(os.Getenv("HOME"), "tdx_export", dirName)files, err := os.ReadDir(csvDir)if err != nil {return fmt.Errorf("读取CSV目录失败: %v", err)}// 开始事务tx, err := db.Begin()if err != nil {return fmt.Errorf("开始事务失败: %v", err)}// 准备插入语句tableName := fmt.Sprintf("stock_%s_data", period)stmt, err := tx.Prepare(fmt.Sprintf(`INSERT OR REPLACE INTO %s (代码, 日期, 时间, 开盘价, 最高价, 最低价, 收盘价, 成交额, 成交量) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, tableName))if err != nil {tx.Rollback()return fmt.Errorf("准备SQL语句失败: %v", err)}defer stmt.Close()// 处理每个CSV文件fileCount := 0for _, file := range files {if !file.IsDir() && strings.HasSuffix(strings.ToLower(file.Name()), ".csv") {stockCode := strings.TrimSuffix(file.Name(), ".csv")if stockCode == "all_codes" {continue}fileCount++log("正在处理%s数据,股票代码:%s (%d/%d)", period, stockCode, fileCount, len(files)-1)// 读取CSV文件csvPath := filepath.Join(csvDir, file.Name())csvFile, err := os.Open(csvPath)if err != nil {tx.Rollback()return fmt.Errorf("打开CSV文件失败 %s: %v", file.Name(), err)}reader := csv.NewReader(csvFile)// 跳过标题行reader.Read()// 读取数据recordCount := 0for {record, err := reader.Read()if err == io.EOF {break}if err != nil {csvFile.Close()tx.Rollback()return fmt.Errorf("读取CSV记录失败: %v", err)}// 转换数据类型open, _ := strconv.ParseFloat(record[2], 64)high, _ := strconv.ParseFloat(record[3], 64)low, _ := strconv.ParseFloat(record[4], 64)close, _ := strconv.ParseFloat(record[5], 64)amount, _ := strconv.ParseFloat(record[6], 64)volume, _ := strconv.ParseInt(record[7], 10, 64)recordCount++// 插入数据_, err = stmt.Exec(stockCode,record[0], // 日期record[1], // 时间open,high,low,close,amount,volume,)if err != nil {csvFile.Close()tx.Rollback()return fmt.Errorf("插入数据失败: %v", err)}}log("完成处理 %s,共导入 %d 条记录", stockCode, recordCount)csvFile.Close()}}// 提交事务if err := tx.Commit(); err != nil {return fmt.Errorf("提交事务失败: %v", err)}log("完成导出%s数据,共处理 %d 个文件", period, fileCount)return nil
}func (dp *DataProcessor) exportDayDataToSQLite(db *sql.DB, log LogCallback) error {csvDir := filepath.Join(os.Getenv("HOME"), "tdx_export", "day")files, err := os.ReadDir(csvDir)if err != nil {log("读取CSV目录失败: %v", err)return fmt.Errorf("读取CSV目录失败: %v", err)}log("开始导出日线数据到SQLite...")tx, err := db.Begin()if err != nil {return fmt.Errorf("开始事务失败: %v", err)}stmt, err := tx.Prepare(`INSERT OR REPLACE INTO stock_day_data (代码, 日期, 开盘价, 最高价, 最低价, 收盘价, 成交额, 成交量) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`)if err != nil {tx.Rollback()return fmt.Errorf("准备SQL语句失败: %v", err)}defer stmt.Close()for _, file := range files {if !file.IsDir() && strings.HasSuffix(strings.ToLower(file.Name()), ".csv") {stockCode := strings.TrimSuffix(file.Name(), ".csv")if stockCode == "all_codes" {continue}log("正在处理股票: %s", stockCode)csvFile, err := os.Open(filepath.Join(csvDir, file.Name()))if err != nil {tx.Rollback()return fmt.Errorf("打开CSV文件失败 %s: %v", file.Name(), err)}reader := csv.NewReader(csvFile)reader.Read() // 跳过标题行for {record, err := reader.Read()if err == io.EOF {break}if err != nil {csvFile.Close()tx.Rollback()return fmt.Errorf("读取CSV记录失败: %v", err)}open, _ := strconv.ParseFloat(record[1], 64)high, _ := strconv.ParseFloat(record[2], 64)low, _ := strconv.ParseFloat(record[3], 64)close, _ := strconv.ParseFloat(record[4], 64)amount, _ := strconv.ParseFloat(record[5], 64)amount /= 100volume, _ := strconv.ParseInt(record[6], 10, 64)volume /= 100_, err = stmt.Exec(stockCode,record[0], // 日期open, high, low, close,amount, volume,)if err != nil {csvFile.Close()tx.Rollback()return fmt.Errorf("插入数据失败: %v", err)}}csvFile.Close()}}log("日线数据导出完成")return tx.Commit()
}func (dp *DataProcessor) ReadTdxData() ([]TdxData, error) {if dp.DataPath == "" {return nil, errors.New("通达信数据路径未设置")}// TODO: 实现通达信数据获取return nil, nil
}func (dp *DataProcessor) TransformData(opts ExportOptions) error {log := opts.LogCallbackif log == nil {log = func(format string, args ...interface{}) {fmt.Printf(format+"\n", args...)}}// 使用传入的输出路径baseDir := opts.TargetDirif baseDir == "" {baseDir = filepath.Join(os.Getenv("HOME"), "tdx_export")}// 创建不同时间周期的目录targetDirs := map[string]string{"day": filepath.Join(baseDir, "day"),"min1": filepath.Join(baseDir, "min1"),"min5": filepath.Join(baseDir, "min5"),}// 根据选择的数据类型过滤源var selectedSources []struct {path stringinterval string}// 根据用户选择添加数据源if opts.DataTypes.Day {selectedSources = append(selectedSources,struct{ path, interval string }{filepath.Join(dp.DataPath, "vipdoc", "sz", "lday"), "day"},struct{ path, interval string }{filepath.Join(dp.DataPath, "vipdoc", "sh", "lday"), "day"},)}if opts.DataTypes.Min1 {selectedSources = append(selectedSources,struct{ path, interval string }{filepath.Join(dp.DataPath, "vipdoc", "sz", "minline"), "min1"},struct{ path, interval string }{filepath.Join(dp.DataPath, "vipdoc", "sh", "minline"), "min1"},)}if opts.DataTypes.Min5 {selectedSources = append(selectedSources,struct{ path, interval string }{filepath.Join(dp.DataPath, "vipdoc", "sz", "fzline"), "min5"},struct{ path, interval string }{filepath.Join(dp.DataPath, "vipdoc", "sh", "fzline"), "min5"},)}// 确保目标目录存在for _, dir := range targetDirs {if err := os.MkdirAll(dir, 0755); err != nil {return fmt.Errorf("创建目标目录失败: %v", err)}}// 处理选中的数据源for _, source := range selectedSources {files, err := os.ReadDir(source.path)if err != nil {continue}for _, file := range files {if !file.IsDir() {var fileExt stringswitch source.interval {case "day":fileExt = ".day"case "min1":fileExt = ".lc1"case "min5":fileExt = ".lc5"}if strings.HasSuffix(strings.ToLower(file.Name()), fileExt) {if err := dp.convertToCSV(source.path, file.Name(), targetDirs[source.interval], source.interval); err != nil {continue}}}}}return nil
}func (dp *DataProcessor) convertToCSV(sourcePath, fileName, targetDir string, dataType string) error {// 读取源文件sourceFile, err := os.ReadFile(filepath.Join(sourcePath, fileName))if err != nil {return err}// 创建目标文件targetFile, err := os.Create(filepath.Join(targetDir, strings.TrimSuffix(fileName, filepath.Ext(fileName))+".csv"))if err != nil {return err}defer targetFile.Close()// 写入CSV头var header stringswitch dataType {case "day":header = "日期,开盘价,最高价,最低价,收盘价,成交额,成交量\n"case "min1", "min5":header = "日期,时间,开盘价,最高价,最低价,收盘价,成交额,成交量\n"}// 写入 UTF-8 BOM,确保 Excel 正确识别中文if _, err := targetFile.Write([]byte{0xEF, 0xBB, 0xBF}); err != nil {return fmt.Errorf("写入 BOM 失败: %v", err)}if _, err := targetFile.WriteString(header); err != nil {return fmt.Errorf("写入CSV头失败: %v", err)}// 处理记录recordSize := 32recordCount := len(sourceFile) / recordSizefor i := 0; i < recordCount; i++ {offset := i * recordSizevar line stringswitch dataType {case "day":var record tdxDayRecordbinary.Read(bytes.NewReader(sourceFile[offset:offset+recordSize]), binary.LittleEndian, &record)line = dp.formatDayRecord(record)case "min1", "min5":var record tdxMinRecordbinary.Read(bytes.NewReader(sourceFile[offset:offset+recordSize]), binary.LittleEndian, &record)line = dp.formatMinRecord(record)}targetFile.WriteString(line)}return nil
}// UpdateData 增量更新数据
func (dp *DataProcessor) UpdateData(progress ProgressCallback) error {// 取CSV文件目录csvDir := filepath.Join(os.Getenv("HOME"), "tdx_export", "day")// 读取所有股票列表codes, err := dp.readAllCodes(filepath.Join(csvDir, "all_codes.csv"))if err != nil {return fmt.Errorf("读取代码列表失败: %v", err)}for i, code := range codes {if progress != nil {progress(code, i+1, len(codes))}// 取现有CSV文件的最后一个日期csvPath := filepath.Join(csvDir, code+".csv")lastDate, err := dp.getLastDate(csvPath)if err != nil {return fmt.Errorf("读取文件 %s 失败: %v", code, err)}// 确定数据文件路径market := "sz"if strings.HasPrefix(code, "6") || strings.HasPrefix(code, "5") {market = "sh"}dayPath := filepath.Join(dp.DataPath, "vipdoc", market, "lday", code+".day")// 增量更新数据if err := dp.appendNewData(dayPath, csvPath, lastDate); err != nil {return fmt.Errorf("更新文件 %s 失败: %v", code, err)}}return nil
}// readAllCodes 读取代码列表文件
func (dp *DataProcessor) readAllCodes(filepath string) ([]string, error) {file, err := os.Open(filepath)if err != nil {return nil, err}defer file.Close()reader := csv.NewReader(file)records, err := reader.ReadAll()if err != nil {return nil, err}var codes []stringfor i, record := range records {if i == 0 { // 跳过标题行continue}codes = append(codes, record[0])}return codes, nil
}// getLastDate 获取CSV文件中最后一个日期
func (dp *DataProcessor) getLastDate(filepath string) (string, error) {file, err := os.Open(filepath)if err != nil {return "", err}defer file.Close()reader := csv.NewReader(file)var lastDate stringfor {record, err := reader.Read()if err == io.EOF {break}if err != nil {return "", err}if len(record) > 0 {lastDate = record[0] // 第一是日期}}return lastDate, nil
}// appendNewData 追加新数据到CSV文件
func (dp *DataProcessor) appendNewData(dayPath, csvPath, lastDate string) error {// 读取day文件dayFile, err := os.ReadFile(dayPath)if err != nil {return err}// 打开CSV文件用于追加csvFile, err := os.OpenFile(csvPath, os.O_APPEND|os.O_WRONLY, 0644)if err != nil {return err}defer csvFile.Close()// 处理每条记录recordSize := 32recordCount := len(dayFile) / recordSizefor i := 0; i < recordCount; i++ {offset := i * recordSizevar record tdxMinRecorderr := binary.Read(strings.NewReader(string(dayFile[offset:offset+recordSize])), binary.LittleEndian, &record)if err != nil {return err}// 转换日期 - 使用 YYYYMMDD 格式year := record.Date / 10000month := (record.Date % 10000) / 100day := record.Date % 100date := fmt.Sprintf("%d-%02d-%02d", year, month, day)// 只追加新数据if date <= lastDate {continue}// 写入新数据line := fmt.Sprintf("%s,%.2f,%.2f,%.2f,%.2f,%.2f,%d\n",date,float64(record.Open)/100.0,float64(record.High)/100.0,float64(record.Low)/100.0,float64(record.Close)/100.0,float64(record.Amount)/100.0,record.Volume)if _, err := csvFile.WriteString(line); err != nil {return err}}return nil
}func (dp *DataProcessor) formatDayRecord(record tdxDayRecord) string {// Format date: YYYYMMDDdate := fmt.Sprintf("%d-%02d-%02d",record.Date/10000,(record.Date%10000)/100,record.Date%100)// Day prices need to be divided by 100 to get the actual valuereturn fmt.Sprintf("%s,%.2f,%.2f,%.2f,%.2f,%.2f,%d\n",date,float64(record.Open)/100.0,float64(record.High)/100.0,float64(record.Low)/100.0,float64(record.Close)/100.0,float64(record.Amount)/100.0, // Amount is already in correct formatint(record.Volume)/100)
}func (dp *DataProcessor) formatMinRecord(record tdxMinRecord) string {// 解析日期year := 2004 + (record.Date / 2048)month := (record.Date % 2048) / 100day := record.Date % 2048 % 100date := fmt.Sprintf("%d-%02d-%02d", year, month, day)// 解析时间hour := record.Minute / 60minute := record.Minute % 60time := fmt.Sprintf("%02d:%02d", hour, minute)// 格式化输出,将日期和时间分为两个字段return fmt.Sprintf("%s,%s,%.2f,%.2f,%.2f,%.2f,%.2f,%d\n",date, // 日期字段time, // 时间字段record.Open, // 开盘价record.High, // 最高价record.Low, // 最低价record.Close, // 收盘价float64(record.Amount)/100, // 成交额record.Volume/100)
}func (dp *DataProcessor) ExportToExcel(outputPath string, opts ExportOptions) error {// 创建 Excel 主目录excelDir := filepath.Join(outputPath, "excel")// 创建不同时间周期的目录excelDirs := map[string]string{"day": filepath.Join(excelDir, "day"),"min1": filepath.Join(excelDir, "min1"),"min5": filepath.Join(excelDir, "min5"),}// 确保目标目录存在for _, dir := range excelDirs {if err := os.MkdirAll(dir, 0755); err != nil {return fmt.Errorf("创建Excel目录失败: %v", err)}}// 先导出到CSVif err := dp.TransformData(opts); err != nil {return fmt.Errorf("转换数据失败: %v", err)}// 处理不同周期的数据if opts.DataTypes.Day {if err := dp.exportDayDataToExcel(excelDirs["day"]); err != nil {return fmt.Errorf("导出日线数据失败: %v", err)}}if opts.DataTypes.Min5 {if err := dp.exportMinDataToExcel(excelDirs["min5"], "fivemin", "5分钟"); err != nil {return fmt.Errorf("导出5分钟数据失败: %v", err)}}if opts.DataTypes.Min1 {if err := dp.exportMinDataToExcel(excelDirs["min1"], "onemin", "1分钟"); err != nil {return fmt.Errorf("导出1分钟数据失败: %v", err)}}return nil
}func (dp *DataProcessor) exportMinDataToExcel(outputPath, dirName, sheetPrefix string) error {csvDir := filepath.Join(os.Getenv("HOME"), "tdx_export", dirName)files, err := os.ReadDir(csvDir)if err != nil {return fmt.Errorf("读取CSV目录失败: %v", err)}fileCount := 0for _, file := range files {if !file.IsDir() && strings.HasSuffix(strings.ToLower(file.Name()), ".csv") {stockCode := strings.TrimSuffix(file.Name(), ".csv")if stockCode == "all_codes" {continue}fileCount++fmt.Printf("正在处理%s数据,股票代码:%s (%d/%d)\n", sheetPrefix, stockCode, fileCount, len(files)-1)// 创建新的Excel文件f := excelize.NewFile()defer f.Close()// 设置默认sheet名称sheetName := fmt.Sprintf("%s_%s", stockCode, sheetPrefix)index, err := f.NewSheet(sheetName)if err != nil {return fmt.Errorf("创建Sheet失败: %v", err)}f.DeleteSheet("Sheet1")f.SetActiveSheet(index)// 写入表头headers := []string{"日期", "时间", "开盘价", "最高价", "最低价", "收盘价", "成交额", "成交量"}for i, header := range headers {cell := fmt.Sprintf("%c1", 'A'+i)f.SetCellValue(sheetName, cell, header)}// 读取CSV数据csvPath := filepath.Join(csvDir, file.Name())csvFile, err := os.Open(csvPath)if err != nil {return fmt.Errorf("打开CSV文件失败 %s: %v", file.Name(), err)}reader := csv.NewReader(csvFile)reader.Read() // 跳过标题行row := 2 // 从第2行开始写入数据for {record, err := reader.Read()if err == io.EOF {break}if err != nil {csvFile.Close()return fmt.Errorf("读取CSV记录失败: %v", err)}// 写入数据行for i, value := range record {cell := fmt.Sprintf("%c%d", 'A'+i, row)f.SetCellValue(sheetName, cell, value)}row++}csvFile.Close()// 保存Excel文件excelPath := filepath.Join(outputPath, fmt.Sprintf("%s.xlsx", stockCode))if err := f.SaveAs(excelPath); err != nil {return fmt.Errorf("保存Excel文件失败: %v", err)}}}fmt.Printf("完成导出%s数据,共处理 %d 个文件\n", sheetPrefix, fileCount)return nil
}func (dp *DataProcessor) exportDayDataToExcel(outputPath string) error {csvDir := filepath.Join(os.Getenv("HOME"), "tdx_export", "day")files, err := os.ReadDir(csvDir)if err != nil {return fmt.Errorf("读取CSV目录失败: %v", err)}fileCount := 0for _, file := range files {if !file.IsDir() && strings.HasSuffix(strings.ToLower(file.Name()), ".csv") {stockCode := strings.TrimSuffix(file.Name(), ".csv")if stockCode == "all_codes" {continue}fileCount++fmt.Printf("正在处理日线数据,股票代码:%s (%d/%d)\n", stockCode, fileCount, len(files)-1)// 创建新的Excel文件f := excelize.NewFile()defer f.Close()// 设置sheet名称sheetName := fmt.Sprintf("%s_日线", stockCode)index, err := f.NewSheet(sheetName)if err != nil {return fmt.Errorf("创建Sheet失败: %v", err)}f.DeleteSheet("Sheet1")f.SetActiveSheet(index)// 写入表头headers := []string{"日期", "开盘价", "最高价", "最低价", "收盘价", "成交额", "成交量"}for i, header := range headers {cell := fmt.Sprintf("%c1", 'A'+i)f.SetCellValue(sheetName, cell, header)}// 读取CSV数据csvPath := filepath.Join(csvDir, file.Name())csvFile, err := os.Open(csvPath)if err != nil {return fmt.Errorf("打开CSV文件失败 %s: %v", file.Name(), err)}reader := csv.NewReader(csvFile)reader.Read() // 跳过标题行row := 2 // 从第2行开始写入数据for {record, err := reader.Read()if err == io.EOF {break}if err != nil {csvFile.Close()return fmt.Errorf("读取CSV记录��败: %v", err)}// 写入数据行for i, value := range record {cell := fmt.Sprintf("%c%d", 'A'+i, row)f.SetCellValue(sheetName, cell, value)}row++}csvFile.Close()// 保存Excel文件excelPath := filepath.Join(outputPath, fmt.Sprintf("%s.xlsx", stockCode))if err := f.SaveAs(excelPath); err != nil {return fmt.Errorf("保存Excel文件失败: %v", err)}}}fmt.Printf("完成导出日线数据,共处理 %d 个文件\n", fileCount)return nil
}func (dp *DataProcessor) ExportToPostgres(dbConfig DBConfig, opts ExportOptions) error {connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",dbConfig.Host, dbConfig.Port, dbConfig.User, dbConfig.Password, dbConfig.DBName)db, err := sql.Open("postgres", connStr)if err != nil {return fmt.Errorf("连接数据库失败: %v", err)}defer db.Close()// 测试连接if err := db.Ping(); err != nil {return fmt.Errorf("数据库连接测试失败: %v", err)}// 根据选择创建对应的表if err := dp.createSelectedTables(db, opts.DataTypes); err != nil {return fmt.Errorf("创建表失败: %v", err)}// 先导出到CSVif err := dp.TransformData(opts); err != nil {return fmt.Errorf("转换数据失败: %v", err)}// 根据选择导入数据if opts.DataTypes.Day {fmt.Println("开始导入日线数据...")if err := dp.exportDayDataToPostgres(db); err != nil {return fmt.Errorf("导出日线数据失败: %v", err)}}if opts.DataTypes.Min5 {fmt.Println("开始导入5分钟线数据...")if err := dp.exportMinDataToPostgres(db, "5min", "fivemin"); err != nil {return fmt.Errorf("导出5分钟数据失败: %v", err)}}if opts.DataTypes.Min1 {fmt.Println("开始导入1分钟线数据...")if err := dp.exportMinDataToPostgres(db, "1min", "onemin"); err != nil {return fmt.Errorf("导出1分钟数据失败: %v", err)}}return nil
}// 只创建选中的表
func (dp *DataProcessor) createSelectedTables(db *sql.DB, types DataTypes) error {if types.Day {if err := dp.createDayTable(db); err != nil {return err}}if types.Min1 {if err := dp.createMinTable(db, "1min"); err != nil {return err}}if types.Min5 {if err := dp.createMinTable(db, "5min"); err != nil {return err}}return nil
}func (dp *DataProcessor) createDayTable(db *sql.DB) error {// 日线数据表_, err := db.Exec(`CREATE TABLE IF NOT EXISTS stock_day_data (代码 TEXT,日期 DATE,开盘价 NUMERIC(10,2),最高价 NUMERIC(10,2),最低价 NUMERIC(10,2),收盘价 NUMERIC(10,2),成交额 NUMERIC(16,2),成交量 BIGINT,CONSTRAINT stock_day_data_key UNIQUE (代码, 日期))`)if err != nil {return err}return nil
}func (dp *DataProcessor) createMinTable(db *sql.DB, period string) error {// 分钟线数据表_, err := db.Exec(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS stock_%s_data (代码 TEXT,日期 DATE,时间 TIME,开盘价 NUMERIC(10,2),最高价 NUMERIC(10,2),最低价 NUMERIC(10,2),收盘价 NUMERIC(10,2),成交额 NUMERIC(16,2),成交量 BIGINT,CONSTRAINT stock_%s_data_key UNIQUE (代码, 日期, 时间))`, period, period))if err != nil {return err}return nil
}func (dp *DataProcessor) exportDayDataToPostgres(db *sql.DB) error {stmt, err := db.Prepare(`INSERT INTO stock_day_data (代码, 日期, 开盘价, 最高价, 最低价, 收盘价, 成交额, 成交量)VALUES ($1, $2, $3, $4, $5, $6, $7, $8)ON CONFLICT (代码, 日期) DO UPDATE SET开盘价 = EXCLUDED.开盘价,最高价 = EXCLUDED.最高价,最低价 = EXCLUDED.最低价,收盘价 = EXCLUDED.收盘价,成交额 = EXCLUDED.成交额,成交量 = EXCLUDED.成交量`)if err != nil {return err}defer stmt.Close()// 读取CSV文件并导入数据csvDir := filepath.Join(os.Getenv("HOME"), "tdx_export", "day")return dp.importCSVToPostgres(csvDir, stmt, false)
}func (dp *DataProcessor) exportMinDataToPostgres(db *sql.DB, period string, dirName string) error {stmt, err := db.Prepare(fmt.Sprintf(`INSERT INTO stock_%s_data (代码, 日期, 时间, 开盘价, 最高价, 最低价, 收盘价, 成交额, 成交量)VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)ON CONFLICT (代码, 日期, 时间) DO UPDATE SET开盘价 = EXCLUDED.开盘价,最高价 = EXCLUDED.最高价,最低价 = EXCLUDED.最低价,收���价 = EXCLUDED.收盘价,成交额 = EXCLUDED.成交额,成交量 = EXCLUDED.成交量`, period))if err != nil {return err}defer stmt.Close()// 读取CSV文件并导入数据csvDir := filepath.Join(os.Getenv("HOME"), "tdx_export", dirName)return dp.importCSVToPostgres(csvDir, stmt, true)
}func (dp *DataProcessor) importCSVToPostgres(csvDir string, stmt *sql.Stmt, hasTime bool) error {files, err := os.ReadDir(csvDir)if err != nil {return err}// 计算总文件数(排除 all_codes.csv)totalFiles := 0for _, file := range files {if !file.IsDir() && strings.HasSuffix(file.Name(), ".csv") && file.Name() != "all_codes.csv" {totalFiles++}}fmt.Printf("开始导入数据,共 %d 个文件需要处理\n", totalFiles)processedFiles := 0for _, file := range files {if !file.IsDir() && strings.HasSuffix(file.Name(), ".csv") {stockCode := strings.TrimSuffix(file.Name(), ".csv")if stockCode == "all_codes" {continue}processedFiles++fmt.Printf("正在处理 [%d/%d] %s\n", processedFiles, totalFiles, stockCode)csvFile, err := os.Open(filepath.Join(csvDir, file.Name()))if err != nil {return err}reader := csv.NewReader(csvFile)reader.Read() // 跳过标题行recordCount := 0for {record, err := reader.Read()if err == io.EOF {break}if err != nil {csvFile.Close()return err}// 转换数据类型values := make([]interface{}, 0)values = append(values, stockCode, record[0])if hasTime {values = append(values, record[1])record = record[2:]}for _, v := range record[1:] {val, _ := strconv.ParseFloat(v, 64)values = append(values, val)}if _, err := stmt.Exec(values...); err != nil {csvFile.Close()return err}recordCount++}csvFile.Close()fmt.Printf("完成处理 %s,导入 %d 条记录\n", stockCode, recordCount)}}fmt.Printf("数据导入完成,共处理 %d 个文件\n", processedFiles)return nil
}
settings.go
package configimport ("encoding/json""os""path/filepath"
)type ExportInfo struct {LastPath string `json:"last_path"` // 上次导出路径LastTime string `json:"last_time"` // 上次导出时间
}// 添加数据库连接配置结构
type DBConfig struct {Host string `json:"host"`Port int `json:"port"`User string `json:"user"`Password string `json:"password"`DBName string `json:"dbname"`
}type Settings struct {TdxPath string `json:"tdx_path"` // 通达信数据路径ExportPath string `json:"export_path"` // 导出数据保存路径ExportPaths map[string]ExportInfo `json:"export_paths"` // 不同格式的导出信息DBConfig DBConfig `json:"db_config"` // 数据库连接配置
}func NewSettings() *Settings {// 默认导出到用户目录下的 tdx_exporthomeDir, _ := os.UserHomeDir()return &Settings{ExportPath: filepath.Join(homeDir, "tdx_export"),ExportPaths: make(map[string]ExportInfo),DBConfig: DBConfig{Host: "localhost",Port: 5432,User: "postgres",DBName: "tdx_data",},}
}// UpdateExportInfo 更新导出信息
func (s *Settings) UpdateExportInfo(format, path string, exportTime string) {s.ExportPaths[format] = ExportInfo{LastPath: path,LastTime: exportTime,}
}// GetLastExportInfo 获取上次导出信息
func (s *Settings) GetLastExportInfo(format string) (ExportInfo, bool) {info, exists := s.ExportPaths[format]return info, exists
}func getConfigPath() string {// 获取当前工作目录currentDir, err := os.Getwd()if err != nil {return ""}// 创建配置目录configDir := filepath.Join(currentDir, "config")if err := os.MkdirAll(configDir, 0755); err != nil {return ""}// 返回配置文件完整路径return filepath.Join(configDir, "settings.json")
}func SaveSettings(settings *Settings) error {configPath := getConfigPath()// 格式化 JSON 以便于阅读和编辑data, err := json.MarshalIndent(settings, "", " ")if err != nil {return err}return os.WriteFile(configPath, data, 0644)
}func LoadSettings() (*Settings, error) {configPath := getConfigPath()// 如果配置文件不存在,创建默认配置if _, err := os.Stat(configPath); os.IsNotExist(err) {settings := NewSettings()if err := SaveSettings(settings); err != nil {return nil, err}return settings, nil}data, err := os.ReadFile(configPath)if err != nil {return NewSettings(), nil}var settings Settingsif err := json.Unmarshal(data, &settings); err != nil {return NewSettings(), nil}// 确保 ExportPaths 已初始化if settings.ExportPaths == nil {settings.ExportPaths = make(map[string]ExportInfo)}return &settings, nil
}
settings.json
{"tdx_path": "/Users/Apple/Downloads/tdx","export_path": "/Users/Apple/Downloads/tdx/exportdata","export_paths": {"CSV": {"last_path": "/Users/Apple/Downloads/tdx/exportdata","last_time": "2024-10-08"},"SQLite": {"last_path": "/Users/Apple/Downloads/tdx/exportdata","last_time": "2024-10-08"}},"db_config": {"host": "127.0.0.1","port": 5432,"user": "postgres","password": "postgres","dbname": "stock"}
}