Популярные вопросы по тегу PYSPARK

PySpark разница между pyspark.sql.functions.col и pyspark.sql.functions.lit

... вумя 5 методами из pyspark.sql.functions, поскольку документация на 4 официальном сайте PySpark не очень информативна. Например, следующий 3 код: import pyspark.sql.functions as F print(F.col('col_name')) print(F.lit('col_name')) Результаты: C ...

Как найти медиану и квантили с помощью Spark

... ора и определения 16 медианы. Этот вопрос аналогичен этому вопросу. Однако 15 ответ на вопрос - использование Scala, которого 14 я не знаю. How can I calculate exact median with Apache Spark? Используя размышления для ответа 13 Scala, я пытаюсь написать аналогичный ответ 12 на Python. Я знаю, что сначала хочу отсортировать 11 RDD. Я не знаю как. Я вижу методы sortBy (сортирует 10 этот RDD по заданному keyfunc) ...

Как распечатать только определенный столбец DataFrame в PySpark?

... eError: объект 'Column' не вызывается и 3 это: df[df.col].take(2) дает pyspark.sql.utils.AnalysisException: u 2 "выражение фильтра 'col' строки типа не 1 является логич ...

Pyspark: TaskMemoryManager: не удалось выделить страницу: нужна помощь в анализе ошибок

... (TaskMemoryManager), и 11 из всего 16 ГБ процесс потреблял не более 10 6 ГБ, оставляя 9 + ГБ свободными. Также 9 я установил память драйвера как 10G. так 8 что проход. Но когда я выполняю count() или 7 show() в моем последнем фрейме данных, операция 6 прошла успешно. Но при выполнении toCsv 5 он выдает указанные выше ошибки / предупреждения. На 4 самом деле не понимаю / не догадываюсь, что 3 может быть причиной проблемы. Помогите ...

Когда использовать mapParitions и mapPartitionsWithIndex?

... функции: mapPartitions(f, preservesPartitioning=False) Return a new RDD by applying a function to each partition of this RDD. >>> rdd = sc.parallelize([1, 2, 3, 4], 2) >>> def f(iterator): yield sum(iterator) >>> rdd.mapPartitions(f).collect() [3, 7] И 3 ... mapPartitionsWithIndex(f, prese ...

Как получить определение схемы из фрейма данных в PySpark?

... StructField("pressure", DoubleType(), True), StructField("pressure_unit", StringType(), True) ]) Для некоторых источников 10 данных можно вывести схему из источника 9 данных и получить фрейм данных с этим определением 8 схемы. Можно ли получить определение схемы 7 (в форме, описанной выше) из фрейма данных, где 6 ...

Сериализуйте настраиваемый преобразователь с помощью Python для использования в конвейере Pyspark ML.

... park ML не предоставляет возможности 5 для сохранения настраиваемого преобразователя, написанного 4 на Python, каковы другие варианты, чтобы 3 это сделать? Как я мо ...

to_date не может проанализировать дату в Spark 3.0

... ксическом анализаторе. Вы 9 можете установить для spark.sql.legacy.timeParserPolicy 8 значение LEGACY, чтобы восстановить поведение 7 до Spark 3.0, или установить значение CORRECTED 6 и рассматривать его как недопустимую строку 5 даты и времени. Исключение предполагает, что 4 мне следует использовать устаревший анализатор 3 времени, для начала я не знаю, как установить 2 его на устаревший. Вот моя реализация dfWithDate = df.with ...

Чтение / запись одного файла в DataBricks

... ку «Нет такого файла или каталога» Итак, я 6 попытался обернуть свое новое имя в фрейм 5 данных и добавить его в существующий файл, но 4 это также не сработало, поскольку dataframe.write.save 3 предназначен для записи в папки Какой самый 2 простой питон я мог бы использовать для 1 добавлен ...

Чтение файлов CSV с полями в кавычках, содержащими встроенные запятые

... ,'col14').show() +------------------+--------------------+--------------------+ | col12| col13| col14| +------------------+--------------------+--------------------+ |"32 XIY ""W"" JK| RE LK"|SOMETHINGLIKEAPHE...| | null|OUTKAST#THROOTS~W...| 0.0| +------------------+--------------------+--------------------+ Содержим ...

Пользовательский разделитель csv reader spark

... 5 я могу реализовать это при использовании 4 spark.read.csv()? CSV слишком велик для использования 3 pandas, потому что чтение этого файла занимает 2 много времени. ...

