Skip to content

Commit 362443f

Browse files
authored
fix: revert back to a working state (#319)
1 parent 056f584 commit 362443f

File tree

11 files changed

+1221
-447
lines changed

11 files changed

+1221
-447
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ tracing.workspace = true
4646
uuid.workspace = true
4747
validator.workspace = true
4848
posthog-rs.workspace = true
49-
once_cell = "1.20.2"
5049

5150
[dev-dependencies]
5251
testcontainers-modules = { workspace = true, features = ["mongo", "redis"] }

api/src/logic/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub mod events;
3434
pub mod knowledge;
3535
pub mod metrics;
3636
pub mod oauth;
37+
pub mod openapi;
3738
pub mod passthrough;
3839
pub mod platform;
3940
pub mod platform_page;

api/src/logic/openapi/builder.rs

Lines changed: 620 additions & 0 deletions
Large diffs are not rendered by default.

api/src/logic/openapi/mod.rs

Lines changed: 372 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,372 @@
1+
mod builder;
2+
3+
use crate::server::AppState;
4+
use axum::extract::{Json, State};
5+
use bson::doc;
6+
use builder::{generate_openapi_schema, generate_path_item};
7+
use convert_case::{Case, Casing};
8+
use futures::{Stream, StreamExt, TryStreamExt};
9+
use indexmap::IndexMap;
10+
use mongodb::error::Error as MongoError;
11+
use openapiv3::*;
12+
use osentities::{
13+
algebra::{MongoStore, TimedExt},
14+
common_model::{CommonEnum, CommonModel},
15+
InternalError, PicaError,
16+
};
17+
use serde::{Deserialize, Serialize};
18+
use std::{
19+
collections::HashSet,
20+
pin::Pin,
21+
sync::{Arc, RwLock},
22+
};
23+
use tokio::task::JoinHandle;
24+
use tracing::{debug, error, info};
25+
26+
#[derive(Clone, Default, Debug)]
27+
pub struct OpenAPIData {
28+
state: Arc<RwLock<CachedSchema>>,
29+
}
30+
31+
impl OpenAPIData {
32+
pub fn get(&self) -> Result<CachedSchema, anyhow::Error> {
33+
self.state.read().map(|state| state.clone()).map_err(|e| {
34+
anyhow::Error::msg(format!("Could not get openapi schema from cache: {e}"))
35+
})
36+
}
37+
38+
pub fn set(&self, value: CachedSchema) -> Result<(), anyhow::Error> {
39+
self.state
40+
.write()
41+
.map(|mut state| *state = value)
42+
.map_err(|e| anyhow::Error::msg(format!("Could not set openapi schema in cache: {e}")))
43+
}
44+
45+
pub fn clear(&self) -> Result<(), anyhow::Error> {
46+
self.set(CachedSchema::default())
47+
}
48+
49+
pub fn spawn_openapi_generation(
50+
&self,
51+
cm_store: MongoStore<CommonModel>,
52+
ce_store: MongoStore<CommonEnum>,
53+
) -> JoinHandle<Result<(), anyhow::Error>> {
54+
spawn_openapi_generation(cm_store, ce_store, self.clone())
55+
}
56+
}
57+
58+
#[derive(Debug, Clone, Default)]
59+
pub struct CachedSchema {
60+
schema: Vec<u8>,
61+
is_generating: bool,
62+
error: Option<String>,
63+
}
64+
65+
#[derive(Debug, Clone, Serialize, Deserialize)]
66+
#[serde(rename_all = "camelCase", untagged)]
67+
pub enum OpenApiSchema {
68+
OpenAPI(Box<OpenAPI>),
69+
Accepted(String),
70+
Error(String),
71+
}
72+
73+
struct PathWithSchema {
74+
path: IndexMap<String, ReferenceOr<PathItem>>,
75+
schema: IndexMap<String, ReferenceOr<Schema>>,
76+
}
77+
78+
struct PathIter {
79+
paths: Vec<IndexMap<String, ReferenceOr<PathItem>>>,
80+
components: IndexMap<String, ReferenceOr<Schema>>,
81+
}
82+
83+
impl PathIter {
84+
/// Takes a list of paths and components, merges the components, collects
85+
/// all the paths and returns a PathIter
86+
fn from_paths(paths: Vec<PathWithSchema>) -> Self {
87+
let mut components = IndexMap::new();
88+
89+
for path in &paths {
90+
components.extend(path.schema.clone());
91+
}
92+
93+
let paths = paths
94+
.into_iter()
95+
.map(|path| path.path)
96+
.collect::<Vec<IndexMap<String, ReferenceOr<PathItem>>>>();
97+
98+
Self { paths, components }
99+
}
100+
}
101+
102+
type StreamResult = Pin<Box<dyn Stream<Item = Result<CommonModel, MongoError>> + Send>>;
103+
104+
pub async fn refresh_openapi(
105+
state: State<Arc<AppState>>,
106+
) -> Result<Json<OpenApiSchema>, PicaError> {
107+
state.openapi_data.clone().clear().map_err(|e| {
108+
error!("Could not clear openapi schema from cache: {:?}", e);
109+
InternalError::io_err("Could not clear openapi schema", None)
110+
})?;
111+
112+
spawn_openapi_generation(
113+
state.app_stores.common_model.clone(),
114+
state.app_stores.common_enum.clone(),
115+
state.openapi_data.clone(),
116+
);
117+
118+
Ok(Json(OpenApiSchema::Accepted(
119+
"OpenAPI schema is being regenerated".to_string(),
120+
)))
121+
}
122+
123+
pub async fn get_openapi(state: State<Arc<AppState>>) -> Result<Json<OpenApiSchema>, PicaError> {
124+
let schema = state.openapi_data.get().map_err(|e| {
125+
error!("Could not get openapi schema from cache: {:?}", e);
126+
127+
InternalError::io_err("Could not get openapi schema", None)
128+
})?;
129+
130+
if schema.is_generating {
131+
info!("OpenAPI schema is being generated");
132+
return Ok(Json(OpenApiSchema::Accepted(
133+
"You're early, the schema is being generated".to_string(),
134+
)));
135+
}
136+
137+
if let Some(error) = &schema.error {
138+
info!("OpenAPI schema generation failed: {}, retrying...", error);
139+
spawn_openapi_generation(
140+
state.app_stores.common_model.clone(),
141+
state.app_stores.common_enum.clone(),
142+
state.openapi_data.clone(),
143+
);
144+
return Err(InternalError::unknown(
145+
&format!("OpenAPI schema generation failed: {}", error),
146+
None,
147+
));
148+
}
149+
150+
let openapi = serde_json::from_slice(schema.schema.as_ref()).map_err(|e| {
151+
error!("Could not deserialize openapi schema: {:?}", e);
152+
153+
InternalError::io_err("Could not deserialize openapi schema", None)
154+
})?;
155+
156+
Ok(Json(OpenApiSchema::OpenAPI(openapi)))
157+
}
158+
159+
fn spawn_openapi_generation(
160+
cm_store: MongoStore<CommonModel>,
161+
ce_store: MongoStore<CommonEnum>,
162+
state: OpenAPIData,
163+
) -> JoinHandle<Result<(), anyhow::Error>> {
164+
tokio::spawn(async move {
165+
let stream: StreamResult = cm_store
166+
.collection
167+
.find(doc! { "primary": true })
168+
.await
169+
.map_err(|e| {
170+
error!("Could not fetch common model: {:?}", e);
171+
e
172+
})?
173+
.boxed();
174+
175+
let cached_schema = CachedSchema {
176+
schema: Vec::new(),
177+
is_generating: true,
178+
error: None,
179+
};
180+
181+
info!("Setting openapi schema as generating in cache");
182+
state.set(cached_schema.clone()).map_err(|e| {
183+
error!("Could not set openapi schema as generating in cache: {e}");
184+
e
185+
})?;
186+
187+
let result = stream
188+
.map(|cm| async {
189+
let cm_store = cm_store.clone();
190+
let ce_store = ce_store.clone();
191+
match cm {
192+
Ok(cm) => Some(
193+
generate_references_data(cm, cm_store, ce_store)
194+
.timed(|_, elapsed| {
195+
debug!("Common model processed in {:?}", elapsed);
196+
})
197+
.await,
198+
),
199+
Err(e) => {
200+
error!("Could not fetch common model: {e}");
201+
None
202+
}
203+
}
204+
})
205+
.buffer_unordered(10)
206+
.filter_map(|x| async { x })
207+
.try_collect::<Vec<PathWithSchema>>()
208+
.await;
209+
210+
match result {
211+
Ok(paths) => {
212+
info!("Generating openapi schema");
213+
let paths = PathIter::from_paths(paths);
214+
let schema = generate_openapi_schema(paths.paths, paths.components);
215+
216+
info!("Deserializing openapi schema");
217+
let schema = serde_json::to_vec(&schema).map_err(|e| {
218+
error!("Could not serialize openapi schema: {e}");
219+
e
220+
});
221+
222+
if schema.is_err() {
223+
state
224+
.set(CachedSchema {
225+
schema: vec![],
226+
is_generating: false,
227+
error: Some(
228+
"Could not serialize openapi schema, retrying...".to_string(),
229+
),
230+
})
231+
.map_err(|e| {
232+
error!("Could not set openapi schema in cache: {e}");
233+
e
234+
})?;
235+
}
236+
237+
info!("Setting openapi schema in cache");
238+
if let Ok(schema) = schema {
239+
state
240+
.set(CachedSchema {
241+
schema,
242+
is_generating: false,
243+
error: None,
244+
})
245+
.map_err(|e| {
246+
error!("Could not set openapi schema in cache: {e}");
247+
e
248+
})?;
249+
}
250+
Ok(())
251+
}
252+
Err(err) => {
253+
error!("Could not generate openapi schema: {err}");
254+
state
255+
.set(CachedSchema {
256+
schema: vec![],
257+
is_generating: false,
258+
error: Some(format!("Could not generate openapi schema: {err}")),
259+
})
260+
.map_err(|e| {
261+
error!("Could not set openapi schema in cache: {e}");
262+
e
263+
})
264+
}
265+
}
266+
})
267+
}
268+
269+
async fn generate_references_data(
270+
cm: CommonModel,
271+
cm_store: MongoStore<CommonModel>,
272+
ce_store: MongoStore<CommonEnum>,
273+
) -> Result<PathWithSchema, anyhow::Error> {
274+
let mut schema = IndexMap::new();
275+
let (child_cms, missing) = cm
276+
.fetch_all_children_common_models(cm_store.clone())
277+
.await?;
278+
// PERF: Use fetch_all_children_common_enums instead
279+
let mut enum_references = cm
280+
.get_enum_references()
281+
.into_iter()
282+
.filter_map(|x| match x.datatype {
283+
osentities::common_model::DataType::Enum { reference, .. } => {
284+
Some(reference.to_case(Case::Pascal))
285+
}
286+
_ => None,
287+
})
288+
.collect::<HashSet<_>>();
289+
290+
if !missing.is_empty() {
291+
debug!("Missing children. Contact platform to create {:?}", missing);
292+
}
293+
294+
// Add properties for children
295+
for (k, child_cm) in child_cms.into_iter() {
296+
schema.insert(k, ReferenceOr::Item(child_cm.reference()));
297+
let references = child_cm
298+
.get_enum_references()
299+
.into_iter()
300+
.filter_map(|x| match x.datatype {
301+
osentities::common_model::DataType::Enum { reference, .. } => {
302+
Some(reference.to_case(Case::Pascal))
303+
}
304+
_ => None,
305+
})
306+
.collect::<HashSet<_>>();
307+
308+
enum_references.extend(references);
309+
}
310+
311+
// Add properties for enum references
312+
let enum_references = ce_store
313+
.get_many(
314+
Some(doc! {
315+
"name": {
316+
"$in": bson::to_bson(&enum_references)?
317+
}
318+
}),
319+
None,
320+
None,
321+
None,
322+
None,
323+
)
324+
.await?;
325+
326+
enum_references.into_iter().for_each(|ce| {
327+
schema.insert(
328+
ce.name.clone(),
329+
ReferenceOr::Item(Schema {
330+
schema_data: Default::default(),
331+
schema_kind: SchemaKind::Type(Type::String(StringType {
332+
format: VariantOrUnknownOrEmpty::Unknown(ce.name.to_case(Case::Camel)),
333+
enumeration: ce
334+
.options
335+
.iter()
336+
.map(|option| Some(option.to_owned()))
337+
.collect(),
338+
..Default::default()
339+
})),
340+
}),
341+
);
342+
});
343+
344+
// Add dummy properties for missing children
345+
for r#ref in missing {
346+
let schema_item = Schema {
347+
schema_data: Default::default(),
348+
schema_kind: SchemaKind::Type(Type::Object(ObjectType {
349+
properties: {
350+
IndexMap::from_iter(vec![(
351+
r#ref.clone(),
352+
ReferenceOr::Item(Box::new(Schema {
353+
schema_data: Default::default(),
354+
schema_kind: SchemaKind::Type(Type::Object(ObjectType {
355+
properties: Default::default(),
356+
..Default::default()
357+
})),
358+
})),
359+
)])
360+
},
361+
..Default::default()
362+
})),
363+
};
364+
schema.insert(r#ref.clone(), ReferenceOr::Item(schema_item));
365+
}
366+
367+
// Add properties for the common model itself
368+
schema.insert(cm.name.clone(), ReferenceOr::Item(cm.reference()));
369+
370+
let path = generate_path_item(&cm);
371+
Ok(PathWithSchema { path, schema })
372+
}

0 commit comments

Comments
 (0)