add repository draft
This commit is contained in:
@@ -0,0 +1,24 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
_ "github.com/jackc/pgx/v5/stdlib"
|
||||
)
|
||||
|
||||
func NewPGXPool(datasource string) *pgxpool.Pool {
|
||||
dbPool, err := pgxpool.New(context.Background(), datasource)
|
||||
if err != nil {
|
||||
slog.Error("error connecting to database", "error", err, "datasource", datasource)
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err := dbPool.Ping(context.Background()); err != nil {
|
||||
slog.Error("error pinging database, maybe incorrect datasource", "error", err, "datasource", datasource)
|
||||
panic(err)
|
||||
}
|
||||
slog.Info("connected to database", "datasource", datasource)
|
||||
return dbPool
|
||||
}
|
||||
@@ -30,14 +30,14 @@ func (mt *MeteoData) validate() error {
|
||||
}
|
||||
|
||||
type FileIngest interface {
|
||||
Parse(io io.Reader, fs *FileStats) ([]MeteoData, []RejectedMeteoData, error)
|
||||
Parse(io io.Reader) ([]MeteoData, []RejectedMeteoData, error)
|
||||
}
|
||||
|
||||
type CSV struct{}
|
||||
|
||||
var _ FileIngest = (*CSV)(nil)
|
||||
|
||||
func (c *CSV) Parse(r io.Reader, fs *FileStats) ([]MeteoData, []RejectedMeteoData, error) {
|
||||
func (c *CSV) Parse(r io.Reader) ([]MeteoData, []RejectedMeteoData, error) {
|
||||
reader := csv.NewReader(r)
|
||||
reader.Comma = ';'
|
||||
reader.TrimLeadingSpace = true
|
||||
@@ -74,7 +74,6 @@ func (c *CSV) Parse(r io.Reader, fs *FileStats) ([]MeteoData, []RejectedMeteoDat
|
||||
|
||||
meteoData, err := normalize(record)
|
||||
if err != nil {
|
||||
fs.RowsRejected++
|
||||
rejectedDataList = append(rejectedDataList, RejectedMeteoData{
|
||||
RowValue: rowValue,
|
||||
Reason: err.Error(),
|
||||
@@ -83,7 +82,6 @@ func (c *CSV) Parse(r io.Reader, fs *FileStats) ([]MeteoData, []RejectedMeteoDat
|
||||
}
|
||||
|
||||
if err := meteoData.validate(); err != nil {
|
||||
fs.RowsRejected++
|
||||
rejectedDataList = append(rejectedDataList, RejectedMeteoData{
|
||||
RowValue: rowValue,
|
||||
Reason: err.Error(),
|
||||
@@ -92,7 +90,6 @@ func (c *CSV) Parse(r io.Reader, fs *FileStats) ([]MeteoData, []RejectedMeteoDat
|
||||
}
|
||||
|
||||
meteoDataList = append(meteoDataList, *meteoData)
|
||||
fs.RowsInserted++
|
||||
}
|
||||
|
||||
return meteoDataList, rejectedDataList, nil
|
||||
|
||||
@@ -128,7 +128,7 @@ func Test_CSV_ParseFile(t *testing.T) {
|
||||
|
||||
csvIngest := &meteo.CSV{}
|
||||
fileStats := &meteo.FileStats{}
|
||||
inserted, rejected, err := csvIngest.Parse(file, fileStats)
|
||||
inserted, rejected, err := csvIngest.Parse(file)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tt.expectedInserted, fileStats.RowsInserted)
|
||||
|
||||
@@ -13,12 +13,12 @@ import (
|
||||
)
|
||||
|
||||
type Handler struct {
|
||||
csv *CSV
|
||||
*Service
|
||||
}
|
||||
|
||||
func NewHandler() *Handler {
|
||||
func NewHandler(service *Service) *Handler {
|
||||
return &Handler{
|
||||
csv: &CSV{},
|
||||
Service: service,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,8 +50,7 @@ func (h *Handler) IngestCSV(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
inserted, rejected, err := h.csv.Parse(bytes.NewReader(content), fileStats)
|
||||
err = h.Service.IngestCSV(bytes.NewReader(content), fileStats)
|
||||
if err != nil {
|
||||
slog.Error(ErrCannotParseFile.Error(),
|
||||
"filename", header.Filename,
|
||||
@@ -59,7 +58,6 @@ func (h *Handler) IngestCSV(w http.ResponseWriter, r *http.Request) {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
fileStats.ElapsedMS = int(time.Since(start).Milliseconds())
|
||||
|
||||
slog.Info("CSV file processed",
|
||||
@@ -68,8 +66,6 @@ func (h *Handler) IngestCSV(w http.ResponseWriter, r *http.Request) {
|
||||
"rows_rejected", fileStats.RowsRejected,
|
||||
"elapsed_ms", fileStats.ElapsedMS,
|
||||
"file_checksum", fileStats.FileChecksum,
|
||||
"inserted_data", inserted,
|
||||
"rejected_data", rejected,
|
||||
)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
@@ -0,0 +1,64 @@
|
||||
package meteo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
b "github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
type Repository interface {
|
||||
InsertAcceptedMeteoData(data []MeteoData) (int, error)
|
||||
InsertRejectedMeteoData(data []RejectedMeteoData) (int, error)
|
||||
}
|
||||
|
||||
type pgxRepo struct {
|
||||
*pgxpool.Pool
|
||||
}
|
||||
|
||||
func NewPGXRepo(pool *pgxpool.Pool) Repository {
|
||||
return &pgxRepo{
|
||||
pool,
|
||||
}
|
||||
}
|
||||
|
||||
const insertAcceptedMeteoData = `insert into public.meteo_data (location_id, max_temp, min_temp, rainfall, cloudiness, created_at) values ($1, $2, $3, $4, $5, $6) returning id`
|
||||
|
||||
func (pgx *pgxRepo) InsertAcceptedMeteoData(data []MeteoData) (int, error) {
|
||||
// TODO: pass context
|
||||
// TODO improve transaction
|
||||
tx, err := pgx.Begin(context.Background())
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error starting transaction: %w", err)
|
||||
}
|
||||
defer tx.Rollback(context.Background())
|
||||
|
||||
batch := &b.Batch{}
|
||||
|
||||
for _, d := range data {
|
||||
batch.Queue(insertAcceptedMeteoData, 1, d.MaxTemp, d.MinTemp, d.Rainfall, d.Cloudiness, d.Timestamp)
|
||||
}
|
||||
|
||||
results := tx.SendBatch(context.Background(), batch)
|
||||
|
||||
for i := range data {
|
||||
_, err := results.Exec()
|
||||
if err != nil {
|
||||
results.Close()
|
||||
return 0, fmt.Errorf("error executing batch command %d: %w", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
results.Close()
|
||||
|
||||
if err = tx.Commit(context.Background()); err != nil {
|
||||
return 0, fmt.Errorf("error committing transaction: %w", err)
|
||||
}
|
||||
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (pgx *pgxRepo) InsertRejectedMeteoData(data []RejectedMeteoData) (int, error) {
|
||||
return 0, nil
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
package meteo
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
csv CSV
|
||||
repo Repository
|
||||
}
|
||||
|
||||
func NewService(repo Repository) *Service {
|
||||
return &Service{
|
||||
csv: CSV{},
|
||||
repo: repo,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) IngestCSV(r io.Reader, fs *FileStats) error {
|
||||
accepted, rejected, err := s.csv.Parse(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO call insertToDB
|
||||
s.insertAcceptedToDB(accepted, fs)
|
||||
s.insertRejectedToDB(rejected, fs)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) insertAcceptedToDB(data []MeteoData, fs *FileStats) error {
|
||||
var err error
|
||||
fs.RowsInserted, err = s.repo.InsertAcceptedMeteoData(data)
|
||||
if err != nil {
|
||||
slog.Error("error", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) insertRejectedToDB(data []RejectedMeteoData, fs *FileStats) error {
|
||||
var err error
|
||||
fs.RowsRejected, err = s.repo.InsertRejectedMeteoData(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user