From 38d6103ec778e80513e06eb1a5f98c6fb983a077 Mon Sep 17 00:00:00 2001 From: Nekohy Date: Wed, 7 May 2025 03:41:35 +0800 Subject: [PATCH] feat(thunderx,pikpak): add offline download support for ThunderX; add ctx to specific PikPak functions --- drivers/pikpak/driver.go | 16 +- drivers/thunderx/driver.go | 87 ++++++++++- drivers/thunderx/types.go | 99 ++++++++++++ drivers/thunderx/util.go | 1 + internal/conf/const.go | 3 + internal/offline_download/all.go | 1 + .../offline_download/thunderx/thunderx.go | 142 ++++++++++++++++++ internal/offline_download/thunderx/utils.go | 42 ++++++ internal/offline_download/tool/add.go | 7 + internal/offline_download/tool/download.go | 5 +- server/handles/offline_download.go | 45 ++++++ server/router.go | 1 + 12 files changed, 440 insertions(+), 9 deletions(-) create mode 100644 internal/offline_download/thunderx/thunderx.go create mode 100644 internal/offline_download/thunderx/utils.go diff --git a/drivers/pikpak/driver.go b/drivers/pikpak/driver.go index 6c64e6fb..3a2ada4f 100644 --- a/drivers/pikpak/driver.go +++ b/drivers/pikpak/driver.go @@ -139,7 +139,8 @@ func (d *PikPak) Link(ctx context.Context, file model.Obj, args model.LinkArgs) } _, err := d.request(fmt.Sprintf("https://api-drive.mypikpak.net/drive/v1/files/%s", file.GetID()), http.MethodGet, func(req *resty.Request) { - req.SetQueryParams(queryParams) + req.SetContext(ctx). + SetQueryParams(queryParams) }, &resp) if err != nil { return nil, err @@ -158,7 +159,7 @@ func (d *PikPak) Link(ctx context.Context, file model.Obj, args model.LinkArgs) func (d *PikPak) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) error { _, err := d.request("https://api-drive.mypikpak.net/drive/v1/files", http.MethodPost, func(req *resty.Request) { - req.SetBody(base.Json{ + req.SetContext(ctx).SetBody(base.Json{ "kind": "drive#folder", "parent_id": parentDir.GetID(), "name": dirName, @@ -169,7 +170,7 @@ func (d *PikPak) MakeDir(ctx context.Context, parentDir model.Obj, dirName strin func (d *PikPak) Move(ctx context.Context, srcObj, dstDir model.Obj) error { _, err := d.request("https://api-drive.mypikpak.net/drive/v1/files:batchMove", http.MethodPost, func(req *resty.Request) { - req.SetBody(base.Json{ + req.SetContext(ctx).SetBody(base.Json{ "ids": []string{srcObj.GetID()}, "to": base.Json{ "parent_id": dstDir.GetID(), @@ -181,7 +182,7 @@ func (d *PikPak) Move(ctx context.Context, srcObj, dstDir model.Obj) error { func (d *PikPak) Rename(ctx context.Context, srcObj model.Obj, newName string) error { _, err := d.request("https://api-drive.mypikpak.net/drive/v1/files/"+srcObj.GetID(), http.MethodPatch, func(req *resty.Request) { - req.SetBody(base.Json{ + req.SetContext(ctx).SetBody(base.Json{ "name": newName, }) }, nil) @@ -190,7 +191,7 @@ func (d *PikPak) Rename(ctx context.Context, srcObj model.Obj, newName string) e func (d *PikPak) Copy(ctx context.Context, srcObj, dstDir model.Obj) error { _, err := d.request("https://api-drive.mypikpak.net/drive/v1/files:batchCopy", http.MethodPost, func(req *resty.Request) { - req.SetBody(base.Json{ + req.SetContext(ctx).SetBody(base.Json{ "ids": []string{srcObj.GetID()}, "to": base.Json{ "parent_id": dstDir.GetID(), @@ -202,7 +203,7 @@ func (d *PikPak) Copy(ctx context.Context, srcObj, dstDir model.Obj) error { func (d *PikPak) Remove(ctx context.Context, obj model.Obj) error { _, err := d.request("https://api-drive.mypikpak.net/drive/v1/files:batchTrash", http.MethodPost, func(req *resty.Request) { - req.SetBody(base.Json{ + req.SetContext(ctx).SetBody(base.Json{ "ids": []string{obj.GetID()}, }) }, nil) @@ -276,7 +277,8 @@ func (d *PikPak) OfflineDownload(ctx context.Context, fileUrl string, parentDir var resp OfflineDownloadResp _, err := d.request("https://api-drive.mypikpak.net/drive/v1/files", http.MethodPost, func(req *resty.Request) { - req.SetBody(requestBody) + req.SetContext(ctx). + SetBody(requestBody) }, &resp) if err != nil { diff --git a/drivers/thunderx/driver.go b/drivers/thunderx/driver.go index 6ee8901a..b984ff25 100644 --- a/drivers/thunderx/driver.go +++ b/drivers/thunderx/driver.go @@ -2,8 +2,11 @@ package thunderx import ( "context" + "encoding/json" + "errors" "fmt" "net/http" + "strconv" "strings" "github.com/alist-org/alist/v3/drivers/base" @@ -477,7 +480,8 @@ func (xc *XunLeiXCommon) Request(url string, method string, callback base.ReqCal } }, resp) - errResp, ok := err.(*ErrResp) + var errResp *ErrResp + ok := errors.As(err, &errResp) if !ok { return nil, err } @@ -556,3 +560,84 @@ func (xc *XunLeiXCommon) IsLogin() bool { _, err := xc.Request(XLUSER_API_URL+"/user/me", http.MethodGet, nil, nil) return err == nil } + +// 离线下载文件,都和Pikpak接口一致 +func (xc *XunLeiXCommon) OfflineDownload(ctx context.Context, fileUrl string, parentDir model.Obj, fileName string) (*OfflineTask, error) { + requestBody := base.Json{ + "kind": "drive#file", + "name": fileName, + "upload_type": "UPLOAD_TYPE_URL", + "url": base.Json{ + "url": fileUrl, + }, + "params": base.Json{}, + "parent_id": parentDir.GetID(), + } + var resp OfflineDownloadResp // 一样的 + _, err := xc.Request(FILE_API_URL, http.MethodPost, func(req *resty.Request) { + req.SetContext(ctx). + SetBody(requestBody) + }, &resp) + + if err != nil { + return nil, err + } + + return &resp.Task, err +} + +// 获取离线下载任务列表 +func (xc *XunLeiXCommon) OfflineList(ctx context.Context, nextPageToken string, phase []string) ([]OfflineTask, error) { + res := make([]OfflineTask, 0) + if len(phase) == 0 { + phase = []string{"PHASE_TYPE_RUNNING", "PHASE_TYPE_ERROR", "PHASE_TYPE_COMPLETE", "PHASE_TYPE_PENDING"} + } + params := map[string]string{ + "type": "offline", + "thumbnail_size": "SIZE_SMALL", + "limit": "10000", + "page_token": nextPageToken, + "with": "reference_resource", + } + + // 处理 phase 参数 + if len(phase) > 0 { + filters := base.Json{ + "phase": map[string]string{ + "in": strings.Join(phase, ","), + }, + } + filtersJSON, err := json.Marshal(filters) + if err != nil { + return nil, fmt.Errorf("failed to marshal filters: %w", err) + } + params["filters"] = string(filtersJSON) + } + + var resp OfflineListResp + _, err := xc.Request(TASKS_API_URL, http.MethodGet, func(req *resty.Request) { + req.SetContext(ctx). + SetQueryParams(params) + }, &resp) + + if err != nil { + return nil, fmt.Errorf("failed to get offline list: %w", err) + } + res = append(res, resp.Tasks...) + return res, nil +} + +func (xc *XunLeiXCommon) DeleteOfflineTasks(ctx context.Context, taskIDs []string, deleteFiles bool) error { + params := map[string]string{ + "task_ids": strings.Join(taskIDs, ","), + "delete_files": strconv.FormatBool(deleteFiles), + } + _, err := xc.Request(TASKS_API_URL, http.MethodDelete, func(req *resty.Request) { + req.SetContext(ctx). + SetQueryParams(params) + }, nil) + if err != nil { + return fmt.Errorf("failed to delete tasks %v: %w", taskIDs, err) + } + return nil +} diff --git a/drivers/thunderx/types.go b/drivers/thunderx/types.go index 77cfa0f2..8faaa1be 100644 --- a/drivers/thunderx/types.go +++ b/drivers/thunderx/types.go @@ -204,3 +204,102 @@ type UploadTaskResponse struct { File Files `json:"file"` } + +// 添加离线下载响应 +type OfflineDownloadResp struct { + File *string `json:"file"` + Task OfflineTask `json:"task"` + UploadType string `json:"upload_type"` + URL struct { + Kind string `json:"kind"` + } `json:"url"` +} + +// 离线下载列表 +type OfflineListResp struct { + ExpiresIn int64 `json:"expires_in"` + NextPageToken string `json:"next_page_token"` + Tasks []OfflineTask `json:"tasks"` +} + +// offlineTask +type OfflineTask struct { + Callback string `json:"callback"` + CreatedTime string `json:"created_time"` + FileID string `json:"file_id"` + FileName string `json:"file_name"` + FileSize string `json:"file_size"` + IconLink string `json:"icon_link"` + ID string `json:"id"` + Kind string `json:"kind"` + Message string `json:"message"` + Name string `json:"name"` + Params Params `json:"params"` + Phase string `json:"phase"` // PHASE_TYPE_RUNNING, PHASE_TYPE_ERROR, PHASE_TYPE_COMPLETE, PHASE_TYPE_PENDING + Progress int64 `json:"progress"` + ReferenceResource ReferenceResource `json:"reference_resource"` + Space string `json:"space"` + StatusSize int64 `json:"status_size"` + Statuses []string `json:"statuses"` + ThirdTaskID string `json:"third_task_id"` + Type string `json:"type"` + UpdatedTime string `json:"updated_time"` + UserID string `json:"user_id"` +} + +type Params struct { + Age string `json:"age"` + MIMEType *string `json:"mime_type,omitempty"` + PredictType string `json:"predict_type"` + URL string `json:"url"` +} + +type ReferenceResource struct { + Type string `json:"@type"` + Audit interface{} `json:"audit"` + Hash string `json:"hash"` + IconLink string `json:"icon_link"` + ID string `json:"id"` + Kind string `json:"kind"` + Medias []Media `json:"medias"` + MIMEType string `json:"mime_type"` + Name string `json:"name"` + Params map[string]interface{} `json:"params"` + ParentID string `json:"parent_id"` + Phase string `json:"phase"` + Size string `json:"size"` + Space string `json:"space"` + Starred bool `json:"starred"` + Tags []string `json:"tags"` + ThumbnailLink string `json:"thumbnail_link"` +} + +type Media struct { + MediaId string `json:"media_id"` + MediaName string `json:"media_name"` + Video struct { + Height int `json:"height"` + Width int `json:"width"` + Duration int `json:"duration"` + BitRate int `json:"bit_rate"` + FrameRate int `json:"frame_rate"` + VideoCodec string `json:"video_codec"` + AudioCodec string `json:"audio_codec"` + VideoType string `json:"video_type"` + } `json:"video"` + Link struct { + Url string `json:"url"` + Token string `json:"token"` + Expire time.Time `json:"expire"` + } `json:"link"` + NeedMoreQuota bool `json:"need_more_quota"` + VipTypes []interface{} `json:"vip_types"` + RedirectLink string `json:"redirect_link"` + IconLink string `json:"icon_link"` + IsDefault bool `json:"is_default"` + Priority int `json:"priority"` + IsOrigin bool `json:"is_origin"` + ResolutionName string `json:"resolution_name"` + IsVisible bool `json:"is_visible"` + Category string `json:"category"` +} diff --git a/drivers/thunderx/util.go b/drivers/thunderx/util.go index 661da87e..0d4dcbcd 100644 --- a/drivers/thunderx/util.go +++ b/drivers/thunderx/util.go @@ -19,6 +19,7 @@ import ( const ( API_URL = "https://api-pan.xunleix.com/drive/v1" FILE_API_URL = API_URL + "/files" + TASKS_API_URL = API_URL + "/tasks" XLUSER_API_URL = "https://xluser-ssl.xunleix.com/v1" ) diff --git a/internal/conf/const.go b/internal/conf/const.go index 5cb8d850..536bbb92 100644 --- a/internal/conf/const.go +++ b/internal/conf/const.go @@ -69,6 +69,9 @@ const ( // thunder ThunderTempDir = "thunder_temp_dir" + // thunderx + ThunderXTempDir = "thunderx_temp_dir" + // single Token = "token" IndexProgress = "index_progress" diff --git a/internal/offline_download/all.go b/internal/offline_download/all.go index 3d0c7c73..95f27890 100644 --- a/internal/offline_download/all.go +++ b/internal/offline_download/all.go @@ -7,5 +7,6 @@ import ( _ "github.com/alist-org/alist/v3/internal/offline_download/pikpak" _ "github.com/alist-org/alist/v3/internal/offline_download/qbit" _ "github.com/alist-org/alist/v3/internal/offline_download/thunder" + _ "github.com/alist-org/alist/v3/internal/offline_download/thunderx" _ "github.com/alist-org/alist/v3/internal/offline_download/transmission" ) diff --git a/internal/offline_download/thunderx/thunderx.go b/internal/offline_download/thunderx/thunderx.go new file mode 100644 index 00000000..8043fbba --- /dev/null +++ b/internal/offline_download/thunderx/thunderx.go @@ -0,0 +1,142 @@ +package thunderx + +import ( + "context" + "errors" + "fmt" + "github.com/alist-org/alist/v3/drivers/thunderx" + "github.com/alist-org/alist/v3/internal/conf" + "github.com/alist-org/alist/v3/internal/errs" + "github.com/alist-org/alist/v3/internal/model" + "github.com/alist-org/alist/v3/internal/offline_download/tool" + "github.com/alist-org/alist/v3/internal/op" + "github.com/alist-org/alist/v3/internal/setting" + "strconv" +) + +type ThunderX struct { + refreshTaskCache bool +} + +func (t *ThunderX) Name() string { + return "ThunderX" +} + +func (t *ThunderX) Items() []model.SettingItem { + return nil +} + +func (t *ThunderX) Init() (string, error) { + t.refreshTaskCache = false + return "ok", nil +} + +func (t *ThunderX) IsReady() bool { + tempDir := setting.GetStr(conf.ThunderXTempDir) + if tempDir == "" { + return false + } + storage, _, err := op.GetStorageAndActualPath(tempDir) + if err != nil { + return false + } + if _, ok := storage.(*thunderx.ThunderX); !ok { + return false + } + return true +} + +func (t *ThunderX) AddURL(args *tool.AddUrlArgs) (string, error) { + // 添加新任务刷新缓存 + t.refreshTaskCache = true + storage, actualPath, err := op.GetStorageAndActualPath(args.TempDir) + if err != nil { + return "", err + } + thunderXDriver, ok := storage.(*thunderx.ThunderX) + if !ok { + return "", fmt.Errorf("unsupported storage driver for offline download, only ThunderX is supported") + } + + ctx := context.Background() + + if err := op.MakeDir(ctx, storage, actualPath); err != nil { + return "", err + } + + parentDir, err := op.GetUnwrap(ctx, storage, actualPath) + if err != nil { + return "", err + } + + task, err := thunderXDriver.OfflineDownload(ctx, args.Url, parentDir, "") + if err != nil { + return "", fmt.Errorf("failed to add offline download task: %w", err) + } + + return task.ID, nil +} + +func (t *ThunderX) Remove(task *tool.DownloadTask) error { + storage, _, err := op.GetStorageAndActualPath(task.TempDir) + if err != nil { + return err + } + thunderXDriver, ok := storage.(*thunderx.ThunderX) + if !ok { + return fmt.Errorf("unsupported storage driver for offline download, only ThunderX is supported") + } + ctx := context.Background() + err = thunderXDriver.DeleteOfflineTasks(ctx, []string{task.GID}, false) + if err != nil { + return err + } + return nil +} + +func (t *ThunderX) Status(task *tool.DownloadTask) (*tool.Status, error) { + storage, _, err := op.GetStorageAndActualPath(task.TempDir) + if err != nil { + return nil, err + } + thunderXDriver, ok := storage.(*thunderx.ThunderX) + if !ok { + return nil, fmt.Errorf("unsupported storage driver for offline download, only ThunderX is supported") + } + tasks, err := t.GetTasks(thunderXDriver) + if err != nil { + return nil, err + } + s := &tool.Status{ + Progress: 0, + NewGID: "", + Completed: false, + Status: "the task has been deleted", + Err: nil, + } + for _, t := range tasks { + if t.ID == task.GID { + s.Progress = float64(t.Progress) + s.Status = t.Message + s.Completed = t.Phase == "PHASE_TYPE_COMPLETE" + s.TotalBytes, err = strconv.ParseInt(t.FileSize, 10, 64) + if err != nil { + s.TotalBytes = 0 + } + if t.Phase == "PHASE_TYPE_ERROR" { + s.Err = errors.New(t.Message) + } + return s, nil + } + } + s.Err = fmt.Errorf("the task has been deleted") + return s, nil +} + +func (t *ThunderX) Run(task *tool.DownloadTask) error { + return errs.NotSupport +} + +func init() { + tool.Tools.Add(&ThunderX{}) +} diff --git a/internal/offline_download/thunderx/utils.go b/internal/offline_download/thunderx/utils.go new file mode 100644 index 00000000..e645e2f4 --- /dev/null +++ b/internal/offline_download/thunderx/utils.go @@ -0,0 +1,42 @@ +package thunderx + +import ( + "context" + "github.com/Xhofe/go-cache" + "github.com/alist-org/alist/v3/drivers/thunderx" + "github.com/alist-org/alist/v3/internal/op" + "github.com/alist-org/alist/v3/pkg/singleflight" + "time" +) + +var taskCache = cache.NewMemCache(cache.WithShards[[]thunderx.OfflineTask](16)) +var taskG singleflight.Group[[]thunderx.OfflineTask] + +func (t *ThunderX) GetTasks(thunderxDriver *thunderx.ThunderX) ([]thunderx.OfflineTask, error) { + key := op.Key(thunderxDriver, "/drive/v1/task") + if !t.refreshTaskCache { + if tasks, ok := taskCache.Get(key); ok { + return tasks, nil + } + } + t.refreshTaskCache = false + tasks, err, _ := taskG.Do(key, func() ([]thunderx.OfflineTask, error) { + ctx := context.Background() + phase := []string{"PHASE_TYPE_RUNNING", "PHASE_TYPE_ERROR", "PHASE_TYPE_PENDING", "PHASE_TYPE_COMPLETE"} + tasks, err := thunderxDriver.OfflineList(ctx, "", phase) + if err != nil { + return nil, err + } + // 添加缓存 10s + if len(tasks) > 0 { + taskCache.Set(key, tasks, cache.WithEx[[]thunderx.OfflineTask](time.Second*10)) + } else { + taskCache.Del(key) + } + return tasks, nil + }) + if err != nil { + return nil, err + } + return tasks, nil +} diff --git a/internal/offline_download/tool/add.go b/internal/offline_download/tool/add.go index d64e43e8..6e1c0a49 100644 --- a/internal/offline_download/tool/add.go +++ b/internal/offline_download/tool/add.go @@ -9,6 +9,7 @@ import ( _115 "github.com/alist-org/alist/v3/drivers/115" "github.com/alist-org/alist/v3/drivers/pikpak" "github.com/alist-org/alist/v3/drivers/thunder" + "github.com/alist-org/alist/v3/drivers/thunderx" "github.com/alist-org/alist/v3/internal/conf" "github.com/alist-org/alist/v3/internal/errs" "github.com/alist-org/alist/v3/internal/fs" @@ -103,6 +104,12 @@ func AddURL(ctx context.Context, args *AddURLArgs) (task.TaskExtensionInfo, erro } else { tempDir = filepath.Join(setting.GetStr(conf.ThunderTempDir), uid) } + case "ThunderX": + if _, ok := storage.(*thunderx.ThunderX); ok { + tempDir = args.DstDirPath + } else { + tempDir = filepath.Join(setting.GetStr(conf.ThunderXTempDir), uid) + } } taskCreator, _ := ctx.Value("user").(*model.User) // taskCreator is nil when convert failed diff --git a/internal/offline_download/tool/download.go b/internal/offline_download/tool/download.go index 42b2dbfb..9c342008 100644 --- a/internal/offline_download/tool/download.go +++ b/internal/offline_download/tool/download.go @@ -87,6 +87,9 @@ outer: if t.tool.Name() == "Thunder" { return nil } + if t.tool.Name() == "ThunderX" { + return nil + } if t.tool.Name() == "115 Cloud" { // hack for 115 <-time.After(time.Second * 1) @@ -159,7 +162,7 @@ func (t *DownloadTask) Update() (bool, error) { func (t *DownloadTask) Transfer() error { toolName := t.tool.Name() - if toolName == "115 Cloud" || toolName == "PikPak" || toolName == "Thunder" { + if toolName == "115 Cloud" || toolName == "PikPak" || toolName == "Thunder" || toolName == "ThunderX" { // 如果不是直接下载到目标路径,则进行转存 if t.TempDir != t.DstDirPath { return transferObj(t.Ctx(), t.TempDir, t.DstDirPath, t.DeletePolicy) diff --git a/server/handles/offline_download.go b/server/handles/offline_download.go index 24ff7a05..da71d94e 100644 --- a/server/handles/offline_download.go +++ b/server/handles/offline_download.go @@ -4,6 +4,7 @@ import ( _115 "github.com/alist-org/alist/v3/drivers/115" "github.com/alist-org/alist/v3/drivers/pikpak" "github.com/alist-org/alist/v3/drivers/thunder" + "github.com/alist-org/alist/v3/drivers/thunderx" "github.com/alist-org/alist/v3/internal/conf" "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/offline_download/tool" @@ -239,6 +240,50 @@ func SetThunder(c *gin.Context) { common.SuccessResp(c, "ok") } +type SetThunderXReq struct { + TempDir string `json:"temp_dir" form:"temp_dir"` +} + +func SetThunderX(c *gin.Context) { + var req SetThunderXReq + if err := c.ShouldBind(&req); err != nil { + common.ErrorResp(c, err, 400) + return + } + if req.TempDir != "" { + storage, _, err := op.GetStorageAndActualPath(req.TempDir) + if err != nil { + common.ErrorStrResp(c, "storage does not exists", 400) + return + } + if storage.Config().CheckStatus && storage.GetStorage().Status != op.WORK { + common.ErrorStrResp(c, "storage not init: "+storage.GetStorage().Status, 400) + return + } + if _, ok := storage.(*thunderx.ThunderX); !ok { + common.ErrorStrResp(c, "unsupported storage driver for offline download, only ThunderX is supported", 400) + return + } + } + items := []model.SettingItem{ + {Key: conf.ThunderXTempDir, Value: req.TempDir, Type: conf.TypeString, Group: model.OFFLINE_DOWNLOAD, Flag: model.PRIVATE}, + } + if err := op.SaveSettingItems(items); err != nil { + common.ErrorResp(c, err, 500) + return + } + _tool, err := tool.Tools.Get("ThunderX") + if err != nil { + common.ErrorResp(c, err, 500) + return + } + if _, err := _tool.Init(); err != nil { + common.ErrorResp(c, err, 500) + return + } + common.SuccessResp(c, "ok") +} + func OfflineDownloadTools(c *gin.Context) { tools := tool.Tools.Names() common.SuccessResp(c, tools) diff --git a/server/router.go b/server/router.go index 09a0bb44..3dd20895 100644 --- a/server/router.go +++ b/server/router.go @@ -147,6 +147,7 @@ func admin(g *gin.RouterGroup) { setting.POST("/set_115", handles.Set115) setting.POST("/set_pikpak", handles.SetPikPak) setting.POST("/set_thunder", handles.SetThunder) + setting.POST("/set_thunderx", handles.SetThunderX) // retain /admin/task API to ensure compatibility with legacy automation scripts _task(g.Group("/task"))