Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions databases/databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Database interface {
SpecificSearchParameters() map[string]any
// search for files visible to the user with the given ORCID using the given
// parameters
// NOTE: returned results will not contain duplicate file IDs!
Search(orcid string, params SearchParameters) (SearchResults, error)
// Returns a slice of Frictionless descriptors associated with files with the
// given IDs that are visible to the user with the given ORCID. A descriptor can
Expand All @@ -53,12 +54,14 @@ type Database interface {
// appear only in a transfer manifest. If any data descriptors are returned
// by this call, the number of descriptors returned will exceed the number of
// requested file IDs by the number of data descriptors.
// NOTE: you may assume that there are no duplicate file IDs
Descriptors(orcid string, fileIds []string) ([]map[string]any, error)
// Returns a list of endpoint names that can be used as sources or destinations
// for transfer with this database
EndpointNames() []string
// begins staging the files visible to the user with the given ORCID for
// transfer, returning a UUID representing the staging operation
// NOTE: you may assume that there are no duplicate file IDs
StageFiles(orcid string, fileIds []string) (uuid.UUID, error)
// returns the status of a given staging operation
StagingStatus(id uuid.UUID) (StagingStatus, error)
Expand Down
93 changes: 57 additions & 36 deletions databases/jdp/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,55 +181,71 @@ func (db *Database) Descriptors(orcid string, fileIds []string) ([]map[string]an
indexForId[strippedFileIds[i]] = i
}

// NOTE: the JDP search/by_file_ids/ endpoint (unofficial, undocumented!) only seems to
// NOTE: accept around 50 file IDs at a time, so we have to batch our requests

batchSize := 50
numBatches := len(strippedFileIds) / batchSize
if numBatches*batchSize < len(strippedFileIds) {
numBatches++
}

var descriptors []map[string]any

type MetadataRequest struct {
Ids []string `json:"ids"`
Aggregations bool `json:"aggregations"`
IncludePrivateData int `json:"include_private_data"`
}
data, err := json.Marshal(MetadataRequest{
Ids: strippedFileIds,
Aggregations: true,
IncludePrivateData: 1,
})
if err != nil {
return nil, err
}
for b := range numBatches {
begin := batchSize * b
end := min(batchSize*(b+1), len(strippedFileIds))
data, err := json.Marshal(MetadataRequest{
Ids: strippedFileIds[begin:end],
Aggregations: true,
IncludePrivateData: 1,
})
if err != nil {
return nil, err
}

body, err := db.post("search/by_file_ids/", orcid, bytes.NewReader(data))
if err != nil {
return nil, err
}
body, err := db.post("search/by_file_ids/", orcid, bytes.NewReader(data))
if err != nil {
return nil, err
}

descriptors, err := descriptorsFromResponseBody(body, nil)
if err != nil {
return nil, err
// get a de-duped list of descriptors
batchDescriptors, err := descriptorsFromResponseBody(body, nil)
if err != nil {
return nil, err
}
descriptors = append(descriptors, batchDescriptors...)
}

// reorder the descriptors to match the order of the requested file IDs, and track file IDs that
// aren't matched to descriptors
descriptorsByFileId := make(map[string]map[string]any)
fileIdsFound := make(map[string]bool)
for _, descriptor := range descriptors {
descriptorsByFileId[descriptor["id"].(string)] = descriptor
fileIdsFound[descriptor["id"].(string)] = true
}

// if any file IDs don't have corresponding descriptors, find out which ones and issue an error
if len(descriptors) < len(fileIds) {
if len(descriptorsByFileId) < len(fileIds) {
missingResources := make([]string, 0)
for _, fileId := range fileIds {
if _, found := fileIdsFound[fileId]; !found {
if _, found := descriptorsByFileId[fileId]; !found {
missingResources = append(missingResources, fileId)
}
}
if len(missingResources) > 0 {
return nil, databases.ResourcesNotFoundError{
return nil, &databases.ResourcesNotFoundError{
Database: "JDP",
ResourceIds: missingResources,
}
}
}

descriptors = make([]map[string]any, len(fileIds))
for i := range fileIds {
descriptors[i] = descriptorsByFileId[fileIds[i]]
}
Expand Down Expand Up @@ -307,7 +323,6 @@ func (db *Database) StagingStatus(id uuid.UUID) (databases.StagingStatus, error)
if err != nil {
return databases.StagingStatusUnknown, err
}
slog.Debug(fmt.Sprintf("Queried JDP for staging status of transfer with staging ID %s (request ID: %d); results: %s", id.String(), request.Id, string(body)))
type JDPResult struct {
Status string `json:"status"` // "new", "pending", or "ready"
}
Expand All @@ -322,9 +337,10 @@ func (db *Database) StagingStatus(id uuid.UUID) (databases.StagingStatus, error)
"ready": databases.StagingStatusSucceeded,
}
if status, ok := statusForString[jdpResult.Status]; ok {
slog.Debug(fmt.Sprintf("Queried JDP for staging status of transfer with staging ID %s (request ID: %d): %s", id, request.Id, jdpResult.Status))
return status, nil
}
return databases.StagingStatusUnknown, fmt.Errorf("unrecognized staging status string: %s", jdpResult.Status)
return databases.StagingStatusUnknown, fmt.Errorf("unrecognized JDP staging status string: %s", jdpResult.Status)
} else {
slog.Info(fmt.Sprintf("No staging request found for transfer with staging ID %s", id.String()))
return databases.StagingStatusUnknown, nil
Expand Down Expand Up @@ -657,7 +673,8 @@ func (db *Database) post(resource, orcid string, body io.Reader) ([]byte, error)
}
}

// this helper extracts files for the JDP /search GET query with given parameters
// this helper extracts file descriptors from the response body of a JDP /search GET query with the
// given parameters, returning a de-duped list of descriptors
func descriptorsFromResponseBody(body []byte, extraFields []string) ([]map[string]any, error) {
type JDPResults struct {
Organisms []Organism `json:"organisms"`
Expand All @@ -669,26 +686,30 @@ func descriptorsFromResponseBody(body []byte, extraFields []string) ([]map[strin
}

descriptors := make([]map[string]any, 0)
idsEncountered := make(map[string]bool)

for _, org := range jdpResults.Organisms {
for _, file := range org.Files {
descriptor := descriptorFromOrganismAndFile(org, file)

// add any requested additional metadata
if extraFields != nil {
extras := make(map[string]any)
for _, field := range extraFields {
switch field {
case "project_id":
extras["project_id"] = org.Id
case "img_taxon_oid":
extras["img_taxon_oid"] = file.Metadata.IMG.TaxonOID
descriptorId := descriptor["id"].(string)
if _, found := idsEncountered[descriptorId]; !found {
// add any requested additional metadata
if extraFields != nil {
extras := make(map[string]any)
for _, field := range extraFields {
switch field {
case "project_id":
extras["project_id"] = org.Id
case "img_taxon_oid":
extras["img_taxon_oid"] = file.Metadata.IMG.TaxonOID
}
}
descriptor["extra"] = extras
}
descriptor["extra"] = extras
}

descriptors = append(descriptors, descriptor)
descriptors = append(descriptors, descriptor)
idsEncountered[descriptorId] = true
}
}
}
return descriptors, nil
Expand Down
13 changes: 10 additions & 3 deletions services/prototype.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,13 @@ func (service *prototype) fetchFileMetadata(ctx context.Context,
}
ids := strings.Split(input.Ids, ",")

// have we been given duplicate IDs?
duplicates := duplicateFileIds(ids)
if duplicates != nil {
return nil, huma.Error400BadRequest(fmt.Sprintf("The following requested file IDs have duplicates, which are forbidden: %s",
strings.Join(duplicates, ", ")))
}

slog.Info(fmt.Sprintf("Fetching file metadata for %d files in database %s...",
len(ids), input.Database))
db, err := databases.NewDatabase(input.Database)
Expand Down Expand Up @@ -627,10 +634,10 @@ type TransferOutput struct {

// returns a string slice containing file IDs in the TransferRequest that have duplicates, or nil
// if no duplicates are found
func DuplicateFileIds(transferRequest TransferRequest) []string {
func duplicateFileIds(fileIds []string) []string {
fileIdsEncountered := make(map[string]struct{})
var duplicates []string
for _, fileId := range transferRequest.FileIds {
for _, fileId := range fileIds {
if _, found := fileIdsEncountered[fileId]; found {
duplicates = append(duplicates, fileId)
} else {
Expand Down Expand Up @@ -662,7 +669,7 @@ func (service *prototype) createTransfer(ctx context.Context,
user.Orcid = input.Body.Orcid

// inspect the list of files, making sure there are no duplicates
duplicates := DuplicateFileIds(input.Body)
duplicates := duplicateFileIds(input.Body.FileIds)
if duplicates != nil {
return nil, huma.Error400BadRequest(fmt.Sprintf("The following requested file IDs have duplicates, which are forbidden: %s",
strings.Join(duplicates, ", ")))
Expand Down
2 changes: 1 addition & 1 deletion services/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// Version numbers
var majorVersion = 0
var minorVersion = 11
var patchVersion = 1
var patchVersion = 2

// Version string
var version = fmt.Sprintf("%d.%d.%d", majorVersion, minorVersion, patchVersion)
Loading