feat(typesense): add new tasks to typesense directly when they are created
This commit is contained in:
parent
8f4ee3a089
commit
98102e59f2
@ -63,6 +63,7 @@ func RegisterListeners() {
|
|||||||
events.RegisterListener((&TaskRelationDeletedEvent{}).Name(), &HandleTaskUpdateLastUpdated{})
|
events.RegisterListener((&TaskRelationDeletedEvent{}).Name(), &HandleTaskUpdateLastUpdated{})
|
||||||
if config.TypesenseEnabled.GetBool() {
|
if config.TypesenseEnabled.GetBool() {
|
||||||
events.RegisterListener((&TaskDeletedEvent{}).Name(), &RemoveTaskFromTypesense{})
|
events.RegisterListener((&TaskDeletedEvent{}).Name(), &RemoveTaskFromTypesense{})
|
||||||
|
events.RegisterListener((&TaskCreatedEvent{}).Name(), &AddTaskToTypesense{})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -506,6 +507,38 @@ func (s *RemoveTaskFromTypesense) Handle(msg *message.Message) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddTaskToTypesense represents a listener
|
||||||
|
type AddTaskToTypesense struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
// Name defines the name for the AddTaskToTypesense listener
|
||||||
|
func (l *AddTaskToTypesense) Name() string {
|
||||||
|
return "add.task.to.typesense"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle is executed when the event AddTaskToTypesense listens on is fired
|
||||||
|
func (l *AddTaskToTypesense) Handle(msg *message.Message) (err error) {
|
||||||
|
event := &TaskCreatedEvent{}
|
||||||
|
err = json.Unmarshal(msg.Payload, event)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("New task %d created, adding to typesense…", event.Task.ID)
|
||||||
|
|
||||||
|
s := db.NewSession()
|
||||||
|
defer s.Close()
|
||||||
|
ttask, err := getTypesenseTaskForTask(s, event.Task, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = typesenseClient.Collection("tasks").
|
||||||
|
Documents().
|
||||||
|
Create(ttask)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
///////
|
///////
|
||||||
// Project Event Listeners
|
// Project Event Listeners
|
||||||
|
|
||||||
|
@ -247,6 +247,36 @@ func ReindexAllTasks() (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getTypesenseTaskForTask(s *xorm.Session, task *Task, projectsCache map[int64]*Project) (ttask *typesenseTask, err error) {
|
||||||
|
ttask = convertTaskToTypesenseTask(task)
|
||||||
|
|
||||||
|
var p *Project
|
||||||
|
if projectsCache == nil {
|
||||||
|
p, err = GetProjectSimpleByID(s, task.ProjectID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not fetch project %d: %s", task.ProjectID, err.Error())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
var has bool
|
||||||
|
p, has = projectsCache[task.ProjectID]
|
||||||
|
if !has {
|
||||||
|
p, err = GetProjectSimpleByID(s, task.ProjectID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not fetch project %d: %s", task.ProjectID, err.Error())
|
||||||
|
}
|
||||||
|
projectsCache[task.ProjectID] = p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
comment := &TaskComment{TaskID: task.ID}
|
||||||
|
ttask.Comments, _, _, err = comment.ReadAll(s, &user.User{ID: p.OwnerID}, "", -1, -1)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not fetch comments for task %d: %s", task.ID, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func reindexTasks(s *xorm.Session, tasks map[int64]*Task) (err error) {
|
func reindexTasks(s *xorm.Session, tasks map[int64]*Task) (err error) {
|
||||||
|
|
||||||
if len(tasks) == 0 {
|
if len(tasks) == 0 {
|
||||||
@ -263,24 +293,13 @@ func reindexTasks(s *xorm.Session, tasks map[int64]*Task) (err error) {
|
|||||||
|
|
||||||
typesenseTasks := []interface{}{}
|
typesenseTasks := []interface{}{}
|
||||||
for _, task := range tasks {
|
for _, task := range tasks {
|
||||||
searchTask := convertTaskToTypesenseTask(task)
|
|
||||||
|
|
||||||
p, has := projects[task.ProjectID]
|
ttask, err := getTypesenseTaskForTask(s, task, projects)
|
||||||
if !has {
|
|
||||||
p, err = GetProjectSimpleByID(s, task.ProjectID)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not fetch project %d: %s", task.ProjectID, err.Error())
|
|
||||||
}
|
|
||||||
projects[task.ProjectID] = p
|
|
||||||
}
|
|
||||||
|
|
||||||
comment := &TaskComment{TaskID: task.ID}
|
|
||||||
searchTask.Comments, _, _, err = comment.ReadAll(s, &user.User{ID: p.OwnerID}, "", -1, -1)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not fetch comments for task %d: %s", task.ID, err.Error())
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
typesenseTasks = append(typesenseTasks, searchTask)
|
typesenseTasks = append(typesenseTasks, ttask)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = typesenseClient.Collection("tasks").
|
_, err = typesenseClient.Collection("tasks").
|
||||||
@ -487,6 +506,7 @@ func SyncUpdatedTasksIntoTypesense() (err error) {
|
|||||||
|
|
||||||
err = s.
|
err = s.
|
||||||
Where("updated >= ?", lastSync.SyncStartedAt).
|
Where("updated >= ?", lastSync.SyncStartedAt).
|
||||||
|
And("updated != created"). // new tasks are already indexed via the event handler
|
||||||
Find(tasks)
|
Find(tasks)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = s.Rollback()
|
_ = s.Rollback()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user