在Go语言开发中,将CSV数据批量导入MS SQL数据库是常见需求,但稍有不慎就可能出现部分记录丢失的问题,需要针对性设计健壮性方案来规避风险。

常见记录丢失原因
首先明确导致记录丢失的核心诱因,才能针对性解决问题:
- CSV文件存在格式异常,比如字段包含未转义的逗号、换行符,导致解析出的数据行不完整
- 字段类型不匹配,比如CSV中的字符串内容超出MS SQL对应字段的长度限制,或者数字格式错误无法转换
- 批量插入时遇到单条记录违反数据库约束(如唯一键冲突、非空约束),导致整个批次插入失败回滚
- 未使用事务或者事务粒度不合理,中途出现异常时已经插入的部分记录无法回滚,或者全部回滚导致有效记录丢失
- 未处理数据库连接的临时异常,插入过程中断后没有重试机制,丢失未处理的记录
健壮性实践方案
1. 数据预校验
在解析CSV后、插入数据库前,先对每一条记录做合法性校验,过滤掉不符合要求的记录并单独记录日志,避免不合格数据进入插入流程。
package main
import (
"encoding/csv"
"fmt"
"os"
"strconv"
"strings"
)
// 定义CSV对应的数据结构
type UserRecord struct {
ID int
Name string
Age int
Email string
}
// 校验单条记录是否合法
func validateRecord(record []string) (*UserRecord, error) {
if len(record) != 4 {
return nil, fmt.Errorf("字段数量不足,期望4个,实际%d个", len(record))
}
// 校验ID是否为数字
id, err := strconv.Atoi(record[0])
if err != nil {
return nil, fmt.Errorf("ID字段格式错误:%s", record[0])
}
// 校验Name不为空且长度不超过50
name := strings.TrimSpace(record[1])
if name == "" {
return nil, fmt.Errorf("Name字段不能为空")
}
if len(name) > 50 {
return nil, fmt.Errorf("Name字段长度超过50")
}
// 校验Age是否为合法数字且在合理范围
age, err := strconv.Atoi(record[2])
if err != nil || age < 0 || age > 150 {
return nil, fmt.Errorf("Age字段不合法:%s", record[2])
}
// 校验Email格式(简单校验)
email := strings.TrimSpace(record[3])
if !strings.Contains(email, "@") {
return nil, fmt.Errorf("Email格式不合法:%s", email)
}
return &UserRecord{
ID: id,
Name: name,
Age: age,
Email: email,
}, nil
}
func main() {
// 打开CSV文件
file, err := os.Open("user_data.csv")
if err != nil {
fmt.Printf("打开文件失败:%v\n", err)
return
}
defer file.Close()
reader := csv.NewReader(file)
// 设置允许字段包含引号包裹的逗号
reader.LazyQuotes = true
// 读取所有记录
records, err := reader.ReadAll()
if err != nil {
fmt.Printf("读取CSV失败:%v\n", err)
return
}
// 存储合法记录和错误记录
var validRecords []*UserRecord
var errorRecords []string
for i, record := range records {
// 跳过表头
if i == 0 {
continue
}
valid, err := validateRecord(record)
if err != nil {
errorRecords = append(errorRecords, fmt.Sprintf("第%d行错误:%v,原始数据:%v", i+1, err, record))
continue
}
validRecords = append(validRecords, valid)
}
fmt.Printf("合法记录数:%d,错误记录数:%d\n", len(validRecords), len(errorRecords))
}2. 分批插入+单条错误隔离
不要一次性插入所有记录,而是按固定批次大小拆分,同时每个批次内部对单条记录做插入异常捕获,避免单条错误导致整个批次失败。
package main
import (
"database/sql"
"fmt"
_ "github.com/denisenkom/go-mssqldb"
"log"
)
// 插入单条记录,返回是否成功和错误信息
func insertSingleRecord(db *sql.DB, record *UserRecord) bool {
query := "INSERT INTO users (id, name, age, email) VALUES (@id, @name, @age, @email)"
_, err := db.Exec(query,
sql.Named("id", record.ID),
sql.Named("name", record.Name),
sql.Named("age", record.Age),
sql.Named("email", record.Email),
)
if err != nil {
fmt.Printf("插入记录失败,ID=%d,错误:%v\n", record.ID, err)
return false
}
return true
}
// 分批插入记录
func batchInsertRecords(db *sql.DB, records []*UserRecord, batchSize int) (successCount int, failCount int) {
for i := 0; i < len(records); i += batchSize {
end := i + batchSize
if end > len(records) {
end = len(records)
}
batch := records[i:end]
// 开启批次事务
tx, err := db.Begin()
if err != nil {
fmt.Printf("开启事务失败:%v\n", err)
failCount += len(batch)
continue
}
batchSuccess := 0
for _, record := range batch {
// 使用事务内的连接插入
query := "INSERT INTO users (id, name, age, email) VALUES (@id, @name, @age, @email)"
_, err := tx.Exec(query,
sql.Named("id", record.ID),
sql.Named("name", record.Name),
sql.Named("age", record.Age),
sql.Named("email", record.Email),
)
if err != nil {
fmt.Printf("批次内插入失败,ID=%d,错误:%v,跳过该记录\n", record.ID, err)
failCount++
} else {
batchSuccess++
}
}
// 提交事务,即使有部分失败也提交成功的记录
err = tx.Commit()
if err != nil {
fmt.Printf("提交事务失败:%v,回滚该批次\n", err)
tx.Rollback()
failCount += batchSuccess
} else {
successCount += batchSuccess
fmt.Printf("批次插入完成,成功%d条,失败%d条\n", batchSuccess, len(batch)-batchSuccess)
}
}
return
}3. 错误重试与日志记录
对于插入失败的记录,尤其是因为数据库连接临时异常导致的失败,可以加入重试机制,同时将所有错误记录写入日志文件,方便后续人工补录。
package main
import (
"fmt"
"log"
"os"
"time"
)
// 带重试的插入单条记录
func insertWithRetry(db *sql.DB, record *UserRecord, maxRetry int) bool {
for i := 0; i <= maxRetry; i++ {
query := "INSERT INTO users (id, name, age, email) VALUES (@id, @name, @age, @email)"
_, err := db.Exec(query,
sql.Named("id", record.ID),
sql.Named("name", record.Name),
sql.Named("age", record.Age),
sql.Named("email", record.Email),
)
if err == nil {
return true
}
if i < maxRetry {
fmt.Printf("插入ID=%d失败,第%d次重试,错误:%v\n", record.ID, i+1, err)
time.Sleep(time.Second * 2)
} else {
fmt.Printf("插入ID=%d重试%d次仍失败,错误:%v\n", record.ID, maxRetry, err)
// 写入错误日志
logErrorRecord(record, err)
return false
}
}
return false
}
// 记录错误记录到日志文件
func logErrorRecord(record *UserRecord, err error) {
f, err := os.OpenFile("import_error.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Printf("打开错误日志文件失败:%v\n", err)
return
}
defer f.Close()
log.SetOutput(f)
log.Printf("时间:%s,失败记录:ID=%d,Name=%s,Age=%d,Email=%s,错误:%v\n",
time.Now().Format("2006-01-02 15:04:05"),
record.ID, record.Name, record.Age, record.Email, err)
}总结
要避免Go语言导入CSV到MS SQL时记录丢失,核心是从数据校验、插入隔离、异常处理三个层面做设计:先过滤不合格数据,再通过分批+单条错误隔离避免批量失败,最后通过重试和日志记录兜底,确保所有有效记录都能被正确导入,异常记录可追溯补录。实际开发中可以根据业务需求调整批次大小、重试次数等参数,进一步提升导入流程的适配性。