Преобразование pyspark.sql.dataframe.DataFrame типа Dataframe в словарь

... Код 4 ниже воспроизводится: from pyspark.sql import Row rdd = sc.parallelize([Row(name='Alice', age=5, height=80),Row(name='Alice', age=5, height=80),Row(name='Alice', age=10, height=80)]) df = rdd.toDF() Пол ...

Как вы отображаете отсортированные имена столбцов Dataframe?

... ли получить 4 его с помощью df.columns, есть ли способ получить 3 имена столбцов (только имена столбцов, а 2 не содержимое столбцов) в отсортированном 1 пор ...

Удаление повторяющихся столбцов после соединения DF в Spark

... вы не можете вызвать столбец id, потому что 10 он неоднозначен, и вы получите следующее 9 исключение: pyspark.sql.utils.AnalysisException: "Reference 'id' is ambiguous, could be: id#5691, id#5918.;" Это делает id непригодным для использования 8 ... Следующая функция решает проблему: def join(df1, df2, cond, ho ...

PySpark Drop Rows

... ая 6 строка, поскольку она обычно содержит имена 5 столбцов в моих наборах данных. Изучая API, я 4 не могу найти простого способа сделать это. Конечно, я 3 мог ...

PySpark groupby и выбор максимального значения

... hi 30/11/2016 panda Delhi 29/11/2016 brata BBSR 28/11/2016 brata Goa 30/10/2016 brata Goa 30/10/2016 Мне 17 нужно найти наиболее предпочтительный ГОРОД 16 для каждого имени, а логика такова: «взять 15 город как fav_city, если у города есть максимальное 14 количест ...

Pyspark в режиме пряжи-кластера

... это нужно таким 5 образом, потому что я интегрирую этот код 4 в веб-приложение django. Когда я пытаюсь 3 запустить любой скрипт в режиме пряжи-кластера, я 2 получаю следующую ошибку: org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkCon ...

Как запустить несколько заданий в одном Sparkcontext из отдельных потоков в PySpark?

... ить 10 для оценки этого действия. Планировщик Spark 9 является полностью потокобезопасным и поддерживает 8 этот вариант использования для включения 7 приложений, которые обслуживают несколько 6 запросов (например, запросы для нескольких 5 пользователей) ". Мне удалось найти 4 несколько примеров такого же кода на Scala 3 и Jav ...

PySpark - суммирует столбец в фрейме данных и возвращает результаты как int

... столбцом 6 чисел. Мне нужно суммировать этот столбец, а 5 затем вернуть результат как int в переменной 4 python. df = spark.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "Number"]) Я делаю следующее, чтобы суммировать 3 столбец. df.groupBy().sum() Но я получаю ф ...

SparkSQL на pyspark: как сгенерировать временные ряды?

... start | stop ------------------------- 2000-01-01 | 2000-01-05 2012-03-20 | 2012-03-23 В PostgreSQL 4 это сделать очень просто: SELECT generate_series(start, stop, '1 day'::interval)::date AS dt FROM my_table , и он сгенерирует 3 эту таблицу: dt ------------ 2000-01-01 2000-01-02 2 ...

Pyspark, добавьте символ в середину строки

... я хочу, чтобы это стало так: Hour 00:45 23:22 Для того, чтобы 2 после преврат ...

Ошибка инициализации SparkSession - невозможно использовать spark.read

... ateOrReplaceTempView('tempTable') sqlContext.sql("create table customer.temp as select * from tempTable") И я получаю сообщение 7 об ошибке: dfRaw = spark.read.csv ("hdfs: / user 6 /../ test.csv", header = False) AttributeError: объект 5 'Builder' не имеет атрибута 'read' Как правильно 4 настроить объект сеанса Spark для использования 3 команды read.csv? Кроме того ...

Pyspark: как дублировать строку n раз в кадре данных?

... 1 2 9 1 3 8 2 4 1 1 5 3 3 И трансформируем так: A B n 1 2 1 2 9 1 3 8 2 3 8 2 ...

Pyspark: фильтрация фрейма данных, если столбец содержит строку из другого столбца (оператор SQL LIKE)

