diff --git a/adapter/rest/src/content_type.rs b/adapter/rest/src/content_type.rs new file mode 100644 index 0000000..5b7d925 --- /dev/null +++ b/adapter/rest/src/content_type.rs @@ -0,0 +1,318 @@ +use hyper::{ + HeaderMap, + header::{CONTENT_TYPE, HeaderValue}, +}; +use tucana::shared::{ + Value, number_value, + value::Kind::{self, StringValue}, +}; + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum BodyFormat { + Json, + TextPlain, + Unknown, +} + +#[derive(Debug)] +pub enum BodyParseError { + UnsupportedContentType { observed: String }, + InvalidUtf8(std::str::Utf8Error), + InvalidJson(serde_json::Error), +} + +impl std::fmt::Display for BodyParseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::UnsupportedContentType { observed } => { + write!(f, "unsupported content type: {}", observed) + } + Self::InvalidUtf8(err) => write!(f, "invalid UTF-8 body: {}", err), + Self::InvalidJson(err) => write!(f, "invalid JSON body: {}", err), + } + } +} + +impl std::error::Error for BodyParseError {} + +#[derive(Debug)] +pub enum BodyEncodeError { + UnsupportedContentType { observed: String }, + InvalidJson(serde_json::Error), +} + +impl std::fmt::Display for BodyEncodeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::UnsupportedContentType { observed } => { + write!(f, "unsupported content type: {}", observed) + } + Self::InvalidJson(err) => write!(f, "failed to encode JSON body: {}", err), + } + } +} + +impl std::error::Error for BodyEncodeError {} + +pub fn parse_body_from_headers( + headers: &HeaderMap, + body: &[u8], +) -> Result, BodyParseError> { + parse_body(get_content_type(headers), body) +} + +pub fn parse_body( + content_type: Option<&str>, + body: &[u8], +) -> Result, BodyParseError> { + if body.is_empty() { + return Ok(None); + } + + match classify_content_type(content_type) { + BodyFormat::Json => parse_json_body(body), + BodyFormat::TextPlain => parse_text_body(body), + BodyFormat::Unknown => { + // If there is no content type + if content_type.is_none() + && let Ok(value) = parse_text_body(body) + { + return Ok(value); + } + + Err(BodyParseError::UnsupportedContentType { + observed: content_type.unwrap_or("").to_string(), + }) + } + } +} + +pub fn encode_body(content_type: Option<&str>, value: Value) -> Result, BodyEncodeError> { + match classify_content_type(content_type) { + BodyFormat::Json => encode_json_body(value), + BodyFormat::TextPlain => encode_text_body(value), + BodyFormat::Unknown => { + // Missing content type falls back to JSON. + if content_type.is_none() { + return encode_json_body(value); + } + + Err(BodyEncodeError::UnsupportedContentType { + observed: content_type.unwrap_or("").to_string(), + }) + } + } +} + +pub fn classify_content_type(content_type: Option<&str>) -> BodyFormat { + let Some(raw) = content_type else { + return BodyFormat::Unknown; + }; + + let essence = raw + .split(';') + .next() + .unwrap_or(raw) + .trim() + .to_ascii_lowercase(); + + if essence == "application/json" || essence.ends_with("+json") { + return BodyFormat::Json; + } + + if essence == "text/plain" { + return BodyFormat::TextPlain; + } + + BodyFormat::Unknown +} + +fn parse_json_body(body: &[u8]) -> Result, BodyParseError> { + let json_value = + serde_json::from_slice::(body).map_err(BodyParseError::InvalidJson)?; + Ok(Some(tucana::shared::helper::value::from_json_value( + json_value, + ))) +} + +fn parse_text_body(body: &[u8]) -> Result, BodyParseError> { + let text = std::str::from_utf8(body).map_err(BodyParseError::InvalidUtf8)?; + Ok(Some(Value { + kind: Some(StringValue(text.to_string())), + })) +} + +fn encode_json_body(value: Value) -> Result, BodyEncodeError> { + let json_val = tucana::shared::helper::value::to_json_value(value); + serde_json::to_vec_pretty(&json_val).map_err(BodyEncodeError::InvalidJson) +} + +fn encode_text_body(value: Value) -> Result, BodyEncodeError> { + if let Some(text) = scalar_to_text(&value) { + return Ok(text.into_bytes()); + } + + // For lists/objects, return valid JSON text as the plain-text body. + encode_json_body(value) +} + +fn scalar_to_text(value: &Value) -> Option { + match value.kind.as_ref() { + Some(Kind::NullValue(_)) | None => Some("null".to_string()), + Some(Kind::BoolValue(v)) => Some(v.to_string()), + Some(Kind::StringValue(v)) => Some(v.clone()), + Some(Kind::NumberValue(v)) => match v.number.as_ref() { + Some(number_value::Number::Integer(i)) => Some(i.to_string()), + Some(number_value::Number::Float(f)) => Some(f.to_string()), + None => Some("null".to_string()), + }, + _ => None, + } +} + +fn get_content_type(headers: &HeaderMap) -> Option<&str> { + headers.get(CONTENT_TYPE).and_then(|h| h.to_str().ok()) +} + +#[cfg(test)] +mod tests { + use super::*; + use tucana::shared::{NumberValue, Struct, Value}; + + #[test] + fn classify_json_content_type_with_charset() { + let format = classify_content_type(Some("application/json; charset=utf-8")); + assert_eq!(format, BodyFormat::Json); + } + + #[test] + fn classify_vendor_json_content_type() { + let format = classify_content_type(Some("application/problem+json")); + assert_eq!(format, BodyFormat::Json); + } + + #[test] + fn classify_text_plain_content_type() { + let format = classify_content_type(Some("text/plain; charset=utf-8")); + assert_eq!(format, BodyFormat::TextPlain); + } + + #[test] + fn parse_json_body_to_struct_value() { + let body = br#"{"hello":"world","ok":true}"#; + let parsed = parse_body(Some("application/json"), body).unwrap(); + + let Some(Value { + kind: Some(Kind::StructValue(Struct { fields })), + }) = parsed + else { + panic!("expected struct value"); + }; + + assert!(fields.contains_key("hello")); + assert!(fields.contains_key("ok")); + } + + #[test] + fn parse_text_body_to_string_value() { + let body = b"hello"; + let parsed = parse_body(Some("text/plain"), body).unwrap(); + + let Some(Value { + kind: Some(Kind::StringValue(v)), + }) = parsed + else { + panic!("expected string value"); + }; + + assert_eq!(v, "hello"); + } + + #[test] + fn parse_unsupported_content_type_fails() { + let body = br#""#; + let err = parse_body(Some("application/xml"), body).unwrap_err(); + + assert!(matches!(err, BodyParseError::UnsupportedContentType { .. })); + } + + #[test] + fn encode_json_body_from_struct_value() { + let value = Value { + kind: Some(Kind::StructValue(Struct { + fields: [( + "hello".to_string(), + Value { + kind: Some(Kind::StringValue("world".to_string())), + }, + )] + .into_iter() + .collect(), + })), + }; + + let encoded = encode_body(Some("application/json"), value).unwrap(); + let parsed: serde_json::Value = serde_json::from_slice(&encoded).unwrap(); + + assert_eq!(parsed["hello"], "world"); + } + + #[test] + fn encode_text_body_from_string_value() { + let value = Value { + kind: Some(Kind::StringValue("hello".to_string())), + }; + + let encoded = encode_body(Some("text/plain"), value).unwrap(); + assert_eq!(encoded, b"hello".to_vec()); + } + + #[test] + fn encode_text_body_from_number_value() { + let value = Value { + kind: Some(Kind::NumberValue(NumberValue { + number: Some(number_value::Number::Integer(42)), + })), + }; + + let encoded = encode_body(Some("text/plain"), value).unwrap(); + assert_eq!(encoded, b"42".to_vec()); + } + + #[test] + fn encode_text_body_from_struct_value_falls_back_to_json_text() { + let value = Value { + kind: Some(Kind::StructValue(Struct { + fields: [( + "answer".to_string(), + Value { + kind: Some(Kind::NumberValue(NumberValue { + number: Some(number_value::Number::Integer(42)), + })), + }, + )] + .into_iter() + .collect(), + })), + }; + + let encoded = encode_body(Some("text/plain"), value).unwrap(); + let body_text = String::from_utf8(encoded).unwrap(); + + assert!(body_text.contains("\"answer\"")); + assert!(body_text.contains("42")); + } + + #[test] + fn encode_unknown_content_type_fails() { + let value = Value { + kind: Some(Kind::StringValue("x".to_string())), + }; + + let err = encode_body(Some("application/xml"), value).unwrap_err(); + assert!(matches!( + err, + BodyEncodeError::UnsupportedContentType { .. } + )); + } +} diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index d1d26a6..286cba7 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -4,27 +4,29 @@ use base::{ traits::Server as ServerTrait, }; use http_body_util::{BodyExt, Full}; +use hyper::server::conn::http1; use hyper::{Request, Response}; use hyper::{ StatusCode, body::{Bytes, Incoming}, }; -use hyper::{ - header::{HeaderName, HeaderValue}, - server::conn::http1, -}; use hyper_util::rt::TokioIo; -use std::collections::HashMap; use std::convert::Infallible; use std::net::SocketAddr; use std::sync::Arc; use tokio::net::TcpListener; use tonic::async_trait; -use tucana::shared::{AdapterConfiguration, RuntimeFeature, value::Kind}; -use tucana::shared::{Struct, ValidationFlow, Value}; -use tucana::shared::{Translation, value::Kind::StructValue}; +use tucana::shared::{ + AdapterConfiguration, RuntimeFeature, Struct, Translation, ValidationFlow, Value, + helper::{path::get_string, value::ToValue}, + value::Kind, +}; + +use crate::response::{error_to_http_response, value_to_http_response}; mod config; +mod content_type; +mod response; mod route; #[tokio::main] @@ -72,163 +74,41 @@ struct HttpServer { addr: Option, } -fn json_error(status: StatusCode, msg: &str) -> Response> { - let body = format!(r#"{{"error": "{}"}}"#, msg); - Response::builder() - .status(status) - .header("content-type", "application/json") - .body(Full::new(Bytes::from(body))) - .unwrap() -} - -fn build_response( - status: StatusCode, - headers: HashMap, - body: Vec, -) -> Response> { - let mut builder = Response::builder().status(status); - - { - let h = builder.headers_mut().unwrap(); - for (k, v) in headers { - let name = match HeaderName::from_bytes(k.as_bytes()) { - Ok(n) => n, - Err(_) => { - log::warn!("Dropping invalid header name: {}", k); - continue; - } - }; - - let value = match HeaderValue::from_str(&v) { - Ok(v) => v, - Err(_) => { - log::warn!("Dropping invalid header value for {}: {:?}", k, v); - continue; - } - }; - - h.insert(name, value); - } - } - - builder.body(Full::new(Bytes::from(body))).unwrap() -} - async fn execute_flow_to_hyper_response( flow: ValidationFlow, - body: Vec, + body: Value, store: Arc, ) -> Response> { - let value: Option = if body.is_empty() { - None - } else { - match prost::Message::decode(body.as_slice()) { - Ok(v) => Some(v), - Err(e) => { - log::warn!("Failed to decode request body as protobuf Value: {}", e); - return json_error( - StatusCode::BAD_REQUEST, - "Failed to decode request body as protobuf Value", - ); - } - } - }; - - match store.validate_and_execute_flow(flow, value).await { + match store.validate_and_execute_flow(flow, Some(body)).await { Some(result) => { log::debug!("Received Result: {:?}", result); - let Value { - kind: Some(StructValue(Struct { fields })), - } = result - else { - return json_error( - StatusCode::INTERNAL_SERVER_ERROR, - "Flow result was not a struct", - ); - }; - - let Some(headers_val) = fields.get("headers") else { - return json_error( - StatusCode::INTERNAL_SERVER_ERROR, - "Flow result missing headers", - ); - }; - let Some(status_code_val) = fields.get("status_code") else { - return json_error( - StatusCode::INTERNAL_SERVER_ERROR, - "Flow result missing status_code", - ); - }; - let Some(payload_val) = fields.get("payload") else { - return json_error( - StatusCode::INTERNAL_SERVER_ERROR, - "Flow result missing payload", - ); - }; - // headers struct - let Value { + if let Value { kind: Some(Kind::StructValue(Struct { - fields: header_fields, + fields: result_fields, })), - } = headers_val - else { - return json_error( + } = &result + && result_fields.contains_key("name") + && result_fields.contains_key("message") + && !result_fields.contains_key("payload") + && !result_fields.contains_key("headers") + { + log::debug!("Detected a RuntimeError"); + let name = get_string("name", &result); + let message = get_string("message", &result); + + return error_to_http_response( StatusCode::INTERNAL_SERVER_ERROR, - "headers was not a list of header entries", + format!("{}: {}", name.unwrap(), message.unwrap()).as_str(), ); - }; - - let http_headers: HashMap = header_fields - .iter() - .filter_map(|(k, v)| { - if let Value { - kind: Some(Kind::StringValue(x)), - } = v - { - Some((k.clone(), x.clone())) - } else { - None - } - }) - .collect(); - - // status_code number - let Some(Kind::NumberValue(code)) = status_code_val.kind else { - return json_error( - StatusCode::INTERNAL_SERVER_ERROR, - "status_code was not a number", - ); - }; - - // payload -> json bytes - let json_val = tucana::shared::helper::value::to_json_value(payload_val.clone()); - let json = serde_json::to_vec_pretty(&json_val).unwrap_or_else(|err| { - let fallback = serde_json::json!({ - "error": format!("Serialization failed: {}", err), - }); - serde_json::to_vec(&fallback) - .unwrap_or_else(|_| br#"{"error":"Serialization failed"}"#.to_vec()) - }); - - let http_code = match code.number { - Some(num) => match num { - tucana::shared::number_value::Number::Integer(int) => int as u16, - tucana::shared::number_value::Number::Float(float) => float as u16, - }, - None => { - return json_error(StatusCode::INTERNAL_SERVER_ERROR, "Flow execution failed"); - } - }; + } - let status = - StatusCode::from_u16(http_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); - build_response(status, http_headers, json) + value_to_http_response(result) } None => { log::error!("flow execution failed"); - json_error(StatusCode::INTERNAL_SERVER_ERROR, "Flow execution failed") + error_to_http_response(StatusCode::INTERNAL_SERVER_ERROR, "Flow execution failed") } } } @@ -239,22 +119,41 @@ pub async fn handle_request( ) -> Result>, Infallible> { let method = req.method().clone(); let path = req.uri().path().to_string(); + let headers = req.headers().clone(); // Read full body let body_bytes = match BodyExt::collect(req.into_body()).await { Ok(collected) => collected.to_bytes().to_vec(), Err(err) => { log::error!("Failed to read request body: {}", err); - return Ok(json_error( + return Ok(error_to_http_response( StatusCode::BAD_REQUEST, "Failed to read request body", )); } }; + let request_body_value = match content_type::parse_body_from_headers(&headers, &body_bytes) { + Ok(value) => value, + Err(err) => { + log::warn!("Failed to parse request body: {}", err); + let status_code = match err { + content_type::BodyParseError::UnsupportedContentType { .. } => { + StatusCode::UNSUPPORTED_MEDIA_TYPE + } + _ => StatusCode::BAD_REQUEST, + }; + + return Ok(error_to_http_response(status_code, &err.to_string())); + } + }; + // slug matching let Some(slug) = route::extract_slug_from_path(&path) else { - return Ok(json_error(StatusCode::BAD_REQUEST, "Missing slug in path")); + return Ok(error_to_http_response( + StatusCode::BAD_REQUEST, + "Missing slug in path", + )); }; let pattern = format!("REST.{}.*", slug); @@ -265,9 +164,39 @@ pub async fn handle_request( let resp = match store.get_possible_flow_match(pattern, route).await { FlowIdentifyResult::Single(flow) => { - execute_flow_to_hyper_response(flow, body_bytes, store).await + let mut header_fields = std::collections::HashMap::new(); + let mut fields = std::collections::HashMap::new(); + + for (name, value) in headers.iter() { + let key = name.as_str().to_owned(); + let value_str = value + .to_str() + .map(str::to_owned) + .unwrap_or_else(|_| String::from_utf8_lossy(value.as_bytes()).into_owned()); + + header_fields.insert(key, value_str.to_value()); + } + + if let Some(v) = request_body_value { + fields.insert(String::from("payload"), v); + }; + + fields.insert( + String::from("headers"), + Value { + kind: Some(Kind::StructValue(Struct { + fields: header_fields, + })), + }, + ); + + let input = Value { + kind: Some(Kind::StructValue(Struct { fields })), + }; + + execute_flow_to_hyper_response(flow, input, store).await } - _ => json_error(StatusCode::NOT_FOUND, "No flow found for path"), + _ => error_to_http_response(StatusCode::NOT_FOUND, "No flow found for path"), }; Ok(resp) diff --git a/adapter/rest/src/response.rs b/adapter/rest/src/response.rs new file mode 100644 index 0000000..d0230a2 --- /dev/null +++ b/adapter/rest/src/response.rs @@ -0,0 +1,163 @@ +use http_body_util::Full; +use hyper::{ + Response, StatusCode, + body::Bytes, + header::{HeaderName, HeaderValue}, +}; +use std::collections::HashMap; +use tucana::shared::{ + Struct, Value, + value::Kind::{self, StructValue}, +}; + +use crate::content_type; + +pub fn error_to_http_response(status: StatusCode, msg: &str) -> Response> { + let body = format!(r#"{{"error": "{}"}}"#, msg); + Response::builder() + .status(status) + .header("content-type", "application/json") + .body(Full::new(Bytes::from(body))) + .unwrap() +} + +pub fn value_to_http_response(value: Value) -> Response> { + let Value { + kind: Some(StructValue(Struct { fields })), + } = value + else { + return error_to_http_response( + StatusCode::INTERNAL_SERVER_ERROR, + "Flow result was not a struct", + ); + }; + + let Some(headers_val) = fields.get("headers") else { + return error_to_http_response( + StatusCode::INTERNAL_SERVER_ERROR, + "Flow result missing the field: headers", + ); + }; + let Some(status_code_val) = fields.get("http_status_code") else { + return error_to_http_response( + StatusCode::INTERNAL_SERVER_ERROR, + "Flow result missing the field: http_status_code", + ); + }; + let Some(payload_val) = fields.get("payload") else { + return error_to_http_response( + StatusCode::INTERNAL_SERVER_ERROR, + "Flow result missing the field: payload", + ); + }; + + // headers struct + let Value { + kind: Some(Kind::StructValue(Struct { + fields: header_fields, + })), + } = headers_val + else { + return error_to_http_response( + StatusCode::INTERNAL_SERVER_ERROR, + "headers was not a list of header entries", + ); + }; + + let mut http_headers: HashMap = header_fields + .iter() + .filter_map(|(k, v)| { + if let Value { + kind: Some(Kind::StringValue(x)), + } = v + { + Some((k.clone(), x.clone())) + } else { + None + } + }) + .collect(); + + if find_header_value_case_insensitive(&http_headers, "content-type").is_none() { + http_headers.insert("content-type".to_string(), "application/json".to_string()); + } + + // status_code number + let Some(Kind::NumberValue(code)) = status_code_val.kind else { + return error_to_http_response( + StatusCode::INTERNAL_SERVER_ERROR, + "status_code was not a number", + ); + }; + + let content_type_header = find_header_value_case_insensitive(&http_headers, "content-type"); + let encoded_body = match content_type::encode_body(content_type_header, payload_val.clone()) { + Ok(body) => body, + Err(err) => { + log::error!("Failed to encode response payload: {}", err); + return error_to_http_response( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to encode response payload", + ); + } + }; + + let http_code = match code.number { + Some(num) => match num { + tucana::shared::number_value::Number::Integer(int) => int as u16, + tucana::shared::number_value::Number::Float(float) => float as u16, + }, + None => { + return error_to_http_response( + StatusCode::INTERNAL_SERVER_ERROR, + "Flow execution failed", + ); + } + }; + + let status = StatusCode::from_u16(http_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + create_http_response(status, http_headers, encoded_body) +} + +fn find_header_value_case_insensitive<'a>( + headers: &'a HashMap, + key: &str, +) -> Option<&'a str> { + headers + .iter() + .find(|(k, _)| k.eq_ignore_ascii_case(key)) + .map(|(_, v)| v.as_str()) +} + +fn create_http_response( + status: StatusCode, + headers: HashMap, + body: Vec, +) -> Response> { + let mut builder = Response::builder().status(status); + + { + let h = builder.headers_mut().unwrap(); + for (k, v) in headers { + let name = match HeaderName::from_bytes(k.as_bytes()) { + Ok(n) => n, + Err(_) => { + log::warn!("Dropping invalid header name: {}", k); + continue; + } + }; + + let value = match HeaderValue::from_str(&v) { + Ok(v) => v, + Err(_) => { + log::warn!("Dropping invalid header value for {}: {:?}", k, v); + continue; + } + }; + + h.insert(name, value); + } + } + + builder.body(Full::new(Bytes::from(body))).unwrap() +} diff --git a/crates/base/src/store.rs b/crates/base/src/store.rs index f5cd9c5..a057e0f 100644 --- a/crates/base/src/store.rs +++ b/crates/base/src/store.rs @@ -121,7 +121,8 @@ impl AdapterStore { // TODO: Replace body vaidation with triangulus when its ready let uuid = uuid::Uuid::new_v4().to_string(); let flow_id = flow.flow_id; - let execution_flow: ExecutionFlow = Self::convert_validation_flow(flow, input_value); + let execution_flow: ExecutionFlow = + Self::convert_validation_flow(flow, input_value.clone()); let bytes = execution_flow.encode_to_vec(); let topic = format!("execution.{}", uuid); log::info!( @@ -129,6 +130,7 @@ impl AdapterStore { flow_id, uuid ); + log::debug!("Flow Input for Execution ({}) is {:?}", uuid, input_value); let result = self.client.request(topic, bytes.into()).await; match result {