diff --git a/obp-api/src/main/protobuf/api.proto b/obp-api/src/main/protobuf/api.proto index ed828e3ae5..0850bea036 100644 --- a/obp-api/src/main/protobuf/api.proto +++ b/obp-api/src/main/protobuf/api.proto @@ -218,16 +218,13 @@ message AccountsBalancesV310JsonGrpc { } service ObpService { - // Sends a greeting rpc getBanks(google.protobuf.Empty) returns (BanksJson400Grpc) {} - //Returns the list of accounts at BANK_ID that the user has access to. - //For each account the API returns the account ID and the available views. - rpc getPrivateAccountsAtOneBank(BankIdUserIdGrpc) returns (AccountsGrpc) {} - - //Get the Balances for the Accounts of the current User at one bank. - rpc getBankAccountsBalances(BankIdGrpc) returns (AccountsBalancesV310JsonGrpc) {} - - //Returns transactions list (Core info) of the account specified by ACCOUNT_ID. - rpc getCoreTransactionsForBankAccount(BankIdAccountIdAndUserIdGrpc) returns (CoreTransactionsJsonV300Grpc) {} + // Temporarily disabled — see ApiProto.scala javaDescriptor filter and + // ObpServiceGrpc.scala bindService. Re-enable by un-commenting here and + // in those files. + // + //rpc getPrivateAccountsAtOneBank(BankIdUserIdGrpc) returns (AccountsGrpc) {} + //rpc getBankAccountsBalances(BankIdGrpc) returns (AccountsBalancesV310JsonGrpc) {} + //rpc getCoreTransactionsForBankAccount(BankIdAccountIdAndUserIdGrpc) returns (CoreTransactionsJsonV300Grpc) {} } diff --git a/obp-api/src/main/protobuf/log_cache.proto b/obp-api/src/main/protobuf/log_cache.proto new file mode 100644 index 0000000000..5b7b9cc733 --- /dev/null +++ b/obp-api/src/main/protobuf/log_cache.proto @@ -0,0 +1,41 @@ +syntax = "proto3"; +package code.obp.grpc.logcache.g1; + +import "google/protobuf/timestamp.proto"; + +// Log level. Wire format: varint int32. Mirrors RedisLogger.LogLevel. +// ALL is the aggregate firehose — gated by canGetSystemLogCacheAll entitlement. 
+enum LogLevel { + LOG_LEVEL_UNSPECIFIED = 0; + TRACE = 1; + DEBUG = 2; + INFO = 3; + WARNING = 4; + ERROR = 5; + ALL = 6; +} + +message StreamLogCacheRequest { + LogLevel level = 1; +} + +message LogCacheEntry { + LogLevel level = 1; + string message = 2; + google.protobuf.Timestamp timestamp = 3; + // Identifies which OBP instance (pod) emitted this entry. Sourced from + // the `api_instance_id` prop — either a configured value suffixed with a + // per-JVM UUID, or a pure UUID if the prop is unset. See + // code.api.Constant.ApiInstanceId. + string api_instance_id = 4; +} + +// Live tail of the Redis-backed log cache. +// History is served by the REST endpoints GET /system/log-cache/{level}; +// this service delivers only new entries via Redis pub/sub channels. +// +// Per-level channels: subscribing to TRACE only delivers TRACE; ALL is a +// separate firehose requiring canGetSystemLogCacheAll. +service LogCacheStreamService { + rpc StreamLogCacheEntries(StreamLogCacheRequest) returns (stream LogCacheEntry); +} diff --git a/obp-api/src/main/protobuf/metrics_stream.proto b/obp-api/src/main/protobuf/metrics_stream.proto new file mode 100644 index 0000000000..cf9c254dd3 --- /dev/null +++ b/obp-api/src/main/protobuf/metrics_stream.proto @@ -0,0 +1,52 @@ +syntax = "proto3"; +package code.obp.grpc.metricsstream.g1; + +// Server-side filters. Empty string = no filter on that field. +// Filters AND together: passing consumer_id + verb = events matching BOTH. +// url_substring matches if the event's url contains the given substring. +message StreamMetricsRequest { + string consumer_id = 1; + string user_id = 2; + string verb = 3; + string url_substring = 4; + string implemented_by_partial_function = 5; + string app_name = 6; +} + +// Per-REST-call metric record, mirrors APIMetrics.saveMetric args and +// MetricJsonV600 (REST v6.0.0). Field names track REST v6.0.0 verbatim. 
+// +// `response_body` is intentionally omitted — can be large and contain PII; +// fetch via the REST /management/metrics endpoint if needed. +message MetricEvent { + string url = 1; + // ISO-8601 UTC, seconds precision — matches REST v6.0.0 (yyyy-MM-dd'T'HH:mm:ss'Z'). + string date = 2; + int64 duration = 3; + string user_id = 4; + string username = 5; + string app_name = 6; + string developer_email = 7; + string consumer_id = 8; + string implemented_by_partial_function = 9; + string implemented_in_version = 10; + string verb = 11; + int32 status_code = 12; + string correlation_id = 13; + string source_ip = 14; + string target_ip = 15; + // Identifies which OBP instance (pod) served this request. Sourced from + // the `api_instance_id` prop — either a configured value suffixed with a + // per-JVM UUID, or a pure UUID if the prop is unset. See + // code.api.Constant.ApiInstanceId. + string api_instance_id = 16; + // OBP operation id, e.g. "OBPv6.0.0-getBanks". Matches MetricJsonV600.operation_id. + string operation_id = 17; +} + +// Live tail of API metrics as they are written. +// History is served by the REST endpoint GET /management/metrics; this +// service delivers only new metrics via a Redis pub/sub channel. +service MetricsStreamService { + rpc StreamMetrics(StreamMetricsRequest) returns (stream MetricEvent); +} diff --git a/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/SwaggerDefinitionsJSON.scala b/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/SwaggerDefinitionsJSON.scala index 10959b48e8..a4bd7c8409 100644 --- a/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/SwaggerDefinitionsJSON.scala +++ b/obp-api/src/main/scala/code/api/ResourceDocs1_4_0/SwaggerDefinitionsJSON.scala @@ -3191,7 +3191,9 @@ object SwaggerDefinitionsJSON { source_ip = ExampleValue.ipAddressExample.value, target_ip = ExampleValue.ipAddressExample.value, response_body = json.parse("""{"code":401,"message":"OBP-20001: User not logged in. 
Authentication is required!"}"""), - operation_id = "OBPv4.0.0-getBanks" + status_code = 401, + operation_id = "OBPv4.0.0-getBanks", + api_instance_id = "obp_node_a" ) lazy val metricsJsonV600 = MetricsJsonV600( metrics = List(metricJsonV600) diff --git a/obp-api/src/main/scala/code/api/cache/RedisLogger.scala b/obp-api/src/main/scala/code/api/cache/RedisLogger.scala index 02db0209c8..2307f15bbf 100644 --- a/obp-api/src/main/scala/code/api/cache/RedisLogger.scala +++ b/obp-api/src/main/scala/code/api/cache/RedisLogger.scala @@ -2,8 +2,11 @@ package code.api.cache import code.api.util.ApiRole._ import code.api.util.{APIUtil, ApiRole} +import code.logcache.LogCacheEventBus import net.liftweb.common.{Box, Empty, Failure => LiftFailure, Full, Logger} +import net.liftweb.json +import net.liftweb.json.Serialization.write import redis.clients.jedis.{Jedis, Pipeline} import java.util.concurrent.{Executors, ScheduledThreadPoolExecutor, TimeUnit} @@ -108,10 +111,7 @@ object RedisLogger { while (attempt < maxRetries) { try { withPipeline { pipeline => - // log to requested level - configs.get(level).foreach(cfg => pushLog(pipeline, cfg, message)) - // also log to ALL - configs.get(LogLevel.ALL).foreach(cfg => pushLog(pipeline, cfg, s"[$level] $message")) + writeAndPublish(pipeline, level, message) pipeline.sync() } @@ -212,8 +212,7 @@ object RedisLogger { try { withPipeline { pipeline => entriesToFlush.asScala.foreach { logEntry => - configs.get(logEntry.level).foreach(cfg => pushLog(pipeline, cfg, logEntry.message)) - configs.get(LogLevel.ALL).foreach(cfg => pushLog(pipeline, cfg, s"[${logEntry.level}] ${logEntry.message}")) + writeAndPublish(pipeline, logEntry.level, logEntry.message) } pipeline.sync() } @@ -269,6 +268,25 @@ object RedisLogger { } } + private implicit val streamFormats = json.DefaultFormats + + /** + * Write a log entry to the REST-facing Redis lists (level queue + ALL queue) + * and publish it once to the gRPC stream bus. 
All commands go through the + * caller's pipeline so the whole thing is one Redis round-trip. + */ + private def writeAndPublish(pipeline: Pipeline, level: LogLevel.LogLevel, message: String): Unit = { + configs.get(level).foreach(cfg => pushLog(pipeline, cfg, message)) + configs.get(LogLevel.ALL).foreach(cfg => pushLog(pipeline, cfg, s"[$level] $message")) + val payload = write(Map( + "level" -> level.toString, + "message" -> message, + "ts" -> System.currentTimeMillis(), + "api_instance_id" -> code.api.Constant.ApiInstanceId + )) + LogCacheEventBus.publishInPipeline(pipeline, level, payload) + } + case class LogTailEntry(level: String, message: String) case class LogTail(entries: List[LogTailEntry]) diff --git a/obp-api/src/main/scala/code/api/util/WriteMetricUtil.scala b/obp-api/src/main/scala/code/api/util/WriteMetricUtil.scala index fec3931367..82e7bec744 100644 --- a/obp-api/src/main/scala/code/api/util/WriteMetricUtil.scala +++ b/obp-api/src/main/scala/code/api/util/WriteMetricUtil.scala @@ -4,6 +4,7 @@ import code.api.DirectLogin import code.api.util.APIUtil.{ResourceDoc, buildOperationId, getCorrelationId, getPropsAsBoolValue, getPropsValue, hasAnOAuthHeader, hasDirectLoginHeader} import code.api.util.ErrorMessages.attemptedToOpenAnEmptyBox import code.metrics.APIMetrics +import code.metricsstream.MetricsEventBus import code.model.Consumer import code.util.Helper.{MdcLoggable, ObpS} import com.openbankproject.commons.model.User @@ -11,10 +12,12 @@ import net.liftweb.common.{Box, Empty, Full} import net.liftweb.http.S import net.liftweb.util.TimeHelpers.TimeSpan +import java.util.Date import scala.collection.immutable import scala.concurrent.Future import com.openbankproject.commons.ExecutionContext.Implicits.global import net.liftweb.json.{Extraction, JValue, compactRender} +import net.liftweb.json.Serialization.write object WriteMetricUtil extends MdcLoggable { @@ -61,6 +64,8 @@ object WriteMetricUtil extends MdcLoggable { val appName = cc.appName.orNull 
val developerEmail = cc.developerEmail.orNull + val sourceIp = cc.requestHeaders.find(_.name.toLowerCase() == "x-forwarded-for").map(_.values.mkString(",")).getOrElse("") + val targetIp = cc.requestHeaders.find(_.name.toLowerCase() == "x-forwarded-host").map(_.values.mkString(",")).getOrElse("") APIMetrics.apiMetrics.vend.saveMetric( userId, cc.url, @@ -76,9 +81,13 @@ object WriteMetricUtil extends MdcLoggable { cc.httpCode, cc.correlationId, responseBodyToWrite, - cc.requestHeaders.find(_.name.toLowerCase() == "x-forwarded-for").map(_.values.mkString(",")).getOrElse(""), - cc.requestHeaders.find(_.name.toLowerCase() == "x-forwarded-host").map(_.values.mkString(",")).getOrElse("") + sourceIp, + targetIp, + code.api.Constant.ApiInstanceId ) + publishMetricEvent(userId, cc.url, cc.startTime.getOrElse(null), duration, userName, appName, + developerEmail, consumerId, implementedByPartialFunction, cc.implementedInVersion, cc.verb, + cc.httpCode, cc.correlationId, sourceIp, targetIp, cc.operationId.getOrElse("")) } } case _ => @@ -144,6 +153,9 @@ object WriteMetricUtil extends MdcLoggable { val correlationId = getCorrelationId() val reqHeaders = S.request.openOrThrowException(attemptedToOpenAnEmptyBox).request.headers + val sourceIp = reqHeaders.find(_.name.toLowerCase() == "x-forwarded-for").map(_.values.mkString(",")).getOrElse("") + val targetIp = reqHeaders.find(_.name.toLowerCase() == "x-forwarded-host").map(_.values.mkString(",")).getOrElse("") + //execute saveMetric in future, as we do not need to know result of operation Future { APIMetrics.apiMetrics.vend.saveMetric( @@ -161,12 +173,71 @@ object WriteMetricUtil extends MdcLoggable { None, correlationId, "Not enabled for old style endpoints", - reqHeaders.find(_.name.toLowerCase() == "x-forwarded-for").map(_.values.mkString(",")).getOrElse(""), - reqHeaders.find(_.name.toLowerCase() == "x-forwarded-host").map(_.values.mkString(",")).getOrElse("") + sourceIp, + targetIp, + code.api.Constant.ApiInstanceId ) + 
publishMetricEvent(userId, url, date, duration, userName, appName, developerEmail, consumerId, + implementedByPartialFunction, implementedInVersion, verb, None, correlationId, sourceIp, targetIp, + rd.map(_.operationId).getOrElse("")) } } } + private val metricFormats = net.liftweb.json.DefaultFormats + + /** + * Publish a metric event to the gRPC pub/sub channel. No-op when the + * stream service is disabled. Safe to call from the metric-write Future + * — exceptions are swallowed so the REST path is never affected. + */ + private def publishMetricEvent(userId: String, + url: String, + date: Date, + duration: Long, + userName: String, + appName: String, + developerEmail: String, + consumerId: String, + implementedByPartialFunction: String, + implementedInVersion: String, + verb: String, + httpCode: Option[Int], + correlationId: String, + sourceIp: String, + targetIp: String, + operationId: String): Unit = { + if (!MetricsEventBus.isEnabled) return + try { + implicit val fmts = metricFormats + // Use Lift's date format (same one REST v6.0.0 uses when serializing + // MetricJsonV600.date) so the stream string matches byte-for-byte. 
+ val dateStr = if (date != null) metricFormats.dateFormat.format(date) else "" + val payload = write(Map( + "url" -> Option(url).getOrElse(""), + "date" -> dateStr, + "duration" -> duration, + "user_id" -> Option(userId).getOrElse(""), + "username" -> Option(userName).getOrElse(""), + "app_name" -> Option(appName).getOrElse(""), + "developer_email" -> Option(developerEmail).getOrElse(""), + "consumer_id" -> Option(consumerId).getOrElse(""), + "implemented_by_partial_function" -> Option(implementedByPartialFunction).getOrElse(""), + "implemented_in_version" -> Option(implementedInVersion).getOrElse(""), + "verb" -> Option(verb).getOrElse(""), + "status_code" -> httpCode.getOrElse(0), + "correlation_id" -> Option(correlationId).getOrElse(""), + "source_ip" -> Option(sourceIp).getOrElse(""), + "target_ip" -> Option(targetIp).getOrElse(""), + "api_instance_id" -> code.api.Constant.ApiInstanceId, + "operation_id" -> Option(operationId).getOrElse("") + )) + MetricsEventBus.publish(payload) + } catch { + case e: Throwable => + logger.warn(s"WriteMetricUtil says: publishMetricEvent failed: ${e.getMessage}") + } + } + } diff --git a/obp-api/src/main/scala/code/api/v6_0_0/JSONFactory6.0.0.scala b/obp-api/src/main/scala/code/api/v6_0_0/JSONFactory6.0.0.scala index e56c797d30..db89ee15d3 100644 --- a/obp-api/src/main/scala/code/api/v6_0_0/JSONFactory6.0.0.scala +++ b/obp-api/src/main/scala/code/api/v6_0_0/JSONFactory6.0.0.scala @@ -445,7 +445,9 @@ case class MetricJsonV600( source_ip: String, target_ip: String, response_body: net.liftweb.json.JValue, - operation_id: String + status_code: Int, + operation_id: String, + api_instance_id: String ) case class MetricsJsonV600(metrics: List[MetricJsonV600]) @@ -1644,7 +1646,9 @@ object JSONFactory600 extends CustomJsonFormats with MdcLoggable { source_ip = metric.getSourceIp(), target_ip = metric.getTargetIp(), response_body = net.liftweb.json.parseOpt(metric.getResponseBody()).getOrElse(net.liftweb.json.JString("Not enabled")), - 
operation_id = operationId + status_code = metric.getHttpCode(), + operation_id = operationId, + api_instance_id = metric.getApiInstanceId() ) } diff --git a/obp-api/src/main/scala/code/logcache/LogCacheEventBus.scala b/obp-api/src/main/scala/code/logcache/LogCacheEventBus.scala new file mode 100644 index 0000000000..8e5ff1844c --- /dev/null +++ b/obp-api/src/main/scala/code/logcache/LogCacheEventBus.scala @@ -0,0 +1,195 @@ +package code.logcache + +import code.api.cache.{Redis, RedisLogger} +import code.api.util.APIUtil +import code.util.Helper.MdcLoggable +import io.grpc.stub.StreamObserver +import redis.clients.jedis.{Jedis, JedisPubSub, Pipeline} + +import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CopyOnWriteArrayList, TimeUnit} +import java.util.concurrent.atomic.AtomicLong +import scala.collection.JavaConverters._ + +/** + * Redis pub/sub event bus for log cache streaming. + * + * Bridges Redis pub/sub channels (published from `RedisLogger.pushLog`) + * to gRPC streaming clients. + * + * Channel naming: + * - obp_log_cache:trace — TRACE-level entries + * - obp_log_cache:debug — DEBUG-level entries + * - obp_log_cache:info — INFO-level entries + * - obp_log_cache:warning — WARNING-level entries + * - obp_log_cache:error — ERROR-level entries + * - obp_log_cache:all — every entry (aggregate firehose) + * + * Backpressure: each subscribed observer sits behind a bounded queue with + * drop-oldest semantics, plus a dedicated delivery thread. The shared Redis + * subscriber thread only calls `queue.offer` — a slow gRPC client cannot + * stall delivery to other subscribers or to the subscriber thread itself. 
+ */ +object LogCacheEventBus extends MdcLoggable { + + private val CHANNEL_PREFIX = "obp_log_cache:" + private val ENABLED = APIUtil.getPropsAsBoolValue("grpc.log_cache_stream.enabled", true) + private val QUEUE_MAX_ENTRIES = APIUtil.getPropsAsIntValue("grpc.log_cache_stream.observer_queue_max_entries", 1000) + + def isEnabled: Boolean = ENABLED + + // channel suffix → observers subscribed to that channel + private val observers = new ConcurrentHashMap[String, CopyOnWriteArrayList[BufferedObserver]]() + // raw StreamObserver → buffered wrapper, for unsubscribe lookup + private val rawToBuffered = new ConcurrentHashMap[StreamObserver[String], BufferedObserver]() + + @volatile private var subscriberThread: Thread = _ + @volatile private var subscriberJedis: Jedis = _ + @volatile private var pubSub: JedisPubSub = _ + @volatile private var running = false + + // --- Publish (called from RedisLogger.writeAndPublish, within an existing pipeline) --- + + /** + * Publish a log entry to its level channel and to the `all` channel, within + * the caller's Redis pipeline. Cheap: adds at most two PUBLISH commands to the + * pipeline the caller is already flushing. Skipped entirely when the + * stream service is disabled. 
+ */ + def publishInPipeline(pipeline: Pipeline, level: RedisLogger.LogLevel.LogLevel, payload: String): Unit = { + if (!ENABLED) return + pipeline.publish(CHANNEL_PREFIX + level.toString.toLowerCase, payload) + if (level != RedisLogger.LogLevel.ALL) { + pipeline.publish(CHANNEL_PREFIX + "all", payload) + } + } + + // --- Subscribe/unsubscribe for gRPC StreamObservers --- + + def subscribe(level: RedisLogger.LogLevel.LogLevel, observer: StreamObserver[String]): Unit = { + val key = level.toString.toLowerCase + val buffered = new BufferedObserver(observer, key, QUEUE_MAX_ENTRIES) + rawToBuffered.put(observer, buffered) + observers.computeIfAbsent(key, _ => new CopyOnWriteArrayList[BufferedObserver]()) + observers.get(key).add(buffered) + buffered.start() + logger.info(s"LogCacheEventBus says: Observer subscribed to $key (total: ${observers.get(key).size})") + } + + def unsubscribe(level: RedisLogger.LogLevel.LogLevel, observer: StreamObserver[String]): Unit = { + val key = level.toString.toLowerCase + val buffered = rawToBuffered.remove(observer) + if (buffered != null) { + buffered.stop() + val list = observers.get(key) + if (list != null) { + list.remove(buffered) + logger.info(s"LogCacheEventBus says: Observer unsubscribed from $key (remaining: ${list.size}, dropped during session: ${buffered.droppedCount})") + if (list.isEmpty) observers.remove(key) + } + } + } + + // --- Lifecycle --- + + def start(): Unit = { + if (!ENABLED || running) return + running = true + + pubSub = new JedisPubSub { + override def onPMessage(pattern: String, channel: String, message: String): Unit = { + val key = channel.stripPrefix(CHANNEL_PREFIX) + val list = observers.get(key) + if (list != null) { + list.asScala.foreach(_.offer(message)) + } + } + } + + subscriberThread = new Thread(() => { + try { + subscriberJedis = new Jedis(Redis.url, Redis.port, Redis.timeout) + if (Redis.password != null) subscriberJedis.auth(Redis.password) + logger.info(s"LogCacheEventBus says: Redis 
subscriber started, pattern-subscribing to ${CHANNEL_PREFIX}*") + subscriberJedis.psubscribe(pubSub, s"${CHANNEL_PREFIX}*") + } catch { + case e: Throwable if running => + logger.error(s"LogCacheEventBus says: Redis subscriber thread died: ${e.getMessage}") + case _: Throwable => // shutting down + } + }, "log-cache-event-bus-subscriber") + subscriberThread.setDaemon(true) + subscriberThread.start() + + logger.info("LogCacheEventBus says: Started") + } + + def stop(): Unit = { + running = false + try { if (pubSub != null) pubSub.punsubscribe() } catch { case _: Throwable => } + try { if (subscriberJedis != null) subscriberJedis.close() } catch { case _: Throwable => } + observers.values.asScala.foreach(_.asScala.foreach(_.stop())) + observers.clear() + rawToBuffered.clear() + logger.info("LogCacheEventBus says: Stopped") + } + + // --- Buffered delivery (per-subscriber backpressure) --- + + /** + * Wraps a raw StreamObserver with a bounded queue and a dedicated delivery + * thread. Redis subscriber thread calls offer(); delivery thread drains. + * On queue-full: drop oldest. On delivery error: stop the thread and let + * the gRPC cancel handler clean up the subscription. 
+ */ + private class BufferedObserver( + inner: StreamObserver[String], + channelKey: String, + queueSize: Int + ) extends MdcLoggable { + private val queue = new ArrayBlockingQueue[String](queueSize) + private val dropped = new AtomicLong(0) + @volatile private var alive = true + private var thread: Thread = _ + + def droppedCount: Long = dropped.get() + + def offer(msg: String): Unit = { + if (!queue.offer(msg)) { + queue.poll() // drop oldest + queue.offer(msg) + val d = dropped.incrementAndGet() + if (d == 1L || d % 1000L == 0L) { + logger.warn(s"LogCacheEventBus says: Dropping messages on $channelKey (slow consumer); dropped so far: $d") + } + } + } + + def start(): Unit = { + thread = new Thread(() => { + try { + while (alive) { + val msg = queue.poll(200L, TimeUnit.MILLISECONDS) + if (msg != null) { + try { + inner.synchronized { inner.onNext(msg) } + } catch { + case e: Throwable => + logger.info(s"LogCacheEventBus says: Delivery to observer on $channelKey failed, stopping delivery thread: ${e.getMessage}") + alive = false + } + } + } + } catch { + case _: InterruptedException => // normal shutdown + } + }, s"log-cache-event-bus-delivery-$channelKey") + thread.setDaemon(true) + thread.start() + } + + def stop(): Unit = { + alive = false + if (thread != null) thread.interrupt() + } + } +} diff --git a/obp-api/src/main/scala/code/metrics/APIMetrics.scala b/obp-api/src/main/scala/code/metrics/APIMetrics.scala index c3999b2181..392cd23eb9 100644 --- a/obp-api/src/main/scala/code/metrics/APIMetrics.scala +++ b/obp-api/src/main/scala/code/metrics/APIMetrics.scala @@ -54,8 +54,9 @@ trait APIMetrics { correlationId: String, responseBody: String, sourceIp: String, - targetIp: String): Unit - + targetIp: String, + apiInstanceId: String): Unit + def saveMetricsArchive(primaryKey: Long, userId: String, url: String, @@ -72,7 +73,8 @@ trait APIMetrics { correlationId: String, responseBody: String, sourceIp: String, - targetIp: String + targetIp: String, + apiInstanceId: 
String ): Unit // //TODO: ordering of list? should this be by date? currently not enforced @@ -121,6 +123,7 @@ trait APIMetric { def getResponseBody(): String def getSourceIp(): String def getTargetIp(): String + def getApiInstanceId(): String } diff --git a/obp-api/src/main/scala/code/metrics/ElasticsearchMetrics.scala b/obp-api/src/main/scala/code/metrics/ElasticsearchMetrics.scala index 9d46dd15e8..6e7b4a0d87 100644 --- a/obp-api/src/main/scala/code/metrics/ElasticsearchMetrics.scala +++ b/obp-api/src/main/scala/code/metrics/ElasticsearchMetrics.scala @@ -14,16 +14,17 @@ object ElasticsearchMetrics extends APIMetrics { lazy val es = new elasticsearchMetrics override def saveMetric(userId: String, url: String, date: Date, duration: Long, userName: String, appName: String, developerEmail: String, consumerId: String, implementedByPartialFunction: String, implementedInVersion: String, verb: String, httpCode: Option[Int], correlationId: String, - responseBody: String, sourceIp: String, targetIp: String): Unit = { + responseBody: String, sourceIp: String, targetIp: String, apiInstanceId: String): Unit = { if (APIUtil.getPropsAsBoolValue("allow_elasticsearch", false) && APIUtil.getPropsAsBoolValue("allow_elasticsearch_metrics", false) ) { //TODO ,need to be fixed now add more parameters - es.indexMetric(userId, url, date, duration, userName, appName, developerEmail, correlationId) + es.indexMetric(userId, url, date, duration, userName, appName, developerEmail, correlationId, apiInstanceId) } } override def saveMetricsArchive(primaryKey: Long, userId: String, url: String, date: Date, duration: Long, userName: String, appName: String, developerEmail: String, consumerId: String, implementedByPartialFunction: String, implementedInVersion: String, verb: String, httpCode: Option[Int], correlationId: String, responseBody: String, sourceIp: String, - targetIp: String): Unit = ??? + targetIp: String, + apiInstanceId: String): Unit = ??? 
// override def getAllGroupedByUserId(): Map[String, List[APIMetric]] = { // //TODO: replace the following with valid ES query diff --git a/obp-api/src/main/scala/code/metrics/MappedMetrics.scala b/obp-api/src/main/scala/code/metrics/MappedMetrics.scala index a48e448eb1..3c78d70809 100644 --- a/obp-api/src/main/scala/code/metrics/MappedMetrics.scala +++ b/obp-api/src/main/scala/code/metrics/MappedMetrics.scala @@ -113,7 +113,7 @@ object MappedMetrics extends APIMetrics with MdcLoggable{ } override def saveMetric(userId: String, url: String, date: Date, duration: Long, userName: String, appName: String, developerEmail: String, consumerId: String, implementedByPartialFunction: String, implementedInVersion: String, verb: String, httpCode: Option[Int], correlationId: String, - responseBody: String, sourceIp: String, targetIp: String): Unit = { + responseBody: String, sourceIp: String, targetIp: String, apiInstanceId: String): Unit = { MetricBatchWriter.enqueue( MetricBatchWriter.MetricRow( userId = userId, @@ -131,7 +131,8 @@ object MappedMetrics extends APIMetrics with MdcLoggable{ correlationId = correlationId, responseBody = responseBody, sourceIp = sourceIp, - targetIp = targetIp + targetIp = targetIp, + apiInstanceId = apiInstanceId ) ) } @@ -140,7 +141,8 @@ object MappedMetrics extends APIMetrics with MdcLoggable{ appName: String, developerEmail: String, consumerId: String, implementedByPartialFunction: String, implementedInVersion: String, verb: String, httpCode: Option[Int], correlationId: String, - responseBody: String, sourceIp: String, targetIp: String): Unit = { + responseBody: String, sourceIp: String, targetIp: String, + apiInstanceId: String): Unit = { val metric = MetricArchive.find(By(MetricArchive.id, primaryKey)).getOrElse(MetricArchive.create) metric .metricId(primaryKey) @@ -159,6 +161,7 @@ object MappedMetrics extends APIMetrics with MdcLoggable{ .responseBody(responseBody) .sourceIp(sourceIp) .targetIp(targetIp) + .apiInstanceId(apiInstanceId) 
httpCode match { case Some(code) => metric.httpCode(code) @@ -640,6 +643,7 @@ class MappedMetric extends APIMetric with LongKeyedMapper[MappedMetric] with IdP object responseBody extends MappedText(this) object sourceIp extends MappedString(this, 64) object targetIp extends MappedString(this, 64) + object apiInstanceId extends MappedString(this, 255) override def getMetricId(): Long = id.get override def getUrl(): String = url.get @@ -658,6 +662,7 @@ class MappedMetric extends APIMetric with LongKeyedMapper[MappedMetric] with IdP override def getResponseBody(): String = responseBody.get override def getSourceIp(): String = sourceIp.get override def getTargetIp(): String = targetIp.get + override def getApiInstanceId(): String = apiInstanceId.get } object MappedMetric extends MappedMetric with LongKeyedMetaMapper[MappedMetric] { @@ -698,6 +703,7 @@ class MetricArchive extends APIMetric with LongKeyedMapper[MetricArchive] with I object responseBody extends MappedText(this) object sourceIp extends MappedString(this, 64) object targetIp extends MappedString(this, 64) + object apiInstanceId extends MappedString(this, 255) override def getMetricId(): Long = metricId.get @@ -717,6 +723,7 @@ class MetricArchive extends APIMetric with LongKeyedMapper[MetricArchive] with I override def getResponseBody(): String = responseBody.get override def getSourceIp(): String = sourceIp.get override def getTargetIp(): String = targetIp.get + override def getApiInstanceId(): String = apiInstanceId.get } object MetricArchive extends MetricArchive with LongKeyedMetaMapper[MetricArchive] { override def dbIndexes = diff --git a/obp-api/src/main/scala/code/metrics/MetricBatchWriter.scala b/obp-api/src/main/scala/code/metrics/MetricBatchWriter.scala index 66e7ad5a20..ff13b02d4b 100644 --- a/obp-api/src/main/scala/code/metrics/MetricBatchWriter.scala +++ b/obp-api/src/main/scala/code/metrics/MetricBatchWriter.scala @@ -39,7 +39,8 @@ object MetricBatchWriter extends MdcLoggable { correlationId: 
String, responseBody: String, sourceIp: String, - targetIp: String + targetIp: String, + apiInstanceId: String ) private val queue = new ConcurrentLinkedQueue[MetricRow]() @@ -101,8 +102,8 @@ object MetricBatchWriter extends MdcLoggable { userid, url, date_c, duration, username, appname, developeremail, consumerid, implementedbypartialfunction, implementedinversion, verb, httpcode, correlationid, - responsebody, sourceip, targetip - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + responsebody, sourceip, targetip, apiinstanceid + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """ // Use Option[String] so Doobie handles nullable fields via Put[Option[String]] @@ -111,7 +112,7 @@ object MetricBatchWriter extends MdcLoggable { (Option[String], Option[String], Timestamp, Long, Option[String], Option[String], Option[String], Option[String], Option[String], Option[String], Option[String], Int, Option[String], - Option[String], Option[String], Option[String]) + Option[String], Option[String], Option[String], Option[String]) ](insertSql) val values = rows.map { r => @@ -120,7 +121,7 @@ object MetricBatchWriter extends MdcLoggable { r.duration, Option(r.userName), Option(r.appName), Option(r.developerEmail), Option(r.consumerId), Option(r.implementedByPartialFunction), Option(r.implementedInVersion), Option(r.verb), r.httpCode, Option(r.correlationId), - Option(r.responseBody), Option(r.sourceIp), Option(r.targetIp) + Option(r.responseBody), Option(r.sourceIp), Option(r.targetIp), Option(r.apiInstanceId) ) } diff --git a/obp-api/src/main/scala/code/metricsstream/MetricsEventBus.scala b/obp-api/src/main/scala/code/metricsstream/MetricsEventBus.scala new file mode 100644 index 0000000000..42255ccd32 --- /dev/null +++ b/obp-api/src/main/scala/code/metricsstream/MetricsEventBus.scala @@ -0,0 +1,183 @@ +package code.metricsstream + +import code.api.cache.Redis +import code.api.util.APIUtil +import code.util.Helper.MdcLoggable +import 
io.grpc.stub.StreamObserver +import redis.clients.jedis.{Jedis, JedisPubSub} + +import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CopyOnWriteArrayList, TimeUnit} +import java.util.concurrent.atomic.AtomicLong +import scala.collection.JavaConverters._ + +/** + * Redis pub/sub event bus for metrics streaming. + * + * Bridges the Redis pub/sub channel (published from + * `WriteMetricUtil.writeEndpointMetric` after each REST call's metric is + * saved) to gRPC streaming clients. + * + * Channel naming: + * - obp_metrics:all — every metric event (single channel; server-side + * filtering happens in `MetricsStreamServiceImpl`). + * + * Why single-channel: unlike log cache which has a natural low-cardinality + * dimension (level), metrics filters are high-cardinality (consumer_id, + * user_id, url). Pre-computing a Redis channel per filter combination + * would explode. Broadcast + filter in the service is simpler and still + * avoids shipping unwanted events over gRPC. + * + * Backpressure: each subscribed observer sits behind a bounded queue with + * drop-oldest semantics, plus a dedicated delivery thread. The shared Redis + * subscriber thread only calls `queue.offer` — a slow gRPC client cannot + * stall delivery to other subscribers or to the subscriber thread itself. 
+ */ +object MetricsEventBus extends MdcLoggable { + + private val CHANNEL_PREFIX = "obp_metrics:" + private val ALL_CHANNEL = CHANNEL_PREFIX + "all" + private val ENABLED = APIUtil.getPropsAsBoolValue("grpc.metrics_stream.enabled", true) + private val QUEUE_MAX_ENTRIES = APIUtil.getPropsAsIntValue("grpc.metrics_stream.observer_queue_max_entries", 1000) + + def isEnabled: Boolean = ENABLED + + private val observers = new CopyOnWriteArrayList[BufferedObserver]() + private val rawToBuffered = new ConcurrentHashMap[StreamObserver[String], BufferedObserver]() + + @volatile private var subscriberThread: Thread = _ + @volatile private var subscriberJedis: Jedis = _ + @volatile private var pubSub: JedisPubSub = _ + @volatile private var running = false + + // --- Publish (called from WriteMetricUtil after saveMetric) --- + + /** + * Borrow a connection from the pool, publish, release. No-op when disabled. + * Fire-and-forget semantics: callers should not rely on delivery. + */ + def publish(payload: String): Unit = { + if (!ENABLED) return + var jedis: Jedis = null + try { + jedis = Redis.jedisPool.getResource + jedis.publish(ALL_CHANNEL, payload) + } catch { + case e: Throwable => + logger.warn(s"MetricsEventBus says: Failed to publish metric: ${e.getMessage}") + } finally { + if (jedis != null) jedis.close() + } + } + + // --- Subscribe/unsubscribe for gRPC StreamObservers --- + + def subscribe(observer: StreamObserver[String]): Unit = { + val buffered = new BufferedObserver(observer, QUEUE_MAX_ENTRIES) + rawToBuffered.put(observer, buffered) + observers.add(buffered) + buffered.start() + logger.info(s"MetricsEventBus says: Observer subscribed (total: ${observers.size})") + } + + def unsubscribe(observer: StreamObserver[String]): Unit = { + val buffered = rawToBuffered.remove(observer) + if (buffered != null) { + buffered.stop() + observers.remove(buffered) + logger.info(s"MetricsEventBus says: Observer unsubscribed (remaining: ${observers.size}, dropped during session: 
${buffered.droppedCount})") + } + } + + // --- Lifecycle --- + + def start(): Unit = { + if (!ENABLED || running) return + running = true + + pubSub = new JedisPubSub { + override def onMessage(channel: String, message: String): Unit = { + observers.asScala.foreach(_.offer(message)) + } + } + + subscriberThread = new Thread(() => { + try { + subscriberJedis = new Jedis(Redis.url, Redis.port, Redis.timeout) + if (Redis.password != null) subscriberJedis.auth(Redis.password) + logger.info(s"MetricsEventBus says: Redis subscriber started, subscribing to $ALL_CHANNEL") + subscriberJedis.subscribe(pubSub, ALL_CHANNEL) + } catch { + case e: Throwable if running => + logger.error(s"MetricsEventBus says: Redis subscriber thread died: ${e.getMessage}") + case _: Throwable => // shutting down + } + }, "metrics-event-bus-subscriber") + subscriberThread.setDaemon(true) + subscriberThread.start() + + logger.info("MetricsEventBus says: Started") + } + + def stop(): Unit = { + running = false + try { if (pubSub != null) pubSub.unsubscribe() } catch { case _: Throwable => } + try { if (subscriberJedis != null) subscriberJedis.close() } catch { case _: Throwable => } + observers.asScala.foreach(_.stop()) + observers.clear() + rawToBuffered.clear() + logger.info("MetricsEventBus says: Stopped") + } + + // --- Buffered delivery (per-subscriber backpressure) --- + + private class BufferedObserver( + inner: StreamObserver[String], + queueSize: Int + ) extends MdcLoggable { + private val queue = new ArrayBlockingQueue[String](queueSize) + private val dropped = new AtomicLong(0) + @volatile private var alive = true + private var thread: Thread = _ + + def droppedCount: Long = dropped.get() + + def offer(msg: String): Unit = { + if (!queue.offer(msg)) { + queue.poll() // drop oldest + queue.offer(msg) + val d = dropped.incrementAndGet() + if (d == 1L || d % 1000L == 0L) { + logger.warn(s"MetricsEventBus says: Dropping messages (slow consumer); dropped so far: $d") + } + } + } + + def 
start(): Unit = { + thread = new Thread(() => { + try { + while (alive) { + val msg = queue.poll(200L, TimeUnit.MILLISECONDS) + if (msg != null) { + try { + inner.synchronized { inner.onNext(msg) } + } catch { + case e: Throwable => + logger.info(s"MetricsEventBus says: Delivery to observer failed, stopping delivery thread: ${e.getMessage}") + alive = false + } + } + } + } catch { + case _: InterruptedException => // normal shutdown + } + }, "metrics-event-bus-delivery") + thread.setDaemon(true) + thread.start() + } + + def stop(): Unit = { + alive = false + if (thread != null) thread.interrupt() + } + } +} diff --git a/obp-api/src/main/scala/code/obp/grpc/Client.scala b/obp-api/src/main/scala/code/obp/grpc/Client.scala index 08fdecbd31..d51ba17eb7 100644 --- a/obp-api/src/main/scala/code/obp/grpc/Client.scala +++ b/obp-api/src/main/scala/code/obp/grpc/Client.scala @@ -16,13 +16,16 @@ object Client extends App { private val banks: BanksJson400Grpc = obpService.getBanks(Empty.defaultInstance) println(banks) - // get accounts according bankId and userId - private val bankIdUserIdGrpc = BankIdUserIdGrpc("dmo.07.de.de", "0986f84c-78ce-4ce9-a3b7-fa2451acd882") - private val accounts: AccountsGrpc = obpService.getPrivateAccountsAtOneBank(bankIdUserIdGrpc) - println(accounts) - - //get accounts by bankId, accountId and userId - private val bankIdAccountIdAndUserId = BankIdAccountIdAndUserIdGrpc("psd201-bank-y--uk", "my_account_id", "4850d4c3-220a-4a72-9d3c-eeeacaf4b63b") - private val transactionsJsonV300Grpc: CoreTransactionsJsonV300Grpc = obpService.getCoreTransactionsForBankAccount(bankIdAccountIdAndUserId) - println(transactionsJsonV300Grpc) + // Temporarily disabled — see api.proto, ApiProto.scala javaDescriptor filter, + // ObpServiceGrpc.scala, and ObpGrpcServer.scala for the matching changes. 
+ // + //// get accounts according bankId and userId + //private val bankIdUserIdGrpc = BankIdUserIdGrpc("dmo.07.de.de", "0986f84c-78ce-4ce9-a3b7-fa2451acd882") + //private val accounts: AccountsGrpc = obpService.getPrivateAccountsAtOneBank(bankIdUserIdGrpc) + //println(accounts) + // + ////get accounts by bankId, accountId and userId + //private val bankIdAccountIdAndUserId = BankIdAccountIdAndUserIdGrpc("psd201-bank-y--uk", "my_account_id", "4850d4c3-220a-4a72-9d3c-eeeacaf4b63b") + //private val transactionsJsonV300Grpc: CoreTransactionsJsonV300Grpc = obpService.getCoreTransactionsForBankAccount(bankIdAccountIdAndUserId) + //println(transactionsJsonV300Grpc) } diff --git a/obp-api/src/main/scala/code/obp/grpc/ObpGrpcServer.scala b/obp-api/src/main/scala/code/obp/grpc/ObpGrpcServer.scala index 04ff8875a8..57ccc433d9 100644 --- a/obp-api/src/main/scala/code/obp/grpc/ObpGrpcServer.scala +++ b/obp-api/src/main/scala/code/obp/grpc/ObpGrpcServer.scala @@ -44,12 +44,30 @@ class ObpGrpcServer(executionContext: ExecutionContext) extends MdcLoggable { se // Start chat event bus for Redis pub/sub streaming code.chat.ChatEventBus.start() - val serverBuilder = ServerBuilder.forPort(ObpGrpcServer.port) + // Start log cache event bus (no-op if grpc.log_cache_stream.enabled=false) + code.logcache.LogCacheEventBus.start() + + // Start metrics event bus (no-op if grpc.metrics_stream.enabled=false) + code.metricsstream.MetricsEventBus.start() + + val baseBuilder = ServerBuilder.forPort(ObpGrpcServer.port) .addService(ObpServiceGrpc.bindService(ObpServiceImpl, executionContext)) .addService(code.obp.grpc.chat.api.ChatStreamServiceGrpc.bindService( code.obp.grpc.chat.ChatStreamServiceImpl, executionContext)) .addService(io.grpc.protobuf.services.ProtoReflectionService.newInstance()) .intercept(new code.obp.grpc.chat.AuthInterceptor()) + + val withLogCache = + if (code.logcache.LogCacheEventBus.isEnabled) + 
baseBuilder.addService(code.obp.grpc.logcache.api.LogCacheStreamServiceGrpc.bindService( + code.obp.grpc.logcache.LogCacheStreamServiceImpl, executionContext)) + else baseBuilder + + val serverBuilder = + (if (code.metricsstream.MetricsEventBus.isEnabled) + withLogCache.addService(code.obp.grpc.metricsstream.api.MetricsStreamServiceGrpc.bindService( + code.obp.grpc.metricsstream.MetricsStreamServiceImpl, executionContext)) + else withLogCache) .asInstanceOf[ServerBuilder[_]] server = serverBuilder.build.start; logger.info("Server started, listening on " + ObpGrpcServer.port) @@ -62,6 +80,8 @@ class ObpGrpcServer(executionContext: ExecutionContext) extends MdcLoggable { se def stop(): Unit = { code.chat.ChatEventBus.stop() + code.logcache.LogCacheEventBus.stop() + code.metricsstream.MetricsEventBus.stop() if (server != null) { server.shutdown() server = null @@ -93,59 +113,56 @@ class ObpGrpcServer(executionContext: ExecutionContext) extends MdcLoggable { se }) } - override def getPrivateAccountsAtOneBank(request: BankIdUserIdGrpc): Future[AccountsGrpc] = { - implicit val toBankExtended = code.model.toBankExtended(_) - val callContext: Option[CallContext] = Some(CallContext()) - val bankId = BankId(request.bankId) - val userId = request.userId - - for { - (bank, _) <- NewStyle.function.getBank(bankId, callContext) - (user, _) <- NewStyle.function.findByUserId(userId, callContext) - } yield { - val (privateViewsUserCanAccessAtOneBank, privateAccountAccess) = Views.views.vend.privateViewsUserCanAccessAtBank(user, bankId) - val availablePrivateAccounts = bank.privateAccounts(privateAccountAccess) - val jValue = OBPAPI4_0_0.Implementations2_0_0.processAccounts(privateViewsUserCanAccessAtOneBank, availablePrivateAccounts) - val jArray = JArray( - jValue.asInstanceOf[JArray].arr.map(it => { - val bankIdJObject: JObject = "bankId" -> (it \ "bank_id") - it merge bankIdJObject - }) - ) - val jObject = JObject(List(JField("accounts", jArray))) - val accountsGrpc = 
jObject.extract[AccountsGrpc] - accountsGrpc - } - } - - override def getBankAccountsBalances(request: BankIdGrpc): Future[AccountsBalancesV310JsonGrpc] = Future { -// val callContext: Option[CallContext] = Some(CallContext()) -// val bankId = BankId(request.value) -// val bankIdAccountIds: List[BankIdAccountId] = Nil -// for { -// (accountsBalances, callContext)<- NewStyle.function.getBankAccountsBalances(bankIdAccountIds, callContext) -// } - ??? - } - - override def getCoreTransactionsForBankAccount(request: BankIdAccountIdAndUserIdGrpc): Future[CoreTransactionsJsonV300Grpc] = { - implicit val toViewExtended = code.model.toViewExtended(_) - implicit val toBankAccountExtended = code.model.toBankAccountExtended(_) - val callContext: Option[CallContext] = Some(CallContext()) - val bankId = BankId(request.bankId) - val accountId = AccountId(request.accountId) - for { - (user, _) <- NewStyle.function.findByUserId(request.userId, callContext) - (bankAccount, callContext) <- NewStyle.function.checkBankAccountExists(bankId, accountId, callContext) - (bank, callContext) <- NewStyle.function.getBank(bankId, callContext) - view <- ViewNewStyle.checkOwnerViewAccessAndReturnOwnerView(user, BankIdAccountId(bankAccount.bankId, bankAccount.accountId), callContext) - (Full(transactionsCore), callContext) <- bankAccount.getModeratedTransactionsCore(bank, Full(user), view, BankIdAccountId(bankId, accountId), Nil, callContext) - obpCoreTransactions: CoreTransactionsJsonV300 = code.api.v3_0_0.JSONFactory300.createCoreTransactionsJSON(transactionsCore.map(ModeratedTransactionCoreWithAttributes(_))) - } yield { - val jValue = Extraction.decompose(obpCoreTransactions) - val coreTransactionsJsonV300Grpc = jValue.extract[CoreTransactionsJsonV300Grpc] - coreTransactionsJsonV300Grpc - } - } + // Temporarily disabled — see api.proto, ApiProto.scala javaDescriptor filter, + // and ObpServiceGrpc.scala for the matching changes. 
+ // + //override def getPrivateAccountsAtOneBank(request: BankIdUserIdGrpc): Future[AccountsGrpc] = { + // implicit val toBankExtended = code.model.toBankExtended(_) + // val callContext: Option[CallContext] = Some(CallContext()) + // val bankId = BankId(request.bankId) + // val userId = request.userId + // + // for { + // (bank, _) <- NewStyle.function.getBank(bankId, callContext) + // (user, _) <- NewStyle.function.findByUserId(userId, callContext) + // } yield { + // val (privateViewsUserCanAccessAtOneBank, privateAccountAccess) = Views.views.vend.privateViewsUserCanAccessAtBank(user, bankId) + // val availablePrivateAccounts = bank.privateAccounts(privateAccountAccess) + // val jValue = OBPAPI4_0_0.Implementations2_0_0.processAccounts(privateViewsUserCanAccessAtOneBank, availablePrivateAccounts) + // val jArray = JArray( + // jValue.asInstanceOf[JArray].arr.map(it => { + // val bankIdJObject: JObject = "bankId" -> (it \ "bank_id") + // it merge bankIdJObject + // }) + // ) + // val jObject = JObject(List(JField("accounts", jArray))) + // val accountsGrpc = jObject.extract[AccountsGrpc] + // accountsGrpc + // } + //} + // + //override def getBankAccountsBalances(request: BankIdGrpc): Future[AccountsBalancesV310JsonGrpc] = Future { + // ??? 
+ //} + // + //override def getCoreTransactionsForBankAccount(request: BankIdAccountIdAndUserIdGrpc): Future[CoreTransactionsJsonV300Grpc] = { + // implicit val toViewExtended = code.model.toViewExtended(_) + // implicit val toBankAccountExtended = code.model.toBankAccountExtended(_) + // val callContext: Option[CallContext] = Some(CallContext()) + // val bankId = BankId(request.bankId) + // val accountId = AccountId(request.accountId) + // for { + // (user, _) <- NewStyle.function.findByUserId(request.userId, callContext) + // (bankAccount, callContext) <- NewStyle.function.checkBankAccountExists(bankId, accountId, callContext) + // (bank, callContext) <- NewStyle.function.getBank(bankId, callContext) + // view <- ViewNewStyle.checkOwnerViewAccessAndReturnOwnerView(user, BankIdAccountId(bankAccount.bankId, bankAccount.accountId), callContext) + // (Full(transactionsCore), callContext) <- bankAccount.getModeratedTransactionsCore(bank, Full(user), view, BankIdAccountId(bankId, accountId), Nil, callContext) + // obpCoreTransactions: CoreTransactionsJsonV300 = code.api.v3_0_0.JSONFactory300.createCoreTransactionsJSON(transactionsCore.map(ModeratedTransactionCoreWithAttributes(_))) + // } yield { + // val jValue = Extraction.decompose(obpCoreTransactions) + // val coreTransactionsJsonV300Grpc = jValue.extract[CoreTransactionsJsonV300Grpc] + // coreTransactionsJsonV300Grpc + // } + //} } } diff --git a/obp-api/src/main/scala/code/obp/grpc/api/ApiProto.scala b/obp-api/src/main/scala/code/obp/grpc/api/ApiProto.scala index a041c9a982..10730dae58 100644 --- a/obp-api/src/main/scala/code/obp/grpc/api/ApiProto.scala +++ b/obp-api/src/main/scala/code/obp/grpc/api/ApiProto.scala @@ -142,8 +142,19 @@ object ApiProto extends _root_.scalapb.GeneratedFileObject { _root_.scalapb.descriptors.FileDescriptor.buildFrom(scalaProto, dependencies.map(_.scalaDescriptor)) } lazy val javaDescriptor: com.google.protobuf.Descriptors.FileDescriptor = { + import scala.collection.JavaConverters._ 
val javaProto = com.google.protobuf.DescriptorProtos.FileDescriptorProto.parseFrom(ProtoBytes) - com.google.protobuf.Descriptors.FileDescriptor.buildFrom(javaProto, Array( + // Filter ObpService to expose only getBanks. The other methods + // (getPrivateAccountsAtOneBank, getBankAccountsBalances, + // getCoreTransactionsForBankAccount) are temporarily disabled — see + // api.proto and ObpServiceGrpc.scala for the matching changes. + val enabledMethods = Set("getBanks") + val filteredServices = javaProto.getServiceList.asScala.map { svc => + val kept = svc.getMethodList.asScala.filter(m => enabledMethods.contains(m.getName)) + svc.toBuilder.clearMethod().addAllMethod(kept.asJava).build() + } + val filteredProto = javaProto.toBuilder.clearService().addAllService(filteredServices.asJava).build() + com.google.protobuf.Descriptors.FileDescriptor.buildFrom(filteredProto, Array( com.google.protobuf.empty.EmptyProto.javaDescriptor, com.google.protobuf.timestamp.TimestampProto.javaDescriptor )) diff --git a/obp-api/src/main/scala/code/obp/grpc/api/ObpServiceGrpc.scala b/obp-api/src/main/scala/code/obp/grpc/api/ObpServiceGrpc.scala index 6fa498dbf8..46bc8d4bba 100644 --- a/obp-api/src/main/scala/code/obp/grpc/api/ObpServiceGrpc.scala +++ b/obp-api/src/main/scala/code/obp/grpc/api/ObpServiceGrpc.scala @@ -10,48 +10,50 @@ object ObpServiceGrpc { .setResponseMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.api.BanksJson400Grpc)) .build() - val METHOD_GET_PRIVATE_ACCOUNTS_AT_ONE_BANK: _root_.io.grpc.MethodDescriptor[code.obp.grpc.api.BankIdUserIdGrpc, code.obp.grpc.api.AccountsGrpc] = - _root_.io.grpc.MethodDescriptor.newBuilder() - .setType(_root_.io.grpc.MethodDescriptor.MethodType.UNARY) - .setFullMethodName(_root_.io.grpc.MethodDescriptor.generateFullMethodName("code.obp.grpc.ObpService", "getPrivateAccountsAtOneBank")) - .setSampledToLocalTracing(true) - .setRequestMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.api.BankIdUserIdGrpc)) - .setResponseMarshaller(new 
scalapb.grpc.Marshaller(code.obp.grpc.api.AccountsGrpc)) - .build() - - val METHOD_GET_BANK_ACCOUNTS_BALANCES: _root_.io.grpc.MethodDescriptor[code.obp.grpc.api.BankIdGrpc, code.obp.grpc.api.AccountsBalancesV310JsonGrpc] = - _root_.io.grpc.MethodDescriptor.newBuilder() - .setType(_root_.io.grpc.MethodDescriptor.MethodType.UNARY) - .setFullMethodName(_root_.io.grpc.MethodDescriptor.generateFullMethodName("code.obp.grpc.ObpService", "getBankAccountsBalances")) - .setSampledToLocalTracing(true) - .setRequestMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.api.BankIdGrpc)) - .setResponseMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.api.AccountsBalancesV310JsonGrpc)) - .build() - - val METHOD_GET_CORE_TRANSACTIONS_FOR_BANK_ACCOUNT: _root_.io.grpc.MethodDescriptor[code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc, code.obp.grpc.api.CoreTransactionsJsonV300Grpc] = - _root_.io.grpc.MethodDescriptor.newBuilder() - .setType(_root_.io.grpc.MethodDescriptor.MethodType.UNARY) - .setFullMethodName(_root_.io.grpc.MethodDescriptor.generateFullMethodName("code.obp.grpc.ObpService", "getCoreTransactionsForBankAccount")) - .setSampledToLocalTracing(true) - .setRequestMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc)) - .setResponseMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.api.CoreTransactionsJsonV300Grpc)) - .build() - + // Temporarily disabled — see api.proto and ApiProto.scala javaDescriptor filter. 
+ // + //val METHOD_GET_PRIVATE_ACCOUNTS_AT_ONE_BANK: _root_.io.grpc.MethodDescriptor[code.obp.grpc.api.BankIdUserIdGrpc, code.obp.grpc.api.AccountsGrpc] = + // _root_.io.grpc.MethodDescriptor.newBuilder() + // .setType(_root_.io.grpc.MethodDescriptor.MethodType.UNARY) + // .setFullMethodName(_root_.io.grpc.MethodDescriptor.generateFullMethodName("code.obp.grpc.ObpService", "getPrivateAccountsAtOneBank")) + // .setSampledToLocalTracing(true) + // .setRequestMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.api.BankIdUserIdGrpc)) + // .setResponseMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.api.AccountsGrpc)) + // .build() + // + //val METHOD_GET_BANK_ACCOUNTS_BALANCES: _root_.io.grpc.MethodDescriptor[code.obp.grpc.api.BankIdGrpc, code.obp.grpc.api.AccountsBalancesV310JsonGrpc] = + // _root_.io.grpc.MethodDescriptor.newBuilder() + // .setType(_root_.io.grpc.MethodDescriptor.MethodType.UNARY) + // .setFullMethodName(_root_.io.grpc.MethodDescriptor.generateFullMethodName("code.obp.grpc.ObpService", "getBankAccountsBalances")) + // .setSampledToLocalTracing(true) + // .setRequestMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.api.BankIdGrpc)) + // .setResponseMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.api.AccountsBalancesV310JsonGrpc)) + // .build() + // + //val METHOD_GET_CORE_TRANSACTIONS_FOR_BANK_ACCOUNT: _root_.io.grpc.MethodDescriptor[code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc, code.obp.grpc.api.CoreTransactionsJsonV300Grpc] = + // _root_.io.grpc.MethodDescriptor.newBuilder() + // .setType(_root_.io.grpc.MethodDescriptor.MethodType.UNARY) + // .setFullMethodName(_root_.io.grpc.MethodDescriptor.generateFullMethodName("code.obp.grpc.ObpService", "getCoreTransactionsForBankAccount")) + // .setSampledToLocalTracing(true) + // .setRequestMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc)) + // .setResponseMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.api.CoreTransactionsJsonV300Grpc)) + // 
.build() + val SERVICE: _root_.io.grpc.ServiceDescriptor = _root_.io.grpc.ServiceDescriptor.newBuilder("code.obp.grpc.ObpService") .setSchemaDescriptor(new _root_.scalapb.grpc.ConcreteProtoFileDescriptorSupplier(code.obp.grpc.api.ApiProto.javaDescriptor)) .addMethod(METHOD_GET_BANKS) - .addMethod(METHOD_GET_PRIVATE_ACCOUNTS_AT_ONE_BANK) - .addMethod(METHOD_GET_BANK_ACCOUNTS_BALANCES) - .addMethod(METHOD_GET_CORE_TRANSACTIONS_FOR_BANK_ACCOUNT) + //.addMethod(METHOD_GET_PRIVATE_ACCOUNTS_AT_ONE_BANK) + //.addMethod(METHOD_GET_BANK_ACCOUNTS_BALANCES) + //.addMethod(METHOD_GET_CORE_TRANSACTIONS_FOR_BANK_ACCOUNT) .build() trait ObpService extends _root_.scalapb.grpc.AbstractService { override def serviceCompanion = ObpService def getBanks(request: com.google.protobuf.empty.Empty): scala.concurrent.Future[code.obp.grpc.api.BanksJson400Grpc] - def getPrivateAccountsAtOneBank(request: code.obp.grpc.api.BankIdUserIdGrpc): scala.concurrent.Future[code.obp.grpc.api.AccountsGrpc] - def getBankAccountsBalances(request: code.obp.grpc.api.BankIdGrpc): scala.concurrent.Future[code.obp.grpc.api.AccountsBalancesV310JsonGrpc] - def getCoreTransactionsForBankAccount(request: code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc): scala.concurrent.Future[code.obp.grpc.api.CoreTransactionsJsonV300Grpc] + //def getPrivateAccountsAtOneBank(request: code.obp.grpc.api.BankIdUserIdGrpc): scala.concurrent.Future[code.obp.grpc.api.AccountsGrpc] + //def getBankAccountsBalances(request: code.obp.grpc.api.BankIdGrpc): scala.concurrent.Future[code.obp.grpc.api.AccountsBalancesV310JsonGrpc] + //def getCoreTransactionsForBankAccount(request: code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc): scala.concurrent.Future[code.obp.grpc.api.CoreTransactionsJsonV300Grpc] } object ObpService extends _root_.scalapb.grpc.ServiceCompanion[ObpService] { @@ -62,28 +64,28 @@ object ObpServiceGrpc { trait ObpServiceBlockingClient { def serviceCompanion = ObpService def getBanks(request: com.google.protobuf.empty.Empty): 
code.obp.grpc.api.BanksJson400Grpc - def getPrivateAccountsAtOneBank(request: code.obp.grpc.api.BankIdUserIdGrpc): code.obp.grpc.api.AccountsGrpc - def getBankAccountsBalances(request: code.obp.grpc.api.BankIdGrpc): code.obp.grpc.api.AccountsBalancesV310JsonGrpc - def getCoreTransactionsForBankAccount(request: code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc): code.obp.grpc.api.CoreTransactionsJsonV300Grpc + //def getPrivateAccountsAtOneBank(request: code.obp.grpc.api.BankIdUserIdGrpc): code.obp.grpc.api.AccountsGrpc + //def getBankAccountsBalances(request: code.obp.grpc.api.BankIdGrpc): code.obp.grpc.api.AccountsBalancesV310JsonGrpc + //def getCoreTransactionsForBankAccount(request: code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc): code.obp.grpc.api.CoreTransactionsJsonV300Grpc } class ObpServiceBlockingStub(channel: _root_.io.grpc.Channel, options: _root_.io.grpc.CallOptions = _root_.io.grpc.CallOptions.DEFAULT) extends _root_.io.grpc.stub.AbstractStub[ObpServiceBlockingStub](channel, options) with ObpServiceBlockingClient { override def getBanks(request: com.google.protobuf.empty.Empty): code.obp.grpc.api.BanksJson400Grpc = { _root_.io.grpc.stub.ClientCalls.blockingUnaryCall(channel.newCall(METHOD_GET_BANKS, options), request) } - - override def getPrivateAccountsAtOneBank(request: code.obp.grpc.api.BankIdUserIdGrpc): code.obp.grpc.api.AccountsGrpc = { - _root_.io.grpc.stub.ClientCalls.blockingUnaryCall(channel.newCall(METHOD_GET_PRIVATE_ACCOUNTS_AT_ONE_BANK, options), request) - } - - override def getBankAccountsBalances(request: code.obp.grpc.api.BankIdGrpc): code.obp.grpc.api.AccountsBalancesV310JsonGrpc = { - _root_.io.grpc.stub.ClientCalls.blockingUnaryCall(channel.newCall(METHOD_GET_BANK_ACCOUNTS_BALANCES, options), request) - } - - override def getCoreTransactionsForBankAccount(request: code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc): code.obp.grpc.api.CoreTransactionsJsonV300Grpc = { - 
_root_.io.grpc.stub.ClientCalls.blockingUnaryCall(channel.newCall(METHOD_GET_CORE_TRANSACTIONS_FOR_BANK_ACCOUNT, options), request) - } - + + //override def getPrivateAccountsAtOneBank(request: code.obp.grpc.api.BankIdUserIdGrpc): code.obp.grpc.api.AccountsGrpc = { + // _root_.io.grpc.stub.ClientCalls.blockingUnaryCall(channel.newCall(METHOD_GET_PRIVATE_ACCOUNTS_AT_ONE_BANK, options), request) + //} + // + //override def getBankAccountsBalances(request: code.obp.grpc.api.BankIdGrpc): code.obp.grpc.api.AccountsBalancesV310JsonGrpc = { + // _root_.io.grpc.stub.ClientCalls.blockingUnaryCall(channel.newCall(METHOD_GET_BANK_ACCOUNTS_BALANCES, options), request) + //} + // + //override def getCoreTransactionsForBankAccount(request: code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc): code.obp.grpc.api.CoreTransactionsJsonV300Grpc = { + // _root_.io.grpc.stub.ClientCalls.blockingUnaryCall(channel.newCall(METHOD_GET_CORE_TRANSACTIONS_FOR_BANK_ACCOUNT, options), request) + //} + override def build(channel: _root_.io.grpc.Channel, options: _root_.io.grpc.CallOptions): ObpServiceBlockingStub = new ObpServiceBlockingStub(channel, options) } @@ -91,19 +93,19 @@ object ObpServiceGrpc { override def getBanks(request: com.google.protobuf.empty.Empty): scala.concurrent.Future[code.obp.grpc.api.BanksJson400Grpc] = { scalapb.grpc.Grpc.guavaFuture2ScalaFuture(_root_.io.grpc.stub.ClientCalls.futureUnaryCall(channel.newCall(METHOD_GET_BANKS, options), request)) } - - override def getPrivateAccountsAtOneBank(request: code.obp.grpc.api.BankIdUserIdGrpc): scala.concurrent.Future[code.obp.grpc.api.AccountsGrpc] = { - scalapb.grpc.Grpc.guavaFuture2ScalaFuture(_root_.io.grpc.stub.ClientCalls.futureUnaryCall(channel.newCall(METHOD_GET_PRIVATE_ACCOUNTS_AT_ONE_BANK, options), request)) - } - - override def getBankAccountsBalances(request: code.obp.grpc.api.BankIdGrpc): scala.concurrent.Future[code.obp.grpc.api.AccountsBalancesV310JsonGrpc] = { - 
scalapb.grpc.Grpc.guavaFuture2ScalaFuture(_root_.io.grpc.stub.ClientCalls.futureUnaryCall(channel.newCall(METHOD_GET_BANK_ACCOUNTS_BALANCES, options), request)) - } - - override def getCoreTransactionsForBankAccount(request: code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc): scala.concurrent.Future[code.obp.grpc.api.CoreTransactionsJsonV300Grpc] = { - scalapb.grpc.Grpc.guavaFuture2ScalaFuture(_root_.io.grpc.stub.ClientCalls.futureUnaryCall(channel.newCall(METHOD_GET_CORE_TRANSACTIONS_FOR_BANK_ACCOUNT, options), request)) - } - + + //override def getPrivateAccountsAtOneBank(request: code.obp.grpc.api.BankIdUserIdGrpc): scala.concurrent.Future[code.obp.grpc.api.AccountsGrpc] = { + // scalapb.grpc.Grpc.guavaFuture2ScalaFuture(_root_.io.grpc.stub.ClientCalls.futureUnaryCall(channel.newCall(METHOD_GET_PRIVATE_ACCOUNTS_AT_ONE_BANK, options), request)) + //} + // + //override def getBankAccountsBalances(request: code.obp.grpc.api.BankIdGrpc): scala.concurrent.Future[code.obp.grpc.api.AccountsBalancesV310JsonGrpc] = { + // scalapb.grpc.Grpc.guavaFuture2ScalaFuture(_root_.io.grpc.stub.ClientCalls.futureUnaryCall(channel.newCall(METHOD_GET_BANK_ACCOUNTS_BALANCES, options), request)) + //} + // + //override def getCoreTransactionsForBankAccount(request: code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc): scala.concurrent.Future[code.obp.grpc.api.CoreTransactionsJsonV300Grpc] = { + // scalapb.grpc.Grpc.guavaFuture2ScalaFuture(_root_.io.grpc.stub.ClientCalls.futureUnaryCall(channel.newCall(METHOD_GET_CORE_TRANSACTIONS_FOR_BANK_ACCOUNT, options), request)) + //} + override def build(channel: _root_.io.grpc.Channel, options: _root_.io.grpc.CallOptions): ObpServiceStub = new ObpServiceStub(channel, options) } @@ -116,27 +118,27 @@ object ObpServiceGrpc { serviceImpl.getBanks(request).onComplete(scalapb.grpc.Grpc.completeObserver(observer))( executionContext) })) - .addMethod( - METHOD_GET_PRIVATE_ACCOUNTS_AT_ONE_BANK, - _root_.io.grpc.stub.ServerCalls.asyncUnaryCall(new 
_root_.io.grpc.stub.ServerCalls.UnaryMethod[code.obp.grpc.api.BankIdUserIdGrpc, code.obp.grpc.api.AccountsGrpc] { - override def invoke(request: code.obp.grpc.api.BankIdUserIdGrpc, observer: _root_.io.grpc.stub.StreamObserver[code.obp.grpc.api.AccountsGrpc]): Unit = - serviceImpl.getPrivateAccountsAtOneBank(request).onComplete(scalapb.grpc.Grpc.completeObserver(observer))( - executionContext) - })) - .addMethod( - METHOD_GET_BANK_ACCOUNTS_BALANCES, - _root_.io.grpc.stub.ServerCalls.asyncUnaryCall(new _root_.io.grpc.stub.ServerCalls.UnaryMethod[code.obp.grpc.api.BankIdGrpc, code.obp.grpc.api.AccountsBalancesV310JsonGrpc] { - override def invoke(request: code.obp.grpc.api.BankIdGrpc, observer: _root_.io.grpc.stub.StreamObserver[code.obp.grpc.api.AccountsBalancesV310JsonGrpc]): Unit = - serviceImpl.getBankAccountsBalances(request).onComplete(scalapb.grpc.Grpc.completeObserver(observer))( - executionContext) - })) - .addMethod( - METHOD_GET_CORE_TRANSACTIONS_FOR_BANK_ACCOUNT, - _root_.io.grpc.stub.ServerCalls.asyncUnaryCall(new _root_.io.grpc.stub.ServerCalls.UnaryMethod[code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc, code.obp.grpc.api.CoreTransactionsJsonV300Grpc] { - override def invoke(request: code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc, observer: _root_.io.grpc.stub.StreamObserver[code.obp.grpc.api.CoreTransactionsJsonV300Grpc]): Unit = - serviceImpl.getCoreTransactionsForBankAccount(request).onComplete(scalapb.grpc.Grpc.completeObserver(observer))( - executionContext) - })) + //.addMethod( + // METHOD_GET_PRIVATE_ACCOUNTS_AT_ONE_BANK, + // _root_.io.grpc.stub.ServerCalls.asyncUnaryCall(new _root_.io.grpc.stub.ServerCalls.UnaryMethod[code.obp.grpc.api.BankIdUserIdGrpc, code.obp.grpc.api.AccountsGrpc] { + // override def invoke(request: code.obp.grpc.api.BankIdUserIdGrpc, observer: _root_.io.grpc.stub.StreamObserver[code.obp.grpc.api.AccountsGrpc]): Unit = + // 
serviceImpl.getPrivateAccountsAtOneBank(request).onComplete(scalapb.grpc.Grpc.completeObserver(observer))( + // executionContext) + // })) + //.addMethod( + // METHOD_GET_BANK_ACCOUNTS_BALANCES, + // _root_.io.grpc.stub.ServerCalls.asyncUnaryCall(new _root_.io.grpc.stub.ServerCalls.UnaryMethod[code.obp.grpc.api.BankIdGrpc, code.obp.grpc.api.AccountsBalancesV310JsonGrpc] { + // override def invoke(request: code.obp.grpc.api.BankIdGrpc, observer: _root_.io.grpc.stub.StreamObserver[code.obp.grpc.api.AccountsBalancesV310JsonGrpc]): Unit = + // serviceImpl.getBankAccountsBalances(request).onComplete(scalapb.grpc.Grpc.completeObserver(observer))( + // executionContext) + // })) + //.addMethod( + // METHOD_GET_CORE_TRANSACTIONS_FOR_BANK_ACCOUNT, + // _root_.io.grpc.stub.ServerCalls.asyncUnaryCall(new _root_.io.grpc.stub.ServerCalls.UnaryMethod[code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc, code.obp.grpc.api.CoreTransactionsJsonV300Grpc] { + // override def invoke(request: code.obp.grpc.api.BankIdAccountIdAndUserIdGrpc, observer: _root_.io.grpc.stub.StreamObserver[code.obp.grpc.api.CoreTransactionsJsonV300Grpc]): Unit = + // serviceImpl.getCoreTransactionsForBankAccount(request).onComplete(scalapb.grpc.Grpc.completeObserver(observer))( + // executionContext) + // })) .build() def blockingStub(channel: _root_.io.grpc.Channel): ObpServiceBlockingStub = new ObpServiceBlockingStub(channel) diff --git a/obp-api/src/main/scala/code/obp/grpc/logcache/LogCacheStreamServiceImpl.scala b/obp-api/src/main/scala/code/obp/grpc/logcache/LogCacheStreamServiceImpl.scala new file mode 100644 index 0000000000..725e366ba0 --- /dev/null +++ b/obp-api/src/main/scala/code/obp/grpc/logcache/LogCacheStreamServiceImpl.scala @@ -0,0 +1,98 @@ +package code.obp.grpc.logcache + +import code.api.cache.RedisLogger +import code.api.util.APIUtil +import code.api.util.APIUtil.UserOnly +import code.logcache.LogCacheEventBus +import code.obp.grpc.chat.AuthInterceptor +import code.obp.grpc.logcache.api._ 
import code.util.Helper.MdcLoggable
import com.google.protobuf.timestamp.Timestamp
import io.grpc.Status
import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}
import net.liftweb.json
import net.liftweb.json.JsonAST.JValue

/**
 * gRPC service implementation for log cache streaming.
 *
 * Auth: the shared `AuthInterceptor` validates the token at stream open and
 * puts the `User` in gRPC Context. Per-level entitlements are checked here
 * against the same `canGetSystemLogCache*` roles the REST endpoint uses.
 */
object LogCacheStreamServiceImpl extends LogCacheStreamServiceGrpc.LogCacheStreamService with MdcLoggable {

  // Lift-json formats used by the extractOrElse calls below.
  private implicit val formats = json.DefaultFormats

  /**
   * Opens a server-side stream of log cache entries for the requested level.
   *
   * Fails the stream with UNAUTHENTICATED when no user is in the gRPC
   * Context, INVALID_ARGUMENT for an unknown level, and PERMISSION_DENIED
   * when the user lacks the level's required role.
   */
  override def streamLogCacheEntries(
      request: StreamLogCacheRequest,
      responseObserver: StreamObserver[LogCacheEntry]
  ): Unit = {
    // AuthInterceptor put the authenticated User into the gRPC Context;
    // null means the interceptor saw no/invalid credentials.
    val user = AuthInterceptor.USER_CONTEXT_KEY.get()
    if (user == null) {
      responseObserver.onError(Status.UNAUTHENTICATED.withDescription("Not authenticated").asRuntimeException())
      return
    }

    // Map the wire-level enum int to the internal Redis level; reject unknown values early.
    val internalLevel = LogLevel.toRedis(request.level) match {
      case Some(l) => l
      case None =>
        responseObserver.onError(Status.INVALID_ARGUMENT
          .withDescription(s"Unknown or unspecified log level: ${request.level}").asRuntimeException())
        return
    }

    // Same entitlement check the REST log-cache endpoint performs for this level.
    val requiredRoles = RedisLogger.LogLevel.requiredRoles(internalLevel)
    val callContext = Option(AuthInterceptor.CALL_CONTEXT_KEY.get())
    val consumerId = APIUtil.getConsumerPrimaryKey(callContext)
    if (!APIUtil.handleAccessControlWithAuthMode("", user.userId, consumerId, requiredRoles, UserOnly)) {
      responseObserver.onError(Status.PERMISSION_DENIED
        .withDescription(s"Missing entitlement for log level $internalLevel").asRuntimeException())
      return
    }

    logger.info(s"LogCacheStreamServiceImpl says: User ${user.userId} subscribed to $internalLevel log cache stream")

    // Bridges event-bus JSON payloads onto the gRPC response stream.
    val bridge = new StreamObserver[String] {
      override def onNext(jsonPayload: String): Unit = {
        import scala.util.control.NonFatal
        try {
          val jv = json.parse(jsonPayload)
          responseObserver.onNext(jsonToLogCacheEntry(jv))
        } catch {
          // NonFatal: one malformed payload must not kill the stream, but
          // fatal errors (OOM, etc.) should still propagate rather than be
          // swallowed (original code caught Throwable).
          case NonFatal(e) =>
            logger.warn(s"LogCacheStreamServiceImpl says: Failed to parse log cache entry: ${e.getMessage}")
        }
      }
      override def onError(t: Throwable): Unit = responseObserver.onError(t)
      override def onCompleted(): Unit = responseObserver.onCompleted()
    }

    responseObserver match {
      case ssco: ServerCallStreamObserver[_] =>
        // Register the cancel handler BEFORE subscribing, so a client cancel
        // arriving between subscribe() and setOnCancelHandler() cannot leak
        // the subscription (the original code subscribed first).
        ssco.setOnCancelHandler(() => {
          LogCacheEventBus.unsubscribe(internalLevel, bridge)
          logger.info(s"LogCacheStreamServiceImpl says: User ${user.userId} unsubscribed from $internalLevel log cache stream")
        })
        LogCacheEventBus.subscribe(internalLevel, bridge)
        // Close the remaining race: if the cancel fired before subscribe()
        // completed, the handler ran against a not-yet-registered bridge —
        // drop the subscription now. Assumes unsubscribe of an unknown /
        // already-removed observer is a no-op — TODO confirm against
        // LogCacheEventBus.
        if (ssco.isCancelled) LogCacheEventBus.unsubscribe(internalLevel, bridge)
      case _ =>
        // Non-cancellable observer (e.g. in-process tests): no cleanup hook available.
        LogCacheEventBus.subscribe(internalLevel, bridge)
    }
  }

  /**
   * Converts one event-bus JSON payload into the proto message.
   * Missing/unknown fields fall back to ""/UNSPECIFIED/absent.
   */
  private def jsonToLogCacheEntry(jv: JValue): LogCacheEntry = {
    val levelStr = (jv \ "level").extractOrElse[String]("")
    val levelInt = try {
      LogLevel.fromRedis(RedisLogger.LogLevel.valueOf(levelStr))
    } catch { case _: Throwable => LogLevel.UNSPECIFIED }
    // "ts" is epoch milliseconds; split into proto Timestamp seconds/nanos.
    val ts = (jv \ "ts").extractOrElse[Long](0L)
    val timestamp =
      if (ts > 0) Some(Timestamp(seconds = ts / 1000L, nanos = ((ts % 1000L) * 1000000L).toInt))
      else None
    LogCacheEntry(
      level = levelInt,
      message = (jv \ "message").extractOrElse[String](""),
      timestamp = timestamp,
      apiInstanceId = (jv \ "api_instance_id").extractOrElse[String]("")
    )
  }
}

// Hand-written to match the scalapb-generated shape used elsewhere in the
// gRPC layer (see chat/api/ChatMessageEvent.scala). No protoc plugin is
// wired into the Maven build.
//
// Protofile syntax: PROTO3

package code.obp.grpc.logcache.api

/**
 * Proto message `LogCacheEntry` (see log_cache.proto): one log line from the
 * Redis-backed log cache. Field numbers 1-4 must stay in sync with the
 * .proto file and with LogCacheProto's programmatic descriptor.
 */
@SerialVersionUID(0L)
final case class LogCacheEntry(
    level: _root_.scala.Int = 0,
    message: _root_.scala.Predef.String = "",
    timestamp: _root_.scala.Option[com.google.protobuf.timestamp.Timestamp] = _root_.scala.None,
    apiInstanceId: _root_.scala.Predef.String = ""
    ) extends scalapb.GeneratedMessage with scalapb.Message[LogCacheEntry] with scalapb.lenses.Updatable[LogCacheEntry] {
  // Cached wire size; 0 is the "not yet computed" sentinel (proto3 default
  // values are never serialized, so a real size of 0 only means "all defaults").
  @transient
  private[this] var __serializedSizeCachedValue: _root_.scala.Int = 0
  private[this] def __computeSerializedValue(): _root_.scala.Int = {
    var __size = 0
    // proto3 semantics: default-valued fields are omitted from the wire.
    if (level != 0) { __size += _root_.com.google.protobuf.CodedOutputStream.computeEnumSize(1, level) }
    if (message != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(2, message) }
    if (timestamp.isDefined) {
      val __v = timestamp.get
      val __s = __v.serializedSize
      // 1 byte tag + length varint + nested message payload.
      __size += 1 + _root_.com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(__s) + __s
    }
    if (apiInstanceId != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(4, apiInstanceId) }
    __size
  }
  final override def serializedSize: _root_.scala.Int = {
    var read = __serializedSizeCachedValue
    if (read == 0) {
      read = __computeSerializedValue()
      __serializedSizeCachedValue = read
    }
    read
  }
  // Serializes non-default fields in field-number order (required by the
  // canonical proto wire format expectations of most parsers).
  def writeTo(`_output__`: _root_.com.google.protobuf.CodedOutputStream): _root_.scala.Unit = {
    {
      val __v = level
      if (__v != 0) {
        _output__.writeEnum(1, __v)
      }
    };
    {
      val __v = message
      if (__v != "") {
        _output__.writeString(2, __v)
      }
    };
    timestamp.foreach { __v =>
      // Tag 3, wire type 2 (length-delimited nested message).
      _output__.writeTag(3, 2)
      _output__.writeUInt32NoTag(__v.serializedSize)
      __v.writeTo(_output__)
    };
    { val __v = apiInstanceId; if (__v != "") _output__.writeString(4, __v) };
  }
  // Parses fields by raw tag: 8 = (1<<3)|varint, 18 = (2<<3)|len, 26 = (3<<3)|len,
  // 34 = (4<<3)|len. Unknown tags are skipped (forward compatibility).
  def mergeFrom(`_input__`: _root_.com.google.protobuf.CodedInputStream): code.obp.grpc.logcache.api.LogCacheEntry = {
    var __level = this.level
    var __message = this.message
    var __timestamp = this.timestamp
    var __apiInstanceId = this.apiInstanceId
    var _done__ = false
    while (!_done__) {
      val _tag__ = _input__.readTag()
      _tag__ match {
        case 0 => _done__ = true
        case 8 =>
          __level = _input__.readEnum()
        case 18 =>
          __message = _input__.readString()
        case 26 =>
          __timestamp = Some(_root_.scalapb.LiteParser.readMessage(_input__, __timestamp.getOrElse(com.google.protobuf.timestamp.Timestamp.defaultInstance)))
        case 34 =>
          __apiInstanceId = _input__.readString()
        case tag => _input__.skipField(tag)
      }
    }
    code.obp.grpc.logcache.api.LogCacheEntry(
      level = __level,
      message = __message,
      timestamp = __timestamp,
      apiInstanceId = __apiInstanceId
    )
  }
  def withLevel(__v: _root_.scala.Int): LogCacheEntry = copy(level = __v)
  def withMessage(__v: _root_.scala.Predef.String): LogCacheEntry = copy(message = __v)
  def getTimestamp: com.google.protobuf.timestamp.Timestamp = timestamp.getOrElse(com.google.protobuf.timestamp.Timestamp.defaultInstance)
  def clearTimestamp: LogCacheEntry = copy(timestamp = _root_.scala.None)
  def withTimestamp(__v: com.google.protobuf.timestamp.Timestamp): LogCacheEntry = copy(timestamp = Some(__v))
  def withApiInstanceId(__v: _root_.scala.Predef.String): LogCacheEntry = copy(apiInstanceId = __v)
  // Returns null for default-valued scalar fields, mirroring scalapb-generated code.
  def getFieldByNumber(__fieldNumber: _root_.scala.Int): scala.Any = {
    (__fieldNumber: @_root_.scala.unchecked) match {
      case 1 => {
        val __t = level
        if (__t != 0) __t else null
      }
      case 2 => {
        val __t = message
        if (__t != "") __t else null
      }
      case 3 => timestamp.orNull
      case 4 => {
        val __t = apiInstanceId
        if (__t != "") __t else null
      }
    }
  }
  // NOTE(review): companion.scalaDescriptor throws UnsupportedOperationException,
  // so this method (and messageReads below) cannot actually be used — confirm
  // no caller relies on the PValue/descriptor API for this message.
  def getField(__field: _root_.scalapb.descriptors.FieldDescriptor): _root_.scalapb.descriptors.PValue = {
    require(__field.containingMessage eq companion.scalaDescriptor)
    (__field.number: @_root_.scala.unchecked) match {
      case 1 => _root_.scalapb.descriptors.PInt(level)
      case 2 => _root_.scalapb.descriptors.PString(message)
      case 3 => timestamp.map(_.toPMessage).getOrElse(_root_.scalapb.descriptors.PEmpty)
      case 4 => _root_.scalapb.descriptors.PString(apiInstanceId)
    }
  }
  def toProtoString: _root_.scala.Predef.String = _root_.scalapb.TextFormat.printToUnicodeString(this)
  def companion = code.obp.grpc.logcache.api.LogCacheEntry
}

object LogCacheEntry extends scalapb.GeneratedMessageCompanion[code.obp.grpc.logcache.api.LogCacheEntry] {
  implicit def messageCompanion: scalapb.GeneratedMessageCompanion[code.obp.grpc.logcache.api.LogCacheEntry] = this
  def fromFieldsMap(__fieldsMap: scala.collection.immutable.Map[_root_.com.google.protobuf.Descriptors.FieldDescriptor, scala.Any]): code.obp.grpc.logcache.api.LogCacheEntry = {
    require(__fieldsMap.keys.forall(_.getContainingType() == javaDescriptor), "FieldDescriptor does not match message type.")
    val __fields = javaDescriptor.getFields
    code.obp.grpc.logcache.api.LogCacheEntry(
      __fieldsMap.get(__fields.get(0)).map(_.asInstanceOf[_root_.com.google.protobuf.Descriptors.EnumValueDescriptor].getNumber).getOrElse(0),
      __fieldsMap.getOrElse(__fields.get(1), "").asInstanceOf[_root_.scala.Predef.String],
      __fieldsMap.get(__fields.get(2)).asInstanceOf[_root_.scala.Option[com.google.protobuf.timestamp.Timestamp]],
      __fieldsMap.getOrElse(__fields.get(3), "").asInstanceOf[_root_.scala.Predef.String]
    )
  }
  // NOTE(review): depends on scalaDescriptor, which throws below — unusable as written.
  implicit def messageReads: _root_.scalapb.descriptors.Reads[code.obp.grpc.logcache.api.LogCacheEntry] = _root_.scalapb.descriptors.Reads{
    case _root_.scalapb.descriptors.PMessage(__fieldsMap) =>
      require(__fieldsMap.keys.forall(_.containingMessage == scalaDescriptor), "FieldDescriptor does not match message type.")
      code.obp.grpc.logcache.api.LogCacheEntry(
        __fieldsMap.get(scalaDescriptor.findFieldByNumber(1).get).map(_.as[_root_.scala.Int]).getOrElse(0),
        __fieldsMap.get(scalaDescriptor.findFieldByNumber(2).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""),
        __fieldsMap.get(scalaDescriptor.findFieldByNumber(3).get).flatMap(_.as[_root_.scala.Option[com.google.protobuf.timestamp.Timestamp]]),
        __fieldsMap.get(scalaDescriptor.findFieldByNumber(4).get).map(_.as[_root_.scala.Predef.String]).getOrElse("")
      )
    case _ => throw new RuntimeException("Expected PMessage")
  }
  // Index 1: LogCacheEntry is the SECOND message type added in LogCacheProto
  // (StreamLogCacheRequest is index 0) — keep in sync with the builder order there.
  def javaDescriptor: _root_.com.google.protobuf.Descriptors.Descriptor = LogCacheProto.javaDescriptor.getMessageTypes.get(1)
  def scalaDescriptor: _root_.scalapb.descriptors.Descriptor = throw new UnsupportedOperationException("scalaDescriptor not available")
  def messageCompanionForFieldNumber(__number: _root_.scala.Int): _root_.scalapb.GeneratedMessageCompanion[_] = {
    var __out: _root_.scalapb.GeneratedMessageCompanion[_] = null
    // Only field 3 (timestamp) is a nested message; any other number throws MatchError.
    __number match {
      case 3 => __out = com.google.protobuf.timestamp.Timestamp
    }
    __out
  }
  lazy val nestedMessagesCompanions: Seq[_root_.scalapb.GeneratedMessageCompanion[_]] = Seq.empty
  def enumCompanionForFieldNumber(__fieldNumber: _root_.scala.Int): _root_.scalapb.GeneratedEnumCompanion[_] = throw new MatchError(__fieldNumber)
  lazy val defaultInstance = code.obp.grpc.logcache.api.LogCacheEntry(
  )
  implicit class LogCacheEntryLens[UpperPB](_l: _root_.scalapb.lenses.Lens[UpperPB, code.obp.grpc.logcache.api.LogCacheEntry]) extends _root_.scalapb.lenses.ObjectLens[UpperPB, code.obp.grpc.logcache.api.LogCacheEntry](_l) {
    def level: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Int] = field(_.level)((c_, f_) => c_.copy(level = f_))
    def message: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.message)((c_, f_) => c_.copy(message = f_))
    def timestamp: _root_.scalapb.lenses.Lens[UpperPB, com.google.protobuf.timestamp.Timestamp] = field(_.getTimestamp)((c_, f_) => c_.copy(timestamp = Some(f_)))
    def optionalTimestamp: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Option[com.google.protobuf.timestamp.Timestamp]] = field(_.timestamp)((c_, f_) => c_.copy(timestamp = f_))
    def apiInstanceId: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.apiInstanceId)((c_, f_) => c_.copy(apiInstanceId = f_))
  }
  final val LEVEL_FIELD_NUMBER = 1
  final val MESSAGE_FIELD_NUMBER = 2
  final val TIMESTAMP_FIELD_NUMBER = 3
  final val API_INSTANCE_ID_FIELD_NUMBER = 4
}

package code.obp.grpc.logcache.api

import com.google.protobuf.DescriptorProtos._
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.{Label, Type}

/**
 * Proto file descriptor for the log cache streaming service.
 * Built programmatically to support gRPC reflection (service discovery).
 */
object LogCacheProto {

  // NOTE: message-type ORDER here is load-bearing — StreamLogCacheRequest.javaDescriptor
  // uses getMessageTypes.get(0) and LogCacheEntry uses get(1).
  lazy val javaDescriptor: com.google.protobuf.Descriptors.FileDescriptor = {
    val fileProto = FileDescriptorProto.newBuilder()
      .setName("log_cache.proto")
      .setPackage("code.obp.grpc.logcache.g1")
      .setSyntax("proto3")
      .addDependency("google/protobuf/timestamp.proto")
      // LogLevel enum — values must match log_cache.proto and the LogLevel constants object.
      .addEnumType(EnumDescriptorProto.newBuilder()
        .setName("LogLevel")
        .addValue(EnumValueDescriptorProto.newBuilder().setName("LOG_LEVEL_UNSPECIFIED").setNumber(0))
        .addValue(EnumValueDescriptorProto.newBuilder().setName("TRACE").setNumber(1))
        .addValue(EnumValueDescriptorProto.newBuilder().setName("DEBUG").setNumber(2))
        .addValue(EnumValueDescriptorProto.newBuilder().setName("INFO").setNumber(3))
        .addValue(EnumValueDescriptorProto.newBuilder().setName("WARNING").setNumber(4))
        .addValue(EnumValueDescriptorProto.newBuilder().setName("ERROR").setNumber(5))
        .addValue(EnumValueDescriptorProto.newBuilder().setName("ALL").setNumber(6))
      )
      // StreamLogCacheRequest (message index 0)
      .addMessageType(DescriptorProto.newBuilder()
        .setName("StreamLogCacheRequest")
        .addField(enumField("level", 1, ".code.obp.grpc.logcache.g1.LogLevel"))
      )
      // LogCacheEntry (message index 1)
      .addMessageType(DescriptorProto.newBuilder()
        .setName("LogCacheEntry")
        .addField(enumField("level", 1, ".code.obp.grpc.logcache.g1.LogLevel"))
        .addField(stringField("message", 2))
        .addField(messageField("timestamp", 3, ".google.protobuf.Timestamp"))
        .addField(stringField("api_instance_id", 4))
      )
      // LogCacheStreamService (service index 0)
      .addService(ServiceDescriptorProto.newBuilder()
        .setName("LogCacheStreamService")
        .addMethod(MethodDescriptorProto.newBuilder()
          .setName("StreamLogCacheEntries")
          .setInputType(".code.obp.grpc.logcache.g1.StreamLogCacheRequest")
          .setOutputType(".code.obp.grpc.logcache.g1.LogCacheEntry")
          .setServerStreaming(true)
        )
      )
      .build()

    // The single dependency array position must line up with addDependency above.
    com.google.protobuf.Descriptors.FileDescriptor.buildFrom(
      fileProto,
      Array(com.google.protobuf.TimestampProto.getDescriptor)
    )
  }

  // Builder for a proto3 optional string field.
  private def stringField(name: String, number: Int): FieldDescriptorProto.Builder =
    FieldDescriptorProto.newBuilder()
      .setName(name).setNumber(number)
      .setType(Type.TYPE_STRING)
      .setLabel(Label.LABEL_OPTIONAL)

  // Builder for a proto3 optional enum field; typeName is the fully-qualified
  // enum name with a leading dot.
  private def enumField(name: String, number: Int, typeName: String): FieldDescriptorProto.Builder =
    FieldDescriptorProto.newBuilder()
      .setName(name).setNumber(number)
      .setType(Type.TYPE_ENUM)
      .setTypeName(typeName)
      .setLabel(Label.LABEL_OPTIONAL)

  // Builder for a proto3 optional nested-message field.
  private def messageField(name: String, number: Int, typeName: String): FieldDescriptorProto.Builder =
    FieldDescriptorProto.newBuilder()
      .setName(name).setNumber(number)
      .setType(Type.TYPE_MESSAGE)
      .setTypeName(typeName)
      .setLabel(Label.LABEL_OPTIONAL)
}
// Hand-written to match the scalapb-generated shape used elsewhere in the
// gRPC layer (see chat/api/ChatStreamServiceGrpc.scala). No protoc plugin
// is wired into the Maven build.
//
// Protofile syntax: PROTO3

package code.obp.grpc.logcache.api

/**
 * Service companion for LogCacheStreamService: method/service descriptors,
 * the service trait to implement, and bindService for server registration.
 */
object LogCacheStreamServiceGrpc {

  // Single server-streaming method; full name must match the descriptor
  // built in LogCacheProto ("code.obp.grpc.logcache.g1.LogCacheStreamService").
  val METHOD_STREAM_LOG_CACHE_ENTRIES: _root_.io.grpc.MethodDescriptor[code.obp.grpc.logcache.api.StreamLogCacheRequest, code.obp.grpc.logcache.api.LogCacheEntry] =
    _root_.io.grpc.MethodDescriptor.newBuilder()
      .setType(_root_.io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING)
      .setFullMethodName(_root_.io.grpc.MethodDescriptor.generateFullMethodName("code.obp.grpc.logcache.g1.LogCacheStreamService", "StreamLogCacheEntries"))
      .setSampledToLocalTracing(true)
      .setRequestMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.logcache.api.StreamLogCacheRequest))
      .setResponseMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.logcache.api.LogCacheEntry))
      .build()

  // Schema descriptor enables gRPC server reflection for this service.
  val SERVICE: _root_.io.grpc.ServiceDescriptor =
    _root_.io.grpc.ServiceDescriptor.newBuilder("code.obp.grpc.logcache.g1.LogCacheStreamService")
      .setSchemaDescriptor(new _root_.scalapb.grpc.ConcreteProtoFileDescriptorSupplier(code.obp.grpc.logcache.api.LogCacheProto.javaDescriptor))
      .addMethod(METHOD_STREAM_LOG_CACHE_ENTRIES)
      .build()

  trait LogCacheStreamService extends _root_.scalapb.grpc.AbstractService {
    override def serviceCompanion = LogCacheStreamService

    /** Server-side stream: pushes new log cache entries for the requested level */
    def streamLogCacheEntries(request: code.obp.grpc.logcache.api.StreamLogCacheRequest,
                              responseObserver: _root_.io.grpc.stub.StreamObserver[code.obp.grpc.logcache.api.LogCacheEntry]): Unit
  }

  object LogCacheStreamService extends _root_.scalapb.grpc.ServiceCompanion[LogCacheStreamService] {
    implicit def serviceCompanion: _root_.scalapb.grpc.ServiceCompanion[LogCacheStreamService] = this
    // Index 0: the only service added in LogCacheProto's file descriptor.
    def javaDescriptor: _root_.com.google.protobuf.Descriptors.ServiceDescriptor =
      code.obp.grpc.logcache.api.LogCacheProto.javaDescriptor.getServices().get(0)
  }

  /** Binds a service implementation for registration with a gRPC server builder. */
  def bindService(serviceImpl: LogCacheStreamService, executionContext: scala.concurrent.ExecutionContext): _root_.io.grpc.ServerServiceDefinition =
    _root_.io.grpc.ServerServiceDefinition.builder(SERVICE)
      .addMethod(
        METHOD_STREAM_LOG_CACHE_ENTRIES,
        _root_.io.grpc.stub.ServerCalls.asyncServerStreamingCall(
          new _root_.io.grpc.stub.ServerCalls.ServerStreamingMethod[code.obp.grpc.logcache.api.StreamLogCacheRequest, code.obp.grpc.logcache.api.LogCacheEntry] {
            override def invoke(request: code.obp.grpc.logcache.api.StreamLogCacheRequest,
                                responseObserver: _root_.io.grpc.stub.StreamObserver[code.obp.grpc.logcache.api.LogCacheEntry]): Unit =
              serviceImpl.streamLogCacheEntries(request, responseObserver)
          }))
      .build()
}

package code.obp.grpc.logcache.api

import code.api.cache.RedisLogger

/**
 * Constants matching the proto LogLevel enum. The wire field is an int32
 * varint; these are the values clients will see.
 *
 * See `log_cache.proto` for the canonical definition. Kept in a separate
 * file rather than a scalapb-generated enum class to minimise hand-written
 * boilerplate.
 */
object LogLevel {
  val UNSPECIFIED: Int = 0
  val TRACE: Int = 1
  val DEBUG: Int = 2
  val INFO: Int = 3
  val WARNING: Int = 4
  val ERROR: Int = 5
  val ALL: Int = 6

  // NOTE(review): non-exhaustive if RedisLogger.LogLevel ever gains a value
  // not listed here — a new internal level would throw MatchError. Keep the
  // two enums in lock-step.
  def fromRedis(level: RedisLogger.LogLevel.LogLevel): Int = level match {
    case RedisLogger.LogLevel.TRACE => TRACE
    case RedisLogger.LogLevel.DEBUG => DEBUG
    case RedisLogger.LogLevel.INFO => INFO
    case RedisLogger.LogLevel.WARNING => WARNING
    case RedisLogger.LogLevel.ERROR => ERROR
    case RedisLogger.LogLevel.ALL => ALL
  }

  // Returns None for 0 (UNSPECIFIED) and any unknown wire value, letting
  // callers reject the request with INVALID_ARGUMENT.
  def toRedis(level: Int): Option[RedisLogger.LogLevel.LogLevel] = level match {
    case TRACE => Some(RedisLogger.LogLevel.TRACE)
    case DEBUG => Some(RedisLogger.LogLevel.DEBUG)
    case INFO => Some(RedisLogger.LogLevel.INFO)
    case WARNING => Some(RedisLogger.LogLevel.WARNING)
    case ERROR => Some(RedisLogger.LogLevel.ERROR)
    case ALL => Some(RedisLogger.LogLevel.ALL)
    case _ => None
  }
}

// Hand-written to match the scalapb-generated shape used elsewhere in the
// gRPC layer (see chat/api/StreamMessagesRequest.scala). No protoc plugin is
// wired into the Maven build.
//
// Protofile syntax: PROTO3

package code.obp.grpc.logcache.api

/**
 * Proto message `StreamLogCacheRequest` (see log_cache.proto): the client's
 * subscription request, carrying only the desired LogLevel (field 1).
 */
@SerialVersionUID(0L)
final case class StreamLogCacheRequest(
    level: _root_.scala.Int = 0
    ) extends scalapb.GeneratedMessage with scalapb.Message[StreamLogCacheRequest] with scalapb.lenses.Updatable[StreamLogCacheRequest] {
  // Cached wire size; 0 is the "not yet computed" sentinel.
  @transient
  private[this] var __serializedSizeCachedValue: _root_.scala.Int = 0
  private[this] def __computeSerializedValue(): _root_.scala.Int = {
    var __size = 0
    // proto3: default-valued (0) enum is omitted from the wire.
    if (level != 0) { __size += _root_.com.google.protobuf.CodedOutputStream.computeEnumSize(1, level) }
    __size
  }
  final override def serializedSize: _root_.scala.Int = {
    var read = __serializedSizeCachedValue
    if (read == 0) {
      read = __computeSerializedValue()
      __serializedSizeCachedValue = read
    }
    read
  }
  def writeTo(`_output__`: _root_.com.google.protobuf.CodedOutputStream): _root_.scala.Unit = {
    {
      val __v = level
      if (__v != 0) {
        _output__.writeEnum(1, __v)
      }
    };
  }
  // Tag 8 = field 1, varint wire type; unknown tags are skipped.
  def mergeFrom(`_input__`: _root_.com.google.protobuf.CodedInputStream): code.obp.grpc.logcache.api.StreamLogCacheRequest = {
    var __level = this.level
    var _done__ = false
    while (!_done__) {
      val _tag__ = _input__.readTag()
      _tag__ match {
        case 0 => _done__ = true
        case 8 =>
          __level = _input__.readEnum()
        case tag => _input__.skipField(tag)
      }
    }
    code.obp.grpc.logcache.api.StreamLogCacheRequest(
      level = __level
    )
  }
  def withLevel(__v: _root_.scala.Int): StreamLogCacheRequest = copy(level = __v)
  // Returns null for the default value, mirroring scalapb-generated code.
  def getFieldByNumber(__fieldNumber: _root_.scala.Int): scala.Any = {
    (__fieldNumber: @_root_.scala.unchecked) match {
      case 1 => {
        val __t = level
        if (__t != 0) __t else null
      }
    }
  }
  // NOTE(review): companion.scalaDescriptor throws UnsupportedOperationException,
  // so this method (and messageReads below) cannot actually be used — confirm
  // no caller relies on the PValue/descriptor API for this message.
  def getField(__field: _root_.scalapb.descriptors.FieldDescriptor): _root_.scalapb.descriptors.PValue = {
    require(__field.containingMessage eq companion.scalaDescriptor)
    (__field.number: @_root_.scala.unchecked) match {
      case 1 => _root_.scalapb.descriptors.PInt(level)
    }
  }
  def toProtoString: _root_.scala.Predef.String = _root_.scalapb.TextFormat.printToUnicodeString(this)
  def companion = code.obp.grpc.logcache.api.StreamLogCacheRequest
}

object StreamLogCacheRequest extends scalapb.GeneratedMessageCompanion[code.obp.grpc.logcache.api.StreamLogCacheRequest] {
  implicit def messageCompanion: scalapb.GeneratedMessageCompanion[code.obp.grpc.logcache.api.StreamLogCacheRequest] = this
  def fromFieldsMap(__fieldsMap: scala.collection.immutable.Map[_root_.com.google.protobuf.Descriptors.FieldDescriptor, scala.Any]): code.obp.grpc.logcache.api.StreamLogCacheRequest = {
    require(__fieldsMap.keys.forall(_.getContainingType() == javaDescriptor), "FieldDescriptor does not match message type.")
    val __fields = javaDescriptor.getFields
    code.obp.grpc.logcache.api.StreamLogCacheRequest(
      __fieldsMap.get(__fields.get(0)).map(_.asInstanceOf[_root_.com.google.protobuf.Descriptors.EnumValueDescriptor].getNumber).getOrElse(0)
    )
  }
  // NOTE(review): depends on scalaDescriptor, which throws below — unusable as written.
  implicit def messageReads: _root_.scalapb.descriptors.Reads[code.obp.grpc.logcache.api.StreamLogCacheRequest] = _root_.scalapb.descriptors.Reads{
    case _root_.scalapb.descriptors.PMessage(__fieldsMap) =>
      require(__fieldsMap.keys.forall(_.containingMessage == scalaDescriptor), "FieldDescriptor does not match message type.")
      code.obp.grpc.logcache.api.StreamLogCacheRequest(
        __fieldsMap.get(scalaDescriptor.findFieldByNumber(1).get).map(_.as[_root_.scala.Int]).getOrElse(0)
      )
    case _ => throw new RuntimeException("Expected PMessage")
  }
  // Index 0: StreamLogCacheRequest is the FIRST message type added in
  // LogCacheProto's descriptor builder — keep the order in sync.
  def javaDescriptor: _root_.com.google.protobuf.Descriptors.Descriptor = LogCacheProto.javaDescriptor.getMessageTypes.get(0)
  def scalaDescriptor: _root_.scalapb.descriptors.Descriptor = throw new UnsupportedOperationException("scalaDescriptor not available")
  // No nested-message fields: any field number is an error.
  def messageCompanionForFieldNumber(__number: _root_.scala.Int): _root_.scalapb.GeneratedMessageCompanion[_] = throw new MatchError(__number)
  lazy val nestedMessagesCompanions: Seq[_root_.scalapb.GeneratedMessageCompanion[_]] = Seq.empty
  def enumCompanionForFieldNumber(__fieldNumber: _root_.scala.Int): _root_.scalapb.GeneratedEnumCompanion[_] = throw new MatchError(__fieldNumber)
  lazy val defaultInstance = code.obp.grpc.logcache.api.StreamLogCacheRequest(
  )
  implicit class StreamLogCacheRequestLens[UpperPB](_l: _root_.scalapb.lenses.Lens[UpperPB, code.obp.grpc.logcache.api.StreamLogCacheRequest]) extends _root_.scalapb.lenses.ObjectLens[UpperPB, code.obp.grpc.logcache.api.StreamLogCacheRequest](_l) {
    def level: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Int] = field(_.level)((c_, f_) => c_.copy(level = f_))
  }
  final val LEVEL_FIELD_NUMBER = 1
}

package code.obp.grpc.metricsstream

import code.api.util.APIUtil
import code.api.util.APIUtil.UserOnly
import code.api.util.ApiRole.canReadMetrics
import code.metricsstream.MetricsEventBus
import code.obp.grpc.chat.AuthInterceptor
import code.obp.grpc.metricsstream.api._
import code.util.Helper.MdcLoggable
import io.grpc.Status
import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}
import net.liftweb.json
import net.liftweb.json.JsonAST.JValue

/**
 * gRPC service implementation for metrics streaming.
 *
 * Auth: the shared `AuthInterceptor` validates the token at stream open and
 * puts the `User` in gRPC Context. The user must hold `canReadMetrics`
 * (same role the REST `/management/metrics` endpoint requires).
 *
 * Filters: applied server-side in the bridge observer — we still broadcast
 * via a single Redis channel, but only call `onNext` on matching events,
 * so unmatched events never waste gRPC wire bandwidth.
+ */ +object MetricsStreamServiceImpl extends MetricsStreamServiceGrpc.MetricsStreamService with MdcLoggable { + + private implicit val formats = json.DefaultFormats + + override def streamMetrics( + request: StreamMetricsRequest, + responseObserver: StreamObserver[MetricEvent] + ): Unit = { + val user = AuthInterceptor.USER_CONTEXT_KEY.get() + if (user == null) { + responseObserver.onError(Status.UNAUTHENTICATED.withDescription("Not authenticated").asRuntimeException()) + return + } + + val callContext = Option(AuthInterceptor.CALL_CONTEXT_KEY.get()) + val consumerId = APIUtil.getConsumerPrimaryKey(callContext) + if (!APIUtil.handleAccessControlWithAuthMode("", user.userId, consumerId, List(canReadMetrics), UserOnly)) { + responseObserver.onError(Status.PERMISSION_DENIED + .withDescription("Missing entitlement canReadMetrics").asRuntimeException()) + return + } + + logger.info(s"MetricsStreamServiceImpl says: User ${user.userId} subscribed to metrics stream (filters: $request)") + + val bridge = new StreamObserver[String] { + override def onNext(jsonPayload: String): Unit = { + try { + val jv = json.parse(jsonPayload) + if (matchesFilters(jv, request)) { + responseObserver.onNext(jsonToMetricEvent(jv)) + } + } catch { + case e: Throwable => + logger.warn(s"MetricsStreamServiceImpl says: Failed to parse metric event: ${e.getMessage}") + } + } + override def onError(t: Throwable): Unit = responseObserver.onError(t) + override def onCompleted(): Unit = responseObserver.onCompleted() + } + + MetricsEventBus.subscribe(bridge) + + responseObserver match { + case ssco: ServerCallStreamObserver[_] => + ssco.setOnCancelHandler(() => { + MetricsEventBus.unsubscribe(bridge) + logger.info(s"MetricsStreamServiceImpl says: User ${user.userId} unsubscribed from metrics stream") + }) + case _ => + } + } + + /** + * All filters AND together. Empty filter field = no restriction. + * url_substring does a simple `contains`; everything else is an exact + * match. 
+ */ + private def matchesFilters(jv: JValue, req: StreamMetricsRequest): Boolean = { + def matchExact(filter: String, actual: String): Boolean = + filter.isEmpty || filter == actual + def matchSubstring(filter: String, actual: String): Boolean = + filter.isEmpty || actual.contains(filter) + + matchExact(req.consumerId, (jv \ "consumer_id").extractOrElse[String]("")) && + matchExact(req.userId, (jv \ "user_id").extractOrElse[String]("")) && + matchExact(req.verb, (jv \ "verb").extractOrElse[String]("")) && + matchSubstring(req.urlSubstring, (jv \ "url").extractOrElse[String]("")) && + matchExact(req.implementedByPartialFunction, (jv \ "implemented_by_partial_function").extractOrElse[String]("")) && + matchExact(req.appName, (jv \ "app_name").extractOrElse[String]("")) + } + + private def jsonToMetricEvent(jv: JValue): MetricEvent = { + MetricEvent( + url = (jv \ "url").extractOrElse[String](""), + date = (jv \ "date").extractOrElse[String](""), + duration = (jv \ "duration").extractOrElse[Long](0L), + userId = (jv \ "user_id").extractOrElse[String](""), + username = (jv \ "username").extractOrElse[String](""), + appName = (jv \ "app_name").extractOrElse[String](""), + developerEmail = (jv \ "developer_email").extractOrElse[String](""), + consumerId = (jv \ "consumer_id").extractOrElse[String](""), + implementedByPartialFunction = (jv \ "implemented_by_partial_function").extractOrElse[String](""), + implementedInVersion = (jv \ "implemented_in_version").extractOrElse[String](""), + verb = (jv \ "verb").extractOrElse[String](""), + statusCode = (jv \ "status_code").extractOrElse[Int](0), + correlationId = (jv \ "correlation_id").extractOrElse[String](""), + sourceIp = (jv \ "source_ip").extractOrElse[String](""), + targetIp = (jv \ "target_ip").extractOrElse[String](""), + apiInstanceId = (jv \ "api_instance_id").extractOrElse[String](""), + operationId = (jv \ "operation_id").extractOrElse[String]("") + ) + } +} diff --git 
a/obp-api/src/main/scala/code/obp/grpc/metricsstream/api/MetricEvent.scala b/obp-api/src/main/scala/code/obp/grpc/metricsstream/api/MetricEvent.scala new file mode 100644 index 0000000000..779043c206 --- /dev/null +++ b/obp-api/src/main/scala/code/obp/grpc/metricsstream/api/MetricEvent.scala @@ -0,0 +1,296 @@ +// Hand-written to match the scalapb-generated shape used elsewhere in the +// gRPC layer. No protoc plugin is wired into the Maven build. +// +// Protofile syntax: PROTO3 + +package code.obp.grpc.metricsstream.api + +@SerialVersionUID(0L) +final case class MetricEvent( + url: _root_.scala.Predef.String = "", + date: _root_.scala.Predef.String = "", + duration: _root_.scala.Long = 0L, + userId: _root_.scala.Predef.String = "", + username: _root_.scala.Predef.String = "", + appName: _root_.scala.Predef.String = "", + developerEmail: _root_.scala.Predef.String = "", + consumerId: _root_.scala.Predef.String = "", + implementedByPartialFunction: _root_.scala.Predef.String = "", + implementedInVersion: _root_.scala.Predef.String = "", + verb: _root_.scala.Predef.String = "", + statusCode: _root_.scala.Int = 0, + correlationId: _root_.scala.Predef.String = "", + sourceIp: _root_.scala.Predef.String = "", + targetIp: _root_.scala.Predef.String = "", + apiInstanceId: _root_.scala.Predef.String = "", + operationId: _root_.scala.Predef.String = "" + ) extends scalapb.GeneratedMessage with scalapb.Message[MetricEvent] with scalapb.lenses.Updatable[MetricEvent] { + @transient + private[this] var __serializedSizeCachedValue: _root_.scala.Int = 0 + private[this] def __computeSerializedValue(): _root_.scala.Int = { + var __size = 0 + if (url != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(1, url) } + if (date != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(2, date) } + if (duration != 0L) { __size += _root_.com.google.protobuf.CodedOutputStream.computeInt64Size(3, duration) } + if (userId != "") { __size += 
_root_.com.google.protobuf.CodedOutputStream.computeStringSize(4, userId) } + if (username != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(5, username) } + if (appName != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(6, appName) } + if (developerEmail != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(7, developerEmail) } + if (consumerId != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(8, consumerId) } + if (implementedByPartialFunction != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(9, implementedByPartialFunction) } + if (implementedInVersion != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(10, implementedInVersion) } + if (verb != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(11, verb) } + if (statusCode != 0) { __size += _root_.com.google.protobuf.CodedOutputStream.computeInt32Size(12, statusCode) } + if (correlationId != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(13, correlationId) } + if (sourceIp != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(14, sourceIp) } + if (targetIp != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(15, targetIp) } + if (apiInstanceId != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(16, apiInstanceId) } + if (operationId != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(17, operationId) } + __size + } + final override def serializedSize: _root_.scala.Int = { + var read = __serializedSizeCachedValue + if (read == 0) { + read = __computeSerializedValue() + __serializedSizeCachedValue = read + } + read + } + def writeTo(`_output__`: _root_.com.google.protobuf.CodedOutputStream): _root_.scala.Unit = { + { val __v = url; if (__v != "") 
_output__.writeString(1, __v) }; + { val __v = date; if (__v != "") _output__.writeString(2, __v) }; + { val __v = duration; if (__v != 0L) _output__.writeInt64(3, __v) }; + { val __v = userId; if (__v != "") _output__.writeString(4, __v) }; + { val __v = username; if (__v != "") _output__.writeString(5, __v) }; + { val __v = appName; if (__v != "") _output__.writeString(6, __v) }; + { val __v = developerEmail; if (__v != "") _output__.writeString(7, __v) }; + { val __v = consumerId; if (__v != "") _output__.writeString(8, __v) }; + { val __v = implementedByPartialFunction; if (__v != "") _output__.writeString(9, __v) }; + { val __v = implementedInVersion; if (__v != "") _output__.writeString(10, __v) }; + { val __v = verb; if (__v != "") _output__.writeString(11, __v) }; + { val __v = statusCode; if (__v != 0) _output__.writeInt32(12, __v) }; + { val __v = correlationId; if (__v != "") _output__.writeString(13, __v) }; + { val __v = sourceIp; if (__v != "") _output__.writeString(14, __v) }; + { val __v = targetIp; if (__v != "") _output__.writeString(15, __v) }; + { val __v = apiInstanceId; if (__v != "") _output__.writeString(16, __v) }; + { val __v = operationId; if (__v != "") _output__.writeString(17, __v) }; + } + def mergeFrom(`_input__`: _root_.com.google.protobuf.CodedInputStream): code.obp.grpc.metricsstream.api.MetricEvent = { + var __url = this.url + var __date = this.date + var __duration = this.duration + var __userId = this.userId + var __username = this.username + var __appName = this.appName + var __developerEmail = this.developerEmail + var __consumerId = this.consumerId + var __implementedByPartialFunction = this.implementedByPartialFunction + var __implementedInVersion = this.implementedInVersion + var __verb = this.verb + var __statusCode = this.statusCode + var __correlationId = this.correlationId + var __sourceIp = this.sourceIp + var __targetIp = this.targetIp + var __apiInstanceId = this.apiInstanceId + var __operationId = this.operationId 
+ var _done__ = false + while (!_done__) { + val _tag__ = _input__.readTag() + _tag__ match { + case 0 => _done__ = true + case 10 => __url = _input__.readString() + case 18 => __date = _input__.readString() + case 24 => __duration = _input__.readInt64() + case 34 => __userId = _input__.readString() + case 42 => __username = _input__.readString() + case 50 => __appName = _input__.readString() + case 58 => __developerEmail = _input__.readString() + case 66 => __consumerId = _input__.readString() + case 74 => __implementedByPartialFunction = _input__.readString() + case 82 => __implementedInVersion = _input__.readString() + case 90 => __verb = _input__.readString() + case 96 => __statusCode = _input__.readInt32() + case 106 => __correlationId = _input__.readString() + case 114 => __sourceIp = _input__.readString() + case 122 => __targetIp = _input__.readString() + case 130 => __apiInstanceId = _input__.readString() + case 138 => __operationId = _input__.readString() + case tag => _input__.skipField(tag) + } + } + code.obp.grpc.metricsstream.api.MetricEvent( + url = __url, + date = __date, + duration = __duration, + userId = __userId, + username = __username, + appName = __appName, + developerEmail = __developerEmail, + consumerId = __consumerId, + implementedByPartialFunction = __implementedByPartialFunction, + implementedInVersion = __implementedInVersion, + verb = __verb, + statusCode = __statusCode, + correlationId = __correlationId, + sourceIp = __sourceIp, + targetIp = __targetIp, + apiInstanceId = __apiInstanceId, + operationId = __operationId + ) + } + def withUrl(__v: _root_.scala.Predef.String): MetricEvent = copy(url = __v) + def withDate(__v: _root_.scala.Predef.String): MetricEvent = copy(date = __v) + def withDuration(__v: _root_.scala.Long): MetricEvent = copy(duration = __v) + def withUserId(__v: _root_.scala.Predef.String): MetricEvent = copy(userId = __v) + def withUsername(__v: _root_.scala.Predef.String): MetricEvent = copy(username = __v) + def 
withAppName(__v: _root_.scala.Predef.String): MetricEvent = copy(appName = __v) + def withDeveloperEmail(__v: _root_.scala.Predef.String): MetricEvent = copy(developerEmail = __v) + def withConsumerId(__v: _root_.scala.Predef.String): MetricEvent = copy(consumerId = __v) + def withImplementedByPartialFunction(__v: _root_.scala.Predef.String): MetricEvent = copy(implementedByPartialFunction = __v) + def withImplementedInVersion(__v: _root_.scala.Predef.String): MetricEvent = copy(implementedInVersion = __v) + def withVerb(__v: _root_.scala.Predef.String): MetricEvent = copy(verb = __v) + def withStatusCode(__v: _root_.scala.Int): MetricEvent = copy(statusCode = __v) + def withCorrelationId(__v: _root_.scala.Predef.String): MetricEvent = copy(correlationId = __v) + def withSourceIp(__v: _root_.scala.Predef.String): MetricEvent = copy(sourceIp = __v) + def withTargetIp(__v: _root_.scala.Predef.String): MetricEvent = copy(targetIp = __v) + def withApiInstanceId(__v: _root_.scala.Predef.String): MetricEvent = copy(apiInstanceId = __v) + def withOperationId(__v: _root_.scala.Predef.String): MetricEvent = copy(operationId = __v) + def getFieldByNumber(__fieldNumber: _root_.scala.Int): scala.Any = { + (__fieldNumber: @_root_.scala.unchecked) match { + case 1 => { val __t = url; if (__t != "") __t else null } + case 2 => { val __t = date; if (__t != "") __t else null } + case 3 => { val __t = duration; if (__t != 0L) __t else null } + case 4 => { val __t = userId; if (__t != "") __t else null } + case 5 => { val __t = username; if (__t != "") __t else null } + case 6 => { val __t = appName; if (__t != "") __t else null } + case 7 => { val __t = developerEmail; if (__t != "") __t else null } + case 8 => { val __t = consumerId; if (__t != "") __t else null } + case 9 => { val __t = implementedByPartialFunction; if (__t != "") __t else null } + case 10 => { val __t = implementedInVersion; if (__t != "") __t else null } + case 11 => { val __t = verb; if (__t != "") __t else 
null } + case 12 => { val __t = statusCode; if (__t != 0) __t else null } + case 13 => { val __t = correlationId; if (__t != "") __t else null } + case 14 => { val __t = sourceIp; if (__t != "") __t else null } + case 15 => { val __t = targetIp; if (__t != "") __t else null } + case 16 => { val __t = apiInstanceId; if (__t != "") __t else null } + case 17 => { val __t = operationId; if (__t != "") __t else null } + } + } + def getField(__field: _root_.scalapb.descriptors.FieldDescriptor): _root_.scalapb.descriptors.PValue = { + require(__field.containingMessage eq companion.scalaDescriptor) + (__field.number: @_root_.scala.unchecked) match { + case 1 => _root_.scalapb.descriptors.PString(url) + case 2 => _root_.scalapb.descriptors.PString(date) + case 3 => _root_.scalapb.descriptors.PLong(duration) + case 4 => _root_.scalapb.descriptors.PString(userId) + case 5 => _root_.scalapb.descriptors.PString(username) + case 6 => _root_.scalapb.descriptors.PString(appName) + case 7 => _root_.scalapb.descriptors.PString(developerEmail) + case 8 => _root_.scalapb.descriptors.PString(consumerId) + case 9 => _root_.scalapb.descriptors.PString(implementedByPartialFunction) + case 10 => _root_.scalapb.descriptors.PString(implementedInVersion) + case 11 => _root_.scalapb.descriptors.PString(verb) + case 12 => _root_.scalapb.descriptors.PInt(statusCode) + case 13 => _root_.scalapb.descriptors.PString(correlationId) + case 14 => _root_.scalapb.descriptors.PString(sourceIp) + case 15 => _root_.scalapb.descriptors.PString(targetIp) + case 16 => _root_.scalapb.descriptors.PString(apiInstanceId) + case 17 => _root_.scalapb.descriptors.PString(operationId) + } + } + def toProtoString: _root_.scala.Predef.String = _root_.scalapb.TextFormat.printToUnicodeString(this) + def companion = code.obp.grpc.metricsstream.api.MetricEvent +} + +object MetricEvent extends scalapb.GeneratedMessageCompanion[code.obp.grpc.metricsstream.api.MetricEvent] { + implicit def messageCompanion: 
scalapb.GeneratedMessageCompanion[code.obp.grpc.metricsstream.api.MetricEvent] = this + def fromFieldsMap(__fieldsMap: scala.collection.immutable.Map[_root_.com.google.protobuf.Descriptors.FieldDescriptor, scala.Any]): code.obp.grpc.metricsstream.api.MetricEvent = { + require(__fieldsMap.keys.forall(_.getContainingType() == javaDescriptor), "FieldDescriptor does not match message type.") + val __fields = javaDescriptor.getFields + code.obp.grpc.metricsstream.api.MetricEvent( + __fieldsMap.getOrElse(__fields.get(0), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(1), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(2), 0L).asInstanceOf[_root_.scala.Long], + __fieldsMap.getOrElse(__fields.get(3), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(4), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(5), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(6), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(7), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(8), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(9), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(10), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(11), 0).asInstanceOf[_root_.scala.Int], + __fieldsMap.getOrElse(__fields.get(12), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(13), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(14), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(15), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(16), "").asInstanceOf[_root_.scala.Predef.String] + ) + } + implicit def messageReads: 
_root_.scalapb.descriptors.Reads[code.obp.grpc.metricsstream.api.MetricEvent] = _root_.scalapb.descriptors.Reads{ + case _root_.scalapb.descriptors.PMessage(__fieldsMap) => + require(__fieldsMap.keys.forall(_.containingMessage == scalaDescriptor), "FieldDescriptor does not match message type.") + code.obp.grpc.metricsstream.api.MetricEvent( + __fieldsMap.get(scalaDescriptor.findFieldByNumber(1).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(2).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(3).get).map(_.as[_root_.scala.Long]).getOrElse(0L), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(4).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(5).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(6).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(7).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(8).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(9).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(10).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(11).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(12).get).map(_.as[_root_.scala.Int]).getOrElse(0), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(13).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(14).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + 
__fieldsMap.get(scalaDescriptor.findFieldByNumber(15).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(16).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(17).get).map(_.as[_root_.scala.Predef.String]).getOrElse("") + ) + case _ => throw new RuntimeException("Expected PMessage") + } + def javaDescriptor: _root_.com.google.protobuf.Descriptors.Descriptor = MetricsStreamProto.javaDescriptor.getMessageTypes.get(1) + def scalaDescriptor: _root_.scalapb.descriptors.Descriptor = throw new UnsupportedOperationException("scalaDescriptor not available") + def messageCompanionForFieldNumber(__number: _root_.scala.Int): _root_.scalapb.GeneratedMessageCompanion[_] = throw new MatchError(__number) + lazy val nestedMessagesCompanions: Seq[_root_.scalapb.GeneratedMessageCompanion[_]] = Seq.empty + def enumCompanionForFieldNumber(__fieldNumber: _root_.scala.Int): _root_.scalapb.GeneratedEnumCompanion[_] = throw new MatchError(__fieldNumber) + lazy val defaultInstance = code.obp.grpc.metricsstream.api.MetricEvent() + implicit class MetricEventLens[UpperPB](_l: _root_.scalapb.lenses.Lens[UpperPB, code.obp.grpc.metricsstream.api.MetricEvent]) extends _root_.scalapb.lenses.ObjectLens[UpperPB, code.obp.grpc.metricsstream.api.MetricEvent](_l) { + def url: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.url)((c_, f_) => c_.copy(url = f_)) + def date: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.date)((c_, f_) => c_.copy(date = f_)) + def duration: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Long] = field(_.duration)((c_, f_) => c_.copy(duration = f_)) + def userId: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.userId)((c_, f_) => c_.copy(userId = f_)) + def username: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.username)((c_, f_) => c_.copy(username = 
f_)) + def appName: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.appName)((c_, f_) => c_.copy(appName = f_)) + def developerEmail: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.developerEmail)((c_, f_) => c_.copy(developerEmail = f_)) + def consumerId: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.consumerId)((c_, f_) => c_.copy(consumerId = f_)) + def implementedByPartialFunction: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.implementedByPartialFunction)((c_, f_) => c_.copy(implementedByPartialFunction = f_)) + def implementedInVersion: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.implementedInVersion)((c_, f_) => c_.copy(implementedInVersion = f_)) + def verb: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.verb)((c_, f_) => c_.copy(verb = f_)) + def statusCode: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Int] = field(_.statusCode)((c_, f_) => c_.copy(statusCode = f_)) + def correlationId: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.correlationId)((c_, f_) => c_.copy(correlationId = f_)) + def sourceIp: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.sourceIp)((c_, f_) => c_.copy(sourceIp = f_)) + def targetIp: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.targetIp)((c_, f_) => c_.copy(targetIp = f_)) + def apiInstanceId: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.apiInstanceId)((c_, f_) => c_.copy(apiInstanceId = f_)) + def operationId: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.operationId)((c_, f_) => c_.copy(operationId = f_)) + } + final val URL_FIELD_NUMBER = 1 + final val DATE_FIELD_NUMBER = 2 + final val DURATION_FIELD_NUMBER = 3 + final val USER_ID_FIELD_NUMBER = 4 + final val USERNAME_FIELD_NUMBER = 5 + final val APP_NAME_FIELD_NUMBER = 6 
+ final val DEVELOPER_EMAIL_FIELD_NUMBER = 7 + final val CONSUMER_ID_FIELD_NUMBER = 8 + final val IMPLEMENTED_BY_PARTIAL_FUNCTION_FIELD_NUMBER = 9 + final val IMPLEMENTED_IN_VERSION_FIELD_NUMBER = 10 + final val VERB_FIELD_NUMBER = 11 + final val STATUS_CODE_FIELD_NUMBER = 12 + final val CORRELATION_ID_FIELD_NUMBER = 13 + final val SOURCE_IP_FIELD_NUMBER = 14 + final val TARGET_IP_FIELD_NUMBER = 15 + final val API_INSTANCE_ID_FIELD_NUMBER = 16 + final val OPERATION_ID_FIELD_NUMBER = 17 +} diff --git a/obp-api/src/main/scala/code/obp/grpc/metricsstream/api/MetricsStreamProto.scala b/obp-api/src/main/scala/code/obp/grpc/metricsstream/api/MetricsStreamProto.scala new file mode 100644 index 0000000000..0b75ed51ae --- /dev/null +++ b/obp-api/src/main/scala/code/obp/grpc/metricsstream/api/MetricsStreamProto.scala @@ -0,0 +1,80 @@ +package code.obp.grpc.metricsstream.api + +import com.google.protobuf.DescriptorProtos._ +import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.{Label, Type} + +/** + * Proto file descriptor for the metrics streaming service. + * Built programmatically to support gRPC reflection (service discovery). 
+ */ +object MetricsStreamProto { + + lazy val javaDescriptor: com.google.protobuf.Descriptors.FileDescriptor = { + val fileProto = FileDescriptorProto.newBuilder() + .setName("metrics_stream.proto") + .setPackage("code.obp.grpc.metricsstream.g1") + .setSyntax("proto3") + // StreamMetricsRequest + .addMessageType(DescriptorProto.newBuilder() + .setName("StreamMetricsRequest") + .addField(stringField("consumer_id", 1)) + .addField(stringField("user_id", 2)) + .addField(stringField("verb", 3)) + .addField(stringField("url_substring", 4)) + .addField(stringField("implemented_by_partial_function", 5)) + .addField(stringField("app_name", 6)) + ) + // MetricEvent + .addMessageType(DescriptorProto.newBuilder() + .setName("MetricEvent") + .addField(stringField("url", 1)) + .addField(stringField("date", 2)) + .addField(int64Field("duration", 3)) + .addField(stringField("user_id", 4)) + .addField(stringField("username", 5)) + .addField(stringField("app_name", 6)) + .addField(stringField("developer_email", 7)) + .addField(stringField("consumer_id", 8)) + .addField(stringField("implemented_by_partial_function", 9)) + .addField(stringField("implemented_in_version", 10)) + .addField(stringField("verb", 11)) + .addField(int32Field("status_code", 12)) + .addField(stringField("correlation_id", 13)) + .addField(stringField("source_ip", 14)) + .addField(stringField("target_ip", 15)) + .addField(stringField("api_instance_id", 16)) + .addField(stringField("operation_id", 17)) + ) + // MetricsStreamService + .addService(ServiceDescriptorProto.newBuilder() + .setName("MetricsStreamService") + .addMethod(MethodDescriptorProto.newBuilder() + .setName("StreamMetrics") + .setInputType(".code.obp.grpc.metricsstream.g1.StreamMetricsRequest") + .setOutputType(".code.obp.grpc.metricsstream.g1.MetricEvent") + .setServerStreaming(true) + ) + ) + .build() + + com.google.protobuf.Descriptors.FileDescriptor.buildFrom(fileProto, Array.empty) + } + + private def stringField(name: String, number: Int): 
FieldDescriptorProto.Builder = + FieldDescriptorProto.newBuilder() + .setName(name).setNumber(number) + .setType(Type.TYPE_STRING) + .setLabel(Label.LABEL_OPTIONAL) + + private def int32Field(name: String, number: Int): FieldDescriptorProto.Builder = + FieldDescriptorProto.newBuilder() + .setName(name).setNumber(number) + .setType(Type.TYPE_INT32) + .setLabel(Label.LABEL_OPTIONAL) + + private def int64Field(name: String, number: Int): FieldDescriptorProto.Builder = + FieldDescriptorProto.newBuilder() + .setName(name).setNumber(number) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) +} diff --git a/obp-api/src/main/scala/code/obp/grpc/metricsstream/api/MetricsStreamServiceGrpc.scala b/obp-api/src/main/scala/code/obp/grpc/metricsstream/api/MetricsStreamServiceGrpc.scala new file mode 100644 index 0000000000..37bb115451 --- /dev/null +++ b/obp-api/src/main/scala/code/obp/grpc/metricsstream/api/MetricsStreamServiceGrpc.scala @@ -0,0 +1,50 @@ +// Hand-written to match the scalapb-generated shape used elsewhere in the +// gRPC layer. No protoc plugin is wired into the Maven build. 
+// +// Protofile syntax: PROTO3 + +package code.obp.grpc.metricsstream.api + +object MetricsStreamServiceGrpc { + + val METHOD_STREAM_METRICS: _root_.io.grpc.MethodDescriptor[code.obp.grpc.metricsstream.api.StreamMetricsRequest, code.obp.grpc.metricsstream.api.MetricEvent] = + _root_.io.grpc.MethodDescriptor.newBuilder() + .setType(_root_.io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING) + .setFullMethodName(_root_.io.grpc.MethodDescriptor.generateFullMethodName("code.obp.grpc.metricsstream.g1.MetricsStreamService", "StreamMetrics")) + .setSampledToLocalTracing(true) + .setRequestMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.metricsstream.api.StreamMetricsRequest)) + .setResponseMarshaller(new scalapb.grpc.Marshaller(code.obp.grpc.metricsstream.api.MetricEvent)) + .build() + + val SERVICE: _root_.io.grpc.ServiceDescriptor = + _root_.io.grpc.ServiceDescriptor.newBuilder("code.obp.grpc.metricsstream.g1.MetricsStreamService") + .setSchemaDescriptor(new _root_.scalapb.grpc.ConcreteProtoFileDescriptorSupplier(code.obp.grpc.metricsstream.api.MetricsStreamProto.javaDescriptor)) + .addMethod(METHOD_STREAM_METRICS) + .build() + + trait MetricsStreamService extends _root_.scalapb.grpc.AbstractService { + override def serviceCompanion = MetricsStreamService + + /** Server-side stream: pushes new API metrics as they are written */ + def streamMetrics(request: code.obp.grpc.metricsstream.api.StreamMetricsRequest, + responseObserver: _root_.io.grpc.stub.StreamObserver[code.obp.grpc.metricsstream.api.MetricEvent]): Unit + } + + object MetricsStreamService extends _root_.scalapb.grpc.ServiceCompanion[MetricsStreamService] { + implicit def serviceCompanion: _root_.scalapb.grpc.ServiceCompanion[MetricsStreamService] = this + def javaDescriptor: _root_.com.google.protobuf.Descriptors.ServiceDescriptor = + code.obp.grpc.metricsstream.api.MetricsStreamProto.javaDescriptor.getServices().get(0) + } + + def bindService(serviceImpl: MetricsStreamService, executionContext: 
scala.concurrent.ExecutionContext): _root_.io.grpc.ServerServiceDefinition = + _root_.io.grpc.ServerServiceDefinition.builder(SERVICE) + .addMethod( + METHOD_STREAM_METRICS, + _root_.io.grpc.stub.ServerCalls.asyncServerStreamingCall( + new _root_.io.grpc.stub.ServerCalls.ServerStreamingMethod[code.obp.grpc.metricsstream.api.StreamMetricsRequest, code.obp.grpc.metricsstream.api.MetricEvent] { + override def invoke(request: code.obp.grpc.metricsstream.api.StreamMetricsRequest, + responseObserver: _root_.io.grpc.stub.StreamObserver[code.obp.grpc.metricsstream.api.MetricEvent]): Unit = + serviceImpl.streamMetrics(request, responseObserver) + })) + .build() +} diff --git a/obp-api/src/main/scala/code/obp/grpc/metricsstream/api/StreamMetricsRequest.scala b/obp-api/src/main/scala/code/obp/grpc/metricsstream/api/StreamMetricsRequest.scala new file mode 100644 index 0000000000..edf1c27415 --- /dev/null +++ b/obp-api/src/main/scala/code/obp/grpc/metricsstream/api/StreamMetricsRequest.scala @@ -0,0 +1,154 @@ +// Hand-written to match the scalapb-generated shape used elsewhere in the +// gRPC layer (see chat/api/StreamMessagesRequest.scala). No protoc plugin is +// wired into the Maven build. 
+// +// Protofile syntax: PROTO3 + +package code.obp.grpc.metricsstream.api + +@SerialVersionUID(0L) +final case class StreamMetricsRequest( + consumerId: _root_.scala.Predef.String = "", + userId: _root_.scala.Predef.String = "", + verb: _root_.scala.Predef.String = "", + urlSubstring: _root_.scala.Predef.String = "", + implementedByPartialFunction: _root_.scala.Predef.String = "", + appName: _root_.scala.Predef.String = "" + ) extends scalapb.GeneratedMessage with scalapb.Message[StreamMetricsRequest] with scalapb.lenses.Updatable[StreamMetricsRequest] { + @transient + private[this] var __serializedSizeCachedValue: _root_.scala.Int = 0 + private[this] def __computeSerializedValue(): _root_.scala.Int = { + var __size = 0 + if (consumerId != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(1, consumerId) } + if (userId != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(2, userId) } + if (verb != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(3, verb) } + if (urlSubstring != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(4, urlSubstring) } + if (implementedByPartialFunction != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(5, implementedByPartialFunction) } + if (appName != "") { __size += _root_.com.google.protobuf.CodedOutputStream.computeStringSize(6, appName) } + __size + } + final override def serializedSize: _root_.scala.Int = { + var read = __serializedSizeCachedValue + if (read == 0) { + read = __computeSerializedValue() + __serializedSizeCachedValue = read + } + read + } + def writeTo(`_output__`: _root_.com.google.protobuf.CodedOutputStream): _root_.scala.Unit = { + { val __v = consumerId; if (__v != "") _output__.writeString(1, __v) }; + { val __v = userId; if (__v != "") _output__.writeString(2, __v) }; + { val __v = verb; if (__v != "") _output__.writeString(3, __v) }; + { val __v = urlSubstring; if 
(__v != "") _output__.writeString(4, __v) }; + { val __v = implementedByPartialFunction; if (__v != "") _output__.writeString(5, __v) }; + { val __v = appName; if (__v != "") _output__.writeString(6, __v) }; + } + def mergeFrom(`_input__`: _root_.com.google.protobuf.CodedInputStream): code.obp.grpc.metricsstream.api.StreamMetricsRequest = { + var __consumerId = this.consumerId + var __userId = this.userId + var __verb = this.verb + var __urlSubstring = this.urlSubstring + var __implementedByPartialFunction = this.implementedByPartialFunction + var __appName = this.appName + var _done__ = false + while (!_done__) { + val _tag__ = _input__.readTag() + _tag__ match { + case 0 => _done__ = true + case 10 => __consumerId = _input__.readString() + case 18 => __userId = _input__.readString() + case 26 => __verb = _input__.readString() + case 34 => __urlSubstring = _input__.readString() + case 42 => __implementedByPartialFunction = _input__.readString() + case 50 => __appName = _input__.readString() + case tag => _input__.skipField(tag) + } + } + code.obp.grpc.metricsstream.api.StreamMetricsRequest( + consumerId = __consumerId, + userId = __userId, + verb = __verb, + urlSubstring = __urlSubstring, + implementedByPartialFunction = __implementedByPartialFunction, + appName = __appName + ) + } + def withConsumerId(__v: _root_.scala.Predef.String): StreamMetricsRequest = copy(consumerId = __v) + def withUserId(__v: _root_.scala.Predef.String): StreamMetricsRequest = copy(userId = __v) + def withVerb(__v: _root_.scala.Predef.String): StreamMetricsRequest = copy(verb = __v) + def withUrlSubstring(__v: _root_.scala.Predef.String): StreamMetricsRequest = copy(urlSubstring = __v) + def withImplementedByPartialFunction(__v: _root_.scala.Predef.String): StreamMetricsRequest = copy(implementedByPartialFunction = __v) + def withAppName(__v: _root_.scala.Predef.String): StreamMetricsRequest = copy(appName = __v) + def getFieldByNumber(__fieldNumber: _root_.scala.Int): scala.Any = { + 
(__fieldNumber: @_root_.scala.unchecked) match { + case 1 => { val __t = consumerId; if (__t != "") __t else null } + case 2 => { val __t = userId; if (__t != "") __t else null } + case 3 => { val __t = verb; if (__t != "") __t else null } + case 4 => { val __t = urlSubstring; if (__t != "") __t else null } + case 5 => { val __t = implementedByPartialFunction; if (__t != "") __t else null } + case 6 => { val __t = appName; if (__t != "") __t else null } + } + } + def getField(__field: _root_.scalapb.descriptors.FieldDescriptor): _root_.scalapb.descriptors.PValue = { + require(__field.containingMessage eq companion.scalaDescriptor) + (__field.number: @_root_.scala.unchecked) match { + case 1 => _root_.scalapb.descriptors.PString(consumerId) + case 2 => _root_.scalapb.descriptors.PString(userId) + case 3 => _root_.scalapb.descriptors.PString(verb) + case 4 => _root_.scalapb.descriptors.PString(urlSubstring) + case 5 => _root_.scalapb.descriptors.PString(implementedByPartialFunction) + case 6 => _root_.scalapb.descriptors.PString(appName) + } + } + def toProtoString: _root_.scala.Predef.String = _root_.scalapb.TextFormat.printToUnicodeString(this) + def companion = code.obp.grpc.metricsstream.api.StreamMetricsRequest +} + +object StreamMetricsRequest extends scalapb.GeneratedMessageCompanion[code.obp.grpc.metricsstream.api.StreamMetricsRequest] { + implicit def messageCompanion: scalapb.GeneratedMessageCompanion[code.obp.grpc.metricsstream.api.StreamMetricsRequest] = this + def fromFieldsMap(__fieldsMap: scala.collection.immutable.Map[_root_.com.google.protobuf.Descriptors.FieldDescriptor, scala.Any]): code.obp.grpc.metricsstream.api.StreamMetricsRequest = { + require(__fieldsMap.keys.forall(_.getContainingType() == javaDescriptor), "FieldDescriptor does not match message type.") + val __fields = javaDescriptor.getFields + code.obp.grpc.metricsstream.api.StreamMetricsRequest( + __fieldsMap.getOrElse(__fields.get(0), "").asInstanceOf[_root_.scala.Predef.String], + 
__fieldsMap.getOrElse(__fields.get(1), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(2), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(3), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(4), "").asInstanceOf[_root_.scala.Predef.String], + __fieldsMap.getOrElse(__fields.get(5), "").asInstanceOf[_root_.scala.Predef.String] + ) + } + implicit def messageReads: _root_.scalapb.descriptors.Reads[code.obp.grpc.metricsstream.api.StreamMetricsRequest] = _root_.scalapb.descriptors.Reads{ + case _root_.scalapb.descriptors.PMessage(__fieldsMap) => + require(__fieldsMap.keys.forall(_.containingMessage == scalaDescriptor), "FieldDescriptor does not match message type.") + code.obp.grpc.metricsstream.api.StreamMetricsRequest( + __fieldsMap.get(scalaDescriptor.findFieldByNumber(1).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(2).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(3).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(4).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(5).get).map(_.as[_root_.scala.Predef.String]).getOrElse(""), + __fieldsMap.get(scalaDescriptor.findFieldByNumber(6).get).map(_.as[_root_.scala.Predef.String]).getOrElse("") + ) + case _ => throw new RuntimeException("Expected PMessage") + } + def javaDescriptor: _root_.com.google.protobuf.Descriptors.Descriptor = MetricsStreamProto.javaDescriptor.getMessageTypes.get(0) + def scalaDescriptor: _root_.scalapb.descriptors.Descriptor = throw new UnsupportedOperationException("scalaDescriptor not available") + def messageCompanionForFieldNumber(__number: _root_.scala.Int): _root_.scalapb.GeneratedMessageCompanion[_] = throw new MatchError(__number) + lazy val 
nestedMessagesCompanions: Seq[_root_.scalapb.GeneratedMessageCompanion[_]] = Seq.empty + def enumCompanionForFieldNumber(__fieldNumber: _root_.scala.Int): _root_.scalapb.GeneratedEnumCompanion[_] = throw new MatchError(__fieldNumber) + lazy val defaultInstance = code.obp.grpc.metricsstream.api.StreamMetricsRequest() + implicit class StreamMetricsRequestLens[UpperPB](_l: _root_.scalapb.lenses.Lens[UpperPB, code.obp.grpc.metricsstream.api.StreamMetricsRequest]) extends _root_.scalapb.lenses.ObjectLens[UpperPB, code.obp.grpc.metricsstream.api.StreamMetricsRequest](_l) { + def consumerId: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.consumerId)((c_, f_) => c_.copy(consumerId = f_)) + def userId: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.userId)((c_, f_) => c_.copy(userId = f_)) + def verb: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.verb)((c_, f_) => c_.copy(verb = f_)) + def urlSubstring: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.urlSubstring)((c_, f_) => c_.copy(urlSubstring = f_)) + def implementedByPartialFunction: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.implementedByPartialFunction)((c_, f_) => c_.copy(implementedByPartialFunction = f_)) + def appName: _root_.scalapb.lenses.Lens[UpperPB, _root_.scala.Predef.String] = field(_.appName)((c_, f_) => c_.copy(appName = f_)) + } + final val CONSUMER_ID_FIELD_NUMBER = 1 + final val USER_ID_FIELD_NUMBER = 2 + final val VERB_FIELD_NUMBER = 3 + final val URL_SUBSTRING_FIELD_NUMBER = 4 + final val IMPLEMENTED_BY_PARTIAL_FUNCTION_FIELD_NUMBER = 5 + final val APP_NAME_FIELD_NUMBER = 6 +} diff --git a/obp-api/src/main/scala/code/scheduler/MetricsArchiveScheduler.scala b/obp-api/src/main/scala/code/scheduler/MetricsArchiveScheduler.scala index 123397a0b0..053e963e40 100644 --- a/obp-api/src/main/scala/code/scheduler/MetricsArchiveScheduler.scala +++ 
b/obp-api/src/main/scala/code/scheduler/MetricsArchiveScheduler.scala @@ -132,7 +132,8 @@ object MetricsArchiveScheduler extends MdcLoggable { i.getCorrelationId(), i.getResponseBody(), i.getSourceIp(), - i.getTargetIp() + i.getTargetIp(), + i.getApiInstanceId() ) } diff --git a/obp-api/src/main/scala/code/search/search.scala b/obp-api/src/main/scala/code/search/search.scala index e99dddf3d7..27544ab78a 100644 --- a/obp-api/src/main/scala/code/search/search.scala +++ b/obp-api/src/main/scala/code/search/search.scala @@ -267,7 +267,7 @@ class elasticsearchMetrics extends elasticsearch { } } - def indexMetric(userId: String, url: String, date: Date, duration: Long, userName: String, appName: String, developerEmail: String, correlationId: String) { + def indexMetric(userId: String, url: String, date: Date, duration: Long, userName: String, appName: String, developerEmail: String, correlationId: String, apiInstanceId: String) { if (APIUtil.getPropsAsBoolValue("allow_elasticsearch", false) && APIUtil.getPropsAsBoolValue("allow_elasticsearch_metrics", false) ) { try { // we must import the dsl @@ -281,7 +281,8 @@ class elasticsearchMetrics extends elasticsearch { "userName" -> userName, "appName" -> appName, "developerEmail" -> developerEmail, - "correlationId" -> correlationId + "correlationId" -> correlationId, + "apiInstanceId" -> apiInstanceId ) } } diff --git a/obp-api/src/test/scala/code/metrics/MetricsTest.scala b/obp-api/src/test/scala/code/metrics/MetricsTest.scala index 2c0f276cd5..cec7f255ea 100644 --- a/obp-api/src/test/scala/code/metrics/MetricsTest.scala +++ b/obp-api/src/test/scala/code/metrics/MetricsTest.scala @@ -27,6 +27,7 @@ class MetricsTest extends ServerSetup with WipeMetrics { val testVerb = "verb" val testSourceIp = "2001:0db8:3c4d:0015:0000:0000:1a2f:1a2b" val testTargetIp = "2001:0db8:3c4d:0015:0000:0000:1a2f:1a2b" + val testApiInstanceId = "test_instance" val testResponseBody: String = """fbdgbdbg}""".stripMargin val dateFormatter = 
APIUtil.DateWithSecondsFormat @@ -64,7 +65,7 @@ class MetricsTest extends ServerSetup with WipeMetrics { scenario("We save a new API metric") { metrics.saveMetric(testUserId,testUrl1, day1, -1L, testUserName, testAppName, testDeveloperEmail, testConsumerId, testImplementedByPartialFunction, - testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp) + testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp, testApiInstanceId) MetricBatchWriter.flush() val byUrl = metrics.getAllMetrics(List(OBPLimit(limit))).groupBy(_.getUrl()) @@ -82,16 +83,16 @@ class MetricsTest extends ServerSetup with WipeMetrics { scenario("Group all metrics by url") { metrics.saveMetric(testUserId, testUrl1, day1, -1L, testUserName, testAppName, testDeveloperEmail, testConsumerId, testImplementedByPartialFunction, - testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp) + testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp, testApiInstanceId) metrics.saveMetric(testUserId, testUrl1, day1, -1L, testUserName, testAppName, testDeveloperEmail, testConsumerId, testImplementedByPartialFunction, - testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp) + testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp, testApiInstanceId) metrics.saveMetric(testUserId, testUrl1, day2, -1L, testUserName, testAppName, testDeveloperEmail, testConsumerId, testImplementedByPartialFunction, - testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp) + testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp, testApiInstanceId) metrics.saveMetric(testUserId, testUrl2, day2, -1L, testUserName, testAppName, testDeveloperEmail, testConsumerId, testImplementedByPartialFunction, - testVersion, testVerb, None, 
getCorrelationId(), testResponseBody, testSourceIp , testTargetIp) + testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp, testApiInstanceId) MetricBatchWriter.flush() val byUrl = metrics.getAllMetrics(List(OBPLimit(limit1))).groupBy(_.getUrl()) @@ -112,16 +113,16 @@ class MetricsTest extends ServerSetup with WipeMetrics { scenario("Group all metrics by day") { metrics.saveMetric(testUserId, testUrl1, day1, -1L, testUserName, testAppName, testDeveloperEmail, testConsumerId, testImplementedByPartialFunction, - testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp) + testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp, testApiInstanceId) metrics.saveMetric(testUserId, testUrl1, day1, -1L, testUserName, testAppName, testDeveloperEmail, testConsumerId, testImplementedByPartialFunction, - testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp) + testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp, testApiInstanceId) metrics.saveMetric(testUserId, testUrl1, day2, -1L, testUserName, testAppName, testDeveloperEmail, testConsumerId, testImplementedByPartialFunction, - testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp) + testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp, testApiInstanceId) metrics.saveMetric(testUserId, testUrl2, day2, -1L, testUserName, testAppName, testDeveloperEmail, testConsumerId, testImplementedByPartialFunction, - testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp) + testVersion, testVerb, None, getCorrelationId(), testResponseBody, testSourceIp , testTargetIp, testApiInstanceId) MetricBatchWriter.flush() val byDay = metrics.getAllMetrics(List(OBPLimit(limit2))).groupBy(APIMetrics.getMetricDay)