... ark.sql.types import * PN_in_NC = (df .filter(df.long_text.like(concat(lit("%"), df.number, lit("%")))))) Я 6 получаю следующую ошибку: Method like([class org.apache.spark.sql.Column]) does not exist. Я пробовал несколько 5 способов исправить это (например, создать 4 строку '%number%' в качестве столбца перед фильтром, н ...

Как указать путь к классу драйвера при использовании pyspark в ноутбуке jupyter?

... е. Я 10 использую Spark 2.3.1 и Python 3.6.3 и могу 9 подключиться к базе данных из оболочки pyspark, если 8 укажу местоположение jar. pyspark --driver-class-path /home/.../postgresql.jar --jars /home/.../jars/postgresql.jar Спасибо всем, кто 7 может помочь мне в ...

Опишите фрейм данных в PySpark

... ribe()). Я пытался сделать следующее: file_pd = file.toPandas() file_pd.describe() , но, очевидно, для 3 этого потребуется загрузить все данные в 2 память, и это не удастся. Может ли кто-нибудь 1 предложить обходной пу ...

Как создать таблицу как select в pyspark.sql

... st.analysis.CheckAnalysis 49 $ class.failAnalysis (CheckAnalysis.scala: 40) в org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis 48 (Analyzer.scala: 58) в org.apache.spark.sql.catalyst.analysis.CheckAnalysis 47 $$ anonfun $ checkAnalysis $ 1.apply (CheckAnalysis.scala: 374) в org.apache.spark.sql.catalyst.a ...

E-num / получить манекены в pyspark

... создать функцию в PYSPARK, которая будет 5 получать Dataframe и список параметров (коды 4 / категориальные функции) и возвращать фрейм 3 данных с дополнительными фиктивными столбцами, такими 2 как категории функций в списке. PFA до и 1 после DF: before and After data frame- Example ...

Добавить Jar в автономный pyspark

... строку, я 3 могу добавить пакет следующим образом: $ pyspark/spark-submit --packages com.databricks:spark-csv_2.10:1.3.0 Но я не использую ничего из этого. Программа является частью более крупного рабочего процесса, в котором не используется spark-submit. Я 2 смогу запустить свою программу ./foo.py, и 1 она должна просто работать. Я знаю, что вы можете установить свойства Spark для extraC ...

pyspark collect_set или collect_list с помощью groupby

... ать collect_set или collect_list в кадре данных после 1 groupby. например: df.groupby('key').collect_ ...

Исключение: java.lang.Exception: при работе с основной пряжей в среде должен быть установлен HADOOP_CONF_DIR или YARN_CONF_DIR. в искре

... --jars C:\DependencyJars\spark-streaming-eventhubs_2.11-2.0.3.jar,C:\DependencyJars\scalaj-http_2.11-2.3.0.jar,C:\DependencyJars\config-1.3.1.jar,C:\DependencyJars\commons-lang3-3.3.2.jar --conf spark.driver.userClasspathFirst=true --conf spark.executor.extraClassPath=C:\DependencyJars\commons-lang3-3.3.2.jar --conf spark.executor.userClasspathFirst=true --class "GeoLogConsumerRT" C:\sbtazure\target\scala-2 ...

Использование pyspark для подключения к PostgreSQL

... "(SELECT * FROM talent LIMIT 1000) as blah", password = "MichaelJordan", user = "ScottyPippen", source = "jdbc", driver = "org.postgresql.Driver" ) и я получаю ...

Pyspark: передача нескольких столбцов в UDF

... ем, которая 11 будет принимать все столбцы, кроме первого 10 в кадре данных, и выполнять суммирование 9 (или любую другую операцию). Теперь фрейм 8 данных может иногда име ...

PySpark: добавить новое поле в элемент строки фрейма данных

... очу 2 добавить новое поле в a, чтобы a выглядел 1 так: a = Row(ts=1465326926253, myid=u'12 ...

Каков наиболее эффективный способ сокращения фрейма данных в pyspark?

... ми, которые выглядят так: ['station_id', 'country', 'temperature', 'time'] ['12', 'usa', '22', '12:04:14'] Я 4 хочу отображать среднюю т ...

Как объединить / добавить несколько столбцов данных Spark в Pyspark?

... ов данных Pyspark? Я погуглил и не 2 нашел подходящего решения. DF1 var1 3 4 5 DF2 var2 var3 23 31 44 45 52 53 Expected output dataf ...

Как найти максимальное значение в паре RDD?

... , (b,2), (c,1), (d,3)) Как найти ключ с наибольшим 3 счетчиком с помощью Spark Scala API? РЕДАКТИРОВАТЬ: тип 2 данных пары RDD - org.a ...

Python / Pyspark - счетчик NULL, пустой и NaN

... то сообщение об ошибке: TypeError: 'Column' object is not callable Кто-нибудь 2 знает, в чем может быть проблема? Заранее 1 большое спас ...

