11using System . Reflection ;
22using AiServer . ServiceModel ;
3+ using AiServer . ServiceModel . Types ;
34using Microsoft . AspNetCore . Hosting ;
45using Microsoft . Extensions . Logging ;
56using ServiceStack ;
7+ using ServiceStack . Jobs ;
68
79namespace AiServer . ServiceInterface ;
810
911public class ComfyServices ( ILogger < ComfyServices > log ,
10- IWebHostEnvironment env ,
12+ AppData appData ,
1113 ComfyMetadata metadata ,
12- ComfyGateway comfyGateway )
14+ ComfyGateway comfyGateway ,
15+ IBackgroundJobs jobs )
1316 : Service
1417{
18+ public const string ComfyBaseUrl = "http://localhost:7860/api" ;
19+ public const string ComfyApiKey = "" ;
20+
1521 public List < string > Get ( GetComfyWorkflows request )
1622 {
17- var workflowsPath = env . WebRootPath . CombineWith ( "lib" , "data" , "workflows" ) ;
23+ var workflowsPath = appData . WebRootPath . CombineWith ( "lib" , "data" , "workflows" ) ;
1824 var files = Directory . GetFiles ( workflowsPath , "*.json" , SearchOption . AllDirectories ) ;
1925
2026 var allWorkflows = files . Map ( x => x [ workflowsPath . Length ..] . TrimStart ( '/' ) ) ;
2127
22- var overrideWorkflowPath = env . ContentRootPath . CombineWith ( "App_Data" , "overrides" , "workflows" ) ;
28+ var overrideWorkflowPath = appData . ContentRootPath . CombineWith ( "App_Data" , "overrides" , "workflows" ) ;
2329 var overrideFiles = Directory . GetFiles ( overrideWorkflowPath , "*.json" , SearchOption . AllDirectories ) ;
2430
2531 allWorkflows . AddRange ( overrideFiles . Map ( x => x [ overrideWorkflowPath . Length ..] . TrimStart ( '/' ) ) ) ;
@@ -52,11 +58,11 @@ public async Task<ComfyWorkflowInfo> GetWorkflowInfoAsync(string path)
5258 private async Task < string ? > GetWorkflowJsonAsync ( string path )
5359 {
5460 path = path . Replace ( '\\ ' , '/' ) ;
55- var workflowsPath = env . WebRootPath . CombineWith ( "lib" , "data" , "workflows" ) ;
61+ var workflowsPath = appData . WebRootPath . CombineWith ( "lib" , "data" , "workflows" ) ;
5662 if ( ! path . IsPathSafe ( workflowsPath ) )
5763 throw new ArgumentNullException ( nameof ( GetComfyWorkflowInfo . Workflow ) , "Invalid Workflow Path" ) ;
5864
59- var overridePath = env . ContentRootPath . CombineWith ( "App_Data" , "overrides" , "workflows" ) . Replace ( '\\ ' , '/' ) ;
65+ var overridePath = appData . ContentRootPath . CombineWith ( "App_Data" , "overrides" , "workflows" ) . Replace ( '\\ ' , '/' ) ;
6066 string ? workflowJson = null ;
6167
6268 if ( File . Exists ( overridePath . CombineWith ( path ) ) )
@@ -98,23 +104,29 @@ public async Task<ComfyWorkflowInfo> GetWorkflowInfoAsync(string path)
98104 return workflowJson ;
99105 }
100106
101- public const string ComfyBaseUrl = "http://localhost:7860/api" ;
102- public const string ComfyApiKey = "" ;
103-
104107 public async Task < string > Get ( GetComfyApiPrompt request )
105108 {
106109 var client = comfyGateway . CreateHttpClient ( ComfyBaseUrl , ComfyApiKey ) ;
107110 var nodeDefs = await metadata . LoadNodeDefinitionsAsync ( client ) ;
108111 var workflowInfo = await GetWorkflowInfoAsync ( request . Workflow ) ;
109112 var workflowJson = await GetWorkflowJsonAsync ( workflowInfo . Path )
110113 ?? throw HttpError . NotFound ( "Workflow not found" ) ;
111- var apiPromptJson = ComfyConverters . ConvertWorkflowToApiPrompt ( workflowJson , nodeDefs , log ) ;
114+ var apiPromptJson = ComfyConverters . ConvertWorkflowToApiPrompt ( workflowJson , nodeDefs , log : log ) ;
112115 return apiPromptJson ;
113116 }
114117
115- public async Task < object > Post ( ExecuteComfyWorkflow request )
118+ public async Task < object > Post ( QueueComfyWorkflow request )
116119 {
117- var client = comfyGateway . CreateHttpClient ( ComfyBaseUrl , ComfyApiKey ) ;
120+ var candidates = appData . MediaProviders
121+ . Where ( x => x is { Enabled : true , OfflineDate : null , MediaTypeId : "ComfyUI" } ) . ToList ( ) ;
122+
123+ if ( candidates . Count == 0 )
124+ throw new Exception ( "No ComfyUI providers available" ) ;
125+
126+ var randomCandidate = candidates [ new Random ( ) . Next ( candidates . Count ) ] ;
127+ var comfyUiApiBaseUrl = randomCandidate . ApiBaseUrl . CombineWith ( "api" ) ;
128+
129+ var client = comfyGateway . CreateHttpClient ( comfyUiApiBaseUrl , randomCandidate . ApiKey ) ;
118130 var nodeDefs = await metadata . LoadNodeDefinitionsAsync ( client ) ;
119131 var workflowInfo = await GetWorkflowInfoAsync ( request . Workflow ) ;
120132 var workflowJson = await GetWorkflowJsonAsync ( workflowInfo . Path )
@@ -125,9 +137,152 @@ public async Task<object> Post(ExecuteComfyWorkflow request)
125137 var result = ComfyWorkflowParser . MergeWorkflow ( workflowJson , request . Args , nodeDefs ) ;
126138 workflowJson = result . Result ;
127139 }
140+
141+ var clientId = Guid . NewGuid ( ) . ToString ( "N" ) ;
142+ var apiPromptJson = ComfyConverters . ConvertWorkflowToApiPrompt ( workflowJson , nodeDefs , clientId , log : log ) ;
143+ var resultJson = await comfyGateway . ExecuteApiPromptAsync ( comfyUiApiBaseUrl , randomCandidate . ApiKey , apiPromptJson ) ;
144+ var resultObj = ( Dictionary < string , object > ) JSON . parse ( resultJson ) ;
145+ var promptId = resultObj . GetValueOrDefault ( "prompt_id" ) ? . ToString ( )
146+ ?? throw new Exception ( "Invalid ComfyUI Queue Result" ) ;
147+
148+ var KeyId = ( Request . GetApiKey ( ) as ApiKeysFeature . ApiKey ) ? . Id ?? 0 ;
149+ log . LogInformation ( "Received QueueComfyWorkflow from '{KeyId}' to execute workflow '{Workflow}' using '{Provider}'" ,
150+ KeyId , request . Workflow , randomCandidate . ApiBaseUrl ) ;
151+
152+ var args = new Dictionary < string , string > {
153+ [ nameof ( KeyId ) ] = $ "{ KeyId } ",
154+ } ;
128155
129- var apiPromptJson = ComfyConverters . ConvertWorkflowToApiPrompt ( workflowJson , nodeDefs , log ) ;
130- var resultJson = await comfyGateway . ExecuteApiPromptAsync ( ComfyBaseUrl , ComfyApiKey , apiPromptJson ) ;
131- return resultJson ;
156+ var jobRef = jobs . EnqueueCommand < GetComfyResultsCommand > ( new GetComfyResults
157+ {
158+ MediaProviderId = randomCandidate . Id ,
159+ ClientId = clientId ,
160+ PromptId = promptId ,
161+ } , new ( ) { RefId = clientId , Args = args } ) ;
162+
163+ return new QueueComfyWorkflowResponse
164+ {
165+ MediaProviderId = randomCandidate . Id ,
166+ RefId = clientId ,
167+ PromptId = promptId ,
168+ JobId = jobRef . Id ,
169+ } ;
132170 }
133171}
172+
173+ public class GetComfyResults
174+ {
175+ public long MediaProviderId { get ; set ; }
176+ public string PromptId { get ; set ; }
177+ public string ClientId { get ; set ; }
178+ public TimeSpan ? Timeout { get ; set ; }
179+ }
180+
181+ public class GetComfyResultsCommand (
182+ ILogger < GetComfyResultsCommand > logger ,
183+ IBackgroundJobs jobs ,
184+ AppData appData ,
185+ AppConfig appConfig ,
186+ ComfyGateway comfyGateway )
187+ : AsyncCommandWithResult < GetComfyResults , ComfyResult >
188+ {
189+ protected override async Task < ComfyResult > RunAsync ( GetComfyResults request , CancellationToken token )
190+ {
191+ var job = Request . GetBackgroundJob ( ) ;
192+ var log = Request . CreateJobLogger ( jobs , logger ) ;
193+
194+ var mediaProvider = appData . MediaProviders . FirstOrDefault ( x => x . Id == request . MediaProviderId )
195+ ?? throw new Exception ( $ "Media Provider { request . MediaProviderId } not available") ;
196+
197+ var keyId = job . Args ? . TryGetValue ( "KeyId" , out var oKeyId ) == true ? oKeyId : "0" ;
198+ var timeout = request . Timeout ?? TimeSpan . FromSeconds ( 5 * 60 ) ;
199+ var startedAt = DateTime . UtcNow ;
200+ while ( DateTime . UtcNow - startedAt < timeout )
201+ {
202+ using var client = comfyGateway . CreateHttpClient ( mediaProvider . ApiBaseUrl ! , mediaProvider . ApiKey ) ;
203+ var response = await client . GetAsync ( $ "/api/history/{ request . PromptId } ", token ) ;
204+ response . EnsureSuccessStatusCode ( ) ;
205+ var historyJson = await response . Content . ReadAsStringAsync ( token ) ;
206+
207+ if ( historyJson . IndexOf ( request . PromptId , StringComparison . OrdinalIgnoreCase ) >= 0 )
208+ {
209+ log . LogInformation ( "Prompt {Prompt} from {Url} has completed" , request . PromptId , mediaProvider . ApiBaseUrl ) ;
210+
211+ var now = DateTime . UtcNow ;
212+ var result = ComfyConverters . ParseComfyResult ( historyJson , mediaProvider . ApiBaseUrl . CombineWith ( "api" ) ) ;
213+
214+ if ( result . Assets ? . Count > 0 )
215+ {
216+ log . LogInformation ( "Downloading {Count} Assets for {Prompt} from {Url}" ,
217+ result . Assets . Count , request . PromptId , mediaProvider . ApiBaseUrl ) ;
218+
219+ var tasks = result . Assets . Map ( async x =>
220+ {
221+ var output = new ComfyAssetOutput
222+ {
223+ NodeId = x . NodeId ,
224+ Type = x . Type ,
225+ FileName = x . FileName ,
226+ } ;
227+ var url = x . Url ;
228+ if ( ! url . StartsWith ( "http://" ) && ! url . StartsWith ( "https://" ) )
229+ {
230+ url = mediaProvider . ApiBaseUrl . CombineWith ( url ) ;
231+ }
232+
233+ var ext = output . FileName . LastRightPart ( '.' ) ;
234+ if ( output . Type == AssetType . Image )
235+ {
236+ url = url . AddQueryParam ( "preview" , "webp" ) ;
237+ ext = "webp" ;
238+ }
239+
240+ var response = await client . GetAsync ( new Uri ( url ) , token ) ;
241+ if ( ! response . IsSuccessStatusCode )
242+ {
243+ log . LogError ( "Failed to download {Url}: {Message}" ,
244+ url , response . ReasonPhrase ?? response . StatusCode . ToString ( ) ) ;
245+ return output ;
246+ }
247+
248+ var imageBytes = await response . Content . ReadAsByteArrayAsync ( token ) ;
249+ var sha256 = imageBytes . ComputeSha256 ( ) ;
250+ output . FileName = $ "{ sha256 } .{ ext } ";
251+ var relativePath = $ "{ now : yyyy} /{ now : MM} /{ now : dd} /{ keyId } /{ output . FileName } ";
252+ var path = appConfig . ArtifactsPath . CombineWith ( relativePath ) ;
253+ Path . GetDirectoryName ( path ) . AssertDir ( ) ;
254+ await File . WriteAllBytesAsync ( path , imageBytes , token ) ;
255+ output . Url = $ "/artifacts/{ relativePath } ";
256+ return output ;
257+ } ) ;
258+
259+ var allTasks = await Task . WhenAll ( tasks ) ;
260+ var completedTasks = allTasks
261+ . Where ( x => x . Url != null ) . ToList ( ) ;
262+
263+ log . LogInformation ( "Downloaded {Count}/{Total} Assets for Prompt {Prompt}:\n {Urls}" ,
264+ completedTasks . Count , allTasks . Length , request . PromptId ,
265+ string . Join ( '\n ' , completedTasks . Map ( x => appConfig . AssetsBaseUrl . CombineWith ( x . Url ) ) ) ) ;
266+
267+ result . Assets = completedTasks ;
268+ }
269+ else if ( ( result . Texts ? . Count ?? 0 ) == 0 )
270+ {
271+ log . LogError ( "Prompt {Prompt} from {Url} did not return any results" ,
272+ request . PromptId , mediaProvider . ApiBaseUrl ) ;
273+
274+ throw new Exception ( $ "Prompt { request . PromptId } from { mediaProvider . ApiBaseUrl } did not return any results") ;
275+ }
276+
277+ return result ;
278+ }
279+
280+ await Task . Delay ( 1000 , token ) ;
281+ }
282+
283+ log . LogError ( "Exceeded timeout of {Seconds} seconds for Prompt {Prompt}" ,
284+ timeout . TotalSeconds , request . PromptId ) ;
285+
286+ throw new TimeoutException ( $ "Exceeded timeout of { timeout . TotalSeconds } seconds for Prompt { request . PromptId } ") ;
287+ }
288+ }
0 commit comments