I come from a .NET background and am new to Scala and Spark.
We have written some code in Scala that processes logs from Kafka, applies some modifications, and writes the results back to a different Kafka topic.
I am able to read the data from Kafka and send it back to another topic with minor processing.
Now I need to do some more complex processing, in which I have to read settings from Postgres and log progress to a file, but when I try to do that I get errors. Following is my code:
SparkConf config = new SparkConf()
    .SetAppName(Constants.SparkAppName)
    .Set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

SparkSession spark = SparkSession
    .Builder()
    .Config(config)
    .GetOrCreate();

var df = spark.Read()
    .Format("kafka")
    .Option("kafka.bootstrap.servers", "localhost:9092")
    .Option("subscribe", "test")
    .Load();

var t = df.SelectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
    "CAST(offset AS STRING)",
    "CAST(topic AS STRING)");

Func<Column, Column> udfFunction = Udf<string, string>((str1) =>
{
    string colValue = str1.ToString();
    var newData = ProcessData(colValue, connString);
    return newData;
});

t = t.WithColumn("ProcessedValue", udfFunction(Column("value")));

// var query = t.SelectExpr("CAST(key AS STRING)", "CAST(ProcessedValue AS STRING) as value")
//     .WriteStream()
//     .Format("kafka")
//     .Option("kafka.bootstrap.servers", "localhost:9092")
//     .Option("topic", "sparkoutput")
//     .Option("checkpointLocation", "checkpoint")
//     .Start();
// query.AwaitTermination();

t.SelectExpr("CAST(key AS STRING)", "CAST(ProcessedValue AS STRING) as value")
    .Write()
    .Format("kafka")
    .Option("kafka.bootstrap.servers", "localhost:9092")
    .Option("topic", "sparkoutput")
    .Save();

spark.Stop();
static string ProcessData(string inputJson, string connString)
{
    // logger.Information("[ProcessData] starting");
    // logger.Error("Inside process data");
    inputJson = inputJson + " " + inputJson.Length.ToString() + "***";
    using (var conn = new NpgsqlConnection(connString))
    {
        try
        {
            conn.Open();
            using (var cmd = new NpgsqlCommand("SELECT * FROM \"tablename\"", conn))
            using (var reader = cmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    var data = reader.GetString(1);
                    inputJson = inputJson + " " + data;
                }
            }
        }
        catch (System.Exception er)
        {
            Console.WriteLine(er);
        }
        finally
        {
            conn.Close();
        }
    }
    return inputJson;
}
As you can see from the above code, I connect to Kafka, read the data, try to process it with a UDF, and then send it back to Kafka.
Currently I am using Read for testing, but in production I will use ReadStream.
When the UDF tries to connect to Postgres, I get the error below:
org.apache.spark.api.python.PythonException: System.TypeInitializationException: The type initializer for 'Npgsql.NpgsqlConnection' threw an exception.
---> System.TypeLoadException: Could not load type 'System.Runtime.CompilerServices.DefaultInterpolatedStringHandler' from assembly 'System.Runtime, Version=4.2.2.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a'.
at Npgsql.NpgsqlConnectionStringBuilder.set_InternalCommandTimeout(Int32 value)
at Npgsql.NpgsqlConnectionStringBuilder.Init()
at Npgsql.NpgsqlConnectionStringBuilder..ctor()
at Npgsql.NpgsqlConnection..cctor()
--- End of inner exception stack trace ---
at Npgsql.NpgsqlConnection..ctor()
at Npgsql.NpgsqlConnection..ctor(String connectionString)
at WiseLakeSystemUsage.Program.ProcessData(String inputJson, String connString) in C:\Projects\SystemUsageProcessorDotnet\SYSTEMUSAGEPROCESSOR\Program.cs:line 99
I need to connect to Postgres because there is some dynamic configuration I have to fetch, and based on that I have to process the Kafka stream.
How do I connect to Postgres to fetch this data?
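For reference, one alternative I have been considering (not tested yet) is to avoid opening an Npgsql connection inside the UDF at all, and instead read the settings once on the driver through Spark's built-in JDBC source, then capture the plain values in the UDF closure. This is only a rough sketch; the connection URL, credentials, table, and column names are all placeholders, and it assumes the Postgres JDBC driver jar is on the Spark classpath (e.g. via --packages org.postgresql:postgresql:42.x):

```csharp
// Sketch, not tested: fetch the settings table on the driver with Spark's
// JDBC reader instead of Npgsql, so worker processes never touch Postgres.
var settings = spark.Read()
    .Format("jdbc")
    .Option("url", "jdbc:postgresql://localhost:5432/mydb") // placeholder
    .Option("dbtable", "settings")                          // placeholder
    .Option("user", "postgres")                             // placeholder
    .Option("password", "secret")                           // placeholder
    .Load();

// The settings table is small, so collecting it to the driver is cheap.
// Only plain strings end up captured in the UDF closure, which should
// sidestep the Npgsql type-load problem on the workers entirely.
var settingValues = settings.Collect()
    .Select(row => row.GetAs<string>("value")) // placeholder column name
    .ToList();

// Hypothetical ProcessData overload taking the pre-fetched settings
// instead of a connection string.
Func<Column, Column> udfFunction = Udf<string, string>(
    input => ProcessData(input, settingValues));
```

This way the database is queried once per job rather than once per row, which also avoids opening a connection for every record the UDF processes.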
The second issue is that I tried to use Serilog/NLog, and both threw an error saying the logger is not serializable. How can I log progress from inside the UDF to the file system?
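For context on what I was attempting: my understanding is that the UDF delegate gets serialized and shipped to the workers, so any logger instance captured in the closure has to be serializable, which Serilog/NLog loggers are not. A workaround I am considering is to build the logger lazily on each worker instead of capturing it from the driver. A rough sketch with a plain file append (the log path and message format are placeholders, not from my real code):

```csharp
// Sketch, not tested: a static Lazy<T> is initialized once per worker
// process when the UDF first runs there; because it is a static field,
// nothing non-serializable is captured in the UDF closure.
private static readonly Lazy<Action<string>> WorkerLog =
    new Lazy<Action<string>>(() =>
    {
        var path = $"/tmp/udf-{Environment.ProcessId}.log"; // placeholder path
        return msg =>
            File.AppendAllText(path, $"{DateTime.UtcNow:o} {msg}\n");
    });

Func<Column, Column> udfFunction = Udf<string, string>(input =>
{
    WorkerLog.Value("[ProcessData] starting");
    return ProcessData(input, connString);
});
```

Presumably the same pattern would work with a Serilog/NLog logger created inside the Lazy factory rather than a raw file append, since the configured logger then never crosses the serialization boundary.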
Any ideas would be appreciated.
The project is a .NET 6 console application.