Pyspark: Сериализованная задача превышает максимально допустимое значение. Рассмотрите возможность увеличения spark.rpc.message.maxSize или использования широковещательных переменных для больших значений.

... g("spark.dynamicAllocation.maxExecutors", "12") .config("spark.driver.maxResultSize", "3g") .config("spark.kryoserializer.buffer.max.mb", "2047mb") .config("spark.rpc.message.maxSize", "1000mb") .getOrCreate()) Я также попытался переразбить свой 3 фрейм данных, используя: dfscoring=dfscoring.repartition(100) но вс ...

Система не может найти ошибку указанного пути при запуске pyspark

... После 10 загрузки я выполнил шаги, упомянутые здесь 9 pyspark installation for windows 10. Я использовал комментарий bin \ pyspark 8 для запуска искры и получил сообщение об 7 ошибке The system cannot find the path specified Прикрепленный снимок экрана с сообщением 6 ...

PySpark: добавить новый столбец с кортежем, созданным из столбцов

... атафрейм, созданный следующим 7 образом: df = spark.createDataFrame([('a',5,'R','X'),('b',7,'G','S'),('c',8,'G','S')], ["Id","V1","V2","V3"]) Похоже +---+---+---+---+ | Id| V1| V2| V3| +---+---+---+---+ | a| 5| R| X| | b| 7| G| S| | c| 8| G| S| +---+---+---+---+ Я хочу добавить столбец, представляющий 6 собой кортеж, состоящий из V1, V2, V3. Результат 5 должен выглядеть как +---+---+---+---+------- ...

Как преобразовать Spark RDD в pandas dataframe в ipython?

... rame мы можем сделать df = rdd1.toDF() Но я хочу преобразовать 1 RDD в pandas dataframe, а не в обычный d ...

Как сохранить фрейм данных в файл рассола с помощью Pyspark

... нужно сохранить фрейм данных в файл 1 Pickle, но он возвращает ...

Столбец Pivot String в Pyspark Dataframe

... rame(rdd, ["id","type", "cost", "date", "ship"]) df_data.show() +---+----+----+------+----+ | id|type|cost| date|ship| +---+----+----+------+----+ | 0| A| 223|201603|PORT| | 0| A| 22|201602|PORT| | 0| A| 422|201601|DOCK| | 1| B|3213|201602|DOCK| | 1| B|3213|201601|PORT| | 2| C|2321|201601|DOCK| +---+----+----+------+----+ и ...

Низкая скорость записи JDBC из Spark в MySQL

... о. Как я могу 1 это улучшить? Код ниже: df = sqlContext.createDataFrame(rdd, schema) df.write.jd ...

Настраиваемая агрегация на фреймах данных PySpark

... 0 1 0]] Мне нужен вывод в виде строки: ["1234", [ 1 1 0]], чтобы 3 вектор представлял собой сумму всех векторов, сгруппированных 2 по userid. Как мне этого добиться? Операция агрегирования 1 суммы ...

pyspark генерирует хэш строки определенных столбцов и добавляет его как новый столбец

... )) concat_str = concat_str[2:] #preserve concatenated value for testing (this can be removed later) row_dict["sha_values"] = concat_str row_dict["sha_hash"] = hashlib.sha256(concat_str).hexdigest() return Row(**row_dict) Затем 4 передано как: df1.rdd.map(lambda row: sha_concat(row,hash_col)).toDF().show(truncate=False) Однако теперь он выдает ошибку: UnicodeEncodeError: 'ascii' codec can't encode character u'\uff ...

Запись фрейма данных pyspark в один файл json с определенным именем

... writing in folder 'file_name.json' and files with part-XXX df2.toJSON().saveAsTextFile('/path/file_name.json') # didnt work, writing in folder 'file ...

findspark.init() IndexError: ошибка индекса списка вне допустимого диапазона

... lt;module>() 1 import findspark ----> 2 findspark.init() 3 4 import pyspark /.../anaconda/envs/pyspark/lib/python3.5/site-packages/findspark.py in init(spark_home, python_path, edit_rc, edit_profile) 132 # add pyspark to sys.path 133 spark_python = os.pat ...

Добавление столбца с постоянным значением в фрейм данных Spark

... 5 меня есть фрейм данных с именем wamp, в который 4 я хочу добавить столбец с именем region, который 3 должен принимать постоянное значение NE. Однако 2 я получаю сообщение об ошибке NameError: name 'lit' is n ...