chore: implement duration streaming repo methods

This commit is contained in:
Ferdinand Mütsch
2025-02-20 14:55:26 +01:00
parent b210b4d82c
commit 0ba7a838e8
4 changed files with 834 additions and 782 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -26,6 +26,16 @@ func (m *DurationRepositoryMock) GetAllWithinByFilters(t time.Time, t2 time.Time
return args.Get(0).([]*models.Duration), args.Error(1)
}
func (m *DurationRepositoryMock) StreamAllWithin(t time.Time, t2 time.Time, u *models.User) (chan *models.Duration, error) {
args := m.Called(t, t2, u)
return args.Get(0).(chan *models.Duration), args.Error(1)
}
func (m *DurationRepositoryMock) StreamAllWithinByFilters(t time.Time, t2 time.Time, u *models.User, m2 map[string][]string) (chan *models.Duration, error) {
args := m.Called(t, t2, u, m2)
return args.Get(0).(chan *models.Duration), args.Error(1)
}
func (m *DurationRepositoryMock) DeleteByUser(u *models.User) error {
args := m.Called(u)
return args.Error(0)

View File

@@ -17,7 +17,6 @@ func NewDurationRepository(db *gorm.DB) *DurationRepository {
return &DurationRepository{BaseRepository: NewBaseRepository(db), config: conf.Get()}
}
// TODO: refactor to streaming these instead of fetching as a big batch
func (r *DurationRepository) GetAllWithin(from, to time.Time, user *models.User) ([]*models.Duration, error) {
return r.GetAllWithinByFilters(from, to, user, map[string][]string{})
}
@@ -41,6 +40,37 @@ func (r *DurationRepository) GetAllWithinByFilters(from, to time.Time, user *mod
return durations, nil
}
// note: streaming is only sensible if results are aggregated in some way in the calling function, otherwise we'll end up with an entire list anyway
func (r *DurationRepository) StreamAllWithin(from, to time.Time, user *models.User) (chan *models.Duration, error) {
return r.StreamAllWithinByFilters(from, to, user, map[string][]string{})
}
func (r *DurationRepository) StreamAllWithinByFilters(from, to time.Time, user *models.User, filterMap map[string][]string) (chan *models.Duration, error) {
out := make(chan *models.Duration)
q := r.db.
Where(&models.Duration{UserID: user.ID}).
Where("time >= ?", from.Local()).
Where("time < ?", to.Local()).
Order("time asc")
if len(filterMap) > 0 {
q = filteredQuery(q, filterMap)
}
rows, err := r.db.Rows()
if err != nil {
return nil, err
}
go streamRows[models.Duration](rows, out, r.db, func(err error) {
conf.Log().Error("failed to scan duration row", "user", user.ID, "from", from, "to", to, "error", err, "filters", filterMap)
})
return out, nil
}
func (r *DurationRepository) InsertBatch(durations []*models.Duration) error {
return InsertBatchChunked[*models.Duration](durations, &models.Duration{}, r.db)
}

View File

@@ -51,6 +51,8 @@ type IDurationRepository interface {
InsertBatch([]*models.Duration) error
GetAllWithin(time.Time, time.Time, *models.User) ([]*models.Duration, error)
GetAllWithinByFilters(time.Time, time.Time, *models.User, map[string][]string) ([]*models.Duration, error)
StreamAllWithin(time.Time, time.Time, *models.User) (chan *models.Duration, error)
StreamAllWithinByFilters(time.Time, time.Time, *models.User, map[string][]string) (chan *models.Duration, error)
DeleteByUser(*models.User) error
DeleteByUserBefore(*models.User, time.Time) error
}